Форум программистов, компьютерный форум, киберфорум
py-thonny
Войти
Регистрация
Восстановить пароль
Блоги Сообщество Поиск Заказать работу  

asyncio и асинхронное программирование в Python: конкурентность, корутины, таски, async/await, event loop

Запись от py-thonny размещена 07.10.2025 в 21:08. Обновил(-а) py-thonny 07.10.2025 в 21:22
Показов 4083 Комментарии 0

Нажмите на изображение для увеличения
Название: asyncio и асинхронное программирование в Python.jpg
Просмотров: 258
Размер:	67.5 Кб
ID:	11266
1. asyncio и асинхронное программирование в Python: конкурентность, корутины, таски, async/await, event loop
2. asyncio и асинхронное программирование в Python: паттерны, футуры, примеры, работа с БД
3. asyncio и асинхронное программирование в Python: синхронизация, продвинутые примеры, асинхронный парсер

Ваше приложение отправляет запрос к базе данных. Что происходит дальше? Программа замирает, ждет ответа - секунду, две, может быть десять. В это время процессор простаивает, а пользователь смотрит на крутящийся индикатор загрузки. Это блокирующая операция - момент, когда выполнение останавливается и ждет завершения ввода-вывода. Я столкнулся с этой проблемой года три назад, разрабатывая систему мониторинга для интернет-магазина. Нужно было опрашивать сотню API поставщиков каждые пять минут. Первая версия на синхронном коде работала 12 минут - дольше интервала опроса. Классический тупик.

Блокирующие операции пожирают время там, где программа ничего полезного не делает - просто ждет. Сетевой запрос занимает 200 миллисекунд, чтение с диска - 50, обращение к внешнему API - секунду или больше. Если таких операций десятки или сотни, время ожидания складывается в катастрофические задержки.

Традиционный подход - запустить каждую операцию в отдельном потоке. Но потоки в Python ограничены GIL (Global Interpreter Lock), который позволяет выполняться только одному потоку за раз для операций с Python-объектами. Создание тысячи потоков съест гигабайты памяти и создаст overhead на переключение контекста. Асинхронность решает проблему иначе. Вместо ожидания одной операции программа переключается на другие задачи. Когда первая операция завершится, управление вернется к ней. Это как повар на кухне: пока одно блюдо тушится, он не стоит у плиты - режет овощи для следующего, помешивает соус, проверяет духовку. Один человек, но десять дел одновременно.

Python предлагает asyncio - встроенную библиотеку для асинхронного программирования. Она позволяет писать неблокирующий код без создания множества потоков, используя корутины и цикл событий. Разбираться будем детально, с примерами и подводными камнями.

Многопоточность и многопроцессность против асинхронности



Когда речь заходит о конкурентности в Python, разработчики часто путают три разных подхода. Каждый решает свою задачу, но применение не того инструмента может превратить быстрый код в медленный, а простую архитектуру - в клубок взаимных блокировок.

Многопоточность работает внутри одного процесса. Создаете несколько потоков, каждый выполняет свою задачу. Звучит идеально, но в Python есть GIL - механизм, который не дает двум потокам одновременно исполнять Python-код. Это значит, что для CPU-bound задач (вычисления, обработка данных) многопоточность бесполезна - потоки просто по очереди захватывают блокировку.

Зато для I/O-bound операций потоки работают. Когда поток ждет ответа от сети или диска, он отпускает GIL, и другой поток может работать. Но создание потока требует памяти (около 8 МБ на поток в Linux), а переключение между потоками стоит процессорного времени. Тысячу одновременных соединений на потоках не поднимешь.

Многопроцессность обходит GIL, создавая отдельные процессы Python. Каждый процесс получает собственный интерпретатор и память. Теперь можно использовать все ядра процессора для вычислений - настоящий параллелизм. Я применял это для обработки логов: раскидывал файлы по четырем процессам, каждый парсил свою часть. Время обработки сократилось в 3.7 раза на четырехядерном процессоре. Но процессы тяжелые. Создание процесса занимает десятки миллисекунд, каждый ест сотни мегабайт памяти. Передача данных между процессами требует сериализации - pickle преобразует объекты в байты и обратно. Это медленно и не всегда работает (сокеты, открытые файлы не сериализуются). Для тысяч мелких задач многопроцессность не подходит.

Асинхронность работает в одном потоке одного процесса. Вместо создания новых потоков или процессов используются корутины - функции, которые могут приостанавливать выполнение и возвращать управление. Цикл событий (event loop) координирует их работу, переключаясь между корутинами, когда они ждут I/O.

Корутина весит килобайты, а не мегабайты. Создается за микросекунды. Переключение между корутинами - это просто передача управления внутри одного потока, без syscall в ядро операционной системы. Можно запустить десятки тысяч корутин без заметного overhead. Вот конкретные цифры из моей практики. Задача: получить данные с 1000 URL. Синхронный код - 850 секунд (каждый запрос по очереди). Потоки с пулом на 50 потоков - 18 секунд. Asyncio с aiohttp - 3.2 секунды. Разница очевидна.

Но asyncio не для всего. CPU-bound задачи он не ускорит - один поток работает на одном ядре. Если нужно посчитать миллион чисел Фибоначчи, asyncio не поможет. Тут нужна многопроцессность.

Также asyncio требует неблокирующих библиотек. Обычный requests блокирует поток - нужен aiohttp. Стандартная psycopg2 для PostgreSQL - замените на asyncpg. Синхронная библиотека в асинхронном коде превращает всю конструкцию обратно в блокирующую. Есть еще тонкость с совместимостью. Нельзя просто вызвать await в синхронной функции или запустить синхронную блокирующую операцию в корутине без последствий. Это требует понимания того, как работает event loop, и умения правильно интегрировать код.

Я видел проект, где разработчик использовал asyncio для обработки изображений. CPU-intensive операции в одном потоке. Производительность была хуже синхронного кода из-за overhead на переключение корутин. Правильным решением была бы многопроцессность или даже синхронный код без всякой конкурентности.

Выбор подхода зависит от задачи. I/O-bound с множеством мелких операций - asyncio. CPU-bound вычисления - многопроцессность. Смешанная нагрузка - комбинация обоих: asyncio для сетевых запросов, ProcessPoolExecutor для тяжелых вычислений в фоне.

Многие думают, что asyncio - это всегда быстрее. Неправда. Для простого REST API с базой данных разница между Flask (синхронный) и FastAPI (асинхронный) может быть незаметна, если база отвечает быстро. Overhead на работу event loop съест выигрыш от неблокирующего I/O.

Асинхронность сияет там, где много одновременных операций с ожиданием: вебсокеты, длинные опросы, парсинг сотен сайтов, агрегация данных из десятков API. Там, где синхронный код проводит 90% времени в ожидании, а не в вычислениях.

asyncio Ошибка RuntimeError: Event loop is closed
Здравствуйте. осваиваю asyncio. Написал парсер на примере парсинга Wildberries. Выдает ошибку не...

Можно ли поставить таски на паузу? (asyncio)
Всем привет! Имеется такой код: from aiohttp import web import asyncio async def...

asyncio.wait vs asyncio.gather
Всем привет! :victory: Вопросы внизу. Есть два похожих теста: #1 - asyncio.wait import...

Асинхронное получение данных посредством asyncio.gather
Доброго времени суток, вопрос такой, когда приходят ссылки на обработку, (все уходят), но...


Когда asyncio действительно нужен



Asyncio не универсальное решение. Это инструмент для конкретных сценариев, и применение его не к месту создает больше проблем, чем решает. Разберем, где асинхронность оправдана, а где - пустая трата времени.

Первый сценарий - высокая конкурентность I/O операций. Ваше приложение одновременно обрабатывает сотни или тысячи сетевых соединений. Веб-скрейпер, который парсит 500 страниц за раз. API-агрегатор, собирающий данные с 30 внешних сервисов. Чат-сервер с десятками тысяч подключенных клиентов. В таких случаях asyncio показывает себя блестяще.

