1. asyncio и асинхронное программирование в Python: конкурентность, корутины, таски, async/await, event loop
2. asyncio и асинхронное программирование в Python: паттерны, футуры, примеры, работа с БД
3. asyncio и асинхронное программирование в Python: синхронизация, продвинутые примеры, асинхронный парсер
Практические паттерны
Асинхронный код обрастает устоявшимися паттернами, которые решают типичные задачи элегантно и эффективно. Разберем самые востребованные.
Паттерн "Производитель-потребитель" - классика для обработки потока данных. Один или несколько производителей генерируют задачи, потребители их обрабатывают. Очередь asyncio.Queue координирует работу без явных блокировок.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
| import asyncio
from asyncio import Queue
async def producer(queue: Queue, producer_id: int):
"""Генерирует задачи"""
for i in range(5):
item = f"задача_{producer_id}_{i}"
await queue.put(item)
print(f"Производитель {producer_id} добавил {item}")
await asyncio.sleep(0.5)
async def consumer(queue: Queue, consumer_id: int):
"""Обрабатывает задачи"""
while True:
item = await queue.get()
if item is None: # Сигнал завершения
queue.task_done()
break
print(f"Потребитель {consumer_id} обрабатывает {item}")
await asyncio.sleep(1) # Имитация работы
queue.task_done()
async def main():
queue = Queue(maxsize=10)
# Запускаем производителей и потребителей
producers = [asyncio.create_task(producer(queue, i)) for i in range(2)]
consumers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]
# Ждем завершения производителей
await asyncio.gather(*producers)
# Отправляем сигналы завершения потребителям
for _ in consumers:
await queue.put(None)
# Ждем завершения потребителей
await asyncio.gather(*consumers)
asyncio.run(main()) |
|
Очередь буферизует задачи - если производители быстрее потребителей, задачи накапливаются до maxsize. При переполнении put() блокируется до освобождения места. Это естественное регулирование нагрузки без костылей.
Паттерн "Семафор для ограничения конкурентности" - когда нужно ограничить количество одновременных операций. Например, API позволяет максимум 10 запросов в секунду. Семафор выдает разрешения, корутины ждут своей очереди.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
| async def rate_limited_request(url: str, semaphore: asyncio.Semaphore):
"""HTTP запрос с ограничением конкурентности"""
async with semaphore: # Захватываем разрешение
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
async def fetch_many_urls(urls: list[str]):
semaphore = asyncio.Semaphore(10) # Максимум 10 одновременно
tasks = [rate_limited_request(url, semaphore) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results |
|
Использовал это для парсера вакансий - 5000 URL, ограничение сервера 20 запросов в секунду. Семафор гарантировал соблюдение лимита без ручного управления паузами.
Паттерн "Retry с экспоненциальной задержкой" - сетевые запросы падают, это норма. Повторяем попытки с увеличивающимися паузами: 1с, 2с, 4с, 8с. Защита от флуда при временных проблемах.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| async def fetch_with_retry(url: str, max_retries: int = 3):
"""Запрос с повторными попытками"""
for attempt in range(max_retries):
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=10) as response:
response.raise_for_status()
return await response.json()
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
if attempt == max_retries - 1:
raise # Последняя попытка - пробрасываем ошибку
delay = 2 ** attempt # Экспоненциальная задержка
print(f"Попытка {attempt + 1} не удалась, жду {delay}с")
await asyncio.sleep(delay) |
|
Паттерн "Контекстный менеджер для ресурсов" - соединения с базой, пулы, сессии требуют очистки. Кастомный асинхронный контекстный менеджер инкапсулирует логику.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| class DatabaseConnection:
"""Асинхронный контекстный менеджер для соединения с БД"""
async def __aenter__(self):
self.conn = await asyncpg.connect("postgresql://localhost/db")
return self.conn
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.conn.close()
# Возвращаем False, чтобы исключение пробрасывалось дальше
return False
async def use_database():
async with DatabaseConnection() as conn:
result = await conn.fetch("SELECT * FROM users")
return result
# Соединение автоматически закрыто |
|
Паттерн "Batching для оптимизации запросов" - вместо 100 отдельных запросов к базе делаем один батч. Накапливаем операции, выполняем пачкой.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| class BatchedWriter:
def __init__(self, batch_size: int = 100):
self.batch_size = batch_size
self.buffer = []
self.conn = None
async def write(self, item):
self.buffer.append(item)
if len(self.buffer) >= self.batch_size:
await self.flush()
async def flush(self):
if not self.buffer:
return
await self.conn.executemany(
"INSERT INTO data VALUES ($1, $2)",
self.buffer
)
self.buffer.clear() |
|
Эти паттерны покрывают 80% реальных задач. Остальное - комбинации и вариации базовых техник. Главное - понимать, когда применять какой подход.
asyncio.wait vs asyncio.gather Всем привет! :victory:
Вопросы внизу.
Есть два похожих теста:
#1 - asyncio.wait
import... Асинхронное получение данных посредством asyncio.gather Доброго времени суток, вопрос такой, когда приходят ссылки на обработку, (все уходят), но... Асинхронное программирование в Python Доброго времени суток. Столкнулся с проблемой что в коде вызывается функция которая ждёт условные... Python asyncio / aiohttp API 429 response error Пытаюсь написать асинхронный API-запрос, нашел подходящий пример, но в ответ получаю ошибку: "429...
Конкурентные HTTP-запросы
Реальные приложения редко делают один HTTP-запрос за раз. Обычно нужно опросить десятки API, собрать данные с сотни URL, синхронизировать информацию из множества источников. Синхронный подход превращает такие задачи в марафон ожидания, асинхронный - в спринт.
Базовый паттерн - создать список корутин для всех запросов и выполнить через gather(). Каждая корутина работает независимо, ждет только свой ответ, не блокируя остальные.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| import aiohttp
import asyncio
async def fetch_url(session, url):
"""Загружает содержимое одного URL"""
async with session.get(url) as response:
return await response.text()
async def fetch_all(urls):
"""Параллельная загрузка множества URL"""
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
# Использование
urls = [
"https://api.example.com/data1",
"https://api.example.com/data2",
"https://api.example.com/data3",
]
results = asyncio.run(fetch_all(urls)) |
|
Критичный момент - переиспользование ClientSession. Создание новой сессии на каждый запрос убивает производительность. Сессия управляет connection pool, переиспользует TCP-соединения, кеширует DNS-резолвинг. Один раз создали - используем для всех запросов внутри контекста.
Я замерял разницу: 100 запросов с созданием сессии каждый раз - 8 секунд, с одной переиспользуемой сессией - 1.2 секунды. Overhead на установку SSL-соединений и TCP handshake огромен.
Для тонкого контроля над конкурентностью используем семафор. API часто ограничивают количество одновременных запросов с одного IP. Превысили лимит - получили 429 Too Many Requests. Семафор выдает "пропуска" на выполнение, остальные ждут освобождения.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| async def fetch_with_limit(session, url, semaphore):
"""Запрос с контролем конкурентности"""
async with semaphore:
async with session.get(url) as response:
return await response.json()
async def fetch_many_controlled(urls, limit=10):
"""Загрузка с ограничением одновременных запросов"""
semaphore = asyncio.Semaphore(limit)
async with aiohttp.ClientSession() as session:
tasks = [
fetch_with_limit(session, url, semaphore)
for url in urls
]
return await asyncio.gather(*tasks, return_exceptions=True) |
|
Обработка ошибок требует особого внимания. Один упавший запрос не должен убивать остальные. return_exceptions=True в gather() собирает успешные результаты и исключения в один список.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| async def robust_fetch(urls):
"""Надежная загрузка с обработкой ошибок"""
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Разделяем успешные и неудачные
successful = []
failed = []
for url, result in zip(urls, results):
if isinstance(result, Exception):
failed.append((url, str(result)))
else:
successful.append((url, result))
return successful, failed |
|
Таймауты предотвращают зависание на медленных серверах. Без таймаутов один битый эндпоинт заставит ждать все приложение. ClientTimeout настраивает лимиты для разных фаз запроса.
| Python | 1
2
3
4
5
6
7
8
9
| async def fetch_with_timeout(url, timeout_seconds=5):
"""Запрос с общим таймаутом"""
timeout = aiohttp.ClientTimeout(total=timeout_seconds)
try:
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(url) as response:
return await response.text()
except asyncio.TimeoutError:
return None # Или пробросить исключение |
|
Для API с пагинацией нужен другой подход - последовательная загрузка страниц, пока есть данные. Но внутри каждой страницы можно распараллелить обработку элементов.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| async def fetch_paginated_api(base_url):
"""Загрузка API с пагинацией"""
all_data = []
page = 1
async with aiohttp.ClientSession() as session:
while True:
url = f"{base_url}?page={page}"
async with session.get(url) as response:
data = await response.json()
if not data.get("items"):
break # Достигли конца
all_data.extend(data["items"])
page += 1
if page > data.get("total_pages", page):
break
return all_data |
|
Продвинутый паттерн - динамическая очередь с обработкой по мере готовности. asyncio.as_completed() возвращает корутины в порядке завершения, не дожидаясь самой медленной.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
| async def process_as_ready(urls):
"""Обработка результатов по мере поступления"""
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
for coro in asyncio.as_completed(tasks):
try:
result = await coro
# Обрабатываем результат немедленно
process_data(result)
except Exception as e:
handle_error(e) |
|
Реальный кейс из практики: парсер цен для мониторинга конкурентов. 500 товаров на 8 маркетплейсах - 4000 запросов. Синхронный код с requests работал 22 минуты. Первая asyncio версия без оптимизаций - 8 минут. С семафором на 50 одновременных запросов и переиспользованием сессии - 47 секунд. С добавлением connection pool размером 100 - 31 секунда.
Но есть подводный камень. Если сервер отдает данные медленно, тысячи открытых соединений съедят всю память. Семафор обязателен для контроля ресурсов. Я встречал систему, которая падала с OOM при парсинге 10000 страниц без ограничений - все соединения висели в памяти.
Настройка connection pool критична. TCPConnector в aiohttp управляет переиспользованием соединений. limit контролирует максимум соединений на хост, limit_per_host - общий лимит.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| async def optimized_fetcher(urls):
"""Оптимизированная загрузка с настройкой пула"""
connector = aiohttp.TCPConnector(
limit=100, # Максимум 100 соединений всего
limit_per_host=20, # Максимум 20 на один хост
ttl_dns_cache=300 # Кеш DNS на 5 минут
)
async with aiohttp.ClientSession(connector=connector) as session:
semaphore = asyncio.Semaphore(50)
tasks = [
fetch_with_limit(session, url, semaphore)
for url in urls
]
return await asyncio.gather(*tasks, return_exceptions=True) |
|
Еще один паттерн - retry с экспоненциальной задержкой для временных сбоев. Сеть нестабильна, серверы падают, таймауты случаются. Умные повторы увеличивают надежность без костылей.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
| async def fetch_with_retry(session, url, max_attempts=3):
"""Запрос с повторными попытками"""
for attempt in range(max_attempts):
try:
async with session.get(url) as response:
response.raise_for_status()
return await response.json()
except aiohttp.ClientError as e:
if attempt == max_attempts - 1:
raise # Последняя попытка - сдаемся
# Экспоненциальная задержка: 1s, 2s, 4s
delay = 2 ** attempt
await asyncio.sleep(delay) |
|
Конкурентные HTTP-запросы - основной use case для asyncio. Здесь асинхронность показывает максимальную эффективность, превращая часы ожидания в минуты работы. Но требует понимания лимитов, таймаутов, обработки ошибок и управления ресурсами. Наивная реализация может быть хуже синхронной.
Паттерны обработки ошибок в асинхронном коде: try/except, обработка исключений в группах задач
Обработка ошибок в асинхронном коде коварнее, чем кажется. Исключение может всплыть в неожиданном месте, потеряться в фоновой задаче или разрушить весь пайплайн из-за одного упавшего запроса. Нужны четкие паттерны, иначе debugging превратится в ад.
Базовый try/except работает с корутинами так же, как с обычными функциями. Оборачиваете await в блок, ловите специфичные исключения, обрабатываете или пробрасываете дальше.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| async def safe_fetch(url):
"""Запрос с базовой обработкой ошибок"""
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=5) as response:
return await response.json()
except aiohttp.ClientError as e:
print(f"Ошибка сети: {e}")
return None
except asyncio.TimeoutError:
print(f"Таймаут для {url}")
return None
except Exception as e:
print(f"Неожиданная ошибка: {e}")
raise |
|
Но в асинхронном коде часто запускаете десятки корутин одновременно. Если одна упадет, что произойдет с остальными? По умолчанию asyncio.gather() прерывается при первой ошибке и пробрасывает исключение наверх. Все незавершенные корутины отменяются.
| Python | 1
2
3
4
5
6
7
8
9
10
| async def fetch_multiple_fail_fast(urls):
"""При ошибке прерываем все"""
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
try:
results = await asyncio.gather(*tasks)
return results
except aiohttp.ClientError as e:
print(f"Одна из задач упала: {e}")
return [] |
|
Такое поведение редко нужно. Обычно хотите собрать все результаты, включая ошибки. Параметр return_exceptions=True превращает gather() в более терпимый режим - возвращает список с результатами и исключениями.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| async def fetch_multiple_tolerant(urls):
"""Собираем все результаты, включая ошибки"""
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Разделяем успешные и неудачные
successful = []
failed = []
for url, result in zip(urls, results):
if isinstance(result, Exception):
failed.append((url, result))
else:
successful.append((url, result))
return successful, failed |
|
Я использовал этот паттерн в парсере вакансий с 50 источников. Три сайта регулярно отваливались с таймаутами, но это не должно было блокировать остальные 47. return_exceptions=True позволял собирать данные с работающих источников и логировать проблемные.
Для более детального контроля над каждой задачей оборачивайте корутины в индивидуальные try/except блоки. Так можете применять разную логику обработки ошибок для разных типов операций.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| async def robust_operation(url):
"""Каждая операция с собственной обработкой ошибок"""
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
data = await response.json()
return {"status": "success", "data": data}
except aiohttp.ClientError as e:
return {"status": "network_error", "error": str(e)}
except asyncio.TimeoutError:
return {"status": "timeout", "error": "Request timed out"}
except json.JSONDecodeError as e:
return {"status": "parse_error", "error": str(e)}
async def process_many(urls):
"""Обработка с детальным статусом каждой операции"""
tasks = [robust_operation(url) for url in urls]
results = await asyncio.gather(*tasks)
# Теперь можем анализировать статусы
success_count = sum(1 for r in results if r["status"] == "success")
print(f"Успешно: {success_count}/{len(results)}")
return results |
|
Фоновые задачи требуют особого внимания. Если создали Task через create_task() и не сделали await, исключение может потеряться. Python выдаст предупреждение, но программа продолжит работу с мертвой фоновой задачей.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| async def background_worker():
"""Фоновая задача с обработкой ошибок"""
try:
while True:
await process_queue()
await asyncio.sleep(1)
except asyncio.CancelledError:
print("Задача отменена, очищаем ресурсы")
await cleanup()
raise # Важно пробросить CancelledError
except Exception as e:
print(f"Фоновая задача упала: {e}")
# Логируем, уведомляем, но не роняем приложение
raise
async def main():
task = asyncio.create_task(background_worker())
try:
await asyncio.sleep(10)
finally:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass # Ожидаемая отмена |
|
Python 3.11 принес ExceptionGroup - способ группировать несвязанные исключения. Полезно, когда несколько корутин упали независимо друг от друга. Новый синтаксис except* ловит исключения внутри группы.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
| async def failing_tasks():
"""Несколько задач с разными ошибками"""
async def task_a():
await asyncio.sleep(0.1)
raise ValueError("Ошибка в A")
async def task_b():
await asyncio.sleep(0.2)
raise TypeError("Ошибка в B")
async def task_c():
await asyncio.sleep(0.15)
raise KeyError("Ошибка в C")
results = await asyncio.gather(
task_a(), task_b(), task_c(),
return_exceptions=True
)
# Собираем исключения в группу
exceptions = [r for r in results if isinstance(r, Exception)]
if exceptions:
raise ExceptionGroup("Множественные ошибки", exceptions)
async def handle_grouped_errors():
"""Обработка группы исключений"""
try:
await failing_tasks()
except* ValueError as eg:
print(f"Поймали ValueError: {eg.exceptions}")
except* TypeError as eg:
print(f"Поймали TypeError: {eg.exceptions}")
except* KeyError as eg:
print(f"Поймали KeyError: {eg.exceptions}") |
|
Встречал баг, где фоновая задача обработки очереди упала с необработанным исключением через час работы. Очередь перестала обрабатываться, но никто не заметил - основное приложение работало. Добавил wrapper с логированием всех исключений и мониторингом heartbeat. Теперь при падении фоновой задачи срабатывает alert.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
| async def monitored_background_task(task_name: str):
"""Фоновая задача с мониторингом"""
last_heartbeat = time.time()
try:
while True:
await do_work()
last_heartbeat = time.time()
await report_heartbeat(task_name, last_heartbeat)
await asyncio.sleep(1)
except Exception as e:
await send_alert(f"Task {task_name} failed: {e}")
await log_exception(task_name, e)
raise |
|
Обработка ошибок в асинхронном коде - не формальность. Потерянное исключение в фоновой задаче может означать незакрытые соединения, утечки памяти, невыполненные операции. Явная обработка, логирование, мониторинг - не паранойя, а необходимость для надежных систем.
Асинхронная работа с базами данных
База данных - типичный источник блокирующих операций. Запрос выполняется десятки миллисекунд, иногда секунды. В синхронном коде приложение замирает на каждом SELECT. В асинхронном - переключается на другие задачи, пока база думает.
Стандартные драйверы вроде psycopg2 для PostgreSQL или pymysql для MySQL блокируют поток. Нужны асинхронные аналоги: asyncpg для PostgreSQL, aiomysql для MySQL, motor для MongoDB. Они реализуют неблокирующий протокол общения с базой.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| import asyncpg
import asyncio
async def fetch_users():
"""Асинхронное подключение и запрос"""
conn = await asyncpg.connect(
host='localhost',
database='mydb',
user='postgres',
password='secret'
)
try:
rows = await conn.fetch('SELECT id, name FROM users WHERE active = $1', True)
return [dict(row) for row in rows]
finally:
await conn.close() |
|
Создание соединения - тяжелая операция. TCP handshake, аутентификация, подготовка сессии. Делать это на каждый запрос расточительно. Connection pool решает проблему - создает пул соединений при старте, переиспользует их между запросами.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
| async def setup_database():
"""Создание пула соединений"""
pool = await asyncpg.create_pool(
host='localhost',
database='mydb',
user='postgres',
password='secret',
min_size=5, # Минимум соединений
max_size=20 # Максимум соединений
)
return pool
async def get_user_by_id(pool, user_id: int):
"""Запрос через пул"""
async with pool.acquire() as conn:
row = await conn.fetchrow(
'SELECT * FROM users WHERE id = $1',
user_id
)
return dict(row) if row else None
async def main():
pool = await setup_database()
try:
# Параллельные запросы к базе
tasks = [get_user_by_id(pool, i) for i in range(1, 11)]
users = await asyncio.gather(*tasks)
print(f"Загружено пользователей: {len([u for u in users if u])}")
finally:
await pool.close()
asyncio.run(main()) |
|
Пул управляет соединениями автоматически. Когда делаете pool.acquire(), получаете свободное соединение или ждете, если все заняты. После выхода из контекста соединение возвращается в пул, а не закрывается.
Я разрабатывал API для аналитики с PostgreSQL. Синхронный код на Flask обрабатывал 50 запросов в секунду - каждый запрос висел на базе 200 мс. Переписал на FastAPI с asyncpg и пулом на 10 соединений. Throughput вырос до 400 запросов в секунду на том же железе. База не стала быстрее, но приложение научилось эффективно использовать время ожидания.
Транзакции в асинхронном коде требуют явного управления через контекстный менеджер или методы begin()/`commit()`. Нельзя просто начать транзакцию - нужно удерживать соединение до завершения.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| async def transfer_money(pool, from_id: int, to_id: int, amount: float):
"""Перевод денег с обработкой транзакции"""
async with pool.acquire() as conn:
async with conn.transaction():
# Проверяем баланс
balance = await conn.fetchval(
'SELECT balance FROM accounts WHERE id = $1',
from_id
)
if balance < amount:
raise ValueError("Недостаточно средств")
# Списываем у отправителя
await conn.execute(
'UPDATE accounts SET balance = balance - $1 WHERE id = $2',
amount, from_id
)
# Начисляем получателю
await conn.execute(
'UPDATE accounts SET balance = balance + $1 WHERE id = $2',
amount, to_id
)
# При выходе из блока транзакция автоматически commit'ится
# При исключении - rollback |
|
Если транзакция падает с исключением, conn.transaction() автоматически откатывает изменения. Это защищает от частичного выполнения операций.
Батчинг запросов - мощная оптимизация. Вместо N отдельных INSERT делаете один executemany(). База обрабатывает пачку эффективнее, чем N одиночных команд.
| Python | 1
2
3
4
5
6
7
8
9
10
11
| async def bulk_insert_users(pool, users: list):
"""Массовая вставка пользователей"""
async with pool.acquire() as conn:
await conn.executemany(
'INSERT INTO users (name, email, created_at) VALUES ($1, $2, $3)',
[(u['name'], u['email'], datetime.now()) for u in users]
)
# Вместо цикла с N запросами:
# for user in users:
# await conn.execute('INSERT INTO users ...') |
|
В проекте с импортом данных переход от отдельных INSERT к батчингу сократил время загрузки 10000 записей с 45 секунд до 3.2 секунды. База тратила меньше времени на обработку протокола и больше - на реальную работу.
Prepared statements ускоряют повторяющиеся запросы. База один раз парсит SQL, строит план выполнения, затем переиспользует его с разными параметрами.
| Python | 1
2
3
4
5
6
7
8
9
10
11
| async def query_with_prepared_statement(pool):
"""Использование prepared statement"""
async with pool.acquire() as conn:
# Готовим statement
stmt = await conn.prepare('SELECT * FROM users WHERE age > $1 AND country = $2')
# Выполняем с разными параметрами
young_users = await stmt.fetch(18, 'RU')
middle_aged = await stmt.fetch(35, 'RU')
return young_users, middle_aged |
|
Для ORM есть асинхронные версии. SQLAlchemy с версии 1.4 поддерживает async/await через AsyncSession. Tortoise ORM и SQLModel заточены под asyncio изначально.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
engine = create_async_engine(
'postgresql+asyncpg://user:pass@localhost/db',
echo=True
)
async_session = sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
async def get_users_orm():
"""Запрос через SQLAlchemy ORM"""
async with async_session() as session:
result = await session.execute(
select(User).where(User.active == True)
)
users = result.scalars().all()
return users |
|
ORM добавляет overhead, но упрощает работу со сложными схемами. Выбор между raw SQL и ORM зависит от проекта. Для высоконагруженных API с простыми запросами предпочитаю asyncpg напрямую. Для бизнес-логики с десятками таблиц и связей - ORM экономит время разработки.
Мониторинг пула соединений критичен для production. Если все соединения заняты, новые запросы будут ждать. Метрики pool.get_size(), pool.get_idle_size() показывают состояние пула.
| Python | 1
2
3
4
5
6
7
| async def monitor_pool(pool):
"""Периодический мониторинг состояния пула"""
while True:
print(f"Размер пула: {pool.get_size()}")
print(f"Свободных: {pool.get_idle_size()}")
print(f"Используется: {pool.get_size() - pool.get_idle_size()}")
await asyncio.sleep(10) |
|
Видел систему, где пул из 5 соединений не справлялся с нагрузкой. Запросы накапливались в очереди, latency рос до 5 секунд. Увеличение пула до 15 соединений решило проблему - база выдерживала, но приложение не могло эффективно использовать ее мощность.
Асинхронная работа с базами - не волшебство. База выполняет запросы с той же скоростью. Выигрыш в том, что приложение не простаивает во время этого выполнения. Один поток обслуживает сотни запросов, пока база обрабатывает их асинхронно. Для высоконагруженных систем с частыми обращениями к базе это дает кратный рост throughput.
Обработка потоков данных
Потоковая обработка данных - это когда информация поступает непрерывно, порциями, и обрабатывать ее нужно по мере поступления, а не дожидаясь полной загрузки. Asyncio идеально подходит для таких задач - можно читать следующую порцию, пока предыдущая обрабатывается.
Классический пример - чтение большого файла построчно. В синхронном коде читаете строку, обрабатываете, читаете следующую. В асинхронном - можете запустить обработку предыдущих строк, пока читаются новые.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
| import asyncio
async def process_line(line: str) -> dict:
"""Обработка одной строки данных"""
# Имитация тяжелой обработки
await asyncio.sleep(0.1)
return {"data": line.strip(), "length": len(line)}
async def stream_file_lines(filepath: str):
"""Генератор для потокового чтения файла"""
async with aiofiles.open(filepath, mode='r') as file:
async for line in file:
yield line
async def process_file_stream(filepath: str, batch_size: int = 10):
"""Обработка файла порциями"""
batch = []
async for line in stream_file_lines(filepath):
batch.append(line)
if len(batch) >= batch_size:
# Обрабатываем порцию параллельно
results = await asyncio.gather(
*[process_line(line) for line in batch]
)
# Сохраняем результаты или отправляем дальше
for result in results:
await save_result(result)
batch.clear()
# Обрабатываем остаток
if batch:
results = await asyncio.gather(
*[process_line(line) for line in batch]
) |
|
Использовал этот паттерн для обработки логов веб-сервера - 50 ГБ текста. Синхронная версия читала файл за 8 минут, обрабатывала каждую строку последовательно - итого 45 минут. Асинхронная с батчингом по 100 строк работала 12 минут - чтение и обработка шли параллельно.
Для потоковой обработки HTTP-ответов используется response.content.iter_chunked(). Большой файл скачивается по частям, каждая обрабатывается сразу без ожидания полной загрузки.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| async def download_and_process_stream(url: str, chunk_size: int = 8192):
"""Потоковая загрузка и обработка"""
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
total_size = 0
async for chunk in response.content.iter_chunked(chunk_size):
# Обрабатываем chunk немедленно
await process_chunk(chunk)
total_size += len(chunk)
# Показываем прогресс
print(f"Обработано: {total_size / 1024:.2f} КБ")
return total_size |
|
Очереди asyncio.Queue превращаются в конвейер обработки. Один producer читает данные и складывает в очередь, несколько consumers забирают и обрабатывают параллельно.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
| async def stream_producer(queue: asyncio.Queue, source):
"""Читает данные и помещает в очередь"""
async for item in source:
await queue.put(item)
# Сигнал завершения
await queue.put(None)
async def stream_consumer(queue: asyncio.Queue, consumer_id: int):
"""Обрабатывает элементы из очереди"""
while True:
item = await queue.get()
if item is None:
queue.task_done()
break
try:
result = await process_item(item)
await send_to_output(result)
except Exception as e:
print(f"Consumer {consumer_id} error: {e}")
finally:
queue.task_done()
async def stream_pipeline(source, num_consumers: int = 3):
"""Конвейер обработки потока"""
queue = asyncio.Queue(maxsize=50) # Буфер на 50 элементов
# Запускаем producer и consumers
producer = asyncio.create_task(stream_producer(queue, source))
consumers = [
asyncio.create_task(stream_consumer(queue, i))
for i in range(num_consumers)
]
# Ждем завершения producer
await producer
# Отправляем сигналы завершения всем consumers
for _ in consumers:
await queue.put(None)
# Ждем завершения всех consumers
await asyncio.gather(*consumers) |
|
В проекте по обработке финансовых транзакций использовал трехступенчатый конвейер: парсинг сырых данных → валидация → запись в базу. Каждый этап - отдельная очередь с consumers. Throughput достиг 15000 транзакций в секунду на одном процессе.
Асинхронные генераторы упрощают создание потоковых источников. Функция с async def и yield создает генератор, который можно итерировать через async for.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| async def fetch_paginated_stream(api_url: str):
"""Потоковая загрузка данных с пагинацией"""
page = 1
async with aiohttp.ClientSession() as session:
while True:
url = f"{api_url}?page={page}"
async with session.get(url) as response:
data = await response.json()
if not data['items']:
break
# Отдаем элементы по одному
for item in data['items']:
yield item
page += 1
async def process_api_stream():
"""Обработка потока из API"""
async for item in fetch_paginated_stream("https://api.example.com/data"):
await process_item(item) |
|
Backpressure - механизм контроля скорости потока. Если consumers не успевают обрабатывать, producer должен замедлиться. Queue с maxsize реализует это автоматически - put() блокируется при переполнении.
| Python | 1
2
3
4
5
6
7
8
9
| async def controlled_stream_producer(queue: asyncio.Queue):
"""Producer с учетом backpressure"""
for i in range(1000):
item = await fetch_data(i)
# Если очередь полна, put() будет ждать
await queue.put(item)
# Producer автоматически замедляется |
|
Видел систему, где producer заливал данные быстрее, чем consumers успевали обрабатывать. Без ограничения очереди память росла до OOM. Добавление maxsize=100 стабилизировало потребление - producer ждал, когда освободится место.
Потоковая обработка особенно эффективна для ETL процессов - extract, transform, load. Данные текут через цепочку трансформаций без промежуточного сохранения полного датасета в памяти. Обрабатываете терабайты, используя мегабайты RAM.
Работа с Future и низкоуровневым API
Future - это обещание результата, который станет доступен в будущем. Низкоуровневый примитив, на котором построены корутины и Task. В обычном коде редко работаете с Future напрямую, но понимание механизма помогает диагностировать сложные проблемы и писать кастомные расширения для asyncio.
Future хранит три состояния: pending (ожидание), cancelled (отменен), done (завершен). В состоянии done содержит либо результат, либо исключение. Попытка получить результат из pending Future вызовет InvalidStateError.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
| import asyncio
# Создание Future вручную
loop = asyncio.get_event_loop()
future = loop.create_future()
print(f"Состояние: done={future.done()}, cancelled={future.cancelled()}")
# Установка результата
future.set_result(42)
print(f"После set_result: done={future.done()}")
print(f"Результат: {future.result()}") |
|
В реальности Future создается внутри asyncio - когда запускаете Task или делаете низкоуровневый вызов. Но иногда нужно создать Future самостоятельно для интеграции синхронного и асинхронного кода.
Классический сценарий - обработка callback-based API. Старая библиотека работает через колбеки, а вам нужен await. Оборачиваете вызов в корутину, которая создает Future и устанавливает результат в колбеке.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| def legacy_async_operation(callback):
"""Старая функция с callback"""
def delayed_work():
import time
time.sleep(1)
callback("результат работы")
import threading
threading.Thread(target=delayed_work).start()
async def wrap_legacy_operation():
"""Обертка для использования с await"""
loop = asyncio.get_running_loop()
future = loop.create_future()
def callback(result):
# Устанавливаем результат в Future
loop.call_soon_threadsafe(future.set_result, result)
legacy_async_operation(callback)
return await future
# Теперь можно использовать с await
result = await wrap_legacy_operation() |
|
Я интегрировал старый SOAP-клиент, который работал на колбеках, в современное asyncio приложение. Создавал Future для каждого запроса, колбек устанавливал результат. Код стал выглядеть однородно, без callback hell.
loop.call_soon() и loop.call_later() планируют выполнение функций в event loop. call_soon() добавляет в очередь на следующую итерацию, call_later() - с задержкой.
| Python | 1
2
3
4
5
6
7
8
9
10
| def sync_callback():
print("Callback выполнен")
loop = asyncio.get_running_loop()
# Выполнится на следующей итерации loop
loop.call_soon(sync_callback)
# Выполнится через 2 секунды
loop.call_later(2.0, sync_callback) |
|
Эти методы работают только с обычными функциями, не с корутинами. Для корутин используйте create_task() или ensure_future().
loop.run_in_executor() запускает блокирующую функцию в отдельном потоке или процессе, возвращая Future. Единственный способ безопасно вызвать блокирующий код из асинхронного контекста.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| import concurrent.futures
def blocking_operation(n):
"""CPU-intensive или блокирующая операция"""
result = sum(i**2 for i in range(n))
return result
async def async_wrapper():
loop = asyncio.get_running_loop()
# Выполняем в thread pool
with concurrent.futures.ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(
pool,
blocking_operation,
1_000_000
)
return result |
|
Встречал проект, где в асинхронном обработчике вызывали синхронную библиотеку для работы с PDF - операция занимала 500 мс и блокировала event loop. Обернул вызов в run_in_executor() с ProcessPoolExecutor - блокировки исчезли, latency API упал с 600 мс до 50 мс.
asyncio.ensure_future() преобразует awaitable объект в Future или Task. Если передали корутину - создается Task, если Future - возвращается как есть. Универсальная обертка для планирования асинхронных операций.
| Python | 1
2
3
4
5
6
7
| async def some_coroutine():
await asyncio.sleep(1)
return "готово"
# Эти два вызова эквивалентны
task1 = asyncio.create_task(some_coroutine())
task2 = asyncio.ensure_future(some_coroutine()) |
|
Низкоуровневое API включает работу с сокетами напрямую. loop.sock_connect(), loop.sock_recv(), loop.sock_sendall() - неблокирующие операции над сокетами без высокоуровневых протоколов.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| import socket
async def raw_socket_client():
loop = asyncio.get_running_loop()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(False)
await loop.sock_connect(sock, ('example.com', 80))
request = b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n"
await loop.sock_sendall(sock, request)
response = await loop.sock_recv(sock, 4096)
sock.close()
return response.decode() |
|
Редко нужно опускаться на этот уровень - aiohttp и другие библиотеки инкапсулируют низкоуровневые детали. Но для кастомных протоколов или интеграции с legacy системами знание полезно.
Future и низкоуровневое API asyncio - это фундамент, на котором построены корутины и Task. В повседневной работе используете высокоуровневые абстракции, но иногда приходится спускаться ниже - интегрировать callback-based код, работать с блокирующими операциями, реализовывать кастомные протоколы. Понимание внутреннего устройства превращает asyncio из черного ящика в предсказуемый и управляемый инструмент.
Асинхронные генераторы расширяют концепцию обычных генераторов в мир asyncio. Функция с async def и yield создает объект, который можно итерировать через async for. Каждое yield отдает значение и передает управление event loop, позволяя другим корутинам работать.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| import asyncio
async def async_range(start, stop):
"""Асинхронный аналог range()"""
current = start
while current < stop:
yield current
current += 1
await asyncio.sleep(0.1) # Имитация асинхронной работы
async def consume_async_generator():
"""Потребление значений из асинхронного генератора"""
async for value in async_range(0, 5):
print(f"Получено значение: {value}")
asyncio.run(consume_async_generator()) |
|
В отличие от обычного генератора, который блокирует выполнение на yield, асинхронный генератор позволяет event loop переключаться между задачами. Это критично для генераторов, которые выполняют I/O операции между yield.
Реальный пример - потоковое чтение базы данных. Вместо загрузки миллиона записей в память разом, отдаете их порциями через асинхронный генератор.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| async def stream_database_records(pool, batch_size=1000):
"""Потоковое чтение из базы"""
offset = 0
while True:
async with pool.acquire() as conn:
rows = await conn.fetch(
'SELECT * FROM large_table LIMIT $1 OFFSET $2',
batch_size, offset
)
if not rows:
break
for row in rows:
yield dict(row)
offset += batch_size
await asyncio.sleep(0) # Отдаем управление
async def process_large_dataset(pool):
"""Обработка большого датасета без загрузки в память"""
count = 0
async for record in stream_database_records(pool):
await process_record(record)
count += 1
if count % 10000 == 0:
print(f"Обработано записей: {count}") |
|
Я использовал этот подход для ETL процесса с 50 миллионами записей. Синхронная версия падала с OOM при попытке загрузить все в память. Асинхронный генератор держал в памяти только текущий батч - потребление RAM не превышало 200 МБ.
Асинхронные генераторы поддерживают asend() и athrow() для передачи значений и исключений внутрь генератора. Это продвинутая техника для двусторонней коммуникации.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| async def accumulator_generator():
"""Генератор-аккумулятор"""
total = 0
while True:
value = yield total
if value is None:
break
total += value
async def use_accumulator():
"""Использование генератора с отправкой значений"""
gen = accumulator_generator()
# Инициализация генератора
await gen.asend(None)
# Отправляем значения
result = await gen.asend(10) # total = 10
print(f"Текущая сумма: {result}")
result = await gen.asend(20) # total = 30
print(f"Текущая сумма: {result}")
await gen.aclose() # Закрываем генератор |
|
Асинхронные итераторы - это объекты с методами __aiter__() и __anext__(). Аналог обычных итераторов, но методы корутинные. Позволяют создавать кастомные итерируемые объекты с асинхронной логикой.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| class AsyncRangeIterator:
"""Кастомный асинхронный итератор"""
def __init__(self, start, stop, delay=0.1):
self.current = start
self.stop = stop
self.delay = delay
def __aiter__(self):
return self
async def __anext__(self):
if self.current >= self.stop:
raise StopAsyncIteration
await asyncio.sleep(self.delay)
value = self.current
self.current += 1
return value
async def use_custom_iterator():
"""Использование кастомного итератора"""
async for value in AsyncRangeIterator(0, 5):
print(f"Значение: {value}") |
|
Встречал код, где нужно было последовательно опрашивать очередь сообщений. Реализовал через асинхронный итератор - метод __anext__() ждал новое сообщение, async for потреблял их естественным образом. Код получился чище, чем с явным while True.
Асинхронные comprehensions упрощают работу с асинхронными итераторами. Синтаксис похож на обычные list/dict comprehensions, но с async for.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| async def async_square_numbers():
"""Возведение в квадрат асинхронно"""
async def async_range(n):
for i in range(n):
await asyncio.sleep(0.1)
yield i
# Асинхронный list comprehension
squares = [x**2 async for x in async_range(10)]
return squares
async def filter_async_data():
"""Фильтрация с условием"""
async def fetch_numbers():
for i in range(20):
await asyncio.sleep(0.05)
yield i
# Comprehension с условием
evens = [x async for x in fetch_numbers() if x % 2 == 0]
return evens |
|
Производительность асинхронных генераторов зависит от частоты await внутри. Если генератор делает yield без await, он фактически синхронный - никакой конкурентности. Только явная передача управления через await позволяет event loop переключаться на другие задачи.
| Python | 1
2
3
4
5
6
7
8
9
10
| # Плохо - псевдоасинхронный генератор
async def fake_async_gen():
for i in range(100):
yield i # Нет await - блокирует loop
# Хорошо - настоящий асинхронный генератор
async def real_async_gen():
for i in range(100):
yield i
await asyncio.sleep(0) # Отдаем управление |
|
Закрытие асинхронных генераторов требует явного вызова aclose(). Без этого генератор останется висеть в памяти с незавершенными ресурсами. Контекстный менеджер или try/finally гарантируют очистку.
| Python | 1
2
3
4
5
6
7
8
9
10
11
| async def safe_generator_usage():
"""Безопасное использование генератора"""
gen = stream_database_records(pool)
try:
async for record in gen:
if should_stop(record):
break
await process_record(record)
finally:
await gen.aclose() # Гарантированное закрытие |
|
Асинхронные генераторы и итераторы - мощный инструмент для работы с потоками данных. Они естественно вписываются в асинхронную архитектуру, позволяя обрабатывать бесконечные или очень большие последовательности без загрузки в память. Правильное использование yield и await превращает последовательную обработку в конкурентную без изменения базовой логики.
Интеграция синхронного и асинхронного кода: run_in_executor и практические стратегии
Реальные проекты редко бывают чисто асинхронными. Всегда найдется библиотека без async версии, legacy код на синхронном стеке, или просто операция, для которой асинхронный аналог не существует. Смешивание синхронного и асинхронного кода требует понимания того, как они взаимодействуют.
Основная проблема - блокирующий вызов в корутине парализует весь event loop. Вызвали time.sleep(5) вместо asyncio.sleep(5) - и все остальные корутины встали на пять секунд. Один синхронный вызов базы через psycopg2 блокирует тысячи асинхронных соединений.
loop.run_in_executor() решает проблему, выполняя блокирующую функцию в отдельном потоке или процессе. Event loop получает Future, который завершится, когда функция отработает. Остальные корутины продолжают работу.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
def blocking_operation(n):
"""Синхронная блокирующая функция"""
time.sleep(2) # Блокирует поток
return sum(i**2 for i in range(n))
async def async_wrapper(n):
"""Обертка для вызова из асинхронного кода"""
loop = asyncio.get_running_loop()
# Выполняем в thread pool
result = await loop.run_in_executor(None, blocking_operation, n)
return result
async def main():
# Запускаем несколько операций параллельно
tasks = [async_wrapper(1_000_000) for _ in range(3)]
results = await asyncio.gather(*tasks)
print(f"Результаты: {results}")
asyncio.run(main()) |
|
Первый аргумент run_in_executor() - executor. None означает использование дефолтного ThreadPoolExecutor с небольшим пулом потоков. Для CPU-bound задач передайте ProcessPoolExecutor.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
| from concurrent.futures import ProcessPoolExecutor
async def cpu_intensive_task(data):
"""CPU-intensive вычисления"""
loop = asyncio.get_running_loop()
with ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(
pool,
heavy_computation,
data
)
return result |
|
Я переписывал систему обработки изображений - resize, фильтры, watermarks. PIL полностью синхронный. Обернул каждую операцию в run_in_executor() с ProcessPoolExecutor на 4 процесса. Throughput вырос с 15 изображений в секунду до 80. Event loop не блокировался, CPU использовался на 100% всех ядер.
Передача аргументов в run_in_executor() требует внимания. Аргументы сериализуются через pickle для ProcessPoolExecutor. Сложные объекты, сокеты, открытые файлы не сериализуются. Передавайте только простые типы или пути к файлам.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| # Плохо - объект сессии не сериализуется
async def broken_approach(session):
loop = asyncio.get_running_loop()
with ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(
pool,
process_with_session,
session # Ошибка при сериализации
)
# Хорошо - передаем параметры для создания сессии
async def correct_approach(db_url):
loop = asyncio.get_running_loop()
with ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(
pool,
process_with_url,
db_url # Строка сериализуется без проблем
) |
|
Для библиотек с callback-based API создайте обертку через Future. Callback устанавливает результат в Future, корутина ждет через await.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| async def wrap_callback_api(param):
"""Обертка для callback-based библиотеки"""
loop = asyncio.get_running_loop()
future = loop.create_future()
def callback(result, error=None):
if error:
loop.call_soon_threadsafe(future.set_exception, error)
else:
loop.call_soon_threadsafe(future.set_result, result)
# Запуск синхронной операции с колбеком
legacy_library.async_operation(param, callback)
return await future |
|
call_soon_threadsafe() критична - callback может вызваться из другого потока. Обычный call_soon() небезопасен при многопоточности.
Встречал код, где забыли threadsafe вариант. Callback из потока записывал в Future, event loop иногда падал с segfault. Добавление _threadsafe решило проблему - защита от race conditions на уровне event loop.
Для частых вызовов блокирующих операций создайте persistent executor. Создание нового executor'а на каждый вызов дорого - пул потоков инициализируется заново.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| class AsyncService:
"""Сервис с переиспользуемым executor'ом"""
def __init__(self):
self.executor = ThreadPoolExecutor(max_workers=10)
async def blocking_call(self, data):
loop = asyncio.get_running_loop()
return await loop.run_in_executor(
self.executor,
sync_operation,
data
)
async def cleanup(self):
"""Обязательная очистка ресурсов"""
self.executor.shutdown(wait=True) |
|
Баланс между ThreadPoolExecutor и ProcessPoolExecutor зависит от задачи. I/O-bound блокирующие операции - threads. CPU-bound вычисления - processes. Потоки легче, процессы мощнее, но дороже. Я использовал thread pool для вызовов синхронного Redis-клиента - 50 запросов в секунду, каждый 10-20 мс. Overhead на создание процесса превысил бы время самой операции. Process pool применял для обработки видео - каждый файл 5-10 секунд CPU-intensive работы.
Синхронный код внутри корутины без executor'а - типичная ошибка. Код работает, но блокирует event loop. Профилирование показывает странные задержки, latency плавает. Всегда оборачивайте блокирующие операции.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| # Плохо - блокирует event loop
async def bad_approach():
data = requests.get("https://api.example.com") # Блокирует
return data.json()
# Хорошо - через executor
async def good_approach():
loop = asyncio.get_running_loop()
return await loop.run_in_executor(
None,
lambda: requests.get("https://api.example.com").json()
)
# Еще лучше - асинхронная библиотека
async def best_approach():
async with aiohttp.ClientSession() as session:
async with session.get("https://api.example.com") as response:
return await response.json() |
|
Третий вариант всегда предпочтительнее. Executor - это костыль для случаев, когда асинхронного аналога нет. Где возможно - используйте нативные async библиотеки.
Интеграция синхронного и асинхронного кода - компромисс. Вы получаете возможность использовать legacy код, но теряете часть эффективности asyncio. ThreadPoolExecutor добавляет overhead на переключение контекста, ProcessPoolExecutor - на сериализацию данных. Минимизируйте количество таких вызовов, но не бойтесь использовать, когда это единственный разумный вариант.
python. flet. asyncio.Queue Всем привет, есть проблема с понимаем как работают очереди и/или проблема с тем, что уже... Потоки, процессы и асинхронное программирование Добрый вечер.
Почитал документацию и по моему я только еще больше запутался.
Есть модуль... Асинхронное программирование Не всегда включаюсь в тему асинхронного программирования, рассказали бы в краткой форме,... С чего начать разбираться с Asyncio? нужно написать парсер страницы, с загрузкой некоторых картинок, на Asyncio.
С многопоточным и... Коннектор к API используя AsyncIO, aiohttp Py3.5.
Работаю с библиотекой, подключаюсь к api telegram. Получаю сообщения и отправляю их в... Выйти из петли в run_forever() [asyncio] Пишу скрипт, который эмулирует запуск игр в стиме.
Для этого была выбрана библиотека Steam для... Asyncio корректное завершение в случае непредвиденной остановки программы Есть скрипт (выложен не полностью)
@asyncio.coroutine
def do_work(data):
for adres, ip_list in... Asyncio - coroutine vs future vs task Доброго времени суток, форумчане!
Приступил к изучению asyncio. Использую python 3.5.
Гуру... Можно ли поставить таски на паузу? (asyncio) Всем привет! Имеется такой код:
from aiohttp import web
import asyncio
async def... Asyncio ошибка работы парсера Здравствуйте, имеется парсер на Python в связке asyncio + aiohttp + threadPoolExecutor + lxml.... WxPython и asyncio Добрый день.
Подскажите можно ли в приложении wxPython использовать модуль asyncio?
Как я понимаю... Параллельный запуск 2 методов через asyncio Есть необходимость сделать код, продолжающий работу даже при поступлении запроса. Пересмотрела и...
|