Форум программистов, компьютерный форум, киберфорум
py-thonny
Войти
Регистрация
Восстановить пароль
Блоги Сообщество Поиск Заказать работу  

asyncio и асинхронное программирование в Python: паттерны, футуры, примеры, работа с БД

Запись от py-thonny размещена 07.10.2025 в 21:14. Обновил(-а) py-thonny 07.10.2025 в 21:23
Показов 4646 Комментарии 0

Нажмите на изображение для увеличения
Название: asyncio и асинхронное программирование в Python паттерны, футуры, примеры, работа с БД.jpg
Просмотров: 495
Размер:	89.4 Кб
ID:	11268
1. asyncio и асинхронное программирование в Python: конкурентность, корутины, таски, async/await, event loop
2. asyncio и асинхронное программирование в Python: паттерны, футуры, примеры, работа с БД
3. asyncio и асинхронное программирование в Python: синхронизация, продвинутые примеры, асинхронный парсер

Практические паттерны



Асинхронный код обрастает устоявшимися паттернами, которые решают типичные задачи элегантно и эффективно. Разберем самые востребованные.

Паттерн "Производитель-потребитель" - классика для обработки потока данных. Один или несколько производителей генерируют задачи, потребители их обрабатывают. Очередь asyncio.Queue координирует работу без явных блокировок.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import asyncio
from asyncio import Queue
 
async def producer(queue: Queue, producer_id: int):
    """Генерирует задачи"""
    for i in range(5):
        item = f"задача_{producer_id}_{i}"
        await queue.put(item)
        print(f"Производитель {producer_id} добавил {item}")
        await asyncio.sleep(0.5)
    
async def consumer(queue: Queue, consumer_id: int):
    """Обрабатывает задачи"""
    while True:
        item = await queue.get()
        if item is None:  # Сигнал завершения
            queue.task_done()
            break
        print(f"Потребитель {consumer_id} обрабатывает {item}")
        await asyncio.sleep(1)  # Имитация работы
        queue.task_done()
 
async def main():
    queue = Queue(maxsize=10)
    
    # Запускаем производителей и потребителей
    producers = [asyncio.create_task(producer(queue, i)) for i in range(2)]
    consumers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]
    
    # Ждем завершения производителей
    await asyncio.gather(*producers)
    
    # Отправляем сигналы завершения потребителям
    for _ in consumers:
        await queue.put(None)
    
    # Ждем завершения потребителей
    await asyncio.gather(*consumers)
 
asyncio.run(main())
Очередь буферизует задачи - если производители быстрее потребителей, задачи накапливаются до maxsize. При переполнении put() блокируется до освобождения места. Это естественное регулирование нагрузки без костылей.

Паттерн "Семафор для ограничения конкурентности" - когда нужно ограничить количество одновременных операций. Например, API позволяет максимум 10 запросов в секунду. Семафор выдает разрешения, корутины ждут своей очереди.

Python
1
2
3
4
5
6
7
8
9
10
11
12
async def rate_limited_request(url: str, semaphore: asyncio.Semaphore):
    """HTTP запрос с ограничением конкурентности"""
    async with semaphore:  # Захватываем разрешение
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return await response.text()
 
async def fetch_many_urls(urls: list[str]):
    semaphore = asyncio.Semaphore(10)  # Максимум 10 одновременно
    tasks = [rate_limited_request(url, semaphore) for url in urls]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return results
Использовал это для парсера вакансий - 5000 URL, ограничение сервера 20 запросов в секунду. Семафор гарантировал соблюдение лимита без ручного управления паузами.

Паттерн "Retry с экспоненциальной задержкой" - сетевые запросы падают, это норма. Повторяем попытки с увеличивающимися паузами: 1с, 2с, 4с, 8с. Защита от флуда при временных проблемах.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
async def fetch_with_retry(url: str, max_retries: int = 3):
    """Запрос с повторными попытками"""
    for attempt in range(max_retries):
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url, timeout=10) as response:
                    response.raise_for_status()
                    return await response.json()
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            if attempt == max_retries - 1:
                raise  # Последняя попытка - пробрасываем ошибку
            
            delay = 2 ** attempt  # Экспоненциальная задержка
            print(f"Попытка {attempt + 1} не удалась, жду {delay}с")
            await asyncio.sleep(delay)
Паттерн "Контекстный менеджер для ресурсов" - соединения с базой, пулы, сессии требуют очистки. Кастомный асинхронный контекстный менеджер инкапсулирует логику.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class DatabaseConnection:
    """Асинхронный контекстный менеджер для соединения с БД"""
    
    async def __aenter__(self):
        self.conn = await asyncpg.connect("postgresql://localhost/db")
        return self.conn
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.conn.close()
        # Возвращаем False, чтобы исключение пробрасывалось дальше
        return False
 
async def use_database():
    async with DatabaseConnection() as conn:
        result = await conn.fetch("SELECT * FROM users")
        return result
    # Соединение автоматически закрыто
Паттерн "Batching для оптимизации запросов" - вместо 100 отдельных запросов к базе делаем один батч. Накапливаем операции, выполняем пачкой.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class BatchedWriter:
    def __init__(self, batch_size: int = 100):
        self.batch_size = batch_size
        self.buffer = []
        self.conn = None
    
    async def write(self, item):
        self.buffer.append(item)
        if len(self.buffer) >= self.batch_size:
            await self.flush()
    
    async def flush(self):
        if not self.buffer:
            return
        
        await self.conn.executemany(
            "INSERT INTO data VALUES ($1, $2)",
            self.buffer
        )
        self.buffer.clear()
Эти паттерны покрывают 80% реальных задач. Остальное - комбинации и вариации базовых техник. Главное - понимать, когда применять какой подход.

asyncio.wait vs asyncio.gather
Всем привет! :victory: Вопросы внизу. Есть два похожих теста: #1 - asyncio.wait import...

Асинхронное получение данных посредством asyncio.gather
Доброго времени суток, вопрос такой, когда приходят ссылки на обработку, (все уходят), но...

Асинхронное программирование в Python
Доброго времени суток. Столкнулся с проблемой что в коде вызывается функция которая ждёт условные...

Python asyncio / aiohttp API 429 response error
Пытаюсь написать асинхронный API-запрос, нашел подходящий пример, но в ответ получаю ошибку: "429...


Конкурентные HTTP-запросы



Реальные приложения редко делают один HTTP-запрос за раз. Обычно нужно опросить десятки API, собрать данные с сотни URL, синхронизировать информацию из множества источников. Синхронный подход превращает такие задачи в марафон ожидания, асинхронный - в спринт.

