Моя подорож у Airflow і чому мені був потрібен масштабований підхід
Як інженер даних, моя подорож з Apache Airflow почалася з кількох простих ETL конвеєрів. Спочатку все здавалося керованим — визначення DAG в Python, планування задач та усунення несправностей у робочих процесах. Але з розширенням моїх проєктів я зіткнувся з кількома значними проблемами:
- Обробка масштабу: Керування понад 100+ конвеєрів даних серед різних команд та проєктів стало важким. Конвеєри почали заважати один одному, що призводило до неефективності та вузьких місць.
- Проблеми з розгортанням: Ручне розгортання DAG та їх залежностей було схильним до помилок і займало багато часу.
- Вузькі місця виконання: З зростанням навантажень виконання задач на одному комп'ютері стало неможливим, що призводило до затримок і збоїв.
Я зрозумів, що мені потрібен більш модульний, масштабований та автоматизований підхід для ефективної організації моїх робочих процесів даних. Це привело мене до створення рішення за допомогою DAG-Factory, Celery Executor та інтеграції з S3, щоб подолати ці проблеми та масштабувати мою настройку Airflow.
Чому цей підхід?
- Ізоляція на рівні проєкту: Кожен конвеєр керується окремо, що спрощує розробку, налагодження та розгортання.
- Автоматизовані розгортання: CI/CD конвеєри усувають ручні помилки та забезпечують надійні оновлення.
- Масштабоване виконання: Celery Executor дозволяє виконувати задачі розподілено, забезпечуючи ефективне використання ресурсів.
- Інтеграція в хмарі: S3 виступає як централізований шар зберігання для проміжних та фінальних наборів даних, а Redshift займається обробкою даних для аналітики.
Ця стаття розповідає вам про точну архітектуру, яку я реалізував для оптимізації та масштабування моїх робочих процесів Airflow.
1. Організація конвеєрів із ізоляцією на рівні проєкту
Ізоляція на рівні проєкту гарантує, що кожен конвеєр є модульним і незалежно керованим. Використовуючи DAG-Factory, ми можемо застосувати підхід на основі конфігурації для визначення Airflow конвеєрів, що мінімізує хардкодування та спрощує додавання нових робочих процесів.
Крок 1.1: Використання DAG-Factory для визначення DAG на основі YAML
DAG-Factory дозволяє вам оголошувати робочі процеси декларативно в YAML файлах, зменшуючи дублювання коду та роблячи робочі процеси легшими для підтримки.
Встановлення DAGKit
Встановіть DAG-Factory в своєму середовищі Airflow:
pip install dag-factory
Крок 1.2: Структура каталогу проєкту
Кожен конвеєр ізольований у власній папці проєкту з чітким поділом конфігурацій, задач і залежностей.
Приклад структури проєкту:
marketing_pipeline/
├── configs/
│ └── dag_config.yaml # YAML конфігурація для DAG
├── tasks/
│ ├── extract.py # Задача для витягування даних
│ ├── transform.py # Задача для трансформації даних
│ └── load.py # Задача для завантаження даних
├── requirements.txt # Залежності для цього проєкту
├── README.md # Документація
Крок 1.3: Використання DAG-Factory для модульного керування конвеєрами
DAG-Factory дозволяє визначати конвеєри як YAML конфігурації. Ці конфігурації динамічно завантажуються в Airflow, що робить керування та зміну конвеєрів легшими.
Переваги використання DAG-Factory:
- Усуває повторюваний Python код для визначення DAG.
- Покращує підтримуваність завдяки декларативним YAML конфігураціям.
- Динамічно завантажує конвеєри під час виконання, спрощуючи оновлення.
2. Реалізація CI/CD для автоматизованих розгортань
2.1 Чому використовувати CI/CD для Airflow?
Ручне розгортання DAG і конфігурацій до Airflow може бути схильним до помилок і забирати багато часу.
CI/CD автоматизує цей процес, забезпечуючи:
- Узгоджені розгортання.
- Правильну валідацію конфігурацій та залежностей.
- Швидші оновлення конвеєрів.
2.2 Налаштування CI/CD робочого процесу
Приклад GitHub Actions Workflow (.github/workflows/deploy.yml
):
name: Deploy Airflow DAGs
on:
push:
branches:
- main
jobs:
deploy:
runs-on: ubuntu-latest
steps:
- name: Checkout Code
uses: actions/checkout@v3
- name: Install Dependencies
run: pip install dag-factory -r requirements.txt
- name: Validate YAML Configurations
run: |
for file in configs/*.yaml; do
python -m dagfactory.validate_config "$file"
done
- name: Deploy DAG Configurations
env:
AIRFLOW_HOST: ${{ secrets.AIRFLOW_HOST }}
AIRFLOW_USERNAME: ${{ secrets.AIRFLOW_USERNAME }}
AIRFLOW_PASSWORD: ${{ secrets.AIRFLOW_PASSWORD }}
run: |
scp -r configs/*.yaml airflow@${AIRFLOW_HOST}:/opt/airflow/dags/configs/
Цей робочий процес:
- Перевіряє останній код.
- Валідовує конфігурації DAG, використовуючи DAG-Factory.
- Розгортає YAML конфігурації на інстанції Airflow.
2.3 Автоматизація керування залежностями
Кожен проєкт може визначати свої власні залежності в файлі requirements.txt
. CI/CD конвеєр повинен встановити ці залежності в середовище Airflow.
Приклад requirements.txt
:
pandas==1.5.0
boto3==1.28.0
psycopg2==2.9.3
3. Використання Celery Executor для розподіленого виконання задач
3.1 Чому використовувати Celery Executor?
Celery Executor ідеально підходить для масштабування виконання задач через кілька воркерів. Це дозволяє:
- Розподілену обробку: Задачі розподіляються між кількома воркерами.
- Ефективне використання ресурсів: Кількість воркерів можна масштабувати вгору або вниз залежно від попиту.
- Толерантність до збоїв: Задачі можуть автоматично повторюватися у разі збоїв.
3.2 Налаштування Celery Executor на Kubernetes
Використання Celery Executor з Airflow на Kubernetes забезпечує розподілене виконання задач між воркерами, гарантуючи масштабованість та толерантність до збоїв. Ось як налаштувати Celery Executor в середовищі Kubernetes:
Крок 1: Передумови
- Kubernetes Cluster: Переконайтеся, що у вас є працюючий кластер Kubernetes з доступом до розгортання подів.
- Встановлення Airflow: Використовуйте офіційний Apache Airflow Helm Chart або кастомне розгортання.
- Message Broker: Необхідний брокер повідомлень для Celery.
Загальні варіанти вибору:
- Redis: Легкий і швидкий.
- RabbitMQ: Брокер повідомлень рівня підприємства з розширеними функціями.
Крок 2: Налаштування Celery Executor в Airflow
Змініть налаштування в airflow.cfg
, щоб використовувати Celery Executor:
Оновіть налаштування executor
Встановіть executor на CeleryExecutor
:
[core]
executor = CeleryExecutor
Налаштуйте Message Broker
Для Redis як брокера повідомлень:
[celery]
broker_url = redis://:6379/0
result_backend = db+postgresql://:@:5432/
Для RabbitMQ як брокера повідомлень:
[celery]
broker_url = amqp://:@:5672/
result_backend = db+postgresql://:@:5432/
Крок 3: Розгортання Message Broker
Варіант 1: Розгортання Redis
Використовуйте Helm для розгортання Redis у кластері Kubernetes:
helm repo add bitnami https://charts.bitnami.com/bitnami
helm install redis bitnami/redis
Варіант 2: Розгортання RabbitMQ
Розгорніть RabbitMQ за допомогою Helm:
helm repo add bitnami https://charts.bitnami.com/bitnami
helm install rabbitmq bitnami/rabbitmq
Крок 4: Оновлення Airflow Helm Chart
Якщо ви використовуєте Apache Airflow Helm Chart, оновіть файл values.yaml
, щоб налаштувати Celery Executor.
Налаштуйте executor
executor: CeleryExecutor
Налаштуйте налаштування Celery
Додайте налаштування специфічні для Celery в розділі config
:
config:
AIRFLOW__CELERY__BROKER_URL: redis://redis-master:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgresql-airflow:5432/airflow
Налаштуйте Redis або RabbitMQ
Переконайтеся, що broker_url
в конфігурації відповідає імені сервісу Redis або RabbitMQ у вашому кластері Kubernetes.
Крок 5: Масштабування Worker Pods
У середовищі Kubernetes воркери Celery працюють як поди. Ви можете масштабувати кількість воркерів залежно від навантаження.
Конфігурація Helm для Worker
У файлі values.yaml
налаштуйте воркер-поди:
workers:
enabled: true
replicas: 3
resources:
requests:
memory: "512Mi"
cpu: "500m"
limits:
memory: "1Gi"
cpu: "1"
Розгорніть оновлений Airflow Helm chart:
helm upgrade airflow apache-airflow/airflow -f values.yaml
Ви можете динамічно масштабувати воркерів:
kubectl scale deployment airflow-worker --replicas=5
Крок 6: Моніторинг Celery Workers
- Перевірте Worker Pods: Переконайтеся, що воркери Celery працюють:
kubectl get pods -l component=worker
2. Перевірте логи: Моніторинг логів воркера Celery:
kubectl logs
3. Airflow UI: В інтерфейсі Airflow, на вкладці DAGs, ви можете відстежувати статус задач, виконуваних воркерами.
Крок 7: Опційно — Автоматичне масштабування Celery Workers
Ви можете налаштувати Kubernetes Horizontal Pod Autoscaler (HPA), щоб масштабувати Celery worker поди на основі використання CPU або пам'яті.
Увімкнення HPA
Застосуйте наступну конфігурацію для увімкнення автоматичного масштабування:
apiVersion: autoscaling/v2beta2
kind: HorizontalPodAutoscaler
metadata:
name: airflow-worker
namespace: airflow
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: airflow-worker
minReplicas: 2
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
Застосуйте конфігурацію HPA:
kubectl apply -f hpa.yaml
Приклад робочого процесу: інтеграція S3 та Redshift
## 4.1 Конфігурація DAG
Визначте конвеєр для завантаження файлів Parquet з S3 в Redshift у форматі YAML за допомогою DAGKit.
`configs/s3_to_redshift_pipeline.yaml`:
defaultargs:
owner: airflow
retries: 2
retrydelaysec: 300
dag:
dagid: "s3toredshiftpipeline"
scheduleinterval: "@daily"
startdate: "2023-01-01"
tasks:
checks3files:
operator: PythonOperator
taskid: "checks3files"
pythoncallable: "tasks.s3operations.checks3files"
processparquet:
operator: PythonOperator
taskid: "processparquet"
pythoncallable: "tasks.dataprocessing.processparquet"
dependencies:
- checks3files
loadtoredshift:
operator: PythonOperator
taskid: "loadtoredshift"
pythoncallable: "tasks.redshiftoperations.loadtoredshift"
dependencies:
- processparquet
```
4.2 Реалізація задач
Завдання 1: Перевірити наявність файлів Parquet в S3
tasks/s3_operations.py
:
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
def check_s3_files(**kwargs):
bucket_name = "your-s3-bucket-name"
prefix = "your/s3/prefix/"
s3 = S3Hook(aws_conn_id="your_s3_connection")
files = s3.list_keys(bucket_name=bucket_name, prefix=prefix)
if not files:
raise ValueError(f"No files found in S3 bucket '{bucket_name}' with prefix '{prefix}'")
kwargs['ti'].xcom_push(key='s3_file', value=files[0])
Завдання 2: Обробити файл Parquet
tasks/data_processing.py
:
import pandas as pd
from io import BytesIO
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
def process_parquet(**kwargs):
bucket_name = "your-s3-bucket-name"
s3_file = kwargs['ti'].xcom_pull(key='s3_file', task_ids='check_s3_files')
s3 = S3Hook(aws_conn_id="your_s3_connection")
file_obj = s3.get_key(s3_file, bucket_name).get()
parquet_data = pd.read_parquet(BytesIO(file_obj['Body'].read()))
parquet_data['processed_timestamp'] = pd.Timestamp.now()
local_path = "/tmp/processed_data.csv"
parquet_data.to_csv(local_path, index=False)
kwargs['ti'].xcom_push(key='local_file_path', value=local_path)
Завдання 3: Завантажити дані в Redshift
tasks/redshift_operations.py
:
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
def load_to_redshift(**kwargs):
bucket_name = "your-s3-bucket-name"
s3_path = "your/s3/destination/processed_data.csv"
local_file_path = kwargs['ti'].xcom_pull(key='local_file_path', task_ids='process_parquet')
s3 = S3Hook(aws_conn_id="your_s3_connection")
s3.load_file(filename=local_file_path, key=s3_path, bucket_name=bucket_name, replace=True)
redshift_hook = PostgresHook(postgres_conn_id="your_redshift_connection")
copy_command = f"""
COPY your_table_name
FROM 's3://{bucket_name}/{s3_path}'
IAM_ROLE 'arn:aws:iam::your-account-id:role/your-redshift-role'
FORMAT AS CSV
DELIMITER ','
IGNOREHEADER 1;
"""
redshift_hook.run(copy_command)
4.3 Динамічне завантаження DAG
Створіть скрипт завантажувача для динамічного завантаження YAML-конфігурацій.
dags/dag_loader.py
:
import os
import dagfactory
CONFIG_DIR = "/opt/airflow/dags/configs/"
for config_file in os.listdir(CONFIG_DIR):
if config_file.endswith(".yaml"):
dag_factory = dagfactory.DagFactory(os.path.join(CONFIG_DIR, config_file))
dag_factory.generate_dags(globals())
Потік виконання
- Перевірка файлів Parquet в S3:
- Перераховує файли в S3 бакеті.
- Публікує останній файл в XCom.
2. Обробка файлу Parquet:
- Завантажує і обробляє файл за допомогою Pandas.
- Зберігає оброблений файл локально.
3. Завантаження даних в Redshift:
- Завантажує перетворений файл назад в S3.
- Виконує команду
COPY
для завантаження даних в Redshift.
Переваги
- Ізоляція на рівні проектів:
-
Окремі репозиторії для кожного конвеєра забезпечують модульність та підтримуваність.
Автоматизовані розгортання з CI/CD: -
CI/CD забезпечує безпомилкові та послідовні розгортання.
3. Масштабоване виконання задач з Celery Executor:
- Розподіляє задачі між працівниками для ефективної обробки.
4. Динамічне управління DAG:
- Легко додавайте нові конвеєри, оновлюючи YAML-конфігурації.
Ця архітектура забезпечує масштабоване та ефективне рішення для управління складними робочими процесами на великому масштабі. Інтегруючи DAG-Factory, Celery Executor та S3, ви забезпечуєте підтримуваність і продуктивність у процесах обробки даних.
Перекладено з: Managing Airflow Pipelines at Scale Using DAG-Factory, Celery Executor, and S3 Integration