v1.0.0-zig0.15.2

Channels

Channels are the primary mechanism for passing data between tasks in Volt. Four channel types cover the common communication patterns.

| Type | Pattern | Allocation | Close required |
| --- | --- | --- | --- |
| Channel(T) | Bounded MPMC (multi-producer, multi-consumer) | Yes (ring buffer) | deinit() |
| Oneshot(T) | Single value, one sender, one receiver | No | No |
| BroadcastChannel(T) | Fan-out, all receivers get every message | Yes (ring buffer) | deinit() |
| Watch(T) | Single value with change notification | No | No |
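At a glance, constructing each of the four types. This sketch uses the factory helpers shown throughout this page and assumes an `allocator` is in scope:

```zig
const volt = @import("volt");

// Channel(T): bounded MPMC, heap-backed, needs deinit()
var ch = try volt.channel.bounded(u32, allocator, 100);
defer ch.deinit();

// Oneshot(T): one value, zero-allocation, no deinit()
var os = volt.channel.oneshot(u32);
_ = os;

// BroadcastChannel(T): fan-out, heap-backed, needs deinit()
var bc = try volt.channel.broadcast(u32, allocator, 16);
defer bc.deinit();

// Watch(T): latest value + change notification, zero-allocation
var w = volt.channel.watch(u32, 0);
_ = w;
```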

Each channel type has different return types for send and receive operations. Don’t assume they work the same way.

| Channel | trySend returns | tryRecv returns |
| --- | --- | --- |
| Channel(T) | .ok, .full, .closed | .value, .empty, .closed |
| Oneshot(T) | bool (via sender.send()) | ?T (via receiver.tryRecv()) |
| BroadcastChannel(T) | .ok(usize), .closed | .value, .empty, .lagged(usize), .closed |
| Watch(T) | void (via send()) | Borrow via rx.borrow(), check rx.hasChanged() |

Channel(T)

A bounded channel backed by a lock-free Vyukov/crossbeam-style ring buffer. Multiple producers and multiple consumers can operate concurrently. When the buffer is full, senders are suspended (the task yields, not the OS thread); when it is empty, receivers are suspended.

const volt = @import("volt");

// Using the factory function
var ch = try volt.channel.bounded(u32, allocator, 100);
defer ch.deinit();

// Or directly
var ch2 = try volt.channel.Channel(u32).init(allocator, 100);
defer ch2.deinit();

// Send
switch (ch.trySend(42)) {
    .ok => {},     // Value was placed in the buffer
    .full => {},   // Buffer is full, try again later
    .closed => {}, // Channel was closed
}

// Receive
switch (ch.tryRecv()) {
    .value => |v| processValue(v),
    .empty => {},  // No values available
    .closed => {}, // Channel closed, no more values
}

// Send -- yields until a slot is available
ch.send(io, 42);

// Receive -- yields until a value is available (returns null if closed)
if (ch.recv(io)) |value| {
    processValue(value);
} else {
    // Channel closed and drained
}

Pass the io: volt.Io handle so the channel can yield to the scheduler when the buffer is full (send) or empty (recv) and resume the task when the operation can proceed.

For manual future composition or custom schedulers, use the Future-returning variants:

var send_future = ch.sendFuture(42);
// Poll through your scheduler...
var recv_future = ch.recvFuture();
// When future.poll() returns .ready, you have the value.

Close the channel to signal that no more values will be sent:

ch.close();

After closing, trySend returns .closed. tryRecv continues to drain buffered values, then returns .closed.
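A receiver can therefore drain everything still buffered before stopping. A minimal drain loop, sketched with the tryRecv API shown above (`ch` and `processValue` as in the earlier example):

```zig
// Drain remaining values after close(), then stop.
var draining = true;
while (draining) {
    switch (ch.tryRecv()) {
        .value => |v| processValue(v), // buffered values still come through
        .empty => {},                  // nothing right now; yield or back off in real code
        .closed => draining = false,   // buffer exhausted and channel closed
    }
}
```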

For custom scheduler integration:

// Receive with waiter
var waiter = volt.channel.channel_mod.RecvWaiter.init();
waiter.setWaker(@ptrCast(&my_ctx), myWakeCallback);
ch.recvWait(&waiter);
// Yield to scheduler. When woken, check waiter.status.load(.acquire):
// WAITER_COMPLETE (1) = success, WAITER_CLOSED (2) = channel closed.
// Send with waiter
var send_waiter = volt.channel.channel_mod.SendWaiter.init();
send_waiter.setWaker(@ptrCast(&my_ctx), myWakeCallback);
ch.sendWait(value, &send_waiter);

Oneshot(T)

A oneshot channel delivers exactly one value from a sender to a receiver. It is zero-allocation and ideal for returning results from spawned tasks.

var os = volt.channel.oneshot(u32);
// os.sender -- use to send one value
// os.receiver -- use to receive the value

The sender can send exactly one value. send returns true if the value was delivered to a waiting receiver or stored for later pickup:

_ = os.sender.send(42);

If the sender is dropped without sending, the receiver sees the channel as closed.

// Non-blocking
if (os.receiver.tryRecv()) |value| {
    // Got the value
    _ = value;
} else {
    // Not sent yet (or sender closed without sending)
}

// Yields until the value arrives or the sender closes
switch (os.receiver.recv(io)) {
    .value => |v| {
        // Got the value
        useResult(v);
    },
    .closed => {
        // Sender dropped without sending
    },
}

For manual future composition:

var waiter = volt.channel.oneshot_mod.Oneshot(u32).RecvWaiter.init();
waiter.setWaker(@ptrCast(&my_ctx), myWakeCallback);
if (!os.receiver.recvWait(&waiter)) {
    // Value not ready yet: yield to the scheduler.
    // Woken when the value arrives or the sender closes.
}
if (waiter.value) |v| {
    // Got the value
    useResult(v);
} else if (waiter.closed.load(.acquire)) {
    // Sender closed without sending
}
A typical pattern -- returning a result from a spawned task:

fn deliverResult(io: volt.Io) !void {
    var os = volt.channel.oneshot(ComputeResult);

    // Spawn the computation. A nested function cannot capture locals,
    // so the sender is passed in as an argument.
    const f = try io.@"async"(struct {
        fn run(sender: anytype) void {
            const result = expensiveComputation();
            _ = sender.send(result);
        }
    }.run, .{&os.sender});
    _ = f;

    // ... do other work ...

    // Collect the result (async -- yields until the value arrives)
    switch (os.receiver.recv(io)) {
        .value => |result| useResult(result),
        .closed => {},
    }
}

BroadcastChannel(T)

A broadcast channel delivers every sent message to all subscribed receivers. It uses a ring buffer, and slow receivers that fall behind may miss messages.

var bc = try volt.channel.broadcast(Event, allocator, 16);
defer bc.deinit();

The capacity (16) is the size of the ring buffer.

Create receivers by subscribing to the channel:

var rx1 = bc.subscribe();
var rx2 = bc.subscribe();

Each receiver maintains its own read position in the ring buffer.
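Because positions are independent, every subscriber sees every message. A quick sketch using the API above (the Event literal matches the send example below):

```zig
var rx_a = bc.subscribe();
var rx_b = bc.subscribe();

_ = bc.send(Event{ .kind = .user_joined, .user_id = 42 });

// Both receivers get their own copy of the message,
// each advancing its own read position.
switch (rx_a.tryRecv()) {
    .value => |e| handleEvent(e), // rx_a sees the event
    else => {},
}
switch (rx_b.tryRecv()) {
    .value => |e| handleEvent(e), // rx_b sees the same event
    else => {},
}
```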

Sends are non-blocking and always succeed unless the channel is closed:

_ = bc.send(Event{ .kind = .user_joined, .user_id = 42 });

If a receiver is too slow and the ring buffer wraps, that receiver’s oldest unread messages are overwritten.

switch (rx1.tryRecv()) {
    .value => |event| handleEvent(event),
    .empty => {}, // No new messages
    .lagged => |n| {
        // Missed n messages due to slow consumption.
        // The next tryRecv returns the oldest still-available message.
        _ = n;
    },
    .closed => {}, // Channel was closed
}

// Yields until a new message is available
switch (rx1.recv(io)) {
    .value => |event| handleEvent(event),
    .empty => {}, // No messages available yet
    .lagged => |n| {
        // Missed n messages due to slow consumption
        _ = n;
    },
    .closed => {}, // Channel was closed
}

For custom scheduler integration:

var waiter = volt.channel.broadcast_mod.RecvWaiter.init();
waiter.setWaker(@ptrCast(&my_ctx), myWakeCallback);
rx1.recvWait(&waiter);
// Yield until a new message arrives.

Watch(T)

A watch channel holds a single value and notifies receivers when it changes. Only the latest value is kept -- there is no history. This is ideal for configuration or state that changes over time.

var watch = volt.channel.watch(Config, default_config);

Zero-allocation, no deinit required.

watch.send(Config{
    .max_connections = 200,
    .timeout_ms = 5000,
});

var rx = watch.subscribe();

// Check if the value changed since the last read
if (rx.hasChanged()) {
    const config = rx.borrow();
    applyConfig(config.*);
    rx.markSeen();
}

// Yields until the value is updated
switch (rx.changed(io)) {
    .changed => {
        const config = rx.borrow();
        applyConfig(config.*);
        rx.markSeen();
    },
    .closed => {},
}

Advanced: changedFuture (low-level waiter)

var waiter = volt.channel.watch_mod.ChangeWaiter.init();
waiter.setWaker(@ptrCast(&my_ctx), myWakeCallback);
rx.waitForChange(&waiter);
// Yield to scheduler. Woken when the value is updated.

A typical pattern -- a reloader task publishes configuration updates and workers pick them up:

var config_watch = volt.channel.watch(AppConfig, default_config);

// Config reloader task
fn reloadLoop() void {
    while (running) {
        const new_config = loadConfigFromFile("config.toml");
        config_watch.send(new_config);
        sleep(Duration.fromSecs(30));
    }
}

// Worker task
fn workerLoop() void {
    var rx = config_watch.subscribe();
    while (running) {
        if (rx.hasChanged()) {
            const cfg = rx.borrow();
            updateWorkerSettings(cfg.*);
            rx.markSeen();
        }
        doWork();
    }
}

Backpressure

Bounded channels provide natural backpressure: when the buffer is full, senders are forced to slow down. Understanding this behavior is essential for building reliable pipelines.

| API | Behavior when full |
| --- | --- |
| trySend(val) | Returns .full immediately -- the caller decides what to do |
| send(io, val) | Suspends the task until a slot opens (another task calls tryRecv/recv) |
| sendFuture(val) | Returns a future that resolves when the value is accepted |

Drop on full (lossy). If occasional loss is acceptable (e.g., metrics, telemetry), use trySend and discard the value on .full:

switch (ch.trySend(metric)) {
    .ok => {},
    .full => {}, // Drop this metric -- better than blocking the producer
    .closed => return,
}

Exponential backoff. For producers that should slow down but not block forever:

var delay = volt.Duration.fromMillis(1);
while (true) {
    switch (ch.trySend(item)) {
        .ok => break,
        .full => {
            // Back off, doubling each time (capped at 1 second).
            // Await the sleep future through your runtime -- merely
            // discarding it would retry immediately with no delay.
            var sleep_f = volt.time.sleep(delay);
            _ = &sleep_f; // await via your scheduler before retrying
            delay = volt.Duration.fromNanos(@min(
                delay.toNanos() * 2,
                volt.Duration.fromSecs(1).toNanos(),
            ));
        },
        .closed => return,
    }
}

Blocking send. When every message matters, use send(io, val) to let the runtime manage the wait:

ch.send(io, important_event); // Suspends until space is available

A classic deadlock occurs when two tasks send to each other through full channels:

Task A:
    ch_x.send(io, val); // Blocks -- ch_x is full
    ch_y.recv(io);      // Never reached

Task B:
    ch_y.send(io, val); // Blocks -- ch_y is full
    ch_x.recv(io);      // Never reached

Prevention: Ensure at least one direction uses trySend with a fallback, or design your pipeline as a DAG (no cycles between channels).
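One way to break such a cycle, sketched from Task A's side with the trySend-with-fallback approach (ch_x, ch_y, and val as in the scenario above):

```zig
// Task A: never block on a full ch_x -- drain ch_y while waiting,
// so Task B's blocked send can eventually complete.
while (true) {
    switch (ch_x.trySend(val)) {
        .ok => break,
        .full => {
            // Make progress in the other direction instead of blocking.
            switch (ch_y.tryRecv()) {
                .value => |v| processValue(v),
                .empty => {},  // nothing to drain; retry the send
                .closed => return,
            }
        },
        .closed => return,
    }
}
```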


Choosing a channel

| Need | Channel type |
| --- | --- |
| Work queue with backpressure | Channel (bounded MPMC) |
| Return a single result from a task | Oneshot |
| Configuration that changes at runtime | Watch |
| Event fan-out to multiple consumers | BroadcastChannel |
| Multiple producers, single consumer | Channel |
| Multiple producers, multiple consumers | Channel |
| Fire-and-forget notifications | BroadcastChannel |
| Latest-value observation | Watch |
| Type | Needs allocator | Needs deinit() |
| --- | --- | --- |
| Channel(T) | Yes | Yes |
| BroadcastChannel(T) | Yes | Yes |
| Oneshot(T) | No | No |
| Watch(T) | No | No |
- Channel uses a lock-free ring buffer for trySend/tryRecv. The waiter queue uses a separate mutex that never touches the data path.
- Oneshot is entirely lock-free (atomic state machine).
- BroadcastChannel uses atomic sequence numbers per slot for lock-free reads.
- Watch uses a version counter with atomic compare-and-swap.