Второй сценарий - микросервисная архитектура. Ваш сервис вызывает 5-10 других сервисов для обработки одного запроса. Синхронный код выполняет эти вызовы последовательно: 200 мс + 150 мс + 300 мс = 650 мс только на ожидание. Asyncio позволяет делать запросы параллельно - общее время определяется самым медленным сервисом, а не суммой всех задержек.

Третий - работа с вебсокетами и длинными соединениями. WebSocket держит открытое соединение для двустороннего обмена данными. На каждое соединение нужен ресурс, который ждет входящих сообщений. Создавать поток на каждое соединение - расточительно. Asyncio обрабатывает тысячи соединений в одном потоке, переключаясь между ними при получении данных.

Четвертый - потоковая обработка данных. Читаете большой файл, обрабатываете каждую строку, записываете результат в базу. С asyncio можно читать следующую порцию данных, пока предыдущая обрабатывается и записывается. Конвейер работает непрерывно, без простоев.

Но есть ситуации, где asyncio бесполезен или вреден. CPU-bound задачи - первый антипример. Вычисление хеша, обработка изображений, машинное обучение. Эти операции не ждут - они активно используют процессор. Асинхронность тут не даст выигрыша, потому что нет моментов ожидания для переключения на другую задачу. Однажды коллега переписал систему генерации отчетов на asyncio. Отчеты строились из данных в памяти - никакого I/O. Результат: код стал сложнее, а время выполнения увеличилось на 15% из-за overhead на работу event loop. Просто не та задача.

Второй антипример - простые скрипты и утилиты. Если ваш код выполняет 3-5 операций последовательно и работает секунду, asyncio избыточен. Overhead на инициализацию event loop и усложнение кода не окупятся. Простой синхронный код читается легче и работает достаточно быстро.

Третий - работа с блокирующими библиотеками. Если вам нужна специфичная библиотека без асинхронного аналога (например, узкоспециализированный драйвер для какого-то оборудования), придется либо искать альтернативу, либо использовать run_in_executor для запуска блокирующего кода в отдельном потоке. Это усложняет архитектуру и частично нивелирует преимущества asyncio.

Еще момент - команда и кодовая база. Если в проекте никто не знает asyncio, а deadline горит, внедрение асинхронности может обернуться месяцами отладки странных deadlock'ов и race conditions. Синхронный код проще для большинства разработчиков.

Есть негласное правило: если ваше приложение тратит больше 70% времени на ожидание I/O и обрабатывает десятки одновременных операций - смотрите в сторону asyncio. Если времени на вычисления больше, чем на ожидание - многопроцессность или оптимизация алгоритмов. Если операций мало и они выполняются быстро - не усложняйте, оставайтесь на синхронном коде. Я видел проекты, где asyncio применили, потому что "это модно" или "все так делают". Результат - нечитаемый код, который медленнее синхронной версии. Инструмент должен соответствовать задаче, а не трендам.

Метрики производительности: где asyncio выигрывает, а где проигрывает



Цифры говорят громче слов. Asyncio показывает впечатляющие результаты в правильных сценариях и катастрофически проваливается в неподходящих. Разберем конкретные метрики.

Latency (задержка) - время от начала операции до получения результата. Для одиночного запроса asyncio не быстрее синхронного кода. Иногда даже медленнее из-за overhead на работу event loop. Простой GET-запрос: синхронный requests - 45 мс, асинхронный aiohttp - 48 мс. Разница в пределах погрешности, но asyncio не впереди.

Throughput (пропускная способность) - количество операций в единицу времени. Тут asyncio разворачивается. Тест: 1000 HTTP-запросов к API с задержкой ответа 100 мс. Синхронный код - 1000 * 0.1 = 100 секунд. Asyncio с 50 одновременными корутинами - около 2 секунд. Пропускная способность выросла в 50 раз.

Memory footprint (потребление памяти) - asyncio экономичнее потоков. Корутина занимает 1-2 КБ, поток - 8 МБ. Для 10000 одновременных операций: потоки съедят 80 ГБ (физически невозможно на большинстве машин), asyncio - 20 МБ плюс overhead на event loop. Я запускал 50000 корутин на ноутбуке с 16 ГБ RAM - работало без проблем.

CPU utilization (загрузка процессора) - asyncio использует одно ядро. Если у вас 8-ядерный процессор и I/O-bound задача, семь ядер простаивают. Многопоточность или многопроцессность могут распределить нагрузку, но asyncio упирается в один поток. Зато и накладные расходы минимальны - переключение между корутинами стоит наносекунды против микросекунд для потоков. Context switch cost (стоимость переключения контекста) - критичная метрика для конкурентных систем. Переключение между потоками требует системного вызова, сохранения регистров процессора, очистки кешей. Это 1-10 микросекунд. Переключение между корутинами - просто передача управления внутри программы, 50-100 наносекунд. При тысячах переключений в секунду разница огромна.

Реальный кейс из моей практики. Парсер новостей с 200 источников, каждый отдает RSS за 300 мс в среднем. Синхронный код: 200 * 0.3 = 60 секунд. Threading pool на 20 потоков: 10 секунд, но потребление памяти 160 МБ и CPU скачет до 40% на переключениях. Asyncio: 1.5 секунды (все 200 корутин одновременно), память 35 МБ, CPU - стабильные 8%.

Но есть подводный камень. Если хотя бы одна корутина выполняет блокирующую операцию, весь event loop встает. Я добавил в парсер обработку изображений через PIL - синхронную библиотеку. Время выросло до 45 секунд, потому что каждая обработка блокировала цикл событий на 200 мс. Решение - вынести CPU-bound операции в ProcessPoolExecutor.

Scalability (масштабируемость) - сколько одновременных соединений выдерживает система. На синхронном Flask с gunicorn и 4 воркерами - около 400 соединений до начала таймаутов. На асинхронном FastAPI с uvicorn - стабильно 10000+ соединений на той же машине. Но это при условии, что обработчики действительно асинхронные.

Response time distribution (распределение времени ответа) - важнее среднего значения. Синхронный код дает предсказуемые результаты: 100 мс ± 10 мс. Asyncio при высокой нагрузке может показывать 50 мс для 95% запросов и 500 мс для оставшихся 5% из-за накопления задач в очереди event loop. Нужен мониторинг percentiles, а не только средних значений.

Startup time (время запуска) - asyncio требует инициализации event loop, создания сессий для HTTP-клиентов, подключения к базам. Это 100-300 мс overhead. Для долгоживущих процессов несущественно, для AWS Lambda или коротких скриптов - заметная добавка к времени выполнения.

Error recovery (восстановление после ошибок) - asyncio сложнее отлаживать. Traceback из асинхронного кода длиннее и запутаннее. Если корутина упала с исключением и вы не обработали его через try/except или `gather(..., return_exceptions=True)`, может потеряться контекст ошибки. Синхронный код прямолинейнее в плане отладки.

Конкретный провал asyncio - большие вычисления в корутинах. Тест: парсинг JSON (CPU-bound). Синхронный код на 1000 файлов по 1 МБ - 8 секунд. Asyncio с теми же операциями - 8.7 секунды плюс нестабильность из-за блокировки event loop. Накладные расходы превысили несуществующий выигрыш. Еще провал - работа с файловой системой без специальных библиотек. Стандартный open() блокирует. Библиотека aiofiles добавляет асинхронность, но под капотом использует thread pool - по сути, костыль. Для интенсивной работы с файлами asyncio не оптимален.

Выигрыш asyncio измеряется не абсолютной скоростью, а эффективностью использования ресурсов при высокой конкурентности. Если нужно обработать 10 запросов - asyncio избыточен. Если 10000 - asyncio незаменим. Разница не в скорости одной операции, а в способности выполнять тысячи операций одновременно на скромном железе.

Внутренние механизмы Python: почему интерпретатор позволяет переключаться между задачами



