简单介绍C++中的Promise和Future

并发编程

随着计算机硬件的发展,单核芯片的性能已经很难再提升,现在科研和工程领域已经向着多核迈进很久了。这里先不说科研领域目前很火的分布式技术(包括存储、数据块,容器,网络等),而是从基本的工程领域的编程的视角上看,如何利用现代CPU的多核性能。

现代操作系统中CPU调度的基本单位是线程,在多核CPU的场景下多个线程会被操作系统调度器自动分配到多个核心中运行,如果线程数超过CPU的核心数(有的CPU具有超线程技术,这时候应该以最大超线程数为准),那么调度器的一种可能的调度就是把这些线程平均分到多个核心上。

说了这么多,这里首先介绍下怎么在C++11或者以后的版本中怎么使用<thread>来进行多线程编程

创建线程

下图创建了两个线程t1和t2,它们分别将x自增10000次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int main() {
int x = 0;
std::thread t1([&]() {
for (int i = 0; i < 10000; i++)
++x;
});
std::thread t2([&]() {
for (int i = 0; i < 10000; i++)
++x;
});
t1.join();
t2.join();
printf("Value of x is %d", x);
return 0;
}

如果你对计算机有一定认识的话就不难发现,运行完成后x的值极有可能不是20000,因为这两个线程之间存在竞争关系。计算机的内存只有一块,但是线程运行在两个不同的核心中,而++x操作不是原子操作,这会导致出现读脏数据,写失效等等问题,下面是一个可能的输出:

1
Value of x is 13446

互斥与Mutex

为了解决上述问题,CPU提供了一些基本的原子操作的原语(即一些特殊的指令)如compare_and_exchange或者fetch_and_add等来保证对一个变量的原子访问,在此基础上使用一些软件技巧,就有了互斥锁mutex。

mutex的基本作用就是对一个变量进行加锁,说简单就是:

  1. 一个变量未被加锁的时候所有线程都可访问改变量
  2. 当一个线程在访问这个变量的期间,它就持有了这个变量的锁,此时其他线程如果想访问这个变量就必须阻塞直到这个线程释放这个锁
  3. 当线程访问变量结束后就可以释放锁,这时候其他线程就能继续抢占锁然后访问了,以此类推

下面举了一个C++ mutex基本使用方法的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
int main() {
int x = 0;
std::mutex m;
std::thread t1([&]() {
for (int i = 0; i < 10000; i++) {
m.lock();
++x;
m.unlock();
}
});
std::thread t2([&]() {
for (int i = 0; i < 10000; i++) {
m.lock();
++x;
m.unlock();
}
});
t1.join();
t2.join();
printf("Value of x is %d", x);
return 0;
}

在这种写法下x的值就一定是20000了,当然这里也可以利用C++引以为豪的RAII机制,使用lock_gruard来简化代码:

1
2
3
4
5
6
std::thread t1([&]() {
for (int i = 0; i < 10000; i++) {
std::lock_guard<std::mutex> l(m);
++x;
}
});

原理和上面的lock+unlock是一样的,这里只是在构造函数内执行lock,在析构函数内执行unlock罢了。

同步与condition_variable(条件变量)

线程之前除了上述的互斥关系外,还有一层同步关系,有个典型的例子就是生产者消费者模型,生产者线程A必须在队列没满的时候才往里面加东西,消费者B必须只能在队列不为空的时候从里面拿东西,否者就一直阻塞。仔细分析的话就会发现这里有个”等待关系”或者说”条件关系”,比如A线程睡眠直到队列不满等。这也就是我们所说的同步。

可以使用条件变量来实现线程之间的同步,condition_variable也是基于CPU提供的一些基本的原语来实现,这里就不细究了。仔细对比下condition_variablemutex,你会发现它们之前的一些共同点:

  1. 两者都是某些线程能够执行,某些线程阻塞
  2. 两种都是执行完成的线程通知阻塞的线程

为了演示条件变量的用法,这里使用一个简单线程池来展示,代码来自https://github.com/progschj/ThreadPool/blob/master/ThreadPool.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>

