Форум программистов, компьютерный форум CyberForum.ru
Наши страницы

С++ для начинающих

Войти
Регистрация
Восстановить пароль
 
DiffEreD
1430 / 767 / 95
Регистрация: 21.06.2011
Сообщений: 1,740
Записей в блоге: 2
#1

Пул потоков на основе boost::lockfree::queue - C++

24.07.2014, 12:06. Просмотров 1038. Ответов 2
Метки нет (Все метки)

В книге "Энтони Уильямс - Параллельное программирование на С++ в действии" обсуждается пул потоков на основе самописной потокобезопасной очереди:
Кликните здесь для просмотра всего текста
C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#include <iostream>
#include <vector>
#include <type_traits>
#include <functional>
#include <memory>
#include <thread>
#include <future>
#include <atomic>
#include "threadsafe_queue.h"
 
class function_wrapper
{
   struct impl_base {
      virtual void call() = 0;
      virtual ~impl_base() {}
   };
 
   std::unique_ptr<impl_base> impl;
 
   template <typename F>
   struct impl_type : impl_base
   {
      F f;
      impl_type(F&& f_) : f(std::move(f_)) {}
      void call() override { f(); }
   };
 
public:
   template <typename F>
   function_wrapper(F&& f) : impl(new impl_type<F>(std::move(f)))
   {}
 
   void operator ()() { impl->call(); }
 
   function_wrapper() = default;
 
   function_wrapper(function_wrapper&& other) :
      impl(std::move(other.impl))
   {}
 
   function_wrapper& operator = (function_wrapper&& other)
   {
      impl = std::move(other.impl);
      return *this;
   }
 
   function_wrapper(const function_wrapper&) = delete;
   function_wrapper(function_wrapper&) = delete;
   function_wrapper& operator = (function_wrapper& other) = delete;
};
 
class join_thread
{
   std::vector<std::thread>& threads;
public:
   explicit join_thread(std::vector<std::thread>& threads_) :
      threads(threads_)
   {}
   ~join_thread()
   {
      for (auto& thread : threads) {
         if (thread.joinable()) thread.join();
      }
   }
};
 
class thread_pool
{
   std::atomic<bool> done;
   threadsafe_queue<function_wrapper> work_queue;
   std::vector<std::thread> threads;
   join_thread joiner;
 
   void worker_thread()
   {
      while (!done)
      {
         function_wrapper task;
         if (work_queue.try_pop(task))
            task();
         else
            std::this_thread::yield();
      }
   }
 
public:
   thread_pool() : done(false), joiner(threads)
   {
      size_t const thread_count = std::thread::hardware_concurrency();
 
      //if (thread_count <= 1 || !work_queue.is_lock_free())
         //throw std::logic_error("not compatible system for thread_pool");
 
      try {
         for (size_t i = 0; i < thread_count; ++i)
            threads.emplace_back(&thread_pool::worker_thread, this);
      }
      catch(...) {
         done = true;
         throw;
      }
   }
 
   ~thread_pool() { done = true; }
 
   template <typename FunctionType>
   std::future<typename std::result_of<FunctionType()>::type>
   submit(FunctionType f)
   {
      typedef typename std::result_of<FunctionType()>::type
            result_type;
      std::packaged_task<result_type()> task(std::move(f));
      std::future<result_type> res(task.get_future());
      work_queue.push(std::move(task));
      return res;
   }
};
 
/////////////////////////////////////////////////////////////////////
 
void foo() { std::cout << "Hello from id: " << std::this_thread::get_id() << std::endl; }
int plus(int x, int y) {return x+y;}
 
int main()
{
   thread_pool pool;
   auto f = pool.submit(std::bind(plus, 10, 20));
   for (int i = 0; i < 10; ++i)
      pool.submit(foo);
   std::cout << "10 + 20 = " << f.get() << "\n";
 
   system("pause");
   return 0;
}
Я же вместо самописной очереди хочу воспользоваться потокобезопасной очередью из boost. Для этого надо что бы тип, который параметризует boost::lockfree::queue удовлетворял следующие условия:
T must have a copy constructor
T must have a trivial assignment operator
T must have a trivial destructor
Ну переделал function_wrapper на это:
Кликните здесь для просмотра всего текста
C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class function_wrapper
{
   struct impl_base {
      virtual void call() = 0;
      virtual ~impl_base() {}
   };
 
   std::shared_ptr<impl_base> impl;
 
   template <typename F>
   struct impl_type : impl_base
   {
      F f;
      impl_type(F&& f_) : f(std::move(f_)) {}
      void call() override { f(); }
   };
 
public:
   template <typename F>
   function_wrapper(F&& f) : impl(new impl_type<F>(std::move(f)))
   {}
 
   function_wrapper() = default;
   function_wrapper(const function_wrapper&) = default;
   function_wrapper(function_wrapper&) = default;
   ~function_wrapper() = default;
 
   void operator ()() { impl->call(); }
};
полный код
C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#include <iostream>
#include <vector>
#include <type_traits>
#include <functional>
#include <exception>
#include <memory>
#include <thread>
#include <future>
#include <atomic>
#include <boost/lockfree/queue.hpp>
 
