Форум программистов, компьютерный форум, киберфорум
Python: Web
Войти
Регистрация
Восстановить пароль
Блоги Сообщество Поиск Заказать работу  
 
Рейтинг 4.60/5: Рейтинг темы: голосов - 5, средняя оценка - 4.60
 Аватар для kadr
0 / 0 / 1
Регистрация: 29.11.2012
Сообщений: 52

socketIO и фоновые задачи

05.06.2020, 10:44. Показов 1100. Ответов 6

Студворк — интернет-сервис помощи студентам
Всем привет, пытаюсь разобраться, как можно организовать фоновые задачи и скрестить их с веб сокетами.
Вот собственно постановка задачи:
1. Есть веб сокет сервер, который регистрирует пользователей с мобильного приложения.
Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@sio.event
async def connect(sid, environ):
    logger.info(f'Client with id: {sid} was connected')
    if not await check_token(environ):
        logger.info(f'Client with id: {sid} have not correct token')
        raise ConnectionRefusedError('Token invalid')
 
    user: dict = await sio.get_session(sid)
    if not user:
        token: str = environ.get('HTTP_TOKEN')
        user = await get_user_info(token)
        patient_id: str = await get_patient_id(user)
        if len(patient_id) > 0:
            await sio.save_session(sid, {'user_id': user.get('id'), 'patient_id': patient_id})
    else:
        patient_id: str = user.get('patient_id')
 
    if len(patient_id) > 0 and not await is_client_connected(sid, patient_id):
        if clients_connections.get(patient_id) is None:
            clients_connections[patient_id] = [sid]
        else:
            clients_connections[patient_id].append(sid)
    await sio.start_background_task(send_notifications)
    await sio.send(f'You are connected with id: {sid}')
2. Есть метод который проверяет наличие новых уведомлений и рассылает их через сокет соединение, мобильному приложению, либо push уведомление, если в списке активных соединение, такого клтента нет.
Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
async def send_notifications():
    while True:
        logger.info('Search new unread notifications')
        response: ClientResponse = await requests.get(f'{HOST}/api/mm/notifications/', params={'status': 'unread'},
                                                      auth=AUTH)
        text: dict = await response.json()
        if response.status == 200 and text.get('data') is not None:
            for notification in text.get('data'):
                message: str = notification.get('payload').get('text', '')
                patient_id: str = notification.get('recipient').get('id', '')
                pk: str = notification.get('id', '')
 
                if clients_connections.get(patient_id) is not None:
                    for sid in clients_connections.get(patient_id):
                        await sio.send(message, sid)
                    await change_notify_status_to_read(pk)
                else:
                    title: str = notification.get('payload').get('title')
                    data = {
                        "target_id": patient_id,
                        "data": {},
                        "title": title,
                        "body": message,
                    }
                    response: ClientResponse = await requests.post(f'{HOST}/api/mm/push/send', json=data, auth=AUTH)
                    if response.status == 200:
                        await change_notify_status_to_read(pk)
                        logger.info('Notification push success.')
                    else:
                        logger.warning(f'Notification push failed. {str(response.reason)}')
        await sio.sleep(TIMEOUT_IN_SEC)
При такой реализации, с while True, сервер перестает принимать новые соединения и полностью уходит на обработку фоновой задачи, если же убрать while True, то задача выполниться только 1 раз. Да и место вызова этой задачи не корректно, в идеале, ее надо запускать при старте самого сервера:
Python
1
2
3
if __name__ == '__main__':
    web.run_app(app, host='localhost', port=5000)
    sio.start_background_task(send_notifications)
но так она не запускается.
Собственно вопрос, как правильно организовывать scheduler для сокетов, может можно прикрутить celery к socketIO. Мне нужно, что бы в фоновой задаче был доступен словарь: clients_connections: Dict[str, List[str]] = {}, который заполняется при коннекте клиентов. Задача должна по расписанию опрашивать базу на наличие новых не прочитанных уведомлений и рассылать их клиентам их этого словаря.
0
cpp_developer
Эксперт
20123 / 5690 / 1417
Регистрация: 09.04.2010
Сообщений: 22,546
Блог
05.06.2020, 10:44
Ответы с готовыми решениями:

Socketio передача файлов
добрый день кто - то пробовал передавать файлы через сокет поделитесь пожалуйста или есть идеи как сделать life stream не через сокет ...

Flask фоновые задачи
Всем привет. Пишу веб сервис на flask. Не могу найти библиотеку для обработки фоновых задач в многопоточности). О Celery и RQ слышал, но...