Чтобы понять, как работает asyncio, нужно заглянуть внутрь интерпретатора Python. Там происходит довольно изящная магия, которая позволяет одному потоку выполнять множество задач.

В основе лежит концепция генераторов, которые появились в Python задолго до asyncio. Генератор - функция с ключевым словом yield, которая может приостановить выполнение, вернуть значение и потом продолжить с того же места. Состояние функции (локальные переменные, счетчик инструкций) сохраняется между вызовами.

Python
1
2
3
4
5
6
7
8
9
10
def simple_generator():
    print("Начало")
    yield 1
    print("Продолжение")
    yield 2
    print("Конец")
    
gen = simple_generator()
next(gen)  # Выведет "Начало", вернет 1
next(gen)  # Выведет "Продолжение", вернет 2
Интерпретатор сохраняет stack frame генератора - структуру данных с локальными переменными, позицией выполнения, ссылками на объекты. Когда генератор приостанавливается на yield, control flow возвращается в вызывающий код. При следующем вызове next() интерпретатор восстанавливает stack frame и продолжает выполнение. Корутины в asyncio расширяют эту идею. Вместо yield используется await, но механизм похож - функция может приостановиться, отдать управление, потом возобновиться. Ключевое отличие: генераторы передают данные через yield/send, корутины управляют потоком выполнения через await.

Когда вы пишете await asyncio.sleep(1), происходит следующее. Корутина приостанавливается и передает control flow в event loop. Event loop регистрирует таймер на одну секунду и переключается на другую готовую корутину. Через секунду таймер срабатывает, event loop помечает первую корутину как готовую к выполнению и возвращает управление ей при следующей итерации цикла.

Event loop - это бесконечный цикл, который опрашивает готовые к выполнению корутины и I/O события. Примерная логика:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# Упрощенная схема event loop
ready_tasks = []
waiting_tasks = []
 
while True:
    # Проверяем I/O события (сокеты, таймеры)
    check_io_events()
    
    # Берем готовую задачу
    if ready_tasks:
        task = ready_tasks.pop(0)
        try:
            task.step()  # Выполняем до следующего await
        except StopIteration:
            # Корутина завершилась
            pass
    
    # Если нечего делать, ждем I/O
    if not ready_tasks:
        wait_for_io_events()
Под капотом Python использует селекторы (на Linux - epoll, на Windows - IOCP) для эффективного ожидания I/O событий. Вместо блокирующего ожидания каждого сокета, selector следит за сотнями сокетов одновременно и сообщает, какие готовы к чтению или записи. Критичный момент - переключение между корутинами происходит только на await. Если корутина выполняет долгую операцию без await, она блокирует весь event loop. Я наблюдал это в проекте, где корутина парсила большой XML без передачи управления - event loop зависал на 3 секунды.

GIL (Global Interpreter Lock) тут работает в нашу пользу. Поскольку все корутины выполняются в одном потоке, нет гонок за данными. Доступ к переменным безопасен без дополнительных блокировок. Это упрощает код по сравнению с многопоточностью.

Интерпретатор хранит для каждой корутины минимальный контекст - stack frame с локальными переменными и указатель на текущую инструкцию. Переключение сводится к сохранению одного указателя и восстановлению другого. Никаких syscall, никакого взаимодействия с планировщиком ОС.

Этот механизм называется кооперативной многозадачностью - задачи сами решают, когда отдать управление через await. Противоположность - вытесняющая многозадачность в потоках, где ОС принудительно переключает контекст по таймеру. Кооперативная многозадачность быстрее, но требует дисциплины - одна "жадная" корутина может заблокировать всю систему.

Основы asyncio и корутины



Корутина - это функция, определенная с ключевым словом async def. Вызов такой функции не выполняет ее код сразу, а возвращает объект корутины. Чтобы запустить корутину, нужно либо передать ее в asyncio.run(), либо использовать await внутри другой корутины.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import asyncio
 
async def fetch_data():
    print("Начинаем загрузку")
    await asyncio.sleep(2)
    print("Загрузка завершена")
    return {"data": "результат"}
 
# Просто вызов не выполняет функцию
coro = fetch_data()  # Получаем объект корутины
print(type(coro))  # <class 'coroutine'>
 
# Запуск через asyncio.run()
result = asyncio.run(fetch_data())
Ключевое слово await приостанавливает выполнение текущей корутины и ждет завершения awaitable объекта. Awaitable - это корутина, Task или Future. Пока одна корутина ждет, event loop переключается на другие задачи.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
async def task_one():
    print("Task 1: старт")
    await asyncio.sleep(1)
    print("Task 1: финиш")
 
async def task_two():
    print("Task 2: старт")
    await asyncio.sleep(0.5)
    print("Task 2: финиш")
 
async def main():
    await asyncio.gather(task_one(), task_two())
 
asyncio.run(main())
Вывод покажет, что задачи выполняются конкурентно - task_two завершится раньше, хотя запущена позже. asyncio.gather() запускает несколько корутин параллельно и ждет завершения всех.
Важно понимать разницу между корутиной и Task. Корутина - это объект, который нужно запустить. Task - это обертка вокруг корутины, которая уже запланирована в event loop. Task может выполняться в фоне, пока основной код делает другие вещи.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
async def background_task():
    for i in range(5):
        print(f"Фоновая задача: {i}")
        await asyncio.sleep(0.5)
 
async def main():
    # Создаем Task - корутина начнет выполняться сразу
    task = asyncio.create_task(background_task())
    
    # Делаем другую работу
    await asyncio.sleep(1)
    print("Основная работа выполнена")
    
    # Ждем завершения фоновой задачи
    await task
 
asyncio.run(main())
Тут background_task запускается немедленно через create_task(), а основной код продолжает работу. Без await task в конце фоновая задача оборвется при завершении main(). Я однажды потратил два часа на отладку странной проблемы - корутины создавались, но не выполнялись. Оказалось, я забыл await перед вызовом. Python не ругался, просто игнорировал невыполненные корутины с предупреждением в логах. Теперь всегда проверяю.

Еще один паттерн - wait с таймаутом. Если операция должна завершиться за определенное время, asyncio.wait_for() прерывает ее по истечении срока.

Python
1
2
3
4
5
6
7
8
9
10
11
async def slow_operation():
    await asyncio.sleep(10)
    return "готово"
 
async def main():
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
    except asyncio.TimeoutError:
        print("Операция заняла слишком много времени")
 
asyncio.run(main())
Для запуска нескольких корутин с получением результатов по мере готовности используется asyncio.as_completed(). Это полезно, когда нужно обрабатывать результаты сразу, не дожидаясь самой медленной задачи.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
async def fetch_url(url, delay):
    await asyncio.sleep(delay)
    return f"Данные с {url}"
 
async def main():
    urls = [
        ("site1.com", 2),
        ("site2.com", 0.5),
        ("site3.com", 1)
    ]
    
    tasks = [fetch_url(url, delay) for url, delay in urls]
    
    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(f"Получено: {result}")
 
asyncio.run(main())
Результаты придут в порядке завершения, а не в порядке запуска.
Обработка ошибок в асинхронном коде требует внимания. Если корутина внутри gather() упадет с исключением, gather() по умолчанию пробросит его наверх. Чтобы собрать все результаты, включая исключения, используйте return_exceptions=True.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
async def failing_task():
    await asyncio.sleep(0.5)
    raise ValueError("Что-то пошло не так")
 
async def success_task():
    await asyncio.sleep(0.3)
    return "успех"
 
async def main():
    results = await asyncio.gather(
        failing_task(),
        success_task(),
        return_exceptions=True
    )
    
    for result in results:
        if isinstance(result, Exception):
            print(f"Ошибка: {result}")
        else:
            print(f"Результат: {result}")
 
asyncio.run(main())
Так вы не потеряете успешные результаты из-за одной упавшей корутины.

