深入 Future

Rust 的异步编程主干库是 futures。它提供了一个 future 的结构。这实际上是一个表示操作结果的点位符。如你所想,操作的结果可以处理两种状态:操作仍在进行中,旨果尚未完成;或者操作已完成且结果可用。 注意在第二种情况下,可能存在错误,使用结果变得无关紧要。

该库提供了一个名为 Future(以及其他东西)的特征,任何类型都可以实现这种特性。


# #![allow(unused_variables)]
#fn main() {
trait Future {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}
#}

Item 表示操作成功后的返回类型,Error 表示操作失败的返回类型。实现必须指定这些并且还实现获取计算当前状态的 poll 方法。如果已经完成,则返回结果。如果没有, future 将注册当前任务对给定操作的结果感兴趣。此函数返回一个 Poll ,如下所示:


# #![allow(unused_variables)]
#fn main() {
type Poll<T, E> = Result<Async<T>, E>;
#}

Poll 被输入到另一个名为 Async(和给定的错误类型)的类型的结果中,该类型接下来定义。


# #![allow(unused_variables)]
#fn main() {
pub enum Async<T> {
    Ready(T),
    NotReady,
}
#}

反过来,Async 是一个可以在 Ready(T)NotReady 中的枚举。 最后两个状态对应于操作的状态。因此,poll 函数可以返回三种可能的状态:

  • Ok(Async::Ready(result)) 当操作成功完成并且结果在名为 result 的内部变量中。
  • OK(Async::NotReady) 当操作还没有完成,结果获取不到。注意,这并不表示错误情况。
  • Err(e) 当操作运行产生一个错误,在这种情况下没有结果可以获取。

很容易注意到,Future 本质上是一个 Result ,可能仍在运行实际生成 Result 的东西。 如果删除结果可能在任何时间点都没有准备好的情况,我们剩下的唯一两个选项是 Ok 和 Err 情况,它们与结果完全对应。

因此, Future 可以代表任何需要花费大量时间才能完成的事情。这可以是网络事件,磁盘读取等。现在,最常见的问题是:我们如何从给定函数返回 future ?有几种方法可以做到这一点。我们来看一个例子吧。项目设置与以往一样。

 cargo new --bin futures-example

在 Cargo 配置里添加一些库:

[package] 
name = "futures-example" 
version = "0.1.0" 
authors = ["Foo<foo@bar.com>"]

[dependencies] 
futures = "0.1.17" 
futures-cpupool = "0.1.7"

main.rs 中,我们像往常一样设置所有内容。我们的目的是找出给定的整数是否为素数,这将代表我们的操作部分,需要一些时间才能完成。我们有两个函数来做这些。 正如我们稍后将看到的,这两种方法使用两种不同的 futures 返回方式。在实践中,这种幼稚的初等性测试方法并没有被证明是一个很好的例子。因此,我们不得不随机睡眠一段时间来模拟慢度。

// ch7/futures-example/src/main.rs

use futures::Future;
use futures_cpupool::CpuPool;
use std::io;

fn check_prime_boxed(n: u64) -> Box<Future<Item = bool, Error = io::Error>> {
    for i in 2..n {
        if n % i == 0 {
            return Box::new(futures::future::ok(false));
        }
    }
    Box::new(futures::future::ok(true))
}

fn check_prime_impl_trait(n: u64) -> impl Future<Item = bool, Error = io::Error> {
    for i in 2..n {
        if n % i == 0 {
            return futures::future::ok(false);
        }
    }
    futures::future::ok(true)
}

fn check_prime(n: u64) -> bool {
    for i in 2..n {
        if n % i == 0 {
            return false;
        }
    }
    true
}

