0 / 0 / 0
Регистрация: 29.08.2012
Сообщений: 59
1

Параллельный алгоритм Гаусса средствами std::thread

03.03.2020, 11:13. Показов 2778. Ответов 7

Студворк — интернет-сервис помощи студентам
Здравствуйте! Пытаюсь написать параллельный алгоритм решения СЛАУ методом Гаусса с помощью std::thread, но не могу настроить синхронизацию потоков.
Вот мой код, там комментарии прописаны, надеюсь суть будет ясна:
C++ (Qt)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
#include <iostream>
#include <iomanip>
#include <cstdio>
#include <Windows.h>
#include <thread>
#include <condition_variable>
#include <mutex>
#include <atomic>
#include <cmath>
 
using namespace std;
 
condition_variable cv;
condition_variable cv2;
 
mutex cv_m;   // мьютекс для condition variable cv (ожидание i-й строки)
mutex cv2_m;  // мьютекс для condition variable cv2 (ожидание завершения i-й итерации)
mutex cout_m; // мьютекс для потока вывода
 
int i_row = 0; // индекс текущей строки, хранящейся в r
atomic_int i_counter(0); // переменная для подсчёта количества потоков, ожидающих окончания i-й итерации
 
//заполнение матрицы
void FillMatrix(double *Matrix[], int nSize)
{
    for (int i = 0; i < nSize; i++)
    {
        for (int j = 0; j < nSize; j++)
            Matrix[i][j] = nSize - fabs(i - j) - 1;
        Matrix[i][nSize] = 0;
    }
}
 
// матрица разбивается построчно, всего потоков rank_count
// в массив r сохраняется текущая строка, которую вычитаем из нижестоящих
 
void GaussThread(double *Matrix[], int nSize, double *x,
                 int my_rank, int rank_count, double *r)
{
    double tmp;
 
    // прямой проход по алгоритму Гаусса
    for (int i = 0; i < nSize; i++)
    {
        if (i % rank_count == my_rank)
        {
            // для потока, который работает с i-й строкой
            lock_guard<mutex> lk(cv_m);
 
            // вычисляем строку
            tmp = Matrix[i][i];
            for (int j = i; j <= nSize; j++)
                Matrix[i][j] /= tmp;
 
            // сохраняем её в массив r
            memcpy(r, Matrix[i], (nSize + 1) * sizeof(double));
 
            // устанавливаем номер текущей строки, остальные потоки теперь могу пользоваться массивом r
            i_row = i;
            
            cout_m.lock();
            cout << "rank = " << my_rank <<  ", setted i_row = " << i_row << endl;
            cout_m.unlock();
 
            // будим остальные потоки
            cv.notify_all();
        }
        else
        {
            // для потока, который НЕ работает с i-й строкой
            unique_lock<mutex> lk(cv_m);
            
            cout_m.lock();
            cout << "rank = " << my_rank <<  ", wait i_row..." << endl;
            cout_m.unlock();
            
            // ждём пока буфер r не будет готов, тогда же будет выполняться i_row == i
            cv.wait(lk, [&]{ return i_row == i; });
 
            // строка в буфере r готова, можем работать дальше
            cout_m.lock();
            cout << "rank = " << my_rank <<  ", getted i_row = " << i << endl;
            cout_m.unlock();
        }
 
        // этот цикл выполняется параллельно, вычисляются строки ниже текущей
        for (int k = i + my_rank; k < nSize; k += rank_count)
        {
            if (k == i)
                continue;
 
            tmp = Matrix[k][i];
            for (int j = i; j <= nSize; j++)
                Matrix[k][j] -= tmp * r[j];
        }
 
        // увеличиваем счётчик ожидающих потоков
        i_counter ++;
 
        if (i_counter < rank_count)
        {
            // если счётчик не достиг количества потоков
            unique_lock<mutex> lk2(cv2_m);
 
            cout_m.lock();
            cout << "rank = " << my_rank <<  ", wait for finish " << i_row << ", i_counter = " << i_counter << endl;
            cout_m.unlock();
 
            // засыпаем пока последний поток не сбросит счётчик
            cv2.wait(lk2, []{ return i_counter == 0; });
 
            cout_m.lock();
            cout << "rank = " << my_rank <<  ", finished, i_row = " << i_row << endl;
            cout_m.unlock();
        }
        else
        {
            // если счётчик достиг количества потоков
            lock_guard<mutex> lk(cv2_m);
            
            cout_m.lock();
            cout << "rank = " << my_rank <<  ", set i_counter = 0, i_row = " << i_row << endl;
            cout_m.unlock();
 
            // не засыпаем, сбрасываем счётчик в ноль и уведомляем остальные потоки
            i_counter = 0;
            cv2.notify_all();
        }
    }
 
    // обратный проход, только для одного потока
    if (my_rank == 0)
    {
        for (int i = nSize - 1; i >= 0; i--)
        {
            tmp = Matrix[i][nSize];
            for (int j = i + 1; j < nSize; j++)
                tmp -= Matrix[i][j] * x[j];
            x[i] = tmp / Matrix[i][i];
        }
    }
}
 
