Форум программистов, компьютерный форум CyberForum.ru

Параллельная сортировка, протестировать алгоритм - C++

Восстановить пароль Регистрация
 
DiffEreD
 Аватар для DiffEreD
1420 / 757 / 95
Регистрация: 21.06.2011
Сообщений: 1,740
Записей в блоге: 2
16.12.2013, 16:11     Параллельная сортировка, протестировать алгоритм #1
Надо протестировать масштабируемость работы алгоритма для параллельной сортировки, у кого более 2 процессорных ядер. На 2-х ядрах производительность почти в два раза выше последовательной сортировки. Ну и плюс выявить недочеты в коде, критика кода.
В вкратце опишу работу алгоритма:
1. разбиваем последовательность итераторов на некоторое ко-тво блоков в зависимости от к-сва ядер (границы блоков храним в векторе);
2. сортируем все блоки параллельно;
3. затем объединяем соседние отсортированные блоки с помощью inplace_merge с последуещем перераспределением границ
Это черновой вариант алгоритма, - буду еще дорабатывать. Цель - максимальная эффективность и правильная масштабируемость.
C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
#include <iostream>
#include <iterator>
#include <vector>
#include <set>
#include <array>
#include <algorithm>
#include <functional>
#include <numeric>
#include <thread>
#include <future>
#include <chrono>
#include <random>
 
namespace ch = std::chrono;
 
template <typename duration = ch::seconds, typename clock = ch::high_resolution_clock>
class timer
{
    typename clock::time_point m_start;
    typename clock::time_point m_stop;
public:
    timer() : m_start(clock::now()), m_stop(clock::now()) {}
 
    void start(){ m_start = clock::now(); }
 
    typename clock::rep get_time()
    {
        m_stop = clock::now();
        return ch::duration_cast<duration>(m_stop - m_start).count();
    }
};
 
namespace sp
{
    static const std::array<size_t, 10> required_amount{ 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024 };
    static size_t const min_per_thread = 25; // минимальное кол-во эллементов на поток
 
    //получение границ по контейнеру
    std::vector<size_t> get_bounds(size_t length, size_t blocks)
    {
        size_t block = length / blocks;
        std::vector<size_t> bounds(blocks + 1, block);
        bounds[0] = 0;
        std::partial_sum(bounds.begin() + 1, bounds.end(), bounds.begin() + 1);
        bounds.back() = length;
        return bounds;
    }
 
    template <typename Iterator, typename Compare = std::less<typename std::iterator_traits<Iterator>::value_type>>
    void sort_when_two_threads(Iterator first, Iterator last, Compare comp = std::less<typename std::iterator_traits<Iterator>::value_type>())
    {
        size_t const length = std::distance(first, last);
        std::vector<size_t> bounds = get_bounds(length, 2);
 
        std::thread t(std::sort<Iterator, Compare>, first + bounds[0], first + bounds[1], comp);
        std::sort(first + bounds[1], first + bounds[2], comp);
        t.join();
 
        std::inplace_merge(first + bounds[0], first + bounds[1], first + bounds[2], comp);
    }
 
    template <typename Iterator, typename Compare = std::less<typename std::iterator_traits<Iterator>::value_type>>
    void sort_helper(Iterator first, Iterator last, size_t num_threads, Compare comp = std::less<typename std::iterator_traits<Iterator>::value_type>())
    {
        if (num_threads == 1)
        {
            std::sort(first, last, comp);
            return;
        }
 
        //упрощаем логику для 2-х потоков
        if (num_threads == 2)
        {
            sort_when_two_threads(first, last, comp);
            return;
        }
 
        size_t const length = std::distance(first, last);
 
        //для правильной работы алгоритма количество потоков надо сократить до значения меньшего
        // или равного 2 или 4 или 8 и т. д.
        num_threads = *std::find_if(required_amount.rbegin(), required_amount.rend(),
            std::bind2nd(std::less_equal<size_t>(), num_threads));
 
        //размечаем границы по которым будем работать
        std::vector<size_t> bounds = get_bounds(length, num_threads);
 
        //sort step:
        std::vector<std::thread> sort_threads(num_threads);
        for (size_t i = 0; i < sort_threads.size(); ++i)
            sort_threads[i] = std::thread(std::sort<Iterator, Compare>, first + bounds[i], first + bounds[i + 1], comp);
        for (auto& t : sort_threads) t.join();
 
        //merge step:
        std::vector<std::thread> merge_threads;
        std::set<size_t> new_bounds;
        while (num_threads >= 2)
        {
            num_threads /= 2;
            for (size_t i = 0, j = 0; i < num_threads; j += 2, ++i)
            {
                merge_threads.emplace_back(std::inplace_merge<Iterator, Compare>, first + bounds[j],
                    first + bounds[j + 1], first + bounds[j + 2], comp);
 
                new_bounds.insert(bounds[j]);
                new_bounds.insert(bounds[j + 2]);
            }
            bounds.assign(new_bounds.begin(), new_bounds.end());
            new_bounds.clear();
 
            for (auto& t : merge_threads) t.join();
 
            merge_threads.clear();
        }
    }
 
