tokio 源码阅读

Budget

问题背景:在系统负载不高的情况下,接收资源的速度很快,那么任务在 await 点上一直返回 Poll::Ready,从而没有把执行权交还给调度器,因此该任务长时间运行导致其他任务饥饿,并且所有任务的延迟越来越高。

一些可能的解决方法:

  1. 使用者可以在自己的代码中插入 yield_now() 来强制把执行权还给调度器,从而让其他任务执行。但实际很少有人这么做。
  2. 调度器实现抢占:但调度器只在任务的 await 点获取执行权,无法打断正在运行中的任务。

tokio 的解决方法:在每个任务操作上施加预算策略 (budget)。其特点:

  • 调度器切换到一个任务时,会重置预算,目前一个任务有 128 个预算;
  • 该预算表示连续进行资源操作的最大次数:每操作一次资源,预算减 1;
  • 当预算为 0,调度器不再执行该任务,并切换到下一个任务 👉 这就是所谓的自动 yield;
  • 每个 tokio 资源(socket、timer、channel)都知道还有多少预算;
    • 只要任务还有剩余预算(预算大于 0),资源就会正常运行;
    • 一旦任务超出预算(预算为 0),所有 tokio 资源将永远返回 not ready(即使背后的 IO 资源已经准备好了)。

因此从另一个角度看,这其实是任务持续地 Poll::Ready,在到达次数的上限时,调度器把执行权交给其他任务,从而这实现了一种抢占策略。

注意:

  • 预算策略在 tokio 类型上,不是用户感知的:使用者不需要改动基于 tokio 资源的代码就能获得好处。
  • 预算策略特定于运行时:tokio 内部的类型知道预算,但外部的类型并不知道,因此如果使用其他库的、没有考虑 tokio 预算的资源类型,那么并不会获得该策略的好处。
  • 预算策略涉及的公共 API:
    • unconstrained 用于退出 tokio 的预算策略机制,tokio 对该函数的参数 Future 不施加任何预算,因此对它不自动 yield。
    • consume_budget 用于消耗 1 个预算,可供外部类型接入预算策略。

consume_budget API 不仅适用于资源类型,也适用于计算密集的代码。虽然通常情况下,建议把计算密集的代码放到 blocking 线程,但利用该 API,可以将同步代码变成协作式的任务:

#![allow(unused)]
fn main() {
async fn sum_iterator(input: &mut impl std::iter::Iterator<Item=i64>) -> i64 {
    let mut sum: i64 = 0;
    while let Some(i) = input.next() {
        sum += i;

        // 减少一个预算:当任务的预算减少到 0 时,执行权交给其他任务。
        tokio::task::consume_budget().await
    
        // 当然,也可以不基于预算策略移交执行权,那么使用 yield_now 直接移交。
        // tokio::task::yield_now().await
    }
    sum
}
}

这个预算策略实现的收益:(来自《Reducing tail latencies with automatic cooperative task yielding (2020)》)

master 表示实现前,preempt 表示实现后,延迟最高几乎降低 3 倍。

其他资料

tokio 分析: