Learn Zig Series: Exercise 1: Health check for the Supervisor
const std = @import("std");
/// Bookkeeping record for one process managed by the supervisor.
const SupervisedProcess = struct {
    /// Label used when reporting on this process.
    name: []const u8,
    /// argv used to launch the process.
    command: []const []const u8,
    /// argv of an optional health-check command; `null` means "no check".
    health_check: ?[]const []const u8,
    /// Pid of the current child, or `null` when none is recorded.
    pid: ?std.posix.pid_t,
    restart_count: u32,
    max_restarts: u32,
    last_exit_code: ?u8,
    last_start_time: i64,
    last_health_check: i64,

    /// Reports whether a child pid is currently recorded for this process.
    fn isRunning(self: *const SupervisedProcess) bool {
        return if (self.pid) |_| true else false;
    }
};
/// Launches processes and restarts any whose health-check command fails.
const HealthCheckSupervisor = struct {
    allocator: std.mem.Allocator,
    processes: std.ArrayList(SupervisedProcess),
    /// Minimum gap between two health checks of the same process, in milliseconds.
    health_interval_ms: i64,

    /// Creates an empty supervisor with a 5000 ms health-check interval.
    fn init(allocator: std.mem.Allocator) HealthCheckSupervisor {
        return .{
            .allocator = allocator,
            .processes = std.ArrayList(SupervisedProcess).init(allocator),
            .health_interval_ms = 5000,
        };
    }

    /// Frees the process list. Running children are neither stopped nor reaped.
    fn deinit(self: *HealthCheckSupervisor) void {
        self.processes.deinit();
    }

    /// Registers a process without starting it.
    /// The `name`, `command` and `health_check` slices are stored as given (not
    /// copied), so they must outlive the supervisor.
    fn addProcess(
        self: *HealthCheckSupervisor,
        name: []const u8,
        command: []const []const u8,
        health_check: ?[]const []const u8,
        max_restarts: u32,
    ) !void {
        try self.processes.append(.{
            .name = name,
            .command = command,
            .health_check = health_check,
            .pid = null,
            .restart_count = 0,
            .max_restarts = max_restarts,
            .last_exit_code = null,
            .last_start_time = 0,
            .last_health_check = 0,
        });
    }

    /// Spawns `proc.command` with inherited stdout/stderr and records its pid
    /// and start time. The health-check timer is reset to the start time.
    fn startProcess(self: *HealthCheckSupervisor, proc: *SupervisedProcess) !void {
        // Use the supervisor's allocator rather than a hard-coded global one.
        var child = std.process.Child.init(proc.command, self.allocator);
        child.stdout_behavior = .Inherit;
        child.stderr_behavior = .Inherit;
        try child.spawn();
        proc.pid = child.id;

        const now = std.time.milliTimestamp();
        proc.last_start_time = now;
        proc.last_health_check = now;
    }

    /// Runs the health check of every running process whose interval has
    /// elapsed. A check passes only when it exits with status 0; any other
    /// outcome (non-zero exit, killed by a signal, ...) kills the supervised
    /// process, reaps it, and restarts it while `restart_count` does not
    /// exceed `max_restarts`.
    fn runHealthChecks(self: *HealthCheckSupervisor) !void {
        const stdout = std.io.getStdOut().writer();
        const now = std.time.milliTimestamp();

        for (self.processes.items) |*proc| {
            const pid = proc.pid orelse continue;
            const hc = proc.health_check orelse continue;
            if (now - proc.last_health_check < self.health_interval_ms) continue;
            proc.last_health_check = now;

            var checker = std.process.Child.init(hc, self.allocator);
            checker.stdout_behavior = .Ignore;
            checker.stderr_behavior = .Ignore;
            try checker.spawn();

            // Switch on the termination kind: reading `.Exited` directly is
            // illegal when the checker was terminated by a signal.
            const healthy = switch (try checker.wait()) {
                .Exited => |code| code == 0,
                else => false,
            };
            if (healthy) continue;

            try stdout.print("[health] '{s}' failed check, restarting\n", .{proc.name});

            // Best effort: the kill result is irrelevant because the process
            // is reaped right below either way.
            std.posix.kill(pid, std.posix.SIG.KILL) catch {};
            const reaped = std.posix.waitpid(pid, 0);
            proc.last_exit_code = if (std.posix.W.IFEXITED(reaped.status))
                std.posix.W.EXITSTATUS(reaped.status)
            else
                null;
            proc.pid = null;

            proc.restart_count += 1;
            if (proc.restart_count <= proc.max_restarts) {
                try self.startProcess(proc);
            }
        }
    }
};
/// Demo: supervise one worker and run three health-check cycles, two seconds apart.
pub fn main() !void {
    var gpa_state = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa_state.deinit();

    var supervisor = HealthCheckSupervisor.init(gpa_state.allocator());
    defer supervisor.deinit();

    // The worker creates /tmp/healthy; the health check verifies that it exists.
    const worker_cmd: []const []const u8 = &.{ "sh", "-c", "touch /tmp/healthy && sleep 60" };
    const check_cmd: []const []const u8 = &.{ "sh", "-c", "test -f /tmp/healthy" };
    try supervisor.addProcess("worker", worker_cmd, check_cmd, 3);
    try supervisor.startProcess(&supervisor.processes.items[0]);

    // Run a few health-check cycles.
    for (0..3) |_| {
        std.time.sleep(2 * std.time.ns_per_s);
        try supervisor.runHealthChecks();
    }
}
The health check runs as a separate child process. If it exits non-zero, the supervisor kills the main process and restarts it. The health_interval_ms field prevents running checks too often.
Exercise 2: Process pool maintaining N workers
const std = @import("std");
/// Keeps `target_count` copies of `command` running, replacing workers that exit.
const WorkerPool = struct {
    allocator: std.mem.Allocator,
    target_count: usize,
    command: []const []const u8,
    workers: std.ArrayList(Worker),

    const Worker = struct {
        id: usize,
        pid: std.posix.pid_t,
    };

    /// Creates an empty pool. `cmd` is stored as given (not copied).
    fn init(allocator: std.mem.Allocator, count: usize, cmd: []const []const u8) WorkerPool {
        return .{
            .allocator = allocator,
            .target_count = count,
            .command = cmd,
            .workers = std.ArrayList(Worker).init(allocator),
        };
    }

    /// Frees the worker list. Running workers are neither stopped nor reaped.
    fn deinit(self: *WorkerPool) void {
        self.workers.deinit();
    }

    /// Spawns one worker whose environment contains only `WORKER_ID=<id>` and
    /// `PATH` (the current `PATH`, or "/usr/bin" when unset), then records it.
    fn spawnWorker(self: *WorkerPool, id: usize) !void {
        // Reserve the list slot up front so that a failed append can never
        // leave an already-spawned child untracked.
        try self.workers.ensureUnusedCapacity(1);

        var env_map = std.process.EnvMap.init(self.allocator);
        defer env_map.deinit();

        var id_buf: [20]u8 = undefined;
        const id_str = try std.fmt.bufPrint(&id_buf, "{d}", .{id});
        try env_map.put("WORKER_ID", id_str);
        try env_map.put("PATH", std.posix.getenv("PATH") orelse "/usr/bin");

        var child = std.process.Child.init(self.command, self.allocator);
        child.stdout_behavior = .Inherit;
        child.stderr_behavior = .Inherit;
        child.env_map = &env_map;
        try child.spawn();

        self.workers.appendAssumeCapacity(.{ .id = id, .pid = child.id });
    }

    /// Spawns workers until the pool holds `target_count` of them. New ids
    /// continue after the highest id currently in the pool.
    fn fillPool(self: *WorkerPool) !void {
        var next_id: usize = 0;
        for (self.workers.items) |w| {
            if (w.id >= next_id) next_id = w.id + 1;
        }
        while (self.workers.items.len < self.target_count) {
            try self.spawnWorker(next_id);
            next_id += 1;
        }
    }

    /// Reaps every pool worker that has exited (without blocking), refills the
    /// pool, and returns how many workers were reaped.
    fn reapAndRefill(self: *WorkerPool) !usize {
        var reaped: usize = 0;
        var i: usize = 0;
        // Poll each tracked pid individually. Waiting on "any child" (-1) after
        // the last child has been reaped fails with ECHILD, which
        // std.posix.waitpid treats as unreachable; it would also steal exit
        // statuses of children that do not belong to this pool.
        while (i < self.workers.items.len) {
            const worker_pid = self.workers.items[i].pid;
            const result = std.posix.waitpid(worker_pid, std.posix.W.NOHANG);
            if (result.pid == worker_pid) {
                _ = self.workers.orderedRemove(i);
                reaped += 1;
            } else {
                // Still running: waitpid reported pid 0.
                i += 1;
            }
        }
        if (reaped > 0) try self.fillPool();
        return reaped;
    }
};
/// Demo: keep three workers alive for twenty half-second ticks.
pub fn main() !void {
    var gpa_state = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa_state.deinit();

    const worker_cmd: []const []const u8 = &.{
        "sh", "-c", "echo worker $WORKER_ID started && sleep 10",
    };
    var pool = WorkerPool.init(gpa_state.allocator(), 3, worker_cmd);
    defer pool.deinit();
    try pool.fillPool();

    const out = std.io.getStdOut().writer();
    try out.print("Pool started with {d} workers\n", .{pool.workers.items.len});

    for (0..20) |_| {
        std.time.sleep(500 * std.time.ns_per_ms);
        const reaped = try pool.reapAndRefill();
        if (reaped == 0) continue;
        try out.print(" reaped {d}, pool now {d}\n", .{ reaped, pool.workers.items.len });
    }
}
Each worker gets a unique WORKER_ID in its environment. When reapAndRefill detects a dead worker via non-blocking waitpid, it removes it from the list and calls fillPool to spawn a replacement.
Exercise 3: Pipe executor connecting two processes
const std = @import("std");
/// Forks and execs `argv` (looked up via PATH) with the current environment.
/// When given, `stdin_fd` / `stdout_fd` are duplicated onto the child's
/// stdin / stdout before exec. Returns the child's pid to the parent.
///
/// `std.process.Child` only offers Inherit/Ignore/Pipe/Close for stdio, so
/// wiring an existing descriptor into a child has to be done with fork + dup2.
fn spawnRedirected(
    allocator: std.mem.Allocator,
    argv: []const []const u8,
    stdin_fd: ?std.posix.fd_t,
    stdout_fd: ?std.posix.fd_t,
) !std.posix.pid_t {
    if (argv.len == 0) return error.EmptyCommand;

    // Build the null-terminated argv before forking so the child only has to
    // dup2 and exec.
    var arena_state = std.heap.ArenaAllocator.init(allocator);
    defer arena_state.deinit();
    const arena = arena_state.allocator();
    const argv_z = try arena.allocSentinel(?[*:0]const u8, argv.len, null);
    for (argv, argv_z) |arg, *slot| slot.* = (try arena.dupeZ(u8, arg)).ptr;
    const envp: [*:null]const ?[*:0]const u8 = @ptrCast(std.os.environ.ptr);

    const pid = try std.posix.fork();
    if (pid == 0) {
        if (stdin_fd) |fd| std.posix.dup2(fd, std.posix.STDIN_FILENO) catch std.process.exit(127);
        if (stdout_fd) |fd| std.posix.dup2(fd, std.posix.STDOUT_FILENO) catch std.process.exit(127);
        // execvpeZ only returns on failure.
        const err = std.posix.execvpeZ(argv_z[0].?, argv_z.ptr, envp);
        std.debug.print("exec '{s}' failed: {s}\n", .{ argv[0], @errorName(err) });
        std.process.exit(127);
    }
    return pid;
}

