Форум программистов, компьютерный форум, киберфорум
C++: Сети
Войти
Регистрация
Восстановить пароль
Блоги Сообщество Поиск Заказать работу  
 
Рейтинг 4.94/47: Рейтинг темы: голосов - 47, средняя оценка - 4.94
0 / 0 / 0
Регистрация: 27.02.2023
Сообщений: 27

Сервер очереди сообщений завершается преждевременно

13.01.2024, 23:03. Показов 15407. Ответов 3
Метки asio, c++ (Все метки)

Студворк — интернет-сервис помощи студентам
Я хочу написать свой асинхронный сервер, реализующий очередь сообщения с моделью издатель-подписчик. Пока что у меня такой класс сервера:

C++ (Qt)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#pragma once
 
#include <unordered_map>
#include <unordered_set>
#include <string>
#include <memory>
#include <thread>
#include <list>
#include <boost/asio.hpp>
 
#include "connection.h"
 
using namespace boost::asio;
using namespace boost::asio::ip;
 
namespace nano
{
    class server_interface
    {
    public:
        server_interface(std::uint16_t port)
            : acceptor(io, tcp::endpoint(tcp::v4(), port))
        {
 
        }
 
        virtual ~server_interface()
        {
            stop();
        }
 
        bool start()
        {
            try
            {
                listen();
                thread = std::thread([this]() {
                    io.run();
                });
            }
            catch (std::exception& e)
            {
                trace("[Server] stopped.");
                return false;
            }
 
            trace("[Server] started.");
            return true;
        }
 
        void stop()
        {
            io.stop();
            if (thread.joinable())
                thread.join();
            trace("[Server] stopped.");
        }
 
        void listen()
        {
            acceptor.async_accept([this](std::error_code ec, boost::asio::ip::tcp::socket socket) {
                if (!ec)
                {
                    std::cout << "[Server] new connection: " << socket.remote_endpoint() << std::endl;
 
                    std::shared_ptr<connection> conn =
                        std::make_shared<connection>(io, std::move(socket), subscribers);
 
                    connections.push_back(std::move(conn));
                    connections.back()->start();
                }
 
                listen();
            });
        }
 
        void update()
        {
 
        }
 
    private:
        void trace(const std::string& msg)
        {
            std::cout << msg << std::endl;
        }
 
    private:
        std::unordered_map<std::string, std::unordered_set<std::shared_ptr<connection>>> subscribers;
        std::list<std::shared_ptr<connection>> connections;
        boost::asio::io_context io;
        boost::asio::ip::tcp::acceptor acceptor;
        std::thread thread;
    };
}
Его реализация такая:

C++ (Qt)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <iostream>
 
#include "server_interface.h"
 
class server : public nano::server_interface
{
public:
    server(std::uint16_t port)
        : server_interface(port)
    {
 
    }
};
 
int main()
{
    server s(7777);
    s.start();
 
    while (true)
    {
        s.update();
    }
 
    return 0;
}
Меня смущает цикл while, который я был вынужден написать, чтобы программа не завершалась преждевременно. Почему не достаточно того, что я в отдельном потоке запустил выполнение io_context.run()? Разве эта строчка не будет препятствовать преждевременному завершению программы, пока жив этот поток? Если это не так, то как реализовать метод сервера update, чтобы он работал согласно своему названию - запускал обновление состояния сервера, например. Или хотя бы, чтобы не было такого стремного цикла while, который делает только то, что грузит на 100% ядро процессора?
0
cpp_developer
Эксперт
20123 / 5690 / 1417
Регистрация: 09.04.2010
Сообщений: 22,546
Блог
13.01.2024, 23:03
Ответы с готовыми решениями:

