Non-Profit, International

Spirit unsterblich.

结构化并发

字数统计:5706

本文是Eric Niebler在2020年编写的文章Structured Concurrency的中文翻译。作者抓住了结构化并发的核心优势:为作用域而不是对象建立共享状态、功能完全的控制流而不是回调。因此有必要在这里分享出来。

TL;DR:“结构化并发”指的是一种组织异步计算的方式,确保子操作在其父操作完成之前完成,就如同函数保证在其调用者之前完成一样。这听起来简单且平常,但在C++中却意义非凡。结构化并发——最显著的体现是C++20协程——对于异步架构的正确性和简洁性具有深远的影响。它通过使异步生命周期与普通的C++词法作用域相对应,将现代C++风格带入到我们的异步程序中,从而消除了为管理对象生命周期而进行引用计数的需要。

结构化编程与C++

回溯到20世纪50年代,新兴的计算机行业发现了结构化编程:使用带有词法作用域、控制结构和子例程的高级编程语言,比在汇编层面使用测试-跳转指令和goto进行编程所得到的程序更易于阅读、编写和维护。这一进步如此巨大,以至于如今已无人再谈论结构化编程;它已然就是“编程”本身。

C++比任何其他语言都更充分地利用了结构化编程。对象生命周期的语义反映并紧密绑定于作用域的严格嵌套;也就是说,绑定于你代码的结构。函数激活(调用)是嵌套的,作用域是嵌套的,对象生命周期也是嵌套的。对象的生命周期随着作用域的结束花括号而终结,并且对象按照其构造相反的顺序销毁,以保持严格的嵌套关系。

现代C++编程风格正是建立在这个结构化基础之上的。对象具有值语义——它们的行为类似于int——资源在析构函数中被确定性地清理,这从结构上保证了资源在其生命周期结束后不会被使用。这一点非常重要。

当我们放弃这种作用域和生命周期的严格嵌套时——例如,当我们对堆上的对象进行引用计数,或者使用单例模式时——我们就是在与语言的优势相抗衡,而不是顺应它们。

线程带来的麻烦

在存在并发的情况下编写正确的程序,远比编写单线程代码困难得多。原因有很多。其中一个原因是,线程(就像单例和动态分配的对象一样)对你那微不足道的嵌套作用域嗤之以鼻。尽管你可以在一个线程内部使用现代C++风格,但当逻辑和生命周期分散在多个线程中时,程序的层次结构就丢失了。我们在单线程代码中用来管理复杂性的工具——尤其是与嵌套作用域绑定的嵌套生命周期——根本无法直接应用到异步代码中。

为了说明我的意思,让我们看看将一个简单的同步函数改为异步时会发生什么。

 C++
void computeResult(State & s);

int doThing() {
  State s;
  computeResult(s);
  return s.result;
}

doThing()非常简单。它声明一些局部状态,调用一个辅助函数,然后返回一些结果。现在假设我们希望将这两个函数都改为异步,可能是因为它们耗时太长。没问题,让我们使用支持连续操作链式调用的Boost future:

 C++
boost::future<void> computeResult(State & s);

boost::future<int> doThing() {
  State s;
  auto fut = computeResult(s);
  return fut.then(
    [&](auto&&) { return s.result; }); // 哎呀
}

如果你以前使用过future进行编程,可能会大喊:“不不不!”最后一行代码中的.then()将一些工作排入队列,这些工作将在computeResult()完成后运行。然后doThing()返回结果future。问题在于,当doThing()返回时,State对象的生命周期就结束了,而continuation(延续)仍然在引用它。这就成了一个悬空引用,很可能会导致程序崩溃。

哪里出错了呢?Future允许我们使用尚未就绪的结果进行计算,而Boost风格的future还允许我们链式调用continuation。但是continuation是一个独立的函数,拥有独立的作用域。我们经常需要在这些独立的作用域之间共享数据。整洁的嵌套作用域和嵌套生命周期不复存在。我们必须手动管理状态的生命周期,类似于这样:

 C++
boost::future<void>
computeResult(shared_ptr<State> s); // 增加
                                    // 状态的引用计数

boost::future<int> doThing() {
  auto s = std::make_shared<State>();
  auto fut = computeResult(s);
  return fut.then(
    [s](auto&&) { return s.result; }); // 增加
                                       // 状态的引用计数
}

