Впровадження повільно змінюваних вимірів (SCD) Тип 1 та Тип 2

У своїй попередній статті я розглядав, що таке SCD Тип-1 та SCD Тип-2, а також коли їх варто використовувати. У цій статті я розповім, як їх реалізувати. Існує кілька способів реалізації, але я розгляну два варіанти: за допомогою SQL і також за допомогою Pyspark. Почнемо з реалізації SCD Тип-1 за допомогою SQL клаузи MERGE.

Для SCD Тип-1

В SCD Тип-1 мета — це перезапис старих записів новими. Це використовується, коли історичні дані не мають значення і потрібно зберігати тільки найактуальнішу версію запису.

Створення таблиць джерела та призначення

Спочатку створимо таблиці джерела та призначення. Таблиця джерела — це місце, звідки надходять дані, а таблиця призначення — це місце, де будуть зберігатися остаточні дані.

--------------- TARGET TABLE ----------------------  
create table target_customer(  
 Customer_ID INT PRIMARY KEY NOT NULL,  
 Name VARCHAR(200),  
 Email VARCHAR(300),  
 Phone VARCHAR(50)  
);  

INSERT INTO target_customer  
VALUES(Customer_ID, Name, Email, Phone)  
(100, 'Kakashi Hatake', '[email protected]','111-111-1111'),  
(101, 'Naruto Uzumaki', '[email protected]','123-456-7890'),  
(102, 'Itachi Uchiha', '[email protected]','970-234-5670');  


----------------- SOURCE TABLE ------------------------  
create table source_customer(  
 Customer_ID INT PRIMARY KEY NOT NULL,  
 Name VARCHAR(200),  
 Email VARCHAR(300),  
 Phone VARCHAR(50)  
);  

INSERT INTO source_customer (Customer_ID, Name, Email, Phone)  
VALUES  
(101, 'Naruto Uzumaki', '[email protected]','123-456-7890'), -- НОВИЙ EMAIL  
(102, 'Itachi Uchiha', '[email protected]','970-234-5670'),  
(103, 'Madara Uchiha', '[email protected]','456-321-1232'); -- НОВИЙ КОРИСТУВАЧ

pic

таблиця призначення (присутня в сховищі даних)

pic

таблиця джерела (присутня в операційній базі даних)

Як ви бачите, у Наруті Узумакі тепер нова електронна адреса, а новий користувач Мадара Учіха був доданий. Тому ми повинні виконати операцію Upsert (Update та Insert) в нашій таблиці призначення.

Реалізація SCD Тип-1 за допомогою SQL

MERGE INTO target_customer as target  
USING source_customer as source  
ON target.Customer_ID = source.Customer_ID --1. Перевірка на наявність співпадаючих записів  
WHEN MATCHED AND( -- якщо співпадіння знайдено  
 target.Name <> source.Name OR -- перевірка на зміну імені  
 target.Email <> source.Email OR -- перевірка на зміну email  
 target.Phone <> source.Email -- перевірка на зміну телефону  
) THEN UPDATE -- якщо зміни є, оновлюємо значення  
 SET -- встановлюємо нові значення  
 target.Name = source.Name,   
 target.Email = source.Email,  
 target.Phone = source.Phone  
WHEN NOT MATCHED BY target THEN -- якщо співпадіння не знайдено, значить нове значення  
 INSERT(Customer_ID, Name, Email, Phone) -- вставляємо нове значення  
 VALUES(source.Customer_ID, source.Name, source.Email, source.Phone);

pic

таблиця призначення після виконання операції merge

Кроки для SCD Тип-1:

  1. Співвіднесіть записи в джерелі та призначенні за допомогою Customer_ID.
  2. Якщо співпадіння знайдено і дані відрізняються, оновіть запис в таблиці призначення.
  3. Якщо співпадіння не знайдено, вставте новий запис у таблицю призначення.

Еквівалентна реалізація в PySpark

# Примітка: merge працює лише з таблицями Delta  
from delta.tables import DeltaTable  

delta_target = DeltaTable.forPath(spark, "/path/to/target_table")  
source_customer = spark.read.format("delta").load("/path/to/source_table")  

