Современный мир программирования похож на оживлённый мегаполис – тысячи процессов одновременно требуют внимания, ресурсов и времени. В этих джунглях операций возникают ситуации, когда программа тратит драгоценные миллисекунды в ожидании – будь то ответ от сервера, чтение файла или запрос к базе данных. Представьте повара, который замирает с ножом в руке, ожидая, пока закипит вода – непродуктивно, не так ли? Именно эту проблему блестяще решают асинхронные подходы, и Python с его модулем asyncio стал одним из главных проводников этой философии.
Для начала разберёмся с базовыми понятиями. Конкурентность и параллелизм – термины, которые часто путают даже опытные разработчики. Параллелизм – это одновременное выполнение задач на разных физических ядрах процессора. Представте несколько поваров, каждый на своей кухне готовит отдельное блюдо. А конкурентность – выполнение множества задач, переключаясь между ними. Это как один повар, который умело жонглирует несколькими рецептами – пока томится соус, он нарезает овощи, а когда овощи жарятся, маринует мясо. Асинхронное программирование – модель, позволяющая начать операцию и продолжить выполнение других задач, не дожидаясь её завершения. Когда операция закончится, программа "вернётся" и обработает результат. Ключевое отличие от многопоточности – всё происходит в одном потоке, без накладных расходов на создание и переключение контекста между потоками.
Python | 1
2
3
4
5
6
7
8
9
| # Синхронный подход - каждая операция блокирует выполнение программы
def download_all_sites(sites):
for site in sites:
download_site(site) # Ждем завершения каждой загрузки
# Асинхронный подход - запускаем все и собираем результаты когда готовы
async def download_all_sites_async(sites):
tasks = [download_site_async(site) for site in sites]
await asyncio.gather(*tasks) # Запускаем все задачи и ждем их завершения |
|
История асинхронного программирования в Python напоминает эволюцию от обезъяны к человеку. Изначально были колбэки – неудобные, сложные в отлаживании и понимании. Затем появились генераторы и декораторы в PEP 342, что позволило создать первые библиотеки вроде Twisted и Tornado. Помню, как в 2012 году мучился с колбэк-адом в Twisted – код превращался в замысловатый узор из вложеных функций, читаемый только его автором, да и то спустя неделю уже с трудом. Настоящий прорыв случился с введением ключевых слов async и await в Python 3.5 (PEP 492). Это была революция – элегантный синтаксис, напоминающий обычный синхронный код. Идея не нова: JavaScript и C# уже имели подобный функционал, но для Python это стало поворотным моментом.
Python | 1
2
3
4
5
6
7
8
| # До Python 3.5 - неудобные генераторы
@asyncio.coroutine
def old_style():
yield from some_coroutine()
# После Python 3.5 - элегантный синтаксис
async def modern_style():
await some_coroutine() |
|
Асинхронное программирование особенно важно для I/O-зависимых задач – когда ваши программы большую часть времени ждут данные откуда-то извне. Исследования показывают, что в типичных веб-приложениях до 90% времени тратится на ожидание ответа от базы данных или внешних API. С asyncio эти паузы превращаются в продуктиные участки для другого кода.
Базовые принципы работы с asyncio
Погружаясь в мир asyncio , первое, с чем мы сталкиваемся – корутины. Они стали настоящим прорывом в асинхронном программировании на Python. Корутины – это не просто функции, а специальные объекты, способные приостановить своё выполнение, пропустить вперёд другие задачи, а затем плавно вернуться в нужную точку. Объявляются они с помощью ключевого слова async def :
Python | 1
2
3
4
| async def get_data():
# Это корутина, которая может "засыпать", не блокируя поток
await asyncio.sleep(1) # Уступаем управление на 1 секунду
return "Данные получены" |
|
Обратите внимание на ключевое слово await . Оно как волшебная палочка – сигнализирует: "здесь мы можем отдать контроль". Под капотом происходит хитрая магия – корутина сохраняет свой стек, контекст выполнения и все локальные переменные. Технически, это возможно благодаря генераторам, которые умеют замораживать и размораживать своё состояние. Сердце любого асинхронного приложения на базе asyncio – цикл событий (event loop). Представьте его как трудолюбивого диспетчера, который следит за всеми задачами, распределяет ресурсы и решает, кому сейчас работать. Когда одна задача засыпает, он тут же находит другую, готовую к выполнению.
Python | 1
2
3
4
5
6
7
8
9
| import asyncio
async def main():
print("Привет")
await asyncio.sleep(1)
print("Мир")
# Создаем цикл событий и запускаем корутину
asyncio.run(main()) # В Python 3.7+, самый простой способ |
|
В более ранних версиях Python или для более тонкой настройки мы можем управлять циклом событий вручную:
Python | 1
2
3
| loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close() |
|
Важнейший плюс asyncio перед другими подходами к конкурентности – отсутствие потоков и связаных с ними проблем синхронизации доступа к ресурсам. Работая с потоками, я часто ловил дедлоки и состояния гонки. С asyncio такого кошмара нет – всё выполняется в одном потоке, плюс мы точно знаем, где управление может переключиться (только на await ).
Жизненный цикл корутины напоминает бабочку: сначала это гусеница-объект при вызове async def . Использовав asyncio.create_task() , мы оборачиваем её в кокон задачи. И наконец, после await она превращается в результат. Но есть нюанс — если корутину не обернуть в задачу и не дождаться выполнения, она просто исчезнет как утрений туман, не выполнив своего предназначения.
Python | 1
2
3
4
5
| async def process_data():
# Корутина создана, но не запущена
data = await fetch_data() # Ждём получения данных
result = await process(data) # Обрабатываем их
return result # Возвращаем результат |
|
Помимо await для ожидания результата одной корутины, asyncio предлагает еще несколько элегантных конструкций:
Python | 1
2
3
4
5
6
7
| # Асинхронный итератор - для поочерёдной обработки элементов
async for item in async_generator():
process(item)
# Асинхронный контекстный менеджер - для корректной инициализации и финализации ресурсов
async with async_resource() as resource:
await resource.do_something() |
|
Работа с футурами и задачами — ещё один важный аспект. Задача (Task) - обертка для корутины, которая отслеживает её выполнение. Футура (Future) - низкоуровневая конструкция, представляющая результат, который будет доступен... в будущем. Обычно мы напрямую с ними не работаем, но знать о них стоит.
Python | 1
2
3
4
| # Создание задачи - корутина начнет выполняться незамедлительно
task = asyncio.create_task(some_coroutine())
# Затем мы можем дождаться результата
result = await task |
|
Контекстные менеджеры в асинхронном стиле похожи на обычные, но с приставкой async :
Python | 1
2
3
4
5
6
7
8
9
| class AsyncContextManager:
async def __aenter__(self):
# Асинхронная инициализация
await self.initialize()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
# Асинхронное освобождение ресурсов
await self.cleanup() |
|
В сложных сценариях иногда требуется создать собственный цикл событий. Но будьте осторожны — я однажды потратил целый день, выясняя, почему не работает код, а причиной оказалась тривиальная ошибка в настройке цикла событий:
Python | 1
2
3
4
5
6
7
8
9
| # Создание собственного цикла событий
new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(new_loop)
# Или даже кастомная политика цикла событий
class MyEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
def get_event_loop(self):
# Кастомная логика получения цикла событий
... |
|
Особая сила asyncio раскрывается, когда нужно запустить несколько корутин параллельно. Для этого есть три основных стратегии:
Python | 1
2
3
4
5
6
7
8
9
10
| # Запуск всех задач и ожидание всех результатов
results = await asyncio.gather(task1(), task2(), task3())
# Получение результатов по мере готовности
for task in asyncio.as_completed([task1(), task2(), task3()]):
result = await task
process_result(result)
# Ожидание завершения с дополнительными опциями
done, pending = await asyncio.wait([task1(), task2()], timeout=10) |
|
Великой силой Python всегда была читаемость кода. С asyncio эта традиция продолжилась — асинхронный код выглядит почти как синхронный, но при этом работает намного эффективнее. Согласитесь, гораздо легче понять линейную последовательность операций с await , чем распутывать клубок колбэков, как это было раньше. В моей практике был случай, когда простая замена синхронных HTTP-запросов на асинхронные с помощью aiohttp ускорила парсер сайтов в 12 раз! Система, которая обрабатывала 100 URL за 2 минуты, стала справляться с этим за 10 секунд, причём на том же железе.
Еще одна мощная концепция в асинхронном программировании — это асинхронные генераторы. Они позволяют создать поток данных, который можно потреблять по мере готовности частей, не блокируя основной поток. Вспоминаю ситуацию, когда мы обрабатывали поток больших файлов — синхронный код загружал весь файл в память, что приводило к OutOfMemoryError на больших датасетах. Асинхронные генераторы элегантно решили проблему:
Python | 1
2
3
4
5
6
7
8
9
10
11
| async def read_large_file(file_path):
async with aiofiles.open(file_path, 'r') as f:
while True:
chunk = await f.read(1024) # Читаем по кусочкам
if not chunk:
break
yield chunk # Отдаём каждый кусок как только он готов
# Использование:
async for chunk in read_large_file('huge_data.csv'):
process_chunk(chunk) |
|
Асинхронный генератор определяется с помощью async def с оператором yield внутри. Использовать их можно через async for . Таким образом, обработка даже терабайтных файлов становится элегантной задачей.
Отдельного внимания заслуживает обработка исключений в асинхронном коде. Если в синхронном Python мы привыкли к try/except , то в асинхронном мире исключения могут "теряться" между задачами. Бывает, корутина выбросит исключение, а мы его не увидим, потому что забыли дождаться её результата.
Python | 1
2
3
4
5
6
7
8
9
10
11
12
| async def risky_operation():
await asyncio.sleep(1)
raise ValueError("Что-то пошло не так!")
# Задача создана, но исключение не будет замечено!
task = asyncio.create_task(risky_operation())
# Правильный вариант:
try:
await task
except ValueError as e:
print(f"Поймали ошибку: {e}") |
|
Если не обработать исключение в асинхронной задаче, можно получить жуткие "unhandled exception" в логах и зависание программы. Однажды я потратил почти целый день, пытаясь понять, почему система иногда странно завершается — оказалось, необработанное исключение в одной из задач тихо убивало цикл событий.
Более элегантный способ объединения нескольких корутин — это asyncio.gather() . Он запускает все переданные корутины параллельно и возвращает список их результатов, сохраняя порядок:
Python | 1
2
3
4
5
6
7
8
| async def main():
results = await asyncio.gather(
fetch_data('https://api.example.com/data1'),
fetch_data('https://api.example.com/data2'),
fetch_data('https://api.example.com/data3')
)
# results содержит результаты в том же порядке, что и корутины
data1, data2, data3 = results |
|
Интересная особенность gather — флаг return_exceptions . Если установить его в True , вместо возбуждения исключений они будут возвращены как результаты:
Python | 1
2
3
4
5
6
| results = await asyncio.gather(
will_succeed(),
will_fail(), # Выбросит исключение
return_exceptions=True
)
# results = [some_result, SomeException()] |
|
Это очень удобно, когда нужно запустить серию независимых операций, и неудача одной не должна прерывать остальные.
Если порядок результатов не важен, а скорость критична, на помощь приходит asyncio.as_completed() . Он возвращает итератор с корутинами по мере их завершения:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
| async def fetch_with_timeout(url, timeout):
try:
return await asyncio.wait_for(fetch_data(url), timeout)
except asyncio.TimeoutError:
return f"Timeout for {url}"
urls = ['https://example.com', 'https://slow-api.org', 'https://fast-api.com']
tasks = [fetch_with_timeout(url, 5) for url in urls]
# Обрабатываем результаты в порядке их готовности
async for coro in asyncio.as_completed(tasks):
result = await coro
print(f"Получен результат: {result}") |
|
Часто требуется более тонкий контроль над выполнением асинхронных задач — например, продолжить выполнение, когда хотя бы часть задач завершена, или установить таймаут. Тут на сцену выходит asyncio.wait() :
Python | 1
2
3
4
5
6
7
8
9
10
| # Ждём, пока не завершатся любые 2 из 3 задач
done, pending = await asyncio.wait(
[task1, task2, task3],
return_when=asyncio.FIRST_COMPLETED,
timeout=10
)
# Отменяем оставшиеся задачи
for task in pending:
task.cancel() |
|
Иногда бывает полезно группировать корутины для управления ими как единым целым. Для этого служит класс TaskGroup , доступный с Python 3.11:
Python | 1
2
3
4
5
6
7
8
| async def process_urls(urls):
async with asyncio.TaskGroup() as tg:
# Создаем задачи внутри группы
tasks = [tg.create_task(fetch(url)) for url in urls]
# Когда мы выходим из контекста, все задачи завершены
# Если какая-то задача выбросит исключение, оно пробросится сюда
return [task.result() for task in tasks] |
|
Долгое время был вопрос: как правильно отменять асинхронные задачи? С TaskGroup ответ стал очевиден — при выходе из контекста с исключением, все незавершенные задачи автоматически отменяются.
Важно помнить, что в Python асинхронность не равна параллелизму. Все корутины всё еще выполняются в одном потоке. Если у вас CPU-bound задача, asyncio не даст прироста производительности — понадобится multiprocessing . Экспериментально я выяснил, что для обработки больших массивов данных лучшая стратегия — комбинация: используйте asyncio для I/O операций и multiprocessing для тяжелых вычислений.
Работа с событиями — ещё одна важная составляющая асинхронного программирования. asyncio.Event позволяет синхронизировать корутины, заставляя их ждать, пока не произойдет определенное событие:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| event = asyncio.Event()
async def waiter():
# Ждём, пока событие не будет установлено
await event.wait()
print("Событие произошло!")
async def setter():
await asyncio.sleep(1)
# Сигнализируем всем ждущим корутинам
event.set()
async def main():
# Запускаем обе корутины
await asyncio.gather(waiter(), setter()) |
|
Нельзя не упоминуть о asyncio.shield() — функции, которая защищает корутину от отмены. Это может быть жизненно важно для операций, прерывание которых может привести к проблемам:
Python | 1
2
3
4
5
6
7
8
| async def critical_operation():
try:
# Операция, которую нельзя прерывать
await asyncio.shield(database_transaction())
except asyncio.CancelledError:
# Отмена получена, но транзакция завершится корректно
await cleanup()
raise # Перебрасываем исключение дальше |
|
На практике правильная организация асинхронного кода может превратить сложный набор взаимозависимых операций в простой и понятный алгоритм. В одном проекте я сталкнулся с задачей одновременого обновления данных на десятках серверов с зависимостями между обновлениями. Синхронная версия была запутаной и медленой. Асинхронный подход с использованием asyncio не только ускорил процесс в 8 раз, но и сделал код значительно чище и понятнее.
asyncio.wait vs asyncio.gather Всем привет! :victory:
Вопросы внизу.
Есть два похожих теста:
#1 - asyncio.wait
import... Python asyncio / aiohttp API 429 response error Пытаюсь написать асинхронный API-запрос, нашел подходящий пример, но в ответ получаю ошибку: "429... python. flet. asyncio.Queue Всем привет, есть проблема с понимаем как работают очереди и/или проблема с тем, что уже... Неправильная постановка задач для асинхронных функции Ребята всем привет, у меня есть некий автоподписчик который должен с разных клиентов подписать на...
Продвинутые техники asyncio
Погружаясь глубже в подземелья asyncio , мы сталкиваемся с продвинутыми механизмами, похожими на фокусы опытного мага. И одними из самых практичных инструментов в этом арсенале являются семафоры и блокировки. Представте оживлённый ресторан с ограниченным количеством столиков — семафоры работают схожим образом, контролируя одновременный доступ к ресурсам.
Python | 1
2
3
4
5
6
7
8
9
10
11
12
| async def access_resource(semaphore, resource_id):
async with semaphore:
# Только ограниченное число корутин может попасть сюда одновременно
print(f"Доступ к ресурсу {resource_id}")
await asyncio.sleep(1) # Имитация работы с ресурсом
print(f"Ресурс {resource_id} освобожден")
async def main():
# Создаём семафор, ограничивающий доступ до 3 одновременных операций
semaphore = asyncio.Semaphore(3)
# Запускаем 10 задач
await asyncio.gather(*[access_resource(semaphore, i) for i in range(10)]) |
|
Семафоры незаменимы, когда нужно ограничить нагрузку на внешние сервисы. Помню случай, когда ненароком положил сторонний API, отправив сотню запросов одновременно. После внедрения семафора система стала "вежливым собеседником", а не "назойливым спамером".
В отличие от семафоров, блокировки (Lock ) обеспечивают эксклюзивный доступ – только одна корутина может войти в защищенный блок:
Python | 1
2
3
4
5
6
7
8
| lock = asyncio.Lock()
async def update_counter(counter, lock):
async with lock:
# Критическая секция - только одна корутина может быть здесь
local_value = counter["value"]
await asyncio.sleep(0.1) # Имитируем "тяжелую" работу
counter["value"] = local_value + 1 |
|
Блокировки решают классическую проблему "состояния гонки" (race condition), когда несколько потоков пытаются модифицировать общие данные. Хотя asyncio работает в одном потоке, проблема всё ещё актуальна из-за переключения контекста на await .
Ещё один продвинутый инструмент – условные переменные (Condition ), создающие удобный механизм для оповещения ожидающих корутин:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
| condition = asyncio.Condition()
async def consumer():
async with condition:
print("Потребитель ждёт...")
await condition.wait()
print("Потребитель уведомлен!")
async def producer():
await asyncio.sleep(1)
async with condition:
print("Производитель уведомляет...")
condition.notify_all() |
|
Особого внимания заслуживает элегантное прекращение выполнения задач. В реальном мире операции не всегда завершаются успешно – сетевые таймауты, ошибки и просто необходимость прервать долгую операцию заставляют нас искать механизмы отмены задач:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| async def long_operation():
try:
print("Операция начата")
await asyncio.sleep(10) # Долгая операция
print("Операция завершена")
except asyncio.CancelledError:
print("Операция прервана!")
# Здесь можно провести освобождение ресурсов
raise # Важно: переброс исключения для правильного поведения
async def main():
task = asyncio.create_task(long_operation())
# Ждём немного и отменяем задачу
await asyncio.sleep(2)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("Задача успешно отменена") |
|
Нередко нужно установить таймаут для операции. Вместо ручной отмены через cancel() можно использовать wait_for :
Python | 1
2
3
4
5
| try:
# Ждём результата максимум 5 секунд
result = await asyncio.wait_for(fetch_data("https://api.example.com"), 5)
except asyncio.TimeoutError:
print("Операция заняла слишком много времени!") |
|
На одном проекте мы встретились с загадочным зависанием системы – выяснилось, что одна корутина "застряла" в бесконечном цикле ожидания. После добавления таймаутов система стала стабильной и предсказуемой.
Асинхронные генераторы – мощный инструмент для потоковой обработки данных. Они позволяют "yield"-ить значения асинхронно:
Python | 1
2
3
4
5
6
7
8
9
| async def ticker(delay, to):
"""Асинхронный генератор, выдающий числа через определённые интервалы."""
for i in range(to):
yield i
await asyncio.sleep(delay)
async def main():
async for i in ticker(0.5, 10):
print(i) |
|
Я активно применял асинхронные генераторы при разработке системы мониторинга, которая получала данные из нескольких источников. Вместо ожидания всех данных сразу, система обрабатывала их по мере поступления – моментально реагируя на критические показатели.
Очереди в asyncio – еще один незаменимый инструмент для координации работы производителей и потребителей:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| queue = asyncio.Queue(maxsize=5) # Ограничиваем размер очереди
async def producer():
for i in range(10):
await queue.put(i)
print(f"Произведено: {i}")
await asyncio.sleep(1)
async def consumer():
while True:
item = await queue.get()
print(f"Потреблено: {item}")
queue.task_done() # Отмечаем задачу как выполненную
async def main():
# Запускаем производителя и потребителя
producers = [asyncio.create_task(producer()) for _ in range(2)]
consumers = [asyncio.create_task(consumer()) for _ in range(3)]
# Ждём, пока производители закончат
await asyncio.gather(*producers)
# Ждём, пока очередь опустеет
await queue.join()
# Отменяем потребителей
for c in consumers:
c.cancel() |
|
Очереди особенно полезны в паттерне "producer-consumer" с разным числом производителей и потребителей. Очередь автоматически блокирует операцию put() , если достигнут лимит maxsize , что обеспечивает естественный "backpressure".
Одна из любимых моих техник – сочетание очередей с ограничением одновременных операций через семафоры:
Python | 1
2
3
4
5
6
7
| async def worker(name, queue, semaphore):
while True:
task = await queue.get()
async with semaphore:
print(f"Работник {name} выполняет {task}")
await process_task(task)
queue.task_done() |
|
Есть еще PriorityQueue – очередь с приоритетами, незаменимая, когда некоторые задачи важнее других:
Python | 1
2
3
4
5
6
7
8
9
10
| pq = asyncio.PriorityQueue()
await pq.put((1, "Высокий приоритет"))
await pq.put((5, "Низкий приоритет"))
await pq.put((2, "Средний приоритет"))
# Обработка начнётся с элементов с наименьшим приоритетом (1)
while not pq.empty():
priority, item = await pq.get()
print(f"Обработка {item} (приоритет: {priority})")
pq.task_done() |
|
Однажды эта техника спасла моё приложение для трейдинга – критические ордера автоматически попадали в начало очереди, обеспечивая минимальную задержку при резких изменениях рынка.
В разработке асинхронных приложений часто возникает задача запуска фоновых задач, которые должны работать "вечно". Но как быть с их корректным завершением? Паттерн "cancel_on_exit" приходит на помощь:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| async def background_task():
try:
while True:
await asyncio.sleep(1)
print("Фоновая задача работает...")
except asyncio.CancelledError:
print("Фоновая задача остановлена")
async def main():
task = asyncio.create_task(background_task())
try:
# Основная логика приложения
await asyncio.sleep(5)
finally:
# Всегда отменяем фоновую задачу при выходе
task.cancel()
await asyncio.gather(task, return_exceptions=True) |
|
Нельзя не упомянуть еще один продвинутый инструмент в арсенале asyncio — асинхронные итераторы. В отличие от обычных итераторов, которые реализуют методы __iter__ и __next__ , асинхронные версии используют __aiter__ и __anext__ :
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| class AsyncCounter:
def __init__(self, limit):
self.limit = limit
self.counter = 0
def __aiter__(self):
return self
async def __anext__(self):
if self.counter < self.limit:
await asyncio.sleep(0.1) # Имитируем асинхронную работу
self.counter += 1
return self.counter
else:
raise StopAsyncIteration
async def main():
async for number in AsyncCounter(5):
print(number) |
|
Эта техника особенно полезна при работе с потоковыми данными из внешних источников. В одном из проектов мы использовали асинхронные итераторы для обработки событий из Kafka — это позволило элегантно организовать код и не беспокоиться о ручном управлении подписками.
Нередко в асинхронном коде приходится объединять несколько источников данных. Для работы с несколькими асинхроными потоками появился удобный инструмент asyncio.StreamReader :
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| async def read_data(reader):
while True:
line = await reader.readline()
if not line:
break
yield line.decode()
async def combine_streams(stream1, stream2):
readers = [read_data(stream1), read_data(stream2)]
pending = set(readers)
while pending:
done, pending = await asyncio.wait(
pending, return_when=asyncio.FIRST_COMPLETED
)
for done_task in done:
try:
result = await done_task
yield result
except StopAsyncIteration:
continue |
|
Иногда требуется разделить тяжелые вычисления и I/O операции. В таких случаях полезен паттерн с использованем исполнителей (executors) — они позволяют запускать блокирующие операции в отдельном пуле потоков или процессов, не останавливая цикл событий:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
| def cpu_bound_task(data):
# Тяжелая вычислительная задача
result = 0
for i in range(10**7):
result += i * i
return result + len(data)
async def mixed_workload(data):
# Запускаем CPU-bound задачу в пуле процессов
result = await asyncio.to_thread(cpu_bound_task, data)
# Продолжаем асинхронную работу
await asyncio.sleep(0.1)
return result |
|
Одна из малоизвестных, но мощных техник — использование asyncio.TaskGroup (доступно с Python 3.11) или asyncio.create_task с хитрым паттерном обработки исключений для создания надежных воркеров:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| async def worker(worker_id, task_queue):
while True:
try:
task = await task_queue.get()
print(f"Воркер {worker_id} начал задачу {task}")
await process_task(task)
except Exception as e:
print(f"Ошибка в воркере {worker_id}: {e}")
finally:
task_queue.task_done()
async def supervisor(num_workers=5):
task_queue = asyncio.Queue()
# Заполняем очередь задачами
for i in range(20):
await task_queue.put(f"Task-{i}")
# Запускаем воркеров
workers = [
asyncio.create_task(worker(i, task_queue))
for i in range(num_workers)
]
# Ждём завершения всех задач
await task_queue.join()
# Отменяем воркеров
for w in workers:
w.cancel() |
|
Особого внимания заслуживает работа с несколькими асинхронными запросами, особенно когда нужно обрабатывать их результаты по мере готовности. Я часто использую asyncio.as_completed в комбинации с itertools.cycle :
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| import itertools
async def process_batch(urls):
# Создаём пул из нескольких обработчиков
processors = [data_processor(i) for i in range(3)]
processor_cycle = itertools.cycle(processors)
# Запускаем запросы
tasks = [fetch_url(url) for url in urls]
# Обрабатываем результаты по мере готовности
for task in asyncio.as_completed(tasks):
result = await task
# Выбираем следующий доступный процессор по кругу
processor = next(processor_cycle)
await processor.process(result) |
|
В критичных к производительности системах иногда требуется управлять приоритетом задач. Нестандартное, но эффективное решение — комбинация PriorityQueue с динамическим изменением приоритетов:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| class PriorityTaskScheduler:
def __init__(self):
self.queue = asyncio.PriorityQueue()
self._counter = itertools.count() # Для сортировки задач с одинаковым приоритетом
async def add_task(self, priority, coro):
count = next(self._counter)
await self.queue.put((priority, count, coro))
async def run(self):
while not self.queue.empty():
priority, _, coro = await self.queue.get()
try:
result = await coro
# Если нужно повторное выполнение с другим приоритетом
if isinstance(result, tuple) and result[0] == 'reschedule':
new_priority, new_coro = result[1], result[2]
await self.add_task(new_priority, new_coro)
except Exception as e:
print(f"Ошибка при выполнении задачи: {e}")
finally:
self.queue.task_done() |
|
В одном проекте по обработке финансовых транзакций эта техника позволила гибко управлять нагрузкой — транзакции с высоким приоритетом обрабатывались первыми, а менее критичные операции ждали своей очереди.
Важно понимать, что с увеличением сложности асинхронных систем растет и важность грамотного тестирования. Для этого asyncio предлагает asyncio.mock , а в современных фреймворках типа pytest есть встроенная подержка для тестирования асинхронного кода:
Python | 1
2
3
4
5
6
7
8
| async def test_async_function():
# Мокаем асинхронный метод
with patch('module.async_function') as mock_func:
mock_func.return_value = Future()
mock_func.return_value.set_result('mocked_result')
result = await function_under_test()
assert result == 'expected_result' |
|
Реальные сценарии использования
Теория хороша, но настоящая магия начинается, когда асинхронность решает реальные задачи. За годы работы с asyncio я накопил коллекцию шаблонов и практик, которые превращают абстрактные концепции в работающие системы.
Одна из самых очевидных областей применения asyncio — сетевое взаимодействие. Вспоминаю свой первый крупный проект с асинхронностью: агрегатор новостей, который собирал данные с десятков источников. Синхронная версия тратила 50+ секунд на обход всех сайтов. С переходом на aiohttp время упало до 3-4 секунд!
Python | 1
2
3
4
5
6
7
8
9
10
11
12
| async def fetch(session, url):
async with session.get(url, timeout=10) as response:
return await response.text()
async def fetch_all(urls):
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
return await asyncio.gather(*tasks, return_exceptions=True)
# Использование
urls = ["https://news1.com", "https://news2.com", ..., "https://news50.com"]
results = asyncio.run(fetch_all(urls)) |
|
Ключевой момент: используйте одну сессию для всех запросов. Создание новой сессии для каждого запроса сводит на нет преимущества асинхронности из-за накладных расходов на установку соединения.
Особо примечательна работа с WebSocket через asyncio. Для проекта мониторинга криптовалютных бирж я использовал библиотеку websockets, позволяющую элегантно обрабатывать непрерывный поток данных:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
| async def crypto_monitor():
uri = "wss://ws.exchange.com/marketdata"
async with websockets.connect(uri) as websocket:
# Подписываемся на обновления
await websocket.send(json.dumps({
"method": "SUBSCRIBE",
"params": ["btcusdt@trade", "ethusdt@trade"],
"id": 1
}))
while True:
data = await websocket.recv()
parsed = json.loads(data)
await process_trade_data(parsed) |
|
Веб-разработка с asyncio стала настоящим прорывом. FastAPI — фреймворк, построенный поверх ASGI и использующий asyncio, демонстрирует впечатляющую производительность. В одном из проектов миграция с Flask на FastAPI увеличила пропускную способность API с 500 до почти 5000 запросов в секунду.
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
| from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession
app = FastAPI()
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_async_session)):
query = select(User).where(User.id == user_id)
result = await db.execute(query)
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user |
|
FastAPI автоматически запускает ваши асинхронные обработчики, обеспечивая неблокирующую обработку запросов. Для простых API этот прирост может быть незаметен, но для систем с тысячами соединений разница колосальная.
Работа с базами данных долгое время была ахиллесовой пятой асинхронных приложений. Традиционные драйверы БД блокировали выполнение, нивелируя преимущества asyncio. Ситуация изменилась с появлением драйверов вроде asyncpg для PostgreSQL и aiomysql для MySQL.
Python | 1
2
3
4
5
6
7
8
9
10
| import asyncpg
async def get_user_data(user_id):
conn = await asyncpg.connect('postgresql://user:password@localhost/database')
try:
# Выполнение запроса неблокирующим способом
row = await conn.fetchrow('SELECT * FROM users WHERE id = $1', user_id)
return dict(row) if row else None
finally:
await conn.close() |
|
В современных проектах я предпочитаю использовать SQLAlchemy 1.4+, которая получила полноценную асинхронную поддержку:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.future import select
from sqlalchemy.orm import sessionmaker
# Создаём движок и фабрику сессий
engine = create_async_engine("postgresql+asyncpg://user:pass@host/db")
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async def get_users_by_role(role_name):
async with async_session() as session:
# Выполняем запрос асинхронно
result = await session.execute(
select(User).where(User.role == role_name)
)
return result.scalars().all() |
|
Важный нюанс: в асинхронном режиме SQLAlchemy не выполняет автоматически неявные операции, как в синхронном режиме. Часто приходится явно вызывать await session.refresh(obj) после вставок.
Интеграция асинхронного и синхронного кода — задача, с которой рано или поздно сталкиваются все разработчики. В идеальном мире всё работало бы асинхронно, но реальность такова, что много библиотек остаются синхроными.
Главное правило: не блокируйте цикл событий. Для CPU-bound операций используйте run_in_executor:
Python | 1
2
3
4
5
6
7
8
9
10
11
| def cpu_intensive_operation(data):
# Какая-то тяжелая синхронная функция
result = process_data_sync(data)
return result
async def mixed_function(data):
# Запускаем синхронный код в пуле потоков
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, cpu_intensive_operation, data)
# Продолжаем асинхронную работу
return result |
|
Для I/O-bound операций нередко можно найти асинхронную альтернативу. Например, вместо requests используйте aiohttp, вместо psycopg2 — asyncpg.
Если приходится интегрироваться с блокирующими API, рассмотрите создание отдельного процесса с собственным циклом событий. Общаться между процессами можно через очереди:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
| from multiprocessing import Process, Queue
def blocking_worker(in_queue, out_queue):
while True:
task = in_queue.get()
if task is None: # Сигнал остановки
break
# Выполняем блокирующую операцию
result = some_blocking_api(task)
out_queue.put(result)
async def async_controller():
# Создаём очереди и процесс
in_queue = Queue()
out_queue = Queue()
worker = Process(target=blocking_worker, args=(in_queue, out_queue))
worker.start()
try:
# Асинхронно взаимодействуем с блокирующим API
# через отдельный процесс
for task in tasks:
in_queue.put(task)
# Неблокирующая проверка результатов
if not out_queue.empty():
result = out_queue.get_nowait()
await process_result(result)
finally:
# Останавливаем воркер
in_queue.put(None)
worker.join() |
|
Обработка больших данных — ещё одна область, где asyncio показывает себя отлично. Классический подход с загрузкой всего датасета в память часто не работает на реальных объёмах.
Для проекта аналитики логов я использовал потоковую обработку с asyncio.StreamReader:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
| async def process_huge_log(file_path):
# Открываем файл для асинхронного чтения
proc = await asyncio.create_subprocess_exec(
'cat', file_path,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
# Создаём читатель для потоковой обработки
reader = proc.stdout
buffer = bytearray()
stats = {"errors": 0, "warnings": 0, "infos": 0}
while True:
# Читаем данные кусками
chunk = await reader.read(8192)
if not chunk:
break
buffer.extend(chunk)
while b'\n' in buffer:
# Обрабатываем строки по мере получения
idx = buffer.find(b'\n')
line = buffer[:idx].decode('utf-8', errors='replace')
del buffer[:idx + 1]
# Анализируем строку
if 'ERROR' in line:
stats["errors"] += 1
elif 'WARNING' in line:
stats["warnings"] += 1
elif 'INFO' in line:
stats["infos"] += 1
await proc.wait()
return stats |
|
Этот подход позволил обрабатывать логи размером в десятки гигабайт, не загружая их полностью в память.
Для особо тяжёлых случаев эффективна комбинация asyncio с параллельной обработкой через multiprocessing:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| async def hybrid_processing(large_dataset_path):
# Разбиваем данные на чанки для параллельной обработки
chunks = split_data_into_chunks(large_dataset_path)
# Создаём пул процессов для CPU-bound операций
with ProcessPoolExecutor() as executor:
loop = asyncio.get_running_loop()
# Запускаем тяжёлые вычисления в отдельных процессах
futures = [
loop.run_in_executor(executor, process_chunk, chunk)
for chunk in chunks
]
# Параллельно с этим асинхронно обрабатываем другие задачи
other_tasks = [other_async_task1(), other_async_task2()]
# Ждём завершения всех задач
results = await asyncio.gather(*futures, *other_tasks)
# Собираем и агрегируем результаты
chunk_results = results[:len(chunks)]
return aggregate_results(chunk_results) |
|
В одном из проектов анализа научных данных этот гибридный подход ускорил обработку в 15 раз по сравнению с чисто синхронной версией.
Асинхронность становится особенно полезной при обработке потоков данных в реальном времени. Например, для трансляции метрик мониторинга мы использовали связку asyncio и Server-Sent Events:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| async def sse_metrics(request):
response = web.StreamResponse(
status=200,
reason='OK',
headers={'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'}
)
await response.prepare(request)
try:
while True:
# Получаем метрики из асинхроного источника
metrics = await collect_system_metrics()
# Отправляем клиенту в формате SSE
await response.write(
f"data: {json.dumps(metrics)}\n\n".encode('utf-8')
)
await asyncio.sleep(1)
except ConnectionResetError:
# Клиент отключился
pass
return response |
|
Эта модель SSE отлично подходит для дашбордов мониторинга, где клиент должен получать актуальные данные без постоянного опроса сервера. В одном из проектов она сократила нагрузку на сервер почти на 70% по сравнению с классическим опросом через AJAX каждые несколько секунд.
Еще один интересный сценарий — асинхронная обработка событий в брокерах сообщений. Например, подключение к RabbitMQ можно реализовать с помощью библиотеки aio-pika:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| import aio_pika
async def process_message(message: aio_pika.IncomingMessage):
async with message.process():
data = message.body.decode()
print(f"Получено сообщение: {data}")
# Имитируем длительную обработку
await asyncio.sleep(1)
print(f"Сообщение обработано: {data}")
async def main():
connection = await aio_pika.connect("amqp://guest:guest@localhost/")
async with connection:
channel = await connection.channel()
# Объявляем очередь
queue = await channel.declare_queue("test_queue", durable=True)
# Запускаем обработку сообщений
await queue.consume(process_message)
# Цикл событий не завершается, пока мы не отменим задачу вручную
await asyncio.Future() |
|
У меня был проект системы уведомлений, где асинхронная архитектура с RabbitMQ позволила обрабатывать свыше 10 тысяч оповещений в минуту на скромном сервере. Секрет заключался в том, что во время ожидания ответа от внешних систем (SMS-шлюзов, прушеров), asyncio занимал процессор другими задачами.
Микросервисная архитектура и asyncio — еще один мощный тандем. В распределённых системах сервисы постоянно общаются друг с другом, и синхронные запросы создают каскадные задержки. С gRPC и asyncio эта проблема решается элегантно:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| import grpc
from concurrent import futures
import service_pb2
import service_pb2_grpc
class ServiceImplementation(service_pb2_grpc.ServiceServicer):
async def ProcessData(self, request, context):
# Асинхронная обработка запроса
result = await self.process_data_async(request.data)
return service_pb2.Response(result=result)
async def process_data_async(self, data):
# Имитация асинхронной обработки
await asyncio.sleep(0.1)
return f"Processed: {data}"
async def serve():
server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10))
service_pb2_grpc.add_ServiceServicer_to_server(
ServiceImplementation(), server
)
server.add_insecure_port('[::]:50051')
await server.start()
await server.wait_for_termination()
if __name__ == '__main__':
asyncio.run(serve()) |
|
Для планировщиков задач asyncio тоже показывает себя с лучшей стороны. Один из моих любимых паттернов — имплементация простого но гибкого шедулера:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| class AsyncScheduler:
def __init__(self):
self.tasks = {}
async def add_job(self, func, interval, job_id=None):
if job_id is None:
job_id = str(uuid.uuid4())
self.tasks[job_id] = asyncio.create_task(self._job_runner(func, interval, job_id))
return job_id
async def remove_job(self, job_id):
if job_id in self.tasks:
self.tasks[job_id].cancel()
del self.tasks[job_id]
async def _job_runner(self, func, interval, job_id):
try:
while True:
await func()
await asyncio.sleep(interval)
except asyncio.CancelledError:
print(f"Задача {job_id} отменена")
async def shutdown(self):
for job_id in list(self.tasks.keys()):
await self.remove_job(job_id) |
|
В одном из проектов мы использовали подобную систему для реализации отложеного выполнения и периодических задач. Преимущество перед Celery оказалось в простоте — не нужен отдельный брокер сообщений, всё работает в рамках одного процесса.
Тестирование асинхронного кода исторически было головной болью для многих разработчиков. К счастью, современные инструменты сильно облегчили задачу. С pytest-asyncio можно писать тесты почти так же, как и для синхронных функций:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| import pytest
@pytest.mark.asyncio
async def test_async_process():
# Подготовка данных
data = {"user_id": 123, "action": "login"}
# Мокируем внешние зависимости
with patch('module.external_api.get_user_status') as mock_status:
# Настраиваем мок для возврата асинхронного результата
future = asyncio.Future()
future.set_result({"status": "active"})
mock_status.return_value = future
# Вызываем тестируемую функцию
result = await process_user_action(data)
# Проверяем результат
assert result["success"] is True
assert mock_status.called |
|
Асинхронность также внесла революцию в разработку CLI-утилит. Вместо последовательного выполнения команд, современные CLI могут паралельно запускать задачи и показывать прогресс в реальном времени. Библиотека rich в сочетании с asyncio даёт потрясающие результаты:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
| import asyncio
from rich.progress import Progress
async def process_file(file_path):
total_size = os.path.getsize(file_path)
processed = 0
# Имитируем обработку файла по блокам
chunk_size = 1024 * 1024 # 1MB
async with aiofiles.open(file_path, 'rb') as f:
while chunk := await f.read(chunk_size):
# Обрабатываем чанк
await asyncio.sleep(0.01) # Имитация работы
processed += len(chunk)
# Обновляем прогресс
yield processed / total_size * 100
async def main():
files = ['large_file1.dat', 'large_file2.dat', 'large_file3.dat']
with Progress() as progress:
# Создаём задачи для индикаторов прогресса
tasks = {
file: progress.add_task(f"[green]Обработка {file}", total=100)
for file in files
}
# Запускаем обработку всех файлов
file_tasks = [process_file_with_progress(file, tasks[file], progress)
for file in files]
await asyncio.gather(*file_tasks) |
|
В одном из моих проектов мы создали утилиту миграции данных с асинхронной обработкой и интерактивным интерфейсом. Она работала с терабайными базами и отображала не только прогресс, но и текущую скорость, оставшееся время и даже графики в терминале. Пользователи были в востроге — наконец-то не приходилось гадать, висит программа или обрабатывает данные.
Интересный кейс — использование asyncio для управления docker-контейнерами. С библиотекой aiodocker создание и управление целыми оркестрами контейнеров становится элегантной задачей:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| import aiodocker
async def start_service(config):
docker = aiodocker.Docker()
# Запускаем контейнер
container = await docker.containers.create(
config={
"Image": "postgres:13",
"Env": ["POSTGRES_PASSWORD=mysecretpassword"],
"HostConfig": {
"PortBindings": {"5432/tcp": [{"HostPort": "5432"}]}
}
}
)
await container.start()
logs = container.log(stdout=True, stderr=True, follow=True)
# Читаем логи асинхронно, пока не увидим сообщение о готовности
async for line in logs:
if "database system is ready to accept connections" in line:
print("БД запущена и готова к работе!")
break
return container |
|
Оптимизация и подводные камни
Работа с асинхронным кодом напоминает хождение по минному полю – зазевался на момент и всё, приготовься к долгим часам отладки. За годы ковыряния в недрах asyncio я насобирал целую коллекцию граблей, на которые регулярно наступают даже опытные разработчики.
Первая и самая распостранённая ошибка – забыть await перед корутиной. Такая простая, казалось бы, вещь, но она превращается в настоящий кошмар:
Python | 1
2
3
4
5
6
7
| async def fetch_data():
return await process_request()
async def main():
# Забыли await!
data = fetch_data() # Возвращает корутину, а не результат
print(data) # Выведет что-то вроде <coroutine object fetch_data at 0x...> |
|
Коварство этой ошибки в том, что Python не выбрасывает исключение – код выполняется, но корутина просто никогда не запускается. В больших системах такие "потерянные корутины" могут стать источником таинственных багов.
Другой распространённый антипаттерн – блокировка цикла событий. Помню, в одном проекте мы неделю искали причину странных фризов, и виновником оказался безобидный на вид вызов:
Python | 1
2
3
4
5
| async def process_data(items):
for item in items:
# CPU-bound операция блокирует весь цикл событий!
result = compute_hash(item) # Тяжелая синхронная функция
await save_result(result) |
|
Такой код – настоящий убийца производительности. Пока выполняется compute_hash() , весь цикл событий стоит, и другие корутины не могут выполняться. Правильное решение – использовать run_in_executor() для CPU-интенсивных операций.
С памятью в асинхронном мире тоже не всё гладко. Сборщик мусора Python'а порой ведёт себя странно с циклическими ссылками в корутинах. Я сталкивался с утечками, когда задачи вроде как завершались, но память не освобождалась:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
| async def memory_leak_risk():
# Создаём большой объект
huge_data = [0] * (10**6) # Список из миллиона элементов
# Создаём замыкание, которое ссылается на huge_data
async def inner_func():
nonlocal huge_data
await asyncio.sleep(1)
return len(huge_data)
# Запускаем, но забываем дождаться или отменить
asyncio.create_task(inner_func())
# huge_data остаётся в памяти, пока inner_func не завершится! |
|
Для тяжелых приложений полезен режим отладки asyncio. Он выявляет множество проблем, которые трудно заметить невооруженным глазом:
Python | 1
2
| # Включаем режим отладки
asyncio.run(main(), debug=True) |
|
В этом режиме asyncio выбрасывает предупреждения о корутинах, которые работают слишком долго, о незакрытых ресурсах и других потенциальных проблемах. В одном проекте это помогло нам выявить корутину, которая выполнялась 30+ секунд и блокировала всё приложение.
Для серьёзной отладки асинхронного кода понадобится более тяжёлая артелерия. Я часто использую связку aiodebugger и aiomonitor , которые позволяют инспектировать состояние асинхронных задач в реальном времени:
Python | 1
2
3
4
5
6
7
8
9
| import aiomonitor
async def main():
# Основная логика приложения
server = await setup_server()
# Запускаем монитор на порту 50101
with aiomonitor.Monitor(asyncio.get_event_loop()):
await server.serve_forever() |
|
Запустив такое приложение, можно подключиться к нему через telnet и исследовать текущие задачи, состояние цикла событий и многое другое.
Для профилирования асинхронной производительности обычный cProfile не всегда удобен. Лучше использовать pyinstrument – он учитывает особености асинхронного выполнения и показывает, где ваш код действительно тратит время:
Python | 1
2
3
4
5
6
7
8
9
10
| from pyinstrument import Profiler
async def main():
profiler = Profiler()
profiler.start()
await do_async_work()
profiler.stop()
print(profiler.output_text(unicode=True, color=True)) |
|
Тестирование асинхронного кода – отдельная головная боль. Стандартный unittest не очень-то дружит с asyncio. Для серьёзного тестирования я рекомендую pytest с плагином pytest-asyncio :
Python | 1
2
3
4
5
6
| import pytest
@pytest.mark.asyncio
async def test_async_function():
result = await function_to_test()
assert result == expected_value |
|
Мокинг асинхронных функций тоже имеет свои особености. Простая замена через unittest.mock может не сработать, если не учитывать, что мок должен возвращать объект, совместимый с await :
Python | 1
2
3
4
5
6
7
8
9
| async def test_with_mocked_dependency():
with patch('module.async_dependency') as mock_dep:
# Важно: создаём Future и устанавливаем результат
future = asyncio.Future()
future.set_result('mocked_result')
mock_dep.return_value = future
result = await function_using_dependency()
assert result == 'expected_result' |
|
Помню бессонные ночи с одним особо коварным багом – задача иногда зависала на продакшене, но у меня локально всё работало идеально. Разгадка пришла неожиданно: на продакшене был другой лимит дескрипторов файлов, и при большой нагрузке система достигала этого лимита. Мораль? Всегда тестируйте асинхронный код под нагрузкой, близкой к реальной.
С чего начать разбираться с Asyncio? нужно написать парсер страницы, с загрузкой некоторых картинок, на Asyncio.
С многопоточным и... Коннектор к API используя AsyncIO, aiohttp Py3.5.
Работаю с библиотекой, подключаюсь к api telegram. Получаю сообщения и отправляю их в... Выйти из петли в run_forever() [asyncio] Пишу скрипт, который эмулирует запуск игр в стиме.
Для этого была выбрана библиотека Steam для... Asyncio корректное завершение в случае непредвиденной остановки программы Есть скрипт (выложен не полностью)
@asyncio.coroutine
def do_work(data):
for adres, ip_list in... Asyncio - coroutine vs future vs task Доброго времени суток, форумчане!
Приступил к изучению asyncio. Использую python 3.5.
Гуру... Можно ли поставить таски на паузу? (asyncio) Всем привет! Имеется такой код:
from aiohttp import web
import asyncio
async def... Asyncio ошибка работы парсера Здравствуйте, имеется парсер на Python в связке asyncio + aiohttp + threadPoolExecutor + lxml.... WxPython и asyncio Добрый день.
Подскажите можно ли в приложении wxPython использовать модуль asyncio?
Как я понимаю... Параллельный запуск 2 методов через asyncio Есть необходимость сделать код, продолжающий работу даже при поступлении запроса. Пересмотрела и... Multiprocessing vs multithreading vs AsyncIO при загрузке файлов Вопрос всезнающему All !
Нужно загрузить/выгрузить много разноразмерных файлов через протокол... Проблема с asyncio Здравствуйте! пытаюсь разобраться с asyncio.. Нужна помощь, ошибка: RuntimeError: This event loop... В asyncio отстутвует метод create_task и метод run Здравствуйте, у меня в asyncio отсутствует метод create_task и метод run. Мой код:
from bs4 import...
|