class function_wrapper
{
   struct impl_base {
      virtual void call() = 0;
      virtual ~impl_base() {}
   };
 
   std::shared_ptr<impl_base> impl;
 
   template <typename F>
   struct impl_type : impl_base
   {
      F f;
      impl_type(F&& f_) : f(std::move(f_)) {}
      void call() override { f(); }
   };
 
public:
   template <typename F>
   function_wrapper(F&& f) : impl(new impl_type<F>(std::move(f)))
   {}
 
   function_wrapper() = default;
   function_wrapper(const function_wrapper&) = default;
   function_wrapper(function_wrapper&) = default;
   ~function_wrapper() = default;
 
   void operator ()() { impl->call(); }
};
 
class join_thread
{
   std::vector<std::thread>& threads;
public:
   explicit join_thread(std::vector<std::thread>& threads_) :
      threads(threads_)
   {}
   ~join_thread()
   {
      for (auto& thread : threads) {
         if (thread.joinable()) thread.join();
      }
   }
};
 
class thread_pool
{
   std::atomic<bool> done;
   boost::lockfree::queue<function_wrapper> work_queue;
   std::vector<std::thread> threads;
   join_thread joiner;
 
   void worker_thread()
   {
      while (!done)
      {
         function_wrapper task;
         if (work_queue.pop(task))
            task();
         else
            std::this_thread::yield();
      }
   }
 
public:
   thread_pool() : done(false), joiner(threads)
   {
      size_t const thread_count = std::thread::hardware_concurrency();
 
      if (thread_count <= 1 || !work_queue.is_lock_free())
         throw std::logic_error("not compatible system for thread_pool");
 
      try {
         for (size_t i = 0; i < thread_count; ++i)
            threads.emplace_back(&thread_pool::worker_thread, this);
      }
      catch(...) {
         done = true;
         throw;
      }
   }
 
   ~thread_pool() { done = true; }
 
   template <typename FunctionType>
   std::future<typename std::result_of<FunctionType()>::type>
   submit(FunctionType f)
   {
      typedef typename std::result_of<FunctionType()>::type
            result_type;
      std::packaged_task<result_type()> task(std::move(f));
      std::future<result_type> res(task.get_future());
      work_queue.push(std::move(task));
      return res;
   }
};
 
/////////////////////////////////////////////////////////////////////
 
void foo() { std::cout << "Hello from id: " << std::this_thread::get_id() << std::endl; }
int plus(int x, int y) {return x+y;}
 
int main()
{
   thread_pool pool;
   auto f = pool.submit(std::bind(plus, 10, 20));
   for (int i = 0; i < 10; ++i)
      pool.submit(foo);
   std::cout << "10 + 20 = " << f.get() << "\n";
 
   system("pause");
   return 0;
}
А компилятор мне ассерты выкидывает, мол у вашого класса нету тривиального деструктора и тривиального оператора присваивания. Как же еще надо переделывать function_wrapper что бы он "влез" в очередь?
И еще, не страдает ли function_wrapper с shared_ptr плохим дизайном? Ведь этот класс скрывает в себе обэкт который должен только перемещатся. Как я понимаю, при копировании shared_ptr обэкт на корый он указывает копироваться не будет и объекты function_wrapper теперь можно смело копировать.
0
Надоела реклама? Зарегистрируйтесь и она исчезнет полностью.
Similar
Эксперт
41792 / 34177 / 6122
Регистрация: 12.04.2006
Сообщений: 57,940
24.07.2014, 12:06
Здравствуйте! Я подобрал для вас темы с ответами на вопрос Пул потоков на основе boost::lockfree::queue (C++):

Есть ли в C++ пул потоков - C++
Перерыл много ссылок и не мог найти есть ли в с++ пул потоков,все источники указывают на с# ThreadPol,у меня курсовая пул потоков а...

Пул потоков с семафорами - C++
Задача:написать свой пуль потоков Написал вот такой код #include &lt;windows.h&gt; #include &quot;Worker.h&quot; #include&lt;list&gt; class...

Реализовать пул потоков, в который можно помещать функцию - C++
не могу реализовать как в книге главу 9 пул потоков который,в который можно помещать функцию что бы она возвращала значение вот код который...

Решить проблему: простой веб сервер на основе boost::asio падает именно при первом же обращении к серверу - C++
Запускаю код указанный ниже, но при обращении на айпи и порт который слушает сервер, сервер падает (debug error, abord) А браузер пишет...

Error C2664: Client::First: невозможно преобразовать параметр 1 из "std::queue<_Ty>" в "std::queue<_Ty> &" - C++
barbershop.h: #ifndef __BARBERSHOP_H__ #define __BARBERSHOP_H__ #include &lt;vector&gt; #include &lt;queue&gt; #include &quot;wantshaircut.h&quot; ...

Создание и завершение процессов и потоков. Приоритеты выполнения потоков - C++
Здравствуйте. Буду очень раз если поможете понять,что конкретно нужно сделать в вот этом задании,пока особого представления о...