class ThreadPool {
public:
ThreadPool(size_t); //构造函数
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>;
~ThreadPool();
private:
std::vector< std::thread > workers; //工作线程
std::queue< std::function<void()> > tasks; //任务队列
std::mutex queue_mutex; //锁
std::condition_variable condition;//条件变量
bool stop;
};
// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
: stop(false)
{
for(size_t i = 0;i<threads;++i) //构造函数设置每个线程的任务
workers.emplace_back([this]{
for(;;){
std::function<void()> task;
{
//组阻塞直到任务队列不为空(任务队列是个std::function的列表)
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock,
[this]{ return this->stop || !this->tasks.empty(); });
if(this->stop && this->tasks.empty())return; //如果线程池结束了而且当前没任务了就返回
task = std::move(this->tasks.front());
this->tasks.pop(); //从队列中弹出问题
}
task();//执行任务
}
}
);
}

// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared< std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
//future这部分先不管,在下一节
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
// don't allow enqueueing after stopping the pool
if(stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task](){ (*task)(); }); //加入任务
}
condition.notify_one(); //加入任务后任务队列肯定不为空了,通知阻塞的工作线程
return res;
}

// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for(std::thread &worker: workers)
worker.join();
}
#endif

这个代码还是比较简单的,完美体现了条件变量的使用。

Future和Promise

介绍

互斥量mutex和条件变量condition_variable一同构成了并发编程最基本的模型,前者提供互斥机制,保证变量的安全读写,后者提供同步机制,保证了不同任务之前的依赖关系,这两者的实现都基于CPU/操作系统提供的相关原语。

你可能注意到了,互斥量的unlock和条件变量的notify_all都具有通知和唤醒其他线程的能力,但是这个唤醒也仅限唤醒而已。这时候我们就想,能不能提供一个更加抽象的<通知>机制来实现这个唤醒,也就是实际的线程同步操作呢,然后顺便再给它加点功能,比如唤醒的时候返回一个值

为了达到这一目的,人们进行了进一步的抽象,也就是这篇文章的主题:Future/Promise机制,据说这一机制是来源于函数式编程领域。

Future/Promise的最大特点就是把结果进行了抽象,FuturePromise总是成对出现的:用Future来代表一个任务的结果,用Promise来更新任务的执行状态,借用stack overflow上的一个回答来说就是:promise被生产者使用,Future被消费者使用。

  1. 生产者在完成生产(即任务完成的时候)调用promise.set_value()并将任务已完成这一状态来同步到Future内。
  2. 消费者可以随时通过Future来查询任务的完成状态,也能直接通过Future阻塞当前线程,直到任务完成

说了这么多还是比较抽象,这里使用一些实际的例子来解释C++内的std::promisestd::future的使用:

1
2
3
4
5
6
7
8
9
10
11
int main() {
std::promise<int> promise;
auto f = promise.get_future();
std::thread t([&]() {
std::this_thread::sleep_for(std::chrono::seconds(5));
promise.set_value(12);
});
std::cout << "task return value is " << f.get();
t.join();
return 0;
}

这里我们创建一个promise实例,然后获取到和它对应的Future,紧接着我们创建一个新线程,线程内在5s后才调用promise.set_value(),这里的意思就是模拟t线程内的任务要执行5s,然后得到结果之后通过promise将12这个结果发送给future. t线程在执行完成之前f.get()会一直阻塞直到promise.set_value()执行完成,因此这里运行结果就是主线程会睡眠5s直到t线程运行完成,然后主线程打印12,最后整个程序返回。

实现简单的future和promise

说了这么多,这就用mutexcondition_variable来实现一个简单的promisefuture

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
struct SharedStatus {
enum Status { //任务状态
Pending,
Ready,
};
Status status = Pending;
std::mutex m;
std::condition_variable cv;

void wait() {
std::unique_lock<std::mutex> lk(this->m);
cv.wait(lk, [&]() { return this->status == Ready; });
}
void notify_all() {
cv.notify_all();
}
};

