Last episode we built the WebSocket protocol as a set of pure functions -- computeAccept, parseFrame, encodeFrame, the masking helper -- and I left you three exercises that fill the gaps a real endpoint needs. All three reuse the Opcode, Frame and parseFrame definitions from episode 87, so keep that file open beside this one.
Exercise 1: Detect fragmentation
A WebSocket message can be split: a first frame with fin = false, then any number of continuation frames, ending with one whose fin = true. The opcode of the whole message is taken from the first frame; the continuations carry opcode 0x0. Here's a small reassembler that feeds in frames and hands back a complete message when the final fragment lands:
const std = @import("std");

// Reuses Opcode, Frame, parseFrame from episode 87.
const Assembler = struct {
    buf: std.ArrayListUnmanaged(u8) = .{},
    first_opcode: Opcode = .text,
    in_progress: bool = false,

    /// Feed one parsed frame. Returns the assembled message when the final
    /// fragment arrives, otherwise null (meaning "more fragments expected").
    /// The returned slice aliases the internal buffer, so it is only valid
    /// until the next message starts.
    fn feed(self: *Assembler, alloc: std.mem.Allocator, frame: Frame) !?struct { opcode: Opcode, data: []u8 } {
        if (frame.opcode == .continuation) {
            if (!self.in_progress) return error.UnexpectedContinuation;
        } else {
            if (self.in_progress) return error.NestedMessage; // a new message before finishing the old one
            self.buf.clearRetainingCapacity(); // drop the previous message's bytes
            self.first_opcode = frame.opcode; // opcode comes from the FIRST frame
            self.in_progress = true;
        }
        try self.buf.appendSlice(alloc, frame.payload);
        if (!frame.fin) return null;
        self.in_progress = false;
        return .{ .opcode = self.first_opcode, .data = self.buf.items };
    }
};

test "reassemble a fragmented text message" {
    const alloc = std.testing.allocator;
    var state = Assembler{};
    defer state.buf.deinit(alloc);

    const f1 = Frame{ .fin = false, .opcode = .text, .payload = @constCast("Hel") };
    const f2 = Frame{ .fin = true, .opcode = .continuation, .payload = @constCast("lo") };

    try std.testing.expect((try state.feed(alloc, f1)) == null);
    const msg = (try state.feed(alloc, f2)).?;
    try std.testing.expectEqual(Opcode.text, msg.opcode);
    try std.testing.expectEqualStrings("Hello", msg.data);
}
The two error cases are the ones people forget: a continuation arriving when nothing is in progress, and a fresh text/binary frame arriving while a message is still being assembled. The RFC says both are protocol violations, and an endpoint that ignores them is exactly the kind of soft target a fuzzer loves. We'll lift this very Assembler straight into the server in a moment.
Exercise 2: Write a client-side encoder
The server never masks, but a client always must. So encodeMaskedFrame sets the MASK bit, generates four random key bytes with std.crypto.random.bytes, writes the key into the frame, and XORs the payload with it:
const std = @import("std");

// Reuses Opcode and parseFrame from episode 87.
pub fn encodeMaskedFrame(opcode: Opcode, payload: []const u8, out: []u8) !usize {
    // 2 header bytes + 0/2/8 extended-length bytes + 4 mask-key bytes + payload
    const ext: usize = if (payload.len <= 125) 0 else if (payload.len <= 0xFFFF) 2 else 8;
    if (out.len < 2 + ext + 4 + payload.len) return error.BufferTooSmall; // check before writing anything

    out[0] = 0x80 | @as(u8, @intFromEnum(opcode)); // FIN=1, single frame
    var i: usize = 2;
    if (payload.len <= 125) {
        out[1] = 0x80 | @as(u8, @intCast(payload.len)); // 0x80 = MASK bit
    } else if (payload.len <= 0xFFFF) {
        out[1] = 0x80 | 126;
        std.mem.writeInt(u16, out[2..4], @intCast(payload.len), .big);
        i = 4;
    } else {
        out[1] = 0x80 | 127;
        std.mem.writeInt(u64, out[2..10], payload.len, .big);
        i = 10;
    }

    var mask: [4]u8 = undefined;
    std.crypto.random.bytes(&mask);
    @memcpy(out[i..][0..4], &mask);
    i += 4;

    for (payload, 0..) |byte, j| out[i + j] = byte ^ mask[j % 4];
    return i + payload.len;
}

test "masked client frame round-trips through parseFrame" {
    var out: [64]u8 = undefined;
    const n = try encodeMaskedFrame(.text, "hello zig", &out);
    const p = (try parseFrame(out[0..n])).?; // parseFrame unmasks for us
    try std.testing.expectEqual(Opcode.text, p.frame.opcode);
    try std.testing.expectEqualStrings("hello zig", p.frame.payload);
}
Because parseFrame from last episode already handles the MASK bit and unmasks in place, the round-trip test needs no extra plumbing -- encode masked, parse, and the original bytes come back out. That symmetry is the payoff of having written the parser to handle both directions.
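The symmetry comes from XOR being its own inverse: applying the same key twice gives back the original bytes. A minimal sketch of that property, where applyMask is a hypothetical stand-in for last episode's masking helper (its real name and signature may differ):

```zig
const std = @import("std");

/// Hypothetical stand-in for the masking helper from last episode:
/// XOR every byte with the key byte at position i mod 4.
fn applyMask(bytes: []u8, key: [4]u8) void {
    for (bytes, 0..) |*b, i| b.* ^= key[i % 4];
}

test "masking twice restores the original bytes" {
    var buf = [_]u8{ 'h', 'e', 'l', 'l', 'o' };
    const key = [4]u8{ 0x37, 0xfa, 0x21, 0x3d };

    applyMask(&buf, key); // what a client does before sending
    try std.testing.expect(!std.mem.eql(u8, &buf, "hello"));

    applyMask(&buf, key); // what the receiving side does to unmask
    try std.testing.expectEqualStrings("hello", &buf);
}
```

The same function serves both directions, which is why the encoder and the parser never need separate "mask" and "unmask" routines.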
Exercise 3: Validate close codes
A close frame may carry a 2-byte status code, and the RFC reserves several that must never travel on the wire (1004, 1005, 1006, 1015, plus everything below 1000 and a band of unassigned protocol codes). An empty payload is legal -- it just means "no code given" -- but a single lonely byte is malformed:
const std = @import("std");

pub fn closeCode(payload: []const u8) !?u16 {
    if (payload.len == 0) return null; // valid: closing with no status code
    if (payload.len < 2) return error.MalformedClose;
    const code = std.mem.readInt(u16, payload[0..2], .big);
    const invalid = code < 1000 or
        code == 1004 or code == 1005 or code == 1006 or code == 1015 or
        (code >= 1016 and code <= 2999);
    if (invalid) return error.InvalidCloseCode;
    return code;
}

test "close code validation" {
    try std.testing.expectEqual(@as(?u16, null), try closeCode(&.{}));
    try std.testing.expectEqual(@as(?u16, 1000), try closeCode(&.{ 0x03, 0xE8 })); // 1000 = normal
    try std.testing.expectError(error.InvalidCloseCode, closeCode(&.{ 0x03, 0xEE })); // 1006 = reserved
    try std.testing.expectError(error.MalformedClose, closeCode(&.{0x03}));
}
0x03E8 is 1000 (normal closure) and 0x03EE is 1006 (which the RFC forbids on the wire -- it's only ever set locally to mean "the connection dropped abnormally"). Three lines of validation, and a whole class of malformed-input bugs disappears.
At the close of episode 87 I wrote that what we still lacked was "the thing that holds it all together over a live connection -- the loop that accepts a socket, performs the upgrade, then sits there reading frames and reacting to them." Well, here we go ;-) Today we wire those pure functions into an actual server: one that accepts TCP connections, upgrades each to WebSocket, reads frames as they arrive, answers pings, honours closes, reassembles fragmented messages, and -- the part that makes it feel alive -- broadcasts a message from one client to all the others. And it does all of that on a single thread, using the non-blocking poll muscle we trained back in episode 21 and exercised again last episode.
A raw TCP socket that wants to become a WebSocket goes through a tiny life cycle, and episode 33 (state machines with tagged unions) taught us to make that explicit rather than tracking it with a fistful of booleans. Four states:
const std = @import("std");
const posix = std.posix;

pub const ConnState = enum { handshaking, open, closing, closed };

pub const Connection = struct {
    fd: posix.fd_t,
    state: ConnState = .handshaking,

    // bytes read off the socket but not yet consumed by the frame parser
    in_buf: [16 * 1024]u8 = undefined,
    in_len: usize = 0,

    // reassembly state for a fragmented message in flight
    msg: std.ArrayListUnmanaged(u8) = .{},
    msg_opcode: Opcode = .text,
    assembling: bool = false,

    pub fn deinit(self: *Connection, alloc: std.mem.Allocator) void {
        self.msg.deinit(alloc);
    }
};
Every connection starts in handshaking. Once we've seen the HTTP upgrade request and answered it, it flips to open and stays there for the bulk of its life. When either side sends a close frame we move to closing (we still want to flush our own close reply), and finally closed means "tear this down and forget it". The in_buf is a fixed 16 KB accumulator -- the same size as a TLS record from episode 86, which is not a coincidence: matching those layers keeps the buffering predictable. The msg ArrayList is where fragments pile up until a message is complete.
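To make those transitions concrete, here is one way to write them down as a pure function. This is a sketch only: the Event enum and the next function are names I've made up for illustration, and the server code in this episode assigns conn.state directly rather than going through a table like this.

```zig
const std = @import("std");

pub const ConnState = enum { handshaking, open, closing, closed };

/// Hypothetical events, introduced only for this sketch.
pub const Event = enum { upgrade_ok, close_frame, close_flushed, socket_error };

/// Return the next state, or null when the event is not legal in `state`.
pub fn next(state: ConnState, event: Event) ?ConnState {
    return switch (state) {
        .handshaking => switch (event) {
            .upgrade_ok => ConnState.open,
            .socket_error => ConnState.closed,
            else => null,
        },
        .open => switch (event) {
            .close_frame => ConnState.closing, // either side started the close
            .socket_error => ConnState.closed,
            else => null,
        },
        .closing => switch (event) {
            .close_flushed, .socket_error => ConnState.closed,
            else => null,
        },
        .closed => null, // terminal: nothing is legal any more
    };
}

test "a connection walks handshaking -> open -> closing -> closed" {
    var s: ConnState = .handshaking;
    s = next(s, .upgrade_ok).?;
    s = next(s, .close_frame).?;
    s = next(s, .close_flushed).?;
    try std.testing.expectEqual(ConnState.closed, s);
}
```

Returning null for an illegal pair gives the caller one place to turn "this should never happen" into a protocol error.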
Nota bene: a fixed 16 KB in_buf means no single frame, header plus payload, can exceed 16 KB, because a frame has to sit in the buffer whole before parseFrame will hand it over. For a chat server that's plenty; for anything streaming big binary blobs you'd grow this on demand. We come back to that ceiling in the performance section, because it's also your first line of defence against a hostile peer.
The handshake is an ordinary HTTP/1.1 request -- the kind we parsed back in episode 84 -- terminated by the blank line \r\n\r\n. We don't need a full HTTP parser here; we need exactly one header value, Sec-WebSocket-Key. So a small case-insensitive header scan does the job:
fn findHeader(request: []const u8, name: []const u8) ?[]const u8 {
    var it = std.mem.splitSequence(u8, request, "\r\n");
    _ = it.next(); // skip the request line ("GET /chat HTTP/1.1")
    while (it.next()) |line| {
        if (line.len == 0) break; // blank line: end of the header block
        const colon = std.mem.indexOfScalar(u8, line, ':') orelse continue;
        if (std.ascii.eqlIgnoreCase(line[0..colon], name)) {
            return std.mem.trim(u8, line[colon + 1 ..], " \t"); // optional whitespace is space or tab
        }
    }
    return null;
}

fn performUpgrade(conn: *Connection) !void {
    const request = conn.in_buf[0..conn.in_len];
    const key = findHeader(request, "Sec-WebSocket-Key") orelse return error.BadHandshake;

    var resp_buf: [256]u8 = undefined;
    const resp = try writeHandshakeResponse(key, &resp_buf); // from episode 87
    try writeAll(conn.fd, resp);

    conn.state = .open;
    conn.in_len = 0; // the handshake bytes are spent; reuse the buffer for frames
}
HTTP header names are case-insensitive (Sec-WebSocket-Key, sec-websocket-key, whatever a quirky client sends), which is why std.ascii.eqlIgnoreCase earns its keep. writeHandshakeResponse is the function from last episode -- it SHA-1's the key with the magic GUID and writes back the 101 Switching Protocols. Once that response is on the wire, both ends drop HTTP and the socket is a WebSocket. We zero in_len so the same buffer is now free for incoming frames.
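As a reminder of what happens to the key inside that response, here is a small self-contained sketch of the accept-key derivation. The function name acceptKey and its signature are my own for this example (last episode's computeAccept may look different); the GUID and the test vector are the ones given in RFC 6455.

```zig
const std = @import("std");

/// The fixed GUID that RFC 6455 appends to the client's key.
const ws_guid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

/// Sketch: accept = base64(SHA-1(key ++ GUID)).
/// A 20-byte digest always encodes to 28 base64 characters.
fn acceptKey(key: []const u8, out: *[28]u8) []const u8 {
    var sha = std.crypto.hash.Sha1.init(.{});
    sha.update(key);
    sha.update(ws_guid);
    var digest: [std.crypto.hash.Sha1.digest_length]u8 = undefined;
    sha.final(&digest);
    return std.base64.standard.Encoder.encode(out, &digest);
}

test "RFC 6455 sample handshake" {
    var out: [28]u8 = undefined;
    const accept = acceptKey("dGhlIHNhbXBsZSBub25jZQ==", &out);
    try std.testing.expectEqualStrings("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=", accept);
}
```

Note that the key is hashed exactly as received, base64 text and all; it is never decoded first.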
That writeAll helper matters more than it looks. A write on a non-blocking socket can send fewer bytes than you asked, so you loop until the whole slice is gone:
fn writeAll(fd: posix.fd_t, bytes: []const u8) !void {
    var sent: usize = 0;
    while (sent < bytes.len) {
        sent += posix.write(fd, bytes[sent..]) catch |err| switch (err) {
            error.WouldBlock => continue, // kernel buffer full -- spin briefly (see exercises for the real fix)
            else => return err,
        };
    }
}
For a teaching server, spinning on WouldBlock is acceptable. For a production one it is not -- a slow client would let you burn a core busy-waiting, and the honest fix is a per-connection write queue flushed on POLLOUT. That's exercise 1, and it's the single biggest difference between a toy and a real WebSocket server.
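Without giving the whole exercise away, the bookkeeping half of such a queue can be sketched without touching a socket at all. Everything here is an assumption for illustration: the WriteQueue name, its methods, and the arbitrary 4 KB capacity are mine, and the POLLOUT wiring in the event loop is deliberately left out.

```zig
const std = @import("std");

/// Hypothetical fixed-capacity outgoing queue, one per connection.
const WriteQueue = struct {
    buf: [4096]u8 = undefined,
    len: usize = 0,

    /// Park bytes the socket refused. An error means the peer is too far behind.
    fn push(self: *WriteQueue, bytes: []const u8) !void {
        if (bytes.len > self.buf.len - self.len) return error.QueueFull;
        @memcpy(self.buf[self.len..][0..bytes.len], bytes);
        self.len += bytes.len;
    }

    /// Bytes still waiting; this is what you would hand to write() on POLLOUT.
    fn pending(self: *const WriteQueue) []const u8 {
        return self.buf[0..self.len];
    }

    /// Drop the first n bytes once the kernel has accepted them.
    fn consume(self: *WriteQueue, n: usize) void {
        std.debug.assert(n <= self.len);
        const rest = self.len - n;
        std.mem.copyForwards(u8, self.buf[0..rest], self.buf[n..self.len]);
        self.len = rest;
    }

    /// True while the event loop should keep asking for POLLOUT on this fd.
    fn wantsPollOut(self: *const WriteQueue) bool {
        return self.len > 0;
    }
};

test "a partial flush leaves the tail queued" {
    var q = WriteQueue{};
    try q.push("abcdef");
    q.consume(4); // pretend the kernel took four bytes
    try std.testing.expectEqualStrings("ef", q.pending());
    try std.testing.expect(q.wantsPollOut());
}
```

A full queue is a signal, not a bug: a peer that cannot keep up is a candidate for disconnection rather than for unbounded buffering.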
When poll tells us a socket is readable, we read whatever's there onto the tail of in_buf, then drain every complete frame the buffer now holds. Reads off a stream socket arrive in arbitrary chunks -- you might get half a frame, or three and a half frames, in one read -- so the loop keeps calling parseFrame until it returns null ("not enough bytes yet"):
pub fn onReadable(conn: *Connection, alloc: std.mem.Allocator) !void {
    const space = conn.in_buf[conn.in_len..];
    if (space.len == 0) return error.FrameTooLarge; // peer overran our 16 KB buffer

    const n = posix.read(conn.fd, space) catch |err| switch (err) {
        error.WouldBlock => return, // spurious wakeup: nothing to read after all
        else => return err,
    };
    if (n == 0) {
        conn.state = .closed; // orderly peer hangup
        return;
    }
    conn.in_len += n;

    if (conn.state == .handshaking) {
        // wait for the full request before attempting the upgrade
        if (std.mem.indexOf(u8, conn.in_buf[0..conn.in_len], "\r\n\r\n") != null) {
            try performUpgrade(conn);
        }
        return;
    }
    try drainFrames(conn, alloc);
}

fn drainFrames(conn: *Connection, alloc: std.mem.Allocator) !void {
    while (try parseFrame(conn.in_buf[0..conn.in_len])) |parsed| {
        try handleFrame(conn, parsed.frame, alloc);
        // slide the unconsumed tail to the front of the buffer
        const rest = conn.in_len - parsed.consumed;
        std.mem.copyForwards(u8, conn.in_buf[0..rest], conn.in_buf[parsed.consumed..conn.in_len]);
        conn.in_len = rest;
        if (conn.state == .closed) return;
    }
}
The copyForwards after each frame is the unglamorous heart of a stream parser: once a frame is consumed, shuffle the leftovers back to offset zero so the next read has room and the next parseFrame starts at a frame boundary. copyForwards (rather than @memcpy) because the source and destination ranges can overlap -- we're sliding bytes leftward within the same buffer, exactly the case @memcpy forbids. A fancier server would use a ring buffer to avoid the copy entirely, but for 16 KB the move is cheap and the code stays legible.
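The drain-and-slide pattern is easier to see with the WebSocket details stripped away. In this sketch a one-byte length prefix stands in for parseFrame (a deliberate simplification, not the real frame format), and the Stream type is a hypothetical name used only here.

```zig
const std = @import("std");

/// Toy accumulator: each frame is one length byte followed by that many payload bytes.
const Stream = struct {
    buf: [64]u8 = undefined,
    len: usize = 0,
    frames_seen: usize = 0,
    payload_bytes: usize = 0,

    /// Append a chunk, then consume every complete frame the buffer now holds.
    fn feed(self: *Stream, chunk: []const u8) !void {
        if (chunk.len > self.buf.len - self.len) return error.FrameTooLarge;
        @memcpy(self.buf[self.len..][0..chunk.len], chunk);
        self.len += chunk.len;

        while (self.len >= 1) {
            const need = 1 + @as(usize, self.buf[0]);
            if (self.len < need) break; // partial frame: wait for more bytes
            self.frames_seen += 1;
            self.payload_bytes += need - 1;
            // slide the leftovers to offset zero; the ranges may overlap
            const rest = self.len - need;
            std.mem.copyForwards(u8, self.buf[0..rest], self.buf[need..self.len]);
            self.len = rest;
        }
    }
};

test "frames split across reads are reassembled" {
    var s = Stream{};
    try s.feed(&.{ 3, 'a', 'b' }); // first read ends mid-frame
    try std.testing.expectEqual(@as(usize, 0), s.frames_seen);

    try s.feed(&.{ 'c', 2, 'x', 'y', 5, 'h' }); // rest of frame 1, all of frame 2, start of frame 3
    try std.testing.expectEqual(@as(usize, 2), s.frames_seen);
    try std.testing.expectEqual(@as(usize, 5), s.payload_bytes);
    try std.testing.expectEqual(@as(usize, 2), s.len); // the partial third frame stays buffered
}
```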
With a decoded Frame in hand, the server reacts according to its opcode. Control frames (ping, pong, close) get answered immediately; data frames (text, binary, continuation) flow into the reassembler. This is where the protocol's little courtesies live:
fn handleFrame(conn: *Connection, frame: Frame, alloc: std.mem.Allocator) !void {
    switch (frame.opcode) {
        .ping => {
            var out: [128]u8 = undefined;
            const n = try encodePong(frame.payload, &out); // echo the payload, episode 87
            try writeAll(conn.fd, out[0..n]);
        },
        .pong => {}, // heartbeat acknowledged -- a real server resets a liveness timer here
        .close => {
            var out: [4]u8 = undefined;
            const n = try encodeFrame(.close, &.{}, &out); // reply with an empty close
            try writeAll(conn.fd, out[0..n]);
            conn.state = .closed;
        },
        .text, .binary, .continuation => try handleData(conn, frame, alloc),
        _ => return error.UnknownOpcode, // forward-compat: unknown opcodes are a protocol error
    }
}
A ping demands a pong with the same payload echoed back -- that's the heartbeat that lets you notice a connection that's silently died behind a NAT timeout, something TCP itself can take minutes to admit. A close frame gets a close reply, and then we're done. The non-exhaustive _ arm catches any opcode the RFC hasn't defined; rejecting it with an error (rather than panicking) is the forward-compatible move episode 6 drilled into us.
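Two RFC 6455 rules about control frames are worth checking before dispatch: they must not be fragmented, and their payload must be 125 bytes or fewer (which is what makes a 128-byte pong buffer sufficient). A minimal sketch of that check follows; the Opcode and Frame definitions are trimmed stand-ins for the episode 87 types, and validateControl and its error names are hypothetical.

```zig
const std = @import("std");

// Stand-ins for the episode 87 types, reduced to what this sketch needs.
const Opcode = enum(u4) { continuation = 0x0, text = 0x1, binary = 0x2, close = 0x8, ping = 0x9, pong = 0xA, _ };
const Frame = struct { fin: bool, opcode: Opcode, payload: []const u8 };

/// Control opcodes are 0x8 through 0xF, i.e. the high bit of the nibble is set.
fn isControl(op: Opcode) bool {
    return (@intFromEnum(op) & 0x8) != 0;
}

/// Reject control frames that are fragmented or longer than 125 bytes.
fn validateControl(frame: Frame) !void {
    if (!isControl(frame.opcode)) return; // data frames have no such limits
    if (!frame.fin) return error.FragmentedControlFrame;
    if (frame.payload.len > 125) return error.ControlFrameTooLong;
}

test "control frames must be whole and short" {
    const long = [_]u8{0} ** 126;
    try validateControl(.{ .fin = true, .opcode = .ping, .payload = "hi" });
    try std.testing.expectError(
        error.FragmentedControlFrame,
        validateControl(.{ .fin = false, .opcode = .ping, .payload = "" }),
    );
    try std.testing.expectError(
        error.ControlFrameTooLong,
        validateControl(.{ .fin = true, .opcode = .close, .payload = &long }),
    );
}
```

Calling something like this at the top of the frame handler would keep the ping, pong and close arms free of length arithmetic.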
Data frames go through the very Assembler logic from exercise 1, now folded into the connection. A message can be one frame (the common case, fin = true immediately) or many (fin = false until the last). We only hand the message up to the application once it's whole:
fn handleData(conn: *Connection, frame: Frame, alloc: std.mem.Allocator) !void {
    if (frame.opcode != .continuation) {
        if (conn.assembling) return error.NestedMessage;
        conn.msg_opcode = frame.opcode; // opcode taken from the first frame
        conn.assembling = true;
    } else if (!conn.assembling) {
        return error.UnexpectedContinuation;
    }

    try conn.msg.appendSlice(alloc, frame.payload);

    if (frame.fin) {
        try onMessage(conn, conn.msg_opcode, conn.msg.items);
        conn.msg.clearRetainingCapacity(); // keep the allocation, drop the length
        conn.assembling = false;
    }
}
Notice clearRetainingCapacity instead of deinit -- we want to reuse that buffer for the next message rather than free-and-realloc on every single one (a lesson straight from the allocator episodes). The onMessage callback is where your application logic finally lives. The simplest possible one just echoes the message back to its sender:
fn onMessage(conn: *Connection, opcode: Opcode, data: []const u8) !void {
    var hdr: [10]u8 = undefined;
    const hn = try encodeFrameHeader(opcode, data.len, &hdr);
    try writeAll(conn.fd, hdr[0..hn]); // header first...
    try writeAll(conn.fd, data); // ...then the payload, uncopied
}
Here's a small but worthwhile refinement over last episode's encodeFrame, which copied the payload into the output buffer. For a server pushing large messages, that copy is pure waste -- the payload already sits in memory somewhere. So we split out a header-only encoder and write the payload straight from its own slice:
/// Write ONLY the server frame header (FIN set, MASK clear). Returns header length
/// so the caller can send the payload separately, with no copy.
fn encodeFrameHeader(opcode: Opcode, len: usize, out: []u8) !usize {
    // 2 bytes for short payloads, 4 with a 16-bit length, 10 with a 64-bit length
    const need: usize = if (len <= 125) 2 else if (len <= 0xFFFF) 4 else 10;
    if (out.len < need) return error.BufferTooSmall;

    out[0] = 0x80 | @as(u8, @intFromEnum(opcode)); // FIN=1
    if (len <= 125) {
        out[1] = @intCast(len);
    } else if (len <= 0xFFFF) {
        out[1] = 126;
        std.mem.writeInt(u16, out[2..4], @intCast(len), .big);
    } else {
        out[1] = 127;
        std.mem.writeInt(u64, out[2..10], len, .big);
    }
    return need;
}
Two writeAll calls instead of one buffer-copy-plus-send. On a megabyte payload that's a megabyte of copying you simply don't do -- the kind of detail that separates a server that scales from one that quietly chokes under load.
Now the conductor. A single thread owns a listening socket plus a map of live connections, and poll tells it which file descriptors are ready each tick. This is the same architecture as the HTTP server from episode 51, just with WebSocket connections that persist rather than serving one request and closing:
pub fn run(alloc: std.mem.Allocator, address: std.net.Address) !void {
    var listener = try address.listen(.{ .reuse_address = true });
    defer listener.deinit();
    try setNonBlocking(listener.stream.handle); // from episode 87's exercise 2

    var conns = std.AutoHashMap(posix.fd_t, *Connection).init(alloc);
    defer conns.deinit();

    var pollfds = std.ArrayList(posix.pollfd).init(alloc);
    defer pollfds.deinit();

    while (true) {
        // rebuild the pollfd set: the listener plus every live connection
        pollfds.clearRetainingCapacity();
        try pollfds.append(.{ .fd = listener.stream.handle, .events = posix.POLL.IN, .revents = 0 });
        var keys = conns.keyIterator();
        while (keys.next()) |fd| {
            try pollfds.append(.{ .fd = fd.*, .events = posix.POLL.IN, .revents = 0 });
        }

        _ = try posix.poll(pollfds.items, -1); // block until at least one fd is ready

        for (pollfds.items) |pfd| {
            if (pfd.revents == 0) continue;
            if (pfd.fd == listener.stream.handle) {
                try acceptNew(alloc, &listener, &conns);
            } else if (conns.get(pfd.fd)) |conn| {
                onReadable(conn, alloc) catch {
                    conn.state = .closed;
                };
                if (conn.state == .closed) {
                    posix.close(conn.fd);
                    conn.deinit(alloc);
                    _ = conns.remove(pfd.fd);
                    alloc.destroy(conn);
                }
            }
        }
    }
}
The listener itself is non-blocking, so when it signals readable we drain the whole accept backlog in one go and stop on WouldBlock:
fn acceptNew(alloc: std.mem.Allocator, listener: *std.net.Server, conns: *std.AutoHashMap(posix.fd_t, *Connection)) !void {
    while (true) {
        const c = listener.accept() catch |err| switch (err) {
            error.WouldBlock => return, // backlog drained
            else => return err,
        };
        errdefer c.stream.close(); // don't leak the socket if setup fails below
        try setNonBlocking(c.stream.handle);
        const conn = try alloc.create(Connection);
        errdefer alloc.destroy(conn); // nor the Connection if the map insert fails
        conn.* = .{ .fd = c.stream.handle };
        try conns.put(c.stream.handle, conn);
    }
}
One thread, no locks, hundreds of connections. Because every socket is non-blocking and we only ever touch one when poll says it's ready, nothing here ever sleeps holding up the others. This is the shape that the async groundwork from the last several episodes was quietly building toward -- a connection isn't a thread, it's an entry in a map plus a tiny state machine, and the event loop services whichever ones have news.
Echoing back to one sender is a warm-up. The thing people actually build WebSocket servers for -- chat rooms, live dashboards, multiplayer game state -- is fan-out: one message in, many copies out. With the connection registry already in hand, broadcast is a loop, and the header is encoded exactly once and reused for every recipient:
fn broadcast(conns: *std.AutoHashMap(posix.fd_t, *Connection), opcode: Opcode, data: []const u8) void {
    var hdr: [10]u8 = undefined;
    const hn = encodeFrameHeader(opcode, data.len, &hdr) catch return;

    var it = conns.valueIterator();
    while (it.next()) |conn| {
        if (conn.*.state != .open) continue; // skip half-open / closing sockets
        writeAll(conn.*.fd, hdr[0..hn]) catch {
            conn.*.state = .closed;
            continue;
        };
        writeAll(conn.*.fd, data) catch {
            conn.*.state = .closed;
        };
    }
}
A client that errors mid-write is simply marked closed -- one slow or dead peer never takes down the broadcast for everyone else. With the loop as written, the connection is reaped the next time poll reports anything on that descriptor, which a failed write usually triggers promptly through an error or hang-up event. From here, "rooms" is just a second map keyed by room name, and a chat server is mostly the glue between onMessage and broadcast. We've got every brick; the building is around the corner.
Three things bite once real traffic (or a real attacker) arrives. The first is partial writes: as flagged above, a non-blocking write can refuse, and a production server queues the unsent tail and flushes it on POLLOUT rather than spinning. Skip this and one slow reader pins a CPU.
The second is a maximum message size. The reassembly buffer (conn.msg) grows with every continuation frame, so a peer that streams continuations forever is asking you to allocate until the OOM killer ends the argument. Pick a ceiling -- say 1 MB for a chat app -- and the moment conn.msg.items.len would exceed it, send a close with code 1009 ("message too big") and drop the connection. That's exercise 3.
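The close you send in that situation carries a payload of its own: the status code as two big-endian bytes, optionally followed by a short UTF-8 reason. A small sketch of building it, where closePayload is a hypothetical helper name and the 123-byte reason limit follows from the 125-byte cap on control-frame payloads:

```zig
const std = @import("std");

/// Build a close-frame payload: 2-byte big-endian status code plus optional reason.
/// Control payloads are capped at 125 bytes, so the reason gets at most 123.
fn closePayload(code: u16, reason: []const u8, out: *[125]u8) ![]const u8 {
    if (reason.len > 123) return error.ReasonTooLong;
    std.mem.writeInt(u16, out[0..2], code, .big);
    @memcpy(out[2..][0..reason.len], reason);
    return out[0 .. 2 + reason.len];
}

test "1009 close payload" {
    var buf: [125]u8 = undefined;
    const p = try closePayload(1009, "message too big", &buf);
    try std.testing.expectEqual(@as(usize, 17), p.len);
    try std.testing.expectEqual(@as(u8, 0x03), p[0]); // 1009 == 0x03F1
    try std.testing.expectEqual(@as(u8, 0xF1), p[1]);
    try std.testing.expectEqualStrings("message too big", p[2..]);
}
```

The resulting slice is what you would pass as the payload of a close frame in place of the empty one.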
The third is masking throughput on the read side, which we met last episode: parseFrame unmasks every client byte, and on large payloads the i % 4 loop wants the @Vector SIMD treatment from episode 19 to mask a word (or sixteen bytes) at a time. Chat messages don't care; a server relaying screen-share frames very much does.
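For a feel of what the vector version might look like, here is a sketch that XORs 16 bytes per step and falls back to the scalar loop for the tail. The function names are mine, the lane count of 16 is an arbitrary choice, and I have not benchmarked it, so treat it as a starting point rather than a tuned routine.

```zig
const std = @import("std");

/// Scalar reference: one byte at a time, key index i mod 4.
fn unmaskScalar(payload: []u8, key: [4]u8) void {
    for (payload, 0..) |*b, i| b.* ^= key[i % 4];
}

/// Sketch: XOR 16 bytes per step. Because 16 is a multiple of 4, the same
/// repeated-key vector lines up with every chunk.
fn unmaskVector(payload: []u8, key: [4]u8) void {
    const lanes = 16;
    const V = @Vector(lanes, u8);

    var repeated: [lanes]u8 = undefined;
    for (&repeated, 0..) |*k, j| k.* = key[j % 4];
    const key_vec: V = repeated;

    var i: usize = 0;
    while (i + lanes <= payload.len) : (i += lanes) {
        const chunk: V = payload[i..][0..lanes].*;
        payload[i..][0..lanes].* = chunk ^ key_vec;
    }
    // tail shorter than one vector: finish with the scalar loop
    while (i < payload.len) : (i += 1) payload[i] ^= key[i % 4];
}

test "vector and scalar unmasking agree" {
    var a: [37]u8 = undefined;
    for (&a, 0..) |*x, i| x.* = @truncate(i * 7);
    var b = a;
    const key = [4]u8{ 0xDE, 0xAD, 0xBE, 0xEF };

    unmaskScalar(&a, key);
    unmaskVector(&b, key);
    try std.testing.expectEqualSlices(u8, &a, &b);
}
```

The 37-byte test length is chosen on purpose: two full vectors plus a five-byte tail, so both code paths run.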
Having said that, the single most important rule is the boring one: never trust the length field. A 10-byte header is all it takes to claim a multi-exabyte payload in the 64-bit length field, and a server that believes it will try to allocate the universe. Bound every length against your buffer and your max-message size before acting on it. The bit-twiddling is the fun part; the bounds checks are the part that keeps you out of the incident channel.
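That rule fits in a few lines. In this sketch the names checkedLen and max_message are hypothetical, and the 1 MB cap is just the illustrative figure from above; the point is that the comparison happens on the raw 64-bit value, before any cast or allocation.

```zig
const std = @import("std");

/// Illustrative cap, roughly the 1 MB suggested for a chat app.
const max_message: usize = 1 << 20;

/// Validate a length claimed by a frame header before reserving or copying anything.
/// `buffered` is how many bytes of the current message are already held.
fn checkedLen(claimed: u64, buffered: usize) !usize {
    if (buffered > max_message) return error.MessageTooBig;
    // subtract first so the check itself cannot overflow
    if (claimed > max_message - buffered) return error.MessageTooBig;
    const len: usize = @intCast(claimed); // safe: claimed is at most max_message here
    return len;
}

test "absurd lengths are rejected before any allocation" {
    try std.testing.expectError(error.MessageTooBig, checkedLen(std.math.maxInt(u64), 0));
    try std.testing.expectEqual(@as(usize, 100), try checkedLen(100, 0));
}
```

Writing the test as "claimed > cap - buffered" rather than "buffered + claimed > cap" matters: the second form can wrap around on a hostile length and pass.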
In C, this is roughly what libwebsockets does, and it's a sprawling, battle-tested library precisely because all the edges we just walked -- partial writes, fragmentation, control-frame interleaving, length validation -- are easy to get subtly wrong with raw pointers. The event-loop shape is identical; what C lacks is slices that carry their length and a non-exhaustive enum that turns an unknown opcode into a handled error rather than undefined behaviour.
In Go, you'd reach for gorilla/websocket or nhooyr.io/websocket and lean on a goroutine per connection, which makes the concurrency almost invisible -- conn.ReadMessage() in a loop and you're done. It's wonderfully productive; the trade is that you're inside Go's scheduler and garbage collector, with far less say over exactly when buffers are reused. Our single-threaded poll loop trades that ergonomic win for total control over allocation and scheduling.
In Rust, tokio-tungstenite gives you an async stream of messages, memory-safe and rigorous about the masking and UTF-8 rules the RFC demands. It is arguably the most correct-by-default option of the lot, at the price of wrestling async lifetimes. Zig sits where it likes to sit: you wrote the whole server in a couple hundred lines, you can see every byte and every allocation, and it cross-compiles to a tiny static binary with no runtime to ship. For production you might still pick a hardened library -- but now you understand precisely what it's doing under the hood, which was the entire point ;-)
Step back and look at what we've assembled. From episode 21's TCP sockets, episode 84's HTTP parsing, episode 85's binary framing, episode 86's TLS, and episode 87's protocol functions, we now have a complete, single-threaded, event-driven WebSocket server: it accepts connections, upgrades them, reads and dispatches frames, reassembles fragments, answers heartbeats, closes politely, and fans messages out to a whole room of clients. That is a genuinely useful piece of infrastructure, written from the bytes up with nothing hidden.
What it isn't yet is an application. A server that echoes and broadcasts is a stage with no play on it. The natural next move is to design a real message format on top of these frames -- who's speaking, to which room, with what history -- and build something people would actually open in a browser and use. The protocol is finished; what we put through it is where the fun starts.
The state machine, the event loop, the broadcast fan-out -- they aren't separate tricks, they're the skeleton of every real-time service you'll ever write, and you've now built it with your eyes wide open.
1. Add a write queue. Give Connection an outgoing buffer (a std.ArrayListUnmanaged(u8)). When writeAll hits error.WouldBlock, append the unsent tail to that buffer instead of spinning, and register interest in POLL.OUT for that fd. In the event loop, when a socket reports writable, flush as much of the queue as the kernel accepts and clear POLL.OUT once it's empty. Prove it by writing to a client that never reads.
2. Implement a heartbeat. Every 30 seconds (use the timer approach from episode 70), send a ping to each open connection and record the time. If a connection hasn't answered with a pong within, say, 10 seconds, mark it closed and reap it. This is how you evict the half-dead sockets that TCP keepalive is too slow to catch.
3. Enforce a maximum message size. In handleData, before appendSlice, check whether adding this fragment would push conn.msg.items.len past a 1 MB cap. If it would, send a close frame with status code 1009 ("message too big"), set the state to closed, and stop reassembling. Add a test that feeds oversized continuation frames and asserts the connection is closed rather than the allocator exhausted.