The BlockingCollection class is a blocking concurrent queue. It provides an implementation of the producer-consumer pattern. There are two parts to this pattern:
- Multiple producer threads enqueue items by calling BlockingCollection.Add().
- A consumer thread (one in this example, though more are allowed) calls BlockingCollection.Take(), which blocks until an item is available to dequeue.
BlockingCollection is thread-safe, which means it’s designed to be used by many threads at once.
Here’s an example of using BlockingCollection with one consumer and two producers:
using System.Collections.Concurrent;
using System.Threading.Tasks;

var blockingQueue = new BlockingCollection<string>();

//Consumer
Task.Run(() =>
{
    while (!blockingQueue.IsCompleted)
    {
        var message = blockingQueue.Take(); //Blocks until a new message is available
        Console.WriteLine($"{DateTime.Now:hh:mm:ss} Message={message}");
    }
});

//Multiple producers
Task.Run(async () =>
{
    while (true)
    {
        blockingQueue.Add("A");
        await Task.Delay(TimeSpan.FromSeconds(1));
    }
});

Task.Run(async () =>
{
    while (true)
    {
        blockingQueue.Add("B");
        await Task.Delay(TimeSpan.FromSeconds(3));
    }
});

Console.ReadLine();
Note: BlockingCollection.IsCompleted is true once the queue has been marked as complete (so no more items can be enqueued) and is empty.
This outputs the following after running it for a few seconds:
03:41:16 Message=B
03:41:16 Message=A
03:41:17 Message=A
03:41:18 Message=A
03:41:19 Message=B
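The example above runs forever because nothing ever marks the queue as complete. When you do need to shut the consumer down, be aware that Take() throws an InvalidOperationException if the queue is completed and empty while it waits. Here’s a minimal sketch of one way to avoid that: loop with GetConsumingEnumerable(), which blocks for new items and simply ends once CompleteAdding() has been called and the queue is drained. The messages and the consumer variable are only for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

var blockingQueue = new BlockingCollection<string>();

//Consumer: the foreach blocks while the queue is empty and exits
//once the queue is marked complete and all items have been taken
var consumer = Task.Run(() =>
{
    foreach (var message in blockingQueue.GetConsumingEnumerable())
    {
        Console.WriteLine($"Message={message}");
    }
    Console.WriteLine("Consumer finished");
});

//Producer: enqueue a couple of items, then signal that no more are coming
blockingQueue.Add("A");
blockingQueue.Add("B");
blockingQueue.CompleteAdding();

//Wait for the consumer to drain the queue and exit its loop
await consumer;
```

This way the consumer doesn’t need to check IsCompleted or catch an exception to know when to stop.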
If you want an async queue that doesn’t block, use a Channel instead of BlockingCollection.
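As a rough sketch of what that looks like, here’s the same producer-consumer idea using an unbounded Channel from System.Threading.Channels. The messages and variable names are just for illustration, and this assumes .NET Core 3.0 or later (for ReadAllAsync() and await foreach).

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<string>();

//Consumer: awaits new items instead of blocking a thread.
//The loop ends once the writer is completed and the channel is drained.
var consumer = Task.Run(async () =>
{
    await foreach (var message in channel.Reader.ReadAllAsync())
    {
        Console.WriteLine($"Message={message}");
    }
});

//Producer
await channel.Writer.WriteAsync("A");
await channel.Writer.WriteAsync("B");
channel.Writer.Complete(); //Similar in spirit to CompleteAdding()

await consumer;
```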
BlockingCollection vs ConcurrentQueue
BlockingCollection uses ConcurrentQueue internally by default. It provides the following beneficial features on top of the queue:
Feature | BlockingCollection member |
Blocking dequeue | Take() |
Bounded queue: Set the queue’s maximum capacity. Note: This makes Add() block when the queue is full. | BlockingCollection(int boundedCapacity) constructor |
Completed queue: Prevent more items from being enqueued. Note: Calling Add() on a completed queue results in an InvalidOperationException. | CompleteAdding() |
In other words, BlockingCollection saves you from writing the blocking, capacity, and completion logic yourself on top of ConcurrentQueue.
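To make the bounded and completed behaviors concrete, here’s a small single-threaded sketch. Since Add() would block on a full queue, it uses TryAdd() (which returns false right away instead of blocking) to show that the capacity limit was hit. The capacity of 2 and the variable names are arbitrary choices for illustration.

```csharp
using System;
using System.Collections.Concurrent;

//Bounded queue: holds at most 2 items
var boundedQueue = new BlockingCollection<string>(boundedCapacity: 2);
boundedQueue.Add("A");
boundedQueue.Add("B");

//The queue is full. Add() would block here, so use TryAdd() to see the result immediately
bool added = boundedQueue.TryAdd("C");
Console.WriteLine($"Added C? {added}"); //Added C? False

//Completed queue: no more items can be enqueued
boundedQueue.CompleteAdding();
try
{
    boundedQueue.Add("D");
}
catch (InvalidOperationException)
{
    Console.WriteLine("Add() failed because the queue is completed");
}

//Items that were already enqueued can still be dequeued
foreach (var item in boundedQueue.GetConsumingEnumerable())
{
    Console.WriteLine(item);
}
Console.WriteLine($"IsCompleted={boundedQueue.IsCompleted}"); //IsCompleted=True
```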
Use TryTake() with a timeout
Take() blocks until an item can be dequeued. Sometimes you may find it necessary to wait with a timeout instead of blocking forever. The simplest way to do that is by using TryTake() with a timeout. It returns false when an item hasn’t been dequeued within a set amount of time.
Here’s an example:
var blockingQueue = new BlockingCollection<string>();

Task.Run(() =>
{
    while (!blockingQueue.IsCompleted)
    {
        Console.WriteLine($"{DateTime.Now:hh:mm:ss} Waiting for message");

        if (blockingQueue.TryTake(out var message, TimeSpan.FromSeconds(30)))
        {
            Console.WriteLine($"{DateTime.Now:hh:mm:ss} {message}");
        }
        else
        {
            Console.WriteLine($"{DateTime.Now:hh:mm:ss} No message in last 30 seconds");
        }
    }
});
This outputs the following:
04:44:10 Waiting for message
04:44:40 No message in last 30 seconds
04:44:40 Waiting for message
04:44:55 A
04:44:55 Waiting for message
04:45:10 B
Note: You can also pass a CancellationToken to the Take() and TryTake() overloads that accept one. That’s the right choice if you’re already using a CancellationToken and want the blocking wait to end when cancellation is requested.
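Here’s a minimal sketch of the CancellationToken approach. It uses the Take(CancellationToken) overload, which throws an OperationCanceledException when the token is canceled while waiting. The 2-second auto-cancel is just an assumption to keep the example short. In real code the token would typically come from whatever is coordinating shutdown.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

var blockingQueue = new BlockingCollection<string>();

//For illustration only: cancel automatically after 2 seconds
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));

try
{
    //Blocks until a message is available or the token is canceled
    var message = blockingQueue.Take(cts.Token);
    Console.WriteLine(message);
}
catch (OperationCanceledException)
{
    //Nothing was enqueued, so the wait ends here when the token is canceled
    Console.WriteLine("Canceled while waiting for a message");
}
```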