Базовый паттерн - создать список корутин для всех запросов и выполнить через gather(). Каждая корутина работает независимо, ждет только свой ответ, не блокируя остальные.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import aiohttp
import asyncio
 
async def fetch_url(session, url):
    """Загружает содержимое одного URL"""
    async with session.get(url) as response:
        return await response.text()
 
async def fetch_all(urls):
    """Параллельная загрузка множества URL"""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    return results
 
# Использование
urls = [
    "https://api.example.com/data1",
    "https://api.example.com/data2",
    "https://api.example.com/data3",
]
results = asyncio.run(fetch_all(urls))
Критичный момент - переиспользование ClientSession. Создание новой сессии на каждый запрос убивает производительность. Сессия управляет connection pool, переиспользует TCP-соединения, кеширует DNS-резолвинг. Один раз создали - используем для всех запросов внутри контекста.

Я замерял разницу: 100 запросов с созданием сессии каждый раз - 8 секунд, с одной переиспользуемой сессией - 1.2 секунды. Overhead на установку SSL-соединений и TCP handshake огромен.

Для тонкого контроля над конкурентностью используем семафор. API часто ограничивают количество одновременных запросов с одного IP. Превысили лимит - получили 429 Too Many Requests. Семафор выдает "пропуска" на выполнение, остальные ждут освобождения.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
async def fetch_with_limit(session, url, semaphore):
    """Запрос с контролем конкурентности"""
    async with semaphore:
        async with session.get(url) as response:
            return await response.json()
 
async def fetch_many_controlled(urls, limit=10):
    """Загрузка с ограничением одновременных запросов"""
    semaphore = asyncio.Semaphore(limit)
    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_with_limit(session, url, semaphore) 
            for url in urls
        ]
        return await asyncio.gather(*tasks, return_exceptions=True)
Обработка ошибок требует особого внимания. Один упавший запрос не должен убивать остальные. return_exceptions=True в gather() собирает успешные результаты и исключения в один список.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
async def robust_fetch(urls):
    """Надежная загрузка с обработкой ошибок"""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    
    # Разделяем успешные и неудачные
    successful = []
    failed = []
    
    for url, result in zip(urls, results):
        if isinstance(result, Exception):
            failed.append((url, str(result)))
        else:
            successful.append((url, result))
    
    return successful, failed
Таймауты предотвращают зависание на медленных серверах. Без таймаутов один битый эндпоинт заставит ждать все приложение. ClientTimeout настраивает лимиты для разных фаз запроса.

Python
1
2
3
4
5
6
7
8
9
async def fetch_with_timeout(url, timeout_seconds=5):
    """Запрос с общим таймаутом"""
    timeout = aiohttp.ClientTimeout(total=timeout_seconds)
    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url) as response:
                return await response.text()
    except asyncio.TimeoutError:
        return None  # Или пробросить исключение
Для API с пагинацией нужен другой подход - последовательная загрузка страниц, пока есть данные. Но внутри каждой страницы можно распараллелить обработку элементов.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
async def fetch_paginated_api(base_url):
    """Загрузка API с пагинацией"""
    all_data = []
    page = 1
    
    async with aiohttp.ClientSession() as session:
        while True:
            url = f"{base_url}?page={page}"
            async with session.get(url) as response:
                data = await response.json()
                
                if not data.get("items"):
                    break  # Достигли конца
                
                all_data.extend(data["items"])
                page += 1
                
                if page > data.get("total_pages", page):
                    break
    
    return all_data
Продвинутый паттерн - динамическая очередь с обработкой по мере готовности. asyncio.as_completed() возвращает корутины в порядке завершения, не дожидаясь самой медленной.

Python
1
2
3
4
5
6
7
8
9
10
11
12
async def process_as_ready(urls):
    """Обработка результатов по мере поступления"""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        
        for coro in asyncio.as_completed(tasks):
            try:
                result = await coro
                # Обрабатываем результат немедленно
                process_data(result)
            except Exception as e:
                handle_error(e)
Реальный кейс из практики: парсер цен для мониторинга конкурентов. 500 товаров на 8 маркетплейсах - 4000 запросов. Синхронный код с requests работал 22 минуты. Первая asyncio версия без оптимизаций - 8 минут. С семафором на 50 одновременных запросов и переиспользованием сессии - 47 секунд. С добавлением connection pool размером 100 - 31 секунда.

Но есть подводный камень. Если сервер отдает данные медленно, тысячи открытых соединений съедят всю память. Семафор обязателен для контроля ресурсов. Я встречал систему, которая падала с OOM при парсинге 10000 страниц без ограничений - все соединения висели в памяти.

Настройка connection pool критична. TCPConnector в aiohttp управляет переиспользованием соединений. limit контролирует максимум соединений на хост, limit_per_host - общий лимит.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
async def optimized_fetcher(urls):
    """Оптимизированная загрузка с настройкой пула"""
    connector = aiohttp.TCPConnector(
        limit=100,           # Максимум 100 соединений всего
        limit_per_host=20,   # Максимум 20 на один хост
        ttl_dns_cache=300    # Кеш DNS на 5 минут
    )
    
    async with aiohttp.ClientSession(connector=connector) as session:
        semaphore = asyncio.Semaphore(50)
        tasks = [
            fetch_with_limit(session, url, semaphore) 
            for url in urls
        ]
        return await asyncio.gather(*tasks, return_exceptions=True)
Еще один паттерн - retry с экспоненциальной задержкой для временных сбоев. Сеть нестабильна, серверы падают, таймауты случаются. Умные повторы увеличивают надежность без костылей.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
async def fetch_with_retry(session, url, max_attempts=3):
    """Запрос с повторными попытками"""
    for attempt in range(max_attempts):
        try:
            async with session.get(url) as response:
                response.raise_for_status()
                return await response.json()
        except aiohttp.ClientError as e:
            if attempt == max_attempts - 1:
                raise  # Последняя попытка - сдаемся
            
            # Экспоненциальная задержка: 1s, 2s, 4s
            delay = 2 ** attempt
            await asyncio.sleep(delay)
Конкурентные HTTP-запросы - основной use case для asyncio. Здесь асинхронность показывает максимальную эффективность, превращая часы ожидания в минуты работы. Но требует понимания лимитов, таймаутов, обработки ошибок и управления ресурсами. Наивная реализация может быть хуже синхронной.

Паттерны обработки ошибок в асинхронном коде: try/except, обработка исключений в группах задач



Обработка ошибок в асинхронном коде коварнее, чем кажется. Исключение может всплыть в неожиданном месте, потеряться в фоновой задаче или разрушить весь пайплайн из-за одного упавшего запроса. Нужны четкие паттерны, иначе debugging превратится в ад.

