embassy-usage

raw::Executor__pender

#[macro_use]
extern crate log;

use embassy_futures::yield_now;
use std::{cell::Cell, rc::Rc};

#[embassy_executor::task]
async fn run1() {
    info!("run1 starts");
    for i in 0..3 {
        info!("run1 yield_now");
        yield_now().await;
        info!("[{i}] run1");
    }
    info!("run1 ends");
}

#[embassy_executor::task]
async fn run2() {
    info!("run2 starts");
    for i in 0..3 {
        info!("run2 yield_now");
        yield_now().await;
        warn!("[{i}] run2");
    }
    info!("run2 ends");
}

fn main() {
    debug!("main starts");
    env_logger::builder().format_timestamp(None).format_target(false).init();
    let context = ExecutorCxt::default();
    let ctx = Box::into_raw(Box::new(context.clone()));
    let executor = Box::leak(Box::new(embassy_executor::raw::Executor::new(ctx.cast())));
    debug!("initialized: logger, executor, context");
    let spawner = executor.spawner();
    debug!("before spawn");
    spawner.spawn(run1()).unwrap();
    debug!("after spawn1");
    spawner.spawn(run2()).unwrap();
    debug!("after spawn2");
    while context.pend.take() {
        debug!("polling due to pender");
        unsafe { executor.poll() }
    }
    debug!("main ends");
}

#[derive(Default, Clone)]
struct ExecutorCxt {
    pend: Rc<Cell<bool>>,
}

#[export_name = "__pender"]
fn pender(context: *mut ()) {
    // schedule `poll()` to be called
    unsafe { &*context.cast::<ExecutorCxt>() }.pend.set(true);
    debug!("pender and notify");
}
点击“展开/收起” RUST_LOG=debug cargo r 输出