由于两个异步操作都引用了这个状态,它们都需要共同承担责任来保持其存活。

另一种思考方式是:这个异步计算的生命周期是多久?它始于doThing()被调用时,但直到continuation——即传递给future.then()的lambda表达式——返回时才结束。没有一个词法作用域与这个生命周期相对应。这正是我们困境的根源。

非结构化并发

当我们考虑执行器(executors)时,情况变得更加复杂。执行器是执行上下文的句柄,允许你将工作调度到,例如,一个线程或线程池上。许多代码库都有某种执行器的概念,有些甚至允许你延迟调度或按某种策略调度。这让我们可以做很酷的事情,比如将计算从一个IO线程池移到一个CPU线程池,或者带延迟重试一个异步操作。很方便,但就像goto一样,它是一种非常底层的控制结构,往往会使代码变得模糊而不是清晰。

例如,我最近遇到一个算法,它使用执行器和回调(这里称为Listener)来重试某个资源的异步分配。以下是大幅精简的版本。在分隔线之后进行描述。

 C++
// 这是一个在异步操作完成时
// 被调用的 continuation:
struct Manager::Listener : ListenerInterface {
  shared_ptr<Manager> manager_;
  executor executor_;
  size_t retriesCount_;

  void onSucceeded() override {
    /* ...耶,分配成功了... */
  }
  void onFailed() override {
    // 当分配失败时,向执行器
    // 发布一个带延迟的重试任务
    auto alloc = [manager = manager_]() {
      manager->allocate();
    };
    // 在未来的某个时间点运行 "alloc":
    executor_.execute_after(
      alloc, 10ms * (1 << retriesCount_));
  }
};

// 尝试异步分配一些资源,
// 使用上述类作为 continuation
void Manager::allocate() {
  // 是否已经尝试太多次?
  if (retriesCount_ > kMaxRetries) {
    /* ...通知任何观察者我们失败了 */
    return;
  }

  // 再试一次:
  ++retriesCount_;
  allocator_.doAllocate(
    make_shared<Listener>(
      shared_from_this(),
      executor_,
      retriesCount_));
}

allocate()成员函数首先检查该操作是否已经重试太多次。如果没有,它调用辅助函数doAllocate(),并传入一个回调,以便在成功或失败时得到通知。失败时,处理程序向执行器发布延迟工作,执行器随后会回调allocate(),从而实现带延迟的重试。

这是一个高度状态化且相当迂回的异步算法。逻辑跨越多个函数和几个对象,控制流和数据流并不清晰。注意为了保持对象存活而进行的复杂的引用计数操作。将工作发布给执行器使得事情更加困难。这里的执行器没有continuation的概念,因此任务执行期间发生的错误无处可去。如果期望程序的任何部分能够从错误中恢复,那么allocate()函数就无法通过抛出异常来指示错误。错误处理必须手动且带外进行。如果我们想支持取消,也是如此。

这就是非结构化并发:我们以特设(ad hoc)的方式将异步操作排入队列;我们链式调用依赖的工作,使用continuation或“strand”执行器来强制执行顺序一致性;我们使用强引用和弱引用来保持数据存活,直到我们确定不再需要它为止。没有任务A是任务B的子任务这样的正式概念,无法强制子任务在其父任务之前完成,也没有一个代码位置可以让我们指着说,“这就是算法。”

如果你不介意这个类比的话,通过执行器的跳转有点像在时间和空间上都是非局部的goto语句:“跳转到程序中的这一点,在X毫秒之后,在特定的这个线程上。”

这种非局部的断裂性使得正确性和效率的推理变得困难。将非结构化并发扩展到处理大量并发实时事件的整个程序时,手动处理带外的异步控制和数据流、控制对共享状态的并发访问以及管理对象生命周期所带来的偶然复杂度,将变得难以承受。

结构化并发

回想一下,在计算机发展的早期,非结构化编程风格迅速让位于结构化风格。随着C++增加了协程,我们看到异步代码领域如今正在发生类似的阶段性转变。如果我们用协程重写上述重试算法(使用Lewis Baker流行的cppcoro库),它可能看起来像这样:

 C++
