tlx
Loading...
Searching...
No Matches
parallel_mergesort.hpp
Go to the documentation of this file.
1/*******************************************************************************
2 * tlx/sort/parallel_mergesort.hpp
3 *
4 * **EXPERIMENTAL** Parallel multiway mergesort **EXPERIMENTAL**
5 *
6 * Copied and modified from STXXL, see http://stxxl.org, which itself extracted
7 * it from MCSTL http://algo2.iti.uni-karlsruhe.de/singler/mcstl/. Both are
8 * distributed under the Boost Software License, Version 1.0.
9 *
10 * Part of tlx - http://panthema.net/tlx
11 *
12 * Copyright (C) 2007 Johannes Singler <singler@ira.uka.de>
13 * Copyright (C) 2014-2018 Timo Bingmann <tb@panthema.net>
14 *
15 * All rights reserved. Published under the Boost Software License, Version 1.0
16 ******************************************************************************/
17
18#ifndef TLX_SORT_PARALLEL_MERGESORT_HEADER
19#define TLX_SORT_PARALLEL_MERGESORT_HEADER
20
21#include <algorithm>
22#include <functional>
23#include <thread>
24#include <utility>
25
26#if defined(_OPENMP)
27#include <omp.h>
28#endif
29
32#include <tlx/simple_vector.hpp>
34
35namespace tlx {
36
37//! \addtogroup tlx_sort
38//! \{
39
40namespace parallel_mergesort_detail {
41
42//! Subsequence description.
43template <typename DiffType>
44struct PMWMSPiece {
45 //! Begin of subsequence.
46 DiffType begin;
47 //! End of subsequence.
48 DiffType end;
49};
50
51/*!
52 * Data accessed by all threads.
53 *
54 * PMWMS = parallel multiway mergesort
55 */
56template <typename RandomAccessIterator>
58 using ValueType =
59 typename std::iterator_traits<RandomAccessIterator>::value_type;
60 using DiffType =
61 typename std::iterator_traits<RandomAccessIterator>::difference_type;
62
63 //! Input begin.
64 RandomAccessIterator source;
65 //! Start indices, per thread.
67
68 /** Storage in which to sort. */
70 /** Samples. */
72 /** Offsets to add to the found positions. */
74 /** PMWMSPieces of data to merge \c [thread][sequence] */
76
77 explicit PMWMSSortingData(size_t num_threads)
78 : starts(num_threads + 1),
79 temporary(num_threads),
80 offsets(num_threads - 1),
81 pieces(num_threads)
82 { }
83};
84
85/*!
86 * Select samples from a sequence.
87 * \param sd Pointer to sorting data struct. Result will be placed in \c sd->samples.
88 * \param num_samples Number of samples to select.
89 * \param iam my thread number
90 * \param num_threads number of threads in group
91 */
92template <typename RandomAccessIterator, typename DiffType>
94 DiffType& num_samples,
95 size_t iam,
96 size_t num_threads) {
97
98 num_samples = parallel_multiway_merge_oversampling * num_threads - 1;
99
100 simple_vector<DiffType> es(num_samples + 2);
102 sd->starts[iam + 1] - sd->starts[iam],
103 static_cast<size_t>(num_samples + 1), es.begin());
104
105 for (DiffType i = 0; i < num_samples; i++) {
106 sd->samples[iam * num_samples + i] = sd->source[sd->starts[iam] + es[i + 1]];
107 }
108}
109
110/*!
111 * PMWMS code executed by each thread.
112 * \param sd Pointer to sorting data struct.
113 * \param iam my thread number
114 * \param num_threads number of threads in group
115 * \param barrier thread barrier from main function
116 * \param comp Comparator.
117 * \param mwmsa MultiwayMergeSplittingAlgorithm to use.
118 */
119template <bool Stable, typename RandomAccessIterator, typename Comparator>
121 size_t iam,
122 size_t num_threads,
123 ThreadBarrierMutex& barrier,
124 Comparator& comp,
126 using ValueType =
127 typename std::iterator_traits<RandomAccessIterator>::value_type;
128 using DiffType =
129 typename std::iterator_traits<RandomAccessIterator>::difference_type;
130
131 // length of this thread's chunk, before merging
132 DiffType length_local = sd->starts[iam + 1] - sd->starts[iam];
133
134 using SortingPlacesIterator = ValueType *;
135
136 // sort in temporary storage, leave space for sentinel
137 sd->temporary[iam] = static_cast<ValueType*>(
138 ::operator new (sizeof(ValueType) * (length_local + 1)));
139
140 // copy there
141 std::uninitialized_copy(sd->source + sd->starts[iam],
142 sd->source + sd->starts[iam] + length_local,
143 sd->temporary[iam]);
144
145 // sort locally
146 if (Stable)
147 std::stable_sort(sd->temporary[iam],
148 sd->temporary[iam] + length_local, comp);
149 else
150 std::sort(sd->temporary[iam],
151 sd->temporary[iam] + length_local, comp);
152
153 // invariant: locally sorted subsequence in sd->temporary[iam],
154 // sd->temporary[iam] + length_local
155
156 if (mwmsa == MWMSA_SAMPLING)
157 {
158 DiffType num_samples;
159 determine_samples(sd, num_samples, iam, num_threads);
160
161 barrier.wait(
162 [&]() {
163 std::sort(sd->samples.begin(), sd->samples.end(), comp);
164 });
165
166 for (size_t s = 0; s < num_threads; s++)
167 {
168 // for each sequence
169 if (num_samples * iam > 0)
170 sd->pieces[iam][s].begin =
171 std::lower_bound(sd->temporary[s],
172 sd->temporary[s] + sd->starts[s + 1] - sd->starts[s],
173 sd->samples[num_samples * iam],
174 comp)
175 - sd->temporary[s];
176 else
177 // absolute beginning
178 sd->pieces[iam][s].begin = 0;
179
180 if ((num_samples * (iam + 1)) < (num_samples * num_threads))
181 sd->pieces[iam][s].end =
182 std::lower_bound(sd->temporary[s],
183 sd->temporary[s] + sd->starts[s + 1] - sd->starts[s],
184 sd->samples[num_samples * (iam + 1)],
185 comp)
186 - sd->temporary[s];
187 else
188 // absolute end
189 sd->pieces[iam][s].end = sd->starts[s + 1] - sd->starts[s];
190 }
191 }
192 else if (mwmsa == MWMSA_EXACT)
193 {
194 barrier.wait();
195
196 simple_vector<std::pair<SortingPlacesIterator,
197 SortingPlacesIterator> > seqs(num_threads);
198
199 for (size_t s = 0; s < num_threads; s++)
200 seqs[s] = std::make_pair(
201 sd->temporary[s],
202 sd->temporary[s] + sd->starts[s + 1] - sd->starts[s]);
203
204 simple_vector<SortingPlacesIterator> offsets(num_threads);
205
206 // if not last thread
207 if (iam < num_threads - 1)
208 multisequence_partition(seqs.begin(), seqs.end(),
209 sd->starts[iam + 1], offsets.begin(), comp);
210
211 for (size_t seq = 0; seq < num_threads; seq++)
212 {
213 // for each sequence
214 if (iam < (num_threads - 1))
215 sd->pieces[iam][seq].end = offsets[seq] - seqs[seq].first;
216 else
217 // absolute end of this sequence
218 sd->pieces[iam][seq].end = sd->starts[seq + 1] - sd->starts[seq];
219 }
220
221 barrier.wait();
222
223 for (size_t seq = 0; seq < num_threads; seq++)
224 {
225 // for each sequence
226 if (iam > 0)
227 sd->pieces[iam][seq].begin = sd->pieces[iam - 1][seq].end;
228 else
229 // absolute beginning
230 sd->pieces[iam][seq].begin = 0;
231 }
232 }
233
234 // offset from target begin, length after merging
235 DiffType offset = 0, length_am = 0;
236 for (size_t s = 0; s < num_threads; s++)
237 {
238 length_am += sd->pieces[iam][s].end - sd->pieces[iam][s].begin;
239 offset += sd->pieces[iam][s].begin;
240 }
241
242 // merge directly to target
243
244 simple_vector<std::pair<SortingPlacesIterator,
245 SortingPlacesIterator> > seqs(num_threads);
246
247 for (size_t s = 0; s < num_threads; s++)
248 {
249 seqs[s] = std::make_pair(
250 sd->temporary[s] + sd->pieces[iam][s].begin,
251 sd->temporary[s] + sd->pieces[iam][s].end);
252 }
253
254 multiway_merge_base<Stable, /* Sentinels */ false>(
255 seqs.begin(), seqs.end(),
256 sd->source + offset, length_am, comp);
257
258 barrier.wait();
259
260 operator delete (sd->temporary[iam]);
261}
262
263} // namespace parallel_mergesort_detail
264
265//! \name Parallel Sorting Algorithms
266//! \{
267
268/*!
269 * Parallel multiway mergesort main call.
270 *
271 * \param begin Begin iterator of sequence.
272 * \param end End iterator of sequence.
273 * \param comp Comparator.
274 * \param num_threads Number of threads to use.
275 * \param mwmsa MultiwayMergeSplittingAlgorithm to use.
276 * \tparam Stable Stable sorting.
277 */
278template <bool Stable,
279 typename RandomAccessIterator, typename Comparator>
281 RandomAccessIterator begin,
282 RandomAccessIterator end,
283 Comparator comp,
284 size_t num_threads = std::thread::hardware_concurrency(),
286
287 using namespace parallel_mergesort_detail;
288
289 using DiffType =
290 typename std::iterator_traits<RandomAccessIterator>::difference_type;
291
292 DiffType n = end - begin;
293
294 if (n <= 1)
295 return;
296
297 // at least one element per thread
298 if (num_threads > static_cast<size_t>(n))
299 num_threads = static_cast<size_t>(n);
300
301 PMWMSSortingData<RandomAccessIterator> sd(num_threads);
302 sd.source = begin;
303
304 if (mwmsa == MWMSA_SAMPLING) {
305 sd.samples.resize(
306 num_threads * (parallel_multiway_merge_oversampling * num_threads - 1));
307 }
308
309 for (size_t s = 0; s < num_threads; s++)
310 sd.pieces[s].resize(num_threads);
311
312 DiffType* starts = sd.starts.data();
313
314 DiffType chunk_length = n / num_threads, split = n % num_threads, start = 0;
315 for (size_t i = 0; i < num_threads; i++)
316 {
317 starts[i] = start;
318 start += (i < static_cast<size_t>(split))
319 ? (chunk_length + 1) : chunk_length;
320 }
321 starts[num_threads] = start;
322
323 // now sort in parallel
324
325 ThreadBarrierMutex barrier(num_threads);
326
327#if defined(_OPENMP)
328#pragma omp parallel num_threads(num_threads)
329 {
330 size_t iam = omp_get_thread_num();
331 parallel_sort_mwms_pu<Stable>(
332 &sd, iam, num_threads, barrier, comp, mwmsa);
333 }
334#else
335 simple_vector<std::thread> threads(num_threads);
336 for (size_t iam = 0; iam < num_threads; ++iam) {
337 threads[iam] = std::thread(
338 [&, iam]() {
339 parallel_sort_mwms_pu<Stable>(
340 &sd, iam, num_threads, barrier, comp, mwmsa);
341 });
342 }
343 for (size_t i = 0; i < num_threads; i++) {
344 threads[i].join();
345 }
346#endif // defined(_OPENMP)
347}
348
349/*!
350 * Parallel multiway mergesort.
351 *
352 * \param begin Begin iterator of sequence.
353 * \param end End iterator of sequence.
354 * \param comp Comparator.
355 * \param num_threads Number of threads to use.
356 * \param mwmsa MultiwayMergeSplittingAlgorithm to use.
357 */
358template <typename RandomAccessIterator,
359 typename Comparator = std::less<
360 typename std::iterator_traits<RandomAccessIterator>::value_type> >
362 RandomAccessIterator begin,
363 RandomAccessIterator end,
364 Comparator comp = Comparator(),
365 size_t num_threads = std::thread::hardware_concurrency(),
367
368 return parallel_mergesort_base</* Stable */ false>(
369 begin, end, comp, num_threads, mwmsa);
370}
371
372/*!
373 * Stable parallel multiway mergesort.
374 *
375 * \param begin Begin iterator of sequence.
376 * \param end End iterator of sequence.
377 * \param comp Comparator.
378 * \param num_threads Number of threads to use.
379 * \param mwmsa MultiwayMergeSplittingAlgorithm to use.
380 */
381template <typename RandomAccessIterator,
382 typename Comparator = std::less<
383 typename std::iterator_traits<RandomAccessIterator>::value_type> >
385 RandomAccessIterator begin,
386 RandomAccessIterator end,
387 Comparator comp = Comparator(),
388 size_t num_threads = std::thread::hardware_concurrency(),
390
391 return parallel_mergesort_base</* Stable */ true>(
392 begin, end, comp, num_threads, mwmsa);
393}
394
395//! \}
396//! \}
397
398} // namespace tlx
399
400#endif // !TLX_SORT_PARALLEL_MERGESORT_HEADER
401
402/******************************************************************************/
Simpler non-growing vector without initialization.
iterator begin() noexcept
return mutable iterator to first element
Implements a thread barrier using mutex locking and condition variables that can be used to synchroni...
void wait(Lambda lambda=Lambda())
Waits for n threads to arrive.
size_t parallel_multiway_merge_oversampling
default oversampling factor for parallel_multiway_merge
void multisequence_partition(const RanSeqs &begin_seqs, const RanSeqs &end_seqs, const RankType &rank, RankIterator begin_offsets, Comparator comp=Comparator())
Splits several sorted sequences at a certain global rank, resulting in a splitting point for each seq...
RandomAccessIterator3 multiway_merge_base(RandomAccessIteratorIterator seqs_begin, RandomAccessIteratorIterator seqs_end, RandomAccessIterator3 target, typename std::iterator_traits< typename std::iterator_traits< RandomAccessIteratorIterator >::value_type::first_type >::difference_type size, Comparator comp=Comparator(), MultiwayMergeAlgorithm mwma=MWMA_ALGORITHM_DEFAULT)
Sequential multi-way merging switch.
MultiwayMergeSplittingAlgorithm
Different splitting strategies for sorting/merging: by sampling, exact.
void parallel_mergesort_base(RandomAccessIterator begin, RandomAccessIterator end, Comparator comp, size_t num_threads=std::thread::hardware_concurrency(), MultiwayMergeSplittingAlgorithm mwmsa=MWMSA_DEFAULT)
Parallel multiway mergesort main call.
void parallel_mergesort(RandomAccessIterator begin, RandomAccessIterator end, Comparator comp=Comparator(), size_t num_threads=std::thread::hardware_concurrency(), MultiwayMergeSplittingAlgorithm mwmsa=MWMSA_DEFAULT)
Parallel multiway mergesort.
void stable_parallel_mergesort(RandomAccessIterator begin, RandomAccessIterator end, Comparator comp=Comparator(), size_t num_threads=std::thread::hardware_concurrency(), MultiwayMergeSplittingAlgorithm mwmsa=MWMSA_DEFAULT)
Stable parallel multiway mergesort.
std::vector< std::string > split(char sep, const std::string &str, std::string::size_type limit)
Split the given string at each separator character into distinct substrings.
Definition: split.cpp:20
DiffTypeOutputIterator equally_split(DiffType n, size_t p, DiffTypeOutputIterator s)
Split a sequence into parts of almost equal size.
void determine_samples(PMWMSSortingData< RandomAccessIterator > *sd, DiffType &num_samples, size_t iam, size_t num_threads)
Select samples from a sequence.
void parallel_sort_mwms_pu(PMWMSSortingData< RandomAccessIterator > *sd, size_t iam, size_t num_threads, ThreadBarrierMutex &barrier, Comparator &comp, MultiwayMergeSplittingAlgorithm mwmsa)
PMWMS code executed by each thread.
typename std::iterator_traits< RandomAccessIterator >::difference_type DiffType
simple_vector< DiffType > starts
Start indices, per thread.
simple_vector< ValueType * > temporary
Storage in which to sort.
simple_vector< DiffType > offsets
Offsets to add to the found positions.
typename std::iterator_traits< RandomAccessIterator >::value_type ValueType
simple_vector< simple_vector< PMWMSPiece< DiffType > > > pieces
PMWMSPieces of data to merge [thread][sequence].