263 / 152 / 33
Регистрация: 29.06.2019
Сообщений: 1,545

Потоко-безопасная Очередь

05.04.2021, 20:16. Показов 13268. Ответов 106
Метки нет (Все метки)

Студворк — интернет-сервис помощи студентам
... вырисовалась тема отсюда
Цитата Сообщение от zayats80888 Посмотреть сообщение
Хотя не сложно написать такую очередь(с раздельной блокировкой для "хвоста" и "головы") самостоятельно.
вот выполнила последнюю задачку отсюда - но не знаю, может лучше shared_mutex вместо просто mutex? правда на него unique_lock<shared_mutex> не ставится...
C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#include <iostream>
#include <thread>
#include <condition_variable>
#include  <mutex>
#include <chrono>
#include <queue>
 
using namespace std;
 
  bool bDone;
  mutex m;
  std::condition_variable cv;
 
int main() {
    int c = 0;
    bool done = false;
    queue<int> goods;
 
    thread producer([&]() {
        std::lock_guard<mutex> lock(m);
        for (int i = 0; i < 500; ++i) {            
            goods.push(i);
            c++;
            cv.notify_one();
        }
 
        done = true;
    });
 
    thread consumer([&]() {
        std::unique_lock<mutex> lock(m);
        while (!done) {
            while (!goods.empty()) {
                goods.pop();
                c--;
                while (!bDone) cv.wait(lock);
            }
        }
    });
 
    producer.join();
    consumer.join();
    cout << "Net: " << c << endl;
}
тут же не раздельная блокировка? - это FIFO, насколько понимаю...
а раздельная - вы, наверно, имеете ввиду для LIFO... наверно лучше на list выполнять?
sorry, что много вопросов - стою на распутье - даже в отдельную тему вынесла
0
IT_Exp
Эксперт
34794 / 4073 / 2104
Регистрация: 17.06.2006
Сообщений: 32,602
Блог
05.04.2021, 20:16
Ответы с готовыми решениями:

Потоко-независимая очередь записывает 2е команды в одну ячейку. Почему ?
Доброго времени суток. Написал 2а приложения общяющихся между собой по TCP. Одно является сервером(планшет), второе клиент(windows). ...

Потоко-безопасный ObservableCollection с AddRange
Вообщем, я заполняю в нескольких потоках массив. на данный момент использую &quot;List&lt;ExpandoObject&gt; Вывод&quot; из-за...

Является ли boost::asio::tcp::acceptor потоко-безопасным ?
Мне сложно понять смысл того что написано в документации: ...

106
263 / 152 / 33
Регистрация: 29.06.2019
Сообщений: 1,545
23.04.2021, 14:05  [ТС]
Студворк — интернет-сервис помощи студентам
пример std::notify_all_at_thread_exit (последний код)... там же пример logger'a
0
263 / 152 / 33
Регистрация: 29.06.2019
Сообщений: 1,545
28.04.2021, 21:37  [ТС]
ИТОГО
моя рабочая версия при ожидании 2х флагов - всё-так потом отдельно пройтись по каждому флагу... просто рассматривая пример и чей-то совет там , столкнулась с проблемой теряющихся элементов -- поправлю в этой ветке
C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
// https://stackoverflow.com/questions/57074998/condition-variable-with-a-while-loop-in-thread-in-c
 
// ПРОБЛЕМА ПОСЛЕДНЕГО ЭЛЕМЕНТА В producer-consumer queue на 2core-CPU, т.е. 2 consumer'a по коду
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <chrono>
#include <atomic>
 
using namespace std;
 