2
ForEveR
В астрале
Эксперт С++
7978 / 4737 / 321
Регистрация: 24.06.2010
Сообщений: 10,543
Завершенные тесты: 3
24.07.2014, 12:40 #2
A destructor is trivial if it is not user-provided and if:
— the destructor is not virtual,
— all of the direct base classes of its class have trivial destructors, and
— for all of the non-static data members of its class that are of class type (or array thereof), each such
class has a trivial destructor.

Otherwise, the destructor is non-trivial.
shared_ptr не является trivially_destructible.

A copy/move assignment operator for class X is trivial if it is not user-provided, its declared parameter type
is the same as if it had been implicitly declared, and if
— class X has no virtual functions (10.3) and no virtual base classes (10.1), and
— the assignment operator selected to copy/move each direct base class subobject is trivial, and
— for each non-static data member of X that is of class type (or array thereof), the assignment operator
selected to copy/move that member is trivial;

otherwise the copy/move assignment operator is non-trivial.
Ну и не является trivially_assignable.
1
DiffEreD
1430 / 767 / 95
Регистрация: 21.06.2011
Сообщений: 1,740
Записей в блоге: 2
24.07.2014, 13:22  [ТС] #3
Ясненько. Запихнуть невпихуемое не выйдет.
Я вот только что попробовал переписать с boost::lockfree::spsc_queue - все заработало. С описания из документации, я так понял, что эта очередь будет медленнее чем boost::lockfree::queue, но это не критично.
Вообщем, кто писал пулы потоков, нормальная такая реализация?
Кликните здесь для просмотра всего текста
C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
class function_wrapper
{
   struct impl_base {
      virtual void call() = 0;
      virtual ~impl_base() {}
   };
 
   std::shared_ptr<impl_base> impl;
 
   template <typename F>
   struct impl_type : impl_base
   {
      F f;
      impl_type(F&& f_) : f(std::move(f_)) {}
      void call() override { f(); }
   };
 
public:
   template <typename F>
   function_wrapper(F&& f) : impl(new impl_type<F>(std::move(f)))
   {}
 
   function_wrapper() = default;
   function_wrapper(const function_wrapper&) = default;
   function_wrapper(function_wrapper&) = default;
   ~function_wrapper() = default;
 
   void operator ()() { impl->call(); }
};
 
class join_thread
{
   std::vector<std::thread>& threads;
public:
   explicit join_thread(std::vector<std::thread>& threads_) :
      threads(threads_)
   {}
   ~join_thread()
   {
      for (auto& thread : threads) {
         if (thread.joinable()) thread.join();
      }
   }
};
 
boost::lockfree::spsc_queue<function_wrapper> work_queue(128);
 
class thread_pool
{
   std::atomic<bool> done;
   std::vector<std::thread> threads;
   join_thread joiner;
 
   void worker_thread()
   {
      while (!done)
      {
         function_wrapper task;
         if (work_queue.pop(task))
            task();
         else
            std::this_thread::yield();
      }
   }
 
public:
   thread_pool() : done(false), joiner(threads)
   {
      size_t const thread_count = std::thread::hardware_concurrency();
 
      if (thread_count <= 1 || !work_queue.is_lock_free())
         throw std::logic_error("not compatible system for thread_pool");
 
      try {
         for (size_t i = 0; i < thread_count; ++i)
            threads.emplace_back(&thread_pool::worker_thread, this);
      }
      catch(...) {
         done = true;
         throw;
      }
   }
 
   ~thread_pool() { done = true; }
 
   template <typename FunctionType>
   std::future<typename std::result_of<FunctionType()>::type>
   submit(FunctionType f)
   {
      typedef typename std::result_of<FunctionType()>::type
            result_type;
      std::packaged_task<result_type()> task(std::move(f));
      std::future<result_type> res(task.get_future());
      work_queue.push(std::move(task));
      return res;
   }
};
0
MoreAnswers
Эксперт
37091 / 29110 / 5898
Регистрация: 17.06.2006
Сообщений: 43,301
24.07.2014, 13:22
Привет! Вот еще темы с ответами:

Пример использование стека на основе массива и на основе двунаправленного списка - C++
здраствуте, можете привести примеры использывания стека на основе массива(1 код) и стек на основе двунаправленного списка(2...

как создать динамический пул максимально возможного размера? - C++
для создания динамического пула пользуемся функцией HeapCreate(), но что указать в качестве параметров - размеров тогда? HANDLE...

Статический пул для класса с помошью class::operator new непонятное исключение? - C++
Привет всем! Пытаюсь организовать статический пул(молниеносный аллокатор памяти. ) для некоторого класса. Написал сравнительный тест...

Queue сортировка - C++
Задание : отсортировать чергу по возростанию не использевать ООП queue &lt;int&gt; q; sort(q.front();q.back()) // не пашет подскажите...


Искать еще темы с ответами

Или воспользуйтесь поиском по форуму:
3
Ответ Создать тему
Опции темы

КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin® Version 3.8.9
Copyright ©2000 - 2017, vBulletin Solutions, Inc.
Рейтинг@Mail.ru