Форум программистов, компьютерный форум, киберфорум
py-thonny
Войти
Регистрация
Восстановить пароль

Выполнение асинхронных задач в Python с asyncio

Запись от py-thonny размещена 12.05.2025 в 21:47
Показов 1319 Комментарии 0

Нажмите на изображение для увеличения
Название: f7095692-f284-497b-84ff-a5735b86b830.jpg
Просмотров: 73
Размер:	183.8 Кб
ID:	10798
Современный мир программирования похож на оживлённый мегаполис – тысячи процессов одновременно требуют внимания, ресурсов и времени. В этих джунглях операций возникают ситуации, когда программа тратит драгоценные миллисекунды в ожидании – будь то ответ от сервера, чтение файла или запрос к базе данных. Представьте повара, который замирает с ножом в руке, ожидая, пока закипит вода – непродуктивно, не так ли? Именно эту проблему блестяще решают асинхронные подходы, и Python с его модулем asyncio стал одним из главных проводников этой философии.

Для начала разберёмся с базовыми понятиями. Конкурентность и параллелизм – термины, которые часто путают даже опытные разработчики. Параллелизм – это одновременное выполнение задач на разных физических ядрах процессора. Представте несколько поваров, каждый на своей кухне готовит отдельное блюдо. А конкурентность – выполнение множества задач, переключаясь между ними. Это как один повар, который умело жонглирует несколькими рецептами – пока томится соус, он нарезает овощи, а когда овощи жарятся, маринует мясо. Асинхронное программирование – модель, позволяющая начать операцию и продолжить выполнение других задач, не дожидаясь её завершения. Когда операция закончится, программа "вернётся" и обработает результат. Ключевое отличие от многопоточности – всё происходит в одном потоке, без накладных расходов на создание и переключение контекста между потоками.

Python
1
2
3
4
5
6
7
8
9
# Синхронный подход - каждая операция блокирует выполнение программы
def download_all_sites(sites):
    for site in sites:
        download_site(site)  # Ждем завершения каждой загрузки
 
# Асинхронный подход - запускаем все и собираем результаты когда готовы
async def download_all_sites_async(sites):
    tasks = [download_site_async(site) for site in sites]
    await asyncio.gather(*tasks)  # Запускаем все задачи и ждем их завершения
История асинхронного программирования в Python напоминает эволюцию от обезъяны к человеку. Изначально были колбэки – неудобные, сложные в отлаживании и понимании. Затем появились генераторы и декораторы в PEP 342, что позволило создать первые библиотеки вроде Twisted и Tornado. Помню, как в 2012 году мучился с колбэк-адом в Twisted – код превращался в замысловатый узор из вложеных функций, читаемый только его автором, да и то спустя неделю уже с трудом. Настоящий прорыв случился с введением ключевых слов async и await в Python 3.5 (PEP 492). Это была революция – элегантный синтаксис, напоминающий обычный синхронный код. Идея не нова: JavaScript и C# уже имели подобный функционал, но для Python это стало поворотным моментом.

Python
1
2
3
4
5
6
7
8
# До Python 3.5 - неудобные генераторы
@asyncio.coroutine
def old_style():
    yield from some_coroutine()
 
# После Python 3.5 - элегантный синтаксис
async def modern_style():
    await some_coroutine()
Асинхронное программирование особенно важно для I/O-зависимых задач – когда ваши программы большую часть времени ждут данные откуда-то извне. Исследования показывают, что в типичных веб-приложениях до 90% времени тратится на ожидание ответа от базы данных или внешних API. С asyncio эти паузы превращаются в продуктиные участки для другого кода.

Базовые принципы работы с asyncio



Погружаясь в мир asyncio, первое, с чем мы сталкиваемся – корутины. Они стали настоящим прорывом в асинхронном программировании на Python. Корутины – это не просто функции, а специальные объекты, способные приостановить своё выполнение, пропустить вперёд другие задачи, а затем плавно вернуться в нужную точку. Объявляются они с помощью ключевого слова async def:

Python
1
2
3
4
async def get_data():
    # Это корутина, которая может "засыпать", не блокируя поток
    await asyncio.sleep(1)  # Уступаем управление на 1 секунду
    return "Данные получены"
Обратите внимание на ключевое слово await. Оно как волшебная палочка – сигнализирует: "здесь мы можем отдать контроль". Под капотом происходит хитрая магия – корутина сохраняет свой стек, контекст выполнения и все локальные переменные. Технически, это возможно благодаря генераторам, которые умеют замораживать и размораживать своё состояние. Сердце любого асинхронного приложения на базе asyncio – цикл событий (event loop). Представьте его как трудолюбивого диспетчера, который следит за всеми задачами, распределяет ресурсы и решает, кому сейчас работать. Когда одна задача засыпает, он тут же находит другую, готовую к выполнению.

Python
1
2
3
4
5
6
7
8
9
import asyncio
 
async def main():
    print("Привет")
    await asyncio.sleep(1)
    print("Мир")
 
# Создаем цикл событий и запускаем корутину
asyncio.run(main())  # В Python 3.7+, самый простой способ
В более ранних версиях Python или для более тонкой настройки мы можем управлять циклом событий вручную:

Python
1
2
3
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
Важнейший плюс asyncio перед другими подходами к конкурентности – отсутствие потоков и связаных с ними проблем синхронизации доступа к ресурсам. Работая с потоками, я часто ловил дедлоки и состояния гонки. С asyncio такого кошмара нет – всё выполняется в одном потоке, плюс мы точно знаем, где управление может переключиться (только на await).

Жизненный цикл корутины напоминает бабочку: сначала это гусеница-объект при вызове async def. Использовав asyncio.create_task(), мы оборачиваем её в кокон задачи. И наконец, после await она превращается в результат. Но есть нюанс — если корутину не обернуть в задачу и не дождаться выполнения, она просто исчезнет как утрений туман, не выполнив своего предназначения.

Python
1
2
3
4
5
async def process_data():
    # Корутина создана, но не запущена
    data = await fetch_data()  # Ждём получения данных
    result = await process(data)  # Обрабатываем их
    return result  # Возвращаем результат
Помимо await для ожидания результата одной корутины, asyncio предлагает еще несколько элегантных конструкций:

Python
1
2
3
4
5
6
7
# Асинхронный итератор - для поочерёдной обработки элементов
async for item in async_generator():
    process(item)
    
# Асинхронный контекстный менеджер - для корректной инициализации и финализации ресурсов
async with async_resource() as resource:
    await resource.do_something()
Работа с футурами и задачами — ещё один важный аспект. Задача (Task) - обертка для корутины, которая отслеживает её выполнение. Футура (Future) - низкоуровневая конструкция, представляющая результат, который будет доступен... в будущем. Обычно мы напрямую с ними не работаем, но знать о них стоит.

Python
1
2
3
4
# Создание задачи - корутина начнет выполняться незамедлительно
task = asyncio.create_task(some_coroutine())
# Затем мы можем дождаться результата
result = await task
Контекстные менеджеры в асинхронном стиле похожи на обычные, но с приставкой async:

Python
1
2
3
4
5
6
7
8
9
class AsyncContextManager:
    async def __aenter__(self):
        # Асинхронная инициализация
        await self.initialize()
        return self
        
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Асинхронное освобождение ресурсов
        await self.cleanup()
В сложных сценариях иногда требуется создать собственный цикл событий. Но будьте осторожны — я однажды потратил целый день, выясняя, почему не работает код, а причиной оказалась тривиальная ошибка в настройке цикла событий:

Python
1
2
3
4
5
6
7
8
9
# Создание собственного цикла событий
new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(new_loop)
 
# Или даже кастомная политика цикла событий
class MyEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
    def get_event_loop(self):
        # Кастомная логика получения цикла событий
        ...
Особая сила asyncio раскрывается, когда нужно запустить несколько корутин параллельно. Для этого есть три основных стратегии:

Python
1
2
3
4
5
6
7
8
9
10
# Запуск всех задач и ожидание всех результатов
results = await asyncio.gather(task1(), task2(), task3())
 
# Получение результатов по мере готовности
for task in asyncio.as_completed([task1(), task2(), task3()]):
    result = await task
    process_result(result)
    
# Ожидание завершения с дополнительными опциями
done, pending = await asyncio.wait([task1(), task2()], timeout=10)
Великой силой Python всегда была читаемость кода. С asyncio эта традиция продолжилась — асинхронный код выглядит почти как синхронный, но при этом работает намного эффективнее. Согласитесь, гораздо легче понять линейную последовательность операций с await, чем распутывать клубок колбэков, как это было раньше. В моей практике был случай, когда простая замена синхронных HTTP-запросов на асинхронные с помощью aiohttp ускорила парсер сайтов в 12 раз! Система, которая обрабатывала 100 URL за 2 минуты, стала справляться с этим за 10 секунд, причём на том же железе.

Еще одна мощная концепция в асинхронном программировании — это асинхронные генераторы. Они позволяют создать поток данных, который можно потреблять по мере готовности частей, не блокируя основной поток. Вспоминаю ситуацию, когда мы обрабатывали поток больших файлов — синхронный код загружал весь файл в память, что приводило к OutOfMemoryError на больших датасетах. Асинхронные генераторы элегантно решили проблему:

Python
1
2
3
4
5
6
7
8
9
10
11
async def read_large_file(file_path):
    async with aiofiles.open(file_path, 'r') as f:
        while True:
            chunk = await f.read(1024)  # Читаем по кусочкам
            if not chunk:
                break
            yield chunk  # Отдаём каждый кусок как только он готов
 
# Использование:
async for chunk in read_large_file('huge_data.csv'):
    process_chunk(chunk)
Асинхронный генератор определяется с помощью async def с оператором yield внутри. Использовать их можно через async for. Таким образом, обработка даже терабайтных файлов становится элегантной задачей.

Отдельного внимания заслуживает обработка исключений в асинхронном коде. Если в синхронном Python мы привыкли к try/except, то в асинхронном мире исключения могут "теряться" между задачами. Бывает, корутина выбросит исключение, а мы его не увидим, потому что забыли дождаться её результата.

Python
1
2
3
4
5
6
7
8
9
10
11
12
async def risky_operation():
    await asyncio.sleep(1)
    raise ValueError("Что-то пошло не так!")
 
# Задача создана, но исключение не будет замечено!
task = asyncio.create_task(risky_operation())
 
# Правильный вариант:
try:
    await task
except ValueError as e:
    print(f"Поймали ошибку: {e}")
Если не обработать исключение в асинхронной задаче, можно получить жуткие "unhandled exception" в логах и зависание программы. Однажды я потратил почти целый день, пытаясь понять, почему система иногда странно завершается — оказалось, необработанное исключение в одной из задач тихо убивало цикл событий.

Более элегантный способ объединения нескольких корутин — это asyncio.gather(). Он запускает все переданные корутины параллельно и возвращает список их результатов, сохраняя порядок:

Python
1
2
3
4
5
6
7
8
async def main():
    results = await asyncio.gather(
        fetch_data('https://api.example.com/data1'),
        fetch_data('https://api.example.com/data2'),
        fetch_data('https://api.example.com/data3')
    )
    # results содержит результаты в том же порядке, что и корутины
    data1, data2, data3 = results
Интересная особенность gather — флаг return_exceptions. Если установить его в True, вместо возбуждения исключений они будут возвращены как результаты:

Python
1
2
3
4
5
6
results = await asyncio.gather(
    will_succeed(),
    will_fail(),  # Выбросит исключение
    return_exceptions=True
)
# results = [some_result, SomeException()]
Это очень удобно, когда нужно запустить серию независимых операций, и неудача одной не должна прерывать остальные.
Если порядок результатов не важен, а скорость критична, на помощь приходит asyncio.as_completed(). Он возвращает итератор с корутинами по мере их завершения:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
async def fetch_with_timeout(url, timeout):
    try:
        return await asyncio.wait_for(fetch_data(url), timeout)
    except asyncio.TimeoutError:
        return f"Timeout for {url}"
 
urls = ['https://example.com', 'https://slow-api.org', 'https://fast-api.com']
tasks = [fetch_with_timeout(url, 5) for url in urls]
 
# Обрабатываем результаты в порядке их готовности
async for coro in asyncio.as_completed(tasks):
    result = await coro
    print(f"Получен результат: {result}")
Часто требуется более тонкий контроль над выполнением асинхронных задач — например, продолжить выполнение, когда хотя бы часть задач завершена, или установить таймаут. Тут на сцену выходит asyncio.wait():

Python
1
2
3
4
5
6
7
8
9
10
# Ждём, пока не завершатся любые 2 из 3 задач
done, pending = await asyncio.wait(
    [task1, task2, task3],
    return_when=asyncio.FIRST_COMPLETED,
    timeout=10
)
 
# Отменяем оставшиеся задачи
for task in pending:
    task.cancel()
Иногда бывает полезно группировать корутины для управления ими как единым целым. Для этого служит класс TaskGroup, доступный с Python 3.11:

Python
1
2
3
4
5
6
7
8
async def process_urls(urls):
    async with asyncio.TaskGroup() as tg:
        # Создаем задачи внутри группы
        tasks = [tg.create_task(fetch(url)) for url in urls]
        
    # Когда мы выходим из контекста, все задачи завершены
    # Если какая-то задача выбросит исключение, оно пробросится сюда
    return [task.result() for task in tasks]
Долгое время был вопрос: как правильно отменять асинхронные задачи? С TaskGroup ответ стал очевиден — при выходе из контекста с исключением, все незавершенные задачи автоматически отменяются.

Важно помнить, что в Python асинхронность не равна параллелизму. Все корутины всё еще выполняются в одном потоке. Если у вас CPU-bound задача, asyncio не даст прироста производительности — понадобится multiprocessing. Экспериментально я выяснил, что для обработки больших массивов данных лучшая стратегия — комбинация: используйте asyncio для I/O операций и multiprocessing для тяжелых вычислений.

Работа с событиями — ещё одна важная составляющая асинхронного программирования. asyncio.Event позволяет синхронизировать корутины, заставляя их ждать, пока не произойдет определенное событие:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
event = asyncio.Event()
 
async def waiter():
    # Ждём, пока событие не будет установлено
    await event.wait()
    print("Событие произошло!")
 
async def setter():
    await asyncio.sleep(1)
    # Сигнализируем всем ждущим корутинам
    event.set()
 
async def main():
    # Запускаем обе корутины
    await asyncio.gather(waiter(), setter())
Нельзя не упоминуть о asyncio.shield() — функции, которая защищает корутину от отмены. Это может быть жизненно важно для операций, прерывание которых может привести к проблемам:

Python
1
2
3
4
5
6
7
8
async def critical_operation():
    try:
        # Операция, которую нельзя прерывать
        await asyncio.shield(database_transaction())
    except asyncio.CancelledError:
        # Отмена получена, но транзакция завершится корректно
        await cleanup()
        raise  # Перебрасываем исключение дальше
На практике правильная организация асинхронного кода может превратить сложный набор взаимозависимых операций в простой и понятный алгоритм. В одном проекте я сталкнулся с задачей одновременого обновления данных на десятках серверов с зависимостями между обновлениями. Синхронная версия была запутаной и медленой. Асинхронный подход с использованием asyncio не только ускорил процесс в 8 раз, но и сделал код значительно чище и понятнее.

asyncio.wait vs asyncio.gather
Всем привет! :victory: Вопросы внизу. Есть два похожих теста: #1 - asyncio.wait import...

Python asyncio / aiohttp API 429 response error
Пытаюсь написать асинхронный API-запрос, нашел подходящий пример, но в ответ получаю ошибку: "429...

python. flet. asyncio.Queue
Всем привет, есть проблема с понимаем как работают очереди и/или проблема с тем, что уже...

Неправильная постановка задач для асинхронных функции
Ребята всем привет, у меня есть некий автоподписчик который должен с разных клиентов подписать на...


Продвинутые техники asyncio



Погружаясь глубже в подземелья asyncio, мы сталкиваемся с продвинутыми механизмами, похожими на фокусы опытного мага. И одними из самых практичных инструментов в этом арсенале являются семафоры и блокировки. Представте оживлённый ресторан с ограниченным количеством столиков — семафоры работают схожим образом, контролируя одновременный доступ к ресурсам.

Python
1
2
3
4
5
6
7
8
9
10
11
12
async def access_resource(semaphore, resource_id):
async with semaphore:
    # Только ограниченное число корутин может попасть сюда одновременно
    print(f"Доступ к ресурсу {resource_id}")
    await asyncio.sleep(1)  # Имитация работы с ресурсом
    print(f"Ресурс {resource_id} освобожден")
 
async def main():
# Создаём семафор, ограничивающий доступ до 3 одновременных операций
semaphore = asyncio.Semaphore(3)
# Запускаем 10 задач
await asyncio.gather(*[access_resource(semaphore, i) for i in range(10)])
Семафоры незаменимы, когда нужно ограничить нагрузку на внешние сервисы. Помню случай, когда ненароком положил сторонний API, отправив сотню запросов одновременно. После внедрения семафора система стала "вежливым собеседником", а не "назойливым спамером".

В отличие от семафоров, блокировки (Lock) обеспечивают эксклюзивный доступ – только одна корутина может войти в защищенный блок:

Python
1
2
3
4
5
6
7
8
lock = asyncio.Lock()
 
async def update_counter(counter, lock):
async with lock:
    # Критическая секция - только одна корутина может быть здесь
    local_value = counter["value"]
    await asyncio.sleep(0.1)  # Имитируем "тяжелую" работу
    counter["value"] = local_value + 1
Блокировки решают классическую проблему "состояния гонки" (race condition), когда несколько потоков пытаются модифицировать общие данные. Хотя asyncio работает в одном потоке, проблема всё ещё актуальна из-за переключения контекста на await.

Ещё один продвинутый инструмент – условные переменные (Condition), создающие удобный механизм для оповещения ожидающих корутин:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
condition = asyncio.Condition()
 
async def consumer():
async with condition:
    print("Потребитель ждёт...")
    await condition.wait()
    print("Потребитель уведомлен!")
 
async def producer():
await asyncio.sleep(1)
async with condition:
    print("Производитель уведомляет...")
    condition.notify_all()
Особого внимания заслуживает элегантное прекращение выполнения задач. В реальном мире операции не всегда завершаются успешно – сетевые таймауты, ошибки и просто необходимость прервать долгую операцию заставляют нас искать механизмы отмены задач:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
async def long_operation():
try:
    print("Операция начата")
    await asyncio.sleep(10)  # Долгая операция
    print("Операция завершена")
except asyncio.CancelledError:
    print("Операция прервана!")
    # Здесь можно провести освобождение ресурсов
    raise  # Важно: переброс исключения для правильного поведения
 
async def main():
task = asyncio.create_task(long_operation())
# Ждём немного и отменяем задачу
await asyncio.sleep(2)
task.cancel()
try:
    await task
except asyncio.CancelledError:
    print("Задача успешно отменена")
Нередко нужно установить таймаут для операции. Вместо ручной отмены через cancel() можно использовать wait_for:

Python
1
2
3
4
5
try:
# Ждём результата максимум 5 секунд
result = await asyncio.wait_for(fetch_data("https://api.example.com"), 5)
except asyncio.TimeoutError:
print("Операция заняла слишком много времени!")
На одном проекте мы встретились с загадочным зависанием системы – выяснилось, что одна корутина "застряла" в бесконечном цикле ожидания. После добавления таймаутов система стала стабильной и предсказуемой.

Асинхронные генераторы – мощный инструмент для потоковой обработки данных. Они позволяют "yield"-ить значения асинхронно:

Python
1
2
3
4
5
6
7
8
9
async def ticker(delay, to):
"""Асинхронный генератор, выдающий числа через определённые интервалы."""
for i in range(to):
    yield i
    await asyncio.sleep(delay)
 
async def main():
async for i in ticker(0.5, 10):
    print(i)
Я активно применял асинхронные генераторы при разработке системы мониторинга, которая получала данные из нескольких источников. Вместо ожидания всех данных сразу, система обрабатывала их по мере поступления – моментально реагируя на критические показатели.

Очереди в asyncio – еще один незаменимый инструмент для координации работы производителей и потребителей:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
queue = asyncio.Queue(maxsize=5)  # Ограничиваем размер очереди
 
async def producer():
for i in range(10):
    await queue.put(i)
    print(f"Произведено: {i}")
    await asyncio.sleep(1)
 
async def consumer():
while True:
    item = await queue.get()
    print(f"Потреблено: {item}")
    queue.task_done()  # Отмечаем задачу как выполненную
 
async def main():
# Запускаем производителя и потребителя
producers = [asyncio.create_task(producer()) for _ in range(2)]
consumers = [asyncio.create_task(consumer()) for _ in range(3)]
# Ждём, пока производители закончат
await asyncio.gather(*producers)
# Ждём, пока очередь опустеет
await queue.join()
# Отменяем потребителей
for c in consumers:
    c.cancel()
Очереди особенно полезны в паттерне "producer-consumer" с разным числом производителей и потребителей. Очередь автоматически блокирует операцию put(), если достигнут лимит maxsize, что обеспечивает естественный "backpressure".

Одна из любимых моих техник – сочетание очередей с ограничением одновременных операций через семафоры:

Python
1
2
3
4
5
6
7
async def worker(name, queue, semaphore):
while True:
    task = await queue.get()
    async with semaphore:
        print(f"Работник {name} выполняет {task}")
        await process_task(task)
    queue.task_done()
Есть еще PriorityQueue – очередь с приоритетами, незаменимая, когда некоторые задачи важнее других:

Python
1
2
3
4
5
6
7
8
9
10
pq = asyncio.PriorityQueue()
await pq.put((1, "Высокий приоритет"))
await pq.put((5, "Низкий приоритет"))
await pq.put((2, "Средний приоритет"))
 
# Обработка начнётся с элементов с наименьшим приоритетом (1)
while not pq.empty():
priority, item = await pq.get()
print(f"Обработка {item} (приоритет: {priority})")
pq.task_done()
Однажды эта техника спасла моё приложение для трейдинга – критические ордера автоматически попадали в начало очереди, обеспечивая минимальную задержку при резких изменениях рынка.
В разработке асинхронных приложений часто возникает задача запуска фоновых задач, которые должны работать "вечно". Но как быть с их корректным завершением? Паттерн "cancel_on_exit" приходит на помощь:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
async def background_task():
try:
    while True:
        await asyncio.sleep(1)
        print("Фоновая задача работает...")
except asyncio.CancelledError:
    print("Фоновая задача остановлена")
 
async def main():
task = asyncio.create_task(background_task())
try:
    # Основная логика приложения
    await asyncio.sleep(5)
finally:
    # Всегда отменяем фоновую задачу при выходе
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
Нельзя не упомянуть еще один продвинутый инструмент в арсенале asyncio — асинхронные итераторы. В отличие от обычных итераторов, которые реализуют методы __iter__ и __next__, асинхронные версии используют __aiter__ и __anext__:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class AsyncCounter:
    def __init__(self, limit):
        self.limit = limit
        self.counter = 0
        
    def __aiter__(self):
        return self
        
    async def __anext__(self):
        if self.counter < self.limit:
            await asyncio.sleep(0.1)  # Имитируем асинхронную работу
            self.counter += 1
            return self.counter
        else:
            raise StopAsyncIteration
 
async def main():
    async for number in AsyncCounter(5):
        print(number)
Эта техника особенно полезна при работе с потоковыми данными из внешних источников. В одном из проектов мы использовали асинхронные итераторы для обработки событий из Kafka — это позволило элегантно организовать код и не беспокоиться о ручном управлении подписками.
Нередко в асинхронном коде приходится объединять несколько источников данных. Для работы с несколькими асинхроными потоками появился удобный инструмент asyncio.StreamReader:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
async def read_data(reader):
    while True:
        line = await reader.readline()
        if not line:
            break
        yield line.decode()
 
async def combine_streams(stream1, stream2):
    readers = [read_data(stream1), read_data(stream2)]
    pending = set(readers)
    
    while pending:
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        
        for done_task in done:
            try:
                result = await done_task
                yield result
            except StopAsyncIteration:
                continue
Иногда требуется разделить тяжелые вычисления и I/O операции. В таких случаях полезен паттерн с использованем исполнителей (executors) — они позволяют запускать блокирующие операции в отдельном пуле потоков или процессов, не останавливая цикл событий:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
def cpu_bound_task(data):
    # Тяжелая вычислительная задача
    result = 0
    for i in range(10**7):
        result += i * i
    return result + len(data)
 
async def mixed_workload(data):
    # Запускаем CPU-bound задачу в пуле процессов
    result = await asyncio.to_thread(cpu_bound_task, data)
    
    # Продолжаем асинхронную работу
    await asyncio.sleep(0.1)
    return result
Одна из малоизвестных, но мощных техник — использование asyncio.TaskGroup (доступно с Python 3.11) или asyncio.create_task с хитрым паттерном обработки исключений для создания надежных воркеров:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
async def worker(worker_id, task_queue):
    while True:
        try:
            task = await task_queue.get()
            print(f"Воркер {worker_id} начал задачу {task}")
            await process_task(task)
        except Exception as e:
            print(f"Ошибка в воркере {worker_id}: {e}")
        finally:
            task_queue.task_done()
 
async def supervisor(num_workers=5):
    task_queue = asyncio.Queue()
    
    # Заполняем очередь задачами
    for i in range(20):
        await task_queue.put(f"Task-{i}")
    
    # Запускаем воркеров
    workers = [
        asyncio.create_task(worker(i, task_queue))
        for i in range(num_workers)
    ]
    
    # Ждём завершения всех задач
    await task_queue.join()
    
    # Отменяем воркеров
    for w in workers:
        w.cancel()
Особого внимания заслуживает работа с несколькими асинхронными запросами, особенно когда нужно обрабатывать их результаты по мере готовности. Я часто использую asyncio.as_completed в комбинации с itertools.cycle:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import itertools
 
async def process_batch(urls):
    # Создаём пул из нескольких обработчиков
    processors = [data_processor(i) for i in range(3)]
    processor_cycle = itertools.cycle(processors)
    
    # Запускаем запросы
    tasks = [fetch_url(url) for url in urls]
    
    # Обрабатываем результаты по мере готовности
    for task in asyncio.as_completed(tasks):
        result = await task
        # Выбираем следующий доступный процессор по кругу
        processor = next(processor_cycle)
        await processor.process(result)
В критичных к производительности системах иногда требуется управлять приоритетом задач. Нестандартное, но эффективное решение — комбинация PriorityQueue с динамическим изменением приоритетов:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class PriorityTaskScheduler:
    def __init__(self):
        self.queue = asyncio.PriorityQueue()
        self._counter = itertools.count()  # Для сортировки задач с одинаковым приоритетом
        
    async def add_task(self, priority, coro):
        count = next(self._counter)
        await self.queue.put((priority, count, coro))
        
    async def run(self):
        while not self.queue.empty():
            priority, _, coro = await self.queue.get()
            try:
                result = await coro
                # Если нужно повторное выполнение с другим приоритетом
                if isinstance(result, tuple) and result[0] == 'reschedule':
                    new_priority, new_coro = result[1], result[2]
                    await self.add_task(new_priority, new_coro)
            except Exception as e:
                print(f"Ошибка при выполнении задачи: {e}")
            finally:
                self.queue.task_done()
В одном проекте по обработке финансовых транзакций эта техника позволила гибко управлять нагрузкой — транзакции с высоким приоритетом обрабатывались первыми, а менее критичные операции ждали своей очереди.
Важно понимать, что с увеличением сложности асинхронных систем растет и важность грамотного тестирования. Для этого asyncio предлагает asyncio.mock, а в современных фреймворках типа pytest есть встроенная подержка для тестирования асинхронного кода:

Python
1
2
3
4
5
6
7
8
async def test_async_function():
    # Мокаем асинхронный метод
    with patch('module.async_function') as mock_func:
        mock_func.return_value = Future()
        mock_func.return_value.set_result('mocked_result')
        
        result = await function_under_test()
        assert result == 'expected_result'

Реальные сценарии использования



Теория хороша, но настоящая магия начинается, когда асинхронность решает реальные задачи. За годы работы с asyncio я накопил коллекцию шаблонов и практик, которые превращают абстрактные концепции в работающие системы.

Одна из самых очевидных областей применения asyncio — сетевое взаимодействие. Вспоминаю свой первый крупный проект с асинхронностью: агрегатор новостей, который собирал данные с десятков источников. Синхронная версия тратила 50+ секунд на обход всех сайтов. С переходом на aiohttp время упало до 3-4 секунд!

Python
1
2
3
4
5
6
7
8
9
10
11
12
async def fetch(session, url):
async with session.get(url, timeout=10) as response:
return await response.text()
 
async def fetch_all(urls):
async with aiohttp.ClientSession() as session:
    tasks = [fetch(session, url) for url in urls]
    return await asyncio.gather(*tasks, return_exceptions=True)
 
# Использование
urls = ["https://news1.com", "https://news2.com", ..., "https://news50.com"]
results = asyncio.run(fetch_all(urls))
Ключевой момент: используйте одну сессию для всех запросов. Создание новой сессии для каждого запроса сводит на нет преимущества асинхронности из-за накладных расходов на установку соединения.

Особо примечательна работа с WebSocket через asyncio. Для проекта мониторинга криптовалютных бирж я использовал библиотеку websockets, позволяющую элегантно обрабатывать непрерывный поток данных:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
async def crypto_monitor():
uri = "wss://ws.exchange.com/marketdata"
async with websockets.connect(uri) as websocket:
    # Подписываемся на обновления
    await websocket.send(json.dumps({
        "method": "SUBSCRIBE",
        "params": ["btcusdt@trade", "ethusdt@trade"],
        "id": 1
    }))
    
    while True:
        data = await websocket.recv()
        parsed = json.loads(data)
        await process_trade_data(parsed)
Веб-разработка с asyncio стала настоящим прорывом. FastAPI — фреймворк, построенный поверх ASGI и использующий asyncio, демонстрирует впечатляющую производительность. В одном из проектов миграция с Flask на FastAPI увеличила пропускную способность API с 500 до почти 5000 запросов в секунду.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession
 
app = FastAPI()
 
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_async_session)):
query = select(User).where(User.id == user_id)
result = await db.execute(query)
user = result.scalar_one_or_none()
if not user:
    raise HTTPException(status_code=404, detail="User not found")
return user
FastAPI автоматически запускает ваши асинхронные обработчики, обеспечивая неблокирующую обработку запросов. Для простых API этот прирост может быть незаметен, но для систем с тысячами соединений разница колосальная.

Работа с базами данных долгое время была ахиллесовой пятой асинхронных приложений. Традиционные драйверы БД блокировали выполнение, нивелируя преимущества asyncio. Ситуация изменилась с появлением драйверов вроде asyncpg для PostgreSQL и aiomysql для MySQL.

Python
1
2
3
4
5
6
7
8
9
10
import asyncpg
 
async def get_user_data(user_id):
conn = await asyncpg.connect('postgresql://user:password@localhost/database')
try:
    # Выполнение запроса неблокирующим способом
    row = await conn.fetchrow('SELECT * FROM users WHERE id = $1', user_id)
    return dict(row) if row else None
finally:
    await conn.close()
В современных проектах я предпочитаю использовать SQLAlchemy 1.4+, которая получила полноценную асинхронную поддержку:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.future import select
from sqlalchemy.orm import sessionmaker
 
# Создаём движок и фабрику сессий
engine = create_async_engine("postgresql+asyncpg://user:pass@host/db")
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
 
async def get_users_by_role(role_name):
async with async_session() as session:
    # Выполняем запрос асинхронно
    result = await session.execute(
        select(User).where(User.role == role_name)
    )
    return result.scalars().all()
Важный нюанс: в асинхронном режиме SQLAlchemy не выполняет автоматически неявные операции, как в синхронном режиме. Часто приходится явно вызывать await session.refresh(obj) после вставок.

Интеграция асинхронного и синхронного кода — задача, с которой рано или поздно сталкиваются все разработчики. В идеальном мире всё работало бы асинхронно, но реальность такова, что много библиотек остаются синхроными.
Главное правило: не блокируйте цикл событий. Для CPU-bound операций используйте run_in_executor:

Python
1
2
3
4
5
6
7
8
9
10
11
def cpu_intensive_operation(data):
# Какая-то тяжелая синхронная функция 
result = process_data_sync(data)
return result
 
async def mixed_function(data):
# Запускаем синхронный код в пуле потоков
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, cpu_intensive_operation, data)
# Продолжаем асинхронную работу
return result
Для I/O-bound операций нередко можно найти асинхронную альтернативу. Например, вместо requests используйте aiohttp, вместо psycopg2 — asyncpg.
Если приходится интегрироваться с блокирующими API, рассмотрите создание отдельного процесса с собственным циклом событий. Общаться между процессами можно через очереди:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from multiprocessing import Process, Queue
 
def blocking_worker(in_queue, out_queue):
while True:
    task = in_queue.get()
    if task is None:  # Сигнал остановки
        break
    # Выполняем блокирующую операцию
    result = some_blocking_api(task)
    out_queue.put(result)
 
async def async_controller():
# Создаём очереди и процесс
in_queue = Queue()
out_queue = Queue()
worker = Process(target=blocking_worker, args=(in_queue, out_queue))
worker.start()
 
try:
    # Асинхронно взаимодействуем с блокирующим API 
    # через отдельный процесс
    for task in tasks:
        in_queue.put(task)
        # Неблокирующая проверка результатов
        if not out_queue.empty():
            result = out_queue.get_nowait()
            await process_result(result)
finally:
    # Останавливаем воркер
    in_queue.put(None)
    worker.join()
Обработка больших данных — ещё одна область, где asyncio показывает себя отлично. Классический подход с загрузкой всего датасета в память часто не работает на реальных объёмах.
Для проекта аналитики логов я использовал потоковую обработку с asyncio.StreamReader:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
async def process_huge_log(file_path):
# Открываем файл для асинхронного чтения
proc = await asyncio.create_subprocess_exec(
    'cat', file_path,
    stdout=asyncio.subprocess.PIPE,
    stderr=asyncio.subprocess.PIPE
)
 
# Создаём читатель для потоковой обработки
reader = proc.stdout
buffer = bytearray()
stats = {"errors": 0, "warnings": 0, "infos": 0}
 
while True:
    # Читаем данные кусками
    chunk = await reader.read(8192)
    if not chunk:
        break
    
    buffer.extend(chunk)
    while b'\n' in buffer:
        # Обрабатываем строки по мере получения
        idx = buffer.find(b'\n')
        line = buffer[:idx].decode('utf-8', errors='replace')
        del buffer[:idx + 1]
        
        # Анализируем строку
        if 'ERROR' in line:
            stats["errors"] += 1
        elif 'WARNING' in line:
            stats["warnings"] += 1
        elif 'INFO' in line:
            stats["infos"] += 1
 
await proc.wait()
return stats
Этот подход позволил обрабатывать логи размером в десятки гигабайт, не загружая их полностью в память.
Для особо тяжёлых случаев эффективна комбинация asyncio с параллельной обработкой через multiprocessing:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
async def hybrid_processing(large_dataset_path):
# Разбиваем данные на чанки для параллельной обработки
chunks = split_data_into_chunks(large_dataset_path)
 
 
# Создаём пул процессов для CPU-bound операций
with ProcessPoolExecutor() as executor:
    loop = asyncio.get_running_loop()
    # Запускаем тяжёлые вычисления в отдельных процессах
    futures = [
        loop.run_in_executor(executor, process_chunk, chunk)
        for chunk in chunks
    ]
    # Параллельно с этим асинхронно обрабатываем другие задачи
    other_tasks = [other_async_task1(), other_async_task2()]
    
    # Ждём завершения всех задач
    results = await asyncio.gather(*futures, *other_tasks)
    
    # Собираем и агрегируем результаты
    chunk_results = results[:len(chunks)]
    return aggregate_results(chunk_results)
В одном из проектов анализа научных данных этот гибридный подход ускорил обработку в 15 раз по сравнению с чисто синхронной версией.
Асинхронность становится особенно полезной при обработке потоков данных в реальном времени. Например, для трансляции метрик мониторинга мы использовали связку asyncio и Server-Sent Events:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
async def sse_metrics(request):
response = web.StreamResponse(
    status=200,
    reason='OK',
    headers={'Content-Type': 'text/event-stream',
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive'}
)
await response.prepare(request)
 
try:
    while True:
        # Получаем метрики из асинхроного источника
        metrics = await collect_system_metrics()
        # Отправляем клиенту в формате SSE
        await response.write(
            f"data: {json.dumps(metrics)}\n\n".encode('utf-8')
        )
        await asyncio.sleep(1)
except ConnectionResetError:
    # Клиент отключился
    pass
 
return response
Эта модель SSE отлично подходит для дашбордов мониторинга, где клиент должен получать актуальные данные без постоянного опроса сервера. В одном из проектов она сократила нагрузку на сервер почти на 70% по сравнению с классическим опросом через AJAX каждые несколько секунд.
Еще один интересный сценарий — асинхронная обработка событий в брокерах сообщений. Например, подключение к RabbitMQ можно реализовать с помощью библиотеки aio-pika:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import aio_pika
 
async def process_message(message: aio_pika.IncomingMessage):
    async with message.process():
        data = message.body.decode()
        print(f"Получено сообщение: {data}")
        # Имитируем длительную обработку
        await asyncio.sleep(1)
        print(f"Сообщение обработано: {data}")
 
async def main():
    connection = await aio_pika.connect("amqp://guest:guest@localhost/")
    
    async with connection:
        channel = await connection.channel()
        
        # Объявляем очередь
        queue = await channel.declare_queue("test_queue", durable=True)
        
        # Запускаем обработку сообщений
        await queue.consume(process_message)
        
        # Цикл событий не завершается, пока мы не отменим задачу вручную
        await asyncio.Future()
У меня был проект системы уведомлений, где асинхронная архитектура с RabbitMQ позволила обрабатывать свыше 10 тысяч оповещений в минуту на скромном сервере. Секрет заключался в том, что во время ожидания ответа от внешних систем (SMS-шлюзов, прушеров), asyncio занимал процессор другими задачами.
Микросервисная архитектура и asyncio — еще один мощный тандем. В распределённых системах сервисы постоянно общаются друг с другом, и синхронные запросы создают каскадные задержки. С gRPC и asyncio эта проблема решается элегантно:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import grpc
from concurrent import futures
import service_pb2
import service_pb2_grpc
 
class ServiceImplementation(service_pb2_grpc.ServiceServicer):
    async def ProcessData(self, request, context):
        # Асинхронная обработка запроса
        result = await self.process_data_async(request.data)
        return service_pb2.Response(result=result)
    
    async def process_data_async(self, data):
        # Имитация асинхронной обработки
        await asyncio.sleep(0.1)
        return f"Processed: {data}"
 
async def serve():
    server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10))
    service_pb2_grpc.add_ServiceServicer_to_server(
        ServiceImplementation(), server
    )
    server.add_insecure_port('[::]:50051')
    await server.start()
    await server.wait_for_termination()
 
if __name__ == '__main__':
    asyncio.run(serve())
Для планировщиков задач asyncio тоже показывает себя с лучшей стороны. Один из моих любимых паттернов — имплементация простого но гибкого шедулера:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class AsyncScheduler:
    def __init__(self):
        self.tasks = {}
        
    async def add_job(self, func, interval, job_id=None):
        if job_id is None:
            job_id = str(uuid.uuid4())
            
        self.tasks[job_id] = asyncio.create_task(self._job_runner(func, interval, job_id))
        return job_id
    
    async def remove_job(self, job_id):
        if job_id in self.tasks:
            self.tasks[job_id].cancel()
            del self.tasks[job_id]
            
    async def _job_runner(self, func, interval, job_id):
        try:
            while True:
                await func()
                await asyncio.sleep(interval)
        except asyncio.CancelledError:
            print(f"Задача {job_id} отменена")
            
    async def shutdown(self):
        for job_id in list(self.tasks.keys()):
            await self.remove_job(job_id)