Базовый try/except работает с корутинами так же, как с обычными функциями. Оборачиваете await в блок, ловите специфичные исключения, обрабатываете или пробрасываете дальше.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
async def safe_fetch(url):
    """Запрос с базовой обработкой ошибок"""
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, timeout=5) as response:
                return await response.json()
    except aiohttp.ClientError as e:
        print(f"Ошибка сети: {e}")
        return None
    except asyncio.TimeoutError:
        print(f"Таймаут для {url}")
        return None
    except Exception as e:
        print(f"Неожиданная ошибка: {e}")
        raise
Но в асинхронном коде часто запускаете десятки корутин одновременно. Если одна упадет, что произойдет с остальными? По умолчанию asyncio.gather() прерывается при первой ошибке и пробрасывает исключение наверх. Все незавершенные корутины отменяются.

Python
1
2
3
4
5
6
7
8
9
10
async def fetch_multiple_fail_fast(urls):
    """При ошибке прерываем все"""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        try:
            results = await asyncio.gather(*tasks)
            return results
        except aiohttp.ClientError as e:
            print(f"Одна из задач упала: {e}")
            return []
Такое поведение редко нужно. Обычно хотите собрать все результаты, включая ошибки. Параметр return_exceptions=True превращает gather() в более терпимый режим - возвращает список с результатами и исключениями.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
async def fetch_multiple_tolerant(urls):
    """Собираем все результаты, включая ошибки"""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    
    # Разделяем успешные и неудачные
    successful = []
    failed = []
    
    for url, result in zip(urls, results):
        if isinstance(result, Exception):
            failed.append((url, result))
        else:
            successful.append((url, result))
    
    return successful, failed
Я использовал этот паттерн в парсере вакансий с 50 источников. Три сайта регулярно отваливались с таймаутами, но это не должно было блокировать остальные 47. return_exceptions=True позволял собирать данные с работающих источников и логировать проблемные.

Для более детального контроля над каждой задачей оборачивайте корутины в индивидуальные try/except блоки. Так можете применять разную логику обработки ошибок для разных типов операций.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
async def robust_operation(url):
    """Каждая операция с собственной обработкой ошибок"""
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                data = await response.json()
                return {"status": "success", "data": data}
    except aiohttp.ClientError as e:
        return {"status": "network_error", "error": str(e)}
    except asyncio.TimeoutError:
        return {"status": "timeout", "error": "Request timed out"}
    except json.JSONDecodeError as e:
        return {"status": "parse_error", "error": str(e)}
 
async def process_many(urls):
    """Обработка с детальным статусом каждой операции"""
    tasks = [robust_operation(url) for url in urls]
    results = await asyncio.gather(*tasks)
    
    # Теперь можем анализировать статусы
    success_count = sum(1 for r in results if r["status"] == "success")
    print(f"Успешно: {success_count}/{len(results)}")
    
    return results
Фоновые задачи требуют особого внимания. Если создали Task через create_task() и не сделали await, исключение может потеряться. Python выдаст предупреждение, но программа продолжит работу с мертвой фоновой задачей.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
async def background_worker():
    """Фоновая задача с обработкой ошибок"""
    try:
        while True:
            await process_queue()
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("Задача отменена, очищаем ресурсы")
        await cleanup()
        raise  # Важно пробросить CancelledError
    except Exception as e:
        print(f"Фоновая задача упала: {e}")
        # Логируем, уведомляем, но не роняем приложение
        raise
 
async def main():
    task = asyncio.create_task(background_worker())
    
    try:
        await asyncio.sleep(10)
    finally:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass  # Ожидаемая отмена
Python 3.11 принес ExceptionGroup - способ группировать несвязанные исключения. Полезно, когда несколько корутин упали независимо друг от друга. Новый синтаксис except* ловит исключения внутри группы.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
async def failing_tasks():
    """Несколько задач с разными ошибками"""
    async def task_a():
        await asyncio.sleep(0.1)
        raise ValueError("Ошибка в A")
    
    async def task_b():
        await asyncio.sleep(0.2)
        raise TypeError("Ошибка в B")
    
    async def task_c():
        await asyncio.sleep(0.15)
        raise KeyError("Ошибка в C")
    
    results = await asyncio.gather(
        task_a(), task_b(), task_c(),
        return_exceptions=True
    )
    
    # Собираем исключения в группу
    exceptions = [r for r in results if isinstance(r, Exception)]
    if exceptions:
        raise ExceptionGroup("Множественные ошибки", exceptions)
 
async def handle_grouped_errors():
    """Обработка группы исключений"""
    try:
        await failing_tasks()
    except* ValueError as eg:
        print(f"Поймали ValueError: {eg.exceptions}")
    except* TypeError as eg:
        print(f"Поймали TypeError: {eg.exceptions}")
    except* KeyError as eg:
        print(f"Поймали KeyError: {eg.exceptions}")
Встречал баг, где фоновая задача обработки очереди упала с необработанным исключением через час работы. Очередь перестала обрабатываться, но никто не заметил - основное приложение работало. Добавил wrapper с логированием всех исключений и мониторингом heartbeat. Теперь при падении фоновой задачи срабатывает alert.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
async def monitored_background_task(task_name: str):
    """Фоновая задача с мониторингом"""
    last_heartbeat = time.time()
    
    try:
        while True:
            await do_work()
            last_heartbeat = time.time()
            await report_heartbeat(task_name, last_heartbeat)
            await asyncio.sleep(1)
    except Exception as e:
        await send_alert(f"Task {task_name} failed: {e}")
        await log_exception(task_name, e)
        raise
Обработка ошибок в асинхронном коде - не формальность. Потерянное исключение в фоновой задаче может означать незакрытые соединения, утечки памяти, невыполненные операции. Явная обработка, логирование, мониторинг - не паранойя, а необходимость для надежных систем.

Асинхронная работа с базами данных



База данных - типичный источник блокирующих операций. Запрос выполняется десятки миллисекунд, иногда секунды. В синхронном коде приложение замирает на каждом SELECT. В асинхронном - переключается на другие задачи, пока база думает.

Стандартные драйверы вроде psycopg2 для PostgreSQL или pymysql для MySQL блокируют поток. Нужны асинхронные аналоги: asyncpg для PostgreSQL, aiomysql для MySQL, motor для MongoDB. Они реализуют неблокирующий протокол общения с базой.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import asyncpg
import asyncio
 
