Форум программистов, компьютерный форум, киберфорум
Python: Django
Войти
Регистрация
Восстановить пароль
Блоги Сообщество Поиск Заказать работу  
 
Рейтинг 4.67/3: Рейтинг темы: голосов - 3, средняя оценка - 4.67
 Аватар для Wi0M
395 / 123 / 48
Регистрация: 26.10.2013
Сообщений: 734

Celery and RabbitMQ

29.04.2020, 13:41. Показов 685. Ответов 2
Метки нет (Все метки)

Студворк — интернет-сервис помощи студентам
Всем привет. У меня есть задача, которая должна обойти весь индекс эластика и запустить над каждым объектом работу (другая задача).

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@celery_app.task(bind=True, default_retry_delay=10, max_retries=100, soft_time_limit=120, time_limit=240)
def search_and_run_extractor(self, resource_host, index, limit=-1, sid=None, counter=0):
    _scroll_timeout = '30m'
    _scroll_size = settings.EXTRACTOR_ELASTIC_SCROLL_SIZE
    _indexes = list(index)
    index = _indexes[-1]
    try:
        es = get_elastic(settings.ELASTIC_HOSTS)
        if sid is not None:
            # если sid указан, то делаем выборку по нему.
            data = es.scroll(scroll_id=sid, scroll=_scroll_timeout)
        else:
            # это первый запуск, нужно инициализировать скролл.
            if not es.indices.exists(index=index):
                # если индекса нет, то нам не с чем работать.
                raise StopScrolling(f"Index {index} not found!")
            # инициализируем скрол
            search_body = get_search_body(resource_host)
            data = es.search(index=index, scroll=_scroll_timeout, size=_scroll_size, body=search_body)
 
        # обновляем ID скрола
        sid = data['_scroll_id']
        hits_count = len(data['hits']['hits'])
        if hits_count > 0:
            counter += hits_count
            if 0 < limit <= counter:
                raise StopScrolling('Limit')
            # обрабатываем пачку данных
            process_hits(data['hits']['hits'])
            # кладем эту же задачу обратно в очередь, добавляя к параметрам sid, чтобы продолжить скроллинг после обработки запущенной пачки
            search_and_run_extractor.delay(resource_host, _indexes, limit=limit, sid=sid, counter=counter)
            return f'OK. {counter}'
        else:
            raise StopScrolling('End of objects!')
    except (SoftTimeLimitExceeded, elastic_exceptions.ElasticsearchException) as e:
        send_log(f"SFE {type(e).__name__}::{e.args}", task_id=resource_host, resource_host="extractor",
                 level=logging.ERROR, job_name="search_for_extract", index=index)
        search_and_run_extractor.delay(resource_host, _indexes, limit=limit, sid=sid, counter=counter)
        return 'Retry'
    except StopScrolling as e:
        send_log(f"SFE {type(e).__name__}::{e.args}", task_id=resource_host, resource_host="extractor",
                 level=logging.ERROR, job_name="search_for_extract", index=index)
        if len(_indexes) > 1:
            _indexes.pop()
            search_and_run_extractor.delay(resource_host, _indexes, limit=limit, sid=None, counter=0)
        return str(e.args)
    except Exception as e:
        send_log(f"SFE {type(e).__name__}::{e.args}", task_id=resource_host, resource_host="extractor",
                 level=logging.ERROR, job_name="search_for_extract", index=index)
        return 'Error'
проблема в том, что обход индекса завершается преждевременно, при этом я не вижу никаких ошибок. Все что мне удалось заметить: в определенный момент, эта задача создает новую (я видел task_id), но она запущена не была. Для отслеживания задач я использую Flower (но он мне не нравится, так как позволяет себе очищать лог).
Подскажите как я могу выяснить причину, по которой этот код не работает, запуская себя до тех пор пока в эластике не кончатся объекты по скролу?
Если требуется больше информации - пишите.
0
IT_Exp
Эксперт
34794 / 4073 / 2104
Регистрация: 17.06.2006
Сообщений: 32,602
Блог
29.04.2020, 13:41
Ответы с готовыми решениями:

Celery error
Добрый день! Реализовал связку tornado + celery. Все работает, запускается. Пока в worker у меня один декоратор Task, который...

Django и celery shared_task
Как запустить shared_task в асинхронном режиме? Есть у меня celery таска from celery import shared_task import requests ...

Подскажите литературу по Celery
Добрый день, друзья! Посоветуйте, пожалуйста, почитать хорошие материалы (книги, туториалы и т.п) по фреймворку celery и по реализациям...

2
 Аватар для Wi0M
395 / 123 / 48
Регистрация: 26.10.2013
Сообщений: 734
03.05.2020, 13:01  [ТС]
назначил этой задаче отдельную очередь, сначала все работало хорошо. 2-е суток шла работа (проел 17кк из 57кк). а сегодня прекратилась. и опять без каких либо сообщений лога.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
CELERY_QUEUES = (
    ...
    Queue('extractors', Exchange('extractors'), routing_key='extractors', queue_arguments={'x-max-priority': 10}),
    Queue('scroll', Exchange('scroll'), routing_key='scroll', queue_arguments={'x-max-priority': 10}),
    ...
)
 
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = 'default'
CELERY_DEFAULT_ROUTING_KEY = 'default'
 
CELERY_TASK_ROUTES = {
   ...
    'api.tasks.extraction.exctract': {'queue': 'extractors'},
    'api.tasks.extraction.search_and_run_extractor': {'queue': 'scroll'},
   ...
}
и вытащил задачу обходящую индекс не только в отдельную очередь, а еще и работника на нее отдельного поставил в один поток.

Bash
1
python -m celery multi start --app={$APPNAME} --pidfile=/tmp/%n.pid --logfile=/app/logs/celery/%n.log 20 -c 4 -Q:1-2 q1 -Q:3-10 q2,q3 -Q:11-19 extractors -c:20 1 -Q:20 scroll -E &&
Смотрю что из этого выйдет.

Добавлено через 6 минут
Самое странное, что еще до того как я начал что-то исправлять, я успешно обошел другой индекс в котором было 10кк записей
0
 Аватар для Wi0M
395 / 123 / 48
Регистрация: 26.10.2013
Сообщений: 734
17.07.2020, 16:47  [ТС]
в общем. плохая идея публиковать себя же в пределах одной очереди. т.е. задачу должен публиковать кто-то из вне. тогда будет работать.
0
Надоела реклама? Зарегистрируйтесь и она исчезнет полностью.
BasicMan
Эксперт
29316 / 5623 / 2384
Регистрация: 17.02.2009
Сообщений: 30,364
Блог
17.07.2020, 16:47
Помогаю со студенческими работами здесь

Java 8 и rabbitmq
Пишу приложение, которое должно слушать очередь и что-то делать. На сайте самого rabbitmq, есть пример. Но в контексте моего приложения...

Кластер RabbitMQ
Есть тут те кто использует данный брокер сообщений? Задача создать кластер отказоустойчивый и с балансировкой нагрузки. Почитав инфу в...

rabbitmq-server
Здравствуйте. Я новичок в программировании. Изучаю Python. В качестве задания делаю интернет-магазин на django. Вроде все работает, но...

Разобраться с логикой приложения (Celery)
Всем привет! Использую фреймворк Django + Celery. У меня есть список товаров, которые обновляются (парсятся) каждые 30 минут. Я хочу...

Задачи не выполняются асинхронно celery
Здравствуйте! Используя celery и запуская задачи одну за другую не получается добиться асинхронности. Celery производит выполнение первой...


Искать еще темы с ответами

Или воспользуйтесь поиском по форуму:
3
Ответ Создать тему
Новые блоги и статьи
Установка Android SDK, NDK, JDK, CMake и т.д.
8Observer8 25.01.2026
Содержание блога Перейдите по ссылке: https:/ / developer. android. com/ studio и в самом низу страницы кликните по архиву "commandlinetools-win-xxxxxx_latest. zip" Извлеките архив и вы увидите. . .
Вывод текста со шрифтом TTF на Android с помощью библиотеки SDL3_ttf
8Observer8 25.01.2026
Содержание блога Если у вас не установлены Android SDK, NDK, JDK, и т. д. то сделайте это по следующей инструкции: Установка Android SDK, NDK, JDK, CMake и т. д. Сборка примера Скачайте. . .
Использование SDL3-callbacks вместо функции main() на Android, Desktop и WebAssembly
8Observer8 24.01.2026
Содержание блога Если вы откроете примеры для начинающих на официальном репозитории SDL3 в папке: examples, то вы увидите, что все примеры используют следующие четыре обязательные функции, а. . .
моя боль
iceja 24.01.2026
Выложила интерполяцию кубическими сплайнами www. iceja. net REST сервисы временно не работают, только через Web. Написала за 56 рабочих часов этот сайт с нуля. При помощи perplexity. ai PRO , при. . .
Модель сукцессии микоризы
anaschu 24.01.2026
Решили писать научную статью с неким РОманом
http://iceja.net/ математические сервисы
iceja 20.01.2026
Обновила свой сайт http:/ / iceja. net/ , приделала Fast Fourier Transform экстраполяцию сигналов. Однако предсказывает далеко не каждый сигнал (см ограничения http:/ / iceja. net/ fourier/ docs ). Также. . .
http://iceja.net/ сервер решения полиномов
iceja 18.01.2026
Выкатила http:/ / iceja. net/ сервер решения полиномов (находит действительные корни полиномов методом Штурма). На сайте документация по API, но скажу прямо VPS слабенький и 200 000 полиномов. . .
Расчёт переходных процессов в цепи постоянного тока
igorrr37 16.01.2026
/ * Дана цепь(не выше 3-го порядка) постоянного тока с элементами R, L, C, k(ключ), U, E, J. Программа находит переходные токи и напряжения на элементах схемы классическим методом(1 и 2 з-ны. . .
КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin
Copyright ©2000 - 2026, CyberForum.ru