## C++11: Multi-core Programming – PPL Parallel Aggregation Explained

In the first part of this series we looked at general multi-threading and multi-core programming concepts without getting into the meat of any real problems, and in the second part we looked at the theory and application of the parallel aggregation pattern using either C++11’s standard thread library or Boost using classical parallel programming techniques. In this part we shall solve exactly the same summation problem from part 2 (adding all the numbers from 0 to 1 billion) but use the new Parallel Patterns Library (PPL) to do so. We will first look at a direct port using classical parallel programming techniques, then find out how to leverage new language features in C++11 which are employed by PPL to simplify the code and write parallel algorithms using an entirely new paradigm. We shall also look at some fundamental differences between PPL and other multi-threading/multi-core libraries, the pros and cons, and some gotchas to be aware of. This tutorial will assume you have at least read the non-coding sections of part 2 of this series, so that you understand the problem we are trying to solve, the underlying algorithm and some of the problems that can crop up. Once again, we will use a Core i7-2600K CPU with 6GB of DDR-1600 RAM when benchmarking performance. This article is primarily focused on Windows, but a related library called PPLx exists which provides very similar functionality for Linux, so any non-Windows code below will apply equally to PPLx.

#### The Concurrency Runtime and PPL

New versions of Windows ship with a sub-system known as the *Concurrency Runtime* or *ConcRT* for short. ConcRT is a thin layer that sits between the operating system and applications and provides services specifically relating to multi-core programming. Various tools and libraries are provided with new versions of Visual Studio allowing us to take advantage of ConcRT; here we will look at just one, the *Parallel Patterns Library* or PPL for short. PPL is an abstraction layer which sits on top of ConcRT and leverages the new syntax of C++11 to hide much of the underlying complexity. While the feature set of ConcRT and PPL is vast, there are a few key advantages worth mentioning:

- PPL’s programming paradigm does away with synchronization objects such as critical sections and mutexes almost entirely
- PPL allows you to write parallel code without having to manage the creation and tear-down of threads yourself
- PPL allows (in many cases) sequential algorithms to be spread across multiple cores without having to re-design the algorithm significantly

ConcRT differs from traditional paradigm of leaving it up to the application to distribute specific-sized parcels of work across a specific number of threads by using something called a *work-stealing algorithm*. This is essentially a form of load balancing: consider the case where you allocate 5 units of work to thread A and 2 units to thread B. Traditionally, if we assume thread B finishes first, we end up waiting on thread A. ConcRT will automatically detect when a thread finishes its work and transfer (steal) work from another thread with a remaining backlog (in this case, thread B will steal work from thread A) to minimize the amount of waiting and thread idling, and therefore minimize the total execution time. In addition, this frees the application from having to compute or estimate the optimum number of threads or cores to use: the load balancing in ConcRT will take care of it automatically. If you do need finer-grained control, you can use *partitioning strategies* which are discussed below. PPL also provides objects which can be used to store and combine partial results from a parallel aggregation task in a thread-safe manner. More on this below.

#### Using Task Groups to perform Parallel Aggregation the old-fashioned way

As we explored in part 2 of this series, the classical way to code for parallel aggregation of a single large computational task is:

- determine how many cores to use
- calculate the size of sub-task to allocate to each core (splitting the main task up into several smaller sub-tasks)
- start each sub-task on a new core/thread
- wait for all the threads to finish
- combine the partial results returned from each thread into the final result

You can accomplish roughly the same feat using *task groups* in PPL, with the proviso that ConcRT will select the number of threads and cores for you. The basic code is very similar to the C++11 example in part 2 and looks like this:

#include <ppl.h> // for PPL (concurrency::task_group) #include <chrono> // for std::chrono::high_resolution_clock #include <cstdint> // for uint64_t #include <iostream> #include <vector> // for std::vector #include <cassert> // for assert #include <concrtrm.h> // for GetProcessorCount() using namespace concurrency; std::vector<uint64_t *> part_sums; int cores_to_use; const int max_sum_item = 1000000000; void per_core_sum(uint64_t *final, int start_val, int sums_to_do) { uint64_t sub_result = 0; for (int i = start_val; i < start_val + sums_to_do; i++) sub_result += i; *final = sub_result; } int main() { for (unsigned int cores_to_use = 1; cores_to_use <= GetProcessorCount(); cores_to_use++) { task_group tasks; part_sums.clear(); for (unsigned int i = 0; i < cores_to_use; i++) part_sums.push_back(new uint64_t(0)); auto start = std::chrono::high_resolution_clock::now(); int sums_per_thread = max_sum_item / cores_to_use; for (int start_val = 0, i = 0; start_val < max_sum_item; start_val += sums_per_thread, i++) { // Lump extra bits onto last thread if work items is not equally divisible by number of threads int sums_to_do = sums_per_thread; if (start_val + sums_per_thread < max_sum_item && start_val + sums_per_thread * 2 > max_sum_item) sums_to_do = max_sum_item - start_val; tasks.run([start_val, sums_to_do, i] { per_core_sum(part_sums[i], start_val, sums_to_do); }); if (sums_to_do != sums_per_thread) break; } tasks.wait(); auto end = std::chrono::high_resolution_clock::now(); std::cout << "Time taken with " << cores_to_use << " core" << (cores_to_use != 1? "s":"") << ": " << (end - start).count() * ((double) std::chrono::high_resolution_clock::period::num / std::chrono::high_resolution_clock::period::den) << std::endl; uint64_t result = 0; std::for_each(part_sums.begin(), part_sums.end(), [&result] (uint64_t *subtotal) { result += *subtotal; }); assert(result == uint64_t(499999999500000000)); for (unsigned int i = 0; i < cores_to_use; i++) delete part_sums[i]; } }

Just 6 lines of code differ from the C++ standard thread library version to make this application work using PPL (highlighted in grey). Let’s walk through it: We start by including *ppl.h* to get access to PPL, and place *using namespace concurrency* at the top of our code to save on typing a lot of *concurrency:: *scope resolution operators later. *per_core_sum* is identical to *do_partial_sum* in the C++11 example in part 2. We’re not using the C++ standard thread library here, so instead of using *std::thread::hardware_concurrency()* to fetch the number of cores, we use *GetProcessorCount()* instead which is defined in *concrtrm.h*. The only other changes are PPL-specific: we get rid of the vector of *std::thread* pointers and replace it with a PPL *task_group* object which we have called *tasks*. The key line which starts new threads is replaced as follows:

tasks.run([start_val, sums_to_do, i] { per_core_sum(part_sums[i], start_val, sums_to_do); });

The* run()* method of *task_group* takes a single parameter-less function with no return value to start on a new thread. By using a C++11 lambda expression, we can capture the values we want to pass as parameters to *per_core_sum* and cause it to be executed with these parameters when the thread starts. The thread details are stored as a *task* object in the task group. Notice that when you group tasks into a task group in this way, the work-stealing algorithm mentioned earlier comes into effect, so if a task in the task group on one thread finishes and there are other tasks remaining, work will be stolen from one of the other threads to keep all threads busy and minimize the total execution time. This is all done automatically for you. The join operation (which waits for each thread to finish) is replaced with:

tasks.wait();

In the version using the C++ thread library, we iterate over all the *std::thread*s and call *join()* on them to wait for them to finish one at a time. Since a PPL task group manages all the threads involved, there is no need for a *for* loop; *wait()* will block until all of the tasks in the task group have finished, then return. The actual algorithm used to apportion the work into equal chunks (besides possibly the last chunk) and the method of aggregating the partial results into a final result is identical in both concept and code to the C++11 standard thread library example in part 2 – refer to that article for the full details.

Performance-wise, changing the use of *std::thread* to PPL tasks results in comparable execution times (see figure 1). If the work unit sizes were more uneven, the work-stealing algorithm in ConcRT would make the task group version slightly faster, but in this case where the work unit sizes are the same, performance is more or less identical to managing the threads directly.

#### Implementing Parallel Aggregation with parallel_for

PPL provides a number of functions such as *parallel_invoke, parallel_for, parallel_for_each, parallel_reduce* and so on to enable common constructs and aggregation operations to be spread over multiple cores automatically. The simplest solution to our summation problem using *parallel_for* looks like this:

#include <iostream> #include <cstdint> #include <ppl.h> using namespace Concurrency; const int max_sum_item = 1000000000; int main() { combinable<uint64_t> part_sums([] { return 0; }); parallel_for(0, max_sum_item, [&part_sums] (int i) { part_sums.local() += i; } ); uint64_t result = part_sums.combine(std::plus<uint64_t>()); if (result != uint64_t(499999999500000000)) throw; }

Pretty cool, huh? We have written the program in a similar way to how we would code it if we were writing a sequential (single-core/single-thread) algorithm, and it’s really short! We have done away with our old *std::vector<uint64_t *> part_sums* which stored the partial results from each thread, and replaced it with an instance PPL’s *combinable* template class. *combinable* is great for several reasons:

- we don’t need to know in advance how many threads will be used
- by using the
*local()*method we can read and write a*thread-private*or*thread-local*instance of the object’s template type parameter (*uint64_t*in this case). We don’t need to worry about synchronization objects, or allocating/releasing memory for partial results, or providing the thread function with pointers to their own partial result storage. We can just call*local()*to access our result storage location. - when a new thread starts, a new instance of the object’s template type parameter is initialized using the function passed in
*combinable*‘s constructor - when all the threads are finished, you can use
*combine()*(or*combine_each()*) with any signature-compatible function to aggregate the final result

In our code above, we initialize each new partial result element with 0 by using a C++11 lambda function with returns 0, and combine the final results using *std::plus*, which adds together (sums) its arguments. Let’s take a closer look at the main calculation code:

parallel_for(0, max_sum_item, [&part_sums] (int i) { part_sums.local() += i; } );

Here is how the equivalent sequential code would look (a standard *for* loop):

uint64_t total = 0; for (int i = 0; i < max_sum_item; i++) { total += i; }

*parallel_for* iterates from the value passed in the first argument (0) to the value passed in the second argument exclusive (*max_sum_item*), calling the function passed in the third argument on each iteration with the current value, just like a standard *for* loop. The difference is that *parallel_for* splits the work over multiple threads/cores, and it is here we can see where the beauty lies: the algorithm inside the loop is exactly the same in both cases, and can be written in-line thanks to C++11 lambda expressions. The only difference is that in the *parallel_for* loop, we add the result to our thread-private partial result rather than the global result. If we did this, we would have to lock the global result with a critical section or mutex prior to each update; the cost of synchronization is so high compared to the complexity of the addition that the ultimate performance would be significantly worse than a single-threaded implementation, as the following code shows:

CRITICAL_SECTION cs_total; ... InitializeCriticalSection(&cs_total); uint64_t total = 0; parallel_for(0, max_sum_item, [&total] (int i) { EnterCriticalSection(&cs_total); total += i; LeaveCriticalSection(&cs_total); } ); DeleteCriticalSection(&cs_total);

On the benchmark machine, the version using the critical section runs approximately **15 times slower** than the version without – see figure 2. Watching CPU resource use during execution also shows that the CPU is only under an approximate 20% load in total, meaning that it is spending 80% of its time stalling and waiting for critical sections to be exited in other threads, whereas the basic *parallel_for* loop without critical sections puts the CPU under the desired 100% load, using all available processing resources.

Unfortunately, even without using a critical section, the performance of the *parallel_for* loop in the example above is very slow compared to our expectations, as figure 2 again shows. While performance on the benchmark machine using the C++11 standard thread library and PPL task groups is comparable (0.08 seconds), and a single-threaded sequential for loop is slower as expected (0.21 seconds), using the *parallel_for* loop takes a whopping 2.4 seconds – over 10 times slower than the single-threaded for loop! What is going on here? Read on to find out.

#### Small loop bodies, large management overheads

The problem with our code is that the loop body is so short and simple and does so little that the vast majority of the processing time is taken up managing overheads:

- function call cost overhead
- using the thread-private storage location (
*part_sums.local()*) - worker thread creation and tear-down

##### Function call cost overhead