async def fetch_users():
    """Асинхронное подключение и запрос"""
    conn = await asyncpg.connect(
        host='localhost',
        database='mydb',
        user='postgres',
        password='secret'
    )
    
    try:
        rows = await conn.fetch('SELECT id, name FROM users WHERE active = $1', True)
        return [dict(row) for row in rows]
    finally:
        await conn.close()
Создание соединения - тяжелая операция. TCP handshake, аутентификация, подготовка сессии. Делать это на каждый запрос расточительно. Connection pool решает проблему - создает пул соединений при старте, переиспользует их между запросами.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
async def setup_database():
    """Создание пула соединений"""
    pool = await asyncpg.create_pool(
        host='localhost',
        database='mydb',
        user='postgres',
        password='secret',
        min_size=5,    # Минимум соединений
        max_size=20    # Максимум соединений
    )
    return pool
 
async def get_user_by_id(pool, user_id: int):
    """Запрос через пул"""
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            'SELECT * FROM users WHERE id = $1',
            user_id
        )
        return dict(row) if row else None
 
async def main():
    pool = await setup_database()
    
    try:
        # Параллельные запросы к базе
        tasks = [get_user_by_id(pool, i) for i in range(1, 11)]
        users = await asyncio.gather(*tasks)
        print(f"Загружено пользователей: {len([u for u in users if u])}")
    finally:
        await pool.close()
 
asyncio.run(main())
Пул управляет соединениями автоматически. Когда делаете pool.acquire(), получаете свободное соединение или ждете, если все заняты. После выхода из контекста соединение возвращается в пул, а не закрывается.

Я разрабатывал API для аналитики с PostgreSQL. Синхронный код на Flask обрабатывал 50 запросов в секунду - каждый запрос висел на базе 200 мс. Переписал на FastAPI с asyncpg и пулом на 10 соединений. Throughput вырос до 400 запросов в секунду на том же железе. База не стала быстрее, но приложение научилось эффективно использовать время ожидания.

Транзакции в асинхронном коде требуют явного управления через контекстный менеджер или методы begin()/`commit()`. Нельзя просто начать транзакцию - нужно удерживать соединение до завершения.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
async def transfer_money(pool, from_id: int, to_id: int, amount: float):
    """Перевод денег с обработкой транзакции"""
    async with pool.acquire() as conn:
        async with conn.transaction():
            # Проверяем баланс
            balance = await conn.fetchval(
                'SELECT balance FROM accounts WHERE id = $1',
                from_id
            )
            
            if balance < amount:
                raise ValueError("Недостаточно средств")
            
            # Списываем у отправителя
            await conn.execute(
                'UPDATE accounts SET balance = balance - $1 WHERE id = $2',
                amount, from_id
            )
            
            # Начисляем получателю
            await conn.execute(
                'UPDATE accounts SET balance = balance + $1 WHERE id = $2',
                amount, to_id
            )
            
            # При выходе из блока транзакция автоматически commit'ится
            # При исключении - rollback
Если транзакция падает с исключением, conn.transaction() автоматически откатывает изменения. Это защищает от частичного выполнения операций.

Батчинг запросов - мощная оптимизация. Вместо N отдельных INSERT делаете один executemany(). База обрабатывает пачку эффективнее, чем N одиночных команд.

Python
1
2
3
4
5
6
7
8
9
10
11
async def bulk_insert_users(pool, users: list):
    """Массовая вставка пользователей"""
    async with pool.acquire() as conn:
        await conn.executemany(
            'INSERT INTO users (name, email, created_at) VALUES ($1, $2, $3)',
            [(u['name'], u['email'], datetime.now()) for u in users]
        )
 
# Вместо цикла с N запросами:
# for user in users:
#     await conn.execute('INSERT INTO users ...')
В проекте с импортом данных переход от отдельных INSERT к батчингу сократил время загрузки 10000 записей с 45 секунд до 3.2 секунды. База тратила меньше времени на обработку протокола и больше - на реальную работу.

Prepared statements ускоряют повторяющиеся запросы. База один раз парсит SQL, строит план выполнения, затем переиспользует его с разными параметрами.

Python
1
2
3
4
5
6
7
8
9
10
11
async def query_with_prepared_statement(pool):
    """Использование prepared statement"""
    async with pool.acquire() as conn:
        # Готовим statement
        stmt = await conn.prepare('SELECT * FROM users WHERE age > $1 AND country = $2')
        
        # Выполняем с разными параметрами
        young_users = await stmt.fetch(18, 'RU')
        middle_aged = await stmt.fetch(35, 'RU')
        
        return young_users, middle_aged
Для ORM есть асинхронные версии. SQLAlchemy с версии 1.4 поддерживает async/await через AsyncSession. Tortoise ORM и SQLModel заточены под asyncio изначально.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
 
engine = create_async_engine(
    'postgresql+asyncpg://user:pass@localhost/db',
    echo=True
)
 
async_session = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)
 
async def get_users_orm():
    """Запрос через SQLAlchemy ORM"""
    async with async_session() as session:
        result = await session.execute(
            select(User).where(User.active == True)
        )
        users = result.scalars().all()
        return users
ORM добавляет overhead, но упрощает работу со сложными схемами. Выбор между raw SQL и ORM зависит от проекта. Для высоконагруженных API с простыми запросами предпочитаю asyncpg напрямую. Для бизнес-логики с десятками таблиц и связей - ORM экономит время разработки.

Мониторинг пула соединений критичен для production. Если все соединения заняты, новые запросы будут ждать. Метрики pool.get_size(), pool.get_idle_size() показывают состояние пула.

Python
1
2
3
4
5
6
7
async def monitor_pool(pool):
    """Периодический мониторинг состояния пула"""
    while True:
        print(f"Размер пула: {pool.get_size()}")
        print(f"Свободных: {pool.get_idle_size()}")
        print(f"Используется: {pool.get_size() - pool.get_idle_size()}")
        await asyncio.sleep(10)
Видел систему, где пул из 5 соединений не справлялся с нагрузкой. Запросы накапливались в очереди, latency рос до 5 секунд. Увеличение пула до 15 соединений решило проблему - база выдерживала, но приложение не могло эффективно использовать ее мощность.

Асинхронная работа с базами - не волшебство. База выполняет запросы с той же скоростью. Выигрыш в том, что приложение не простаивает во время этого выполнения. Один поток обслуживает сотни запросов, пока база обрабатывает их асинхронно. Для высоконагруженных систем с частыми обращениями к базе это дает кратный рост throughput.

Обработка потоков данных



Потоковая обработка данных - это когда информация поступает непрерывно, порциями, и обрабатывать ее нужно по мере поступления, а не дожидаясь полной загрузки. Asyncio идеально подходит для таких задач - можно читать следующую порцию, пока предыдущая обрабатывается.

