Palich, Набросал вам простенькую реализацию пула потоков для wxWidgets и демонстрацию работы с ним.
Можете им пользоваться, но только осторожно, я не стал реализовывать безопасность по исключениям.
Что нужно - сами допилите.
threadpool.hpp
| C++ | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
| #ifndef THREAD_POOL_HPP
#define THREAD_POOL_HPP
#include <memory>
#include <type_traits>
#include <tuple>
#include <utility>
#include <functional>
#include <cassert>
/// В этом нэймспэйсе детали реализации,
/// которые вынужденно торчат наружу
/// из-за обобщенного шаблонного кода
namespace priv
{
struct ITask
{
public:
virtual ~ITask() = default;
virtual void DoExec() = 0;
virtual void DoComplete() = 0;
};
using TaskPtr = std::unique_ptr<ITask>;
namespace detail
{
template <class T>
struct Storage
{
using Type = T;
struct Wrapper
{
Type value;
};
Type &&GetValue()
{
assert(m_pointer == (void *)&m_storage);
return std::forward<Type>(m_pointer->value);
}
void SetValue(Type &&value)
{
assert(m_pointer == nullptr);
m_pointer = new (&m_storage) Wrapper{std::forward<Type>(value)};
}
Storage() = default;
Storage(Storage const &) = delete;
Storage &operator=(Storage const &) = delete;
~Storage()
{
if (m_pointer)
std::destroy_at(m_pointer);
}
std::aligned_storage_t<sizeof(Wrapper), alignof(Wrapper)> m_storage;
Wrapper *m_pointer = nullptr;
};
template <>
struct Storage<void>
{
using Type = void;
};
template <class Tuple>
struct InvokeResult;
template <class F, class... Args>
struct InvokeResult<std::tuple<F, Args...>> : std::invoke_result<F, Args...>
{
};
template <class Callback, class Tuple>
struct InvokerImpl : Storage<typename InvokeResult<Tuple>::type>
{
using typename Storage<typename InvokeResult<Tuple>::type>::Type;
template <class Cb, class Fn, class... Args>
explicit InvokerImpl(Cb &&callback, Fn &&function, Args &&...args)
: m_callback(std::forward<Cb>(callback)), m_packedTask(std::forward<Fn>(function), std::forward<Args>(args)...)
{
}
template <std::size_t... I>
Type InvokeImpl(std::index_sequence<I...>)
{
return std::invoke(std::get<I>(std::move(m_packedTask))...);
}
void Invoke()
{
if constexpr (std::is_same_v<Type, void>)
InvokeImpl(std::make_index_sequence<std::tuple_size_v<Tuple>>{});
else
this->SetValue(InvokeImpl(std::make_index_sequence<std::tuple_size_v<Tuple>>{}));
}
void Submit()
{
if constexpr (std::is_same_v<Type, void>)
std::invoke(m_callback);
else
std::invoke(m_callback, this->GetValue());
}
Callback m_callback;
Tuple m_packedTask;
};
template <class... Ts>
struct Task final : ITask, private detail::InvokerImpl<Ts...>
{
using Invoker = detail::InvokerImpl<Ts...>;
using Invoker::Invoker;
void DoExec() override { this->Invoke(); }
void DoComplete() override { this->Submit(); }
};
} // namespace detail
template <class Callback, class Callable, class... Args>
TaskPtr MakeTask(Callback &&callback, Callable &&callable, Args &&...args)
{
using TaskType = detail::Task<std::decay_t<Callback>, std::tuple<std::decay_t<Callable>, std::decay_t<Args>...>>;
return std::make_unique<TaskType>(std::forward<Callback>(callback),
std::forward<Callable>(callable),
std::forward<Args>(args)...);
}
struct DiscardCall
{
template <class... Args>
constexpr void operator()(Args &&...) const noexcept {}
};
class ThreadPoolImpl;
} // namespace priv
/// @brief Идентификатор задачи, валиден только на время её существования.
/// 0 - невалидный идентификатор.
using TaskId = std::size_t;
/// @brief Ничего не делающий коллбэк.
/// Используй для первого аргумента ThreadPool::AddTask,
/// если тебе не нужна обратная связь.
constexpr priv::DiscardCall NoCallback;
class wxString;
/// @brief Пул потоков.
class ThreadPool
{
public:
/// @brief Конструктор.
/// @param maxThreadCount Максимальное количество потоков пула,
/// если force = false, то оно будет ограничено [1, hardware threads - 1].
/// @param force Флаг снимает ограничение сверху на максимальное количество потоков.
ThreadPool(unsigned maxThreadCount = -1, bool force = false);
/// @brief Перед вызовом деструктора,
/// пул должен быть безопасно уничтожен методом Destroy.
~ThreadPool();
/// @brief Инвалидирует пул, отменяя запланированные задачи
/// открывает модальный диалог, ожидающий завершения выполняемых задач и присоединяет потоки.
/// @param force если true, не открывает диалог и отсоединяет потоки.
/// @thread_safety Функция должна быть вызвана из основного потока, если force = false.
void Destroy(bool force = false) const;
/// @brief Добавляет задачу в очередь запланированных задач.
/// Аналогично конструктору std::thread захватывает аргументы по значению.
/// @tparam Callback Тип функции обратного вызова
/// (c одноим аргументом, совместимым с типом возврата Сallable, если не void).
/// @tparam Callable Тип функции задачи.
/// @tparam ...Args Типы аргументов функции задачи.
/// @param call Коллбэк, котороый будет вызван в основном потоке,
/// в качестве аргумента ему будет передан результат вызова func(если не void) с аргументами args.
/// @param func Функция задачи, будет выполнена в одном из потоков пула.
/// @param ...args Аргументы для func, func должна быть вызываема с этими аргументами, после их конвертации в rvalues.
/// @return Идентификатор задачи, 0 при неудаче.
/// @details
/// Для не void функции аналогично выражению:
/// call(func(args...));
/// Иначе:
/// func(args...);
/// call();
/// @thread_safety Функция потокобезопасна.
template <class Callback, class Callable, class... Args>
TaskId AddTask(Callback &&call, Callable &&func, Args &&...args) const
{
return AddTask(priv::MakeTask(std::forward<Callback>(call),
std::forward<Callable>(func),
std::forward<Args>(args)...));
}
/// @brief Возвращает идентификатор потока, из которого вызвана функция.
/// @return Идентификатор в текстовом виде, чтобы не притягивать реализацию в виде std::thread::id.
/// @thread_safety Функция потокобезопасна.
static wxString GetThreadId();
private:
TaskId AddTask(priv::TaskPtr task) const;
std::unique_ptr<priv::ThreadPoolImpl> m_impl;
};
#endif |
|
threadpool.cpp
| C++ | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
| #include <wx/wxprec.h>
#include "threadpool.hpp"
#ifndef WX_PRECOMP
#include <wx/log.h>
#include <wx/event.h>
#include <wx/string.h>
#endif
#include <wx/progdlg.h>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <vector>
#include <deque>
#include <sstream>
#ifndef NDEBUG
#define DEBUG_MSG(...) wxLogDebug(__VA_ARGS__)
#else
#define DEBUG_MSG(...)
#endif
namespace priv
{
inline TaskId GetId(const TaskPtr &p) { return reinterpret_cast<TaskId>(p.get()); }
inline unsigned OptimalThreadCount() { return wxMax(1, std::thread::hardware_concurrency()) - 1; }
class TaskCompletionEvent : public wxThreadEvent
{
public:
TaskCompletionEvent(wxEventType type, TaskPtr task)
: wxThreadEvent(type), m_task(std::move(task))
{
}
ITask *GetTask() const { return m_task.get(); }
TaskId GetTaskId() const { return priv::GetId(m_task); }
wxEvent *Clone() const override
{
DEBUG_MSG("TaskCompletionEvent is not cloneable!");
return new TaskCompletionEvent(GetEventType(), nullptr);
}
private:
TaskPtr m_task;
};
class ThreadPoolImpl : public wxEvtHandler
{
public:
ThreadPoolImpl(unsigned maxThreadCount, bool force);
void Destroy(bool terminate);
TaskId AddTask(TaskPtr task);
private:
void OnTaskCompletion(TaskCompletionEvent &evt);
TaskPtr PopTask();
void Entry();
std::mutex m_mutex;
std::condition_variable m_condition;
std::deque<TaskPtr> m_queue;
std::vector<std::thread> m_pool;
const unsigned m_maxThreadCount;
std::atomic_uint m_aliveThreads;
unsigned m_waitingWorkers;
bool m_isAlive;
wxDECLARE_EVENT_TABLE();
};
} // namespace priv
wxDECLARE_EVENT(myEVT_TASK_COMPLETED, priv::TaskCompletionEvent);
typedef void (wxEvtHandler::*TaskCompletionEventFunction)(priv::TaskCompletionEvent &);
#define TaskCompletionEventHandler(func) wxEVENT_HANDLER_CAST(TaskCompletionEventFunction, func)
#define EVT_TASK_COMPLETED(func) \
wx__DECLARE_EVT0(myEVT_TASK_COMPLETED, TaskCompletionEventHandler(func))
wxBEGIN_EVENT_TABLE(priv::ThreadPoolImpl, wxEvtHandler)
EVT_TASK_COMPLETED(priv::ThreadPoolImpl::OnTaskCompletion)
wxEND_EVENT_TABLE()
wxDEFINE_EVENT(myEVT_TASK_COMPLETED, priv::TaskCompletionEvent);
priv::ThreadPoolImpl::ThreadPoolImpl(unsigned maxThreadCount, bool force)
: wxEvtHandler()
, m_mutex()
, m_condition()
, m_queue()
, m_pool()
, m_maxThreadCount(wxMax(1, force ? maxThreadCount : wxMin(maxThreadCount, OptimalThreadCount())))
, m_aliveThreads(0)
, m_waitingWorkers(0)
, m_isAlive(true)
{
DEBUG_MSG("[ThreadPoolImpl] max threads = %u", m_maxThreadCount);
m_pool.reserve(m_maxThreadCount);
}
void priv::ThreadPoolImpl::Destroy(bool terminate)
{
std::unique_lock<std::mutex> lock(m_mutex);
if (!m_isAlive)
{
DEBUG_MSG("[ThreadPoolImpl::Destroy] Thread pool destroyed already!");
return;
}
m_isAlive = false;
auto queue = std::move(m_queue);
auto pool = std::move(m_pool);
lock.unlock();
m_condition.notify_all();
if (!terminate)
{
unsigned const total = pool.size();
wxProgressDialog dialog("Finalizing threads",
wxEmptyString,
total,
nullptr,
wxPD_APP_MODAL | wxPD_CAN_SKIP | wxPD_AUTO_HIDE);
for (;;)
{
unsigned const finished = total - m_aliveThreads.load(std::memory_order_relaxed);
dialog.Update(finished, wxString::Format("Please be patient, don't skip, wait... %u/%u", finished, total), &terminate);
if (finished == total || terminate)
break;
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
for (unsigned i = 0; i < pool.size(); ++i)
{
assert(pool[i].joinable());
assert(terminate || pool[i].get_id() != std::this_thread::get_id());
if (terminate)
{
DEBUG_MSG("[ThreadPoolImpl] detaching thread %u", i);
pool[i].detach();
}
else
{
DEBUG_MSG("[ThreadPoolImpl] joining thread %u", i);
pool[i].join();
}
}
}
TaskId priv::ThreadPoolImpl::AddTask(TaskPtr task)
{
assert(task);
TaskId id = GetId(task);
std::unique_lock<std::mutex> lock(m_mutex);
if (!m_isAlive)
{
DEBUG_MSG("[ThreadPoolImpl::AddTask] can't add task. ThreadPool has been destroyed already?");
return 0;
}
m_queue.push_back(std::move(task));
if (m_waitingWorkers == 0 && m_pool.size() < m_maxThreadCount)
{
DEBUG_MSG("[ThreadPoolImpl] starting thread %u", static_cast<unsigned>(m_pool.size()));
m_pool.emplace_back(&ThreadPoolImpl::Entry, this);
}
m_condition.notify_one();
return id;
}
void priv::ThreadPoolImpl::OnTaskCompletion(TaskCompletionEvent &evt)
{
assert(evt.GetTask());
evt.GetTask()->DoComplete();
DEBUG_MSG("[ThreadPoolImpl] task id=%zu completed", evt.GetTaskId());
}
priv::TaskPtr priv::ThreadPoolImpl::PopTask()
{
std::unique_lock<std::mutex> lock(m_mutex);
++m_waitingWorkers;
while (m_isAlive && m_queue.empty())
m_condition.wait(lock);
--m_waitingWorkers;
TaskPtr result;
if (m_isAlive)
{
assert(!m_queue.empty());
result.swap(m_queue.front());
m_queue.pop_front();
}
return result;
}
void priv::ThreadPoolImpl::Entry()
{
struct Guard
{
std::atomic_uint &m_counter;
Guard(std::atomic_uint &counter)
: m_counter(counter)
{
m_counter.fetch_add(1, std::memory_order_relaxed);
}
~Guard()
{
m_counter.fetch_sub(1, std::memory_order_relaxed);
}
} guard(m_aliveThreads);
for (;;)
{
auto task = PopTask();
if (!task)
return;
task->DoExec();
wxQueueEvent(this, new TaskCompletionEvent(myEVT_TASK_COMPLETED, std::move(task)));
}
}
ThreadPool::ThreadPool(unsigned maxThreadCount, bool force)
: m_impl(std::make_unique<priv::ThreadPoolImpl>(maxThreadCount, force))
{
}
ThreadPool::~ThreadPool() = default;
void ThreadPool::Destroy(bool force) const
{
m_impl->Destroy(force);
}
wxString ThreadPool::GetThreadId()
{
return (std::ostringstream() << std::this_thread::get_id()).str();
}
TaskId ThreadPool::AddTask(priv::TaskPtr task) const
{
return m_impl->AddTask(std::move(task));
} |
|
main.cpp
| C++ | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
| #include <wx/wxprec.h>
#include "threadpool.hpp"
#ifndef WX_PRECOMP
#include <wx/wx.h>
#endif
#include <wx/dataview.h>
#include <random>
ThreadPool const g_pool;
ThreadPool const &GetPool()
{
return g_pool;
}
class MyApp : public wxApp
{
public:
bool OnInit() override;
};
class MyFrame : public wxFrame
{
public:
MyFrame();
private:
void OnButtonClick(wxCommandEvent &);
void OnClose(wxCloseEvent &);
void TaskStarted(wxDataViewItem item, wxString const &threadId);
void UpdateProgress(wxDataViewItem item, int progress);
void DeleteItem(wxDataViewItem item);
wxDataViewItem CustomTask(wxDataViewItem item, int duration);
wxDataViewListStore *m_model;
wxSlider *m_slider;
wxCheckBox *m_check;
};
wxIMPLEMENT_APP(MyApp);
bool MyApp::OnInit()
{
auto w = new MyFrame();
w->Show();
return true;
}
MyFrame::MyFrame()
: wxFrame(nullptr, wxID_ANY, "Thread pool sample"), m_model(nullptr), m_slider(nullptr), m_check(nullptr)
{
auto *ctrl = new wxDataViewListCtrl(this, wxID_ANY, wxDefaultPosition, wxSize(-1, 300), wxDV_HORIZ_RULES);
m_model = ctrl->GetStore();
ctrl->AppendTextColumn("Task ID");
ctrl->AppendTextColumn("Thread ID");
ctrl->AppendTextColumn("Status");
ctrl->AppendProgressColumn("Progress");
auto *vsizer = new wxBoxSizer(wxVERTICAL);
vsizer->Add(ctrl, wxSizerFlags(1).Expand().Border(wxALL, 5));
auto *hsizer = new wxBoxSizer(wxHORIZONTAL);
vsizer->Add(hsizer, wxSizerFlags(0).Expand().Border(wxLEFT | wxRIGHT | wxBOTTOM, 5));
auto *button = new wxButton(this, wxID_ANY, "Add task");
Bind(wxEVT_BUTTON, &MyFrame::OnButtonClick, this, button->GetId());
hsizer->Add(button, wxSizerFlags(0).Expand().Border(wxRIGHT, 10));
hsizer->Add(new wxStaticText(this, wxID_ANY, "Task duration (seconds):"), wxSizerFlags(0).Center());
m_slider = new wxSlider(this, wxID_ANY, 10, 1, 60, wxDefaultPosition, wxSize(300, -1), wxSL_HORIZONTAL | wxSL_LABELS);
hsizer->Add(m_slider, wxSizerFlags(0).Center());
m_check = new wxCheckBox(this, wxID_ANY, "Randomize next duration");
hsizer->Add(m_check, wxSizerFlags(0).Center());
SetSizerAndFit(vsizer);
Bind(wxEVT_CLOSE_WINDOW, &MyFrame::OnClose, this, GetId());
}
void MyFrame::TaskStarted(wxDataViewItem item, wxString const &threadId)
{
m_model->SetValue(threadId, item, 1);
m_model->SetValue("working", item, 2);
m_model->ItemChanged(item);
}
void MyFrame::UpdateProgress(wxDataViewItem item, int progress)
{
wxVariant value;
m_model->GetValue(value, item, 3);
if (value.GetInteger() != progress)
{
m_model->SetValue(progress, item, 3);
m_model->ItemChanged(item);
}
}
void MyFrame::DeleteItem(wxDataViewItem item)
{
auto row = m_model->GetRow(item);
m_model->DeleteItem(row);
}
void MyFrame::OnButtonClick(wxCommandEvent &)
{
// визуальное представление задачи
wxVector<wxVariant> data;
data.push_back(wxEmptyString);
data.push_back("N/A");
data.push_back("pending");
data.push_back(0);
m_model->AppendItem(data);
// создаем задачу
auto item = m_model->GetItem(m_model->GetCount() - 1);
auto id = GetPool().AddTask([this](wxDataViewItem item) { this->DeleteItem(item); }, // коллбэк
&MyFrame::CustomTask, // функция задачи
this,
item,
m_slider->GetValue() * 5);
m_model->SetValue(wxString::Format("%zu", id), item, 0);
if (m_check->Get3StateValue() == wxCHK_CHECKED)
m_slider->SetValue(std::random_device()() % (m_slider->GetMax() - m_slider->GetMin() + 1) + m_slider->GetMin());
}
wxDataViewItem MyFrame::CustomTask(wxDataViewItem item, int duration)
{
// сигнализируем о старте задачи
CallAfter(&MyFrame::TaskStarted, item, ThreadPool::GetThreadId());
for (int i = 0; i < duration; ++i)
{
// обновляем прогресс
CallAfter(&MyFrame::UpdateProgress, item, (i + 1) * 100 / duration);
// симулируем бурную деятельность
wxMilliSleep(200);
}
// возвращаем айтем для коллбэка
return item;
}
void MyFrame::OnClose(wxCloseEvent &)
{
// безопасно завершаем работу пула
GetPool().Destroy();
Destroy();
} |
|
CMakeLists.txt
| Code | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| cmake_minimum_required(VERSION 3.13)
project(cyberforum LANGUAGES CXX)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
find_package(wxWidgets 2.9 REQUIRED COMPONENTS adv core base)
include(${wxWidgets_USE_FILE})
add_executable(test main.cpp threadpool.cpp)
target_link_libraries(test PRIVATE ${wxWidgets_LIBRARIES})
#[[
target_compile_options(test PRIVATE
-Wall
-Wextra
-Wpedantic
)
]] |
|
0
|