如何在同步的 Rust 方法中调用异步代码?


Rust

3230 Words

2022-10-30 21:42 +0800


背景和问题

最近在做我们的 GreptimeDB 项目的时候遇到一个关于在同步 Rust 方法中调用异步代码的问题,排查清楚之后大大加深了对异步 Rust 的理解,因此在这篇文章中记录一下。

我们的整个项目是基于 Tokio 这个异步 Rust runtime 的,它将协作式的任务运行和调度方便地封装在 .await 调用中,非常简洁优雅。但是这样也让不熟悉 Tokio 底层原理的用户一不小心就掉入到坑里。

我们遇到的问题是,需要在一个第三方库的 trait 实现中执行一些异步代码,而这个 trait 是同步的😅 ,我们无法修改这个 trait 的定义。

1trait Sequencer {
2    fn generate(&self) -> Vec<i32>;
3}

我们用一个PlainSequencer来实现这个 trait ,而在实现 generate方法的时候依赖一些异步的调用(比如这里的 PlainSequencer::generate_async):

 1impl PlainSequencer {
 2    async fn generate_async(&self)->Vec<i32>{
 3        let mut res = vec![];
 4        for i in 0..self.bound {
 5            res.push(i);
 6            tokio::time::sleep(Duration::from_millis(100)).await;
 7        }
 8        res
 9    }
10}
11
12impl Sequencer for PlainSequencer {
13    fn generate(&self) -> Vec<i32> {
14        self.generate_async().await
15    }
16}

这样就会出现问题,因为 generate是一个同步方法,里面是不能直接 await 的。

1error[E0728]: `await` is only allowed inside `async` functions and blocks
2  --> src/common/tt.rs:32:30
3   |
431 | /     fn generate(&self) -> Vec<i32> {
532 | |         self.generate_async().await
6   | |                              ^^^^^^ only allowed inside `async` functions and blocks
733 | |     }
8   | |_____- this is not `async`

我们首先想到的是,tokio 的 runtime 有一个 Runtime::block_on[1] 方法,可以同步地等待一个 future 完成。

 1impl Sequencer for PlainSequencer {
 2    fn generate(&self) -> Vec<i32> {
 3        RUNTIME.block_on(async{
 4            self.generate_async().await
 5        })
 6    }
 7}
 8
 9#[cfg(test)]
10mod tests{
11    #[tokio::test]
12    async fn test_sync_method() {
13        let sequencer = PlainSequencer {
14            bound: 3
15        };
16        let vec = sequencer.generate();
17        println!("vec: {:?}", vec);
18    }
19}

编译通过,但是运行时直接报错:

1Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks.
2thread 'tests::test_sync_method' panicked at 'Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks.', /Users/lei/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/enter.rs:39:9

提示不能从一个执行中一个 runtime 直接启动另一个异步 runtime。

看来 Tokio 为了避免这种情况特地在入口做了检查。 既然不行那我们就再看看其他的异步库是否有类似的异步转同步的方法。果然找到一个 futures::executor::block_on[2]

1impl Sequencer for PlainSequencer {
2    fn generate(&self) -> Vec<i32> {
3        futures::executor::block_on(async {
4            self.generate_async().await
5        })
6    }
7}

编译同样没问题,但是运行时代码直接直接 hang 住不返回了。

1cargo test --color=always --package tokio-demo --bin tt tests::test_sync_method --no-fail-fast -- --format=json --exact -Z unstable-options --show-output      
2   Compiling tokio-demo v0.1.0 (/Users/lei/Workspace/Rust/learning/tokio-demo)
3    Finished test [unoptimized + debuginfo] target(s) in 0.39s
4     Running unittests src/common/tt.rs (target/debug/deps/tt-adb10abca6625c07)
5{ "type": "suite", "event": "started", "test_count": 1 }
6{ "type": "test", "event": "started", "name": "tests::test_sync_method" }

明明 generate_async方法里面只有一个简单的 sleep 调用,但是为什么 future 一直没完成呢? 并且吊诡的是,同样的代码,在 tokio::test里面会 hang 住,但是在 tokio::main中则可以正常执行完毕:

