Форум программистов, компьютерный форум, киберфорум
Наши страницы
oleg-m1973
Войти
Регистрация
Восстановить пароль
Оценить эту запись

Параллельная обработка массива

Запись от oleg-m1973 размещена 24.07.2019 в 13:18
Обновил(-а) oleg-m1973 25.07.2019 в 22:07 (Добавлены комментарии в пример)

Собственно алгоритм здесь простой:
- Создать несколько потоков. Лучше по одному на процессор/ядро, т.к. предполагаетсмя что эта обработка будет проходить без блокировок и использовать 100% времени процессора. Таким образом создавать боольше потоков, чем есть процессоров бессмыссленно. Меньше можно.
- Разбить массив на приблизительно одинаковые куски, по количеству потоков.
- Дать потокам сигнал, чтобы каждый взял свой кусок массива и начал его обрабатывать
- Подождать, пока все потоки не закончат эту обработку. Потоки при этом не завершаются, чтобы не тратить ресурсы на создание потоков.

При реализации, пришла мысль, что здесь лучше свести это задачу к более универсальной - к обработчику очереди задач.
Добавляешь задачу в очередь, потоки их забирают оттуда и обрабатывают, а ты ждешь, пока все твои задачи не завершатся.
Естественно, здесь появляются дополнительные накладные расходы, но я постарался их минимизировать.

Получился вот такой класс.
На нижеприведённом примере даёт выигрыш по сравнению с однопоточным в ~2.5 раза, на 4-ядерном проце. Немного, но тоже неплохо.

Кликните здесь для просмотра всего текста

C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#include <iostream>
 
#include <thread>
#include <mutex>
#include <condition_variable>
#include <type_traits>
#include <future>
 
#include <vector>
 
template <typename T> constexpr auto &remove_ptr(T &ptr) noexcept {return ptr;}
template <typename T> constexpr auto &remove_ptr(T *ptr) noexcept {return *ptr;}
template <typename T> constexpr auto &remove_ptr(std::unique_ptr<T> &ptr) noexcept {return *ptr;}
template <typename T> constexpr auto &remove_ptr(std::shared_ptr<T> &ptr) noexcept {return *ptr;}
 
template <typename T>
class CTaskRunner
{
//Требования к классу T - у него должен быть оператор (), если указатель, то (*t)();
//Лишние исключения на ровном месте здесь не нужны
static_assert(std::is_nothrow_move_assignable_v<T> && std::is_nothrow_move_constructible_v<T>);
public:
    CTaskRunner(size_t threads)
    : m_cap{threads} //Резервируем элементы. Один на поток.
    {
        m_threads.reserve(threads);
        //Запускаем потоки. Они будут ждать, пока в очереди не появится задача
        for (; threads; --threads)
            m_threads.emplace_back(&CTaskRunner::ThreadProc, this);
    }
 
    CTaskRunner()
    : CTaskRunner(std::thread::hardware_concurrency())
    {
    }
 
    ~CTaskRunner()
    {
        {
            //Останавливаем потоки - взводим событие m_stop
            //Обязательно блокируем мьютекс!!! Т.к. m_cv.notify_all() воздействуют только на те потоки, которые стоят в m_cv.wait()
            //Без блокировки может получится, что мы выставим m_stop = true после того, как поток его проверит и тот встанет в m_cv.wait() навсегда
            std::lock_guard lock(m_mx); 
            m_stop = true;
            m_cv.notify_all();
        }
        //Ждем завершения каждого из потоков
        for (auto &item : m_threads)
            item.join();
    }
    
