Here’s a PySpark SQL cheatsheet, covering common operations and concepts. This is designed to be a quick reference for those working with PySpark DataFrames and SQL-like operations.
PySpark SQL Cheatsheet
1. Initialization & Data Loading
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Initialize SparkSession (getOrCreate reuses an already-running session if one exists)
spark = (
    SparkSession.builder
    .appName("PySparkSQLCheatsheet")
    .getOrCreate()
)

# Load Data (e.g., CSV, Parquet)
# header=True treats the first CSV line as column names; inferSchema=True asks
# Spark to infer the column types from the data.
df_csv = spark.read.csv("path/to/data.csv", header=True, inferSchema=True)
df_parquet = spark.read.parquet("path/to/data.parquet")

# Create DataFrame from Python list (for demonstration)
data = [("Alice", 1, "NY"), ("Bob", 2, "CA"), ("Charlie", 3, "NY")]
# Explicit schema: StructField(name, type, nullable)
schema = StructType([
    StructField("name", StringType(), True),
    StructField("id", IntegerType(), True),
    StructField("state", StringType(), True),
])
df = spark.createDataFrame(data, schema)
2. Basic DataFrame Operations
# Show schema
df.printSchema()

# Show data (default 20 rows)
df.show()
df.show(5, truncate=False)  # Show 5 rows, no truncation

# Select columns
df.select("name", "state").show()
df.select(col("name"), col("id").alias("employee_id")).show()  # Rename column

# Filter rows (WHERE clause)
df.filter(df.id > 1).show()
df.where("state = 'NY'").show()  # SQL-like string condition
# Combine column conditions with & / | and wrap each one in parentheses
df.filter((df.id > 1) & (df.state == "NY")).show()

# Add new column (WITH COLUMN)
df.withColumn("age_plus_10", df.id + 10).show()
df.withColumn("greeting", lit("Hello")).show()  # Add a literal column

# Drop column
df.drop("id").show()

# Rename column
df.withColumnRenamed("name", "full_name").show()

# Drop duplicates
df.dropDuplicates(["state"]).show()  # Drop based on 'state'
df.dropDuplicates().show()  # Drop based on all columns

# Sort data (ORDER BY)
df.sort("id", ascending=False).show()
df.orderBy(col("name").desc(), col("id").asc()).show()

# Group by and aggregate (GROUP BY)
# NOTE: avg/max below are the Spark column functions pulled in by the star
# import of pyspark.sql.functions, not Python's builtins.
df.groupBy("state").count().show()
df.groupBy("state").agg(
    avg("id").alias("avg_id"),
    max("id").alias("max_id"),
).show()
3. SQL Functions (pyspark.sql.functions)
These are used with df.select(), df.withColumn(), df.filter(), etc.
# Aggregate Functions
# NOTE: count/sum/min/max/avg are the Spark column functions pulled in by the
# star import of pyspark.sql.functions; they shadow Python's builtins here.
df.agg(
    count("*").alias("total_rows"),
    sum("id").alias("total_id"),
    min("id").alias("min_id"),
    max("id").alias("max_id"),
    avg("id").alias("average_id"),
).show()

# String Functions
df.withColumn("upper_name", upper(col("name"))).show()
df.withColumn("lower_name", lower(col("name"))).show()
df.withColumn("name_length", length(col("name"))).show()
df.withColumn("concat_col", concat_ws("-", col("name"), col("state"))).show()
df.withColumn("substring_name", substring(col("name"), 1, 3)).show()  # position is 1-based

# Date & Time Functions
# current_date() / current_timestamp() need no input column; date_format() and
# datediff() require date/timestamp columns in the DataFrame.
# df.withColumn("current_date", current_date()).show()
# df.withColumn("current_timestamp", current_timestamp()).show()
# df.withColumn("date_format", date_format(col("date_col"), "yyyy-MM-dd")).show()
# df.withColumn("datediff", datediff(col("end_date"), col("start_date"))).show()

# Conditional Functions
df.withColumn("status", when(col("id") > 1, "Adult").otherwise("Child")).show()
df.withColumn(
    "category",
    when(col("state") == "NY", "East")
    .when(col("state") == "CA", "West")
    .otherwise("Other"),
).show()

# Mathematical Functions
df.withColumn("sqrt_id", sqrt(col("id"))).show()
df.withColumn("round_id", round(col("id") / 2, 0)).show()

# Array Functions (if you have array columns)
# df.withColumn("array_length", size(col("array_col"))).show()
# df.withColumn("exploded_array", explode(col("array_col"))).show()
4. Working with SQL Queries
You can register a DataFrame as a temporary view and then query it using Spark SQL.
# Register DataFrame as a temporary view
df.createOrReplaceTempView("my_table")

