Форум программистов, компьютерный форум CyberForum.ru

С++ для начинающих

Войти
Регистрация
Восстановить пароль
 
DiffEreD
1429 / 766 / 95
Регистрация: 21.06.2011
Сообщений: 1,740
Записей в блоге: 2
#1

Пул потоков на основе boost::lockfree::queue - C++

24.07.2014, 12:06. Просмотров 997. Ответов 2
Метки нет (Все метки)

В книге "Энтони Уильямс - Параллельное программирование на С++ в действии" обсуждается пул потоков на основе самописной потокобезопасной очереди:
Кликните здесь для просмотра всего текста
C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#include <iostream>
#include <vector>
#include <type_traits>
#include <functional>
#include <memory>
#include <thread>
#include <future>
#include <atomic>
#include "threadsafe_queue.h"
 
class function_wrapper
{
   struct impl_base {
      virtual void call() = 0;
      virtual ~impl_base() {}
   };
 
   std::unique_ptr<impl_base> impl;
 
   template <typename F>
   struct impl_type : impl_base
   {
      F f;
      impl_type(F&& f_) : f(std::move(f_)) {}
      void call() override { f(); }
   };
 
public:
   template <typename F>
   function_wrapper(F&& f) : impl(new impl_type<F>(std::move(f)))
   {}
 
   void operator ()() { impl->call(); }
 
   function_wrapper() = default;
 
   function_wrapper(function_wrapper&& other) :
      impl(std::move(other.impl))
   {}
 
   function_wrapper& operator = (function_wrapper&& other)
   {
      impl = std::move(other.impl);
      return *this;
   }
 
   function_wrapper(const function_wrapper&) = delete;
   function_wrapper(function_wrapper&) = delete;
   function_wrapper& operator = (function_wrapper& other) = delete;
};
 
class join_thread
{
   std::vector<std::thread>& threads;
public:
   explicit join_thread(std::vector<std::thread>& threads_) :
      threads(threads_)
   {}
   ~join_thread()
   {
      for (auto& thread : threads) {
         if (thread.joinable()) thread.join();
      }
   }
};
 
class thread_pool
{
   std::atomic<bool> done;
   threadsafe_queue<function_wrapper> work_queue;
   std::vector<std::thread> threads;
   join_thread joiner;
 
   void worker_thread()
   {
      while (!done)
      {
         function_wrapper task;
         if (work_queue.try_pop(task))
            task();
         else
            std::this_thread::yield();
      }
   }
 
public:
   thread_pool() : done(false), joiner(threads)
   {
      size_t const thread_count = std::thread::hardware_concurrency();
 
      //if (thread_count <= 1 || !work_queue.is_lock_free())
         //throw std::logic_error("not compatible system for thread_pool");
 
      try {
         for (size_t i = 0; i < thread_count; ++i)
            threads.emplace_back(&thread_pool::worker_thread, this);
      }
      catch(...) {
         done = true;
         throw;
      }
   }
 
   ~thread_pool() { done = true; }
 
   template <typename FunctionType>
   std::future<typename std::result_of<FunctionType()>::type>
   submit(FunctionType f)
   {
      typedef typename std::result_of<FunctionType()>::type
            result_type;
      std::packaged_task<result_type()> task(std::move(f));
      std::future<result_type> res(task.get_future());
      work_queue.push(std::move(task));
      return res;
   }
};
 
/////////////////////////////////////////////////////////////////////
 
void foo() { std::cout << "Hello from id: " << std::this_thread::get_id() << std::endl; }
int plus(int x, int y) {return x+y;}
 
int main()
{
   thread_pool pool;
   auto f = pool.submit(std::bind(plus, 10, 20));
   for (int i = 0; i < 10; ++i)
      pool.submit(foo);
   std::cout << "10 + 20 = " << f.get() << "\n";
 
   system("pause");
   return 0;
}
Я же вместо самописной очереди хочу воспользоваться потокобезопасной очередью из boost. Для этого надо что бы тип, который параметризует boost::lockfree::queue удовлетворял следующие условия:
T must have a copy constructor
T must have a trivial assignment operator
T must have a trivial destructor
Ну переделал function_wrapper на это:
Кликните здесь для просмотра всего текста
C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class function_wrapper
{
   struct impl_base {
      virtual void call() = 0;
      virtual ~impl_base() {}
   };
 
   std::shared_ptr<impl_base> impl;
 
   template <typename F>
   struct impl_type : impl_base
   {
      F f;
      impl_type(F&& f_) : f(std::move(f_)) {}
      void call() override { f(); }
   };
 
public:
   template <typename F>
   function_wrapper(F&& f) : impl(new impl_type<F>(std::move(f)))
   {}
 
   function_wrapper() = default;
   function_wrapper(const function_wrapper&) = default;
   function_wrapper(function_wrapper&) = default;
   ~function_wrapper() = default;
 
   void operator ()() { impl->call(); }
};
полный код
C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#include <iostream>
#include <vector>
#include <type_traits>
#include <functional>
#include <exception>
#include <memory>
#include <thread>
#include <future>
#include <atomic>
#include <boost/lockfree/queue.hpp>
 