1#[tokio::main]
2pub async fn main() {
3    let sequencer = PlainSequencer {
4        bound: 3
5    };
6    let vec = sequencer.generate();
7    println!("vec: {:?}", vec);
8}

执行结果:

1cargo run --color=always --package tokio-demo --bin tt
2    Finished dev [unoptimized + debuginfo] target(s) in 0.05s
3     Running `target/debug/tt`
4vec: [0, 1, 2]

其实当初真正遇到这个问题的时候定位到具体在哪里 hang 住并没有那么容易。真实代码中 async 执行的是一个远程的 gRPC 调用,当初怀疑过是否是 gRPC server 的问题,动用了网络抓包等等手段最终发现是 client 侧 的问题。这也提醒了我们在出现 bug 的时候,抽象出问题代码的执行模式并且做出一个最小可复现的样例(Minimal Reproducible Example)是非常重要的。

Catchup

凭着对 Rust 异步 runtime 的基本了解,我大概知道类似 Tokio 这样的异步 runtime 是一个 executor+scheduler+reactor 的模型。其中:

  • executor:在计算资源(在 std 环境下就是操作系统的 thread,嵌入式环境下则没有 thread 抽象)上执行真正的异步任务;
  • scheduler:负责维护一个异步任务的 FIFO 并且负责任务在不同 executor 之间调度;
  • reactor:抽象底层阻塞 IO,提供非阻塞事件监听和唤醒的功能(类似 epoll 那样)。

Rust 中的一个异步代码块本质上是一个 future,

1async {
2    println!("hello");
3}

其是不会直接执行的,只有将其 spawn 到异步的 runtime 里面才会真正封装成一个任务交给 executor 执行。Runtime 有一套机制去唤醒异步任务,检查 future 的状态是否为 ready 等等。

问题分析

回顾完背景知识,我们再看一眼方法的实现:

1fn generate(&self) -> Vec<i32> {
2    futures::executor::block_on(async {
3        self.generate_async().await
4    })
5}

调用 generate方法的肯定是 Tokio 的 executor,那么 block_on 里面的 self.generate_async().await这个 future 又是谁在 poll 呢?一开始我以为,futures::executor::block_on会有一个内部的 runtime 去负责 generate_async的 poll。于是点进去代码(主要是futures_executor::local_pool::run_executor这个方法):

 1fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
 2    let _enter = enter().expect(
 3        "cannot execute `LocalPool` executor from within \
 4         another executor",
 5    );
 6
 7    CURRENT_THREAD_NOTIFY.with(|thread_notify| {
 8        let waker = waker_ref(thread_notify);
 9        let mut cx = Context::from_waker(&waker);
10        loop {
11            if let Poll::Ready(t) = f(&mut cx) {
12                return t;
13            }
14            let unparked = thread_notify.unparked.swap(false, Ordering::Acquire);
15            if !unparked {
16                thread::park();
17                thread_notify.unparked.store(false, Ordering::Release);
18            }
19        }
20    })
21}

立刻嗅到了一丝不对的味道,虽然这个方法名为 run_executor,但是整个方法里面貌似没有任何 spawn 的操作,只是在当前线程不停的循环判断用户提交的 future 的状态是否为 ready 啊!!!!

这意味着,当 Tokio 的 runtime 线程执行到这里的时候,会立刻进入一个循环,在循环中不停地判断用户的的 future 是否 ready,如果还是 pending 状态,则将当前线程 park 住。假设,用户 future 的异步任务也是交给了当前线程去执行,futures::executor::block_on等待用户的 future ready,而用户 future 等待 futures::executor::block_on释放当前的线程资源,那么不就死锁了? 很有道理啊,立刻验证一下。既然不能在当前 runtime 线程 block,那就重新开一个线程 block:

 1impl Sequencer for PlainSequencer {
 2    fn generate(&self) -> Vec<i32> {
 3        let bound = self.bound;
 4        std::thread::spawn(move || {
 5            futures::executor::block_on(async {
 6                // 这里使用一个全局的 runtime 来 block
 7                RUNTIME.block_on(async {
 8                    let mut res = vec![];
 9                    for i in 0..bound {
10                        res.push(i);
11                        tokio::time::sleep(Duration::from_millis(100)).await;
12                    }
13                    res
14                })
15            })
16        }).join().unwrap()
17    }
18}

