Event-driven .NET: Concurrent Producer/Consumer using BlockingCollection

With the Producer/Consumer pattern you have one or more threads producing new work and enqueuing it, and one or more threads consuming that work by dequeuing it and processing it. The consumers and producers share access to the work queue. Think of it like the Post Office. You have one or more people (producers) dropping off letters in a mailbox, and one or more postal workers (consumers) taking these letters and processing them.

There are multiple ways to implement the Producer/Consumer pattern in .NET. You need to make two design decisions:

  • How to check for new data on the consumer queue
  • How to make access to the queue thread-safe. The consumer and producer threads share one resource, the work queue, so every read and write on it must be synchronized.
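To make the first decision concrete, here is a minimal sketch of a polling consumer, shown only for contrast. It assumes a ConcurrentQueue plus a sleep interval; the PollingExample class, the DrainByPolling method, and the 50 ms interval are hypothetical names and values chosen for illustration, and this is not necessarily one of the approaches compared in this article.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class PollingExample
{
    // Hypothetical polling consumer: checks the queue and sleeps when it is empty.
    // Returns the number of items handled once cancellation is requested
    // and the queue has been drained.
    public static int DrainByPolling(ConcurrentQueue<string> queue, CancellationToken stop)
    {
        int handled = 0;
        while (!stop.IsCancellationRequested || !queue.IsEmpty)
        {
            if (queue.TryDequeue(out var item))
            {
                Console.WriteLine($"Handled {item}");
                handled++;
            }
            else
            {
                // Assumed polling interval: the thread wakes up even when there is
                // no work, and new items can wait up to 50 ms before being seen.
                Thread.Sleep(50);
            }
        }
        return handled;
    }

    public static void Main()
    {
        var queue = new ConcurrentQueue<string>();
        var cts = new CancellationTokenSource();
        var token = cts.Token;

        var consumer = Task.Run(() => DrainByPolling(queue, token));

        queue.Enqueue("a");
        queue.Enqueue("b");
        cts.Cancel(); // ask the consumer to stop once the queue is empty

        Console.WriteLine($"Handled {consumer.Result} items");
    }
}
```

ConcurrentQueue takes care of thread safety here, but the consumer still has to decide how often to look for work. A blocking Take() removes that decision: the consumer sleeps until an item actually arrives.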

The best way to implement this is by using the BlockingCollection class. To show why this is the best, here is a comparison of the four different approaches:

As you can see in the comparison matrix above, the BlockingCollection provides an event-driven, concurrent approach that abstracts away the low-level threading details.

Here’s a simple example of how to use the BlockingCollection.

1 – Create a Consumer that uses BlockingCollection

    using System;
    using System.Collections.Concurrent;
    using System.Linq;
    using System.Threading;

    namespace ProducerConsumer
    {
        public class StringReverser
        {
            private readonly BlockingCollection<string> messageQueue;

            public StringReverser(BlockingCollection<string> messageQueue)
            {
                this.messageQueue = messageQueue;
            }

            public void StartProcessing()
            {
                while (true)
                {
                    var message = messageQueue.Take(); //Blocks until a new message is available
                    var reversedString = new string(message.Reverse().ToArray());

                    Console.WriteLine($"Thread={Thread.CurrentThread.ManagedThreadId} reverse({message})=>{reversedString}");
                }
            }

            public void QueueForProcessing(string Message)
            {
                messageQueue.Add(Message);
            }
        }
    }

The important part here is the call to Take(). This blocks until a message is available on the messageQueue.
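Because Take() waits indefinitely, the while (true) loop above never exits on its own. If you need the consumer to stop once the producers are finished, one option is to iterate GetConsumingEnumerable() and have the producer call CompleteAdding(). The sketch below assumes that is the behavior you want; the DrainExample class and its ReverseAll method are hypothetical names and are not part of the StringReverser class above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class DrainExample
{
    // Hypothetical helper: reverses every message until the queue is completed.
    public static List<string> ReverseAll(BlockingCollection<string> queue)
    {
        var results = new List<string>();

        // Blocks while the queue is empty, just like Take(). The loop ends once
        // CompleteAdding() has been called and all remaining items are consumed.
        foreach (var message in queue.GetConsumingEnumerable())
        {
            results.Add(new string(message.Reverse().ToArray()));
        }

        return results;
    }

    public static void Main()
    {
        var queue = new BlockingCollection<string>();
        var consumer = Task.Run(() => ReverseAll(queue));

        queue.Add("hello");
        queue.Add("world");
        queue.CompleteAdding(); // signals that no more items will arrive

        // A single consumer over the default FIFO queue keeps insertion order,
        // so this prints "olleh" and then "dlrow".
        foreach (var reversed in consumer.Result)
        {
            Console.WriteLine(reversed);
        }
    }
}
```

Calling Take() on a collection that is both completed and empty throws an InvalidOperationException, which is why the enumerable form is usually the more convenient way to shut a consumer down.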

2 – Start the Consumer, and start producing messages

    using System;
    using System.Collections.Concurrent;
    using System.Threading;
    using System.Threading.Tasks;

    namespace ProducerConsumer
    {
        class Program
        {
            public static void Main(string[] args)
            {
                var messageQueue = new BlockingCollection<string>();
                var messageReverser = new StringReverser(messageQueue);

                Task.Run(() =>
                {
                    messageReverser.StartProcessing();
                });

                while (true)
                {
                    Console.WriteLine($"Thread={Thread.CurrentThread.ManagedThreadId} Write a sentence and see each word reversed: ");
                    var msg = Console.ReadLine();
                    Console.WriteLine("");

                    foreach (var s in msg.Split())
                    {
                        messageQueue.Add(s);
                    }
                }
            }
        }
    }

There are two important parts here:

  1. Starting the Consumer on another thread (here with Task.Run). This is important because Take() is a blocking call: the thread that calls it cannot do anything else until a message arrives.
  2. Producing new messages by adding them to the BlockingCollection.
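The same two parts extend to more than one consumer. Here is a minimal sketch, assuming you want several consumers sharing one queue and a program that finishes once all work is done. The MultiConsumerExample class, its Run method, and the bounded capacity of 2 are hypothetical choices for illustration only.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class MultiConsumerExample
{
    // Hypothetical helper: starts consumerCount consumers over one shared queue
    // and returns how many messages were processed in total.
    public static int Run(string[] words, int consumerCount)
    {
        // Assumed capacity of 2: Add() blocks while two items are already waiting,
        // so a fast producer cannot run far ahead of the consumers.
        var queue = new BlockingCollection<string>(boundedCapacity: 2);
        int processed = 0;

        // Part 1: start each consumer on its own thread-pool thread.
        var consumers = Enumerable.Range(0, consumerCount)
            .Select(_ => Task.Run(() =>
            {
                foreach (var word in queue.GetConsumingEnumerable())
                {
                    var reversed = new string(word.Reverse().ToArray());
                    Console.WriteLine($"Thread={Thread.CurrentThread.ManagedThreadId} reverse({word})=>{reversed}");
                    Interlocked.Increment(ref processed);
                }
            }))
            .ToArray();

        // Part 2: produce messages by adding them to the BlockingCollection.
        foreach (var word in words)
        {
            queue.Add(word);
        }

        queue.CompleteAdding();  // lets every consumer loop finish
        Task.WaitAll(consumers); // wait until all queued work is processed
        return processed;
    }

    public static void Main()
    {
        var words = "the quick brown fox jumps".Split();
        var total = Run(words, consumerCount: 2);
        Console.WriteLine($"Processed {total} words");
    }
}
```

Each word is handed to exactly one consumer, but which thread gets which word, and the order of the output lines, can change from run to run.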

Here’s what it looks like when I run this console app:

Notice that the Consumer (StringReverser) is running on a different thread.
