Форум программистов, компьютерный форум, киберфорум
py-thonny
Войти
Регистрация
Восстановить пароль

asyncio и асинхронное программирование в Python: синхронизация, продвинутые примеры, асинхронный парсер

Запись от py-thonny размещена 07.10.2025 в 21:20. Обновил(-а) py-thonny 07.10.2025 в 21:23
Показов 3167 Комментарии 0

Нажмите на изображение для увеличения
Название: asyncio и асинхронное программирование в Python синхронизация, асинхронный парсер.jpg
Просмотров: 208
Размер:	68.1 Кб
ID:	11270
1. asyncio и асинхронное программирование в Python: конкурентность, корутины, таски, async/await, event loop
2. asyncio и асинхронное программирование в Python: паттерны, футуры, примеры, работа с БД
3. asyncio и асинхронное программирование в Python: синхронизация, продвинутые примеры, асинхронный парсер

Продвинутые техники



Когда базовые паттерны освоены, asyncio открывает возможности для более изощренных решений. Эти техники не нужны в каждом проекте, но знание их превращает ограничения в преимущества.

Динамическое управление размером пула задач - вместо фиксированного семафора адаптируете количество одновременных операций под нагрузку. Если сервер начинает тормозить, сокращаете конкурентность. Быстрые ответы - увеличиваете.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class AdaptiveSemaphore:
    def __init__(self, initial=10, min_size=5, max_size=50):
        self.semaphore = asyncio.Semaphore(initial)
        self.current_size = initial
        self.min_size = min_size
        self.max_size = max_size
        self.response_times = []
    
    async def adjust_based_on_performance(self):
        if len(self.response_times) < 10:
            return
        
        avg_time = sum(self.response_times) / len(self.response_times)
        self.response_times.clear()
        
        if avg_time > 2.0 and self.current_size > self.min_size:
            # Медленно - уменьшаем нагрузку
            self.current_size -= 1
        elif avg_time < 0.5 and self.current_size < self.max_size:
            # Быстро - можем больше
            self.current_size += 1
Приоритетные очереди для задач - не все операции равны. Критичные запросы должны обрабатываться первыми. asyncio.PriorityQueue сортирует задачи по приоритету.

Python
1
2
3
4
5
6
7
8
9
10
11
12
async def priority_worker(queue: asyncio.PriorityQueue):
    while True:
        priority, task = await queue.get()
        try:
            await process_task(task)
        finally:
            queue.task_done()
 
# Добавление с приоритетом (меньше = важнее)
await queue.put((1, critical_task))   # Высокий приоритет
await queue.put((5, normal_task))     # Обычный
await queue.put((10, background_task)) # Низкий
Каскадная отмена связанных задач - когда одна задача отменяется, автоматически останавливаете зависимые. Создаете иерархию Task с отслеживанием родительских связей.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
class TaskGroup:
    def __init__(self):
        self.tasks = []
    
    def create_task(self, coro):
        task = asyncio.create_task(coro)
        self.tasks.append(task)
        return task
    
    async def cancel_all(self):
        for task in self.tasks:
            task.cancel()
        
        await asyncio.gather(*self.tasks, return_exceptions=True)
Мемоизация асинхронных результатов - кешируете результаты дорогих операций. Повторные вызовы возвращают закешированное значение без реального выполнения.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from functools import wraps
 
def async_lru_cache(maxsize=128):
    cache = {}
    
    def decorator(func):
        @wraps(func)
        async def wrapper(*args):
            if args in cache:
                return cache[args]
            
            result = await func(*args)
            
            if len(cache) >= maxsize:
                cache.pop(next(iter(cache)))
            
            cache[args] = result
            return result
        
        return wrapper
    return decorator
 
@async_lru_cache(maxsize=100)
async def expensive_api_call(param):
    async with aiohttp.ClientSession() as session:
        async with session.get(f"https://api.example.com/{param}") as resp:
            return await resp.json()
Эти техники решают нетривиальные проблемы производительности и надежности. Применяйте осознанно - сложность должна быть оправдана выигрышем.

asyncio.wait vs asyncio.gather
Всем привет! :victory: Вопросы внизу. Есть два похожих теста: #1 - asyncio.wait import...

Асинхронное получение данных посредством asyncio.gather
Доброго времени суток, вопрос такой, когда приходят ссылки на обработку, (все уходят), но...

asyncio Асинхронный Сервер
Доброго времени суток форумчане. Питон прекрасен и многогранен. Предельно прост и понятен :) ...

Asyncio парсер
Подскажите направление, словами, нужен подход к задаче. Используя asynсio надо рекурсивно...


Управление контекстом и таймаутами



Контекст в асинхронном коде - это не просто async with. Это управление жизненным циклом ресурсов, координация множества операций, ограничение времени выполнения. Правильное использование контекстных менеджеров и таймаутов превращает хрупкий код в надежную систему.

`asyncio.timeout()` появился в Python 3.11 и упростил работу с временными ограничениями. Раньше использовали asyncio.wait_for(), теперь есть более изящный синтаксис через контекстный менеджер.

Python
1
2
3
4
5
6
7
8
9
10
11
12
import asyncio
 
async def fetch_with_new_timeout(url):
    """Современный подход к таймаутам"""
    try:
        async with asyncio.timeout(5.0):
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    return await response.text()
    except asyncio.TimeoutError:
        print(f"Запрос к {url} превысил 5 секунд")
        return None
Вложенные контексты управляют разными аспектами операции. Внешний timeout() ограничивает общее время, внутренние async with следят за ресурсами. Если время истекает, все вложенные контексты корректно закрываются, соединения освобождаются.

Старый способ через wait_for() работает, но менее читаем. Функция оборачивает awaitable и возвращает результат или бросает TimeoutError.

Python
1
2
3
4
5
6
7
8
9
10
11
12
async def fetch_with_old_timeout(url):
    """Старый стиль таймаутов"""
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                result = await asyncio.wait_for(
                    response.text(),
                    timeout=5.0
                )
                return result
    except asyncio.TimeoutError:
        return None
Разница тонкая, но важная. timeout() контролирует весь блок кода, включая создание сессии и установку соединения. wait_for() ограничивает только конкретный await. Для полного контроля timeout() предпочтительнее. Я разрабатывал микросервис, который агрегировал данные с пяти внешних API. Каждый мог тормозить. Без таймаутов один медленный сервис блокировал весь ответ. Обернул каждый вызов в timeout(2.0) - если сервис не отвечает за 2 секунды, возвращаем None и продолжаем с остальными. Latency упал с 8 секунд (в худшем случае) до стабильных 2.1 секунды.

Множественные таймауты работают каскадом. Внешний контекст задает общий лимит, внутренние - для отдельных операций. Срабатывает ближайший истекший таймаут.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
async def cascading_timeouts():
    """Каскадные таймауты для сложных операций"""
    try:
        async with asyncio.timeout(10.0):  # Общий лимит 10 секунд
            # Первая операция - максимум 3 секунды
            async with asyncio.timeout(3.0):
                data = await fetch_initial_data()
            
            # Вторая операция - максимум 5 секунд
            async with asyncio.timeout(5.0):
                processed = await process_data(data)
            
            return processed
    except asyncio.TimeoutError:
        # Сработал один из таймаутов
        return None
Кастомные асинхронные контекстные менеджеры инкапсулируют логику создания и очистки ресурсов. Класс реализует __aenter__() и __aexit__() - аналоги обычных __enter__() и __exit__(), но корутинные.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class DatabaseTransaction:
    """Контекстный менеджер для транзакций"""
    
    def __init__(self, pool):
        self.pool = pool
        self.conn = None
    
    async def __aenter__(self):
        self.conn = await self.pool.acquire()
        await self.conn.execute('BEGIN')
        return self.conn
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        try:
            if exc_type is None:
                await self.conn.execute('COMMIT')
            else:
                await self.conn.execute('ROLLBACK')
        finally:
            await self.pool.release(self.conn)
        
        return False  # Не подавляем исключения
 