    template <typename Iterator, typename Compare = std::less<typename std::iterator_traits<Iterator>::value_type>>
    void sort_parallel(Iterator first, Iterator last, Compare comp = std::less<typename std::iterator_traits<Iterator>::value_type>())
    {
        size_t const length = std::distance(first, last);
        if (!length)
            return;
 
        size_t const hardware_threads = std::thread::hardware_concurrency();
        if (hardware_threads <= 1 || length <= min_per_thread)
        {
            std::sort(first, last, comp);
            return;
        }
 
        //упрощаем логику для 2-х потоков //FIXME
        if (hardware_threads == 2)
        {
            sort_when_two_threads(first, last, comp);
            return;
        }
 
        //ограничение максимального количесва потоков
        size_t const max_threads = (length + min_per_thread - 1) / min_per_thread;
 
        //финальный выбор кол-ва требуемых потоков
        size_t num_threads = std::min(hardware_threads, max_threads); //NOTE
                                                              
        size_t all_threads = num_threads;
 
        //для правильной работы алгоритма количество потоков надо сократить до значения меньшего
        // или равного 2 или 4 или 8 и т. д.
        num_threads = *std::find_if(required_amount.rbegin(), required_amount.rend(),
            std::bind2nd(std::less_equal<size_t>(), num_threads));
 
        //если аппаратных потоков будет больше чем выбранных до этого, то определяем
        // количество остальных потоков и расставляем новые границы для контейнера
        size_t required_length = length;
        size_t cut_threads = 0;
        bool is_cut_threads = false;
        std::future<void> help_sort;
        if (all_threads > num_threads)
        {
            is_cut_threads = true;
            cut_threads = all_threads - num_threads;
            required_length = (length / all_threads) * num_threads;
            help_sort = std::async(std::launch::async, sort_helper<Iterator, Compare>, first + required_length, last, cut_threads, comp);
        }
 
        //размечаем границы по которым будем работать
        std::vector<size_t> bounds = get_bounds(required_length, num_threads);
 
        //sort step:
        std::vector<std::thread> sort_threads(num_threads - 1);
        for (size_t i = 0; i < sort_threads.size(); ++i)
        {
            sort_threads[i] = std::thread(std::sort<Iterator, Compare>, first + bounds[i], first + bounds[i + 1], comp);
        }
        std::sort(first + bounds[bounds.size() - 2], first + bounds.back(), comp);
        for (auto& t : sort_threads) t.join();
 
        //merge step:
        std::vector<std::thread> merge_threads;
        std::set<size_t> new_bounds;
        while (num_threads >= 4)
        {
            num_threads /= 2;
            for (size_t i = 0, j = 0; i < num_threads; j += 2, ++i)
            {
                merge_threads.emplace_back(std::inplace_merge<Iterator, Compare>, first + bounds[j],
                    first + bounds[j + 1], first + bounds[j + 2], comp);
 
                new_bounds.insert(bounds[j]);
                new_bounds.insert(bounds[j + 2]);
            }
            bounds.assign(new_bounds.begin(), new_bounds.end());
            new_bounds.clear();
 
            for (auto& t : merge_threads) t.join();
 
            merge_threads.clear();
        }
        std::inplace_merge(first, first + bounds[bounds.size() - 2], first + bounds.back(), comp);
        if (is_cut_threads)
        {
            help_sort.wait();
            std::inplace_merge(first, first + required_length, first + length, comp);
        }
    }
} // end namespace sp
 
 
long long fib(long long i) { return i < 3 ? 1 : fib(i - 1) + fib(i - 2); }
 
class long_less_op
{
    int n;
public:
    long_less_op(int n_ = 0) : n(n_) {}
    bool operator < (const long_less_op& other) const
    {
        fib(6);
        return n < other.n;
    }
};
 