В одном из проектов мы использовали подобную систему для реализации отложеного выполнения и периодических задач. Преимущество перед Celery оказалось в простоте — не нужен отдельный брокер сообщений, всё работает в рамках одного процесса.
Тестирование асинхронного кода исторически было головной болью для многих разработчиков. К счастью, современные инструменты сильно облегчили задачу. С pytest-asyncio можно писать тесты почти так же, как и для синхронных функций:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import pytest
 
@pytest.mark.asyncio
async def test_async_process():
    # Подготовка данных
    data = {"user_id": 123, "action": "login"}
    
    # Мокируем внешние зависимости
    with patch('module.external_api.get_user_status') as mock_status:
        # Настраиваем мок для возврата асинхронного результата
        future = asyncio.Future()
        future.set_result({"status": "active"})
        mock_status.return_value = future
        
        # Вызываем тестируемую функцию
        result = await process_user_action(data)
        
        # Проверяем результат
        assert result["success"] is True
        assert mock_status.called
Асинхронность также внесла революцию в разработку CLI-утилит. Вместо последовательного выполнения команд, современные CLI могут паралельно запускать задачи и показывать прогресс в реальном времени. Библиотека rich в сочетании с asyncio даёт потрясающие результаты:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import asyncio
from rich.progress import Progress
 
async def process_file(file_path):
    total_size = os.path.getsize(file_path)
    processed = 0
    
    # Имитируем обработку файла по блокам
    chunk_size = 1024 * 1024  # 1MB
    async with aiofiles.open(file_path, 'rb') as f:
        while chunk := await f.read(chunk_size):
            # Обрабатываем чанк
            await asyncio.sleep(0.01)  # Имитация работы
            processed += len(chunk)
            # Обновляем прогресс
            yield processed / total_size * 100
 
async def main():
    files = ['large_file1.dat', 'large_file2.dat', 'large_file3.dat']
    
    with Progress() as progress:
        # Создаём задачи для индикаторов прогресса
        tasks = {
            file: progress.add_task(f"[green]Обработка {file}", total=100)
            for file in files
        }
        
        # Запускаем обработку всех файлов
        file_tasks = [process_file_with_progress(file, tasks[file], progress) 
                     for file in files]
        await asyncio.gather(*file_tasks)
В одном из моих проектов мы создали утилиту миграции данных с асинхронной обработкой и интерактивным интерфейсом. Она работала с терабайными базами и отображала не только прогресс, но и текущую скорость, оставшееся время и даже графики в терминале. Пользователи были в востроге — наконец-то не приходилось гадать, висит программа или обрабатывает данные.
Интересный кейс — использование asyncio для управления docker-контейнерами. С библиотекой aiodocker создание и управление целыми оркестрами контейнеров становится элегантной задачей:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import aiodocker
 
async def start_service(config):
    docker = aiodocker.Docker()
    
    # Запускаем контейнер
    container = await docker.containers.create(
        config={
            "Image": "postgres:13",
            "Env": ["POSTGRES_PASSWORD=mysecretpassword"],
            "HostConfig": {
                "PortBindings": {"5432/tcp": [{"HostPort": "5432"}]}
            }
        }
    )
    
    await container.start()
    logs = container.log(stdout=True, stderr=True, follow=True)
    
    # Читаем логи асинхронно, пока не увидим сообщение о готовности
    async for line in logs:
        if "database system is ready to accept connections" in line:
            print("БД запущена и готова к работе!")
            break
            
    return container

Оптимизация и подводные камни



Работа с асинхронным кодом напоминает хождение по минному полю – зазевался на момент и всё, приготовься к долгим часам отладки. За годы ковыряния в недрах asyncio я насобирал целую коллекцию граблей, на которые регулярно наступают даже опытные разработчики.
Первая и самая распостранённая ошибка – забыть await перед корутиной. Такая простая, казалось бы, вещь, но она превращается в настоящий кошмар:

Python
1
2
3
4
5
6
7
async def fetch_data():
    return await process_request()
 
async def main():
    # Забыли await!
    data = fetch_data()  # Возвращает корутину, а не результат
    print(data)  # Выведет что-то вроде <coroutine object fetch_data at 0x...>
Коварство этой ошибки в том, что Python не выбрасывает исключение – код выполняется, но корутина просто никогда не запускается. В больших системах такие "потерянные корутины" могут стать источником таинственных багов.
Другой распространённый антипаттерн – блокировка цикла событий. Помню, в одном проекте мы неделю искали причину странных фризов, и виновником оказался безобидный на вид вызов:

Python
1
2
3
4
5
async def process_data(items):
    for item in items:
        # CPU-bound операция блокирует весь цикл событий!
        result = compute_hash(item)  # Тяжелая синхронная функция
        await save_result(result)
Такой код – настоящий убийца производительности. Пока выполняется compute_hash(), весь цикл событий стоит, и другие корутины не могут выполняться. Правильное решение – использовать run_in_executor() для CPU-интенсивных операций.
С памятью в асинхронном мире тоже не всё гладко. Сборщик мусора Python'а порой ведёт себя странно с циклическими ссылками в корутинах. Я сталкивался с утечками, когда задачи вроде как завершались, но память не освобождалась:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
async def memory_leak_risk():
    # Создаём большой объект
    huge_data = [0] * (10**6)  # Список из миллиона элементов
    
    # Создаём замыкание, которое ссылается на huge_data
    async def inner_func():
        nonlocal huge_data
        await asyncio.sleep(1)
        return len(huge_data)
    
    # Запускаем, но забываем дождаться или отменить
    asyncio.create_task(inner_func())
    
    # huge_data остаётся в памяти, пока inner_func не завершится!
Для тяжелых приложений полезен режим отладки asyncio. Он выявляет множество проблем, которые трудно заметить невооруженным глазом:

Python
1
2
# Включаем режим отладки
asyncio.run(main(), debug=True)
В этом режиме asyncio выбрасывает предупреждения о корутинах, которые работают слишком долго, о незакрытых ресурсах и других потенциальных проблемах. В одном проекте это помогло нам выявить корутину, которая выполнялась 30+ секунд и блокировала всё приложение.

Для серьёзной отладки асинхронного кода понадобится более тяжёлая артелерия. Я часто использую связку aiodebugger и aiomonitor, которые позволяют инспектировать состояние асинхронных задач в реальном времени:

Python
1
2
3
4
5
6
7
8
9
import aiomonitor
 
async def main():
    # Основная логика приложения
    server = await setup_server()
    
    # Запускаем монитор на порту 50101
    with aiomonitor.Monitor(asyncio.get_event_loop()):
        await server.serve_forever()
