PySpark Cookbook

1. Introduction

1.1. To create an empty dataframe

import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.StringType()), True),
        T.StructField("B", T.ArrayType(T.StringType()), True),
    ]
)
data = []
df = spark.createDataFrame(schema=schema, data=data)
df.show()
A B

1.2. To create a dataframe with columns key and value from a dictionary

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]

df = spark.createDataFrame(values, columns)
df.show()
key value
key1 value1
key2 value2

1.3. To duplicate a column

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]

df = spark.createDataFrame(values, columns)
df = df.withColumn("value_dup", F.col("value"))
df.show()
key value value_dup
key1 value1 value1
key2 value2 value2

1.4. To rename a column using .withColumnRenamed()

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]

df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()
df = df.withColumnRenamed("key", "new_key") \
        .withColumnRenamed("value","new_value")
print("Modified dataframe:")
df.show()

Original dataframe:

key value
key1 value1
key2 value2

Modified dataframe:

new_key new_value
key1 value1
key2 value2

1.5. To rename a column using .withColumnsRenamed()

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]

df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()
df = df.withColumnsRenamed({"key": "new_key", "value": "new_value"})
print("Modified dataframe:")
df.show()

Original dataframe:

key value
key1 value1
key2 value2

Modified dataframe:

new_key new_value
key1 value1
key2 value2

1.6. To rename a column using .select()

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()

df = df.select(F.col("key").alias("new_key"), F.col("value").alias("new_value"))
print("Modified dataframe:")
df.show()

Original dataframe:

key value
key1 value1
key2 value2

Modified dataframe:

new_key new_value
key1 value1
key2 value2

1.7. To rename columns by adding a prefix

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("index", T.IntegerType(), True),
        T.StructField("value", T.StringType(), True),
    ]
)
data = [(1, "Home"),
        (2, "School"),
        (3, "Home"),]
df = spark.createDataFrame(schema=schema, data=data)
print("Original dataframe:")
df.show()
print("Dataframe with renamed columns:")
df = df.select(*[F.col(k).alias(f"prefix_{k}") for k in df.columns])
df.show()

Original dataframe:

index value
1 Home
2 School
3 Home

Dataframe with renamed columns:

prefix_index prefix_value
1 Home
2 School
3 Home
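
The same renaming can also be done with DataFrame.toDF(), which takes the new column names positionally. A minimal sketch, assuming it is applied to the original two-column dataframe before the .select() above:

# Alternative sketch: rename every column at once with .toDF();
# names are positional, so the list must cover all columns in order.
df_prefixed = df.toDF(*[f"prefix_{c}" for c in df.columns])
df_prefixed.show()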

1.8. To drop columns from a dataframe

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)

df = df.withColumn("const", F.lit(1))
print("Original dataframe:")
df.show()

df = df.drop("value", "const")
print("Modified dataframe:")
df.show()

Original dataframe:

key value const
key1 value1 1
key2 value2 1

Modified dataframe:

key
key1
key2

1.9. To subset columns of a dataframe

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
df = df.withColumn("const", F.lit(1))
print("Original dataframe:")
df.show()
print("Subset 'key', 'value' columns:")
df["key", "value"].show()
print("Subset 'key', 'const' columns:")
df.select("key", "const").show()

Original dataframe:

key value const
key1 value1 1
key2 value2 1

Subset 'key', 'value' columns:

key value
key1 value1
key2 value2

Subset 'key', 'const' columns:

key const
key1 1
key2 1

1.10. To add a column with a constant value using F.lit()

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()

df = df.withColumn("const_integer", F.lit(1))
df = df.withColumn("const_string", F.lit("string"))
print("Modified dataframe:")
df.show()

Original dataframe:

key value
key1 value1
key2 value2

Modified dataframe:

key value const_integer const_string
key1 value1 1 string
key2 value2 1 string

1.11. To add a column with a constant value using .select()

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()

df = df.select("key", "value", F.lit("const_str").alias("constant_value"))
print("Modified dataframe:")
df.show()

Original dataframe:

key value
key1 value1
key2 value2

Modified dataframe:

key value constant_value
key1 value1 const_str
key2 value2 const_str

1.12. To create a dataframe from a list of tuples

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
integer characters
1 [A, B]
2 [C, D]
3 [E, F]

1.13. To get the number of rows of a dataframe

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
num_rows = df.count()
print(f"df has {num_rows} rows")
integer characters
1 [A, B]
2 [C, D]
3 [E, F]

df has 3 rows

1.14. To select first N rows

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
print("These are first 2 rows:")
df.limit(2).show()
integer characters
1 [A, B]
2 [C, D]
3 [E, F]

These are first 2 rows:

integer characters
1 [A, B]
2 [C, D]

1.15. To deduplicate rows

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("key", T.IntegerType(), True),
        T.StructField("value", T.StringType(), True),
        T.StructField("comment", T.StringType(), True),
    ]
)
data = [(1, "Home", "a house"),
        (1, "Home", "a house"),
        (2, "School", "a building"),
        (2, "School", "a house"),
        (3, "Home", "a house"),]
df = spark.createDataFrame(schema=schema, data=data)

print("Original dataframe:")
df.show()

print("Dataframe with distinct rows:")
df.distinct().show()

print("Dataframe with dropped duplicate rows:")
df.dropDuplicates().show()

print("Dataframe with dropped duplicates in columns 'key' and 'value':")
df = df.dropDuplicates(subset=["key", "value"])
df.show()

Original dataframe:

key value comment
1 Home a house
1 Home a house
2 School a building
2 School a house
3 Home a house

Dataframe with distinct rows:

key value comment
2 School a house
3 Home a house
2 School a building
1 Home a house

Dataframe with dropped duplicate rows:

key value comment
2 School a house
3 Home a house
2 School a building
1 Home a house

Dataframe with dropped duplicates in columns 'key' and 'value':

key value comment
1 Home a house
2 School a building
3 Home a house

1.16. To convert a column to a list using a lambda function

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
lst = df.select("integer").rdd.map(lambda r: r[0]).collect()
print(f"Column \"integer\" has values: {lst}")
integer characters
1 [A, B]
2 [C, D]
3 [E, F]

Column "integer" has values: [1, 2, 3]

1.17. To convert a dataframe to a list of dictionaries corresponding to every row

import pprint
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
lst_dict = df.rdd.map(lambda row: row.asDict()).collect()
print(f"Dataframe is represented as:\n")
txt = pprint.pformat(lst_dict)
integer characters
1 [A, B]
2 [C, D]
3 [E, F]

Dataframe is represented as:

[{'characters': ['A', 'B'], 'integer': 1},
 {'characters': ['C', 'D'], 'integer': 2},
 {'characters': ['E', 'F'], 'integer': 3}]
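
The same list of dictionaries can also be built without the RDD API; a minimal sketch, assuming the dataframe fits in driver memory:

# Collect rows to the driver and convert each Row object to a dict
lst_dict = [row.asDict() for row in df.collect()]
print(pprint.pformat(lst_dict))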

1.18. To convert a column to a list using list comprehension

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
lst = [k["integer"] for k in df.select("integer").rdd.collect()]
print(f"Column \"integer\" has values: {lst}")
integer characters
1 [A, B]
2 [C, D]
3 [E, F]

Column "integer" has values: [1, 2, 3]

1.19. To convert a column to a list using Pandas

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
lst = df.select("integer").toPandas()["integer"].tolist()
print(f"Column \"integer\" has values: {lst}")
integer characters
1 [A, B]
2 [C, D]
3 [E, F]

Column "integer" has values: [1, 2, 3]

1.20. To display full width of a column (do not truncate)

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("sentence", T.ArrayType(T.StringType()), True),
    ]
)
data = [(["A", "very", "long", "sentence"],),
        (["with", "many", "words", "."],)]
df = spark.createDataFrame(schema=schema, data=data)

print("Truncated output (default behavior):")
df.show()

print("Truncated to 15 characters output:")
df.show(truncate=15)

print("Non-truncated output (show all):")
df.show(truncate=False)

Truncated output (default behavior):

sentence
[A, very, long, s…
[with, many, word…

Truncated to 15 characters output:

sentence
[A, very, lo…
[with, many,…

Non-truncated output (show all):

sentence
[A, very, long, sentence]
[with, many, words, .]

2. Filtering rows

2.1. To filter based on values of a column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", None),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", None),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)

print("Original dataframe:")
df.show()

print('Filter: F.col("Location" == "Home")')
dft = df.filter(F.col("Location") == "Home")
dft.show()

print('Filter: F.col("Quantity").isNull()')
dft = df.filter(F.col("Quantity").isNull())
dft.show()

print('Filter: F.col("Quantity").isNotNull()')
dft = df.filter(F.col("Quantity").isNotNull())
dft.show()

print('Filter: (F.col("Location") == "Home") & (F.col("Product") == "Laptop"))')
dft = df.filter((F.col("Location") == "Home") & (F.col("Product") == "Laptop"))
dft.show()

print('Filter: (F.col("Location") == "Home") & !(F.col("Product") == "Laptop"))')
dft = df.filter((F.col("Location") == "Home") & ~(F.col("Product") == "Laptop"))
dft.show()

print('Filter: (F.col("Product") == "Laptop") | (F.col("Product") == "Mouse"))')
dft = df.filter((F.col("Product") == "Laptop") | (F.col("Product") == "Mouse"))
dft.show()

print('Filter: F.col("Product").isin(["Laptop", "Mouse"])')
dft = df.filter(F.col("Product").isin(["Laptop", "Mouse"]))
dft.show()

Original dataframe:

Location Product Quantity
Home Laptop 12
Home Monitor null
Home Keyboard 9
Office Laptop null
Office Monitor 10
Office Mouse 9

Filter: F.col("Location" == "Home")

Location Product Quantity
Home Laptop 12
Home Monitor null
Home Keyboard 9

Filter: F.col("Quantity").isNull()

Location Product Quantity
Home Monitor null
Office Laptop null

Filter: F.col("Quantity").isNotNull()

Location Product Quantity
Home Laptop 12
Home Keyboard 9
Office Monitor 10
Office Mouse 9

Filter: (F.col("Location") == "Home") & (F.col("Product") == "Laptop"))

Location Product Quantity
Home Laptop 12

Filter: (F.col("Location") == "Home") & !(F.col("Product") == "Laptop"))

Location Product Quantity
Home Monitor null
Home Keyboard 9

Filter: (F.col("Product") == "Laptop") | (F.col("Product") == "Mouse"))

Location Product Quantity
Home Laptop 12
Office Laptop null
Office Mouse 9

Filter: F.col("Product").isin(["Laptop", "Mouse"])

Location Product Quantity
Home Laptop 12
Office Laptop null
Office Mouse 9
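
.where() is an alias of .filter(), and both also accept SQL expression strings; a minimal sketch of the same kind of filters on the dataframe above:

# .where() is an alias of .filter()
df.where(F.col("Location") == "Home").show()

# The predicate can also be written as a SQL expression string
df.filter("Location = 'Home' AND Quantity IS NOT NULL").show()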

3. Array operations

3.1. To create arrays of different lengths

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
        T.StructField("B", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2], [2, 3, 4, 5]),
        ([4, 5, 6], [2, 3, 4, 5])]
df = spark.createDataFrame(schema=schema, data=data)
dft = df.select("A", "B")
dft.show()
A B
[1, 2] [2, 3, 4, 5]
[4, 5, 6] [2, 3, 4, 5]

3.2. To calculate set difference

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.StringType()), True),
        T.StructField("B", T.ArrayType(T.StringType()), True),
    ]
)
data = [(["b", "a", "c"], ["c", "d", "a", "f"])]
df = spark.createDataFrame(schema=schema, data=data)

dft = df.select("A", "B",
          F.array_except("A", "B").alias("A\B"),
          F.array_except("B", "A").alias("B\A"))
dft.show()
A B A\B B\A
[b, a, c] [c, d, a, f] [b] [d, f]

3.3. To calculate set union

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.StringType()), True),
        T.StructField("B", T.ArrayType(T.StringType()), True),
    ]
)
data = [(["b", "a", "c"], ["c", "d", "a", "f"])]
df = spark.createDataFrame(schema=schema, data=data)
dft = df.select("A", "B",
          F.array_union("A", "B").alias("A U B"))
dft.show()
A B A U B
[b, a, c] [c, d, a, f] [b, a, c, d, f]

3.4. To calculate set intersection

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.StringType()), True),
        T.StructField("B", T.ArrayType(T.StringType()), True),
    ]
)
data = [(["b", "a", "c"], ["c", "d", "a", "f"])]
df = spark.createDataFrame(schema=schema, data=data)
dft = df.select("A", "B", F.array_intersect("A", "B").alias("A \u2229 B"))
dft.show()
A B A ∩ B
[b, a, c] [c, d, a, f] [a, c]

3.5. To pad arrays with value

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
        T.StructField("B", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2], [2, 3, 4, 5]),
        ([4, 5, 6], [2, 3, 4, 5])]
df = spark.createDataFrame(schema=schema, data=data)
n = 4
fill_value = 0
df1 = df.withColumn("A_padding", F.expr(f"array_repeat({fill_value}, {n} - size(A))"))
df1 = df1.withColumn("A_padded", F.concat("A", "A_padding"))
dft = df1.select("A", "A_padding", "A_padded")
dft.show()

df2 = df.withColumn("A_padding", F.array_repeat(F.lit(fill_value), F.lit(n) - F.size("A")))
df2 = df2.withColumn("A_padded", F.concat("A", "A_padding"))
dft = df2.select("A", "A_padding", "A_padded")
dft.show()
A A_padding A_padded
[1, 2] [0, 0] [1, 2, 0, 0]
[4, 5, 6] [0] [4, 5, 6, 0]
A A_padding A_padded
[1, 2] [0, 0] [1, 2, 0, 0]
[4, 5, 6] [0] [4, 5, 6, 0]

3.6. To sum two arrays elementwise using F.element_at()

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
        T.StructField("B", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2], [2, 3, 4, 5]),
        ([4, 5, 6], [2, 3, 4, 5])]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("A_padding", F.array_repeat(F.lit(fill_value), F.lit(n) - F.size("A")))
df = df.withColumn("A_padded", F.concat("A", "A_padding"))
df = df.withColumn("AB_sum", F.expr('transform(A_padded, (element, index) -> element + element_at(B, index + 1))'))
dft = df.select("A", "A_padded", "B", "AB_sum")
dft.show()
A A_padded B AB_sum
[1, 2] [1, 2, 0, 0] [2, 3, 4, 5] [3, 5, 4, 5]
[4, 5, 6] [4, 5, 6, 0] [2, 3, 4, 5] [6, 8, 10, 5]

3.7. To sum two arrays using F.arrays_zip()

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
        T.StructField("B", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2], [2, 3, 4, 5]),
        ([4, 5, 6], [2, 3, 4, 5])]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("A_padding", F.array_repeat(F.lit(fill_value), F.lit(n) - F.size("A")))
df = df.withColumn("A_padded", F.concat("A", "A_padding"))
df = df.withColumn("AB_sum", F.expr("transform(arrays_zip(A_padded, B), x -> x.A_padded + x.B)"))
dft = df.select("A", "A_padded", "B", "AB_sum")
dft.show()
A A_padded B AB_sum
[1, 2] [1, 2, 0, 0] [2, 3, 4, 5] [3, 5, 4, 5]
[4, 5, 6] [4, 5, 6, 0] [2, 3, 4, 5] [6, 8, 10, 5]

3.8. To find mode of an array (most common element)

from collections import Counter
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2, 2, 4],),
        ([4, 5, 6, 7],),
        ([1, 1, 2, 2],)]
df = spark.createDataFrame(schema=schema, data=data)

@F.udf
def udf_mode(x):
    return Counter(x).most_common(1)[0][0]

dft = df.withColumn("mode", udf_mode("A"))
dft.printSchema()
dft.show()

Schema of dft is:

root
 |-- A: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- mode: string (nullable = true)
A mode
[1, 2, 2, 4] 2
[4, 5, 6, 7] 4
[1, 1, 2, 2] 1
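
Note that a bare @F.udf defaults to a StringType() return value, which is why "mode" shows up as a string in the schema above. A minimal sketch, assuming an integer result is preferred:

# Declaring the return type keeps the mode as an integer
@F.udf(returnType=T.IntegerType())
def udf_mode_int(x):
    return Counter(x).most_common(1)[0][0]

dft = df.withColumn("mode", udf_mode_int("A"))
dft.printSchema()  # mode is now: integer (nullable = true)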

3.9. To calculate difference of two consecutive elements in an array

import numpy as np
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("id", T.StringType(), True),
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [("A", [4, 1, 0, 2]),
        ("B", [1, 0, 3, 1])]
df = spark.createDataFrame(schema=schema, data=data)

@F.udf(returnType=T.ArrayType(T.IntegerType()))
def diff_of_two_consecutive_elements(x):
    return np.ediff1d(np.array(x)).tolist()

df = df.withColumn("diff", diff_of_two_consecutive_elements(F.col("values")))
df.show()
df.printSchema()
id values diff
A [4, 1, 0, 2] [-3, -1, 2]
B [1, 0, 3, 1] [-1, 3, -2]

Schema of df is:

root
 |-- id: string (nullable = true)
 |-- values: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- diff: array (nullable = true)
 |    |-- element: integer (containsNull = true)

3.10. To apply a function to every element of an array

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("words_with_suffixes", T.ArrayType(T.StringType()), True)
    ]
)
data = [(["pen_10", "note_11", "bottle_12"],), (["apple_13", "orange_14", "lemon_15"],),]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("words", F.transform("words_with_suffixes", lambda x: F.split(x, "_").getItem(0)))
df.show(truncate=False)
words_with_suffixes words
[pen_10, note_11, bottle_12] [pen, note, bottle]
[apple_13, orange_14, lemon_15] [apple, orange, lemon]

3.11. To deduplicate elements in an array (find unique/distinct elements)

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("words", T.ArrayType(T.StringType()), True)
    ]
)
data = [(["pen", "note", "pen"],), (["apple", "apple", "lemon"],),]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("unique_words", F.array_distinct("words"))
df.show(truncate=False)
words unique_words
[pen, note, pen] [pen, note]
[apple, apple, lemon] [apple, lemon]

3.12. To create a map (dictionary) from two arrays (one with keys, one with values)

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("keys", T.ArrayType(T.IntegerType()), True),
        T.StructField("values", T.ArrayType(T.StringType()), True),
    ]
)
data = [([1, 2, 3], ["A", "B", "C"])]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("map_kv", F.map_from_arrays("keys", "values"))
df.show(truncate=False)
keys values map_kv
[1, 2, 3] [A, B, C] {1 -> A, 2 -> B, 3 -> C}
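
Once the map column exists, individual values can be looked up by key; a minimal sketch (missing keys yield NULL):

# Look up the value stored under key 2 in the map column
df = df.withColumn("value_of_key_2", F.col("map_kv").getItem(2))
df.show(truncate=False)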

3.13. To calculate mean of an array

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2],),
        ([4, 5, 6],)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("mean", F.aggregate(
          "values",                                  # column
          F.lit(0),                                  # initialValue
          lambda acc, x: acc + x,                    # merge operation
          lambda acc: acc / F.size(F.col("values")), # finish
      ))
df.show()
values mean
[1, 2] 1.5
[4, 5, 6] 5.0

3.14. To remove NULL values from an array

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, -2, None],),
        ([4, 5, None, 6],)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("values_without_nulls", F.array_compact("values"))
df.show()
values values_without_nulls
[1, -2, NULL] [1, -2]
[4, 5, NULL, 6] [4, 5, 6]

3.15. To find out whether an array has any negative elements

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, -2],),
        ([4, 5, 6],)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("any_negative", F.exists("values", lambda x: x < 0))
df.show()
values any_negative
[1, -2] true
[4, 5, 6] false
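
The counterpart of F.exists() is F.forall() (available since Spark 3.1), which checks that a predicate holds for every element; a minimal sketch on the same data:

# True only if every element of the array is negative
df = df.withColumn("all_negative", F.forall("values", lambda x: x < 0))
df.show()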

3.16. To convert elements of an array to columns

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2, 3, 4],),
        ([5, 6, 7],)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("first", F.col("A").getItem(0))
dft = df.select("A", "first", *[F.col("A").getItem(k).alias(f"element_{k+1}") for k in range(1,4)])
dft.show()
A first element_2 element_3 element_4
[1, 2, 3, 4] 1 2 3 4
[5, 6, 7] 5 6 7 null

3.17. To find location of the first occurrence of an element in an array

import pyspark.sql.functions as F
import pyspark.sql.types as T
import pandas as pd
from pyspark.sql import SparkSession
import numpy as np
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 7, 5],),
        ([7, 4, 7],)]
df = spark.createDataFrame(schema=schema, data=data)

df = df.withColumn("position", F.array_position(F.col("values"), 7))
df.show()
values position
[1, 7, 5] 2
[7, 4, 7] 1

3.18. To calculate moving difference of two consecutive elements in an array

import pyspark.sql.functions as F
import pyspark.sql.types as T
import pandas as pd
from pyspark.sql import SparkSession
import numpy as np
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2, 5],),
        ([4, 4, 6],)]
df = spark.createDataFrame(schema=schema, data=data)

@F.pandas_udf(T.ArrayType(T.IntegerType()))
def diff2e(x: pd.Series) -> pd.Series:
    return x.apply(lambda arr: (arr[1:] - arr[:-1]))

@F.udf(returnType=T.ArrayType(T.IntegerType()))
def diff_of_two_consecutive_elements(x):
    return np.ediff1d(np.array(x)).tolist()

df = df.withColumn("diff2e", diff2e(F.col("values")))
df = df.withColumn("ediff1d", diff_of_two_consecutive_elements(F.col("values")))
df.show()
values diff2e ediff1d
[1, 2, 5] [1, 3] [1, 3]
[4, 4, 6] [0, 2] [0, 2]

3.19. To slice an array

import pyspark.sql.functions as F
import pyspark.sql.types as T
import pandas as pd
from pyspark.sql import SparkSession
import numpy as np
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 7, 5, 2],),
        ([6, 4, 7, 3],)]
df = spark.createDataFrame(schema=schema, data=data)

df = df.withColumn("values[1:3]", F.slice("values", start=2, length=2))
df.show()
values values[1:3]
[1, 7, 5, 2] [7, 5]
[6, 4, 7, 3] [4, 7]

3.20. To slice an array dynamically

import pyspark.sql.functions as F
import pyspark.sql.types as T
import pandas as pd
from pyspark.sql import SparkSession
import numpy as np
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 7, 5],),
        ([6, 4, 7, 3],)]
df = spark.createDataFrame(schema=schema, data=data)
start_idx = 2
df = df.withColumn("values[1:]", F.slice("values", start=2, length=(F.size("values") - F.lit(start_idx - 1))))
df.show()
values values[1:]
[1, 7, 5] [7, 5]
[6, 4, 7, 3] [4, 7, 3]

4. Text processing

4.1. To remove prefix from a string using a UDF

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("text", T.StringType(), True),
    ]
)
data = [("id_orange",),
        ("apple",)]
df = spark.createDataFrame(schema=schema, data=data)
remove_prefix = F.udf(lambda x: x[3:] if x[:3] == "id_" else x, T.StringType())
df = df.withColumn("no_prefix", remove_prefix(F.col("text")))
df.show()
text no_prefix
id_orange orange
apple apple
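
The same prefix removal can be done without a UDF by anchoring a regular expression at the start of the string; a minimal sketch:

# Built-in alternative: strip a leading "id_" with an anchored regex
df = df.withColumn("no_prefix_re", F.regexp_replace("text", "^id_", ""))
df.show()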

4.2. To deduplicate identical consecutive characters using F.regexp_replace()

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("String", T.StringType(), True)
    ]
)
data = [["aaaabbccc"], ["bbbccaaa"]]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn('Shortened', F.regexp_replace("String", "([ab])\\1+", "$1"))
df.show(truncate=False)
String Shortened
aaaabbccc abccc
bbbccaaa bcca

4.3. To deduplicate identical consecutive characters using a UDF

import re
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("String", T.StringType(), True)
    ]
)
data = [["aaaabbccc"], ["bbbccaaa"]]

@F.udf(returnType=T.StringType())
def remove_consecutive_duplicate_characters_ab(s):
    return re.sub(r'([ab])\1+', r'\1', s)

df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn('Shortened', remove_consecutive_duplicate_characters_ab("String"))
df.show(truncate=False)
String Shortened
aaaabbccc abccc
bbbccaaa bcca

4.4. To split a string into letters (characters) using regex

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("String", T.StringType(), True)
    ]
)
data = [["This is"]]
df = spark.createDataFrame(schema=schema, data=data)
dft = df.select('String', F.split('String', '(?!$)').alias("Characters"))
dft.show(truncate=False)
String Characters
This is [T, h, i, s, , i, s]

4.5. To concatenate columns with strings using a separator

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Str1", T.StringType(), True),
        T.StructField("Str2", T.StringType(), True)
    ]
)
data = [("This is", "a string"),
        ("on a", "different row")]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("Str_Concat", F.concat_ws( "_", "Str1", "Str2"))
df.show()
Str1 Str2 Str_Concat
This is a string This is_a string
on a different row on a_different row

4.6. To split a string into letters (characters) using split function

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("String", T.StringType(), True)
    ]
)
data = [["This is"]]
df = spark.createDataFrame(schema=schema, data=data)
fsplit = F.expr("split(String, '')")
dft = df.select('String', fsplit.alias("Characters"))
dft.show(truncate=False)
String Characters
This is [T, h, i, s, , i, s]

4.7. To split a string into letters (characters) and remove last character

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("String", T.StringType(), True)
    ]
)
data = [["This is_"]]
df = spark.createDataFrame(schema=schema, data=data)
print("Using split function and remove last character:")
fsplit = "split(String, '')"
fsplit = F.expr(f'slice({fsplit}, 1, size({fsplit}) - 1)')
dft = df.select('String', fsplit.alias("Characters"))
dft.show(truncate=False)

Using split function and remove last character:

String Characters
This is_ [T, h, i, s, , i, s]

4.8. To append a string to all values in a column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Str1", T.StringType(), True),
        T.StructField("Str2", T.StringType(), True)
    ]
)
data = [("This is", "a string"),
        ("on a", "different row")]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("Str1_with_prefix", F.concat(F.lit("Prefix_"), "Str1"))
dft = df.select("Str1", "Str1_with_prefix")
dft.show()
Str1 Str1_with_prefix
This is Prefix_This is
on a Prefix_on a

5. Time operations

5.1. To convert Unix time stamp to human readable format

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("timestamp", T.LongType(), True),
    ]
)
data = [(1703224755,),
        (1703285602,)]
df = spark.createDataFrame(schema=schema, data=data)

df = df.withColumn("time_stamp_hrf", F.from_unixtime(F.col("timestamp")))
df.show()
timestamp time_stamp_hrf
1703224755 2023-12-22 06:59:15
1703285602 2023-12-22 23:53:22
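
F.from_unixtime() also accepts an output format, and F.unix_timestamp() converts a formatted string back to epoch seconds; a minimal sketch (both use the session time zone):

# Custom output format, and the reverse conversion back to epoch seconds
df = df.withColumn("date_only", F.from_unixtime("timestamp", "yyyy-MM-dd"))
df = df.withColumn("back_to_unix", F.unix_timestamp("time_stamp_hrf"))
df.show()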

6. Numerical operations

6.1. To find percentage of a column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Laptop", 12),
        ("Monitor", 7),
        ("Mouse", 8),
        ("Keyboard", 9)]
df = spark.createDataFrame(schema=schema, data=data)

df = df.withColumn("%", F.round(F.col("Quantity")/F.sum("Quantity").over(Window.partitionBy())*100, 2))
dft = df.select("Product", "Quantity", "%").orderBy(F.desc("Quantity"))
dft.show()
Product Quantity %
Laptop 12 33.33
Keyboard 9 25.0
Mouse 8 22.22
Monitor 7 19.44

6.2. To find percentage of a column within a group using a window

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)

df = df.withColumn("%", F.round(F.col("Quantity")/F.sum("Quantity").over(Window.partitionBy("Location"))*100, 2))
dft = df.select("Location", "Product", "Quantity", "%").orderBy(F.desc("Location"), F.desc("Quantity"))
dft.show()
Location Product Quantity %
Office Laptop 23 54.76
Office Monitor 10 23.81
Office Mouse 9 21.43
Home Laptop 12 33.33
Home Keyboard 9 25.0
Home Mouse 8 22.22
Home Monitor 7 19.44

6.3. To find percentage of a column within a group using .groupBy() and a join

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)

df_sum = df.groupBy("Location").agg(F.sum("Quantity").alias("Total_Quantity"))
df = df.join(df_sum, on="Location").withColumn("%", F.round(F.col("Quantity")/F.col("Total_Quantity")*100, 2))
dft = df.select("Location", "Product", "Quantity", "%").orderBy(F.desc("Location"), F.desc("Quantity"))
dft.show()
Location Product Quantity %
Office Laptop 23 54.76
Office Monitor 10 23.81
Office Mouse 9 21.43
Home Laptop 12 33.33
Home Keyboard 9 25.0
Home Mouse 8 22.22
Home Monitor 7 19.44

6.4. To add a row with total count and percentage for a column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Name", T.StringType(), True),
        T.StructField("Books read", T.IntegerType(), True),
    ]
)
data = [("Alice", 12),
        ("Bob", 7),
        ("Michael", 8),
        ("Kevin", 10)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("%", F.round( F.col("Books read")/F.sum("Books read").over(Window.partitionBy())*100, 6))
dft = df.select("Name", "Books read", "%").orderBy(F.desc("Books read"))
dft = dft.union(
               dft.groupBy().agg(F.lit("Total"),
                                     F.sum("Books read").alias("Books read"),
                                     F.sum("%").alias("%"))
               )
dft.show()
Name Books read %
Alice 12 32.432432
Kevin 10 27.027027
Michael 8 21.621622
Bob 7 18.918919
Total 37 100.0

6.5. To find maximum value of a column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)
df.show()
max_val = df.select("Quantity").rdd.max()[0]
print(f"Maximum value of Quantity: {max_val}")
Location Product Quantity
Home Laptop 12
Home Monitor 7
Home Mouse 8
Home Keyboard 9
Office Laptop 23
Office Monitor 10
Office Mouse 9

Maximum value of Quantity: 23
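
The same maximum can be computed with the DataFrame API instead of the RDD API; a minimal sketch:

# Aggregate with F.max and pull the single value back to the driver
max_val = df.agg(F.max("Quantity")).collect()[0][0]
print(f"Maximum value of Quantity: {max_val}")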

6.6. To add a column with count of elements per group

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("count_per_group", F.count(F.lit(1)).over(Window.partitionBy(F.col("Location"))))
df.show()
Location Product Quantity count_per_group
Home Laptop 12 4
Home Monitor 7 4
Home Mouse 8 4
Home Keyboard 9 4
Office Laptop 23 3
Office Monitor 10 3
Office Mouse 9 3

6.7. To add a column with count of elements whose quantity is larger than some number per group

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)
df.show()
dfs = df.groupBy("Location").agg(F.sum((F.col("Quantity") >= 10).cast("int")).alias("Count of products with quantity >= 10 per location"))
dfs.show()
Location Product Quantity
Home Laptop 12
Home Monitor 7
Home Mouse 8
Home Keyboard 9
Office Laptop 23
Office Monitor 10
Office Mouse 9
Location Count of products with quantity >= 10 per location
Home 1
Office 2

6.8. To calculate minimal value of two columns per row

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

# Updated schema to include two separate score fields
schema = T.StructType([
    T.StructField('Student', T.StructType([
        T.StructField('First name', T.StringType(), True),
        T.StructField('Middle name', T.StringType(), True),
        T.StructField('Last name', T.StringType(), True)
    ])),
    T.StructField('ID', T.StringType(), True),
    T.StructField('Gender', T.StringType(), True),
    T.StructField('Score1', T.IntegerType(), True),
    T.StructField('Score2', T.IntegerType(), True)
])

# Sample data with two scores for each student
data = [
    (("John", "", "Doe"), "1007", "M", 75, 80),
    (("Adam", "Scott", "Smith"), "1008", "M", 55, 65),
    (("Marie", "", "Carpenter"), "1004", "F", 67, 70),
    (("Samantha", "Louise", "Herbert"), "1002", "F", 90, 85),
    (("Craig", "", "Brown"), "1011", "M", 88, 92)
]

df = spark.createDataFrame(data=data, schema=schema)
# Calculate the minimum score between Score1 and Score2 and store it in a new column 'MinScore'
df = df.withColumn("MinScore", F.least(F.col("Score1"), F.col("Score2")))
df.printSchema()
# Show the result
df.show(truncate=False)

Schema of df is:

root
 |-- Student: struct (nullable = true)
 |    |-- First name: string (nullable = true)
 |    |-- Middle name: string (nullable = true)
 |    |-- Last name: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Score1: integer (nullable = true)
 |-- Score2: integer (nullable = true)
 |-- MinScore: integer (nullable = true)
Student ID Gender Score1 Score2 MinScore
{John, , Doe} 1007 M 75 80 75
{Adam, Scott, Smith} 1008 M 55 65 55
{Marie, , Carpenter} 1004 F 67 70 67
{Samantha, Louise, Herbert} 1002 F 90 85 85
{Craig, , Brown} 1011 M 88 92 88

6.9. To calculate cumulative sum of a column

import pandas as pd
from pyspark.sql import Window
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df = pd.DataFrame({'time': [0, 1, 2, 3, 4, 5],
                   'value': [False, False, True, False, True, True]})

df = spark.createDataFrame(df)
df = df.withColumn("cml_n_true", F.sum((F.col("value") == True).cast("int")).over(Window.orderBy(F.col("time").asc())))
df = df.withColumn("cml_n_false", F.sum((F.col("value") == False).cast("int")).over(Window.orderBy(F.col("time").asc())))
df.show()
time value cml_n_true cml_n_false
0 false 0 1
1 false 0 2
2 true 1 2
3 false 1 3
4 true 2 3
5 true 3 3
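
The ordered window above uses the default frame (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW); making the frame explicit with .rowsBetween() gives the same result here, since the 'time' values are distinct. A minimal sketch for the cumulative count of True values:

# Same cumulative sum with an explicit row-based frame
w = Window.orderBy(F.col("time").asc()).rowsBetween(Window.unboundedPreceding, Window.currentRow)
df = df.withColumn("cml_n_true_rows", F.sum(F.col("value").cast("int")).over(w))
df.show()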

6.10. To calculate difference of values of two consecutive rows for a certain column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

# Update schema to include a dataset column
schema = T.StructType([
    T.StructField("dataset", T.StringType(), True),
    T.StructField("group_data", T.StringType(), True),
    T.StructField("cnt", T.IntegerType(), True),
])

# Update data to include multiple datasets
data = [
    ("dataset1", "group_b", 7),
    ("dataset1", "group_c", 8),
    ("dataset1", "group_d", 9),
    ("dataset1", "group_e", 23),
    ("dataset1", "group_g", 9),
    ("dataset2", "group_a", 5),
    ("dataset2", "group_b", 15),
    ("dataset2", "group_d", 20),
    ("dataset2", "group_e", 8),
    ("dataset2", "group_g", 18),
]

# Create DataFrame
dfs = spark.createDataFrame(schema=schema, data=data)

# Define a window partitioned by dataset and ordered by cnt
window_by_dataset = Window.partitionBy("dataset").orderBy(F.asc("cnt")).rowsBetween(Window.unboundedPreceding, 0)

# Calculate cumulative sum of 'cnt' within each dataset
dfs = dfs.withColumn("cml_cnt", F.sum("cnt").over(window_by_dataset))

# Define another window for consecutive row difference, partitioned by dataset
w = Window.partitionBy("dataset").orderBy(F.col("cnt").desc(), F.col("group_data").desc())

# Define fill value for NULL and shift (row lag) value
fill_null_value = 0
shift = -1

# Calculate difference between two consecutive rows within each dataset
dfs = dfs.withColumn(
    "diff_of_two_consec_rows",
    F.col("cml_cnt") - F.lag("cml_cnt", shift, fill_null_value).over(w)
)

# Show the resulting DataFrame
dfs.show(truncate=False)
dataset group_data cnt cml_cnt diff_of_two_consec_rows
dataset1 group_e 23 56 23
dataset1 group_g 9 33 9
dataset1 group_d 9 24 9
dataset1 group_c 8 15 8
dataset1 group_b 7 7 7
dataset2 group_d 20 66 20
dataset2 group_g 18 46 18
dataset2 group_b 15 28 15
dataset2 group_e 8 13 8
dataset2 group_a 5 5 5

6.11. To calculate down-sampling ratios for over-represented groups of some data used in training of a machine learning algorithm

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("group_data", T.StringType(), True),
        T.StructField("cnt", T.IntegerType(), True),
    ]
)
data = [("group_a", 12),
        ("group_b", 7),
        ("group_c", 8),
        ("group_d", 9),
        ("group_e", 23),
        ("group_f", 10),
        ("group_g", 9)]
dfs = spark.createDataFrame(schema=schema, data=data)

dfs = dfs.withColumn(
        "cml_cnt",
        F.sum("cnt").over(Window.orderBy(F.col("cnt").asc(), F.col("group_data").desc()).rowsBetween(Window.unboundedPreceding, 0)),
    )
txt = "Initial data composition:"
print(txt)
# Total count of samples in the input dataset
dfs = dfs.withColumn("cnt_tot", F.sum("cnt").over(Window.partitionBy()))
dfs.show(truncate=False)
dfs = dfs.withColumn("rank", F.rank().over(Window.orderBy(F.desc("cnt"), F.asc("group_data"))))

cnt_total = dfs.select("cnt_tot").limit(1).collect()[0][0]
dfs = dfs.drop("cnt_total")
n_samples_limit = 65
txt = f"Desired number of samples as parameter: {n_samples_limit}\n"
w = Window.partitionBy().orderBy(F.asc("rank"))
dfs = dfs.withColumn("cml_cnt + (rank-1)*F.lag(cnt, 1)",
                      F.concat_ws("", F.format_string("%2d", F.col("cml_cnt")), F.lit(" + "),
                                      F.lit("("), F.col("rank").cast(T.StringType()), F.lit(" - "), F.lit(1).cast(T.StringType()),
                                      F.lit(")"), F.lit(" * "), (F.lag("cnt", 1, cnt_total).over(w)).cast(T.StringType())))
dfs = dfs.withColumn("cnt_tot_after_top_cut", F.col("cml_cnt") + (F.col("rank") - F.lit(1)) * F.lag("cnt", 1, cnt_total).over(w))
dfs = dfs.withColumn("loc_cutoff", (F.col("cnt_tot_after_top_cut") > F.lit(n_samples_limit)).cast("int"))
dfs = dfs.withColumn("loc_cutoff_last", F.last("rank").over(Window.partitionBy("loc_cutoff").orderBy("loc_cutoff")))
dfs = dfs.withColumn("loc_cutoff", F.when((F.col("loc_cutoff") == 1) & (F.col("rank") == F.col("loc_cutoff_last")), F.lit(-1)).otherwise(F.col("loc_cutoff")))
dfs = dfs.drop("loc_cutoff_last")

rank_cutoff = dfs.filter(F.col("loc_cutoff") == 1).orderBy(F.desc("rank")).limit(1).select("rank").collect()[0][0]
txt += f"Rank at cutoff: {rank_cutoff}\n"
sum_before_cutoff = dfs.filter(F.col("loc_cutoff") == -1).orderBy(F.asc("rank")).limit(1).select("cml_cnt").collect()[0][0]
cnt_new_flat = (n_samples_limit - sum_before_cutoff)/rank_cutoff
txt += f"New samples count for groups with rank 1 to {rank_cutoff} (inclusive): {cnt_new_flat}\n"
dfs = dfs.withColumn("cnt_new", F.when(F.col("loc_cutoff") == F.lit(1), F.lit(cnt_new_flat)).otherwise(F.col("cnt").cast("float")))
txt += f"Over-represented groups will be down-sampled to have a flat distribution, under-represented groups will be kept as they are.\n"
dfs = dfs.withColumn(
        "cml_cnt_new",
        F.sum("cnt_new").over(Window.orderBy(F.desc("rank")).rowsBetween(Window.unboundedPreceding, 0)),
).orderBy(F.asc("rank"))

dfs = dfs.withColumn("sampling_ratio", F.when(F.col("loc_cutoff") == F.lit(1), F.lit(cnt_new_flat)/F.col("cnt")).otherwise(F.lit(1.0)))
txt += f"Data from groups with rank 1 to {rank_cutoff} (inclusive) should be downsampled with a ratio of samples to keep shown in 'sampling_ratio' column:"
print(txt)
dfs.show(truncate=False)
Initial data composition:
group_data cnt cml_cnt cnt_tot
group_b 7 7 78
group_c 8 15 78
group_g 9 24 78
group_d 9 33 78
group_f 10 43 78
group_a 12 55 78
group_e 23 78 78
Desired number of samples as parameter: 65
Rank at cutoff: 2
New samples count for groups with rank 1 to 2 (inclusive): 11.0
Over-represented groups will be down-sampled to have a flat distribution, under-represented groups will be kept as they are.
Data from groups with rank 1 to 2 (inclusive) should be downsampled with a ratio of samples to keep shown in 'sampling_ratio' column:
group_data cnt cml_cnt cnt_tot rank cml_cnt + (rank-1)*F.lag(cnt, 1) cnt_tot_after_top_cut loc_cutoff cnt_new cml_cnt_new sampling_ratio
group_e 23 78 78 1 78 + (1 - 1) * 78 78 1 11.0 65.0 0.4782608695652174
group_a 12 55 78 2 55 + (2 - 1) * 23 78 1 11.0 54.0 0.9166666666666666
group_f 10 43 78 3 43 + (3 - 1) * 12 67 -1 10.0 43.0 1.0
group_d 9 33 78 4 33 + (4 - 1) * 10 63 0 9.0 33.0 1.0
group_g 9 24 78 5 24 + (5 - 1) * 9 60 0 9.0 24.0 1.0
group_c 8 15 78 6 15 + (6 - 1) * 9 60 0 8.0 15.0 1.0
group_b 7 7 78 7 7 + (7 - 1) * 8 55 0 7.0 7.0 1.0

7. Structures

7.1. To convert a map to a struct

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df = spark.sql("SELECT map('John', 1, 'Michael', 2) as Students_map")
df = df.withColumn("Students_struct", F.map_entries("Students_map").alias("a", "b"))
df.printSchema()
df.show(truncate=False)

Schema of df is:

root
 |-- Students_map: map (nullable = false)
 |    |-- key: string
 |    |-- value: integer (valueContainsNull = false)
 |-- Students_struct: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- key: string (nullable = false)
 |    |    |-- value: integer (nullable = false)
Students_map Students_struct
{John -> 1, Michael -> 2} [{John, 1}, {Michael, 2}]
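
The reverse direction, turning an array of key/value structs back into a map, is available as F.map_from_entries(); a minimal sketch on the same dataframe:

# Rebuild a map column from the array of {key, value} structs
df = df.withColumn("Students_map_again", F.map_from_entries("Students_struct"))
df.printSchema()
df.show(truncate=False)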

7.2. To extract a field from a struct as a column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType([
        T.StructField('Student', T.StructType([
             T.StructField('First name', T.StringType(), True),
             T.StructField('Middle name', T.StringType(), True),
             T.StructField('Last name', T.StringType(), True)
             ])),
         T.StructField('ID', T.StringType(), True),
         T.StructField('Gender', T.StringType(), True),
         T.StructField('Score', T.IntegerType(), True)
         ])

data = [
    (("John", "", "Doe"),"1007", "M", 75),
    (("Adam", "Scott", "Smith"),"1008", "M", 55),
    (("Marie", "", "Carpenter"), "1004", "F", 67),
    (("Samantha", "Louise", "Herbert"),"1002", "F", 90),
    (("Craig", "", "Brown"), "1011", "M" , 88)
  ]
df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()
df = df.withColumn("First name", F.col("Student.First Name"))
df = df.withColumn("Last name", F.upper(F.col("Student").getField("Last name")))
df.show(truncate=False)

Schema of df is:

root
 |-- Student: struct (nullable = true)
 |    |-- First name: string (nullable = true)
 |    |-- Middle name: string (nullable = true)
 |    |-- Last name: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Score: integer (nullable = true)
Student ID Gender Score First name Last name
{John, , Doe} 1007 M 75 John DOE
{Adam, Scott, Smith} 1008 M 55 Adam SMITH
{Marie, , Carpenter} 1004 F 67 Marie CARPENTER
{Samantha, Louise, Herbert} 1002 F 90 Samantha HERBERT
{Craig, , Brown} 1011 M 88 Craig BROWN

7.3. To process an array of structs using F.transform() and .getField()

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

# Define schema with a struct that includes an array of structs for subjects
schema = T.StructType([
    T.StructField('Student', T.StructType([
        T.StructField('First name', T.StringType(), True),
        T.StructField('Middle name', T.StringType(), True),
        T.StructField('Last name', T.StringType(), True)
    ])),
    T.StructField('ID', T.StringType(), True),
    T.StructField('Gender', T.StringType(), True),
    T.StructField('Score', T.IntegerType(), True),
    T.StructField('Subjects and scores', T.ArrayType(
        T.StructType([
            T.StructField('Subject name', T.StringType(), True),
            T.StructField('Subject score', T.IntegerType(), True)
        ])
    ), True)
])

# Define data with an array of subjects for each student
data = [
    (("John", "", "Doe"), "1007", "M", 75, [("Math", 78), ("Science", 82)]),
    (("Adam", "Scott", "Smith"), "1008", "M", 55, [("Math", 55), ("English", 65)]),
    (("Marie", "", "Carpenter"), "1004", "F", 67, [("Math", 72), ("Science", 68), ("History", 75)]),
    (("Samantha", "Louise", "Herbert"), "1002", "F", 90, [("Math", 92), ("Science", 88), ("English", 91)]),
    (("Craig", "", "Brown"), "1011", "M", 88, [("Math", 85), ("Science", 90)])
]

df = spark.createDataFrame(data=data, schema=schema)

df.printSchema()

df = df.withColumn("Subjects taken", F.transform("Subjects and scores", lambda x: x.getField("Subject name")))
df.show(truncate=False)

Schema of df is:

root
 |-- Student: struct (nullable = true)
 |    |-- First name: string (nullable = true)
 |    |-- Middle name: string (nullable = true)
 |    |-- Last name: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Score: integer (nullable = true)
 |-- Subjects and scores: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Subject name: string (nullable = true)
 |    |    |-- Subject score: integer (nullable = true)
Student ID Gender Score Subjects and scores Subjects taken
{John, , Doe} 1007 M 75 [{Math, 78}, {Science, 82}] [Math, Science]
{Adam, Scott, Smith} 1008 M 55 [{Math, 55}, {English, 65}] [Math, English]
{Marie, , Carpenter} 1004 F 67 [{Math, 72}, {Science, 68}, {History, 75}] [Math, Science, History]
{Samantha, Louise, Herbert} 1002 F 90 [{Math, 92}, {Science, 88}, {English, 91}] [Math, Science, English]
{Craig, , Brown} 1011 M 88 [{Math, 85}, {Science, 90}] [Math, Science]
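
As a follow-up sketch (assuming Spark 3.1+, where F.aggregate() is available), the per-student average of the subject scores can be computed directly on the array of structs:

# Sketch: average subject score per student, derived from the array of structs
df = df.withColumn(
    "Average subject score",
    F.aggregate(
        "Subjects and scores",
        F.lit(0),
        lambda acc, x: acc + x.getField("Subject score"),
    ) / F.size("Subjects and scores"),
)
df.show(truncate=False)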

7.4. To process an array of nested structs using F.transform() and .getField()

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

# Define the schema with nested structs
schema = T.StructType([
  T.StructField('Student', T.StructType([
      T.StructField('First name', T.StringType(), True),
      T.StructField('Middle name', T.StringType(), True),
      T.StructField('Last name', T.StringType(), True)
  ])),
  T.StructField('ID', T.StringType(), True),
  T.StructField('Gender', T.StringType(), True),
  T.StructField('Subjects', T.ArrayType(T.StructType([
      T.StructField('Subject Name', T.StringType(), True),
      T.StructField('Assessments', T.ArrayType(T.StructType([
          T.StructField('Assessment Name', T.StringType(), True),
          T.StructField('Score', T.IntegerType(), True)
      ])), True)
  ])), True)
])

# Sample data matching the new schema
data = [
  (("John", "", "Doe"), "1007", "M", [
      ("Math", [("Midterm", 70), ("Final", 80)]),
      ("Science", [("Midterm", 65), ("Final", 75)])
  ]),
  (("Adam", "Scott", "Smith"), "1008", "M", [
      ("Math", [("Midterm", 50), ("Final", 60)]),
      ("Science", [("Midterm", 55), ("Final", 65)])
  ]),
  (("Marie", "", "Carpenter"), "1004", "F", [
      ("Math", [("Midterm", 60), ("Final", 75)]),
      ("Science", [("Midterm", 68), ("Final", 70)])
  ]),
  (("Samantha", "Louise", "Herbert"), "1002", "F", [
      ("Math", [("Midterm", 88), ("Final", 92)]),
      ("Science", [("Midterm", 85), ("Final", 89)])
  ]),
  (("Craig", "", "Brown"), "1011", "M", [
      ("Math", [("Midterm", 78), ("Final", 85)]),
      ("Science", [("Midterm", 80), ("Final", 84)])
  ])
]

df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()

df = df.withColumn("Scores", F.transform("Subjects", lambda x: x["Assessments"].getField("Score")))
df.show(truncate=False)

Schema of df is:

root
 |-- Student: struct (nullable = true)
 |    |-- First name: string (nullable = true)
 |    |-- Middle name: string (nullable = true)
 |    |-- Last name: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Subjects: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Subject Name: string (nullable = true)
 |    |    |-- Assessments: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- Assessment Name: string (nullable = true)
 |    |    |    |    |-- Score: integer (nullable = true)
Student ID Gender Subjects Scores
{John, , Doe} 1007 M [{Math, [{Midterm, 70}, {Final, 80}]}, {Science, [{Midterm, 65}, {Final, 75}]}] [[70, 80], [65, 75]]
{Adam, Scott, Smith} 1008 M [{Math, [{Midterm, 50}, {Final, 60}]}, {Science, [{Midterm, 55}, {Final, 65}]}] [[50, 60], [55, 65]]
{Marie, , Carpenter} 1004 F [{Math, [{Midterm, 60}, {Final, 75}]}, {Science, [{Midterm, 68}, {Final, 70}]}] [[60, 75], [68, 70]]
{Samantha, Louise, Herbert} 1002 F [{Math, [{Midterm, 88}, {Final, 92}]}, {Science, [{Midterm, 85}, {Final, 89}]}] [[88, 92], [85, 89]]
{Craig, , Brown} 1011 M [{Math, [{Midterm, 78}, {Final, 85}]}, {Science, [{Midterm, 80}, {Final, 84}]}] [[78, 85], [80, 84]]
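
As a small extension (a sketch, not part of the original recipe), the nested per-subject arrays in "Scores" can be collapsed into one flat array with F.flatten():

# Sketch: flatten the nested score arrays into a single array per student
df = df.withColumn("All scores", F.flatten("Scores"))
df.show(truncate=False)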

8. Dataframe join operations

8.1. To perform a full, outer, left, right join operations

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Name", T.StringType(), True),
        T.StructField("Score", T.IntegerType(), True),
    ]
)
data = [("Alice", 10),
        ("Bob", 11)
        ]
df_a = spark.createDataFrame(schema=schema, data=data)
print("Table A:")
df_a.show()

schema = T.StructType(
    [
        T.StructField("Name", T.StringType(), True),
        T.StructField("Surname", T.StringType(), True),
        T.StructField("Age", T.StringType(), True),
    ]
)
data = [("Alice", "Doe", 12),
        ("Alice", "Smith", 30),
        ("Jane", "Carter", 7),
]
df_b = spark.createDataFrame(schema=schema, data=data)
print("Table B:")
df_b.show()

df = df_a.join(df_b, on="Name", how="full")
print("Full join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="outer")
print("Outer join on 'Name':")
df.show()

df = df_a.join(df_b, df_a["Name"] == df_b["Name"])
print("Join on 'Name' on equal condition:")
df.show()

df = df_a.join(df_b, on="Name", how="inner")
print("Inner join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="left")
print("Left join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="left_outer")
print("Left-outer join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="left_anti")
print("Left-anti join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="left_semi")
print("Left-semi join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="right")
print("Right join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="right_outer")
print("Right-outer join on 'Name':")
df.show()

Table A:

Name Score
Alice 10
Bob 11

Table B:

Name Surname Age
Alice Doe 12
Alice Smith 30
Jane Carter 7

Full join on 'Name':

Name Score Surname Age
Alice 10 Doe 12
Alice 10 Smith 30
Bob 11 null null
Jane null Carter 7

Outer join on 'Name':

Name Score Surname Age
Alice 10 Doe 12
Alice 10 Smith 30
Bob 11 null null
Jane null Carter 7

Join on 'Name' on equal condition:

Name Score Name Surname Age
Alice 10 Alice Doe 12
Alice 10 Alice Smith 30

Inner join on 'Name':

Name Score Surname Age
Alice 10 Doe 12
Alice 10 Smith 30

Left join on 'Name':

Name Score Surname Age
Bob 11 null null
Alice 10 Smith 30
Alice 10 Doe 12

Left-outer join on 'Name':

Name Score Surname Age
Bob 11 null null
Alice 10 Smith 30
Alice 10 Doe 12

Left-anti join on 'Name':

Name Score
Bob 11

Left-semi join on 'Name':

Name Score
Alice 10

Right join on 'Name':

Name Score Surname Age
Alice 10 Doe 12
Alice 10 Smith 30
Jane null Carter 7

Right-outer join on 'Name':

Name Score Surname Age
Alice 10 Doe 12
Alice 10 Smith 30
Jane null Carter 7
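
When one side of the join is known to be small, a broadcast hint can be added with F.broadcast() so that the small dataframe is shipped to every executor instead of being shuffled. A minimal sketch on the same tables:

# Sketch: broadcast the smaller df_b before an inner join on "Name"
df = df_a.join(F.broadcast(df_b), on="Name", how="inner")
df.show()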

8.2. To drop one of the duplicate columns after join

from pyspark.sql import Row
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df_a = spark.createDataFrame([
  Row(id=1, value="A1"),
  Row(id=1, value="B1"),
  Row(id=1, value="C1"),
  Row(id=2, value="A1"),
  Row(id=2, value="X1"),
  Row(id=2, value="Y1")]
)
print("Dataframe df_a:")
df_a.show()

df_b = spark.createDataFrame([
  Row(id=1, updated="A2"),
  Row(id=1, updated="B1"),
  Row(id=1, updated="C1"),
  Row(id=2, updated="A1"),
  Row(id=2, updated="X1"),
  Row(id=2, updated="Y1")]
)
print("Dataframe df_b:")
df_b.show()

df = df_a.join(df_b, on=[df_a["id"] == df_b["id"], df_a["value"] == df_b["updated"]], how="full")
print("Full join on df_a['value'] == df_b['updated']:")
df.show()

df = df_a.join(df_b, on=[df_a["id"] == df_b["id"], df_a["value"] == df_b["updated"]], how="full").drop(df_b["id"])
print("Full join on df_a['value'] == df_b['updated'] with dropped df_b['id'] column:")
df.show()

Dataframe df_a:

id value
1 A1
1 B1
1 C1
2 A1
2 X1
2 Y1

Dataframe df_b:

id updated
1 A2
1 B1
1 C1
2 A1
2 X1
2 Y1

Full join on df_a['id'] == df_b['id'] and df_a['value'] == df_b['updated']:

id value id updated
1 A1 null null
null null 1 A2
1 B1 1 B1
1 C1 1 C1
2 A1 2 A1
2 X1 2 X1
2 Y1 2 Y1

Full join on df_a['id'] == df_b['id'] and df_a['value'] == df_b['updated'] with dropped df_b['id'] column:

id value updated
1 A1 null
null null A2
1 B1 B1
1 C1 C1
2 A1 A1
2 X1 X1
2 Y1 Y1
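
If the join key has the same name on both sides, passing the column names as a list to on= keeps a single copy of that column, so nothing has to be dropped afterwards. This sketch joins on "id" only (a simplification of the recipe above), just to illustrate the behaviour:

# Sketch: joining on a list of column names yields a single "id" column
df = df_a.join(df_b, on=["id"], how="full")
df.show()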

9. Aggregation and maps

9.1. To group by and aggregate into a map using F.map_from_entries()

import pyspark.sql.functions as F
from pyspark.sql import Row
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df = spark.createDataFrame([
  Row(id=1, key='a', value="A1"),
  Row(id=1, key='b', value="B1"),
  Row(id=1, key='c', value="C1"),
  Row(id=2, key='a', value="A1"),
  Row(id=2, key='x', value="X1"),
  Row(id=2, key='y', value="Y1")]
)

print("Dataframe with keys and values:")
df.show(truncate=False)
dft = df.groupBy("id").agg(F.map_from_entries(F.collect_list(
          F.struct("key", "value"))).alias("key_value")
)
print("Dataframe with key -> value mapping")
dft.show(truncate=False)
dft.printSchema()

Dataframe with keys and values:

id key value
1 a A1
1 b B1
1 c C1
2 a A1
2 x X1
2 y Y1

Dataframe with key -> value mapping

id key_value
1 {a -> A1, b -> B1, c -> C1}
2 {a -> A1, x -> X1, y -> Y1}

Schema of dft is:

root
 |-- id: long (nullable = true)
 |-- key_value: map (nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
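
For the reverse direction, F.explode() turns the map back into one row per (key, value) pair. A minimal sketch on dft:

# Sketch: explode the map column back into individual key/value rows
dfe = dft.select("id", F.explode("key_value").alias("key", "value"))
dfe.show(truncate=False)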

9.2. To group by and aggregate into a map using UDF

import pyspark.sql.functions as F
from pyspark.sql import Row
from pyspark.sql import SparkSession
import pyspark.sql.types as T

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df = spark.createDataFrame([
  Row(id=1, key='a', value="A1"),
  Row(id=1, key='b', value="B1"),
  Row(id=1, key='c', value="C1"),
  Row(id=2, key='a', value="A1"),
  Row(id=2, key='x', value="X1"),
  Row(id=2, key='y', value="Y1")]
)

print("Dataframe with keys and values:")
df.show()

@F.udf(returnType=T.MapType(T.StringType(), T.StringType()))
def map_array(column):
    return dict(column)

dft = (df.groupBy("id")
   .agg(F.collect_list(F.struct("key", "value")).alias("key_value"))
   .withColumn('key_value', map_array('key_value')))
print("Dataframe with keys and values:")
dft.show(truncate=False)
dft.printSchema()

Dataframe with keys and values:

id key value
1 a A1
1 b B1
1 c C1
2 a A1
2 x X1
2 y Y1

Dataframe with key -> value mapping:

id key_value
1 {a -> A1, b -> B1, c -> C1}
2 {x -> X1, a -> A1, y -> Y1}

Schema of dft is:

root
 |-- id: long (nullable = true)
 |-- key_value: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
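
The same map can also be built without a UDF by collecting the keys and the values separately and zipping them with F.map_from_arrays() (available since Spark 2.4). A sketch; note that it relies on both collect_list() calls seeing the rows of a group in the same order, which holds within a single aggregation:

# Sketch: native alternative to the UDF using F.map_from_arrays()
dft = df.groupBy("id").agg(
    F.map_from_arrays(
        F.collect_list("key"), F.collect_list("value")
    ).alias("key_value")
)
dft.show(truncate=False)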

9.3. To aggregate over multiple columns and sum values of dictionaries

import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

df_schema = T.StructType([T.StructField('clid', T.StringType(), True),
                        T.StructField('coef_1', T.MapType(T.StringType(), T.DoubleType(), True), False),
                        T.StructField('coef_2', T.MapType(T.StringType(), T.DoubleType(), True), False),
                        T.StructField('coef_3', T.MapType(T.StringType(), T.DoubleType(), True), False)])
df_data = [["X", {'B': 0.4, 'C': 0.4}, {'B': 0.33, 'C': 0.5}, {'A': 0.5, 'C': 0.33}],
           ["Y", {'B': 0.67, 'C': 0.33}, {'B': 0.85}, {'A': 0.4, 'C': 0.57}],
           ]
spark = SparkSession.builder\
        .master("local")\
        .appName("Parse DataFrame Schema")\
        .getOrCreate()
df = spark.createDataFrame(data=df_data, schema=df_schema)

df = df.withColumn("coef_total", F.col("coef_1"))
for i in range(2,4):
    df = df.withColumn("coef_total", F.map_zip_with("coef_total", f"coef_{i}",
                      lambda k, v1, v2: F.when(v1.isNull(), 0).otherwise(v1) + F.when(v2.isNull(), 0).otherwise(v2)))
df.show(truncate=False)
clid coef_1 coef_2 coef_3 coef_total
X {B -> 0.4, C -> 0.4} {B -> 0.33, C -> 0.5} {A -> 0.5, C -> 0.33} {B -> 0.73, C -> 1.23, A -> 0.5}
Y {B -> 0.67, C -> 0.33} {B -> 0.85} {A -> 0.4, C -> 0.57} {B -> 1.52, C -> 0.8999999999999999, A -> 0.4}
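
The null handling inside map_zip_with() can be written a bit more compactly with F.coalesce(). A sketch of the same merge:

# Sketch: F.coalesce() replaces the F.when().otherwise() null checks
df = df.withColumn("coef_total", F.col("coef_1"))
for i in range(2, 4):
    df = df.withColumn("coef_total", F.map_zip_with("coef_total", f"coef_{i}",
                      lambda k, v1, v2: F.coalesce(v1, F.lit(0.0)) + F.coalesce(v2, F.lit(0.0))))
df.show(truncate=False)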

10. Groups, aggregations and window operations

10.1. To get 2nd smallest element in a group

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)
w = Window.partitionBy("Location").orderBy(F.asc("Quantity"))
df = df.withColumn("rank", F.rank().over(w))
df.show()
print("Products with the 2nd smallest quantity in a location:")
df = df.filter(F.col("rank") == 2)
df.show()
Location Product Quantity rank
Home Monitor 7 1
Home Mouse 8 2
Home Keyboard 9 3
Home Laptop 12 4
Office Mouse 9 1
Office Monitor 10 2
Office Laptop 23 3

Products with the 2nd smallest quantity in a location:

Location Product Quantity rank
Home Mouse 8 2
Office Monitor 10 2
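
F.rank() leaves gaps after ties, so two products with the same quantity would both get rank 2 and rank 3 would be skipped. If exactly one row per group is wanted regardless of ties, F.row_number() can be used instead (F.dense_rank() keeps all tied rows without gaps). A sketch, applied to a fresh copy of the dataframe before the rank filter above:

# Sketch: row_number() picks exactly one "2nd smallest" row per location
df = spark.createDataFrame(schema=schema, data=data)
w = Window.partitionBy("Location").orderBy(F.asc("Quantity"))
df = df.withColumn("row_number", F.row_number().over(w))
df.filter(F.col("row_number") == 2).show()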

11. Sampling rows

11.1. To sample rows

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("index", T.IntegerType(), True),
        T.StructField("value", T.StringType(), True),
    ]
)
data = [(1, "Home"),
        (2, "School"),
        (3, "Home"),
        (4, "Home"),
        (5, "Office"),
        (6, "Office"),
        (7, "Office"),
        (8, "Mall"),
        (9, "Mall"),
        (10, "School")]
df = spark.createDataFrame(schema=schema, data=data).repartition(3)
df = df.withColumn("partition", F.spark_partition_id()).orderBy("index")
print("Original dataframe:")
df.show()

print("Sampled dataframe:")
dft = df.sample(fraction=0.5, seed=1).orderBy("index")
dft.show()

Original dataframe:

index value partition
1 Home 1
2 School 0
3 Home 0
4 Home 2
5 Office 2
6 Office 2
7 Office 1
8 Mall 0
9 Mall 1
10 School 0

Sampled dataframe:

index value partition
3 Home 0
4 Home 2
7 Office 1
8 Mall 0
9 Mall 1
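
For stratified sampling, where each value gets its own sampling fraction, DataFrame.sampleBy() can be used. A sketch; the fractions below are illustrative and not taken from the original recipe:

# Sketch: keep roughly half of the "Home" rows and all "Office" rows
# (values missing from the dict, e.g. "School" and "Mall", default to fraction 0)
dfs = df.sampleBy("value", fractions={"Home": 0.5, "Office": 1.0}, seed=1)
dfs.orderBy("index").show()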

12. UUID generation

12.1. To generate a UUID for every row

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
import random
import uuid

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Name", T.StringType(), True),
    ]
)
data = [["Alice"],
        ["Bon"],
        ["John"],
        ["Cecile"]
        ]
df = spark.createDataFrame(schema=schema, data=data).repartition(2)

def _generate_uuid(uuid_gen, v=10):
    def _replace_byte(value: int, byte: int):
        byte = byte & 0xF
        bit_shift = 76
        mask = ~(0xF << bit_shift)
        return value & mask | (byte << bit_shift)

    uuid_ = uuid_gen.generate()
    return uuid.UUID(int=(_replace_byte(uuid_.int, v)))

class RandomDistributedUUIDGenerator:
    def generate(self):
        return uuid.uuid4()

class SeedBasedUUIDGenerator:
    def __init__(self, seed):
        self.rnd = random.Random(seed)

    def generate(self):
        return uuid.UUID(int=self.rnd.getrandbits(128), version=4)

gen = RandomDistributedUUIDGenerator()
udf_generate_uuid = F.udf(lambda: str(_generate_uuid(gen)), T.StringType())
df = df.withColumn("UUID_random_distributed", udf_generate_uuid())

seed_for_rng = 1
gen = SeedBasedUUIDGenerator(seed_for_rng)
udf_generate_uuid = F.udf(lambda: str(_generate_uuid(gen)), T.StringType())
df = df.withColumn("UUID_seed_based", udf_generate_uuid())

print("The dataframe resides in two partitions. Seed-based random UUID generator uses the same seed on both partitions, yielding identical values.")
df.show(truncate=False)

The dataframe resides in two partitions. Seed-based random UUID generator uses the same seed on both partitions, yielding identical values.

Name UUID_random_distributed UUID_seed_based
John 4e9a3bb1-a189-a25e-8389-7f8382635b09 cd613e30-d8f1-aadf-91b7-584a2265b1f5
Bon 16cd1549-0c74-a483-9bbe-707e59e0796f 1e2feb89-414c-a43c-9027-c4d1c386bbc4
Cecile b8b05619-6004-aa75-b98b-7e1c83c9f301 cd613e30-d8f1-aadf-91b7-584a2265b1f5
Alice b1f1a9fb-feb9-a946-9171-3e7cb577fdaa 1e2feb89-414c-a43c-9027-c4d1c386bbc4
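
Spark also ships a built-in, non-deterministic uuid() SQL function, which avoids the Python UDF altogether when seed control is not needed. A minimal sketch:

# Sketch: generate a random UUID per row with the built-in SQL uuid() function
df = df.withColumn("UUID_builtin", F.expr("uuid()"))
df.show(truncate=False)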

Author: Altynbek Isabekov

Created: 2024-11-25 Mon 08:49