Рекурсивный метод завершается преждевременно
Здравствуйте! Подскажите, пожалуйста, есть плоское дерево объектов в виде List (в котором у объекта List есть текущий объект и...

DialogBoxParam преждевременно завершается без ошибки
Доброго времени суток. История вопроса: решил сделать полностью GUI для ранее написанной проги. Логика: есть два окна, между их...

Очереди сообщение (создание очереди из N сообщений)
Нужна помощь в создании очереди из n сообщений как создать простую очередь я знаю но как указать конкретное возможное количество...

3
901 / 478 / 93
Регистрация: 10.06.2014
Сообщений: 2,700
14.01.2024, 12:27
Цитата Сообщение от AngelicQuasar Посмотреть сообщение
Почему не достаточно того, что я в отдельном потоке запустил выполнение io_context.run()?
Хотя бы потому, что ваш объект server s созданный в main будет уничтожен при выходе из функции main что сделает io_context, который работает в отдельном потоке, недействительным.

Судя по коду который вы показываете, вам нужно запускать io.run() в основном потоке (из main) а не в отдельном, лично я не вижу смысла в отдельном потоке при текущих обстоятельствах.

io.run() будет "работать" до тех пор, пока есть асинхронные задачи связанные с этим контекстом, учтите это (см в вашей соседней теме, там я уже говорил о shared_from_this + передачу self в захват а так же повторную регистрацию асинхронных операций из функции обратного вызова).

И почему вы добавляете коннекшены в очередь? Это выглядит противоречиво. Вы пытаетесь смешать асинхронщину с очередями, а это в целом очень противоречиво. Очередь подразумевает что клиенты будут обрабатываться "по очереди", тогда как асинхронная обработка подразумевает, что обрабатывать мы будем ту информацию, которая "готова", и неважно это информация из первого или последнего коннекшена
0
0 / 0 / 0
Регистрация: 27.02.2023
Сообщений: 27
14.01.2024, 13:34  [ТС]
Спасибо. Тогда получается такая реализация сервера:

C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#pragma once
 
#include <unordered_map>
#include <unordered_set>
#include <string>
#include <memory>
#include <thread>
#include <list>
#include <boost/asio.hpp>
 
#include "connection.h"
 
using namespace boost::asio;
using namespace boost::asio::ip;
 
namespace nano
{
    class server_interface
    {
    public:
        server_interface(io_context& io, std::uint16_t port)
            : io(io), acceptor(io, tcp::endpoint(tcp::v4(), port))
        {
 
        }
 
        virtual ~server_interface()
        {
 
        }
 
        bool start()
        {
            try
            {
                listen();
            }
            catch (std::exception& e)
            {
                trace("[Server] stopped.");
                return false;
            }
 
            trace("[Server] started.");
            return true;
        }
 
        void listen()
        {
            acceptor.async_accept([this](boost::system::error_code ec, tcp::socket socket) {
                if (!ec)
                {
                    std::cout << "[Server] new connection: " << socket.remote_endpoint() << std::endl;
 
                    std::shared_ptr<connection> conn =
                        std::make_shared<connection>(io, std::move(socket), subscribers);
 
                    conn->start();
                }
 
                listen();
            });
        }
 
    private:
        void trace(const std::string& msg)
        {
            std::cout << msg << std::endl;
        }
 
    private:
        io_context& io;
        std::unordered_map<std::string, std::unordered_set<std::shared_ptr<connection>>> subscribers;
        boost::asio::ip::tcp::acceptor acceptor;
    };
}
C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <iostream>
 
#include "server_interface.h"
 
using namespace boost::asio;
 
class server : public nano::server_interface
{
public:
    server(io_context& io, std::uint16_t port)
        : server_interface(io, port)
    {
 
    }
};
 
int main()
{
    io_context io;
    server s(io, 7777);
    s.start();
 
    io.run();
 
    return 0;
}
И такой класс коннекшена:

C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
#pragma once
 
#include <iostream>
#include <string>
#include <regex>
#include <memory>
#include <unordered_map>
#include <unordered_set>
#include <boost/asio.hpp>
#include <boost/json.hpp>
 
using namespace boost::asio;
using namespace boost::asio::ip;
using namespace boost::json;
 
namespace nano
{
class connection : public std::enable_shared_from_this<connection>
    {
    public:
        connection(io_context& io, tcp::socket socket,
                   std::unordered_map<std::string, std::unordered_set<std::shared_ptr<connection>>>& subscribers)
            : io(io), socket(std::move(socket)), subscribers(subscribers)
        {
 
        }
 
        void disconnect()
        {
            if (is_connected())
            {
                post(io, [this]() {
                    socket.close();
                });
            }
        }
 
        bool is_connected() const
        {
            return socket.is_open();
        }
 
        void start()
        {
            if (is_connected())
            {
                do_read();
            }
        }
 
        void send(const std::string& s)
        {
            auto self = shared_from_this();
            async_write(socket, buffer(s),
                    [this, self](const boost::system::error_code& ec, std::size_t length) {
                if (ec)
                {
                    std::cout << "Error while sending: " << ec.message() << std::endl;
                    return;
                }
            });
        }
 
    private:
        void do_read()
        {
            auto self = shared_from_this();
            async_read(socket, dynamic_buffer(line),
                boost::asio::transfer_at_least(1),
                [this, self](const boost::system::error_code& ec, std::size_t length) {
                if (ec)
                {
                    std::cout << "Error while read line: " << ec.message() << std::endl;
                    return;
                }
 
                handle_command(parse(line));
 
                line.clear();
                do_read();
            });
        }
 
        void handle_command(const value& json)
        {
            std::cout << json << std::endl;
 
            if (json.at("command") == "subscribe")
            {
                subscribers[value_to<std::string>(json.at("topic"))].emplace(shared_from_this());
            }
            else if (json.at("command") == "send")
            {
                for (auto& sub : subscribers[value_to<std::string>(json.at("topic"))])
                {
                    if (sub && sub != shared_from_this())
                    {
                        sub->send(value_to<std::string>(json.at("msg")));
                    }
                }
            }
        }
 
    private:
        boost::asio::io_service& io;
        tcp::socket socket;
        std::string line;
        std::unordered_map<std::string, std::unordered_set<std::shared_ptr<connection>>>& subscribers;
    };
}
Тогда возникают новые вопросы: как удалять коннекшены, соответствующие отключившимся клиентам?
И как обрабатывать команды от этих клиентов?

Например, команда
JSON
1
 {"command": "subscribe", "topic": "test"}
создает подписку на топик. Должна быть симметричная команда unsubscribe, которая удалит клиента из топика. Команда
JSON
1
{"command": "send", "topic": "test", "msg":"test"}
отправляет сообщение в топик и, следовательно, всем клиентам этого топика. В голову приходит мысль реализовать это при помощи конечного автомата. Но как это сделать и при помощи каких библиотек? Рискуем превратить класс connection в огромный спагетти-код с лесенками if'ов для обработки разных команд.

А, еще. Сейчас каждый клиент может отправлять сообщения в топик другим клиентам. Как сделать так, чтобы у клиентов были "роли" типа publisher/subscriber и чтобы только издатель мог отправлять сообщения в топик?

Добавлено через 9 минут
Получается, self мы захватываем в лямбду, чтобы коннекшн продолжал жить? (на него всегда есть хотя бы одна ссылка)
0
901 / 478 / 93
Регистрация: 10.06.2014
Сообщений: 2,700
14.01.2024, 14:05
Цитата Сообщение от AngelicQuasar Посмотреть сообщение
Получается, self мы захватываем в лямбду, чтобы коннекшн продолжал жить? (на него всегда есть хотя бы одна ссылка)
Да (почти). Хотя бы одна ссылка есть тогда, когда мы гарантируем наличие этой ссылки. Например, если соединение прервано, то мы получим непустой error code, и для этого коннекшена не будем повторно регистрировать асинхронную операцию, и следовательно, ссылок в таком случае уже не останется, что приведет к уничтожению объекта connection.

Цитата Сообщение от AngelicQuasar Посмотреть сообщение
Тогда возникают новые вопросы: как удалять коннекшены, соответствующие отключившимся клиентам?
Тут может быть несколько сценариев.

1) Пользователь отправляет какое-то специальное сообщение о том что хочет закрыть соединение
2) Пользователь закрывает сокет
3) Таймаут (ставим, что бы в случае потери связи, мы могли очистить ресурсы связанные с потерянным соединением)