Отложенные и фоновые задачи в android 8 и выше
Здравствуйте. Кто- нибудь может подсказать чем теперь можно решить данные вопросы по планированию задач и работы их в фоне, так как...

6
Просто Лис
Эксперт Python
 Аватар для Рыжий Лис
5973 / 3735 / 1099
Регистрация: 17.05.2012
Сообщений: 10,791
Записей в блоге: 9
06.06.2020, 13:51
Цитата Сообщение от kadr Посмотреть сообщение
был доступен словарь: clients_connections: Dict[str, List[str]] = {}
А в чём проблема хранить этот же словарь в базе?

И запустить отдельное приложение? Или даже запускать задачу по крону, что проще.
0
 Аватар для kadr
0 / 0 / 1
Регистрация: 29.11.2012
Сообщений: 52
06.06.2020, 16:05  [ТС]
Пробовал хранить в редиске, но она ругалась на что то не может хранить в себе объект socket, можно попробовать серилизовать этот словарь. Но на самом деле я хотел узнать, какие есть еще варианты организовать фоновые задачи в купе с socketio и кто как это делает.
0
Эксперт Python
5438 / 3859 / 1215
Регистрация: 28.10.2013
Сообщений: 9,552
Записей в блоге: 1
06.06.2020, 16:09
Цитата Сообщение от kadr Посмотреть сообщение
При такой реализации, с while True, сервер перестает принимать новые соединения и полностью уходит на обработку фоновой задачи
У вас используется обычная requests? Обычная синхронная - никаких await\async она не знает и просто тупо блокирует процесс.
0
Просто Лис
Эксперт Python
 Аватар для Рыжий Лис
5973 / 3735 / 1099
Регистрация: 17.05.2012
Сообщений: 10,791
Записей в блоге: 9
06.06.2020, 16:17
Не, я говорил, что ваше решение с хранением открытых сокетов не масштабируется.

Храните в базе список пользователей, которые онлайн, а ещё лучше помечайте просто признаком.

Воркеры держат соединения с веб-сокетами и с какой-то периодичностью обновляют статус "онлайн/нет". Так же проверяют в той же базе неотправленные сообщения.

Задача, которая запускается по крону, синхронно забирает сообщения и, если пользователь онлайн, добавляет их в очередь. Иначе немедленно отправляет сообщение пушом.
0
 Аватар для kadr
0 / 0 / 1
Регистрация: 29.11.2012
Сообщений: 52
08.06.2020, 00:31  [ТС]
У вас используется обычная requests? Обычная синхронная - никаких await\async она не знает и просто тупо блокирует процесс.
Нет, я использую асинхронный requests из пакета from aiohttp_requests import requests:
Python
1
response: ClientResponse = await requests.get(f'{HOST}/api/mm/notifications/', params={'status': 'unread'}, auth=AUTH)
Добавлено через 3 часа 13 минут
Не, я говорил, что ваше решение с хранением открытых сокетов не масштабируется.

Храните в базе список пользователей, которые онлайн, а ещё лучше помечайте просто признаком.

Воркеры держат соединения с веб-сокетами и с какой-то периодичностью обновляют статус "онлайн/нет". Так же проверяют в той же базе неотправленные сообщения.

Задача, которая запускается по крону, синхронно забирает сообщения и, если пользователь онлайн, добавляет их в очередь. Иначе немедленно отправляет сообщение пушом.
1. Попробовал сделать так. Закидываю в редиску данные вида:
Python
1
2
{'user_id': ['scoket_id']}
_redis.set(patient_id, ','.join(clients_connections[patient_id]))
2. Далее запускаю задачку, что бы выполнялась каждые 10 сек, но предварительно подключаюсь к серверу как клиент:
Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import logging
 
import socketio
from celery import Celery
from redis import Redis
 
logging.basicConfig(level=logging.DEBUG)
 
logger = logging.getLogger()
 
 
_redis: Redis = Redis(password='156489', db=1)
sio = socketio.Client()
sio.connect('http://localhost:5000', headers={'token': 'task'})
 
celery: Celery = Celery('task', broker='redis://:156489@localhost/0', backend='redis://:156489@localhost/2')
celery.conf.update(
    task_serializer='json',
    accept_content=['json'],  # Ignore other content
    result_serializer='json',
    timezone='Europe/Samara',
    enable_utc=False,
)
 
@celery.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(10.0, test.s(), name='add every 10')
 
@celery.task(name='test')
def test():
    patient_ids: list = _redis.keys('*')
    for patient_id in patient_ids:
        for sid in _redis.get(patient_id.decode()).decode().split(','):
            sio.emit('notification', {sid: 'message from task'})
