30 PySpark Scenario-Based Interview Questions for Experienced

PySpark is a powerful framework for distributed data processing and analysis. If you’re an experienced PySpark developer preparing for a job interview, it’s essential to be ready for scenario-based questions that test your practical knowledge. In this article, we present 30 scenario-based interview questions along with their solutions to help you confidently tackle your next PySpark interview.

1. Question: Working with CSV Files

Scenario: You have a CSV file named “data.csv” with the following columns: “id”, “name”, “age”, and “salary”. Load the data into a PySpark DataFrame and display the first 5 rows.

from pyspark.sql import SparkSession

# Initialize SparkSession

spark = SparkSession.builder.appName(“InterviewQuestions”).getOrCreate()

# Load CSV data into DataFrame

df = spark.read.csv(“data.csv”, header=True, inferSchema=True)

# Display the first 5 rows

df.show(5)

2. Question: Filtering and Aggregating Data

Scenario: You have a DataFrame “df” with columns “product_name” and “price”. Filter the DataFrame to show only the products with a price greater than 1000 and calculate the average price for each product category.

from pyspark.sql import functions as F

# Filter products with price > 1000

filtered_df = df.filter(df[“price”] > 1000)

# Calculate average price for each product category

average_price_df = filtered_df.groupBy(“product_name”).agg(F.avg(“price”).alias(“avg_price”))

average_price_df.show()

3. Question: Handling Missing Data

Scenario: Your DataFrame “df” contains columns “product_name”, “quantity”, and “price”. However, some rows have missing values for the “quantity” column. Replace the missing values with 0 and calculate the total revenue for each product.

# Replace missing “quantity” values with 0

df = df.fillna(0, subset=[“quantity”])

# Calculate total revenue for each product

df = df.withColumn(“total_revenue”, df[“quantity”] * df[“price”])

df.show()

4. Question: Working with Dates

Scenario: Your DataFrame “df” contains a column “transaction_date” in string format (YYYY-MM-DD). Convert this column to a DateType and calculate the total revenue generated on each date.

from pyspark.sql.functions import to_date

# Convert “transaction_date” to DateType

df = df.withColumn(“transaction_date”, to_date(df[“transaction_date”], “yyyy-MM-dd”))

# Calculate total revenue for each date

daily_revenue = df.groupBy(“transaction_date”).agg(F.sum(“quantity” * “price”).alias(“total_revenue”))

daily_revenue.show()

5. Question: Window Functions and Ranking

Scenario: You have a DataFrame “df” with columns “product_name”, “quantity”, and “transaction_date”. Calculate the rank of each product based on the total quantity sold, and display the top 5 products with the highest quantity.

from pyspark.sql import Window

# Define the Window specification

window_spec = Window.partitionBy(“product_name”).orderBy(F.desc(“quantity”))

# Calculate rank of each product based on quantity

df = df.withColumn(“rank”, F.rank().over(window_spec))

# Display top 5 products with highest quantity

top_products = df.filter(F.col(“rank”) <= 5)

top_products.show()

6. Question: Joining DataFrames

Scenario: You have two DataFrames “df1” and “df2”, both containing columns “customer_id” and “customer_name”. Perform a full outer join between the DataFrames and display the rows where the customer names match.

# Perform full outer join between df1 and df2

joined_df = df1.join(df2, on=”customer_id”, how=”full_outer”)

# Filter rows where customer names match

matched_customers = joined_df.filter(df1[“customer_name”] == df2[“customer_name”])

matched_customers.show()

7. Question: Broadcast Join

Scenario: You have a large DataFrame “big_df” with columns “customer_id” and “order_total”, and a small DataFrame “small_df” with columns “customer_id” and “customer_name”. Use broadcast join to join the DataFrames and display the customer names along with their order totals.

# Broadcast join small_df with big_df

from pyspark.sql.functions import broadcast

joined_df = big_df.join(broadcast(small_df), on=”customer_id”, how=”inner”)

# Display customer names with order totals

joined_df.select(“customer_name”, “order_total”).show()

8. Question: Working with UDFs

Scenario: You have a DataFrame “df” with a column “text” containing sentences. Create a User-Defined Function (UDF) to calculate the average length of sentences, and apply the UDF to create a new column “avg_length”.

# User-Defined Function

from pyspark.sql.functions import udf

from pyspark.sql.types import IntegerType

def calculate_avg_length(text):

    sentences = text.split(“.”)

    total_length = sum(len(sentence) for sentence in sentences)

    avg_length = total_length / len(sentences)

    return int(avg_length)

# Register UDF

avg_length_udf = udf(calculate_avg_length, IntegerType())

# Apply UDF to create “avg_length” column

df = df.withColumn(“avg_length”, avg_length_udf(“text”))

df.show()

9. Question: Handling Large Data

Scenario: You have a very large DataFrame “big_df” with millions of rows. Implement a sampling technique to select a random sample of 1% of the data and display the sampled DataFrame.

