Эксперт С++
8385 / 6147 / 615
Регистрация: 10.12.2010
Сообщений: 28,683
Записей в блоге: 30
1

Потокобезопасная очередь: критика реализации

05.09.2015, 15:56. Показов 7474. Ответов 25

Author24 — интернет-сервис помощи студентам
Прочитал третью часть
"Уильямс Э. "Параллельное программирование на С++ в действии. Практика разработки многопоточных программ" - 2012"
увидел там пример реализации потокобезопасной очереди, но к своему удивлению пример реализации основан на блокировании всей очереди.
Погуглив увидел те же примеры с блокированием всего контейнера.

Ранее я смотрел лекториум

Собственно вероятно можно реализовать с блокировками более оптимальным образом не блокирую "всё".

Накатал такую реализацию:
C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
#include <thread>
#include <mutex>
using namespace  std;
 
template <typename T>
struct LockableNode
{
   T data;
   LockableNode<T>* next;
 
   // for std::lock_guard & std::lock
   void lock()  { m_.lock();   }
   void unlock(){ m_.unlock(); }
 
   LockableNode(T data=T(),LockableNode<T>* next=0)
       :data(data),next(next)
   { }
 
   private:
    mutex m_;
};
 
template <typename T>
class Queue
{
   public:
      Queue();
      ~Queue();
 
      void push(T e);
      bool pop(T& e);
      //T popAndWait();
 
      void clear();
 
   private:
      void unsynchronizedClear();
 
      typedef LockableNode<T> NodeType;
      typedef std::lock_guard<NodeType> NodeGuard;
      typedef NodeType* NodePtr;
 
      NodePtr fictiveHead_;
      NodePtr fictiveTail_;
};
 
template <typename T>
Queue<T>::Queue()
   :fictiveHead_(new LockableNode<T>()),
    fictiveTail_(new LockableNode<T>())
{}
 
 
template <typename T>
void Queue<T>::unsynchronizedClear()
{
  for(NodePtr ptr= fictiveHead_->next;
      ptr;
      fictiveHead_->next= ptr->next)
  {
    delete ptr;
  }
  fictiveHead_->next= fictiveTail_->next= 0;
}
 
template <typename T>
void Queue<T>::clear()
{
  lock(*fictiveHead_,*fictiveTail_);
  unsynchronizedClear();
}
 
 
template <typename T>
Queue<T>::~Queue()
{
  unsynchronizedClear();
  delete fictiveHead_;
  delete fictiveTail_;
}
 
 
template <typename T>
void Queue<T>::push(T e)
{  
  NodePtr newNode=  new LockableNode<T>(e);
  NodeGuard tailGuard(*fictiveTail_);
 
  if(fictiveTail_->next) // has real Node
  {
    NodeGuard lastGuard(*fictiveTail_->next);
    NodePtr   last=  fictiveTail_->next;
    last->next =     fictiveTail_->next = newNode;
  }
  else  // if empty
  {
    NodeGuard headGuard(*fictiveHead_);
    fictiveHead_->next = fictiveTail_->next = newNode;
  }
}
 
template <typename T>
bool Queue<T>::pop(T& e)
{
  NodeGuard headGuard(*fictiveHead_);
  if(fictiveHead_->next)
  {
    NodeGuard firstGuard(*(fictiveHead_->next));
    NodePtr   first= fictiveHead_->next;
 
    if(first->next)
    {
      fictiveHead_->next = first->next;
    }
    else // one real Node
    {
      NodeGuard tailGuard(*fictiveTail_);
      fictiveHead_->next= fictiveTail_->next= 0; // queue will be empty
    }
 
    e= first->data;
    delete first;
    return true;
 }
  else
      return false;
}
 
 
//template <typename T>
//T Queue<T>::popAndWait()
//{
 
//}
 
 
void fillQ(Queue<string>& sq)
{
   for(int i=0; i<50 ; ++i)
   {
      this_thread::sleep_for(chrono::milliseconds(rand()%500));
      sq.push(to_string(i));
   }
}
//-------------------------------------------------------------------------
 
