PUBLISH packet -- topic, packet identifier, payload -- straight off the wire;+ and #) work, and how to match them in a handful of lines;Learn Zig Series):Last episode we wired the WebSocket protocol functions into a real single-threaded server -- accept, upgrade, drain frames, dispatch, broadcast. I left you three exercises, and all three are the difference between a server that demos nicely and one that survives a hostile peer. Here are the solutions, each reusing the Connection, writeAll and encode* helpers from episode 88.
Exercise 1: Add a write queue
The toy writeAll spins on WouldBlock, which means one slow reader pins a CPU core. The honest fix is to give every connection an outgoing buffer, append the unsent tail when the kernel says "full", and flush it later when poll reports the socket writable:
const std = @import("std");
const posix = std.posix;
// Connection now carries an outbound queue and a "want POLL.OUT" flag.
// out_buf holds bytes the kernel refused; want_write tells the event loop
// to add POLL.OUT to this fd's interest set.
fn queueWrite(conn: *Connection, alloc: std.mem.Allocator, bytes: []const u8) !void {
// If nothing is already queued, try to send immediately -- the common, fast path.
if (conn.out_buf.items.len == 0) {
var sent: usize = 0;
while (sent < bytes.len) {
sent += posix.write(conn.fd, bytes[sent..]) catch |err| switch (err) {
error.WouldBlock => break, // kernel buffer full: stash the rest
else => return err,
};
}
if (sent == bytes.len) return; // everything went out, no queueing needed
try conn.out_buf.appendSlice(alloc, bytes[sent..]);
} else {
// A backlog already exists: preserve ordering, just append.
try conn.out_buf.appendSlice(alloc, bytes);
}
conn.want_write = true; // event loop will register POLL.OUT for this fd
}
fn flushWrites(conn: *Connection) !void {
const q = conn.out_buf.items;
var sent: usize = 0;
while (sent < q.len) {
sent += posix.write(conn.fd, q[sent..]) catch |err| switch (err) {
error.WouldBlock => break, // still backed up: keep what's left, try next tick
else => return err,
};
}
// Slide the unflushed tail to the front and shrink.
std.mem.copyForwards(u8, conn.out_buf.items[0..], conn.out_buf.items[sent..]);
conn.out_buf.shrinkRetainingCapacity(q.len - sent);
if (conn.out_buf.items.len == 0) conn.want_write = false; // drop POLL.OUT interest
}
The key insight is the ordering rule: once anything is queued, everything must queue, or you'll interleave a fresh write ahead of bytes that were already waiting and corrupt the stream. The want_write flag is what you OR into that connection's events when you rebuild the pollfd set each tick -- no flag, no POLL.OUT, so you only ask the kernel about writability when you actually have something stuck.
Exercise 2: Implement a heartbeat
A NATted connection can die silently and TCP will not tell you for minutes. So we ping on a timer (the std.time.milliTimestamp approach from episode 70) and reap anything that does not pong back in time:
const std = @import("std");
// Connection gains: last_ping_ms: i64 = 0, awaiting_pong: bool = false.
fn tickHeartbeat(conn: *Connection, now_ms: i64) !void {
if (conn.state != .open) return;
// No outstanding ping and 30s elapsed -> send a fresh one.
if (!conn.awaiting_pong and now_ms - conn.last_ping_ms >= 30_000) {
var out: [2]u8 = undefined;
const n = try encodeFrame(.ping, &.{}, &out); // empty-payload ping, episode 87
try writeAll(conn.fd, out[0..n]);
conn.last_ping_ms = now_ms;
conn.awaiting_pong = true;
}
// Ping outstanding for more than 10s with no pong -> the peer is gone.
if (conn.awaiting_pong and now_ms - conn.last_ping_ms >= 10_000) {
conn.state = .closed;
}
}
// In the opcode dispatcher, a received pong clears the flag:
// .pong => conn.awaiting_pong = false,
You call tickHeartbeat for every connection on each loop iteration, passing one timestamp you sampled at the top of the tick (don't call the clock per-connection -- that's a syscall you don't need). The pong handler clears awaiting_pong, and the connection lives another 30 seconds. This is the eviction mechanism that TCP keepalive is simply too slow and too coarse to provide.
Exercise 3: Enforce a maximum message size
The reassembly buffer grows with every continuation frame, so a peer streaming continuations forever is asking you to allocate until the OOM killer settles the argument. Bound it, and close with status 1009 the instant the next fragment would cross the line:
const std = @import("std");
const max_message: usize = 1 << 20; // 1 MB ceiling for a chat-style server
fn handleData(conn: *Connection, frame: Frame, alloc: std.mem.Allocator) !void {
// Reject BEFORE appending -- never allocate the byte that breaks the rule.
if (conn.msg.items.len + frame.payload.len > max_message) {
var out: [4]u8 = undefined;
// 1009 = "message too big", big-endian 2-byte close code.
const n = try encodeFrame(.close, &.{ 0x03, 0xF1 }, &out);
writeAll(conn.fd, out[0..n]) catch {};
conn.state = .closed;
return;
}
if (frame.opcode != .continuation) {
if (conn.assembling) return error.NestedMessage;
conn.msg_opcode = frame.opcode;
conn.assembling = true;
} else if (!conn.assembling) {
return error.UnexpectedContinuation;
}
try conn.msg.appendSlice(alloc, frame.payload);
if (frame.fin) {
try onMessage(conn, conn.msg_opcode, conn.msg.items);
conn.msg.clearRetainingCapacity();
conn.assembling = false;
}
}
0x03F1 is 1009 (0x03F1 = 1009 decimal). Notice the check runs before appendSlice -- the whole point is to refuse the allocation, not to allocate and then regret it. A test that feeds a stream of oversized continuation frames and asserts conn.state == .closed (rather than watching the testing allocator blow up) is the proof that the bound actually bites.
Three exercises, three classes of resource-exhaustion bug closed off. Now that server is something you could point real traffic at.
Here we go ;-) Last episode we built a server that broadcasts one client's message to every other client. That fan-out -- one message in, many copies out -- is the beating heart of every real-time system, and at the very end I said the natural next move is to design a real message format on top of the bytes. MQTT is what happens when an entire industry does exactly that, refines it for twenty-odd years, and ends up with a protocol so small that the spec for the core fits comfortably in your head.
MQTT stands for Message Queuing Telemetry Transport, and the "telemetry" part is the tell: it was born in 1999 to push sensor data over satellite links that were slow, expensive and flaky. That heritage shaped every decision. The packets are tiny (a publish can be as small as a handful of bytes of overhead), the protocol is forgiving of dropped connections, and a single broker can fan messages out to thousands of subscribers. Today it runs the plumbing of a huge chunk of the IoT world -- home automation, connected cars, industrial sensors, and the backend of more than one chat app you've used. If episode 88 was "build the pipe", this episode is "design what flows through it".
With raw WebSockets, every client talks to the server and the server decides what to do. MQTT pushes that idea one step further and names the middleman: the broker. Clients never talk to each other directly. A client publishes a message to a topic (a string like home/livingroom/temperature), and the broker delivers it to every client that has subscribed to a matching topic. The publisher has no idea who is listening, or whether anyone is listening at all. The subscriber has no idea who sent it.
That decoupling is the whole point, and it buys you three things at once:
Compare that to the broadcast loop we wrote last episode, where the server hard-codes "send to everyone in the room". MQTT replaces "everyone in the room" with "everyone whose subscription matches this topic", and topics are just strings, so the routing rules live in data rather than in code. That is a genuinely different design, and once it clicks you start seeing brokers everywhere.
Having said that, MQTT still rides on a plain TCP connection (TLS-wrapped in production -- exactly the C-interop work from episode 86), so everything we learned about accept loops and non-blocking reads carries straight over. What changes is the bytes on the wire. Let's look at them.
Every MQTT packet -- and there are exactly fourteen kinds -- begins with the same two-part fixed header. The first byte splits cleanly down the middle: the high nibble is the packet type (1 through 14), the low nibble is four flag bits whose meaning depends on the type. This is the packed-struct, bit-twiddling muscle from episode 17, and Zig lets us name the type cleanly with a u4-backed enum:
const std = @import("std");
pub const PacketType = enum(u4) {
connect = 1, // client -> broker: open a session
connack = 2, // broker -> client: session acknowledged
publish = 3, // either way: a message on a topic
puback = 4, // QoS 1 acknowledgement
pubrec = 5, // QoS 2, step 1
pubrel = 6, // QoS 2, step 2
pubcomp = 7, // QoS 2, step 3
subscribe = 8, // client -> broker: I want these topics
suback = 9, // broker -> client: subscription granted
unsubscribe = 10,
unsuback = 11,
pingreq = 12, // keep-alive request
pingresp = 13, // keep-alive response
disconnect = 14, // graceful goodbye
};
Because the enum is backed by a u4, the value physically cannot hold anything outside 0-15, and the names mean we never sprinkle a bare 3 through the dispatch code wondering later whether it was a publish or a puback. That is the same readability win the WebSocket Opcode enum gave us last episode -- the bytes are inscrutable, the names are not.
Now the clever bit. After that first byte comes the Remaining Length -- the number of bytes in the rest of the packet (variable header plus payload). A sensor packet might have a remaining length of 12; a firmware-update publish might have 200 megabytes. Spending a fixed four bytes on every tiny packet would be wasteful on a satellite link, so MQTT uses a variable-length integer: seven bits of value per byte, with the top bit acting as a "more bytes follow" continuation flag.
So values 0-127 fit in one byte. 128-16383 take two. And so on, up to four bytes maximum (a ceiling of 268,435,455 bytes, about 256 MB). Encoding it is a tidy little loop:
/// Encode an MQTT "Remaining Length" varint into `out`. Returns bytes written (1..4).
pub fn encodeRemainingLength(value: u32, out: []u8) !usize {
if (value > 268_435_455) return error.ValueTooLarge; // MQTT's hard ceiling
var x = value;
var i: usize = 0;
while (true) {
if (i >= out.len) return error.BufferTooSmall;
var byte: u8 = @intCast(x & 0x7F); // low 7 bits of value
x >>= 7;
if (x > 0) byte |= 0x80; // set continuation bit if more remains
out[i] = byte;
i += 1;
if (x == 0) break;
}
return i;
}
Decoding is the mirror image, and this is where you must be paranoid: a malicious peer can send four bytes that all have the continuation bit set, and a naive decoder loops forever or overflows. The spec caps the encoding at four bytes precisely so you can hard-stop:
/// Decode a Remaining Length varint. Returns the value and how many bytes it occupied.
pub fn decodeRemainingLength(buf: []const u8) !struct { value: u32, len: usize } {
var multiplier: u32 = 1;
var value: u32 = 0;
var i: usize = 0;
while (i < 4) : (i += 1) { // never more than 4 bytes -- the spec's safety rail
if (i >= buf.len) return error.Incomplete; // need more bytes off the socket
const byte = buf[i];
value += @as(u32, byte & 0x7F) * multiplier;
if (byte & 0x80 == 0) return .{ .value = value, .len = i + 1 }; // top bit clear -> done
multiplier *= 128;
}
return error.MalformedLength; // 4 bytes and still asking for more: reject it
}
Nota bene: this exact seven-bits-plus-continuation scheme is not unique to MQTT. You'll meet it again very soon when we look at how other binary serialization formats squeeze integers down to their smallest honest size -- it's one of those ideas that, once you've implemented it once, you recognise instantly in five other protocols. Learning it here, with the bounds checks done properly, pays off repeatedly.
With the varint in hand, the full fixed header parse falls out naturally. Notice it returns null (not an error) when there simply aren't enough bytes yet -- the same "stream parser" convention parseFrame used last episode, so the same drain-loop pattern works here:
pub const FixedHeader = struct {
packet_type: PacketType,
flags: u4,
remaining_length: u32,
header_len: usize, // bytes the fixed header itself occupied (1 + varint length)
};
pub fn parseFixedHeader(buf: []const u8) !?FixedHeader {
if (buf.len < 2) return null; // need at least type byte + one length byte
const type_bits: u4 = @intCast(buf[0] >> 4);
const flags: u4 = @intCast(buf[0] & 0x0F);
const pt = std.meta.intToEnum(PacketType, type_bits) catch return error.UnknownPacketType;
const rl = decodeRemainingLength(buf[1..]) catch |err| switch (err) {
error.Incomplete => return null, // not enough bytes for the length yet
else => return err,
};
return FixedHeader{
.packet_type = pt,
.flags = flags,
.remaining_length = rl.value,
.header_len = 1 + rl.len,
};
}
std.meta.intToEnum is the safe conversion: it returns an error for an out-of-range value rather than producing illegal enum state. An attacker who sends packet type 0 or 15 (both reserved) gets a clean error.UnknownPacketType and a closed connection, not undefined behaviour. This is episode 6's lesson -- make the illegal state unrepresentable, and where you can't, make it a handled error -- applied to a real wire format.
Of the fourteen packet types, PUBLISH is the one that actually carries your data, so it earns the most attention. Its variable header is a topic name (a 2-byte big-endian length followed by that many UTF-8 bytes), then -- only if QoS is above zero -- a 2-byte packet identifier, then the rest of the remaining length is the payload (raw bytes, MQTT does not care what they mean). The flag nibble we already pulled out carries three things: bit 0 is RETAIN, bits 1-2 are the QoS level, bit 3 is DUP (a redelivery marker).
Since MQTT writes 2-byte-length-prefixed strings everywhere, a tiny reader helper pays for itself immediately:
/// Read a 2-byte-length-prefixed MQTT string, advancing `off`.
fn readString(buf: []const u8, off: *usize) ![]const u8 {
if (buf.len < off.* + 2) return error.Malformed;
const n = std.mem.readInt(u16, buf[off.*..][0..2], .big);
off.* += 2;
if (buf.len < off.* + n) return error.Malformed;
const s = buf[off.*..][0..n];
off.* += n;
return s;
}
Now parsing a publish is just "read the topic string, maybe read an id, the rest is payload". The QoS comes straight out of the flag bits, and @enumFromInt on a u2 is safe because all four QoS bit-patterns 0-3 are defined (well, 3 is reserved -- we'll reject it):
pub const QoS = enum(u2) { at_most_once = 0, at_least_once = 1, exactly_once = 2 };
pub const Publish = struct {
topic: []const u8,
payload: []const u8,
qos: QoS,
packet_id: ?u16, // present only for QoS 1 and 2
retain: bool,
dup: bool,
};
/// `body` is exactly the `remaining_length` bytes that followed the fixed header.
pub fn parsePublish(hdr: FixedHeader, body: []const u8) !Publish {
const qos_bits: u2 = @intCast((hdr.flags >> 1) & 0x3);
if (qos_bits == 3) return error.InvalidQoS; // 3 is reserved -- malformed
const qos: QoS = @enumFromInt(qos_bits);
var off: usize = 0;
const topic = try readString(body, &off);
if (std.mem.indexOfScalar(u8, topic, '+') != null or
std.mem.indexOfScalar(u8, topic, '#') != null)
return error.WildcardInPublish; // you may subscribe to wildcards, never publish to them
var packet_id: ?u16 = null;
if (qos != .at_most_once) {
if (body.len < off + 2) return error.Malformed;
packet_id = std.mem.readInt(u16, body[off..][0..2], .big);
off += 2;
}
return .{
.topic = topic,
.payload = body[off..],
.qos = qos,
.packet_id = packet_id,
.retain = (hdr.flags & 0x1) != 0,
.dup = (hdr.flags & 0x8) != 0,
};
}
That WildcardInPublish check is the sort of rule people skip and regret -- the spec is explicit that wildcards belong only in subscriptions, never in the topic you publish to, and an endpoint that lets them through is a routing bug waiting to happen.
Encoding a publish is the inverse, with the small wrinkle we already met in the WebSocket server: the Remaining Length sits before the body, so you must know the body size up front. Since we can compute it arithmetically, there's no need to encode-into-a-scratch-buffer-then-copy:
pub fn encodePublish(out: []u8, topic: []const u8, payload: []const u8, qos: QoS, packet_id: u16) !usize {
const id_len: usize = if (qos == .at_most_once) 0 else 2;
const remaining = 2 + topic.len + id_len + payload.len;
if (out.len < 5 + remaining) return error.BufferTooSmall; // 1 type byte + up to 4 varint bytes
out[0] = (@as(u8, @intFromEnum(PacketType.publish)) << 4) | (@as(u8, @intFromEnum(qos)) << 1);
var i: usize = 1;
i += try encodeRemainingLength(@intCast(remaining), out[i..]);
std.mem.writeInt(u16, out[i..][0..2], @intCast(topic.len), .big); // topic length...
i += 2;
@memcpy(out[i..][0..topic.len], topic); // ...then topic bytes
i += topic.len;
if (id_len == 2) {
std.mem.writeInt(u16, out[i..][0..2], packet_id, .big);
i += 2;
}
@memcpy(out[i..][0..payload.len], payload);
return i + payload.len;
}
MQTT's three QoS levels are its most quoted feature, and they change not just a flag bit but the entire conversation:
PUBLISH, no acknowledgement, no packet id. If the network eats it, it's gone. Perfect for a temperature sensor that reports every second; who cares about one missed reading.PUBACK. If the sender doesn't see it, it resends with the DUP flag set. The message arrives, but possibly twice, so your application must tolerate duplicates (be idempotent).PUBLISH -> PUBREC -> PUBREL -> PUBCOMP) that guarantees the message is delivered once and only once, even across reconnects. You pay two round-trips for that promise.The packet identifier (the u16 we conditionally parsed) is what stitches an acknowledgement back to its publish. Building a PUBACK is therefore the smallest possible packet that carries one -- a fixed header plus a 2-byte id, four bytes total:
/// PUBACK is the QoS-1 acknowledgement: just the packet identifier echoed back.
pub fn encodePubAck(out: []u8, packet_id: u16) !usize {
if (out.len < 4) return error.BufferTooSmall;
out[0] = @as(u8, @intFromEnum(PacketType.puback)) << 4; // flags nibble is 0
out[1] = 2; // remaining length is exactly 2 (the id)
std.mem.writeInt(u16, out[2..4], packet_id, .big);
return 4;
}
I won't spell out all four QoS-2 packets here -- they share this exact shape, only the type nibble changes -- but you can already see how mechanical it is. The protocol's genius is that the hard part (exactly-once semantics) is expressed entirely through these trivially small control packets and a state machine on each side. Which, by the way, is precisely the tagged-union state machine pattern from episode 33: each in-flight QoS-2 message is a little state that walks pending -> received -> released -> complete.
A topic is a UTF-8 string with levels separated by /, like home/livingroom/temperature. Subscriptions may use two wildcards: + matches exactly one level, and # matches every remaining level (and must be the final character). So home/+/temperature matches the living room and the kitchen but not home/livingroom/humidity, while home/# matches everything under home.
Matching a concrete topic against a filter is a level-by-level walk, and it comes out shorter than the english description of it:
pub fn topicMatches(filter: []const u8, topic: []const u8) bool {
var f_it = std.mem.splitScalar(u8, filter, '/');
var t_it = std.mem.splitScalar(u8, topic, '/');
while (f_it.next()) |f_level| {
if (std.mem.eql(u8, f_level, "#")) return true; // matches this level and all below
const t_level = t_it.next() orelse return false; // topic ran out, filter didn't
if (std.mem.eql(u8, f_level, "+")) continue; // single-level wildcard: accept anything
if (!std.mem.eql(u8, f_level, t_level)) return false; // literal mismatch
}
return t_it.next() == null; // filter is done; topic must be too
}
test "wildcard matching" {
try std.testing.expect(topicMatches("home/+/temperature", "home/livingroom/temperature"));
try std.testing.expect(!topicMatches("home/+/temperature", "home/livingroom/humidity"));
try std.testing.expect(topicMatches("home/#", "home/livingroom/temperature"));
try std.testing.expect(!topicMatches("home/livingroom", "home/livingroom/temperature"));
}
That last line of the function is the subtle one: after the filter's levels are exhausted, the topic must be exhausted too, otherwise home/livingroom would wrongly swallow home/livingroom/temperature. Edge cases like # matching the parent level (sport/# also matches sport) fall out correctly because the # arm returns true before we ever ask the topic iterator for another level. Get this function right and the broker's routing is basically free.
Tie it together and a tiny in-memory broker is almost embarrassingly small. A subscription is a (client, filter, qos) triple; routing a published topic means walking the subscription list and delivering to every match:
const Subscription = struct {
client_fd: posix.fd_t,
filter: []const u8,
qos: QoS,
};
const Broker = struct {
subs: std.ArrayListUnmanaged(Subscription) = .{},
fn subscribe(self: *Broker, alloc: std.mem.Allocator, fd: posix.fd_t, filter: []const u8, qos: QoS) !void {
try self.subs.append(alloc, .{ .client_fd = fd, .filter = filter, .qos = qos });
}
/// Deliver one published message to every subscriber whose filter matches.
fn route(self: *Broker, topic: []const u8, payload: []const u8) void {
var frame: [4096]u8 = undefined;
for (self.subs.items) |sub| {
if (!topicMatches(sub.filter, topic)) continue;
// Effective QoS is the min of publish QoS and subscription QoS; QoS 0 here for brevity.
const n = encodePublish(&frame, topic, payload, .at_most_once, 0) catch continue;
writeAll(sub.client_fd, frame[0..n]) catch continue; // dead client? skip, reap later
}
}
};
That route is the MQTT analogue of last episode's broadcast, with one beautiful difference: where broadcast blindly sent to everyone in a hard-coded room, route consults topicMatches and sends only to the clients who asked for this topic. The "room" became a query. A production broker keeps a smarter index (a tree keyed on topic levels, so you don't scan every subscription for every message), bounds the frame buffer instead of using a fixed 4 KB, and tracks per-client sessions -- but the core idea is right here, and it's small enough to hold whole in your head.
The protocol code splits cleanly into pure functions, and pure functions are a joy to test -- no socket, no broker, no clock. The single most valuable test for any binary format is the round-trip: encode a thing, parse it back, assert you got the same thing. If encode and decode disagree, one of them is wrong, and the round-trip catches it without you having to reason about exact byte layouts:
test "publish round-trips through encode and parse" {
var buf: [256]u8 = undefined;
const n = try encodePublish(&buf, "home/livingroom/temp", "21.5", .at_least_once, 42);
const hdr = (try parseFixedHeader(buf[0..n])).?;
try std.testing.expectEqual(PacketType.publish, hdr.packet_type);
const body = buf[hdr.header_len .. hdr.header_len + hdr.remaining_length];
const pub = try parsePublish(hdr, body);
try std.testing.expectEqualStrings("home/livingroom/temp", pub.topic);
try std.testing.expectEqualStrings("21.5", pub.payload);
try std.testing.expectEqual(QoS.at_least_once, pub.qos);
try std.testing.expectEqual(@as(?u16, 42), pub.packet_id);
}
test "remaining length round-trips at the boundaries" {
const cases = [_]u32{ 0, 127, 128, 16_383, 16_384, 268_435_455 };
for (cases) |value| {
var buf: [4]u8 = undefined;
const n = try encodeRemainingLength(value, &buf);
const decoded = try decodeRemainingLength(buf[0..n]);
try std.testing.expectEqual(value, decoded.value);
try std.testing.expectEqual(n, decoded.len);
}
}
The boundary values in that second test are deliberately the nasty ones -- 127/128 and 16383/16384 are exactly where the varint grows another byte, and off-by-one errors love to hide on those seams. Beyond round-trips, the other half of the job is malformed input: feed parsePublish a body that claims a 500-byte topic in a 10-byte buffer and assert it returns error.Malformed rather than reading out of bounds. Episode 12's TDD habits and a fuzzer pointed at parseFixedHeader will surface the cases your imagination misses.
MQTT was built for constrained devices, so the protocol itself is already lean -- the wins are in how you handle it. Three things matter once message rates climb. First, avoid the copy on the hot path: just as in the WebSocket server, encode the publish header once and write the payload straight from its source slice rather than memcpy-ing megabytes into a scratch buffer. Second, the broker's subscription lookup is the real bottleneck -- a linear scan of every subscription for every published message is fine for forty clients and catastrophic for forty thousand, so a serious broker indexes subscriptions in a topic tree so routing cost scales with topic depth, not subscriber count. Third, reuse buffers: a broker handling a firehose of small publishes should not allocate per message; the clearRetainingCapacity trick from the allocator episodes keeps one buffer warm and busy.
Having said that, do not optimise what you have not measured -- episode 34's profiler will tell you whether you're bottlenecked on the parse, the routing, or (almost always) the syscalls. My money is on the syscalls.
In C, the reference implementation everybody leans on is Eclipse Mosquitto -- a mature, battle-hardened broker, and a textbook example of how much careful pointer discipline the varint-and-length-prefix dance demands when the language won't track your buffer sizes for you. Our readString is one bounds check; the C equivalent is the same check that, forgotten, becomes a CVE.
In Go, you'd reach for eclipse/paho.mqtt.golang and a goroutine per connection, and the ergonomics are lovely -- channels make the publish/subscribe fan-out feel native. The trade is the usual one: the garbage collector decides when your buffers die, which for a million-message-per-second broker is a conversation you'd rather have yourself.
In Rust, rumqtt gives you an async, memory-safe broker and client with the varint and topic-matching rules encoded into types that make malformed packets hard to even construct. It is arguably the most correct-by-default of the bunch, at the cost of the async-lifetime wrestling match.
Zig lands where it always does: you wrote the whole parser and a working broker in a few hundred lines, every allocation is visible, every bounds check is yours, and it cross-compiles to a static binary small enough to flash onto the kind of constrained device MQTT was invented for in the first place. For a real deployment you'd likely still run Mosquitto -- but now you know exactly what it's doing with every byte, which was the whole point ;-)
Step back and look at what we've got. A fixed header that packs a type and flags into one byte, a variable-length integer that keeps small packets small, a PUBLISH parser, three QoS levels expressed through trivially small control packets, topic wildcards, and a broker that routes by matching rather than by hard-coded rooms. That is a real protocol, built from the bytes up, sitting comfortably on the TCP and TLS plumbing we spent the last dozen episodes building.
What we've quietly bumped into twice now -- in the Remaining Length varint here, and in the length-prefixed strings -- is the broader question of how you turn structured data into bytes and back efficiently. MQTT solves it ad-hoc, for one protocol. The next stretch of the series steps up a level to the formats whose entire job is exactly that: taking arbitrary structured messages, packing them into the fewest honest bytes, and unpacking them on the other side without ambiguity. The varint you wrote today is going to feel awfully familiar.
The pieces aren't separate tricks -- the bit-packed header, the varint, the wildcard matcher, the route-by-query broker -- they're the vocabulary of every messaging system you'll ever build, and you've now written each one yourself with your eyes wide open.
Parse a SUBSCRIBE packet. A SUBSCRIBE carries a 2-byte packet identifier, then one or more entries, each being a length-prefixed topic filter followed by a single QoS byte. Write parseSubscribe that returns a slice of (filter, qos) pairs, and reject the reserved upper bits of the QoS byte with error.MalformedSubscribe. Then write the matching encodeSubAck, which echoes the packet id and returns one granted-QoS byte per requested filter.
Build the CONNECT/CONNACK handshake. Parse a CONNECT packet far enough to extract the protocol name ("MQTT"), the protocol level byte (4 for MQTT 3.1.1), the keep-alive seconds, and the client identifier string. Reply with a CONNACK whose return code is 0 (accepted) for a valid level and 0x01 (unacceptable protocol version) otherwise. Add a test that rejects a CONNECT claiming protocol level 3.
Wire the broker into last episode's event loop. Replace the WebSocket frame handling with MQTT packet handling: on SUBSCRIBE, register the filters in the Broker; on PUBLISH, call route; on PINGREQ, answer with PINGRESP. Use the drain-loop pattern from episode 88 (parse fixed header, check you have header_len + remaining_length bytes, dispatch, slide the buffer). Prove it by connecting two clients, subscribing one to test/#, publishing from the other, and watching the message arrive.