注意:

  • main 函数内的 debug! 被隐藏,点击上面代码块右上角按钮显示
  • [embassy-executor] 的日志来自我修改过的 embassy 子模块(具体 提交在这
[DEBUG] initialized: logger, executor, context
[DEBUG] before spawn
[DEBUG] [embassy-executor] (enqueue) prepend a task
[DEBUG] [embassy-executor] (run_queue) was empty queue: enqueue the given task and call pend
[DEBUG] pender and notify
[DEBUG] after spawn1
[DEBUG] [embassy-executor] (enqueue) prepend a task
[DEBUG] after spawn2
[DEBUG] polling due to pender
[DEBUG] [embassy-executor] empty the queue
[DEBUG] [embassy-executor] handle task 0
[INFO ] [embassy-executor] (dequeue_all): poll a task
[DEBUG] [embassy-executor] (state) (run_dequeue): unmark run-queued
[INFO ] run2 starts
[INFO ] run2 yield_now
[DEBUG] [embassy-executor] (state) spawn but not queued: wake_task run_enqueue
[DEBUG] [embassy-executor] (enqueue) prepend a task
[DEBUG] [embassy-executor] (run_queue) was empty queue: enqueue the given task and call pend
[DEBUG] pender and notify
[DEBUG] [embassy-executor] handle task 1
[INFO ] [embassy-executor] (dequeue_all): poll a task
[DEBUG] [embassy-executor] (state) (run_dequeue): unmark run-queued
[INFO ] run1 starts
[INFO ] run1 yield_now
[DEBUG] [embassy-executor] (state) spawn but not queued: wake_task run_enqueue
[DEBUG] [embassy-executor] (enqueue) prepend a task
[DEBUG] polling due to pender
[DEBUG] [embassy-executor] empty the queue
[DEBUG] [embassy-executor] handle task 0
[INFO ] [embassy-executor] (dequeue_all): poll a task
[DEBUG] [embassy-executor] (state) (run_dequeue): unmark run-queued
[INFO ] [0] run1
[INFO ] run1 yield_now
[DEBUG] [embassy-executor] (state) spawn but not queued: wake_task run_enqueue
[DEBUG] [embassy-executor] (enqueue) prepend a task
[DEBUG] [embassy-executor] (run_queue) was empty queue: enqueue the given task and call pend
[DEBUG] pender and notify
[DEBUG] [embassy-executor] handle task 1
[INFO ] [embassy-executor] (dequeue_all): poll a task
[DEBUG] [embassy-executor] (state) (run_dequeue): unmark run-queued
[WARN ] [0] run2
[INFO ] run2 yield_now
[DEBUG] [embassy-executor] (state) spawn but not queued: wake_task run_enqueue
[DEBUG] [embassy-executor] (enqueue) prepend a task
[DEBUG] polling due to pender
[DEBUG] [embassy-executor] empty the queue
[DEBUG] [embassy-executor] handle task 0
[INFO ] [embassy-executor] (dequeue_all): poll a task
[DEBUG] [embassy-executor] (state) (run_dequeue): unmark run-queued
[WARN ] [1] run2
[INFO ] run2 yield_now
[DEBUG] [embassy-executor] (state) spawn but not queued: wake_task run_enqueue
[DEBUG] [embassy-executor] (enqueue) prepend a task
[DEBUG] [embassy-executor] (run_queue) was empty queue: enqueue the given task and call pend
[DEBUG] pender and notify
[DEBUG] [embassy-executor] handle task 1
[INFO ] [embassy-executor] (dequeue_all): poll a task
[DEBUG] [embassy-executor] (state) (run_dequeue): unmark run-queued
[INFO ] [1] run1
[INFO ] run1 yield_now
[DEBUG] [embassy-executor] (state) spawn but not queued: wake_task run_enqueue
[DEBUG] [embassy-executor] (enqueue) prepend a task
[DEBUG] polling due to pender
[DEBUG] [embassy-executor] empty the queue
[DEBUG] [embassy-executor] handle task 0
[INFO ] [embassy-executor] (dequeue_all): poll a task
[DEBUG] [embassy-executor] (state) (run_dequeue): unmark run-queued
[INFO ] [2] run1
[INFO ] run1 ends
[DEBUG] [embassy-executor] despawn
[DEBUG] [embassy-executor] handle task 1
[INFO ] [embassy-executor] (dequeue_all): poll a task
[DEBUG] [embassy-executor] (state) (run_dequeue): unmark run-queued
[WARN ] [2] run2
[INFO ] run2 ends
[DEBUG] [embassy-executor] despawn
[DEBUG] main ends

任务的执行顺序

通过调高日志等级 RUST_LOG=info cargo r,可以观察到 embassy-executor 执行这两个任务的顺序为

2 - 1 - 1 - 2 - 2 - 1 (结束) - 2 (结束) 点击“展开/收起” info 级别的日志
[INFO ] [embassy-executor] (dequeue_all): poll a task
[INFO ] run2 starts
[INFO ] run2 yield_now
[INFO ] [embassy-executor] (dequeue_all): poll a task
[INFO ] run1 starts
[INFO ] run1 yield_now
[INFO ] [embassy-executor] (dequeue_all): poll a task
[INFO ] [0] run1
[INFO ] run1 yield_now
[INFO ] [embassy-executor] (dequeue_all): poll a task
[WARN ] [0] run2
[INFO ] run2 yield_now
[INFO ] [embassy-executor] (dequeue_all): poll a task
[WARN ] [1] run2
[INFO ] run2 yield_now
[INFO ] [embassy-executor] (dequeue_all): poll a task
[INFO ] [1] run1
[INFO ] run1 yield_now
[INFO ] [embassy-executor] (dequeue_all): poll a task
[INFO ] [2] run1
[INFO ] run1 ends
[INFO ] [embassy-executor] (dequeue_all): poll a task
[WARN ] [2] run2
[INFO ] run2 ends

这个顺序是因为 embassy-executor 的任务栈采用单向链表结构,使用后进先出的方式取出任务。

(虽然 embassy 给任务栈取名叫 RunQueue,但它并不是队列,它不是先进先出取出任务)

#![allow(unused)]
fn main() {
// src: https://github.com/embassy-rs/embassy/blob/6bbb870bfade23e814169eb48e42e8bc55d9ff8f/embassy-executor/src/raw/run_queue_atomics.rs#L70
impl RunQueue {
    /// Empty the queue, then call `on_task` for each task that was in the queue.
    /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
    /// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
    pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
        // Atomically empty the queue.
        let ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel);

        // safety: the pointer is either null or valid
        let mut next = unsafe { NonNull::new(ptr).map(|ptr| TaskRef::from_ptr(ptr.as_ptr())) };

        // Iterate the linked list of tasks that were previously in the queue.
        while let Some(task) = next {
            // If the task re-enqueues itself, the `next` pointer will get overwritten.
            // Therefore, first read the next pointer, and only then process the task.
            // safety: there are no concurrent accesses to `next`
            next = unsafe { task.header().run_queue_item.next.get() };

            on_task(task);
        }
    }
}

// src: https://github.com/embassy-rs/embassy/blob/6bbb870bfade23e814169eb48e42e8bc55d9ff8f/embassy-executor/src/raw/mod.rs#L386
impl SyncExecutor {
    // 这不是完整代码,只展示传递给 dequeue_all 的回调函数
    pub(crate) unsafe fn poll(&'static self) {
        loop {
            self.run_queue.dequeue_all(|p| {
                let task = p.header();

                #[cfg(feature = "integrated-timers")]
                task.expires_at.set(u64::MAX);
                    return;
                }

                // Run the task
                task.poll_fn.get().unwrap_unchecked()(p);

                // Enqueue or update into timer_queue
                #[cfg(feature = "integrated-timers")]
                self.timer_queue.update(p);
            });
        }
    }
}
}