三个字母引发的惨案



问题

feat: compaction integration - GreptimeTeam/greptimedb #997 中,我们尝试把 table compaction 作为 flush 的后置任务触发,因此需要为 FlushJob增加一个 callback,因为涉及到异步操作,所以这个 callback 的定义是:

1pub type FlushCallback = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
2
3pub struct FlushJob<S: LogStore> {
4    ...
5    pub on_success: Option<FlushCallback>,
6}

然后在完成 flush 后调用 callback:

 1#[async_trait]
 2impl<S: LogStore> Job for FlushJob<S> {
 3    async fn run(&mut self, ctx: &Context) -> Result<()> {
 4        let file_metas = self.write_memtables_to_layer(ctx).await?;
 5        self.write_manifest_and_apply(&file_metas).await?;
 6        if let Some(cb) = self.on_success.take() {
 7            cb.await;
 8        }
 9        Ok(())
10    }
11}

可惜编译器无情地给出了 error:

 1error: future cannot be sent between threads safely
 2   --> src/storage/src/flush.rs:250:58
 3    |
 4250 |       async fn run(&mut self, ctx: &Context) -> Result<()> {
 5    |  __________________________________________________________^
 6251 | |         let file_metas = self.write_memtables_to_layer(ctx).await?;
 7252 | |         self.write_manifest_and_apply(&file_metas).await?;
 8253 | |
 9...   |
10257 | |         Ok(())
11258 | |     }
12    | |_____^ future created by async block is not `Send`
13    |
14    = help: the trait `Sync` is not implemented for `(dyn futures_util::Future<Output = ()> + std::marker::Send + 'static)`
15note: future is not `Send` as this value is used across an await
16   --> src/storage/src/flush.rs:251:60
17    |
18251 |         let file_metas = self.write_memtables_to_layer(ctx).await?;
19    |                          ----                              ^^^^^^ - `self` is later dropped here
20    |                          |                                 |
21    |                          |                                 await occurs here, with `self` maybe used later
22    |                          has type `&FlushJob<S>` which is not `Send`
23help: consider moving this into a `let` binding to create a shorter lived borrow
24   --> src/storage/src/flush.rs:251:26
25    |
26251 |         let file_metas = self.write_memtables_to_layer(ctx).await?;
27    |                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
28    = note: required for the cast from `[async block@src/storage/src/flush.rs:250:58: 258:6]` to the object type `dyn futures_util::Future<Output = std::result::Result<(), error::Error>> + std::marker::Send`

大概意思是说,新加入FlushJobcb字段不是Sync的,导致FlushJob::run方法所产生的 future 不满足Send约束。

这一点倒是容易理解,因为我们对 FlushCallback 的定义Pin<Box<dyn Future<Output=xxx> + Send>>并未带上 Synctrait,通常我们也不应该要求一个 future 是Sync的,因为 future 的定义中唯一一个方法的 receiver 是 self: Pin<&mut Self>,不存在对 self 的 shared reference,所以不应该要求 future 满足Sync

1pub trait Future {
2    fn poll(self: Pin<&mut Self>, 
3            cx: &mut Context<'_>) -> Poll<Self::Output>;
4}

因此,除了为 FlushCallback强行加上Sync约束外,我们更应该想一想为什么编译器会要求FlushCallback: Sync

MRE

为了方便排查问题,我们把不相关的代码逻辑去掉,用一段最简单的代码来复现这个问题。

 1use std::future::Future;
 2use std::pin::Pin;
 3
 4#[async_trait::async_trait]
 5pub trait Job: Send {
 6    async fn run(&mut self) -> Result<(), ()>;
 7}
 8
 9type Callback = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
10
11pub struct JobImpl {
12    cb: Option<Callback>,
13}
14
15impl JobImpl {
16    pub async fn do_something(& self) {}
17}
18
19#[async_trait::async_trait]
20impl Job for JobImpl {
21    async fn run(&mut self) -> Result<(), ()> {
22        self.do_something().await;
23        if let Some(cb) = self.cb.take() {
24            cb.await;
25        }
26        Ok(())
27    }
28}
29
30#[tokio::main]
31pub async fn main() {
32    let mut job_impl = JobImpl { cb: None };
33    job_impl.run().await.unwrap();
34}

Rust Playground 中编译运行这段代码可以看到类似的报错。

现在代码里面的魔法只剩下了async_trait这个 proc macro。为了进一步简化代码,我们可以手动把这个宏展开:

 1$ cargo expand mre
 2    Checking lance-demo v0.1.0 (/Users/lei/Workspace/Rust/mre)
 3    Finished dev [unoptimized + debuginfo] target(s) in 0.23s
 4
 5mod mre {
 6    use std::future::Future;
 7    use std::pin::Pin;
 8    
 9    pub trait Job: Send {
10        fn run(&mut self) -> 
11            Pin<Box<dyn Future<Output = Result<(), ()>> + Send + '_>>;
12    }
13    
14    type Callback = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
15    
16    pub struct JobImpl {
17        cb: Option<Callback>,
18    }
19    
20    impl JobImpl {
21        pub async fn do_something(&self) {}
22    }
23    
24    impl Job for JobImpl {
25        fn run(&mut self) -> Pin<Box<dyn Future<Output = Result<(), ()>> + Send + '_>> {
26            let mut __self = self;
27            Box::pin(async move {
28                JobImpl::do_something(__self).await;
29                if let Some(cb) = __self.cb.take() {
30                    cb.await;
31                }
32                Ok(())
33            })
34        }
35    }
36}