async def use_transaction(pool):
    async with DatabaseTransaction(pool) as conn:
        await conn.execute('INSERT INTO logs VALUES ($1)', 'entry')
        # При исключении автоматический rollback
        # При успехе - commit
Встречал проект, где транзакции открывались вручную с try/finally блоками на 30 строк. Логика commit/rollback дублировалась в десяти местах. Один контекстный менеджер заменил весь этот копипаст и устранил несколько багов с незакрытыми транзакциями.

asyncio.TaskGroup из Python 3.11 упрощает управление группой задач. Все созданные внутри контекста задачи автоматически дожидаются при выходе. Если одна падает - остальные отменяются.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
async def process_batch_with_taskgroup(items):
    """Обработка батча с автоматической очисткой"""
    results = []
    
    async with asyncio.TaskGroup() as group:
        tasks = [
            group.create_task(process_item(item))
            for item in items
        ]
    
    # Сюда попадаем только когда все задачи завершены
    # Или при первом исключении (остальные отменены)
    return [task.result() for task in tasks]
Без TaskGroup приходилось вручную отслеживать задачи, дожидаться их через gather(), обрабатывать отмену при ошибках. Теперь всё автоматически - структурированная конкурентность из коробки.

Динамические таймауты адаптируются под нагрузку. Замеряете среднее время ответа, корректируете лимит. Сервер быстрый - даете больше времени для получения полного ответа. Начинает тормозить - сокращаете, чтобы не висеть на медленных запросах.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class AdaptiveTimeout:
    """Динамически адаптирующийся таймаут"""
    
    def __init__(self, base_timeout=5.0):
        self.base_timeout = base_timeout
        self.response_times = []
        self.max_samples = 50
    
    def record_time(self, duration):
        self.response_times.append(duration)
        if len(self.response_times) > self.max_samples:
            self.response_times.pop(0)
    
    def get_timeout(self):
        if not self.response_times:
            return self.base_timeout
        
        avg = sum(self.response_times) / len(self.response_times)
        # Таймаут = среднее * 2 + базовое
        return min(avg * 2 + self.base_timeout, 30.0)
 
adaptive = AdaptiveTimeout()
 
async def smart_fetch(url):
    timeout_value = adaptive.get_timeout()
    start = time.time()
    
    try:
        async with asyncio.timeout(timeout_value):
            result = await fetch_url(url)
        
        adaptive.record_time(time.time() - start)
        return result
    except asyncio.TimeoutError:
        # Не записываем в статистику - операция не завершилась
        return None
Защита от зависания критична для production систем. Один медленный запрос не должен блокировать остальные. Таймауты - это не признак недоверия к коду, а защитная сетка для реального мира, где сети падают, серверы глючат, а ответы теряются. Правильное управление контекстом и временем превращает оптимистичный код в устойчивый к сбоям сервис.

Соединения с базами данных, HTTP-сессии, файловые дескрипторы - все это ограниченные ресурсы. Создание нового соединения требует времени и памяти: TCP handshake, SSL negotiation, аутентификация. Если создавать соединение на каждый запрос, приложение утонет в overhead. Пулы решают проблему, переиспользуя соединения между запросами.

Connection pool - это контейнер с предсозданными соединениями. При старте приложения создается N соединений и держится открытыми. Когда нужно выполнить запрос, берете соединение из пула, работаете с ним, возвращаете обратно. Следующий запрос переиспользует то же соединение. Никаких повторных подключений.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import asyncpg
import asyncio
 
class ConnectionPool:
    """Базовая реализация пула соединений"""
    
    def __init__(self, dsn, min_size=5, max_size=20):
        self.dsn = dsn
        self.min_size = min_size
        self.max_size = max_size
        self.pool = None
    
    async def initialize(self):
        """Создание пула при старте приложения"""
        self.pool = await asyncpg.create_pool(
            self.dsn,
            min_size=self.min_size,
            max_size=self.max_size,
            command_timeout=5.0
        )
    
    async def execute_query(self, query, *args):
        """Выполнение запроса через пул"""
        async with self.pool.acquire() as conn:
            return await conn.fetch(query, *args)
    
    async def close(self):
        """Закрытие всех соединений"""
        if self.pool:
            await self.pool.close()
Размер пула определяется нагрузкой. Слишком маленький - запросы ждут свободного соединения. Слишком большой - съедаете ресурсы базы впустую. Я обычно начинаю с min=5, max=20, потом смотрю по метрикам. Если пул постоянно на максимуме и есть ожидание - увеличиваю. Если половина соединений простаивает - уменьшаю.

В проекте с аналитикой запросы шли пачками - 100 одновременных SELECT на отчет. Пул на 10 соединений создавал очередь, latency рос до 3 секунд. Поднял до 30 - очередь исчезла, время ответа стабилизировалось на 200 мс. База справлялась, но пул был узким горлышком.

HTTP-клиенты работают похоже. aiohttp.ClientSession внутри управляет пулом TCP-соединений. TCPConnector настраивает параметры пула - сколько соединений на хост, общий лимит, время жизни keep-alive соединения.

Python
1
2
3
4
5
6
7
8
9
10
11
async def configure_http_pool():
    """Настройка HTTP connection pool"""
    connector = aiohttp.TCPConnector(
        limit=100,              # Максимум соединений всего
        limit_per_host=20,      # На один хост
        ttl_dns_cache=300,      # DNS кеш на 5 минут
        keepalive_timeout=30    # Keep-alive соединений
    )
    
    session = aiohttp.ClientSession(connector=connector)
    return session
Без переиспользования сессии каждый HTTP-запрос создает новое соединение. SSL handshake для HTTPS занимает 50-100 мс. 100 запросов = 5-10 секунд только на установку соединений. С пулом первый запрос платит эту цену, остальные 99 переиспользуют соединение - почти нулевой overhead.

Встречал код, где ClientSession создавалась внутри каждой корутины. 1000 одновременных корутин = 1000 сессий = десятки тысяч файловых дескрипторов. Система упиралась в ulimit, начинала падать с "Too many open files". Одна переиспользуемая сессия решила проблему - потребление дескрипторов упало с 50000 до 200.

Управление временем жизни ресурсов требует явной очистки. Оставите соединение открытым после использования - получите утечку. Не закроете сессию - дескрипторы останутся висеть до завершения процесса. Контекстные менеджеры автоматизируют cleanup.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class ManagedResource:
    """Ресурс с гарантированной очисткой"""
    
    async def __aenter__(self):
        self.pool = await asyncpg.create_pool(DSN)
        self.session = aiohttp.ClientSession()
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.close()
        await self.pool.close()
        
        # Даем время на graceful shutdown
        await asyncio.sleep(0.250)
 
async def use_resources():
    """Использование с автоматической очисткой"""
    async with ManagedResource() as resources:
        # Работаем с пулом и сессией
        data = await resources.pool.fetch("SELECT * FROM users")
        response = await resources.session.get("https://api.example.com")
    
    # Здесь все уже закрыто
Мониторинг состояния пулов критичен. Метрики показывают, когда пул перегружен или недоиспользуется. Для asyncpg смотрим pool.get_size(), pool.get_idle_size(). Разница - количество активных соединений. Если постоянно равна максимуму - пул мал.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
async def monitor_pool_health(pool):
    """Периодический мониторинг пула"""
    while True:
        total = pool.get_size()
        idle = pool.get_idle_size()
        active = total - idle
        
        utilization = (active / total) * 100 if total > 0 else 0
        
        print(f"Pool: {active}/{total} active, {utilization:.1f}% utilization")
        
        if utilization > 90:
            print("WARNING: Pool nearly exhausted!")
        
        await asyncio.sleep(10)