/// Runs `cmd1 | cmd2` and returns everything `cmd2` wrote to stdout (at most
/// 64 KiB; more yields error.StreamTooLong). Caller owns the returned slice.
/// Both children are reaped before returning, on success and on error.
fn pipeCommands(
    allocator: std.mem.Allocator,
    cmd1: []const []const u8,
    cmd2: []const []const u8,
) ![]u8 {
    // Pipes are created close-on-exec: the dup2'ed copies on fd 0/1 survive
    // exec, the original descriptors do not, so no child keeps a stray pipe
    // end open.
    const link = try std.posix.pipe2(.{ .CLOEXEC = true });

    // cmd1: stdout -> write end of the connecting pipe
    const pid1 = spawnRedirected(allocator, cmd1, null, link[1]) catch |err| {
        std.posix.close(link[0]);
        std.posix.close(link[1]);
        return err;
    };
    // The parent must drop the write end, otherwise cmd2 never sees EOF.
    std.posix.close(link[1]);
    defer _ = std.posix.waitpid(pid1, 0);

    // A second pipe carries cmd2's stdout back to us.
    const capture = std.posix.pipe2(.{ .CLOEXEC = true }) catch |err| {
        std.posix.close(link[0]);
        return err;
    };

    // cmd2: stdin <- read end of the connecting pipe, stdout -> capture pipe
    const pid2 = spawnRedirected(allocator, cmd2, link[0], capture[1]) catch |err| {
        std.posix.close(link[0]);
        std.posix.close(capture[0]);
        std.posix.close(capture[1]);
        return err;
    };
    std.posix.close(link[0]);
    std.posix.close(capture[1]);
    defer _ = std.posix.waitpid(pid2, 0);

    // Declared last so it runs first: closing the read end before waiting
    // keeps cmd2 from blocking on a full pipe if we bail out early.
    const captured = std.fs.File{ .handle = capture[0] };
    defer captured.close();

    return try captured.reader().readAllAlloc(allocator, 64 * 1024);
}
/// Demo: run `echo "hello world" | wc -w` and print the trimmed result.
pub fn main() !void {
    var gpa_state = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa_state.deinit();
    const gpa = gpa_state.allocator();

    const producer: []const []const u8 = &.{ "echo", "hello world" };
    const consumer: []const []const u8 = &.{ "wc", "-w" };

    const piped_output = try pipeCommands(gpa, producer, consumer);
    defer gpa.free(piped_output);

    const trimmed = std.mem.trim(u8, piped_output, " \n");
    try std.io.getStdOut().writer().print("pipe result: '{s}'\n", .{trimmed});
}
The key insight is that you create the pipe BEFORE forking either child, then assign the write end to child1's stdout and the read end to child2's stdin using the .fd behavior. The parent must close both pipe ends after spawning -- otherwise the read end stays open in the parent and child2 never sees EOF.
Last episode we went deep on the Unix process model -- fork, exec, wait, process groups, zombies, the double-fork daemon trick, and a process supervisor. All of that covered how to CREATE and MANAGE processes. But processes by themselves are islands. They exit with a code and that's about it. Today we connect the islands. Pipes are the oldest Unix IPC (inter-process communication) mechanism, and they're still one of the most useful. Every time you type ls | grep foo | wc -l in a terminal, you're using pipes. Every time a parent process reads its child's output, there's a pipe underneath.
We actually already saw pipes in action last episode when we used std.process.Child with .Pipe for stdout/stderr. Today we're going underneath that abstraction to understand how pipes really work, and then we'll build some practical things with them.
Here we go!
A pipe is a kernel buffer with two file descriptors: one for writing, one for reading. Data written to the write end can be read from the read end. It's a unidirectional byte stream -- no seeking, no random access, just bytes flowing in one direction. The pipe syscall creates a pair of file descriptors, and typically the parent keeps one end and the child (after fork) gets the other.
Let's start from the ground up with std.posix.pipe:
const std = @import("std");
/// Demo: a forked child sends one line to its parent through an anonymous pipe.
pub fn main() !void {
    const out = std.io.getStdOut().writer();

    // pipe() yields { read end, write end }.
    const fds = try std.posix.pipe();
    const read_end = fds[0];
    const write_end = fds[1];

    const child_pid = try std.posix.fork();
    if (child_pid == 0) {
        // Child: writer only, so the read end goes away first.
        std.posix.close(read_end);
        _ = std.posix.write(write_end, "hello from the child process!\n") catch {};
        std.posix.close(write_end);
        std.process.exit(0);
    }

    // Parent: reader only, so the write end goes away first.
    std.posix.close(write_end);
    var storage: [256]u8 = undefined;
    const len = try std.posix.read(read_end, &storage);
    std.posix.close(read_end);

    try out.print("Parent received {d} bytes: {s}", .{ len, storage[0..len] });
    _ = std.posix.waitpid(child_pid, 0);
}
The critical part is which end each process closes. The child closes the read end (it only writes), and the parent closes the write end (it only reads). If you forget to close the write end in the parent, the read call will never return EOF -- the kernel thinks someone might still write to the pipe because a write descriptor is still open. This is one of the most common pipe bugs and it's a real pain to debug because the program just hangs.
After fork, both parent and child have copies of both file descriptors. That's four file descriptors total pointing at the same pipe. You need to close the ones you don't use, otherwise the reference count on the pipe never drops to zero on the write side, and the reader blocks forever.
Anonymous pipes only work between related processes (parent-child), because you need fork to share the file descriptors. Named pipes (FIFOs) solve this -- they're filesystem entries that any process can open. One process opens the FIFO for writing, another opens it for reading, and data flows between them:
const std = @import("std");
const linux = std.os.linux;
/// Creates a FIFO at `path` (mode 0o666, relative paths resolved against the
/// current directory). An already existing path counts as success; any other
/// failure is reported as `error.MkfifoFailed`.
fn createFifo(path: [*:0]const u8) !void {
    // mknodat with S_IFIFO makes a FIFO node; the device number is unused.
    const rc = linux.mknodat(linux.AT.FDCWD, path, linux.S.IFIFO | 0o666, 0);
    switch (std.posix.errno(rc)) {
        .SUCCESS, .EXIST => {},
        else => return error.MkfifoFailed,
    }
}
/// Opens `path` write-only and sends three lines, pausing 200 ms after each.
fn writerProcess(path: []const u8) !void {
    const fifo = try std.fs.cwd().openFile(path, .{ .mode = .write_only });
    defer fifo.close();

    const pause_ns = 200 * std.time.ns_per_ms;
    for ([_][]const u8{ "message one\n", "message two\n", "message three\n" }) |line| {
        try fifo.writeAll(line);
        std.time.sleep(pause_ns);
    }
}
/// Opens `path` read-only and echoes every chunk received to stdout until the
/// writer side is closed (a read of 0 bytes).
/// Read errors are returned to the caller instead of being reported as a
/// normal "writer closed" shutdown.
fn readerProcess(path: []const u8) !void {
    const stdout = std.io.getStdOut().writer();
    const file = try std.fs.cwd().openFile(path, .{ .mode = .read_only });
    defer file.close();

    var buf: [256]u8 = undefined;
    while (true) {
        const n = try file.read(&buf);
        if (n == 0) break; // writer closed
        try stdout.print("[reader] got: {s}", .{buf[0..n]});
    }
    try stdout.print("[reader] writer closed, done\n", .{});
}
/// Demo: a forked child writes into a named pipe while the parent reads from it.
pub fn main() !void {
    // One sentinel-terminated constant serves both the C-string and slice callers.
    const fifo_path: [:0]const u8 = "/tmp/zig_fifo_demo";

    try createFifo(fifo_path.ptr);
    defer std.fs.cwd().deleteFile(fifo_path) catch {};

    const child_pid = try std.posix.fork();
    if (child_pid == 0) {
        // Child: writer side.
        writerProcess(fifo_path) catch {};
        std.process.exit(0);
    }

    // Parent: reader side.
    readerProcess(fifo_path) catch |err| {
        std.io.getStdOut().writer().print("reader error: {}\n", .{err}) catch {};
    };
    _ = std.posix.waitpid(child_pid, 0);
}
The FIFO lives in the filesystem as a special file. ls -l shows it with a p prefix (for pipe). Any number of processes can open it -- one side writes, the other reads. The kernel handles the buffering. When all writers close the FIFO, readers get EOF. When all readers close, writers get SIGPIPE (or EPIPE if the signal is blocked).
Named pipes are useful for simple inter-process communication where you don't want the overhead of sockets or shared memory. Log aggregation, command-and-control channels between daemons, and communication between scripts written in different languages -- FIFOs handle all of these. The downside compared to Unix domain sockets is that FIFOs are unidirectional. For bidirectional communication you need two FIFOs (or switch to sockets, which we'll cover in a future episode).
Pipes have a kernel buffer. On Linux this is typically 64 KiB (16 pages of 4 KiB each). When the buffer is full, writes block. When the buffer is empty, reads block. This backpressure mechanism is what makes pipes self-regulating -- a fast producer automatically slows down when the consumer can't keep up:
const std = @import("std");
const linux = std.os.linux;
/// Demo: a fast writer child pushes 256 KiB through a pipe while the parent
/// reads slowly, so the writer repeatedly blocks on a full pipe buffer.
pub fn main() !void {
    const stdout_w = std.io.getStdOut().writer();
    const pipe_fds = try std.posix.pipe();

    // F_GETPIPE_SZ from <linux/fcntl.h> (F_LINUX_SPECIFIC_BASE 1024 + 8),
    // spelled out here instead of relying on the std `F` namespace carrying it.
    const getpipe_sz_cmd: i32 = 1032;
    const pipe_size = linux.fcntl(pipe_fds[0], getpipe_sz_cmd, 0);
    // Raw syscalls report failure as a negative errno; do not print that as a size.
    if (@as(isize, @bitCast(pipe_size)) < 0) return error.PipeSizeQueryFailed;
    try stdout_w.print("Pipe buffer size: {d} bytes ({d} KiB)\n", .{
        pipe_size,
        @divFloor(pipe_size, 1024),
    });

    const pid = try std.posix.fork();
    if (pid == 0) {
        // child: write more data than the pipe can hold
        std.posix.close(pipe_fds[0]);
        const stderr = std.io.getStdErr().writer();
        const chunk: [4096]u8 = [_]u8{'A'} ** 4096;
        var total: usize = 0;
        var write_calls: usize = 0;
        while (total < 256 * 1024) { // try to write 256 KiB
            const n = std.posix.write(pipe_fds[1], &chunk) catch |err| {
                stderr.print("write error after {d} bytes: {}\n", .{ total, err }) catch {};
                break;
            };
            total += n;
            write_calls += 1;
        }
        stderr.print("[writer] wrote {d} bytes in {d} calls\n", .{ total, write_calls }) catch {};
        std.posix.close(pipe_fds[1]);
        std.process.exit(0);
    }

    // parent: read slowly to demonstrate blocking on the writer side
    std.posix.close(pipe_fds[1]);
    var total_read: usize = 0;
    var read_buf: [1024]u8 = undefined;
    while (true) {
        // slow reader -- sleep between reads
        std.time.sleep(10 * std.time.ns_per_ms);
        const n = std.posix.read(pipe_fds[0], &read_buf) catch |err| {
            // Report instead of silently treating the failure as end of stream.
            std.io.getStdErr().writer().print("read error after {d} bytes: {}\n", .{ total_read, err }) catch {};
            break;
        };
        if (n == 0) break;
        total_read += n;
    }
    std.posix.close(pipe_fds[0]);
    _ = std.posix.waitpid(pid, 0);
    try stdout_w.print("[reader] total read: {d} bytes\n", .{total_read});
}
When you run this, the writer fills the 64 KiB buffer quickly, then blocks on the next write until the reader consumes some data. The write call doesn't return until space is available. This is blocking I/O in action -- and it's the default behavior for pipes.
You can make pipe file descriptors non-blocking by passing O_NONBLOCK to pipe2 (or setting it with fcntl after creation). With non-blocking I/O, a write to a full pipe returns error.WouldBlock instead of blocking, and a read from an empty pipe does the same. This is useful when you need to multiplex I/O across multiple pipes, which is exactly what we'll do next.
One more thing about pipe sizes: on Linux you can increase the buffer with fcntl(fd, F_SETPIPE_SZ, new_size). The maximum is controlled by /proc/sys/fs/pipe-max-size (usually 1 MiB). Bigger buffers reduce context switches between producer and consumer but use more kernel memory. For most use cases the default 64 KiB is fine.
When you have multiple child processes, each with its own pipe, you need to read from all of them without blocking on any single one. The poll syscall solves this -- you give it a list of file descriptors and it tells you which ones have data available:
const std = @import("std");
const linux = std.os.linux;
/// Runs child commands whose stdout is connected to a pipe each, and relays
/// their output to this process's stdout as it becomes available.
const PipeMultiplexer = struct {
    allocator: std.mem.Allocator,
    pipes: std.ArrayList(PipeEntry),

    const PipeEntry = struct {
        /// Prefix used when relaying the child's output.
        name: []const u8,
        /// Read end of the pipe attached to the child's stdout.
        read_fd: std.posix.fd_t,
        child_pid: std.posix.pid_t,
        /// True once `read_fd` has been closed and the child reaped.
        closed: bool,
    };

    /// Creates a multiplexer without children.
    fn init(allocator: std.mem.Allocator) PipeMultiplexer {
        return .{
            .allocator = allocator,
            .pipes = std.ArrayList(PipeEntry).init(allocator),
        };
    }

    /// Closes every read end that is still open and frees the entry list.
    /// Children that were not reaped by `readAll` are not waited for.
    fn deinit(self: *PipeMultiplexer) void {
        for (self.pipes.items) |entry| {
            if (!entry.closed) std.posix.close(entry.read_fd);
        }
        self.pipes.deinit();
    }

    /// Forks a child running `cmd` (looked up via PATH, current environment)
    /// with its stdout redirected into a new pipe, and tracks the read end
    /// under `name`. `name` is stored as given (not copied).
    fn addChild(self: *PipeMultiplexer, name: []const u8, cmd: []const []const u8) !void {
        if (cmd.len == 0) return error.EmptyCommand;
        // Reserve the slot first so a spawned child can always be recorded.
        try self.pipes.ensureUnusedCapacity(1);

        // exec needs a null-terminated array of null-terminated strings. A
        // `[]const []const u8` is an array of (ptr, len) pairs and cannot be
        // pointer-cast into that layout, so build a proper argv here.
        var arena_state = std.heap.ArenaAllocator.init(self.allocator);
        defer arena_state.deinit();
        const arena = arena_state.allocator();
        const argv = try arena.allocSentinel(?[*:0]const u8, cmd.len, null);
        for (cmd, argv) |arg, *slot| slot.* = (try arena.dupeZ(u8, arg)).ptr;
        const envp: [*:null]const ?[*:0]const u8 = @ptrCast(std.os.environ.ptr);

        const pipe_fds = try std.posix.pipe();
        const pid = std.posix.fork() catch |err| {
            std.posix.close(pipe_fds[0]);
            std.posix.close(pipe_fds[1]);
            return err;
        };
        if (pid == 0) {
            // child: redirect stdout to pipe write end
            std.posix.close(pipe_fds[0]);
            std.posix.dup2(pipe_fds[1], std.posix.STDOUT_FILENO) catch std.process.exit(1);
            std.posix.close(pipe_fds[1]);
            // execvpeZ only returns on failure.
            const err = std.posix.execvpeZ(argv[0].?, argv.ptr, envp);
            std.debug.print("exec '{s}' failed: {s}\n", .{ cmd[0], @errorName(err) });
            std.process.exit(127);
        }

        // parent: close write end, keep read end
        std.posix.close(pipe_fds[1]);
        self.pipes.appendAssumeCapacity(.{
            .name = name,
            .read_fd = pipe_fds[0],
            .child_pid = pid,
            .closed = false,
        });
    }

    /// Relays output from all children to stdout, prefixed with each child's
    /// name, until every pipe has reached end of stream. Each child is reaped
    /// (blocking wait) once its pipe is finished. Poll failures are returned.
    fn readAll(self: *PipeMultiplexer) !void {
        const stdout = std.io.getStdOut().writer();
        var buf: [512]u8 = undefined;

        // One poll slot per entry, allocated once and refreshed every round.
        const pollfds = try self.allocator.alloc(std.posix.pollfd, self.pipes.items.len);
        defer self.allocator.free(pollfds);

        while (true) {
            var open_count: usize = 0;
            for (self.pipes.items, pollfds) |entry, *pfd| {
                if (!entry.closed) open_count += 1;
                pfd.* = .{
                    // A negative fd tells poll to skip the slot.
                    .fd = if (entry.closed) -1 else entry.read_fd,
                    .events = std.posix.POLL.IN,
                    .revents = 0,
                };
            }
            if (open_count == 0) break;

            // poll with 1 second timeout; 0 means the timeout elapsed
            const ready = try std.posix.poll(pollfds, 1000);
            if (ready == 0) continue;

            for (self.pipes.items, pollfds) |*entry, pfd| {
                if (entry.closed or pfd.revents == 0) continue;

                var finished = false;
                if (pfd.revents & std.posix.POLL.IN != 0) {
                    // Drain pending data before honouring a hangup.
                    const n = std.posix.read(entry.read_fd, &buf) catch |err| blk: {
                        std.debug.print("[{s}] read failed: {s}\n", .{ entry.name, @errorName(err) });
                        break :blk @as(usize, 0);
                    };
                    if (n == 0) {
                        finished = true; // EOF
                    } else {
                        try stdout.print("[{s}] {s}", .{ entry.name, buf[0..n] });
                    }
                } else {
                    // HUP / ERR / NVAL with nothing left to read: without this
                    // the entry would stay open and the loop would never end.
                    finished = true;
                }

                if (finished) {
                    // NVAL means the descriptor is not open; closing it again is invalid.
                    if (pfd.revents & std.posix.POLL.NVAL == 0) std.posix.close(entry.read_fd);
                    entry.closed = true;
                    _ = std.posix.waitpid(entry.child_pid, 0);
                }
            }
        }
    }
};
/// Demo: two forked children write to separate pipes at different times while
/// the parent uses poll to relay whichever pipe has data.
pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    // Note: PipeMultiplexer.addChild uses fork+exec internally. This demo does
    // not go through it: the children below are plain forks that write to
    // hand-made pipes, which keeps the focus on the poll loop. The multiplexer
    // is only constructed and torn down here.
    var mux = PipeMultiplexer.init(allocator);
    defer mux.deinit();

    const stdout = std.io.getStdOut().writer();
    const pipe1 = try std.posix.pipe();
    const pipe2 = try std.posix.pipe();

    const pid1 = try std.posix.fork();
    if (pid1 == 0) {
        // child A only needs the write end of pipe1
        std.posix.close(pipe1[0]);
        std.posix.close(pipe2[0]);
        std.posix.close(pipe2[1]);
        _ = std.posix.write(pipe1[1], "output from child A\n") catch {};
        std.time.sleep(100 * std.time.ns_per_ms);
        _ = std.posix.write(pipe1[1], "more from child A\n") catch {};
        std.posix.close(pipe1[1]);
        std.process.exit(0);
    }

    const pid2 = try std.posix.fork();
    if (pid2 == 0) {
        // child B only needs the write end of pipe2
        std.posix.close(pipe1[0]);
        std.posix.close(pipe1[1]);
        std.posix.close(pipe2[0]);
        std.time.sleep(50 * std.time.ns_per_ms);
        _ = std.posix.write(pipe2[1], "output from child B\n") catch {};
        std.posix.close(pipe2[1]);
        std.process.exit(0);
    }

    // parent closes write ends
    std.posix.close(pipe1[1]);
    std.posix.close(pipe2[1]);

    var pollfds = [_]std.posix.pollfd{
        .{ .fd = pipe1[0], .events = std.posix.POLL.IN, .revents = 0 },
        .{ .fd = pipe2[0], .events = std.posix.POLL.IN, .revents = 0 },
    };
    const names = [_][]const u8{ "childA", "childB" };
    var open_count: usize = 2;
    var buf: [256]u8 = undefined;

    while (open_count > 0) {
        // A poll failure is returned instead of being retried forever;
        // 0 means the 2 second timeout elapsed.
        const ready = try std.posix.poll(&pollfds, 2000);
        if (ready == 0) continue;

        for (&pollfds, names) |*pfd, name| {
            if (pfd.fd < 0 or pfd.revents == 0) continue;

            var finished = false;
            if (pfd.revents & std.posix.POLL.IN != 0) {
                // Drain pending data before honouring a hangup.
                const n = std.posix.read(pfd.fd, &buf) catch |err| blk: {
                    std.debug.print("[{s}] read failed: {s}\n", .{ name, @errorName(err) });
                    break :blk @as(usize, 0);
                };
                if (n == 0) {
                    finished = true;
                } else {
                    try stdout.print("[{s}] {s}", .{ name, buf[0..n] });
                }
            } else {
                // HUP / ERR / NVAL with nothing left to read.
                finished = true;
            }

            if (finished) {
                // NVAL means the descriptor is not open; closing it again is invalid.
                if (pfd.revents & std.posix.POLL.NVAL == 0) std.posix.close(pfd.fd);
                pfd.fd = -1; // poll skips negative fds
                open_count -= 1;
            }
        }
    }

    _ = std.posix.waitpid(pid1, 0);
    _ = std.posix.waitpid(pid2, 0);
    try stdout.print("All children done\n", .{});
}
poll takes an array of pollfd structs. Each one has an fd (file descriptor), events (what you're interested in -- POLLIN for "data available to read"), and revents (what actually happened -- filled in by the kernel). Setting fd to -1 tells poll to skip that entry.
The POLLHUP flag (hangup) fires when the write end of the pipe is closed. You might get POLLIN and POLLHUP at the same time if there's still data in the buffer when the writer closes. Always check for POLLIN first and read any remaining data before treating POLLHUP as end-of-stream.
This pattern -- poll across multiple file descriptors, handle whichever ones are ready -- is the foundation of event-driven programming. Web servers, databases, message brokers -- they all use some variant of poll/epoll/kqueue to handle thousands of connections concurrently on a single thread. We used epoll in the HTTP server episodes (51-54), and the concept is identical here with pipes.
A shell pipeline like cat /etc/passwd | grep root | wc -l connects three processes with two pipes. Each process's stdout becomes the next process's stdin. Let's build a general pipeline executor:
const std = @import("std");
/// A shell-style pipeline: each stage's stdout feeds the next stage's stdin,
/// and the last stage's stdout is captured.
const Pipeline = struct {
    allocator: std.mem.Allocator,
    stages: std.ArrayList([]const []const u8),

    /// Creates a pipeline without stages.
    fn init(allocator: std.mem.Allocator) Pipeline {
        return .{
            .allocator = allocator,
            .stages = std.ArrayList([]const []const u8).init(allocator),
        };
    }

    /// Frees the stage list (the argv slices themselves are not owned).
    fn deinit(self: *Pipeline) void {
        self.stages.deinit();
    }

    /// Appends a stage. `cmd` is stored as given (not copied).
    fn addStage(self: *Pipeline, cmd: []const []const u8) !void {
        try self.stages.append(cmd);
    }

    /// Forks and execs `argv` (looked up via PATH) with the current
    /// environment; `stdin_fd`, when given, and `stdout_fd` are duplicated
    /// onto the child's stdin / stdout. Returns the child's pid.
    ///
    /// `std.process.Child` only offers Inherit/Ignore/Pipe/Close for stdio, so
    /// wiring an existing descriptor into a child is done with fork + dup2.
    fn spawnStage(
        allocator: std.mem.Allocator,
        argv: []const []const u8,
        stdin_fd: ?std.posix.fd_t,
        stdout_fd: std.posix.fd_t,
    ) !std.posix.pid_t {
        if (argv.len == 0) return error.EmptyCommand;

        // Build the null-terminated argv before forking.
        var arena_state = std.heap.ArenaAllocator.init(allocator);
        defer arena_state.deinit();
        const arena = arena_state.allocator();
        const argv_z = try arena.allocSentinel(?[*:0]const u8, argv.len, null);
        for (argv, argv_z) |arg, *slot| slot.* = (try arena.dupeZ(u8, arg)).ptr;
        const envp: [*:null]const ?[*:0]const u8 = @ptrCast(std.os.environ.ptr);

        const pid = try std.posix.fork();
        if (pid == 0) {
            if (stdin_fd) |fd| std.posix.dup2(fd, std.posix.STDIN_FILENO) catch std.process.exit(127);
            std.posix.dup2(stdout_fd, std.posix.STDOUT_FILENO) catch std.process.exit(127);
            // execvpeZ only returns on failure.
            const err = std.posix.execvpeZ(argv_z[0].?, argv_z.ptr, envp);
            std.debug.print("exec '{s}' failed: {s}\n", .{ argv[0], @errorName(err) });
            std.process.exit(127);
        }
        return pid;
    }

    /// Runs all stages concurrently and returns the last stage's stdout.
    /// Caller owns the returned slice. The first stage inherits this
    /// process's stdin; every stage inherits stderr. All started children are
    /// reaped before returning, on success and on error.
    /// Returns `error.EmptyPipeline` when no stage was added.
    fn execute(self: *Pipeline) ![]u8 {
        const stages = self.stages.items;
        if (stages.len == 0) return error.EmptyPipeline;

        var child_pids = std.ArrayList(std.posix.pid_t).init(self.allocator);
        defer child_pids.deinit();
        try child_pids.ensureTotalCapacity(stages.len);
        defer {
            for (child_pids.items) |pid| _ = std.posix.waitpid(pid, 0);
        }

        // Read end of the most recently created pipe: stdin of the next stage,
        // and after the loop the capture end of the last stage. Declared after
        // the wait-defer so it is closed *before* the children are waited for.
        var upstream: ?std.posix.fd_t = null;
        defer {
            if (upstream) |fd| std.posix.close(fd);
        }

        for (stages) |cmd| {
            // Every stage writes into a fresh pipe (N-1 links plus one capture
            // pipe). Close-on-exec keeps stray pipe ends out of the children;
            // only the dup2'ed copies on fd 0/1 survive exec.
            const out = try std.posix.pipe2(.{ .CLOEXEC = true });
            const pid = spawnStage(self.allocator, cmd, upstream, out[1]) catch |err| {
                std.posix.close(out[0]);
                std.posix.close(out[1]);
                return err;
            };
            child_pids.appendAssumeCapacity(pid);

            // The parent must not keep the write end, or the reader never sees EOF.
            std.posix.close(out[1]);
            if (upstream) |fd| std.posix.close(fd);
            upstream = out[0];
        }

        // read the final output
        var output = std.ArrayList(u8).init(self.allocator);
        errdefer output.deinit();
        const capture_fd = upstream.?; // set by the loop: stages.len >= 1
        var buf: [4096]u8 = undefined;
        while (true) {
            const n = try std.posix.read(capture_fd, &buf);
            if (n == 0) break;
            try output.appendSlice(buf[0..n]);
        }
        return try output.toOwnedSlice();
    }
};
/// Demo: run two three-stage pipelines and print what each one produced.
pub fn main() !void {
    var gpa_state = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa_state.deinit();
    const gpa = gpa_state.allocator();
    const out = std.io.getStdOut().writer();

    // cat /etc/passwd | grep root | wc -l
    var count_roots = Pipeline.init(gpa);
    defer count_roots.deinit();
    const count_stages = [_][]const []const u8{
        &.{ "cat", "/etc/passwd" },
        &.{ "grep", "root" },
        &.{ "wc", "-l" },
    };
    for (count_stages) |stage| try count_roots.addStage(stage);
    const count_output = try count_roots.execute();
    defer gpa.free(count_output);
    try out.print("Pipeline output: {s}", .{count_output});

    // ls /usr/bin | sort | head -5
    var first_bins = Pipeline.init(gpa);
    defer first_bins.deinit();
    const listing_stages = [_][]const []const u8{
        &.{ "ls", "/usr/bin" },
        &.{"sort"},
        &.{ "head", "-5" },
    };
    for (listing_stages) |stage| try first_bins.addStage(stage);
    const listing_output = try first_bins.execute();
    defer gpa.free(listing_output);
    try out.print("Pipeline 2 output:\n{s}", .{listing_output});
}
The algorithm is: for N stages, create N-1 pipes. Each stage's stdin is the previous pipe's read end, and each stage's stdout is the next pipe's write end. The parent closes both ends of each pipe after spawning the children -- this is critical, because if the parent holds a write end open, the downstream reader will never see EOF.
Notice how we capture the final stage's output by creating one extra pipe for the last stage. The parent reads from it after all children are spawned. This gives us the same result as running the pipeline in a shell and reading stdout.
This is basically what our shell project did in episodes 47-50, but stripped down to the essentials. The shell version handled more edge cases (built-in commands, environment variables, job control), but the pipe plumbing is identical.
Raw bytes are fine for text streams, but when you need to send structured messages between processes, you need a framing protocol. The simplest approach is length-prefixed messages: send a 4-byte length header followed by that many bytes of payload:
const std = @import("std");
/// A typed message framed as: 4-byte big-endian length, 1 type byte, payload.
/// The length counts the type byte plus the payload.
const Message = struct {
    msg_type: u8,
    payload: []const u8,

    /// Largest accepted frame length (type byte + payload), in bytes.
    const max_frame_len: u32 = 1024 * 1024;

    /// Writes the framed message to `writer`.
    /// Returns `error.MessageTooLarge` when the frame would exceed
    /// `max_frame_len` (which `decode` would reject anyway); writer errors
    /// are propagated.
    fn encode(self: Message, writer: anytype) !void {
        if (self.payload.len > max_frame_len - 1) return error.MessageTooLarge;
        const total_len: u32 = @intCast(1 + self.payload.len);

        // length header (4 bytes big-endian)
        var header: [4]u8 = undefined;
        std.mem.writeInt(u32, &header, total_len, .big);
        try writer.writeAll(&header);
        // type byte, then payload
        try writer.writeByte(self.msg_type);
        try writer.writeAll(self.payload);
    }

    /// Reads one framed message from `reader`.
    /// Returns `null` on a clean end of stream (no bytes before the header).
    /// The payload is allocated with `allocator`; the caller frees it.
    ///
    /// Errors: `error.EndOfStream` when the stream ends inside a frame,
    /// `error.InvalidFrame` for a zero length (a frame always carries a type
    /// byte), `error.MessageTooLarge` above `max_frame_len`, plus reader and
    /// allocation errors.
    fn decode(allocator: std.mem.Allocator, reader: anytype) !?Message {
        var header: [4]u8 = undefined;
        const got = try reader.readAll(&header);
        if (got == 0) return null;
        // A partial header is a truncated frame, not a normal end of stream.
        if (got < header.len) return error.EndOfStream;

        const total_len = std.mem.readInt(u32, &header, .big);
        if (total_len == 0) return error.InvalidFrame;
        if (total_len > max_frame_len) return error.MessageTooLarge;

        const msg_type = try reader.readByte();

        const payload = try allocator.alloc(u8, total_len - 1);
        errdefer allocator.free(payload);
        try reader.readNoEof(payload);

        return Message{
            .msg_type = msg_type,
            .payload = payload,
        };
    }
};
/// Demo: a forked child sends four framed messages; the parent decodes and prints them.
pub fn main() !void {
    var gpa_state = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa_state.deinit();
    const gpa = gpa_state.allocator();
    const out = std.io.getStdOut().writer();

    const pipe_fds = try std.posix.pipe();
    const child_pid = try std.posix.fork();
    if (child_pid == 0) {
        // Child: encode every message into the write end.
        std.posix.close(pipe_fds[0]);
        const sink = (std.fs.File{ .handle = pipe_fds[1] }).writer();
        const outgoing = [_]Message{
            .{ .msg_type = 1, .payload = "system initialized" },
            .{ .msg_type = 2, .payload = "processing batch 42" },
            .{ .msg_type = 3, .payload = "task complete: 128 items" },
            .{ .msg_type = 1, .payload = "shutting down" },
        };
        for (outgoing) |msg| msg.encode(sink) catch break;
        std.posix.close(pipe_fds[1]);
        std.process.exit(0);
    }

    // Parent: decode from the read end until the stream ends or decoding fails.
    std.posix.close(pipe_fds[1]);
    const source = (std.fs.File{ .handle = pipe_fds[0] }).reader();
    const type_names = [_][]const u8{ "???", "INFO", "PROGRESS", "RESULT" };

    while (Message.decode(gpa, source) catch null) |m| {
        defer gpa.free(m.payload);
        const type_name = if (m.msg_type >= type_names.len)
            "UNKNOWN"
        else
            type_names[m.msg_type];
        try out.print("[{s}] {s}\n", .{ type_name, m.payload });
    }

    std.posix.close(pipe_fds[0]);
    _ = std.posix.waitpid(child_pid, 0);
}
Length-prefixed framing is used everywhere: HTTP/2 frames, WebSocket frames, protobuf wire format, database protocols (PostgreSQL's wire protocol sends a 4-byte length before every message). The big advantage over delimiter-based framing (like newline-separated JSON) is that the payload can contain any bytes, including newlines and null bytes. You know exactly how many bytes to read, so you never accidentally split a message or merge two together.
The big-endian byte order for the length header is a convention from network protocols (it's sometimes called "network byte order"). It means the most significant byte comes first. Zig's std.mem.nativeToBig handles the conversion regardless of the host's endianness.
We touched on pipe buffer sizes earlier, but let's look at this more systematically. Understanding pipe capacity matters when you're designing systems with multiple stages that produce and consume at different rates:
const std = @import("std");
const linux = std.os.linux;
/// Prints a fresh pipe's buffer size, tries to grow it to 256 KiB, then
/// measures the real capacity by writing in non-blocking mode until the pipe
/// refuses more data.
fn measurePipeCapacity() !void {
    const stdout = std.io.getStdOut().writer();

    // Linux-specific fcntl commands from <linux/fcntl.h>
    // (F_LINUX_SPECIFIC_BASE 1024 + 7 / + 8), spelled out here instead of
    // relying on the std `F` namespace carrying them.
    const setpipe_sz_cmd: i32 = 1031;
    const getpipe_sz_cmd: i32 = 1032;

    const pipe_fds = try std.posix.pipe();
    defer std.posix.close(pipe_fds[0]);
    defer std.posix.close(pipe_fds[1]);

    // get current pipe size (raw syscalls report failure as a negative errno)
    const current = linux.fcntl(pipe_fds[1], getpipe_sz_cmd, 0);
    if (@as(isize, @bitCast(current)) < 0) return error.PipeSizeQueryFailed;
    try stdout.print("Default pipe size: {d} bytes ({d} KiB)\n", .{
        current, @divFloor(current, 1024),
    });

    // try to increase it
    const requested: usize = 256 * 1024; // 256 KiB
    const result = linux.fcntl(pipe_fds[1], setpipe_sz_cmd, requested);
    if (@as(isize, @bitCast(result)) > 0) {
        try stdout.print("Resized pipe to: {d} bytes ({d} KiB)\n", .{
            result, @divFloor(result, 1024),
        });
    }

    // Switch the write end to non-blocking: keep the existing status flags and
    // add O_NONBLOCK. If this fails the write loop below would block forever,
    // so the failure is returned rather than ignored.
    const flags = linux.fcntl(pipe_fds[1], linux.F.GETFL, 0);
    if (@as(isize, @bitCast(flags)) < 0) return error.SetNonBlockingFailed;
    const nonblock_bit = @as(usize, 1) << @bitOffsetOf(linux.O, "NONBLOCK");
    const set_rc = linux.fcntl(pipe_fds[1], linux.F.SETFL, flags | nonblock_bit);
    if (@as(isize, @bitCast(set_rc)) < 0) return error.SetNonBlockingFailed;

    // measure actual capacity by writing until we'd block
    const chunk: [4096]u8 = [_]u8{'X'} ** 4096;
    var total: usize = 0;
    while (true) {
        const n = std.posix.write(pipe_fds[1], &chunk) catch |err| switch (err) {
            error.WouldBlock => break, // pipe is full
            else => return err,
        };
        total += n;
    }
    try stdout.print("Actual capacity before EAGAIN: {d} bytes ({d} KiB)\n", .{
        total, @divFloor(total, 1024),
    });
}
/// Forks a child that writes ten numbered lines (formatted with `line_fmt`,
/// one `write` call per line) to the pipe's write end and then exits with
/// status 0. Returns the child's pid to the parent.
fn forkNumberedWriter(pipe_fds: [2]std.posix.fd_t, comptime line_fmt: []const u8) !std.posix.pid_t {
    const pid = try std.posix.fork();
    if (pid != 0) return pid;

    // Child: writer only.
    std.posix.close(pipe_fds[0]);
    for (0..10) |seq| {
        var scratch: [64]u8 = undefined;
        const line = std.fmt.bufPrint(&scratch, line_fmt, .{seq}) catch break;
        _ = std.posix.write(pipe_fds[1], line) catch break;
    }
    std.posix.close(pipe_fds[1]);
    std.process.exit(0);
}

/// Lets two children write short lines into the same pipe and prints whatever
/// the parent reads from it.
fn demonstrateAtomicWrites() !void {
    const out = std.io.getStdOut().writer();

    // PIPE_BUF is the max size for atomic writes (4096 on Linux):
    // writes <= PIPE_BUF are guaranteed atomic (no interleaving),
    // writes > PIPE_BUF may be split across multiple reads.
    const pipe_fds = try std.posix.pipe();

    // Both writers send small messages (well under PIPE_BUF).
    const writer_a = try forkNumberedWriter(pipe_fds, "writer-A message {d}\n");
    const writer_b = try forkNumberedWriter(pipe_fds, "writer-B message {d}\n");

    std.posix.close(pipe_fds[1]);

    // Read and display: lines from A and B may alternate, but no line is torn.
    var chunk: [4096]u8 = undefined;
    while (true) {
        const got = std.posix.read(pipe_fds[0], &chunk) catch break;
        if (got == 0) break;
        try out.print("{s}", .{chunk[0..got]});
    }
    std.posix.close(pipe_fds[0]);

    _ = std.posix.waitpid(writer_a, 0);
    _ = std.posix.waitpid(writer_b, 0);
}
/// Runs the capacity measurement, prints a separator line, then runs the
/// atomic-write demonstration.
pub fn main() !void {
    try measurePipeCapacity();
    try stdout_print_sep();
    return demonstrateAtomicWrites();
}
/// Writes a "---" separator line to stdout.
fn stdout_print_sep() !void {
    try std.io.getStdOut().writer().writeAll("---\n");
}
The PIPE_BUF constant (4096 bytes on Linux) is important for concurrent writers. POSIX guarantees that writes of PIPE_BUF bytes or fewer are atomic -- they won't be interleaved with writes from other processes. Writes larger than PIPE_BUF have no atomicity guarantee; the kernel may split them, causing data from different writers to get mixed together. If you have multiple processes writing to the same pipe (e.g., multiple workers logging to a shared pipe), keep individual write calls under 4096 bytes and you're safe.
Build a "tee" command in Zig. Read stdin, write every byte to BOTH stdout AND a file specified as a command-line argument, simultaneously. Use std.posix.pipe and fork a child process that handles the file writing, while the parent handles stdout. Test it with echo "hello tee" | ./your_tee /tmp/tee_output.txt and verify the file contains the same output as the terminal.
Write a bi-directional IPC system using two anonymous pipes. The parent sends a request message (length-prefixed, like we built in this episode) to the child, and the child sends a response back. Implement a simple "echo server" protocol: the child reads a message, converts it to uppercase, and sends it back. Test with 5 different messages and verify each response is the uppercased version.
Build a parallel command executor that runs N commands simultaneously, captures all their stdout via pipes, and prints the output in order (command 1's full output first, then command 2's, etc.) even though the commands finish in arbitrary order. Use poll to read from all pipes concurrently (so no pipe buffer fills up and blocks a child), but buffer the output per-command and print in sequence after all commands finish. Test with commands that produce different amounts of output and finish at different times.
- Anonymous pipes are created with pipe(), shared between parent and child via fork, and you MUST close the unused ends or the reader never sees EOF
- Named pipes (FIFOs) live in the filesystem -- mkfifo creates them, and any process can open them for reading or writing
- Pipes have a kernel buffer (64 KiB by default on Linux, adjustable with fcntl), and writes block when the buffer is full -- this backpressure is a feature, not a bug, preventing fast producers from overwhelming slow consumers
- poll lets you monitor multiple pipe file descriptors simultaneously -- you get notified which ones have data, which avoids blocking on any single pipe while others have data waiting
- Writes of PIPE_BUF (4096) bytes or fewer are guaranteed atomic by POSIX -- multiple concurrent writers won't have their data interleaved if they keep messages under this limit

Pipes are the glue of Unix. They've been around since 1973 and they're not going anywhere. Understanding how they work at the syscall level -- how buffers fill and drain, how file descriptor management controls EOF, how poll multiplexes I/O -- gives you the building blocks for much more complex IPC. The shared memory and semaphore mechanisms we'll look at next build on this same foundation of kernel-managed buffers and file descriptors, but with different tradeoffs.
Bedankt en tot de volgende keer!