Event-driven .NET: Concurrent Producer/Consumer using a non-blocking, async queue

In a previous article I showed how to implement the concurrent producer/consumer pattern using a BlockingCollection. This is a thread-safe, event-driven approach that uses high-level concurrent constructs. The only downside is that the consumer uses a blocking call to dequeue messages. In other words, it ties up a thread even while the queue is empty.

Is there a way to implement this using a non-blocking approach?

Yes, by using the Channel<T> class from System.Threading.Channels. This is essentially an async queue: the consumer awaits the next message instead of blocking on it.

In this article I’ll show how to use a Channel to implement concurrent producer/consumer in a non-blocking way.
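To make the "async queue" idea concrete before building the consumer, here is a minimal sketch. The class name ChannelBasics is only for illustration and is not part of the article's code. It starts a read on an empty channel, shows that the read is still pending instead of blocking, then writes a message and awaits the result.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelBasics
{
    public static async Task Main()
    {
        var channel = Channel.CreateUnbounded<string>();

        // Start reading before anything has been written. ReadAsync returns a
        // ValueTask<string>; AsTask() lets us inspect it now and await it later.
        Task<string> pendingRead = channel.Reader.ReadAsync().AsTask();
        Console.WriteLine($"Read completed before write: {pendingRead.IsCompleted}");

        // Writing a message completes the pending read.
        await channel.Writer.WriteAsync("hello");

        string message = await pendingRead;
        Console.WriteLine($"Message received: {message}");
    }
}
```

The first line printed reports False, because the read is simply pending on an empty channel. No thread is parked waiting for it.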

1 – Create a Consumer that uses a Channel as an async queue

Note: This is using System.Threading.Channels.

using System;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public class StringReverser
{
	private readonly Channel<string> messageQueue;
	public StringReverser(Channel<string> messageQueue)
	{
		this.messageQueue = messageQueue;
	}
	public async Task StartProcessing(CancellationToken cancelToken)
	{
		// ReadAllAsync yields messages as they arrive. While the queue is empty,
		// the await releases the thread instead of blocking it.
		await foreach(var message in messageQueue.Reader.ReadAllAsync(cancelToken))
		{
			var reversedString = new string(message.Reverse().ToArray());

			Console.WriteLine($"Thread={Thread.CurrentThread.ManagedThreadId} reverse({message})=>{reversedString}");
		}
	}
	public async Task QueueForProcessing(string message, CancellationToken cancelToken)
	{
		// Enqueue the message for the consumer loop in StartProcessing.
		await messageQueue.Writer.WriteAsync(message, cancelToken);
	}
}

2 – Start the Consumer and start producing messages

public static async Task Main(string[] args)
{
	var messageQueue = Channel.CreateUnbounded<string>();
	var messageReverser = new StringReverser(messageQueue);

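	var cancellationTokenSource = new CancellationTokenSource();

	// Start the consumer without awaiting it, so Main can continue to the producer loop.
	// The discard makes the fire-and-forget explicit and avoids compiler warning CS4014.
	_ = messageReverser.StartProcessing(cancellationTokenSource.Token);

	while (true)
	{
		Console.WriteLine($"Thread={Thread.CurrentThread.ManagedThreadId} Write a sentence and see each word reversed: ");
		var msg = Console.ReadLine();
		if (msg == null) break; // ReadLine returns null when the input stream is closed
		Console.WriteLine("");

		foreach (var s in msg.Split())
		{
			await messageReverser.QueueForProcessing(s, cancellationTokenSource.Token);
		}
	}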
}
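The loop above runs until the process is killed, and the cancellation token is never actually cancelled. As a minimal sketch of one way to shut down cleanly (an assumption on my part, not something the code above does), the producer can call Writer.Complete() when it has nothing more to send. ReadAllAsync() then finishes once the remaining messages are drained, and the consumer task can be awaited. The names ShutdownSketch and ConsumeAsync are hypothetical.

```csharp
using System;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ShutdownSketch
{
    public static async Task Main()
    {
        var channel = Channel.CreateUnbounded<string>();

        // Keep the consumer task so it can be awaited at shutdown.
        Task consumer = ConsumeAsync(channel.Reader);

        foreach (var word in "hello async world".Split(' '))
        {
            await channel.Writer.WriteAsync(word);
        }

        // Signal that no more messages will be written...
        channel.Writer.Complete();

        // ...then wait for the consumer to drain the queue and leave its loop.
        await consumer;
        Console.WriteLine("Consumer finished");
    }

    private static async Task ConsumeAsync(ChannelReader<string> reader)
    {
        // The loop ends on its own once the writer is completed and the queue is empty.
        await foreach (var message in reader.ReadAllAsync())
        {
            Console.WriteLine(new string(message.Reverse().ToArray()));
        }
    }
}
```

With a single consumer, messages are processed in the order they were written, so this prints the three reversed words followed by "Consumer finished". Cancelling the token is the alternative when the consumer should stop immediately; in that case the await foreach throws an OperationCanceledException that the caller needs to handle.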

3 – End results – running the console app

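When I run this, the consumer is clearly not blocking. StartProcessing() is called directly from Main(), so if it blocked, the code would never reach the loop that reads from the console, and I wouldn’t be able to type messages.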

Furthermore, in my run the consumer first used thread 4 and later switched to thread 5. It’s using thread pool threads, and is not sitting there wasting a dedicated thread. This is a key benefit of the async, non-blocking approach.
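To check the thread pool claim yourself, here is a small sketch (the names ThreadPoolCheck and ConsumeAsync are hypothetical) that prints the managed thread ID and Thread.IsThreadPoolThread for each message the consumer handles. The exact thread IDs vary from run to run, so no particular output is assumed here.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ThreadPoolCheck
{
    public static async Task Main()
    {
        var channel = Channel.CreateUnbounded<string>();
        Console.WriteLine($"Main thread={Thread.CurrentThread.ManagedThreadId}");

        // The consumer starts on the main thread and returns at its first await,
        // because the channel is still empty.
        Task consumer = ConsumeAsync(channel.Reader);

        await channel.Writer.WriteAsync("first");
        await Task.Delay(100); // give the consumer time to handle the first message
        await channel.Writer.WriteAsync("second");

        channel.Writer.Complete();
        await consumer;
    }

    private static async Task ConsumeAsync(ChannelReader<string> reader)
    {
        await foreach (var message in reader.ReadAllAsync())
        {
            // Report which thread resumed the loop for this message.
            var thread = Thread.CurrentThread;
            Console.WriteLine($"{message}: thread={thread.ManagedThreadId} pool={thread.IsThreadPoolThread}");
        }
    }
}
```

In a console app, which has no synchronization context, the consumer should normally resume on a thread pool thread after each wait. The thread may or may not be the same one from message to message.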