Event loop можно получить изнутри корутины через asyncio.get_running_loop(). Это нужно для низкоуровневых операций вроде запуска callback'ов или работы с Future напрямую. В обычном коде это редкость, но знать полезно.

Корутины нельзя вызвать из синхронного кода без запуска event loop. asyncio.run() создает новый loop, выполняет корутину и закрывает loop. Если loop уже работает (например, внутри другой корутины), asyncio.run() вызовет ошибку. Тут нужен просто await.

Контроль над event loop дает гибкость. Можно использовать asyncio.create_task() для фоновых задач, asyncio.shield() для защиты корутины от отмены, asyncio.wait() для сложной логики ожидания с условиями. Но для 90% задач хватает asyncio.run(), await и gather().

Анатомия async/await



Под капотом async и await - это синтаксический сахар над генераторами и протоколом итераторов. Когда Python видит async def, он создает специальный тип функции, которая возвращает объект корутины. Этот объект реализует методы __await__(), send(), throw() и close() - тот же интерфейс, что у генераторов.

Python
1
2
3
4
5
6
async def simple_coro():
    return "результат"
 
coro = simple_coro()
print(type(coro))  # <class 'coroutine'>
print(dir(coro))   # методы: send, throw, close, __await__
Когда вы используете await, интерпретатор вызывает метод __await__() у awaitable объекта. Этот метод возвращает итератор, который event loop опрашивает через send(None). Каждый вызов send() продвигает выполнение до следующего await или до завершения.
Простой пример того, как это работает изнутри:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
async def inner():
    print("Inner: начало")
    await asyncio.sleep(0)  # Точка переключения
    print("Inner: завершение")
    return 42
 
async def outer():
    print("Outer: до await")
    result = await inner()
    print(f"Outer: получили {result}")
    return result
 
# Под капотом await превращается примерно в это:
# iter_obj = inner().__await__()
# value = None
# while True:
#     try:
#         value = iter_obj.send(value)
#     except StopIteration as e:
#         result = e.value
#         break
Ключевое отличие корутин от генераторов - корутины не производят значения через yield, они приостанавливаются на await и возвращают управление планировщику. Генератор возвращает значение вызывающему коду, корутина возвращает управление event loop.

В CPython корутины компилируются в специальный байткод. Инструкция GET_AWAITABLE проверяет, что объект действительно awaitable, YIELD_FROM реализует механизм делегирования выполнения. Если запустить dis.dis() на корутине, увидите эти инструкции:

Python
1
2
3
4
5
6
7
import dis
 
async def example():
    await asyncio.sleep(1)
    
dis.dis(example)
# Вывод покажет байткод с GET_AWAITABLE и YIELD_FROM
Важный момент - await можно использовать только с awaitable объектами. Это корутины, объекты с методом __await__(), или объекты класса Future/Task из asyncio. Попытка сделать await чего-то другого вызовет TypeError.

Python
1
2
async def broken():
    await 42  # TypeError: object int can't be used in 'await' expression
Чтобы сделать свой класс awaitable, нужно реализовать __await__(), который возвращает итератор. Это редкая потребность, но знать полезно:

Python
1
2
3
4
5
6
7
8
9
10
11
class MyAwaitable:
    def __await__(self):
        # Возвращаем генератор, который один раз yield'ит и завершается
        yield  # Отдаем управление event loop
        return "кастомный результат"
 
async def use_custom():
    result = await MyAwaitable()
    print(result)  # "кастомный результат"
 
asyncio.run(use_custom())
Event loop при встрече с await делает следующее: вызывает __await__(), получает итератор, делает send(None) до получения StopIteration. Значение в StopIteration.value становится результатом await. Я столкнулся с интересным багом, когда пытался сделать await на объекте, у которого __await__() возвращал не генератор, а список. Python упал с загадочной ошибкой про итерацию. Оказалось, __await__() должен возвращать именно итератор с методом send(), а не просто итерируемый объект.

Множественные await в одной функции работают последовательно, если не используются конструкции для параллельного выполнения:

Python
1
2
3
4
5
6
7
8
9
10
11
12
async def sequential():
    result1 = await slow_operation()  # Ждем завершения
    result2 = await another_operation()  # Только потом запускаем это
    return result1 + result2
 
# Параллельная версия:
async def parallel():
    task1 = asyncio.create_task(slow_operation())
    task2 = asyncio.create_task(another_operation())
    result1 = await task1
    result2 = await task2
    return result1 + result2
В первом случае общее время - сумма времен операций. Во втором - максимум из двух.

Ограничения async/await: нельзя использовать await в list comprehension (нужен async for), нельзя в lambda-функции, нельзя в синхронных генераторах. Python строго разделяет синхронный и асинхронный контекст - это предотвращает случайное смешивание несовместимых операций.

Также async def создает нативную корутину, которая отличается от generator-based корутины (функция с @asyncio.coroutine и yield from). Старый стиль deprecated с Python 3.10, но его еще можно встретить в легаси коде. Нативные корутины быстрее и имеют лучшую поддержку в инструментах отладки.

Цикл событий под капотом



Event loop - это сердце всего asyncio. Понимание его устройства превращает магию асинхронности в предсказуемую механику. Давайте разберем, что происходит на каждом тике цикла и как это влияет на производительность.

В основе лежит бесконечный цикл с двумя ключевыми компонентами: очередь готовых к выполнению задач (ready queue) и селектор для мониторинга I/O событий. Селектор - это системная абстракция (epoll на Linux, kqueue на BSD, select на Windows), которая позволяет отслеживать сотни файловых дескрипторов одновременно без активного опроса каждого.

Одна итерация event loop выглядит примерно так. Сначала проверяется ready queue - если там есть задачи, берется первая и выполняется до ближайшего await. Когда корутина доходит до await, она регистрирует в селекторе событие, которого ждет (например, готовность сокета к чтению), и возвращает управление в loop. Loop помечает ее как ожидающую и переходит к следующей готовой задаче.

Если ready queue пуста, loop обращается к селектору с вопросом: какие I/O события произошли? Селектор блокируется до появления хотя бы одного события или истечения таймаута. Как только событие происходит - сокет готов к чтению, таймер сработал, данные пришли - селектор возвращает список готовых дескрипторов. Loop находит корутины, ожидавшие эти события, и перемещает их обратно в ready queue.

Важная деталь - приоритетность. В стандартном event loop нет приоритетов задач, все корутины равны. Если одна задача долго держит управление без await, остальные простаивают. Я наблюдал это в проекте с WebSocket сервером - одна корутина парсила JSON размером 5 МБ без передачи управления, и все активные соединения замирали на полсекунды.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# Упрощенная иллюстрация того, как работает event loop
class SimpleEventLoop:
    def __init__(self):
        self.ready_queue = []
        self.waiting_tasks = {}  # task: (event_type, event_data)
        self.selector = selectors.DefaultSelector()
    
    def run_until_complete(self, coro):
        task = Task(coro)
        self.ready_queue.append(task)
        
        while self.ready_queue or self.waiting_tasks:
            # Выполняем готовые задачи
            while self.ready_queue:
                task = self.ready_queue.pop(0)
                try:
                    # Продвигаем корутину до следующего await
                    task.step()
                except StopIteration as e:
                    # Задача завершилась
                    return e.value
            
            # Ждем I/O событий
            if self.waiting_tasks:
                events = self.selector.select(timeout=0.1)
                for key, mask in events:
                    task = key.data
                    self.ready_queue.append(task)
                    del self.waiting_tasks[task]
Таймеры реализованы через heap - бинарную кучу, где минимальный элемент всегда наверху. При регистрации asyncio.sleep(5) в кучу добавляется запись с временем срабатывания. На каждой итерации loop проверяет, не истек ли таймаут у элемента на вершине кучи. Это операция O(1) для проверки и O(log n) для добавления нового таймера.

