Incremental Loading with CDC using PySpark


⏫ Incremental Loading technique with Change Data Capture (CDC):

➡️ Incremental Load with Change Data Capture (CDC) is a strategy in data warehousing and ETL (Extract, Transform, Load) processes where only changed or newly added data is loaded from the source systems into the target system. CDC is particularly useful when processing the entire dataset is impractical because of its size, or when real-time or near-real-time updates are essential.

Key Concepts:
Change Data Capture (CDC): CDC is the process of identifying and capturing changes made to source data since the last extraction, tracking inserts, updates, and deletes. Instead of reloading the entire dataset, only these changes are applied to the target system, which minimizes processing time and resource usage.
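A common way to capture "changes since the last extraction" is a watermark on a last-modified timestamp. A minimal sketch, assuming a hypothetical updated_at column on the source table and a last_watermark value persisted from the previous run:

changes_df = source_df.filter(source_df['updated_at'] > last_watermark)  # assumption: source rows carry an updated_at timestamp
new_watermark = changes_df.agg({'updated_at': 'max'}).collect()[0][0]    # store this for the next run

The example below takes a different route and derives the changes by comparing an incoming feed against the target table.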

For example:

from pyspark.sql import SparkSession

1️⃣ Create a Spark session
spark = SparkSession.builder.appName("IncrementalLoadCDCExample").getOrCreate()

2️⃣ Sample data for the source table (sales)
source_data = [
  (1, 'A', 100, 500, '2023-01-01 08:00:00'),
  (2, 'B', 50, 300, '2023-01-02 10:30:00'),
  (3, 'A', 75, 400, '2023-01-02 15:45:00'),
  (4, 'C', 30, 200, '2023-01-03 09:15:00')
]
source_columns = ['order_id', 'product_id', 'quantity', 'amount', 'order_date']
source_df = spark.createDataFrame(source_data, source_columns)

3️⃣ Sample data for the target table (dw_sales)
target_data = [
  (1, 'A', 100, 500, '2023-01-01 08:00:00'),
  (2, 'B', 50, 300, '2023-01-02 10:30:00')
]
target_columns = ['order_id', 'product_id', 'quantity', 'amount', 'order_date']
target_df = spark.createDataFrame(target_data, target_columns)

4️⃣ Simulate changes in the source data
changes_data = [
  (2, 'B', 70, 400, '2023-01-02 10:30:00'), # Updated record
  (3, 'A', 75, 400, '2023-01-02 15:45:00'), # New record
  (4, 'C', 30, 200, '2023-01-03 09:15:00')  # New record
]
changes_df = spark.createDataFrame(changes_data, source_columns)

5️⃣ Identify changes using CDC (assuming order_id is the primary key)
updated_records = changes_df.join(target_df, 'order_id', 'inner').filter(
  (changes_df['quantity'] != target_df['quantity']) |
  (changes_df['amount'] != target_df['amount']) |
  (changes_df['order_date'] != target_df['order_date'])
).select(changes_df['*'])  # keep only the incoming columns so the schema matches target_df for the union below

6️⃣ Identify new records (present in the changes but not yet in the target)
new_records = changes_df.join(target_df, 'order_id', 'left_anti')

7️⃣ Update existing records in the target table
target_df = target_df.join(updated_records, 'order_id', 'left_anti').union(updated_records)

8️⃣ Append new records to the target table
target_df = target_df.union(new_records)
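
9️⃣ (Optional) Handle deletes
The changes feed above carries only inserts and updates, so nothing is removed here. When the source provides a full snapshot instead, deleted rows can be detected with an anti-join; this sketch assumes a hypothetical latest_snapshot_df holding the complete current source state:
deleted_records = target_df.join(latest_snapshot_df, 'order_id', 'left_anti')  # assumption: latest_snapshot_df is the full current source
target_df = target_df.join(deleted_records, 'order_id', 'left_anti')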

# Show the updated target table
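# With the sample data, expect order 1 unchanged, order 2 updated to quantity 70 / amount 400,
# and orders 3 and 4 appended (row order may vary)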
target_df.show()

# Stop the Spark session
spark.stop()
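
On Databricks, this update-then-append pattern is usually collapsed into a single Delta Lake MERGE instead of manual joins and unions. A minimal sketch, assuming the target dw_sales is stored as a Delta table at a hypothetical path /mnt/dw/sales:

from delta.tables import DeltaTable

dw_sales = DeltaTable.forPath(spark, '/mnt/dw/sales')  # assumption: the target lives at this path
(dw_sales.alias('t')
  .merge(changes_df.alias('s'), 't.order_id = s.order_id')
  .whenMatchedUpdateAll()      # overwrite matching order_ids with the incoming values
  .whenNotMatchedInsertAll()   # append order_ids not yet in the target
  .execute())

If the feed carries a delete flag, whenMatchedDelete covers removals in the same statement.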

#dataengineer #bigdata #etl #spark #pyspark #databricks
