《C++并发编程实战》读书笔记(3):并发操作的同步
1、条件变量
当线程需要等待特定事件发生、或是某个条件成立时,可以使用条件变量std::condition_variable
,它在标准库头文件<condition_variable>
内声明。
std::mutex mut;
std::queue<data_chunk> data_queue;
std::condition_variable data_cond;
void data_preparation_thread()
{
while (more_data_to_prepare())
{
const data_chunk data = prepare_data();
std::lock_guard<std::mutex> lk(mut);
data_queue.push(data);
data_cond.notify_one();
}
}
void data_processing_thread()
{
while (true)
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [] { return !data_queue.empty(); });
data_chunk data = data_queue.front();
data_queue.pop();
lk.unlock();
process(data);
if (is_last_chunk(data)) { break; }
}
}
wait()
会先在内部调用lambda函数判断条件是否成立,若条件成立则wait()
返回,否则解锁互斥并让当前线程进入等待状态。当其它线程调用notify_one()
时,当前调用wait()
的线程被唤醒,重新获取互斥锁并查验条件,若条件成立则wait()
返回(互斥仍被锁住),否则解锁互斥并继续等待。
wait()
函数的第二个参数可以传入lambda函数,也可以传入普通函数或可调用对象,也可以不传。
notify_one()
唤醒正在等待当前条件的线程中的一个,如果没有线程在等待,则函数不执行任何操作,如果正在等待的线程多于一个,则唤醒的线程是不确定的。notify_all()
唤醒正在等待当前条件的所有线程,如果没有正在等待的线程,则函数不执行任何操作。
2、使用future等待一次性事件发生
C++标准程序库有两种future
,分别由两个类模板实现,即std::future<>
和std::shared_future<>
,它们的声明位于头文件<future>
内。
2.1、从后台任务返回值
由于std::thread
没有提供直接回传结果的方法,所以我们使用函数模板std::async()
来解决这个问题。std::async()
以异步方式启动任务,并返回一个std::future
对象,运行函数一旦完成,其返回值就由该对象持有。在std::future
对象上调用get()
方法时,当前线程就会阻塞,直到std::future
准备妥当并返回异步线程的结果。std::future
模拟了对异步结果的独占行为,get()
仅能被有效调用一次,调用时会对目标值进行移动操作。
int find_the_answer_to_ltuae();
void do_other_stuff();
int main()
{
std::future<int> the_answer = std::async(find_the_answer_to_ltuae);
do_other_stuff();
std::cout << "The answer is " << the_answer.get() << std::endl;
}
在调用std::async()
时,它可以接收附加参数进而传递给任务函数作为其参数,此方式与std::thread
的构造函数相同。更多启动异步线程的方法可参考下面的例程:
struct X
{
void foo(int, const std::string&);
std::string bar(const std::string&);
};
X x;
auto f1 = std::async(&X::foo, &x, 42, "hello"); // 调用p->foo(42, "hello"),p是指向x的指针
auto f2 = std::async(&X::bar, x, "goodbye"); // 调用tmpx.bar("goodbye"), tmpx是x的拷贝副本
struct Y
{
double operator()(double);
};
Y y;
auto f3 = std::async(Y(), 3.141); // 调用tmpy(3.141),tmpy是由Y()生成的匿名变量
auto f4 = std::async(std::ref(y), 2.718); // 调用y(2.718)
X baz(X&);
std::async(baz, std::ref(x)); // 调用baz(x)
我们还能为std::async()
补充一个std::launch
类型的参数,来指定采用哪种方式运行:std::launch::deferred
指定在当前线程上延后调用任务函数,等到在future
上调用了wait()
或get()
,任务函数才会执行;std::launch::async
指定必须开启专属的线程,在其上运行任务函数。该参数的还可以是std::launch::deferred | std::launch::async
,表示由std::async()
的实现自行选择运行方式,这也是这项参数的默认值。
auto f6 = std::async(std::launch::async, Y(), 1.2); // 在新线程上执行
auto f7 = std::async(std::launch::deferred, baz, std::ref(x)); // 在wait()或get()调用时执行
auto f8 = std::async(std::launch::deferred | std::launch::async, baz, std::ref(x)); // 交由实现自行选择执行方式
auto f9 = std::async(baz, std::ref(x));
f7.wait(); // 调用延迟函数
2.2、关联future实例和任务
std::packaged_task<>
连结future对象与函数(或可调用对象,下同)。std::packaged_task<>
对象在执行任务时,会调用关联的函数,把返回值保存为future的内部数据,并令future准备就绪。若一项庞杂的操作能分解为多个子任务,则可以把它们分别包装到多个std::packaged_task<>
实例之中,再传递给任务调度器或线程池,这就隐藏了细节,使任务抽象化,让调度器得以专注处理std::packaged_task<>
实例,无需纠缠于形形色色的任务函数。
std::packaged_task<>
是类模板,其模板参数是函数签名(例如void()
表示一个函数,不接收参数,也没有返回值),传入的函数必须与之相符,即它应接收指定类型的参数,返回值也必须可以转换成指定类型。这些类型不必严格匹配,若某函数接收int
类型参数并返回float
值,则可以为其构建std::packaged_task<double(double)>
的实例,因为对应的类型可以隐式转换。
std::packaged_task<>
具有成员函数get_future()
,它返回std::future<>
实例,该future的特化类型取决于函数签名指定的返回值。std::packaged_task<>
还具备函数调用操作符,它的参数取决于函数签名的参数列表。
std::mutex m;
std::deque<std::packaged_task<void()>> tasks;
bool gui_shutdown_message_received();
void get_and_process_gui_message();
void gui_thread()
{
while (!gui_shutdown_message_received())
{
get_and_process_gui_message();
std::packaged_task<void()> task;
{
std::lock_guard<std::mutex> lk(m);
if (tasks.empty()) { continue; }
task = std::move(tasks.front());
tasks.pop_front();
}
task();
}
}
std::thread gui_bg_thread(gui_thread);
template<typename Func>
std::future<void> post_task_for_gui_thread(Func f)
{
std::packaged_task<void()> task(f);
std::future<void> res = task.get_future();
std::lock_guard<std::mutex> lk(m);
tasks.push_back(std::move(task));
return res;
}
2.3、创建std::promise
有些任务无法以简单的函数调用表达出来,还有一些任务的执行结果可能来自多个部分的代码,这时可以借助std::promise
显式地异步求值。配对的std::promise
和std::future
可以实现下面的工作机制:等待数据的线程在future上阻塞,而提供数据的线程利用相配的std::promise
设定关联的值,使future准备就绪。
若需从给定的std::promise
实例获取关联的std::future
对象,调用前者的成员函数get_future()
即可,这与std::package_task
一样。promise的值通过成员函数set_value()
设置,只要设置好,future即准备就绪,凭借它就能获取该值。如果std::promise
在被销毁时仍未曾设置值,保存的数据则由异常代替。
void f(std::promise<int> ps)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
ps.set_value(42);
}
int main()
{
std::promise<int> ps;
std::future<int> ft = ps.get_future();
std::thread t(f, std::move(ps));
int val = ft.get();
std::cout << val << std::endl;
t.join();
}
2.4、将异常保存到future中
若经由std::async()
调用的函数抛出异常,则会被保存到future中,future随之进入就绪状态,等到其成员函数get()
被调用,存储在内的异常即被重新抛出。std::packaged_task
也是同理,若包装的任务函数在执行时抛出异常,则也会被保存到future中,只要调用get()
,该异常就会被再次抛出。自然而然,std::promise
也具有同样的功能,它通过成员函数显式调用实现。假如我们不想保存值,而想保存异常,就不应调用set_value()
,而应调用成员函数set_exception()
。
2.5、多个线程一起等待
若我们在多个线程上访问同一个std::future
对象,而不采取额外的同步措施,将引发数据竞争并导致未定义的行为。std::future
仅能移动构造和移动赋值,而std::shared_future
的实例则能复制出副本。但即便改用std::shared_future
,同一个对象的成员函数却依然没有同步,若我们从多个线程访问同一个对象,首选方式是:向每个线程传递std::shared_future
对象的副本,它们为各线程独有,这些副本就作为各线程的内部数据,由标准库正确地同步,可以安全地访问。
future和promise都具备成员函数valid()
,用于判别异步状态是否有效。std::shared_future
的实例依据std::future
的实例构造而得,前者所指向的异步状态由后者决定。因为std::future
对象独占异步状态,所以若要按默认方式构造std::shared_future
对象,则须用std::move
向其默认构造函数传递归属权。
std::promise<int> p;
std::future<int> f(p.get_future());
assert(f.valid());
std::shared_future<int> sf(std::move(f));
assert(!f.valid());
assert(sf.valid());
std::future
具有成员函数share()
,直接创建新的std::shared_future
对象,并向它转移归属权。
std::promise<std::map<SomeIndexType, SomeDataType, SomeComparator, SomeAllocator>::iterator> p;
auto sf = p.get_future().share();
3、限时等待
有两种超时机制可供选择:一是延迟超时,线程根据指定的时长而继续等待;二是绝对超时,在某个特定时间点来临之前,线程一直等待。大部分等待函数都有变体,专门处理这两种机制的超时。处理延迟超时的函数变体以_for
为后缀,而处理绝对超时的函数变体以_until
为后缀。例如std::condition_variable
的成员函数wait_for()
和wait_until()
。