В production системе с PostgreSQL видел пул на 15 соединений с утилизацией 99%. Каждый новый запрос ждал освобождения. Увеличил до 25 - утилизация упала до 60%, latency с 800 мс до 150 мс. База могла больше, но пул искусственно ограничивал throughput.

Graceful shutdown предотвращает обрыв активных операций. При остановке приложения нужно дождаться завершения текущих запросов, закрыть соединения корректно. Резкое завершение оставляет hanging connections на стороне базы, которые висят до таймаута.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class Application:
    """Приложение с корректным завершением"""
    
    def __init__(self):
        self.pool = None
        self.shutdown_event = asyncio.Event()
    
    async def startup(self):
        self.pool = await asyncpg.create_pool(DSN)
    
    async def shutdown(self):
        """Graceful shutdown"""
        # Сигнал остановки фоновым задачам
        self.shutdown_event.set()
        
        # Ждем завершения активных операций
        await asyncio.sleep(1)
        
        # Закрываем пул
        await self.pool.close()
        
        # Финальная пауза для сетевых операций
        await asyncio.sleep(0.250)
Ошибка - закрыть пул, пока активны соединения. Получите исключения в середине транзакций, незаписанные данные, поврежденное состояние. Всегда сначала останавливаете прием новых запросов, дожидаетесь текущих, затем чистите ресурсы.

Пулы и управление ресурсами - основа стабильности асинхронных приложений. Неправильная настройка превращает быстрый код в медленный. Забытая очистка - в утекающую память. А грамотное управление жизненным циклом дает предсказуемую производительность и надежность под нагрузкой.

Синхронизация и блокировки



Асинхронный код работает в одном потоке, что вроде бы исключает классические проблемы конкурентного доступа. Но это обманчивое чувство безопасности. Когда корутина делает await, управление передается другой корутине, которая может изменить общие данные. Состояние программы меняется между вызовами await, и это источник тонких багов.

Представьте счетчик, который инкрементируют несколько корутин. В синхронном мире это атомарная операция. В асинхронном - нет. Корутина читает значение, делает await на что-то другое, возвращается и записывает увеличенное значение. Между чтением и записью другая корутина успела сделать то же самое. Результат - потерянные обновления.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import asyncio
 
counter = 0
 
async def increment_naive():
    """Небезопасный инкремент"""
    global counter
    
    # Читаем значение
    current = counter
    
    # Имитация асинхронной работы
    await asyncio.sleep(0.001)
    
    # Записываем увеличенное значение
    counter = current + 1
 
async def test_race_condition():
    """Демонстрация гонки"""
    global counter
    counter = 0
    
    # 100 корутин инкрементируют счетчик
    await asyncio.gather(*[increment_naive() for _ in range(100)])
    
    print(f"Ожидалось: 100, получили: {counter}")
 
asyncio.run(test_race_condition())
# Вывод: Ожидалось: 100, получили: 67 (или другое число < 100)
Я отлаживал систему статистики, где счетчики событий постоянно расходились с реальностью. Оказалось, десятки корутин обновляли один словарь без синхронизации. Операции вида stats[key] += 1 терялись между await. Добавил блокировки - расхождения исчезли.

asyncio.Lock - базовый примитив синхронизации. Работает как mutex в многопоточности, но для корутин. Только одна корутина может владеть блокировкой одновременно. Остальные ждут освобождения.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
lock = asyncio.Lock()
counter = 0
 
async def increment_safe():
    """Безопасный инкремент с блокировкой"""
    global counter
    
    async with lock:
        current = counter
        await asyncio.sleep(0.001)
        counter = current + 1
 
async def test_with_lock():
    global counter
    counter = 0
    
    await asyncio.gather(*[increment_safe() for _ in range(100)])
    
    print(f"С блокировкой: {counter}")
    # Вывод: С блокировкой: 100
Блокировка гарантирует атомарность участка кода между async with lock и выходом из контекста. Другие корутины, достигнув той же блокировки, приостановятся на await до освобождения.

Важный нюанс - блокировка не делает операции атомарными магическим образом. Она просто не дает другим корутинам войти в защищенный участок. Если внутри защищенного блока нет await, блокировка бесполезна - другие корутины всё равно не смогут выполниться до завершения текущей.

Python
1
2
3
4
5
6
7
8
9
# Блокировка избыточна - нет await внутри
async with lock:
    counter += 1  # Атомарно и без блокировки
 
# Блокировка нужна - есть await
async with lock:
    current = counter
    await asyncio.sleep(0.001)  # Тут управление может уйти
    counter = current + 1
asyncio.Semaphore ограничивает количество одновременных входов в критическую секцию. В отличие от Lock, который допускает только одного, семафор пропускает N корутин.

Python
1
2
3
4
5
6
7
8
9
10
11
semaphore = asyncio.Semaphore(3)  # Максимум 3 одновременно
 
async def limited_access(task_id):
    async with semaphore:
        print(f"Задача {task_id} вошла")
        await asyncio.sleep(1)
        print(f"Задача {task_id} вышла")
 
async def test_semaphore():
    # 10 задач, но одновременно работают только 3
    await asyncio.gather(*[limited_access(i) for i in range(10)])
Использовал семафоры для ограничения конкурентных запросов к API - сервер разрешал максимум 10 одновременных соединений. Semaphore(10) автоматически управлял очередью, не нужны были костыли с ручным подсчетом активных запросов.

asyncio.Event - флаг для координации между корутинами. Одна корутина ждет события через wait(), другая сигналит через set(). Простой механизм для "проснись, когда что-то случилось".

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
async def waiter(event: asyncio.Event, name: str):
    """Ждет сигнала"""
    print(f"{name} ждет...")
    await event.wait()
    print(f"{name} получил сигнал и продолжил работу")
 
async def signaler(event: asyncio.Event):
    """Отправляет сигнал"""
    await asyncio.sleep(2)
    print("Отправка сигнала всем ожидающим")
    event.set()
 
async def test_event():
    event = asyncio.Event()
    
    await asyncio.gather(
        waiter(event, "Корутина 1"),
        waiter(event, "Корутина 2"),
        signaler(event)
    )
В проекте с WebSocket сервером использовал Event для graceful shutdown. При получении SIGTERM устанавливал событие, все фоновые задачи проверяли его в циклах и корректно завершались. Без Event пришлось бы глобальными флагами управлять, что хрупко и некрасиво.

asyncio.Condition комбинирует Lock и Event. Позволяет корутинам ждать выполнения условия и уведомлять друг друга об изменениях. Классический паттерн producer-consumer можно реализовать через Condition элегантнее, чем через Queue.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
condition = asyncio.Condition()
items = []
 
async def producer(item):
    async with condition:
        items.append(item)
        print(f"Произведен: {item}")
        condition.notify()  # Будим одного ожидающего
 
async def consumer(name):
    async with condition:
        while not items:
            await condition.wait()  # Спим пока нет элементов
        
        item = items.pop(0)
        print(f"{name} потребил: {item}")
Дедлоки в асинхронном коде случаются реже, чем в многопоточности, но возможны. Две корутины ждут блокировки друг друга - классический взаимный дедлок. Правило то же: всегда захватывайте блокировки в одном порядке.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# Плохо - потенциальный дедлок
async def task_a(lock1, lock2):
    async with lock1:
        await asyncio.sleep(0.1)
        async with lock2:  # Ждем lock2
            pass
 
