博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
c#执行并行任务之Parallel与TaskFactory
阅读量:6363 次
发布时间:2019-06-23

本文共 5180 字,大约阅读时间需要 17 分钟。

任务:几千条(大量)数据往服务器数据库填写。要求单开线程执行,分割成小数据包,多线程运行。

实现方法:Parallel与TaskFactory都可以。

主要代码:

Parallel:

Barrier _bar;int _maxLength = 20, _maxChannel = 2;//同时最多2条通道,每条通道最多20个数据bool _isCancel = false;private void btnWrite_Click(object sender, EventArgs e){    var tmpEmails = _emails.Where(x => !x.Value).Select(x => x.Key).ToList();    var state = 0;    _isCancel = false;    SetControlEnable(false);    lblProgress.Text = "* 已完成 0%";    var channels = (tmpEmails.Count / _maxLength) + ((tmpEmails.Count % _maxLength > 0) ? 1 : 0);//总共多少条通道    var times = (channels / _maxChannel) + ((channels % _maxChannel > 0) ? 1 : 0);//单服务器分多次    new Action(() =>    {        for (int j = 0; j < times; j++)        {            if (_isCancel)            {                MessageBox.Show("任务取消!");                break;            }            var currChannel = Math.Min(_maxChannel, (channels - j * _maxChannel));//两者取其小的            _bar = new Barrier(currChannel);//根据次数设置栅栏            var tasks = new Action[currChannel];            for (int i = 0; i < currChannel; i++)            {                var subData = tmpEmails.Skip((i + j * _maxChannel) * _maxLength).Take(_maxLength).ToList();                tasks[i] = () =>                {                    if (_isCancel) return;                    var resMsg = 0;                    Connect2WCF.RunSync(sc => resMsg = sc.UpdateMailState(subData, state));                    if (resMsg == -1)                        MessageBox.Show("保存失败了?详情可以查数据库日志表");                    else if (resMsg == 0)                        subData.ForEach(one => _emails[one] = true);//标记已经完成的。                    new Action(() => txtEmails.Text = string.Join("\r\n", _emails.Where(x => !x.Value).Select(x => x.Key))).InvokeRun(this);                    _bar.SignalAndWait();                };            }            Parallel.Invoke(tasks);            new Action(() => lblProgress.Text = "* 已完成 " + ((100 * (j + 1) / times)) + "%").InvokeRun(this);        }        new Action(() => SetControlEnable(true)).InvokeRun(this);    }).RunThread();}

用Barrier和Parallel.Invoke结合来实现分割小数据包,每次用两个线程,每个线程传递20条数据,两个线程的数据都完成后,刷新完成的进度。isCancel作为取消操作的开关。实现的效果较下面的TaskFactory好。

TaskFactory:

