Управління пайплайнами Airflow на великому масштабі за допомогою DAG-Factory, Celery Executor та інтеграції з S3.

pic

Моя подорож у Airflow і чому мені був потрібен масштабований підхід

Як інженер даних, моя подорож з Apache Airflow почалася з кількох простих ETL конвеєрів. Спочатку все здавалося керованим — визначення DAG в Python, планування задач та усунення несправностей у робочих процесах. Але з розширенням моїх проєктів я зіткнувся з кількома значними проблемами:

  • Обробка масштабу: Керування понад 100+ конвеєрів даних серед різних команд та проєктів стало важким. Конвеєри почали заважати один одному, що призводило до неефективності та вузьких місць.
  • Проблеми з розгортанням: Ручне розгортання DAG та їх залежностей було схильним до помилок і займало багато часу.
  • Вузькі місця виконання: З зростанням навантажень виконання задач на одному комп'ютері стало неможливим, що призводило до затримок і збоїв.

Я зрозумів, що мені потрібен більш модульний, масштабований та автоматизований підхід для ефективної організації моїх робочих процесів даних. Це привело мене до створення рішення за допомогою DAG-Factory, Celery Executor та інтеграції з S3, щоб подолати ці проблеми та масштабувати мою настройку Airflow.

Чому цей підхід?

  1. Ізоляція на рівні проєкту: Кожен конвеєр керується окремо, що спрощує розробку, налагодження та розгортання.
  2. Автоматизовані розгортання: CI/CD конвеєри усувають ручні помилки та забезпечують надійні оновлення.
  3. Масштабоване виконання: Celery Executor дозволяє виконувати задачі розподілено, забезпечуючи ефективне використання ресурсів.
  4. Інтеграція в хмарі: S3 виступає як централізований шар зберігання для проміжних та фінальних наборів даних, а Redshift займається обробкою даних для аналітики.

Ця стаття розповідає вам про точну архітектуру, яку я реалізував для оптимізації та масштабування моїх робочих процесів Airflow.

1. Організація конвеєрів із ізоляцією на рівні проєкту

Ізоляція на рівні проєкту гарантує, що кожен конвеєр є модульним і незалежно керованим. Використовуючи DAG-Factory, ми можемо застосувати підхід на основі конфігурації для визначення Airflow конвеєрів, що мінімізує хардкодування та спрощує додавання нових робочих процесів.

Крок 1.1: Використання DAG-Factory для визначення DAG на основі YAML

DAG-Factory дозволяє вам оголошувати робочі процеси декларативно в YAML файлах, зменшуючи дублювання коду та роблячи робочі процеси легшими для підтримки.

Встановлення DAGKit

Встановіть DAG-Factory в своєму середовищі Airflow:

pip install dag-factory

Крок 1.2: Структура каталогу проєкту

Кожен конвеєр ізольований у власній папці проєкту з чітким поділом конфігурацій, задач і залежностей.

Приклад структури проєкту:

marketing_pipeline/  
├── configs/  
│ └── dag_config.yaml # YAML конфігурація для DAG  
├── tasks/  
│ ├── extract.py # Задача для витягування даних  
│ ├── transform.py # Задача для трансформації даних  
│ └── load.py # Задача для завантаження даних  
├── requirements.txt # Залежності для цього проєкту  
├── README.md # Документація

Крок 1.3: Використання DAG-Factory для модульного керування конвеєрами

DAG-Factory дозволяє визначати конвеєри як YAML конфігурації. Ці конфігурації динамічно завантажуються в Airflow, що робить керування та зміну конвеєрів легшими.

Переваги використання DAG-Factory:

  • Усуває повторюваний Python код для визначення DAG.
  • Покращує підтримуваність завдяки декларативним YAML конфігураціям.
  • Динамічно завантажує конвеєри під час виконання, спрощуючи оновлення.

2. Реалізація CI/CD для автоматизованих розгортань

2.1 Чому використовувати CI/CD для Airflow?

Ручне розгортання DAG і конфігурацій до Airflow може бути схильним до помилок і забирати багато часу.
CI/CD автоматизує цей процес, забезпечуючи:

  • Узгоджені розгортання.
  • Правильну валідацію конфігурацій та залежностей.
  • Швидші оновлення конвеєрів.

