Rust async/await: Async executor
This article is a continuation of the previous articles on Rust async/await. If you haven't read them yet, I recommend you read them first:
- Desugaring and assembly of async/await in Rust - goto
- Nested async/await in Rust: Desugaring and assembly - patrol
This article explains how an async executor schedules async tasks. We will see how a simple async executor works and how it is implemented in Rust, and then analyze how it schedules the goto and patrol functions from the previous articles.
Note: The code in this article is taken from a fork of the simple async local executor.
goto and patrol functions
To recap, here are the goto and patrol functions from the previous articles:
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};

#[derive(Default)]
struct Unit {
    /// The 1-D position of the unit. The unit can only move along this axis.
    pub pos: i32,
}

type UnitRef = Rc<RefCell<Unit>>;

/// A future that will move the unit towards `target_pos` at each step,
/// and complete when the unit has reached that position.
struct UnitGotoFuture {
    unit: UnitRef,
    target_pos: i32,
}

impl Future for UnitGotoFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
        let unit_pos = self.unit.borrow().pos;
        if unit_pos == self.target_pos {
            Poll::Ready(())
        } else {
            // Move one step towards the target on every poll
            self.unit.borrow_mut().pos += (self.target_pos - unit_pos).signum();
            Poll::Pending
        }
    }
}

/// Helper async function to write unit behavior nicely
async fn goto(unit: UnitRef, pos: i32) {
    UnitGotoFuture {
        unit,
        target_pos: pos,
    }
    .await;
}

/// Let a unit go back and forth between two positions
async fn patrol(unit: UnitRef, poses: [i32; 2]) {
    loop {
        goto(unit.clone(), poses[0]).await;
        goto(unit.clone(), poses[1]).await;
    }
}
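To make the "one move per poll" behavior concrete, here is a minimal sketch that polls a UnitGotoFuture by hand and counts the polls. The NoopWake type and the polls_until_ready helper are hypothetical names introduced only for this illustration; they are not part of the article's executor.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct Unit {
    pos: i32,
}
type UnitRef = Rc<RefCell<Unit>>;

struct UnitGotoFuture {
    unit: UnitRef,
    target_pos: i32,
}

impl Future for UnitGotoFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        let unit_pos = self.unit.borrow().pos;
        if unit_pos == self.target_pos {
            Poll::Ready(())
        } else {
            self.unit.borrow_mut().pos += (self.target_pos - unit_pos).signum();
            Poll::Pending
        }
    }
}

/// Hypothetical waker that does nothing: we poll in a loop, so nobody needs waking.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a fresh UnitGotoFuture until it is ready and returns the number of polls.
fn polls_until_ready(start: i32, target: i32) -> u32 {
    let unit: UnitRef = Rc::new(RefCell::new(Unit { pos: start }));
    let mut fut = UnitGotoFuture { unit, target_pos: target };
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        // UnitGotoFuture holds no self-references, so it is Unpin and Pin::new suffices.
        if Pin::new(&mut fut).poll(&mut cx).is_ready() {
            return polls;
        }
    }
}

fn main() {
    // Three moves (each returning Pending) plus one final poll that returns Ready.
    println!("polls from 0 to 3: {}", polls_until_ready(0, 3));
}
```

Each Pending poll moves the unit by one, and the poll that observes the target position returns Ready without moving, so a distance of n takes n + 1 polls.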
Async executor
An async executor is a piece of code that runs async tasks. It is responsible for scheduling the tasks and waking them up when they are ready to progress.
An async program's main function creates the async tasks and then calls the executor to run them. The executor keeps running the tasks until all of them are completed. In our example, we drive the executor by calling the Executor::step() method in a loop. We limit the number of steps to 30 so that the example can run in the Rust playground.
/// Test program with two units
fn main() {
    let executor = Executor::default();
    let units: [UnitRef; 2] = Default::default();
    executor.spawn(patrol(units[0].clone(), [-5, 5]));
    // executor.spawn(patrol(units[1].clone(), [-1,1]));
    for _ in 0..30 {
        executor.step();
    }
}
The flow chart of the Executor::step() method is shown below. Executor::step() is the crucial method of the executor: it is responsible for scheduling the tasks. When it starts, it looks for newly spawned tasks and adds them to the task_queue. Then it loops through the task_queue and polls each task. A task that is not ready is added to the pending_tasks list, which becomes the task queue for the next step. Executor::step() returns true if there are still tasks to run.
The Executor::step() method is implemented as follows:
pub fn step(&self) -> bool {
    // Dummy waker and context (not used as we poll all tasks)
    let waker = dummy_waker();
    let mut context = Context::from_waker(&waker);

    // Append new tasks created since the last step into the task queue
    let mut task_queue = self.inner.task_queue.borrow_mut();
    task_queue.append(&mut self.inner.new_tasks.borrow_mut());

    // Loop over all tasks, polling them. If a task is not ready, add it to the pending tasks.
    let mut pending_tasks = Vec::new();
    let mut any_left = false;
    for mut task in task_queue.drain(..) {
        match task.poll(&mut context) {
            Poll::Ready(()) => {} // task done
            Poll::Pending => {
                pending_tasks.push(task);
                any_left = true;
            }
        }
    }

    // Keep pending tasks for the next step
    *task_queue = pending_tasks;
    any_left // Return true if there are still tasks to run
}
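The method above relies on pieces that are not shown here, such as dummy_waker() and the task type. The following is a self-contained sketch of how such an executor could be assembled. It is a sketch under stated assumptions, not the actual code of the simple async local executor: the Task alias, the flat Executor layout (no inner field), the DummyWake type, and the CountDown test future are all hypothetical.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Assumed task representation: a boxed, pinned future with no output.
type Task = Pin<Box<dyn Future<Output = ()>>>;

#[derive(Default)]
struct Executor {
    new_tasks: RefCell<Vec<Task>>,
    task_queue: RefCell<Vec<Task>>,
}

/// Hypothetical waker that ignores wake-ups, since step() polls every task anyway.
struct DummyWake;
impl Wake for DummyWake {
    fn wake(self: Arc<Self>) {}
}

fn dummy_waker() -> Waker {
    Waker::from(Arc::new(DummyWake))
}

impl Executor {
    /// Spawning only stores the future; nothing runs until step() polls it.
    fn spawn(&self, fut: impl Future<Output = ()> + 'static) {
        self.new_tasks.borrow_mut().push(Box::pin(fut));
    }

    /// Polls every queued task once and keeps the ones that are still pending.
    fn step(&self) -> bool {
        let waker = dummy_waker();
        let mut cx = Context::from_waker(&waker);
        let mut queue = self.task_queue.borrow_mut();
        queue.append(&mut self.new_tasks.borrow_mut());
        let mut pending = Vec::new();
        for mut task in queue.drain(..) {
            if task.as_mut().poll(&mut cx).is_pending() {
                pending.push(task);
            }
        }
        *queue = pending;
        !queue.is_empty()
    }
}

/// Hypothetical test future: returns Pending `n` times, then Ready.
struct CountDown(u32);
impl Future for CountDown {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 == 0 {
            Poll::Ready(())
        } else {
            self.0 -= 1;
            Poll::Pending
        }
    }
}

/// Spawns two tasks and returns how many step() calls it takes to finish both.
fn steps_until_idle() -> u32 {
    let executor = Executor::default();
    executor.spawn(CountDown(2));
    executor.spawn(CountDown(0));
    let mut steps = 1;
    while executor.step() {
        steps += 1;
    }
    steps
}

fn main() {
    println!("steps until idle: {}", steps_until_idle());
}
```

Because every task is polled on every step, the waker never has to do anything. An executor that only polls tasks after they have been woken would need a real waker instead of the dummy one.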
Understanding the async executor flow
Now that we have seen how the async executor works, let's see how the executor schedules the goto and patrol functions. We will use the following sequence diagrams to understand how the executor schedules the async tasks.
The complete flow is split into the following parts:
Spawn an async task
The following sequence diagram shows how an executor spawns a task and how the task is executed.
- The executor is created.
- The executor returns to main.
- main creates the patrol_closure and captures units[0] and [-5, 5] in the closure environment. The initial state of the closure is set to 🚀Patrol::Start(0).
- The executor creates a new task from the closure and stores it in the new_tasks list.
- The executor returns to main.
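The steps above imply that creating and storing a task does not run any of its code. A small sketch can illustrate this, assuming a boxed future stands in for the spawned task. The observe_laziness helper and the NoopWake type are hypothetical names used only here.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

/// Hypothetical waker that does nothing.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Returns whether the async body had started (before the first poll, after the first poll).
fn observe_laziness() -> (bool, bool) {
    let started = Rc::new(Cell::new(false));
    let flag = started.clone();

    // Building the future only creates the state machine in its start state.
    let mut task: Pin<Box<dyn Future<Output = ()>>> = Box::pin(async move {
        flag.set(true);
    });
    let before = started.get();

    // The body runs only when something polls the future.
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let _ = task.as_mut().poll(&mut cx);
    let after = started.get();

    (before, after)
}

fn main() {
    println!("{:?}", observe_laziness());
}
```

This is why the patrol_closure is still in its start state when the executor returns to main: the first line of patrol runs only on the first call to Executor::step().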
Start executing async tasks
Once the task is spawned, the executor keeps running as long as there are tasks to schedule. In our example, we drive it by calling Executor::step() in a loop. Note that we limit the number of steps to 30 so that the example can run in the Rust playground.
- main calls Executor::step() to execute the first task.
  * The executor looks for any newly spawned tasks and adds them to the task_queue.
- The executor loops through the task_queue and executes the tasks. In this scenario, it finds only one task in the task_queue and calls the poll method on that task.
- The task scheduled here contains the patrol_closure. The task invokes the poll method of the patrol_closure.
- The patrol_closure is in the 🚀Patrol::Start state. In this state, the closure starts at the beginning of the function. The goto_closure is created by capturing the unit and poses[0] in the closure environment. The initial state of the closure is set to 🚀Goto::Start.
    async fn patrol(unit: UnitRef, poses: [i32; 2]) {
        loop {
    ➡️      goto(unit.clone(), poses[0]).await;
            goto(unit.clone(), poses[1]).await;
        }
    }
- The patrol_closure polls the goto_closure to find out whether the first position specified in poses[0] has been reached. This call to the poll method of the goto_closure results from the .await marked with the ➡️ arrow in the above code snippet.
- The goto_closure creates a UnitGotoFuture.
- The goto_closure polls the UnitGotoFuture. The poll method of the UnitGotoFuture checks if the unit has reached the pos. In this case, the unit has not reached the pos yet, so poll moves it one step closer.
- The poll method returns Poll::Pending to the goto_closure. The goto_closure sets its state to 🕓Goto::Waiting.
- The goto_closure returns Poll::Pending to the patrol_closure. The patrol_closure sets its state to 🕓Patrol::WaitingToReachPosition0.
- The patrol_closure returns Poll::Pending to the task.
- The task returns Poll::Pending to the executor.
- Since the task is not ready, the executor adds it to the pending_tasks list to be scheduled later. The executor then returns to the caller of Executor::step().
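The state transitions in this walkthrough can be written out by hand. The sketch below is a simplified stand-in for the compiler-generated patrol_closure, reusing the state names from the text. It is not the real desugaring: the PatrolFuture struct and the make_goto and trace helpers are hypothetical, and the intermediate goto_closure layer is skipped so that the patrol state holds the UnitGotoFuture directly.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct Unit {
    pos: i32,
}
type UnitRef = Rc<RefCell<Unit>>;

/// Same leaf future as in the article: one move per poll.
struct UnitGotoFuture {
    unit: UnitRef,
    target_pos: i32,
}

impl Future for UnitGotoFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        let unit_pos = self.unit.borrow().pos;
        if unit_pos == self.target_pos {
            Poll::Ready(())
        } else {
            self.unit.borrow_mut().pos += (self.target_pos - unit_pos).signum();
            Poll::Pending
        }
    }
}

/// Hand-written stand-in for the patrol states named in the text.
enum Patrol {
    Start,
    WaitingToReachPosition0(UnitGotoFuture),
    WaitingToReachPosition1(UnitGotoFuture),
}

/// Hypothetical struct playing the role of the patrol_closure.
struct PatrolFuture {
    unit: UnitRef,
    poses: [i32; 2],
    state: Patrol,
}

fn make_goto(unit: &UnitRef, target_pos: i32) -> UnitGotoFuture {
    UnitGotoFuture { unit: unit.clone(), target_pos }
}

impl Future for PatrolFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // Every field is Unpin, so a plain mutable reference is allowed.
        let this = self.get_mut();
        loop {
            let next = match &mut this.state {
                Patrol::Start => {
                    Patrol::WaitingToReachPosition0(make_goto(&this.unit, this.poses[0]))
                }
                Patrol::WaitingToReachPosition0(fut) => {
                    if Pin::new(fut).poll(cx).is_pending() {
                        return Poll::Pending; // suspend at the first .await
                    }
                    Patrol::WaitingToReachPosition1(make_goto(&this.unit, this.poses[1]))
                }
                Patrol::WaitingToReachPosition1(fut) => {
                    if Pin::new(fut).poll(cx).is_pending() {
                        return Poll::Pending; // suspend at the second .await
                    }
                    // End of the loop body: go around again.
                    Patrol::WaitingToReachPosition0(make_goto(&this.unit, this.poses[0]))
                }
            };
            this.state = next;
        }
    }
}

/// Hypothetical waker that does nothing.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls the patrol `polls` times from position 0 and records the position after each poll.
fn trace(poses: [i32; 2], polls: usize) -> Vec<i32> {
    let unit: UnitRef = Rc::new(RefCell::new(Unit { pos: 0 }));
    let mut patrol = PatrolFuture { unit: unit.clone(), poses, state: Patrol::Start };
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut positions = Vec::new();
    for _ in 0..polls {
        let _ = Pin::new(&mut patrol).poll(&mut cx);
        positions.push(unit.borrow().pos);
    }
    positions
}

fn main() {
    println!("{:?}", trace([-1, 1], 6));
}
```

Note that a single poll can pass through more than one state: when a goto finishes, the same poll creates the next goto and polls it once before returning Pending.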
Continue execution of async tasks
The executor keeps running as long as there are tasks to schedule. In our example, we drive it by calling Executor::step() in a loop. At each step, polling the goto_closure moves the unit closer to the pos. Now we will see how the goto_closure returns Poll::Ready to the patrol_closure when the unit reaches the pos. We start at step 6 in the following sequence diagram.
- Once the unit has reached the pos, the next poll of the goto_closure returns Poll::Ready to the patrol_closure. The patrol_closure polls the goto_closure again from the .await point marked with ➡️.
    async fn patrol(unit: UnitRef, poses: [i32; 2]) {
        loop {
    ➡️      goto(unit.clone(), poses[0]).await;
            goto(unit.clone(), poses[1]).await;
        }
    }
- The UnitGotoFuture returns Poll::Ready to the goto_closure.
- The goto_closure sets its state to ✅Goto::Done and returns Poll::Ready to the patrol_closure.
- The Poll::Ready return lets the patrol_closure advance until it hits the next .await point (➡️ in the following code snippet). A new goto_closure is created by capturing the unit and poses[1] in the closure environment. The initial state of this closure is set to 🚀Goto::Start.
    async fn patrol(unit: UnitRef, poses: [i32; 2]) {
        loop {
            goto(unit.clone(), poses[0]).await;
    ➡️      goto(unit.clone(), poses[1]).await;
        }
    }
- The patrol_closure then triggers the next poll to move the unit towards the next pos.
The execution of the patrol_closure is now at the next .await point, and the patrol_closure is in the state 🕓Patrol::WaitingToReachPosition1. The patrol_closure returns Poll::Pending to the task, and the task returns Poll::Pending to the executor. The executor adds the task to the pending_tasks list to be scheduled later and returns to the caller of Executor::step().
The patrol function has an infinite loop, so a Poll::Ready from the goto_closure does not terminate the patrol_closure. The patrol_closure continues to poll the new goto_closure until the unit reaches the next pos, and then loops back to the first position.
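To check this behavior, the following sketch polls the real async fn patrol 30 times, mirroring the 30 steps in main, and reports whether it ever completed. The run_patrol helper and the NoopWake type are hypothetical; the task is polled directly here rather than through the article's Executor.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct Unit {
    pos: i32,
}
type UnitRef = Rc<RefCell<Unit>>;

struct UnitGotoFuture {
    unit: UnitRef,
    target_pos: i32,
}

impl Future for UnitGotoFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        let unit_pos = self.unit.borrow().pos;
        if unit_pos == self.target_pos {
            Poll::Ready(())
        } else {
            self.unit.borrow_mut().pos += (self.target_pos - unit_pos).signum();
            Poll::Pending
        }
    }
}

async fn goto(unit: UnitRef, pos: i32) {
    UnitGotoFuture { unit, target_pos: pos }.await;
}

async fn patrol(unit: UnitRef, poses: [i32; 2]) {
    loop {
        goto(unit.clone(), poses[0]).await;
        goto(unit.clone(), poses[1]).await;
    }
}

/// Hypothetical waker that does nothing.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls patrol([-5, 5]) `polls` times from position 0.
/// Returns (whether any poll returned Ready, final position of the unit).
fn run_patrol(polls: usize) -> (bool, i32) {
    let unit: UnitRef = Rc::new(RefCell::new(Unit { pos: 0 }));
    let mut task = Box::pin(patrol(unit.clone(), [-5, 5]));
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut ever_ready = false;
    for _ in 0..polls {
        if task.as_mut().poll(&mut cx).is_ready() {
            ever_ready = true;
            break;
        }
    }
    let pos = unit.borrow().pos;
    (ever_ready, pos)
}

fn main() {
    println!("{:?}", run_patrol(30));
}
```

The unit moves exactly one position per poll, including the polls on which a goto completes, so a full back-and-forth between -5 and 5 takes 20 polls and the task stays pending throughout.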
Key takeaways
- Spawning an async task has nothing to do with the operating system. The task is just a closure that is scheduled to be executed by the executor.
- The await keyword polls the future. If the future is not ready, await suspends the execution of the async task until the future is ready.
- The executor calls the poll method of the future. The method returns Poll::Pending if the future is not ready and Poll::Ready if it is.
- If await is called inside an infinite loop, the function never returns Poll::Ready because it continues in the loop. Even so, it still returns Poll::Pending whenever the awaited future is not ready.
Experiment in the Rust playground
We have created a Rust playground to experiment with the code. You can modify the code and run it to see the trace output.
- Scroll to the end of the code in the Rust playground and uncomment the executor.spawn(patrol(units[1].clone(), [-1,1])); line to spawn the second async task. Click on the Run button on the top left to run the code.
    pub fn main() {
        let executor = Executor::default();
        let units: [UnitRef; 2] = Default::default();
        executor.spawn(patrol(units[0].clone(), [-5, 5]));
        // executor.spawn(patrol(units[1].clone(), [-1,1]));
- Click the ... button in the top left corner of the Rust playground to look at the generated assembly for the main function.
Articles in the async/await series
- Desugaring and assembly of async/await in Rust - goto
- Nested async/await in Rust: Desugaring and assembly - patrol
- Rust async executor - executor