1. asyncio и асинхронное программирование в Python: конкурентность, корутины, таски, async/await, event loop
2. asyncio и асинхронное программирование в Python: паттерны, футуры, примеры, работа с БД
3. asyncio и асинхронное программирование в Python: синхронизация, продвинутые примеры, асинхронный парсер
Продвинутые техники
Когда базовые паттерны освоены, asyncio открывает возможности для более изощренных решений. Эти техники не нужны в каждом проекте, но знание их превращает ограничения в преимущества.
Динамическое управление размером пула задач - вместо фиксированного семафора адаптируете количество одновременных операций под нагрузку. Если сервер начинает тормозить, сокращаете конкурентность. Быстрые ответы - увеличиваете.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| class AdaptiveSemaphore:
def __init__(self, initial=10, min_size=5, max_size=50):
self.semaphore = asyncio.Semaphore(initial)
self.current_size = initial
self.min_size = min_size
self.max_size = max_size
self.response_times = []
async def adjust_based_on_performance(self):
if len(self.response_times) < 10:
return
avg_time = sum(self.response_times) / len(self.response_times)
self.response_times.clear()
if avg_time > 2.0 and self.current_size > self.min_size:
# Медленно - уменьшаем нагрузку
self.current_size -= 1
elif avg_time < 0.5 and self.current_size < self.max_size:
# Быстро - можем больше
self.current_size += 1 |
|
Приоритетные очереди для задач - не все операции равны. Критичные запросы должны обрабатываться первыми. asyncio.PriorityQueue сортирует задачи по приоритету.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
| async def priority_worker(queue: asyncio.PriorityQueue):
while True:
priority, task = await queue.get()
try:
await process_task(task)
finally:
queue.task_done()
# Добавление с приоритетом (меньше = важнее)
await queue.put((1, critical_task)) # Высокий приоритет
await queue.put((5, normal_task)) # Обычный
await queue.put((10, background_task)) # Низкий |
|
Каскадная отмена связанных задач - когда одна задача отменяется, автоматически останавливаете зависимые. Создаете иерархию Task с отслеживанием родительских связей.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
| class TaskGroup:
def __init__(self):
self.tasks = []
def create_task(self, coro):
task = asyncio.create_task(coro)
self.tasks.append(task)
return task
async def cancel_all(self):
for task in self.tasks:
task.cancel()
await asyncio.gather(*self.tasks, return_exceptions=True) |
|
Мемоизация асинхронных результатов - кешируете результаты дорогих операций. Повторные вызовы возвращают закешированное значение без реального выполнения.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| from functools import wraps
def async_lru_cache(maxsize=128):
cache = {}
def decorator(func):
@wraps(func)
async def wrapper(*args):
if args in cache:
return cache[args]
result = await func(*args)
if len(cache) >= maxsize:
cache.pop(next(iter(cache)))
cache[args] = result
return result
return wrapper
return decorator
@async_lru_cache(maxsize=100)
async def expensive_api_call(param):
async with aiohttp.ClientSession() as session:
async with session.get(f"https://api.example.com/{param}") as resp:
return await resp.json() |
|
Эти техники решают нетривиальные проблемы производительности и надежности. Применяйте осознанно - сложность должна быть оправдана выигрышем.
asyncio.wait vs asyncio.gather Всем привет! :victory:
Вопросы внизу.
Есть два похожих теста:
#1 - asyncio.wait
import... Асинхронное получение данных посредством asyncio.gather Доброго времени суток, вопрос такой, когда приходят ссылки на обработку, (все уходят), но... asyncio Асинхронный Сервер Доброго времени суток форумчане.
Питон прекрасен и многогранен. Предельно прост и понятен :) ... Asyncio парсер Подскажите направление, словами, нужен подход к задаче.
Используя asynсio надо рекурсивно...
Управление контекстом и таймаутами
Контекст в асинхронном коде - это не просто async with. Это управление жизненным циклом ресурсов, координация множества операций, ограничение времени выполнения. Правильное использование контекстных менеджеров и таймаутов превращает хрупкий код в надежную систему.
`asyncio.timeout()` появился в Python 3.11 и упростил работу с временными ограничениями. Раньше использовали asyncio.wait_for(), теперь есть более изящный синтаксис через контекстный менеджер.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
| import asyncio
async def fetch_with_new_timeout(url):
"""Современный подход к таймаутам"""
try:
async with asyncio.timeout(5.0):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
except asyncio.TimeoutError:
print(f"Запрос к {url} превысил 5 секунд")
return None |
|
Вложенные контексты управляют разными аспектами операции. Внешний timeout() ограничивает общее время, внутренние async with следят за ресурсами. Если время истекает, все вложенные контексты корректно закрываются, соединения освобождаются.
Старый способ через wait_for() работает, но менее читаем. Функция оборачивает awaitable и возвращает результат или бросает TimeoutError.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
| async def fetch_with_old_timeout(url):
"""Старый стиль таймаутов"""
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
result = await asyncio.wait_for(
response.text(),
timeout=5.0
)
return result
except asyncio.TimeoutError:
return None |
|
Разница тонкая, но важная. timeout() контролирует весь блок кода, включая создание сессии и установку соединения. wait_for() ограничивает только конкретный await. Для полного контроля timeout() предпочтительнее. Я разрабатывал микросервис, который агрегировал данные с пяти внешних API. Каждый мог тормозить. Без таймаутов один медленный сервис блокировал весь ответ. Обернул каждый вызов в timeout(2.0) - если сервис не отвечает за 2 секунды, возвращаем None и продолжаем с остальными. Latency упал с 8 секунд (в худшем случае) до стабильных 2.1 секунды.
Множественные таймауты работают каскадом. Внешний контекст задает общий лимит, внутренние - для отдельных операций. Срабатывает ближайший истекший таймаут.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| async def cascading_timeouts():
"""Каскадные таймауты для сложных операций"""
try:
async with asyncio.timeout(10.0): # Общий лимит 10 секунд
# Первая операция - максимум 3 секунды
async with asyncio.timeout(3.0):
data = await fetch_initial_data()
# Вторая операция - максимум 5 секунд
async with asyncio.timeout(5.0):
processed = await process_data(data)
return processed
except asyncio.TimeoutError:
# Сработал один из таймаутов
return None |
|
Кастомные асинхронные контекстные менеджеры инкапсулируют логику создания и очистки ресурсов. Класс реализует __aenter__() и __aexit__() - аналоги обычных __enter__() и __exit__(), но корутинные.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
| class DatabaseTransaction:
"""Контекстный менеджер для транзакций"""
def __init__(self, pool):
self.pool = pool
self.conn = None
async def __aenter__(self):
self.conn = await self.pool.acquire()
await self.conn.execute('BEGIN')
return self.conn
async def __aexit__(self, exc_type, exc_val, exc_tb):
try:
if exc_type is None:
await self.conn.execute('COMMIT')
else:
await self.conn.execute('ROLLBACK')
finally:
await self.pool.release(self.conn)
return False # Не подавляем исключения
async def use_transaction(pool):
async with DatabaseTransaction(pool) as conn:
await conn.execute('INSERT INTO logs VALUES ($1)', 'entry')
# При исключении автоматический rollback
# При успехе - commit |
|
Встречал проект, где транзакции открывались вручную с try/finally блоками на 30 строк. Логика commit/rollback дублировалась в десяти местах. Один контекстный менеджер заменил весь этот копипаст и устранил несколько багов с незакрытыми транзакциями.
asyncio.TaskGroup из Python 3.11 упрощает управление группой задач. Все созданные внутри контекста задачи автоматически дожидаются при выходе. Если одна падает - остальные отменяются.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
| async def process_batch_with_taskgroup(items):
"""Обработка батча с автоматической очисткой"""
results = []
async with asyncio.TaskGroup() as group:
tasks = [
group.create_task(process_item(item))
for item in items
]
# Сюда попадаем только когда все задачи завершены
# Или при первом исключении (остальные отменены)
return [task.result() for task in tasks] |
|
Без TaskGroup приходилось вручную отслеживать задачи, дожидаться их через gather(), обрабатывать отмену при ошибках. Теперь всё автоматически - структурированная конкурентность из коробки.
Динамические таймауты адаптируются под нагрузку. Замеряете среднее время ответа, корректируете лимит. Сервер быстрый - даете больше времени для получения полного ответа. Начинает тормозить - сокращаете, чтобы не висеть на медленных запросах.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
| class AdaptiveTimeout:
"""Динамически адаптирующийся таймаут"""
def __init__(self, base_timeout=5.0):
self.base_timeout = base_timeout
self.response_times = []
self.max_samples = 50
def record_time(self, duration):
self.response_times.append(duration)
if len(self.response_times) > self.max_samples:
self.response_times.pop(0)
def get_timeout(self):
if not self.response_times:
return self.base_timeout
avg = sum(self.response_times) / len(self.response_times)
# Таймаут = среднее * 2 + базовое
return min(avg * 2 + self.base_timeout, 30.0)
adaptive = AdaptiveTimeout()
async def smart_fetch(url):
timeout_value = adaptive.get_timeout()
start = time.time()
try:
async with asyncio.timeout(timeout_value):
result = await fetch_url(url)
adaptive.record_time(time.time() - start)
return result
except asyncio.TimeoutError:
# Не записываем в статистику - операция не завершилась
return None |
|
Защита от зависания критична для production систем. Один медленный запрос не должен блокировать остальные. Таймауты - это не признак недоверия к коду, а защитная сетка для реального мира, где сети падают, серверы глючат, а ответы теряются. Правильное управление контекстом и временем превращает оптимистичный код в устойчивый к сбоям сервис.
Соединения с базами данных, HTTP-сессии, файловые дескрипторы - все это ограниченные ресурсы. Создание нового соединения требует времени и памяти: TCP handshake, SSL negotiation, аутентификация. Если создавать соединение на каждый запрос, приложение утонет в overhead. Пулы решают проблему, переиспользуя соединения между запросами.
Connection pool - это контейнер с предсозданными соединениями. При старте приложения создается N соединений и держится открытыми. Когда нужно выполнить запрос, берете соединение из пула, работаете с ним, возвращаете обратно. Следующий запрос переиспользует то же соединение. Никаких повторных подключений.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| import asyncpg
import asyncio
class ConnectionPool:
"""Базовая реализация пула соединений"""
def __init__(self, dsn, min_size=5, max_size=20):
self.dsn = dsn
self.min_size = min_size
self.max_size = max_size
self.pool = None
async def initialize(self):
"""Создание пула при старте приложения"""
self.pool = await asyncpg.create_pool(
self.dsn,
min_size=self.min_size,
max_size=self.max_size,
command_timeout=5.0
)
async def execute_query(self, query, *args):
"""Выполнение запроса через пул"""
async with self.pool.acquire() as conn:
return await conn.fetch(query, *args)
async def close(self):
"""Закрытие всех соединений"""
if self.pool:
await self.pool.close() |
|
Размер пула определяется нагрузкой. Слишком маленький - запросы ждут свободного соединения. Слишком большой - съедаете ресурсы базы впустую. Я обычно начинаю с min=5, max=20, потом смотрю по метрикам. Если пул постоянно на максимуме и есть ожидание - увеличиваю. Если половина соединений простаивает - уменьшаю.
В проекте с аналитикой запросы шли пачками - 100 одновременных SELECT на отчет. Пул на 10 соединений создавал очередь, latency рос до 3 секунд. Поднял до 30 - очередь исчезла, время ответа стабилизировалось на 200 мс. База справлялась, но пул был узким горлышком.
HTTP-клиенты работают похоже. aiohttp.ClientSession внутри управляет пулом TCP-соединений. TCPConnector настраивает параметры пула - сколько соединений на хост, общий лимит, время жизни keep-alive соединения.
| Python | 1
2
3
4
5
6
7
8
9
10
11
| async def configure_http_pool():
"""Настройка HTTP connection pool"""
connector = aiohttp.TCPConnector(
limit=100, # Максимум соединений всего
limit_per_host=20, # На один хост
ttl_dns_cache=300, # DNS кеш на 5 минут
keepalive_timeout=30 # Keep-alive соединений
)
session = aiohttp.ClientSession(connector=connector)
return session |
|
Без переиспользования сессии каждый HTTP-запрос создает новое соединение. SSL handshake для HTTPS занимает 50-100 мс. 100 запросов = 5-10 секунд только на установку соединений. С пулом первый запрос платит эту цену, остальные 99 переиспользуют соединение - почти нулевой overhead.
Встречал код, где ClientSession создавалась внутри каждой корутины. 1000 одновременных корутин = 1000 сессий = десятки тысяч файловых дескрипторов. Система упиралась в ulimit, начинала падать с "Too many open files". Одна переиспользуемая сессия решила проблему - потребление дескрипторов упало с 50000 до 200.
Управление временем жизни ресурсов требует явной очистки. Оставите соединение открытым после использования - получите утечку. Не закроете сессию - дескрипторы останутся висеть до завершения процесса. Контекстные менеджеры автоматизируют cleanup.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| class ManagedResource:
"""Ресурс с гарантированной очисткой"""
async def __aenter__(self):
self.pool = await asyncpg.create_pool(DSN)
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.session.close()
await self.pool.close()
# Даем время на graceful shutdown
await asyncio.sleep(0.250)
async def use_resources():
"""Использование с автоматической очисткой"""
async with ManagedResource() as resources:
# Работаем с пулом и сессией
data = await resources.pool.fetch("SELECT * FROM users")
response = await resources.session.get("https://api.example.com")
# Здесь все уже закрыто |
|
Мониторинг состояния пулов критичен. Метрики показывают, когда пул перегружен или недоиспользуется. Для asyncpg смотрим pool.get_size(), pool.get_idle_size(). Разница - количество активных соединений. Если постоянно равна максимуму - пул мал.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| async def monitor_pool_health(pool):
"""Периодический мониторинг пула"""
while True:
total = pool.get_size()
idle = pool.get_idle_size()
active = total - idle
utilization = (active / total) * 100 if total > 0 else 0
print(f"Pool: {active}/{total} active, {utilization:.1f}% utilization")
if utilization > 90:
print("WARNING: Pool nearly exhausted!")
await asyncio.sleep(10) |
|
В production системе с PostgreSQL видел пул на 15 соединений с утилизацией 99%. Каждый новый запрос ждал освобождения. Увеличил до 25 - утилизация упала до 60%, latency с 800 мс до 150 мс. База могла больше, но пул искусственно ограничивал throughput.
Graceful shutdown предотвращает обрыв активных операций. При остановке приложения нужно дождаться завершения текущих запросов, закрыть соединения корректно. Резкое завершение оставляет hanging connections на стороне базы, которые висят до таймаута.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| class Application:
"""Приложение с корректным завершением"""
def __init__(self):
self.pool = None
self.shutdown_event = asyncio.Event()
async def startup(self):
self.pool = await asyncpg.create_pool(DSN)
async def shutdown(self):
"""Graceful shutdown"""
# Сигнал остановки фоновым задачам
self.shutdown_event.set()
# Ждем завершения активных операций
await asyncio.sleep(1)
# Закрываем пул
await self.pool.close()
# Финальная пауза для сетевых операций
await asyncio.sleep(0.250) |
|
Ошибка - закрыть пул, пока активны соединения. Получите исключения в середине транзакций, незаписанные данные, поврежденное состояние. Всегда сначала останавливаете прием новых запросов, дожидаетесь текущих, затем чистите ресурсы.
Пулы и управление ресурсами - основа стабильности асинхронных приложений. Неправильная настройка превращает быстрый код в медленный. Забытая очистка - в утекающую память. А грамотное управление жизненным циклом дает предсказуемую производительность и надежность под нагрузкой.
Синхронизация и блокировки
Асинхронный код работает в одном потоке, что вроде бы исключает классические проблемы конкурентного доступа. Но это обманчивое чувство безопасности. Когда корутина делает await, управление передается другой корутине, которая может изменить общие данные. Состояние программы меняется между вызовами await, и это источник тонких багов.
Представьте счетчик, который инкрементируют несколько корутин. В синхронном мире это атомарная операция. В асинхронном - нет. Корутина читает значение, делает await на что-то другое, возвращается и записывает увеличенное значение. Между чтением и записью другая корутина успела сделать то же самое. Результат - потерянные обновления.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| import asyncio
counter = 0
async def increment_naive():
"""Небезопасный инкремент"""
global counter
# Читаем значение
current = counter
# Имитация асинхронной работы
await asyncio.sleep(0.001)
# Записываем увеличенное значение
counter = current + 1
async def test_race_condition():
"""Демонстрация гонки"""
global counter
counter = 0
# 100 корутин инкрементируют счетчик
await asyncio.gather(*[increment_naive() for _ in range(100)])
print(f"Ожидалось: 100, получили: {counter}")
asyncio.run(test_race_condition())
# Вывод: Ожидалось: 100, получили: 67 (или другое число < 100) |
|
Я отлаживал систему статистики, где счетчики событий постоянно расходились с реальностью. Оказалось, десятки корутин обновляли один словарь без синхронизации. Операции вида stats[key] += 1 терялись между await. Добавил блокировки - расхождения исчезли.
asyncio.Lock - базовый примитив синхронизации. Работает как mutex в многопоточности, но для корутин. Только одна корутина может владеть блокировкой одновременно. Остальные ждут освобождения.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| lock = asyncio.Lock()
counter = 0
async def increment_safe():
"""Безопасный инкремент с блокировкой"""
global counter
async with lock:
current = counter
await asyncio.sleep(0.001)
counter = current + 1
async def test_with_lock():
global counter
counter = 0
await asyncio.gather(*[increment_safe() for _ in range(100)])
print(f"С блокировкой: {counter}")
# Вывод: С блокировкой: 100 |
|
Блокировка гарантирует атомарность участка кода между async with lock и выходом из контекста. Другие корутины, достигнув той же блокировки, приостановятся на await до освобождения.
Важный нюанс - блокировка не делает операции атомарными магическим образом. Она просто не дает другим корутинам войти в защищенный участок. Если внутри защищенного блока нет await, блокировка бесполезна - другие корутины всё равно не смогут выполниться до завершения текущей.
| Python | 1
2
3
4
5
6
7
8
9
| # Блокировка избыточна - нет await внутри
async with lock:
counter += 1 # Атомарно и без блокировки
# Блокировка нужна - есть await
async with lock:
current = counter
await asyncio.sleep(0.001) # Тут управление может уйти
counter = current + 1 |
|
asyncio.Semaphore ограничивает количество одновременных входов в критическую секцию. В отличие от Lock, который допускает только одного, семафор пропускает N корутин.
| Python | 1
2
3
4
5
6
7
8
9
10
11
| semaphore = asyncio.Semaphore(3) # Максимум 3 одновременно
async def limited_access(task_id):
async with semaphore:
print(f"Задача {task_id} вошла")
await asyncio.sleep(1)
print(f"Задача {task_id} вышла")
async def test_semaphore():
# 10 задач, но одновременно работают только 3
await asyncio.gather(*[limited_access(i) for i in range(10)]) |
|
Использовал семафоры для ограничения конкурентных запросов к API - сервер разрешал максимум 10 одновременных соединений. Semaphore(10) автоматически управлял очередью, не нужны были костыли с ручным подсчетом активных запросов.
asyncio.Event - флаг для координации между корутинами. Одна корутина ждет события через wait(), другая сигналит через set(). Простой механизм для "проснись, когда что-то случилось".
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| async def waiter(event: asyncio.Event, name: str):
"""Ждет сигнала"""
print(f"{name} ждет...")
await event.wait()
print(f"{name} получил сигнал и продолжил работу")
async def signaler(event: asyncio.Event):
"""Отправляет сигнал"""
await asyncio.sleep(2)
print("Отправка сигнала всем ожидающим")
event.set()
async def test_event():
event = asyncio.Event()
await asyncio.gather(
waiter(event, "Корутина 1"),
waiter(event, "Корутина 2"),
signaler(event)
) |
|
В проекте с WebSocket сервером использовал Event для graceful shutdown. При получении SIGTERM устанавливал событие, все фоновые задачи проверяли его в циклах и корректно завершались. Без Event пришлось бы глобальными флагами управлять, что хрупко и некрасиво.
asyncio.Condition комбинирует Lock и Event. Позволяет корутинам ждать выполнения условия и уведомлять друг друга об изменениях. Классический паттерн producer-consumer можно реализовать через Condition элегантнее, чем через Queue.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| condition = asyncio.Condition()
items = []
async def producer(item):
async with condition:
items.append(item)
print(f"Произведен: {item}")
condition.notify() # Будим одного ожидающего
async def consumer(name):
async with condition:
while not items:
await condition.wait() # Спим пока нет элементов
item = items.pop(0)
print(f"{name} потребил: {item}") |
|
Дедлоки в асинхронном коде случаются реже, чем в многопоточности, но возможны. Две корутины ждут блокировки друг друга - классический взаимный дедлок. Правило то же: всегда захватывайте блокировки в одном порядке.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| # Плохо - потенциальный дедлок
async def task_a(lock1, lock2):
async with lock1:
await asyncio.sleep(0.1)
async with lock2: # Ждем lock2
pass
async def task_b(lock1, lock2):
async with lock2:
await asyncio.sleep(0.1)
async with lock1: # Ждем lock1
pass
# Хорошо - фиксированный порядок
async def safe_task(lock1, lock2):
# Всегда сначала lock1, потом lock2
async with lock1:
async with lock2:
pass |
|
Видел production систему, которая периодически зависала - две корутины захватывали блокировки в разном порядке. Debugging занял день, потому что дедлок случался раз в неделю при специфичной последовательности событий. Исправили порядок блокировок - проблема исчезла.
Производительность синхронизации в asyncio лучше, чем в потоках. Блокировка корутины - это просто приостановка выполнения без системных вызовов. Переключение на другую корутину стоит наносекунды. В потоках блокировка требует взаимодействия с планировщиком ОС, что на порядки дороже.
Но злоупотребление блокировками убивает конкурентность. Если критическая секция занимает большую часть времени корутины, асинхронность теряет смысл - корутины выполняются фактически последовательно. Минимизируйте код под блокировкой, выносите тяжелые операции за её пределы.
Синхронизация в asyncio - компромисс между безопасностью и производительностью. Отсутствие блокировок там, где они нужны - гонки и потерянные обновления. Избыток блокировок - деградация в последовательное выполнение. Защищайте только критические участки с общими данными, все остальное пусть работает конкурентно.
Отладка асинхронного кода
Отладка асинхронного кода - отдельный круг ада для разработчиков. Traceback'и растягиваются на сотни строк, точки останова в отладчике срабатывают невпопад, а ошибки воспроизводятся через раз. Классические инструменты debugging теряют эффективность, когда дело доходит до корутин и event loop.
Первая проблема - запутанные stack trace. Обычное исключение в синхронном коде дает четкий путь от точки вызова до места ошибки. В асинхронном коде между вызовом и ошибкой десятки переключений контекста, цепочки await, обертки из gather() и create_task(). Traceback превращается в лабиринт.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| import asyncio
async def inner_function():
await asyncio.sleep(0.1)
raise ValueError("Ошибка в глубине стека")
async def middle_layer():
await inner_function()
async def outer_layer():
tasks = [middle_layer() for _ in range(3)]
await asyncio.gather(*tasks)
try:
asyncio.run(outer_layer())
except ValueError as e:
import traceback
traceback.print_exc()
# Вывод: 15+ строк вложенных вызовов через asyncio internals |
|
Я потратил часы на отслеживание источника CancelledError в системе обработки очередей. Traceback указывал на строку внутри библиотеки asyncio, а реальная причина была в забытом task.cancel() на три уровня выше по стеку вызовов. Обычный grep по коду ничего не давал - нужны были специфичные техники.
asyncio.run() с параметром debug=True включает детальное логирование. Event loop начинает писать предупреждения о незавершенных корутинах, долгих блокировках, неперехваченных исключениях. Overhead на производительность заметный, но для debugging бесценно.
| Python | 1
2
3
4
5
6
7
8
9
| # Включение debug режима
asyncio.run(main(), debug=True)
# Или глобально для всех loop'ов
import asyncio
asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
loop = asyncio.new_event_loop()
loop.set_debug(True)
asyncio.set_event_loop(loop) |
|
В debug режиме event loop предупредит, если корутина выполняется дольше 100 мс без await - признак блокирующей операции. Это помогло мне найти забытый синхронный вызов к Redis, который тормозил весь loop на 300 миллисекунд. Без debug режима эта проблема осталась бы незамеченной до production.
Стандартный отладчик pdb с асинхронным кодом работает странно. Ставите breakpoint в корутине - останавливается только одна корутина, остальные продолжают выполнение. Event loop не замирает. Это создает race conditions во время отладки - состояние программы меняется, пока вы изучаете переменные. Библиотека aiomonitor добавляет интерактивную консоль в работающее asyncio приложение. Можете подключиться через telnet, посмотреть список активных задач, отменить зависшую корутину, проверить состояние пулов. Для production систем это спасение.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
| import aiomonitor
async def main():
# Ваш код здесь
pass
with aiomonitor.start_monitor(loop):
loop.run_until_complete(main())
# В другом терминале:
# telnet localhost 50101
# > ps # список задач
# > cancel <task_id> # отменить задачу |
|
Логирование в асинхронном коде требует осторожности. Обычный print() или logging.info() могут перемешивать вывод от разных корутин. Для отладки конкурентных проблем добавляйте идентификаторы задач или корреляционные ID.
| Python | 1
2
3
4
5
6
7
8
9
10
11
| import asyncio
import logging
logger = logging.getLogger(__name__)
async def debuggable_function(request_id):
logger.info(f"[{request_id}] Начало обработки")
await asyncio.sleep(1)
logger.info(f"[{request_id}] Завершение") |
|
Встречал систему, где logs из сотни одновременных запросов сваливались в один поток без разделения. Понять, какая строка лога относится к какому запросу, было невозможно. Добавили correlation ID в каждое сообщение - debugging ускорился в разы.
Профилирование асинхронного кода показывает, где теряется время. Модуль cProfile работает, но результаты запутаны из-за внутренностей asyncio. Специализированные инструменты вроде py-spy дают более читаемые результаты для конкурентного кода.
| Python | 1
2
| # Запуск с профилированием
# py-spy record -o profile.svg -- python script.py |
|
Тестирование асинхронного кода требует pytest-asyncio или unittest.IsolatedAsyncioTestCase. Обычные тесты не могут запускать корутины. Нужны специальные декораторы и фикстуры для управления event loop в тестах.
| Python | 1
2
3
4
5
6
| import pytest
@pytest.mark.asyncio
async def test_async_function():
result = await some_coroutine()
assert result == expected_value |
|
Отладка асинхронного кода - это навык, который приходит с опытом. Первые месяцы каждая ошибка кажется мистической. Потом начинаете видеть паттерны: незавершенные корутины, забытые await, гонки за общими данными. Правильные инструменты и техники превращают хаотичный процесс в методичное расследование.
Асинхронный парсер с очередью задач
Теория без практики остается абстракцией. Соберем все изученные техники в полнофункциональное приложение - асинхронный парсер новостных сайтов с очередью задач, обработкой ошибок и мониторингом производительности.
Архитектура строится на трех компонентах. Producer загружает список URL и складывает их в очередь. Worker pool забирает URL из очереди, скачивает страницы, парсит контент. Storage сохраняет результаты в базу данных батчами для оптимизации.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
| import asyncio
import aiohttp
import asyncpg
from asyncio import Queue
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s'
)
logger = logging.getLogger(__name__)
@dataclass
class Article:
"""Структура для хранения данных статьи"""
url: str
title: str
content: str
published_at: Optional[datetime]
fetched_at: datetime
class NewsParser:
"""Асинхронный парсер новостей"""
def __init__(
self,
db_pool: asyncpg.Pool,
max_workers: int = 10,
batch_size: int = 50
):
self.db_pool = db_pool
self.max_workers = max_workers
self.batch_size = batch_size
self.queue = Queue(maxsize=1000)
self.results_queue = Queue()
self.stats = {
'processed': 0,
'errors': 0,
'start_time': None
}
async def fetch_page(
self,
session: aiohttp.ClientSession,
url: str
) -> Optional[str]:
"""Загрузка HTML страницы с обработкой ошибок"""
try:
async with asyncio.timeout(10.0):
async with session.get(url) as response:
if response.status == 200:
return await response.text()
else:
logger.warning(
f"URL {url} вернул статус {response.status}"
)
return None
except asyncio.TimeoutError:
logger.error(f"Таймаут при загрузке {url}")
return None
except aiohttp.ClientError as e:
logger.error(f"Ошибка сети для {url}: {e}")
return None
def parse_article(self, html: str, url: str) -> Optional[Article]:
"""Извлечение данных из HTML"""
# Упрощенный парсинг - в реальности используйте BeautifulSoup
try:
# Имитация парсинга
if '<title>' not in html:
return None
title_start = html.find('<title>') + 7
title_end = html.find('</title>')
title = html[title_start:title_end] if title_end > title_start else "Без названия"
# Извлечение первых 500 символов как контент
content = html[:500]
return Article(
url=url,
title=title,
content=content,
published_at=None,
fetched_at=datetime.now()
)
except Exception as e:
logger.error(f"Ошибка парсинга {url}: {e}")
return None
async def worker(
self,
worker_id: int,
session: aiohttp.ClientSession
):
"""Воркер для обработки URL из очереди"""
logger.info(f"Воркер {worker_id} запущен")
while True:
url = await self.queue.get()
if url is None: # Сигнал завершения
self.queue.task_done()
break
try:
# Загружаем страницу
html = await self.fetch_page(session, url)
if html:
# Парсим контент
article = self.parse_article(html, url)
if article:
await self.results_queue.put(article)
self.stats['processed'] += 1
else:
self.stats['errors'] += 1
else:
self.stats['errors'] += 1
except Exception as e:
logger.error(f"Воркер {worker_id} упал на {url}: {e}")
self.stats['errors'] += 1
finally:
self.queue.task_done()
logger.info(f"Воркер {worker_id} завершен")
async def storage_worker(self):
"""Батчевая запись результатов в базу"""
batch = []
while True:
try:
article = await asyncio.wait_for(
self.results_queue.get(),
timeout=2.0
)
if article is None: # Сигнал завершения
break
batch.append(article)
if len(batch) >= self.batch_size:
await self.save_batch(batch)
batch.clear()
except asyncio.TimeoutError:
# Таймаут - сохраняем накопленное
if batch:
await self.save_batch(batch)
batch.clear()
# Сохраняем остаток
if batch:
await self.save_batch(batch)
async def save_batch(self, articles: list[Article]):
"""Сохранение пачки статей в базу"""
try:
async with self.db_pool.acquire() as conn:
await conn.executemany(
'''
INSERT INTO articles (url, title, content, fetched_at)
VALUES ($1, $2, $3, $4)
ON CONFLICT (url) DO UPDATE
SET title = EXCLUDED.title,
content = EXCLUDED.content,
fetched_at = EXCLUDED.fetched_at
''',
[
(a.url, a.title, a.content, a.fetched_at)
for a in articles
]
)
logger.info(f"Сохранено {len(articles)} статей")
except Exception as e:
logger.error(f"Ошибка сохранения в базу: {e}")
async def producer(self, urls: list[str]):
"""Загрузка URL в очередь"""
for url in urls:
await self.queue.put(url)
# Отправляем сигналы завершения воркерам
for _ in range(self.max_workers):
await self.queue.put(None)
async def monitor_progress(self):
"""Мониторинг прогресса выполнения"""
while True:
await asyncio.sleep(5)
elapsed = (datetime.now() - self.stats['start_time']).total_seconds()
rate = self.stats['processed'] / elapsed if elapsed > 0 else 0
logger.info(
f"Обработано: {self.stats['processed']}, "
f"Ошибок: {self.stats['errors']}, "
f"Скорость: {rate:.1f} стр/сек"
)
async def run(self, urls: list[str]):
"""Запуск парсера"""
self.stats['start_time'] = datetime.now()
logger.info(f"Запуск парсера для {len(urls)} URL")
# Настройка HTTP сессии
connector = aiohttp.TCPConnector(
limit=100,
limit_per_host=20
)
async with aiohttp.ClientSession(connector=connector) as session:
# Запуск компонентов
workers = [
asyncio.create_task(self.worker(i, session))
for i in range(self.max_workers)
]
storage = asyncio.create_task(self.storage_worker())
monitor = asyncio.create_task(self.monitor_progress())
producer_task = asyncio.create_task(self.producer(urls))
# Ждем завершения producer
await producer_task
# Ждем обработки всей очереди
await self.queue.join()
# Ждем завершения воркеров
await asyncio.gather(*workers)
# Останавливаем storage
await self.results_queue.put(None)
await storage
# Останавливаем монитор
monitor.cancel()
try:
await monitor
except asyncio.CancelledError:
pass
elapsed = (datetime.now() - self.stats['start_time']).total_seconds()
logger.info(
f"Парсинг завершен за {elapsed:.1f}с. "
f"Обработано: {self.stats['processed']}, "
f"Ошибок: {self.stats['errors']}"
)
async def setup_database():
"""Инициализация базы данных"""
pool = await asyncpg.create_pool(
'postgresql://user:password@localhost/news',
min_size=5,
max_size=20
)
async with pool.acquire() as conn:
await conn.execute('''
CREATE TABLE IF NOT EXISTS articles (
id SERIAL PRIMARY KEY,
url TEXT UNIQUE NOT NULL,
title TEXT NOT NULL,
content TEXT,
fetched_at TIMESTAMP NOT NULL
)
''')
return pool
async def main():
"""Точка входа"""
# Список URL для парсинга
urls = [
f"https://example.com/news/article-{i}"
for i in range(500)
]
# Инициализация базы
pool = await setup_database()
try:
# Запуск парсера
parser = NewsParser(pool, max_workers=20, batch_size=50)
await parser.run(urls)
finally:
await pool.close()
if __name__ == "__main__":
asyncio.run(main()) |
|
Это приложение демонстрирует producer-consumer паттерн с очередями, батчевую запись в базу, управление пулом воркеров, обработку ошибок и таймаутов, мониторинг производительности. Все техники asyncio собраны в одном работающем решении.
В реальных проектах парсер требует расширенной функциональности - обработку rate limiting, кеширование, retry механизмы, graceful shutdown. Доработаем приложение до production-ready состояния.
Первое улучшение - адаптивный rate limiter, который подстраивается под скорость ответов сервера. Если начинаем получать 429 или таймауты, автоматически снижаем нагрузку.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
| class AdaptiveRateLimiter:
"""Динамический контроль скорости запросов"""
def __init__(self, initial_rate=10):
self.max_rate = initial_rate
self.current_tokens = initial_rate
self.last_update = datetime.now()
self.lock = asyncio.Lock()
self.error_streak = 0
async def acquire(self):
"""Получение разрешения на запрос"""
async with self.lock:
now = datetime.now()
elapsed = (now - self.last_update).total_seconds()
# Пополняем токены
self.current_tokens = min(
self.max_rate,
self.current_tokens + elapsed * self.max_rate
)
self.last_update = now
# Ждем если токенов нет
if self.current_tokens < 1:
wait_time = (1 - self.current_tokens) / self.max_rate
await asyncio.sleep(wait_time)
self.current_tokens = 1
self.current_tokens -= 1
def report_error(self):
"""Сообщение об ошибке - снижаем скорость"""
self.error_streak += 1
if self.error_streak >= 3 and self.max_rate > 1:
self.max_rate *= 0.7
logger.warning(
f"Снижаем rate до {self.max_rate:.1f} запросов/сек"
)
def report_success(self):
"""Успешный запрос - можем ускориться"""
self.error_streak = 0
if self.max_rate < 20:
self.max_rate *= 1.05 |
|
Интегрируем rate limiter в воркеры, добавляя вызов перед каждым запросом. Теперь парсер автоматически замедляется при проблемах и ускоряется когда всё работает гладко.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
| async def smart_worker(
self,
worker_id: int,
session: aiohttp.ClientSession,
rate_limiter: AdaptiveRateLimiter
):
"""Воркер с адаптивным rate limiting"""
while True:
url = await self.queue.get()
if url is None:
self.queue.task_done()
break
try:
# Ждем разрешения от rate limiter
await rate_limiter.acquire()
html = await self.fetch_page(session, url)
if html:
article = self.parse_article(html, url)
if article:
await self.results_queue.put(article)
self.stats['processed'] += 1
rate_limiter.report_success()
else:
self.stats['errors'] += 1
else:
self.stats['errors'] += 1
rate_limiter.report_error()
except Exception as e:
logger.error(f"Воркер {worker_id} ошибка на {url}: {e}")
self.stats['errors'] += 1
rate_limiter.report_error()
finally:
self.queue.task_done() |
|
Второе улучшение - кеширование с TTL для предотвращения повторных загрузок одних URL. Используем Redis или простой словарь в памяти с временными метками.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
| class SimpleCache:
"""Кеш с TTL для результатов"""
def __init__(self, ttl_seconds=3600):
self.cache = {}
self.ttl = ttl_seconds
self.lock = asyncio.Lock()
async def get(self, key: str) -> Optional[Article]:
async with self.lock:
if key in self.cache:
article, timestamp = self.cache[key]
age = (datetime.now() - timestamp).total_seconds()
if age < self.ttl:
return article
else:
del self.cache[key]
return None
async def set(self, key: str, value: Article):
async with self.lock:
self.cache[key] = (value, datetime.now())
# Очистка старых записей
if len(self.cache) > 10000:
await self._cleanup()
async def _cleanup(self):
"""Удаление устаревших записей"""
now = datetime.now()
expired = [
k for k, (_, ts) in self.cache.items()
if (now - ts).total_seconds() > self.ttl
]
for key in expired:
del self.cache[key] |
|
Третье улучшение - retry механизм с экспоненциальной задержкой для временных сбоев. Не все ошибки фатальны - сеть нестабильна, серверы иногда глючат.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
| async def fetch_with_retry(
self,
session: aiohttp.ClientSession,
url: str,
max_attempts: int = 3
) -> Optional[str]:
"""Загрузка с повторными попытками"""
for attempt in range(max_attempts):
try:
async with asyncio.timeout(10.0):
async with session.get(url) as response:
if response.status == 200:
return await response.text()
elif response.status == 429:
# Rate limit - ждем подольше
delay = 2 ** (attempt + 2)
logger.warning(
f"Rate limit на {url}, жду {delay}с"
)
await asyncio.sleep(delay)
continue
else:
return None
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
if attempt == max_attempts - 1:
logger.error(f"Финальная неудача для {url}: {e}")
return None
delay = 2 ** attempt
logger.info(f"Повтор {attempt + 1} для {url} через {delay}с")
await asyncio.sleep(delay)
return None |
|
Четвертое - graceful shutdown с корректным завершением всех операций. При получении сигнала остановки даем воркерам время закончить текущие задачи.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
| class GracefulShutdown:
"""Менеджер корректного завершения"""
def __init__(self):
self.shutdown_event = asyncio.Event()
def signal_shutdown(self):
"""Сигнал остановки"""
logger.info("Получен сигнал остановки")
self.shutdown_event.set()
async def wait_for_shutdown(self):
"""Ожидание сигнала"""
await self.shutdown_event.wait()
async def run_with_shutdown(self, urls: list[str]):
"""Запуск с поддержкой graceful shutdown"""
shutdown = GracefulShutdown()
# Регистрируем обработчики сигналов
import signal
def handle_signal(sig, frame):
shutdown.signal_shutdown()
signal.signal(signal.SIGTERM, handle_signal)
signal.signal(signal.SIGINT, handle_signal)
try:
connector = aiohttp.TCPConnector(limit=100)
async with aiohttp.ClientSession(connector=connector) as session:
rate_limiter = AdaptiveRateLimiter(initial_rate=10)
cache = SimpleCache(ttl_seconds=3600)
# Запуск воркеров
workers = [
asyncio.create_task(
self.enhanced_worker(i, session, rate_limiter, cache)
)
for i in range(self.max_workers)
]
storage = asyncio.create_task(self.storage_worker())
monitor = asyncio.create_task(self.monitor_progress())
# Producer в фоне
producer = asyncio.create_task(self.producer(urls))
# Ждем либо завершения, либо сигнала
done, pending = await asyncio.wait(
[producer, shutdown.wait_for_shutdown()],
return_when=asyncio.FIRST_COMPLETED
)
if shutdown.shutdown_event.is_set():
logger.info("Корректное завершение начато")
# Отменяем producer если еще работает
if producer in pending:
producer.cancel()
# Отправляем сигналы завершения
for _ in workers:
await self.queue.put(None)
# Ждем завершения очереди
await self.queue.join()
# Ждем воркеров
await asyncio.gather(*workers, return_exceptions=True)
# Завершаем storage и monitor
await self.results_queue.put(None)
await storage
monitor.cancel()
try:
await monitor
except asyncio.CancelledError:
pass
except Exception as e:
logger.error(f"Критическая ошибка: {e}")
raise |
|
Эти улучшения превращают базовый парсер в надежную систему, готовую к работе в production. Адаптивный rate limiting защищает от блокировок, кеширование экономит ресурсы, retry повышает надежность, graceful shutdown предотвращает потерю данных при остановке.
Собрать рабочий код - это одно. Создать систему, которую можно поддерживать, расширять и не бояться трогать через полгода - совсем другое. Асинхронный парсер из предыдущей главы работает, но архитектурно он монолитен. Вся логика в одном классе, зависимости жестко связаны, тестирование превращается в мучение.
Разделим приложение на слои по принципу разделения ответственности. Нижний слой - инфраструктура: работа с базой, HTTP-клиенты, очереди. Средний слой - бизнес-логика: парсинг, валидация, обработка данных. Верхний слой - координация: управление воркерами, мониторинг, обработка сигналов.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
| from abc import ABC, abstractmethod
from typing import Protocol, Optional
import asyncio
class ArticleRepository(ABC):
"""Абстракция для хранения статей"""
@abstractmethod
async def save_batch(self, articles: list['Article']) -> None:
"""Сохранение пачки статей"""
pass
@abstractmethod
async def get_by_url(self, url: str) -> Optional['Article']:
"""Получение статьи по URL"""
pass
class PostgresRepository(ArticleRepository):
"""Реализация хранилища через PostgreSQL"""
def __init__(self, pool: asyncpg.Pool):
self.pool = pool
async def save_batch(self, articles: list['Article']) -> None:
async with self.pool.acquire() as conn:
await conn.executemany(
'''
INSERT INTO articles (url, title, content, fetched_at)
VALUES ($1, $2, $3, $4)
ON CONFLICT (url) DO UPDATE
SET title = EXCLUDED.title,
content = EXCLUDED.content,
fetched_at = EXCLUDED.fetched_at
''',
[(a.url, a.title, a.content, a.fetched_at) for a in articles]
)
async def get_by_url(self, url: str) -> Optional['Article']:
async with self.pool.acquire() as conn:
row = await conn.fetchrow(
'SELECT * FROM articles WHERE url = $1',
url
)
return Article.from_row(row) if row else None |
|
Репозиторий инкапсулирует работу с хранилищем. Бизнес-логика не знает про SQL, asyncpg или структуру таблиц. Если завтра решите мигрировать на MongoDB - поменяете только реализацию репозитория, остальной код останется нетронутым.
Я рефакторил систему аналитики, где SQL-запросы размазались по 30 файлам. Каждое изменение схемы базы требовало правки в десяти местах. Выделил репозитории - миграция с PostgreSQL на ClickHouse заняла два дня вместо предполагаемых двух недель.
Паттерн Strategy для разных способов парсинга. Каждый сайт имеет свою верстку - универсальный парсер не работает. Создаем интерфейс парсера и реализации под конкретные источники.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
| class Parser(Protocol):
"""Интерфейс парсера"""
def can_parse(self, url: str) -> bool:
"""Проверка, может ли парсер обработать URL"""
...
async def parse(self, html: str, url: str) -> Optional[Article]:
"""Извлечение данных из HTML"""
...
class GenericParser:
"""Базовый парсер для обычных сайтов"""
def can_parse(self, url: str) -> bool:
return True # Обрабатывает всё по умолчанию
async def parse(self, html: str, url: str) -> Optional[Article]:
from bs4 import BeautifulSoup
soup = BeautifulSoup(html, 'html.parser')
title = soup.find('title')
if not title:
return None
return Article(
url=url,
title=title.get_text().strip(),
content=soup.get_text()[:500],
published_at=None,
fetched_at=datetime.now()
)
class HabrParser:
"""Специализированный парсер для Habr"""
def can_parse(self, url: str) -> bool:
return 'habr.com' in url
async def parse(self, html: str, url: str) -> Optional[Article]:
from bs4 import BeautifulSoup
soup = BeautifulSoup(html, 'html.parser')
# Специфичные селекторы для Habr
title_elem = soup.select_one('.tm-article-snippet__title')
content_elem = soup.select_one('.tm-article-body')
if not title_elem:
return None
return Article(
url=url,
title=title_elem.get_text().strip(),
content=content_elem.get_text()[:500] if content_elem else "",
published_at=None,
fetched_at=datetime.now()
)
class ParserRegistry:
"""Реестр парсеров с выбором подходящего"""
def __init__(self):
self.parsers: list[Parser] = []
def register(self, parser: Parser) -> None:
self.parsers.append(parser)
def get_parser(self, url: str) -> Parser:
for parser in self.parsers:
if parser.can_parse(url):
return parser
# Fallback на базовый парсер
return GenericParser() |
|
Реестр выбирает правильный парсер для каждого URL. Добавить поддержку нового сайта - зарегистрировать еще один парсер в реестре. Логика обработки изолирована, изменения в одном парсере не влияют на другие.
Паттерн Command для задач в очереди. Вместо простых URL складываем в очередь объекты-команды, которые знают, как себя выполнить. Это дает гибкость - разные типы задач в одной очереди.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
| from enum import Enum
class TaskPriority(Enum):
LOW = 3
NORMAL = 2
HIGH = 1
class Task(ABC):
"""Базовая задача"""
def __init__(self, priority: TaskPriority = TaskPriority.NORMAL):
self.priority = priority
self.created_at = datetime.now()
@abstractmethod
async def execute(self, context: 'ExecutionContext') -> None:
"""Выполнение задачи"""
pass
def __lt__(self, other: 'Task') -> bool:
# Для сортировки в PriorityQueue
return self.priority.value < other.priority.value
class ParseUrlTask(Task):
"""Задача парсинга URL"""
def __init__(self, url: str, priority: TaskPriority = TaskPriority.NORMAL):
super().__init__(priority)
self.url = url
async def execute(self, context: 'ExecutionContext') -> None:
html = await context.fetcher.fetch(self.url)
if not html:
return
parser = context.parser_registry.get_parser(self.url)
article = await parser.parse(html, self.url)
if article:
await context.repository.save_batch([article])
class BatchProcessTask(Task):
"""Задача массовой обработки"""
def __init__(self, urls: list[str]):
super().__init__(TaskPriority.LOW)
self.urls = urls
async def execute(self, context: 'ExecutionContext') -> None:
tasks = [ParseUrlTask(url, TaskPriority.LOW) for url in self.urls]
for task in tasks:
await context.task_queue.put(task) |
|
Теперь можно добавить задачу повторной обработки с низким приоритетом или срочную проверку конкретного URL с высоким приоритетом. Очередь автоматически сортирует по важности.
Паттерн Observer для мониторинга и метрик. Воркеры уведомляют наблюдателей о событиях - начало обработки, успех, ошибка. Наблюдатели собирают статистику, логируют, отправляют алерты.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
| class WorkerObserver(ABC):
"""Наблюдатель за работой воркеров"""
@abstractmethod
async def on_task_started(self, task: Task) -> None:
pass
@abstractmethod
async def on_task_completed(self, task: Task, duration: float) -> None:
pass
@abstractmethod
async def on_task_failed(self, task: Task, error: Exception) -> None:
pass
class MetricsCollector(WorkerObserver):
"""Сборщик метрик"""
def __init__(self):
self.completed = 0
self.failed = 0
self.total_duration = 0.0
self.lock = asyncio.Lock()
async def on_task_started(self, task: Task) -> None:
pass # Можно логировать начало
async def on_task_completed(self, task: Task, duration: float) -> None:
async with self.lock:
self.completed += 1
self.total_duration += duration
async def on_task_failed(self, task: Task, error: Exception) -> None:
async with self.lock:
self.failed += 1
def get_stats(self) -> dict:
return {
'completed': self.completed,
'failed': self.failed,
'avg_duration': self.total_duration / self.completed if self.completed > 0 else 0
}
class AlertingObserver(WorkerObserver):
"""Отправка алертов при проблемах"""
def __init__(self, error_threshold: int = 10):
self.error_threshold = error_threshold
self.recent_errors = 0
async def on_task_started(self, task: Task) -> None:
pass
async def on_task_completed(self, task: Task, duration: float) -> None:
self.recent_errors = max(0, self.recent_errors - 1)
async def on_task_failed(self, task: Task, error: Exception) -> None:
self.recent_errors += 1
if self.recent_errors >= self.error_threshold:
await self.send_alert(
f"Высокий уровень ошибок: {self.recent_errors}"
)
async def send_alert(self, message: str) -> None:
# Реальная отправка в Slack, email и т.д.
logger.critical(f"ALERT: {message}") |
|
Воркер уведомляет всех подписанных наблюдателей о каждом событии. Добавить новую метрику или способ мониторинга - создать нового наблюдателя и подписать его. Воркер остается неизменным.
Слоистая архитектура с правильными абстракциями делает код гибким и тестируемым. Каждый слой изолирован, изменения в одном не затрагивают другие. Паттерны проектирования - не академическая теория, а проверенные решения реальных проблем. Repository, Strategy, Command, Observer - инструменты, которые превращают клубок зависимостей в понятную структуру. Реальный production код требует масштабируемости и расширяемости. Монолитный класс на тысячу строк работает, пока проект небольшой. Когда система растет, добавляются новые источники данных, меняются требования к обработке - монолит превращается в кошмар поддержки.
Dependency Injection контейнер управляет зависимостями между компонентами. Вместо создания объектов внутри классов передаете их через конструктор. Тестирование упрощается - подменяете реальные зависимости моками.
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
| class ServiceContainer:
"""Контейнер зависимостей"""
def __init__(self):
self._services = {}
self._factories = {}
def register(self, interface: type, implementation: type):
"""Регистрация реализации для интерфейса"""
self._factories[interface] = implementation
def register_instance(self, interface: type, instance):
"""Регистрация готового экземпляра"""
self._services[interface] = instance
def get(self, interface: type):
"""Получение сервиса"""
if interface in self._services:
return self._services[interface]
if interface in self._factories:
factory = self._factories[interface]
instance = factory(self)
self._services[interface] = instance
return instance
raise ValueError(f"Сервис {interface} не зарегистрирован")
class WorkerFactory:
"""Фабрика для создания воркеров"""
def __init__(self, container: ServiceContainer):
self.container = container
async def create_worker(self, worker_id: int):
"""Создание сконфигурированного воркера"""
return Worker(
worker_id=worker_id,
fetcher=self.container.get(HttpFetcher),
parser_registry=self.container.get(ParserRegistry),
repository=self.container.get(ArticleRepository),
observers=self.container.get(list[WorkerObserver])
) |
|
Контейнер знает, как создавать и связывать компоненты. При изменении конфигурации правите только регистрацию сервисов, остальной код остается нетронутым. Тестирование превращается в подмену реализаций в контейнере.
Применение паттернов и слоистой архитектуры - инвестиция времени, которая окупается при первом крупном рефакторинге или добавлении функциональности. Код становится предсказуемым, изменения локализованы, тестирование перестает быть болью. Асинхронный парсер из демонстрационного примера превратился в гибкую систему, готовую к расширению без переписывания с нуля.
Асинхронное программирование в Python Доброго времени суток. Столкнулся с проблемой что в коде вызывается функция которая ждёт условные... Python asyncio / aiohttp API 429 response error Пытаюсь написать асинхронный API-запрос, нашел подходящий пример, но в ответ получаю ошибку: "429... python. flet. asyncio.Queue Всем привет, есть проблема с понимаем как работают очереди и/или проблема с тем, что уже... Потоки, процессы и асинхронное программирование Добрый вечер.
Почитал документацию и по моему я только еще больше запутался.
Есть модуль... Асинхронное программирование Не всегда включаюсь в тему асинхронного программирования, рассказали бы в краткой форме,... Асинхронный парсер + Django Привет. Пытаюсь создать приложение, которое выводит пользователю информацию с асинхронного парсера.... С чего начать разбираться с Asyncio? нужно написать парсер страницы, с загрузкой некоторых картинок, на Asyncio.
С многопоточным и... Коннектор к API используя AsyncIO, aiohttp Py3.5.
Работаю с библиотекой, подключаюсь к api telegram. Получаю сообщения и отправляю их в... Выйти из петли в run_forever() [asyncio] Пишу скрипт, который эмулирует запуск игр в стиме.
Для этого была выбрана библиотека Steam для... Asyncio корректное завершение в случае непредвиденной остановки программы Есть скрипт (выложен не полностью)
@asyncio.coroutine
def do_work(data):
for adres, ip_list in... Asyncio - coroutine vs future vs task Доброго времени суток, форумчане!
Приступил к изучению asyncio. Использую python 3.5.
Гуру... Можно ли поставить таски на паузу? (asyncio) Всем привет! Имеется такой код:
from aiohttp import web
import asyncio
async def...
|