Rust async/await: Async executor

This article is a continuation of the previous articles on Rust async/await. If you haven't read them yet, I recommend you read them first:

This article will teach us how the async executor schedules async tasks. We will see how a simple async executor works, and how it is implemented in Rust. We will analyze async scheduling of the goto and patrol functions from the previous articles.

Note: The code in this article is taken from a fork of the simple async local executor.

goto and patrol functions

To recap, here are the goto and patrol functions from the previous articles:

#[derive(Default)]
struct Unit {
    /// The 1-D position of the unit. The unit can only move along this axis.
    pub pos: i32,
}
type UnitRef = Rc<RefCell<Unit>>;

/// A future that will move the unit towards `target_pos` at each step,
/// and complete when the unit has reached that position.
struct UnitGotoFuture {
    unit: UnitRef,
    target_pos: i32,
}
impl Future for UnitGotoFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
        let unit_pos = self.unit.borrow().pos;
        if unit_pos == self.target_pos {
            Poll::Ready(())
        } else {
            self.unit.borrow_mut().pos += (self.target_pos - unit_pos).signum();
            Poll::Pending
        }
    }
}

/// Helper async function to write unit behavior nicely
async fn goto(unit: UnitRef, pos: i32) {
    UnitGotoFuture {
        unit,
        target_pos: pos,
    }
    .await;
}

/// Let a unit go back and forth between two positions
async fn patrol(unit: UnitRef, poses: [i32; 2]) {
    loop {
        goto(unit.clone(), poses[0]).await;
        goto(unit.clone(), poses[1]).await;
    }
}

Async executor

An async executor is a piece of code that runs async tasks. It is responsible for scheduling the tasks and waking them up when they are ready to progress.

An async program's main function creates the async tasks and then calls the executor to run the tasks. The executor will keep running the tasks until all tasks are completed. In our example, we simulate the executor by calling the Executor::step() method in a loop. We limit the number of steps to 30 to run the example in the Rust playground.

/// Test program with two units
fn main() {
    let executor = Executor::default();
    let units: [UnitRef; 2] = Default::default();
    executor.spawn(patrol(units[0].clone(), [-5, 5]));
    // executor.spawn(patrol(units[1].clone(), [-1,1]));
    for _ in 0..30 {
        executor.step();
 }
}

The flow chart of the Executor::step() method is shown below. The Executor::step() method is the crucial method of the executor. It is responsible for scheduling the tasks. When the function starts, it will look for any new spawned tasks and add them to the task_queue. Then, it will loop through the task_queue and execute the tasks. A task will be added to the pending_tasks list if it is not ready. The pending_tasks list will be used for the next step. The Executor::step() method returns true if there are still tasks to run.

Async scheduler

The Executor::step() method is implemented as follows:

pub fn step(&self) -> bool {
    // Dummy waker and context (not used as we poll all tasks)
    let waker = dummy_waker();
    let mut context = Context::from_waker(&waker);
    // Append new tasks created since the last step into the task queue
    let mut task_queue = self.inner.task_queue.borrow_mut();
    task_queue.append(&mut self.inner.new_tasks.borrow_mut());

    // Loop over all tasks, polling them. If a task is not ready, add it to the pending tasks.
    let mut pending_tasks = Vec::new();
    let mut any_left = false;
    for mut task in task_queue.drain(..) {
        match task.poll(&mut context) {
            Poll::Ready(()) => {} // task done
            Poll::Pending => {
                pending_tasks.push(task);
                any_left = true;
            }
        }
    }
    // Keep pending tasks for the next step
    *task_queue = pending_tasks;
    any_left // Return true if there are still tasks to run
}

Understanding the async executor flow

Now that we have seen how the async executor works let's see how the executor schedules the goto and patrol functions. We will use the following sequence diagrams to understand how the executor schedules the async tasks.

The complete flow is split into the following parts:

  1. Spawn an async task
  2. Start executing async tasks
  3. Continue execution of async tasks

Spawn an async task

The following sequence diagram shows how an executor spawns a task and how the task is executed.

  1. The executor is created.
  2. The executor returns to main.
  3. The main creates the patrol_closure and captures the unit[0] and [5, -5] in the closure environment. The initial state of the closure is set to 🚀 Patrol::Start(0).
  4. The executor creates a new task from the closure and stores it in the new_tasks list.
  5. The executor returns to main.

Spawn an async task

Start executing async tasks

