ChangeDataFlowSpec:
+ See https://www.databricks.com/blog/2021/06/09/how-to-simplify-cdc-with-delta-lakes-change-data-feed.html
A dataset that is CDC-enabled
- should be created and populated
+ Given a table created with the SQL:
CREATE TABLE ChangeDataFlowSpec (
  id INT,
  label STRING,
  partitionKey LONG,
  date DATE,
  timestamp TIMESTAMP
) USING DELTA TBLPROPERTIES (delta.enableChangeDataFeed = true)
+ When we write 20 rows to ChangeDataFlowSpec
+ And then write another 20 rows to ChangeDataFlowSpec
+ Then the table's history has 3 rows: 1 for the creation and 2 for the insertions
+ And the history of the source table looks like:
+-------+-----------------------+------+--------+------------+----------------------------------------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+------------------------------------------------------------+------------+-----------------------------------+
|version|timestamp |userId|userName|operation |operationParameters |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics |userMetadata|engineInfo |
+-------+-----------------------+------+--------+------------+----------------------------------------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+------------------------------------------------------------+------------+-----------------------------------+
|2 |2023-12-11 13:43:48.923|NULL |NULL |WRITE |{mode -> Append, partitionBy -> []} |NULL|NULL |NULL |1 |Serializable |true |{numFiles -> 2, numOutputRows -> 20, numOutputBytes -> 3373}|NULL |Apache-Spark/3.5.0 Delta-Lake/3.0.0|
|1 |2023-12-11 13:43:47.723|NULL |NULL |WRITE |{mode -> Append, partitionBy -> []} |NULL|NULL |NULL |0 |Serializable |true |{numFiles -> 2, numOutputRows -> 20, numOutputBytes -> 3373}|NULL |Apache-Spark/3.5.0 Delta-Lake/3.0.0|
|0 |2023-12-11 13:43:46.439|NULL |NULL |CREATE TABLE|{isManaged -> true, description -> NULL, partitionBy -> [], properties -> {"delta.enableChangeDataFeed":"true"}}|NULL|NULL |NULL |NULL |Serializable |true |{} |NULL |Apache-Spark/3.5.0 Delta-Lake/3.0.0|
+-------+-----------------------+------+--------+------------+----------------------------------------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+------------------------------------------------------------+------------+-----------------------------------+
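+ A minimal sketch of this scenario in Spark/Scala, assuming a local SparkSession with the Delta Lake extensions configured; writeBatch is an illustrative helper, and the row values (labels, modulo-5 partition keys, descending dates) are inferred from the sink output shown further down:
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()
import spark.implicits._

spark.sql("""CREATE TABLE ChangeDataFlowSpec (
  id INT, label STRING, partitionKey LONG, date DATE, timestamp TIMESTAMP
) USING DELTA TBLPROPERTIES (delta.enableChangeDataFeed = true)""")

// Each call appends 20 rows; two calls produce history versions 1 and 2
// on top of version 0, the CREATE TABLE.
def writeBatch(): Unit =
  (0 until 20)
    .map { i =>
      (i, s"label_$i", (i % 5).toLong,
       java.sql.Date.valueOf(java.time.LocalDate.of(2023, 12, 11).minusDays(i)),
       new java.sql.Timestamp(System.currentTimeMillis()))
    }
    .toDF("id", "label", "partitionKey", "date", "timestamp")
    .write.format("delta").mode("append").saveAsTable("ChangeDataFlowSpec")

writeBatch()
writeBatch()

// Equivalent of DESCRIBE HISTORY: yields the 3-row table above.
DeltaTable.forName(spark, "ChangeDataFlowSpec").history().show(truncate = false)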
- should write its deltas to another table in a batch
+ Given a sink table created with the SQL:
CREATE TABLE myDeltaTable (
  id INT,
  label STRING,
  partitionKey LONG,
  date DATE,
  timestamp TIMESTAMP
) USING DELTA
+ When we merge the deltas into myDeltaTable on the condition ChangeDataFlowSpec.id = myDeltaTable.id
+ Then the rows in the sink table are not unique; in fact there are 40 rows
+ And the sink table looks like this:
+---+-------+------------+----------+-----------------------+
|id |label |partitionKey|date |timestamp |
+---+-------+------------+----------+-----------------------+
|0 |label_0|0 |2023-12-11|2023-12-11 13:43:15.664|
|0 |label_0|0 |2023-12-11|2023-12-11 13:43:15.664|
|1 |label_1|1 |2023-12-10|2023-12-11 13:43:15.864|
|1 |label_1|1 |2023-12-10|2023-12-11 13:43:15.864|
|2 |label_2|2 |2023-12-09|2023-12-11 13:43:16.064|
|2 |label_2|2 |2023-12-09|2023-12-11 13:43:16.064|
|3 |label_3|3 |2023-12-08|2023-12-11 13:43:16.264|
|3 |label_3|3 |2023-12-08|2023-12-11 13:43:16.264|
|4 |label_4|4 |2023-12-07|2023-12-11 13:43:16.464|
|4 |label_4|4 |2023-12-07|2023-12-11 13:43:16.464|
|5 |label_5|0 |2023-12-06|2023-12-11 13:43:16.664|
|5 |label_5|0 |2023-12-06|2023-12-11 13:43:16.664|
|6 |label_6|1 |2023-12-05|2023-12-11 13:43:16.864|
|6 |label_6|1 |2023-12-05|2023-12-11 13:43:16.864|
|7 |label_7|2 |2023-12-04|2023-12-11 13:43:17.064|
|7 |label_7|2 |2023-12-04|2023-12-11 13:43:17.064|
|8 |label_8|3 |2023-12-03|2023-12-11 13:43:17.264|
|8 |label_8|3 |2023-12-03|2023-12-11 13:43:17.264|
|9 |label_9|4 |2023-12-02|2023-12-11 13:43:17.464|
|9 |label_9|4 |2023-12-02|2023-12-11 13:43:17.464|
+---+-------+------------+----------+-----------------------+
only showing top 20 rows
+ See https://stackoverflow.com/questions/69562007/databricks-delta-table-merge-is-inserting-records-despite-keys-are-matching-with
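+ A sketch of the batch merge, assuming the change feed of ChangeDataFlowSpec (read from version 1 onward) is the merge source and that the CDF metadata columns are dropped before merging:
import io.delta.tables.DeltaTable
import org.apache.spark.sql.functions.col

// Read the change feed: both appends surface as 'insert' changes,
// so each id appears twice in the source.
val changes = spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", "1")
  .table("ChangeDataFlowSpec")
  .where(col("_change_type") === "insert")
  .drop("_change_type", "_commit_version", "_commit_timestamp")

DeltaTable.forName(spark, "myDeltaTable").as("myDeltaTable")
  .merge(changes.as("ChangeDataFlowSpec"), "ChangeDataFlowSpec.id = myDeltaTable.id")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

spark.table("myDeltaTable").count()  // 40: every id was inserted twice
+ Because the target starts empty and both source copies of an id arrive in the same MERGE, neither matches an existing row and whenNotMatched inserts both; this is the behaviour discussed in the Stack Overflow link above, and deduplicating the source first (e.g. keeping only the latest _commit_version per id) would keep the sink unique.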