    template <typename... TT>
    void RunTask(TT&&... args)
    {
        //При добавлении задачи важно минимизировать затраты на добавление нового элемента в список, т.е. не делать new
        //Для этого резервируем элементы в m_pool и, по возможности берём новые оттуда. 
        std::lock_guard lock(m_mx);
        
        if (m_pool.empty()) //Если нет зарезервированных элементов, то создаём новый. Дорого, но, возможно, лучше, чем ждать пока появится
            m_tasks.emplace_back(std::forward<TT>(args)...);
        else 
        {
            //Переносим переносим существующий элемент из пула в очередь задач. Очень дешёвая операция.
            m_tasks.splice(m_tasks.end(), m_pool, m_pool.begin()); 
            m_tasks.back() = T(std::forward<TT>(args)...); //Присваиваем этому элементу новое значение
        }
 
        //Сообщаем одному потоку, что в очереди появилась задача
        m_cv.notify_one(); 
        //notify_all здесь делать необязательно, т.к. тут либо проснётся одни из потоков, 
        //либо какой-то поток возьмёт задачу из очереди перед тем, как заснуть
    }
 
protected:
    void ThreadProc()
    {
        std::unique_lock lock(m_mx);
 
        //Событие m_stop имеет приоритет перед задачами
        //Если оно взведено, потоки завершаются, независимо от наличия задач в очереди
        //В принципе, можно сделать наоборот, что поток завершается только если в очереди нет задач
        while (!m_stop) 
        {
            if (m_tasks.empty()) //Если задач нет
            {
                m_cv.wait(lock); //Разблокирум мьютекс и ждём события
                continue;
            }
 
            //Берём первую задачу из списка 
            auto fn = std::move(m_tasks.front());
            //Переносим элемент в пул свободных. Здесь тоже никаких new/delete не происходит
            m_pool.splice(m_pool.begin(), m_tasks, m_tasks.begin());
 
            lock.unlock(); //Разблокируем мьютекс, задача будет выполняться без блокировки
 
            //Выполняем задачу. Исключения здесь не нужны ни под каким видом.
            //fn() объявлена как noexcept, компилятор уберёт try-catch
            try {remove_ptr(fn)();} catch (...) {;} 
 
            lock.lock(); //Блокируем мьютекс, для ожидания новой задачи
        }
    }
 
    std::mutex m_mx; //Мьютекс для событий и блокировки списка задач и пула
    std::condition_variable m_cv;
    volatile bool m_stop = false; //Событие для остановки потоков
    
    size_t m_cap; //Количество зарезервированных элементов. Пока не используется.
    std::list<T> m_tasks, m_pool{m_cap}; //Очередь задач и список зарезервированных элементов, пул.
        
    std::vector<std::thread> m_threads; //Массив потоков
};


Ниже пример использования - поиск минимального и максимального элементов в массиве
Кликните здесь для просмотра всего текста

C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
template <typename T = double>
struct alignas(std::hardware_destructive_interference_size) CFindMinMaxTask
// alignas(std::hardware_destructive_interference_size) - размер структуры выравнивается по размеру линии кэша
// В данном случае это не нужно, но, если в структуре будут переменные, 
// которые нужно изменять в процессе выполнения задачи, то лучше выровнять -
// сэкономятся ресурсы на загрузку данных в кэш и обратно
{
 
    //Указатель на начало и размер блока данных для обработки
    const T *m_data = nullptr;
    size_t m_sz = 0;
 
    using TRes = std::pair<T, T>;
    //Для ожидания, когда задача завершится и получения результата используем std::promise
    std::promise<TRes> m_promise;  
 
 
    void Reset(const T *data, size_t sz)
    {
        m_data = data;
        m_sz = sz;
 
        m_promise = std::promise<TRes>(); //Необходимо его сбрасывать перед повторным использованием
    }
 
    //Функция, которая будет выполняться в потоке. 
    void operator ()() noexcept 
    {
        auto min = *m_data;
        auto max = min;
        for (size_t i = 1; i < m_sz; ++i)
        {
            const auto x = m_data[i];
            if (x < min)
                min = x;
            else if (x > max)
                max = x;
        }
        //Сообщаем, что задача завершилась и возвращаем результат
        m_promise.set_value({min, max});
    }
};
 
