《C++并发编程实战》读书笔记(3):并发操作的同步

1、条件变量

当线程需要等待特定事件发生、或是某个条件成立时,可以使用条件变量std::condition_variable,它在标准库头文件<condition_variable>内声明。

std::mutex mut;
std::queue<data_chunk> data_queue;
std::condition_variable data_cond;
void data_preparation_thread()
{
    while (more_data_to_prepare())
    {
        const data_chunk data = prepare_data();
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(data);
        data_cond.notify_one();
    }
}
void data_processing_thread()
{
    while (true)
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [] { return !data_queue.empty(); });
        data_chunk data = data_queue.front();
        data_queue.pop();
        lk.unlock();
        process(data);
        if (is_last_chunk(data)) { break; }
    }
}

wait()会先在内部调用lambda函数判断条件是否成立,若条件成立则wait()返回,否则解锁互斥并让当前线程进入等待状态。当其它线程调用notify_one()时,当前调用wait()的线程被唤醒,重新获取互斥锁并查验条件,若条件成立则wait()返回(互斥仍被锁住),否则解锁互斥并继续等待。

wait()函数的第二个参数可以传入lambda函数,也可以传入普通函数或可调用对象,也可以不传。

notify_one()唤醒正在等待当前条件的线程中的一个,如果没有线程在等待,则函数不执行任何操作,如果正在等待的线程多于一个,则唤醒的线程是不确定的。notify_all()唤醒正在等待当前条件的所有线程,如果没有正在等待的线程,则函数不执行任何操作。

2、使用future等待一次性事件发生

C++标准程序库有两种future,分别由两个类模板实现,即std::future<>std::shared_future<>,它们的声明位于头文件<future>内。

2.1、从后台任务返回值

由于std::thread没有提供直接回传结果的方法,所以我们使用函数模板std::async()来解决这个问题。std::async()以异步方式启动任务,并返回一个std::future对象,运行函数一旦完成,其返回值就由该对象持有。在std::future对象上调用get()方法时,当前线程就会阻塞,直到std::future准备妥当并返回异步线程的结果。std::future模拟了对异步结果的独占行为,get()仅能被有效调用一次,调用时会对目标值进行移动操作。

int find_the_answer_to_ltuae();
void do_other_stuff();
int main()
{
    std::future<int> the_answer = std::async(find_the_answer_to_ltuae);
    do_other_stuff();
    std::cout << "The answer is " << the_answer.get() << std::endl;
}

在调用std::async()时,它可以接收附加参数进而传递给任务函数作为其参数,此方式与std::thread的构造函数相同。更多启动异步线程的方法可参考下面的例程:

struct X
{
    void foo(int, const std::string&);
    std::string bar(const std::string&);
};
X x;
auto f1 = std::async(&X::foo, &x, 42, "hello"); // 调用p->foo(42, "hello"),p是指向x的指针
auto f2 = std::async(&X::bar, x, "goodbye");    // 调用tmpx.bar("goodbye"), tmpx是x的拷贝副本
struct Y
{
    double operator()(double);
};
Y y;
auto f3 = std::async(Y(), 3.141);         // 调用tmpy(3.141),tmpy是由Y()生成的匿名变量
auto f4 = std::async(std::ref(y), 2.718); // 调用y(2.718)
X baz(X&);
std::async(baz, std::ref(x)); // 调用baz(x)

我们还能为std::async()补充一个std::launch类型的参数,来指定采用哪种方式运行:std::launch::deferred指定在当前线程上延后调用任务函数,等到在future上调用了wait()get(),任务函数才会执行;std::launch::async指定必须开启专属的线程,在其上运行任务函数。该参数的还可以是std::launch::deferred | std::launch::async,表示由std::async()的实现自行选择运行方式,这也是这项参数的默认值。

auto f6 = std::async(std::launch::async, Y(), 1.2); // 在新线程上执行
auto f7 = std::async(std::launch::deferred, baz, std::ref(x)); // 在wait()或get()调用时执行
auto f8 = std::async(std::launch::deferred | std::launch::async, baz, std::ref(x)); // 交由实现自行选择执行方式
auto f9 = std::async(baz, std::ref(x));
f7.wait(); // 调用延迟函数

2.2、关联future实例和任务

std::packaged_task<>连结future对象与函数(或可调用对象,下同)。std::packaged_task<>对象在执行任务时,会调用关联的函数,把返回值保存为future的内部数据,并令future准备就绪。若一项庞杂的操作能分解为多个子任务,则可以把它们分别包装到多个std::packaged_task<>实例之中,再传递给任务调度器或线程池,这就隐藏了细节,使任务抽象化,让调度器得以专注处理std::packaged_task<>实例,无需纠缠于形形色色的任务函数。