class function_wrapper
{
   struct impl_base {
      virtual void call() = 0;
      virtual ~impl_base() {}
   };
 
   std::shared_ptr<impl_base> impl;
 
   template <typename F>
   struct impl_type : impl_base
   {
      F f;
      impl_type(F&& f_) : f(std::move(f_)) {}
      void call() override { f(); }
   };
 
public:
   template <typename F>
   function_wrapper(F&& f) : impl(new impl_type<F>(std::move(f)))
   {}
 
   function_wrapper() = default;
   function_wrapper(const function_wrapper&) = default;
   function_wrapper(function_wrapper&) = default;
   ~function_wrapper() = default;
 
   void operator ()() { impl->call(); }
};
 
class join_thread
{
   std::vector<std::thread>& threads;
public:
   explicit join_thread(std::vector<std::thread>& threads_) :
      threads(threads_)
   {}
   ~join_thread()
   {
      for (auto& thread : threads) {
         if (thread.joinable()) thread.join();
      }
   }
};
 
class thread_pool
{
   std::atomic<bool> done;
   boost::lockfree::queue<function_wrapper> work_queue;
   std::vector<std::thread> threads;
   join_thread joiner;
 
   void worker_thread()
   {
      while (!done)
      {
         function_wrapper task;
         if (work_queue.pop(task))
            task();
         else
            std::this_thread::yield();
      }
   }
 
public:
   thread_pool() : done(false), joiner(threads)
   {
      size_t const thread_count = std::thread::hardware_concurrency();
 
      if (thread_count <= 1 || !work_queue.is_lock_free())
         throw std::logic_error("not compatible system for thread_pool");
 
      try {
         for (size_t i = 0; i < thread_count; ++i)
            threads.emplace_back(&thread_pool::worker_thread, this);
      }
      catch(...) {
         done = true;
         throw;
      }
   }
 
   ~thread_pool() { done = true; }
 
   template <typename FunctionType>
   std::future<typename std::result_of<FunctionType()>::type>
   submit(FunctionType f)
   {
      typedef typename std::result_of<FunctionType()>::type
            result_type;
      std::packaged_task<result_type()> task(std::move(f));
      std::future<result_type> res(task.get_future());
      work_queue.push(std::move(task));
      return res;
   }
};
 
/////////////////////////////////////////////////////////////////////
 
void foo() { std::cout << "Hello from id: " << std::this_thread::get_id() << std::endl; }
int plus(int x, int y) {return x+y;}
 
int main()
{
   thread_pool pool;
   auto f = pool.submit(std::bind(plus, 10, 20));
   for (int i = 0; i < 10; ++i)
      pool.submit(foo);
   std::cout << "10 + 20 = " << f.get() << "\n";
 
   system("pause");
   return 0;
}
А компилятор мне ассерты выкидывает, мол у вашого класса нету тривиального деструктора и тривиального оператора присваивания. Как же еще надо переделывать function_wrapper что бы он "влез" в очередь?
И еще, не страдает ли function_wrapper с shared_ptr плохим дизайном? Ведь этот класс скрывает в себе обэкт который должен только перемещатся. Как я понимаю, при копировании shared_ptr обэкт на корый он указывает копироваться не будет и объекты function_wrapper теперь можно смело копировать.
Similar
Эксперт
41792 / 34177 / 6122
Регистрация: 12.04.2006
Сообщений: 57,940
24.07.2014, 12:06     Пул потоков на основе boost::lockfree::queue
Посмотрите здесь:

Есть ли в C++ пул потоков - C++
Перерыл много ссылок и не мог найти есть ли в с++ пул потоков,все источники указывают на с# ThreadPol,у меня курсовая пул потоков а...

Пул потоков с семафорами - C++
Задача:написать свой пуль потоков Написал вот такой код #include &lt;windows.h&gt; #include &quot;Worker.h&quot; #include&lt;list&gt; class...

Реализовать пул потоков, в который можно помещать функцию - C++
не могу реализовать как в книге главу 9 пул потоков который,в который можно помещать функцию что бы она возвращала значение вот код который...

Решить проблему: простой веб сервер на основе boost::asio падает именно при первом же обращении к серверу - C++
Запускаю код указанный ниже, но при обращении на айпи и порт который слушает сервер, сервер падает (debug error, abord) А браузер пишет...

Пример использование стека на основе массива и на основе двунаправленного списка - C++
здраствуте, можете привести примеры использывания стека на основе массива(1 код) и стек на основе двунаправленного списка(2...

как создать динамический пул максимально возможного размера? - C++
для создания динамического пула пользуемся функцией HeapCreate(), но что указать в качестве параметров - размеров тогда? HANDLE...

Статический пул для класса с помошью class::operator new непонятное исключение? - C++
Привет всем! Пытаюсь организовать статический пул(молниеносный аллокатор памяти. ) для некоторого класса. Написал сравнительный тест...

Queue сортировка - C++
Задание : отсортировать чергу по возростанию не использевать ООП queue &lt;int&gt; q; sort(q.front();q.back()) // не пашет подскажите...

контейнер queue - C++
здравствуйте , хотелось бы узнать как полечить доступ к элементам контейнера queue и как вывести элементы этого контейнера) помогите плиз)

