Форум программистов, компьютерный форум CyberForum.ru

Пул потоков на основе boost::lockfree::queue - C++

Восстановить пароль Регистрация
Другие темы раздела
C++ Не найдена зависимая сборка http://www.cyberforum.ru/cpp-beginners/thread1233154.html
Нашел статью, в которой говорится, что OpenMP от VS2005 работает лучше, нежели от VS2010. Собрал проект согласно инструкции; запустил, потестил и согласился с ней. Вот только одно но: собранный экзешник на голой винде не запускается, в журнале событий пишет следующее: Ошибка при создании контекста активации для "C:\Users\____\Desktop\4 branch\new.exe". Не найдена зависимая сборка...
C++ Как сделать чтобы каждый раз рандомное число менялось? Вопрос таков: Как сделать чтобы каждый раз рандомное число менялось? И каким образом или на каком этапе "выбирается" это число, т.к сколько бы раз я не компилировал всегда выводится 42. #include<iostream> #include<cstdlib> using namespace std; int main() { int n,m; n=rand()%100+1; cout<<"Enter number m="; http://www.cyberforum.ru/cpp-beginners/thread1233148.html
C++ Вывести одно целое число– количество возможных разбиений войска на отряды
Согласно многовековой традиции, сэр Петрейн каждую субботу ходит охотиться на дракона. Однако, за один вечер до выхода в очередной поход, он понял, что просто невозможно идти охотится на дракона без войска, состоящего из n верных воинов. Более того, чтобы охота получилась удачной, войско нужно разбить на три отряда, каждым из которых будет командовать опытный и закаленный в боях командир. У...
C++ Двойной цикл: нужно получить первое значение если выполнится условие
есть двойной цикл и нужно получить первое значение если выполнится это условие if (!field->IsWin()) получаем значения и останавливаем циклы как это сделать? for (int x = 0; x < columns; ++x) { for (int y = 0; y < rows; ++y) { if (field) { if (!field->IsWin()) { posX=x; posY=y;
C++ Как правильно добавлять и удалять элементы в вектор и из него http://www.cyberforum.ru/cpp-beginners/thread1233116.html
Всем доброго времени суток. Прошу объяснить как правильно добавлять и удалять элементы в вектор и из него... Использую его для хранения объектов разных классов(понимаю что cut, но так удобнее). Инициализация: std::vector<void*> ArrayElements; По нажатию на кнопку добавляю: ArrayElements.push_back( (void*)(new SystemRZA(tmp))); С другой кнопки удаляю:...
C++ Создайте класс employee, который содержит имя (объект класса string) и номер (типа long) служащего Создайте класс employee, который содержит имя (объект класса string) и но- мер (типа long) служащего. Включите в него метод getdata(), предназна- ченный для получения данных от пользователя и помещения их в объект, и метод putdata(), для вывода данных. Предполагаем, что имя не может иметь внутренних пробелов. Напишите функцию main(), использующую этот класс. Вам нужно будет создать массив... подробнее

Показать сообщение отдельно
DiffEreD
 Аватар для DiffEreD
1420 / 757 / 95
Регистрация: 21.06.2011
Сообщений: 1,740
Записей в блоге: 2
24.07.2014, 12:06     Пул потоков на основе boost::lockfree::queue
В книге "Энтони Уильямс - Параллельное программирование на С++ в действии" обсуждается пул потоков на основе самописной потокобезопасной очереди:
Кликните здесь для просмотра всего текста
C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#include <iostream>
#include <vector>
#include <type_traits>
#include <functional>
#include <memory>
#include <thread>
#include <future>
#include <atomic>
#include "threadsafe_queue.h"
 
class function_wrapper
{
   struct impl_base {
      virtual void call() = 0;
      virtual ~impl_base() {}
   };
 
   std::unique_ptr<impl_base> impl;
 
   template <typename F>
   struct impl_type : impl_base
   {
      F f;
      impl_type(F&& f_) : f(std::move(f_)) {}
      void call() override { f(); }
   };
 
public:
   template <typename F>
   function_wrapper(F&& f) : impl(new impl_type<F>(std::move(f)))
   {}
 
   void operator ()() { impl->call(); }
 
   function_wrapper() = default;
 
   function_wrapper(function_wrapper&& other) :
      impl(std::move(other.impl))
   {}
 
   function_wrapper& operator = (function_wrapper&& other)
   {
      impl = std::move(other.impl);
      return *this;
   }
 
   function_wrapper(const function_wrapper&) = delete;
   function_wrapper(function_wrapper&) = delete;
   function_wrapper& operator = (function_wrapper& other) = delete;
};
 
class join_thread
{
   std::vector<std::thread>& threads;
public:
   explicit join_thread(std::vector<std::thread>& threads_) :
      threads(threads_)
   {}
   ~join_thread()
   {
      for (auto& thread : threads) {
         if (thread.joinable()) thread.join();
      }
   }
};
 
class thread_pool
{
   std::atomic<bool> done;
   threadsafe_queue<function_wrapper> work_queue;
   std::vector<std::thread> threads;
   join_thread joiner;
 
   void worker_thread()
   {
      while (!done)
      {
         function_wrapper task;
         if (work_queue.try_pop(task))
            task();
         else
            std::this_thread::yield();
      }
   }
 
public:
   thread_pool() : done(false), joiner(threads)
   {
      size_t const thread_count = std::thread::hardware_concurrency();
 
      //if (thread_count <= 1 || !work_queue.is_lock_free())
         //throw std::logic_error("not compatible system for thread_pool");
 
      try {
         for (size_t i = 0; i < thread_count; ++i)
            threads.emplace_back(&thread_pool::worker_thread, this);
      }
      catch(...) {
         done = true;
         throw;
      }
   }
 
   ~thread_pool() { done = true; }
 
   template <typename FunctionType>
   std::future<typename std::result_of<FunctionType()>::type>
   submit(FunctionType f)
   {
      typedef typename std::result_of<FunctionType()>::type
            result_type;
      std::packaged_task<result_type()> task(std::move(f));
      std::future<result_type> res(task.get_future());
      work_queue.push(std::move(task));
      return res;
   }
};
 
/////////////////////////////////////////////////////////////////////
 
void foo() { std::cout << "Hello from id: " << std::this_thread::get_id() << std::endl; }
int plus(int x, int y) {return x+y;}
 
int main()
{
   thread_pool pool;
   auto f = pool.submit(std::bind(plus, 10, 20));
   for (int i = 0; i < 10; ++i)
      pool.submit(foo);
   std::cout << "10 + 20 = " << f.get() << "\n";
 
   system("pause");
   return 0;
}
Я же вместо самописной очереди хочу воспользоваться потокобезопасной очередью из boost. Для этого надо что бы тип, который параметризует boost::lockfree::queue удовлетворял следующие условия:
T must have a copy constructor
T must have a trivial assignment operator
T must have a trivial destructor
Ну переделал function_wrapper на это:
Кликните здесь для просмотра всего текста
C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class function_wrapper
{
   struct impl_base {
      virtual void call() = 0;
      virtual ~impl_base() {}
   };
 
   std::shared_ptr<impl_base> impl;
 
   template <typename F>
   struct impl_type : impl_base
   {
      F f;
      impl_type(F&& f_) : f(std::move(f_)) {}
      void call() override { f(); }
   };
 
public:
   template <typename F>
   function_wrapper(F&& f) : impl(new impl_type<F>(std::move(f)))
   {}
 
   function_wrapper() = default;
   function_wrapper(const function_wrapper&) = default;
   function_wrapper(function_wrapper&) = default;
   ~function_wrapper() = default;
 
   void operator ()() { impl->call(); }
};
полный код
C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#include <iostream>
#include <vector>
#include <type_traits>
#include <functional>
#include <exception>
#include <memory>
#include <thread>
#include <future>
#include <atomic>
#include <boost/lockfree/queue.hpp>
 
class function_wrapper
{
   struct impl_base {
      virtual void call() = 0;
      virtual ~impl_base() {}
   };
 
   std::shared_ptr<impl_base> impl;
 
   template <typename F>
   struct impl_type : impl_base
   {
      F f;
      impl_type(F&& f_) : f(std::move(f_)) {}
      void call() override { f(); }
   };
 
public:
   template <typename F>
   function_wrapper(F&& f) : impl(new impl_type<F>(std::move(f)))
   {}
 
   function_wrapper() = default;
   function_wrapper(const function_wrapper&) = default;
   function_wrapper(function_wrapper&) = default;
   ~function_wrapper() = default;
 
   void operator ()() { impl->call(); }
};
 
class join_thread
{
   std::vector<std::thread>& threads;
public:
   explicit join_thread(std::vector<std::thread>& threads_) :
      threads(threads_)
   {}
   ~join_thread()
   {
      for (auto& thread : threads) {
         if (thread.joinable()) thread.join();
      }
   }
};
 
class thread_pool
{
   std::atomic<bool> done;
   boost::lockfree::queue<function_wrapper> work_queue;
   std::vector<std::thread> threads;
   join_thread joiner;
 
   void worker_thread()
   {
      while (!done)
      {
         function_wrapper task;
         if (work_queue.pop(task))
            task();
         else
            std::this_thread::yield();
      }
   }
 
public:
   thread_pool() : done(false), joiner(threads)
   {
      size_t const thread_count = std::thread::hardware_concurrency();
 
      if (thread_count <= 1 || !work_queue.is_lock_free())
         throw std::logic_error("not compatible system for thread_pool");
 
      try {
         for (size_t i = 0; i < thread_count; ++i)
            threads.emplace_back(&thread_pool::worker_thread, this);
      }
      catch(...) {
         done = true;
         throw;
      }
   }
 
   ~thread_pool() { done = true; }
 
   template <typename FunctionType>
   std::future<typename std::result_of<FunctionType()>::type>
   submit(FunctionType f)
   {
      typedef typename std::result_of<FunctionType()>::type
            result_type;
      std::packaged_task<result_type()> task(std::move(f));
      std::future<result_type> res(task.get_future());
      work_queue.push(std::move(task));
      return res;
   }
};
 
/////////////////////////////////////////////////////////////////////
 
void foo() { std::cout << "Hello from id: " << std::this_thread::get_id() << std::endl; }
int plus(int x, int y) {return x+y;}
 
int main()
{
   thread_pool pool;
   auto f = pool.submit(std::bind(plus, 10, 20));
   for (int i = 0; i < 10; ++i)
      pool.submit(foo);
   std::cout << "10 + 20 = " << f.get() << "\n";
 
   system("pause");
   return 0;
}
А компилятор мне ассерты выкидывает, мол у вашого класса нету тривиального деструктора и тривиального оператора присваивания. Как же еще надо переделывать function_wrapper что бы он "влез" в очередь?
И еще, не страдает ли function_wrapper с shared_ptr плохим дизайном? Ведь этот класс скрывает в себе обэкт который должен только перемещатся. Как я понимаю, при копировании shared_ptr обэкт на корый он указывает копироваться не будет и объекты function_wrapper теперь можно смело копировать.
После регистрации реклама в сообщениях будет скрыта и будут доступны все возможности форума.
 
Текущее время: 12:58. Часовой пояс GMT +3.
КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin® Version 3.8.9
Copyright ©2000 - 2016, vBulletin Solutions, Inc.
Рейтинг@Mail.ru