async def task_b(lock1, lock2):
    async with lock2:
        await asyncio.sleep(0.1)
        async with lock1:  # Ждем lock1
            pass
 
# Хорошо - фиксированный порядок
async def safe_task(lock1, lock2):
    # Всегда сначала lock1, потом lock2
    async with lock1:
        async with lock2:
            pass
Видел production систему, которая периодически зависала - две корутины захватывали блокировки в разном порядке. Debugging занял день, потому что дедлок случался раз в неделю при специфичной последовательности событий. Исправили порядок блокировок - проблема исчезла.

Производительность синхронизации в asyncio лучше, чем в потоках. Блокировка корутины - это просто приостановка выполнения без системных вызовов. Переключение на другую корутину стоит наносекунды. В потоках блокировка требует взаимодействия с планировщиком ОС, что на порядки дороже.

Но злоупотребление блокировками убивает конкурентность. Если критическая секция занимает большую часть времени корутины, асинхронность теряет смысл - корутины выполняются фактически последовательно. Минимизируйте код под блокировкой, выносите тяжелые операции за её пределы.

Синхронизация в asyncio - компромисс между безопасностью и производительностью. Отсутствие блокировок там, где они нужны - гонки и потерянные обновления. Избыток блокировок - деградация в последовательное выполнение. Защищайте только критические участки с общими данными, все остальное пусть работает конкурентно.

Отладка асинхронного кода



Отладка асинхронного кода - отдельный круг ада для разработчиков. Traceback'и растягиваются на сотни строк, точки останова в отладчике срабатывают невпопад, а ошибки воспроизводятся через раз. Классические инструменты debugging теряют эффективность, когда дело доходит до корутин и event loop.

Первая проблема - запутанные stack trace. Обычное исключение в синхронном коде дает четкий путь от точки вызова до места ошибки. В асинхронном коде между вызовом и ошибкой десятки переключений контекста, цепочки await, обертки из gather() и create_task(). Traceback превращается в лабиринт.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio
 
async def inner_function():
    await asyncio.sleep(0.1)
    raise ValueError("Ошибка в глубине стека")
 
async def middle_layer():
    await inner_function()
 
async def outer_layer():
    tasks = [middle_layer() for _ in range(3)]
    await asyncio.gather(*tasks)
 
try:
    asyncio.run(outer_layer())
except ValueError as e:
    import traceback
    traceback.print_exc()
    # Вывод: 15+ строк вложенных вызовов через asyncio internals
Я потратил часы на отслеживание источника CancelledError в системе обработки очередей. Traceback указывал на строку внутри библиотеки asyncio, а реальная причина была в забытом task.cancel() на три уровня выше по стеку вызовов. Обычный grep по коду ничего не давал - нужны были специфичные техники.

asyncio.run() с параметром debug=True включает детальное логирование. Event loop начинает писать предупреждения о незавершенных корутинах, долгих блокировках, неперехваченных исключениях. Overhead на производительность заметный, но для debugging бесценно.

Python
1
2
3
4
5
6
7
8
9
# Включение debug режима
asyncio.run(main(), debug=True)
 
# Или глобально для всех loop'ов
import asyncio
asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
loop = asyncio.new_event_loop()
loop.set_debug(True)
asyncio.set_event_loop(loop)
В debug режиме event loop предупредит, если корутина выполняется дольше 100 мс без await - признак блокирующей операции. Это помогло мне найти забытый синхронный вызов к Redis, который тормозил весь loop на 300 миллисекунд. Без debug режима эта проблема осталась бы незамеченной до production.

Стандартный отладчик pdb с асинхронным кодом работает странно. Ставите breakpoint в корутине - останавливается только одна корутина, остальные продолжают выполнение. Event loop не замирает. Это создает race conditions во время отладки - состояние программы меняется, пока вы изучаете переменные. Библиотека aiomonitor добавляет интерактивную консоль в работающее asyncio приложение. Можете подключиться через telnet, посмотреть список активных задач, отменить зависшую корутину, проверить состояние пулов. Для production систем это спасение.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
import aiomonitor
 
async def main():
    # Ваш код здесь
    pass
 
with aiomonitor.start_monitor(loop):
    loop.run_until_complete(main())
 
# В другом терминале:
# telnet localhost 50101
# > ps  # список задач
# > cancel <task_id>  # отменить задачу
Логирование в асинхронном коде требует осторожности. Обычный print() или logging.info() могут перемешивать вывод от разных корутин. Для отладки конкурентных проблем добавляйте идентификаторы задач или корреляционные ID.

Python
1
2
3
4
5
6
7
8
9
10
11
import asyncio
import logging
 
logger = logging.getLogger(__name__)
 
async def debuggable_function(request_id):
    logger.info(f"[{request_id}] Начало обработки")
    
    await asyncio.sleep(1)
    
    logger.info(f"[{request_id}] Завершение")
Встречал систему, где logs из сотни одновременных запросов сваливались в один поток без разделения. Понять, какая строка лога относится к какому запросу, было невозможно. Добавили correlation ID в каждое сообщение - debugging ускорился в разы.

Профилирование асинхронного кода показывает, где теряется время. Модуль cProfile работает, но результаты запутаны из-за внутренностей asyncio. Специализированные инструменты вроде py-spy дают более читаемые результаты для конкурентного кода.

Python
1
2
# Запуск с профилированием
# py-spy record -o profile.svg -- python script.py
Тестирование асинхронного кода требует pytest-asyncio или unittest.IsolatedAsyncioTestCase. Обычные тесты не могут запускать корутины. Нужны специальные декораторы и фикстуры для управления event loop в тестах.

Python
1
2
3
4
5
6
import pytest
 
@pytest.mark.asyncio
async def test_async_function():
    result = await some_coroutine()
    assert result == expected_value
Отладка асинхронного кода - это навык, который приходит с опытом. Первые месяцы каждая ошибка кажется мистической. Потом начинаете видеть паттерны: незавершенные корутины, забытые await, гонки за общими данными. Правильные инструменты и техники превращают хаотичный процесс в методичное расследование.

Асинхронный парсер с очередью задач



Теория без практики остается абстракцией. Соберем все изученные техники в полнофункциональное приложение - асинхронный парсер новостных сайтов с очередью задач, обработкой ошибок и мониторингом производительности.

Архитектура строится на трех компонентах. Producer загружает список URL и складывает их в очередь. Worker pool забирает URL из очереди, скачивает страницы, парсит контент. Storage сохраняет результаты в базу данных батчами для оптимизации.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
import asyncio
import aiohttp
import asyncpg
from asyncio import Queue
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import logging
 
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s'
)
logger = logging.getLogger(__name__)
 
@dataclass
class Article:
    """Структура для хранения данных статьи"""
    url: str
    title: str
    content: str
    published_at: Optional[datetime]
    fetched_at: datetime
    