// 尝试异步分配一些资源,
// 并带重试:
cppcoro::task<> Manager::allocate() {
  // 最多重试 kMaxRetries 次:
  for (int retriesCount = 1;
       retriesCount <= kMaxRetries;
       ++retriesCount) {
    try {
      co_await allocator_.doAllocate();
      co_return; // 成功!
    } catch (...) {}

    // 哎呀,失败了。让出线程一小会儿,
    // 然后重试:
    co_await scheduler_.schedule_after(
      10ms * (1 << retriesCount));
  }

  // 错误,重试次数过多
  throw std::runtime_error(
    "Resource allocation retry count exceeded.");
}

旁注:这里将executor_替换为scheduler_,它实现了cppcoro的DelayedScheduler概念。

让我们列举一下这其中的改进之处:

1.全部在一个函数中!良好的局部性。
2.状态(如retriesCount)可以保存在局部变量中,而不是作为需要引用计数的对象的成员。
3.我们可以使用普通的C++错误处理技术。
4.我们从结构上保证了在allocator_.doAllocate()的异步调用完成之前,此函数不会继续执行。

第(4)点具有深远的影响。考虑文章开头那个简单的例子。下面用协程重新实现的版本是完全安全的:

 C++
cppcoro::task<> computeResult(State & s);

cppcoro::task<int> doThing() {
  State s;
  co_await computeResult(s);
  co_return s.result;
}

上面的代码是安全的,因为我们知道computeResult会在doThing恢复执行之前完成,因此也会在s被销毁之前完成。

借助结构化并发,将局部变量通过引用传递给立即等待(awaited)的子任务是绝对安全的。

取消

采用结构化并发方法,即并发操作的生命周期严格嵌套在其使用的资源的生命周期内,并与程序作用域绑定,这使我们能够避免使用像shared_ptr这样的垃圾回收技术来管理生命周期。这可以带来更高效的代码,减少堆分配和原子引用计数操作,也使代码更容易推理且不易出错。然而,这种方法的一个含义是,我们必须始终等待子操作完成才能让父操作完成。我们不能再仅仅从这些子操作中分离(detach),并让资源在其引用计数降至零时自动清理。为了避免因不再需要子操作的结果而不得不等待不必要长时间,我们需要一种机制来能够取消这些子操作,使它们快速完成。因此,结构化并发模型需要深度支持取消,以避免引入不必要的延迟。

请注意,每当我们通过引用将局部变量传递给子协程时,我们都在依赖结构化生命周期和结构化并发。我们必须确保子协程已经完成并且不再使用该对象之后,父协程才能退出该局部变量的作用域并销毁它。

结构化并发>协程

当我谈论“结构化并发”时,我不仅仅是在谈论协程——尽管这是它最明显的体现。为了说明我的意思,让我们简要讨论一下协程是什么以及不是什么。特别地,C++协程本身根本没有固有的并发性!它们实际上只是让编译器将你的函数为你分割成回调的一种方式。

考虑上面的简单协程:

 C++
cppcoro::task<> computeResult(State & s);

cppcoro::task<int> doThing() {
  State s;
  co_await computeResult(s);
  co_return s.result;
}

这里的co_await是什么意思?一个浅显的回答是:它意味着cppcoro::task<>的作者希望它表达的意思(在一定范围内)。更完整的答案是,co_await挂起当前的协程,将协程的其余部分(这里是指co_return s.result;语句)打包成一个continuation,并将其传递给等待对象(这里是由computeResult(s)返回的task<>)。这个等待对象通常会将其存储在某处,以便稍后当子任务完成时可以调用它。例如,这就是cppcoro::task<>所做的。

换句话说,task<>类型和协程语言特性协同作用,将“结构化并发”层叠在枯燥的回调之上。就是这样。这就是魔法所在。这一切都只是回调,但是以一种非常特定的模式组织的回调,正是这种模式使其成为“结构化”的。这种模式确保子操作在父操作之前完成,而这个特性正是带来种种好处的根源。

一旦我们认识到结构化并发实际上只是特定模式下的回调,我们就会意识到我们可以在没有协程的情况下实现结构化并发。使用回调进行编程当然不是什么新鲜事,并且可以将这些模式编纂成库,使其可重用。libunifex就是这样的库。如果你关注C++标准化进程,这也是执行器提案中的sender/receiver抽象所做的事情。

使用libunifex作为结构化并发的基础,我们可以将上面的例子写成如下形式:

 C++
unifex::any_sender_of<> computeResult(State & s);

