Parallel Loops

use when
  • same independent operation for each element of a collection
  • fixed number of iterations
  • order of execution is not relevant
TPL
  • Parallel.For(...)
  • Parallel.ForEach(...)
PLINQ
  • enumerable.AsParallel().ForAll(DoWork);
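As a rough illustration of the three forms listed above, the following sketch applies the same independent operation to every element of an array. The DoWork body (a square root) and the array names are assumptions made for this example only.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical per-element operation; each call is independent of the others.
static double DoWork(int x) => Math.Sqrt(x);

int n = 1000;
var input = Enumerable.Range(0, n).ToArray();
var viaFor = new double[n];
var viaForEach = new double[n];
var viaForAll = new double[n];

// Every iteration writes only to its own slot, so no locking is needed.
Parallel.For(0, n, i => { viaFor[i] = DoWork(input[i]); });
Parallel.ForEach(input, x => { viaForEach[x] = DoWork(x); });
input.AsParallel().ForAll(x => { viaForAll[x] = DoWork(x); });

// All three variants should produce the same result array.
Console.WriteLine(viaFor.SequenceEqual(viaForEach) && viaFor.SequenceEqual(viaForAll));
```

Because the order of execution is not fixed, the sketch stores each result by index instead of appending to a shared list.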

Loop State

Break
int n = ...;
var result = new double[n];

var loopResult = Parallel.For(0, n, (index, parallelLoopState) => {
   if(/* break condition */)
   {
      // iterations with a higher index that have not started yet will not run;
      // iterations with a lower index that have not started yet will still run
      parallelLoopState.Break(); 
      return;
   }
   result[index] = DoWork(index);
});

if(!loopResult.IsCompleted && loopResult.LowestBreakIteration.HasValue)
{
   Console.WriteLine("Loop encountered a break at index {0}", 
                     loopResult.LowestBreakIteration.Value);
}

Stop
int n = ...;
var result = new double[n];

var loopResult = Parallel.For(0, n, (index, parallelLoopState) => {
   if(/* break condition */)
   {
      // no iteration that has not started yet will run, regardless of its index
      parallelLoopState.Stop(); 
      return;
   }
   result[index] = DoWork(index);
});

// after Stop(), LowestBreakIteration has no value
if(!loopResult.IsCompleted && !loopResult.LowestBreakIteration.HasValue)
{
   Console.WriteLine("Loop was stopped");
}

Loop Cancellation Token

TPL
int n = ...;
var token = cancellationTokenSource.Token;
var options = new ParallelOptions {
   CancellationToken = token
};

try
{
   Parallel.For(0, n, options, i => {
      if(token.IsCancellationRequested)
      {
         return;
      }
      
      // do some work
   });
}
catch(OperationCanceledException)
{
   // ... 
}
PLINQ
  • use .WithCancellation(token)
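A minimal sketch of cancelling a PLINQ query. The token is cancelled before the query starts purely to keep the outcome of the example predictable; in real code the cancellation would normally come from another thread while the query is running.

```csharp
using System;
using System.Linq;
using System.Threading;

using var cts = new CancellationTokenSource();
cts.Cancel(); // cancelled up front so this sketch behaves the same on every run

bool wasCanceled = false;
try
{
    Enumerable.Range(0, 1_000_000)
              .AsParallel()
              .WithCancellation(cts.Token)
              .ForAll(i => { /* do some work */ });
}
catch (OperationCanceledException)
{
    // PLINQ reports cancellation through this exception type
    wasCanceled = true;
}

Console.WriteLine(wasCanceled);
```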

Exception Handling

  • long-running iterations can test whether an exception has occurred in another iteration using parallelLoopState.IsExceptional
  • an unhandled exception prevents new iterations from starting; iterations that are already running finish normally
  • exceptions are collected and rethrown as a single AggregateException once the loop has ended
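The points above can be sketched as follows. The failing index 42 and the variable names are arbitrary choices for this example.

```csharp
using System;
using System.Threading.Tasks;

int caught = 0;
try
{
    Parallel.For(0, 100, (i, loopState) =>
    {
        // A long-running body can bail out early once another iteration has failed.
        if (loopState.IsExceptional) return;

        if (i == 42) throw new InvalidOperationException($"iteration {i} failed");
    });
}
catch (AggregateException ae)
{
    // Every exception thrown by the loop body arrives as an inner exception.
    foreach (var inner in ae.InnerExceptions)
    {
        caught++;
        Console.WriteLine(inner.Message);
    }
}
```

Catching AggregateException outside the loop keeps the loop body free of error handling; use ae.Flatten() first if the body itself runs nested parallel work.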

Partitioner

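  • group iterations that each do a small amount of work into larger chunks to reduce per-iteration overhead
int n = ...;
var result = new double[n];
Parallel.ForEach(Partitioner.Create(0, n), range => 
{
   // range is a Tuple<int, int>: Item1 is the inclusive start, Item2 the exclusive end
   for(int index = range.Item1; index < range.Item2; index++)
   {
      // ...
   }
});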

Degree of Parallelism

  • maximum number of tasks a parallel loop runs concurrently
  • larger values may require raising the ThreadPool.SetMinThreads setting so that the threads start without delay
TPL
var n = ...;
var options = new ParallelOptions() {
   MaxDegreeOfParallelism = 2
};

Parallel.For(0, n, options, i => {
   // ...
});
PLINQ
enumerable.AsParallel().WithDegreeOfParallelism(n)

Task-Local State

TPL
int numberOfSteps = ...;
var result = new double[numberOfSteps];
var partitioner = Partitioner.Create(0, numberOfSteps);
var options = new ParallelOptions { /* ... */ };

Parallel.ForEach(partitioner, options, () => new Random(MakeRandomSeed()), (range, loopState, localRandom) => 
{
   for(int index = range.Item1; index < range.Item2; index++)
   {
      result[index] = localRandom.NextDouble();
   }
   return localRandom; // pass the task-local state on to the next range of the same task
}, localRandom => { /* finally */ });

Note:

  • Random is not thread-safe
  • on .NET Framework, Random uses the system clock as its seed by default
  • use a custom seed generator to prevent identical seeds when two tasks create their Random at the same time
  • use RNGCryptoServiceProvider (obsolete since .NET 6; RandomNumberGenerator replaces it) for secure random number generation
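MakeRandomSeed is not defined in these notes. One possible implementation, offered only as an assumption, draws the seed from the operating system's cryptographic generator, so that two Random instances created at the same moment are very unlikely to share a seed:

```csharp
using System;
using System.Security.Cryptography;

// Hypothetical seed generator: 4 random bytes interpreted as an int.
static int MakeRandomSeed()
{
    var bytes = new byte[4];
    using (var rng = RandomNumberGenerator.Create())
    {
        rng.GetBytes(bytes);
    }
    return BitConverter.ToInt32(bytes, 0);
}

// Each task-local Random gets its own independently drawn seed.
var first = new Random(MakeRandomSeed());
var second = new Random(MakeRandomSeed());
Console.WriteLine(first.NextDouble());
Console.WriteLine(second.NextDouble());
```

The cryptographic generator is used here only for seeding; the values themselves still come from the faster, non-secure Random.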

Task Scheduler

  • create custom task scheduler to customize scheduling options
TPL
int n = ...;
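// TaskScheduler is abstract; MyTaskScheduler stands for a hypothetical custom subclass
TaskScheduler taskScheduler = new MyTaskScheduler(/* ... */);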
var options = new ParallelOptions
{
   TaskScheduler = taskScheduler
};

Parallel.For(0, n, options, i => { /* ... */});
PLINQ
  • not supported

Antipatterns


Step size other than one

  • iterate over a unit-step index and calculate the stepped value inside the loop body
  • may indicate a data dependency
  • if the loop has a negative step size, the order likely matters
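As a sketch of calculating the stepped value inside the loop, a sequential loop of the form for (int v = start; v < end; v += step) can be rewritten over a unit-step index. The bounds and names below are made up for illustration.

```csharp
using System;
using System.Threading.Tasks;

int start = 10, end = 50, step = 5;

// Number of iterations of: for (int v = start; v < end; v += step)
int count = (end - start + step - 1) / step;
var values = new int[count];

Parallel.For(0, count, i =>
{
    int v = start + i * step; // stepped value derived from the unit-step index
    values[i] = v;
});

Console.WriteLine(string.Join(",", values)); // 10,15,20,25,30,35,40,45
```

This rewrite is only valid when the iterations really are independent; if the original step size hides a data dependency, the loop should stay sequential.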

Hidden loop body dependencies

  • analyze all dependencies for thread safety (Random, DbConnection, etc.)
  • shared dependencies can be protected with synchronization and locking, but this reduces performance

Small loop bodies with few iterations

  • overhead of Parallel.For might be bigger than performance gain

Processor oversubscription and undersubscription

  • oversubscription occurs when more compute-intensive worker threads are created than cores are available
  • optimal number of worker threads is given by: number of available cores / average fraction of core utilization per task
  • restrict degree of parallelism when other known tasks are running to prevent oversubscription
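A minimal sketch of restricting the degree of parallelism. Reserving one core for other work is an assumption made for this example; the counters only serve to observe how many iterations actually ran at the same time.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

int reservedCores = 1; // assumption: one core is left for other known work
var options = new ParallelOptions
{
    MaxDegreeOfParallelism = Math.Max(1, Environment.ProcessorCount - reservedCores)
};

int current = 0; // iterations running right now
int peak = 0;    // highest value of 'current' observed so far

Parallel.For(0, 200, options, i =>
{
    int now = Interlocked.Increment(ref current);

    // Lock-free update of peak to max(peak, now).
    int seen;
    while (now > (seen = Volatile.Read(ref peak)) &&
           Interlocked.CompareExchange(ref peak, now, seen) != seen) { }

    Thread.SpinWait(100_000); // stand-in for compute-bound work
    Interlocked.Decrement(ref current);
});

Console.WriteLine($"limit {options.MaxDegreeOfParallelism}, peak {peak}");
```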

Parallel.ForEach on PLINQ Query

  • use .ForAll() instead
Do
data.AsParallel().ForAll(item => { /* ... */ });

Don't
var query = data.AsParallel();
Parallel.ForEach(query, item => { /* ... */ });

Duplicates in Input Enumeration

  • same reference to a given object instance appears twice in enumeration
  • might cause race condition when tasks want to update (and read) the same object
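One way to avoid the problem, sketched here under the assumption that each object only needs to be processed once, is to remove duplicate references before the loop. Distinct() compares references for types that do not define their own equality, such as List<int>.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

var shared = new List<int>(); // List<T> is not thread-safe
var other = new List<int>();
var input = new[] { shared, other, shared }; // 'shared' appears twice

// After Distinct(), each instance is handed to exactly one iteration,
// so no two iterations can modify the same list concurrently.
Parallel.ForEach(input.Distinct(), list => list.Add(1));

Console.WriteLine($"{shared.Count} {other.Count}"); // 1 1
```

If every occurrence has to be processed, the updates need synchronization instead, with the performance cost that implies.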

Design Notes

Adaptive Partitioning
  • the Parallel class adapts its partitioning to the number of iterations and the size of the units of work
Adaptive Concurrency
  • TPL can use fewer threads than requested
    • the upper limit is set with ParallelOptions.MaxDegreeOfParallelism
  • PLINQ uses fixed number of tasks to execute in a query
    • number of logical cores, or
    • .WithDegreeOfParallelism() setting
  • .NET Thread Pool adapts number of threads to changing workloads using heuristics
    • the limit can be set with ThreadPool.SetMaxThreads, but this should be avoided because it affects the whole process; prefer ParallelOptions instead