class NewsParser:
    """Асинхронный парсер новостей"""
    
    def __init__(
        self, 
        db_pool: asyncpg.Pool,
        max_workers: int = 10,
        batch_size: int = 50
    ):
        self.db_pool = db_pool
        self.max_workers = max_workers
        self.batch_size = batch_size
        self.queue = Queue(maxsize=1000)
        self.results_queue = Queue()
        self.stats = {
            'processed': 0,
            'errors': 0,
            'start_time': None
        }
        
    async def fetch_page(
        self, 
        session: aiohttp.ClientSession, 
        url: str
    ) -> Optional[str]:
        """Загрузка HTML страницы с обработкой ошибок"""
        try:
            async with asyncio.timeout(10.0):
                async with session.get(url) as response:
                    if response.status == 200:
                        return await response.text()
                    else:
                        logger.warning(
                            f"URL {url} вернул статус {response.status}"
                        )
                        return None
        except asyncio.TimeoutError:
            logger.error(f"Таймаут при загрузке {url}")
            return None
        except aiohttp.ClientError as e:
            logger.error(f"Ошибка сети для {url}: {e}")
            return None
            
    def parse_article(self, html: str, url: str) -> Optional[Article]:
        """Извлечение данных из HTML"""
        # Упрощенный парсинг - в реальности используйте BeautifulSoup
        try:
            # Имитация парсинга
            if '<title>' not in html:
                return None
                
            title_start = html.find('<title>') + 7
            title_end = html.find('</title>')
            title = html[title_start:title_end] if title_end > title_start else "Без названия"
            
            # Извлечение первых 500 символов как контент
            content = html[:500]
            
            return Article(
                url=url,
                title=title,
                content=content,
                published_at=None,
                fetched_at=datetime.now()
            )
        except Exception as e:
            logger.error(f"Ошибка парсинга {url}: {e}")
            return None
            
    async def worker(
        self, 
        worker_id: int, 
        session: aiohttp.ClientSession
    ):
        """Воркер для обработки URL из очереди"""
        logger.info(f"Воркер {worker_id} запущен")
        
        while True:
            url = await self.queue.get()
            
            if url is None:  # Сигнал завершения
                self.queue.task_done()
                break
                
            try:
                # Загружаем страницу
                html = await self.fetch_page(session, url)
                
                if html:
                    # Парсим контент
                    article = self.parse_article(html, url)
                    
                    if article:
                        await self.results_queue.put(article)
                        self.stats['processed'] += 1
                    else:
                        self.stats['errors'] += 1
                else:
                    self.stats['errors'] += 1
                    
            except Exception as e:
                logger.error(f"Воркер {worker_id} упал на {url}: {e}")
                self.stats['errors'] += 1
            finally:
                self.queue.task_done()
                
        logger.info(f"Воркер {worker_id} завершен")
        
    async def storage_worker(self):
        """Батчевая запись результатов в базу"""
        batch = []
        
        while True:
            try:
                article = await asyncio.wait_for(
                    self.results_queue.get(),
                    timeout=2.0
                )
                
                if article is None:  # Сигнал завершения
                    break
                    
                batch.append(article)
                
                if len(batch) >= self.batch_size:
                    await self.save_batch(batch)
                    batch.clear()
                    
            except asyncio.TimeoutError:
                # Таймаут - сохраняем накопленное
                if batch:
                    await self.save_batch(batch)
                    batch.clear()
                    
        # Сохраняем остаток
        if batch:
            await self.save_batch(batch)
            
    async def save_batch(self, articles: list[Article]):
        """Сохранение пачки статей в базу"""
        try:
            async with self.db_pool.acquire() as conn:
                await conn.executemany(
                    '''
                    INSERT INTO articles (url, title, content, fetched_at)
                    VALUES ($1, $2, $3, $4)
                    ON CONFLICT (url) DO UPDATE 
                    SET title = EXCLUDED.title,
                        content = EXCLUDED.content,
                        fetched_at = EXCLUDED.fetched_at
                    ''',
                    [
                        (a.url, a.title, a.content, a.fetched_at)
                        for a in articles
                    ]
                )
            logger.info(f"Сохранено {len(articles)} статей")
        except Exception as e:
            logger.error(f"Ошибка сохранения в базу: {e}")
            
    async def producer(self, urls: list[str]):
        """Загрузка URL в очередь"""
        for url in urls:
            await self.queue.put(url)
            
        # Отправляем сигналы завершения воркерам
        for _ in range(self.max_workers):
            await self.queue.put(None)
            
    async def monitor_progress(self):
        """Мониторинг прогресса выполнения"""
        while True:
            await asyncio.sleep(5)
            
            elapsed = (datetime.now() - self.stats['start_time']).total_seconds()
            rate = self.stats['processed'] / elapsed if elapsed > 0 else 0
            
            logger.info(
                f"Обработано: {self.stats['processed']}, "
                f"Ошибок: {self.stats['errors']}, "
                f"Скорость: {rate:.1f} стр/сек"
            )
            
    async def run(self, urls: list[str]):
        """Запуск парсера"""
        self.stats['start_time'] = datetime.now()
        logger.info(f"Запуск парсера для {len(urls)} URL")
        
        # Настройка HTTP сессии
        connector = aiohttp.TCPConnector(
            limit=100,
            limit_per_host=20
        )
        
        async with aiohttp.ClientSession(connector=connector) as session:
            # Запуск компонентов
            workers = [
                asyncio.create_task(self.worker(i, session))
                for i in range(self.max_workers)
            ]
            
            storage = asyncio.create_task(self.storage_worker())
            monitor = asyncio.create_task(self.monitor_progress())
            producer_task = asyncio.create_task(self.producer(urls))
            
            # Ждем завершения producer
            await producer_task
            
            # Ждем обработки всей очереди
            await self.queue.join()
            
            # Ждем завершения воркеров
            await asyncio.gather(*workers)
            
            # Останавливаем storage
            await self.results_queue.put(None)
            await storage
            
            # Останавливаем монитор
            monitor.cancel()
            try:
                await monitor
            except asyncio.CancelledError:
                pass
                
        elapsed = (datetime.now() - self.stats['start_time']).total_seconds()
        logger.info(
            f"Парсинг завершен за {elapsed:.1f}с. "
            f"Обработано: {self.stats['processed']}, "
            f"Ошибок: {self.stats['errors']}"
        )
 
async def setup_database():
    """Инициализация базы данных"""
    pool = await asyncpg.create_pool(
        'postgresql://user:password@localhost/news',
        min_size=5,
        max_size=20
    )
    
    async with pool.acquire() as conn:
        await conn.execute('''
            CREATE TABLE IF NOT EXISTS articles (
                id SERIAL PRIMARY KEY,
                url TEXT UNIQUE NOT NULL,
                title TEXT NOT NULL,
                content TEXT,
                fetched_at TIMESTAMP NOT NULL
            )
        ''')
        
    return pool
 
async def main():
    """Точка входа"""
    # Список URL для парсинга
    urls = [
        f"https://example.com/news/article-{i}"
        for i in range(500)
    ]
    
    # Инициализация базы
    pool = await setup_database()
    
    try:
        # Запуск парсера
        parser = NewsParser(pool, max_workers=20, batch_size=50)
        await parser.run(urls)
    finally:
        await pool.close()
 
if __name__ == "__main__":
    asyncio.run(main())
Это приложение демонстрирует producer-consumer паттерн с очередями, батчевую запись в базу, управление пулом воркеров, обработку ошибок и таймаутов, мониторинг производительности. Все техники asyncio собраны в одном работающем решении.
В реальных проектах парсер требует расширенной функциональности - обработку rate limiting, кеширование, retry механизмы, graceful shutdown. Доработаем приложение до production-ready состояния.

Первое улучшение - адаптивный rate limiter, который подстраивается под скорость ответов сервера. Если начинаем получать 429 или таймауты, автоматически снижаем нагрузку.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class AdaptiveRateLimiter:
    """Динамический контроль скорости запросов"""
    
    def __init__(self, initial_rate=10):
        self.max_rate = initial_rate
        self.current_tokens = initial_rate
        self.last_update = datetime.now()
        self.lock = asyncio.Lock()
        self.error_streak = 0
        
    async def acquire(self):
        """Получение разрешения на запрос"""
        async with self.lock:
            now = datetime.now()
            elapsed = (now - self.last_update).total_seconds()
            
            # Пополняем токены
            self.current_tokens = min(
                self.max_rate,
                self.current_tokens + elapsed * self.max_rate
            )
            self.last_update = now
            
            # Ждем если токенов нет
            if self.current_tokens < 1:
                wait_time = (1 - self.current_tokens) / self.max_rate
                await asyncio.sleep(wait_time)
                self.current_tokens = 1
                
            self.current_tokens -= 1
            
    def report_error(self):
        """Сообщение об ошибке - снижаем скорость"""
        self.error_streak += 1
        if self.error_streak >= 3 and self.max_rate > 1:
            self.max_rate *= 0.7
            logger.warning(
                f"Снижаем rate до {self.max_rate:.1f} запросов/сек"
            )
            
    def report_success(self):
        """Успешный запрос - можем ускориться"""
        self.error_streak = 0
        if self.max_rate < 20:
            self.max_rate *= 1.05