static
auto TestParallel(std::vector<double> &data)
{
    using TTask = CFindMinMaxTask<double>;
 
    //Количество потоков равно числу процессоров
    const auto threads = std::thread::hardware_concurrency();
 
    //Здесь эти переменные объявлены внутри функции, но это необязательно - 
    //если нужно обрабатывать несколько массивов, то лучше объявить их выше и использовать повторно
    std::vector<TTask> items{threads};
    CTaskRunner<TTask *> tasks{threads}; //Задачи будут передаваться по указателю
 
    std::cout << "par: ";
    const auto tm = std::chrono::steady_clock::now();
 
    //Разбиваем массив на примерно одинаковые блоки c учётом ситуации когда размер массива не кратен количеству потоков
    //При обработке массива важно, чтобы каждый поток работал с непрерывным блоком, размером не меньше линии кэша (здесь не учитываю)
    //Иначе масса ресурсов уйдёт на загрузку/выгрузку данных в память и выигрыша практически не будет.
    auto *p = data.data();
    size_t sz = data.size();
    for (size_t i = items.size(); i > 0; --i)
    {
        auto &item = items[i - 1];
        const auto n = sz / i; 
        item.Reset(p, n);
 
        //Запускаем задачу
        tasks.RunTask(&item);
        p += n;
        sz -= n;
    }
 
    //У каждой задачи будет свой результат, пробегаемся по всем задачам и вычисляем конечный
    //Так как нам в любом случае нужно ждать завершения всех задач, то можно делать это по-очереди - 
    //подождать пока завершится первая, проверить вторую и подождать её и т.д. 
    auto res = items[0].m_promise.get_future().get(); //Ждём завершения первой задачи, берём её результат, в качестве начального
    for (size_t i = 1, n = items.size(); i < n; ++i)
    {
        auto x = items[i].m_promise.get_future().get(); //Ждём завершения следующих задач
        if (res.first > x.first)
            res.first = x.first;
 
        if (res.second < x.second)
            res.second = x.second;
    }
 
    const auto dt = std::chrono::steady_clock::now() - tm;
    std::cout
        << "tm: " << std::chrono::duration_cast<std::chrono::milliseconds>(dt).count()
        << std::fixed 
        << ", min: " << res.first
        << ", max: " << res.second
        << std::endl;
}
 
 
int main()
{
    const size_t _sz = 100'000'000;
    std::vector<double> data;
    data.reserve(_sz);
 
    for (intmax_t i = 0; i < _sz; ++i)
        data.emplace_back(double(i));
 
    //Выполняем в нескольких потоках
    TestParallel(data);
 
    //Выполняем в одном потоке
    CFindMinMaxTask<double> task;
    std::cout << "seq: ";
    const auto tm = std::chrono::steady_clock::now();
 
    task.Reset(data.data(), data.size());
    task();
    auto res = task.m_promise.get_future().get();
 
    const auto dt = std::chrono::steady_clock::now() - tm;
    std::cout
        << "tm: " << std::chrono::duration_cast<std::chrono::milliseconds>(dt).count()
        << std::fixed
        << ", min: " << res.first
        << ", max: " << res.second
        << std::endl;
    return 0;
}


Пример реализации примитивного пула потоков

Кликните здесь для просмотра всего текста
C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
class CThreadPool
{
public:
    template <typename... TT>
    CThreadPool(TT&&... args)
    : m_tasks(std::forward<TT>(args)...)
    {
    }
 
    template <typename TFunc, typename... TT>
    auto Run(TFunc &&func, TT&&... args)
    {
        using TRes = std::invoke_result_t<TFunc, TT...>;
        auto fn = std::bind(std::forward<TFunc>(func), std::forward<TT>(args)...);
 
        auto sp = std::make_unique<CTaskImpl<TRes, decltype(fn)>>(std::move(fn));
        auto res = sp->m_res.get_future();
 
        m_tasks.RunTask(std::move(sp));
        return res;
    }
 
protected:
    struct CTask
    {
        virtual ~CTask() = default;
        virtual void Invoke() noexcept = 0;
        void operator()() noexcept
        {
            Invoke();
        }
    };
 
    template <typename TRes, typename TFunc>
    struct CTaskImpl: CTask
    {
        CTaskImpl(TFunc &&func)
        : m_fn(std::move(func))
        {
        }
 
        virtual void Invoke() noexcept override
        {
            try
            {
                if constexpr(!std::is_void_v<TRes>)
                    m_res.set_value(m_fn());
                else
                {
                    m_fn();
                    m_res.set_value();
                }
            }
            catch (...)
            {
                m_res.set_exception(std::current_exception());
            }
        }
 
        TFunc m_fn;
        std::promise<TRes> m_res;
    };
 
 
    CTaskRunner<std::unique_ptr<CTask>> m_tasks;
};
 
int main()
{
    CThreadPool pool;
    std::vector<std::future<int>> futures;
    for (int i = 0; i < 10; ++i)
    {
        auto res = pool.Run([i]()
            {
                printf("Task #%d started\n", i);
                std::this_thread::sleep_for(1s);
                return i;
            });
 
        futures.emplace_back(std::move(res));
    }
 
    for (auto &item : futures)
        printf("Task #%d finished\n", item.get());
 
    return 0;
}
Размещено в C++, Multithreading
Просмотров 272 Комментарии 2
Всего комментариев 2
Комментарии
  1. Старый комментарий
    ага. тут вам и потоки и параллельные массивы и multithreading, ещё и шаблоны с классами впридачу....
    аффтар, жги!
    Запись от sam063rus размещена 11.08.2019 в 19:21 sam063rus вне форума
  2. Старый комментарий
    Цитата:
    Сообщение от sam063rus Просмотреть комментарий
    ага. тут вам и потоки и параллельные массивы и multithreading, ещё и шаблоны с классами впридачу....
    аффтар, жги!
    Ну да, всё это здесь присутствует. И?
    Запись от oleg-m1973 размещена 12.08.2019 в 23:00 oleg-m1973 вне форума
 
КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin® Version 3.8.9
Copyright ©2000 - 2019, vBulletin Solutions, Inc.
Рейтинг@Mail.ru