Интересный нюанс - политика переключения. Event loop не прерывает корутину принудительно. Если корутина выполняет долгую операцию без await, она владеет управлением до завершения. Это кооперативная многозадачность - задачи сами решают, когда передать управление. В отличие от потоков, где ОС может переключить контекст в любой момент.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import asyncio
import time
 
async def greedy_task():
    # Плохой пример - блокирует event loop
    print("Начинаю долгие вычисления")
    start = time.time()
    result = sum(i[B]2 for i in range(10_000_000))
    print(f"Вычисления заняли {time.time() - start:.2f}с")
    return result
 
async def polite_task():
    # Хороший пример - периодически отдает управление
    print("Начинаю вежливые вычисления")
    start = time.time()
    result = 0
    for i in range(10_000_000):
        result += i[/B]2
        if i % 100_000 == 0:
            await asyncio.sleep(0)  # Передаем управление
    print(f"Вычисления заняли {time.time() - start:.2f}с")
    return result
Вторая версия будет работать медленнее из-за overhead на переключения, зато не заблокирует другие корутины. Компромисс между производительностью и отзывчивостью.

Под капотом каждая Task - это обертка вокруг корутины с дополнительной логикой. Task отслеживает состояние (pending, running, done, cancelled), хранит результат или исключение, поддерживает callbacks, которые вызываются при завершении. Когда создаете Task через create_task(), он немедленно добавляется в ready queue и начнет выполняться на следующей итерации loop. Event loop также управляет вложенными вызовами. Когда корутина A делает await корутины B, loop не создает новый контекст - B просто становится текущей активной корутиной. После завершения B управление возвращается A. Это делегирование напоминает вызов обычной функции, но с возможностью переключения на другие задачи в точках await.

Производительность event loop критически зависит от количества задач в ready queue. Если там тысячи корутин, каждая итерация займет заметное время на перебор. В реальности хорошо написанное asyncio приложение держит в ready queue десятки задач, остальные ждут I/O в селекторе - это эффективнее.

Я профилировал event loop в production системе с 5000 одновременных WebSocket соединений. В ready queue было в среднем 30-40 задач, остальные висели в селекторе. CPU использование - 12% на одном ядре. Добавил один долгий синхронный вызов в обработчик - CPU подскочил до 85%, latency вырос с 20 мс до 300 мс. Один блокирующий вызов парализовал всю систему.

Еще важный аспект - закрытие event loop. asyncio.run() после завершения корутины закрывает loop и отменяет все незавершенные задачи. Если у вас есть фоновые задачи, они оборвутся с CancelledError. Нужно либо явно дожидаться их завершения, либо использовать asyncio.gather() с return_exceptions=True для graceful shutdown.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
async def background_worker():
    try:
        while True:
            await asyncio.sleep(1)
            print("Работаю в фоне")
    except asyncio.CancelledError:
        print("Получен сигнал остановки")
        # Очистка ресурсов
        raise
 
async def main():
    worker = asyncio.create_task(background_worker())
    await asyncio.sleep(5)
    worker.cancel()  # Явно отменяем фоновую задачу
    try:
        await worker
    except asyncio.CancelledError:
        print("Фоновая задача остановлена")
Event loop - это не черная магия, а довольно прямолинейный механизм диспетчеризации. Понимание его работы помогает писать эффективный асинхронный код и диагностировать проблемы производительности. Главное правило - никогда не блокируйте loop долгими операциями без await.

Создание и управление задачами (Tasks vs Coroutines)



Новички часто путают корутины и задачи, используя термины взаимозаменяемо. Это разные сущности с принципиально разным поведением. Корутина - это объект, который может выполниться. Task - это корутина, которая уже запланирована к выполнению в event loop и работает независимо.

Когда вызываете async def функцию, получаете объект корутины. Он инертен - просто лежит в памяти, пока не передадите его в asyncio.run() или не сделаете await. В этом состоянии корутина ничего не делает, даже не начинает выполнение.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import asyncio
 
async def fetch_data():
    print("Загрузка началась")
    await asyncio.sleep(1)
    return "данные"
 
# Корутина создана, но не запущена
coro = fetch_data()
print(f"Тип: {type(coro)}")  # <class 'coroutine'>
[H2]Ничего не выведется - код не выполняется[/H2]
 
# Запуск через await
asyncio.run(coro)  # Теперь выполнится
Task оборачивает корутину и немедленно планирует ее выполнение. Создание Task через asyncio.create_task() добавляет корутину в event loop, и она начинает работать на следующей итерации цикла - даже если вы еще не сделали await на самом Task.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
async def background_work():
    for i in range(3):
        print(f"Фоновая работа: шаг {i}")
        await asyncio.sleep(0.5)
    return "завершено"
 
async def main():
    # Создаем Task - корутина запускается сразу
    task = asyncio.create_task(background_work())
    print("Task создан, продолжаем работу")
    
    # Делаем что-то еще, пока task работает
    await asyncio.sleep(1)
    print("Прошла секунда")
    
    # Теперь дожидаемся результата
    result = await task
    print(f"Результат: {result}")
 
asyncio.run(main())
Вывод покажет, что фоновая работа началась сразу после создания Task, не дожидаясь await task в конце. Это критическое отличие - Task выполняется параллельно с основным кодом.

Я столкнулся с тонкостью, когда забыл await на Task. Корутина запустилась, отработала в фоне, но результат потерялся, а Python выдал предупреждение о незавершенной задаче. Теперь всегда оборачиваю задачи в try/finally или использую asyncio.gather() для гарантированного ожидания.

Task хранит состояние - pending (выполняется), done (завершена), cancelled (отменена). Проверяется через методы .done(), .cancelled(). Результат извлекается через .result() - но только после завершения, иначе получите исключение.

Python
1
2
3
4
5
6
7
8
9
10
11
async def demo_task_state():
    task = asyncio.create_task(asyncio.sleep(2))
    
    print(f"Сразу после создания: done={task.done()}")
    await asyncio.sleep(1)
    print(f"Через секунду: done={task.done()}")
    
    await task  # Дожидаемся завершения
    print(f"После завершения: done={task.done()}")
 
asyncio.run(demo_task_state())
Отмена задачи делается через .cancel(). Task получит CancelledError в точке следующего await. Можно обработать это исключение для корректной очистки ресурсов.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
async def cancellable_task():
    try:
        for i in range(10):
            print(f"Итерация {i}")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("Задача отменена, очищаю ресурсы")
        raise  # Важно пробросить исключение дальше
 
async def main():
    task = asyncio.create_task(cancellable_task())
    await asyncio.sleep(2.5)  # Даем поработать
    task.cancel()  # Отменяем
    
    try:
        await task
    except asyncio.CancelledError:
        print("Отмена подтверждена")
 
asyncio.run(main())
Task поддерживает callback'и через .add_done_callback(). Функция вызовется, когда задача завершится - успешно, с ошибкой или отменой. Полезно для логирования или уведомлений.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
def task_finished(task):
    if task.cancelled():
        print("Задача была отменена")
    elif task.exception():
        print(f"Задача упала с ошибкой: {task.exception()}")
    else:
        print(f"Задача вернула: {task.result()}")
 
async def main():
    task = asyncio.create_task(asyncio.sleep(1))
    task.add_done_callback(task_finished)
    await task
 
asyncio.run(main())
Множественные задачи управляются через asyncio.gather() или asyncio.wait(). gather() проще - передаете корутины или Task'и, получаете список результатов в том же порядке. wait() гибче - позволяет указать условия завершения (первая выполненная, все выполнены, первая ошибка).

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
async def fetch(n):
    await asyncio.sleep(n * 0.1)
    return f"результат {n}"
 
async def main():
    # gather - простой способ
    results = await asyncio.gather(
        fetch(3), fetch(1), fetch(2)
    )
    print(results)  # ['результат 3', 'результат 1', 'результат 2']
    
    # wait - с контролем
    tasks = [asyncio.create_task(fetch(i)) for i in range(5)]
    done, pending = await asyncio.wait(
        tasks, 
        return_when=asyncio.FIRST_COMPLETED
    )
    print(f"Завершено: {len(done)}, в процессе: {len(pending)}")
    
    # Отменяем незавершенные
    for task in pending:
        task.cancel()
 
asyncio.run(main())
Важная ловушка - если Task упадет с исключением и вы не сделаете await на нем, исключение потеряется. Event loop выдаст предупреждение, но программа продолжит работу. Это источник трудноуловимых багов.

Python
1
2
3
4
5
6
7
8
9
10
async def failing_task():
    await asyncio.sleep(0.1)
    raise ValueError("упс")
 
async def main():
    task = asyncio.create_task(failing_task())
    await asyncio.sleep(1)  # Task упал, но мы не заметили
    # Предупреждение появится только при завершении event loop
 
asyncio.run(main())
Решение - всегда дожидайтесь Task'ов или используйте gather() с return_exceptions=True. Это гарантирует, что исключения не потеряются.

Еще момент - время жизни Task. Если ссылка на Task потерялась, но корутина еще работает, Task продолжит выполнение до завершения main(). Python не останавливает Task при потере ссылки - это не как с обычными объектами. Но при завершении event loop все активные Task'и получат CancelledError.

Разница между корутиной и Task фундаментальна для понимания асинхронного кода. Корутина - это описание работы, Task - запущенная работа. Смешивание этих концепций приводит к странным багам и непредсказуемому поведению.

Event Loop в деталях: как планировщик распределяет процессорное время между корутинами



Нажмите на изображение для увеличения
Название: asyncio и асинхронное программирование в Python 2.jpg
Просмотров: 75
Размер:	47.4 Кб
ID:	11267

Планировщик event loop использует простую, но эффективную стратегию - Round Robin с приоритетом готовности к выполнению. Нет сложных эвристик или приоритетных очередей, только базовая логика FIFO (First In, First Out) для задач в состоянии ready.

Каждая корутина существует в одном из четырех состояний. Ready - добавлена в очередь, ждет своей очереди на выполнение. Running - прямо сейчас выполняется, владеет управлением. Waiting - приостановлена на await, ожидает I/O события или завершения другой корутины. Done - завершилась, результат или исключение сохранены. Внутри event loop есть структура данных collections.deque - двусторонняя очередь для ready корутин. Deque выбран неслучайно - добавление и извлечение элементов с обоих концов работает за O(1). На каждой итерации loop берет корутину из начала deque, выполняет ее до следующего await, затем решает что с ней делать дальше.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# Концептуальная схема работы планировщика
class Scheduler:
    def __init__(self):
        self.ready = deque()  # Готовые к выполнению
        self.sleeping = []    # Ожидают таймера
        self.waiting = {}     # Ожидают I/O
        
    def schedule(self, coro):
        """Добавить корутину в ready queue"""
        self.ready.append(coro)
    
    def run(self):
        while self.ready or self.sleeping or self.waiting:
            # Проверяем истекшие таймеры
            now = time.time()
            for timer in self.sleeping[:]:
                if timer.deadline <= now:
                    self.ready.append(timer.coro)
                    self.sleeping.remove(timer)
            
            # Проверяем готовые I/O события
            ready_io = check_io_events(timeout=0)
            for fd in ready_io:
                coro = self.waiting.pop(fd)
                self.ready.append(coro)
            
            # Выполняем одну готовую корутину
            if self.ready:
                coro = self.ready.popleft()
                result = coro.send(None)
                
                if isinstance(result, SleepRequest):
                    self.sleeping.append(Timer(result.duration, coro))
                elif isinstance(result, IORequest):
                    self.waiting[result.fd] = coro
Критический момент - корутина выполняется атомарно до await. Планировщик не может прервать ее в середине. Если корутина делает тяжелые вычисления без await, она монополизирует процессор. Я видел код, где корутина выполняла регулярное выражение на строке в 2 МБ - event loop застывал на 400 миллисекунд.

Время, выделяемое каждой корутине, называется квантом или time slice, но в asyncio этот термин условен. Корутина выполняется ровно до await - это может быть 10 микросекунд или 10 секунд. Никакого принудительного переключения нет, в отличие от потоков в ОС.

Селектор I/O - ключевой компонент планировщика. На Linux используется epoll, который может отслеживать тысячи файловых дескрипторов с минимальными накладными расходами. Когда все корутины в waiting состоянии, event loop вызывает selector.select(timeout). Это блокирующий вызов, но блокирует не программу, а только сам loop до появления готового I/O события.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# Реальный код из asyncio (упрощенно)
def _run_once(self):
    # Вычисляем timeout для select
    timeout = None
    if self._ready:
        timeout = 0  # Есть готовые задачи, не ждем
    elif self._scheduled:
        # Ждем до ближайшего таймера
        timeout = self._scheduled[0]._when - time.monotonic()
    
    # Ждем I/O события
    event_list = self._selector.select(timeout)
    
    # Обрабатываем готовые события
    for key, mask in event_list:
        callback, data = key.data
        callback(data)
    
    # Обрабатываем истекшие таймеры
    end_time = time.monotonic()
    while self._scheduled:
        handle = self._scheduled[0]
        if handle._when > end_time:
            break
        heapq.heappop(self._scheduled)
        handle._run()
    
    # Выполняем готовые callback'и
    ntodo = len(self._ready)
    for i in range(ntodo):
        handle = self._ready.popleft()
        handle._run()
Таймеры хранятся в min-heap - бинарной куче, где наименьший элемент всегда на вершине. Проверка ближайшего таймера - O(1), добавление нового - O(log n). На каждой итерации loop проверяет, не истек ли срок у корутин в куче, и перемещает их в ready queue.

Интересная оптимизация - ready queue обрабатывается порциями. Loop запоминает, сколько задач было в начале итерации, и выполняет ровно столько. Задачи, добавленные в процессе выполнения текущей порции, попадут в следующую итерацию. Это предотвращает бесконечный цикл, если корутины постоянно планируют новые корутины. Справедливость распределения в asyncio относительна. Все корутины в ready queue равны, но та, что выполняется дольше без await, получает больше процессорного времени. Если одна корутина делает await каждые 100 микросекунд, а другая - каждую миллисекунду, вторая займет в 10 раз больше процессора. Я профилировал приложение, где одна корутина парсила JSON, а 50 других обрабатывали WebSocket сообщения. Парсинг занимал 80% процессорного времени, остальные корутины голодали. Решение - разбил парсинг на чанки с await между ними. CPU распределился равномернее, latency WebSocket соединений упал с 200 мс до 15 мс.

Планировщик не умеет в приоритеты из коробки. Нет способа сказать "эта корутина важнее". Все в одной очереди, все по порядку. Для критичных задач можно написать кастомный планировщик, но это редкость - обычно достаточно правильно расставить await.

Еще нюанс - вложенные await. Когда корутина A делает await корутины B, планировщик не создает новый контекст. Просто B становится текущей выполняемой корутиной. После ее завершения управление возвращается A, и она продолжает с места остановки. Никакого дополнительного overhead.

Стоимость переключения между корутинами микроскопическая - десятки наносекунд. Это просто смена указателя на текущий stack frame и вызов метода send(). Никаких системных вызовов, никакого взаимодействия с планировщиком ОС. Для сравнения - переключение контекста между потоками стоит 1-10 микросекунд, в 100-1000 раз дороже.

Эта архитектура объясняет, почему asyncio эффективен для I/O-bound задач с высокой конкурентностью. Тысячи корутин в waiting состоянии почти не потребляют ресурсов - просто записи в словаре. Активных корутин в ready queue обычно десятки, и они быстро переключаются между собой. Процессор не простаивает, память не раздувается, все работает в одном потоке без гонок за данными.

Типичные ошибки начинающих



Забыть await перед вызовом корутины - классика жанра. Код выглядит рабочим, тесты проходят, но корутина не выполняется. Python создает объект корутины и... на этом все. Никакого выполнения, никакой ошибки во время компиляции. Только runtime warning в консоли, который легко пропустить.

