C++ std::promise
2021年11月10日
字数统计:1522
由于std::async在理论和实现上并不能满足我们的需求,因为我们常常在异步操作后对异步操作的结果进行一个同步,而std::async并不能提供这点需求,甚至由于默认发射策略导致std::async很有可能退化为单线程顺序运行,所以在std::async的基础上提供了std::promise。
std::promise可以保存某一类型T的值,该值通常使用std::future储存(可能传递给另外一个线程),因此std::promise也提供了一种线程同步的手段 。
可以通过get_ future来获取与该promise对象相关联的future对象,调用该函数之后,两个对象共享相同的共享状态(Shared State)。
promise是异步Provider,它可以在某一时刻设置共享状态的值。
future可以异步返回共享状态的值,或者在必要的情况下阻塞调用者并等待共享状态标志变为ready,然后才能获取共享状态的值。
#include <iostream>
#include <future>
void print_int ( std :: future < int >& fut ) {
int x = fut . get (); // 获取共享状态的值
std :: cout << "value: " << x << std :: endl ; // 打印value: 10
}
int main ()
{
std :: promise < int > prom ;
std :: future < int > fut = prom . get_future ();
std :: thread t ( print_int , std :: ref ( fut ));
prom . set_value ( 10 ); // 设置共享状态的值, 此处和线程t保持同步
// 若不设置共享状态的值,那么t会被一直阻塞
// 也可选择阻塞一定时间后若仍未设置值,进行异常处理
t . join ();
return 0 ;
}
future本质上是我们发起的一个并发操作,而promise本质上则是并发操作的回调。我们可以通过future对象等待该操作和获取操作的结果,而promise对象则负责写入返回值并通知我们 。
具体的实现中,future与promise会有指向同一个共享的状态对象Shared State的共享指针,当promise对象接受到返回值或者错误之后,通过条件变量通知另一端等待的future对象。future对象则可以通过Shared State对象中的状态,来判断接收到回调之后是继续处理业务还是处理错误。
std::promise 有如下成员函数:
get_ future返回与promise关联的future
set_ value设置结果为指定值
set_ value_ at_ thread_ exit设置结果为指定值,同时仅在线程退出时分发提醒
set_ exception设置结果为指示异常
set_ exception_ at_ thread_ exit设置结果为指示异常,同时仅在线程退出时分发提醒
#include <vector>
#include <thread>
#include <future>
#include <numeric>
#include <iostream>
#include <chrono>
void accumulate ( std :: vector < int >:: iterator first ,
std :: vector < int >:: iterator last ,
std :: promise < int > accumulate_promise )
{
int sum = std :: accumulate ( first , last , 0 );
accumulate_promise . set_value ( sum ); // 提醒future已就绪
}
int main ()
{
// 演示用promise<int> 在线程间传递结果。
std :: vector < int > numbers = { 1 , 2 , 3 , 4 , 5 , 6 };
std :: promise < int > accumulate_promise ;
std :: future < int > accumulate_future = accumulate_promise . get_future ();
std :: thread work_thread ( accumulate , numbers . begin (), numbers . end (),
std :: move ( accumulate_promise ));
// future::get() 将等待直至该future拥有合法结果并取得它
// 无需在get() 前调用wait
//accumulate_future.wait(); // 等待结果
std :: cout << "result=" << accumulate_future . get () << std :: endl ;
work_thread . join (); // wait for thread completion
}
由于std::future 所引用的共享状态不与另一异步返回对象共享,所以std::promise也具有同样的性质,所以std::promise一般使用移动构造 的方式来传递(原代码中使用了引用,但是这并不值得展示)。
C++并发编程实战 第二版4.2.3使用promises 中提供了一个简化的使用场景:
void process_connections ( connection_set & connections )
{
while ( ! done ( connections ))
{
for ( connection_iterator
connection = connections . begin (), end = connections . end ();
connection != end ;
++ connection )
{
if ( connection -> has_incoming_data ())
{
data_packet data = connection -> incoming ();
std :: promise < payload_type >& p =
connection -> get_promise ( data . id );
p . set_value ( data . payload );
}
if ( connection -> has_outgoing_data ())
{
outgoing_packet data =
connection -> top_of_outgoing_queue ();
connection -> send ( data . payload );
data . promise . set_value ( true );
}
}
}
}
假设connections是一个网络连接池,自旋锁不断通过循环检查是否整个连接池都已经结束连接,并每次遍历整个连接池。
每一个connection就是一个连接,如果等待传入的数据,那么就构造一个promise,然后设置promise的共享状态的值。
如果有传出数据(通常同时可能存在多个等待传出的数据),则从队列里得到顶部数据,然后构造一个promise发送数据。
显然的,promise总是积极的创造一个线程。
以下是向promise传递一个异常的示例 :
#include <iostream> // std::ios
#include <thread> // std::thread
#include <future> // std::promise, std::future
#include <exception> // std::exception, std::current_exception
void get_int ( std :: promise < int > prom ) {
int x ;
std :: cout << "Please, enter an integer value: " ;
std :: cin . exceptions ( std :: ios :: failbit ); // throw on failbit
try {
std :: cin >> x ; // sets failbit if input is not int
prom . set_value ( x );
}
catch ( std :: exception & ) {
prom . set_exception ( std :: current_exception ());
}
}
void print_int ( std :: future < int > fut ) {
try {
int x = fut . get ();
std :: cout << "value: " << x << std :: endl ;
}
catch ( std :: exception & e ) {
std :: cout << "[exception caught: " << e . what () << "]" << std :: endl ;
}
}
int main ()
{
std :: promise < int > prom ;
std :: future < int > fut = prom . get_future ();
std :: thread th1 ( get_int , std :: move ( prom ));
std :: thread th2 ( print_int , std :: move ( fut ));
th1 . join ();
th2 . join ();
return 0 ;
}
其中get_ int负责接收一个整数的输入,如果输入不正确则设置一个异常,print_ int用于捕获promise的输入,如果promise储存了一个异常,则输出异常信息。
下面是一个考虑更加周全的例子 :
#include <thread>
#include <iostream>
#include <future>
int main ()
{
std :: promise < int > p ;
std :: future < int > f = p . get_future ();
std :: thread t ([ & p ] {
try {
// 可能抛出的代码
throw std :: runtime_error ( "Example" );
}
catch (...) {
try {
// 存储任何抛出的异常于promise
p . set_exception ( std :: current_exception ());
}
catch (...) {} // set_exception也可能抛出异常
}
});
try {
std :: cout << f . get ();
}
catch ( const std :: exception & e ) {
std :: cout << "Exception from the thread: " << e . what () << std :: endl ;
}
t . join ();
}
若无特殊声明,本人原创文章以
CC BY-SA 4.0许可协议
提供。
本站不欢迎非搜索引擎类,非个人学习类爬虫;严禁将文章直接爬取至其他站点。
若看到此条消息,说明你正在访问的网站可能是垃圾二手转载网站。
为了获得更好的浏览体验,请访问唯一原始网站:mysteriouspreserve[dot]com或blog[dot]bizwen[dot]com。