果然可以了。

1cargo test --color=always --package tokio-demo --bin tt tests::test_sync_method --no-fail-fast -- --format=json --exact -Z unstable-options --show-output
2    Finished test [unoptimized + debuginfo] target(s) in 0.04s
3     Running unittests src/common/tt.rs (target/debug/deps/tt-adb10abca6625c07)
4vec: [0, 1, 2]

值得注意的是,在 futures::executor::block_on里面,额外使用了一个 RUNTIME来 spawn 我们的异步代码。其原因还是刚刚所说,这个异步任务需要一个 runtime 来驱动状态的变化,如果去掉 runtime,从一个新线程中执行我们的异步任务的话,futures::executor::block_on第一次 poll 我们异步任务的 future,会进入到 tokio::time::sleep方法调用,这个方法会判断当前执行的异步 runtime (上下文信息),而这个上下文信息是保留在 thread local 中的,在新线程中并不存在这个 thread local,从而会 panic。

1called `Result::unwrap()` on an `Err` value: Any { .. }
2thread '<unnamed>' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime',
3...

tokio::maintokio::test

在分析完上面的原因之后,“为什么 tokio::main中不会 hang 住而 tokio::test会 hang 住”这个问题也很清楚了,他们两者所使用的的 runtime 并不一样。tokio::main使用的是多线程的 runtime,而 tokio::test 使用的是单线程的 runtime,而在单线程的 runtime 下,当前线程被 futures::executor::block_on卡死,那么用户提交的异步代码是一定没机会执行的,从而必然形成上面所说的死锁。

Best practice

经过上面的分析,结合 Rust 基于 generator 的协作式异步特性,我们可以总结出 Rust 下桥接异步代码和同步代码的一些注意事项:

  • 在异步代码调用(可能阻塞的)同步代码时,请使用 tokio::task::spawn_blocking [3]
  • 尽量避免在异步代码中进行大规模的 CPU 密集型计算,避免对任务调度的公平性造成影响(CPU 密集型任务可以使用 rayon 代替)
  • 在同步代码调用异步代码的时候,使用 futures::executor::block_on请务必注意和 tokio::task::spawn_blocking(或者std::thread::spawn)配合使用,因为前者会阻塞当前线程。理想情况是,通过一个 blocking-dedicated executor 去调用 futures::executor::block_on,然后再通过 tokio::runtime::spawn把任务丢回给原来的 runtime 去执行。
 1// 如果你可以控制 generate 方法如何被调用
 2fn generate(&self) -> Vec<i32> {
 3    let bound = self.bound;
 4    futures::executor::block_on(async {
 5        RUNTIME.block_on(async {
 6            let mut res = vec![];
 7            for i in 0..bound {
 8                res.push(i);
 9                tokio::time::sleep(Duration::from_millis(100)).await;
10            }
11            res
12        })
13    })
14}
15
16// 那么你可以在调用 generate 之前使用 spawn_blocking 避免死锁
17#[tokio::test]
18async fn test_sync_method() {
19    let sequencer = PlainSequencer {
20        bound: 3
21    };
22    let vec = tokio::task::spawn_blocking(move || {
23        sequencer.generate()
24    });
25    println!("vec: {:?}", vec);
26}
 1// 如果 generate 方法的调用方并不是你能控制的,那么请使用 std::thread::spawn
 2fn generate(&self) -> Vec<i32> {
 3    let bound = self.bound;
 4    std::thread::spawn(move ||{
 5        futures::executor::block_on(async {
 6            RUNTIME.block_on(async {
 7                let mut res = vec![];
 8                for i in 0..bound {
 9                    res.push(i);
10                    tokio::time::sleep(Duration::from_millis(100)).await;
11                }
12                res
13            })
14        })
15    }).join().unwrap()
16}
17
18#[tokio::test]
19async fn test_sync_method() {
20    let sequencer = PlainSequencer {
21        bound: 3
22    };
23	// 这里只是一个普通的同步方法调用
24    let vec = sequencer.generate();
25    println!("vec: {:?}", vec);
26}

参考