The first problem can be solved by making each call to the thread function (the function we pass to *parallel_for*) do more work. Rather than only adding one number to the total, we can parcel up blocks of work similar to in the C++11 standard thread library and PPL task group examples and make each function call do a block of additions. Each thread will then store its own partial result in a *combinable* object and we’ll sum the partial results when all the threads have finished, just like before. Note this somewhat defeats the point of using *parallel_for* to simplify parallel programming because we are still having to re-design the algorithm, but it is only a problem with small loop bodies; more typically used complex code will work just fine and see a good performance benefit using the basic technique in the previous section, without further modification. However, the issue with small loop bodies is a common gotcha so I thought it worth exploring further.

Our first stab at the solution might look like this:

combinable<uint64_t> part_sums([] { return 0; }); int sums_to_do = max_sum_item / GetProcessorCount(); parallel_for(0, max_sum_item, sums_to_do, [&part_sums, sums_to_do] (int start_val) { for (int i = start_val; i < start_val + sums_to_do; i++) part_sums.local() += i; } ); uint64_t result = part_sums.combine(std::plus<uint64_t>());

We have essentially replaced the single addition with a standard *for* loop that performs a series of additions (I have called this kind of construct a ‘chunked parallel_for loop’ in figure 3, as we are splitting the work into chunks). Here we have split the work equally across a number of threads equal to the number of processor cores (as returned by *GetProcessorCount()*), but you can use any number of threads you choose.

Note that we have now lost the flexibility to not worry about how much thread does how much work: if all of the work unit sizes are not the same (for example if *GetProcessorCount()* is not exactly divisible into the problem size, or if you replace the call with some arbitrary number of threads which are not exactly divisible into the problem size), you will get an incorrect result.

Performance-wise, figure 3 shows an average time of about 1.88 seconds for this version – still much slower than the single-threaded version, but about 25% faster than our first attempt at *parallel_for*. It’s a start.

##### Thread-private storage

Every time we use *local()* a function call must be made to get the reference to the thread-private storage in the *combinable* object. We can improve performance by fetching the reference once and storing it in a variable before our *for* loop begins:

parallel_for(0, max_sum_item, sums_to_do, [&part_sums, sums_to_do] (int start_val) { uint64_t &part_result = part_sums.local(); for (int i = start_val; i < start_val + sums_to_do; i++) part_result += i; } );

We call *local()* only once, before the *for *loop, save the reference in *part_result* and use this as the storage target when doing the additions.

Figure 3 shows the dramatic performance gain achieved: at around 0.4 seconds, this is over 4 times faster than the previous version, yet still twice as slow as the single-threaded version!

The problem is that accessing any variable not local to the thread function still takes time – more time than if it was local. The solution therefore, is to not use *part_sums.local()* at all until the partial result is ready, and store a temporary result as a local variable:

parallel_for(0, max_sum_item, sums_to_do, [&part_sums, sums_to_do] (int start_val) { uint64_t part_result = 0; for (int i = start_val; i < start_val + sums_to_do; i++) part_result += i; part_sums.local() += part_result; } );

Figure 3 shows the performance: at 0.085 seconds, it is comparable with the standard thread library and PPL task groups, and therefore runs at the expected speed.

NOTE: It is very important that you **add*** *your temporary partial results to *part_sums.local()* and not assign them directly. This is because the work-stealing algorithm may add extra work (in the form of another call to this function) to a thread after the first run of the *for* loop completes, which means *part_sums.local()* will already be initialized with a value which is part of the total result. If you just assign to* part_sums.local()* directly, you overwrite any previous results calculated in the same thread and may finish with an incorrect total result.

#### Partitioning strategy

Partitioning is the mechanism ConcRT uses to decide how much work of a *parallel_for* or other construct to give to each thread. When you use the basic (non-chunked) *parallel_for* loop for example, ConcRT’s partitioning strategy decides how many iterations of the function will run on how many threads, and this is the reason you don’t need to specify a workload size for each thread individually.

The default strategy is called *auto_partitioner* and splits the work into ranges, and these ranges into sub-ranges, then employs the work-stealing algorithm to move the sub-ranges around to the least backlogged threads, essentially performing load balancing as discussed earlier.

For our chunked version of the code, this has a down-side: we have already calculated the ranges (workload sizes per thread) ourselves and we don’t want or need load balancing. This extra management performed by *auto_partitioner* then just becomes an unnecessary overhead. Even with the non-chunked version, since our loop iterations are so simple, the overhead of load balancing may end up costing more time than it saves.