По последним двум думаю должно быть понятно. А вот что касается первого пункта и того что бы спросили здесь
Цитата Сообщение от AngelicQuasar Посмотреть сообщение
И как обрабатывать команды от этих клиентов?
То вам нужен какой нибудь протокол. Например такой: каждое сообщение может содержать тип этого сообщения (тип команды), а так же данные связанные с этой командой. Когда читаете сообщение, смотрите на тип и (возможно) данные.

Например, если тип сообщения условный CLOSE_CONNECTION, значит не ставите в очередь новую асинхронную операцию на чтение для этого соединения, и тем самым провоцируете уничтожение объекта connection и освобождение ресурсов соединения (это произойдет потому что не будет создано новой ссылки на текущий объект connection). Точно так же можно действовать по отношению и к другим командам, просто для не-close команд будете продливать lifetime коннекшена используя новую ссылку и выполнять действия которые соответствуют этой команде.

Цитата Сообщение от AngelicQuasar Посмотреть сообщение
А, еще. Сейчас каждый клиент может отправлять сообщения в топик другим клиентам. Как сделать так, чтобы у клиентов были "роли" типа publisher/subscriber и чтобы только издатель мог отправлять сообщения в топик?
Это уже вопрос прав доступа к ресурсам. Отслеживайте основателя топика по его уникальным данными и не позволяйте кому нибудь другому вклиниться в этот топик.

А тем кто отслеживает топик нужно сообщать серверу, какой топик они отслеживают. Следовательно, вам нужен список клиентов по каждому топику. Когда придет сообщение на топик, можно пройтись по этому списку и отправить subscriber-ами то что пришло в их топик.

