Jenkins та Apache Airflow для створення автоматизованих каналів обробки даних

1. Що таке Data Pipelines?

Data pipeline (канал обробки даних) автоматизує потік даних від джерела до призначення через різні етапи обробки, такі як вилучення, перетворення та завантаження (ETL).

2. Ознайомлення з Jenkins для Data Pipelines

Jenkins зазвичай використовується в CI/CD pipeline (конвеєр інтеграції та доставки), але він також може ініціювати скрипти для обробки даних. Ось як можна використовувати Jenkins для каналу обробки даних:

  1. Встановлення Jenkins:
  • Завантажте та встановіть Jenkins на вашу систему (або використовуйте Docker).
  • Встановіть плагіни (наприклад, Pipeline, Build Tools тощо).

2. Створення простого Pipeline:

  • Визначте ваші кроки в Jenkinsfile (декларативний синтаксис).
  • Використовуйте синтаксис, схожий на cron, в Jenkins для планування.

3. Приклад Jenkins Pipeline для ETL

pipeline {  
 agent any  
 stages {  
 stage('Extract') {  
 steps {  
 sh 'python extract_data.py'  
 }  
 }  
 stage('Transform') {  
 steps {  
 sh 'python transform_data.py'  
 }  
 }  
 stage('Load') {  
 steps {  
 sh 'python load_data.py'  
 }  
 }  
 }  
}

3. Ознайомлення з Apache Airflow для Data Pipelines

Apache Airflow спеціально створений для керування складними робочими процесами за допомогою DAG (Directed Acyclic Graph - орієнтований ациклічний граф). DAG визначає завдання як вузли, а їхні залежності — як орієнтовані ребра.

Кроки для створення Data Pipeline в Airflow:

  1. Налаштування Airflow:
  • Встановіть за допомогою pip install apache-airflow.
  • Налаштуйте airflow.cfg та запустіть вебсервер і планувальник Airflow.

2. Визначте ваш DAG:

  • Створіть Python-скрипт для вашого DAG в директорії dags/.

3. Приклад Airflow DAG для ETL:

from airflow import DAG  
from airflow.operators.python_operator import PythonOperator  
from datetime import datetime  

def extract_data():  
 print("Extracting data...")  

def transform_data():  
 print("Transforming data...")  

def load_data():  
 print("Loading data...")  

default_args = {  
 'owner': 'airflow',  
 'depends_on_past': False,  
 'start_date': datetime(2023, 12, 1),  
 'retries': 1,  
}  

dag = DAG(  
 'etl_pipeline',  
 default_args=default_args,  
 description='A simple ETL pipeline',  
 schedule_interval='@daily',  
)  

extract_task = PythonOperator(  
 task_id='extract',  
 python_callable=extract_data,  
 dag=dag,  
)  

transform_task = PythonOperator(  
 task_id='transform',  
 python_callable=transform_data,  
 dag=dag,  
)  

load_task = PythonOperator(  
 task_id='load',  
 python_callable=load_data,  
 dag=dag,  
)  

extract_task >> transform_task >> load_task

Пояснення: Завдання визначені та з'єднані за допомогою оператора >>, утворюючи орієнтований ациклічний граф.

4. Ключові відмінності

pic

6. Пояснення DAG

Directed Acyclic Graph (DAG - орієнтований ациклічний граф) — це граф, який має:

  • Орієнтовані ребра: Вони показують потік від одного завдання до іншого.
  • Ациклічну властивість: В графі немає циклів, що гарантує відсутність повторення завдань.
  • Вузли: Представляють окремі завдання.
  • Ребра: Представляють залежності між завданнями.

Приклад:
У наведеному прикладі з Airflow завдання повинні виконуватись у такому порядку:

  1. Extract → 2. Transform → 3. Load.
    Якщо Extract task не вдалося, наступні завдання не будуть виконуватись.

Висновок

  • Використовуйте Jenkins для простих робочих процесів або якщо ви вже використовуєте його для CI/CD.
  • Використовуйте Airflow для складних робочих процесів з залежностями та розширеним плануванням.

Перекладено з: Jenkins and Apache Airflow for building automated data pipelines

Leave a Reply

Your email address will not be published. Required fields are marked *