In a consumer/producer scenario, there are many reasons why you might want the consumer to read a batch of items. Maybe you’re bulk inserting into the database, or sending a payload with HttpClient. Sending lots of individual items over the network can be costly, and waiting for a full batch of items before sending is not ideal either.
In this article, I’ll show how to read a batch of items when using ChannelReader (from System.Threading.Channels) in a consumer/producer scenario.
ChannelReader.ReadMultipleAsync() extension method for batch reading
Let’s say you want batches to contain up to 5 items. In other words, a batch will have between 1 and 5 items. You wait asynchronously for an item to be available on the queue. Once you have one item, keep reading as long as there are items on the queue, until you have 5 items total.
You can’t use ChannelReader.ReadAllAsync() for this. Instead, you can use a combo of WaitToReadAsync() and TryRead().
Here’s an extension method that reads a batch of items using this approach:
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelReaderExtensions
{
    public static async Task<List<T>> ReadMultipleAsync<T>(this ChannelReader<T> reader, int maxBatchSize, CancellationToken cancellationToken)
    {
        //Wait asynchronously until at least one item is available
        await reader.WaitToReadAsync(cancellationToken);

        var batch = new List<T>();

        //Read synchronously until the queue is empty or the batch is full
        while (batch.Count < maxBatchSize && reader.TryRead(out T message))
        {
            batch.Add(message);
        }

        return batch;
    }
}
This doesn’t check the return value of WaitToReadAsync(), which returns false once writer.Complete() has been called and the channel has been drained. This approach assumes you’re consuming continuously while the program is running, and therefore don’t need to deal with the completed writer scenario.
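If you do need to handle the completed writer scenario, here’s a minimal sketch of a variant (hypothetical, not part of the article’s approach) that could live in the same ChannelReaderExtensions class. It checks the return value and hands back whatever is left, so an empty batch tells the caller the channel is done:

//Hypothetical variant: returns an empty batch once the writer is completed
//and the channel has been drained, instead of assuming items keep arriving.
public static async Task<List<T>> ReadMultipleOrEmptyAsync<T>(this ChannelReader<T> reader, int maxBatchSize, CancellationToken cancellationToken)
{
    var batch = new List<T>();

    //WaitToReadAsync() returns false when no more items will ever be available
    if (!await reader.WaitToReadAsync(cancellationToken))
    {
        return batch;
    }

    while (batch.Count < maxBatchSize && reader.TryRead(out T message))
    {
        batch.Add(message);
    }

    return batch;
}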
Example of using ChannelReader.ReadMultipleAsync() in a consumer loop
The following code is a consumer processing loop. It uses ReadMultipleAsync() to fetch a batch of items to process.
//Message queue was created with the following:
var messageQueue = Channel.CreateUnbounded<string>();

public async Task ConsumerLoop(CancellationToken cancelToken)
{
    while (!cancelToken.IsCancellationRequested)
    {
        var batch = await messageQueue.Reader.ReadMultipleAsync(maxBatchSize: 5, cancelToken);

        Console.WriteLine($"Processing batch: {string.Join(" ", batch)}");
        await SendBatch(batch);
        Console.WriteLine($"Finished processing {string.Join(" ", batch)}");
        Console.WriteLine();
    }
}
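SendBatch() isn’t shown in this article. If you want a runnable example, here’s a hypothetical placeholder that just simulates the work (in practice it might bulk insert into a database or send the batch with HttpClient):

private async Task SendBatch(List<string> batch)
{
    //Hypothetical placeholder - simulate sending the batch somewhere
    await Task.Delay(TimeSpan.FromSeconds(1));
}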
When I run this code and enqueue the numbers 1-12, it outputs the following:
Type in the items to enqueue. Separate items with ':' and then press enter
1:2:3:4:5:6:7:8:9:10:11:12
Processing batch: 1 2 3 4 5
Finished processing 1 2 3 4 5
Processing batch: 6 7 8 9 10
Finished processing 6 7 8 9 10
Processing batch: 11 12
Finished processing 11 12
It batched 1-5, 6-10, and 11-12. It didn’t wait for more items to arrive before processing the 11-12 batch. This shows it can process both full batches and partial batches (batches with fewer items than the batch size limit).
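The producer side isn’t shown in this article. For reference, here’s a minimal sketch of a console producer matching the prompt above, assuming it writes to the same messageQueue channel (hypothetical code, not the article’s implementation):

public async Task ProducerLoop()
{
    Console.WriteLine("Type in the items to enqueue. Separate items with ':' and then press enter");

    //Split the input on ':' and write each item to the channel
    foreach (var item in Console.ReadLine().Split(':'))
    {
        await messageQueue.Writer.WriteAsync(item);
    }
}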
Why can’t you batch read with the built-in ChannelReader.ReadAllAsync()?
ChannelReader.ReadAllAsync() works if you want to process individual items or FULL batches, but you can’t use it to process partial batches. Having to wait for a full batch before processing isn’t ideal, and in most real-world scenarios you’ll want to be able to process partial batches.
To see why ReadAllAsync() can’t be used to batch read (unless you’re ok waiting for full batches), take a look at the source code:
public virtual async IAsyncEnumerable<T> ReadAllAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    while (await WaitToReadAsync(cancellationToken).ConfigureAwait(false))
    {
        while (TryRead(out T? item))
        {
            yield return item;
        }
    }
}
Note: You can find the source code in the .NET runtime repository on GitHub.
Let’s say the queue has 1 item. ReadAllAsync() will do the following:
- WaitToReadAsync() returns true, because there’s an item.
- TryRead() will read the item and return true.
- The item will be yielded to the calling code.
- TryRead() will return false, because there are no more items.
- WaitToReadAsync() won’t return until there’s another item, which means the calling code won’t be able to continue and send the batch with the 1 item in it (see the sketch below).
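The following minimal sketch (not from the article) demonstrates this single-item scenario:

var channel = Channel.CreateUnbounded<string>();
await channel.Writer.WriteAsync("only item");

await foreach (var item in channel.Reader.ReadAllAsync())
{
    Console.WriteLine($"Got {item}");
    //The loop body runs once for the lone item. After that, the await foreach
    //suspends inside WaitToReadAsync() until another item is written (or the
    //writer is completed), so a partial batch built here would sit unprocessed.
}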
Here’s an example of a consumer loop that uses ReadAllAsync(). It’s only capable of processing full batches:
public async Task ConsumerLoop(CancellationToken cancelToken)
{
    while (!cancelToken.IsCancellationRequested)
    {
        List<string> batch = new List<string>();

        await foreach (var message in messageQueue.Reader.ReadAllAsync(cancelToken))
        {
            Console.WriteLine($"Adding {message} to batch");
            batch.Add(message);

            if (batch.Count == 5)
            {
                Console.WriteLine($"Processing batch: {string.Join(" ", batch)}");
                await SendBatch(batch);
                Console.WriteLine($"Finished processing {string.Join(" ", batch)}");
                Console.WriteLine();

                //Start a new batch
                batch.Clear();
            }
        }
    }
}
I ran this and enqueued the numbers 1-6. Here’s the output:
Type in the items to enqueue. Separate items with ':' and then press enter
1:2:3:4:5:6
Adding 1 to batch
Adding 2 to batch
Adding 3 to batch
Adding 4 to batch
Adding 5 to batch
Processing batch: 1 2 3 4 5
Finished processing 1 2 3 4 5
Adding 6 to batch
<the cursor is sitting here, because WaitToReadAsync() won't return until there's an item>
It batched 1-5, then added 6 to a new batch and waited forever. It never came out of the call to ReadAllAsync(), because no more items were enqueued.
There’s no straightforward way, while enumerating with ReadAllAsync(), to know whether more items are waiting on the queue so you could flush a partial batch before it blocks. This clearly shows that ReadAllAsync() can be used for processing individual items or full batches, but it can’t handle partial batch processing.