Використання інкрементальних предикатів для зменшення часу виконання dbt на 80%

Коротко

Використовуючи менш відому та дещо заплутану конфігурацію dbt — інкрементальні предикати, ми зменшили час виконання наших основних завдань dbt майже на 80%! Цей пост здебільшого є просто копією мого PR у блозі, з деякими уточненнями та поясненнями додаткових моментів, але читайте далі, щоб зрозуміти історію того, чому ми працювали над цією проблемою, і чи можливо застосування інкрементальних предикатів буде корисною стратегією для вашої команди, що використовує dbt.

pic

Скріншот графіка середнього часу виконання у нашому основному розгортанні dbt у Prefect Cloud

Історія

Команда з аналітичної інженерії компанії Lovevery оркеструє виконання своїх завдань dbt через оркестратор під назвою Prefect. Ми маємо те, що Prefect називає «розгортанням», під назвою dbt Core Models, яке виконує найбільш важливі моделі нашого проекту dbt, що мають конфігураційний тег core. Це розгортання налаштоване на виконання кожні 3 години з понеділка по п'ятницю та кожні 4 години на вихідних. Спочатку це розгортання було налаштоване на виконання кожні 2 години з понеділка по п'ятницю, і з часом ми помітили, що середній час виконання стає все довшим і довшим. Ми збільшили кількість потоків у розгортанні з 4 до 8, але суттєвого зменшення часу виконання від цього не сталося.

pic

Це здебільшого сталося через те, що повільність можна було приписати лише кільком дуже довготривалим моделям на етапах staging, intermediate та reporting. Ці моделі залежать від сутностей interactions (те, що ми вважаємо подіями) і web_session_details на шарі silver нашої медаліонної архітектури, і інтуїтивно, оскільки ці сутності є типами подій, це мало сенс, оскільки ці таблиці часто мають найбільшу кількість рядків.

Мені було доручено покращити ефективність часу виконання завдань, оскільки іноді на їх виконання витрачалося більше трьох годин. Я визначив кілька способів, як ми можемо потенційно зменшити час виконання dbt Core Models:

  • Ми могли б змінити incremental_strategy наших інкрементальних моделей (щоб побачити, які інкрементальні моделі є у вашому проекті, ви можете виконати команду dbt ls -m config.materialized:incremental).
  • Наразі ми використовуємо стандартну інкрементальну стратегію для наших інкрементальних моделей в адаптері dbt dbt-databricks, стратегію merge. У merge (також відому як upsert) за наявності унікального ключа записи або вставляються (якщо унікальний ключ є в таблиці джерела, але немає в таблиці призначення/цільовій таблиці), або оновлюються, якщо унікальний ключ є в обох таблицях (джерела і призначення/цільовій), або видаляються, якщо унікальний ключ є в таблиці призначення/цільовій, але немає в таблиці джерела. Використовуючи інкрементальну логіку в dbt, ми виконуємо SQL-операцію для отримання підмножини даних з таблиці джерела, зазвичай з фільтром за датою з вказаним вікном пошуку. Ця підмножина є вихідними даними для операції merge. На жаль, ця стратегія стає обчислювально дорогою, коли наша таблиця призначення/цільова (та, яка вже існує) стає дуже великою (що, ймовірно, і є причиною використання інкрементального підходу!). Уся таблиця призначення/цільова повинна бути відсканована для кожної операції merge, щоб знайти унікальний ключ, який необхідно оновити або видалити і вставити, якщо він не знайдений.
    ## Наприклад, ось як виглядає SQL-операція merge, яку dbt виконує при оновленні інкрементальної моделі stgwebsession_details:
/* {"app": "dbt", "dbt_version": "1.6.14", "dbt_databricks_version": "1.6.6", "databricks_sql_connector_version": "2.9.6", "profile_name": "default", "target_name": "sandbox", "node_id": "model.transformations.stg_web_session_details"} */-- підтримка зворотної сумісності для старої назви параметра  
merge into `dbt_erin_staging`.`stg_web_session_details` as DBT_INTERNAL_DEST using `stg_web_session_details__dbt_tmp` as DBT_INTERNAL_SOURCE on DBT_INTERNAL_SOURCE.web_session_detail_id = DBT_INTERNAL_DEST.web_session_detail_id  
when matched then  
update  
set  
 *  
 when not matched then  
insert  
 *

З профілю запиту одного з цих запитів merge, видно, що найбільше часу в усьому процесі займає сканування таблиці.

pic

pic

Однак dbt має інші, швидші інкрементальні стратегії, доступні для нас, і наша конкретна версія адаптера (dbt-databricks) має стратегії append, merge та insert_overwrite.

