Server-Sent Events (SSE) allow a client to subscribe to messages on a server. The server pushes new messages to the client as they happen. This is an alternative to the client polling the server for new messages.
In this article I’ll show how to implement the messaging system shown in the diagram below. This uses an async SSE endpoint to relay messages from a message queue to the client.
Before jumping into the implementation, here are the basics of an async SSE endpoint, shown in the simple example below. The three key points are marked with comments in the code.
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;

[ApiController]
public class MessagesController : ControllerBase
{
    [HttpGet]
    [Route("messages/sse/{id}")]
    public async Task SimpleSSE(string id)
    {
        //1. Set content type
        Response.ContentType = "text/event-stream";
        Response.StatusCode = 200;

        StreamWriter streamWriter = new StreamWriter(Response.Body);

        while(!HttpContext.RequestAborted.IsCancellationRequested)
        {
            //2. Await something that generates messages
            await Task.Delay(5000, HttpContext.RequestAborted);

            //3. Write to the Response.Body stream
            await streamWriter.WriteLineAsync($"{DateTime.Now} Looping");
            await streamWriter.FlushAsync();
        }
    }
}
Code language: C# (cs)
There are three key points to setting up an SSE endpoint:
- Set Response.ContentType to “text/event-stream”.
- Await something that generates messages asynchronously.
- To send a message over the SSE stream, use StreamWriter.WriteLineAsync() followed by FlushAsync() on the Response.Body stream.
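The endpoints in this article write plain lines to the stream, which works for a client that reads the raw response line by line. A browser's EventSource, on the other hand, expects the text/event-stream framing: each event is one or more "data:" lines followed by a blank line. The sketch below shows one way to produce that framing. SseFormat and FormatEvent are hypothetical names used for illustration, not part of the original code.

```csharp
using System;
using System.Text;

// Hypothetical helper (not from the article) that frames a message
// according to the text/event-stream format.
public static class SseFormat
{
    // Builds one SSE event: an optional "event:" line, one "data:" line per
    // line of payload, then a blank line that terminates the event.
    public static string FormatEvent(string data, string eventName = null)
    {
        var sb = new StringBuilder();

        if (!string.IsNullOrEmpty(eventName))
        {
            sb.Append("event: ").Append(eventName).Append('\n');
        }

        // A multi-line payload needs a separate "data:" line for each line.
        string normalized = data.Replace("\r\n", "\n").Replace('\r', '\n');
        foreach (string line in normalized.Split('\n'))
        {
            sb.Append("data: ").Append(line).Append('\n');
        }

        sb.Append('\n'); // blank line marks the end of the event
        return sb.ToString();
    }
}
```

With a helper like this, the write step would become await streamWriter.WriteAsync(SseFormat.FormatEvent(message)) followed by FlushAsync(). Note that a client reading the raw stream would then see the "data: " prefix on each line.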
Now, let’s take a look at how to implement the messaging system that uses an async SSE endpoint.
1 – MessagesController – Add a controller with an async SSE endpoint
The following code sets up an SSE endpoint and awaits messages from an async message queue. When messages arrive, it writes them to the Response.Body stream.
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;

[ApiController]
public class MessagesController : ControllerBase
{
    private readonly IMessageQueue MessageQueue;

    public MessagesController(IMessageQueue messageQueue)
    {
        MessageQueue = messageQueue;
    }

    [HttpGet]
    [Route("messages/subscribe/{id}")]
    public async Task Subscribe(string id)
    {
        Response.ContentType = "text/event-stream";
        Response.StatusCode = 200;

        StreamWriter streamWriter = new StreamWriter(Response.Body);

        MessageQueue.Register(id);

        try
        {
            await MessageQueue.EnqueueAsync(id, $"Subscribed to id {id}", HttpContext.RequestAborted);

            await foreach (var message in MessageQueue.DequeueAsync(id, HttpContext.RequestAborted))
            {
                await streamWriter.WriteLineAsync($"{DateTime.Now} {message}");
                await streamWriter.FlushAsync();
            }
        }
        catch(OperationCanceledException)
        {
            //this is expected when the client closes the connection
        }
        catch(Exception)
        {
            //the status code can only be changed before the response has started
            if (!Response.HasStarted)
            {
                Response.StatusCode = 400;
            }
        }
        finally
        {
            MessageQueue.Unregister(id);
        }
    }
}
Code language: C# (cs)
Read more about how you can change StreamWriter’s buffer size.
2 – MessagesController – Add an endpoint for posting messages
In MessagesController add the following endpoint:
[HttpPost]
[Route("messages/{id}")]
public async Task<IActionResult> PostMessage(string id, string message)
{
    try
    {
        await MessageQueue.EnqueueAsync(id, message, HttpContext.RequestAborted);
        return Ok();
    }
    catch(Exception ex)
    {
        return BadRequest(ex.Message);
    }
}
Code language: C# (cs)
An SSE endpoint is pointless if nothing is generating new messages. To keep this implementation simple but realistic, I’m using an endpoint that lets you post messages to the message queue. Because the SSE endpoint is awaiting messages from this queue, as soon as you post a message, it’ll dequeue the message and send it to the SSE client.
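To make the posting side concrete, here is a minimal sketch of a caller for this endpoint. It assumes the message parameter is bound from the query string, which is the default inference for simple types in an [ApiController] action. MessagePoster and its method names are hypothetical, and the base address of the HttpClient (for example http://localhost:5000) is an assumption that depends on your setup.

```csharp
using System;
using System.Net.Http;
using System.Threading.Tasks;

// Hypothetical client-side helper (not from the article).
public static class MessagePoster
{
    // Builds the relative URL for PostMessage(id, message),
    // escaping both values so spaces and symbols are safe.
    public static string BuildPostUrl(string id, string message)
    {
        return $"messages/{Uri.EscapeDataString(id)}?message={Uri.EscapeDataString(message)}";
    }

    // Sends the POST with an empty body; the message travels in the query string.
    // The client is assumed to have its BaseAddress set to the API's address.
    public static async Task<bool> PostAsync(HttpClient client, string id, string message)
    {
        using HttpResponseMessage response = await client.PostAsync(BuildPostUrl(id, message), null);
        return response.IsSuccessStatusCode;
    }
}
```

For example, MessagePoster.BuildPostUrl("abc", "hello world") produces messages/abc?message=hello%20world.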
3 – MessageQueue – Create an async message queue
Add interface
public interface IMessageQueue
{
    void Register(string id);
    void Unregister(string id);
    IAsyncEnumerable<string> DequeueAsync(string id, CancellationToken cancelToken);
    Task EnqueueAsync(string id, string message, CancellationToken cancelToken);
}
Code language: C# (cs)
Implement async MessageQueue
When a subscriber registers, I’m adding them to a ConcurrentDictionary (and then removing them when they unregister). The dictionary contains the subscriber id and a Channel<string>. I’m using the Channel as a concurrent queue. The PostMessage() method enqueues new messages, while the SSE endpoint dequeues (and then sends them to the subscriber).
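To show how a Channel behaves as an async queue on its own, here is a small standalone sketch. ChannelDemo and RunAsync are hypothetical names used for illustration. Unlike the message queue in this article, the sketch completes the writer so the read loop ends by itself; in the article's endpoint the loop ends when the request's cancellation token is triggered.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical demo (not from the article) of Channel<string> used as an async queue.
public static class ChannelDemo
{
    public static async Task<List<string>> RunAsync()
    {
        Channel<string> channel = Channel.CreateUnbounded<string>();
        var received = new List<string>();

        // Consumer: ReadAllAsync waits for items and finishes once the writer is completed.
        Task consumer = Task.Run(async () =>
        {
            await foreach (string message in channel.Reader.ReadAllAsync())
            {
                received.Add(message);
            }
        });

        // Producer: items are read in the order they were written.
        await channel.Writer.WriteAsync("first");
        await channel.Writer.WriteAsync("second");
        channel.Writer.Complete();

        await consumer;
        return received;
    }
}
```

The consumer never polls: it is suspended until an item is available, which is what lets the SSE endpoint await new messages without blocking a thread.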
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public class MessageQueue : IMessageQueue
{
    private readonly ConcurrentDictionary<string, Channel<string>> clientToChannelMap;

    public MessageQueue()
    {
        clientToChannelMap = new ConcurrentDictionary<string, Channel<string>>();
    }

    public IAsyncEnumerable<string> DequeueAsync(string id, CancellationToken cancelToken)
    {
        if (clientToChannelMap.TryGetValue(id, out Channel<string> channel))
        {
            return channel.Reader.ReadAllAsync(cancelToken);
        }
        else
        {
            throw new ArgumentException($"Id {id} isn't registered");
        }
    }

    public async Task EnqueueAsync(string id, string message, CancellationToken cancelToken)
    {
        //messages for an id that isn't registered are silently dropped
        if(clientToChannelMap.TryGetValue(id, out Channel<string> channel))
        {
            await channel.Writer.WriteAsync(message, cancelToken);
        }
    }

    public void Register(string id)
    {
        if(!clientToChannelMap.TryAdd(id, CreateChannel()))
        {
            throw new ArgumentException($"Id {id} is already registered");
        }
    }

    public void Unregister(string id)
    {
        clientToChannelMap.TryRemove(id, out _);
    }

    private Channel<string> CreateChannel()
    {
        return Channel.CreateUnbounded<string>();
    }
}
Code language: C# (cs)
Register IMessageQueue in the Startup class
In order to inject the IMessageQueue dependency into the MessagesController, I need to register it in ConfigureServices(…) in the Startup class.
public class Startup
{
    //other methods...

    public void ConfigureServices(IServiceCollection services)
    {
        services.AddSingleton<IMessageQueue, MessageQueue>();

        //other service registrations
    }
}
Code language: C# (cs)
Now when a request comes into the MessagesController, the framework passes in the IMessageQueue singleton.
4 – End results – subscribe using multiple clients and post messages to them
To see this working, I connected to the endpoint with multiple clients and posted messages. The posted messages were relayed to the clients, and each client wrote them to the console. This outputs the following messages in a console app: