Форум программистов, компьютерный форум, киберфорум
Python: Сети
Войти
Регистрация
Восстановить пароль
Карта форума Темы раздела Блоги Сообщество Поиск Заказать работу  
 
Рейтинг 4.80/5: Рейтинг темы: голосов - 5, средняя оценка - 4.80
1303 / 843 / 409
Регистрация: 12.03.2018
Сообщений: 2,305
1

Почему flask_socketio.disconnect препятствует завершению потока?

12.02.2020, 12:11. Показов 927. Ответов 1
Метки нет (Все метки)

Author24 — интернет-сервис помощи студентам
Есть следующая система: [Клиент] - [Веб-сервер] - [Коннектор].

Коннектор - это своего рода промежуточный код между веб-сервером и источником данных.

Мне нужно контролировать соединение сервера с коннектором. Если соединение потеряно, я должен уведомить клиента.

Связь между веб-сервером и коннектором организована с помощью socketio.

Проблема в том, что если коннектор перестает работать, то веб-сервер узнает об этом только через минуту (это в лучшем случае).

Я решил, что сервер должен проверять состояние коннектора каждую секунду.

Когда коннектор подключается к серверу, запускается фоновая задача. Суть задачи: каждую секунду: 1) фиксировать время; 2) сохранить фиксированное время в стеке; 3) отправить эхо-сообщение на соединитель. (см. server.background_thread)

Соединитель принимает эхо-сообщение и метку времени в качестве параметра и отправляет эхо-сообщение на веб-сервер, в качестве параметра он передает полученную метку времени. (см. client.echo)

Веб-сервер получает эхо-сообщение, если отметка времени равна последнему значению в стеке, то это значение удаляется из стека. (см. server.on_echo_connector)

На веб-сервере на каждой итерации проверяется размер стека (см. Server.background_thread). Если оно больше 5, то это означает, что коннектор не отвечал на эхо-сообщение 5 раз, считаем, что коннектор недоступен.

Когда сервер понимает, что коннектор недоступен, необходимо завершить поток, который отправил эхо-сообщения соединителю.

Когда размер стека превышает 5, я выхожу из бесконечного цикла и вызываю flask_socketio.disconnect (connector_sid, '/ connector'). После этого вызова ничего не работает (например, print и т.д.)

В методе on_disconnect_connector (server) вызывается thread.join() и никогда не завершается.

Нужно завершить поток, чтобы при повторном запуске коннектора он успешно подключился, и все началось заново.

Как решить эту проблему?

Сервер
Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# -*- coding: utf-8 -*-
 
import os
import threading
import time
import collections
from datetime import datetime
 
import flask
import flask_socketio
 
def get_unix_time():
    return int(time.mktime(datetime.now().timetuple()))
 
class Stack(collections.deque):
 
    def __init__(self, iterable=(), maxlen=None):
        collections.deque.__init__(self, iterable, maxlen)
 
    @property
    def size(self):
        return len(self)
 
    @property
    def empty(self):
        return self.size == 0
 
    @property
    def head(self):
        return self[-1]
 
    @property
    def tail(self):
        return self[0]
 
    def push(self, x):
        self.append(x)
 
# SERVER
 
app = flask.Flask(__name__)
sio = flask_socketio.SocketIO(app, async_mode='gevent')
 
connector_sid = None
echo_stack = Stack()
 
thread = None
thread_lock = threading.Lock()
 
 
def background_thread(app):
    time.sleep(2)  # delay for normal connection
 
    while True:
        if echo_stack.size >= 5:
            break
        time_ = get_unix_time()
        echo_stack.push(time_)
        sio.emit('echo', time_, namespace='/connector')
        sio.sleep(1)
 
    with app.app_context():
        flask_socketio.disconnect(connector_sid, '/connector')
 
 
@sio.on('connect', namespace='/connector')
def on_connect_connector():
    """Connector connection event handler."""
    global connector_sid, thread
    print 'Attempt to connect a connector {}...'.format(request.sid)
 
    # if the connector is already connected, reject the connection
    if connector_sid is not None:
        print 'Connection for connector {} rejected'.format(request.sid)
        return False
        # raise flask_socketio.ConnectionRefusedError('Connector already connected')
 
    connector_sid = request.sid
    print('Connector {} connected'.format(request.sid))
 
    with thread_lock:
        if thread is None:
            thread = sio.start_background_task(
                background_thread, current_app._get_current_object())
 
    # уведомить клиента, что коннектор подключен
    sio.emit('set_connector_status', True, namespace='/client')
 
 
@sio.on('disconnect', namespace='/connector')
def on_disconnect_connector():
    """Connector disconnect event handler."""
    global connector_sid, thread
 
    print 'start join'
    thread.join()
    print 'end join'
    thread = None
    print 'after disconet:', thread
 
    connector_sid = None
 
    echo_stack.clear()
 
    print('Connector {} disconnect'.format(request.sid))
 
    # уведомить клиент, что коннектор отключен
    sio.emit('set_connector_status', False, namespace='/client')
 
 