в самой функции дергаю всех клиентов и отправлю на сервер словарь с тем, кому какое сообщение нужно отправить.
3. На сервере в функции send_notification принимаю данные и рассылаю по клиентам.
Python
1
2
3
4
5
@sio.on('notification')
async def send_notification(sid, data):
    to, message = data
    logger.debug(sid, message)
    await sio.send(message, to=to)
Да только не работает почему то, не попадаю в эту функцию почему то.
0
 Аватар для kadr
0 / 0 / 1
Регистрация: 29.11.2012
Сообщений: 52
08.06.2020, 14:59  [ТС]
Такое ощущение, что emit из кастомной функции не работает.
Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import logging
from http import HTTPStatus
 
import socketio
 
from requests import Response
import requests
from celery import Celery
from redis import Redis
 
logging.basicConfig(level=logging.DEBUG)
 
logger = logging.getLogger()
 
HOST = 'http://localhost:8080'
AUTH = ('root', 'secret')
 
_redis: Redis = Redis(password='156489', db=1)
sio = socketio.Client()
 
celery: Celery = Celery('tasks', broker='redis://:156489@localhost/0', backend='redis://:156489@localhost/0')
 
@sio.event
def connect():
    print('connection established')
    sio.emit('message', {'message': 'Task is connected'})
 
 
@sio.event
def disconnect():
    print('disconnected from server')
 
 
@sio.event
def message(data):
    print('message received with ', data)
 
 
sio.connect('http://localhost:5000', headers={'token': 'task'})
 
 
@celery.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(10.0, send_notifications.s(), name='add every 10')
 
@celery.task(name='send_notifications')
def send_notifications():
    logger.info('Search new unread notifications')
    print(sio.connected)
    sio.emit('notification', {'key': 'val'})
    sio.send('from task')
Bash
1
2
3
4
5
6
7
8
9
10
11
12
13
14
127.0.0.1 - - [08/Jun/2020 15:48:00] "GET /socket.io/?transport=websocket&EIO=3&sid=962302825a0646f0993444d363647da7&t=1591616824.280261 HTTP/1.1" 200 0 56.197598
(62859) accepted ('127.0.0.1', 49621)
INFO:engineio.server:ed4c631622864269b2c37e1616f66a0d: Sending packet OPEN data {'sid': 'ed4c631622864269b2c37e1616f66a0d', 'upgrades': ['websocket'], 'pingTimeout': 60000, 'pingInterval': 25000}
INFO:root:Client with id: ed4c631622864269b2c37e1616f66a0d was connected
INFO:socketio.server:emitting event "message" to ed4c631622864269b2c37e1616f66a0d [/]
INFO:engineio.server:ed4c631622864269b2c37e1616f66a0d: Sending packet MESSAGE data 2["message","You are connected with id: ed4c631622864269b2c37e1616f66a0d"]
INFO:engineio.server:ed4c631622864269b2c37e1616f66a0d: Sending packet MESSAGE data 0
127.0.0.1 - - [08/Jun/2020 15:48:09] "GET /socket.io/?transport=polling&EIO=3&t=1591616889.736743 HTTP/1.1" 200 428 0.003129
(62859) accepted ('127.0.0.1', 49623)
INFO:engineio.server:ed4c631622864269b2c37e1616f66a0d: Received request to upgrade to websocket
INFO:engineio.server:ed4c631622864269b2c37e1616f66a0d: Upgrade to websocket successful
INFO:engineio.server:ed4c631622864269b2c37e1616f66a0d: Received packet MESSAGE data 2["message",{"message":"Task is connected"}]
INFO:socketio.server:received event "message" from ed4c631622864269b2c37e1616f66a0d [/]
{'message': 'Task is connected'}
Видно что сообщение из функции connect() приходит:
INFO:socketio.server:received event "message" from ed4c631622864269b2c37e1616f66a0d [/]
{'message': 'Task is connected'}
А из send_notifications функции ничего не приходит, хотя в логе видно, что сообщение отправляется:
Bash
1
2
3
4
5
6
7
8
9
[2020-06-08 15:50:51,526: INFO/Beat] Scheduler: Sending due task add every 10 (send_notifications)
[2020-06-08 15:50:51,551: INFO/MainProcess] Received task: send_notifications[33276b34-84ae-41ff-a9df-50b32f977729]  
[2020-06-08 15:50:51,553: INFO/ForkPoolWorker-8] Search new unread notifications
[2020-06-08 15:46:52,855: WARNING/ForkPoolWorker-8] True
[2020-06-08 15:50:51,555: INFO/ForkPoolWorker-8] Emitting event "notification" [/]
[2020-06-08 15:50:51,556: INFO/ForkPoolWorker-8] Sending packet MESSAGE data 2["notification",{"key":"val"}]
[2020-06-08 15:50:51,557: INFO/ForkPoolWorker-8] Emitting event "message" [/]
[2020-06-08 15:50:51,558: INFO/ForkPoolWorker-8] Sending packet MESSAGE data 2["message","from task"]
[2020-06-08 15:50:51,579: INFO/ForkPoolWorker-8] Task send_notifications[33276b34-84ae-41ff-a9df-50b32f977729] succeeded in 0.0262662609999893s: None
В чем может быть проблема, почему не приходят на сервер сообщения? Сокет соединение постоянно открыто, проверял
Python
1
print(sio.connected) == True
0
Надоела реклама? Зарегистрируйтесь и она исчезнет полностью.
raxper
Эксперт
30234 / 6612 / 1498
Регистрация: 28.12.2010
Сообщений: 21,154
Блог
08.06.2020, 14:59
Помогаю со студенческими работами здесь