std::packaged_task<>是类模板,其模板参数是函数签名(例如void()表示一个函数,不接收参数,也没有返回值),传入的函数必须与之相符,即它应接收指定类型的参数,返回值也必须可以转换成指定类型。这些类型不必严格匹配,若某函数接收int类型参数并返回float值,则可以为其构建std::packaged_task<double(double)>的实例,因为对应的类型可以隐式转换。

std::packaged_task<>具有成员函数get_future(),它返回std::future<>实例,该future的特化类型取决于函数签名指定的返回值。std::packaged_task<>还具备函数调用操作符,它的参数取决于函数签名的参数列表。

std::mutex m;
std::deque<std::packaged_task<void()>> tasks;
bool gui_shutdown_message_received();
void get_and_process_gui_message();
void gui_thread()
{
    while (!gui_shutdown_message_received())
    {
        get_and_process_gui_message();
        std::packaged_task<void()> task;
        {
            std::lock_guard<std::mutex> lk(m);
            if (tasks.empty()) { continue; }
            task = std::move(tasks.front());
            tasks.pop_front();
        }
        task();
    }
}
std::thread gui_bg_thread(gui_thread);
template<typename Func>
std::future<void> post_task_for_gui_thread(Func f)
{
    std::packaged_task<void()> task(f);
    std::future<void> res = task.get_future();
    std::lock_guard<std::mutex> lk(m);
    tasks.push_back(std::move(task));
    return res;
}

2.3、创建std::promise

有些任务无法以简单的函数调用表达出来,还有一些任务的执行结果可能来自多个部分的代码,这时可以借助std::promise显式地异步求值。配对的std::promisestd::future可以实现下面的工作机制:等待数据的线程在future上阻塞,而提供数据的线程利用相配的std::promise设定关联的值,使future准备就绪。

若需从给定的std::promise实例获取关联的std::future对象,调用前者的成员函数get_future()即可,这与std::package_task一样。promise的值通过成员函数set_value()设置,只要设置好,future即准备就绪,凭借它就能获取该值。如果std::promise在被销毁时仍未曾设置值,保存的数据则由异常代替。

void f(std::promise<int> ps)
{
    std::this_thread::sleep_for(std::chrono::seconds(1));
    ps.set_value(42);
}

int main()
{
    std::promise<int> ps;
    std::future<int> ft = ps.get_future();
    std::thread t(f, std::move(ps));
    int val = ft.get();
    std::cout << val << std::endl;
    t.join();
}

2.4、将异常保存到future中

若经由std::async()调用的函数抛出异常,则会被保存到future中,future随之进入就绪状态,等到其成员函数get()被调用,存储在内的异常即被重新抛出。std::packaged_task也是同理,若包装的任务函数在执行时抛出异常,则也会被保存到future中,只要调用get(),该异常就会被再次抛出。自然而然,std::promise也具有同样的功能,它通过成员函数显式调用实现。假如我们不想保存值,而想保存异常,就不应调用set_value(),而应调用成员函数set_exception()

2.5、多个线程一起等待

若我们在多个线程上访问同一个std::future对象,而不采取额外的同步措施,将引发数据竞争并导致未定义的行为。std::future仅能移动构造和移动赋值,而std::shared_future的实例则能复制出副本。但即便改用std::shared_future,同一个对象的成员函数却依然没有同步,若我们从多个线程访问同一个对象,首选方式是:向每个线程传递std::shared_future对象的副本,它们为各线程独有,这些副本就作为各线程的内部数据,由标准库正确地同步,可以安全地访问。

future和promise都具备成员函数valid(),用于判别异步状态是否有效。std::shared_future的实例依据std::future的实例构造而得,前者所指向的异步状态由后者决定。因为std::future对象独占异步状态,所以若要按默认方式构造std::shared_future对象,则须用std::move向其默认构造函数传递归属权。

std::promise<int> p;
std::future<int> f(p.get_future());
assert(f.valid());
std::shared_future<int> sf(std::move(f));
assert(!f.valid());
assert(sf.valid());

std::future具有成员函数share(),直接创建新的std::shared_future对象,并向它转移归属权。

std::promise<std::map<SomeIndexType, SomeDataType, SomeComparator, SomeAllocator>::iterator> p;
auto sf = p.get_future().share();

3、限时等待

有两种超时机制可供选择:一是延迟超时,线程根据指定的时长而继续等待;二是绝对超时,在某个特定时间点来临之前,线程一直等待。大部分等待函数都有变体,专门处理这两种机制的超时。处理延迟超时的函数变体以_for为后缀,而处理绝对超时的函数变体以_until为后缀。例如std::condition_variable的成员函数wait_for()wait_until()

热门相关:总裁的天价小妻子   高尔夫球女孩的两个洞   厉害的按摩店的商业秘密2   异能特工:军火皇后   我成了暴戾帝君的小娇包