template<typename T>
struct MyFuture {
SharedStatus *status{}; //和promise共享的状态信息
T *value;

T get_value() { //获取值,阻塞直到条件变量通知
this->status->wait();
return *this->value; //返回
}
void wait() {
this->status->wait();
}

MyFuture(T *value, SharedStatus *status) {
this->value = value;
this->status = status;
}
};

template<typename T>
struct MyPromise {
SharedStatus status; //存储所有的条件标量和状态信息
T value{};
MyFuture<T> get_future() { //Promise返回的future和当前promise共享所有的状态信息,才能被称之为一对,这里使用一个指针将值也共享了
return MyFuture<T>(&this->value, &this->status);
}

void set_value(const T &v) {//promise在set_value的时候调用notify_all函数通知feture
this->value = v;
this->status.status = SharedStatus::Ready;
this->status.notify_all();
}
};

注释写的很清楚了,这里就不再细说,使用我们自己写的MyFutureMyFuture也能实现上述示例代码类似的效果:

1
2
3
4
5
6
7
8
9
MyPromise<int> promise;
auto f = promise.get_future();
std::thread t([&]() {
std::this_thread::sleep_for(std::chrono::seconds(5));
promise.set_value(12);
});

std::cout << "value is " << f.get_value();
t.join();

运行结果和上述示例一模一样。

promise/future机制与异步编程

不知道你有没有发现,Future/Promsire似乎并没有与所谓的多线程绑定,而且示例代码似乎在线程下也能运行(就是没屁用):

1
2
3
4
5
6
7
int main() {
std::promise<int> promise;
auto f = promise.get_future();
promise.set_value(12);
std::cout << "task return value is " << f.get();
return 0;
}

当然这里会直接打印,因为两个运行在一个线程中,因此不会阻塞。如果我们再封装一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
template<typename T>
std::future<T> run_task(const std::function<T()> &task) {
std::promise<T> p;
p.set_value(task());
return p.get_future();
}
int main() {
auto f = run_task<int>([]() {
return 3;
});
std::cout << "task return value is " << f.get();
return 0;
}

这样的话,就类似于用一个函数执行一个任务,并且我们并不知道这个任务会啥时候结束,但是我们得到了一个类似句柄的东西(也就是返回值future)能够随时查询任务的执行状态,因为这个任务执行完成的时候可以通过内部的promise来更新外部的future的状态。这就是所谓的异步编程,我们这里没有体现异步是因为run_task是和主线程一起跑的,和直接调用函数没啥区别,但是别忘了,我们可以在run_task内部加上一个新线程,甚至一个线程池,这样就变成真正的异步编程了,当然这东西也不需要我们自己实现,因为c++内部已经提供了一个:std::async_task,下面是一个例子:

1
2
3
4
5
6
7
8
9
10
11

int main() {
auto task = []() {
std::this_thread::sleep_for(std::chrono::seconds(4));
return 3;
};
auto f = std::async(task);
printf("task won't block current thread\n");
std::cout << "task return value is " << f.get();
return 0;
}

相信看到这里这行代码啥意思也不需要我来讲了。

到这里还不够,你可能会好奇std::async内部是怎么创建新线程,以及如果我们同时添加多个异步任务,std::async内部会如何调度呢,为了解决这个问题,人们又从异步编程中抽象出来一个东西:executor,也就是专门用来执行任务的,可配置的执行器,由于这个东西还没在C++中完全对外暴露接口,这里我也就不细说了。当然到了这里我们也需要注意到,executor已经超出了线程池的概念了,它就是是一个任务调度器,完全可以在单线程中执行。

Async和Await

说到这里就可以慢慢引出来asyncawait了,这两个东西已经和C++没啥关系,就一个单纯的异步任务执行抽象: 使用async来定义一个异步任务,使用await来等待一个异步任务的结束。在此基础上配合一个外部的executor来进行任务执行和调度。这两个东西已经作为python,rust等编程语音的关键字,成为受编译器支持的异步编程模型的基础设施。由于这里不是asyncawait的专场,因此这里不细说了,有空再开一篇文章专门说下这两个东西内部原理(其实是我自己还没搞清楚)。