int main()
{
    setlocale(LC_ALL, "ru");
 
    int Asize;
    //cout << "Размер матрицы: ";
    //cin >> Asize;
    Asize = 10;
 
    double **A_arr = new double*[Asize];
    for (int i = 0; i < Asize; i++)
        A_arr[i] = new double[Asize + 1];
 
    FillMatrix(A_arr, Asize);
    
    double *X_arr_Thr = new double [Asize];
    double *cur_row = new double [Asize + 1];
 
    thread th1(GaussThread, A_arr, Asize, X_arr_Thr, 0, 4, cur_row);
    thread th2(GaussThread, A_arr, Asize, X_arr_Thr, 1, 4, cur_row);
    thread th3(GaussThread, A_arr, Asize, X_arr_Thr, 2, 4, cur_row);
    thread th4(GaussThread, A_arr, Asize, X_arr_Thr, 3, 4, cur_row);
    th1.join();
    th2.join();
    th3.join();
    th4.join();
    
    for (int i = 0; i < Asize; i++)
        delete A_arr[i];
    delete[] A_arr;
    delete[] X_arr_Thr;
    delete[] cur_row;
 
    return 0;
}
Если в двух словах, суть такая. Распараллеливается только прямой проход. Матрица разбивается построчно, т.е. на 4-х потоках каждый поток получает каждую 4-ю строку. На каждой итерации есть два места, где мне нужно синхринизировать работу потоков:
1) В начале итерации один из потоков формирует текущую (или ведущую) строку, которая потом используется им же и всеми остальными для обработки нижестоящих строк. Остальные потоки ждут пока будет готова текущая строка, для этой цели я использую condition_variable cv.
2) В конце итерации мне нужно дождаться, чтобы все потоки обработали свои строки, я использую в качестве счётчика atomic_int i_counter. Пока эта переменная меньше числа потоков, все потоки останавливатся с помощью второй condition_variable cv2. Последний поток, который доводит i_counter до общего числа потоков, сбрасывает эту переменную в ноль и все потоки дружно несутся на следующую итерацию.

Так я себе это представлял. А по факту один из потоков не просыпается после того, как i_counter сброшен в ноль, на скриншоте с примером это поток №0. Что я делаю не так подскажите пожалуйста?
Миниатюры
Параллельный алгоритм Гаусса средствами std::thread  
0
Лучшие ответы (1)
Programming
Эксперт
94731 / 64177 / 26122
Регистрация: 12.04.2006
Сообщений: 116,782
03.03.2020, 11:13
Ответы с готовыми решениями:

Std::thread::sleep_for(3s)
Всем доброго времени суток. Столкнулся с такой проблемой, что не могу понять как оформить передачу...

Pimpl + std::thread + linux sockets?
Доброго времени суток. Есть задача постоянно слушать сокеты, ну и что-то с этим всем делать, класс...

Чтение переменной несколькими std::thread
Собственно, вопрос в заголовке темы. Можно ли прочитать одну переменную несколькими потоками...

Ошибка "неопределенная ссылка" при работе с std::thread
При попытке сделать что-либо с std::thread, выдает ошибку компиляции, &quot;неопределенная ссылка &quot; . ...

7
6577 / 4562 / 1843
Регистрация: 07.05.2019
Сообщений: 13,726
03.03.2020, 11:22 2
Цитата Сообщение от Hirurg2605 Посмотреть сообщение
Так я себе это представлял. А по факту один из потоков не просыпается после того, как i_counter сброшен в ноль, на скриншоте с примером это поток №0. Что я делаю не так подскажите пожалуйста?
Подозреваю, он стоит вот здесь - cv.wait(lk, [&]{ return i_row == i; });
Какая-то мутная у тебя реализация. Там не нужно такое количество мьютексов и conditional variables

Добавлено через 1 минуту
Чтобы обработать матрицу по строкам, достаточно одной atomic-переменной
0
0 / 0 / 0
Регистрация: 29.08.2012
Сообщений: 59
03.03.2020, 11:30  [ТС] 3
Цитата Сообщение от oleg-m1973 Посмотреть сообщение
Там не нужно такое количество мьютексов и conditional variables
Один мьютекс чисто отладочный, для вывода в консоль используется, он не считается)
Далее, первый используется чтобы все потоки подождали пока один из них сформирует ведущую строку, а второй - чтобы дождаться завершения всех потоков. Нужен был какой-то типа "барьер", вот такой вот велосипед сочинился

Добавлено через 2 минуты
на скриншоте видно, что нулевой поток зашёл сюда
cv2.wait(lk2, []{ return i_counter == 0; });
и больше не вышел, хотя поток №3 сделал set i_counter = 0 и все остальные сразу проснулись

