C# – Using Channel as an async queue

The Channel class (from System.Threading.Channels) is a non-blocking async queue. It implements the producer-consumer pattern, which has two parts:

  • Multiple producer threads enqueuing items by calling Channel.Writer.WriteAsync().
  • At least one consumer awaiting items to dequeue by calling Channel.Reader.ReadAllAsync().

Compare this with using BlockingCollection, which is a blocking concurrent queue.

In this article, I’ll show how to use a Channel.

1 – Create the Channel

The first step is to create the Channel object. Here’s an example of creating an unbounded (no max capacity) Channel that holds strings :

using System.Threading.Channels;

var channel = Channel.CreateUnbounded<string>();
Code language: C# (cs)

Note: Use Channel.CreateBounded() to create a bounded channel with a max capacity.

I suggest encapsulating the Channel object in a class, which I’ll show going forward in the next steps.

2 – Read from the Channel

You can dequeue items asynchronously by calling Channel.Reader.ReadAllAsync() in a foreach loop, like this (highlighted):

using System.Threading.Channels;
using System.Threading.Tasks;

public class MessageQueue
{
    private Channel<string> channel;
    public MessageQueue()
    {
        channel = Channel.CreateUnbounded<string>();
    }
    public async Task StartConsumer()
    { 
        await foreach (var message in channel.Reader.ReadAllAsync())
        {
            Console.WriteLine($"{DateTime.Now:hh:mm:ss} Got message: {message}");
        }
    }
}
Code language: C# (cs)

This is doing asynchronous iteration over the IAsyncEnumerable returned by ReadAllAsync(). This means it awaits while there are no items available. When an item is available, it’s yielded by IAsyncEnumerable and returned as the loop variable.

3 – Write to the Channel

You can write items to the Channel by calling Channel.Writer.WriteAsync(), like this (highlighted):

using System.Threading.Channels;
using System.Threading.Tasks;

public class MessageQueue
{
    private Channel<string> channel;
    public MessageQueue()
    {
        channel = Channel.CreateUnbounded<string>();
    }
    public async Task StartConsumer()
    { 
        await foreach (var message in channel.Reader.ReadAllAsync())
        {
            Console.WriteLine($"{DateTime.Now:hh:mm:ss} Got message: {message}");
        }
    }
    public async Task AddMessage(string message)
    {
        await channel.Writer.WriteAsync(message);
    }
}

Code language: C# (cs)

TryWrite() vs WriteAsync()

There are two scenarios to know about when using WriteAsync():

  • If you’re writing to a bounded Channel that has reached capacity, WriteAsync() won’t return until it’s able to actually write the item.
  • If you try writing to a closed Channel, it’ll throw ChannelClosedException: The channel has been closed.

Consider using TryWrite() instead of WriteAsync() if you need to handle these scenarios. It returns false if it isn’t able to write immediately.

Note: A closed Channel means Writer.Complete() was called. This prevents more items from being written to the Channel.

4 – Run the code and add messages

To wrap this up, here’s an example of running the code. This starts the consumer and then produces multiple messages:

var messageQueue = new MessageQueue();

//1 consumer
var consumerTask = messageQueue.StartConsumer(); //don't await it

//1 producer adding messages
await messageQueue.AddMessage("Hello world");
await messageQueue.AddMessage("This is an example of using Channel");
await messageQueue.AddMessage("Well, bye!");

await consumerTask; //so the console app doesn't close
Code language: C# (cs)

This outputs the following:

04:40:05 Got message: Hello world
04:40:05 Got message: This is an example of using Channel
04:40:05 Got message: Well, bye!Code language: plaintext (plaintext)

Leave a Comment