fn main() {

    let input: u64 = 58466453;
    println!("Right before first call");
    let res_one = check_prime_boxed(input);
    println!("Called check_prime_boxed");
    let res_two = check_prime_impl_trait(input);
    // let _r = futures::executor::spawn(res_two);
    println!("Called check_prime_impl_trait");
    println!(
        "Results are {} and {}",
        res_one.wait().unwrap(),
        res_two.wait().unwrap()
    );

    let thread_pool = CpuPool::new(4);
    let res_three = thread_pool.spawn_fn(move || {
        let temp = check_prime(input);
        let result: Result<bool, ()> = Ok(temp);
        result
    });
    println!("Called check_prime in another thread");
    println!("Result from the last call: {}", res_three.wait().unwrap());
}

有几种返回 futures 的方式。第一种是使用 trait 对象,如在 check_prime_boxed 。 Box 是一个指向堆上对象的指针类型。从某种意义上说,它是一个托管指针,当对象超出范围时,它将被自动清理。 函数的返回类型是一个 trait 对象,它可以表示将其 Item 设置为 bool 且 Error 设置为 io::Error 的任何 future 。因此,这代表动态调度。 返回 futures 的第二种方法是使用 impl trait 功能。如 check_prime_impl_trait 中所做的。 我们说该函数返回返回一个实现 Future<Item=bool, Error=io::Error> 的类型。并且任何实现 Future trait 的类型都是 future ,我们的函数返回一个 future。 请注意,在这种情况下,我们不需要在返回结果之前使用 box 。因此,这种方法的一个优点是不需要分配来返回 future 。我们的两个函数都使用 future::ok 函数来表示我们的计算已成功完成给定的结果。 另一种选择是不实际返回 future 并使用基于 future 的线程池箱来创造 future 并管理它。这是 check_prime 只返回一个 bool 的情况。在我们的 main 函数中,我们使用futures-cpupool crate 建立一个线程池,然后运行该池中的最后一个函数。我们回到 future ,我们可以调用 wait 获得结果。 实现相同目标的完全不同的选择是返回实现 Future 特征的自定义类型。这个是最不符合人体工程学的,因为它涉及编写一些额外的代码,但它是最灵活的方法。

在构建了 future 之后,下一个目标就是执行它。有三种方法可以做到这一点:

  • 在当前线程中:这将最终阻塞当前线程,直到将来完成执行。在前面的示例中,res-one和res-two在主线程上执行,从而阻止用户交互。
  • 在线程池中:res_three 就是这种情况,它在名为 thread_pool 的线程池中执行。因此,在这种情况下,调用线程可以自己继续进行自己的处理。
  • 在事件循环中:在某些情况下,上述两种情况都不可能。那么唯一的选择是在事件循环中执行 future 。方便的是,tokio-core crate提供了适合期货的API来使用事件循环。我们将在下一节深入探讨这个模型

在我们的主函数中,我们调用主线程中的前两个函数。因此,它们将阻塞主线程的执行。但是,最后一个线程在另一个线程上运行。在这种情况下,主线程立即可以自由打印出调用了check-prime 的文件。它再次阻止了对未来的等待。请注意,在所有情况下, future 都被懒惰地评估。当我们运行此程序时,应该看到以下内容:

$ cargo run 
    Compiling futures-example v0.1.0 (file:///src/ch7/futures-example) 
     Finished dev [unoptimized + debuginfo] target(s) in 0.77 secs 
      Running `target/debug/futures-example` 
Right before first call 
Called check_prime_boxed 
Called check_prime_impl_trait 
Results are true and true 
Called check_prime in another thread 
Result from the last call: true

将 futures 与常规线程区分开来的是,它们可以按照人体工程学方式链接起来。 这就像是说,下载网页,然后解析HTML,然后提取给定的单词。这些步骤中的每一个都是一个 future ,下一个步骤不能开始,除非第一个步骤已经完成。 整个业务也是一个 future ,由若干组成 future 组成。当这个更大的 Future 被执行时,它被称为任务。 板条箱提供了许多用于与 futures::task 命名空间中的任务交互的 API。该库提供了许多以这种方式处理期货的功能。 当一个给定的类型实现 Future 的特性(实现轮询方法)时,编译器可以为所有这些组合器提供实现。让我们来看一个使用链接实现超时功能的示例。 我们将使用 tokio timer 板条箱来计算 future 的超时时间,在我们的代码中,我们有两个相互竞争的函数,它们可以随机休眠一段时间,然后向调用者返回一个固定的字符串。我们将同时调度所有这些函数,如果我们返回与第一个函数对应的字符串,我们将声明它已经赢了。同样,这也适用于第二个。如果我们也不回来,我们知道 future 的超时已经触发了。让我们从项目设置开始:

cargo new --bin futures-chaining

在 Cargo.toml 中添加依赖:

[package] 
name = "futures-chaining" 
version = "0.1.0" 
authors = ["Foo <foo@bar.com>"]

[dependencies] 
tokio-timer = "0.1.2"
futures = "0.1.17" 
futures-cpupool = "0.1.7" 
rand = "0.3.18"

和上次一样,我们使用一个线程池使用 futures-cpupool 来执行我们的 future 。让我们看看代码:

//  ch7/futures-chaining/src/main.rs

use futures::future::select_ok;
use futures::Future;
use futures_cpupool::CpuPool;
use rand::{thread_rng, Rng};
use std::thread;
use std::time::{Duration, Instant};
use tokio_timer::Timer;

fn player_one() -> &'static str {
    let d: u64 = thread_rng().gen_range(1, 5);
    thread::sleep(Duration::from_secs(d));
    "player_one"
}

fn player_two() -> &'static str {
    let d: u64 = thread_rng().gen_range(1, 5);
    thread::sleep(Duration::from_secs(d));
    "player_two"
}


fn main() {
    let pool = CpuPool::new_num_cpus();
    let timer = Timer::default();

    let timeout = timer.sleep(Duration::from_secs(3))
        .then(|_| Err(()));

    let one = pool.spawn_fn(|| {
        Ok(player_one())
    });

    let two = pool.spawn_fn(|| {
        Ok(player_two())
    });

    let tasks = vec![one, two];
    let winner = select_ok(tasks).select(timeout).map(|(result, _)| result);
    let result = winner.wait().ok();
    match result {
        Some(("player_one", _)) => println!("Player one won"),
        Some(("player_two", _)) => println!("Player two won"),
        Some((_, _)) | None => println!("Timed out"),
    }
}

我们的两个玩家非常相似;他们都会产生一个介于1到5之间的随机数,并在这段时间内睡眠。 之后,它们返回与名称对应的固定字符串。稍后我们将使用这些字符串来唯一地标识它们。 在我们的 main 函数中,我们初始化线程池和计时器。我们使用计时器上的组合器返回一个 3 秒后出错的 future 。然后,我们在线程池中生成这两个参与者,并将这些参与者的结果作为 future 返回。 请注意,这些函数目前并没有真正运行,因为 future 惰性执行。然后,我们将这些 future 放到一个列表中,并使用 select_ok 组合器并行运行这些 future 。此函数接受一个不可重复的 future ,并选择第一个成功的 future;这里唯一的限制是传递给此函数的所有 future 都应该是相同的类型。因此,我们不能在这里通过超时 future 。我们使用 select 组合器将 select_ok 的结果链接到超时 future ,select 组合器接受两个 future ,并等待其中一个完成执行。结果的未来将有一个已经完成的和一个没有完成的,然后我们使用 map 组合器来丢弃第二部分。 最后,我们阻塞了我们的 future ,并用 OK() 来表示链的结束。然后,我们可以将结果与已知字符串进行比较,以确定哪一个 future 获胜,并相应地打印出消息。

这是几次运行后的样子。由于我们的超时小于两个函数中任何一个的最大睡眠周期,我们应该看到一些超时。每当一个函数选择一个小于超时的时间时,它就会获得胜利。

$ cargo run 
    Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
     Running `target/debug/futures-chaining` 
Player two won 
$ cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
     Running `target/debug/futures-chaining` 
Player one won 
$ cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
     Running `target/debug/futures-chaining` 
Timed out