int main() {
    int n_core = std::thread::hardware_concurrency();
    vector<std::thread> workers;
    volatile int max = 5;
    volatile atomic<int> count = 0;
    std::condition_variable cv;
    std::mutex m, mp;
    int timecalled = 0;
 
    // consumers max qty
    for (int i = 0; i < n_core; i++) {
        workers.emplace_back(std::thread{[&max, &count, &m, &cv, &mp]() {
            while (true) {                
                std::unique_lock<std::mutex> lk{m};
 
                {
                    std::lock_guard<mutex> lock(mp);
                    std::cout << "t_id " << std::this_thread::get_id() << " cv wait start " << std::endl;
                }
                // ждет условную переменную, используя лямбда-функцию в качестве предиката и !! возвращает объект блокировки вызывающей программе
                cv.wait(lk, [&max, &count]() { return count > 0 || max==0; });
                //if(max<=0) break;
                // 1.           // count check решает ПРОБЛЕМУ ПОСЛЕДНЕГО ЭЛЕМЕНТА - чтобы 2й поток не уменьшил count ЕЩЁ БОЛЬШЕ до -1
                if ( count > 0 ) {
                
                    {
                        std::lock_guard<mutex> lock(mp);
                        std::cout << "t_id " << std::this_thread::get_id() << " consumer - " << count << std::endl;
                    }
                    count--;
                    
                    {
                        std::lock_guard<mutex> lock(mp);
                        std::cout << "t_id " << std::this_thread::get_id() << " notify dec" << std::endl;
                    }
                    
                    cv.notify_one();
                    //this_thread::sleep_for(chrono::milliseconds(1));
                }
                // 2. finish_check anyway
                if(max==0) {
                    {
                        std::lock_guard<mutex> lock(mp);
                        std::cout << "t_id " << std::this_thread::get_id() << " finished " << std::endl;
                    }
                    break;
                }
            }
        }});
    }
 
    // producing from main
    while (max > 0) {        
        std::unique_lock<std::mutex> lk{m};
               
        cv.wait(lk, [&count]() { return count == 0; });
        
        count++;
        max--;
        timecalled++;
        std::cout << "MAIN t_id " << std::this_thread::get_id() << " notify inc" << std::endl;
 
        cv.notify_one();
        //this_thread::sleep_for(chrono::milliseconds(1));
    }
 
    for (auto &w : workers) {
        w.join();
    }
 
    std::cout << "qty all:" << timecalled << std::endl; // must be equal to max
    std::cout << "count now: " << count << std::endl; // must be zero !!!
    
    return 0;
}
но если producer бросит исключение, то consumer зависнет в wait... поэтому логично ещё одно условие condition и ещё один thread для обработки исключения... примерно, как здесь Logger - и сброс флага после обработки...
вообще, конечно, столько флагов пихать в condition , потом их рассматривать... как-то добавляет кода...
Вильямс предлагает несколько манёвров по обращению с ошибками (p.229)
Вильямс предлагает несколько манёвров по обращению с ошибками
В части безопасности относительно исключений есть мелкая не-
приятность – если помещения данных в очередь ожидают несколь-
ко потоков, то лишь один из них будет разбужен в результате вызова
data_cond.notify_one(). Однако если этот поток возбудит исклю-
чение в wait_and_pop(), например при конструировании std::
shared_ptr<> ������, то ни один из оставшихся потоков разбужен не
будет. Если это неприемлемо, то можно заменить notify_one() на
data_cond.notify_all(), тогда будут разбужены все потоки, но за
это придется заплатить – большая часть из них сразу же уснет снова,
увидев, что очередь по-прежнему пуста.
Другой вариант – включить
в wait_and_pop() обращение к notify_one() в случае исключения,
тогда другой поток сможет попытаться извлечь находящееся в очереди.
значение.
Третий вариант – перенести инициализацию std::shared_
ptr<> в push() и сохранять экземпляры std::shared_ptr<>, а не
сами значения данных. Тогда при копировании std::shared_ptr<>
из внутренней очереди std::queue<> никаких исключений возник-
нуть не может, и wait_and_pop() становится безопасной.

- с учётом того, что он размещает в очередь std::shared_ptr<>