int main()
{
    std::default_random_engine gen(ch::system_clock::now().time_since_epoch().count());
 
    std::vector<long_less_op> v(2000000);
    std::iota(v.begin(), v.end(), -1000000);
    std::shuffle(v.begin(), v.end(), gen);
 
 
    auto copy_for_test = v;
    std::cout << std::boolalpha;
    std::cout << "Parallel sorting:\n";
    std::cout << "Is sorted -> " << std::is_sorted(copy_for_test.begin(), copy_for_test.end());
 
    timer<ch::milliseconds> t;
    sp::sort_parallel(copy_for_test.begin(), copy_for_test.end());
    std::cout << "\nTIME: " << t.get_time() << " milliseconds." << std::endl;
    std::cout << "Is sorted -> " << std::is_sorted(copy_for_test.begin(), copy_for_test.end());
 
    copy_for_test = v;
    std::cout << "\n\nSequential sorting:\n";
    std::cout << "Is sorted -> " << std::is_sorted(copy_for_test.begin(), copy_for_test.end());
    t.start();
    std::sort(copy_for_test.begin(), copy_for_test.end());
    std::cout << "\nTIME: " << t.get_time() << " milliseconds." << std::endl;
    std::cout << "Is sorted -> " << std::is_sorted(copy_for_test.begin(), copy_for_test.end());
 
    std::cout << "\n\nDone." << std::endl;
    system("pause");
    return 0;
}
Основано на этой статье: Sorting data in parallel CPU vs GPU (там и фото алгоритма)

Добавлено через 18 часов 4 минуты
up...
Similar
Эксперт
41792 / 34177 / 6122
Регистрация: 12.04.2006
Сообщений: 57,940
16.12.2013, 16:11     Параллельная сортировка, протестировать алгоритм
Посмотрите здесь:

C++ Параллельная работа с документами
Параллельная прямая C++
Быстрая сортировка. Алгоритм C++
Сортировка.Алгоритм Форель C++
C++ Параллельная работа лифтов
После регистрации реклама в сообщениях будет скрыта и будут доступны все возможности форума.
ForEveR
Модератор
Эксперт C++
 Аватар для ForEveR
7927 / 4709 / 318
Регистрация: 24.06.2010
Сообщений: 10,524
Завершенные тесты: 3
16.12.2013, 16:23     Параллельная сортировка, протестировать алгоритм #2
6 ядер.
g++-4.8.

-O3.
Parallel sorting:
Is sorted -> false
TIME: 350 milliseconds.
Is sorted -> true

Sequential sorting:
Is sorted -> false
TIME: 1234 milliseconds.
Is sorted -> true
-O2.
Parallel sorting:
Is sorted -> false
TIME: 579 milliseconds.
Is sorted -> true

Sequential sorting:
Is sorted -> false
TIME: 2210 milliseconds.
Is sorted -> true
-O0.

Parallel sorting:
Is sorted -> false
TIME: 1289 milliseconds.
Is sorted -> true

Sequential sorting:
Is sorted -> false
TIME: 4469 milliseconds.
Is sorted -> true
clang-ом же код не собирается.
DiffEreD
 Аватар для DiffEreD
1420 / 757 / 95
Регистрация: 21.06.2011
Сообщений: 1,740
Записей в блоге: 2
18.12.2013, 02:06  [ТС]     Параллельная сортировка, протестировать алгоритм #3
Существенно упростил алгоритм, убрал лишнее. Получилось вроде неплохо:
C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
#include <iostream>
#include <iterator>
#include <vector>
#include <set>
#include <algorithm>
#include <numeric>
#include <thread>
#include <chrono>
#include <random>
 
