The calling thread (in this case main) does not wait for the thread t.
However, when the main thread exits, the process terminates and all other threads are destroyed.
Therefore the created thread might not have enough time to perform its job.
You can see these two cases by defining and undefining WAIT in the code.
In class exercise 0
Write two functions, each running in its own thread.
The first, read_input, keeps reading input from std::cin until the user types "quit".
The second, read_file, reads the content of a one-line file.
To simulate a time-consuming I/O operation, the read_file function should sleep for 10 seconds.
1. Create a text file in the same directory as your VC++ solution, call it "input.txt", and write a single line in it.
#include <iostream>
#include <fstream>
#include <string>
#include <thread>
#include <chrono>

void read_input() {
    std::string s;
    std::cout << "enter a string. Type 'quit' to quit\n";
    /* stop on "quit" or when std::cin reaches end of input */
    while (s != "quit" && std::cin >> s) {
        std::cout << "user input= " << s << "\n";
    }
}
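The second function of the exercise is not shown above. A minimal sketch of it might look as follows; the names read_file_in_thread, path, delay and out are assumptions made for illustration, and the delay is a parameter so that the 10 second sleep can be shortened while experimenting.

```cpp
#include <cassert>
#include <chrono>
#include <fstream>
#include <functional>
#include <string>
#include <thread>

// Sleeps for `delay` to mimic slow I/O, then stores the first line of the
// file in `out`. `out` is left empty if the file cannot be opened.
void read_file(const std::string& path, std::chrono::milliseconds delay,
               std::string& out) {
    assert(!path.empty());
    std::this_thread::sleep_for(delay);
    std::ifstream in(path);
    out.clear();
    if (in) std::getline(in, out);
}

// Hypothetical helper: runs read_file in its own thread and waits for it.
// std::ref is needed because std::thread copies its arguments by default.
std::string read_file_in_thread(const std::string& path,
                                std::chrono::milliseconds delay) {
    std::string line;
    std::thread t(read_file, path, delay, std::ref(line));
    t.join();
    return line;
}
```

For the exercise itself one would call read_file_in_thread("input.txt", std::chrono::seconds(10)), or start the thread directly and let read_input run in a second thread while the file is being read.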
The statement std::cout<<"main thread is done\n" is never reached.
This is because when the destructor of a thread object is called (i.e. ~std::thread()), it calls std::terminate() if the thread is still joinable.
Calling std::thread::join() or std::thread::detach() makes the thread not joinable, so the destructor no longer calls std::terminate().
In the above example, t.join() is never reached because myf() throws an exception.
Therefore t.~thread() calls std::terminate().
One way to guard against such a situation is to use the Resource Acquisition Is Initialization (RAII) technique.
We will see more of this technique when we study mutexes and locks,
but for now it is sufficient to say that we construct a wrapper object around the thread so that join() is called automatically when the wrapper is destroyed.
Thread guard
We define a class thread_guard that automatically calls join() when its destructor is called.
Since the destructor is called when the object goes out of scope, this guarantees that the created thread will be joined even when the scope that created the thread takes an unexpected flow due to exceptions.
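A minimal sketch of such a guard is given below. The class follows the description above; the demo function guarded_worker_completes is a hypothetical name introduced only to show the guard joining the thread during stack unwinding.

```cpp
#include <cassert>
#include <stdexcept>
#include <thread>

class thread_guard {
    std::thread& t_;
public:
    explicit thread_guard(std::thread& t) : t_(t) {
        assert(t_.joinable());  // the guard expects a running thread
    }
    ~thread_guard() {
        if (t_.joinable()) t_.join();  // join exactly once, even on exceptions
    }
    // Copying a guard would lead to two objects joining the same thread.
    thread_guard(const thread_guard&) = delete;
    thread_guard& operator=(const thread_guard&) = delete;
};

// Starts a worker, then throws before any explicit join().
// Returns true if the worker had finished once the exception was caught.
bool guarded_worker_completes() {
    bool done = false;
    try {
        std::thread t([&done] { done = true; });
        thread_guard g(t);  // g is destroyed before t, so t is joined first
        throw std::runtime_error("simulated failure");
    } catch (const std::runtime_error&) {
        // Stack unwinding ran ~thread_guard(), which joined the worker.
    }
    return done;
}
```

Without the guard, the same function would end in std::terminate(), because t would be destroyed while still joinable.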
In this exercise we use two threads to generate two large random prime numbers.
We compare the performance with a sequential version where the generator is called twice in a row.
First we write the random number generator and the selection of primes:
#include <iostream>
#include <functional>
#include <thread>
#include <random>

/* a simple function to sample large random numbers */
std::random_device rd;
const int lowest = 4000000;
const int highest = 8000000;
std::uniform_int_distribution<int> dist(lowest, highest);

inline int get_rand() {
    return dist(rd);
}

bool is_prime(int value) {
    /* this is NOT the most efficient way of
     * determining if a number is prime */
    if (value <= 1) return false;  /* 0, 1 and negatives are not prime */
    for (int i = 2; i < value; ++i)
        if (value % i == 0) return false;
    return true;
}
The above allows us to generate a random number between lowest and highest then test if it is a prime number.
Note that is_prime() is not the most efficient way of testing for primality.
We prefer it here so that each thread has a sufficiently large computation to perform.
Because a single sample is unlikely to be prime, we repeat the sampling up to n times until we find a prime number, as shown below.
/* A function that samples random numbers until it finds a prime.
 * Gives up and returns 0 after n tries
 */
int get_large_prime(int n) {
    int large;
    for (int i = 0; i < n; ++i) {
        large = get_rand();
        if (is_prime(large)) return large;
    }
    return 0;
}
We call the above code twice, sequentially, and measure the duration.
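The timing of the sequential run can be sketched with a small helper. The function time_ms below is a hypothetical name introduced for illustration; it is not the Duration/TIMEIT pair used elsewhere in these notes, only one possible way to measure a block of work with std::chrono.

```cpp
#include <cassert>
#include <chrono>

// Runs `work` once and returns the elapsed wall-clock time in milliseconds.
template <typename Work>
double time_ms(Work&& work) {
    auto start = std::chrono::steady_clock::now();
    work();
    auto stop = std::chrono::steady_clock::now();
    assert(stop >= start);  // steady_clock never goes backwards
    return std::chrono::duration<double, std::milli>(stop - start).count();
}
```

With it, the sequential measurement could be written as time_ms([&] { v1 = get_large_prime(n); v2 = get_large_prime(n); }), assuming v1, v2 and n are declared by the caller.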
To perform the two operations in parallel, since we have multiple cores, we run get_large_prime in two separate threads.
Since a std::thread object ignores the return value of its top-level function, we must pass a variable by reference to retrieve the prime number.
Therefore we overload get_large_prime as follows:
/* to be used with threads */
void get_large_prime(int n, int& result) {
    int large;
    result = 0;
    for (int i = 0; i < n; ++i) {
        large = get_rand();
        if (is_prime(large)) {
            result = large;
            return;
        }
    }
    return;
}
Main function
int main() {
    const int num_tries = 300;
    int v1, v2;
    Duration d;
    /* get_large_prime is overloaded, so select the two-argument
     * version explicitly before handing it to std::thread */
    void (*find_prime)(int, int&) = get_large_prime;
    std::cout << " starting two threads \n";
    /* start two threads here and call the
     * function you wrote to get a large prime
     */
    TIMEIT(d,
        std::thread t1(find_prime, num_tries, std::ref(v1));
        std::thread t2(find_prime, num_tries, std::ref(v2));
        t1.join(); t2.join();
    )
    std::cout << "values found " << v1 << " and " << v2 << std::endl;
    std::cout << "two threads time = " << d.count() << std::endl;
}
Need for template parameters
Unfortunately, with threads the compiler doesn't know which version to use.
If you compile the above code, the compiler gives an error similar to "use of deleted function operator=", because the copy assignment operator is deleted for std::thread objects.
One solution is to specialize the template for Container, i.e. add the following code
In class exercise 2 (Note on compiler optimization)
In many of the examples we compare the performance of single vs multi threaded versions.
Since our focus is on performance we don't use the result of the single threaded version unless we need to check the correctness of the multi-threaded version.
This approach sometimes leads to surprising results because of compiler optimizations, in particular dead code elimination.
The basic idea is this: if a variable or the output of a function is not used, the compiler might not include the function call in the executable. In that case the measured duration of the function call is (close to) zero.
Let us illustrate with an example of sequential code.
The code
#include <algorithm>
#include <chrono>
#include <iostream>
#include <random>
#include <vector>

int main() {
    std::random_device rd;
    std::uniform_int_distribution<int> dist(1, 1000);
    const int n = 5000000;
    std::vector<int> v(n);
    std::generate(v.begin(), v.end(), [&]() { return dist(rd); });
    auto start = std::chrono::high_resolution_clock::now();
    /* 2000 is outside [1,1000], so the whole vector is scanned */
    bool result = std::any_of(v.begin(), v.end(),
        [](int x) { return x == 2000; }
    );
    auto end = std::chrono::high_resolution_clock::now();
    auto duration =
        std::chrono::duration<double, std::milli>(end - start);
    std::cout << "duration = " << duration.count() << "\n";
    std::cout << "value found =" << std::boolalpha << result << "\n";
}
The duration is either close to zero or about 2.3 milliseconds.
The value depends on whether we remove the last line, which prints the value of result.
The reason is that when we don't print (i.e. don't use) the variable result, the compiler removes the computation that produces it.
This can be seen below in the assembly output between the two calls to now().
Figure 1 shows the assembly output when the last line is not included. Note the absence of instructions between the two calls to now().
Figure 2 shows the assembly output when the last line is included. Note the number of instructions between the two calls to now().
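One common way to keep a result "used" without printing it is to store it in a volatile object, since a volatile write is an observable side effect that the compiler has to preserve. The sketch below illustrates the idea; g_sink and timed_any_of are hypothetical names, and this is only one of several possible techniques.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <vector>

// Writing to a volatile object cannot be optimized away, so the value
// assigned to it must actually be computed.
volatile bool g_sink = false;

// Times std::any_of over v and keeps the result alive through g_sink.
// Returns the elapsed time in milliseconds.
double timed_any_of(const std::vector<int>& v, int target) {
    auto start = std::chrono::high_resolution_clock::now();
    bool found = std::any_of(v.begin(), v.end(),
                             [target](int x) { return x == target; });
    g_sink = found;  // consume the result so the search is not dead code
    auto end = std::chrono::high_resolution_clock::now();
    assert(end >= start);
    return std::chrono::duration<double, std::milli>(end - start).count();
}
```

Printing the result, as the example above does, has the same effect; the volatile sink is simply convenient when the output would clutter a benchmark.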
In class exercise 3
In this exercise we will build a multi-threaded program to determine if a large vector of integers contains a certain value.
This is a parallel version of the STL algorithm std::any_of (we will see later that STL algorithms can also be parallelized easily).
The basic idea is to divide the range into as many blocks as there are hardware threads.
For each block, a thread, independently, searches that block for the value.
When all the threads are done, if any of them has found the value in its own block, we conclude that the vector contains that value.
To collect the results of the threads we define std::vector<bool> results(num_threads). Thread i stores its result in results[i].
The function contains takes starting and ending iterators denoting a particular block of the vector, plus an iterator to the element of results where it stores its answer.
The reason we use an iterator into results instead of, say, passing results[i] by reference
is that std::vector<bool> is a specialization of std::vector that packs its elements into bits.
In particular, operator[] returns a proxy object, not a bool& as the general implementation would.
Because of this packing, the standard does not guarantee that concurrent writes to different elements of a std::vector<bool> are free of data races; a std::vector<char> avoids the issue.
In the example below we search for a non-existent value to maximize the running time.
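The difference between std::vector<bool> and the general std::vector can be checked at compile time. The sketch below is illustrative only; write_through_iterator is a hypothetical helper showing that writing through an iterator still works, because dereferencing it yields the same kind of proxy.

```cpp
#include <cassert>
#include <type_traits>
#include <utility>
#include <vector>

// For an ordinary element type, operator[] yields a true lvalue reference.
static_assert(
    std::is_same<decltype(std::declval<std::vector<int>&>()[0]), int&>::value,
    "vector<int>::operator[] returns int&");

// For bool, operator[] yields a proxy (std::vector<bool>::reference),
// so it cannot be bound to a bool& parameter.
static_assert(
    !std::is_same<decltype(std::declval<std::vector<bool>&>()[0]), bool&>::value,
    "vector<bool>::operator[] does not return bool&");

// Assigning through a dereferenced iterator updates the stored bit.
bool write_through_iterator() {
    std::vector<bool> flags(4, false);
    assert(flags.size() > 2);
    auto it = flags.begin() + 2;
    *it = true;
    return flags[2] && !flags[0];
}
```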
```cpp
/* Given a vector v of n integers:
 * Write a function to test if v
 * contains a certain number at least once.
 * Use as many threads as your hardware permits.
 */
template<typename Iter, typename T, typename IterBool>
void contains(Iter first, Iter last, T value, IterBool result) {
    Iter itr = first;
    *result = false;
    while (itr != last) {
        if (*itr == value) {
            *result = true;
            return;
        }
        ++itr;
    }
    return;
}
```
Main function
int main() {
    /* choose size power of two */
    const int n = 2 << 25;
    std::random_device rd;
    std::uniform_int_distribution<int> dist(1, 10000);
    /* choosing a value that is NOT in the vector */
    int value = 12000;
    std::vector<int> v(n);
    std::generate(v.begin(), v.end(), [&]() { return dist(rd); });
    using Iter = std::vector<int>::iterator;
    /* using the sequential STL std::any_of() */
    bool res = false;
    Duration d;
    TIMEIT(d,
        /* assign to the outer res instead of declaring a new one */
        res = std::any_of(v.begin(), v.end(), [=](int x) { return x == value; });
    )
    std::cout << "sequential result = " << std::boolalpha << res << std::endl;
    std::cout << "duration =" << d.count() << "\n";
Main continued
    int num_threads = std::thread::hardware_concurrency();
    if (num_threads == 0) num_threads = 2;  /* hardware_concurrency() may return 0 */
    int block_size = n / num_threads;
    /* store the result of each block in vector results */
    std::vector<bool> results(num_threads);
    std::vector<std::thread> mythreads;
    Iter begin = v.begin();
    auto itr = results.begin();
    bool found = false;
    TIMEIT(d,
        for (long long i = 0; i < num_threads; ++i) {
            Iter first = begin + i * block_size;
            /* the last block also covers any remainder */
            Iter last = (i == num_threads - 1) ? v.end() : first + block_size;
            mythreads.push_back(
                std::thread(
                    contains<Iter, int, decltype(itr)>, first, last, value, itr)
            );
            ++itr;
        }
        for (auto& t : mythreads)
            t.join();
        /* combine the results of blocks */
        found = std::find(results.begin(), results.end(), true) != results.end();
    )
    /* print after TIMEIT so that d holds the multi-threaded duration */
    std::cout << (found ? "true" : "false") << "\n";
    std::cout << "duration =" << d.count() << "\n";
}
Example: parallel accumulate
In its simplest form, std::accumulate adds (accumulates) all the values in a given range, including a supplied initial value.
The default operation is std::plus{}.
The first call to std::accumulate below can be written as
std::accumulate(v.begin(),v.end(),0,std::plus{})
We can supply any binary operator;
for example, in the second call below, we use std::multiplies.
NOTE: if you are using a pre-C++17 compiler you might need to supply a template parameter for std::multiplies{}, e.g. std::multiplies<int>{}.
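As an illustration of the two forms described above, the following sketch wraps each call in a small function. The names sum_of and product_of are hypothetical, and the explicit std::multiplies<int> is used so that the code also compiles before C++17.

```cpp
#include <cassert>
#include <functional>
#include <numeric>
#include <vector>

// Default operation: adds every element to the initial value 0.
int sum_of(const std::vector<int>& v) {
    return std::accumulate(v.begin(), v.end(), 0);
}

// Custom binary operator: multiplies every element into the initial value 1.
int product_of(const std::vector<int>& v) {
    assert(!v.empty());
    return std::accumulate(v.begin(), v.end(), 1, std::multiplies<int>{});
}
```

Note that the initial value has to match the operation: 0 is the neutral element for addition, 1 for multiplication.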
We modify the my_accumulate function to run its operations in parallel.
The basic idea is to divide the range into m subranges, where m is the number of threads we will run.
Each thread will accumulate the values in its own range.
When all the threads are done, the main thread adds the partial results.
Note that even though the parallel_acc function below applies an arbitrary function f on the subranges, the final result is the sum of those partial results.
Parallel accumulate
using Long = unsigned long long;
/* assume the vector size is a power of 2 */
template <int NUMT = 0, typename Iter, typename T, typename Func>
T parallel_acc(Iter first, Iter last, T init, Func f) {
    Long const length = std::distance(first, last);
    Long num_threads = NUMT == 0 ? std::thread::hardware_concurrency() : NUMT;
    std::cout << "Number of threads = " << num_threads << "\n";
    Long block_size = length / num_threads;
    std::vector<T> results(num_threads);
    std::vector<std::thread> threads(num_threads);
    Iter block_start, block_end;
    for (Long i = 0; i < num_threads; ++i) {
        block_start = first + i * block_size;
        block_end = first + (i + 1) * block_size;
        threads[i] = std::thread(
            [=](Iter s, Iter e, T& r) {
                /* start each partial result from T{} so it is accumulated in type T */
                r = std::accumulate(s, e, T{}, f);
            },
            block_start, block_end, std::ref(results[i])
        );
    }
    for (auto& t : threads)
        t.join();
    return std::accumulate(results.begin(), results.end(), init);
}
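The block decomposition can be checked on a small case. The sketch below is a simplified variant for plain sums, not the template above: parallel_sum is a hypothetical name, the number of threads is passed explicitly, and the last block absorbs any remainder so the size need not be a power of 2.

```cpp
#include <cassert>
#include <cstddef>
#include <numeric>
#include <thread>
#include <vector>

// Sums v using num_threads blocks; each thread writes only its own slot.
long long parallel_sum(const std::vector<int>& v, unsigned num_threads) {
    assert(num_threads > 0);
    const std::size_t block = v.size() / num_threads;
    std::vector<long long> partial(num_threads, 0);
    std::vector<std::thread> threads;
    threads.reserve(num_threads);
    for (unsigned i = 0; i < num_threads; ++i) {
        auto first = v.begin() + i * block;
        // The last block runs to the end to pick up the remainder.
        auto last = (i + 1 == num_threads) ? v.end() : first + block;
        threads.emplace_back([first, last, &partial, i] {
            partial[i] = std::accumulate(first, last, 0LL);
        });
    }
    for (auto& t : threads) t.join();
    // Combine the partial sums in the calling thread.
    return std::accumulate(partial.begin(), partial.end(), 0LL);
}
```

Comparing the result against std::accumulate on the same data is a quick way to validate the splitting logic before measuring performance.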
In the above we have used the property that the numerator and denominator of a term can be computed from their previous values, as follows:
Since all the terms have odd powers of x, numerator = previous numerator * x * x.
The denominator contains the factorial of odd numbers, so denominator = previous denominator * (2j+2)(2j+3).
For example, 5! = 3! * 4 * 5, and with j=1 this is 5! = 3! * (2j+2)(2j+3).
The sign of a term is the negation of the sign of the previous term.
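The recurrences can be tried out on a single value. The sketch below uses double instead of float to make the comparison with std::sin easier; taylor_sin is a hypothetical name, and terms counts the terms added after the leading x, as in the loops used in this section.

```cpp
#include <cassert>
#include <cmath>

// Approximates sin(x) as x - x^3/3! + x^5/5! - ... using the recurrences above.
double taylor_sin(double x, int terms) {
    assert(terms >= 0);
    double value = x;        // leading term x / 1!
    double num = x * x * x;  // numerator of the next term: x^3
    double denom = 6.0;      // denominator of the next term: 3!
    int sign = -1;
    for (int j = 1; j <= terms; ++j) {
        value += sign * num / denom;
        num *= x * x;                            // x^(2j+1) -> x^(2j+3)
        denom *= (2.0 * j + 2) * (2.0 * j + 3);  // (2j+1)! -> (2j+3)!
        sign = -sign;
    }
    return value;
}
```

For small |x| a handful of terms already agrees with std::sin to many digits; for large |x| the series converges slowly and the factorial grows quickly, so reducing the argument first is advisable.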
Computing the sin for a range
We modify the previous code to handle the case where we want to compute the sin for a range of elements.
void seq_sin(int n, int terms, float* a, float* b) {
    for (int i = 0; i < n; ++i) {
        float x = a[i];
        float value = x;
        float num = x * x * x;
        float denom = 6;
        int sign = -1;
        for (int j = 1; j <= terms; ++j) {
            value += sign * num / denom;
            num *= x * x;
            denom *= (2 * j + 2) * (2 * j + 3);
            sign *= -1;
        }
        b[i] = value;
    }
}
Multi-threaded version
We can use multiple threads to compute the sin, or any function for that matter, for a range faster by subdividing the range between threads as we have done so far.
template<typename F>
void multit_sin(int n, int terms, float* x, float* y, F f) {
    /* n is the total number of elements in x and y;
     * it is assumed to be a multiple of the number of threads */
    int num_threads = std::thread::hardware_concurrency();
    std::vector<std::thread> mythreads;
    int block_size = n / num_threads;
    for (int i = 0; i < num_threads; ++i) {
        mythreads.push_back(
            std::thread(f, block_size, terms, x, y)
        );
        /* advance both pointers to the start of the next block */
        x += block_size;
        y += block_size;
    }
    for (int i = 0; i < num_threads; ++i)
        mythreads[i].join();
}
Using AVX512
The Advanced Vector Extensions (AVX) are extensions to the x86 instruction set that allow the CPU to apply a
single instruction to multiple data (SIMD).
In this section we use AVX512, which allows us to apply a single instruction to 512 bits of data.
For example, for a float, which is typically 4 bytes, an AVX512 instruction can perform the same operation on 16 floats at the same time.
It is important to note that, in our case, we don't expect a x16 speedup, since other factors come into play, such as memory transfers and cache invalidation.
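The "16 floats per instruction" idea can be seen in isolation in the sketch below. The function add16 is a hypothetical name; the AVX512 path is only compiled when the compiler defines __AVX512F__ (for example with -mavx512f on GCC or Clang), and a scalar loop is used otherwise so that the sketch builds everywhere.

```cpp
#include <cassert>
#if defined(__AVX512F__)
#include <immintrin.h>
#endif

// Adds 16 floats from a and b element-wise and stores them in out.
void add16(const float* a, const float* b, float* out) {
    assert(a && b && out);
#if defined(__AVX512F__)
    // One 512-bit register holds 16 floats. The loadu/storeu variants accept
    // unaligned pointers, unlike _mm512_load_ps/_mm512_store_ps, which
    // require 64-byte alignment.
    __m512 va = _mm512_loadu_ps(a);
    __m512 vb = _mm512_loadu_ps(b);
    _mm512_storeu_ps(out, _mm512_add_ps(va, vb));
#else
    // Scalar fallback: the same result, one element at a time.
    for (int i = 0; i < 16; ++i) out[i] = a[i] + b[i];
#endif
}
```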
#include <immintrin.h>

/* a and b must be 64-byte aligned (required by _mm512_load_ps and
 * _mm512_store_ps) and n must be a multiple of 16 */
void vector_sin(int n, int terms, float* a, float* b) {
    /* repeat the whole computation LOAD times */
    for (int k = 0; k < LOAD; ++k) {
        for (int i = 0; i < n; i += 16)
        {
            __m512 x = _mm512_load_ps(&a[i]);
            __m512 value = x;
            __m512 numer = _mm512_mul_ps(x, _mm512_mul_ps(x, x)); // x^3
            __m512 denom = _mm512_set1_ps(6); // 3 factorial
            int sign = -1;
            for (int j = 1; j <= terms; j++)
            {
                // value += sign * numer / denom
                __m512 tmp = _mm512_div_ps(_mm512_mul_ps(_mm512_set1_ps(sign), numer), denom);
                value = _mm512_add_ps(value, tmp);
                numer = _mm512_mul_ps(numer, _mm512_mul_ps(x, x));
                denom = _mm512_mul_ps(denom, _mm512_set1_ps((2 * j + 2) * (2 * j + 3)));
                sign *= -1;
            }
            _mm512_store_ps(&b[i], value);
        }
    }
}
Combining both
Since the multi-threaded version can accept any function, we can pass it the AVX512 function.
In fact, this combination gives us the biggest speedup.
Without going into too much detail, since the computation is split over multiple cores, cache invalidation is minimized.