Fortunately, when you call *parallel_for* you can specify as a final optional argument an alternative partitioning strategy to use instead of *auto_partitioner*. For our purposes, the *static_partitioner* strategy is a good match: it splits the work into ranges and assigns one range to each thread. No work-stealing/load balancing is used, and the ranges are not divided into sub-ranges for this purpose. The documentation states: *“Use this partitioner type when each iteration of a parallel loop performs a fixed and uniform amount of work…”*.

If you want to ensure that each range has a certain minimum size, you can use *simple_partitioner*. *simple_partitioner *takes a single argument which specifies the minimum number of iterations of the *parallel_for* which should run on each thread (ie. the minimum range size). Work-stealing is still performed but only once the specified minimum number of iterations has completed, and the ranges are not split into sub-ranges. Essentially then, work-stealing can only occur if an entire range completes before another one on another thread has started.

**WARNING:** In previous versions of PPL there was no modifiable partitioning strategy. Users were required to install the Concurrency Runtime Sample Pack, include *ConcRTExtras/ppl_extras.h* and use *samples::**parallel_for_fixed* to have the same effect as calling *parallel_for* with *static_partitioner. *The Microsoft documentation is still out-of-date in this regard and you will find many references to *parallel_for_fixed* and related functions on MSDN. The current version of PPL implements partitioning strategy as the optional final argument as a replacement to these functions. Do not use the Concurrency Runtime Sample Pack for partitioning.

Looking at the non-chunked case first:

parallel_for(0, max_sum_item, [&part_sums] (int i) { part_sums.local() += i; }, static_partitioner() );

By simply adding *static_partitioner()* as the final argument to *parallel_for*, we change the partitioning strategy in use.

For the chunked case:

parallel_for(0, max_sum_item, sums_to_do, [&part_sums, sums_to_do] (int start_val) { uint64_t part_result = 0; for (int i = start_val; i < start_val + sums_to_do; i++) part_result += i; part_sums.local() += part_result; }, static_partitioner() );

Figures 4 and 5 show the relative performance of the mentioned partitioner types. In the standard (non-chunked case), we set the range size in *simple_partitioner* such that it is exactly divisible into the number of processor cores available, such that each core is used and given an initially equal amount of work.

In the chunked case, we set the range size to 1 when we use *simple_partitioner*, because we have already divided the work into a number of chunks equal to the number of processor cores and each iteration of the *parallel_for* is meant to run exactly once on each core, in contrast to the standard *parallel_for* use.

As you can see, in our summation problem the *simple_partitioner* wins out in all cases, although both *simple_partitioner* and *static_partitioner* bring performance benefits over the default *auto_partitioner, *particularly in the standard (non-chunked) case.

#### Final results

Figure 6 shows all the PPL solutions we have tried which improve performance over the single-threaded implementation. As you can see, with small loop bodies, the chunking approach discussed above (without using *local()* until the entire chunk of work is finished) is the optimal solution, with performance comparable to the thread library and task groups. *parallel_for* is preferred over task groups (whether chunked or not) for this kind of problem, where the tasks use identical code with different data. Task groups should be used when each task requires different code (ie. several unrelated tasks). Using the simple or static partitioning strategies can lead to additional performance gains, but check during development as your mileage may vary depending on the computations being performed.

Standard (non-chunked) *parallel_for* is preferred where the tasks have more complexity than a single line of code, and in those cases performance should again be comparable to the thread library and task groups.

#### …and across the line

This concludes our exploration of parallel aggregation in PPL. I hope you found it useful!

#### References

MSDN – Overview of the Concurrency Runtime

Microsoft Patterns & Practices – Parallel Loops (including Special Handling of Small Loop Bodies)

Microsoft Patterns & Practices – Parallel Tasks

MSDN – Parallel Algorithms -> Partitioning Work

Love your blog, your articles about ppl.h help me a lot! Thank you!

Hi, this article is great, but I have one question: what is the way to put result of every task into one unordered_map? Other words: how to do return of every task? I am trying to do it since days, but I haven’t idea.