Implementing a slowly changing dimension (SCD Type 2) in PySpark. Earlier we saw how to do this in SQL:
https://lnkd.in/dH6j3MWE
# Build the current-state employee dimension that the SCD Type 2 merge targets.
# The type classes below are not predefined in a notebook and must be imported.
from pyspark.sql.types import (
    BooleanType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)

# Define the schema for the DataFrame.
# `active` flags the current version of a row; `start` / `end` bound the
# validity of a version and are kept as strings here.
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("salary", IntegerType(), True),
    StructField("department", StringType(), True),
    StructField("active", BooleanType(), True),
    StructField("start", StringType(), True),
    StructField("end", StringType(), True),
])

# Initial load: every employee has a single active version with no end value.
Employee_data = [
    (1, "John", 100, "HR", True, '2023-10-20', None),
    (2, "Alice", 200, "Finance", True, '2023-10-20', None),
    (3, "Bob", 300, "Engineering", True, '2023-10-20', None),
    (4, "Jane", 150, "HR", True, '2023-10-20', None),
]

# NOTE(review): `spark` is assumed to be the SparkSession supplied by the
# notebook environment; it is not created anywhere in this snippet.
Employee_df = spark.createDataFrame(Employee_data, schema=schema)

# Register the DataFrame as a temp view so SQL cells can query it by name.
Employee_df.createOrReplaceTempView('Employee_df')
Employee_df.show()
# Schema for the incoming (incremental) batch. It carries the business columns
# plus `updated`, which the merge uses as the end value of the superseded
# version and the start value of the new one.
schema2 = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("salary", IntegerType(), True),
    StructField("department", StringType(), True),
    StructField("updated", StringType(), True),
])

# Incremental batch: id 1 differs from the initial load in salary and
# department; ids 5 and 6 do not appear in the initial load.
Employee_incremental_data = [
    (1, "John", 200, "Finance", '2023-10-20'),
    (5, "Henry", 200, "Engineering", '2023-10-20'),
    (6, "Jack", 400, "Engineering", '2023-10-20'),
]

# NOTE(review): `spark` is assumed to be the SparkSession supplied by the
# notebook environment; it is not created anywhere in this snippet.
Employee_incremental_df = spark.createDataFrame(Employee_incremental_data, schema=schema2)

# Register the batch as a temp view so SQL cells can query it by name.
Employee_incremental_df.createOrReplaceTempView('Employee_incremental_df')
Employee_incremental_df.show()
%sql
-- Materialise the two temp views as tables so the merge below has a
-- table to write into (employee) and a source to read (employee_incremental).
-- `select *` is kept deliberately: each table is a straight copy of its view.
create table employee as
select *
from employee_df;

create table employee_incremental as
select *
from employee_incremental_df;
%sql
-- SCD Type 2 merge of employee_incremental into employee.
--
-- A changed employee needs two actions: close the current version (update)
-- and add a new version (insert). A source row can only trigger one merge
-- action, so changed rows are staged twice with different merge keys.
--
-- Change detection uses the null-safe operator `<=>`: salary and department
-- are nullable, and a plain `<>` evaluates to NULL (treated as false) when
-- either side is NULL, which would silently skip the change.
--
-- NOTE(review): assumes at most one row per id in employee_incremental;
-- verify upstream, since duplicate ids would match the same active row twice.
merge into employee
using (
    -- Copy 1: keyed by id. Matches the existing employee row (to close it)
    -- or, for an id not present in employee, falls through to the insert.
    select
        employee_incremental.id as merge_key,
        employee_incremental.*
    from employee_incremental

    union all

    -- Copy 2: only for ids whose active version differs. The NULL merge key
    -- never matches, so these rows always reach the insert branch and become
    -- the new active version.
    select
        null as merge_key,
        employee_incremental.*
    from employee_incremental
    inner join employee
        on employee.id = employee_incremental.id
    where employee.active = true
        and (
            not (employee.salary <=> employee_incremental.salary)
            or not (employee.department <=> employee_incremental.department)
        )
) staged_updates
    on employee.id = staged_updates.merge_key
-- Close the current version; must use the same change test as Copy 2 above
-- so that a row is closed exactly when a replacement version is inserted.
when matched
    and employee.active = true
    and (
        not (employee.salary <=> staged_updates.salary)
        or not (employee.department <=> staged_updates.department)
    )
    then update set
        active = false,
        `end` = staged_updates.updated
-- New ids and new versions of changed ids are inserted as active, open-ended rows.
when not matched then
    insert (id, name, salary, department, active, `start`, `end`)
    values (
        staged_updates.id,
        staged_updates.name,
        staged_updates.salary,
        staged_updates.department,
        true,
        staged_updates.updated,
        null
    )
Leave a comment