ASP.NET – Async SSE endpoint

Server-Sent Events (SSE) allow a client to subscribe to messages on a server. The server pushes new messages to the client as they happen. This is an alternative to the client polling the server for new messages.

In this article I’ll show how to implement the messaging system shown in the diagram below. This uses an async SSE endpoint to relay messages from a message queue to the client.

[Diagram: a Web API with an async SSE endpoint, a message queue, and a PostMessage endpoint, showing how a message flows through the whole system.]

Before jumping into the implementation, I’ll show the basics of an async SSE endpoint with a simple example. There are three key points, which I’ve marked with numbered comments in the code.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;

[ApiController]
public class MessagesController : ControllerBase
{
    [HttpGet]
    [Route("messages/sse/{id}")]
    public async Task SimpleSSE(string id)
    {
        //1. Set content type
        Response.ContentType = "text/event-stream";
        Response.StatusCode = 200;

        StreamWriter streamWriter = new StreamWriter(Response.Body);

        // Loop until the client disconnects
        while (!HttpContext.RequestAborted.IsCancellationRequested)
        {
            //2. Await something that generates messages
            await Task.Delay(5000, HttpContext.RequestAborted);

            //3. Write to the Response.Body stream
            await streamWriter.WriteLineAsync($"{DateTime.Now} Looping");
            await streamWriter.FlushAsync();
        }
    }
}
```

There are three key points to setting up an SSE endpoint:

  1. Set Response.ContentType to "text/event-stream".
  2. Await something that generates messages in an async manner.
  3. To send a message over the SSE stream, call StreamWriter.WriteLineAsync() followed by FlushAsync() on a StreamWriter that wraps the Response.Body stream.
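
The example above writes plain lines, which is fine for a custom client that reads the stream line by line. A browser EventSource, on the other hand, expects the event-stream framing: each payload line is prefixed with "data: " and a blank line ends the event. The following is a minimal sketch of that framing. SseFormatter and FormatEvent are hypothetical names used for illustration and are not part of the original implementation.

```csharp
using System;
using System.Text;

public static class SseFormatter
{
    // Hypothetical helper: frames a message as a single SSE event.
    // eventName is optional; when empty, no "event:" field is written.
    public static string FormatEvent(string message, string eventName = "")
    {
        var sb = new StringBuilder();

        if (!string.IsNullOrEmpty(eventName))
        {
            sb.Append("event: ").Append(eventName).Append('\n');
        }

        // A multi-line payload becomes one "data:" field per line
        foreach (var line in message.Replace("\r\n", "\n").Split('\n'))
        {
            sb.Append("data: ").Append(line).Append('\n');
        }

        // A blank line terminates the event
        sb.Append('\n');
        return sb.ToString();
    }

    public static void Main()
    {
        Console.Write(FormatEvent("Looping"));
        Console.Write(FormatEvent("line one\nline two", "status"));
    }
}
```

In an endpoint like the one above, the formatted string could be sent with streamWriter.WriteAsync(...) followed by FlushAsync(). Whether to use this framing depends on the client: it is needed for EventSource, while a client that simply reads lines works either way.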

Now, let’s take a look at how to implement the messaging system that uses an async SSE endpoint.

1 – MessagesController – Add a controller with an async SSE endpoint

The following code sets up an SSE endpoint and awaits messages from an async message queue. When a message arrives, it writes it to the Response.Body stream.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;

[ApiController]
public class MessagesController : ControllerBase
{
    private readonly IMessageQueue MessageQueue;

    public MessagesController(IMessageQueue messageQueue)
    {
        MessageQueue = messageQueue;
    }

    [HttpGet]
    [Route("messages/subscribe/{id}")]
    public async Task Subscribe(string id)
    {
        Response.ContentType = "text/event-stream";
        Response.StatusCode = 200;

        StreamWriter streamWriter = new StreamWriter(Response.Body);

        // Register outside the try block so a failed registration (id already
        // in use) doesn't unregister the existing subscriber in the finally block
        MessageQueue.Register(id);

        try
        {
            await MessageQueue.EnqueueAsync(id, $"Subscribed to id {id}", HttpContext.RequestAborted);

            await foreach (var message in MessageQueue.DequeueAsync(id, HttpContext.RequestAborted))
            {
                await streamWriter.WriteLineAsync($"{DateTime.Now} {message}");
                await streamWriter.FlushAsync();
            }
        }
        catch (OperationCanceledException)
        {
            //this is expected when the client disconnects the connection
        }
        catch (Exception)
        {
            // The status code can only be changed before the response has started
            if (!Response.HasStarted)
            {
                Response.StatusCode = 400;
            }
        }
        finally
        {
            MessageQueue.Unregister(id);
        }
    }
}
```

2 – MessagesController – Add an endpoint for posting messages

In MessagesController add the following endpoint:

```csharp
[HttpPost]
[Route("messages/{id}")]
public async Task<IActionResult> PostMessage(string id, string message)
{
    try
    {
        await MessageQueue.EnqueueAsync(id, message, HttpContext.RequestAborted);
        return Ok();
    }
    catch (Exception ex)
    {
        return BadRequest(ex.Message);
    }
}
```

An SSE endpoint is pointless if you don’t have something generating new messages. To keep this implementation simple, but realistic, I’m using an endpoint that allows you to post messages to the message queue. Because the SSE endpoint is awaiting messages from this queue, as soon as you post a message, it’ll dequeue the message and send it to the SSE client.
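
To illustrate how a caller might use this endpoint from code, here is a small sketch based on HttpClient. It assumes the API is listening on http://localhost:5000 (adjust to your own host and port) and that, as the action signature suggests, id is bound from the route and message from the query string. PostMessageClient and BuildUri are hypothetical names.

```csharp
using System;
using System.Net.Http;
using System.Threading.Tasks;

public static class PostMessageClient
{
    // Hypothetical helper: puts the id in the route and the message in the query string
    public static Uri BuildUri(string baseUrl, string id, string message)
    {
        var escapedId = Uri.EscapeDataString(id);
        var escapedMessage = Uri.EscapeDataString(message);
        return new Uri($"{baseUrl.TrimEnd('/')}/messages/{escapedId}?message={escapedMessage}");
    }

    public static async Task Main()
    {
        // Assumption: the Web API is running locally on port 5000
        var uri = BuildUri("http://localhost:5000", "1", "Hello subscriber 1");

        using var client = new HttpClient();

        // The endpoint reads everything from the URL, so no request body is sent
        using var response = await client.PostAsync(uri, content: null);

        Console.WriteLine($"POST {uri} -> {(int)response.StatusCode}");
    }
}
```

Note that posting to an id nobody has subscribed to still returns Ok() in this design, because the queue simply ignores messages for unregistered ids.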

3 – MessageQueue – Create an async message queue

Add interface

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public interface IMessageQueue
{
    void Register(string id);
    void Unregister(string id);
    IAsyncEnumerable<string> DequeueAsync(string id, CancellationToken cancelToken);
    Task EnqueueAsync(string id, string message, CancellationToken cancelToken);
}
```

Implement async MessageQueue

I’m using System.Threading.Channels as an async concurrent queue. Basically when a subscriber registers, I’m creating a new Channel<string>.

MessagesController.PostMessage(…) enqueues messages, while the SSE endpoint dequeues them.
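
If you haven’t used System.Threading.Channels before, the following standalone sketch shows the producer/consumer pattern the queue relies on: one side writes with Writer.WriteAsync(), the other side reads with Reader.ReadAllAsync(). ChannelDemo and RunAsync are hypothetical names, and the sketch completes the writer to end the loop, whereas the SSE endpoint ends its loop through the cancellation token instead.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelDemo
{
    public static async Task<List<string>> RunAsync()
    {
        var channel = Channel.CreateUnbounded<string>();
        var received = new List<string>();

        // Consumer: awaits messages until the writer is completed
        var consumer = Task.Run(async () =>
        {
            await foreach (var message in channel.Reader.ReadAllAsync())
            {
                received.Add(message);
            }
        });

        // Producer: writes two messages, then signals that no more are coming
        await channel.Writer.WriteAsync("first");
        await channel.Writer.WriteAsync("second");
        channel.Writer.Complete();

        await consumer;
        return received;
    }

    public static async Task Main()
    {
        foreach (var message in await RunAsync())
        {
            Console.WriteLine(message);
        }
    }
}
```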

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public class MessageQueue : IMessageQueue
{
    private readonly ConcurrentDictionary<string, Channel<string>> clientToChannelMap;

    public MessageQueue()
    {
        clientToChannelMap = new ConcurrentDictionary<string, Channel<string>>();
    }

    public IAsyncEnumerable<string> DequeueAsync(string id, CancellationToken cancelToken)
    {
        if (clientToChannelMap.TryGetValue(id, out Channel<string> channel))
        {
            return channel.Reader.ReadAllAsync(cancelToken);
        }
        else
        {
            throw new ArgumentException($"Id {id} isn't registered");
        }
    }

    public async Task EnqueueAsync(string id, string message, CancellationToken cancelToken)
    {
        // Messages for ids without a registered subscriber are silently dropped
        if (clientToChannelMap.TryGetValue(id, out Channel<string> channel))
        {
            await channel.Writer.WriteAsync(message, cancelToken);
        }
    }

    public void Register(string id)
    {
        if (!clientToChannelMap.TryAdd(id, CreateChannel()))
        {
            throw new ArgumentException($"Id {id} is already registered");
        }
    }

    public void Unregister(string id)
    {
        clientToChannelMap.TryRemove(id, out _);
    }

    private Channel<string> CreateChannel()
    {
        return Channel.CreateUnbounded<string>();
    }
}
```

Register IMessageQueue in the Startup class

In order to inject the IMessageQueue dependency into the MessagesController, I need to register it in ConfigureServices(…) in the Startup class.

```csharp
using Microsoft.Extensions.DependencyInjection;

public class Startup
{
    //other methods...

    public void ConfigureServices(IServiceCollection services)
    {
        services.AddSingleton<IMessageQueue, MessageQueue>();

        //other service registrations
    }
}
```

Now when a request comes into the MessagesController, the framework passes in the IMessageQueue singleton.
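
The singleton lifetime matters here: the subscribe request and the post request are separate requests, and they can only communicate if they share the same queue instance. The sketch below illustrates the difference between a singleton and a scoped registration. It assumes the Microsoft.Extensions.DependencyInjection package is available, and it uses stand-in types (IDemoQueue, DemoQueue) rather than the real IMessageQueue so it can run on its own.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;

public interface IDemoQueue { }             // stand-in for IMessageQueue
public class DemoQueue : IDemoQueue { }     // stand-in for MessageQueue

public static class LifetimeDemo
{
    // Resolves the service in two separate scopes and reports whether
    // both scopes received the same instance
    public static bool SameInstanceAcrossScopes(Action<IServiceCollection> register)
    {
        var services = new ServiceCollection();
        register(services);

        using var provider = services.BuildServiceProvider();

        // Each scope plays the role of one HTTP request
        using var scope1 = provider.CreateScope();
        using var scope2 = provider.CreateScope();

        var first = scope1.ServiceProvider.GetRequiredService<IDemoQueue>();
        var second = scope2.ServiceProvider.GetRequiredService<IDemoQueue>();

        return ReferenceEquals(first, second);
    }

    public static void Main()
    {
        // Singleton: both "requests" share one queue
        Console.WriteLine(SameInstanceAcrossScopes(s => s.AddSingleton<IDemoQueue, DemoQueue>()));

        // Scoped: each "request" gets its own queue, so posted messages would never reach subscribers
        Console.WriteLine(SameInstanceAcrossScopes(s => s.AddScoped<IDemoQueue, DemoQueue>()));
    }
}
```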

4 – End results – subscribe using multiple clients and post messages to them

Please refer to the following article about how to create an SSE Client console app.
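
For readers who just want to try the endpoint quickly, a bare-bones client could look something like the sketch below. This is not the client from the referenced article. It assumes the API is listening on http://localhost:5000, takes the subscriber id from the first command-line argument, and prints every line it receives. SseClientSketch and ReadLinesAsync are hypothetical names.

```csharp
using System;
using System.IO;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

public static class SseClientSketch
{
    // Reads the stream line by line until it ends, passing each line to onLine
    public static async Task ReadLinesAsync(Stream stream, Action<string> onLine)
    {
        using var reader = new StreamReader(stream);

        while (true)
        {
            var line = await reader.ReadLineAsync();
            if (line == null)
            {
                break; // the server closed the stream
            }
            onLine(line);
        }
    }

    public static async Task Main(string[] args)
    {
        var id = args.Length > 0 ? args[0] : "1";

        // Assumption: the Web API is running locally on port 5000
        var url = $"http://localhost:5000/messages/subscribe/{id}";

        // The stream stays open indefinitely, so disable the default timeout
        using var client = new HttpClient { Timeout = Timeout.InfiniteTimeSpan };
        using var stream = await client.GetStreamAsync(url);

        await ReadLinesAsync(stream, Console.WriteLine);
    }
}
```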

I started the Web API in IISExpress, then launched five SSE clients. Each client is subscribed to a different id.

I’m using the Swagger-generated UI to post messages.

[Screenshot: using the Swagger-generated UI to post messages to the message queue.]

Here you can see all of the clients connecting, getting the initial subscription message, then receiving the messages that I posted.

[Screenshot: SSE clients subscribing to the SSE endpoint and receiving messages via the SSE stream.]