CancellationTokenSource cts = new CancellationTokenSource();int maxLength = 20, maxChannel = 2;//同时最多2条通道,每条通道最多20个数据private void btnWrite_Click(object sender, EventArgs e){    cts = new CancellationTokenSource();    var tmpEmails = _emails.Where(x => !x.Value).Select(x => x.Key).ToList();    var state = 0;    SetControlEnable(false);    lblProgress.Text = "* 已完成 0%";    var channels = (tmpEmails.Count / maxLength) + ((tmpEmails.Count % maxLength > 0) ? 1 : 0);//总共多少条通道    var times = (channels / maxChannel) + ((channels % maxChannel > 0) ? 1 : 0);//单服务器分多次    Action
, CancellationToken> doSave = (data, ct) => { if (ct.IsCancellationRequested) return; var msg = 0; Connect2WCF.RunSync(sc => msg = sc.UpdateMailState(data, state)); if (msg == -1) MessageBox.Show("保存失败了?详情可以查数据库日志表"); else if (msg == 0) data.ForEach(one => _emails[one] = true);//标记已经完成的。 new Action(() => txtEmails.Text = string.Join("\r\n", _emails.Where(x => !x.Value).Select(x => x.Key))).InvokeRun(this); }; for (int j = 0; j < times; j++) { int k = j; if (cts.Token.IsCancellationRequested) { MessageBox.Show("任务取消!"); break; } var currChannel = Math.Min(maxChannel, (channels - j * maxChannel));//两者取其小的 TaskFactory taskFactory = new TaskFactory(); Task[] tasks = new Task[currChannel]; for (int i = 0; i < currChannel; i++) { var subData = tmpEmails.Skip((i + j * maxChannel) * maxLength).Take(maxLength).ToList(); tasks[i] = new Task(() => doSave(subData, cts.Token), cts.Token); } taskFactory.ContinueWhenAll(tasks, x => new Action(() => lblProgress.Text = "* 已完成 " + ((100 * (k + 1) / times)) + "%").InvokeRun(this), CancellationToken.None); Array.ForEach(tasks, x => x.Start()); } SetControlEnable(true);}

用TaskFactory和CancellationTokenSource结合来实现,在保存修改数据上,实现的效果和上面的方法差不多,但是在中间取消的效果上差很多,取消后,不会有“任务取消”的弹框。后台的执行逻辑猜测是这样:由于Task是单开线程跑,所以在btn的事件中, 所有Tasks和TaskFactory的声明基本上是很快就执行完成了的(电脑执行速度来看可能是一瞬间)。至于保存数据的代码,则在每个Task的后台线程中各自执行,此时操作的时间早已经跳出了btn的事件函数,于是,点击取消之后,由于btn的事件函数早已执行完,因此不会出现"任务取消"的弹框。而每个Task的执行受到线程个数的限制以及每个TaskFactory的ContinueWhenAll函数的监视,它们是有先后顺序但是却又无序地执行。点击取消后,可能有几个线程正在执行保存数据的任务,已经跳过了cancel的判断,所以取消的命令不会立刻反应到后台执行中,会有一部分任务在取消后,仍然在运行。而剩下的其他任务会判断cancel之后取消。由于线程的执行速度不是固定的,因此,小数据包保存执行的顺序虽然大概按照增序执行,但是细节的排序可能有些插队。

所以,总体而言TaskFactory的执行顺序不可控。断点不可控。而parallel.Invoke函数只有在传入的Action[]全部执行完之后,才会返回,所以有效的保证了大层面的执行顺序。至于Action[]这个队列执行的顺序,在Parallel里面也是不可控的。

 

补充:4092条数据,开启一个通道时,TaskFactory:Parallel = 19:25;

开启5个通道时,多次测试的结果为TaskFactory:Parallel = {18,16,15}:{19,16,15},速度差不多。

一个明显的现象:在数据很多的时候,可以清晰的看到TaskFactory中已完成的百分数出现忽大忽小的情况。例如:1,4,7,12,17,6,12,19,23...

另外,Parallel刚开始执行时,有明显的停顿感,猜测可能是启动并行时产生的效率损耗。

 

如果希望能够操作过程中能暂停处理,可以使用Parallel,它有一个执行主线程,方便随时停止。如果没有暂停需要,而且电脑的核心数不多(只有一个)时,可以考虑用TaskFactory,效率要明显高于Parallel。

转载于:https://www.cnblogs.com/icyJ/p/Parallel_TaskFactory.html

你可能感兴趣的文章
将Mysql的一张表导出至Excel格式文件
查看>>
监听Android系统截屏
查看>>
Oracle三大经典表连接适用情况
查看>>
一个ajax请求,接收json数据
查看>>
359. Logger Rate Limiter - Easy
查看>>
[Programming Entity Framework] 第2章 探究实体数据模型(EDM)(一)
查看>>
DispatcherHelper
查看>>
read命令
查看>>
hadoop安装
查看>>
iOS UICollectionView简单使用
查看>>
centos 6.3 源码安装mysql
查看>>
小笔记----about JC
查看>>
sqlserver 获得行号作为唯一id
查看>>
.NET工作准备--04ASP.NET
查看>>
【SpringCloud微服务实战学习系列】创建应用及解析
查看>>
877E - Danil and a Part-time Job
查看>>
Linux环境下安装 ElasticHD
查看>>
mysql进阶(十三)命令行导出导入数据库
查看>>
html5 audio音频播放全解析
查看>>
DP (入门题)数塔
查看>>