Below is an example PySpark script to load data from a DB2 table into an Azure Data Lake table. The script is optimized for handling high-volume data efficiently by leveraging Spark’s distributed computing capabilities.
Prerequisites:
Spark Configuration: Ensure Spark is configured with the necessary dependencies:
The hadoop-azure (ABFS) connector for Azure Data Lake Storage Gen2.
The IBM db2jcc JDBC driver for connecting to DB2.
Azure Authentication: Use either a Service Principal or Account Key for Azure Data Lake authentication.
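For the Service Principal option, the OAuth settings can be supplied through Spark configuration along these lines (a sketch of the standard hadoop-azure ABFS properties; all angle-bracket values are placeholders for your Azure AD app registration and storage account):

```python
# Sketch: OAuth service-principal authentication for ABFS.
# Assumes an existing SparkSession `spark`; replace all <placeholders>.
spark.conf.set("fs.azure.account.auth.type.<storage_account>.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.<storage_account>.dfs.core.windows.net",
               "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.<storage_account>.dfs.core.windows.net", "<client_id>")
spark.conf.set("fs.azure.account.oauth2.client.secret.<storage_account>.dfs.core.windows.net", "<client_secret>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint.<storage_account>.dfs.core.windows.net",
               "https://login.microsoftonline.com/<tenant_id>/oauth2/token")
```

In production, the client secret should come from a secrets store (for example, Databricks secret scopes or Azure Key Vault) rather than being hard-coded.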
Script:
from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("DB2 to Azure Data Lake") \
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse") \
    .config("fs.azure.account.key.<storage_account>.dfs.core.windows.net", "<account_key>") \
    .getOrCreate()

# DB2 Connection Details
db2_url = "jdbc:db2://<db2_host>:<port>/<database>"
db2_properties = {
    "user": "<db2_user>",
    "password": "<db2_password>",
    "driver": "com.ibm.db2.jcc.DB2Driver"
}
db2_table = "<db2_schema>.<db2_table>"

# Partitioning settings for high-volume data
partition_column = "<partition_column>"  # Replace with a numeric or timestamp column
num_partitions = 100  # Adjust based on your cluster's capacity

# Push the min/max aggregation down to DB2 as a subquery,
# instead of scanning the whole table through Spark
bounds_query = f"(SELECT MIN({partition_column}) AS lo, MAX({partition_column}) AS hi FROM {db2_table}) AS bounds"
bounds = spark.read \
    .format("jdbc") \
    .option("url", db2_url) \
    .option("dbtable", bounds_query) \
    .option("user", db2_properties["user"]) \
    .option("password", db2_properties["password"]) \
    .option("driver", db2_properties["driver"]) \
    .load() \
    .collect()[0]
min_partition, max_partition = bounds[0], bounds[1]

# Read Data from DB2 Table in parallel partitions
db2_df = spark.read \
    .format("jdbc") \
    .option("url", db2_url) \
    .option("dbtable", db2_table) \
    .option("user", db2_properties["user"]) \
    .option("password", db2_properties["password"]) \
    .option("driver", db2_properties["driver"]) \
    .option("partitionColumn", partition_column) \
    .option("lowerBound", str(min_partition)) \
    .option("upperBound", str(max_partition)) \
    .option("numPartitions", num_partitions) \
    .load()

# Azure Data Lake Path
adls_path = "abfss://<container>@<storage_account>.dfs.core.windows.net/<folder>"

# Write Data to Azure Data Lake in Parquet Format
db2_df.write \
    .mode("overwrite") \
    .format("parquet") \
    .save(adls_path)

print("Data successfully loaded from DB2 to Azure Data Lake.")

# Stop Spark Session
spark.stop()
Key Optimizations:
Partitioning:
Use partitionColumn, lowerBound, and upperBound to split the data into chunks for parallel processing.
Choose a partition column that evenly distributes data.
Efficient File Format:
Use Parquet or ORC as the output format for better compression and query performance.
Cluster Configuration:
Use appropriate executor and driver memory settings for handling high data volume.
Scalability:
Adjust the numPartitions parameter based on the size of the cluster and data volume.
Error Handling:
Consider adding logging and exception handling for production readiness.
Testing:
Validate the script with a small dataset first before scaling to full data loads.
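As a sketch of the error-handling point above, a small retry helper can wrap each read or write. This is plain Python with no Spark dependency; the function `with_retries` and the demo `flaky` action are illustrative names, not part of any library:

```python
import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("db2_to_adls")

def with_retries(action, max_attempts=3, backoff_seconds=1.0):
    """Run `action` (a zero-argument callable), retrying on failure with
    linear backoff. Re-raises the last exception if all attempts fail."""
    for attempt in range(1, max_attempts + 1):
        try:
            return action()
        except Exception as exc:
            logger.warning("Attempt %d/%d failed: %s", attempt, max_attempts, exc)
            if attempt == max_attempts:
                raise
            time.sleep(backoff_seconds * attempt)

# Demo: a flaky action that succeeds on the third attempt.
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

result = with_retries(flaky, max_attempts=5, backoff_seconds=0.0)
print(result)  # ok
```

In the script above, the write step could then be submitted as `with_retries(lambda: db2_df.write.mode("overwrite").parquet(adls_path))`.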
PySpark code to handle large volumes in micro-batches:
from pyspark.sql import SparkSession

# Initialize Spark session
# spark.jars points to the DB2 JDBC driver on the cluster
spark = SparkSession.builder \
    .appName("DB2 to Azure Data Lake Micro-Batch Transfer") \
    .config("spark.jars", "/path/to/db2jcc4.jar") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.cores", "4") \
    .getOrCreate()

# DB2 connection details
db2_url = "jdbc:db2://<DB2_HOST>:<PORT>/<DATABASE>"
db2_properties = {
    "user": "<DB2_USERNAME>",
    "password": "<DB2_PASSWORD>",
    "driver": "com.ibm.db2.jcc.DB2Driver",
    "fetchsize": "10000"  # Fetch size for efficient data retrieval
}

# Table and partition details
db2_table = "<SCHEMA.TABLE_NAME>"
partition_column = "<PARTITION_COLUMN>"  # Column used to slice micro-batches
batch_size = 100000  # Number of records per batch
lower_bound = 0  # Minimum value of the partition column
upper_bound = 1000000  # Maximum value of the partition column
num_batches = (upper_bound - lower_bound + batch_size - 1) // batch_size  # Total batches, rounded up

# Azure Data Lake details
adls_path = "abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<TARGET_PATH>"

# Loop through micro-batches
for batch_start in range(lower_bound, upper_bound, batch_size):
    batch_end = batch_start + batch_size
    print(f"Processing batch: {batch_start} to {batch_end}...")

    # Step 1: Read data for the current batch via a pushdown subquery
    batch_query = (
        f"(SELECT * FROM {db2_table} "
        f"WHERE {partition_column} >= {batch_start} AND {partition_column} < {batch_end}) AS temp"
    )
    batch_df = spark.read \
        .format("jdbc") \
        .option("url", db2_url) \
        .option("dbtable", batch_query) \
        .option("user", db2_properties["user"]) \
        .option("password", db2_properties["password"]) \
        .option("driver", db2_properties["driver"]) \
        .option("fetchsize", db2_properties["fetchsize"]) \
        .load()

    # Step 2: Write the batch to Azure Data Lake
    batch_output_path = f"{adls_path}/batch_{batch_start}_{batch_end}"
    print(f"Writing batch to: {batch_output_path}")
    batch_df.write \
        .format("parquet") \
        .mode("overwrite") \
        .save(batch_output_path)

print("All batches processed successfully!")

# Stop the Spark session
spark.stop()
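The batch windows driven by the loop above can be factored into a small, Spark-free helper that is easy to unit-test before running a full load (the name `batch_ranges` is illustrative). Unlike the raw loop, it caps the final window at `upper_bound` so the last SQL predicate never overshoots the data range:

```python
def batch_ranges(lower_bound, upper_bound, batch_size):
    """Yield (start, end) half-open windows covering [lower_bound, upper_bound),
    matching the WHERE clause `col >= start AND col < end` used above."""
    for start in range(lower_bound, upper_bound, batch_size):
        yield start, min(start + batch_size, upper_bound)

ranges = list(batch_ranges(0, 1_000_000, 100_000))
print(len(ranges))  # 10
print(ranges[0])    # (0, 100000)
print(ranges[-1])   # (900000, 1000000)
```

Each `(start, end)` pair can then be substituted directly into the batch subquery, which also makes it straightforward to log or checkpoint which windows have already been transferred.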