Форум программистов, компьютерный форум CyberForum.ru

Пул потоков на основе boost::lockfree::queue - C++

Восстановить пароль Регистрация
 
DiffEreD
 Аватар для DiffEreD
1420 / 757 / 95
Регистрация: 21.06.2011
Сообщений: 1,740
Записей в блоге: 2
24.07.2014, 12:06     Пул потоков на основе boost::lockfree::queue #1
В книге "Энтони Уильямс - Параллельное программирование на С++ в действии" обсуждается пул потоков на основе самописной потокобезопасной очереди:
Кликните здесь для просмотра всего текста
C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#include <iostream>
#include <vector>
#include <type_traits>
#include <functional>
#include <memory>
#include <thread>
#include <future>
#include <atomic>
#include "threadsafe_queue.h"
 
class function_wrapper
{
   struct impl_base {
      virtual void call() = 0;
      virtual ~impl_base() {}
   };
 
   std::unique_ptr<impl_base> impl;
 
   template <typename F>
   struct impl_type : impl_base
   {
      F f;
      impl_type(F&& f_) : f(std::move(f_)) {}
      void call() override { f(); }
   };
 
public:
   template <typename F>
   function_wrapper(F&& f) : impl(new impl_type<F>(std::move(f)))
   {}
 
   void operator ()() { impl->call(); }
 
   function_wrapper() = default;
 
   function_wrapper(function_wrapper&& other) :
      impl(std::move(other.impl))
   {}
 
   function_wrapper& operator = (function_wrapper&& other)
   {
      impl = std::move(other.impl);
      return *this;
   }
 
   function_wrapper(const function_wrapper&) = delete;
   function_wrapper(function_wrapper&) = delete;
   function_wrapper& operator = (function_wrapper& other) = delete;
};
 
class join_thread
{
   std::vector<std::thread>& threads;
public:
   explicit join_thread(std::vector<std::thread>& threads_) :
      threads(threads_)
   {}
   ~join_thread()
   {
      for (auto& thread : threads) {
         if (thread.joinable()) thread.join();
      }
   }
};
 
class thread_pool
{
   std::atomic<bool> done;
   threadsafe_queue<function_wrapper> work_queue;
   std::vector<std::thread> threads;
   join_thread joiner;
 
   void worker_thread()
   {
      while (!done)
      {
         function_wrapper task;
         if (work_queue.try_pop(task))
            task();
         else
            std::this_thread::yield();
      }
   }
 
public:
   thread_pool() : done(false), joiner(threads)
   {
      size_t const thread_count = std::thread::hardware_concurrency();
 
      //if (thread_count <= 1 || !work_queue.is_lock_free())
         //throw std::logic_error("not compatible system for thread_pool");
 
      try {
         for (size_t i = 0; i < thread_count; ++i)
            threads.emplace_back(&thread_pool::worker_thread, this);
      }
      catch(...) {
         done = true;
         throw;
      }
   }
 
   ~thread_pool() { done = true; }
 
   template <typename FunctionType>
   std::future<typename std::result_of<FunctionType()>::type>
   submit(FunctionType f)
   {
      typedef typename std::result_of<FunctionType()>::type
            result_type;
      std::packaged_task<result_type()> task(std::move(f));
      std::future<result_type> res(task.get_future());
      work_queue.push(std::move(task));
      return res;
   }
};
 
/////////////////////////////////////////////////////////////////////
 
void foo() { std::cout << "Hello from id: " << std::this_thread::get_id() << std::endl; }
int plus(int x, int y) {return x+y;}
 
int main()
{
   thread_pool pool;
   auto f = pool.submit(std::bind(plus, 10, 20));
   for (int i = 0; i < 10; ++i)
      pool.submit(foo);
   std::cout << "10 + 20 = " << f.get() << "\n";
 
   system("pause");
   return 0;
}
Я же вместо самописной очереди хочу воспользоваться потокобезопасной очередью из boost. Для этого надо что бы тип, который параметризует boost::lockfree::queue удовлетворял следующие условия:
T must have a copy constructor
T must have a trivial assignment operator
T must have a trivial destructor
Ну переделал function_wrapper на это:
Кликните здесь для просмотра всего текста
C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class function_wrapper
{
   struct impl_base {
      virtual void call() = 0;
      virtual ~impl_base() {}
   };
 
   std::shared_ptr<impl_base> impl;
 
   template <typename F>
   struct impl_type : impl_base
   {
      F f;
      impl_type(F&& f_) : f(std::move(f_)) {}
      void call() override { f(); }
   };
 
public:
   template <typename F>
   function_wrapper(F&& f) : impl(new impl_type<F>(std::move(f)))
   {}
 
   function_wrapper() = default;
   function_wrapper(const function_wrapper&) = default;
   function_wrapper(function_wrapper&) = default;
   ~function_wrapper() = default;
 
   void operator ()() { impl->call(); }
};
полный код
C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#include <iostream>
#include <vector>
#include <type_traits>
#include <functional>
#include <exception>
#include <memory>
#include <thread>
#include <future>
#include <atomic>
#include <boost/lockfree/queue.hpp>
 
