У своїй попередній статті я розглядав, що таке SCD Тип-1 та SCD Тип-2, а також коли їх варто використовувати. У цій статті я розповім, як їх реалізувати. Існує кілька способів реалізації, але я розгляну два варіанти: за допомогою SQL
і також за допомогою Pyspark
. Почнемо з реалізації SCD Тип-1 за допомогою SQL клаузи MERGE
.
Для SCD Тип-1
В SCD Тип-1 мета — це перезапис старих записів новими. Це використовується, коли історичні дані не мають значення і потрібно зберігати тільки найактуальнішу версію запису.
Створення таблиць джерела та призначення
Спочатку створимо таблиці джерела та призначення. Таблиця джерела — це місце, звідки надходять дані, а таблиця призначення — це місце, де будуть зберігатися остаточні дані.
--------------- TARGET TABLE ----------------------
create table target_customer(
Customer_ID INT PRIMARY KEY NOT NULL,
Name VARCHAR(200),
Email VARCHAR(300),
Phone VARCHAR(50)
);
INSERT INTO target_customer
VALUES(Customer_ID, Name, Email, Phone)
(100, 'Kakashi Hatake', '[email protected]','111-111-1111'),
(101, 'Naruto Uzumaki', '[email protected]','123-456-7890'),
(102, 'Itachi Uchiha', '[email protected]','970-234-5670');
----------------- SOURCE TABLE ------------------------
create table source_customer(
Customer_ID INT PRIMARY KEY NOT NULL,
Name VARCHAR(200),
Email VARCHAR(300),
Phone VARCHAR(50)
);
INSERT INTO source_customer (Customer_ID, Name, Email, Phone)
VALUES
(101, 'Naruto Uzumaki', '[email protected]','123-456-7890'), -- НОВИЙ EMAIL
(102, 'Itachi Uchiha', '[email protected]','970-234-5670'),
(103, 'Madara Uchiha', '[email protected]','456-321-1232'); -- НОВИЙ КОРИСТУВАЧ
таблиця призначення (присутня в сховищі даних)
таблиця джерела (присутня в операційній базі даних)
Як ви бачите, у Наруті Узумакі тепер нова електронна адреса, а новий користувач Мадара Учіха був доданий. Тому ми повинні виконати операцію Upsert (Update та Insert) в нашій таблиці призначення.
Реалізація SCD Тип-1 за допомогою SQL
MERGE INTO target_customer as target
USING source_customer as source
ON target.Customer_ID = source.Customer_ID --1. Перевірка на наявність співпадаючих записів
WHEN MATCHED AND( -- якщо співпадіння знайдено
target.Name <> source.Name OR -- перевірка на зміну імені
target.Email <> source.Email OR -- перевірка на зміну email
target.Phone <> source.Email -- перевірка на зміну телефону
) THEN UPDATE -- якщо зміни є, оновлюємо значення
SET -- встановлюємо нові значення
target.Name = source.Name,
target.Email = source.Email,
target.Phone = source.Phone
WHEN NOT MATCHED BY target THEN -- якщо співпадіння не знайдено, значить нове значення
INSERT(Customer_ID, Name, Email, Phone) -- вставляємо нове значення
VALUES(source.Customer_ID, source.Name, source.Email, source.Phone);
таблиця призначення після виконання операції merge
Кроки для SCD Тип-1:
- Співвіднесіть записи в джерелі та призначенні за допомогою
Customer_ID
. - Якщо співпадіння знайдено і дані відрізняються, оновіть запис в таблиці призначення.
- Якщо співпадіння не знайдено, вставте новий запис у таблицю призначення.
Еквівалентна реалізація в PySpark
# Примітка: merge працює лише з таблицями Delta
from delta.tables import DeltaTable
delta_target = DeltaTable.forPath(spark, "/path/to/target_table")
source_customer = spark.read.format("delta").load("/path/to/source_table")
delta_target.alias('target').merge(
source_customer.alias('source'),
"target.Customer_ID = source.Customer_ID"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
Для SCD Тип-2
В SCD Тип-2 мета полягає в збереженні історичних даних шляхом додавання нового рядка для кожної зміни в таблиці джерела при збереженні попередніх записів.
Це важливо, коли потрібно відслідковувати історію змін.
Модифікація таблиці призначення
Таблиця призначення потребує кількох додаткових колонок для відслідковування історичних змін. Ось для чого служить кожна з колонок:
- Start_Date: Вказує, коли запис став активним.
- End_Date: Показує, коли запис був замінений або став неактивним. Для останнього запису це значення залишатиметься порожнім (
NULL
). - Is_Current: Простий прапорець
Y
абоN
, що вказує, чи є це останньою версією запису (Y
), або старішою версією (N
).
Ці колонки дозволяють легко бачити, як дані змінювались з часом, і забезпечують те, що важливі історичні деталі не загубляться.
CREATE TABLE target_customer_scd2(
Customer_ID INT NOT NULL,
Name VARCHAR(200),
Email VARCHAR(300),
Phone VARCHAR(50),
Start_Date DATE NOT NULL,
End_Date DATE,
Is_Current CHAR(1) DEFAULT 'Y'
);
INSERT INTO target_customer_scd2
VALUES
(101, 'Naruto Uzumaki', '[email protected]', '777-777-7777', '2024-01-01', '2025-01-01', 'N'),
(101, 'Naruto Uzumaki', '[email protected]', '123-456-7890', '2025-01-01', NULL, 'Y'),
(102, 'Itachi Uchiha', '[email protected]', '970-234-5670', '2024-01-01', NULL, 'Y');
Що стосується таблиці джерела, то ми використаємо ту саму таблицю джерела, яку використовували для SCD Тип-1.
Реалізація SCD Тип-2 за допомогою SQL
-- 1. Спочатку оновлюємо записи
MERGE INTO target_customer_scd2 AS target
USING source_customer AS source
ON target.Customer_ID = source.Customer_ID AND target.Is_Current = 'Y'
-- Співвіднесення записів у джерелі та поточних (активних) записів у таблиці призначення
WHEN MATCHED AND (
target.Name <> source.Name OR
target.Email <> source.Email OR
target.Phone <> source.Phone -- якщо є співпадіння
) THEN
UPDATE
SET
target.End_Date = GETDATE(), -- позначаємо існуючий запис як історичний
target.Is_Current = 'N';
-- 2. Потім вставляємо оновлені записи
INSERT INTO target_customer_scd2 (Customer_ID, Name, Email, Phone, Start_Date, End_Date, Is_Current)
SELECT source.Customer_ID, source.Name, source.Email, source.Phone, GETDATE(), NULL, 'Y'
FROM source_customer AS source
WHERE NOT EXISTS (
SELECT 1
FROM target_customer_scd2 AS target
WHERE target.Customer_ID = source.Customer_ID AND target.Is_Current = 'Y'
-- Перевірка, чи запис уже існує і чи є він активним.
);
таблиця, створена після виконання SCD Тип-2
Кроки для SCD Тип-2:
- Співвіднесіть записи з таблиці
source_customer
з поточними активними записами в таблиціtarget_customer_scd2
, використовуючиCustomer_ID
таIs_Current = 'Y'
. - Якщо співпадіння знайдено і будь-які дані (
Name
,Email
, абоPhone
) відрізняються між джерелом і призначенням, позначте існуючий запис в таблиці призначення як історичний, встановивши:
—End_Date
на поточну дату (GETDATE()
).
—Is_Current = 'N'
. - Вставте новий запис для оновлених даних з джерела, встановивши:
—Start_Date
на поточну дату (GETDATE()
)
—End_Date
якNULL
(що означає, що це поточний активний запис).
—Is_Current = 'Y'
(що позначає новий активний запис). - Вставте нові записи з джерела безпосередньо в таблицю призначення, якщо для цього
Customer_ID
немає активного запису в таблиці призначення (NOT EXISTS
умова).
Таким чином, SCD Тип-2 зберігає історію змін, одночасно підтримуючи останню версію.
Існують й інші способи реалізації SCD Тип-2, наприклад, використовуючи клаузу OUTPUT. Ось тут є детальна стаття з цієї теми від іншого автора.
Але якщо вам цікаво, ось змінений код, заснований на моїх таблицях:
INSERT INTO target_customer_scd2 (Customer_ID, Name, Email, Phone, Start_Date, End_Date, Is_Current)
SELECT
id,
name,
email,
phone,
start_date,
end_date,
is_current
FROM (
MERGE INTO target_customer_scd2 AS target
USING source_customer AS source
ON target.Customer_ID = source.Customer_ID AND target.Is_Current = 'Y'
-- Крок 1: Оновлюємо поточний активний запис, позначаючи його як історичний
WHEN MATCHED AND (
target.Name <> source.Name OR
target.Email <> source.Email OR
target.Phone <> source.Phone
) THEN
UPDATE
SET
target.End_Date = GETDATE(),
target.Is_Current = 'N'
-- Крок 2: Вставляємо новий запис для оновлених або нових даних
WHEN NOT MATCHED BY TARGET
THEN
INSERT (Customer_ID, Name, Email, Phone, Start_Date, End_Date, Is_Current)
VALUES (source.Customer_ID, source.Name, source.Email, source.Phone, GETDATE(), NULL, 'Y')
-- Виводимо зміни
OUTPUT $action,
source.Customer_ID,
source.Name,
source.Email,
source.Phone,
GETDATE(),
NULL,
'Y'
) AS [changes] (action, id, name, email, phone, start_date, end_date, is_current)
-- Фільтруємо за дією, щоб вставити тільки оновлені записи
WHERE action = 'UPDATE';
Тепер реалізуємо це за допомогою Pyspark.
from delta.tables import DeltaTable
from pyspark.sql.functions import current_date, lit, col, xxhash64
source_df = spark.sql("SELECT * FROM source_customer")
target_delta = DeltaTable.forName(spark, "Your_Catalog_Name.Schema_Name.Table_Name")
joined_df = source_df.join(
target_delta.toDF(),
(source_df.Customer_ID == target_delta.toDF().Customer_ID) &
(target_delta.toDF().Is_Current == 'Y'),
'left'
).select(
source_df['*'],
target_delta.toDF().Customer_ID.alias('Target_Customer_ID'),
target_delta.toDF().Name.alias('Target_Name'),
target_delta.toDF().Email.alias('Target_Email'),
target_delta.toDF().Phone.alias('Target_Phone'),
target_delta.toDF().Start_Date.alias('Target_Start_Date'),
target_delta.toDF().End_Date.alias('Target_End_Date'),
target_delta.toDF().Is_Current.alias('Target_Is_Current')
)
filtered_df = joined_df.filter(
xxhash64(col('Name'), col('Email'), col('Phone')) != xxhash64(col('Target_Name'), col('Target_Email'), col('Target_Phone'))
).withColumn('Merge_Key', col('Customer_ID'))
target_delta.alias('t').merge(
filtered_df.alias('f'),
"f.Merge_Key == t.Customer_ID and t.Is_Current == 'Y'"
).whenMatchedUpdate(set={
"Is_Current": "'N'",
"End_Date": "current_date"
}).whenNotMatchedInsert(values={
"Customer_ID": "f.Customer_ID",
"Name": "f.Name",
"Email": "f.Email",
"Phone": "f.Phone",
"Start_Date": "current_date",
"End_Date": "null",
"Is_Current": "'Y'"
}).execute()
Ви також можете реалізувати SCD тип-2, використовуючи функцію Window
і row_number
.
Висновок
Ось і все! Якщо ви все ще хочете дізнатися, коли використовувати SCD Тип-1 чи Тип-2, або навіть що означає SCD, ознайомтесь з моєю статтею Натисніть тут
Перекладено з: Implementing Slowly Changing Dimension (SCD) Type 1 and Type 2