ETL Pipeline Implementation on Azure

This document outlines the creation of an end-to-end ETL pipeline on Microsoft Azure, utilizing Azure Data Factory for orchestration, Azure Databricks for transformation, Azure Data Lake Storage Gen2 for storage, Azure Synapse Analytics for data warehousing, and Power BI for visualization. The pipeline is designed to be scalable, secure, and efficient, following industry best practices.

Architecture Overview

The ETL pipeline follows a medallion architecture (Bronze, Silver, Gold layers) to process raw data into curated, analytics-ready datasets:

- Extract: Data is ingested from various sources (e.g., on-premises SQL Server, SaaS applications like Salesforce, and streaming data via Azure Event Hubs).
- Transform: Data is cleaned, enriched, and transformed using Azure Databricks with Apache Spark.
- Load: Transformed data is loaded into Azure Synapse Analytics for querying and reporting, with visualizations in Power BI.
- Storage: Azure Data Lake Storage Gen2 stores raw (Bronze), processed (Silver), and curated (Gold) data.
- Orchestration: Azure Data Factory manages the pipeline, scheduling, and monitoring.

Prerequisites

- An Azure subscription with access to:
  - Azure Data Factory
  - Azure Databricks
  - Azure Data Lake Storage Gen2
  - Azure Synapse Analytics
  - Power BI
- Permissions: Owner role on the Azure subscription or resource group.
- Sample data sources (e.g., SQL Server, CSV files, or SaaS APIs).
- Azure Active Directory (AAD) for authentication and Azure Key Vault for secrets management.

Step-by-Step Implementation

1. Set Up Azure Resources

a. Create Azure Data Lake Storage Gen2

- In the Azure Portal, create a storage account with the "Enable hierarchical namespace" option selected to enable Data Lake Storage Gen2.
- Create containers for:
  - bronze: raw data
  - silver: processed data
  - gold: curated, analytics-ready data
- Assign appropriate access permissions using AAD and role-based access control (RBAC).
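As a small illustration of this layout, a helper can build the abfss:// URIs that Databricks and ADF will later use for each layer. This is a sketch; the storage account name below is a placeholder:

```python
# Build ABFS (Data Lake Gen2) URIs for the medallion layers.
# "mystorageacct" is a placeholder storage account name.
def layer_path(storage_account: str, layer: str, relative_path: str = "") -> str:
    """Return the abfss:// URI for a given medallion layer."""
    if layer not in {"bronze", "silver", "gold"}:
        raise ValueError(f"unknown layer: {layer}")
    base = f"abfss://{layer}@{storage_account}.dfs.core.windows.net/"
    return base + relative_path.lstrip("/")

print(layer_path("mystorageacct", "bronze", "raw/sales/"))
# abfss://bronze@mystorageacct.dfs.core.windows.net/raw/sales/
```

Keeping path construction in one place avoids hard-coding container URIs across notebooks.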

b. Create Azure Data Factory

- In the Azure Portal, create a Data Factory instance.
- Enable Git integration for version control (e.g., Azure Repos or GitHub).
- Configure a managed virtual network for secure data movement.

c. Set Up Azure Databricks

- Create a Databricks workspace in the Azure Portal.
- Configure a cluster with autoscaling enabled (e.g., Standard_DS3_v2 nodes).
- Mount the Data Lake Storage Gen2 containers in Databricks using AAD credentials stored in Azure Key Vault.
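A minimal sketch of the mount configuration, assuming the standard OAuth service-principal settings for ABFS. In a real workspace the client id, secret, and tenant id would come from Key Vault via dbutils.secrets.get; plain placeholders are used here:

```python
# Sketch of the OAuth configuration used to access ADLS Gen2 from Databricks.
# In a real workspace, fetch credentials from a Key Vault-backed secret scope,
# e.g. dbutils.secrets.get(scope="kv-scope", key="sp-client-secret").
def adls_oauth_config(client_id: str, client_secret: str, tenant_id: str) -> dict:
    return {
        "fs.azure.account.auth.type": "OAuth",
        "fs.azure.account.oauth.provider.type":
            "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        "fs.azure.account.oauth2.client.id": client_id,
        "fs.azure.account.oauth2.client.secret": client_secret,
        "fs.azure.account.oauth2.client.endpoint":
            f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
    }

configs = adls_oauth_config("<client-id>", "<client-secret>", "<tenant-id>")

# In a Databricks notebook, the dict would be passed to the mount call:
# dbutils.fs.mount(
#     source="abfss://bronze@<storage_account>.dfs.core.windows.net/",
#     mount_point="/mnt/bronze",
#     extra_configs=configs)
```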

d. Create Azure Synapse Analytics

- Provision a Synapse workspace with a dedicated SQL pool for data warehousing.
- Configure external tables to query data stored in the Gold layer of the Data Lake.

e. Configure Azure Key Vault

- Create a Key Vault to store secrets (e.g., storage account keys, SQL credentials).
- Grant access to ADF and Databricks using managed identities.

2. Extract: Ingest Data

- Sources: Configure linked services in ADF for each data source:
  - On-premises SQL Server: use a self-hosted integration runtime.
  - SaaS applications (e.g., Salesforce): use ADF's built-in connectors.
  - Streaming data: use Azure Event Hubs for real-time ingestion.
  - Flat files (e.g., CSV): store them in the Bronze container of the Data Lake.
- Copy Activity: Create ADF pipelines to copy data from the sources into the Bronze layer.
  - Use incremental loading with watermark columns or change tracking to process only new or updated data.
  - Example: for SQL Server, use a query like SELECT * FROM table WHERE last_modified > @pipeline().parameters.last_run.
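The watermark logic can be sketched in plain Python (toy data; in the real pipeline the filter runs inside the source query and the watermark value is persisted between runs):

```python
from datetime import datetime

# Toy sketch of watermark-based incremental loading: only rows modified
# after the last successful run are picked up, then the watermark advances.
rows = [
    {"id": 1, "last_modified": datetime(2025, 1, 1)},
    {"id": 2, "last_modified": datetime(2025, 1, 3)},
    {"id": 3, "last_modified": datetime(2025, 1, 5)},
]

def incremental_load(rows, watermark):
    """Return rows newer than the watermark, plus the advanced watermark."""
    new_rows = [r for r in rows if r["last_modified"] > watermark]
    new_watermark = max((r["last_modified"] for r in new_rows), default=watermark)
    return new_rows, new_watermark

batch, wm = incremental_load(rows, datetime(2025, 1, 2))
print([r["id"] for r in batch], wm)  # [2, 3] 2025-01-05 00:00:00
```

Each run then passes the stored watermark in as the last_run pipeline parameter and writes the new one back after a successful load.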



- Auto Loader: For streaming or file-based ingestion, use Databricks Auto Loader to incrementally ingest data into the Bronze layer. Example configuration in a Databricks notebook:

from pyspark.sql.functions import col, current_timestamp

file_path = "abfss://bronze@<storage_account>.dfs.core.windows.net/raw/"
checkpoint_path = "abfss://bronze@<storage_account>.dfs.core.windows.net/_checkpoint/"

spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .option("cloudFiles.schemaLocation", checkpoint_path) \
    .load(file_path) \
    .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time")) \
    .writeStream \
    .option("checkpointLocation", checkpoint_path) \
    .toTable("bronze_table")

3. Transform: Process and Enrich Data

- Databricks Notebooks: Create notebooks in Databricks to transform data from Bronze to Silver and from Silver to Gold.
- Bronze to Silver:
  - Clean the data (remove duplicates, handle missing values).
  - Standardize formats (e.g., date formats, string casing).
  - Example transformation:

from pyspark.sql.functions import col, to_date, trim

silver_df = spark.read.table("bronze_table") \
    .dropDuplicates() \
    .withColumn("transaction_date", to_date(col("transaction_date"), "yyyy-MM-dd")) \
    .withColumn("customer_name", trim(col("customer_name"))) \
    .filter(col("amount").isNotNull())

silver_df.write.mode("overwrite").saveAsTable("silver_table")

- Silver to Gold:
  - Aggregate data (e.g., monthly sales by region).
  - Join with reference data (e.g., customer demographics).
  - Example aggregation:

from pyspark.sql.functions import sum, month, year

gold_df = spark.read.table("silver_table") \
    .groupBy(year("transaction_date").alias("year"), month("transaction_date").alias("month"), "region") \
    .agg(sum("amount").alias("total_sales")) \
    .orderBy("year", "month", "region")

gold_df.write.mode("overwrite").saveAsTable("gold_sales")

- Optimization:
  - Use Delta Lake for ACID transactions and performance optimizations (e.g., Z-ordering, partitioning). Example: OPTIMIZE gold_sales ZORDER BY (region).
  - Enable autoscaling and caching for large datasets.

4. Load: Store and Query

- Copy to Synapse: Use ADF's Copy Data activity to move Gold data into Azure Synapse Analytics.
  - Configure a sink dataset for the Synapse dedicated SQL pool.
  - Use PolyBase for efficient bulk loading.
- External Tables: Create external tables in Synapse to query Gold data directly from the Data Lake without copying it. Example SQL:

CREATE EXTERNAL TABLE gold_sales (
    year INT,
    month INT,
    region VARCHAR(50),
    total_sales DECIMAL(18,2)
)
WITH (
    LOCATION = 'gold/sales/',
    DATA_SOURCE = datalake_datasource,
    FILE_FORMAT = parquet_format
);

- Indexing and Partitioning: Optimize Synapse tables with indexes and partitioning for query performance.

5. Orchestrate: Automate with ADF

- Pipeline Creation: Create an ADF pipeline with the following activities, defining dependencies to ensure sequential execution:
  - Copy Activity: ingest data into Bronze.
  - Databricks Notebook Activity: transform Bronze to Silver.
  - Databricks Notebook Activity: transform Silver to Gold.
  - Copy Activity: load Gold data into Synapse.
- Triggers: Schedule the pipeline to run daily or weekly using a tumbling window trigger (e.g., at 2 AM UTC daily).
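A sketch of what such a trigger definition might look like, expressed here as a Python dict mirroring the ADF trigger JSON schema; the trigger name is illustrative and the field names assume the standard TumblingWindowTrigger layout:

```python
import json

# Sketch of a tumbling window trigger that fires every 24 hours starting
# at 2 AM UTC. Field names follow the ADF trigger schema; the trigger
# name and parameter wiring are illustrative.
trigger = {
    "name": "Daily2amTrigger",
    "properties": {
        "type": "TumblingWindowTrigger",
        "typeProperties": {
            "frequency": "Hour",
            "interval": 24,
            "startTime": "2025-01-01T02:00:00Z",
            "maxConcurrency": 1,
        },
        "pipeline": {
            "pipelineReference": {"referenceName": "ETL_Pipeline", "type": "PipelineReference"},
            # The window start makes a natural watermark for incremental loads.
            "parameters": {"last_run": "@trigger().outputs.windowStartTime"},
        },
    },
}
print(json.dumps(trigger, indent=2))
```

Tumbling windows (unlike simple schedule triggers) are non-overlapping and retryable per window, which suits incremental loads.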



- Parameters: Use pipeline parameters for dynamic file paths or dates (e.g., last_run_date).
- Monitoring: Use ADF's monitoring dashboard to track pipeline runs, and integrate with Azure Monitor for alerts on failures.

6. Visualize: Power BI Integration

- Connect to Synapse: Use Power BI Desktop to connect to the Synapse SQL pool.
- DirectQuery Mode: For near-real-time insights, use DirectQuery to query the Gold tables.
- Reports and Dashboards: Create visualizations (e.g., sales trends by region), then publish to the Power BI Service and share with stakeholders.
- Data Refresh: Schedule automatic refreshes to keep dashboards up to date.

Best Practices

- Scalability:
  - Use serverless compute in ADF and Databricks to scale dynamically.
  - Partition large datasets by date or key columns in the Data Lake.
- Performance:
  - Enable parallel processing in ADF Copy activities.
  - Use incremental loading to reduce data transfer.
  - Optimize Spark jobs with caching and broadcast joins.
- Security:
  - Use Azure Key Vault for secrets.
  - Implement AAD authentication and RBAC for access control.
  - Encrypt data at rest and in transit.
- Monitoring and Logging:
  - Set up alerts for pipeline failures using Azure Monitor.
  - Log transformation errors in Databricks for troubleshooting.
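One way to sketch that kind of error logging, using plain Python as a stand-in for a notebook (function and field names are illustrative):

```python
import logging

# Sketch of structured error logging in a transformation step: bad records
# are logged with enough context to trace them, then skipped rather than
# failing the whole batch.
logger = logging.getLogger("etl.transform")
logging.basicConfig(level=logging.INFO)

def transform_batch(batch_id: str, records: list) -> list:
    """Apply a toy transformation, logging and skipping bad records."""
    out, errors = [], 0
    for rec in records:
        try:
            out.append({"id": rec["id"], "amount": float(rec["amount"])})
        except (KeyError, ValueError, TypeError) as exc:
            errors += 1
            logger.error("batch=%s record=%r failed: %s", batch_id, rec, exc)
    logger.info("batch=%s: %d ok, %d failed", batch_id, len(out), errors)
    return out

clean = transform_batch("2025-01-01", [{"id": 1, "amount": "10.5"}, {"id": 2}])
# clean == [{"id": 1, "amount": 10.5}]
```

Whether to skip, quarantine, or fail on bad records is a policy choice; logging the counts per batch makes the decision auditable either way.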



- Cost Optimization:
  - Use autoscaling clusters in Databricks.
  - Monitor usage with Azure Cost Management to avoid overspending.

Sample ADF Pipeline JSON

Below is a simplified JSON definition for the ADF pipeline orchestrating the ETL process:

{
    "name": "ETL_Pipeline",
    "properties": {
        "activities": [
            {
                "name": "CopyToBronze",
                "type": "Copy",
                "inputs": [{ "referenceName": "SourceDataset", "type": "DatasetReference" }],
                "outputs": [{ "referenceName": "BronzeDataset", "type": "DatasetReference" }],
                "typeProperties": {
                    "source": { "type": "SqlServerSource", "query": "SELECT * FROM table WHERE last_modified > @pipeline().parameters.last_run" },
                    "sink": { "type": "ParquetSink", "storeSettings": { "type": "AzureDataLakeStoreWriteSettings" } }
                }
            },
            {
                "name": "TransformBronzeToSilver",
                "type": "DatabricksNotebook",
                "dependsOn": [{ "activity": "CopyToBronze", "dependencyConditions": ["Succeeded"] }],
                "typeProperties": {
                    "notebookPath": "/Users/<user>/BronzeToSilver",
                    "baseParameters": { "date": "@pipeline().parameters.last_run" }
                }
            },
            {
                "name": "TransformSilverToGold",
                "type": "DatabricksNotebook",
                "dependsOn": [{ "activity": "TransformBronzeToSilver", "dependencyConditions": ["Succeeded"] }],
                "typeProperties": {
                    "notebookPath": "/Users/<user>/SilverToGold",
                    "baseParameters": { "date": "@pipeline().parameters.last_run" }
                }
            },
            {
                "name": "CopyToSynapse",
                "type": "Copy",
                "dependsOn": [{ "activity": "TransformSilverToGold", "dependencyConditions": ["Succeeded"] }],
                "inputs": [{ "referenceName": "GoldDataset", "type": "DatasetReference" }],
                "outputs": [{ "referenceName": "SynapseDataset", "type": "DatasetReference" }],
                "typeProperties": {
                    "source": { "type": "ParquetSource" },
                    "sink": { "type": "SqlSink", "writeBehavior": "insert" }
                }
            }
        ],
        "parameters": {
            "last_run": { "type": "string", "defaultValue": "2025-01-01" }
        }
    }
}

Testing and Validation

- Unit Testing: Test Databricks notebooks individually with sample data.
- Pipeline Testing: Run the ADF pipeline with a small dataset and validate the outputs in each layer (Bronze, Silver, Gold).
- Data Validation: Use Synapse SQL queries to verify data integrity (e.g., row counts, aggregations).
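A minimal sketch of such a validation step, using plain lists to stand in for query results pulled from the source system and from Synapse:

```python
# Sketch of layer-to-layer validation: compare row counts and a column
# aggregate between source and target result sets.
def validate(source_rows, target_rows, key="amount", tolerance=1e-6):
    """Return a dict of checks: row counts match, sums agree within tolerance."""
    checks = {
        "row_count_match": len(source_rows) == len(target_rows),
        "sum_match": abs(sum(r[key] for r in source_rows)
                         - sum(r[key] for r in target_rows)) <= tolerance,
    }
    checks["passed"] = all(checks.values())
    return checks

src = [{"amount": 10.0}, {"amount": 5.5}]
tgt = [{"amount": 10.0}, {"amount": 5.5}]
print(validate(src, tgt))  # {'row_count_match': True, 'sum_match': True, 'passed': True}
```

The same checks can be wired into the pipeline as a final activity so that a mismatch fails the run instead of silently publishing bad data.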



- Monitoring: Check ADF logs and Databricks job logs for errors.

Deployment

- CI/CD: Use Azure DevOps or GitHub Actions to deploy ADF pipelines and Databricks notebooks.
- Environment Separation: Maintain separate resource groups for development, testing, and production.
- Version Control: Store notebook and pipeline code in Git for traceability.

Conclusion

This ETL pipeline leverages Azure’s powerful services to create a scalable, secure, and efficient data processing workflow. By following the medallion architecture and best practices, organizations can ensure high performance and reliability for their data integration needs. For further customization, consider integrating additional Azure services like Azure Machine Learning for predictive analytics or Azure Stream Analytics for real-time processing.