pic

Використання стратегії append ризикує вставити дублікати записів, тому ця стратегія для наших цілей не підходить.

Коли інкрементальна модель виконується з стратегією insert_overwrite документація тут, лише зазначені частини таблиці призначення/цільової додаються/перезаписуються, а не вся таблиця. Однак ця стратегія не сумісна з SQL-складами (які ми наразі використовуємо на Databricks), тому ми не можемо використовувати цю стратегію (я пробував).

pic

Тому я зрозумів, що нам доведеться зберегти поточну інкрементальну стратегію merge. Однак я зауважив, що використання інкрементальних предикатів (що буде розглянуто нижче) може наблизити нас до відтворення стратегії insert_overwrite, все ще використовуючи стратегію merge, і зменшити час виконання. Це призвело мене до наступної ідеї:

  • Ми могли б зберегти поточну стратегію incremental_strategy, але додати те, що dbt Labs називає інкрементальними предикатами документація тут

Зазначивши incremental_predicate в блоці конфігурації наших інкрементальних моделей, ми можемо обмежити сканування наших попередніх таблиць під час виконання операції merge та суттєво зменшити час обчислень.

Додавши конфігурацію incremental_predicates до блоку конфігурацій для stg_web_session_details, при виконанні операції merge ми шукаємо лише унікальні ключі для співпадінь, які мають session_detail_event_at_utc, що належить до останніх семи днів.

{{  
config(  
schema = 'stage',  
tags = ['core'],  
materialized = 'incremental',  
unique_key = 'web_session_detail_id',  
partition_by = ['session_detail_event_date_utc'],  
incremental_strategy = 'merge',  
incremental_predicates = ["DBT_INTERNAL_DEST.session_detail_event_at_utc > dateadd(day, -7, current_date)"]  
)  
}}

Приклад

Давайте подивимося на це на прикладі, оскільки так мені найкраще зрозуміло! 😄 Припустимо, це поточний стан моєї спрощеної таблиці веб-сесій на етапі staging, stg_web_session_details -- це наша таблиця призначення/цільова.

pic

Припустимо, це поточний стан моєї спрощеної таблиці веб-сесій на шарі silver, us_silver.web_session_details.

Це наша таблиця джерела!

pic

Тепер, коли ми запускаємо наш dbt job і виконуємо код для таблиці stg_web_session_details, ми застосовуємо інкрементальну логіку, яка фільтрує дані, вибрані з нашої таблиці джерела на шарі silver, щоб отримати лише записи з us_silver.web_session_details, які на два дні або менше від найбільш актуальної дати session_detail_event_at_utc, що є в нашій таблиці призначення/цільовій таблиці, stg_web_session_details.

where session_detail_event_at > (select date_add(day, -2, max(session_detail_event_at_utc)) from {{ this }} where domain_region = 'us' )

