improvements for async-io

This commit is contained in:
Chris Boesch
2026-04-06 19:57:32 +02:00
parent 55a4841b07
commit 882c6aa0ab
2 changed files with 59 additions and 53 deletions

View File

@@ -16,16 +16,17 @@
// by a grasshopper) and left several bugs. Can you fix them? // by a grasshopper) and left several bugs. Can you fix them?
// //
// Here's what the program should do: // Here's what the program should do:
// 1. Three sensor tasks run concurrently, each sending // 1. Three sensor tasks send exactly 3 readings each through
// exactly 3 readings through a Queue // a Queue
// 2. A collector task receives readings, protected by a Mutex // 2. A collector task receives readings concurrently,
// protected by a Mutex
// 3. After all sensors finish, the queue is closed // 3. After all sensors finish, the queue is closed
// 4. The final report is written in a cancel-protected section // 4. The final report is written in a cancel-protected section
// //
// ************************************************************* // *************************************************************
// * A NOTE ABOUT THIS EXERCISE * // * A NOTE ABOUT THIS EXERCISE *
// * * // * *
// * This quiz uses concepts from exercises 084-093. * // * This quiz uses concepts from exercises 085-094. *
// * There are 6 bugs to fix — look for the ???s! * // * There are 6 bugs to fix — look for the ???s! *
// * * // * *
// ************************************************************* // *************************************************************
@@ -50,8 +51,8 @@ const GardenWeather = struct {
fn addReading(self: *GardenWeather, io: std.Io, reading: Reading) void { fn addReading(self: *GardenWeather, io: std.Io, reading: Reading) void {
// Bug 1: The collector needs to lock before modifying // Bug 1: The collector needs to lock before modifying
// shared state. What Mutex method acquires the lock? // shared state. What Mutex method acquires the lock?
self.mutex.lock(io) catch return;
self.mutex.???(io) catch return; self.mutex.???(io) catch return;
defer self.mutex.unlock(io);
switch (reading.sensor_type) { switch (reading.sensor_type) {
.thermometer => self.temperature = reading.value, .thermometer => self.temperature = reading.value,
@@ -70,39 +71,40 @@ pub fn main(init: std.process.Init) !void {
var reading_buf: [8]Reading = undefined; var reading_buf: [8]Reading = undefined;
var queue: std.Io.Queue(Reading) = .init(&reading_buf); var queue: std.Io.Queue(Reading) = .init(&reading_buf);
// Sensor group: runs all three sensors to completion. // The collector must run concurrently so it can process
// readings while the sensors are still sending.
// Start it FIRST to ensure its concurrency unit is reserved.
//
// Bug 2: The collector needs guaranteed concurrency.
// What method ensures a separate unit of concurrency?
// (Don't forget: it can fail!)
var collector_future = try io.???(collector, .{ io, &queue, &weather });
defer _ = collector_future.cancel(io);
// Sensor group: the sensors can use async — they just need
// to run, and async is more portable.
var sensors: std.Io.Group = .init; var sensors: std.Io.Group = .init;
// Start three sensor tasks. They need GUARANTEED concurrency sensors.async(io, sensor, .{ io, &queue, .thermometer, 20 });
// since they each simulate real-time measurement. sensors.async(io, sensor, .{ io, &queue, .hygrometer, 60 });
// sensors.async(io, sensor, .{ io, &queue, .anemometer, 10 });
// Bug 2: io.async doesn't guarantee a separate thread.
// Which Io method guarantees true concurrency?
// (Don't forget: it can fail, so you need 'try'!)
try sensors.???(io, sensor, .{ io, &queue, .thermometer, 20 });
try sensors.???(io, sensor, .{ io, &queue, .hygrometer, 60 });
try sensors.???(io, sensor, .{ io, &queue, .anemometer, 10 });
// Collector group: processes readings from the queue.
var collectors: std.Io.Group = .init;
collectors.async(io, collector, .{ io, &queue, &weather });
// Bug 3: Wait for ALL sensors to finish sending their readings. // Bug 3: Wait for ALL sensors to finish sending their readings.
// What Group method blocks until all tasks complete? // What Group method blocks until all tasks complete?
try sensors.await(io); try sensors.???(io);
// try sensors.???(io);
// All sensors done — close the queue so the collector knows // All sensors done — close the queue so the collector knows
// there's no more data coming. // there's no more data coming.
queue.close(io); queue.close(io);
// Wait for the collector to drain the queue. // Wait for the collector to drain the remaining queue.
try collectors.await(io); _ = collector_future.await(io);
// _ = collector_future.???(io);
// Now write the garden report. This is critical — it must // Now write the garden report. This is critical — it must
// NOT be interrupted, even if something tries to cancel us! // NOT be interrupted, even if something tries to cancel us!
// //
// Bug 4: Protect this section from cancellation. // Bug 5: Protect this section from cancellation.
// What Io method swaps the cancel protection state? // What Io method swaps the cancel protection state?
const old_protection = io.???(.blocked); const old_protection = io.???(.blocked);
defer _ = io.???(old_protection); defer _ = io.???(old_protection);
@@ -125,7 +127,7 @@ fn sensor(
.value = base_value + @as(i32, @intCast(i)), .value = base_value + @as(i32, @intCast(i)),
}; };
// Bug 5: Send the reading into the queue. // Bug 6: Send the reading into the queue.
// What Queue method sends a single element? // What Queue method sends a single element?
queue.???(io, reading) catch return; queue.???(io, reading) catch return;
} }
@@ -163,8 +165,7 @@ fn printGardenReport(weather: *GardenWeather) void {
// //
// This quiz covered the main async I/O primitives: // This quiz covered the main async I/O primitives:
// io.async() - launch a task (may run inline) // io.async() - launch a task (may run inline)
// io.concurrent() - launch with guaranteed parallelism // io.concurrent() - guaranteed unit of concurrency
// Group.concurrent() - concurrent tasks in a group
// Future.await/cancel - collect or cancel a single task // Future.await/cancel - collect or cancel a single task
// Group.async/await/cancel - manage fire-and-forget tasks // Group.async/await/cancel - manage fire-and-forget tasks
// Select.async/await - race tasks, act on first completion // Select.async/await - race tasks, act on first completion
@@ -180,7 +181,8 @@ fn printGardenReport(weather: *GardenWeather) void {
// Batch - submit multiple I/O operations at once // Batch - submit multiple I/O operations at once
// //
// The key insight: all of these work through the Io VTable, // The key insight: all of these work through the Io VTable,
// so your code is portable across backends (Threaded, Uring, // so your code is portable across backends — whether Threaded
// Kqueue, Dispatch) without any changes! // (OS thread pool), or Evented (M:N green threads / fibers
// that can provide concurrency even on a single OS thread).
// //
// Doctor Zoraptera approves. // Doctor Zoraptera approves.

View File

@@ -1,38 +1,42 @@
--- exercises/095_quiz_async.zig 2026-04-03 18:04:53.577391455 +0200 --- exercises/095_quiz_async.zig 2026-04-06 19:55:17.111817364 +0200
+++ answers/095_quiz_async.zig 2026-04-03 18:05:42.570392172 +0200 +++ answers/095_quiz_async.zig 2026-04-06 19:56:16.063974543 +0200
@@ -51,7 +51,7 @@ @@ -51,7 +51,7 @@
fn addReading(self: *GardenWeather, io: std.Io, reading: Reading) void {
// Bug 1: The collector needs to lock before modifying // Bug 1: The collector needs to lock before modifying
// shared state. What Mutex method acquires the lock? // shared state. What Mutex method acquires the lock?
self.mutex.lock(io) catch return;
- self.mutex.???(io) catch return; - self.mutex.???(io) catch return;
+ defer self.mutex.unlock(io); + self.mutex.lock(io) catch return;
defer self.mutex.unlock(io);
switch (reading.sensor_type) { switch (reading.sensor_type) {
.thermometer => self.temperature = reading.value, @@ -78,7 +78,7 @@
@@ -79,9 +79,9 @@ // Bug 2: The collector needs guaranteed concurrency.
// Bug 2: io.async doesn't guarantee a separate thread. // What method ensures a separate unit of concurrency?
// Which Io method guarantees true concurrency? // (Don't forget: it can fail!)
// (Don't forget: it can fail, so you need 'try'!) - var collector_future = try io.???(collector, .{ io, &queue, &weather });
- try sensors.???(io, sensor, .{ io, &queue, .thermometer, 20 }); + var collector_future = try io.concurrent(collector, .{ io, &queue, &weather });
- try sensors.???(io, sensor, .{ io, &queue, .hygrometer, 60 }); defer _ = collector_future.cancel(io);
- try sensors.???(io, sensor, .{ io, &queue, .anemometer, 10 });
+ try sensors.concurrent(io, sensor, .{ io, &queue, .thermometer, 20 }); // Sensor group: the sensors can use async — they just need
+ try sensors.concurrent(io, sensor, .{ io, &queue, .hygrometer, 60 }); @@ -91,7 +91,7 @@
+ try sensors.concurrent(io, sensor, .{ io, &queue, .anemometer, 10 });
// Collector group: processes readings from the queue.
var collectors: std.Io.Group = .init;
@@ -90,7 +90,6 @@
// Bug 3: Wait for ALL sensors to finish sending their readings. // Bug 3: Wait for ALL sensors to finish sending their readings.
// What Group method blocks until all tasks complete? // What Group method blocks until all tasks complete?
try sensors.await(io); - try sensors.???(io);
- // try sensors.???(io); + try sensors.await(io);
// All sensors done — close the queue so the collector knows // All sensors done — close the queue so the collector knows
// there's no more data coming. // there's no more data coming.
@@ -104,8 +103,8 @@ @@ -99,15 +99,14 @@
// Wait for the collector to drain the remaining queue.
_ = collector_future.await(io);
- // _ = collector_future.???(io);
// Now write the garden report. This is critical — it must
// NOT be interrupted, even if something tries to cancel us!
// //
// Bug 4: Protect this section from cancellation. // Bug 5: Protect this section from cancellation.
// What Io method swaps the cancel protection state? // What Io method swaps the cancel protection state?
- const old_protection = io.???(.blocked); - const old_protection = io.???(.blocked);
- defer _ = io.???(old_protection); - defer _ = io.???(old_protection);
@@ -41,9 +45,9 @@
printGardenReport(&weather); printGardenReport(&weather);
} }
@@ -127,7 +126,7 @@ @@ -129,7 +128,7 @@
// Bug 5: Send the reading into the queue. // Bug 6: Send the reading into the queue.
// What Queue method sends a single element? // What Queue method sends a single element?
- queue.???(io, reading) catch return; - queue.???(io, reading) catch return;
+ queue.putOne(io, reading) catch return; + queue.putOne(io, reading) catch return;