Python
1
2
3
4
5
6
7
8
9
10
11
async def fetch_data():
    await asyncio.sleep(1)
    return "данные"
 
async def wrong_way():
    result = fetch_data()  # Забыли await
    print(result)  # <coroutine object fetch_data at 0x...>
    
async def right_way():
    result = await fetch_data()  # Правильно
    print(result)  # "данные"
Я потратил три часа на отладку системы кеширования, где половина запросов проходила мимо кеша. Оказалось, функция сохранения в кеш была корутиной, но вызывалась без await. Данные не сохранялись, но код продолжал работать как ни в чем не бывало.

Блокирующие операции в корутинах - вторая по популярности ошибка. Используете обычный requests вместо aiohttp, читаете файл через стандартный open(), делаете time.sleep() вместо asyncio.sleep(). Event loop замирает, все остальные корутины ждут. Асинхронность превращается в иллюзию.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
import requests  # Синхронная библиотека
import asyncio
 
async def bad_fetch(url):
    # Блокирует весь event loop на время запроса
    response = requests.get(url)
    return response.text
 
async def good_fetch(url):
    # Асинхронная версия
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()
Коллега однажды написал парсер на asyncio, но использовал синхронную библиотеку beautifulsoup для обработки HTML. Парсинг одной страницы занимал 300 миллисекунд. Event loop тормозил на каждой странице - вместо 2 секунд на 100 страниц получалось 30 секунд. Переход на асинхронный парсер решил проблему.

Создание нового event loop в каждой функции - грубая ошибка, но встречается регулярно. asyncio.run() создает loop, выполняет корутину, закрывает loop. Вызов asyncio.run() внутри уже работающего loop вызовет RuntimeError.

Python
1
2
3
4
5
6
7
8
9
10
async def nested_call():
    await asyncio.sleep(1)
    
async def broken():
    # Ошибка: loop уже работает
    asyncio.run(nested_call())
    
async def correct():
    # Правильно: просто await
    await nested_call()
Игнорирование исключений в фоновых задачах - источник молчаливых отказов. Task упал с ошибкой, но если не сделали await на нем, исключение потеряется. Программа продолжит работу, а критичная фоновая задача мертва.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
async def background_task():
    await asyncio.sleep(1)
    raise ValueError("что-то сломалось")
 
async def silent_failure():
    task = asyncio.create_task(background_task())
    await asyncio.sleep(2)
    # task упал, но мы не узнали
 
async def handled_failure():
    task = asyncio.create_task(background_task())
    try:
        await task
    except ValueError as e:
        print(f"Поймали ошибку: {e}")
Незавершенные задачи при выходе из main() отменяются с CancelledError. Если не обработали это исключение и не очистили ресурсы, получите утечки соединений, незакрытые файлы, зависшие транзакции в базе.

Python
1
2
3
4
5
6
7
8
9
async def cleanup_needed():
    connection = await db.connect()
    try:
        while True:
            await asyncio.sleep(1)
            # Работаем с соединением
    except asyncio.CancelledError:
        await connection.close()  # Обязательно закрываем
        raise
Смешивание синхронного и асинхронного кода без понимания контекста. Пытаетесь вызвать await в обычной функции или запускаете синхронную блокирующую операцию в корутине. Python ругается на первое, второе молча убивает производительность.

Неправильное управление сессиями HTTP-клиента - создаете новую ClientSession на каждый запрос. Теряете connection pooling, каждый раз пересоздаете SSL контекст. Производительность падает, память растет.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
# Плохо
async def multiple_requests():
    for url in urls:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                data = await response.text()
 
# Хорошо
async def efficient_requests():
    async with aiohttp.ClientSession() as session:
        for url in urls:
            async with session.get(url) as response:
                data = await response.text()
Забыть про таймауты - корутина может зависнуть навечно при проблемах с сетью или внешним сервисом. Без asyncio.wait_for() или timeout в ClientSession получите мертвые корутины, пожирающие память.

Эти ошибки объединяет одно - недопонимание природы асинхронности. Asyncio требует переосмысления привычных паттернов. Блокирующие операции, которые в синхронном коде незаметны, в асинхронном становятся критичными. Каждый await - это точка переключения, каждая корутина без await - потерянная работа.

Трансформация синхронного кода в асинхронный: пошаговая методика



Переписать рабочий синхронный код на asyncio - не просто добавить async и await в случайных местах. Нужна система, последовательность действий. Иначе получится каша из блокирующих вызовов, которая работает медленнее оригинала.

Первый шаг - аудит зависимостей. Просканируйте код на библиотеки, которые выполняют I/O операции. requests, urllib, psycopg2, pymongo, redis, sqlite3 - все это синхронные библиотеки. Для каждой найдите асинхронный аналог: aiohttp вместо requests, asyncpg вместо psycopg2, motor вместо pymongo, aioredis для Redis.

Если асинхронного аналога нет - это стоп-фактор. Либо ищите альтернативную библиотеку, либо оборачивайте блокирующие вызовы в run_in_executor(). Второй вариант работает, но убивает часть преимуществ asyncio.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# Синхронный код - идентифицируем I/O операции
import requests
import psycopg2
 
def fetch_user_data(user_id):
    # I/O операция 1: HTTP запрос
    response = requests.get(f"https://api.example.com/users/{user_id}")
    user_data = response.json()
    
    # I/O операция 2: запрос к базе
    conn = psycopg2.connect("dbname=test")
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM orders WHERE user_id = %s", (user_id,))
    orders = cursor.fetchall()
    
    return {"user": user_data, "orders": orders}
Второй шаг - замена блокирующих библиотек. Переписываете импорты, меняете синтаксис вызовов. requests.get() становится асинхронным контекстным менеджером в aiohttp. Подключение к базе тоже оборачивается в async with.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# Асинхронная версия - заменили библиотеки
import aiohttp
import asyncpg
 
async def fetch_user_data(user_id):
    # Асинхронный HTTP запрос
    async with aiohttp.ClientSession() as session:
        async with session.get(f"https://api.example.com/users/{user_id}") as response:
            user_data = await response.json()
    
    # Асинхронный запрос к базе
    conn = await asyncpg.connect("postgresql://localhost/test")
    orders = await conn.fetch("SELECT * FROM orders WHERE user_id = $1", user_id)
    await conn.close()
    
    return {"user": user_data, "orders": orders}
Третий шаг - добавление async/`await`. Функция с I/O операциями становится корутиной через async def. Каждый вызов асинхронной операции получает await. Это не формальность - без await корутина не выполнится.

Четвертый шаг - параллелизация независимых операций. В синхронном коде запрос к API и базе выполняются последовательно. В асинхронном можно запустить их одновременно через asyncio.gather() или create_task().

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
async def fetch_user_data_parallel(user_id):
    # Запускаем операции параллельно
    async def get_api_data():
        async with aiohttp.ClientSession() as session:
            async with session.get(f"https://api.example.com/users/{user_id}") as response:
                return await response.json()
    
    async def get_db_data():
        conn = await asyncpg.connect("postgresql://localhost/test")
        orders = await conn.fetch("SELECT * FROM orders WHERE user_id = $1", user_id)
        await conn.close()
        return orders
    
    # Выполняем одновременно
    user_data, orders = await asyncio.gather(
        get_api_data(),
        get_db_data()
    )
    
    return {"user": user_data, "orders": orders}
Пятый шаг - рефакторинг циклов. Цикл с последовательными I/O операциями превращается в конкурентный. for остается, но внутри собираете корутины и запускаете через gather().

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Синхронный цикл
def fetch_multiple_users(user_ids):
    results = []
    for user_id in user_ids:
        data = fetch_user_data(user_id)  # Блокирует на каждой итерации
        results.append(data)
    return results
 