Интегрируем rate limiter в воркеры, добавляя вызов перед каждым запросом. Теперь парсер автоматически замедляется при проблемах и ускоряется когда всё работает гладко.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
async def smart_worker(
    self, 
    worker_id: int, 
    session: aiohttp.ClientSession,
    rate_limiter: AdaptiveRateLimiter
):
    """Воркер с адаптивным rate limiting"""
    
    while True:
        url = await self.queue.get()
        
        if url is None:
            self.queue.task_done()
            break
            
        try:
            # Ждем разрешения от rate limiter
            await rate_limiter.acquire()
            
            html = await self.fetch_page(session, url)
            
            if html:
                article = self.parse_article(html, url)
                if article:
                    await self.results_queue.put(article)
                    self.stats['processed'] += 1
                    rate_limiter.report_success()
                else:
                    self.stats['errors'] += 1
            else:
                self.stats['errors'] += 1
                rate_limiter.report_error()
                
        except Exception as e:
            logger.error(f"Воркер {worker_id} ошибка на {url}: {e}")
            self.stats['errors'] += 1
            rate_limiter.report_error()
        finally:
            self.queue.task_done()
Второе улучшение - кеширование с TTL для предотвращения повторных загрузок одних URL. Используем Redis или простой словарь в памяти с временными метками.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class SimpleCache:
    """Кеш с TTL для результатов"""
    
    def __init__(self, ttl_seconds=3600):
        self.cache = {}
        self.ttl = ttl_seconds
        self.lock = asyncio.Lock()
        
    async def get(self, key: str) -> Optional[Article]:
        async with self.lock:
            if key in self.cache:
                article, timestamp = self.cache[key]
                age = (datetime.now() - timestamp).total_seconds()
                
                if age < self.ttl:
                    return article
                else:
                    del self.cache[key]
                    
        return None
        
    async def set(self, key: str, value: Article):
        async with self.lock:
            self.cache[key] = (value, datetime.now())
            
            # Очистка старых записей
            if len(self.cache) > 10000:
                await self._cleanup()
                
    async def _cleanup(self):
        """Удаление устаревших записей"""
        now = datetime.now()
        expired = [
            k for k, (_, ts) in self.cache.items()
            if (now - ts).total_seconds() > self.ttl
        ]
        
        for key in expired:
            del self.cache[key]
Третье улучшение - retry механизм с экспоненциальной задержкой для временных сбоев. Не все ошибки фатальны - сеть нестабильна, серверы иногда глючат.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
async def fetch_with_retry(
    self,
    session: aiohttp.ClientSession,
    url: str,
    max_attempts: int = 3
) -> Optional[str]:
    """Загрузка с повторными попытками"""
    
    for attempt in range(max_attempts):
        try:
            async with asyncio.timeout(10.0):
                async with session.get(url) as response:
                    if response.status == 200:
                        return await response.text()
                    elif response.status == 429:
                        # Rate limit - ждем подольше
                        delay = 2 ** (attempt + 2)
                        logger.warning(
                            f"Rate limit на {url}, жду {delay}с"
                        )
                        await asyncio.sleep(delay)
                        continue
                    else:
                        return None
                        
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            if attempt == max_attempts - 1:
                logger.error(f"Финальная неудача для {url}: {e}")
                return None
                
            delay = 2 ** attempt
            logger.info(f"Повтор {attempt + 1} для {url} через {delay}с")
            await asyncio.sleep(delay)
            
    return None
Четвертое - graceful shutdown с корректным завершением всех операций. При получении сигнала остановки даем воркерам время закончить текущие задачи.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
class GracefulShutdown:
    """Менеджер корректного завершения"""
    
    def __init__(self):
        self.shutdown_event = asyncio.Event()
        
    def signal_shutdown(self):
        """Сигнал остановки"""
        logger.info("Получен сигнал остановки")
        self.shutdown_event.set()
        
    async def wait_for_shutdown(self):
        """Ожидание сигнала"""
        await self.shutdown_event.wait()
        
async def run_with_shutdown(self, urls: list[str]):
    """Запуск с поддержкой graceful shutdown"""
    shutdown = GracefulShutdown()
    
    # Регистрируем обработчики сигналов
    import signal
    
    def handle_signal(sig, frame):
        shutdown.signal_shutdown()
        
    signal.signal(signal.SIGTERM, handle_signal)
    signal.signal(signal.SIGINT, handle_signal)
    
    try:
        connector = aiohttp.TCPConnector(limit=100)
        
        async with aiohttp.ClientSession(connector=connector) as session:
            rate_limiter = AdaptiveRateLimiter(initial_rate=10)
            cache = SimpleCache(ttl_seconds=3600)
            
            # Запуск воркеров
            workers = [
                asyncio.create_task(
                    self.enhanced_worker(i, session, rate_limiter, cache)
                )
                for i in range(self.max_workers)
            ]
            
            storage = asyncio.create_task(self.storage_worker())
            monitor = asyncio.create_task(self.monitor_progress())
            
            # Producer в фоне
            producer = asyncio.create_task(self.producer(urls))
            
            # Ждем либо завершения, либо сигнала
            done, pending = await asyncio.wait(
                [producer, shutdown.wait_for_shutdown()],
                return_when=asyncio.FIRST_COMPLETED
            )
            
            if shutdown.shutdown_event.is_set():
                logger.info("Корректное завершение начато")
                
                # Отменяем producer если еще работает
                if producer in pending:
                    producer.cancel()
                    
                # Отправляем сигналы завершения
                for _ in workers:
                    await self.queue.put(None)
                    
            # Ждем завершения очереди
            await self.queue.join()
            
            # Ждем воркеров
            await asyncio.gather(*workers, return_exceptions=True)
            
            # Завершаем storage и monitor
            await self.results_queue.put(None)
            await storage
            
            monitor.cancel()
            try:
                await monitor
            except asyncio.CancelledError:
                pass
                
    except Exception as e:
        logger.error(f"Критическая ошибка: {e}")
        raise
Эти улучшения превращают базовый парсер в надежную систему, готовую к работе в production. Адаптивный rate limiting защищает от блокировок, кеширование экономит ресурсы, retry повышает надежность, graceful shutdown предотвращает потерю данных при остановке.

Собрать рабочий код - это одно. Создать систему, которую можно поддерживать, расширять и не бояться трогать через полгода - совсем другое. Асинхронный парсер из предыдущей главы работает, но архитектурно он монолитен. Вся логика в одном классе, зависимости жестко связаны, тестирование превращается в мучение.

Разделим приложение на слои по принципу разделения ответственности. Нижний слой - инфраструктура: работа с базой, HTTP-клиенты, очереди. Средний слой - бизнес-логика: парсинг, валидация, обработка данных. Верхний слой - координация: управление воркерами, мониторинг, обработка сигналов.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from abc import ABC, abstractmethod
from typing import Protocol, Optional
import asyncio
 
class ArticleRepository(ABC):
    """Абстракция для хранения статей"""
    
    @abstractmethod
    async def save_batch(self, articles: list['Article']) -> None:
        """Сохранение пачки статей"""
        pass
    
    @abstractmethod
    async def get_by_url(self, url: str) -> Optional['Article']:
        """Получение статьи по URL"""
        pass
 