int main()
{
  srand(time(0));
  {
    Queue<string> sq;
    thread th(&fillQ,ref(sq));
 
    for(;;)
    {
      string s;
      if(sq.pop(s))
        cout<< s <<endl;
      else
      {
        cout<< "-" <<endl;
        this_thread::sleep_for(chrono::milliseconds(rand()%500));
      }
    }
 
    th.join();
  }
    cout<<"done!"<<endl;
    return 0;
}


Потокобезопасная очередь: критика реализации


Но не уверен что сделал все провильно, а так же у меня дилема насчет реализацией popAndWait() метода.

Возможно кто-то подскажет ссылки или лит-ру на эту тему?
Ну или кто-то укажет на ошибки в моем коде?
2
Programming
Эксперт
94731 / 64177 / 26122
Регистрация: 12.04.2006
Сообщений: 116,782
05.09.2015, 15:56
Ответы с готовыми решениями:

Создать очередь вещественных значений, для реализации используя односвязные списки
Создать очередь вещественных значений, для реализации используя односвязные списки. Реализовать...

Отделение интерфейса от реализации класса: компиляция кода реализации
Доброго времени суток, У меня возникла проблема с отделением интерфейса от реализации класса....

Потокобезопасная очередь
Необходимо очередь, которая удовлетворяет следующим условиям: 1. Потокобезопасная (N писателей и...

Потокобезопасная очередь
нужна потокобезопасная очередь с ожиданием и отказами. очередь должна быть с ограничением по...

25
Эксперт С++
8385 / 6147 / 615
Регистрация: 10.12.2010
Сообщений: 28,683
Записей в блоге: 30
05.09.2015, 16:09  [ТС] 2
Точная ссылка на видео с привязкой к времени https://www.youtube.com/watch?... Tps#t=2988
0
2782 / 1935 / 570
Регистрация: 05.06.2014
Сообщений: 5,600
05.09.2015, 16:31 3
Цитата Сообщение от Avazart Посмотреть сообщение
Собственно вероятно можно реализовать с блокировками более оптимальным образом не блокирую "всё".
Это уже на месте смотреть надо. Конструктор std::mutex, скорее всего дергает системный CreateMutex. А скорость системных вызовов вызывает у меня сильные сомнения (переключение контекстов, опять же, синхронизация). Как бы время на создание такой очереди не съело все выгоды от частичной блокировки.

Ну и плюс к тому, если ваша программа часто дергает синхронизацию, вероятно, многопоточность вы делаете неправильно. А если синхронизация дергается редко, так ли уж важно залочена вся очередь или только ее часть?
Цитата Сообщение от Avazart Посмотреть сообщение
Ну или кто-то укажет на ошибки в моем коде?
С ходу - очередь не блокируется при чтении. Если одновременно с этим чтением начнется запись, будет прочитан какой-то винегрет из старого и нового. UPD А, нет, не туда посмотрел, в pop защита есть.
1
1458 / 795 / 257
Регистрация: 21.06.2011
Сообщений: 1,740
Записей в блоге: 2
05.09.2015, 16:41 4
Цитата Сообщение от Avazart Посмотреть сообщение
вероятно можно реализовать с блокировками более оптимальным образом не блокирую "всё"
Там далее в книге будут рассмотрены потокобезопасные структуры данных без блокировок на основе атомарных переменных.
1
Ушел с форума
Эксперт С++
16473 / 7436 / 1187
Регистрация: 02.05.2013
Сообщений: 11,617
Записей в блоге: 1
05.09.2015, 17:03 5
Avazart, я, конечно, извиняюсь за оффтоп, но что-то у меня сильные сомнения
относительно эффективности такой очереди. Во-первых, объект синхронизации в каждом
элементе - это явный перебор. Так и вижу себе очередь с миллионом мьютексов.
Поиск в таком контейнере будет, как мне кажется, ну очень небыстрым, потому что
придется на время захватывать мьютекс каждого элемента.

Во-вторых, список - это, как правило (с прикладной точки зрения, а не теоретической),
вставка и удаление элементов в начало и конец. Это и будет тот самый "худший случай", в
который упрется вся производительность и это, по сути, ничем не лучше одного глобального
мьютекса на весь список. Потому что потоки в большинстве случаев будут конкурировать
не за средние элементы (что было бы хорошо, как показывал лектор), а за крайние.

Ну и в-третьих, не будем забывать, что вставка и удаление элемента в список - это
правка всего лишь одного или двух указателей, это несколько ассемблерных инструкций,
здесь для синхронизации даже мьютексы не всегда нужны, можно обойтись простейшим
спинлоком или CAS (cmpxchg).
5
Игогошка!
1801 / 708 / 44
Регистрация: 19.08.2012
Сообщений: 1,367
05.09.2015, 17:06 6
Цитата Сообщение от Avazart Посмотреть сообщение
Собственно вероятно можно реализовать с блокировками более оптимальным образом не блокирую "всё".
Можно реализовать кучей разных способов. Вопрос в конкретных требованиях к очереди. Плюс ты бежишь впереди паровоза - там дальше в книге будут и другие реализации.
Вообще я советую читать Уильямса всего лишь как некий гайд по С++ реализации, когда уже есть хорошая база в многопоточности. Потому что само многопоточное программирование книжка описывает слабенько.
0
Эксперт С++
8739 / 4317 / 960
Регистрация: 15.11.2014
Сообщений: 9,760
05.09.2015, 17:06 7
Цитата Сообщение от Убежденный Посмотреть сообщение
можно обойтись простейшим
спинлоком или CAS (cmpxchg).
в точку!

спинлок рулит и педалит, если нужно оберегать отдельный элемент,
обработка которого не занимает много времени.
0
Эксперт С++
8385 / 6147 / 615
Регистрация: 10.12.2010
Сообщений: 28,683
Записей в блоге: 30
05.09.2015, 17:55  [ТС] 8
Цитата Сообщение от Renji Посмотреть сообщение
Это уже на месте смотреть надо. Конструктор std::mutex, скорее всего дергает системный CreateMutex. А скорость системных вызовов вызывает у меня сильные сомнения (переключение контекстов, опять же, синхронизация). Как бы время на создание такой очереди не съело все выгоды от частичной блокировки.
Не факт, я больше склонаяюсь ктому то там критическая секция которая как я полагаю микс спинлока+мютекса

Цитата Сообщение от Renji Посмотреть сообщение
С ходу - очередь не блокируется при чтении. Если одновременно с этим чтением начнется запись, будет прочитан какой-то винегрет из старого и нового. UPD А, нет, не туда посмотрел, в pop защита есть.
Блокируются только крайние элементы первый/последний и фиктивные ноды.

Цитата Сообщение от Убежденный Посмотреть сообщение
Потому что потоки в большинстве случаев будут конкурировать
не за средние элементы (что было бы хорошо, как показывал лектор), а за крайние.
Ну вы это насчет видео, как бы для очереди как бы мне не нужен поиск/вставка в середину.

Цитата Сообщение от Убежденный Посмотреть сообщение
Ну и в-третьих, не будем забывать, что вставка и удаление элемента в список - это
правка всего лишь одного или двух указателей, это несколько ассемблерных инструкций,
здесь для синхронизации даже мьютексы не всегда нужны, можно обойтись простейшим
спинлоком или CAS (cmpxchg).
Может и так только я слабо себе предстваляю возможность "ожидания" при CAS.

Добавлено через 10 минут
Цитата Сообщение от DiffEreD Посмотреть сообщение
Там далее в книге будут рассмотрены потокобезопасные структуры данных без блокировок на основе атомарных переменных.
Ок будем читать дальше...
0
267 / 170 / 40
Регистрация: 25.08.2014
Сообщений: 1,087
Записей в блоге: 1
05.09.2015, 18:20 9
Цитата Сообщение от Avazart Посмотреть сообщение
LockableNode<T>* next;
А прикинь что next не залочен и его pop, пока ты добавляешь current (назовём так текущий). И куда указывает next теперь? Всё таки надо блокировать "трущиеся" детали, а они тут единственные попадают под методы - это конец и начало очереди.
0
Эксперт С++
8385 / 6147 / 615
Регистрация: 10.12.2010
Сообщений: 28,683
Записей в блоге: 30
05.09.2015, 18:33  [ТС] 10
Цитата Сообщение от Enno Посмотреть сообщение
А прикинь что next не залочен и его pop, пока ты добавляешь current (назовём так текущий). И куда указывает next теперь? Всё таки надо блокировать "трущиеся" детали, а они тут единственные попадают под методы - это конец и начало очереди.
Что-то я не понял вас, вы код смотрели? Первым дело блокируются фиктивные элементы а потом первый/последний(если они есть).
Т.е в каких случаях next может не быть заблокирован непонятно.
0
Ушел с форума
Эксперт С++
16473 / 7436 / 1187
Регистрация: 02.05.2013
Сообщений: 11,617
Записей в блоге: 1
05.09.2015, 18:36 11
Цитата Сообщение от Avazart Посмотреть сообщение
как бы для очереди как бы мне не нужен поиск/вставка в середину.
Так в этом и дело.
Выходит, что вставка или удаление в начало или конец очереди
(или списка) - это блокировка первого или последнего ее элементов.
Если несколько потоков будут пытаться делать это одновременно,
они по любому упрутся в одну из блокировок. Получается, что
это ничуть не лучше одной глобальной блокировки.

Цитата Сообщение от Avazart Посмотреть сообщение
я слабо себе предстваляю возможность "ожидания" при CAS
А там ждать вообще не надо.
Захватили CAS, быстренько переставили указатели и тут же выходим:
C++
1
2
3
4
5
6
7
void push_back(node<T> * pNode)
{
    AcquireLock(); // CAS
    m_pLastNode->Next = pNode; // Ну и хренасе тут мьютексу делать ? :)
    m_pLastNode = pNode;
    ReleaseLock();
}
Ни тебе переключений контекстов, ни системных вызовов, ничего.
Да, будет блокировка шины CPU, но она в любом случае будет при
захвате любого объекта синхронизации.
1
267 / 170 / 40
Регистрация: 25.08.2014
Сообщений: 1,087
Записей в блоге: 1
05.09.2015, 18:38 12
Avazart, да, упустил этот момент что-то. Окей, тогда разве ваш вариант не является всё той же полной блокировкой очереди? Я конечно не спец, но почему бы не выделить 1 объект синхронизации для всей очереди?
Цитата Сообщение от Avazart Посмотреть сообщение
for(NodePtr ptr= fictiveHead_->next; ptr; fictiveHead_->next= ptr->next)
{ delete ptr; }
ptr не меняется по циклу.
0
Эксперт С++
8385 / 6147 / 615
Регистрация: 10.12.2010
Сообщений: 28,683
Записей в блоге: 30
05.09.2015, 18:55  [ТС] 13
Цитата Сообщение от Убежденный Посмотреть сообщение
Если несколько потоков будут пытаться делать это одновременно,
они по любому упрутся в одну из блокировок. Получается, что
это ничуть не лучше одной глобальной блокировки.
Ну упираться они будут только если количество элементов 0...1 если на текущий момент там больше элементов то вставляющий и удаляющий элементы не будут "упираться", какждый будет блокировать свою пару крайних элементов.

Добавлено через 7 минут

Не по теме:

Цитата Сообщение от Enno Посмотреть сообщение
ptr не меняется по циклу.
Да там лажа, но не суть.



Добавлено через 1 минуту
Цитата Сообщение от Убежденный Посмотреть сообщение
Захватили CAS, быстренько переставили указатели и тут же выходим:
Ну так там два указателя, две операции, разве на CAS-ах можно реализовать такое?
Кроме того подозреваю там кое-где будет три операции в зависимости от кол-во элементов.

Добавлено через 5 минут
Цитата Сообщение от Enno Посмотреть сообщение
Я конечно не спец, но почему бы не выделить 1 объект синхронизации для всей очереди?
Потому что это излишне, по сути когда в контейнер не пуст мы используем только начало очереди для удаление и конец для добавления- при чем независимо.
Вставляющий поток не будет конкурировать с удаляющим, если контейнер имеет больше одного элемента в себе.
0
267 / 170 / 40
Регистрация: 25.08.2014
Сообщений: 1,087
Записей в блоге: 1
05.09.2015, 18:58 14
Цитата Сообщение от Avazart Посмотреть сообщение
Потому что это излишне, по сути когда в контейнер не пуст мы используем только начало очереди для удаление и конец для добавления- при чем независимо.
Понял. Это будет эффективно когда операции длятся долго. Например, когда данные размазаны по кластеру.
0
Эксперт С++
8385 / 6147 / 615
Регистрация: 10.12.2010
Сообщений: 28,683
Записей в блоге: 30
05.09.2015, 19:00  [ТС] 15
Цитата Сообщение от Enno Посмотреть сообщение
Понял. Это будет эффективно когда операции длятся долго. Например, когда данные размазаны по кластеру.
Скажем так когда вставляющий поток производит операции быстрее чем производится удаление удаляющим потоком.

Хотя опять же
Цитата Сообщение от Renji Посмотреть сообщение
Это уже на месте смотреть надо. Конструктор std::mutex, скорее всего дергает системный CreateMutex. А скорость системных вызовов вызывает у меня сильные сомнения (переключение контекстов, опять же, синхронизация). Как бы время на создание такой очереди не съело все выгоды от частичной блокировки.
Это хорошее заммечание вставка(создание мютекса) может тоже занимать значительное время, что нивелирует выигрышь, но это уже рализацию нужно смотреть и делать замеры.
0
Ушел с форума
Эксперт С++
16473 / 7436 / 1187
Регистрация: 02.05.2013
Сообщений: 11,617
Записей в блоге: 1
05.09.2015, 19:03 16
Цитата Сообщение от Avazart Посмотреть сообщение
Ну так там два указателя, две операции, разве на CAS-ах можно реализовать такое?
Можно. Правда, код после этого станет не совсем очевидным
Да, и есть же вот такой паттерн использования CAS (типа спинлок):
C
1
2
3
while (compare_ans_swap(&State, NewValue)) {}
// ...
release();
0
Эксперт С++
8385 / 6147 / 615
Регистрация: 10.12.2010
Сообщений: 28,683
Записей в блоге: 30
05.09.2015, 19:06  [ТС] 17
Убежденный, Мм а что такое release() ?
0
Ушел с форума
Эксперт С++
16473 / 7436 / 1187
Регистрация: 02.05.2013
Сообщений: 11,617
Записей в блоге: 1
05.09.2015, 19:08 18
Цитата Сообщение от Avazart Посмотреть сообщение
Мм а что такое release
Возврат State в исходное состояние (State = OldValue).
0
Эксперт С++
8385 / 6147 / 615
Регистрация: 10.12.2010
Сообщений: 28,683
Записей в блоге: 30
05.09.2015, 19:12  [ТС] 19
Ну compare_ans_swap(&State, NewValue) это операция над одним указателем.
0
Ушел с форума
Эксперт С++
16473 / 7436 / 1187
Регистрация: 02.05.2013
Сообщений: 11,617
Записей в блоге: 1
05.09.2015, 19:23 20
Не-а, не обязательно
Есть cmpxchg8b и cmpxchg16b, которые позволяют делать "double cas"
над данными размером в 4 и 8 байт, обычно указателями (x86 и x64).
Правда, они далеко не всеми CPU поддерживаются...
0
05.09.2015, 19:23
IT_Exp
Эксперт
87844 / 49110 / 22898
Регистрация: 17.06.2006
Сообщений: 92,604
05.09.2015, 19:23
Помогаю со студенческими работами здесь

Помогите найти ошибки в реализации класса «Очередь»
Класс «Очередь». Методы: добавление элемента, удаление элемента, удаление из очереди всех...

Используя модуль для реализации дека целых чисел, реализовать очередь на базе дека
Уважаемые программисты!Очень нужна Ваша помощь: (помогите решить, разобраться или хотябы просто...

Потокобезопасная коллекция
Здравствуйте, у меня следующая проблема: есть пользовательский класс Book, а также потокобезопасная...

Ява потокобезопасная переменная
Помогите пожалуйста с теорией потоко-безопасных переменных! Основы знаю(для Делфи учил), но на...


Искать еще темы с ответами

Или воспользуйтесь поиском по форуму:
20
Ответ Создать тему
Опции темы

КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin
Copyright ©2000 - 2024, CyberForum.ru