auto doThing() {
  return unifex::let_with(
    // 声明一个 State 类型的“局部变量”:
    [] { return State{}; },
    // 使用这个局部变量来构造一个异步任务:
    [](State & s) {
      return unifex::transform(
        computeResult(s),
        [&] { return s.result; });
    });
}

既然有协程,为什么还有人会写这种代码?你当然需要一个好的理由,但我能想到几个。使用协程,首次调用协程时会有一次分配,每次恢复执行时会有一次间接函数调用。编译器有时可以消除这些开销,但有时不能。通过直接使用回调——但遵循结构化并发模式——我们可以在没有协程某些权衡的情况下获得协程的许多好处。

然而,这种编程风格做了不同的权衡:它比等效的协程更难编写和阅读。我认为未来超过90%的异步代码出于可维护性的考虑都应该使用协程。对于热点代码,可以有选择地用底层的等效实现替换协程,并以基准测试结果为准。

并发

我在上面提到,协程本身并不具有并发性;它们只是一种编写回调的方式。协程本质上是顺序的,task<>类型的惰性——协程启动时挂起,直到被等待(awaited)才开始执行——意味着我们不能用它们来引入程序中的并发性。现有的基于future的代码通常假设操作已经急切地开始了,引入了需要小心修剪的特设并发性。这迫使你一次又一次地以特设的方式重新实现并发模式。

借助结构化并发,我们将并发模式编纂成可重用的算法,以结构化的方式引入并发。例如,如果我们有一堆task,并且希望等待它们全部完成并以tuple形式返回它们的结果,我们可以将它们全部传递给cppcoro::when_allco_await其结果。(Libunifex也有一个when_all算法。)

目前,cppcoro和libunifex都没有when_any算法,所以你无法启动一批并发操作并在第一个操作完成时返回。不过,这是一个非常重要且有趣的基础算法。为了维护结构化并发的保证,当第一个子任务完成时,when_any应该请求取消所有其他任务,然后等待它们全部完成。这个算法的实用性取决于程序中所有异步操作能否迅速响应取消请求,这恰恰表明了在现代异步程序中深度支持取消是多么重要。

迁移

到目前为止,我已经讨论了什么是结构化并发以及它为何重要。我还没有讨论如何实现它。如果你已经在使用协程编写异步C++代码,那么恭喜你。你可以继续享受结构化并发带来的好处,也许现在对为什么协程具有如此变革性的意义有了更深的理解和欣赏。

对于那些缺乏结构化并发、深度取消支持,甚至缺乏异步抽象本身的代码库来说,任务是很艰巨的。甚至可能需要从引入复杂性开始,以便开辟出一个“孤岛”,让周围的代码能够提供结构化并发模式所需的保证。例如,这包括创建一种印象,即调度的任务能够被迅速取消,即使底层的执行上下文并不直接提供这种能力。这种增加的复杂性可以被隔离在一个层中,而结构化并发的“孤岛”可以构建在其之上。然后简化工作就可以开始了,将基于future或回调风格的代码转换为协程,梳理出父子关系、所有权和生命周期。

总结

添加co_await使同步函数变为异步,但不会干扰计算的结构。被等待的异步操作必然在调用函数完成之前完成,就像普通的函数调用一样。其革命性在于:一切照旧。作用域和生命周期仍然像往常一样嵌套,只不过现在作用域在时间上是不连续的。使用原始回调和future时,这种结构就丢失了。

协程,以及更广泛的结构化并发,将现代C++风格的优势——值语义、算法驱动设计、具有确定性终结的清晰所有权语义——带入了我们的异步编程中。它之所以能做到这一点,是因为它将异步生命周期重新绑定到普通的C++词法作用域。协程在挂起点将我们的异步函数分割成回调,这些回调以非常特定的模式被调用,以维护作用域、生命周期和函数激活的严格嵌套。

我们在代码中点缀co_await,然后就可以继续使用所有熟悉的惯用法:用于错误处理的异常、局部变量中的状态、用于释放资源的析构函数、按值或按引用传递的参数,以及所有其他良好、安全和惯用的现代C++的特征。

感谢阅读。


如果你想了解更多关于C++中结构化并发的信息,请务必查看Lewis Baker在2019年CppCon上的演讲


若无特殊声明,本人原创文章以 CC BY-SA 4.0许可协议 提供。