Добавлено через 8 минут
p.s.
- это, действительно, лишь blocking_queue, запускающая consumer'ов данных из общего потока... лишь бы код consumer'ов после снятия блокировки (после notify) больше нигде не нуждался в ней снова... и дальше можно идти параллельно (асинхронно в моём случае - через switch context)
0
263 / 152 / 33
Регистрация: 29.06.2019
Сообщений: 1,545
29.04.2021, 10:52  [ТС]
Цитата Сообщение от JeyCi Посмотреть сообщение
примерно, как здесь Logger
в try-catch обернуть
Логирование — единственный способ отладки недетерминированных сценариев (например, многопоточность или сеть)
Добавлено через 1 час 32 минуты
исключения в потоках - Безопасность потоков в С++
0
Заблокирован
29.04.2021, 12:51
Вот спасибо. Как раз то, что надо

Добавлено через 10 минут
Точно)
1
263 / 152 / 33
Регистрация: 29.06.2019
Сообщений: 1,545
29.04.2021, 20:35  [ТС]
и Exceptions, и RAII
- всё-таки не могу пока прикинуть, как поток можно инкапсулировать... чтобы И от всех members нормально перехватывались исключения (если вдруг), И join'ились объекты при этих исключениях самостоятельно... и обернуть consumer'а, например в такой Wrapper...
а то потокобезопасная то queue - это хорошо, но exception-safety тоже хотелось бы, да и join() в деструкторе и в случае exceptions... не знаю, как соединить инкапсуляцию и exceptions-dealing... например, выход из потока, когда др. поток бросает исключение, а этот висит в wait -- понятно, что с флагом можно поколдовать... но всё-таки хочется поток инкапсулировать для корректного удаления (с join) в случае чего - ... ведь
что стандартная библиотека противоречит принципу RAII — при создании std::thread мы сами должны позаботиться о корректном управлении ресурсами, то есть явно вызвать join или detach. По этой причине некоторые программисты советуют не использовать объекты std::thread. Так же как new и delete, std::thread предоставляет возможность построить на основе них более высокоуровневые инструменты.
p.s.
по захвату ресурсов и ЖЦ в общем и целом - здесь - но нюансы для разработки Wrapper'a для потока пока не знаю, кроме, как примеры посматриваю...

Добавлено через 22 минуты
p.p.s.
c future это, как уже оговаривалось, возможно - например
C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
 // каким образом мы можем передать исключение из одного потока в другой:
void worker(std::promise<void> & p) {
    try {
        throw std::runtime_error("exception from thread");
    } catch (...) {
        p.set_exception(std::current_exception());
    }
}
 
void checkThreadAndException() {
    std::promise<void> p;
    auto result = p.get_future();
 
    std::thread t(worker, ref(p));
    t.detach();
 
    try {
        result.get();
    } catch (const std::runtime_error & e) {
        std::cout << "runtime error catched from async worker" << std::endl;
    }
}
 
int main()
{
    checkThreadAndException();
    return 0;
}
Добавлено через 17 минут
Также стоит отметить различия в реализации компиляторов. Так, Visual Studio позволяет инициализировать объект exception строкой. Например, можно передать сообщение об ошибке
0
263 / 152 / 33
Регистрация: 29.06.2019
Сообщений: 1,545
30.04.2021, 11:24  [ТС]
Цитата Сообщение от JeyCi Посмотреть сообщение
И исключения (если вдруг), И join'ились
действительно,
при RAII - ведь если join в Деструкторе, а деструктор срабатывает при выходе в out-of-scope. -- то и проблем быть не должно, when unwinding stack... здесь
Think of exceptions as a separate return type that gets used only when needed.
p.s.
Note :
The use of Dynamic Exception Specification has been deprecated after C++11,
Добавлено через 1 час 47 минут
однако
exception-safe
1.Exception safety is rarely about writing try and catch-and the more rarely is the better.
2.Prefer handling exception cleanup automatically by using destructor instead of try/catch
но если придётся

Кликните здесь для просмотра всего текста
In general, there are three ways of handling error conditions:
1.Aborting the program.
2.Return an error code.
3.Throwing exceptions.