Queue и пользовательский класс - C++
Недавно начал изучать STL, хотел поработать с queue. Можно ли очередь с типом, который я создал сам? Например class Simple { int a; ...

queue удаление из очереди - C++
Создать однонаправленную очередь с числами в диапазоне от –50 до +50. Удалить из очереди все элементы, расположенные до минимального ...

Контейнерный класс Queue - C++
Создать абстрактный базовый класс с виртуальной функцией - сумма прогрессии. Создать производные классы: арифметическая прогрессия и...


Искать еще темы с ответами

Или воспользуйтесь поиском по форуму:
После регистрации реклама в сообщениях будет скрыта и будут доступны все возможности форума.
ForEveR
В астрале
Эксперт С++
7968 / 4730 / 320
Регистрация: 24.06.2010
Сообщений: 10,539
Завершенные тесты: 3
24.07.2014, 12:40     Пул потоков на основе boost::lockfree::queue #2
A destructor is trivial if it is not user-provided and if:
— the destructor is not virtual,
— all of the direct base classes of its class have trivial destructors, and
— for all of the non-static data members of its class that are of class type (or array thereof), each such
class has a trivial destructor.

Otherwise, the destructor is non-trivial.
shared_ptr не является trivially_destructible.

A copy/move assignment operator for class X is trivial if it is not user-provided, its declared parameter type
is the same as if it had been implicitly declared, and if
— class X has no virtual functions (10.3) and no virtual base classes (10.1), and
— the assignment operator selected to copy/move each direct base class subobject is trivial, and
— for each non-static data member of X that is of class type (or array thereof), the assignment operator
selected to copy/move that member is trivial;

otherwise the copy/move assignment operator is non-trivial.
Ну и не является trivially_assignable.
DiffEreD
1429 / 766 / 95
Регистрация: 21.06.2011
Сообщений: 1,740
Записей в блоге: 2
24.07.2014, 13:22  [ТС]     Пул потоков на основе boost::lockfree::queue #3
Ясненько. Запихнуть невпихуемое не выйдет.
Я вот только что попробовал переписать с boost::lockfree::spsc_queue - все заработало. С описания из документации, я так понял, что эта очередь будет медленнее чем boost::lockfree::queue, но это не критично.
Вообщем, кто писал пулы потоков, нормальная такая реализация?
Кликните здесь для просмотра всего текста
C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
class function_wrapper
{
   struct impl_base {
      virtual void call() = 0;
      virtual ~impl_base() {}
   };
 
   std::shared_ptr<impl_base> impl;
 
   template <typename F>
   struct impl_type : impl_base
   {
      F f;
      impl_type(F&& f_) : f(std::move(f_)) {}
      void call() override { f(); }
   };
 
public:
   template <typename F>
   function_wrapper(F&& f) : impl(new impl_type<F>(std::move(f)))
   {}
 
   function_wrapper() = default;
   function_wrapper(const function_wrapper&) = default;
   function_wrapper(function_wrapper&) = default;
   ~function_wrapper() = default;
 
   void operator ()() { impl->call(); }
};
 
class join_thread
{
   std::vector<std::thread>& threads;
public:
   explicit join_thread(std::vector<std::thread>& threads_) :
      threads(threads_)
   {}
   ~join_thread()
   {
      for (auto& thread : threads) {
         if (thread.joinable()) thread.join();
      }
   }
};
 
boost::lockfree::spsc_queue<function_wrapper> work_queue(128);
 
class thread_pool
{
   std::atomic<bool> done;
   std::vector<std::thread> threads;
   join_thread joiner;
 
   void worker_thread()
   {
      while (!done)
      {
         function_wrapper task;
         if (work_queue.pop(task))
            task();
         else
            std::this_thread::yield();
      }
   }
 
public:
   thread_pool() : done(false), joiner(threads)
   {
      size_t const thread_count = std::thread::hardware_concurrency();
 
      if (thread_count <= 1 || !work_queue.is_lock_free())
         throw std::logic_error("not compatible system for thread_pool");
 
      try {
         for (size_t i = 0; i < thread_count; ++i)
            threads.emplace_back(&thread_pool::worker_thread, this);
      }
      catch(...) {
         done = true;
         throw;
      }
   }
 
   ~thread_pool() { done = true; }
 
   template <typename FunctionType>
   std::future<typename std::result_of<FunctionType()>::type>
   submit(FunctionType f)
   {
      typedef typename std::result_of<FunctionType()>::type
            result_type;
      std::packaged_task<result_type()> task(std::move(f));
      std::future<result_type> res(task.get_future());
      work_queue.push(std::move(task));
      return res;
   }
};
Yandex
Объявления
24.07.2014, 13:22     Пул потоков на основе boost::lockfree::queue
Ответ Создать тему
Опции темы

КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin® Version 3.8.9
Copyright ©2000 - 2017, vBulletin Solutions, Inc.
Рейтинг@Mail.ru