class function_wrapper
{
   struct impl_base {
      virtual void call() = 0;
      virtual ~impl_base() {}
   };
 
   std::shared_ptr<impl_base> impl;
 
   template <typename F>
   struct impl_type : impl_base
   {
      F f;
      impl_type(F&& f_) : f(std::move(f_)) {}
      void call() override { f(); }
   };
 
public:
   template <typename F>
   function_wrapper(F&& f) : impl(new impl_type<F>(std::move(f)))
   {}
 
   function_wrapper() = default;
   function_wrapper(const function_wrapper&) = default;
   function_wrapper(function_wrapper&) = default;
   ~function_wrapper() = default;
 
   void operator ()() { impl->call(); }
};
 
class join_thread
{
   std::vector<std::thread>& threads;
public:
   explicit join_thread(std::vector<std::thread>& threads_) :
      threads(threads_)
   {}
   ~join_thread()
   {
      for (auto& thread : threads) {
         if (thread.joinable()) thread.join();
      }
   }
};
 
class thread_pool
{
   std::atomic<bool> done;
   boost::lockfree::queue<function_wrapper> work_queue;
   std::vector<std::thread> threads;
   join_thread joiner;
 
   void worker_thread()
   {
      while (!done)
      {
         function_wrapper task;
         if (work_queue.pop(task))
            task();
         else
            std::this_thread::yield();
      }
   }
 
public:
   thread_pool() : done(false), joiner(threads)
   {
      size_t const thread_count = std::thread::hardware_concurrency();
 
      if (thread_count <= 1 || !work_queue.is_lock_free())
         throw std::logic_error("not compatible system for thread_pool");
 
      try {
         for (size_t i = 0; i < thread_count; ++i)
            threads.emplace_back(&thread_pool::worker_thread, this);
      }
      catch(...) {
         done = true;
         throw;
      }
   }
 
   ~thread_pool() { done = true; }
 
   template <typename FunctionType>
   std::future<typename std::result_of<FunctionType()>::type>
   submit(FunctionType f)
   {
      typedef typename std::result_of<FunctionType()>::type
            result_type;
      std::packaged_task<result_type()> task(std::move(f));
      std::future<result_type> res(task.get_future());
      work_queue.push(std::move(task));
      return res;
   }
};
 
/////////////////////////////////////////////////////////////////////
 
void foo() { std::cout << "Hello from id: " << std::this_thread::get_id() << std::endl; }
int plus(int x, int y) {return x+y;}
 
int main()
{
   thread_pool pool;
   auto f = pool.submit(std::bind(plus, 10, 20));
   for (int i = 0; i < 10; ++i)
      pool.submit(foo);
   std::cout << "10 + 20 = " << f.get() << "\n";
 
   system("pause");
   return 0;
}
А компилятор мне ассерты выкидывает, мол у вашого класса нету тривиального деструктора и тривиального оператора присваивания. Как же еще надо переделывать function_wrapper что бы он "влез" в очередь?
И еще, не страдает ли function_wrapper с shared_ptr плохим дизайном? Ведь этот класс скрывает в себе обэкт который должен только перемещатся. Как я понимаю, при копировании shared_ptr обэкт на корый он указывает копироваться не будет и объекты function_wrapper теперь можно смело копировать.
Similar
Эксперт
41792 / 34177 / 6122
Регистрация: 12.04.2006
Сообщений: 57,940
24.07.2014, 12:06     Пул потоков на основе boost::lockfree::queue
Посмотрите здесь:

C++ контейнер queue
C++ Статический пул для класса с помошью class::operator new непонятное исключение?
конструктор копирования queue C++
Queue сортировка C++
C++ Создание и завершение процессов и потоков. Приоритеты выполнения потоков
C++ Есть ли в C++ пул потоков
C++ Реализовать пул потоков, в который можно помещать функцию
Error C2664: Client::First: невозможно преобразовать параметр 1 из "std::queue<_Ty>" в "std::queue<_Ty> &" C++

