C# – Example of using BlockingCollection

The BlockingCollection class is a blocking concurrent queue. It provides an implementation of the producer-consumer pattern. There are two parts to this pattern:

  • Multiple producer threads enqueue items by calling BlockingCollection.Add().
  • One consumer thread blocks while waiting for BlockingCollection.Take() to dequeue an item.

BlockingCollection is thread-safe, which means it’s designed to be used by many threads at once.

Here’s an example of using BlockingCollection with one consumer and two producers:

using System.Collections.Concurrent;
using System.Threading.Tasks;

var blockingQueue = new BlockingCollection<string>();

//Consumer
Task.Run(()=>
{
    while (!blockingQueue.IsCompleted)
    {
        var message = blockingQueue.Take(); //Blocks until a new message is available

        Console.WriteLine($"{DateTime.Now:hh:mm:ss} Message={message}");
    }
});

//Multiple producers
Task.Run(async () =>
{
    while (true)
    {
        blockingQueue.Add("A");
        await Task.Delay(TimeSpan.FromSeconds(1));
    }
});

Task.Run(async () =>
{
    while (true)
    {
        blockingQueue.Add("B");
        await Task.Delay(TimeSpan.FromSeconds(3));
    }
});

Console.ReadLine();
Code language: C# (cs)

Note: BlockingCollection.IsCompleted means the queue is empty not allowing any more items to be enqueued.

This outputs the following after running it for a few seconds:

03:41:16 Message=B
03:41:16 Message=A
03:41:17 Message=A
03:41:18 Message=A
03:41:19 Message=B
Code language: plaintext (plaintext)

BlockingCollection vs ConcurrentQueue

BlockingCollection uses ConcurrentQueue internally by default. It provides the following beneficial features on top of the queue:

FeatureBlockingCollection method
Blocking dequeue.Take()
Bounded queue: Set the queue’s maximum capacity.

Note: This makes Add() block when the queue is full.
BlockingCollection(int boundedCapacity)
Completed queue: Prevent more items from being enqueued.

Note: Calling Add() on a completed queue results in an InvalidOperationException.
CompleteAdding()

In other words, BlockingCollection simplifies things for you by abstracting away low-level details.

Use TryTake() with a timeout

Take() blocks until an item can be dequeued. Sometimes you may find it necessary to wait with a timeout instead of blocking forever. The simplest way to do that is by using TryTake() with a timeout. It returns false when an item hasn’t been dequeued within a set amount of time.

Here’s an example:

var blockingQueue = new BlockingCollection<string>();

Task.Run(() =>
{
    while (!blockingQueue.IsCompleted)
    {
        Console.WriteLine($"{DateTime.Now:hh:mm:ss} Waiting for message");
        if (blockingQueue.TryTake(out var message, TimeSpan.FromSeconds(30)))
        {
            Console.WriteLine($"{DateTime.Now:hh:mm:ss} {message}");
        }
        else
        {
            Console.WriteLine($"{DateTime.Now:hh:mm:ss} No message in last 30 seconds");
        }
    }
});
Code language: C# (cs)

This outputs the following:

04:44:10 Waiting for message
04:44:40 No message in last 30 seconds
04:44:40 Waiting for message
04:44:55 A
04:44:55 Waiting for message
04:45:10 BCode language: plaintext (plaintext)

Note: You can also use a CancellationToken with BlockingCollection. That’s the right choice if you’re already using a CancellationToken and want to chain the BlockingCollection to cancellation.

Leave a Comment