Добавлено через 1 минуту
Цитата Сообщение от AngelicQuasar Посмотреть сообщение
Рискуем превратить класс connection в огромный спагетти-код с лесенками if'ов для обработки разных команд.
Это вопрос архитектуры. Можете например посмотреть в сторону паттерна "Команда"
0
Надоела реклама? Зарегистрируйтесь и она исчезнет полностью.
raxper
Эксперт
30234 / 6612 / 1498
Регистрация: 28.12.2010
Сообщений: 21,154
Блог
14.01.2024, 14:05
Помогаю со студенческими работами здесь

Приложение завершается без каких-либо сообщений
Есть консольное приложение. В процессе работы возникает некая ошибка приводящая к тому что приложение просто тихо завершается как будто...

Очереди сообщений
Здравствуйте уважаемые участники этого форума. Я новичок в программировании. Пожалуйста помогите с таким вопросом. Я создал два...

Очереди сообщений
Здравствуйте, возникла одна проблема с передачей сообщений. Есть два процесса, первый процесс отправляет сообщение второму процессу,...

Очереди сообщений
Доброго всем дня!подскажите пожалуйста и помогите с написанием с неочень большой и сложной программкой удовлетворяющая условию :...

IPC Очереди сообщений
Всем привет! Просьба помочь с лабораторной! Язык изучать начал совсем недавно, так что для меня это все как набор иероглифов) вобщемс.....


Искать еще темы с ответами

Или воспользуйтесь поиском по форуму:
4
Ответ Создать тему
Новые блоги и статьи
PhpStorm 2025.3: WSL Terminal всегда стартует в ~
and_y87 14.12.2025
PhpStorm 2025. 3: WSL Terminal всегда стартует в ~ (home), игнорируя директорию проекта Симптом: После обновления до PhpStorm 2025. 3 встроенный терминал WSL открывается в домашней директории. . .
Как объединить две одинаковые БД Access с разными данными
VikBal 11.12.2025
Помогите пожалуйста !! Как объединить 2 одинаковые БД Access с разными данными.
Новый ноутбук
volvo 07.12.2025
Всем привет. По скидке в "черную пятницу" взял себе новый ноутбук Lenovo ThinkBook 16 G7 на Амазоне: Ryzen 5 7533HS 64 Gb DDR5 1Tb NVMe 16" Full HD Display Win11 Pro
Музыка, написанная Искусственным Интеллектом
volvo 04.12.2025
Всем привет. Некоторое время назад меня заинтересовало, что уже умеет ИИ в плане написания музыки для песен, и, собственно, исполнения этих самых песен. Стихов у нас много, уже вышли 4 книги, еще 3. . .
От async/await к виртуальным потокам в Python
IndentationError 23.11.2025
Армин Ронахер поставил под сомнение async/ await. Создатель Flask заявляет: цветные функции - провал, виртуальные потоки - решение. Не threading-динозавры, а новое поколение лёгких потоков. Откат?. . .
Поиск "дружественных имён" СОМ портов
Argus19 22.11.2025
Поиск "дружественных имён" СОМ портов На странице: https:/ / norseev. ru/ 2018/ 01/ 04/ comportlist_windows/ нашёл схожую тему. Там приведён код на С++, который показывает только имена СОМ портов, типа,. . .
Сколько Государство потратило денег на меня, обеспечивая инсулином.
Programma_Boinc 20.11.2025
Сколько Государство потратило денег на меня, обеспечивая инсулином. Вот решила сделать интересный приблизительный подсчет, сколько государство потратило на меня денег на покупку инсулинов. . . .
Ломающие изменения в C#.NStar Alpha
Etyuhibosecyu 20.11.2025
Уже можно не только тестировать, но и пользоваться C#. NStar - писать оконные приложения, содержащие надписи, кнопки, текстовые поля и даже изображения, например, моя игра "Три в ряд" написана на этом. . .
Мысли в слух
kumehtar 18.11.2025
Кстати, совсем недавно имел разговор на тему медитаций с людьми. И обнаружил, что они вообще не понимают что такое медитация и зачем она нужна. Самые базовые вещи. Для них это - когда просто люди. . .
Создание Single Page Application на фреймах
krapotkin 16.11.2025
Статья исключительно для начинающих. Подходы оригинальностью не блещут. В век Веб все очень привыкли к дизайну Single-Page-Application . Быстренько разберем подход "на фреймах". Мы делаем одну. . .
КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin
Copyright ©2000 - 2025, CyberForum.ru