Tokio: Channels
This is a long section. Here are some anchors to jump to the respective part:
- Sharing data between tasks
- Multi-Producer, Single-Consumer
- Oneshot
- MPSC + Oneshot: Send the sender pattern
- Broadcast
- Watch
- Exercise 2: Send messages to all clients
- Solution
Sharing data between tasks #
When you write concurrent network applications, chances are that you need to share data between tasks at some point. You can use the same primitives as you do with classic multi-threading in Rust, but Tokio offers tools that are more tailored to the async world.
For example, it’s totally fine to use a std::sync::Mutex if you want to share e.g. global state in an Arc across multiple tasks. In fact, if you write web applications with Axum, you might do this a lot. However, consider using Mutex from tokio::sync instead. It’s a light-weight wrapper around std::sync::Mutex that works better for async code.
The reason is that the MutexGuard returned after acquiring a Mutex lock is not Send, meaning that it can’t be sent safely to another thread. This makes sense when you think about mutexes mostly being locked inside a thread, where you work quickly on the data and then unlock again. In Tokio, however, you might come across an .await point, meaning that the subsequent code might run on a totally different worker thread altogether. tokio::sync::Mutex solves this problem.
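To make that concrete, here is a minimal sketch of holding a lock across an .await point inside spawned tasks. With std::sync::Mutex this would not compile, because the guard crossing the .await makes the future non-Send; with tokio::sync::Mutex it works:
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;

#[tokio::main]
async fn main() {
    // Shared state behind Tokio's async Mutex, wrapped in an Arc.
    let counter = Arc::new(Mutex::new(0u64));

    let mut handles = Vec::new();
    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        handles.push(tokio::spawn(async move {
            // lock() is async, and the guard may be held across .await points.
            let mut guard = counter.lock().await;
            *guard += 1;
            tokio::time::sleep(Duration::from_millis(10)).await;
            // The guard is dropped here, after the await.
        }));
    }

    for handle in handles {
        handle.await.unwrap();
    }

    println!("Final count: {}", *counter.lock().await);
}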
Still, while you can use mutexes, RwLocks, or similar concepts, there are ways to make sharing data between tasks more efficient and – dare I say it – more elegant. This is where channels come into play.
Like the standard library, Tokio has a concept of channels that allow sending data between tasks. Compared to the standard library, Tokio offers more variety in its channels and enables nice communication patterns. Those are heavily influenced by Rob Pike’s work on Go. I’d argue his famous quote from the “Go proverbs” describes the philosophy behind channels quite well:
Don’t communicate by sharing memory; share memory by communicating.
The following examples show both channel types and the patterns you can implement using them.
Multi-Producer, Single-Consumer #
The first channel type we look at is mpsc, short for multi-producer, single-consumer. This is the async equivalent of the built-in mpsc you get from std::sync. As you have many producers that send data to a single consumer, this channel type is perfect for distributing work and collecting the results at the end.
Note: In the following examples I will use the variable names tx and rx for sender and receiver respectively. This convention comes from radio transmission, where tx stands for transmitter and rx for receiver.
The following example shows the basic usage.
use tokio::{sync::mpsc, task};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(100); // (1)
    for i in 0..10 {
        let tx = tx.clone(); // (2)
        task::spawn(async move {
            // async computation ...
            tx.send(format!("hello from {}", i)).await.unwrap(); // (3)
        });
    }
    drop(tx); // (4)
    // (5)
    while let Some(msg) = rx.recv().await {
        println!("Received: {}", msg);
    }
}
Let’s break it down:
- We create a channel with a buffer size of 100. This is a bounded channel, which can buffer up to 100 messages before the receiver needs to consume them. If the buffer is full, the sender will be suspended until the receiver has consumed a message. There’s also an unbounded channel, which doesn’t have a buffer size, but could cause the process to run out of memory (a small sketch of the unbounded variant follows below).
- Since it’s a multi-producer, single-consumer channel, we can clone the sender and move the ownership of the clone to another task. This .clone call creates another sender that points to the same receiver.
- The newly spawned task takes ownership of the sender and sends a message to the receiver.
- Here we drop the original sender. This is important, as the receiver knows how many senders are still active. With each drop of a sender, it will count down. Once all senders are dropped, the following line will receive no more messages, allowing the program to exit. Since we only send clones to the tasks, the original sender is still active until dropped.
- We receive all messages from each task as long as there are active senders. rx.recv().await returns an Option<T>, where None indicates that the last sender has been dropped.
If you run this program, you will see that the messages are received in a random order. This is because the tasks are spawned concurrently and the order in which they finish is not guaranteed. Also, rx.recv().await is async, meaning that the program will resume at this line the moment there’s another message to receive.
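As a small aside, here is a minimal sketch of the unbounded variant mentioned in the first bullet point above. The main difference: send is not async, because an unbounded sender never waits for buffer space, it only fails once the receiver is gone:
use tokio::{sync::mpsc, task};

#[tokio::main]
async fn main() {
    // Unbounded channel: no buffer limit, so senders are never suspended.
    let (tx, mut rx) = mpsc::unbounded_channel();

    for i in 0..10 {
        let tx = tx.clone();
        task::spawn(async move {
            // send() is synchronous here; it only errors if the receiver is dropped.
            tx.send(format!("hello from {}", i)).unwrap();
        });
    }
    drop(tx);

    while let Some(msg) = rx.recv().await {
        println!("Received: {}", msg);
    }
}
Everything else behaves like the bounded example, including the need to drop the original sender.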
Oneshot #
Oneshot channels are … boring? You get one sender, and one receiver. You can send one message, and that’s it.
The following example is similar to the one you get from the docs, because if you look at a oneshot channel alone, there’s not a lot to it.
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel(); // (1)
    tokio::spawn(async move {
        // (2)
        if tx.send("hello").is_err() {
            println!("receiver dropped");
        }
    });
    // (3)
    match rx.await {
        Ok(x) => println!("Received: {}", x),
        Err(e) => println!("error: {:?}", e),
    }
}
- We create a oneshot channel. We have one sender and one receiver.
- In an async task, we send stuff. What’s important here is that the send method is not async, but returns a Result. This operation can fail if there’s no receiver left.
- The receiver has no methods, it can only wait for the value to arrive. The resulting value is also a Result, as the sender might have dropped earlier without sending something.
Rust’s Result types have your back should your program end up in a state where sending and receiving can’t be guaranteed anymore.
Again, oneshot channels by themselves are kind of boring, but they enable a couple of interesting patterns.
MPSC + Oneshot: Send the sender pattern #
My favourite pattern is the send the sender pattern. It’s a combination of a oneshot channel and a mpsc channel.
Let’s assume you have two parts in your application that work independently from each other, for example a web server and a background worker. The web server accepts incoming tasks via TCP and sends them to the background worker, which does the heavy lifting. How do you get the background worker’s result back to the web server? This is what the send the sender pattern is for.
This example is a bit longer, so we break it down into a few smaller pieces. The task is to create a server that sends commands to a background worker. The background worker increments a number based on the command.
First, the imports!
use tokio::{
    io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
    net::TcpListener,
    sync::{mpsc, oneshot},
    task,
};
Next, the structs that we use to communicate accordingly.
enum CommandMessage {
    Increment,
    Decrement,
}

struct Command {
    msg: CommandMessage,
    tx: oneshot::Sender<u64>,
}
The CommandMessage enum tells the background worker what to do: increment its number, or decrement it. The Command struct is a message that contains the command and a oneshot sender. The sender is used to send the result back to the server.
#[tokio::main]
async fn main() {
    let (server_tx, mut worker_rx) = mpsc::channel::<Command>(100);
    let listener = TcpListener::bind("localhost:8001").await.unwrap();
    // tbd.
}
In the main function, we create a mpsc channel with a buffer size of 100. The server will send commands to the worker, so we need multiple producers and a single consumer. The worker will send the result back to the server through the oneshot sender inside each command.
We listen for incoming connections on port 8001.
// The "worker"
task::spawn(async move {
    let mut count = 0;
    while let Some(incoming) = worker_rx.recv().await {
        match incoming.msg {
            CommandMessage::Increment => count += 1,
            CommandMessage::Decrement => count -= 1,
        }
        incoming.tx.send(count).unwrap();
    }
});
The worker lives in its own task and has a counter variable that will be either incremented or decremented. It listens for incoming commands on the mpsc receiver; it is the channel’s single consumer. Once a command is received, it increments or decrements the counter and sends the result back to the server.
The server is a bit more complex. First, we use the same boilerplate as in the echo server example.
loop {
    let (socket, _addr) = listener.accept().await.unwrap();
    let server_tx = server_tx.clone();
    task::spawn(async move {
        let (reader, mut writer) = socket.into_split();
        let mut buf = String::new();
        let mut reader = BufReader::new(reader);
        while let Ok(_bytes_read) = reader.read_line(&mut buf).await {
            // tbd.
        }
    });
}
Refer to the previous section for an explanation. Essentially, this server accepts incoming connections and spawns a new task for each connection. The only difference is that we clone server_tx to move it into the newly spawned task. Inside, we wait for incoming messages.
let msg = match buf.trim() {
    "quit" => break,
    "increment" => CommandMessage::Increment,
    "decrement" => CommandMessage::Decrement,
    _ => {
        buf.clear();
        continue;
    }
};
Based on the message, we either stop handling this connection, create a command to increment or decrement the worker’s counter, or just ignore the message.
The command itself needs both the CommandMessage and a oneshot sender. Once created, we send the command to the worker.
let (tx, rx) = oneshot::channel();
let cmd = Command { msg, tx };
server_tx.send(cmd).await.unwrap();
The command has now been sent to the worker. Based on the enum, it will either increment or decrement its counter. Inside the spawned connection task, we wait for a result.
let res = rx.await.unwrap();
writer
    .write_all(format!("Count: {}\n", res).as_bytes())
    .await
    .unwrap();
buf.clear();
And send it back to the socket. You can find the whole script here. Once you run it, you can connect via Telnet and send commands to the worker. It’s especially fun if you connect multiple clients to it!
Broadcast #
The broadcast channel is perfect for fan-out, pub-sub or chat scenarios. It allows you to create many producers that talk to many consumers, with each consumer receiving each message.
The following example is similar to the one from the multiple producers, single consumer section. We create a broadcast channel with a buffer size of 100. We then spawn ten tasks that send a message to the broadcast channel. Each task also creates a new receiver and listens for messages. The main task also listens for messages.
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel(100); // (1)
    for i in 0..10 {
        let tx = tx.clone(); // (2)
        let mut rx = tx.subscribe(); // (3)
        tokio::spawn(async move {
            let _ = tx.send(i);
        });
        tokio::spawn(async move {
            while let Ok(msg) = rx.recv().await {
                // (4)
                println!("{} receives from: {}", i, msg);
            }
        });
    }
    drop(tx); // (5)
    while let Ok(msg) = rx.recv().await {
        // (6)
        println!("Orig receives from: {}", msg);
    }
}
Let’s break it down:
- We create a broadcast channel with a buffer size of 100.
- We clone the sender to move it into the spawned task. Each cloned sender is connected to each receiver.
- We subscribe to the cloned sender. Not only does this receiver subscribe to messages from the cloned sender, it will receive messages from all senders.
- We listen for messages from any sender that is out there. This recv() call will receive messages until all senders are dropped.
- We drop the original sender. Otherwise all spawned tasks would wait for messages forever.
- The original receiver also waits for messages.
Try running it. It’s quite funny to see how many messages come in from different senders. Again, this is perfect for chat applications where you want to get one message out to many clients.
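One detail the example above glosses over: a broadcast receiver that falls behind by more than the buffer size doesn’t block the senders. Instead, the oldest messages are overwritten and recv() reports how many were skipped. Here is a minimal sketch with a deliberately tiny buffer to provoke that case:
use tokio::sync::broadcast::{self, error::RecvError};

#[tokio::main]
async fn main() {
    // A buffer of only 2, so the receiver is guaranteed to fall behind.
    let (tx, mut rx) = broadcast::channel(2);

    for i in 0..5 {
        tx.send(i).unwrap();
    }
    drop(tx);

    loop {
        match rx.recv().await {
            Ok(msg) => println!("Received: {}", msg),
            // The oldest messages were overwritten before we could read them.
            Err(RecvError::Lagged(skipped)) => println!("Skipped {} messages", skipped),
            // All senders are dropped and the buffer is drained.
            Err(RecvError::Closed) => break,
        }
    }
}
In a chat scenario this means a slow client misses messages rather than slowing everyone else down.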
Watch #
Watch channels are great for distributing updates to a multitude of tasks. You have a single producer, but many consumers. The most recent value is stored in the channel. Only this most recent value is made available, so there is a chance that slow consumers never see some intermediate values!
use std::time::Duration;
use tokio::{sync::watch, task, time};

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel("Initial State"); // (1)
    // (2)
    let updater = task::spawn(async move {
        let states = ["State 1", "State 2", "State 3"];
        for &state in &states {
            time::sleep(Duration::from_secs(2)).await; // (3)
            tx.send(state).expect("Failed to send state"); // (4)
            println!("Updated state to: {}", state);
        }
    });
    // (5)
    for i in 0..3 {
        let mut rx = rx.clone(); // (6)
        task::spawn(async move {
            while rx.changed().await.is_ok() { // (7)
                // Access the latest value
                println!("Task {} received: {}", i, *rx.borrow()); // (8)
            }
        });
    }
    // Wait for the updater to complete
    updater.await.unwrap();
}
Let’s break it down:
- We create a watch channel with an initial state of “Initial State”.
- We spawn a task that updates the state every two seconds. We store the JoinHandle in updater so we can wait for it to complete.
- Using tokio::time::sleep, we wait for two seconds. We do this to simulate some real work going on. Try removing this line to see how the update behaviour changes!
- We send the new state to the watch channel.
- We spawn three tasks that listen for changes on the watch channel.
- We clone the receiver to move it into the spawned task.
- We wait for the state to change. The changed() method waits for a change notification, then marks the newest value as seen. If the newest value has not yet been marked seen, it will return immediately. If the value has not changed (i.e. has already been seen), it will wait for the next change. Note that it doesn’t return a value, but a Result<(), RecvError>.
- We can access the latest value by calling borrow() on the receiver. There’s also borrow_and_update(), which gets you a reference to the most recent value and also marks it as seen.
You see that the combination of “has something changed” and “give me the latest value” is quite powerful. This channel is perfect for distributing updates to many tasks: propagating configuration changes, monitoring service health, or broadcasting real-time state.
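To close the section, here is a rough sketch of the configuration use case, using borrow_and_update() so every read also marks the value as seen. The Config type is made up just for this illustration:
use tokio::sync::watch;

// A made-up config type, purely for illustration.
#[derive(Debug)]
struct Config {
    max_connections: u32,
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(Config { max_connections: 10 });

    let watcher = tokio::spawn(async move {
        // Read the current value and mark it as seen, so changed() only
        // wakes up for values we haven't looked at yet.
        println!("Starting with: {:?}", *rx.borrow_and_update());
        while rx.changed().await.is_ok() {
            println!("Config changed: {:?}", *rx.borrow_and_update());
        }
        println!("Sender dropped, watcher shutting down");
    });

    tx.send(Config { max_connections: 42 }).unwrap();
    drop(tx); // closing the channel ends the watcher loop

    watcher.await.unwrap();
}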
Exercise 2: Send messages to all clients #
We are on our way to turning the echo server from the previous exercise into a chat server. In this step, we want to send all incoming messages to all connected clients.
Your server should:
- Accept multiple incoming connections
- Broadcast every message to all connected clients
- Make sure that the message is not sent back to the client that sent it
It will be a bit clunky as we haven’t worked with concurrent futures yet, so it might be that you need to press enter before getting the next message. This is fine for now.
Solution #
This is the solution:
use std::net::SocketAddr;
use tokio::{
    io::{self, AsyncBufReadExt, AsyncWriteExt, BufReader},
    net::TcpListener,
    sync::broadcast,
};

// (1)
#[derive(Clone, Debug)]
struct Message {
    content: String,
    id: SocketAddr,
}

#[tokio::main]
async fn main() -> io::Result<()> {
    let listener = TcpListener::bind("localhost:8001").await?;
    let (tx, _) = broadcast::channel::<Message>(100); // (2)
    loop {
        let (socket, addr) = listener.accept().await?;
        println!("New connection at {}", addr);
        let tx = tx.clone(); // (3)
        let mut rx = tx.subscribe(); // (4)
        tokio::spawn(async move {
            let (reader, mut writer) = socket.into_split(); // (5)
            // (6)
            tokio::spawn(async move {
                while let Ok(msg) = rx.recv().await {
                    if msg.id != addr {
                        writer.write_all(msg.content.as_bytes())
                            .await
                            .unwrap();
                    }
                }
            });
            // Receiving a message from the client
            // sending it to the channel
            let mut buf = String::new();
            let mut reader = BufReader::new(reader);
            while let Ok(b_read) = reader.read_line(&mut buf).await {
                // A read of zero bytes means the client disconnected.
                if b_read == 0 || buf.trim() == "quit" {
                    break;
                }
                // (7)
                tx.send(Message {
                    content: buf.clone(),
                    id: addr,
                })
                .unwrap();
                buf.clear();
            }
        });
    }
}
Let’s focus on the things that have changed.
- We define a Message struct that holds the content of the message and the SocketAddr of the client that sent it. The SocketAddr serves as a kind of ID for the client.
- We create a broadcast channel that can hold up to 100 messages. We don’t need to store the receiver, as we can create a new one for each client.
- For each incoming connection, we clone the sender tx.
- We also create a new subscriber/receiver rx by subscribing to the current sender. In doing that, all senders and receivers are connected to each other.
- When splitting the socket into a reader and writer, we use into_split as opposed to split. With into_split, the read half and write half can be owned, which means we can move them to their own tasks.
- And this is exactly what we do here. The task we spawn reads incoming messages from the broadcast channel and writes them to the socket, given that the message was not sent by the current client.
- Instead of echoing the incoming message back to the client, we send it over the broadcast channel to all connected receivers.
- Instead of echoing the incoming message back to the client, we send it over the broadcast channel to all connected receivers.
And that’s it!