Рядок select date_add(day, -2, max(session_detail_event_at_utc) from {{ this }} where domain_region = 'us' поверне 2024-08-19, що означає, що наш умова в інкрементальній логіці стає такою:

where session_detail_event_at > '2024-08-19'

Це означає, що підмножина даних, яку ми вибираємо для злиття з нашою таблицею призначення/цільовою таблицею з us_silver.web_session_details, буде такою:

pic

Гаразд! Тепер без використання інкрементальних предикатів наша операція merge для кожного з двох записів буде намагатись знайти унікальні ключі 345 та 456 в нашій таблиці призначення/цільовій таблиці з трьома записами, stg_web_session_details. Якщо вона не знайде запис, то вставить його. Якщо знайде, то оновить його. Наша таблиця призначення/цільова тепер виглядає як наша таблиця джерела:

pic

Це не проблема, оскільки наша таблиця дуже маленька. Але уявіть, що наша таблиця призначення/цільова тепер має 1 мільярд записів; пошук кожного унікального ключа в таблиці з 1 мільярдом записів стає дуже дорогим з обчислювальної точки зору. Однак, якщо ми використовуємо інкрементальний предикат, ми можемо обмежити обсяг даних, які ми шукаємо для злиття!

Використовуючи цей рядок, ми обмежуємо обсяг даних, які ми шукаємо для злиття, тільки до записів, що мають session_detail_event_at_utc, який знаходиться в останні сім днів до поточної дати.

"DBT_INTERNAL_DEST.session_detail_event_at_utc > dateadd(day, -7, current_date)"

/* {"app": "dbt", "dbt_version": "1.6.14", "dbt_databricks_version": "1.6.6", "databricks_sql_connector_version": "2.9.6", "profile_name": "default", "target_name": "sandbox", "node_id": "model.transformations.stg_web_session_details"} */-- підтримка зворотної сумісності для старої назви параметра  
merge into `dbt_erin_staging`.`stg_web_session_details` as DBT_INTERNAL_DEST using `stg_web_session_details__dbt_tmp` as DBT_INTERNAL_SOURCE on DBT_INTERNAL_DEST.session_detail_event_at_utc > dateadd(day, -7, current_date)  
and DBT_INTERNAL_SOURCE.web_session_detail_id = DBT_INTERNAL_DEST.web_session_detail_id  
when matched then  
update  
set  
 *  
 when not matched then  
insert  
 *

Без цього рядка ми б шукали по всій таблиці призначення/цільовій:

pic

Але тепер ми шукаємо тільки в цій підмножині записів у таблиці призначення/цільовій:

pic

Це може, на великому масштабі, суттєво зменшити час пошуку для кожної операції merge.

Такий тип оптимізації добре працює для подійно орієнтованих даних або даних, які оновлюються лише іноді (або зовсім не оновлюються) після їх створення.

Єдина причина для використання вікна для перегляду (lookback window) полягає в тому, щоб врахувати пізно надійшлі записи або записи, які були створені, наприклад, три дні тому, але були інгестовані лише сьогодні.

Наприклад, що якщо наші джерела і таблиці призначення/цільові виглядали б наступним чином:

Таблиця джерела:

pic

Таблиця призначення/цільова:

pic

Якби ми використовували наш інкрементальний предикат, ми б шукали тільки наступну підмножину записів з нашої таблиці призначення/цільової:

pic

І наша операція merge призвела б до такої таблиці призначення/цільової:

pic

Очевидно, що це не добре і, ймовірно, порушить будь-які перевірки унікальності на таблиці. Тому, такі таблиці з довгими періодами, коли зміни записів можуть відбуватися (наприклад, записи, пов'язані з підписками, можуть мати довгі проміжки між оновленнями), непридатні для такого типу оптимізації. Тому я додав цю конфігурацію лише до наших моделей подійно орієнтованих типів, які також є нашими найбільш довготривалими!

Наприклад, запуск однієї з раніше довготривалих моделей у моєму середовищі розробки тривав лише 1 хвилину і 31,78 секунди (на відміну від понад 30 хвилин раніше) з цією додатковою конфігурацією.

╭─erin.feaser@Erin-Feaser-MacBook ~/lovevery/datahub-dbt/src ‹feature/dim-5108_fix-long-run-times●› ‹datahub-dbt-AWrLjnjm›
╰─$ dbt run -s int_interactions 1   
15:59:11 Running with dbt=1.6.1415:59:12 Registered adapter: databricks=1.6.615:59:13 Found 414 models, 10 snapshots, 2031 tests, 12 seeds, 215 sources, 0 exposures, 0 metrics, 980 macros, 1 group, 0 semantic models15:59:1315:59:17 Concurrency: 64 threads (target='sandbox')15:59:1715:59:17 1 of 1 START sql incremental model dbt_erin_intermediate.int_interactions ...... [RUN]16:00:44 1 of 1 OK created sql incremental model dbt_erin_intermediate.int_interactions . [OK in 86.98s]16:00:4416:00:44 Finished running 1 incremental model in 0 hours 1 minutes and 31.78 seconds (91.78s).16:00:4416:00:44 Completed successfully16:00:4416:00:44 Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1

NOTE:

Теоретично, додавши макрос, ми могли б налаштувати вікно для перегляду (look-back window) для інкрементальних предикатів, щоб воно динамічно змінювалося в залежності від максимальної дати нових даних, які ми намагаємось вставити. Однак, якщо це дуже далеко в минулому, це не буде корисно, оскільки це означає, що ми все одно будемо сканувати більшу частину таблиці джерела/цільової таблиці.

  • Крім того, але не в рамках цього PR, ми можемо ще більше зменшити час виконання, змінивши всі моделі в стадійному шарі, щоб вони матеріалізувались як перегляди.
  • У своїй попередній посаді я матеріалізував усі моделі, що не належать до шару звітності, як перегляди, якщо це не були інкрементальні моделі. Я робив це, тому що (теоретично, а не завжди на практиці) лише моделі зі шару звітності повинні використовуватись для ад-хок запитів від зацікавлених сторін та інструментів BI. Таким чином, нам не потрібні переваги продуктивності від збереження таблиці в пам'яті.

Перекладено з: Using Incremental Predicates to Decrease dbt Run Times by 80%

Leave a Reply

Your email address will not be published. Required fields are marked *