Добавлено через 1 минуту
а потом опять же 3-й закончил очередную итерацию и сделал i_counter ++, а нулевой не успел проснуться... Вот как быть?
0
6577 / 4562 / 1843
Регистрация: 07.05.2019
Сообщений: 13,726
03.03.2020, 11:33 4
Цитата Сообщение от Hirurg2605 Посмотреть сообщение
а потом опять же 3-й закончил очередную итерацию и сделал i_counter ++, а нулевой не успел проснуться... Вот как быть?
Соственно, это ответ на вопрос почему не проснулся нулевой поток
0
0 / 0 / 0
Регистрация: 29.08.2012
Сообщений: 59
03.03.2020, 11:34  [ТС] 5
Цитата Сообщение от oleg-m1973 Посмотреть сообщение
Соственно, это ответ на вопрос почему не проснулся нулевой поток
да как всегда, сутки голову ломал, а как на форум написал так и ответ очевиден. но решения пока не вижу, кроме как завести ещё один condition_variable
0
6577 / 4562 / 1843
Регистрация: 07.05.2019
Сообщений: 13,726
03.03.2020, 11:59 6
Лучший ответ Сообщение было отмечено Hirurg2605 как решение

Решение

Цитата Сообщение от Hirurg2605 Посмотреть сообщение
да как всегда, сутки голову ломал, а как на форум написал так и ответ очевиден. но решения пока не вижу, кроме как завести ещё один condition_variable
Рекомендую убрать один (или даже оба) и разбить алгоритм на части.

Добавлено через 2 минуты
Ведущую строку формируй не в потоках, а прямо в main

Добавлено через 30 секунд
Там же устанавливай счётчик и жди, кгда он обнулится

Добавлено через 18 минут
C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
std::mutex mx;
std::condition_variable cv;
volatile size_t cnt_start = 0;
volatile size_t cnt_fin = 0;
 
void ThreadProc()
{
    for (;;)
    {
        {
            std::unique_lock lock(mx);
            while (!cnt_start)
                cv.wait(lock);
 
            --cnt_start;
        }
 
        //Calc
 
        std::lock_guard lock(mx);
        if (--cnt_fin == 0)
            cv.notify_all();
    }
}
 
int main()
{
    const size_t n = std::thread::hardware_concurrency();
    //Start threads
 
    for (;;)
    {
        //calc row
        std::unique_lock lock(mx);
        cnt_start = cnt_fin = n;
        cv.notify_all();
        while (cnt_fin)
            cv.wait(lock);
    }
}
1
0 / 0 / 0
Регистрация: 29.08.2012
Сообщений: 59
03.03.2020, 14:04  [ТС] 7
oleg-m1973, спасибо огромное, ваша подсказка очень помогла! Я переделал свой алгоритм по вашему шаблону и всё заработало! Правда свой вариант с ещё одним condition_variable тоже пробовал, скорость сопоставимая, но мой вариант на 8 потоках умирает, а ваш гораздо более работоспособен

Добавлено через 24 минуты
Хотя нет, мой вариант тоже работает, просто я его неправильно проинициализировал. Но код такой страшный, что я его не буду выкладывать
0
51 / 149 / 33
Регистрация: 29.06.2019
Сообщений: 1,428
30.04.2021, 21:48 8
Цитата Сообщение от Hirurg2605 Посмотреть сообщение
Но код такой страшный, что я его не буду выкладывать
вопросы (в конце) ещё страшнее
- В чем причина столь низких показателей ускорения и эффективности параллельного алгоритма Гаусса?
- Существуют ли способ улучшения этих показателей?
м-да, кластеры ядер не всегда взлетают... (не проверяла ещё, но вопрос в цитате понравился)
0
IT_Exp
Эксперт
87844 / 49110 / 22898
Регистрация: 17.06.2006
Сообщений: 92,604
30.04.2021, 21:48
Помогаю со студенческими работами здесь

Можно ли использовать std::thread в С++ Builder?
Доброго всем времени суток. Возник вот такой вопрос. Можно ли использовать подход написания...

Embarcadero Berlin. Отсутствуют библиотеки многопоточности (thread,mutex) в std под платформу win 32bit
Господа, я тут на Берлин переехал, т.к. очень понадобилась многопоточность и лучшая ее реализация...

Параллельный алгоритм, решенный методом Гаусса-Зейделя
Всем привет.;)Нашла в инете текст программы, кот. мне нужна.Написана она на С++(параллельный...

Как распараллелить алгоритм Гаусса через библиотеку thread?
Всем привет. Для зачета дали задачу распараллелить на потоки алгоритм Гаусса, но только без MPI....


Искать еще темы с ответами

Или воспользуйтесь поиском по форуму:
8
Ответ Создать тему
Опции темы

КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin
Copyright ©2000 - 2023, CyberForum.ru