class PostgresRepository(ArticleRepository):
    """Реализация хранилища через PostgreSQL"""
    
    def __init__(self, pool: asyncpg.Pool):
        self.pool = pool
    
    async def save_batch(self, articles: list['Article']) -> None:
        async with self.pool.acquire() as conn:
            await conn.executemany(
                '''
                INSERT INTO articles (url, title, content, fetched_at)
                VALUES ($1, $2, $3, $4)
                ON CONFLICT (url) DO UPDATE 
                SET title = EXCLUDED.title,
                    content = EXCLUDED.content,
                    fetched_at = EXCLUDED.fetched_at
                ''',
                [(a.url, a.title, a.content, a.fetched_at) for a in articles]
            )
    
    async def get_by_url(self, url: str) -> Optional['Article']:
        async with self.pool.acquire() as conn:
            row = await conn.fetchrow(
                'SELECT * FROM articles WHERE url = $1', 
                url
            )
            return Article.from_row(row) if row else None
Репозиторий инкапсулирует работу с хранилищем. Бизнес-логика не знает про SQL, asyncpg или структуру таблиц. Если завтра решите мигрировать на MongoDB - поменяете только реализацию репозитория, остальной код останется нетронутым.

Я рефакторил систему аналитики, где SQL-запросы размазались по 30 файлам. Каждое изменение схемы базы требовало правки в десяти местах. Выделил репозитории - миграция с PostgreSQL на ClickHouse заняла два дня вместо предполагаемых двух недель.

Паттерн Strategy для разных способов парсинга. Каждый сайт имеет свою верстку - универсальный парсер не работает. Создаем интерфейс парсера и реализации под конкретные источники.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
class Parser(Protocol):
    """Интерфейс парсера"""
    
    def can_parse(self, url: str) -> bool:
        """Проверка, может ли парсер обработать URL"""
        ...
    
    async def parse(self, html: str, url: str) -> Optional[Article]:
        """Извлечение данных из HTML"""
        ...
 
class GenericParser:
    """Базовый парсер для обычных сайтов"""
    
    def can_parse(self, url: str) -> bool:
        return True  # Обрабатывает всё по умолчанию
    
    async def parse(self, html: str, url: str) -> Optional[Article]:
        from bs4 import BeautifulSoup
        
        soup = BeautifulSoup(html, 'html.parser')
        title = soup.find('title')
        
        if not title:
            return None
        
        return Article(
            url=url,
            title=title.get_text().strip(),
            content=soup.get_text()[:500],
            published_at=None,
            fetched_at=datetime.now()
        )
 
class HabrParser:
    """Специализированный парсер для Habr"""
    
    def can_parse(self, url: str) -> bool:
        return 'habr.com' in url
    
    async def parse(self, html: str, url: str) -> Optional[Article]:
        from bs4 import BeautifulSoup
        
        soup = BeautifulSoup(html, 'html.parser')
        
        # Специфичные селекторы для Habr
        title_elem = soup.select_one('.tm-article-snippet__title')
        content_elem = soup.select_one('.tm-article-body')
        
        if not title_elem:
            return None
        
        return Article(
            url=url,
            title=title_elem.get_text().strip(),
            content=content_elem.get_text()[:500] if content_elem else "",
            published_at=None,
            fetched_at=datetime.now()
        )
 
class ParserRegistry:
    """Реестр парсеров с выбором подходящего"""
    
    def __init__(self):
        self.parsers: list[Parser] = []
    
    def register(self, parser: Parser) -> None:
        self.parsers.append(parser)
    
    def get_parser(self, url: str) -> Parser:
        for parser in self.parsers:
            if parser.can_parse(url):
                return parser
        
        # Fallback на базовый парсер
        return GenericParser()
Реестр выбирает правильный парсер для каждого URL. Добавить поддержку нового сайта - зарегистрировать еще один парсер в реестре. Логика обработки изолирована, изменения в одном парсере не влияют на другие.

Паттерн Command для задач в очереди. Вместо простых URL складываем в очередь объекты-команды, которые знают, как себя выполнить. Это дает гибкость - разные типы задач в одной очереди.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
from enum import Enum
 
class TaskPriority(Enum):
    LOW = 3
    NORMAL = 2
    HIGH = 1
 
class Task(ABC):
    """Базовая задача"""
    
    def __init__(self, priority: TaskPriority = TaskPriority.NORMAL):
        self.priority = priority
        self.created_at = datetime.now()
    
    @abstractmethod
    async def execute(self, context: 'ExecutionContext') -> None:
        """Выполнение задачи"""
        pass
    
    def __lt__(self, other: 'Task') -> bool:
        # Для сортировки в PriorityQueue
        return self.priority.value < other.priority.value
 
class ParseUrlTask(Task):
    """Задача парсинга URL"""
    
    def __init__(self, url: str, priority: TaskPriority = TaskPriority.NORMAL):
        super().__init__(priority)
        self.url = url
    
    async def execute(self, context: 'ExecutionContext') -> None:
        html = await context.fetcher.fetch(self.url)
        
        if not html:
            return
        
        parser = context.parser_registry.get_parser(self.url)
        article = await parser.parse(html, self.url)
        
        if article:
            await context.repository.save_batch([article])
 
class BatchProcessTask(Task):
    """Задача массовой обработки"""
    
    def __init__(self, urls: list[str]):
        super().__init__(TaskPriority.LOW)
        self.urls = urls
    
    async def execute(self, context: 'ExecutionContext') -> None:
        tasks = [ParseUrlTask(url, TaskPriority.LOW) for url in self.urls]
        
        for task in tasks:
            await context.task_queue.put(task)
Теперь можно добавить задачу повторной обработки с низким приоритетом или срочную проверку конкретного URL с высоким приоритетом. Очередь автоматически сортирует по важности.

Паттерн Observer для мониторинга и метрик. Воркеры уведомляют наблюдателей о событиях - начало обработки, успех, ошибка. Наблюдатели собирают статистику, логируют, отправляют алерты.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
class WorkerObserver(ABC):
    """Наблюдатель за работой воркеров"""
    
    @abstractmethod
    async def on_task_started(self, task: Task) -> None:
        pass
    
    @abstractmethod
    async def on_task_completed(self, task: Task, duration: float) -> None:
        pass
    
    @abstractmethod
    async def on_task_failed(self, task: Task, error: Exception) -> None:
        pass
 
class MetricsCollector(WorkerObserver):
    """Сборщик метрик"""
    
    def __init__(self):
        self.completed = 0
        self.failed = 0
        self.total_duration = 0.0
        self.lock = asyncio.Lock()
    
    async def on_task_started(self, task: Task) -> None:
        pass  # Можно логировать начало
    
    async def on_task_completed(self, task: Task, duration: float) -> None:
        async with self.lock:
            self.completed += 1
            self.total_duration += duration
    
    async def on_task_failed(self, task: Task, error: Exception) -> None:
        async with self.lock:
            self.failed += 1
    
    def get_stats(self) -> dict:
        return {
            'completed': self.completed,
            'failed': self.failed,
            'avg_duration': self.total_duration / self.completed if self.completed > 0 else 0
        }
 