2.2 Налаштування CI/CD робочого процесу

Приклад GitHub Actions Workflow (.github/workflows/deploy.yml):

name: Deploy Airflow DAGs  
on:  
 push:  
 branches:  
 - main  
jobs:  
 deploy:  
 runs-on: ubuntu-latest  
 steps:  
 - name: Checkout Code  
 uses: actions/checkout@v3  
 - name: Install Dependencies  
 run: pip install dag-factory -r requirements.txt  
 - name: Validate YAML Configurations  
 run: |  
 for file in configs/*.yaml; do  
 python -m dagfactory.validate_config "$file"  
 done  
 - name: Deploy DAG Configurations  
 env:  
 AIRFLOW_HOST: ${{ secrets.AIRFLOW_HOST }}  
 AIRFLOW_USERNAME: ${{ secrets.AIRFLOW_USERNAME }}  
 AIRFLOW_PASSWORD: ${{ secrets.AIRFLOW_PASSWORD }}  
 run: |  
 scp -r configs/*.yaml airflow@${AIRFLOW_HOST}:/opt/airflow/dags/configs/

Цей робочий процес:

  1. Перевіряє останній код.
  2. Валідовує конфігурації DAG, використовуючи DAG-Factory.
  3. Розгортає YAML конфігурації на інстанції Airflow.

2.3 Автоматизація керування залежностями

Кожен проєкт може визначати свої власні залежності в файлі requirements.txt. CI/CD конвеєр повинен встановити ці залежності в середовище Airflow.

Приклад requirements.txt:

pandas==1.5.0  
boto3==1.28.0  
psycopg2==2.9.3

3. Використання Celery Executor для розподіленого виконання задач

3.1 Чому використовувати Celery Executor?

Celery Executor ідеально підходить для масштабування виконання задач через кілька воркерів. Це дозволяє:

  • Розподілену обробку: Задачі розподіляються між кількома воркерами.
  • Ефективне використання ресурсів: Кількість воркерів можна масштабувати вгору або вниз залежно від попиту.
  • Толерантність до збоїв: Задачі можуть автоматично повторюватися у разі збоїв.

3.2 Налаштування Celery Executor на Kubernetes

Використання Celery Executor з Airflow на Kubernetes забезпечує розподілене виконання задач між воркерами, гарантуючи масштабованість та толерантність до збоїв. Ось як налаштувати Celery Executor в середовищі Kubernetes:

Крок 1: Передумови

  1. Kubernetes Cluster: Переконайтеся, що у вас є працюючий кластер Kubernetes з доступом до розгортання подів.
  2. Встановлення Airflow: Використовуйте офіційний Apache Airflow Helm Chart або кастомне розгортання.
  3. Message Broker: Необхідний брокер повідомлень для Celery.
    Загальні варіанти вибору:
  • Redis: Легкий і швидкий.
  • RabbitMQ: Брокер повідомлень рівня підприємства з розширеними функціями.

Крок 2: Налаштування Celery Executor в Airflow

Змініть налаштування в airflow.cfg, щоб використовувати Celery Executor:

Оновіть налаштування executor

Встановіть executor на CeleryExecutor:

[core]  
executor = CeleryExecutor

Налаштуйте Message Broker

Для Redis як брокера повідомлень:

[celery]  
broker_url = redis://:6379/0  
result_backend = db+postgresql://:@:5432/

Для RabbitMQ як брокера повідомлень:

[celery]  
broker_url = amqp://:@:5672/  
result_backend = db+postgresql://:@:5432/

Крок 3: Розгортання Message Broker

Варіант 1: Розгортання Redis

Використовуйте Helm для розгортання Redis у кластері Kubernetes:

helm repo add bitnami https://charts.bitnami.com/bitnami  
helm install redis bitnami/redis

Варіант 2: Розгортання RabbitMQ

Розгорніть RabbitMQ за допомогою Helm:

helm repo add bitnami https://charts.bitnami.com/bitnami  
helm install rabbitmq bitnami/rabbitmq

Крок 4: Оновлення Airflow Helm Chart

Якщо ви використовуєте Apache Airflow Helm Chart, оновіть файл values.yaml, щоб налаштувати Celery Executor.

Налаштуйте executor

executor: CeleryExecutor

Налаштуйте налаштування Celery

Додайте налаштування специфічні для Celery в розділі config:

config:  
 AIRFLOW__CELERY__BROKER_URL: redis://redis-master:6379/0  
 AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgresql-airflow:5432/airflow

Налаштуйте Redis або RabbitMQ

Переконайтеся, що broker_url в конфігурації відповідає імені сервісу Redis або RabbitMQ у вашому кластері Kubernetes.

Крок 5: Масштабування Worker Pods

У середовищі Kubernetes воркери Celery працюють як поди. Ви можете масштабувати кількість воркерів залежно від навантаження.

Конфігурація Helm для Worker

У файлі values.yaml налаштуйте воркер-поди:

workers:  
 enabled: true  
 replicas: 3  
 resources:  
 requests:  
 memory: "512Mi"  
 cpu: "500m"  
 limits:  
 memory: "1Gi"  
 cpu: "1"

Розгорніть оновлений Airflow Helm chart:

helm upgrade airflow apache-airflow/airflow -f values.yaml

Ви можете динамічно масштабувати воркерів:

kubectl scale deployment airflow-worker --replicas=5

Крок 6: Моніторинг Celery Workers

  1. Перевірте Worker Pods: Переконайтеся, що воркери Celery працюють:
kubectl get pods -l component=worker

2. Перевірте логи: Моніторинг логів воркера Celery:

kubectl logs 

3. Airflow UI: В інтерфейсі Airflow, на вкладці DAGs, ви можете відстежувати статус задач, виконуваних воркерами.

Крок 7: Опційно — Автоматичне масштабування Celery Workers

Ви можете налаштувати Kubernetes Horizontal Pod Autoscaler (HPA), щоб масштабувати Celery worker поди на основі використання CPU або пам'яті.

Увімкнення HPA

Застосуйте наступну конфігурацію для увімкнення автоматичного масштабування:

apiVersion: autoscaling/v2beta2  
kind: HorizontalPodAutoscaler  
metadata:  
 name: airflow-worker  
 namespace: airflow  
spec:  
 scaleTargetRef:  
 apiVersion: apps/v1  
 kind: Deployment  
 name: airflow-worker  
 minReplicas: 2  
 maxReplicas: 10  
 metrics:  
 - type: Resource  
 resource:  
 name: cpu  
 target:  
 type: Utilization  
 averageUtilization: 70

Застосуйте конфігурацію HPA:

kubectl apply -f hpa.yaml

Приклад робочого процесу: інтеграція S3 та Redshift

## 4.1 Конфігурація DAG

Визначте конвеєр для завантаження файлів Parquet з S3 в Redshift у форматі YAML за допомогою DAGKit.

`configs/s3_to_redshift_pipeline.yaml`:

defaultargs:
owner: airflow
retries: 2
retry
delaysec: 300
dag:
dag
id: "s3toredshiftpipeline"
schedule
interval: "@daily"
startdate: "2023-01-01"
tasks:
check
s3files:
operator: PythonOperator
task
id: "checks3files"
pythoncallable: "tasks.s3operations.checks3files"
processparquet:
operator: PythonOperator
task
id: "processparquet"
python
callable: "tasks.dataprocessing.processparquet"
dependencies:
- checks3files
loadtoredshift:
operator: PythonOperator
taskid: "loadtoredshift"
python
callable: "tasks.redshiftoperations.loadtoredshift"
dependencies:
- process
parquet
```

4.2 Реалізація задач

Завдання 1: Перевірити наявність файлів Parquet в S3

tasks/s3_operations.py:

from airflow.providers.amazon.aws.hooks.s3 import S3Hook  
def check_s3_files(**kwargs):  
 bucket_name = "your-s3-bucket-name"  
 prefix = "your/s3/prefix/"  
 s3 = S3Hook(aws_conn_id="your_s3_connection")  
 files = s3.list_keys(bucket_name=bucket_name, prefix=prefix)  
 if not files:  
 raise ValueError(f"No files found in S3 bucket '{bucket_name}' with prefix '{prefix}'")  

 kwargs['ti'].xcom_push(key='s3_file', value=files[0])

Завдання 2: Обробити файл Parquet

tasks/data_processing.py:

import pandas as pd  
from io import BytesIO  
from airflow.providers.amazon.aws.hooks.s3 import S3Hook  
def process_parquet(**kwargs):  
 bucket_name = "your-s3-bucket-name"  
 s3_file = kwargs['ti'].xcom_pull(key='s3_file', task_ids='check_s3_files')  
 s3 = S3Hook(aws_conn_id="your_s3_connection")  
 file_obj = s3.get_key(s3_file, bucket_name).get()  
 parquet_data = pd.read_parquet(BytesIO(file_obj['Body'].read()))  
 parquet_data['processed_timestamp'] = pd.Timestamp.now()  
 local_path = "/tmp/processed_data.csv"  
 parquet_data.to_csv(local_path, index=False)  
 kwargs['ti'].xcom_push(key='local_file_path', value=local_path)

Завдання 3: Завантажити дані в Redshift

tasks/redshift_operations.py:

from airflow.providers.postgres.hooks.postgres import PostgresHook  
from airflow.providers.amazon.aws.hooks.s3 import S3Hook  
def load_to_redshift(**kwargs):  
 bucket_name = "your-s3-bucket-name"  
 s3_path = "your/s3/destination/processed_data.csv"  
 local_file_path = kwargs['ti'].xcom_pull(key='local_file_path', task_ids='process_parquet')  
 s3 = S3Hook(aws_conn_id="your_s3_connection")  
 s3.load_file(filename=local_file_path, key=s3_path, bucket_name=bucket_name, replace=True)  
 redshift_hook = PostgresHook(postgres_conn_id="your_redshift_connection")  
 copy_command = f"""  
 COPY your_table_name  
 FROM 's3://{bucket_name}/{s3_path}'  
 IAM_ROLE 'arn:aws:iam::your-account-id:role/your-redshift-role'  
 FORMAT AS CSV  
 DELIMITER ','  
 IGNOREHEADER 1;  
 """  
 redshift_hook.run(copy_command)

4.3 Динамічне завантаження DAG

Створіть скрипт завантажувача для динамічного завантаження YAML-конфігурацій.

dags/dag_loader.py:

import os  
import dagfactory  
CONFIG_DIR = "/opt/airflow/dags/configs/"  
for config_file in os.listdir(CONFIG_DIR):  
 if config_file.endswith(".yaml"):  
 dag_factory = dagfactory.DagFactory(os.path.join(CONFIG_DIR, config_file))  
 dag_factory.generate_dags(globals())

Потік виконання

  1. Перевірка файлів Parquet в S3:
  • Перераховує файли в S3 бакеті.
  • Публікує останній файл в XCom.

2. Обробка файлу Parquet:

  • Завантажує і обробляє файл за допомогою Pandas.
  • Зберігає оброблений файл локально.

3. Завантаження даних в Redshift:

  • Завантажує перетворений файл назад в S3.
  • Виконує команду COPY для завантаження даних в Redshift.

Переваги

  1. Ізоляція на рівні проектів:
  • Окремі репозиторії для кожного конвеєра забезпечують модульність та підтримуваність.
    Автоматизовані розгортання з CI/CD:

  • CI/CD забезпечує безпомилкові та послідовні розгортання.

3. Масштабоване виконання задач з Celery Executor:

  • Розподіляє задачі між працівниками для ефективної обробки.

4. Динамічне управління DAG:

  • Легко додавайте нові конвеєри, оновлюючи YAML-конфігурації.

Ця архітектура забезпечує масштабоване та ефективне рішення для управління складними робочими процесами на великому масштабі. Інтегруючи DAG-Factory, Celery Executor та S3, ви забезпечуєте підтримуваність і продуктивність у процесах обробки даних.

Перекладено з: Managing Airflow Pipelines at Scale Using DAG-Factory, Celery Executor, and S3 Integration

Leave a Reply

Your email address will not be published. Required fields are marked *