Write a solution in PySpark to find the average selling price for each product. The resulting average_price should be rounded to 2 decimal places.
Solution:
"""Compute the average selling price per product with PySpark.

For each product, average_price = sum(units * price) / sum(units), where each
sale's price is the one whose [start_date, end_date] window contains the
purchase_date. The result is rounded to 2 decimal places.
"""
import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, round as spark_round, sum as spark_sum
from pyspark.sql.types import StructType, StructField, IntegerType, DateType

# Initialize the Spark session.
# NOTE: the original post used curly "smart quotes" (e.g. “...”), which is a
# SyntaxError in Python; all string literals here use plain ASCII quotes.
spark = SparkSession.builder.appName("average_selling_price").getOrCreate()

# Sample data: price windows per product, and individual sales.
prices_data = [
    (1, datetime.date(2019, 2, 17), datetime.date(2019, 2, 28), 5),
    (1, datetime.date(2019, 3, 1), datetime.date(2019, 3, 22), 20),
    (2, datetime.date(2019, 2, 1), datetime.date(2019, 2, 20), 15),
    (2, datetime.date(2019, 2, 21), datetime.date(2019, 3, 31), 30),
]
units_sold_data = [
    (1, datetime.date(2019, 2, 25), 100),
    (1, datetime.date(2019, 3, 1), 15),
    (2, datetime.date(2019, 2, 10), 200),
    (2, datetime.date(2019, 3, 22), 15),
]

# Explicit schemas (avoids type inference surprises on empty/ambiguous data).
prices_schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("start_date", DateType(), True),
    StructField("end_date", DateType(), True),
    StructField("price", IntegerType(), True),
])
units_sold_schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("purchase_date", DateType(), True),
    StructField("units", IntegerType(), True),
])

df_prices = spark.createDataFrame(data=prices_data, schema=prices_schema)
df_units_sold = spark.createDataFrame(data=units_sold_data, schema=units_sold_schema)

# Join sales to the price window active on the purchase date. The `between`
# filter drops rows where purchase_date is null, so unsold products are
# excluded from the output (same behavior as the original post).
df_joined = df_prices.join(
    df_units_sold,
    "product_id",
    "left",
).where(col("purchase_date").between(col("start_date"), col("end_date")))

# Weighted average price per product, rounded to 2 decimal places.
# `sum`/`round` are imported under aliases so the Python builtins of the
# same names are not shadowed.
df_average_price = df_joined.groupby("product_id").agg(
    spark_round(
        spark_sum(col("units") * col("price")) / spark_sum("units"), 2
    ).alias("average_price")
)

df_average_price.show()

# Release the session's resources.
spark.stop()
The expected output is shown in the comments section below. If you have any suggestions or improvements, please leave a comment.
Leave a comment