可以看到 JobImpl::run方法返回的 future 里面只包含了对 self 的引用 __self。因此 future 不满足Send约束,也就是__self不满足Send约束,而 __self的类型是什么呢?根据JobImpl::run的 receiver 很显然是 &mut self

Hmmm… Send/Sync/&mut T/&T似乎让人想起来刚刚开始学 Rust 的时候一些古早记忆,查到这里不妨再复习一下。

一点点小知识🤏🏻

SendSync 都是用于描述 thread-safety 的 marker trait.

  • Send指的是某个数据类型可以被安全地转移到另一个线程 (move)
  • Sync指的是某个数据类型可以安全地被多个线程共享不可变引用 (borrow)

“安全地传递”指的是在保证线程安全的情况下转移所有权。那么什么情况下转移所有权是不安全的呢?举个例子,Rc并没有实现 Send,只能用作单线程内部的引用计数。Rc 的内部是用的 Cell实现内部可变性,在 drop 的时候,如果引用计数为 0,则将内部包含的数据析构掉。因为修改引用计数的行为本身不是原子的,因此 move 到另一个线程之后如果进行 clone,那么可能就会导致并发问题,因此 Rc 不能安全地被转移所有权。

  • Send 和 Sync 的绑定关系
    • &T: SendT: Sync:要想一个数据类型的不可变引用是Send的,那么这个数据类型自身必须是Sync的,这样才能保证跨线程的只读访问是安全的;
    • &mut T: SendT: Send:要想一个数据类型的可变引用是 Send的,那么这个数据类型自身也必须是Send的,因为跨线程共享可变引用即意味着 move 。
    • 显然,相比要求 T: SyncT: Send是更加宽松的。

去掉一点糖

我们再来看看 !Send的 future:

 1fn run(&mut self) -> Pin<Box<dyn Future<Output = Result<(), ()>> + Send + '_>> {
 2    let mut __self = self;
 3    Box::pin(async move {
 4        JobImpl::do_something(__self).await;
 5        if let Some(cb) = __self.cb.take() {
 6            cb.await;
 7        }
 8        Ok(())
 9    })
10}

既然 future 捕获的是 JobImpl的可变引用,那么只要 JobImplSend的那么 future 就一定是Send才对啊?哪里出了问题呢?我们再来看看 JobImpl::do_something的签名

1impl JobImpl {
2    async fn do_something(&self) {}
3}

可以看到 do_something的 receiver 是 JobImpl的共享引用,那么__self自然就会被 coerce 成 &JobImpl。根据 SendSync 的绑定关系,如果 T 的共享引用是 Send,那么 T 自身必须是 Sync,这样就能解释为啥要求 JobImpl: Sync了。

仅仅是因为 coercion 吗?我们可以试试把 JobImpl::do_something改成同步的签名 fn do_something(&self) {},结果居然是可以编译的。显然问题不仅出在 receiver,也和 async 有关。我们都知道,Rust 的 async 本质上只是一个 generator 的语法糖,对 JobImpl::do_something的调用会被展开成一个实现了 Generator trait 的 struct,这个 struct 捕获了JobImpl的共享引用,而 JobImpl不满足Sync约束,从而 genrator 不满足 Send约束。

因此我们只需要增加三个字母,把 JobImpl::do_something的 receiver 改成可变引用就行啦。

1impl JobImpl {
2    pub async fn do_something(&mut self) {}
3    //                         ^~~~ the magic lies here
4}

You can try it here

结论

所以这个问题的本质是,在一个要求 T 的可变引用的异步上下文中,调用了一个接收 T 的共享引用的异步方法,从而导致对 T 的要求从 Send 升级成了 Sync。 其实这在做了一个复现的 demo 之后还是很容易发现的(毕竟才二三十行代码),但是在项目的一个几百行的 PR 里面就不太容易发现到底是哪里不 Sync了。Rust 的 async/await 语法糖用起来很方便,但是还是要理解它背后是怎么实现的,不然很容易踩坑里。