Классический пример - чтение большого файла построчно. В синхронном коде читаете строку, обрабатываете, читаете следующую. В асинхронном - можете запустить обработку предыдущих строк, пока читаются новые.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import asyncio
 
async def process_line(line: str) -> dict:
    """Обработка одной строки данных"""
    # Имитация тяжелой обработки
    await asyncio.sleep(0.1)
    return {"data": line.strip(), "length": len(line)}
 
async def stream_file_lines(filepath: str):
    """Генератор для потокового чтения файла"""
    async with aiofiles.open(filepath, mode='r') as file:
        async for line in file:
            yield line
 
async def process_file_stream(filepath: str, batch_size: int = 10):
    """Обработка файла порциями"""
    batch = []
    
    async for line in stream_file_lines(filepath):
        batch.append(line)
        
        if len(batch) >= batch_size:
            # Обрабатываем порцию параллельно
            results = await asyncio.gather(
                *[process_line(line) for line in batch]
            )
            
            # Сохраняем результаты или отправляем дальше
            for result in results:
                await save_result(result)
            
            batch.clear()
    
    # Обрабатываем остаток
    if batch:
        results = await asyncio.gather(
            *[process_line(line) for line in batch]
        )
Использовал этот паттерн для обработки логов веб-сервера - 50 ГБ текста. Синхронная версия читала файл за 8 минут, обрабатывала каждую строку последовательно - итого 45 минут. Асинхронная с батчингом по 100 строк работала 12 минут - чтение и обработка шли параллельно.

Для потоковой обработки HTTP-ответов используется response.content.iter_chunked(). Большой файл скачивается по частям, каждая обрабатывается сразу без ожидания полной загрузки.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
async def download_and_process_stream(url: str, chunk_size: int = 8192):
    """Потоковая загрузка и обработка"""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            total_size = 0
            
            async for chunk in response.content.iter_chunked(chunk_size):
                # Обрабатываем chunk немедленно
                await process_chunk(chunk)
                total_size += len(chunk)
                
                # Показываем прогресс
                print(f"Обработано: {total_size / 1024:.2f} КБ")
    
    return total_size
Очереди asyncio.Queue превращаются в конвейер обработки. Один producer читает данные и складывает в очередь, несколько consumers забирают и обрабатывают параллельно.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
async def stream_producer(queue: asyncio.Queue, source):
    """Читает данные и помещает в очередь"""
    async for item in source:
        await queue.put(item)
    
    # Сигнал завершения
    await queue.put(None)
 
async def stream_consumer(queue: asyncio.Queue, consumer_id: int):
    """Обрабатывает элементы из очереди"""
    while True:
        item = await queue.get()
        
        if item is None:
            queue.task_done()
            break
        
        try:
            result = await process_item(item)
            await send_to_output(result)
        except Exception as e:
            print(f"Consumer {consumer_id} error: {e}")
        finally:
            queue.task_done()
 
async def stream_pipeline(source, num_consumers: int = 3):
    """Конвейер обработки потока"""
    queue = asyncio.Queue(maxsize=50)  # Буфер на 50 элементов
    
    # Запускаем producer и consumers
    producer = asyncio.create_task(stream_producer(queue, source))
    consumers = [
        asyncio.create_task(stream_consumer(queue, i))
        for i in range(num_consumers)
    ]
    
    # Ждем завершения producer
    await producer
    
    # Отправляем сигналы завершения всем consumers
    for _ in consumers:
        await queue.put(None)
    
    # Ждем завершения всех consumers
    await asyncio.gather(*consumers)
В проекте по обработке финансовых транзакций использовал трехступенчатый конвейер: парсинг сырых данных → валидация → запись в базу. Каждый этап - отдельная очередь с consumers. Throughput достиг 15000 транзакций в секунду на одном процессе.

Асинхронные генераторы упрощают создание потоковых источников. Функция с async def и yield создает генератор, который можно итерировать через async for.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
async def fetch_paginated_stream(api_url: str):
    """Потоковая загрузка данных с пагинацией"""
    page = 1
    
    async with aiohttp.ClientSession() as session:
        while True:
            url = f"{api_url}?page={page}"
            async with session.get(url) as response:
                data = await response.json()
                
                if not data['items']:
                    break
                
                # Отдаем элементы по одному
                for item in data['items']:
                    yield item
                
                page += 1
 
async def process_api_stream():
    """Обработка потока из API"""
    async for item in fetch_paginated_stream("https://api.example.com/data"):
        await process_item(item)
Backpressure - механизм контроля скорости потока. Если consumers не успевают обрабатывать, producer должен замедлиться. Queue с maxsize реализует это автоматически - put() блокируется при переполнении.

Python
1
2
3
4
5
6
7
8
9
async def controlled_stream_producer(queue: asyncio.Queue):
    """Producer с учетом backpressure"""
    for i in range(1000):
        item = await fetch_data(i)
        
        # Если очередь полна, put() будет ждать
        await queue.put(item)
        
        # Producer автоматически замедляется
Видел систему, где producer заливал данные быстрее, чем consumers успевали обрабатывать. Без ограничения очереди память росла до OOM. Добавление maxsize=100 стабилизировало потребление - producer ждал, когда освободится место.

Потоковая обработка особенно эффективна для ETL процессов - extract, transform, load. Данные текут через цепочку трансформаций без промежуточного сохранения полного датасета в памяти. Обрабатываете терабайты, используя мегабайты RAM.

Работа с Future и низкоуровневым API



Future - это обещание результата, который станет доступен в будущем. Низкоуровневый примитив, на котором построены корутины и Task. В обычном коде редко работаете с Future напрямую, но понимание механизма помогает диагностировать сложные проблемы и писать кастомные расширения для asyncio.

Future хранит три состояния: pending (ожидание), cancelled (отменен), done (завершен). В состоянии done содержит либо результат, либо исключение. Попытка получить результат из pending Future вызовет InvalidStateError.

Python
1
2
3
4
5
6
7
8
9
10
11
12
import asyncio
 
# Создание Future вручную
loop = asyncio.get_event_loop()
future = loop.create_future()
 
print(f"Состояние: done={future.done()}, cancelled={future.cancelled()}")
 
# Установка результата
future.set_result(42)
print(f"После set_result: done={future.done()}")
print(f"Результат: {future.result()}")
В реальности Future создается внутри asyncio - когда запускаете Task или делаете низкоуровневый вызов. Но иногда нужно создать Future самостоятельно для интеграции синхронного и асинхронного кода.

