|
| 1 | +# Использование механизма TRANSFER для реализации SCD1 на основе данных из CDC-источников в {{ ydb-full-name }} |
| 2 | + |
| 3 | +В этой статье описывается реализация паттерна Slowly Changing Dimensions Type 1 (SCD1) в {{ ydb-short-name }}. |
| 4 | + |
| 5 | +## Используемые инструменты |
| 6 | + |
| 7 | +Для поставки данных в SCD1-таблицу в данной статье будет использоваться следующая комбинация функциональных возможностей {{ ydb-short-name }}: |
| 8 | + |
| 9 | +1. Таблица-источник будет [строковой](../../../concepts/datamodel/table.md#row-oriented-table) для оперативных транзакционных изменений; |
| 10 | +2. Таблица-приёмник будет [колоночной](../../../concepts/datamodel/table.md#column-oriented-table) для эффективного выполнения аналитических запросов; |
| 11 | +3. Подписка на изменения в таблице-источнике будет осуществляться через механизм [Change Data Capture (CDC)](../../../concepts/cdc.md); |
| 12 | +4. За буферизацию изменений будет отвечать неявно создаваемый под CDC [топик](../../../concepts/datamodel/topic.md); |
| 13 | +5. За автоматическое перекладывание данных из CDC-топика в таблицу-приёмник будет отвечать [трансфер](../../../concepts/transfer.md). |
| 14 | + |
| 15 | +{% note warning %} |
| 16 | + |
| 17 | +Transfer не может удалять строки из таблиц, а может выполнять только [`UPSERT`](../../../yql/reference/syntax/upsert_into.md) в таблицу-приёмник. |
| 18 | +Поэтому с помощью Transfer можно реализовать только SCD1-soft: удаление помечается флагом и сохраняется момент удаления. Если нужна физическая очистка данных, её можно выполнять через явную команду [`DELETE`](#howtodelete). |
| 19 | + |
| 20 | +{% endnote %} |
| 21 | + |
| 22 | +## Создайте таблицу-источник данных, которая будет генерировать CDC-события |
| 23 | + |
| 24 | + ```sql |
| 25 | + CREATE TABLE source_customers ( |
| 26 | + id Utf8 NOT NULL, |
| 27 | + attribute1 Utf8, |
| 28 | + attribute2 Utf8, |
| 29 | + change_time Timestamp, |
| 30 | + PRIMARY KEY (id) |
| 31 | + ); |
| 32 | + |
| 33 | + ALTER TABLE `source_customers` ADD CHANGEFEED `updates` WITH ( |
| 34 | + FORMAT = 'DEBEZIUM_JSON', |
| 35 | + MODE = 'NEW_AND_OLD_IMAGES' |
| 36 | + ); |
| 37 | + ``` |
| 38 | + |
| 39 | + ## Создайте таблицу-приемник данных в формате SCD1 |
| 40 | + |
| 41 | + ```sql |
| 42 | + CREATE TABLE dimension_scd1 ( |
| 43 | + id Utf8 NOT NULL, |
| 44 | + attribute1 Utf8, |
| 45 | + attribute2 Utf8, |
| 46 | + is_deleted Uint8, |
| 47 | + last_update_timestamp Timestamp, |
| 48 | + PRIMARY KEY (id) |
| 49 | + ) |
| 50 | + PARTITION BY HASH(id) |
| 51 | + WITH ( |
| 52 | + STORE=COLUMN |
| 53 | + ) |
| 54 | + ``` |
| 55 | + |
| 56 | + ## Создайте трансфер и lambda-функцию для обработки CDC-данных |
| 57 | + |
| 58 | + Особенности обработки CDC данных в формате Debezium: |
| 59 | + |
| 60 | + * Формат сообщения содержит поля `payload.before` и `payload.after` с состоянием до и после изменения. |
| 61 | + * Поле `payload.op` указывает тип операции: "c" (create), "u" (update), "d" (delete). |
| 62 | + * Поле `payload.ts_ms` содержит временную метку события в миллисекундах. |
| 63 | + * При удалении (`op = "d"`) данные находятся в поле `before`, а при создании/обновлении - в поле `after`. Если выполняется удаление несуществующей строки, то будет сформировано сообщение с типом операции `op = "d"`, но с пустыми полями `before` и `after`. |
| 64 | + |
| 65 | + ```sql |
| 66 | + $transformation_lambda = ($msg) -> { |
| 67 | + $cdc_data = CAST($msg._data AS Json); |
| 68 | + |
| 69 | + -- Определяем тип операции |
| 70 | + $operation = Json::ConvertToString($cdc_data.payload.op); |
| 71 | + $is_deleted = $operation == "d"; |
| 72 | + |
| 73 | + -- Получаем данные в зависимости от типа операции |
| 74 | + $data = IF($is_deleted, $cdc_data.payload.before, $cdc_data.payload.after); |
| 75 | + |
| 76 | + -- Если данные не пришли (выполнена команда DELETE на несуществующий ключ, то проигнорируем запись) |
| 77 | + return IF($data IS NOT NULL, |
| 78 | + <| |
| 79 | + id: Unwrap(CAST(Json::ConvertToString($data.id) AS Utf8)), |
| 80 | + attribute1: CAST(Json::ConvertToString($data.attribute1) AS Utf8), |
| 81 | + attribute2: CAST(Json::ConvertToString($data.attribute2) AS Utf8), |
| 82 | + is_deleted: CAST($is_deleted AS Uint8), |
| 83 | + last_update_timestamp: DateTime::FromMilliseconds(Json::ConvertToUint64($cdc_data.payload.source.ts_ms)) |
| 84 | + |> |
| 85 | + , NULL); |
| 86 | + }; |
| 87 | + |
| 88 | + -- В данном примере настраивается высокая частота обновлений таблицы-приемника. |
| 89 | + -- Это делается исключительно для наглядности. Для production-сценариев стоит настраивать большие значения |
| 90 | + CREATE TRANSFER dimension_scd1_cdc_transfer |
| 91 | + FROM `source_customers/updates` TO dimension_scd1 USING $transformation_lambda |
| 92 | + WITH ( |
| 93 | + FLUSH_INTERVAL=Interval("PT1S") |
| 94 | + ); |
| 95 | + ``` |
| 96 | + |
| 97 | + {% note info %} |
| 98 | + |
| 99 | + Колонка `id` в принимающей таблице `dimension_scd1` объявлена как `Utf8 NOT NULL`, при этом в Json CDC могут быт переданы данные, которые невозможно привести к строке, то есть результатом конвертации данных из Json может быть значение `NULL`. Функция [`Unwrap`](../../../yql/reference/builtins/basic.md#unwrap) гарантирует, что после ее выполнения не может быть значения `NULL` или будет ошибка времени выполнения. Это позволяет гарантировать, что результатом выполнения lambda-функции или будет полностью корректная структура данных, или будет ошибка времени выполнения. |
| 100 | + |
| 101 | + {% endnote %} |
| 102 | + |
| 103 | + ## Демонстрация работы |
| 104 | + |
| 105 | + Для демонстрации работы с данными CDC запишем данные в таблицу-источник в {{ ydb-short-name }}, которая будет генерировать CDC-события: |
| 106 | + |
| 107 | + ### Вставка новой записи |
| 108 | + |
| 109 | + ```sql |
| 110 | + INSERT INTO source_customers (id, attribute1, attribute2) |
| 111 | + VALUES ('CUSTOMER_1001', 'John Doe', 'New York'); |
| 112 | + ``` |
| 113 | + |
| 114 | + Это действие создаст CDC-событие с типом операции `op = "c"` примерно следующего вида: |
| 115 | + |
| 116 | + ```json |
| 117 | + { |
| 118 | + "payload":{ |
| 119 | + "op":"c", |
| 120 | + "source":{ |
| 121 | + "txId":18446744073709551615, |
| 122 | + "connector":"ydb", |
| 123 | + "version":"1.0.0", |
| 124 | + "step":1755868513260, |
| 125 | + "ts_ms":1755868513219, |
| 126 | + "snapshot":false |
| 127 | + }, |
| 128 | + "after":{ |
| 129 | + "attribute1":"John Doe", |
| 130 | + "id":"CUSTOMER_1001", |
| 131 | + "attribute2":"New York" |
| 132 | + } |
| 133 | + } |
| 134 | + } |
| 135 | + ``` |
| 136 | + |
| 137 | + В результате исполнения команды выше в таблице `dimension_scd1` будет следующее содержимое: |
| 138 | + |
| 139 | + | id | attribute1 | attribute2 | is\_deleted | last_update_timestamp | |
| 140 | + | -------------- | ---------- | ------------- | ----------- | --------------------------- | |
| 141 | + | CUSTOMER\_1001 | John Doe | New York | 0 | 2025-08-22T13:15:13.219000Z | |
| 142 | + |
| 143 | + ### Обновление существующей записи |
| 144 | + |
| 145 | + ```sql |
| 146 | + UPSERT INTO source_customers (id, attribute1, attribute2) |
| 147 | + VALUES ('CUSTOMER_1001', 'John Doe', 'Los Angeles'); |
| 148 | + ``` |
| 149 | + |
| 150 | + Это действие создаст CDC-событие с типом операции `op = "u" примерно следующего вида: |
| 151 | + |
| 152 | + ```json |
| 153 | + { |
| 154 | + "payload":{ |
| 155 | + "op":"u", |
| 156 | + "source":{ |
| 157 | + "txId":18446744073709551615, |
| 158 | + "connector":"ydb", |
| 159 | + "version":"1.0.0", |
| 160 | + "step":1755868795050, |
| 161 | + "ts_ms":1755868795719, |
| 162 | + "snapshot":false |
| 163 | + }, |
| 164 | + "after":{ |
| 165 | + "attribute1":"John Doe", |
| 166 | + "id":"CUSTOMER_1001", |
| 167 | + "attribute2":"Los Angeles" |
| 168 | + }, |
| 169 | + "before":{ |
| 170 | + "attribute1":"John Doe", |
| 171 | + "id":"CUSTOMER_1001", |
| 172 | + "attribute2":"New York" |
| 173 | + } |
| 174 | + } |
| 175 | + } |
| 176 | + ``` |
| 177 | + |
| 178 | + В результате исполнения команды выше в таблице `dimension_scd1` будет следующее содержимое: |
| 179 | + |
| 180 | + | id | attribute1 | attribute2 | is\_deleted | last_update_timestamp | |
| 181 | + | -------------- | ---------- | ------------- | ----------- | --------------------------- | |
| 182 | + | CUSTOMER\_1001 | John Doe | Los Angeles | 0 | 2025-08-22T13:19:55.719000Z | |
| 183 | + |
| 184 | + |
| 185 | + ### Удаление записи |
| 186 | + |
| 187 | + ```sql |
| 188 | + DELETE FROM source_customers WHERE id = 'CUSTOMER_1001'; |
| 189 | + ``` |
| 190 | + |
| 191 | + Это действие создаст CDC-событие с типом операции `op = "d"` примерно следующего вида: |
| 192 | + |
| 193 | + ```json |
| 194 | + { |
| 195 | + "payload":{ |
| 196 | + "op":"d", |
| 197 | + "source":{ |
| 198 | + "txId":18446744073709551615, |
| 199 | + "connector":"ydb", |
| 200 | + "version":"1.0.0", |
| 201 | + "step":1755868931000, |
| 202 | + "ts_ms":1755868931109, |
| 203 | + "snapshot":false |
| 204 | + }, |
| 205 | + "before":{ |
| 206 | + "attribute1":"John Doe", |
| 207 | + "id":"CUSTOMER_1001", |
| 208 | + "attribute2":"Los Angeles" |
| 209 | + } |
| 210 | + } |
| 211 | + } |
| 212 | + ``` |
| 213 | + |
| 214 | + В результате исполнения команды выше в таблице `dimension_scd1` будет следующее содержимое: |
| 215 | + |
| 216 | + | id | attribute1 | attribute2 | is\_deleted | last_update_timestamp | |
| 217 | + | -------------- | ---------- | ------------- | ----------- | --------------------------- | |
| 218 | + | CUSTOMER\_1001 | John Doe | Los Angeles | 1 | 2025-08-22T13:22:11.109000Z | |
| 219 | + |
| 220 | + |
| 221 | +## Пример удаления данных из таблицы-приемника {#howtodelete} |
| 222 | + |
| 223 | +Для удаления данных из таблицы-приемника можно использовать явную команду `DELETE`: |
| 224 | + |
| 225 | +```sql |
| 226 | +DELETE from dimension_scd1 WHERE is_deleted != 0ut |
| 227 | +``` |
0 commit comments