Искать еще темы с ответами

Или воспользуйтесь поиском по форуму:
После регистрации реклама в сообщениях будет скрыта и будут доступны все возможности форума.
ForEveR
Модератор
Эксперт C++
 Аватар для ForEveR
7927 / 4709 / 318
Регистрация: 24.06.2010
Сообщений: 10,524
Завершенные тесты: 3
24.07.2014, 12:40     Пул потоков на основе boost::lockfree::queue #2
A destructor is trivial if it is not user-provided and if:
— the destructor is not virtual,
— all of the direct base classes of its class have trivial destructors, and
— for all of the non-static data members of its class that are of class type (or array thereof), each such
class has a trivial destructor.

Otherwise, the destructor is non-trivial.
shared_ptr не является trivially_destructible.

A copy/move assignment operator for class X is trivial if it is not user-provided, its declared parameter type
is the same as if it had been implicitly declared, and if
— class X has no virtual functions (10.3) and no virtual base classes (10.1), and
— the assignment operator selected to copy/move each direct base class subobject is trivial, and
— for each non-static data member of X that is of class type (or array thereof), the assignment operator
selected to copy/move that member is trivial;

otherwise the copy/move assignment operator is non-trivial.
Ну и не является trivially_assignable.
DiffEreD
 Аватар для DiffEreD
1420 / 757 / 95
Регистрация: 21.06.2011
Сообщений: 1,740
Записей в блоге: 2
24.07.2014, 13:22  [ТС]     Пул потоков на основе boost::lockfree::queue #3
Ясненько. Запихнуть невпихуемое не выйдет.
Я вот только что попробовал переписать с boost::lockfree::spsc_queue - все заработало. С описания из документации, я так понял, что эта очередь будет медленнее чем boost::lockfree::queue, но это не критично.
Вообщем, кто писал пулы потоков, нормальная такая реализация?
Кликните здесь для просмотра всего текста
C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
class function_wrapper
{
   struct impl_base {
      virtual void call() = 0;
      virtual ~impl_base() {}
   };
 
   std::shared_ptr<impl_base> impl;
 
   template <typename F>
   struct impl_type : impl_base
   {
      F f;
      impl_type(F&& f_) : f(std::move(f_)) {}
      void call() override { f(); }
   };
 
public:
   template <typename F>
   function_wrapper(F&& f) : impl(new impl_type<F>(std::move(f)))
   {}
 
   function_wrapper() = default;
   function_wrapper(const function_wrapper&) = default;
   function_wrapper(function_wrapper&) = default;
   ~function_wrapper() = default;
 
   void operator ()() { impl->call(); }
};
 
class join_thread
{
   std::vector<std::thread>& threads;
public:
   explicit join_thread(std::vector<std::thread>& threads_) :
      threads(threads_)
   {}
   ~join_thread()
   {
      for (auto& thread : threads) {
         if (thread.joinable()) thread.join();
      }
   }
};
 
boost::lockfree::spsc_queue<function_wrapper> work_queue(128);
 
class thread_pool
{
   std::atomic<bool> done;
   std::vector<std::thread> threads;
   join_thread joiner;
 
   void worker_thread()
   {
      while (!done)
      {
         function_wrapper task;
         if (work_queue.pop(task))
            task();
         else
            std::this_thread::yield();
      }
   }
 
public:
   thread_pool() : done(false), joiner(threads)
   {
      size_t const thread_count = std::thread::hardware_concurrency();
 
      if (thread_count <= 1 || !work_queue.is_lock_free())
         throw std::logic_error("not compatible system for thread_pool");
 
      try {
         for (size_t i = 0; i < thread_count; ++i)
            threads.emplace_back(&thread_pool::worker_thread, this);
      }
      catch(...) {
         done = true;
         throw;
      }
   }
 
   ~thread_pool() { done = true; }
 
   template <typename FunctionType>
   std::future<typename std::result_of<FunctionType()>::type>
   submit(FunctionType f)
   {
      typedef typename std::result_of<FunctionType()>::type
            result_type;
      std::packaged_task<result_type()> task(std::move(f));
      std::future<result_type> res(task.get_future());
      work_queue.push(std::move(task));
      return res;
   }
};
Yandex
Объявления
24.07.2014, 13:22     Пул потоков на основе boost::lockfree::queue
Ответ Создать тему
Опции темы

Текущее время: 13:53. Часовой пояс GMT +3.
КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin® Version 3.8.9
Copyright ©2000 - 2016, vBulletin Solutions, Inc.
Рейтинг@Mail.ru