When called from main(), there are subtle differences:
1.return
- Destructors will be called for local/static/global objects.
2.exit
- No destructors will be called for local objects. But destructor for static/global objects will be called.
3.abort
- No destructors will be called.
“Often the best way to deal with exceptions is to not handle them at all. If you can let them pass through your code and allow destructors to handle cleanup, your code will be cleaner.” David Abrahams

The running thread will not be caught in your try/catch statement because it is running in another thread. Try/Catch only works for the current thread. What you need to do is have try/catch in the function being run by the thread, and have some way of managing what happens when that crash occurs.

Think of the stacks involved - when an exception is thrown, it goes up the stack until it reaches an appropriate catch block. The new thread has a completely separate stack to the creating thread, so it'll never reach the catch block in the creating thread's stack.

EDIT: Of course, you could design your system so that the creating thread did wait for other things to happen - a bit like the message loop in a Windows Forms application. The new thread could then catch the exception and send a message to the creating thread, which could then deal with the exception. That isn't the normal setup though - you have to do it all explicitly.
ИТОГО
The running thread will not be caught in your try/catch statement because it is running in another thread. Try/Catch only works for the current thread. What you need to do is have try/catch in the function being run by the thread, and have some way of managing what happens when that crash occurs.
с.39
It’s not enough to use RAII, to be exception safe.
Make extra care to uphold any invariants that you have defined.
- надо аккуратно (smart_ptr - может быть достаточно.....) помещать и извлекать элементы из очереди в MT-окружении (как и в ST) - ... хотя можно и/или класс элементов тоже выполнить в RAII (идиома copy-and-swap)...

Добавлено через 17 минут
так работает (что закомментировано - не работает)
C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// https://tproger.ru/problems/safe-threads-in-cpp/
// Безопасность потоков в С++
 
/*
получается, что стандартная библиотека противоречит принципу RAII — при создании std::thread мы сами должны позаботиться о корректном управлении ресурсами, 
то есть явно вызвать join или detach. 
*/
 
#include <iostream>
#include <thread>
#include <atomic>
#include <vector>
#include <algorithm>
#include <exception>
 
using namespace std;
 
 
class Consumer {
private:
    std::atomic<bool> exit_flag; // флаг для синхронизации (опционально)
    std::thread thr;
public:
     Consumer()
          : exit_flag(false)
          , thr( &Consumer::run, this )
     {
         // после создания потока не делайте тут ничего, что бросает исключение,
         // поскольку в этом случае не будет вызван деструктор объекта Consumer,
         // поток не будет завершен, а программа упадет
     }
 
     ~Consumer() noexcept {
          exit_flag = true; // говорим потоку остановиться
          thr.join();
     }
    void run() {
        while (!exit_flag) {
            // делаем что-нибудь
            //std::cout << "t_id " << std::this_thread::get_id() << " consumer - " <<  std::endl;
            printf("t_id %d\n",std::this_thread::get_id());
            try{
                throw runtime_error ("Error: in try block. \n");
            }
            catch (const std::exception& e) {
                //rethrow runtime_error ("Error: ");
                printf("Failed to finish fx: Ignoring exception in catch block -- %s", e.what());
            break;
            }            
        }
    }
};
 
int main(int argc, const char *argv[])
{
    //try   {
        Consumer c;
        c.run();
    //} 
    //catch (const char* exception)
    //{
    //  std::cerr << "Failed to finish thread - Ignoring exception! " << exception << '\n';
    //}
 
    return 0;
}
0
263 / 152 / 33
Регистрация: 29.06.2019
Сообщений: 1,545
30.04.2021, 22:21  [ТС]
Цитата Сообщение от JeyCi Посмотреть сообщение
try and catch
действительно, rethrow несложно из функции поока
thread
C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// https://stackoverflow.com/questions/233127/how-can-i-propagate-exceptions-between-threads
 
// Note that exception_ptr is a shared ptr-like pointer, so you will need to keep at least one exception_ptr pointing to each exception or they will be released
 
#include<iostream>
#include<thread>
#include<exception>
#include<stdexcept>
 
static std::exception_ptr teptr = nullptr;
 