class AlertingObserver(WorkerObserver):
    """Отправка алертов при проблемах"""
    
    def __init__(self, error_threshold: int = 10):
        self.error_threshold = error_threshold
        self.recent_errors = 0
    
    async def on_task_started(self, task: Task) -> None:
        pass
    
    async def on_task_completed(self, task: Task, duration: float) -> None:
        self.recent_errors = max(0, self.recent_errors - 1)
    
    async def on_task_failed(self, task: Task, error: Exception) -> None:
        self.recent_errors += 1
        
        if self.recent_errors >= self.error_threshold:
            await self.send_alert(
                f"Высокий уровень ошибок: {self.recent_errors}"
            )
    
    async def send_alert(self, message: str) -> None:
        # Реальная отправка в Slack, email и т.д.
        logger.critical(f"ALERT: {message}")
Воркер уведомляет всех подписанных наблюдателей о каждом событии. Добавить новую метрику или способ мониторинга - создать нового наблюдателя и подписать его. Воркер остается неизменным.

Слоистая архитектура с правильными абстракциями делает код гибким и тестируемым. Каждый слой изолирован, изменения в одном не затрагивают другие. Паттерны проектирования - не академическая теория, а проверенные решения реальных проблем. Repository, Strategy, Command, Observer - инструменты, которые превращают клубок зависимостей в понятную структуру. Реальный production код требует масштабируемости и расширяемости. Монолитный класс на тысячу строк работает, пока проект небольшой. Когда система растет, добавляются новые источники данных, меняются требования к обработке - монолит превращается в кошмар поддержки.

Dependency Injection контейнер управляет зависимостями между компонентами. Вместо создания объектов внутри классов передаете их через конструктор. Тестирование упрощается - подменяете реальные зависимости моками.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class ServiceContainer:
    """Контейнер зависимостей"""
    
    def __init__(self):
        self._services = {}
        self._factories = {}
    
    def register(self, interface: type, implementation: type):
        """Регистрация реализации для интерфейса"""
        self._factories[interface] = implementation
    
    def register_instance(self, interface: type, instance):
        """Регистрация готового экземпляра"""
        self._services[interface] = instance
    
    def get(self, interface: type):
        """Получение сервиса"""
        if interface in self._services:
            return self._services[interface]
        
        if interface in self._factories:
            factory = self._factories[interface]
            instance = factory(self)
            self._services[interface] = instance
            return instance
        
        raise ValueError(f"Сервис {interface} не зарегистрирован")
 
class WorkerFactory:
    """Фабрика для создания воркеров"""
    
    def __init__(self, container: ServiceContainer):
        self.container = container
    
    async def create_worker(self, worker_id: int):
        """Создание сконфигурированного воркера"""
        return Worker(
            worker_id=worker_id,
            fetcher=self.container.get(HttpFetcher),
            parser_registry=self.container.get(ParserRegistry),
            repository=self.container.get(ArticleRepository),
            observers=self.container.get(list[WorkerObserver])
        )
Контейнер знает, как создавать и связывать компоненты. При изменении конфигурации правите только регистрацию сервисов, остальной код остается нетронутым. Тестирование превращается в подмену реализаций в контейнере.

Применение паттернов и слоистой архитектуры - инвестиция времени, которая окупается при первом крупном рефакторинге или добавлении функциональности. Код становится предсказуемым, изменения локализованы, тестирование перестает быть болью. Асинхронный парсер из демонстрационного примера превратился в гибкую систему, готовую к расширению без переписывания с нуля.

Асинхронное программирование в Python
Доброго времени суток. Столкнулся с проблемой что в коде вызывается функция которая ждёт условные...

Python asyncio / aiohttp API 429 response error
Пытаюсь написать асинхронный API-запрос, нашел подходящий пример, но в ответ получаю ошибку: &quot;429...

python. flet. asyncio.Queue
Всем привет, есть проблема с понимаем как работают очереди и/или проблема с тем, что уже...

Потоки, процессы и асинхронное программирование
Добрый вечер. Почитал документацию и по моему я только еще больше запутался. Есть модуль...

Асинхронное программирование
Не всегда включаюсь в тему асинхронного программирования, рассказали бы в краткой форме,...

Асинхронный парсер + Django
Привет. Пытаюсь создать приложение, которое выводит пользователю информацию с асинхронного парсера....

С чего начать разбираться с Asyncio?
нужно написать парсер страницы, с загрузкой некоторых картинок, на Asyncio. С многопоточным и...

Коннектор к API используя AsyncIO, aiohttp
Py3.5. Работаю с библиотекой, подключаюсь к api telegram. Получаю сообщения и отправляю их в...

Выйти из петли в run_forever() [asyncio]
Пишу скрипт, который эмулирует запуск игр в стиме. Для этого была выбрана библиотека Steam для...

Asyncio корректное завершение в случае непредвиденной остановки программы
Есть скрипт (выложен не полностью) @asyncio.coroutine def do_work(data): for adres, ip_list in...

Asyncio - coroutine vs future vs task
Доброго времени суток, форумчане! Приступил к изучению asyncio. Использую python 3.5. Гуру...

Можно ли поставить таски на паузу? (asyncio)
Всем привет! Имеется такой код: from aiohttp import web import asyncio async def...

Надоела реклама? Зарегистрируйтесь и она исчезнет полностью.
Всего комментариев 0
Комментарии
 
Новые блоги и статьи
Фото: Daniel Greenwood
kumehtar 13.11.2025
Расскажи мне о Мире, бродяга
kumehtar 12.11.2025
— Расскажи мне о Мире, бродяга, Ты же видел моря и метели. Как сменялись короны и стяги, Как эпохи стрелою летели. - Этот мир — это крылья и горы, Снег и пламя, любовь и тревоги, И бескрайние. . .
PowerShell Snippets
iNNOKENTIY21 11.11.2025
Модуль PowerShell 5. 1+ : Snippets. psm1 У меня модуль расположен в пользовательской папке модулей, по умолчанию: \Documents\WindowsPowerShell\Modules\Snippets\ А в самом низу файла-профиля. . .
PowerShell и онлайн сервисы. Валюта (floatrates.com руб.)
iNNOKENTIY21 11.11.2025
PowerShell функция floatrates-rub Примеры вызова: # Указанная валюта 'EUR' floatrates-rub -Code 'EUR' # Список имеющихся кодов валют floatrates-rub -Available function floatrates-rub {
PowerShell и онлайн сервисы. Погода (RP5.ru)
iNNOKENTIY21 11.11.2025
PowerShell функция Get-WeatherRP5rss для получения погоды с сервиса RP5 Примеры вызова Get-WeatherRP5rss с указанием id 5484 — Москва (восток, Измайлово) и переносом строки:. . .
PowerShell и онлайн сервисы. Погода (wttr)
iNNOKENTIY21 11.11.2025
PowerShell Функция для получения погоды с сервиса wttr Примеры вызова: Погода в городе Омск с прогнозом на день, можно изменить прогноз на более дней, для этого надо поменять запрос:. . .
PowerShell и онлайн сервисы. Валюта (ЦБР)
iNNOKENTIY21 11.11.2025
# Получение курса валют function cbr (] $Valutes = @('USD', 'EUR', 'CNY')) { $url = 'https:/ / www. cbr-xml-daily. ru/ daily_json. js' $data = Invoke-RestMethod -Uri $url $esc = 27 . . .
И решил я переделать этот ноут в машину для распределенных вычислений
Programma_Boinc 09.11.2025
И решил я переделать этот ноут в машину для распределенных вычислений Всем привет. А вот мой компьютер, переделанный из ноутбука. Был у меня ноут асус 2011 года. Со временем корпус превратился. . .
Мысли в слух
kumehtar 07.11.2025
Заметил среди людей, что по-настоящему верная дружба бывает между теми, с кем нечего делить.
Новая зверюга
volvo 07.11.2025
Подарок на Хеллоуин, и теперь у нас кроме Tuxedo Cat есть еще и щенок далматинца: Хочу еще Симбу взять, очень нравится. . .
КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin
Copyright ©2000 - 2025, CyberForum.ru