1. Що таке Data Pipelines?
Data pipeline (канал обробки даних) автоматизує потік даних від джерела до призначення через різні етапи обробки, такі як вилучення, перетворення та завантаження (ETL).
2. Ознайомлення з Jenkins для Data Pipelines
Jenkins зазвичай використовується в CI/CD pipeline (конвеєр інтеграції та доставки), але він також може ініціювати скрипти для обробки даних. Ось як можна використовувати Jenkins для каналу обробки даних:
- Встановлення Jenkins:
- Завантажте та встановіть Jenkins на вашу систему (або використовуйте Docker).
- Встановіть плагіни (наприклад, Pipeline, Build Tools тощо).
2. Створення простого Pipeline:
- Визначте ваші кроки в Jenkinsfile (декларативний синтаксис).
- Використовуйте синтаксис, схожий на cron, в Jenkins для планування.
3. Приклад Jenkins Pipeline для ETL
pipeline {
agent any
stages {
stage('Extract') {
steps {
sh 'python extract_data.py'
}
}
stage('Transform') {
steps {
sh 'python transform_data.py'
}
}
stage('Load') {
steps {
sh 'python load_data.py'
}
}
}
}
3. Ознайомлення з Apache Airflow для Data Pipelines
Apache Airflow спеціально створений для керування складними робочими процесами за допомогою DAG (Directed Acyclic Graph - орієнтований ациклічний граф). DAG визначає завдання як вузли, а їхні залежності — як орієнтовані ребра.
Кроки для створення Data Pipeline в Airflow:
- Налаштування Airflow:
- Встановіть за допомогою
pip install apache-airflow
. - Налаштуйте
airflow.cfg
та запустіть вебсервер і планувальник Airflow.
2. Визначте ваш DAG:
- Створіть Python-скрипт для вашого DAG в директорії
dags/
.
3. Приклад Airflow DAG для ETL:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def extract_data():
print("Extracting data...")
def transform_data():
print("Transforming data...")
def load_data():
print("Loading data...")
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 12, 1),
'retries': 1,
}
dag = DAG(
'etl_pipeline',
default_args=default_args,
description='A simple ETL pipeline',
schedule_interval='@daily',
)
extract_task = PythonOperator(
task_id='extract',
python_callable=extract_data,
dag=dag,
)
transform_task = PythonOperator(
task_id='transform',
python_callable=transform_data,
dag=dag,
)
load_task = PythonOperator(
task_id='load',
python_callable=load_data,
dag=dag,
)
extract_task >> transform_task >> load_task
Пояснення: Завдання визначені та з'єднані за допомогою оператора >>
, утворюючи орієнтований ациклічний граф.
4. Ключові відмінності
6. Пояснення DAG
Directed Acyclic Graph (DAG - орієнтований ациклічний граф) — це граф, який має:
- Орієнтовані ребра: Вони показують потік від одного завдання до іншого.
- Ациклічну властивість: В графі немає циклів, що гарантує відсутність повторення завдань.
- Вузли: Представляють окремі завдання.
- Ребра: Представляють залежності між завданнями.
Приклад:
У наведеному прикладі з Airflow завдання повинні виконуватись у такому порядку:
- Extract → 2. Transform → 3. Load.
Якщо Extract task не вдалося, наступні завдання не будуть виконуватись.
Висновок
- Використовуйте Jenkins для простих робочих процесів або якщо ви вже використовуєте його для CI/CD.
- Використовуйте Airflow для складних робочих процесів з залежностями та розширеним плануванням.
Перекладено з: Jenkins and Apache Airflow for building automated data pipelines