Форум программистов, компьютерный форум, киберфорум
Loafer
Войти
Регистрация
Восстановить пароль
Оценить эту запись

Потокобезопасная очередь

Запись от Loafer размещена 10.08.2020 в 16:51

Оставляю здесь кусок кода, который возможно мне понадобится в будущем.
C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#pragma once
 
// Очередь реализуется согласано книге Anthony Williams -
// "C++ Concurrency in Action: Practical Multithreading".
// Смотреть главу 6, раздел 6.2
 
#include <condition_variable>
#include <memory>
#include <mutex>
 
template <typename T>
class ThreadSafeQueue {
 private:
  using LockGuard = std::lock_guard<std::mutex>;
  using UniqueLock = std::unique_lock<std::mutex>;
 
  // Очередь реализуется в виде односвязного списка
  struct Node {
    // Данные в узле списка
    std::shared_ptr<T> data = nullptr;
 
    // Указатель на следующей элемент в списке
    std::unique_ptr<Node> next = nullptr;
  };
 
 private:
  // Потокобезопасный доступ к концу очереди
  const Node* get_tail() const {
    LockGuard tail_lock(m_tail_mutex);
    return m_tail;
  }
 
  // Извлечение из начала очереди
  std::unique_ptr<Node> pop_head() {
    std::unique_ptr<Node> old_head = std::move(m_head);
    m_head = std::move(old_head->next);
    return std::move(old_head);
  }
 
  // Ожидание появления данных в очереди
  UniqueLock wait_for_data() {
    UniqueLock head_lock(m_head_mutex);
    m_data_cond.wait(head_lock, [this] { return m_head.get() != get_tail(); });
    return std::move(head_lock);
  }
 
 public:
  // Создаем фиктивный узел, чтобы next никогда не указывал на nullptr.
  // Это необходимо для потокобезопасной работы и удобства.
  ThreadSafeQueue() : m_head(std::make_unique<Node>()), m_tail(m_head.get()) {}
  ThreadSafeQueue(const ThreadSafeQueue&) = delete;
  ThreadSafeQueue& operator=(const ThreadSafeQueue&) = delete;
 
  // Проверка наличия элемента и если он есть, то извлечение
  std::shared_ptr<T> try_pop() {
    LockGuard head_lock(m_head_mutex);
    if (m_head.get() == get_tail()) {
      return nullptr;
    }
 
    std::unique_ptr<Node> old_head = pop_head();
    return old_head->data;
  }
 
  // Проверка наличия элемента и если он есть, то извлечение
  bool try_pop(T& value) {
    LockGuard head_lock(m_head_mutex);
    if (m_head.get() == get_tail()) {
      return false;
    }
 
    value = std::move(*m_head->data);
    pop_head();
    return true;
  }
 
  // Извлечение элемента из очереди с ожиданием
  std::shared_ptr<T> wait_and_pop() {
    UniqueLock head_lock(wait_for_data());
    std::unique_ptr<Node> old_head = pop_head();
    return old_head->data;
  }
 
  // Извлечение элемента из очереди с ожиданием
  void wait_and_pop(T& value) {
    UniqueLock head_lock(wait_for_data());
    value = std::move(*m_head->data);
    pop_head();
  }
 
  // Вставка нового элемента в очередь. Надо понимать тот факт,
  // что фиктивная нода всегда находится в конце связного списка.
  // Если на фиктивную ноду указывают начало и конец очереди,
  // значит очередь пуста.
  void push(T new_value) {
    std::shared_ptr<T> new_data = std::make_shared<T>(std::move(new_value));
    std::unique_ptr<Node> new_dummy_node = std::make_unique<Node>();
    {
      LockGuard tail_lock(m_tail_mutex);
      Node* old_tail = m_tail;
      m_tail = new_dummy_node.get();
      old_tail->data = std::move(new_data);
      old_tail->next = std::move(new_dummy_node);
    }
    m_data_cond.notify_one();
  }
 
