ETL-конвейеры – это набор процессов, отвечающих за извлечение данных из различных источников (Extract), их преобразование в нужный формат (Transform) и загрузку в целевое хранилище (Load). Современные компании оперируют сотнями терабайт информации, которые поступают из десятков разнородных систем: от древних корпоративных монстров на COBOL до навороченных REST API с непредсказуемым поведением. Создание надёжных потоков данных напоминает игру в инженерный тетрис, где нужно учесть массу факторов: гарантировать целостность и непротиворечивость данных, обеспечить масштабируемость при росте объемов, отлавливать и корректно обрабатывать ошыбки, выдерживать временные SLA и оптимально использовать вычислительные ресурсы. Особенно сложно обеспечить идемпотентность – свойство операции давать одинаковый результат при повторном выполнении, критичное для надёжных конвейеров.
В архитектурном плане ETL-конвейеры прошли эволюцию от монолитных пакетных ночных задач до микросервисных потоков, работающих в режиме реального времени. На практике можно выделить несколько паттернов:
1. Пакетный ETL – классический подход с обработкой больших массивов данных по расписанию. Простой, надёжный, но не всегда отвечающий современным требованиям к скорости получения информации.
2. Инкрементальный ETL – обработка только новых или измененных данных. Сложнее в реализации, но значительно экономит ресурсы.
3. Потоковый ETL – обработка данных по мере их поступления, практически в реальном времени. Требует особой архитектуры и инструментов.
4. Гибридный ETL – комбинация подходов, когда часть данных обрабатывается в потоке, а часть – пакетно.
За свою карьеру мне доводилось видеть ETL-конвейеры в самых разных обличьях – от скриптов на bash, запускаемых по cron, до высоконагруженных распределенных систем на Apache Airflow, Spark и Kafka. И вот что могу сказать: идеальный инструмент для работы с данными – тот, который решает вашу конкретную задачу с минимальными накладными расходами.
А теперь давайте взглянем на Apache Airflow – инструмент, который произвел настоящую революцию в разработке и оркестрации ETL-конвейеров, позволив разработчикам описывать сложнейшие рабочие процессы на чистом Python.
Apache Airflow: оркестратор рабочих процессов
Представьте, что вам поручили ежедневно собирать данные из десяти различных источников, трансформировать их по сложным бизнес-правилам и загружать в хранилище данных. А теперь добавьте сюда обработку ошибок, повторные попытки, уведомления и зависимости между задачами. Ещё пару лет назад это заставило бы вас писать сложную систему управления скриптами, крон-задачами и командами запуска. Сегодня у нас есть Apache Airflow – оркестратор рабочих процессов, который решает именно эту головную боль.
Apache Airflow родился в недрах Airbnb и быстро стал проектом Apache Foundation. Его основная идея гениальна в своей простоте: описывать рабочие процессы в виде направленных ациклических графов (DAG) с помощью чистого Python-кода. Каждый DAG – это набор задач и их зависимостей, а Python-код позволяет добавить любую логику, от ветвления до динамического создания задач.
Архитектурно Airflow состоит из нескольких ключевых компонентов, как в хорошо отлаженном оркестре:
1. Scheduler – дирижёр всего оркестра. Отвечает за планирование и запуск задач в нужное время. Он постоянно сканирует директорию с DAG-файлами, парсит их и отправляет задачи на выполнение в соответствии с заданным расписанием. Внутри планировщик использует сложную логику для определения, какие задачи готовы к выполнению, учитывая их зависимости и результаты предыдущих запусков.
2. Executor – исполнитель, который решает, как и где выполнять задачи. Airflow поддерживает множество типов исполнителей: от простого SequentialExecutor для отладки до CeleryExecutor и KubernetesExecutor для распределенных высоконагруженных систем. Выбор правильного исполнителя – ключевой фактор для производительности. В своей практике я всегда начинаю с LocalExecutor для простоты, но в продакшене почти всегда переключаюсь на Celery или Kubernetes.
3. Webserver – веб-интерфейс для мониторинга и управления. Это удобная консоль, где можно увидеть все DAGи, их статусы, логи выполнения и даже запустить задачи вручную. Кто бы мог подумать, что наблюдать за работающими процессами может быть так же увлекательно, как смотреть на горящий костер или текущую воду?
4. Workers – рабочие лошадки, которые выполняют конкретные задачи. В зависимости от выбранного исполнителя, рабочие процессы могут быть локальными или распределенными по разным серверам.
5. Metadata Database – сердце системы, хранящее состояние всех DAG-ов и задач. Обычно это PostgreSQL или MySQL, но можно использовать и SQLite для простых сценариев. База данных – критический компонент, потеря которого приведет к амнезии всей системы.
Webserver и Scheduler работают в тесном тандеме, обмениваясь информацией через метаданные в базе данных. Scheduler определяет, какие задачи должны быть выполнены, и отправляет их в очередь. Webserver отображает текущее состояние всей системы и позволяет взаимодействовать с ней. Эта архитектура обеспечивает удивительную гибкость: вы можете масштабировать каждый компонент независимо, в зависимости от ваших потребностей.
Управление состоянием – ещё одна сильная сторона Airflow. Каждое выполнение задачи сохраняет своё состояние в базе данных, что позволяет легко отслеживать историю, восстанавливаться после сбоев и анализировать производительность. Эта функциональность крайне важна для ETL-процессов, где надёжность и возможность аудита критичны. В сравнении с конкурентами вроде Luigi, Oozie или даже AWS Step Functions, Airflow выигрывает благодаря нескольким ключевым преимуществам:
Pythonic подход – вся конфигурация на чистом Python, что дает невероятную гибкость. Хотите добавить условную логику? Пожалуйста! Нужно динамически создавать задачи? Легко!
Богатая экосистема операторов – готовые компоненты для интеграции с практически любой системой: базами данных, облачными сервисами, системами обмена сообщениями и т.д.
Мощный UI – интуитивно понятный интерфейс с возможностью детального анализа выполнения задач.
Активное сообщество – тысячи разработчиков по всему миру постоянно улучшают Airflow и делятся опытом.
Один из самых недооцененных, но критичных аспектов Airflow – это его модель метаданных. База данных хранит всю информацию о DAG-ах, задачах, их зависимостях и результатах выполнения. Это дает возможность не только отслеживать текущий статус, но и анализировать исторические тренды производительности, что бесценно для оптимизации.
Airflow позволяет настроить аутентификацию через множество провайдеров: внутренняя база пользователей, LDAP, OAuth или даже кастомные решения. Я однажды интегрировал Airflow с корпоративной системой единого входа, и шок команды от того, что "эта штука реально работает с нашей древней инфраструктурой", был бесценен. Важный аспект безопасности – шифрование переменных и соединений. Airflow позволяет хранить чувствительные данные, такие как пароли и API-ключи, в зашифрованном виде, используя библиотеку Fernet. Этот механизм не идеален (ключ шифрования всё еще нужно хранить где-то безопасно), но значительно лучше, чем хранить учетные данные в открытом виде в коде.
Когда дело доходит до масштабирования, Airflow раскрывает всю свою мощь. От скромного локального развертывания на ноутбуке разработчика до распределенного кластера в облаке – путь может быть удивительно гладким. Секрет в модульной архитектуре и разнообразии исполнителей (executors).
Начинается всё обычно с LocalExecutor, который отлично подходит для разработки и небольших нагрузок. Он запускает задачи как отдельные процессы на том же сервере, где работает планировщик. Простота – его основное преимущество, но масштабируемость оставляет желать лучшего. Когда объем задач растёт, следующий шаг – CeleryExecutor. Он использует Celery (распределенную очередь задач) для координации работы между несколькими рабочими серверами. Этот подход позволяет горизонтально масштабировать систему, добавляя новые воркеры по мере необходимости. В одном из проектов мы столкнулись с взрывным ростом объема данных, и возможность быстро подключить дополнительные воркеры буквально спасла проект от провала. Для тех, кто уже погружен в мир контейнеров и Kubernetes, есть KubernetesExecutor. Он создает под для каждой задачи, обеспечивая полную изоляцию и эффективное использование ресурсов кластера. Этот подход особенно хорош для неоднородных нагрузок, когда одни задачи требуют много памяти, а другие – много CPU.
Особенно интересны гибридные подходы. Например, CeleryKubernetesExecutor позволяет комбинировать преимущества обоих мирав: стабильность Celery для стандартных задач и гибкость Kubernetes для ресурсоемких операций. Это как иметь в своем гараже и надежный семейный минивэн, и спортивный кабриолет для выходных – выбирайте подходящее транспортное средство для каждой поездки.
Масштабирование базы метаданных – отдельная история. При росте числа DAG-ов и задач SQLite быстро начинает задыхаться, поэтому в продакшен-окружениях рекомендуется использовать PostgreSQL или MySQL. Не забывайте о регулярной очистке историчесих данных – иначе ваша база может раздуться до неприличных размеров, что я однажды испытал на собственном опыте, когда обнаружил 50-гигабайтную базу метаданных, хранящую историю за два года.
Операторы – рабочие лошадки Airflow, выполняющие конкретные действия. Фреймворк поставляется с богатым набором готовых операторов: от банальных BashOperator и PythonOperator до специализированных для работы с S3, BigQuery, Spark и десятками других систем. Но иногда стандартных инструментов недостаточно, и приходится создавать кастомные решения. Разработка собственных операторов – удивительно просто. По сути, это обычные Python-классы, наследуемые от BaseOperator. Вам нужно реализовать метод execute(), который содержит логику работы оператора, и определить необходимые параметры. Вуаля – ваш новый оператор готов к использованию! В своей практике я неоднократно создавал кастомные операторы для интеграции с внутренними системами компаний. Например, оператор для работы с проприетарной системой управления данными, который выполнял сложную логику валидации и трансформации. Казалось бы, мелочь, но она сэкономила команде сотни часов работы, избавив от необходимости писать повторяющийся код.
Хуки (Hooks) – еще один мощный инструмент кастомизации. Они абстрагируют соединения с внешними системами, обеспечивая переиспользуемый интерфейс. Создав хук однажды, вы можете использовать его во многих операторах, что существенно упрощает поддержку.
Airflow. ETL из MS SQL в Postgres Hello world!
Для переливки данных из MS SQL таблицы в базу Postgres, в Airflow использует ODBC... Задача 'Петя и Airflow' Петя дали задачу создать DAG из N модулей кода в Airflow в нужной последовательности. Петя успешно... Apache и Apache Tomcat на одном компе Установил оба. По 127.0.0.1 все время захожу только в Apache, а как зайти в ROOT Tomcat'а через ip? Apache не запускается после того когда прикрутил php к apache Apache не запускается после того когда прикрутил php к apache
Я установил apache 2.2 , в папке...
Разработка ETL-конвейеров на Python
В отличие от многих других платформ управления рабочими процессами, Airflow позволяет описывать логику не через XML-конфиги или проприетарные языки, а на полноценном Python. Это открывает колоссальные возможности для гибкой разработки – от простых линейных последовательностей до сложных графов с условным выполнением и динамической генерацией.
В основе любого проекта Airflow лежат DAG-и – Directed Acyclic Graphs, или направленные ациклические графы. Это математическое понятие удивительно точно ложится на модель рабочих процессов: задачи (узлы графа) связаны направленными зависимостями (рёбрами), а отсутствие циклов гарантирует, что процесс когда-нибудь завершится, не попав в бесконечную петлю. DAG в Airflow – это обычный Python-скрипт, который определяет структуру рабочего процесса. Вот как выглядит простейший DAG:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
| from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
default_args = {
'owner': 'data_engineer',
'depends_on_past': False,
'start_date': datetime(2023, 1, 1),
'email_on_failure': True,
'email': ['alerts@company.com'],
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
def extract_data():
# Логика извлечения данных
return {'data': [1, 2, 3, 4, 5]}
def transform_data(ti):
# Получаем данные из предыдущей задачи
data = ti.xcom_pull(task_ids='extract_task')
transformed_data = [x * 2 for x in data['data']]
return {'transformed_data': transformed_data}
def load_data(ti):
# Загружаем трансформированные данные
data = ti.xcom_pull(task_ids='transform_task')
print(f"Loading data: {data['transformed_data']}")
# Логика загрузки в целевую систему
with DAG('simple_etl_pipeline',
default_args=default_args,
schedule_interval='@daily') as dag:
extract_task = PythonOperator(
task_id='extract_task',
python_callable=extract_data,
)
transform_task = PythonOperator(
task_id='transform_task',
python_callable=transform_data,
)
load_task = PythonOperator(
task_id='load_task',
python_callable=load_data,
)
# Определяем последовательность выполнения
extract_task >> transform_task >> load_task |
|
Уже этот простой пример демонстрирует несколько ключевых концепций. Во-первых, default_args – словарь с параметрами по умолчанию для всех задач в DAG. Это место, где мы определяем поведение при ошибках, стратегию повторных попыток и другие общие настройки.
Во-вторых, структура with DAG(...) as dag: – это контекстный менеджер, который создаёт объект DAG и позволяет определять задачи внутри него. Обратите внимание на параметр schedule_interval='@daily' – он указывает, что наш конвейер должен запускаться раз в день. Airflow поддерживает cron-подобный синтаксис или удобные макросы вроде @daily , @hourly и даже @once для задач, которые должны выполниться всего один раз.
В-третьих, оператор >> определяет зависимости между задачами. Это синтаксический сахар, который делает код более читаемым. Вместо extract_task >> transform_task >> load_task можно было бы написать:
Python | 1
2
| transform_task.set_upstream(extract_task)
load_task.set_upstream(transform_task) |
|
Но согласитесь, оператор >> гораздо нагляднее представляет поток данных! В более сложных сценариях можно создавать разветвленные потоки, например:
Python | 1
| extract_task >> [process_task_1, process_task_2] >> merge_task |
|
Здесь после extract_task параллельно выполняются две задачи обработки, а затем их результаты объединяются в merge_task .
Операторы – строительные блоки наших DAG-ов. Каждая задача в Airflow представлена оператором – классом Python, который инкапсулирует определенную функциональность. Стандартная библиотека Airflow включает множество готовых операторов для различных задач:
BashOperator – выполняет команды bash,
PythonOperator – запускает Python-функции,
EmailOperator – отправляет имейлы,
MySqlOperator , PostgresOperator – выполняет SQL-запросы,
S3FileTransformOperator – загружает и трансформирует файлы в/из Amazon S3,
HttpSensor – ожидает HTTP-эндпоинт.
И множество других.
В примере выше мы использовали PythonOperator – самый гибкий оператор, позволяющий выполнять произвольный Python-код. Обратите внимание на параметр python_callable – это ссылка на функцию, которую нужно выполнить. Такой подход позволяет инкапсулировать логику в отдельные функции, что делает код более чистым и тестируемым.
Передача данных между задачами реализуется через механизм XCom (Cross-Communication). Это простая система обмена сообщениями, которая позволяет отправлять небольшие объемы данных между задачами. В нашем примере мы использовали методы xcom_pull() для получения данных из предыдущей задачи. XCom – удобный механизм, но имеет ограничения: он хранит данные в базе метаданных Airflow, поэтому не подходит для передачи больших объемов информации. Для таких случаев лучше использовать промежуточное хранилище вроде S3 или HDFS.
Контекстные переменные – ещё один мощный инструмент в арсенале Airflow. Каждое выполнение задачи получает объект контекста, который содержит множество полезной информации:
ds – текущая дата в формате YYYY-MM-DD,
ts – таймстамп запуска DAG,
dag – объект текущего DAG,
task – объект текущей задачи,
ti – экземпляр текущей задачи (используется для доступа к XCom)
и много других
Эти переменные позволяют создавать динамические конвейеры, адаптирующиеся к текущей дате или другим параметрам выполнения. Например, мы можем использовать ds для генерации путей к файлам за конкретную дату:
Python | 1
2
3
| def process_daily_data(ds, **kwargs):
file_path = f"/data/{ds}/transactions.csv"
# Дальнейшая логика обработки файла |
|
Для интеграции с внешними источниками данных Airflow предлагает концепцию "хуков" (Hooks) – абстракций для подключения к различным сервисам и системам. Хуки инкапсулируют детали соединения и предоставляют удобный API для взаимодействия:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| from airflow.hooks.postgres_hook import PostgresHook
from airflow.hooks.S3_hook import S3Hook
def transfer_data():
# Получаем данные из PostgreSQL
pg_hook = PostgresHook(postgres_conn_id='postgres_default')
data = pg_hook.get_records("SELECT * FROM users WHERE created_at = %s", parameters=[yesterday_ds])
# Загружаем в S3
s3_hook = S3Hook(aws_conn_id='aws_default')
s3_hook.load_string(
string_data=json.dumps(data),
key=f"user_data/{ds}.json",
bucket_name="my-data-lake"
) |
|
Соединения (Connections) в Airflow – это набор параметров для подключения к внешним системам. Они хранятся в базе метаданных и могут быть настроены через веб-интерфейс или програмно. Ключевое преимущество такого подхода – централизованное управление учетными данными и возможность шифрования чувствительной информации. На практике часто приходится интегрироваться с нестандартными системами или применять сложную логику обработки данных. В таких случаях подход Airflow "Конфигурация как код" раскрывается во всей красе – мы можем использовать всю мощь Python для моделирования наших процессов.
Особый класс операторов в Airflow – сенсоры (Sensors). В отличие от обычных операторов, которые выполняют действия и завершаются, сенсоры ждут наступления определённого события: появления файла, получения сообщения, наличия записи в базе данных. Это крайне полезно для интеграции разнородных систем с разным темпом работы.
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| from airflow.sensors.external_task_sensor import ExternalTaskSensor
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.sql import SqlSensor
# Ждём завершения другого DAG
wait_for_upstream = ExternalTaskSensor(
task_id='wait_for_data_processing',
external_dag_id='upstream_dag',
external_task_id='final_task',
timeout=3600,
mode='reschedule', # Важно для длительного ожидания
dag=dag,
)
# Проверяем появление файла
wait_for_file = FileSensor(
task_id='wait_for_report',
filepath='/path/to/expected/file.csv',
poke_interval=300, # Проверяем каждые 5 минут
dag=dag,
)
# Ждём появления данных в БД
wait_for_record = SqlSensor(
task_id='check_data_available',
conn_id='postgres_dwh',
sql="SELECT COUNT(*) FROM events WHERE date = '{{ ds }}' AND status = 'processed'",
success=lambda cnt: cnt[0][0] > 0,
dag=dag,
) |
|
Две ключевые настройки сенсоров – poke_interval (как часто проверять условие) и timeout (максимальное время ожидания). Кроме того, параметр mode определяет, как сенсор использует вычислительные ресурсы: в режиме poke сенсор блокирует слот исполнителя, а в режиме reschedule освобождает его между проверками. Для длителных ожиданий режим reschedule предпочтительнее – он экономит ресурсы, но при этом вносит задержку между проверками.
На одном проекте мы столкнулись с интересной проблемой: наши ETL-процесы простаивали, ожидая загрузки данных из внешней системы, которая обновлялась с непредсказуемой периодичностью. Мы решили использовать SqlSensor с динамическим poke_interval : сначала проверяли редко (раз в час), а когда система начинала получать данные, увеличивали частоту проверок до раза в минуту. Это позволило сбалансировать нагрузку и оперативность.
Теперь углубимся в механизм XCom – межзадачного обмена данными. Мы уже видели базовое использование xcom_pull и xcom_push , но есть несколько тонкостей, о которых стоит знать. Во-первых, PythonOperator автоматически сохраняет возвращаемое значение функции в XCom. Это удобно, но может привести к неожиданностям, если функция возвращает большой объем данных. Вот пример более контролируемого подхода:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| def process_with_explicit_xcom(ti=None, **context):
# Какая-то логика обработки
result = {"status": "success", "count": 42}
# Явно сохраняем только нужные данные
ti.xcom_push(key="processing_status", value=result["status"])
ti.xcom_push(key="item_count", value=result["count"])
# Возвращаемое значение тоже будет сохранено в XCom
# под ключем "return_value"
return "Completed"
process_task = PythonOperator(
task_id='process_data',
python_callable=process_with_explicit_xcom,
provide_context=True,
dag=dag,
) |
|
Во-вторых, XCom хранит данные в базе метаданных Airflow, поэтому имеет ограничение на размер – обычно это несколько килобайт (зависит от настроек базы данных). Для передачи больших объемов данных лучше использовать внешние системы хранения:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| def save_large_data(**context):
large_data = generate_massive_dataset()
s3_hook = S3Hook(aws_conn_id='aws_default')
key = f"temp/data_for_dag_run_{context['run_id']}.parquet"
s3_hook.load_string(
string_data=large_data.to_parquet(),
key=key,
bucket_name="my-data-bucket"
)
# Сохраняем только ссылку в XCom
return {"data_location": f"s3://my-data-bucket/{key}"}
def process_large_data(**context):
ti = context['ti']
data_info = ti.xcom_pull(task_ids='save_large_data')
s3_path = data_info["data_location"]
# Загружаем данные из S3
# ... |
|
Такой подход позволяет передавать даже гигабайты данных между задачами без нагрузки на базу метаданных Airflow.
В-третьих, XCom сохраняет данные в контексте конкретного запуска DAG. Если вам нужен долгосрочный обмен информацией между разными запусками, лучше использовать переменные Airflow (Variables) или внешнее хранилище.
Тестирование ETL-конвейеров – особая дисциплина, требующая тщательного подхода. В отличие от обычных приложений, здесь мы имеем дело с данными, которые могут быть непредсказуемыми как по формату, так и по содержанию. Первый уровень защиты – юнит-тесты для функций трансформации. Вот пример простого теста с использованием pytest:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| # Наша функция трансформации
def normalize_customer_data(row):
return {
'customer_id': str(row.get('id', '')).strip(),
'full_name': f"{row.get('first_name', '')} {row.get('last_name', '')}".strip(),
'email': row.get('email', '').lower(),
'active': bool(row.get('status') == 'active')
}
# Тест для функции трансформации
def test_normalize_customer_data():
input_data = {
'id': 123,
'first_name': 'John ',
'last_name': 'Doe',
'email': 'JOHN.DOE@example.com',
'status': 'active'
}
expected = {
'customer_id': '123',
'full_name': 'John Doe',
'email': 'john.doe@example.com',
'active': True
}
assert normalize_customer_data(input_data) == expected |
|
Для тестирования целых DAG'ов Airflow предоставляет специальный класс BashOperator , который позволяет запускать DAG в изолированном окружении с фиксироваными датами. Это позволяет воспроизводить поведение DAG и проверять правильность выполнения задач:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| from airflow.models import DagBag
def test_dag_loads():
"""Проверяем, что DAG загружается без ошыбок."""
dagbag = DagBag()
dag = dagbag.get_dag('my_etl_dag')
assert dagbag.import_errors == {}
assert dag is not None
def test_dag_structure():
"""Проверяем структуру DAG (задачи и их зависимости)."""
dagbag = DagBag()
dag = dagbag.get_dag('my_etl_dag')
# Проверяем наличие ожидаемых задач
task_ids = [t.task_id for t in dag.tasks]
assert 'extract_data' in task_ids
assert 'transform_data' in task_ids
assert 'load_data' in task_ids
# Проверяем зависимости
extract_task = dag.get_task('extract_data')
transform_task = dag.get_task('transform_data')
assert transform_task.upstream_task_ids == {'extract_data'} |
|
Автоматизация проверки качества данных – ещё один критически важный аспект ETL. Библиотеки вроде Great Expectations или dbt (data build tool) позволяют определять ожидания к данным и проверять их соответствие этим ожиданиям. Эти проверки можно интегрировать в ваши DAG'и:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| def validate_data_quality(**context):
from great_expectations.data_context import DataContext
# Инициализируем контекст Great Expectations
data_context = DataContext('/path/to/great_expectations')
# Запускаем набор проверок качества
result = data_context.run_checkpoint(
checkpoint_name="my_checkpoint",
batch_kwargs={
"path": context['ti'].xcom_pull(task_ids='extract_transform_task'),
"datasource": "my_datasource"
}
)
if not result["success"]:
raise Exception("Проверка качества данных не пройдена!")
return result
validate_task = PythonOperator(
task_id='validate_data',
python_callable=validate_data_quality,
provide_context=True,
dag=dag
) |
|
Реальные примеры и кейсы
Теория теорией, но самое интересное начинается, когда мы применяем наши знания в реальных условиях боевых задач. За годы работы я видел десятки ETL-конвейеров на Airflow – от элегантных минималистичных решений до монструозных конструкций с сотнями задач и зависимостей. Давайте посмотрим на несколько примеров, которые демонстрируют разные аспекты построения ETL с Airflow.
Начнём с классического примера – ежедневный конвейер агрегации данных о продажах из нескольких источников. Представьте, что у нас есть интернет-магазин, который собирает данные о продажах из разных каналов: веб-сайт, мобильное приложение и POS-терминалы в физических магазинах. Нам нужно собрать эти данные, привести к единому формату, обогатить информацией о клиентах и продуктах, вычислить агрегаты и загрузить в хранилище для аналитики.
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
| from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.hooks.postgres_hook import PostgresHook
from airflow.hooks.http_hook import HttpHook
import pandas as pd
import json
default_args = {
'owner': 'data_team',
'depends_on_past': False,
'start_date': datetime(2023, 1, 1),
'email_on_failure': True,
'email': ['data-alerts@example.com'],
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'sales_etl_pipeline',
default_args=default_args,
description='ETL для агрегации данных о продажах',
schedule_interval='0 3 * * *', # Каждый день в 3:00
catchup=False,
max_active_runs=1
)
# Функции для извлечения данных из разных источников
def extract_web_sales(**context):
http_hook = HttpHook(http_conn_id='web_api', method='GET')
response = http_hook.run(f"/api/sales/daily/{context['ds']}")
sales_data = json.loads(response.content)
# Сохраняем во временную таблицу
pg_hook = PostgresHook(postgres_conn_id='dwh_connection')
df = pd.DataFrame(sales_data)
df.to_sql('temp_web_sales', pg_hook.get_sqlalchemy_engine(), if_exists='replace', index=False)
return len(sales_data)
def extract_mobile_sales([B]context):
# Похожая логика для мобильных продаж
pass
def extract_pos_sales([/B]context):
# Логика для POS-терминалов
pass
def transform_sales_data(**context):
pg_hook = PostgresHook(postgres_conn_id='dwh_connection')
# Объединяем, чистим и трансформируем данные
transform_sql = """
CREATE TABLE IF NOT EXISTS temp_transformed_sales AS
SELECT
coalesce(w.transaction_id, m.transaction_id, p.transaction_id) as transaction_id,
coalesce(w.sale_date, m.sale_date, p.sale_date) as sale_date,
coalesce(w.product_id, m.product_id, p.product_id) as product_id,
coalesce(w.customer_id, m.customer_id, p.customer_id) as customer_id,
coalesce(w.quantity, m.quantity, p.quantity) as quantity,
coalesce(w.price, m.price, p.price) as price,
CASE
WHEN w.transaction_id IS NOT NULL THEN 'web'
WHEN m.transaction_id IS NOT NULL THEN 'mobile'
ELSE 'pos'
END as channel
FROM temp_web_sales w
FULL OUTER JOIN temp_mobile_sales m ON w.transaction_id = m.transaction_id
FULL OUTER JOIN temp_pos_sales p ON w.transaction_id = p.transaction_id OR m.transaction_id = p.transaction_id;
"""
pg_hook.run(transform_sql)
return "Данные трансформированы"
def load_data_to_dwh(**context):
pg_hook = PostgresHook(postgres_conn_id='dwh_connection')
load_sql = """
INSERT INTO sales_fact (transaction_id, sale_date, product_id, customer_id, quantity, price, channel)
SELECT transaction_id, sale_date, product_id, customer_id, quantity, price, channel
FROM temp_transformed_sales
ON CONFLICT (transaction_id) DO UPDATE
SET quantity = EXCLUDED.quantity, price = EXCLUDED.price;
-- Обновляем агрегаты
INSERT INTO daily_sales_agg (sale_date, channel, total_sales, total_quantity)
SELECT sale_date, channel, SUM(price * quantity), SUM(quantity)
FROM temp_transformed_sales
GROUP BY sale_date, channel
ON CONFLICT (sale_date, channel) DO UPDATE
SET total_sales = EXCLUDED.total_sales, total_quantity = EXCLUDED.total_quantity;
"""
pg_hook.run(load_sql)
return "Данные загружены в хранилище"
# Определяем задачи
start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)
extract_web = PythonOperator(
task_id='extract_web_sales',
python_callable=extract_web_sales,
provide_context=True,
dag=dag
)
extract_mobile = PythonOperator(
task_id='extract_mobile_sales',
python_callable=extract_mobile_sales,
provide_context=True,
dag=dag
)
extract_pos = PythonOperator(
task_id='extract_pos_sales',
python_callable=extract_pos_sales,
provide_context=True,
dag=dag
)
transform = PythonOperator(
task_id='transform_sales_data',
python_callable=transform_sales_data,
provide_context=True,
dag=dag
)
load = PythonOperator(
task_id='load_data_to_dwh',
python_callable=load_data_to_dwh,
provide_context=True,
dag=dag
)
cleanup = PostgresOperator(
task_id='cleanup_temp_tables',
postgres_conn_id='dwh_connection',
sql="""
DROP TABLE IF EXISTS temp_web_sales;
DROP TABLE IF EXISTS temp_mobile_sales;
DROP TABLE IF EXISTS temp_pos_sales;
DROP TABLE IF EXISTS temp_transformed_sales;
""",
dag=dag
)
# Определяем последовательность
start >> [extract_web, extract_mobile, extract_pos] >> transform >> load >> cleanup >> end |
|
Этот DAG демонстрирует несколько важных приёмов построения ETL-конвейеров:
1. Параллельное извлечение данных из разных источников для экономии времени.
2. Использование временных таблиц в качестве промежуточного хранилища.
3. Трансформацию средствами SQL (часто более эффективно для табличных данных).
4. Идемпотентную загрузку с использованием ON CONFLICT .
5. Очистку временных артефактов после завершения.
На реальном проекте этот DAG развивался и усложнялся по мере роста бизнеса. Когда объем данных вырос с десятков тысяч до миллионов транзакций в день, мы столкнулись с серьезными проблемами производительности. Трансформация, которая раньше занимала минуты, растянулась до часов. Пришлось срочно оптимизировать.
Первым шагом было профилирование – мы добавили фиксацию времени выполнения каждой задачи:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
| def log_duration(func):
def wrapper(*args, **kwargs):
import time
start_time = time.time()
result = func(*args, [B]kwargs)
duration = time.time() - start_time
print(f"Задача {func.__name__} выполнялась {duration:.2f} секунд")
return result
return wrapper
@log_duration
def transform_sales_data([/B]context):
# Логика трансформации |
|
Анализ показал, что узким горлышком был JOIN трёх таблиц. Решение? Партиционирование данных и параллельная обработка:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
| def transform_sales_data(**context):
pg_hook = PostgresHook(postgres_conn_id='dwh_connection')
# Разбиваем данные на партиции по первой цифре ID транзакции
for partition in range(10):
transform_sql = f"""
CREATE TABLE IF NOT EXISTS temp_transformed_sales_{partition} AS
SELECT
/* те же поля, что и раньше */
FROM temp_web_sales w
FULL OUTER JOIN temp_mobile_sales m
ON w.transaction_id = m.transaction_id
AND w.transaction_id % 10 = {partition}
FULL OUTER JOIN temp_pos_sales p
ON (w.transaction_id = p.transaction_id OR m.transaction_id = p.transaction_id)
AND p.transaction_id % 10 = {partition}
WHERE
COALESCE(w.transaction_id, m.transaction_id, p.transaction_id) % 10 = {partition}
"""
pg_hook.run(transform_sql)
# Объединяем партиции
pg_hook.run("""
CREATE TABLE temp_transformed_sales AS
SELECT * FROM temp_transformed_sales_0
UNION ALL SELECT * FROM temp_transformed_sales_1
/* и так далее для всех партиций */
""") |
|
Следующей проблемой стала полная перезагрузка всех данных каждый день. Решением стала инкрементальная загрузка – обработка только новых или изменившихся записей. Для этого потребовалось два ключевых изменения:
1. Добавление отслеживания изменений в источниках данных (через таймстампы или колонки состояния).
2. Изменение логики извлечения и загрузки для работы только с дельтой.
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| def extract_web_sales(**context):
# Получаем время последней успешной загрузки
last_etl_time = get_last_successful_etl_time('web_sales')
http_hook = HttpHook(http_conn_id='web_api', method='GET')
# Запрашиваем только новые или изменённые записи
response = http_hook.run(
f"/api/sales/changes?since={last_etl_time}&until={context['execution_date']}"
)
sales_data = json.loads(response.content)
if not sales_data:
return "Нет новых данных"
# Остальная логика загрузки
# ...
# Сохраняем время успешной загрузки
save_successful_etl_time('web_sales', context['execution_date'])
return f"Загружено {len(sales_data)} новых записей" |
|
Для отслеживания состояния ETL мы создали простую служебную таблицу:
SQL | 1
2
3
4
5
6
| CREATE TABLE etl_watermarks (
source_name VARCHAR(100) PRIMARY KEY,
last_successful_run TIMESTAMP,
records_processed INTEGER,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
); |
|
Функции для работы с этой таблицей:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| def get_last_successful_etl_time(source_name):
pg_hook = PostgresHook(postgres_conn_id='dwh_connection')
result = pg_hook.get_first(
"SELECT last_successful_run FROM etl_watermarks WHERE source_name = %s",
parameters=(source_name,)
)
return result[0] if result else datetime(2000, 1, 1).isoformat()
def save_successful_etl_time(source_name, execution_time):
pg_hook = PostgresHook(postgres_conn_id='dwh_connection')
pg_hook.run("""
INSERT INTO etl_watermarks (source_name, last_successful_run, records_processed)
VALUES (%s, %s, %s)
ON CONFLICT (source_name)
DO UPDATE SET
last_successful_run = EXCLUDED.last_successful_run,
records_processed = EXCLUDED.records_processed,
updated_at = CURRENT_TIMESTAMP
""", parameters=(source_name, execution_time, 0)) |
|
Эти оптимизации сократили время выполнения конвейера с часов до минут, а ресурсы базы данных значительно разгрузились. Плюс появилась возможность восстановления после сбоев без полной перезагрузки всех данных.
В другом кейсе нам пришлось разбить монолитный ETL-процесс на атомарные компоненты. Изначально у компании был один гигантский DAG с сотней задач, который выполнялся каждую ночь. Проблемы возникали регулярно: если хоть одна задача падала, приходилось перезапускать весь конвейер, что часто приводило к нарушению SLA и отсутствию данных для утренних отчётов. Решение? Разделить монстра на несколько независимых, но связанных DAG-ов:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| # dag_extract.py
extract_dag = DAG(
'extract_data_sources',
default_args=default_args,
schedule_interval='0 1 * * *', # Запуск в 1:00
catchup=False
)
# Задачи извлечения...
# dag_transform.py
transform_dag = DAG(
'transform_data',
default_args=default_args,
schedule_interval=None, # Триггерится извне
catchup=False
)
# Задачи трансформации...
# dag_load.py
load_dag = DAG(
'load_data_warehouse',
default_args=default_args,
schedule_interval=None, # Триггерится извне
catchup=False
)
# Задачи загрузки... |
|
Для связи между этими DAG-ами мы использовали TriggerDagRunOperator:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| from airflow.operators.trigger_dagrun import TriggerDagRunOperator
trigger_transform = TriggerDagRunOperator(
task_id='trigger_transform',
trigger_dag_id='transform_data',
execution_date='{{ ds }}',
reset_dag_run=True,
wait_for_completion=True, # Ждём завершения следующего DAG
dag=extract_dag
)
trigger_load = TriggerDagRunOperator(
task_id='trigger_load',
trigger_dag_id='load_data_warehouse',
execution_date='{{ ds }}',
reset_dag_run=True,
wait_for_completion=True,
dag=transform_dag
) |
|
Такое разделение дало множество преимуществ:
1. Каждый DAG можно обновлять и тестировать отдельно.
2. При сбое достаточно перезапустить только один DAG.
3. Разные команды могут отвечать за разные части процесса.
4. Появилась возможность переиспользовать компоненты в других конвейерах.
А как быть с действительно большими объёмами данных? Когда объем превышает пару гигабайт, традиционные подходы начинают буксовать. На помощь приходит интеграция с Apache Spark.
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
process_large_dataset = SparkSubmitOperator(
task_id='process_large_dataset',
application='/path/to/spark_script.py',
conn_id='spark_default',
application_args=['{{ ds }}', 'other_param'],
conf={
'spark.driver.memory': '4g',
'spark.executor.memory': '2g',
'spark.executor.cores': '2',
'spark.dynamicAllocation.enabled': 'true',
'spark.dynamicAllocation.maxExecutors': '20'
},
dag=dag
) |
|
В Spark-скрипте делаем всю тяжелую работу:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| # spark_script.py
from pyspark.sql import SparkSession
import sys
# Получаем аргументы из Airflow
execution_date = sys.argv[1]
spark = SparkSession.builder \
.appName(f"LargeDataProcessing_{execution_date}") \
.getOrCreate()
# Читаем огромный набор данных
df = spark.read.parquet(f"s3://data-lake/raw/transactions/{execution_date}/")
# Выполняем сложные трансформации
result = df.groupBy("customer_id", "product_category") \
.agg({"amount": "sum", "transaction_id": "count"}) \
.withColumnRenamed("sum(amount)", "total_spent") \
.withColumnRenamed("count(transaction_id)", "transaction_count")
# Сохраняем результат
result.write.mode("overwrite").parquet(f"s3://data-lake/processed/customer_stats/{execution_date}/") |
|
Когда бизнес потребовал перейти от пакетной обработки к анализу данных в режиме, близком к реальному времени, пришлось мигрировать часть процессов на потоковую обработку. Используя Apache Airflow как оркестратор и Kafka как шину сообщений, мы создали гибридную архитектуру:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
| # Задача для запуска Kafka-коннектора, который считывает CDC из базы
start_kafka_connector = HttpOperator(
task_id='start_kafka_connector',
http_conn_id='kafka_connect_api',
endpoint='/connectors',
method='POST',
data=json.dumps({
"name": "mysql-source-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql-server",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "42",
"database.server.name": "mysql-server-1",
"table.whitelist": "inventory.customers",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.inventory"
}
}),
headers={"Content-Type": "application/json"},
dag=dag
)
# Запускаем Spark Streaming джобу, которая обрабатывает поток данных
start_spark_streaming = SparkSubmitOperator(
task_id='start_spark_streaming',
application='/path/to/streaming_script.py',
conn_id='spark_default',
conf={'spark.jars.packages': 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0'},
dag=dag
) |
|
Spark Streaming скрипт:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
| # streaming_script.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession.builder \
.appName("KafkaStreamProcessing") \
.getOrCreate()
# Читаем поток из Kafka
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "mysql-server-1.inventory.customers") \
.load()
# Парсим JSON из значения
schema = StructType([
StructField("id", LongType()),
StructField("first_name", StringType()),
StructField("last_name", StringType()),
StructField("email", StringType()),
# Другие поля...
])
parsed = df.select(
from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")
# Применяем бизнес-логику
processed = parsed.withColumn(
"full_name",
concat(col("first_name"), lit(" "), col("last_name"))
)
# Записываем результаты в хранилище
query = processed \
.writeStream \
.outputMode("append") \
.format("parquet") \
.option("path", "/data/processed/customers") \
.option("checkpointLocation", "/tmp/checkpoints/customers") \
.trigger(processingTime="1 minute") \
.start()
query.awaitTermination() |
|
Важно понимать, что переход к потоковой обработке – не всегда панацея. В нашем случае мы сохранили и пакетный процесс для ежедневной сверки и исправления возможных аномалий. Гибридный подход дал лучше всего.
Лучшие практики
После нескольких лет работы с Airflow у каждого дата-инженера формируется набор любимых трюков и приёмов, которые превращают рутинные конвейеры в элегантные произведения инженерного искусства. Поделюсь теми, что не раз спасали мою команду от авралов и бессонных ночей. Динамическое создание задач – настоящая суперсила Airflow, которую удивительно мало используют новички. Вместо того чтобы дублировать код для похожих процессов, можно генерировать задачи на лету:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
| with DAG('dynamic_tasks_dag', default_args=default_args, schedule_interval='@daily') as dag:
# Список источников данных
data_sources = {
'users': {'table': 'users', 'primary_key': 'user_id'},
'orders': {'table': 'orders', 'primary_key': 'order_id'},
'products': {'table': 'products', 'primary_key': 'product_id'},
}
start = DummyOperator(task_id='start')
end = DummyOperator(task_id='end')
processing_tasks = {}
# Динамически создаём задачи для каждого источника
for source_name, config in data_sources.items():
extract_task = PythonOperator(
task_id=f'extract_{source_name}',
python_callable=extract_data,
op_kwargs={'table': config['table'], 'pk': config['primary_key']},
)
transform_task = PythonOperator(
task_id=f'transform_{source_name}',
python_callable=transform_data,
op_kwargs={'source': source_name},
)
load_task = PythonOperator(
task_id=f'load_{source_name}',
python_callable=load_data,
op_kwargs={'target_table': f'dwh_{source_name}'},
)
# Сохраняем задачи в словаре для последующего определения зависимостей
processing_tasks[source_name] = {
'extract': extract_task,
'transform': transform_task,
'load': load_task,
}
# Определяем последовательность для этого источника
extract_task >> transform_task >> load_task
# Связываем с общими начальной и конечной точками
for source in processing_tasks.values():
start >> source['extract']
source['load'] >> end |
|
Такой подход не только уменьшает дублирование кода, но и упрощает масштабирование: добавление нового источника данных требует всего лишь обновления словаря data_sources . Для ещё более продвинутого сценария можно даже загружать конфигурацию из базы данных или файла:
Python | 1
2
3
4
5
6
7
| def get_config_from_db():
hook = PostgresHook(postgres_conn_id='airflow_db')
query = "SELECT source_name, config FROM etl_sources WHERE active = true"
sources = hook.get_records(query)
return {source[0]: json.loads(source[1]) for source in sources}
data_sources = get_config_from_db() |
|
Обработка исключений – ещё одна критическая область, где дьявол в деталях. Простой оператор try/except не всегда спасает, особенно в распределённых системах:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| def process_with_robust_error_handling(**context):
try:
# Основная логика
result = complex_processing()
return result
except ConnectionError as e:
# Сетевые ошыбки часто временные, стоит повторить
context['ti'].xcom_push(key='retry_reason', value=str(e))
raise Exception(f"Сетевая ошыбка: {e}")
except ValidationError as e:
# Проблемы с данными требуют вмешательства человека
send_alert(f"Критическая ошибка в данных: {e}")
context['ti'].xcom_push(key='validation_errors', value=str(e))
raise AirflowSkipException("Задача пропущена из-за ошибок валидации")
except Exception as e:
# Неизвестные ошибки логируем детально
logging.error(f"Неожиданная ошибка: {e}")
logging.error(traceback.format_exc())
raise |
|
Заметьте использование AirflowSkipException – этот специальный тип исключения помечает задачу как пропущенную (а не упавшую), что может быть полезно для условной логики. Например, если данных за определённую дату просто нет, это не ошибка, а ожидаемая ситуация.
Для мониторинга Airflow предлагает несколько встроенных инструментов, но настоящие профи настраивают интеграцию со специализированными системами:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| def task_failure_slack_alert(context):
failed_alert = SlackWebhookOperator(
task_id='slack_alert',
http_conn_id='slack_webhook',
webhook_token=slack_webhook_token,
message=f"""
:red_circle: Задача упала!
*Даг*: {context.get('task_instance').dag_id}
*Задача*: {context.get('task_instance').task_id}
*Дата выполнения*: {context.get('execution_date')}
*Ошибка*: {context.get('exception')}
""",
username='airflow-bot',
)
return failed_alert.execute(context=context)
default_args = {
# Другие аргументы...
'on_failure_callback': task_failure_slack_alert,
} |
|
Такой кастомный обработчик отправляет уведомление в Slack при любой ошибке. Для критически важных процессов стоит рассмотреть более продвинутый мониторинг – интеграцию с Prometheus + Grafana или специализированные сервисы вроде Datadog.
На одном из проектов мы создали собственную систему "проактивного мониторинга", которая предсказывала потенциальные проблемы на основе статистики выполнения DAG-ов:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| def predict_dag_issues():
hook = PostgresHook(postgres_conn_id='airflow_db')
# Анализируем статистику выполнения
query = """
SELECT
dag_id,
AVG(end_date - start_date) as avg_duration,
STDDEV(end_date - start_date) as stddev_duration,
COUNT(*) FILTER (WHERE state = 'failed') / COUNT(*) as failure_rate
FROM dag_run
WHERE start_date > NOW() - INTERVAL '7 days'
GROUP BY dag_id
"""
stats = hook.get_pandas_df(query)
# Выявляем аномалии
for index, row in stats.iterrows():
if row['failure_rate'] > 0.2: # Больше 20% падений
send_alert(f"Высокий процент сбоев у DAG {row['dag_id']}: {row['failure_rate']*100}%")
if row['stddev_duration'] > row['avg_duration'] * 0.5:
# Высокая вариативность времени выполнения - признак нестабильности
send_alert(f"Нестабильное время выполнения у DAG {row['dag_id']}") |
|
Эта функция запускалась отдельным DAG-ом и анализировала метрики других конвейеров, предупреждая о потенциальных проблемах до того, как они приведут к сбоям.
Параллельное выполнение задач в Airflow – ещё один аспект, который может как ускорить ваши конвейеры, так и стать источником неожиданных проблем. По умолчанию задачи в Airflow выполняются параллельно, если они не имеют зависимостей. Однако есть несколько подводных камней, о которых стоит знать. Главное ограничение – параметр parallelism в конфигурации Airflow и max_active_tasks_per_dag для конкретного DAG. Если ваш DAG содержит сотни параллельных задач, они не обязательно запустятся все одновременно:
Python | 1
2
3
4
5
6
7
| # Ограничение параллелизма для конкретного DAG
dag = DAG(
'parallel_processing',
default_args=default_args,
concurrency=20, # Не более 20 задач одновременно
max_active_runs=3 # Не более 3 запусков DAG параллельно
) |
|
Распространённая ошибка – делать слишком много мелких задач. Я видел DAG с 500+ операторами для обработки небольших кусочков данных, и это создавало огромные накладные расходы на планирование и мониторинг. Лучше использовать подход с динамическим распараллеливанием внутри одной задачи:
Python | 1
2
3
4
5
6
7
8
9
10
11
12
| def process_in_parallel(partition_keys):
from concurrent.futures import ThreadPoolExecutor
def process_partition(key):
# Обработка конкретной партиции
return f"Партиция {key} обработана"
# Распараллеливаем внутри одной задачи
with ThreadPoolExecutor(max_workers=10) as executor:
results = list(executor.map(process_partition, partition_keys))
return results |
|
Пулы в Airflow – ещё один инструмент для управления параллелизмом. Они позволяют ограничить количество задач определённого типа, например, запросов к конкретной базе данных:
Python | 1
2
3
4
5
6
| task = PythonOperator(
task_id='resource_intensive_task',
python_callable=process_function,
pool='database_pool', # Задачи из этого пула выполняются с ограниченным параллелизмом
dag=dag
) |
|
Такой подход защищает внешние системы от перегрузки – на одном проекте нам пришлось ввести пул для задач, работающих с API внешнего поставщика, после того как мы случайно превысили лимит запросов и получили бан на сутки. Весёлый день выдался, ничего не скажешь!
В заключение, вот краткий чеклист лучших практик, который я сформировал за годы работы с Airflow:
1. Идемпотентность – ваши DAG-и должны корректно работать при повторном запуске.
2. Атомарность – каждая задача должна делать одну конкретную вещь.
3. Изоляция – минимизируйте зависимости между DAG-ами.
4. Наблюдаемость – вкладывайтесь в логирование и мониторинг.
5. Документирование – используйте док-строки и описательные имена.
6. Версионирование – храните DAG-и в системе контроля версий.
7. Тестирование – пишите юнит-тесты для ключевой логики.
8. Инфраструктура как код – автоматизируйте развёртывание Airflow.
Следуя этим принципам, вы превратите свои ETL-конвейеры из хрупких скриптов в надёжные, масштабируемые системы, которые будут работать годами, даже когда вы уже давно переключитесь на новые проекты и технологии.
Apache 2.2 и Apache 2.4 не показывает картинки с папки adv Приветствую уважаемые форумчане.
У меня стоит Apache 2.2 и Apache 2.4
Столкнулся с такой... Apache, windows 7 и папка adv - Как Apache реагирует на папку adv Приветствую уважаемые форумчане.
У меня стоит Apache 2.2 и Apache 2.4
Столкнулся с такой... Насколько больше памяти жрет PHP в режиме работы модуля Apache по сравнению с apache + fastcgi? Вот есть два режима работы:
1. Apache + modphp
2. Apache + fastcgi
И тут несколько вопросов:... Python+Django+Apache+WSGI Привет, всем, новичок в питоне, помимо самого языка интересует создание веб приложений при помощи... Существует ли готовый набор python + apache + mysql для установки? Здравствуйте, у меня такой вопрос, есть ли на винду готовая связка python + apach + mysql (что... Как Python 3.3 к Apache 2.4.3 на windows 7? Здравствуйте
Apache я ставил по вот этой инструкции:
http://miloserdov.org/?article=43#3... Как подключить Python 3.3 к Apache 2.4.3 на windows 7 ? Здравствуйте
Apache я ставил по вот этой инструкции:
http://miloserdov.org/?article=43#3... Apache + mod_wsgi + Python 3 + Django 1.6 (CentOS 6) Хочу поднять сайт в такой связке, но как обычно возникли проблемы.
Как откомпилить mod_wsgi для... Apache + mod_wsgi + Python 3 + Django 1.6 (CentOS 6) Хочу поднять сайт в такой связке, но как обычно возникли проблемы.
Как откомпилить mod_wsgi для... Запуск скриптов Python 3 на сервере Apache 2 Имею:
python3 + Apache2 + Ubuntu
1) Создаю папку хоста и файл со скриптом:... Python + apache? Привет,
Если кто работал со связкой python + apache?
Стоит задача во всех приложениях апача в... Посоветуйте литературу, где описывалось бы взаимодействие Python и IIS/Apache Книг полно по Питону. Но сложно найти целостную информацию от А до Я. Интересует подробное описание...
|