Gal Ratner
Gal Ratner is a techie who lives and works in Los Angeles, CA and Austin, TX.
Parallel pipeline processing with Blocking Collections

Pipeline processing is composed of multiple producers and consumers, each depending on the output of its predecessor. You can think of pipeline processing as an assembly line: each worker on the line depends on the workers at the previous workstations to produce an output.

A BlockingCollection<T> is a thread-safe collection that wraps an IProducerConsumerCollection<T> (a ConcurrentQueue<T> by default) and adds blocking and bounding on top of it. A producer can add elements to a BlockingCollection as long as it has not been marked as complete for adding, and a consumer can wait for elements to become available for removal.
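As a minimal sketch of that behavior (the class and method names here are made up for illustration and are not part of the pipeline built later), one task adds items and marks the collection complete, while the calling thread consumes until the collection is both empty and complete:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

static class BlockingCollectionBasics
{
    // Hypothetical helper: produces 0..count-1 on one task and consumes them on the calling thread.
    public static List<int> ProduceAndConsume(int count)
    {
        var consumed = new List<int>();
        using (var queue = new BlockingCollection<int>())
        {
            Task producer = Task.Factory.StartNew(() =>
            {
                for (int i = 0; i < count; i++)
                {
                    queue.Add(i);
                }
                queue.CompleteAdding(); // No more items: lets the consumer's loop end
            });

            // Blocks while the collection is empty; ends once it is empty and marked complete
            foreach (int item in queue.GetConsumingEnumerable())
            {
                consumed.Add(item);
            }
            producer.Wait();
        }
        return consumed;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", ProduceAndConsume(5))); // prints "0, 1, 2, 3, 4"
    }
}
```

With a single producer and the default queue-backed collection, items come out in the order they went in. Without the CompleteAdding call the consumer loop would block forever.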
Let’s build an assembly line of our own. We’ll design a line with multiple pipeline steps; each step does some processing and passes its output to the next BlockingCollection. To get a final output, all we have to do is add input to the first BlockingCollection in the line and wait for output from the last one.
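Before the full listing, here is a stripped-down sketch of the idea with just two steps. Everything in it (the class name, StartStage, the arithmetic each step performs) is an assumption made for illustration; it only shows input going into the first collection and output coming out of the last:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

static class TwoStagePipeline
{
    // Hypothetical stage: reads from input, applies work, writes to output,
    // and marks output complete when its own input runs dry.
    static Task StartStage(BlockingCollection<int> input, BlockingCollection<int> output, Func<int, int> work)
    {
        return Task.Factory.StartNew(() =>
        {
            try
            {
                foreach (int item in input.GetConsumingEnumerable())
                {
                    output.Add(work(item));
                }
            }
            finally
            {
                output.CompleteAdding(); // Propagate "no more work" down the line
            }
        });
    }

    public static List<int> Run(IEnumerable<int> inputs)
    {
        var first = new BlockingCollection<int>();
        var middle = new BlockingCollection<int>();
        var last = new BlockingCollection<int>();

        Task stage1 = StartStage(first, middle, x => x + 1);
        Task stage2 = StartStage(middle, last, x => x * 2);

        foreach (int value in inputs)
        {
            first.Add(value); // Feed the head of the line
        }
        first.CompleteAdding();

        var results = new List<int>();
        foreach (int result in last.GetConsumingEnumerable())
        {
            results.Add(result); // Wait for output at the tail of the line
        }
        Task.WaitAll(stage1, stage2);
        return results;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", Run(new[] { 1, 2, 3 }))); // prints "4, 6, 8"
    }
}
```

Each stage here runs on a single task, so order is preserved from end to end.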

First let’s build the line:


using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

class ExamplePipeline
{
    private List<ProcessorStep> WorkFlowSteps = new List<ProcessorStep>(); // Add all the steps to be executed to this list (ProcessorStep is defined elsewhere)
    private CancellationTokenSource cts;
    private CancellationToken token;
    private BlockingCollection<PipelineInfo>[] stepsPipeline;
    private Task[] stepsTasks;
    private BlockingCollection<PipelineInfo> jobEndNodify = new BlockingCollection<PipelineInfo>();

    private void LoadPipeline()
    {
        cts = new CancellationTokenSource();
        token = cts.Token;
        stepsPipeline = new BlockingCollection<PipelineInfo>[WorkFlowSteps.Count]; // One BlockingCollection per step holds the jobs waiting for that step
        stepsTasks = new Task[WorkFlowSteps.Count]; // One Task per step consumes its BlockingCollection and feeds the next one
        // Create every collection before starting any task, so no task can see a missing "next" collection
        for (int i = 0; i < WorkFlowSteps.Count; i++)
            stepsPipeline[i] = new BlockingCollection<PipelineInfo>();
        for (int i = 0; i < WorkFlowSteps.Count; i++)
        {
            int stepCount = i; // Copy the loop variable so each lambda captures its own value
            if (stepCount < WorkFlowSteps.Count - 1)
            {
                stepsTasks[stepCount] = Task.Factory.StartNew(() => RunPipelinedStep(stepsPipeline[stepCount], stepsPipeline[stepCount + 1], cts, stepCount));
            }
            else // The last step has no next BlockingCollection, so it simply finishes the job
            {
                stepsTasks[stepCount] = Task.Factory.StartNew(() => RunPipelinedStep(stepsPipeline[stepCount], null, cts, stepCount));
            }
        }
        Task.WaitAll(stepsTasks); // Blocks until every step has ended, i.e. until the CancellationToken has been cancelled
    }

    private void RunPipelinedStep(BlockingCollection<PipelineInfo> currentPipelineStep,
        BlockingCollection<PipelineInfo> nextPipelineStep,
        CancellationTokenSource cts,
        int stepNumber)
    {
        var token = cts.Token;
        int currentStepNumber = stepNumber;
        try
        {
            // This loop blocks until there are new elements in currentPipelineStep.
            // Passing the token lets a cancellation wake the loop while it is blocked.
            foreach (var current in currentPipelineStep.GetConsumingEnumerable(token))
            {
                if (token.IsCancellationRequested) break;
                PipelineInfo currentPipelineInfo = current; // The job info from the pipeline
                ProcessorStep workflowStep = WorkFlowSteps[stepNumber];
                Task.Factory.StartNew<bool>(() => RunParalleledPipelinedStep(currentPipelineInfo, workflowStep, currentStepNumber))
                    .ContinueWith((result) =>
                    {
                        if (result.Result && nextPipelineStep != null) // Add the job info to the next section of the pipeline
                        {
                            nextPipelineStep.Add(currentPipelineInfo, token);
                        }
                        else if (result.Result && nextPipelineStep == null) // This is the last step in a job
                        {
                            // If all steps ran without error, report successful job completion
                            jobEndNodify.Add(currentPipelineInfo); // Assumption: completion is signalled through jobEndNodify
                        }
                        else // Job error
                        {
                            cts.Cancel(); // Assumption: an error stops the entire line
                        }
                    });
            }
        }
        catch (Exception e)
        {
            if (!(e is OperationCanceledException))
            {
                cts.Cancel(); // Assumption: any unexpected error also stops the line
            }
        }
        finally
        {
            if (nextPipelineStep != null)
            {
                nextPipelineStep.CompleteAdding(); // Assumption: tell the next step no more jobs are coming
            }
        }
    }

    private bool RunParalleledPipelinedStep(PipelineInfo currentPipelineInfo, ProcessorStep workflowStep, int currentStepNumber)
    {
        // Run the actual step here
        return true;
    }

    /// <summary>
    /// Contains the information passed down an execution pipeline
    /// </summary>
    internal struct PipelineInfo { }
}

Then add our input to start the chain of execution and get the result:

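public void RunFrameworkJob(IWorkflowMessage workflowMessage, int retryJobTimes, bool isCheckDepends)
{
    if (stepsPipeline == null)
    {
        // Assumption: LoadPipeline has been started on its own thread, since it blocks until the line stops
        throw new InvalidOperationException("The pipeline has not been loaded.");
    }
    stepsPipeline[0].Add(new PipelineInfo());
    // Block until a job is finished
    // This is done to regulate execution rate
    jobEndNodify.Take(token); // Assumption: wait on jobEndNodify; the token ends the wait if the line is stopped
}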
If any error occurs on the line, we need to stop the entire line by calling Cancel on the CancellationTokenSource.
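A small sketch of how cancellation stops a waiting consumer, assuming the token is passed to GetConsumingEnumerable. The class name and the "negative value means error" rule are invented for the example:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

static class CancelTheLine
{
    // Returns true when the consumer loop was ended by cancellation.
    public static bool ConsumerStopsOnCancel()
    {
        using (var cts = new CancellationTokenSource())
        using (var queue = new BlockingCollection<int>())
        {
            Task<bool> consumer = Task.Factory.StartNew(() =>
            {
                try
                {
                    // With a token, the enumeration throws OperationCanceledException once cancelled
                    foreach (int item in queue.GetConsumingEnumerable(cts.Token))
                    {
                        if (item < 0)
                        {
                            cts.Cancel(); // Hypothetical error condition: stop the line
                        }
                    }
                    return false; // Only reached if the collection completes normally
                }
                catch (OperationCanceledException)
                {
                    return true;
                }
            });

            queue.Add(1);
            queue.Add(-1); // Triggers the cancellation inside the consumer
            return consumer.Result;
        }
    }

    static void Main()
    {
        Console.WriteLine(ConsumerStopsOnCancel()); // prints "True"
    }
}
```

Calling GetConsumingEnumerable without a token would leave the consumer blocked on an empty collection even after Cancel, so every step that should stop with the line needs to observe the same token.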
In my example I used Tasks to further parallelize the execution of each step. You can imagine this as an unbounded number of workers at each workstation (in practice limited by the thread pool), each working in parallel on the output of the previous station and each adding output to the next station.
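To make the "many workers per workstation" picture concrete, here is a sketch of a single station that starts one Task per incoming item. The class name and the squaring work are assumptions for illustration only:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

static class ParallelStation
{
    // Hypothetical station: squares each item on its own Task and returns the sum of the outputs.
    public static int RunStation(int itemCount)
    {
        var input = new BlockingCollection<int>();
        var output = new BlockingCollection<int>();
        for (int i = 1; i <= itemCount; i++)
        {
            input.Add(i);
        }
        input.CompleteAdding();

        var workers = new List<Task>();
        foreach (int item in input.GetConsumingEnumerable())
        {
            int current = item; // Own copy for the closure (required before C# 5)
            workers.Add(Task.Factory.StartNew(() => output.Add(current * current)));
        }
        Task.WaitAll(workers.ToArray()); // Every worker has added its output
        output.CompleteAdding();

        int sum = 0;
        foreach (int result in output.GetConsumingEnumerable())
        {
            sum += result; // Arrival order is not guaranteed, so only the sum is checked
        }
        return sum;
    }

    static void Main()
    {
        Console.WriteLine(RunStation(4)); // prints "30"
    }
}
```

Because the workers finish in whatever order the scheduler picks, a station built this way does not preserve the order of its input.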

Parallel pipeline processing can help spread long-running tasks across multiple CPUs and speed up processing and I/O that would otherwise run on a single thread.



Posted 25 Dec 2010 7:52 PM by Gal Ratner