  bool empty() const {
    LockGuard head_lock(m_head_mutex);
    return m_head.get() == get_tail();
  }
 
 private:
  // Начало очереди
  mutable std::mutex m_head_mutex;
  std::unique_ptr<Node> m_head;
 
  // Конец очереди
  mutable std::mutex m_tail_mutex;
  Node* m_tail;
 
  std::condition_variable m_data_cond;
};
Unit-тесты к предыдущему коду:
C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
#include <algorithm>
#include <future>
#include <iostream>
#include <utility>
 
#include <gtest/gtest.h>
#include "thread_safe_queue.h"
 
class ThreadSafeQueueTest : public ::testing::Test {
 public:
  using Pair = std::pair<std::size_t, std::size_t>;
 
  void push_to_queue(ThreadSafeQueue<Pair>& queue, std::size_t number,
                     std::size_t count) {
    for (std::size_t i = 0; i < count; ++i) {
      queue.push(std::make_pair(number, i));
    }
  }
 
  std::vector<Pair> wait_and_pop_from_queue(ThreadSafeQueue<Pair>& queue,
                                            std::size_t count) {
    std::vector<Pair> pairs(count);
    for (std::size_t i = 0; i < count; ++i) {
      queue.wait_and_pop(pairs[i]);
    }
    return pairs;
  }
 
  std::vector<Pair> try_pop_from_queue(ThreadSafeQueue<Pair>& queue,
                                       std::size_t count) {
    std::vector<Pair> pairs;
    pairs.reserve(count);
    for (std::size_t i = 0; i < count; ++i) {
      auto result = queue.try_pop();
      if (result) {
        pairs.push_back(*result);
      }
    }
    return pairs;
  }
};
 
TEST_F(ThreadSafeQueueTest, one_push_pop) {
  ThreadSafeQueue<std::size_t> queue;
  ASSERT_TRUE(queue.empty());
 
  queue.push(1);
  ASSERT_FALSE(queue.empty());
 
  std::size_t pop_result = *queue.try_pop();
  ASSERT_TRUE(queue.empty());
  ASSERT_EQ(queue.try_pop(), nullptr);
  ASSERT_EQ(pop_result, 1);
 
  queue.push(2);
  ASSERT_FALSE(queue.empty());
 
  ASSERT_TRUE(queue.try_pop(pop_result));
  ASSERT_TRUE(queue.empty());
  ASSERT_EQ(pop_result, 2);
}
 
TEST_F(ThreadSafeQueueTest, many_push_pop) {
  ThreadSafeQueue<std::size_t> queue;
  ASSERT_TRUE(queue.empty());
 
  queue.push(1);
  queue.push(2);
  ASSERT_FALSE(queue.empty());
 
  std::size_t pop_result = *queue.try_pop();
  ASSERT_FALSE(queue.empty());
  ASSERT_EQ(pop_result, 1);
 
  ASSERT_TRUE(queue.try_pop(pop_result));
  ASSERT_TRUE(queue.empty());
  ASSERT_EQ(pop_result, 2);
}
 
TEST_F(ThreadSafeQueueTest, wait_and_pop_from_queue) {
  ThreadSafeQueue<Pair> queue;
 
  std::future<void> push1 = std::async(&ThreadSafeQueueTest::push_to_queue,
                                       this, std::ref(queue), 0, 5);
 
  std::future<void> push2 = std::async(&ThreadSafeQueueTest::push_to_queue,
                                       this, std::ref(queue), 1, 5);
 
  std::future<std::vector<Pair>> pop1 = std::async(
      &ThreadSafeQueueTest::wait_and_pop_from_queue, this, std::ref(queue), 10);
 
  push1.get();
  push2.get();
  auto result = pop1.get();
  std::stable_partition(result.begin(), result.end(),
                        [](const Pair& pair) { return pair.first == 0; });
 
  std::vector<Pair> expected = {{0, 0},
                                {0, 1},
                                {0, 2},
                                {0, 3},
                                {0, 4},
                                {1, 0},
                                {1, 1},
                                {1, 2},
                                {1, 3},
                                {1, 4}};
 
  ASSERT_TRUE(queue.empty());
  ASSERT_EQ(queue.try_pop(), nullptr);
  ASSERT_EQ(result, expected);
}
 