# Sample 1% of the data

sampled_df = big_df.sample(withReplacement=False, fraction=0.01)

sampled_df.show()

10. Question: Data Validation

Scenario: You have a DataFrame “df” with columns “age” and “gender”. Validate the data to ensure that the “age” column does not contain negative values, and the “gender” column only contains ‘Male’ or ‘Female’. Display the rows that fail the validation.

# Data Validation

invalid_data = df.filter((df[“age”] < 0) | (~df[“gender”].isin([‘Male’, ‘Female’])))

invalid_data.show()

11. Question: Cross-Joining DataFrames

Scenario: You have two DataFrames “df1” and “df2”, both containing columns “product_name” and “quantity”. Perform a cross-join between the DataFrames to get all possible combinations of products and quantities.

# Perform cross-join between df1 and df2

cross_joined_df = df1.crossJoin(df2)

cross_joined_df.show()

12. Question: Handling Duplicate Data

Scenario: Your DataFrame “df” contains duplicate rows. Remove the duplicate rows based on all columns and display the DataFrame without duplicates.

# Remove duplicate rows based on all columns

deduplicated_df = df.dropDuplicates()

deduplicated_df.show()

13. Question: Exploratory Data Analysis

Scenario: You have a DataFrame “df” with columns “age” and “income”. Perform exploratory data analysis to understand the distribution of ages and income using summary statistics.

# Exploratory Data Analysis

summary_stats = df.select(F.mean(“age”), F.min(“age”), F.max(“age”), F.mean(“income”), F.min(“income”), F.max(“income”))

summary_stats.show()

14. Question: Handling Outliers

Scenario: Your DataFrame “df” contains a column “income”. Use the Interquartile Range (IQR) method to detect and handle outliers in the “income” column.

# Handling Outliers using IQR method

q1, q3 = df.approxQuantile(“income”, [0.25, 0.75], 0.01)

iqr = q3 – q1

lower_bound = q1 – 1.5 * iqr

upper_bound = q3 + 1.5 * iqr

df = df.filter((df[“income”] >= lower_bound) & (df[“income”] <= upper_bound))

df.show()

15. Question: Applying Machine Learning

Scenario: You have a DataFrame “df” with features “age”, “income”, and a label “target” indicating whether a customer made a purchase (1) or not (0). Split the data into training and testing sets, and train a logistic regression model to predict the “target” based on the features.

from pyspark.ml.classification import LogisticRegression

from pyspark.ml.feature import VectorAssembler

from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Prepare features and label columns

feature_cols = [“age”, “income”]

assembler = VectorAssembler(inputCols=feature_cols, outputCol=”features”)

df = assembler.transform(df)

# Split data into training and testing sets

train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# Train a logistic regression model

lr = LogisticRegression(labelCol=”target”, featuresCol=”features”)

model = lr.fit(train_data)

# Make predictions on test data

predictions = model.transform(test_data)

# Evaluate model performance

evaluator = BinaryClassificationEvaluator(labelCol=”target”)

auc = evaluator.evaluate(predictions)

print(“AUC:”, auc)

16. Question: Working with Nested Data

Scenario: Your DataFrame “df” contains a column “address” that contains nested data in the form of a struct with fields “city” and “zipcode”. Extract the “city” and “zipcode” from the “address” column and create separate columns for them.

# Extract “city” and “zipcode” from “address” struct

df = df.withColumn(“city”, df[“address.city”])

df = df.withColumn(“zipcode”, df[“address.zipcode”])

df.show()

17. Question: Handling Imbalanced Data

Scenario: Your DataFrame “df” contains a column “target” with binary values (0 or 1), indicating whether a customer made a purchase (1) or not (0). The data is imbalanced, with a small number of positive samples. Implement the Synthetic Minority Over-sampling Technique (SMOTE) to balance the data and train a classification model.

from imblearn.over_sampling import SMOTE

from pyspark.ml.feature import VectorAssembler

# Prepare features and label columns

feature_cols = [“age”, “income”]

assembler = VectorAssembler(inputCols=feature_cols, outputCol=”features”)

df = assembler.transform(df)

# Implement SMOTE to balance the data

smote = SMOTE(sampling_strategy=’auto’, random_state=42)

features_resampled, target_resampled = smote.fit_resample(df.select(“features”), df.select(“target”))

# Create a DataFrame with resampled data

balanced_df = spark.createDataFrame(zip(features_resampled, target_resampled), [“features”, “target”])

# Split data into training and testing sets

train_data, test_data = balanced_df.randomSplit([0.8, 0.2], seed=42)

# Train a logistic regression model

lr = LogisticRegression(labelCol=”target”, featuresCol=”features”)

model = lr.fit(train_data)

# Make predictions on test data

predictions = model.transform(test_data)

# Evaluate model performance

evaluator = BinaryClassificationEvaluator(labelCol=”target”)

auc = evaluator.evaluate(predictions)

print(“AUC:”, auc)

