多线程那点事—Parallel.for
奔跑的男人 · 665浏览 · 发布于2021-01-04
先看段代码:
for (int i = 0; i < 10; i++) { Task.Factory.StartNew(()=>Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} ~ {i}"));4 }
从代码上可以看出我们预期是打印1~10,但实际的打印结果是:
7 ~ 10 4 ~ 10 10 ~ 10 9 ~ 10 4 ~ 10 3 ~ 10 5 ~ 10 9 ~ 10 6 ~ 10 8 ~ 10
与预期的不一致,我们预期是打印数字1到10,但实际打印出来的是10次10。因为这几个lambda表达式中使用了同一个变量,并且这些匿名函数共享变量值。
再来看下面这段代码:
Action<int> displayNumber = n => Console.WriteLine(n); int i = 5; Task taskOne = Task.Factory.StartNew(() => displayNumber(i)); i = 7; Task taskTwo = Task.Factory.StartNew(() => displayNumber(i)); Task.WaitAll(taskOne,taskTwo);
输出结果:
7 7
当闭包通过lambda表达式捕获可变变量时,lambda捕获变量的引用,而不是捕获该变量的当前值。因此,如果任务在变量的引用值更改后运行,则该值将是内存中最新的值,而不是捕获变量时的值。
为解决该问题,我们引入Parallel类来解决问题:
Parallel.For(0,10,i=>Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} ~ {i}"));
打印结果:
1 ~ 0 1 ~ 2 3 ~ 1 3 ~ 4 3 ~ 7 3 ~ 8 3 ~ 9 1 ~ 3 5 ~ 5 4 ~ 6
Parallel 类 提供对并行循环和区域的支持, 现在我们看下Parallel.for的代码:
// this needs to be in try-block because it can throw in BuggyScheduler.MaxConcurrencyLevel rootTask = new ParallelForReplicatingTask( parallelOptions, delegate { // // first thing we do upon enterying the task is to register as a new "RangeWorker" with the // shared RangeManager instance. // // If this call returns a RangeWorker struct which wraps the state needed by this task // // We need to call FindNewWork32() on it to see whether there's a chunk available. // // Cache some information about the current task Task currentWorkerTask = Task.InternalCurrent; bool bIsRootTask = (currentWorkerTask == rootTask); RangeWorker currentWorker = new RangeWorker(); Object savedStateFromPreviousReplica = currentWorkerTask.SavedStateFromPreviousReplica; if (savedStateFromPreviousReplica is RangeWorker) currentWorker = (RangeWorker)savedStateFromPreviousReplica; else currentWorker = rangeManager.RegisterNewWorker(); // These are the local index values to be used in the sequential loop. // Their values filled in by FindNewWork32 int nFromInclusiveLocal; int nToExclusiveLocal; if (currentWorker.FindNewWork32(out nFromInclusiveLocal, out nToExclusiveLocal) == false || sharedPStateFlags.ShouldExitLoop(nFromInclusiveLocal) == true) { return; // no need to run } // ETW event for ParallelFor Worker Fork if (TplEtwProvider.Log.IsEnabled()) { TplEtwProvider.Log.ParallelFork((currentWorkerTask != null ? currentWorkerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (currentWorkerTask != null ? currentWorkerTask.Id : 0), forkJoinContextID); } TLocal localValue = default(TLocal); bool bLocalValueInitialized = false; // Tracks whether localInit ran without exceptions, so that we can skip localFinally if it wasn't try { // Create a new state object that references the shared "stopped" and "exceptional" flags // If needed, it will contain a new instance of thread-local state by invoking the selector. ParallelLoopState32 state = null; if (bodyWithState != null) { Contract.Assert(sharedPStateFlags != null); state = new ParallelLoopState32(sharedPStateFlags); } else if (bodyWithLocal != null) { Contract.Assert(sharedPStateFlags != null); state = new ParallelLoopState32(sharedPStateFlags); if (localInit != null) { localValue = localInit(); bLocalValueInitialized = true; } } // initialize a loop timer which will help us decide whether we should exit early LoopTimer loopTimer = new LoopTimer(rootTask.ActiveChildCount); // Now perform the loop itself. do { if (body != null) { for (int j = nFromInclusiveLocal; j < nToExclusiveLocal && (sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE // fast path check as SEL() doesn't inline || !sharedPStateFlags.ShouldExitLoop()); // the no-arg version is used since we have no state j += 1) { body(j); } } else if (bodyWithState != null) { for (int j = nFromInclusiveLocal; j < nToExclusiveLocal && (sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE // fast path check as SEL() doesn't inline || !sharedPStateFlags.ShouldExitLoop(j)); j += 1) { state.CurrentIteration = j; bodyWithState(j, state); } } else { for (int j = nFromInclusiveLocal; j < nToExclusiveLocal && (sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE // fast path check as SEL() doesn't inline || !sharedPStateFlags.ShouldExitLoop(j)); j += 1) { state.CurrentIteration = j; localValue = bodyWithLocal(j, state, localValue); } } // Cooperative multitasking hack for AppDomain fairness. // Check if allowed loop time is exceeded, if so save current state and return. The self replicating task logic // will detect this, and queue up a replacement task. Note that we don't do this on the root task. if (!bIsRootTask && loopTimer.LimitExceeded()) { currentWorkerTask.SavedStateForNextReplica = (object)currentWorker; break; } } // Exit if we can't find new work, or if the loop was stoppped. while (currentWorker.FindNewWork32(out nFromInclusiveLocal, out nToExclusiveLocal) && ((sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE) || !sharedPStateFlags.ShouldExitLoop(nFromInclusiveLocal))); } catch { // if we catch an exception in a worker, we signal the other workers to exit the loop, and we rethrow sharedPStateFlags.SetExceptional(); throw; } finally { // If a cleanup function was specified, call it. Otherwise, if the type is // IDisposable, we will invoke Dispose on behalf of the user. if (localFinally != null && bLocalValueInitialized) { localFinally(localValue); } // ETW event for ParallelFor Worker Join if (TplEtwProvider.Log.IsEnabled()) { TplEtwProvider.Log.ParallelJoin((currentWorkerTask != null ? currentWorkerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (currentWorkerTask != null ? currentWorkerTask.Id : 0), forkJoinContextID); } } }, creationOptions, internalOptions); rootTask.RunSynchronously(parallelOptions.EffectiveTaskScheduler); // might throw TSE rootTask.Wait(); // If we made a cancellation registration, we need to clean it up now before observing the OCE // Otherwise we could be caught in the middle of a callback, and observe PLS_STOPPED, but oce = null if (parallelOptions.CancellationToken.CanBeCanceled) { ctr.Dispose(); } // If we got through that with no exceptions, and we were canceled, then // throw our cancellation exception if (oce != null) throw oce;
body对于迭代范围 (的每个值调用一次委托 fromInclusive , toExclusive) 。提供两个参数:
1、一个 Int32 值,该值表示迭代次数。
2、ParallelLoopState可用于提前中断循环的实例。ParallelLoopState对象是由编译器创建的; 它不能在用户代码中实例化。
继续来看:
Parallel.For(0, 10, (i,state) => { if (i > 5) state.Break(); Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} ~ {i}"); } );
输出:
1 ~ 0 1 ~ 1 1 ~ 2 1 ~ 3 1 ~ 4 1 ~ 5 1 ~ 6
在上面的方法中我们使用了 break方法。
调用 Break 方法会通知 for 操作,在当前的迭代之后,无需执行迭代。不过,如果所有迭代尚未执行,则仍必须执行当前的所有迭代。
因此,调用 Break 类似于 for c# 等语言中的传统循环内的中断操作,但它并不是完美的替代方法:例如,无法保证当前的迭代不会执行。
今天就先写道这里~
相关推荐
PHP实现部分字符隐藏
沙雕mars · 1321浏览 · 2019-04-28 09:47:56
Java中ArrayList和LinkedList区别
kenrry1992 · 905浏览 · 2019-05-08 21:14:54
5月语言排行榜:R 跌出前二十,Python 紧咬 C++
manongba · 685浏览 · 2019-05-09 17:27:24
Tomcat 下载及安装配置
manongba · 966浏览 · 2019-05-13 21:03:56
什么是SpringBoot
iamitnan · 1084浏览 · 2019-05-14 22:20:36
分类专栏
最新发布
最热排行
0评论