# Асинхронный вариант
async def fetch_multiple_users(user_ids):
    # Создаем список корутин
    tasks = [fetch_user_data(user_id) for user_id in user_ids]
    # Выполняем все параллельно
    results = await asyncio.gather(*tasks)
    return results
Шестой шаг - управление ресурсами. Соединения с базой, HTTP-сессии, файлы - все требует явного закрытия. Используйте async with для автоматической очистки или try/finally блоки.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
async def safe_database_operation():
    conn = await asyncpg.connect("postgresql://localhost/test")
    try:
        result = await conn.fetch("SELECT * FROM data")
        return result
    finally:
        await conn.close()  # Гарантированно закрываем
 
# Или с контекстным менеджером
async def safer_operation():
    async with asyncpg.create_pool("postgresql://localhost/test") as pool:
        async with pool.acquire() as conn:
            result = await conn.fetch("SELECT * FROM data")
            return result
Седьмой шаг - обработка ошибок. В асинхронном коде исключения могут теряться, если не обработаны правильно. gather() с return_exceptions=True собирает все результаты, включая ошибки.

Python
1
2
3
4
5
6
7
8
9
async def robust_fetch(user_ids):
    tasks = [fetch_user_data(uid) for uid in user_ids]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    # Разделяем успешные и неудачные результаты
    successes = [r for r in results if not isinstance(r, Exception)]
    failures = [r for r in results if isinstance(r, Exception)]
    
    return successes, failures
Восьмой шаг - добавление таймаутов. Асинхронные операции могут висеть вечно при проблемах с сетью. asyncio.wait_for() ограничивает время выполнения.

Python
1
2
3
4
5
6
7
8
9
async def fetch_with_timeout(user_id):
    try:
        result = await asyncio.wait_for(
            fetch_user_data(user_id),
            timeout=5.0  # Максимум 5 секунд
        )
        return result
    except asyncio.TimeoutError:
        return None  # Или обработать по-другому
Девятый шаг - адаптация точки входа. Синхронная main() становится корутиной и запускается через asyncio.run().

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# Синхронная точка входа
def main():
    user_ids = [1, 2, 3, 4, 5]
    results = fetch_multiple_users(user_ids)
    process_results(results)
 
if __name__ == "__main__":
    main()
 
# Асинхронная точка входа
async def main():
    user_ids = [1, 2, 3, 4, 5]
    results = await fetch_multiple_users(user_ids)
    await process_results(results)  # Если process_results тоже асинхронная
 
if __name__ == "__main__":
    asyncio.run(main())
Десятый шаг - тестирование и профилирование. Убедитесь, что асинхронный код действительно быстрее. Используйте pytest-asyncio для тестов, измеряйте время выполнения.

Я переписывал API для сбора данных с 20 внешних сервисов. Синхронная версия работала 35 секунд. После миграции на asyncio - 3.2 секунды. Но первая попытка дала только 18 секунд, потому что забыл распараллелить запросы внутри одной функции. Профилирование показало узкие места.

Частая ошибка при миграции - неполная замена библиотек. Оставите один requests.get() в асинхронном коде - весь event loop будет блокироваться на этом вызове. Проверяйте каждую I/O операцию. Еще подвох - CPU-bound операции внутри корутин. Если после загрузки данных идет тяжелая обработка, вынесите ее в ProcessPoolExecutor. Иначе одна корутина заблокирует все остальные.

Python
1
2
3
4
5
6
7
8
9
10
11
12
import concurrent.futures
 
async def process_heavy_data(data):
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        # Вычисления идут в отдельном процессе
        result = await loop.run_in_executor(
            pool, 
            cpu_intensive_function, 
            data
        )
    return result
Миграция на asyncio - не волшебная пилюля. Если код и так быстрый или I/O операций мало, asyncio добавит только сложности. Но для конкурентных I/O-bound задач выигрыш может достигать 10-50х в производительности.

Python async await falcon
Всем привет, нигде не могу найти пример использования async await и фрэймворка falcon. Может у...

Async await в цикле for
Всем привет. Кусок кода (парсер+бот tg): if messages: for status_message...

Асинхронное программирование в Python
Доброго времени суток. Столкнулся с проблемой что в коде вызывается функция которая ждёт условные...

Python asyncio / aiohttp API 429 response error
Пытаюсь написать асинхронный API-запрос, нашел подходящий пример, но в ответ получаю ошибку: &quot;429...

python. flet. asyncio.Queue
Всем привет, есть проблема с понимаем как работают очереди и/или проблема с тем, что уже...

aiogram, RuntimeError: Event loop is closed
Добрый вечер. Я недавно начал программировать телеграмм бота но столкнулся с одной проблемой что...

Telethon ошибка RuntimeError: There is no current event loop in thread 'Thread-2 (process_request_thread)'
Всем привет! Есть файл с классом парсера, который использует телетон. При вызове сталкиваюсь с...

Event loop в matplotlib
Здравствуйте. Помогите, пожалуйста с решением проблемы: QCoreApplication::exec: The event loop is...

В чем отличие между использованием event loop при создании задач и без него?
Может кто-нибудь, пожалуйста, объяснить отличие вот этого кода с loop, где создаются таски (строки...

Не работает event.ref и event.ref_source
Все версии самые последние Код(вставил только то что имеет отношение к проблеме т.к. файл на 2000...

Потоки, процессы и асинхронное программирование
Добрый вечер. Почитал документацию и по моему я только еще больше запутался. Есть модуль...

Асинхронное программирование
Не всегда включаюсь в тему асинхронного программирования, рассказали бы в краткой форме,...

Надоела реклама? Зарегистрируйтесь и она исчезнет полностью.
Всего комментариев 0
Комментарии
 
Новые блоги и статьи
Асинхронный приём данных из COM-порта
Argus19 01.05.2026
Асинхронный приём данных из COM-порта Купил на aliexpress термопринтер QR701. Он оказался странным. Поключил к Arduino Nano. Был очень удивлён. Наотрез отказывается печатать русские буквы. Чтобы. . .
попытка написать игровой сервер на C++
pyirrlicht 29.04.2026
попытка написать игровой сервер на плюсах с открытым бесконечным миром. возможно получится прикрутить интерпретатор питон для кастомизации игровой логики. что есть на текущий момент:. . .
Контроль уникальности выбранного документа-основания при изменении реквизита
Maks 28.04.2026
Алгоритм из решения ниже разработан на примере нетипового документа "ЗаявкаНаРемонтСпецтехники", разработанного в КА2. Задача: уведомлять пользователя, если указанная заявка (документ-основание). . .
Благородство как наказание
Maks 24.04.2026
У хорошего человека отношения с женщинами всегда складываются трудно. А я человек хороший. Заявляю без тени смущения, потому что гордиться тут нечем. От хорошего человека ждут соответствующего. . .
Валидация и контроль данных табличной части документа перед записью
Maks 22.04.2026
Алгоритм из решения ниже реализован на примере нетипового документа, разработанного в КА2. Задача: контроль и валидация данных табличной части документа перед записью с учетом регламента компании. . .
Отчёт о затраченных материалах за определенный период с макетом печатной формы
Maks 21.04.2026
Отчёт из решения ниже размещён в конфигурации КА2. Задача: разработка отчёта по затраченным материалам за определённый период, с возможностью вывода печатной формы отчёта с шапкой и подвалом. В. . .
Отчёт о спецтехнике находящейся в ремонте
Maks 20.04.2026
Отчёт из решения ниже размещен в конфигурации КА2. Задача: отобразить спецтехнику, которая на данный момент находится в ремонте. Есть нетиповой документ "Заявка на ремонт спецтехники" который. . .
Памятка для бота и "визитка" для читателей "Semantic Universe Layer (Слой семантической вселенной)"
Hrethgir 19.04.2026
Сгенерировано для краткого описания по случаю сборки и компиляции скелета серверного приложения. И пусть после этого скажут, что статьи сгенерированные AI - туфта и не интересно. И это не реклама -. . .
КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin
Copyright ©2000 - 2026, CyberForum.ru