Once the task is spawned, the executor will keep running the async runtime if any events are scheduled. In our example, we call it Executor::step() and put it in a loop. Note that we are limiting the number of steps to 30 to run the example in the Rust playground.

  1. main calls Executor::step() to execute the first task.     * The executor looks for any new tasks that are spawned and adds them to the task_queue.

  2. The executor loops through the task_queue and executes the tasks. This scenario finds only one task in the task_queue. It calls the poll method on the task.

  3. The task scheduled here contains the patrol_closure. The task involves the poll method of the patrol_closure.

  4. The patrol_closure is in the 🚀 Patrol::Start state. In this state, the closure starts at the beginning of the function. The goto_closure is created by capturing the unit and poses[0] in the closure environment. The initial state of the closure is set to 🚀 Goto::Start.

    async fn patrol(unit: UnitRef, poses: [i32; 2]) {
        loop {
        ➡️ goto(unit.clone(), poses[0]).await;
            goto(unit.clone(), poses[1]).await;
        }
    }
    
  5. The patrol_closure polls the goto_closure to find if the first position specified in poses[0] has been reached. The poll method of the goto_closure results from the .await marked with the ➡️ arrow in the above code snippet.

  6. The goto_closure creates a UnitGotoFuture.

  7. The goto_closure polls the UnitGotoFuture. The poll method of the UnitGotoFuture checks if the unit has reached the pos. In this case, the unit has not reached the pos yet.  

  8. The poll method returns Poll::Pending to the goto_closure. The goto_closure sets its state to 🕓 Goto::Waiting.

  9. The goto_closure returns Poll::Pending to the patrol_closure. The patrol_closure sets its state to 🕓 Patrol::WaitingToReachPosition0.

  10. The patrol_closure returns Poll::Pending to the task.

  11. The task returns Poll::Pending to the executor.

  12. Since the task is not ready, the executor adds it to the pending_tasks list to be scheduled later. The executor then returns to the caller of Executor::step().

Spawn an async task

Continue execution of async tasks

The executor runs the async runtime if any events are scheduled. In our example, we call it Executor::step() and put it in a loop. At each step, polling of the goto_closure moves the unit closer to the pos. Now we will see how the goto_closure returns Poll::Ready to the patrol_closure when the unit reaches the pos. We start at step 6 in the following sequence diagram.

  1. Once the unit reaches the pos, the goto_closure returns Poll::Ready to the patrol_closure. The patrol_closure then polls the goto_closure again. The ➡️ shows the .await point in the patrol_closure.

    async fn patrol(unit: UnitRef, poses: [i32; 2]) {
        loop {
            ➡️ goto(unit.clone(), poses[0]).await;
               goto(unit.clone(), poses[1]).await;
        }
    }
    
  2. The UnitGotoFuture returns Poll::Ready to the goto_closure.

  3. The goto_closure returns Poll::Ready to the patrol_closure. The goto_closure sets its state to ✅ Goto::Done.

  4. The Poll::Ready return helps us advance the execution until we hit the next .await point (➡️ in the following code snippet). The goto_closure captures the unit and poses[1] in the closure environment. The initial state of the closure is set to 🚀 Goto::Start.

    async fn patrol(unit: UnitRef, poses: [i32; 2]) {
        loop {
              goto(unit.clone(), poses[0]).await;
          ➡️ goto(unit.clone(), poses[1]).await;
        }
    }
    
  5. The patrol_closure then triggers the next poll to move the unit to the next pos.

The execution of the patrol_closure is now at the next .await point. The patrol_closure is now in the state 🕓 Patrol::WaitingToReachPosition1. The patrol_closure returns Poll::Pending to the task. The task returns Poll::Pending to the executor. The executor adds the task to the pending_tasks list to be scheduled later—the executor returns to the Executor::step() caller.

The patrol function has an infinite loop, so returning Poll::Ready from the goto_closure does not terminate the patrol_closure. The patrol_closure will continue to poll the goto_closure until the unit reaches the next pos.

Async task returns Poll::Ready

Key takeaways

Experiment in the Rust playground

We have created a Rust playground to experiment with the code. You can modify the code and run it to see the trace output.

  1. Scroll to the end of the code in the Rust playground and uncomment the executor.spawn(patrol(units[1].clone(), [-1,1])); line to spawn the second async task. Click on the Run button on the top left to run the code.

    pub fn main() {
        let executor = Executor::default();
        let units: [UnitRef; 2] = Default::default();
        executor.spawn(patrol(units[0].clone(), [-5, 5]));
        // executor.spawn(patrol(units[1].clone(), [-1,1]));
    
  2. Click the ... button in the top left corner of the Rust playground to look at the generated assembly for the main function.

Articles in the async/await series

  1. Desugaring and assembly of async/await in Rust - goto
  2. Nested async/await in Rust: Desugaring and assembly - patrol
  3. Rust async executor - executor