namespace sp
{
static size_t const min_per_thread = 25; // минимальное кол-во элементов на поток
 
//получение границ по контейнеру
std::vector<size_t> get_bounds(size_t length, size_t threads)
{
   size_t block = length / threads;
   std::vector<size_t> bounds(threads + 1, block);
   bounds[0] = 0;
   std::partial_sum(bounds.begin() + 1, bounds.end(), bounds.begin() + 1);
   bounds.back() = length;
   return bounds;
}
 
template <typename Iterator, typename Compare = std::less<typename std::iterator_traits<Iterator>::value_type> >
void sort_when_two_threads(Iterator first, Iterator last, Compare comp = std::less<typename std::iterator_traits<Iterator>::value_type>())
{
   size_t const length = std::distance(first, last);
   std::vector<size_t> bounds = get_bounds(length, 2);
 
   std::thread t(std::sort<Iterator, Compare>, first + bounds[0], first + bounds[1], comp);
   std::sort(first + bounds[1], first + bounds[2], comp);
   t.join();
 
   std::inplace_merge(first + bounds[0], first + bounds[1], first + bounds[2], comp);
}
 
template <typename Iterator, typename Compare = std::less<typename std::iterator_traits<Iterator>::value_type> >
void sort_parallel(Iterator first, Iterator last, Compare comp = std::less<typename std::iterator_traits<Iterator>::value_type>())
{
   size_t const length = std::distance(first, last);
   if (!length)
      return;
 
   size_t const hardware_threads = std::thread::hardware_concurrency();
   if (hardware_threads <= 1 || length <= min_per_thread)
   {
      std::sort(first, last, comp);
      return;
   }
 
   if (hardware_threads == 2)
   {
      sort_when_two_threads(first, last, comp);
      return;
   }
 
   //ограничение максимального количества потоков
   size_t const max_threads = (length + min_per_thread - 1) / min_per_thread;
 
   //финальный выбор кол-ва требуемых потоков
   size_t num_threads = std::min(hardware_threads, max_threads); //NOTE
 
   //размечаем границы по которым будем работать
   std::vector<size_t> bounds = get_bounds(length, num_threads);
 
   //sort step:
   std::vector<std::thread> threads(num_threads - 1);
   for (size_t i = 0; i < threads.size(); ++i)
   {
      threads[i] = std::thread(std::sort<Iterator, Compare>, first + bounds[i], first + bounds[i + 1], comp);
   }
   std::sort(first + bounds[bounds.size() - 2], first + bounds.back(), comp);
   for (auto& t : threads) t.join();
   threads.clear();
 
   //merge step:
   std::set<size_t> new_bounds;
   while (num_threads >= 2)
   {
      num_threads /= 2;
      for (size_t i = 0, j = 0; i < num_threads; j += 2, ++i)
      {
         threads.emplace_back(std::inplace_merge<Iterator, Compare>, first + bounds[j],
                              first + bounds[j+1], first + bounds[j+2], comp);
 
         new_bounds.insert(bounds[j]);
         new_bounds.insert(bounds[j+2]);
      }
 
      if (new_bounds.find(bounds.back()) == new_bounds.end())
      {
         size_t size = bounds.size();
         threads.back().join();
 
         std::inplace_merge(first + bounds[size-4], first + bounds[size-2], first + bounds[size-1], comp);
         new_bounds.erase(bounds[size-2]);
         new_bounds.insert(bounds[size-4]);
         new_bounds.insert(bounds[size-1]);
 
         threads.pop_back();
      }
 
      bounds.assign(new_bounds.begin(), new_bounds.end());
      new_bounds.clear();
 
      for (auto& t : threads) t.join();
      threads.clear();
   }
   std::inplace_merge(first, first + bounds[2], first + length, comp);
}
} // namespace sp
 
///////////////////////////////////////////////////////////////////////////
 
// имитация загрузки ядер процессора
long long fib(long long i) { return i < 3 ? 1 : fib(i - 1) + fib(i - 2); }
 
class long_less_op
{
    int n;
public:
    long_less_op(int n_ = 0) : n(n_) {}
    bool operator < (const long_less_op& other) const
    {
        fib(5);
        return n < other.n;
    }
};
 
// таймер времени
namespace ch = std::chrono;
 
template <typename duration = ch::seconds, typename clock = ch::high_resolution_clock>
class timer
{
   typename clock::time_point m_start;
   typename clock::time_point m_stop;
public:
   timer() : m_start(clock::now()), m_stop(clock::now()) {}
 
   void start(){ m_start = clock::now(); }
 
   typename clock::rep get_time()
   {
      m_stop = clock::now();
      return ch::duration_cast<duration>(m_stop - m_start).count();
   }
};
 
int main()
{
    std::default_random_engine gen(ch::system_clock::now().time_since_epoch().count());
 
    std::vector<long_less_op> v(25432415);
    std::iota(v.begin(), v.end(), -1000000);
    std::shuffle(v.begin(), v.end(), gen);
 
 
    auto copy_for_test = v;
    std::cout << std::boolalpha;
    std::cout << "Parallel sorting:\n";
    std::cout << "Is sorted -> " << std::is_sorted(copy_for_test.begin(), copy_for_test.end());
 
    timer<ch::milliseconds> t;
    sp::sort_parallel(copy_for_test.begin(), copy_for_test.end());
    std::cout << "\nTIME: " << t.get_time() << " milliseconds." << std::endl;
    std::cout << "Is sorted -> " << std::is_sorted(copy_for_test.begin(), copy_for_test.end());
 
    copy_for_test = v;
    std::cout << "\n\nDefault sorting:\n";
    t.start();
    std::sort(copy_for_test.begin(), copy_for_test.end());
    std::cout << "TIME: " << t.get_time() << " milliseconds." << std::endl;
 
    std::cout << "\n\nDone." << std::endl;
    return 0;
}
Если у кого есть желание и более 2-х ядер, то поделитесь своими результатами. Вот мои:
g++ 4.8.2 || 2-х ядерный проц