void f()
{
    try
    {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        throw std::runtime_error("To be passed between threads");
    }
    catch(...)
    {
        teptr = std::current_exception();
    }
}
 
int main(int argc, char **argv)
{
    std::thread mythread(f);
    mythread.join();
 
    if (teptr) {
        try{
            std::rethrow_exception(teptr);
        }
        catch(const std::exception &ex)
        {
            std::cerr << "Thread exited with exception: " << ex.what() << "\n";
        }
    }
 
    return 0;
}
0
Надоела реклама? Зарегистрируйтесь и она исчезнет полностью.
BasicMan
Эксперт
29316 / 5623 / 2384
Регистрация: 17.02.2009
Сообщений: 30,364
Блог
30.04.2021, 22:21
Помогаю со студенческими работами здесь

Сформировать односвязную очередь из элементов, которые входят в очередь Q1, но не входят в очередь Q2
Составить программу обработки динамической структуры данных: сформировать односвязную очередь Q из элементов, которые входят в очередь Q1,...

Не безопасная форма
Привет, пользуюсь менеджером паролей Lastpass, и мне интересно, что такое не безопасная форма? У разработчика спрашивал, но он объяснил что...

Безопасная работа
_http://www.webpark . ru/comments.php?id=45545

Безопасная регистрация
Всем привет. Вот я собственно попытался сделать систему регистрации, в PHP пока еще не особо силен, только понемногу начинаю учится. ...

Безопасная авторизация
Здравствуйте! Есть приложение клиент, которое обращается к wcf сервису, чтобы то произвело какие то действия. Клиент написан под...


Искать еще темы с ответами

Или воспользуйтесь поиском по форуму:
107
Ответ Создать тему
Опции темы

Новые блоги и статьи
[golang] Двоичная куча, min-heap
alhaos 20.05.2026
Двоичная куча Двоичная куча — структура данных, которая всегда держит самый важный элемент наготове. Представьте очередь к хилеру в игре, и очередь из игроков в приоритете те у кого меньше. . .
[golang] Breadth-First Search
alhaos 19.05.2026
BFS (Breadth-First Search) — это базовый алгоритм обхода графа в ширину, который поуровнево исследует все связанные вершины. Он начинает с выбранной точки и проверяет всех соседей, прежде чем. . .
[golang] Алгоритм «Хак Госпера»
alhaos 17.05.2026
Алгоритм «Хак Госпера» Хак Госпера (Gosper's Hack) — алгоритм нахождения следующего по величине числа с тем же количеством установленных бит. Придуман Биллом Госпером в 1970-х, опубликован в. . .
Рисование бинарного древа до 6-го колена на js, svg.
russiannick 17.05.2026
<svg width="335" height="240" viewBox="0 0 335 240" fill="#e5e1bb"> <style> <!]> </ style> <g id="bush"> </ g> </ svg> function fn(){ let rost;/ / высота древа let xx=165,yy=210,w=256;
FSharp: interface of module
DevAlt 16.05.2026
Интерфейс модуля F# позволяет управлять доступностью членов, содержащихся в реализации модуля. По-умолчанию все члены модуля доступны: module Foo let x = 10 let boo () = printfn "boo" . . .
Хитросплетение родственных связей пантеона греческих богов.
russiannick 14.05.2026
Однооконник, позволяющий узреть и изучить отдельных героев древней Греции. <!DOCTYPE html> <html lang="ru"> <head> <meta charset="UTF-8"> <meta http-equiv="X-UA-Compatible". . .
[golang] Угол между стрелками часов
alhaos 12.05.2026
По заданным значениям часа и минуты необходимо определить значение меньшего угла между стрелками аналогового циферблата часов. import "math" func angleClock(hour int, minutes int) float64 { . . .
Debian 13: Установка Lazarus QT5
ВитГо 09.05.2026
Эта инструкция моя компиляция инструкций volvo https:/ / www. cyberforum. ru/ blogs/ 203668/ 10753. html и его же старой инструкции по установке Lazarus с gtk2. . .
КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin
Copyright ©2000 - 2026, CyberForum.ru