Сервер задач останавливает фоновые задачи
Здравствуйте друзья! Буквально вчера появилась проблема при выключении ПК. Не могу отключить, пока не отмечу "Отключить всё...

VS2019 - как ее отучить запускать непрошеные фоновые задачи?
Занимается всякой хренью, съедая по 40% CPU (см аттач) Как ее отучить этим заниматься?

Nodejs + socketio - ошибка доступа с другого хоста
Со своего хоста захожу без проблем, а вот что дает, когда бытаюсь зайти с другого: Читал, что это может быть решено отключением...

Как реализовать индивидуальный чат на SocketIo Flask?
Пишу соц.сеть. Как реализовать чат между пользователями тет-а-тет? Точнее: имеется url: user/ - страница пользователя, чат там же. При...

Как мне изменять текст на сайте через socketio?
Мне нужно изменять текст на своём сайте в зависимости от того которого я скажу в консоле например мне нужно чтобы юзер не перезагружал...


Искать еще темы с ответами

Или воспользуйтесь поиском по форуму:
7
Ответ Создать тему
Новые блоги и статьи
Установка Qt Creator для C и C++: ставим среду, CMake и MinGW без фреймворка Qt
8Observer8 05.04.2026
Среду разработки Qt Creator можно установить без фреймворка Qt. Есть отдельный репозиторий для этой среды: https:/ / github. com/ qt-creator/ qt-creator, где можно скачать установщик, на вкладке Releases:. . .
AkelPad-скрипты, структуры, и немного лирики..
testuser2 05.04.2026
Такая программа, как AkelPad существует уже давно, и также давно существуют скрипты под нее. Тем не менее, прога живет, периодически что-то не спеша дополняется, улучшается. Что меня в первую очередь. . .
Отображение реквизитов в документе по условию и контроль их заполнения
Maks 04.04.2026
Алгоритм из решения ниже реализован на примере нетипового документа "ПланированиеСпецтехники", разработанного в конфигурации КА2. Данный документ берёт данные из другого нетипового документа. . .
Фото всей Земли с борта корабля Orion миссии Artemis II
kumehtar 04.04.2026
Это первое подобное фото сделанное человеком за 50 лет. Снимок называют новым вариантом легендарной фотографии «The Blue Marble» 1972 года, сделанной с борта корабля «Аполлон-17». Новое фото. . .
Вывод диалогового окна перед закрытием, если документ не проведён
Maks 04.04.2026
Алгоритм из решения ниже реализован на примере нетипового документа "СписаниеМатериалов", разработанного в конфигурации КА2. Задача: реализовать программный контроль на предмет проведения документа. . .
Программный контроль заполнения реквизитов табличной части документа
Maks 02.04.2026
Алгоритм из решения ниже реализован на примере нетипового документа "СписаниеМатериалов", разработанного в конфигурации КА2. Задача: 1. Реализовать контроль заполнения реквизита. . .
wmic не является внутренней или внешней командой
Maks 02.04.2026
Решение: DISM / Online / Add-Capability / CapabilityName:WMIC~~~~ Отсюда: https:/ / winitpro. ru/ index. php/ 2025/ 02/ 14/ komanda-wmic-ne-naydena/
Программная установка даты и запрет ее изменения
Maks 02.04.2026
Алгоритм из решения ниже реализован на примере нетипового документа "СписаниеМатериалов", разработанного в конфигурации КА2. Задача: при создании документов установить период списания автоматически. . .
КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin
Copyright ©2000 - 2026, CyberForum.ru