# Execute SQL query
spark.sql("SELECT * FROM my_table WHERE id > 1").show()
spark.sql("SELECT state, COUNT(*) as count FROM my_table GROUP BY state").show()

# Dropping a temporary view
spark.catalog.dropTempView("my_table")
5. Joins
# Sample DataFrames for joins (keys B and C exist on both sides)
df1 = spark.createDataFrame([("A", 1), ("B", 2), ("C", 3)], ["key", "value1"])
df2 = spark.createDataFrame([("B", 10), ("C", 20), ("D", 30)], ["key", "value2"])

# Inner Join (default)
df1.join(df2, on="key").show()

# Left Outer Join
df1.join(df2, on="key", how="left").show()

# Right Outer Join
df1.join(df2, on="key", how="right").show()

# Full Outer Join
df1.join(df2, on="key", how="full").show()

# Left Anti Join (rows in left that have no match in right)
df1.join(df2, on="key", how="left_anti").show()

# Left Semi Join (rows in left that have a match in right)
df1.join(df2, on="key", how="left_semi").show()

# Join with multiple columns
# df1.join(df2, on=["col1", "col2"], how="inner").show()

# Join with complex conditions
# df1.join(df2, df1.key == df2.key, "inner").show()
6. Window Functions
Window functions perform calculations across a set of table rows that are related to the current row.
from pyspark.sql.window import Window
# Sample data for window functions
data_window = [
    ("A", "dept1", 100),
    ("B", "dept1", 150),
    ("C", "dept2", 200),
    ("D", "dept2", 120),
    ("E", "dept1", 110),
]
schema_window = ["name", "department", "salary"]
df_window = spark.createDataFrame(data_window, schema_window)

# Define a window specification (partition by department, order by salary)
window_spec = Window.partitionBy("department").orderBy("salary")

# Rank within each department
df_window.withColumn("rank", rank().over(window_spec)).show()
df_window.withColumn("dense_rank", dense_rank().over(window_spec)).show()
df_window.withColumn("row_number", row_number().over(window_spec)).show()

# Lag/Lead
df_window.withColumn("prev_salary", lag("salary", 1).over(window_spec)).show()
df_window.withColumn("next_salary", lead("salary", 1).over(window_spec)).show()

# Cumulative sum within each department
# The frame runs from the first row of the partition up to the current row.
# NOTE: sum is the Spark column function from pyspark.sql.functions, not the builtin.
running_frame = window_spec.rowsBetween(Window.unboundedPreceding, Window.currentRow)
df_window.withColumn("cumulative_salary", sum("salary").over(running_frame)).show()

# Ntile
df_window.withColumn("ntile", ntile(2).over(window_spec)).show()  # Divide into 2 groups
7. Working with Null Values
# Sample DataFrame whose 'value' column contains nulls
df_with_nulls = spark.createDataFrame([(1, None), (2, "B"), (3, None)], ["id", "value"])

# Fill nulls
df_with_nulls.na.fill("N/A", subset=["value"]).show()
df_with_nulls.na.fill(0).show()  # Fills all numeric nulls with 0

# Drop rows with nulls
df_with_nulls.na.drop().show()  # Drops rows with any null
df_with_nulls.na.drop(subset=["value"]).show()  # Drops rows where 'value' is null

# Replace values
df.replace("NY", "New York", subset=["state"]).show()
8. UDFs (User Defined Functions)
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# Define a Python function
def greet(name):
    """Return the greeting ``Hello, <name>!`` for the given *name*."""
    return f"Hello, {name}!"
# Register it as a UDF (second argument is the UDF's return type)
greet_udf = udf(greet, StringType())

# Apply the UDF
df.withColumn("greeting", greet_udf(col("name"))).show()

# SQL UDF (registering a UDF for direct use in spark.sql)
spark.udf.register("my_greet_func", greet, StringType())
df.createOrReplaceTempView("my_table_udf")
spark.sql("SELECT name, my_greet_func(name) as greeted_name FROM my_table_udf").show()
9. Saving Data
# Save as Parquet
df.write.parquet("path/to/output.parquet", mode="overwrite")

# Save as CSV
df.write.csv("path/to/output.csv", header=True, mode="overwrite")

# Save as JSON
df.write.json("path/to/output.json", mode="overwrite")

# Save as ORC
df.write.orc("path/to/output.orc", mode="overwrite")

# Save to a database (requires appropriate JDBC driver)
# df.write.format("jdbc") \
#     .option("url", "jdbc:postgresql://localhost:5432/mydb") \
#     .option("dbtable", "my_table") \
#     .option("user", "user") \
#     .option("password", "password") \
#     .mode("overwrite") \
#     .save()
10. SparkSession Management
# Stop the SparkSession once all work is finished (important for releasing
# the resources it holds)
spark.stop()
Leave a comment