Запустив такое приложение, можно подключиться к нему через telnet и исследовать текущие задачи, состояние цикла событий и многое другое.
Для профилирования асинхронной производительности обычный cProfile не всегда удобен. Лучше использовать pyinstrument – он учитывает особености асинхронного выполнения и показывает, где ваш код действительно тратит время:

Python
1
2
3
4
5
6
7
8
9
10
from pyinstrument import Profiler
 
async def main():
    profiler = Profiler()
    profiler.start()
    
    await do_async_work()
    
    profiler.stop()
    print(profiler.output_text(unicode=True, color=True))
Тестирование асинхронного кода – отдельная головная боль. Стандартный unittest не очень-то дружит с asyncio. Для серьёзного тестирования я рекомендую pytest с плагином pytest-asyncio:

Python
1
2
3
4
5
6
import pytest
 
@pytest.mark.asyncio
async def test_async_function():
    result = await function_to_test()
    assert result == expected_value
Мокинг асинхронных функций тоже имеет свои особености. Простая замена через unittest.mock может не сработать, если не учитывать, что мок должен возвращать объект, совместимый с await:

Python
1
2
3
4
5
6
7
8
9
async def test_with_mocked_dependency():
    with patch('module.async_dependency') as mock_dep:
        # Важно: создаём Future и устанавливаем результат
        future = asyncio.Future()
        future.set_result('mocked_result')
        mock_dep.return_value = future
        
        result = await function_using_dependency()
        assert result == 'expected_result'
Помню бессонные ночи с одним особо коварным багом – задача иногда зависала на продакшене, но у меня локально всё работало идеально. Разгадка пришла неожиданно: на продакшене был другой лимит дескрипторов файлов, и при большой нагрузке система достигала этого лимита. Мораль? Всегда тестируйте асинхронный код под нагрузкой, близкой к реальной.

С чего начать разбираться с Asyncio?
нужно написать парсер страницы, с загрузкой некоторых картинок, на Asyncio. С многопоточным и...

Коннектор к API используя AsyncIO, aiohttp
Py3.5. Работаю с библиотекой, подключаюсь к api telegram. Получаю сообщения и отправляю их в...

Выйти из петли в run_forever() [asyncio]
Пишу скрипт, который эмулирует запуск игр в стиме. Для этого была выбрана библиотека Steam для...

Asyncio корректное завершение в случае непредвиденной остановки программы
Есть скрипт (выложен не полностью) @asyncio.coroutine def do_work(data): for adres, ip_list in...

Asyncio - coroutine vs future vs task
Доброго времени суток, форумчане! Приступил к изучению asyncio. Использую python 3.5. Гуру...

Можно ли поставить таски на паузу? (asyncio)
Всем привет! Имеется такой код: from aiohttp import web import asyncio async def...

Asyncio ошибка работы парсера
Здравствуйте, имеется парсер на Python в связке asyncio + aiohttp + threadPoolExecutor + lxml....

WxPython и asyncio
Добрый день. Подскажите можно ли в приложении wxPython использовать модуль asyncio? Как я понимаю...

Параллельный запуск 2 методов через asyncio
Есть необходимость сделать код, продолжающий работу даже при поступлении запроса. Пересмотрела и...

Multiprocessing vs multithreading vs AsyncIO при загрузке файлов
Вопрос всезнающему All ! Нужно загрузить/выгрузить много разноразмерных файлов через протокол...

Проблема с asyncio
Здравствуйте! пытаюсь разобраться с asyncio.. Нужна помощь, ошибка: RuntimeError: This event loop...

В asyncio отстутвует метод create_task и метод run
Здравствуйте, у меня в asyncio отсутствует метод create_task и метод run. Мой код: from bs4 import...

Надоела реклама? Зарегистрируйтесь и она исчезнет полностью.
Всего комментариев 0
Комментарии
 
Новые блоги и статьи
Unity 4D
GameUnited 13.06.2025
Четырехмерное пространство. . . Звучит как что-то из научной фантастики, правда? Однако для меня, как разработчика со стажем в игровой индустрии, четвертое измерение давно перестало быть абстракцией из. . .
SSE (Server-Sent Events) в ASP.NET Core и .NET 10
UnmanagedCoder 13.06.2025
Кажется, Microsoft снова подкинула нам интересную фичу в новой версии фреймворка. Работая с превью . NET 10, я наткнулся на нативную поддержку Server-Sent Events (SSE) в ASP. NET Core Minimal APIs. Эта. . .
С днём независимости России!
Hrethgir 13.06.2025
Решил побеседовать, с утра праздничного дня, с LM о завоеваниях. То что она написала о народе, представителем которого я являюсь сам сначала возмутило меня, но дальше только смешило. Это чисто. . .
Лето вокруг.
kumehtar 13.06.2025
Лето вокруг. Наполненное бурями и ураганами событий. На фоне магии Жизни, священной и вечной, неумелой рукой человека рисуется панорама душевного непокоя. Странные серые краски проникают и. . .
Популярные LM модели ориентированы на увеличение затрат ресурсов пользователями сгенерированного кода (грязь -заслуги чистоплюев).
Hrethgir 12.06.2025
Вообще обратил внимание, что они генерируют код (впрочем так-же ориентированы разработчики чипов даже), чтобы пользователь их использующий уходил в тот или иной убыток. Это достаточно опытные модели,. . .
Топ10 библиотек C для квантовых вычислений
bytestream 12.06.2025
Квантовые вычисления - это та область, где теория встречается с практикой на границе наших знаний о физике. Пока большая часть шума вокруг квантовых компьютеров крутится вокруг языков высокого уровня. . .
Dispose и Finalize в C#
stackOverflow 12.06.2025
Работая с C# больше десяти лет, я снова и снова наблюдаю одну и ту же историю: разработчики наивно полагаются на сборщик мусора, как на волшебную палочку, которая решит все проблемы с памятью. Да,. . .
Повышаем производительность игры на Unity 6 с GPU Resident Drawer
GameUnited 11.06.2025
Недавно копался в новых фичах Unity 6 и наткнулся на GPU Resident Drawer - штуку, которая заставила меня присвистнуть от удивления. По сути, это внутренний механизм рендеринга, который автоматически. . .
Множества в Python
py-thonny 11.06.2025
В Python существует множество структур данных, но иногда я сталкиваюсь с задачами, где ни списки, ни словари не дают оптимального решения. Часто это происходит, когда мне нужно быстро проверять. . .
Работа с ccache/sccache в рамках C++
Loafer 11.06.2025
Утилиты ccache и sccache занимаются тем, что кешируют промежуточные результаты компиляции, таким образом ускоряя последующие компиляции проекта. Это означает, что если проект будет компилироваться. . .
КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin
Copyright ©2000 - 2025, CyberForum.ru