TEST_F(ThreadSafeQueueTest, try_pop_from_queue) {
  ThreadSafeQueue<Pair> queue;
 
  std::future<void> push1 = std::async(&ThreadSafeQueueTest::push_to_queue,
                                       this, std::ref(queue), 0, 5);
 
  std::future<void> push2 = std::async(&ThreadSafeQueueTest::push_to_queue,
                                       this, std::ref(queue), 1, 5);
 
  std::future<std::vector<Pair>> pop1 = std::async(
      &ThreadSafeQueueTest::try_pop_from_queue, this, std::ref(queue), 10);
 
  push1.get();
  push2.get();
  auto result = pop1.get();
 
  while (!queue.empty()) {
    result.push_back(*queue.try_pop());
  }
 
  std::stable_partition(result.begin(), result.end(),
                        [](const Pair& pair) { return pair.first == 0; });
 
  std::vector<Pair> expected = {{0, 0},
                                {0, 1},
                                {0, 2},
                                {0, 3},
                                {0, 4},
                                {1, 0},
                                {1, 1},
                                {1, 2},
                                {1, 3},
                                {1, 4}};
 
  ASSERT_TRUE(queue.empty());
  ASSERT_EQ(queue.try_pop(), nullptr);
  ASSERT_EQ(result, expected);
}
 