Parallel sorting:
Is sorted -> false
TIME: 21073 milliseconds.
Is sorted -> true

Default sorting:
TIME: 36635 milliseconds.
ForEveR
Модератор
Эксперт C++
 Аватар для ForEveR
7927 / 4709 / 318
Регистрация: 24.06.2010
Сообщений: 10,524
Завершенные тесты: 3
18.12.2013, 10:38     Параллельная сортировка, протестировать алгоритм #4
6 ядер.
g++ 4.8.0
-O0
Parallel sorting:
Is sorted -> false
TIME: 13975 milliseconds.
Is sorted -> true
Default sorting:
TIME: 46035 milliseconds.

Done.
-O2

Parallel sorting:
Is sorted -> false
TIME: 6053 milliseconds.
Is sorted -> true

Default sorting:
TIME: 22369 milliseconds.


Done.
-O3
Parallel sorting:
Is sorted -> false
TIME: 2908 milliseconds.
Is sorted -> true

Default sorting:
TIME: 9825 milliseconds.


Done.
ed_lol
0 / 0 / 0
Регистрация: 06.04.2016
Сообщений: 5
06.04.2016, 02:19     Параллельная сортировка, протестировать алгоритм #5
DiffEreD,
помоги пожалуйста.что не так
Миниатюры
Параллельная сортировка, протестировать алгоритм  
AlexVRud
414 / 143 / 36
Регистрация: 04.07.2014
Сообщений: 415
06.04.2016, 09:43     Параллельная сортировка, протестировать алгоритм #6
Bash
1
2
3
4
5
6
7
8
9
10
11
12
13
$ module load gcc/5.3.0
$ g++ -O3 -std=c++11 -lpthread test_parallel_sort.cpp 
$ ./a.out 
Parallel sorting:
Is sorted -> false
TIME: 1006 milliseconds.
Is sorted -> true
 
Default sorting:
TIME: 3176 milliseconds.
 
 
Done.
2x10 ядер
ed_lol
0 / 0 / 0
Регистрация: 06.04.2016
Сообщений: 5
07.04.2016, 00:05     Параллельная сортировка, протестировать алгоритм #7
Вы редактировали или этот код смогли запустить?
если редактировали, будьте добры в лс)
DiffEreD
 Аватар для DiffEreD
1420 / 757 / 95
Регистрация: 21.06.2011
Сообщений: 1,740
Записей в блоге: 2
07.04.2016, 11:25  [ТС]     Параллельная сортировка, протестировать алгоритм #8
ed_lol, код абсолютно рабочий, вот на онлайн компиляторе запусти: http://melpon.org/wandbox/permlink/OgzRwzTaJCK1dpt9
Попробуй поставить последнюю студию или используй MinGW(gcc) 5.
MoreAnswers
Эксперт
37091 / 29110 / 5898
Регистрация: 17.06.2006
Сообщений: 43,301
15.04.2016, 21:56     Параллельная сортировка, протестировать алгоритм
Еще ссылки по теме:

Параллельная обработка C++
C++ Параллельная обработка файлов
Параллельная работа с файлами C++

Искать еще темы с ответами

Или воспользуйтесь поиском по форуму:
ed_lol
0 / 0 / 0
Регистрация: 06.04.2016
Сообщений: 5
15.04.2016, 21:56     Параллельная сортировка, протестировать алгоритм #9
Проблема в VS была.
Если не сложно,можете изменить код так,чтобы считалось дольше.например минута параллельной и две последовательная?

Добавлено через 11 минут
DiffEreD,

или хотя бы в раз 30 сек и минута.
вот мои результаты.

Parallel sorting:
Is sorted -> false
TIME: 1642 milliseconds.
Is sorted -> true

Default sorting:
TIME: 3962 milliseconds.


Done.

Добавлено через 4 часа 17 минут
DiffEreD, сможете помочь?)
Yandex
Объявления
15.04.2016, 21:56     Параллельная сортировка, протестировать алгоритм
Ответ Создать тему
Опции темы

Текущее время: 17:13. Часовой пояс GMT +3.
КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin® Version 3.8.9
Copyright ©2000 - 2016, vBulletin Solutions, Inc.
Рейтинг@Mail.ru