@sio.on('echo', namespace='/connector')
def on_echo_connector(time_):
    if not echo_stack.empty:
        if echo_stack.head == time_:
            echo_stack.pop()
 
 
@sio.on('message', namespace='/connector')
def on_message_connector(cnt):
    # print 'Msg: {}'.format(cnt)
    pass
 
if __name__ == '__main__':
    sio.run(app)
Клиент
Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# -*- coding: utf-8 -*-
 
import sys
import threading
import time
 
import socketio
import socketio.exceptions
 
sio = socketio.Client()
thread = None
thread_lock = threading.Lock()
work = False
 
 
def background_thread():
    # example task
    cnt = 0
    while work:
        cnt += 1
        if cnt % 10 == 0:
            sio.emit('message', cnt // 10, namespace='/connector')
        sio.sleep(0.1)
 
 
@sio.on('connect', namespace='/connector')
def on_connect():
    """Обработчик события подключенеия к серверу."""
    global thread, work
 
    print '\n-----            Connected to server            -----' \
          '\n----- My SID:  {} -----\n'.format(sio.sid)
 
    work = True  # установить флаг работы
 
    # запустить в потоке цикл опроса Пирамиды
    with thread_lock:
        if thread is None:
            thread = sio.start_background_task(background_thread)
 
 
@sio.on('disconnect', namespace='/connector')
def on_disconnect():
    """Обработчик события отключения от сервера."""
    global thread, work
 
    # сбросить флаг работы, чтобы на след. итерации беск. цикл завершился
    work = False
    thread.join()
    thread = None
 
    # отключиться от сервера
    sio.disconnect()
    print '\n-----         Disconnected from server          -----\n'
 
    # перейти в режим бексонечных попыток подключения к серверу
    main()
 
 
@sio.on('echo', namespace='/connector')
def on_echo(time_):
    sio.emit('echo', time_, namespace='/connector')
 
 
def main():
    while True:
        try:
            sio.connect('http://localhost:5000/connector',
                        namespaces=['/connector'])
            sio.wait()
        except socketio.exceptions.ConnectionError:
            print 'Trying to connect to the server...'
            time.sleep(1)
        except KeyboardInterrupt:
            print '\n---------- EXIT ---------\n'
            sys.exit()
        except Exception as e:
            print e
 
 
if __name__ == '__main__':
    print '\n---------- START CLIENT ----------\n'
    main()
0
Programming
Эксперт
94731 / 64177 / 26122
Регистрация: 12.04.2006
Сообщений: 116,782
12.02.2020, 12:11
Ответы с готовыми решениями:

Многопоточность. Создать визуальный объект по завершению потока
По окончанию работы потока необходимо создавать на форме определенное количество TButton'ов, с...

Породить два потока. По завершению одного (любого) из них второй должен сразу же порождать еще один поток
Породить два потока. По завершению одного (любого) из них второй должен сразу же порождать еще один...

Почему такой айди потока
Вообщем создаю поток и получаю его уникальный айди Thread trd = new Thread(); int...

Почему процесс Idle имеет 32 потока
Почему процесс Idle имеет 32 потока, хотя у процессора 16 логических процессоров (ОС Windows 11,...

Почему не меняется текст заголовка формы из потока
Почему когда меняю через созданную мною функцию this.text не меняет своего значения? примерprivate...

1
1303 / 843 / 409
Регистрация: 12.03.2018
Сообщений: 2,305
12.02.2020, 21:11  [ТС] 2
Проблема долгого ожидания сервером сообщения о том, что коннектор был отключен, заключается в том, что клиент использует в качестве транспорта pooling. socketio выполняет автоматический переход на транспорт websocket, если это возможно, а возможно это тогда, и только тогда, когда установлена библиотека websocket-client.
Bash
1
pip install "python-socketio[client]"
или
Bash
1
pip install websocket-client
Благодаря данной библиотеке, транспорт меняется на websocket (либо можно сразу указать его в параметрах подключения для клиента). А с websocket информирование сервера о том, что коннектор был отключен, происходит практически мгновенно.
Источник
0
12.02.2020, 21:11
IT_Exp
Эксперт
87844 / 49110 / 22898
Регистрация: 17.06.2006
Сообщений: 92,604
12.02.2020, 21:11
Помогаю со студенческими работами здесь

Почему консоль ждет конца другого потока?
System.out.println("1"); Thread t = new Thread(()->{ try { ...

Почему 4 потока показывают одинаковый результат с 2 потоками
написал программу которая параллелит быструю сортировку на 1, 2, 4 потока: #include<iostream>...

Почему это вызывается из одного и того же потока
У меня есть объект в основном потоке при qDebug() << this.thread() выводит, допустим, 0x53ead8 ...

Почему объект типа std::vector не читается из потока?
# include <iostream> # include <vector> # include <fstream> using namespace std; int main...

Почему приостановка потока совершается ДО вывода текста метки?
Почему приостановка потока совершается ДО вывода текста метки? if (Info.MoveUseArg == 7) // юс...


Искать еще темы с ответами

Или воспользуйтесь поиском по форуму:
2
Ответ Создать тему
КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin
Copyright ©2000 - 2024, CyberForum.ru