Pipeline processing is composed of multiple producers and consumers, each depending on the output of its predecessor. You can think of pipeline processing as an assembly line: each worker in the line depends on the workers at the previous workstations to produce an output.
A BlockingCollection is a thread-safe collection that implements IProducerConsumerCollection. A producer can add elements to a BlockingCollection as long as it has not been marked as complete for adding, and a consumer can wait for elements to become available for removal.
Let's build an assembly line of our own. We will design a line with multiple pipeline steps; each step does some processing and passes its output to the next BlockingCollection. To get a final output, all we have to do is add input to the first BlockingCollection in the line and wait for output from the last one.
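Before the C# implementation, here is a minimal sketch of the same idea in Python (an illustration only: `queue.Queue` stands in for BlockingCollection, and a sentinel value stands in for CompleteAdding):

```python
import threading
import queue

SENTINEL = object()  # Stand-in for BlockingCollection.CompleteAdding

def stage(worker, inbox, outbox):
    """Consume items from inbox, process them, and produce into outbox."""
    while True:
        item = inbox.get()        # Blocks until an element is available
        if item is SENTINEL:
            outbox.put(SENTINEL)  # Propagate completion down the line
            break
        outbox.put(worker(item))

# Two workstations: double the input, then stringify it
q0, q1, q2 = queue.Queue(), queue.Queue(), queue.Queue()
threads = [
    threading.Thread(target=stage, args=(lambda x: x * 2, q0, q1)),
    threading.Thread(target=stage, args=(lambda x: f"item-{x}", q1, q2)),
]
for t in threads:
    t.start()

for n in range(3):
    q0.put(n)         # Add input to the first station
q0.put(SENTINEL)      # Mark the line complete for adding

results = []
while True:
    out = q2.get()    # Wait for output from the last station
    if out is SENTINEL:
        break
    results.append(out)

for t in threads:
    t.join()
print(results)  # ['item-0', 'item-2', 'item-4']
```

With a single worker per stage the output order matches the input order, because each queue is FIFO.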
First let’s build the line:
private List<ProcessorStep> WorkFlowSteps = new List<ProcessorStep>(); // Add all the steps to be executed to this list
private CancellationTokenSource cts;
private CancellationToken token;
private BlockingCollection<PipelineInfo>[] stepsPipeline;
private Task[] stepsTasks;
private BlockingCollection<PipelineInfo> jobEndNodify = new BlockingCollection<PipelineInfo>();

private void LoadPipeline()
{
    cts = new CancellationTokenSource();
    token = cts.Token;
    stepsPipeline = new BlockingCollection<PipelineInfo>[WorkFlowSteps.Count]; // BlockingCollections hold the steps between executions
    stepsTasks = new Task[WorkFlowSteps.Count]; // Tasks produce and consume steps from the BlockingCollections
    for (int i = 0; i < WorkFlowSteps.Count; i++)
    {
        int stepCount = i; // Copy the loop variable so each closure captures its own value
        stepsPipeline[stepCount] = new BlockingCollection<PipelineInfo>();
        if (stepCount < WorkFlowSteps.Count - 1)
            stepsTasks[stepCount] = Task.Factory.StartNew(() => RunPipelinedStep(stepsPipeline[stepCount], stepsPipeline[stepCount + 1], cts, stepCount));
        else // The last step has no next BlockingCollection to feed, so it simply finishes the job
            stepsTasks[stepCount] = Task.Factory.StartNew(() => RunPipelinedStep(stepsPipeline[stepCount], null, cts, stepCount));
    }
    Task.WaitAll(stepsTasks); // Blocks until all stage tasks finish, i.e. after cancellation or completion
}
private void RunPipelinedStep(BlockingCollection<PipelineInfo> currentPipelineStep,
                              BlockingCollection<PipelineInfo> nextPipelineStep,
                              CancellationTokenSource cts, int stepNumber)
{
    var token = cts.Token;
    int currentStepNumber = stepNumber;
    try
    {
        // This loop will block until there are new elements in currentPipelineStep
        foreach (var current in currentPipelineStep.GetConsumingEnumerable())
        {
            PipelineInfo currentPipelineInfo = current; // The job info from the pipeline
            ProcessorStep workflowStep = WorkFlowSteps[currentStepNumber];
            Task<bool> result = Task.Factory.StartNew<bool>(() => RunParalleledPipelinedStep(currentPipelineInfo, workflowStep, currentStepNumber));
            if (result.Result && nextPipelineStep != null) // Add the job info to the next section of the pipeline
                nextPipelineStep.Add(currentPipelineInfo, token);
            else if (result.Result && nextPipelineStep == null) // This is the last step in a job
                jobEndNodify.Add(currentPipelineInfo, token); // All steps ran without error, report successful job completion
            else // Job error, stop the entire line
                cts.Cancel();
        }
    }
    catch (Exception e)
    {
        if (!(e is OperationCanceledException))
            cts.Cancel();
        if (nextPipelineStep != null)
            nextPipelineStep.CompleteAdding();
    }
}
private bool RunParalleledPipelinedStep(PipelineInfo currentPipelineInfo, ProcessorStep workflowStep, int currentStepNumber)
{
    // Run the actual step here and return true on success, false on failure
}

/// <summary>
/// Contains the information passed down an execution pipeline
/// </summary>
internal struct PipelineInfo
{
    // Job details (workflow message, retry count, etc.) go here
}
Then add our input to start the chain of execution and get the result:
public void RunFrameworkJob(IWorkflowMessage workflowMessage, int retryJobTimes, bool isCheckDepends)
{
    if (stepsPipeline == null)
        return; // The pipeline has not been loaded yet

    // Populate a PipelineInfo from workflowMessage, retryJobTimes and isCheckDepends,
    // then insert it at the first station to start the chain
    stepsPipeline[0].Add(new PipelineInfo(), token);

    // Block until a job is finished.
    // This is done to regulate the execution rate.
    jobEndNodify.Take(token);
}
If any error occurs anywhere on the line, we need to stop the entire line by calling CancellationTokenSource.Cancel().
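The cancellation idea can be sketched in Python as well (an illustration only: `threading.Event` stands in for CancellationTokenSource, and the stage polls its queue so it can notice cancellation):

```python
import threading
import queue

cancel = threading.Event()  # Stand-in for CancellationTokenSource

def stage(worker, inbox, outbox):
    while not cancel.is_set():
        try:
            item = inbox.get(timeout=0.1)  # Poll so we can notice cancellation
        except queue.Empty:
            continue
        try:
            outbox.put(worker(item))
        except Exception:
            cancel.set()  # Any error stops the entire line
            break

def failing_worker(x):
    if x == 2:
        raise ValueError("step failed")
    return x

inbox, outbox = queue.Queue(), queue.Queue()
t = threading.Thread(target=stage, args=(failing_worker, inbox, outbox))
t.start()
for n in range(5):
    inbox.put(n)
t.join()
print(cancel.is_set())  # True: the line was stopped after item 2 failed
```

Items 3 and 4 are never processed, because the failure on item 2 tears down the whole line rather than letting later stations run on incomplete work.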
In my example I used Tasks to further parallelize step execution. You can imagine this as an unlimited number of workers at each workstation, each working in parallel on the output of the previous station and each adding output to the next station.
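The "many workers per workstation" idea maps to multiple consumers draining the same blocking collection. A minimal Python sketch (again with `queue.Queue` standing in for BlockingCollection, and one `None` sentinel per worker in place of CompleteAdding):

```python
import threading
import queue

inbox = queue.Queue()
outbox = queue.Queue()

def worker():
    while True:
        item = inbox.get()
        if item is None:          # One sentinel per worker shuts it down
            break
        outbox.put(item * item)

# Several workers service the same workstation in parallel
workers = [threading.Thread(target=worker) for _ in range(4)]
for t in workers:
    t.start()

for n in range(10):
    inbox.put(n)
for _ in workers:
    inbox.put(None)               # One shutdown signal per worker
for t in workers:
    t.join()

results = sorted(outbox.queue)    # Output order is not guaranteed with parallel workers
print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Note the trade-off: with more than one worker per station, items may finish out of order, so the output must be sorted or re-sequenced if ordering matters.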
Parallel pipeline processing helps ensure that long-running tasks are evenly distributed across multiple CPUs, and it can speed up processing and I/O that would otherwise be confined to a single thread.
25 Dec 2010 7:52 PM