18. Question: Handling Large Text Data

Scenario: You have a DataFrame “df” with a column “text” containing large text data. Implement feature extraction techniques such as TF-IDF and train a text classification model.

from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF

from pyspark.ml.classification import NaiveBayes

from pyspark.ml import Pipeline

# Tokenize text data

tokenizer = Tokenizer(inputCol=”text”, outputCol=”words”)

df = tokenizer.transform(df)

# Remove stop words

remover = StopWordsRemover(inputCol=”words”, outputCol=”filtered_words”)

df = remover.transform(df)

# Calculate Term Frequency (TF)

cv = CountVectorizer(inputCol=”filtered_words”, outputCol=”raw_features”)

model = cv.fit(df)

df = model.transform(df)

# Calculate Inverse Document Frequency (IDF)

idf = IDF(inputCol=”raw_features”, outputCol=”features”)

idf_model = idf.fit(df)

df = idf_model.transform(df)

# Split data into training and testing sets

train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# Train a Naive Bayes classification model

nb = NaiveBayes(labelCol=”target”, featuresCol=”features”)

model = nb.fit(train_data)

# Make predictions on test data

predictions = model.transform(test_data)

# Evaluate model performance

evaluator = BinaryClassificationEvaluator(labelCol=”target”)

auc = evaluator.evaluate(predictions)

print(“AUC:”, auc)

19. Question: Data Serialization

Scenario: You have a large DataFrame “df” that needs to be shared with other teams in a serialized format. Serialize the DataFrame to a Parquet file for efficient storage and distribution.

# Serialize DataFrame to Parquet file

df.write.parquet(“data.parquet”)

20. Question: Working with Avro Data

Scenario: You have an Avro file named “data.avro” containing customer data in binary format. Load the Avro data into a PySpark DataFrame for further processing and analysis.

# Load Avro data into DataFrame

df = spark.read.format(“avro”).load(“data.avro”)

df.show()

21. Question: Broadcast Variables

Scenario: You have a large DataFrame “df” and a small list of values that you want to use for filtering the DataFrame. Use broadcast variables to efficiently join the DataFrame with the list and display the results.

# Small list of values

filter_values = [10, 20, 30]

# Broadcast the small list

broadcast_values = spark.sparkContext.broadcast(filter_values)

# Filter the DataFrame using broadcast variable

filtered_df = df.filter(df[“column_name”].isin(broadcast_values.value))

filtered_df.show()

23. Question: Working with Nested JSON Data

Scenario: You have a JSON file named “data.json” containing nested data. Load the JSON data into a PySpark DataFrame and extract specific fields from the nested structure.

# Load JSON data into DataFrame

df = spark.read.json(“data.json”)

# Extract specific fields from nested structure

df = df.select(“outer_field.inner_field1”, “outer_field.inner_field2”)

df.show()

24. Question: Cross-Validation for Model Selection

Scenario: You have a DataFrame “df” with features and a binary label. Perform k-fold cross-validation to evaluate the performance of multiple classification models and select the best-performing one.

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

from pyspark.ml.evaluation import BinaryClassificationEvaluator

from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier

# Prepare features and label columns

assembler = VectorAssembler(inputCols=feature_cols, outputCol=”features”)

df = assembler.transform(df)

# Initialize classifiers

lr = LogisticRegression(labelCol=”label”, featuresCol=”features”)

rf = RandomForestClassifier(labelCol=”label”, featuresCol=”features”)

gbt = GBTClassifier(labelCol=”label”, featuresCol=”features”)

# Create a parameter grid for each classifier

param_grid_lr = ParamGridBuilder().build()

param_grid_rf = ParamGridBuilder().build()

param_grid_gbt = ParamGridBuilder().build()

# Initialize evaluator

evaluator = BinaryClassificationEvaluator()

# Perform k-fold cross-validation for each classifier

cv_lr = CrossValidator(estimator=lr, estimatorParamMaps=param_grid_lr, evaluator=evaluator, numFolds=5)

cv_rf = CrossValidator(estimator=rf, estimatorParamMaps=param_grid_rf, evaluator=evaluator, numFolds=5)

cv_gbt = CrossValidator(estimator=gbt, estimatorParamMaps=param_grid_gbt, evaluator=evaluator, numFolds=5)

# Fit the models

cv_lr_model = cv_lr.fit(df)

cv_rf_model = cv_rf.fit(df)

cv_gbt_model = cv_gbt.fit(df)

# Get the best model for each classifier

best_lr_model = cv_lr_model.bestModel

best_rf_model = cv_rf_model.bestModel

best_gbt_model = cv_gbt_model.bestModel

print(“Best Logistic Regression Model:”, best_lr_model)

print(“Best Random Forest Model:”, best_rf_model)

print(“Best Gradient Boosting Tree Model:”, best_gbt_model)

Leave a comment

Create a website or blog at WordPress.com

Up ↑

Design a site like this with WordPress.com
Get started