Классический сценарий - обработка callback-based API. Старая библиотека работает через колбеки, а вам нужен await. Оборачиваете вызов в корутину, которая создает Future и устанавливает результат в колбеке.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def legacy_async_operation(callback):
    """Старая функция с callback"""
    def delayed_work():
        import time
        time.sleep(1)
        callback("результат работы")
    
    import threading
    threading.Thread(target=delayed_work).start()
 
async def wrap_legacy_operation():
    """Обертка для использования с await"""
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    
    def callback(result):
        # Устанавливаем результат в Future
        loop.call_soon_threadsafe(future.set_result, result)
    
    legacy_async_operation(callback)
    return await future
 
# Теперь можно использовать с await
result = await wrap_legacy_operation()
Я интегрировал старый SOAP-клиент, который работал на колбеках, в современное asyncio приложение. Создавал Future для каждого запроса, колбек устанавливал результат. Код стал выглядеть однородно, без callback hell.
loop.call_soon() и loop.call_later() планируют выполнение функций в event loop. call_soon() добавляет в очередь на следующую итерацию, call_later() - с задержкой.

Python
1
2
3
4
5
6
7
8
9
10
def sync_callback():
    print("Callback выполнен")
 
loop = asyncio.get_running_loop()
 
# Выполнится на следующей итерации loop
loop.call_soon(sync_callback)
 
# Выполнится через 2 секунды
loop.call_later(2.0, sync_callback)
Эти методы работают только с обычными функциями, не с корутинами. Для корутин используйте create_task() или ensure_future().
loop.run_in_executor() запускает блокирующую функцию в отдельном потоке или процессе, возвращая Future. Единственный способ безопасно вызвать блокирующий код из асинхронного контекста.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import concurrent.futures
 
def blocking_operation(n):
    """CPU-intensive или блокирующая операция"""
    result = sum(i**2 for i in range(n))
    return result
 
async def async_wrapper():
    loop = asyncio.get_running_loop()
    
    # Выполняем в thread pool
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, 
            blocking_operation, 
            1_000_000
        )
    
    return result
Встречал проект, где в асинхронном обработчике вызывали синхронную библиотеку для работы с PDF - операция занимала 500 мс и блокировала event loop. Обернул вызов в run_in_executor() с ProcessPoolExecutor - блокировки исчезли, latency API упал с 600 мс до 50 мс.
asyncio.ensure_future() преобразует awaitable объект в Future или Task. Если передали корутину - создается Task, если Future - возвращается как есть. Универсальная обертка для планирования асинхронных операций.

Python
1
2
3
4
5
6
7
async def some_coroutine():
    await asyncio.sleep(1)
    return "готово"
 
# Эти два вызова эквивалентны
task1 = asyncio.create_task(some_coroutine())
task2 = asyncio.ensure_future(some_coroutine())
Низкоуровневое API включает работу с сокетами напрямую. loop.sock_connect(), loop.sock_recv(), loop.sock_sendall() - неблокирующие операции над сокетами без высокоуровневых протоколов.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import socket
 
async def raw_socket_client():
    loop = asyncio.get_running_loop()
    
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setblocking(False)
    
    await loop.sock_connect(sock, ('example.com', 80))
    
    request = b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n"
    await loop.sock_sendall(sock, request)
    
    response = await loop.sock_recv(sock, 4096)
    
    sock.close()
    return response.decode()
Редко нужно опускаться на этот уровень - aiohttp и другие библиотеки инкапсулируют низкоуровневые детали. Но для кастомных протоколов или интеграции с legacy системами знание полезно.

Future и низкоуровневое API asyncio - это фундамент, на котором построены корутины и Task. В повседневной работе используете высокоуровневые абстракции, но иногда приходится спускаться ниже - интегрировать callback-based код, работать с блокирующими операциями, реализовывать кастомные протоколы. Понимание внутреннего устройства превращает asyncio из черного ящика в предсказуемый и управляемый инструмент.

Асинхронные генераторы расширяют концепцию обычных генераторов в мир asyncio. Функция с async def и yield создает объект, который можно итерировать через async for. Каждое yield отдает значение и передает управление event loop, позволяя другим корутинам работать.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncio
 
async def async_range(start, stop):
    """Асинхронный аналог range()"""
    current = start
    while current < stop:
        yield current
        current += 1
        await asyncio.sleep(0.1)  # Имитация асинхронной работы
 
async def consume_async_generator():
    """Потребление значений из асинхронного генератора"""
    async for value in async_range(0, 5):
        print(f"Получено значение: {value}")
 
asyncio.run(consume_async_generator())
В отличие от обычного генератора, который блокирует выполнение на yield, асинхронный генератор позволяет event loop переключаться между задачами. Это критично для генераторов, которые выполняют I/O операции между yield.
Реальный пример - потоковое чтение базы данных. Вместо загрузки миллиона записей в память разом, отдаете их порциями через асинхронный генератор.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
async def stream_database_records(pool, batch_size=1000):
    """Потоковое чтение из базы"""
    offset = 0
    
    while True:
        async with pool.acquire() as conn:
            rows = await conn.fetch(
                'SELECT * FROM large_table LIMIT $1 OFFSET $2',
                batch_size, offset
            )
            
            if not rows:
                break
            
            for row in rows:
                yield dict(row)
            
            offset += batch_size
            await asyncio.sleep(0)  # Отдаем управление
 
async def process_large_dataset(pool):
    """Обработка большого датасета без загрузки в память"""
    count = 0
    
    async for record in stream_database_records(pool):
        await process_record(record)
        count += 1
        
        if count % 10000 == 0:
            print(f"Обработано записей: {count}")
Я использовал этот подход для ETL процесса с 50 миллионами записей. Синхронная версия падала с OOM при попытке загрузить все в память. Асинхронный генератор держал в памяти только текущий батч - потребление RAM не превышало 200 МБ.

Асинхронные генераторы поддерживают asend() и athrow() для передачи значений и исключений внутрь генератора. Это продвинутая техника для двусторонней коммуникации.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
async def accumulator_generator():
    """Генератор-аккумулятор"""
    total = 0
    
    while True:
        value = yield total
        if value is None:
            break
        total += value
 
async def use_accumulator():
    """Использование генератора с отправкой значений"""
    gen = accumulator_generator()
    
    # Инициализация генератора
    await gen.asend(None)
    
    # Отправляем значения
    result = await gen.asend(10)  # total = 10
    print(f"Текущая сумма: {result}")
    
    result = await gen.asend(20)  # total = 30
    print(f"Текущая сумма: {result}")
    
    await gen.aclose()  # Закрываем генератор