delta_target.alias('target').merge(  
 source_customer.alias('source'),  
 "target.Customer_ID = source.Customer_ID"  
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

Для SCD Тип-2

В SCD Тип-2 мета полягає в збереженні історичних даних шляхом додавання нового рядка для кожної зміни в таблиці джерела при збереженні попередніх записів.
Це важливо, коли потрібно відслідковувати історію змін.

Модифікація таблиці призначення

Таблиця призначення потребує кількох додаткових колонок для відслідковування історичних змін. Ось для чого служить кожна з колонок:

  • Start_Date: Вказує, коли запис став активним.
  • End_Date: Показує, коли запис був замінений або став неактивним. Для останнього запису це значення залишатиметься порожнім (NULL).
  • Is_Current: Простий прапорець Y або N, що вказує, чи є це останньою версією запису (Y), або старішою версією (N).

Ці колонки дозволяють легко бачити, як дані змінювались з часом, і забезпечують те, що важливі історичні деталі не загубляться.

CREATE TABLE target_customer_scd2(  
 Customer_ID INT NOT NULL,  
 Name VARCHAR(200),  
 Email VARCHAR(300),  
 Phone VARCHAR(50),  
 Start_Date DATE NOT NULL,  
 End_Date DATE,  
 Is_Current CHAR(1) DEFAULT 'Y'  
);  

INSERT INTO target_customer_scd2  
VALUES  
(101, 'Naruto Uzumaki', '[email protected]', '777-777-7777', '2024-01-01', '2025-01-01', 'N'),  
(101, 'Naruto Uzumaki', '[email protected]', '123-456-7890', '2025-01-01', NULL, 'Y'),  
(102, 'Itachi Uchiha', '[email protected]', '970-234-5670', '2024-01-01', NULL, 'Y');

pic

Що стосується таблиці джерела, то ми використаємо ту саму таблицю джерела, яку використовували для SCD Тип-1.

pic

Реалізація SCD Тип-2 за допомогою SQL

-- 1. Спочатку оновлюємо записи  
MERGE INTO target_customer_scd2 AS target  
USING source_customer AS source  
ON target.Customer_ID = source.Customer_ID AND target.Is_Current = 'Y'  
-- Співвіднесення записів у джерелі та поточних (активних) записів у таблиці призначення  
WHEN MATCHED AND (  
 target.Name <> source.Name OR  
 target.Email <> source.Email OR  
 target.Phone <> source.Phone -- якщо є співпадіння  
) THEN   
 UPDATE   
 SET   
 target.End_Date = GETDATE(), -- позначаємо існуючий запис як історичний  
 target.Is_Current = 'N';  

-- 2. Потім вставляємо оновлені записи  
INSERT INTO target_customer_scd2 (Customer_ID, Name, Email, Phone, Start_Date, End_Date, Is_Current)  
SELECT source.Customer_ID, source.Name, source.Email, source.Phone, GETDATE(), NULL, 'Y'  
FROM source_customer AS source  
WHERE NOT EXISTS (  
 SELECT 1  
 FROM target_customer_scd2 AS target  
 WHERE target.Customer_ID = source.Customer_ID AND target.Is_Current = 'Y'  
-- Перевірка, чи запис уже існує і чи є він активним.  
);

pic

таблиця, створена після виконання SCD Тип-2

Кроки для SCD Тип-2:

  1. Співвіднесіть записи з таблиці source_customer з поточними активними записами в таблиці target_customer_scd2, використовуючи Customer_ID та Is_Current = 'Y'.
  2. Якщо співпадіння знайдено і будь-які дані (Name, Email, або Phone) відрізняються між джерелом і призначенням, позначте існуючий запис в таблиці призначення як історичний, встановивши:
    End_Date на поточну дату (GETDATE()).
    Is_Current = 'N'.
  3. Вставте новий запис для оновлених даних з джерела, встановивши:
    Start_Date на поточну дату (GETDATE())
    End_Date як NULL (що означає, що це поточний активний запис).
    Is_Current = 'Y' (що позначає новий активний запис).
  4. Вставте нові записи з джерела безпосередньо в таблицю призначення, якщо для цього Customer_ID немає активного запису в таблиці призначення (NOT EXISTS умова).

Таким чином, SCD Тип-2 зберігає історію змін, одночасно підтримуючи останню версію.

Існують й інші способи реалізації SCD Тип-2, наприклад, використовуючи клаузу OUTPUT. Ось тут є детальна стаття з цієї теми від іншого автора.
Але якщо вам цікаво, ось змінений код, заснований на моїх таблицях:

INSERT INTO target_customer_scd2 (Customer_ID, Name, Email, Phone, Start_Date, End_Date, Is_Current)  
SELECT   
 id,   
 name,   
 email,   
 phone,   
 start_date,   
 end_date,   
 is_current  
FROM (  
 MERGE INTO target_customer_scd2 AS target  
 USING source_customer AS source  
 ON target.Customer_ID = source.Customer_ID AND target.Is_Current = 'Y'  

 -- Крок 1: Оновлюємо поточний активний запис, позначаючи його як історичний  
 WHEN MATCHED AND (  
 target.Name <> source.Name OR  
 target.Email <> source.Email OR  
 target.Phone <> source.Phone  
 ) THEN   
 UPDATE   
 SET   
 target.End_Date = GETDATE(),   
 target.Is_Current = 'N'  

 -- Крок 2: Вставляємо новий запис для оновлених або нових даних  
 WHEN NOT MATCHED BY TARGET   
 THEN   
 INSERT (Customer_ID, Name, Email, Phone, Start_Date, End_Date, Is_Current)  
 VALUES (source.Customer_ID, source.Name, source.Email, source.Phone, GETDATE(), NULL, 'Y')  

 -- Виводимо зміни  
 OUTPUT $action,  
 source.Customer_ID,  
 source.Name,  
 source.Email,  
 source.Phone,  
 GETDATE(),  
 NULL,  
 'Y'  
) AS [changes] (action, id, name, email, phone, start_date, end_date, is_current)  

-- Фільтруємо за дією, щоб вставити тільки оновлені записи  
WHERE action = 'UPDATE';

Тепер реалізуємо це за допомогою Pyspark.

from delta.tables import DeltaTable  
from pyspark.sql.functions import current_date, lit, col, xxhash64  

source_df = spark.sql("SELECT * FROM source_customer")  
target_delta = DeltaTable.forName(spark, "Your_Catalog_Name.Schema_Name.Table_Name")  

joined_df = source_df.join(  
 target_delta.toDF(),  
 (source_df.Customer_ID == target_delta.toDF().Customer_ID) &  
 (target_delta.toDF().Is_Current == 'Y'),  
 'left'  
).select(  
 source_df['*'],  
 target_delta.toDF().Customer_ID.alias('Target_Customer_ID'),  
 target_delta.toDF().Name.alias('Target_Name'),  
 target_delta.toDF().Email.alias('Target_Email'),  
 target_delta.toDF().Phone.alias('Target_Phone'),  
 target_delta.toDF().Start_Date.alias('Target_Start_Date'),  
 target_delta.toDF().End_Date.alias('Target_End_Date'),  
 target_delta.toDF().Is_Current.alias('Target_Is_Current')  
)  

filtered_df = joined_df.filter(  
 xxhash64(col('Name'), col('Email'), col('Phone')) != xxhash64(col('Target_Name'), col('Target_Email'), col('Target_Phone'))  
).withColumn('Merge_Key', col('Customer_ID'))  

target_delta.alias('t').merge(  
 filtered_df.alias('f'),  
 "f.Merge_Key == t.Customer_ID and t.Is_Current == 'Y'"  
).whenMatchedUpdate(set={  
 "Is_Current": "'N'",  
 "End_Date": "current_date"  
}).whenNotMatchedInsert(values={  
 "Customer_ID": "f.Customer_ID",  
 "Name": "f.Name",  
 "Email": "f.Email",  
 "Phone": "f.Phone",  
 "Start_Date": "current_date",  
 "End_Date": "null",  
 "Is_Current": "'Y'"  
}).execute()

Ви також можете реалізувати SCD тип-2, використовуючи функцію Window і row_number.

Висновок

Ось і все! Якщо ви все ще хочете дізнатися, коли використовувати SCD Тип-1 чи Тип-2, або навіть що означає SCD, ознайомтесь з моєю статтею Натисніть тут

Перекладено з: Implementing Slowly Changing Dimension (SCD) Type 1 and Type 2

Leave a Reply

Your email address will not be published. Required fields are marked *