int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}
Размещено в C++
Просмотров 450 Комментарии 11
Всего комментариев 11
Комментарии
  1. Старый комментарий
    Аватар для Croessmah
    Эта очередь не является потокобезопасной.
    Запись от Croessmah размещена 11.08.2020 в 21:23 Croessmah вне форума
  2. Старый комментарий
    Аватар для Avazart
    Кажется что-то намудренно с умными указателям.

    Да и наверное стоит на git сразу ссылку давать что бы качать/смотреть/пробовать удобнее было читателям.
    Запись от Avazart размещена 11.08.2020 в 21:46 Avazart на форуме
    Обновил(-а) Avazart 11.08.2020 в 21:48
  3. Старый комментарий
    Аватар для Loafer
    Цитата:
    Сообщение от Croessmah Просмотреть комментарий
    Эта очередь не является потокобезопасной.
    Хотелось бы пруфов.
    Запись от Loafer размещена 11.08.2020 в 22:15 Loafer вне форума
  4. Старый комментарий
    Аватар для Loafer
    Цитата:
    Сообщение от Avazart Просмотреть комментарий
    Кажется что-то намудренно с умными указателям.
    Где именно?

    Цитата:
    Сообщение от Avazart Просмотреть комментарий
    Да и наверное стоит на git сразу ссылку давать что бы качать/смотреть/пробовать удобнее было читателям.
    Для небольших фрагментов кода не хочется создавать отдельные репозитории на GitHub'е.
    Запись от Loafer размещена 11.08.2020 в 22:16 Loafer вне форума
  5. Старый комментарий
    Аватар для Avazart
    Для начала:
    C++
    1
    2
    3
    4
    5
    6
    
     // Извлечение из начала очереди
      std::unique_ptr<Node> pop_head() {
        std::unique_ptr<Node> old_head = std::move(m_head);
        m_head = std::move(old_head->next);
        return std::move(old_head);
      }
    Почему Вы решили что тут не нужны блокировки и это потокобезопасно?


    C++
    1
    2
    3
    4
    5
    6
    7
    8
    9
    
    std::shared_ptr<T> try_pop() {
        LockGuard head_lock(m_head_mutex);
        if (m_head.get() == get_tail()) {
          return nullptr;
        }
     
        std::unique_ptr<Node> old_head = pop_head();
        return old_head->data;
      }
    Зачем здесь shared_ptr ? Какая аргументация?
    Запись от Avazart размещена 11.08.2020 в 23:12 Avazart на форуме
    Обновил(-а) Avazart 11.08.2020 в 23:17
  6. Старый комментарий
    Аватар для Loafer
    Цитата:
    Сообщение от Avazart Просмотреть комментарий
    Для начала:
    C++
    1
    2
    3
    4
    5
    6
    
     // Извлечение из начала очереди
      std::unique_ptr<Node> pop_head() {
        std::unique_ptr<Node> old_head = std::move(m_head);
        m_head = std::move(old_head->next);
        return std::move(old_head);
      }
    Почему Вы решили что тут не нужны блокировки и это потокобезопасно?
    Метод pop_head является приватным, значит пользователь его никогда не вызовет извне. А внутри других публичных методов, где вызывается метод pop_head, метод pop_head всегда защищен мьютексом.

    Цитата:
    Сообщение от Avazart Просмотреть комментарий
    C++
    1
    2
    3
    4
    5
    6
    7
    8
    9
    
    std::shared_ptr<T> try_pop() {
        LockGuard head_lock(m_head_mutex);
        if (m_head.get() == get_tail()) {
          return nullptr;
        }
     
        std::unique_ptr<Node> old_head = pop_head();
        return old_head->data;
      }
    Зачем здесь shared_ptr ? Какая аргументация?
    Это один из перегруженных вариантов try_pop, который возвращает shared_ptr на извлеченный элемент из очереди. Я делал очередь на основании книги, которая указана в самом верхнем комментарии в коде. Почему автор выбрал shared_ptr, я информации в книге не нашел. Наверное, из-за каких-то своих собственных предпочтений.
    Запись от Loafer размещена 12.08.2020 в 00:27 Loafer вне форума
  7. Старый комментарий
    Аватар для Avazart
    Я бы делал так
    C++
    1
    
    bool try_pop(T& value)
    Или учитывая "моду"
    C++
    1
    
    std::optional<T> pop()
    Но там конечно изначально смартпоитеры используются (что как по мне жирнавато) ибо очередь на списке основана так что тут спорно. Но внешне как то не очень очевидно как по мне.
    Запись от Avazart размещена 12.08.2020 в 18:40 Avazart на форуме
    Обновил(-а) Avazart 12.08.2020 в 20:59
  8. Старый комментарий
    Аватар для Avazart
    Я кстати тоже когда то пытался с этой темой разобраться вот моя тема:
    https://www.cyberforum.ru/cpp-... 25177.html
    Запись от Avazart размещена 31.08.2020 в 00:40 Avazart на форуме
  9. Старый комментарий
    Аватар для XLAT
    Цитата:
    Сообщение от Croessmah Просмотреть комментарий
    Эта очередь не является потокобезопасной.
    и тут Anthony Williams низко грохнулся с пьедестала отца и просветителя
    под мощной критикой уважаемого гуру Croessmah.
    Запись от XLAT размещена 31.08.2020 в 15:22 XLAT вне форума
    Обновил(-а) XLAT 31.08.2020 в 17:12
  10. Старый комментарий
    Аватар для Loafer
    Цитата:
    Сообщение от XLAT Просмотреть комментарий
    и тут Anthony Williams низко грохнулся с пьедестала отца и просветителя
    под мощной критикой уважаемого гуру Croessmah.
    Кстати говоря, он так и не предоставил доказательств своего изречения.
    Запись от Loafer размещена 31.08.2020 в 18:23 Loafer вне форума
  11. Старый комментарий
    Аватар для Avazart
    Пиши в личку, в блогах не работают уведомления.
    Запись от Avazart размещена 01.09.2020 в 11:20 Avazart на форуме
 
КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin® Version 3.8.9
Copyright ©2000 - 2020, vBulletin Solutions, Inc.