Dec 12, 2024

Backpressure in WebSockets using Web Streams

WebSockets allow bi-directional communication between two processes over a TCP connection. Since its standardisation in 2011, the WebSocket protocol has become well defined and widely supported.

The web platform offers the WebSockets API to interact with WebSockets through a simple interface - just attach an onmessage handler, and you’re good to go!

const ws = new WebSocket("wss://...");

ws.onmessage = (messageEvent) => {
  const data = JSON.parse(messageEvent.data);
  // ...
};

Unfortunately, this API does not support backpressure. A chatty web socket connection may end up overwhelming your application with messages if your handler is slow. This is a well-known problem, and there are at least two proposals to fix it:

  1. WebSocketStream - a Web Streams based API for web sockets to handle backpressure automatically. Unfortunately, it is a non-standardised API, and is currently only available on Chrome.
  2. WebTransport - a modern replacement for the WebSocket API. It is standardised, but unfortunately is not available on Safari yet.

At the time of writing, neither of these solutions is available widely enough to be usable in production. Fortunately, we can build our own version of the first proposal, and use the Web Streams API to handle backpressure!

Web Streams

The Streams API is all about processing small chunks of data in a pipeline.

There can be any number of TransformStreams in the middle of a pipeline, but only one readable stream at the start, and only one writable stream at the end.
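For example, a small pipeline can be wired up with the standard Streams constructors (a minimal sketch; the doubling transform and the number source are just illustrations):

```typescript
// A minimal pipeline: a readable source of numbers, a transform
// that doubles each chunk, and a writable sink that collects results.
const source = new ReadableStream<number>({
  start(controller) {
    [1, 2, 3].forEach((n) => controller.enqueue(n));
    controller.close();
  },
});

const double = new TransformStream<number, number>({
  transform(chunk, controller) {
    controller.enqueue(chunk * 2);
  },
});

const results: number[] = [];
const sink = new WritableStream<number>({
  write(chunk) {
    results.push(chunk);
  },
});

await source.pipeThrough(double).pipeTo(sink);
// results now holds [2, 4, 6]
```

`pipeThrough` connects a transform into the pipeline and returns its readable side, so any number of transforms can be chained before the final `pipeTo`.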

Backpressure

If a source produces data faster than the destination can receive it, it causes backpressure in the pipeline. This pressure must be handled correctly, or it can cause an application’s memory usage to grow uncontrollably, eventually making it lock up or crash.

To deal with backpressure, you can follow one of three strategies:

  1. Signal the source to slow down — this is great if your input source controls the speed at which it reads data — filesystems, HTTP responses, etc.
  2. Queue the unprocessed messages for consumption later by the stream — the size of the queue must be bounded though, so there is a limit to the number of messages that can be kept waiting.
  3. Drop messages — while it may not work for all applications, this is a perfectly valid strategy.

Web streams support all three strategies, and use an internal queue to buffer unprocessed data. The size of this queue is configurable through the highWaterMark property.

Thankfully, web streams have built-in support for handling backpressure, and there is an excellent MDN guide on how to use it.
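The queue's remaining capacity is exposed through the controller's desiredSize, which drops to zero once the queue holds highWaterMark chunks — that is the backpressure signal. A small sketch (capturing the controller in start is for demonstration only):

```typescript
// A readable stream with a bounded internal queue of 2 chunks.
// desiredSize is the remaining capacity; it reaches zero when the
// queue is full — the stream's signal to stop producing.
let ctrl!: ReadableStreamDefaultController<string>;
const stream = new ReadableStream<string>(
  {
    start(controller) {
      ctrl = controller; // captured for demonstration only
    },
  },
  new CountQueuingStrategy({ highWaterMark: 2 })
);

ctrl.enqueue("a"); // desiredSize: 1
ctrl.enqueue("b"); // desiredSize: 0 — queue is full
```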

Web Sockets with Streams

In a web socket connection, you cannot control the speed at which a server sends messages. Therefore, you have no choice but to drop them if the stream is backed up. When the stream is free to process data again, you can resume receiving messages.

You can implement this behaviour by creating a pull-based ReadableStream from a web socket.

function streamWs(ws: WebSocket): ReadableStream<MessageEvent> {
  const stream = new ReadableStream<MessageEvent>({
    // optional — we don't need any setup here
    start: () => {},
    pull: async (controller) => {
      const message = await nextMsg(ws);
      controller.enqueue(message);
    },
  });

  return stream;
}

async function nextMsg(ws: WebSocket): Promise<MessageEvent> {
  return new Promise<MessageEvent>((resolve, reject) => {
    ws.onmessage = resolve;
    ws.onerror = reject;
  });
}

Whenever the stream has the ability to accept more messages, it calls pull to fetch data from the source. This implementation awaits on the next message from the socket, and gives it to the stream.
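You can observe this pull-driven behaviour with a plain counter source (a standalone sketch, not the WebSocket version): the stream pulls eagerly only until its queue reaches the high water mark, then waits for a reader to drain a chunk.

```typescript
// pull() runs only while the internal queue has room. With a high
// water mark of 1, the stream pulls once up front to fill its queue,
// then pauses until a reader drains a chunk.
let pulls = 0;
const lazy = new ReadableStream<number>(
  {
    pull(controller) {
      pulls += 1;
      controller.enqueue(pulls);
    },
  },
  new CountQueuingStrategy({ highWaterMark: 1 })
);

await new Promise((r) => setTimeout(r, 0));
// One eager pull has filled the queue: pulls === 1

const reader = lazy.getReader();
await reader.read(); // drain a chunk…
await new Promise((r) => setTimeout(r, 0));
// …and the stream pulls again to refill: pulls === 2
```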

The stream will continue pulling data until its internal queue is full. The size of this queue can be configured by supplying a queuing strategy:

function streamWs(ws: WebSocket): ReadableStream<MessageEvent> {
  const stream = new ReadableStream<MessageEvent>(
    {
      start: () => {},
      pull: (controller) => { ... },
    },
    new CountQueuingStrategy({ highWaterMark: 100 })
  );

  return stream;
}

A stream should also propagate cancellation signals to its source, so you can close the web socket when the stream is cancelled.

function streamWs(ws: WebSocket): ReadableStream<MessageEvent> {
  const stream = new ReadableStream<MessageEvent>(
    {
      start: () => {},
      pull: (controller) => { ... },
      cancel: () => {
        ws.close();
      },
    },
    new CountQueuingStrategy({ highWaterMark: 100 })
  );

  return stream;
}

And voila! We now have a ReadableStream backed by a web socket that supports backpressure.
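To check the dropping behaviour end to end, here is a self-contained sketch that swaps the real socket for a minimal stand-in exposing only the onmessage/onerror fields the implementation touches (the FakeSocket type and the high water mark of 1 are assumptions made for the demo):

```typescript
// Stand-in for a WebSocket: just the two fields streamWs touches.
type FakeSocket = {
  onmessage: ((ev: { data: string }) => void) | null;
  onerror: ((err: unknown) => void) | null;
};

function nextMsg(ws: FakeSocket): Promise<{ data: string }> {
  return new Promise((resolve, reject) => {
    ws.onmessage = resolve;
    ws.onerror = reject;
  });
}

function streamWs(ws: FakeSocket): ReadableStream<{ data: string }> {
  return new ReadableStream(
    {
      pull: async (controller) => {
        controller.enqueue(await nextMsg(ws));
      },
    },
    new CountQueuingStrategy({ highWaterMark: 1 })
  );
}

const socket: FakeSocket = { onmessage: null, onerror: null };
const reader = streamWs(socket).getReader();

// Let the first pull() attach its onmessage handler.
await new Promise((r) => setTimeout(r, 0));

socket.onmessage?.({ data: "1" }); // fills the queue — pulling stops
socket.onmessage?.({ data: "2" }); // resolves an already-settled promise: dropped
socket.onmessage?.({ data: "3" }); // dropped too

const first = await reader.read(); // "1"; draining triggers the next pull()
await new Promise((r) => setTimeout(r, 0));
socket.onmessage?.({ data: "4" });
const second = await reader.read(); // "4" — messages 2 and 3 were never seen
```

Messages that arrive while no pull() is awaiting are silently discarded, which is exactly the drop strategy chosen above.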

Usage

For my open source project ATProto Browser, I recently built a live feed of Bluesky posts on the home page based on the Bluesky Jetstream.

Jetstream gives you the ability to listen to Bluesky’s firehose for realtime updates on the network over a web socket connection.

I used the implementation described above to subscribe to Jetstream updates in a React component. Here is a simplified implementation.

import { useEffect, useState, useCallback } from "react";

function useJetstream() {
  // Buffered posts in memory
  const [posts, setPosts] = useState<AppBskyFeedPost.Record[]>([]);

  const onNewPost = useCallback((post: AppBskyFeedPost.Record) => {
    // Add the new post to the front of the buffer
    // and remove the oldest post from the back.
    // A functional update avoids reading a stale `posts` closure.
    setPosts((prev) => [post, ...prev.slice(0, BUFFER_SIZE - 1)]);
  }, []);

  useEffect(() => {
    const controller = new AbortController();
    const ws = new WebSocket("wss://jetstream1.us-east.bsky.network");

    streamWs(ws)
      .pipeTo(collect(onNewPost), { signal: controller.signal })
      .catch(() => {
        // pipeTo rejects when the stream is aborted — safe to ignore here
      });

    // Cancel the stream when the component unmounts
    return () => controller.abort();
  }, [onNewPost]);

  return { posts };
}
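The collect helper isn't shown in the snippet above; one plausible sketch (an assumption — the real helper may differ) wraps the callback in a WritableStream so it can sit at the end of the pipeline:

```typescript
// Hypothetical sketch of `collect`: adapts a per-chunk callback
// into a WritableStream suitable as a pipeTo destination.
function collect<T>(onChunk: (chunk: T) => void): WritableStream<T> {
  return new WritableStream<T>({
    write(chunk) {
      onChunk(chunk);
    },
  });
}
```

Because the sink's write is awaited before the next chunk is requested, a slow callback here propagates backpressure all the way up to the socket.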

You can find the actual implementation here.

Notes