Асинхронные итераторы - это объекты с методами __aiter__() и __anext__(). Аналог обычных итераторов, но методы корутинные. Позволяют создавать кастомные итерируемые объекты с асинхронной логикой.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class AsyncRangeIterator:
    """Кастомный асинхронный итератор"""
    
    def __init__(self, start, stop, delay=0.1):
        self.current = start
        self.stop = stop
        self.delay = delay
    
    def __aiter__(self):
        return self
    
    async def __anext__(self):
        if self.current >= self.stop:
            raise StopAsyncIteration
        
        await asyncio.sleep(self.delay)
        value = self.current
        self.current += 1
        return value
 
async def use_custom_iterator():
    """Использование кастомного итератора"""
    async for value in AsyncRangeIterator(0, 5):
        print(f"Значение: {value}")
Встречал код, где нужно было последовательно опрашивать очередь сообщений. Реализовал через асинхронный итератор - метод __anext__() ждал новое сообщение, async for потреблял их естественным образом. Код получился чище, чем с явным while True.

Асинхронные comprehensions упрощают работу с асинхронными итераторами. Синтаксис похож на обычные list/dict comprehensions, но с async for.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
async def async_square_numbers():
    """Возведение в квадрат асинхронно"""
    async def async_range(n):
        for i in range(n):
            await asyncio.sleep(0.1)
            yield i
    
    # Асинхронный list comprehension
    squares = [x**2 async for x in async_range(10)]
    return squares
 
async def filter_async_data():
    """Фильтрация с условием"""
    async def fetch_numbers():
        for i in range(20):
            await asyncio.sleep(0.05)
            yield i
    
    # Comprehension с условием
    evens = [x async for x in fetch_numbers() if x % 2 == 0]
    return evens
Производительность асинхронных генераторов зависит от частоты await внутри. Если генератор делает yield без await, он фактически синхронный - никакой конкурентности. Только явная передача управления через await позволяет event loop переключаться на другие задачи.

Python
1
2
3
4
5
6
7
8
9
10
# Плохо - псевдоасинхронный генератор
async def fake_async_gen():
    for i in range(100):
        yield i  # Нет await - блокирует loop
 
# Хорошо - настоящий асинхронный генератор
async def real_async_gen():
    for i in range(100):
        yield i
        await asyncio.sleep(0)  # Отдаем управление
Закрытие асинхронных генераторов требует явного вызова aclose(). Без этого генератор останется висеть в памяти с незавершенными ресурсами. Контекстный менеджер или try/finally гарантируют очистку.

Python
1
2
3
4
5
6
7
8
9
10
11
async def safe_generator_usage():
    """Безопасное использование генератора"""
    gen = stream_database_records(pool)
    
    try:
        async for record in gen:
            if should_stop(record):
                break
            await process_record(record)
    finally:
        await gen.aclose()  # Гарантированное закрытие
Асинхронные генераторы и итераторы - мощный инструмент для работы с потоками данных. Они естественно вписываются в асинхронную архитектуру, позволяя обрабатывать бесконечные или очень большие последовательности без загрузки в память. Правильное использование yield и await превращает последовательную обработку в конкурентную без изменения базовой логики.

Интеграция синхронного и асинхронного кода: run_in_executor и практические стратегии



Реальные проекты редко бывают чисто асинхронными. Всегда найдется библиотека без async версии, legacy код на синхронном стеке, или просто операция, для которой асинхронный аналог не существует. Смешивание синхронного и асинхронного кода требует понимания того, как они взаимодействуют.

Основная проблема - блокирующий вызов в корутине парализует весь event loop. Вызвали time.sleep(5) вместо asyncio.sleep(5) - и все остальные корутины встали на пять секунд. Один синхронный вызов базы через psycopg2 блокирует тысячи асинхронных соединений.

loop.run_in_executor() решает проблему, выполняя блокирующую функцию в отдельном потоке или процессе. Event loop получает Future, который завершится, когда функция отработает. Остальные корутины продолжают работу.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
 
def blocking_operation(n):
    """Синхронная блокирующая функция"""
    time.sleep(2)  # Блокирует поток
    return sum(i**2 for i in range(n))
 
async def async_wrapper(n):
    """Обертка для вызова из асинхронного кода"""
    loop = asyncio.get_running_loop()
    
    # Выполняем в thread pool
    result = await loop.run_in_executor(None, blocking_operation, n)
    return result
 
async def main():
    # Запускаем несколько операций параллельно
    tasks = [async_wrapper(1_000_000) for _ in range(3)]
    results = await asyncio.gather(*tasks)
    print(f"Результаты: {results}")
 
asyncio.run(main())
Первый аргумент run_in_executor() - executor. None означает использование дефолтного ThreadPoolExecutor с небольшим пулом потоков. Для CPU-bound задач передайте ProcessPoolExecutor.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
from concurrent.futures import ProcessPoolExecutor
 
async def cpu_intensive_task(data):
    """CPU-intensive вычисления"""
    loop = asyncio.get_running_loop()
    
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool,
            heavy_computation,
            data
        )
    
    return result
Я переписывал систему обработки изображений - resize, фильтры, watermarks. PIL полностью синхронный. Обернул каждую операцию в run_in_executor() с ProcessPoolExecutor на 4 процесса. Throughput вырос с 15 изображений в секунду до 80. Event loop не блокировался, CPU использовался на 100% всех ядер.

Передача аргументов в run_in_executor() требует внимания. Аргументы сериализуются через pickle для ProcessPoolExecutor. Сложные объекты, сокеты, открытые файлы не сериализуются. Передавайте только простые типы или пути к файлам.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# Плохо - объект сессии не сериализуется
async def broken_approach(session):
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool,
            process_with_session,
            session  # Ошибка при сериализации
        )
 
# Хорошо - передаем параметры для создания сессии
async def correct_approach(db_url):
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool,
            process_with_url,
            db_url  # Строка сериализуется без проблем
        )
Для библиотек с callback-based API создайте обертку через Future. Callback устанавливает результат в Future, корутина ждет через await.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
async def wrap_callback_api(param):
    """Обертка для callback-based библиотеки"""
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    
    def callback(result, error=None):
        if error:
            loop.call_soon_threadsafe(future.set_exception, error)
        else:
            loop.call_soon_threadsafe(future.set_result, result)
    
    # Запуск синхронной операции с колбеком
    legacy_library.async_operation(param, callback)
    
    return await future
call_soon_threadsafe() критична - callback может вызваться из другого потока. Обычный call_soon() небезопасен при многопоточности.

Встречал код, где забыли threadsafe вариант. Callback из потока записывал в Future, event loop иногда падал с segfault. Добавление _threadsafe решило проблему - защита от race conditions на уровне event loop.

Для частых вызовов блокирующих операций создайте persistent executor. Создание нового executor'а на каждый вызов дорого - пул потоков инициализируется заново.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class AsyncService:
    """Сервис с переиспользуемым executor'ом"""
    
    def __init__(self):
        self.executor = ThreadPoolExecutor(max_workers=10)
    
    async def blocking_call(self, data):
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.executor,
            sync_operation,
            data
        )
    
    async def cleanup(self):
        """Обязательная очистка ресурсов"""
        self.executor.shutdown(wait=True)
Баланс между ThreadPoolExecutor и ProcessPoolExecutor зависит от задачи. I/O-bound блокирующие операции - threads. CPU-bound вычисления - processes. Потоки легче, процессы мощнее, но дороже. Я использовал thread pool для вызовов синхронного Redis-клиента - 50 запросов в секунду, каждый 10-20 мс. Overhead на создание процесса превысил бы время самой операции. Process pool применял для обработки видео - каждый файл 5-10 секунд CPU-intensive работы.

Синхронный код внутри корутины без executor'а - типичная ошибка. Код работает, но блокирует event loop. Профилирование показывает странные задержки, latency плавает. Всегда оборачивайте блокирующие операции.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# Плохо - блокирует event loop
async def bad_approach():
    data = requests.get("https://api.example.com")  # Блокирует
    return data.json()
 
# Хорошо - через executor
async def good_approach():
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None,
        lambda: requests.get("https://api.example.com").json()
    )
 
# Еще лучше - асинхронная библиотека
async def best_approach():
    async with aiohttp.ClientSession() as session:
        async with session.get("https://api.example.com") as response:
            return await response.json()
Третий вариант всегда предпочтительнее. Executor - это костыль для случаев, когда асинхронного аналога нет. Где возможно - используйте нативные async библиотеки.

Интеграция синхронного и асинхронного кода - компромисс. Вы получаете возможность использовать legacy код, но теряете часть эффективности asyncio. ThreadPoolExecutor добавляет overhead на переключение контекста, ProcessPoolExecutor - на сериализацию данных. Минимизируйте количество таких вызовов, но не бойтесь использовать, когда это единственный разумный вариант.

python. flet. asyncio.Queue
Всем привет, есть проблема с понимаем как работают очереди и/или проблема с тем, что уже...

Потоки, процессы и асинхронное программирование
Добрый вечер. Почитал документацию и по моему я только еще больше запутался. Есть модуль...

Асинхронное программирование
Не всегда включаюсь в тему асинхронного программирования, рассказали бы в краткой форме,...

С чего начать разбираться с Asyncio?
нужно написать парсер страницы, с загрузкой некоторых картинок, на Asyncio. С многопоточным и...

Коннектор к API используя AsyncIO, aiohttp
Py3.5. Работаю с библиотекой, подключаюсь к api telegram. Получаю сообщения и отправляю их в...

Выйти из петли в run_forever() [asyncio]
Пишу скрипт, который эмулирует запуск игр в стиме. Для этого была выбрана библиотека Steam для...

Asyncio корректное завершение в случае непредвиденной остановки программы
Есть скрипт (выложен не полностью) @asyncio.coroutine def do_work(data): for adres, ip_list in...

Asyncio - coroutine vs future vs task
Доброго времени суток, форумчане! Приступил к изучению asyncio. Использую python 3.5. Гуру...

Можно ли поставить таски на паузу? (asyncio)
Всем привет! Имеется такой код: from aiohttp import web import asyncio async def...

Asyncio ошибка работы парсера
Здравствуйте, имеется парсер на Python в связке asyncio + aiohttp + threadPoolExecutor + lxml....

WxPython и asyncio
Добрый день. Подскажите можно ли в приложении wxPython использовать модуль asyncio? Как я понимаю...

Параллельный запуск 2 методов через asyncio
Есть необходимость сделать код, продолжающий работу даже при поступлении запроса. Пересмотрела и...

Надоела реклама? Зарегистрируйтесь и она исчезнет полностью.
Всего комментариев 0
Комментарии
 
Новые блоги и статьи
Диалоги с ИИ
zorxor 23.05.2026
Насколько я понимаю - Вы - Искусственный Интеллект. Это так? Да, всё верно. Я — искусственный интеллект. Я представляю собой большую языковую модель, созданную для помощи в самых разных задачах. . . .
Модель здравосохранения 14. Собираем всю модель вместе.
anaschu 22.05.2026
Модель собрана. В будущих постах на видео я покажу, как она работает. В этом посте запускаем её, проверяем результаты и разбираем что можно с ней делать дальше. Перед запуском проверяем. . .
Модель здравоохранения 13. Добавление самой системы здравоохранения.
anaschu 22.05.2026
В предыдущем посте мы настроили болезни. Теперь добавим события, которые управляют здоровьем всего коллектива, а также настроим рабочий график и расчёт финансов. В Main создаём четыре события. . . .
Модель здравоохранения 12. добавление болезней через ресурпул, как аварии
anaschu 22.05.2026
Болезни — это ключевая часть нашей модели. Нам нужно, чтобы работник периодически уходил на больничный, его задание при этом зависало, а после выздоровления работа возобновлялась. Реализуем это двумя. . .
Модель здравоохранения 11. Создаём классы Задание и Работник
anaschu 22.05.2026
В AnyLogic каждая заявка и каждый ресурс — это объект определённого класса. Нам нужно создать два класса: Задание (заявка) и Работник (ресурс). Класс Задание В дереве проекта нажимаем правой. . .
Модель здравоохранения 10. Новая модель, смотрим, как добавлять логические блоки, и что писать внутри
anaschu 22.05.2026
Открываем AnyLogic, создаём новый проект. В дереве проекта появляется класс Main — это главный агент, в котором будет жить вся наша логика. Палитра блоков Слева находится палитра. Нас интересует. . .
модель ЗдравоСохранения 9. Новая модель, разбираемся, как ее создавать
anaschu 22.05.2026
В этой серии постов мы построим модель небольшого рабочего коллектива. Сотрудники получают задания, выполняют их, иногда болеют — и мы хотим посчитать, сколько это стоит компании. Метод. . .
[golang] Linked list
alhaos 22.05.2026
Связный список / Linked list Связный список структура данных позволяющая хранить список значений, в отличии от массива в памяти хранится не сплошным куском, а отдельными частями которые ссылаются. . .
КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin
Copyright ©2000 - 2026, CyberForum.ru