PySpark Cookbook

1. Introduction

1.1. To create an empty dataframe

import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.StringType()), True),
        T.StructField("B", T.ArrayType(T.StringType()), True),
    ]
)
data = []
df = spark.createDataFrame(schema=schema, data=data)
df.show()
A B
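
The schema can also be given as a DDL string, which is shorter for simple cases. A minimal sketch of this variant, assuming the same column layout as above:

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
# DDL-string schema; equivalent to the StructType definition above
df = spark.createDataFrame([], "A array<string>, B array<string>")
df.printSchema()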

1.2. To create a dataframe with columns key and value from a dictionary

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]

df = spark.createDataFrame(values, columns)
df.show()
key value
key1 value1
key2 value2
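
A slightly shorter variant passes the dictionary items to createDataFrame() directly, since they are already (key, value) tuples. A minimal sketch, assuming the same dict_a:

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
# dict items are (key, value) tuples already, so no comprehension is needed
df = spark.createDataFrame(list(dict_a.items()), ["key", "value"])
df.show()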

1.3. To duplicate a column

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]

df = spark.createDataFrame(values, columns)
df = df.withColumn("value_dup", F.col("value"))
df.show()
key value value_dup
key1 value1 value1
key2 value2 value2

1.4. To rename a column using .withColumnRenamed()

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]

df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()
df = df.withColumnRenamed("key", "new_key") \
        .withColumnRenamed("value","new_value")
print("Modified dataframe:")
df.show()

Original dataframe:

key value
key1 value1
key2 value2

Modified dataframe:

new_key new_value
key1 value1
key2 value2

1.5. To rename a column using .withColumnsRenamed()

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]

df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()
df = df.withColumnsRenamed({"key": "new_key", "value": "new_value"})
print("Modified dataframe:")
df.show()

Original dataframe:

key value
key1 value1
key2 value2

Modified dataframe:

new_key new_value
key1 value1
key2 value2
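
.withColumnsRenamed() is available from Spark 3.4 onwards. On older versions the same bulk rename can be done with a single .select(); a minimal sketch, assuming the same toy data:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

df = spark.createDataFrame([("key1", "value1"), ("key2", "value2")], ["key", "value"])
mapping = {"key": "new_key", "value": "new_value"}
# Columns not present in the mapping keep their original name
df = df.select(*[F.col(c).alias(mapping.get(c, c)) for c in df.columns])
df.show()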

1.6. To rename a column using .select()

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()

df = df.select(F.col("key").alias("new_key"), F.col("value").alias("new_value"))
print("Modified dataframe:")
df.show()

Original dataframe:

key value
key1 value1
key2 value2

Modified dataframe:

new_key new_value
key1 value1
key2 value2

1.7. To rename columns by adding a prefix

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("index", T.IntegerType(), True),
        T.StructField("value", T.StringType(), True),
    ]
)
data = [(1, "Home"),
        (2, "School"),
        (3, "Home"),]
df = spark.createDataFrame(schema=schema, data=data)
print("Original dataframe:")
df.show()
print("Dataframe with renamed columns:")
df = df.select(*[F.col(k).alias(f"prefix_{k}") for k in df.columns])
df.show()

Original dataframe:

index value
1 Home
2 School
3 Home

Dataframe with renamed columns:

prefix_index prefix_value
1 Home
2 School
3 Home
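
.toDF() offers an even shorter spelling of the same bulk rename, replacing all column names positionally. A minimal sketch with a cut-down version of the data above:

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()

df = spark.createDataFrame([(1, "Home"), (2, "School")], ["index", "value"])
# .toDF() takes the full list of new column names, in order
df = df.toDF(*[f"prefix_{c}" for c in df.columns])
df.show()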

1.8. To drop columns from a dataframe

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)

df = df.withColumn("const", F.lit(1))
print("Original dataframe:")
df.show()

df = df.drop("value", "const")
print("Modified dataframe:")
df.show()

Original dataframe:

key value const
key1 value1 1
key2 value2 1

Modified dataframe:

key
key1
key2

1.9. To subset columns of a dataframe

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
df = df.withColumn("const", F.lit(1))
print("Original dataframe:")
df.show()
print("Subset 'key', 'value' columns:")
df["key", "value"].show()
print("Subset 'key', 'const' columns:")
df.select("key", "const").show()

Original dataframe:

key value const
key1 value1 1
key2 value2 1

Subset 'key', 'value' columns:

key value
key1 value1
key2 value2

Subset 'key', 'const' columns:

key const
key1 1
key2 1

1.10. To add a column with a constant value using F.lit()

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()

df = df.withColumn("const_integer", F.lit(1))
df = df.withColumn("const_string", F.lit("string"))
print("Modified dataframe:")
df.show()

Original dataframe:

key value
key1 value1
key2 value2

Modified dataframe:

key value const_integer const_string
key1 value1 1 string
key2 value2 1 string

1.11. To add a column with a constant value using .select()

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()

df = df.select("key", "value", F.lit("const_str").alias("constant_value"))
print("Modified dataframe:")
df.show()

Original dataframe:

key value
key1 value1
key2 value2

Modified dataframe:

key value constant_value
key1 value1 const_str
key2 value2 const_str

1.12. To create a dataframe from a list of tuples

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
integer characters
1 [A, B]
2 [C, D]
3 [E, F]

1.13. To get the number of rows of a dataframe

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
num_rows = df.count()
print(f"df has {num_rows} rows")
integer characters
1 [A, B]
2 [C, D]
3 [E, F]

df has 3 rows

1.14. To select first N rows

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
print("These are first 2 rows:")
df.limit(2).show()
integer characters
1 [A, B]
2 [C, D]
3 [E, F]

These are first 2 rows:

integer characters
1 [A, B]
2 [C, D]

1.15. To deduplicate rows

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("key", T.IntegerType(), True),
        T.StructField("value", T.StringType(), True),
        T.StructField("comment", T.StringType(), True),
    ]
)
data = [(1, "Home", "a house"),
        (1, "Home", "a house"),
        (2, "School", "a building"),
        (2, "School", "a house"),
        (3, "Home", "a house"),]
df = spark.createDataFrame(schema=schema, data=data)

print("Original dataframe:")
df.show()

print("Dataframe with distinct rows:")
df.distinct().show()

print("Dataframe with dropped duplicate rows:")
df.dropDuplicates().show()

print("Dataframe with dropped duplicates in columns 'key' and 'value':")
df = df.dropDuplicates(subset=["key", "value"])
df.show()

Original dataframe:

key value comment
1 Home a house
1 Home a house
2 School a building
2 School a house
3 Home a house

Dataframe with distinct rows:

key value comment
2 School a house
3 Home a house
2 School a building
1 Home a house

Dataframe with dropped duplicate rows:

key value comment
2 School a house
3 Home a house
2 School a building
1 Home a house

Dataframe with dropped duplicates in columns 'key' and 'value':

key value comment
1 Home a house
2 School a building
3 Home a house

1.16. To convert a column to a list using a lambda function

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
lst = df.select("integer").rdd.map(lambda r: r[0]).collect()
print(f"Column \"integer\" has values: {lst}")
integer characters
1 [A, B]
2 [C, D]
3 [E, F]

Column "integer" has values: [1, 2, 3]

1.17. To convert a dataframe to a list of dictionaries corresponding to every row

import pprint
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
lst_dict = df.rdd.map(lambda row: row.asDict()).collect()
print(f"Dataframe is represented as:\n")
txt = pprint.pformat(lst_dict)
integer characters
1 [A, B]
2 [C, D]
3 [E, F]

Dataframe is represented as:

[{'characters': ['A', 'B'], 'integer': 1},
 {'characters': ['C', 'D'], 'integer': 2},
 {'characters': ['E', 'F'], 'integer': 3}]

1.18. To convert a column to a list using list comprehension

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
lst = [k["integer"] for k in df.select("integer").rdd.collect()]
print(f"Column \"integer\" has values: {lst}")
integer characters
1 [A, B]
2 [C, D]
3 [E, F]

Column "integer" has values: [1, 2, 3]

1.19. To convert a column to a list using Pandas

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
lst = df.select("integer").toPandas()["integer"].tolist()
print(f"Column \"integer\" has values: {lst}")
integer characters
1 [A, B]
2 [C, D]
3 [E, F]

Column "integer" has values: [1, 2, 3]

1.20. To display full width of a column (do not truncate)

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("sentence", T.ArrayType(T.StringType()), True),
    ]
)
data = [(["A", "very", "long", "sentence"],),
        (["with", "many", "words", "."],)]
df = spark.createDataFrame(schema=schema, data=data)

print("Truncated output (default behavior):")
df.show()

print("Truncated to 15 characters output:")
df.show(truncate=15)

print("Non-truncated output (show all):")
df.show(truncate=False)

Truncated output (default behavior):

sentence
[A, very, long, s…
[with, many, word…

Truncated to 15 characters output:

sentence
[A, very, lo…
[with, many,…

Non-truncated output (show all):

sentence
[A, very, long, sentence]
[with, many, words, .]
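
For very wide values, a vertical layout can be easier to read than widening the table. A small sketch of .show(vertical=True), assuming the same kind of data:

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()

df = spark.createDataFrame([(["A", "very", "long", "sentence"],)], "sentence array<string>")
# Prints one "column: value" line per field instead of a table
df.show(truncate=False, vertical=True)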

2. Filtering rows

2.1. To filter based on values of a column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", None),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", None),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)

print("Original dataframe:")
df.show()

print('Filter: F.col("Location" == "Home")')
dft = df.filter(F.col("Location") == "Home")
dft.show()

print('Filter: F.col("Quantity").isNull()')
dft = df.filter(F.col("Quantity").isNull())
dft.show()

print('Filter: F.col("Quantity").isNotNull()')
dft = df.filter(F.col("Quantity").isNotNull())
dft.show()

print('Filter: (F.col("Location") == "Home") & (F.col("Product") == "Laptop"))')
dft = df.filter((F.col("Location") == "Home") & (F.col("Product") == "Laptop"))
dft.show()

print('Filter: (F.col("Location") == "Home") & !(F.col("Product") == "Laptop"))')
dft = df.filter((F.col("Location") == "Home") & ~(F.col("Product") == "Laptop"))
dft.show()

print('Filter: (F.col("Product") == "Laptop") | (F.col("Product") == "Mouse"))')
dft = df.filter((F.col("Product") == "Laptop") | (F.col("Product") == "Mouse"))
dft.show()

print('Filter: F.col("Product").isin(["Laptop", "Mouse"])')
dft = df.filter(F.col("Product").isin(["Laptop", "Mouse"]))
dft.show()

Original dataframe:

Location Product Quantity
Home Laptop 12
Home Monitor null
Home Keyboard 9
Office Laptop null
Office Monitor 10
Office Mouse 9

Filter: F.col("Location" == "Home")

Location Product Quantity
Home Laptop 12
Home Monitor null
Home Keyboard 9

Filter: F.col("Quantity").isNull()

Location Product Quantity
Home Monitor null
Office Laptop null

Filter: F.col("Quantity").isNotNull()

Location Product Quantity
Home Laptop 12
Home Keyboard 9
Office Monitor 10
Office Mouse 9

Filter: (F.col("Location") == "Home") & (F.col("Product") == "Laptop"))

Location Product Quantity
Home Laptop 12

Filter: (F.col("Location") == "Home") & !(F.col("Product") == "Laptop"))

Location Product Quantity
Home Monitor null
Home Keyboard 9

Filter: (F.col("Product") == "Laptop") | (F.col("Product") == "Mouse"))

Location Product Quantity
Home Laptop 12
Office Laptop null
Office Mouse 9

Filter: F.col("Product").isin(["Laptop", "Mouse"])

Location Product Quantity
Home Laptop 12
Office Laptop null
Office Mouse 9
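
.filter() also accepts a SQL expression string, which can be handier for ad-hoc conditions. A minimal sketch with a cut-down version of the data above:

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

data = [("Home", "Laptop", 12), ("Home", "Monitor", None), ("Office", "Mouse", 9)]
df = spark.createDataFrame(data, ["Location", "Product", "Quantity"])

# SQL expression strings are equivalent to the Column-based filters above
df.filter("Location = 'Home'").show()
df.filter("Location = 'Home' AND Quantity IS NOT NULL").show()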

3. Array operations

3.1. To create arrays of different lengths

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
        T.StructField("B", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2], [2, 3, 4, 5]),
        ([4, 5, 6], [2, 3, 4, 5])]
df = spark.createDataFrame(schema=schema, data=data)
dft = df.select("A", "B")
dft.show()
A B
[1, 2] [2, 3, 4, 5]
[4, 5, 6] [2, 3, 4, 5]

3.2. To calculate set difference

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.StringType()), True),
        T.StructField("B", T.ArrayType(T.StringType()), True),
    ]
)
data = [(["b", "a", "c"], ["c", "d", "a", "f"])]
df = spark.createDataFrame(schema=schema, data=data)

dft = df.select("A", "B",
          F.array_except("A", "B").alias("A\B"),
          F.array_except("B", "A").alias("B\A"))
dft.show()
A B A\B B\A
[b, a, c] [c, d, a, f] [b] [d, f]

3.3. To calculate set union

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.StringType()), True),
        T.StructField("B", T.ArrayType(T.StringType()), True),
    ]
)
data = [(["b", "a", "c"], ["c", "d", "a", "f"])]
df = spark.createDataFrame(schema=schema, data=data)
dft = df.select("A", "B",
          F.array_union("A", "B").alias("A U B"))
dft.show()
A B A U B
[b, a, c] [c, d, a, f] [b, a, c, d, f]

3.4. To calculate set intersection

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.StringType()), True),
        T.StructField("B", T.ArrayType(T.StringType()), True),
    ]
)
data = [(["b", "a", "c"], ["c", "d", "a", "f"])]
df = spark.createDataFrame(schema=schema, data=data)
dft = df.select("A", "B", F.array_intersect("A", "B").alias("A \u2229 B"))
dft.show()
A B A ∩ B
[b, a, c] [c, d, a, f] [a, c]

3.5. To pad arrays with value

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
        T.StructField("B", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2], [2, 3, 4, 5]),
        ([4, 5, 6], [2, 3, 4, 5])]
df = spark.createDataFrame(schema=schema, data=data)
n = 4
fill_value = 0
df1 = df.withColumn("A_padding", F.expr(f"array_repeat({fill_value}, {n} - size(A))"))
df1 = df1.withColumn("A_padded", F.concat("A", "A_padding"))
dft = df1.select("A", "A_padding", "A_padded")
dft.show()

df2 = df.withColumn("A_padding", F.array_repeat(F.lit(fill_value), F.lit(n) - F.size("A")))
df2 = df2.withColumn("A_padded", F.concat("A", "A_padding"))
dft = df2.select("A", "A_padding", "A_padded")
dft.show()
A A_padding A_padded
[1, 2] [0, 0] [1, 2, 0, 0]
[4, 5, 6] [0] [4, 5, 6, 0]
A A_padding A_padded
[1, 2] [0, 0] [1, 2, 0, 0]
[4, 5, 6] [0] [4, 5, 6, 0]

3.6. To sum two arrays elementwise using F.element_at()

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
        T.StructField("B", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2], [2, 3, 4, 5]),
        ([4, 5, 6], [2, 3, 4, 5])]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("A_padding", F.array_repeat(F.lit(fill_value), F.lit(n) - F.size("A")))
df = df.withColumn("A_padded", F.concat("A", "A_padding"))
df = df.withColumn("AB_sum", F.expr('transform(A_padded, (element, index) -> element + element_at(B, index + 1))'))
dft = df.select("A", "A_padded", "B", "AB_sum")
dft.show()
A A_padded B AB_sum
[1, 2] [1, 2, 0, 0] [2, 3, 4, 5] [3, 5, 4, 5]
[4, 5, 6] [4, 5, 6, 0] [2, 3, 4, 5] [6, 8, 10, 5]

3.7. To sum two arrays using F.arrays_zip()

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
        T.StructField("B", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2], [2, 3, 4, 5]),
        ([4, 5, 6], [2, 3, 4, 5])]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("A_padding", F.array_repeat(F.lit(fill_value), F.lit(n) - F.size("A")))
df = df.withColumn("A_padded", F.concat("A", "A_padding"))
df = df.withColumn("AB_sum", F.expr("transform(arrays_zip(A_padded, B), x -> x.A_padded + x.B)"))
dft = df.select("A", "A_padded", "B", "AB_sum")
dft.show()
A A_padded B AB_sum
[1, 2] [1, 2, 0, 0] [2, 3, 4, 5] [3, 5, 4, 5]
[4, 5, 6] [4, 5, 6, 0] [2, 3, 4, 5] [6, 8, 10, 5]

3.8. To find mode of an array (most common element)

from collections import Counter
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2, 2, 4],),
        ([4, 5, 6, 7],),
        ([1, 1, 2, 2],)]
df = spark.createDataFrame(schema=schema, data=data)

@F.udf
def udf_mode(x):
    return Counter(x).most_common(1)[0][0]

dft = df.withColumn("mode", udf_mode("A"))
print("Schema of dft is:")
dft.printSchema()
dft.show()

Schema of dft is:

root
 |-- A: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- mode: string (nullable = true)
A mode
[1, 2, 2, 4] 2
[4, 5, 6, 7] 4
[1, 1, 2, 2] 1
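
The bare @F.udf decorator defaults to a string return type, which is why "mode" appears as a string in the schema above. A minimal sketch of the same UDF with an explicit integer return type:

from collections import Counter
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

df = spark.createDataFrame([([1, 2, 2, 4],), ([1, 1, 2, 2],)], "A array<int>")

@F.udf(returnType=T.IntegerType())
def udf_mode_int(x):
    # Same logic as above, but the result stays an integer
    return Counter(x).most_common(1)[0][0]

df.withColumn("mode", udf_mode_int("A")).printSchema()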

3.9. To apply a function to every element of an array

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("words_with_suffixes", T.ArrayType(T.StringType()), True)
    ]
)
data = [(["pen_10", "note_11", "bottle_12"],), (["apple_13", "orange_14", "lemon_15"],),]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("words", F.transform("words_with_suffixes", lambda x: F.split(x, "_").getItem(0)))
df.show(truncate=False)
words_with_suffixes words
[pen_10, note_11, bottle_12] [pen, note, bottle]
[apple_13, orange_14, lemon_15] [apple, orange, lemon]

3.10. To calculate cumulative sum of elements in an array

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("arr", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2, 2, 4],),
        ([4, 5, 6, 7],),
        ([1, 1, 2, 2],)]
df = spark.createDataFrame(schema=schema, data=data)

empty_int_array = F.array().cast(T.ArrayType(T.IntegerType()))

df = df.withColumn("cum_sum",
           F.aggregate(
             F.col("arr"),
             empty_int_array,
             lambda acc, x: F.concat(
                 acc,
                 F.array(x + F.coalesce(F.element_at(acc, -1), F.lit(0).cast("int")))
              )
            )
         )

print("Schema of df is:")
df.printSchema()
df.show()

Schema of df is:

root
 |-- arr: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- cum_sum: array (nullable = true)
 |    |-- element: integer (containsNull = true)
arr cum_sum
[1, 2, 2, 4] [1, 3, 5, 9]
[4, 5, 6, 7] [4, 9, 15, 22]
[1, 1, 2, 2] [1, 2, 4, 6]

3.11. To deduplicate elements in an array (find unique/distinct elements)

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("words", T.ArrayType(T.StringType()), True)
    ]
)
data = [(["pen", "note", "pen"],), (["apple", "apple", "lemon"],),]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("unique_words", F.array_distinct("words"))
df.show(truncate=False)
words unique_words
[pen, note, pen] [pen, note]
[apple, apple, lemon] [apple, lemon]

3.12. To create a map (dictionary) from two arrays (one with keys, one with values)

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("keys", T.ArrayType(T.IntegerType()), True),
        T.StructField("values", T.ArrayType(T.StringType()), True),
    ]
)
data = [([1, 2, 3], ["A", "B", "C"])]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("map_kv", F.map_from_arrays("keys", "values"))
df.show(truncate=False)
keys values map_kv
[1, 2, 3] [A, B, C] {1 -> A, 2 -> B, 3 -> C}
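
Once the map is built, individual values can be looked up by key with .getItem(). A small usage sketch on the same data:

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()

df = spark.createDataFrame([([1, 2, 3], ["A", "B", "C"])], ["keys", "values"])
df = df.withColumn("map_kv", F.map_from_arrays("keys", "values"))
# Look up the value stored under key 2 (missing keys return NULL)
df.select(F.col("map_kv").getItem(2).alias("value_for_key_2")).show()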

3.13. To calculate mean of an array

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2],),
        ([4, 5, 6],)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("mean", F.aggregate(
          "values",                                  # column
          F.lit(0),                                  # initialValue
          lambda acc, x: acc + x,                    # merge operation
          lambda acc: acc / F.size(F.col("values")), # finish
      ))
df.show()
values mean
[1, 2] 1.5
[4, 5, 6] 5.0

3.14. To remove NULL values from an array

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, -2, None],),
        ([4, 5, None, 6],)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("values_without_nulls", F.array_compact("values"))
df.show()
values values_without_nulls
[1, -2, NULL] [1, -2]
[4, 5, NULL, 6] [4, 5, 6]

3.15. To find out whether an array has any negative elements

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, -2],),
        ([4, 5, 6],)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("any_negative", F.exists("values", lambda x: x < 0))
df.show()
values any_negative
[1, -2] true
[4, 5, 6] false
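
F.exists() has a companion, F.forall() (Spark 3.1+), which is true only if every element satisfies the predicate. A minimal sketch on the same data:

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

df = spark.createDataFrame([([1, -2],), ([4, 5, 6],)], "values array<int>")
# True only when all elements are non-negative
df = df.withColumn("all_non_negative", F.forall("values", lambda x: x >= 0))
df.show()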

3.16. To convert elements of an array to columns

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2, 3, 4],),
        ([5, 6, 7],)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("first", F.col("A").getItem(0))
dft = df.select("A", "first", *[F.col("A").getItem(k).alias(f"element_{k+1}") for k in range(1,4)])
dft.show()
A first element_2 element_3 element_4
[1, 2, 3, 4] 1 2 3 4
[5, 6, 7] 5 6 7 null

3.17. To find location of the first occurrence of an element in an array

import pyspark.sql.functions as F
import pyspark.sql.types as T
import pandas as pd
from pyspark.sql import SparkSession
import numpy as np
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 7, 5],),
        ([7, 4, 7],)]
df = spark.createDataFrame(schema=schema, data=data)

df = df.withColumn("position", F.array_position(F.col("values"), 7))
df.show()
values position
[1, 7, 5] 2
[7, 4, 7] 1

3.18. To calculate moving difference of two consecutive elements in an array

import pyspark.sql.functions as F
import pyspark.sql.types as T
import pandas as pd
from pyspark.sql import SparkSession
import numpy as np
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2, 5],),
        ([4, 4, 6],)]
df = spark.createDataFrame(schema=schema, data=data)

@F.pandas_udf(T.ArrayType(T.IntegerType()))
def diff2e(x: pd.Series) -> pd.Series:
    return x.apply(lambda arr: arr[1:] - arr[:-1])

df = df.withColumn("diff2e", diff2e(F.col("values")))

@F.udf(returnType=T.ArrayType(T.IntegerType()))
def diff_of_two_consecutive_elements(x):
    return np.ediff1d(np.array(x)).tolist()

df = df.withColumn("ediff1d", diff_of_two_consecutive_elements(F.col("values")))
df = df.withColumn("diff_transform",
         F.transform(F.slice("values", start=2, length=(F.size("values") - F.lit(1))),
                     lambda x, i: x - F.col("values").getItem(i))
                   )

df = df.withColumn("diff_expr",
  F.expr("""
      transform(
          slice(values, 2, size(values) - 1),
          (x, i) -> x - values[i]
      )
  """)
)
df.show()
values diff2e ediff1d diff_transform diff_expr
[1, 2, 5] [1, 3] [1, 3] [1, 3] [1, 3]
[4, 4, 6] [0, 2] [0, 2] [0, 2] [0, 2]

3.19. To find a union of all unique elements of multiple arrays in a group

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("group", T.StringType(), True),
    T.StructField("id", T.StringType(), True),
    T.StructField("values", T.ArrayType(T.IntegerType()), True),
])

data = [
    ("group_1", "A", [4, 1, 2]),
    ("group_1", "B", [1, 0, 3, 1]),
    ("group_2", "C", [5, 6, 7, 5]),
    ("group_2", "D", [7, 6, 9])
]

df = spark.createDataFrame(data=data, schema=schema)
df.show(truncate=False)

# Flatten arrays within groups and calculate unique elements
dfs = (
    df.groupBy("group")
    .agg(F.collect_list("values").alias("list_of_lists"))
    .withColumn("all_values", F.flatten("list_of_lists"))
    .withColumn("unique_values", F.array_distinct("all_values"))
    .select("group", "list_of_lists", "all_values", "unique_values")
)

dfs.show(truncate=False)
group id values
group_1 A [4, 1, 2]
group_1 B [1, 0, 3, 1]
group_2 C [5, 6, 7, 5]
group_2 D [7, 6, 9]
group list_of_lists all_values unique_values
group_1 [[4, 1, 2], [1, 0, 3, 1]] [4, 1, 2, 1, 0, 3, 1] [4, 1, 2, 0, 3]
group_2 [[5, 6, 7, 5], [7, 6, 9]] [5, 6, 7, 5, 7, 6, 9] [5, 6, 7, 9]

3.20. To slice an array

import pyspark.sql.functions as F
import pyspark.sql.types as T
import pandas as pd
from pyspark.sql import SparkSession
import numpy as np
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 7, 5, 2],),
        ([6, 4, 7, 3],)]
df = spark.createDataFrame(schema=schema, data=data)

df = df.withColumn("values[1:3]", F.slice("values", start=2, length=2))
df.show()
values values[1:3]
[1, 7, 5, 2] [7, 5]
[6, 4, 7, 3] [4, 7]

3.21. To slice an array dynamically

import pyspark.sql.functions as F
import pyspark.sql.types as T
import pandas as pd
from pyspark.sql import SparkSession
import numpy as np
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 7, 5],),
        ([6, 4, 7, 3],)]
df = spark.createDataFrame(schema=schema, data=data)
start_idx = 2
df = df.withColumn("values[1:]", F.slice("values", start=2, length=(F.size("values") - F.lit(start_idx - 1))))
df.show()
values values[1:]
[1, 7, 5] [7, 5]
[6, 4, 7, 3] [4, 7, 3]

4. Text processing

4.1. To remove prefix from a string

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("text", T.StringType(), True),
    ]
)
data = [("id_orange",),
        ("apple",)]
df = spark.createDataFrame(schema=schema, data=data)
remove_prefix = F.udf(lambda x: x[3:] if x[:3] == "id_" else x, T.StringType())
df = df.withColumn("no_prefix_udf", remove_prefix(F.col("text")))
df = df.withColumn("no_prefix_rgx", F.regexp_replace("text", "id_", ""))
df.show()
text no_prefix_udf no_prefix_rgx
id_orange orange orange
apple apple apple
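
A third option avoids both the UDF and the regex: test the prefix with .startswith() and strip it with substring(). A minimal sketch on the same data:

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

df = spark.createDataFrame([("id_orange",), ("apple",)], "text string")
# substring() is 1-based, so position 4 is the first character after "id_"
df = df.withColumn("no_prefix",
        F.when(F.col("text").startswith("id_"), F.expr("substring(text, 4)"))
         .otherwise(F.col("text")))
df.show()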

4.2. To deduplicate identical consecutive characters using F.regexp_replace()

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("String", T.StringType(), True)
    ]
)
data = [["aaaabbccc"], ["bbbccaaa"]]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn('Shortened', F.regexp_replace("String", "([ab])\\1+", "$1"))
df.show(truncate=False)
String Shortened
aaaabbccc abccc
bbbccaaa bcca

4.3. To deduplicate identical consecutive characters using a UDF

import re
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("String", T.StringType(), True)
    ]
)
data = [["aaaabbccc"], ["bbbccaaa"]]

@F.udf(returnType=T.StringType())
def remove_consecutive_duplicate_characters_ab(s):
    return re.sub(r'([ab])\1+', r'\1', s)

df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn('Shortened', remove_consecutive_duplicate_characters_ab("String"))
df.show(truncate=False)
String Shortened
aaaabbccc abccc
bbbccaaa bcca

4.4. To split a string into letters (characters) using regex

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("String", T.StringType(), True)
    ]
)
data = [["This is"]]
df = spark.createDataFrame(schema=schema, data=data)
dft = df.select('String', F.split('String', '(?!$)').alias("Characters"))
dft.show(truncate=False)
String Characters
This is [T, h, i, s, , i, s]

4.5. To concatenate columns with strings using a separator

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Str1", T.StringType(), True),
        T.StructField("Str2", T.StringType(), True)
    ]
)
data = [("This is", "a string"),
        ("on a", "different row")]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("Str_Concat", F.concat_ws( "_", "Str1", "Str2"))
df.show()
Str1 Str2 Str_Concat
This is a string This is_a string
on a different row on a_different row

4.6. To split a string into letters (characters) using split function

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("String", T.StringType(), True)
    ]
)
data = [["This is"]]
df = spark.createDataFrame(schema=schema, data=data)
fsplit = F.expr("split(String, '')")
dft = df.select('String', fsplit.alias("Characters"))
dft.show(truncate=False)
String Characters
This is [T, h, i, s, , i, s]

4.7. To split a string into letters (characters) and remove last character

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("String", T.StringType(), True)
    ]
)
data = [["This is_"]]
df = spark.createDataFrame(schema=schema, data=data)
print("Using split function and remove last character:")
fsplit = "split(String, '')"
fsplit = F.expr(f'slice({fsplit}, 1, size({fsplit}) - 1)')
dft = df.select('String', fsplit.alias("Characters"))
dft.show(truncate=False)

Using split function and remove last character:

String Characters
This is_ [T, h, i, s, , i, s]

4.8. To append a string to all values in a column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Str1", T.StringType(), True),
        T.StructField("Str2", T.StringType(), True)
    ]
)
data = [("This is", "a string"),
        ("on a", "different row")]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("Str1_with_prefix", F.concat(F.lit("Prefix_"), "Str1"))
dft = df.select("Str1", "Str1_with_prefix")
dft.show()
Str1 Str1_with_prefix
This is Prefix_This is
on a Prefix_on a

5. Time operations

5.1. To convert Unix time stamp to human readable format

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("timestamp", T.LongType(), True),
    ]
)
data = [(1703224755,),
        (1703285602,)]
df = spark.createDataFrame(schema=schema, data=data)

df = df.withColumn("time_stamp_hrf", F.from_unixtime(F.col("timestamp"), "yyyy-MM-dd HH:mm:ss"))
df.show()
timestamp time_stamp_hrf
1703224755 2023-12-22 06:59:15
1703285602 2023-12-22 23:53:22
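
F.from_unixtime() produces a formatted string; if a proper timestamp column is needed instead, F.timestamp_seconds() (Spark 3.1+) does the conversion. A minimal sketch on the same data:

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

df = spark.createDataFrame([(1703224755,), (1703285602,)], "timestamp long")
# Yields a TimestampType column (rendered in the session time zone)
df = df.withColumn("ts", F.timestamp_seconds("timestamp"))
df.printSchema()
df.show(truncate=False)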

5.2. To detect log in and log out timestamps in a sequence of events

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("tstp_unix", T.LongType(), True),
    T.StructField("computer", T.StringType(), True),
    T.StructField("user", T.StringType(), True),
])

data = [
    (1703224755, "computer_A", "user_1"),
    (1703224780, "computer_A", "user_1"),
    (1703224850, "computer_A", "user_1"),
    (1703224950, "computer_B", "user_1"),
    (1703225050, "computer_B", "user_1"),
    (1703225100, "computer_B", "user_1"),
    (1703225150, "computer_A", "user_1"),
    (1703225200, "computer_B", "user_1"),
    (1703225250, "computer_B", "user_1"),
    (1703285700, "computer_C", "user_2"),
    (1703285800, "computer_C", "user_2"),
    (1703285900, "computer_C", "user_2"),
    (1703286000, "computer_C", "user_2"),
    (1703286100, "computer_D", "user_2"),
    (1703286200, "computer_D", "user_2"),
    (1703286300, "computer_D", "user_2"),
    (1703286400, "computer_D", "user_2"),
]

df = spark.createDataFrame(data=data, schema=schema)

# Add human-readable timestamp
df = df.withColumn("tstp_hrf", F.from_unixtime(F.col("tstp_unix"), "yyyy-MM-dd HH:mm:ss"))


# Define a window to identify session changes
window_spec = Window.partitionBy("user").orderBy(F.asc("tstp_unix"))


# Previous row
prev = F.lag("computer", 1).over(window_spec)
cmp_equ_prev = (F.col("computer") != F.when(prev.isNull(), "some_random_computer").otherwise(prev))
# Detect changes in computer usage: compare with the previous row in a window
df = df.withColumn("is_first", F.when(cmp_equ_prev, 1).otherwise(0))

next = F.lag("computer", -1).over(window_spec)
cmp_equ_next = (F.col("computer") != F.when(next.isNull(), "some_random_computer").otherwise(next))
# Detect changes in computer usage: compare with the next row in a window
df = df.withColumn("is_last", F.when(cmp_equ_next, 1).otherwise(0))

df = df.withColumn("unq_grp", F.sum("is_first").over(window_spec))

cols = ["tstp_unix", "tstp_hrf", "user", "computer", "is_first", "is_last", "unq_grp"]
df.select(cols).show()

cols = ["tstp_unix", "tstp_hrf", "user", "computer", "unq_grp",
        F.col("is_first").alias("login"), F.col("is_last").alias("logout")]
df.filter((F.col("is_first") == 1) | (F.col("is_last") == 1)).select(cols).show()
tstp_unix tstp_hrf user computer is_first is_last unq_grp
1703224755 2023-12-22 06:59:15 user_1 computer_A 1 0 1
1703224780 2023-12-22 06:59:40 user_1 computer_A 0 0 1
1703224850 2023-12-22 07:00:50 user_1 computer_A 0 1 1
1703224950 2023-12-22 07:02:30 user_1 computer_B 1 0 2
1703225050 2023-12-22 07:04:10 user_1 computer_B 0 0 2
1703225100 2023-12-22 07:05:00 user_1 computer_B 0 1 2
1703225150 2023-12-22 07:05:50 user_1 computer_A 1 1 3
1703225200 2023-12-22 07:06:40 user_1 computer_B 1 0 4
1703225250 2023-12-22 07:07:30 user_1 computer_B 0 1 4
1703285700 2023-12-22 23:55:00 user_2 computer_C 1 0 1
1703285800 2023-12-22 23:56:40 user_2 computer_C 0 0 1
1703285900 2023-12-22 23:58:20 user_2 computer_C 0 0 1
1703286000 2023-12-23 00:00:00 user_2 computer_C 0 1 1
1703286100 2023-12-23 00:01:40 user_2 computer_D 1 0 2
1703286200 2023-12-23 00:03:20 user_2 computer_D 0 0 2
1703286300 2023-12-23 00:05:00 user_2 computer_D 0 0 2
1703286400 2023-12-23 00:06:40 user_2 computer_D 0 1 2
tstp_unix tstp_hrf user computer unq_grp login logout
1703224755 2023-12-22 06:59:15 user_1 computer_A 1 1 0
1703224850 2023-12-22 07:00:50 user_1 computer_A 1 0 1
1703224950 2023-12-22 07:02:30 user_1 computer_B 2 1 0
1703225100 2023-12-22 07:05:00 user_1 computer_B 2 0 1
1703225150 2023-12-22 07:05:50 user_1 computer_A 3 1 1
1703225200 2023-12-22 07:06:40 user_1 computer_B 4 1 0
1703225250 2023-12-22 07:07:30 user_1 computer_B 4 0 1
1703285700 2023-12-22 23:55:00 user_2 computer_C 1 1 0
1703286000 2023-12-23 00:00:00 user_2 computer_C 1 0 1
1703286100 2023-12-23 00:01:40 user_2 computer_D 2 1 0
1703286400 2023-12-23 00:06:40 user_2 computer_D 2 0 1

5.3. To estimate host change time based on log in and log out information in a sequence of events

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("tstp_unix", T.LongType(), True),
    T.StructField("host", T.StringType(), True),
    T.StructField("user", T.StringType(), True),
])

data = [
    (1703224755, "host_A", "user_1"),
    (1703224852, "host_A", "user_1"),
    (1703224950, "host_B", "user_1"),
    (1703225104, "host_B", "user_1"),
    (1703225150, "host_A", "user_1"),
    (1703225200, "host_B", "user_1"),
    (1703225250, "host_B", "user_1"),
    (1703285700, "host_C", "user_2"),
    (1703286000, "host_C", "user_2"),
    (1703286100, "host_D", "user_2"),
    (1703286400, "host_D", "user_2"),
]

df = spark.createDataFrame(data=data, schema=schema)

# Add human-readable timestamp
df = df.withColumn("tstp_hrf", F.from_unixtime(F.col("tstp_unix"), "yyyy-MM-dd HH:mm:ss"))


# Define a window to identify session changes
window_spec = Window.partitionBy("user").orderBy(F.asc("tstp_unix"))

prev_tstp = F.lag("tstp_unix", 1).over(window_spec)
next_tstp = F.lag("tstp_unix", -1).over(window_spec)
prev_host = F.lag("host", 1).over(window_spec)
next_host = F.lag("host", -1).over(window_spec)

t_delta_prev = (F.col("tstp_unix") - prev_tstp)
df = df.withColumn("t_diff_prv",
                   F.when(prev_host.isNull(), 0.0).otherwise(
                         F.when(F.col("host") != prev_host, t_delta_prev).otherwise(None))
                   )

t_delta_next = (next_tstp - F.col("tstp_unix"))
df = df.withColumn("t_diff_nxt",
                   F.when(next_host.isNull(), 0.0).otherwise(
                         F.when(F.col("host") != next_host, t_delta_next).otherwise(None))
                   )

df = df.withColumn("new_tstp_prv", F.when(F.col("t_diff_prv").isNotNull(), F.col("tstp_unix") - F.col("t_diff_prv")/2 + 1))
df = df.withColumn("new_tstp_nxt", F.when(F.col("t_diff_nxt").isNotNull(), F.col("tstp_unix") + F.col("t_diff_nxt")/2))
df = df.withColumn("new_tstp_prv", F.from_unixtime("new_tstp_prv", "yyyy-MM-dd HH:mm:ss"))
df = df.withColumn("new_tstp_nxt", F.from_unixtime("new_tstp_nxt", "yyyy-MM-dd HH:mm:ss"))

txt = "Calculate time delta to previous and next timestamps:"
print(txt)
cols = ["tstp_unix", "tstp_hrf", "user", "host", "t_diff_prv", "t_diff_nxt", "new_tstp_prv", "new_tstp_nxt"]
df.select(cols).show()

txt = "Combine new timestamps calculated from adding time deltas into an array:"
print(txt)
df = df.withColumn("new_tstps", F.array_compact(F.array("new_tstp_prv", "new_tstp_nxt")))
cols = ["tstp_hrf", "user", "host", "new_tstp_prv", "new_tstp_nxt", "new_tstps"]
df.select(cols).show(truncate=False)

txt = "Explode the array with new timestamps:"
print(txt)
df = df.withColumn("new_tstp", F.explode("new_tstps"))
cols = ["user", "host", "tstp_hrf", "new_tstps", "new_tstp"]
df.select(cols).show(truncate=False)
Calculate time delta to previous and next timestamps:
tstp_unix tstp_hrf user host t_diff_prv t_diff_nxt new_tstp_prv new_tstp_nxt
1703224755 2023-12-22 06:59:15 user_1 host_A 0.0 NULL 2023-12-22 06:59:16 NULL
1703224852 2023-12-22 07:00:52 user_1 host_A NULL 98.0 NULL 2023-12-22 07:01:41
1703224950 2023-12-22 07:02:30 user_1 host_B 98.0 NULL 2023-12-22 07:01:42 NULL
1703225104 2023-12-22 07:05:04 user_1 host_B NULL 46.0 NULL 2023-12-22 07:05:27
1703225150 2023-12-22 07:05:50 user_1 host_A 46.0 50.0 2023-12-22 07:05:28 2023-12-22 07:06:15
1703225200 2023-12-22 07:06:40 user_1 host_B 50.0 NULL 2023-12-22 07:06:16 NULL
1703225250 2023-12-22 07:07:30 user_1 host_B NULL 0.0 NULL 2023-12-22 07:07:30
1703285700 2023-12-22 23:55:00 user_2 host_C 0.0 NULL 2023-12-22 23:55:01 NULL
1703286000 2023-12-23 00:00:00 user_2 host_C NULL 100.0 NULL 2023-12-23 00:00:50
1703286100 2023-12-23 00:01:40 user_2 host_D 100.0 NULL 2023-12-23 00:00:51 NULL
1703286400 2023-12-23 00:06:40 user_2 host_D NULL 0.0 NULL 2023-12-23 00:06:40
Combine new timestamps calculated from adding time deltas into an array:
tstp_hrf user host new_tstp_prv new_tstp_nxt new_tstps
2023-12-22 06:59:15 user_1 host_A 2023-12-22 06:59:16 NULL [ 2023-12-22 06:59:16 ]
2023-12-22 07:00:52 user_1 host_A NULL 2023-12-22 07:01:41 [ 2023-12-22 07:01:41 ]
2023-12-22 07:02:30 user_1 host_B 2023-12-22 07:01:42 NULL [ 2023-12-22 07:01:42 ]
2023-12-22 07:05:04 user_1 host_B NULL 2023-12-22 07:05:27 [ 2023-12-22 07:05:27 ]
2023-12-22 07:05:50 user_1 host_A 2023-12-22 07:05:28 2023-12-22 07:06:15 [ 2023-12-22 07:05:28, 2023-12-22 07:06:15 ]
2023-12-22 07:06:40 user_1 host_B 2023-12-22 07:06:16 NULL [ 2023-12-22 07:06:16 ]
2023-12-22 07:07:30 user_1 host_B NULL 2023-12-22 07:07:30 [ 2023-12-22 07:07:30 ]
2023-12-22 23:55:00 user_2 host_C 2023-12-22 23:55:01 NULL [ 2023-12-22 23:55:01 ]
2023-12-23 00:00:00 user_2 host_C NULL 2023-12-23 00:00:50 [ 2023-12-23 00:00:50 ]
2023-12-23 00:01:40 user_2 host_D 2023-12-23 00:00:51 NULL [ 2023-12-23 00:00:51 ]
2023-12-23 00:06:40 user_2 host_D NULL 2023-12-23 00:06:40 [ 2023-12-23 00:06:40 ]
Explode the array with new timestamps:
user host tstp_hrf new_tstps new_tstp
user_1 host_A 2023-12-22 06:59:15 [ 2023-12-22 06:59:16 ] 2023-12-22 06:59:16
user_1 host_A 2023-12-22 07:00:52 [ 2023-12-22 07:01:41 ] 2023-12-22 07:01:41
user_1 host_B 2023-12-22 07:02:30 [ 2023-12-22 07:01:42 ] 2023-12-22 07:01:42
user_1 host_B 2023-12-22 07:05:04 [ 2023-12-22 07:05:27 ] 2023-12-22 07:05:27
user_1 host_A 2023-12-22 07:05:50 [ 2023-12-22 07:05:28, 2023-12-22 07:06:15 ] 2023-12-22 07:05:28
user_1 host_A 2023-12-22 07:05:50 [ 2023-12-22 07:05:28, 2023-12-22 07:06:15 ] 2023-12-22 07:06:15
user_1 host_B 2023-12-22 07:06:40 [ 2023-12-22 07:06:16 ] 2023-12-22 07:06:16
user_1 host_B 2023-12-22 07:07:30 [ 2023-12-22 07:07:30 ] 2023-12-22 07:07:30
user_2 host_C 2023-12-22 23:55:00 [ 2023-12-22 23:55:01 ] 2023-12-22 23:55:01
user_2 host_C 2023-12-23 00:00:00 [ 2023-12-23 00:00:50 ] 2023-12-23 00:00:50
user_2 host_D 2023-12-23 00:01:40 [ 2023-12-23 00:00:51 ] 2023-12-23 00:00:51
user_2 host_D 2023-12-23 00:06:40 [ 2023-12-23 00:06:40 ] 2023-12-23 00:06:40

5.4. To round down log-in and round up log-out times of a user to midnight timestamp in a sequence of events

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("tstp_unix", T.LongType(), True),
    T.StructField("computer", T.StringType(), True),
    T.StructField("user", T.StringType(), True),
])

data = [
    (1703224755, "computer_A", "user_1"),
    (1703224780, "computer_A", "user_1"),
    (1703224850, "computer_A", "user_1"),
    (1703224950, "computer_B", "user_1"),
    (1703225050, "computer_B", "user_1"),
    (1703285700, "computer_C", "user_2"),
    (1703285800, "computer_C", "user_2"),
    (1703276000, "computer_C", "user_2"),
    (1703286300, "computer_D", "user_2"),
    (1703296400, "computer_D", "user_2"),
]

df = spark.createDataFrame(data=data, schema=schema)

# Add human-readable timestamp
df = df.withColumn("tstp_hrf", F.from_unixtime(F.col("tstp_unix"), "yyyy-MM-dd HH:mm:ss"))


# Define a window to identify session changes
window_spec = Window.partitionBy("user").orderBy(F.asc("tstp_unix"))


# Previous row
prev = F.lag("computer", 1).over(window_spec)
df = df.withColumn("is_first", F.when(prev.isNull(), 1).otherwise(0))


rounded_down = F.from_unixtime("tstp_unix", "yyyy-MM-dd")
df = df.withColumn("rounded_tstp_first",
                   F.when(F.col("is_first") == 1, F.unix_timestamp(rounded_down, "yyyy-MM-dd")
                   ).otherwise(F.col("tstp_unix")))

# Next row
next = F.lag("computer", -1).over(window_spec)
df = df.withColumn("is_last", F.when(next.isNull(), 1).otherwise(0))

rounded_up = F.from_unixtime(F.col("tstp_unix") + 24*60*60, "yyyy-MM-dd")
df = df.withColumn("rounded_tstp_last",
                   F.when(F.col("is_last") == 1, F.unix_timestamp(rounded_up, "yyyy-MM-dd")
                   ).otherwise(F.col("tstp_unix")))

# Combine rounded values
df = df.withColumn("rounded_tstp",
                   F.when(next.isNull(), F.col("rounded_tstp_last")
                   ).otherwise(F.col("rounded_tstp_first")))
df = df.withColumn("rounded_tstp_hrf",
                   F.from_unixtime("rounded_tstp", "yyyy-MM-dd HH:mm:ss"))

cols = ["user", "computer", "tstp_unix", "tstp_hrf", "is_first", "is_last", "rounded_tstp_hrf"]
df.select(cols).show(truncate=False)
user computer tstp_unix tstp_hrf is_first is_last rounded_tstp_hrf
user_1 computer_A 1703224755 2023-12-22 06:59:15 1 0 2023-12-22 00:00:00
user_1 computer_A 1703224780 2023-12-22 06:59:40 0 0 2023-12-22 06:59:40
user_1 computer_A 1703224850 2023-12-22 07:00:50 0 0 2023-12-22 07:00:50
user_1 computer_B 1703224950 2023-12-22 07:02:30 0 0 2023-12-22 07:02:30
user_1 computer_B 1703225050 2023-12-22 07:04:10 0 1 2023-12-23 00:00:00
user_2 computer_C 1703276000 2023-12-22 21:13:20 1 0 2023-12-22 00:00:00
user_2 computer_C 1703285700 2023-12-22 23:55:00 0 0 2023-12-22 23:55:00
user_2 computer_C 1703285800 2023-12-22 23:56:40 0 0 2023-12-22 23:56:40
user_2 computer_D 1703286300 2023-12-23 00:05:00 0 0 2023-12-23 00:05:00
user_2 computer_D 1703296400 2023-12-23 02:53:20 0 1 2023-12-24 00:00:00

6. Numerical operations

6.1. To find percentage of a column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Laptop", 12),
        ("Monitor", 7),
        ("Mouse", 8),
        ("Keyboard", 9)]
df = spark.createDataFrame(schema=schema, data=data)

df = df.withColumn("%", F.round(F.col("Quantity")/F.sum("Quantity").over(Window.partitionBy())*100, 2))
dft = df.select("Product", "Quantity", "%").orderBy(F.desc("Quantity"))
dft.show()
Product Quantity %
Laptop 12 33.33
Keyboard 9 25.0
Mouse 8 22.22
Monitor 7 19.44

6.2. To find percentage of a column within a group using a window

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)

df = df.withColumn("%", F.round(F.col("Quantity")/F.sum("Quantity").over(Window.partitionBy("Location"))*100, 2))
dft = df.select("Location", "Product", "Quantity", "%").orderBy(F.desc("Location"), F.desc("Quantity"))
dft.show()
Location Product Quantity %
Office Laptop 23 54.76
Office Monitor 10 23.81
Office Mouse 9 21.43
Home Laptop 12 33.33
Home Keyboard 9 25.0
Home Mouse 8 22.22
Home Monitor 7 19.44

6.3. To find percentage of a column within a group using .groupBy() and a join

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)

df_sum = df.groupBy("Location").agg(F.sum("Quantity").alias("Total_Quantity"))
df = df.join(df_sum, on="Location").withColumn("%", F.round(F.col("Quantity")/F.col("Total_Quantity")*100, 2))
dft = df.select("Location", "Product", "Quantity", "%").orderBy(F.desc("Location"), F.desc("Quantity"))
dft.show()
Location Product Quantity %
Office Laptop 23 54.76
Office Monitor 10 23.81
Office Mouse 9 21.43
Home Laptop 12 33.33
Home Keyboard 9 25.0
Home Mouse 8 22.22
Home Monitor 7 19.44

6.4. To add a row with total count and percentage for a column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Name", T.StringType(), True),
        T.StructField("Books read", T.IntegerType(), True),
    ]
)
data = [("Alice", 12),
        ("Bob", 7),
        ("Michael", 8),
        ("Kevin", 10)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("%", F.round( F.col("Books read")/F.sum("Books read").over(Window.partitionBy())*100, 6))
dft = df.select("Name", "Books read", "%").orderBy(F.desc("Books read"))
dft = dft.union(
               dft.groupBy().agg(F.lit("Total"),
                                     F.sum("Books read").alias("Books read"),
                                     F.sum("%").alias("%"))
               )
dft.show()
Name Books read %
Alice 12 32.432432
Kevin 10 27.027027
Michael 8 21.621622
Bob 7 18.918919
Total 37 100.0

6.5. To find maximum value of a column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)
df.show()
max_val = df.select("Quantity").rdd.max()[0]
print(f"Maximum value of Quantity: {max_val}")
Location Product Quantity
Home Laptop 12
Home Monitor 7
Home Mouse 8
Home Keyboard 9
Office Laptop 23
Office Monitor 10
Office Mouse 9

Maximum value of Quantity: 23
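
The same maximum can be obtained without dropping to the RDD API; a minimal sketch using a DataFrame aggregation on the same df:

max_val = df.agg(F.max("Quantity")).collect()[0][0]
print(f"Maximum value of Quantity: {max_val}")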

6.6. To add a column with count of elements per group

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("count_per_group", F.count(F.lit(1)).over(Window.partitionBy(F.col("Location"))))
df.show()
Location Product Quantity count_per_group
Home Laptop 12 4
Home Monitor 7 4
Home Mouse 8 4
Home Keyboard 9 4
Office Laptop 23 3
Office Monitor 10 3
Office Mouse 9 3

6.7. To count elements per group whose quantity is greater than or equal to some number

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)
df.show()
dfs = df.groupBy("Location").agg(F.sum((F.col("Quantity") >= 10).cast("int")).alias("Count of products with quantity >= 10 per location"))
dfs.show()
Location Product Quantity
Home Laptop 12
Home Monitor 7
Home Mouse 8
Home Keyboard 9
Office Laptop 23
Office Monitor 10
Office Mouse 9
Location Count of products with quantity >= 10 per location
Home 1
Office 2
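
An equivalent sketch that counts matching rows with F.when() instead of casting the boolean condition to an integer:

dfs = df.groupBy("Location").agg(
    F.sum(F.when(F.col("Quantity") >= 10, 1).otherwise(0)).alias("Count of products with quantity >= 10 per location")
)
dfs.show(truncate=False)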

6.8. To calculate minimal value of two columns per row

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

# Updated schema to include two separate score fields
schema = T.StructType([
    T.StructField('Student', T.StructType([
        T.StructField('First name', T.StringType(), True),
        T.StructField('Middle name', T.StringType(), True),
        T.StructField('Last name', T.StringType(), True)
    ])),
    T.StructField('ID', T.StringType(), True),
    T.StructField('Gender', T.StringType(), True),
    T.StructField('Score1', T.IntegerType(), True),
    T.StructField('Score2', T.IntegerType(), True)
])

# Sample data with two scores for each student
data = [
    (("John", "", "Doe"), "1007", "M", 75, 80),
    (("Adam", "Scott", "Smith"), "1008", "M", 55, 65),
    (("Marie", "", "Carpenter"), "1004", "F", 67, 70),
    (("Samantha", "Louise", "Herbert"), "1002", "F", 90, 85),
    (("Craig", "", "Brown"), "1011", "M", 88, 92)
]

df = spark.createDataFrame(data=data, schema=schema)
# Calculate the minimum score between Score1 and Score2 and store it in a new column 'MinScore'
df = df.withColumn("MinScore", F.least(F.col("Score1"), F.col("Score2")))
df.printSchema()
# Show the result
df.show(truncate=False)

Schema of df is:

root
 |-- Student: struct (nullable = true)
 |    |-- First name: string (nullable = true)
 |    |-- Middle name: string (nullable = true)
 |    |-- Last name: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Score1: integer (nullable = true)
 |-- Score2: integer (nullable = true)
 |-- MinScore: integer (nullable = true)
Student ID Gender Score1 Score2 MinScore
{John, , Doe} 1007 M 75 80 75
{Adam, Scott, Smith} 1008 M 55 65 55
{Marie, , Carpenter} 1004 F 67 70 67
{Samantha, Louise, Herbert} 1002 F 90 85 85
{Craig, , Brown} 1011 M 88 92 88

6.9. To calculate cumulative sum of a column

import pandas as pd
from pyspark.sql import Window
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df = pd.DataFrame({'time': [0, 1, 2, 3, 4, 5],
                   'value': [False, False, True, False, True, True]})

df = spark.createDataFrame(df)
df = df.withColumn("cml_n_true", F.sum((F.col("value") == True).cast("int")).over(Window.orderBy(F.col("time").asc())))
df = df.withColumn("cml_n_false", F.sum((F.col("value") == False).cast("int")).over(Window.orderBy(F.col("time").asc())))
df.show()
time value cml_n_true cml_n_false
0 false 0 1
1 false 0 2
2 true 1 2
3 false 1 3
4 true 2 3
5 true 3 3
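
The ordered window above relies on the default frame (unbounded preceding up to the current row). Spelling the frame out makes the cumulative intent explicit; a minimal sketch on the same df:

w_cum = Window.orderBy(F.col("time").asc()).rowsBetween(Window.unboundedPreceding, Window.currentRow)
df = df.withColumn("cml_n_true_explicit", F.sum(F.col("value").cast("int")).over(w_cum))
df.show()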

6.10. To calculate difference of values of two consecutive rows for a certain column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

# Update schema to include a dataset column
schema = T.StructType([
    T.StructField("dataset", T.StringType(), True),
    T.StructField("group_data", T.StringType(), True),
    T.StructField("cnt", T.IntegerType(), True),
])

# Update data to include multiple datasets
data = [
    ("dataset1", "group_b", 7),
    ("dataset1", "group_c", 8),
    ("dataset1", "group_d", 9),
    ("dataset1", "group_e", 23),
    ("dataset1", "group_g", 9),
    ("dataset2", "group_a", 5),
    ("dataset2", "group_b", 15),
    ("dataset2", "group_d", 20),
    ("dataset2", "group_e", 8),
    ("dataset2", "group_g", 18),
]

# Create DataFrame
dfs = spark.createDataFrame(schema=schema, data=data)

# Define a window partitioned by dataset and ordered by cnt
window_by_dataset = Window.partitionBy("dataset").orderBy(F.asc("cnt")).rowsBetween(Window.unboundedPreceding, 0)

# Calculate cumulative sum of 'cnt' within each dataset
dfs = dfs.withColumn("cml_cnt", F.sum("cnt").over(window_by_dataset))

# Define another window for consecutive row difference, partitioned by dataset
w = Window.partitionBy("dataset").orderBy(F.col("cnt").desc(), F.col("group_data").desc())

# Define fill value for NULL and shift (row lag) value
fill_null_value = 0
shift = -1

# Calculate difference between two consecutive rows within each dataset
dfs = dfs.withColumn(
    "diff_of_two_consec_rows",
    F.col("cml_cnt") - F.lag("cml_cnt", shift, fill_null_value).over(w)
)

# Show the resulting DataFrame
dfs.show(truncate=False)
dataset group_data cnt cml_cnt diff_of_two_consec_rows
dataset1 group_e 23 56 23
dataset1 group_g 9 33 9
dataset1 group_d 9 24 9
dataset1 group_c 8 15 8
dataset1 group_b 7 7 7
dataset2 group_d 20 66 20
dataset2 group_g 18 46 18
dataset2 group_b 15 28 15
dataset2 group_e 8 13 8
dataset2 group_a 5 5 5
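
Note that F.lag() with a negative offset behaves like F.lead() with a positive one; a minimal sketch producing the same difference column:

dfs = dfs.withColumn(
    "diff_of_two_consec_rows_lead",
    F.col("cml_cnt") - F.lead("cml_cnt", 1, fill_null_value).over(w)
)
dfs.show(truncate=False)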

6.11. To calculate difference of values of two consecutive rows for a certain column using .applyInPandas()

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("group", T.StringType(), True),
    T.StructField("value", T.IntegerType(), True),
    T.StructField("timestamp", T.IntegerType(), True),
])

data = [
    ("A", 10, 1),
    ("A", 15, 2),
    ("A", 21, 3),
    ("B", 5, 1),
    ("B", 7, 2),
    ("B", 12, 3),
]

df = spark.createDataFrame(data=data, schema=schema)

# Define a Pandas UDF to calculate differences
def calculate_diff(pdf):
    # Ensure the data is sorted by timestamp within each group
    pdf = pdf.sort_values("timestamp").reset_index(drop=True)
    # Calculate the difference between consecutive rows
    pdf["difference"] = pdf["value"].diff()
    # The first row has no predecessor, so use its own value as the difference
    pdf.loc[0, "difference"] = pdf.loc[0, "value"]
    return pdf


# Apply the Pandas UDF, partitioning by the "group" column
df_res = df.groupBy("group").applyInPandas(calculate_diff, schema="group string, timestamp int, value int, difference int")

# Show the result
df_res.show(truncate=False)
group timestamp value difference
A 1 10 10
A 2 15 5
A 3 21 6
B 1 5 5
B 2 7 2
B 3 12 5
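
For comparison, the same per-group differences can be computed without pandas using a window and F.lag(); a minimal sketch on the same df:

from pyspark.sql.window import Window

w = Window.partitionBy("group").orderBy("timestamp")
df_win = df.withColumn("difference", F.col("value") - F.lag("value", 1, 0).over(w))
df_win.orderBy("group", "timestamp").show(truncate=False)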

6.12. To calculate down-sampling ratios for over-represented groups of some data used in training of a machine learning algorithm

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("group_data", T.StringType(), True),
        T.StructField("cnt", T.IntegerType(), True),
    ]
)
data = [("group_a", 12),
        ("group_b", 7),
        ("group_c", 8),
        ("group_d", 9),
        ("group_e", 23),
        ("group_f", 10),
        ("group_g", 9)]
dfs = spark.createDataFrame(schema=schema, data=data)

dfs = dfs.withColumn(
        "cml_cnt",
        F.sum("cnt").over(Window.orderBy(F.col("cnt").asc(), F.col("group_data").desc()).rowsBetween(Window.unboundedPreceding, 0)),
    )
txt = "Initial data composition:"
print(txt)
# Total count of samples in the input dataset
dfs = dfs.withColumn("cnt_tot", F.sum("cnt").over(Window.partitionBy()))
dfs.show(truncate=False)
dfs = dfs.withColumn("rank", F.rank().over(Window.orderBy(F.desc("cnt"), F.asc("group_data"))))

cnt_total = dfs.select("cnt_tot").limit(1).collect()[0][0]
dfs = dfs.drop("cnt_total")
n_samples_limit = 65
txt = f"Desired number of samples as parameter: {n_samples_limit}\n"
w = Window.partitionBy().orderBy(F.asc("rank"))
dfs = dfs.withColumn("cml_cnt + (rank-1)*F.lag(cnt, 1)",
                      F.concat_ws("", F.format_string("%2d", F.col("cml_cnt")), F.lit(" + "),
                                      F.lit("("), F.col("rank").cast(T.StringType()), F.lit(" - "), F.lit(1).cast(T.StringType()),
                                      F.lit(")"), F.lit(" * "), (F.lag("cnt", 1, cnt_total).over(w)).cast(T.StringType())))
dfs = dfs.withColumn("cnt_tot_after_top_cut", F.col("cml_cnt") + (F.col("rank") - F.lit(1)) * F.lag("cnt", 1, cnt_total).over(w))
dfs = dfs.withColumn("loc_cutoff", (F.col("cnt_tot_after_top_cut") > F.lit(n_samples_limit)).cast("int"))
dfs = dfs.withColumn("loc_cutoff_last", F.last("rank").over(Window.partitionBy("loc_cutoff").orderBy("loc_cutoff")))
dfs = dfs.withColumn("loc_cutoff", F.when((F.col("loc_cutoff") == 1) & (F.col("rank") == F.col("loc_cutoff_last")), F.lit(-1)).otherwise(F.col("loc_cutoff")))
dfs = dfs.drop("loc_cutoff_last")

rank_cutoff = dfs.filter(F.col("loc_cutoff") == 1).orderBy(F.desc("rank")).limit(1).select("rank").collect()[0][0]
txt += f"Rank at cutoff: {rank_cutoff}\n"
sum_before_cutoff = dfs.filter(F.col("loc_cutoff") == -1).orderBy(F.asc("rank")).limit(1).select("cml_cnt").collect()[0][0]
cnt_new_flat = (n_samples_limit - sum_before_cutoff)/rank_cutoff
txt += f"New samples count for groups with rank 1 to {rank_cutoff} (inclusive): {cnt_new_flat}\n"
dfs = dfs.withColumn("cnt_new", F.when(F.col("loc_cutoff") == F.lit(1), F.lit(cnt_new_flat)).otherwise(F.col("cnt").cast("float")))
txt += f"Over-represented groups will be down-sampled to have a flat distribution, under-represented groups will be kept as they are.\n"
dfs = dfs.withColumn(
        "cml_cnt_new",
        F.sum("cnt_new").over(Window.orderBy(F.desc("rank")).rowsBetween(Window.unboundedPreceding, 0)),
).orderBy(F.asc("rank"))

dfs = dfs.withColumn("sampling_ratio", F.when(F.col("loc_cutoff") == F.lit(1), F.lit(cnt_new_flat)/F.col("cnt")).otherwise(F.lit(1.0)))
txt += f"Data from groups with rank 1 to {rank_cutoff} (inclusive) should be downsampled with a ratio of samples to keep shown in 'sampling_ratio' column:"
print(txt)
dfs.show(truncate=False)
Initial data composition:
group_data cnt cml_cnt cnt_tot
group_b 7 7 78
group_c 8 15 78
group_g 9 24 78
group_d 9 33 78
group_f 10 43 78
group_a 12 55 78
group_e 23 78 78
Desired number of samples as parameter: 65
Rank at cutoff: 2
New samples count for groups with rank 1 to 2 (inclusive): 11.0
Over-represented groups will be down-sampled to have a flat distribution, under-represented groups will be kept as they are.
Data from groups with rank 1 to 2 (inclusive) should be downsampled with a ratio of samples to keep shown in 'sampling_ratio' column:
group_data cnt cml_cnt cnt_tot rank cml_cnt + (rank-1)*F.lag(cnt, 1) cnt_tot_after_top_cut loc_cutoff cnt_new cml_cnt_new sampling_ratio
group_e 23 78 78 1 78 + (1 - 1) * 78 78 1 11.0 65.0 0.4782608695652174
group_a 12 55 78 2 55 + (2 - 1) * 23 78 1 11.0 54.0 0.9166666666666666
group_f 10 43 78 3 43 + (3 - 1) * 12 67 -1 10.0 43.0 1.0
group_d 9 33 78 4 33 + (4 - 1) * 10 63 0 9.0 33.0 1.0
group_g 9 24 78 5 24 + (5 - 1) * 9 60 0 9.0 24.0 1.0
group_c 8 15 78 6 15 + (6 - 1) * 9 60 0 8.0 15.0 1.0
group_b 7 7 78 7 7 + (7 - 1) * 8 55 0 7.0 7.0 1.0

7. Statistical operations

7.1. To calculate distribution of discrete items by some grouping within a window

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

def aggregate_distribution(df_stat, col, count_column, window=None, to_order_by_pct=True, n=None, operations=None, op_cols=None):
    """
    Count number of rows (referenced by column `count_column`) having the same value of field `col`.
    :param df_stat: pyspark.sql.DataFrame with `col` and `count_column` columns
    :param col: str, column name which is used in .groupBy(...) operation
    :param count_column: str, column name which represents a row and used in .agg(...)
    :param to_order_by_pct: bool, flag indicating whether to order the output dataframe by % column
    :param n: int, number of sample values of `count_column` to combine into an array
    :return: pyspark.sql.DataFrame with columns
        f"%_of_{count_column}s_having", f"number_of_{count_column}s_having", col, f"list_{count_column}s"
    """
    if isinstance(col, str):
        col = [col]

    if window is None:
        window = []

    if n is None:
        samples_col = []
        samples_op = []
    else:
        samples_col = [f"list_{count_column}s"]
        kset = F.collect_list(count_column)
        samples_op = [
            F.when(F.count(count_column) <= n, kset)
            .otherwise(F.concat(F.slice(kset, 1, n), F.array(F.lit("..."))))
            .alias(f"list_{count_column}s")
        ]

    if operations is not None:
        samples_op = samples_op + operations
        samples_col = samples_col + op_cols

    df_stat_agg = (
        df_stat.groupBy(col)
        .agg(F.count(count_column).alias(f"n_of_{count_column}s_having"), *samples_op)
        .select(f"n_of_{count_column}s_having", *col, *samples_col)
    )

    df_stat_agg = df_stat_agg.withColumn(
        f"n_of_{count_column}s_in_win",
        F.sum(f"n_of_{count_column}s_having").over(Window.partitionBy(window)),
        )
    df_stat_agg = df_stat_agg.withColumn(
        f"%_of_{count_column}s_having",
        F.round(
            100
            * F.col(f"n_of_{count_column}s_having")
            / F.col(f"n_of_{count_column}s_in_win"),
            3,
        ),
    )
    df_stat_agg = df_stat_agg.select(
        f"%_of_{count_column}s_having", f"n_of_{count_column}s_having", *col, *samples_col, f"n_of_{count_column}s_in_win"
    )

    if to_order_by_pct:
        # Number corresponds to %
        df_stat_agg = df_stat_agg.orderBy([f"n_of_{count_column}s_having"], ascending=False)
    else:
        df_stat_agg = df_stat_agg.orderBy(col, ascending=True)
    return df_stat_agg

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df = spark.range(100)

# Add "class" column randomly chosen between "A" and "B"
df = df.withColumn("class", F.when(F.rand() < 0.5, F.lit("A")).otherwise(F.lit("B")))

# Add exponentially decaying "group" column
# We'll map a uniform random number to group values with exponential probabilities

# Define group weights (manually decayed): e.g. P(1) > P(2) > P(3) > P(4)
group_values = [1, 2, 3, 4]
weights = [0.5, 0.25, 0.15, 0.10]  # must sum to 1

# Compute cumulative distribution for thresholding
cumulative = [sum(weights[:i+1]) for i in range(len(weights))]

# Define group assignment via conditional expressions:
# draw a single uniform random number and compare it against the cumulative thresholds
df = df.withColumn("u", F.rand())
group_expr = F.when(F.col("u") < cumulative[0], F.lit(1)) \
            .when(F.col("u") < cumulative[1], F.lit(2)) \
            .when(F.col("u") < cumulative[2], F.lit(3)) \
            .otherwise(F.lit(4))

df = df.withColumn("group", group_expr).drop("u")

# Add random 5-letter alphanumeric "student_id" (using SHA hash trick)
df = df.withColumn(
     "student_id",
     F.sha2(F.concat_ws("", F.col("id").cast("string"), F.rand().cast("string")), 256).substr(1, 5)
     )

df = df.select("class", "group", "student_id")

txt = "Raw data with \"student_id\" column to be aggregated:"
print(txt)
df.show(7, truncate=False)

txt = f"Number of rows in raw data: {df.count()}"
print(txt)

txt = "Percentages should sum up to 100 within a \"window\":"
print(txt)
df_stat = aggregate_distribution(df, ["class", "group"], "student_id", window="class", to_order_by_pct=False)
df_stat.orderBy("class", "group").show(truncate=False)

txt = "Argument \"window\" can be set to None as well:"
print(txt)
df_stat = aggregate_distribution(df, ["class", "group"], "student_id", window=None, to_order_by_pct=False)
df_stat.orderBy("class", "group").show(truncate=False)
Raw data with "student_id" column to be aggregated:
class group student_id
A 2 55e43
B 1 9578a
B 1 94305
A 2 1a0c7
B 2 e44c2
B 1 6d3e7
A 3 5a1ce

only showing top 7 rows

Number of rows in raw data: 100
Percentages should sum up to 100 within a "window":
%_of_student_ids_having n_of_student_ids_having class group n_of_student_ids_in_win
48.889 22 A 1 45
37.778 17 A 2 45
11.111 5 A 3 45
2.222 1 A 4 45
56.364 31 B 1 55
36.364 20 B 2 55
5.455 3 B 3 55
1.818 1 B 4 55
Argument "window" can be set to None as well:
%_of_student_ids_having n_of_student_ids_having class group n_of_student_ids_in_win
22.0 22 A 1 100
17.0 17 A 2 100
5.0 5 A 3 100
1.0 1 A 4 100
31.0 31 B 1 100
20.0 20 B 2 100
3.0 3 B 3 100
1.0 1 B 4 100

7.2. To calculate one-dimensional histogram of some real-valued column

Install prerequisites:

$ sudo pip install sparkhistogram asciibars
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from sparkhistogram import computeHistogram
import asciibars

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df = spark.range(500)

std_dev = 0.2
mean_val = 0.5
df = df.withColumn("value", F.randn(seed=1)*F.lit(stddev) + F.lit(mean_val)).drop("id")
txt = "Raw data with real-valued numbers:"
print(txt)
df.show(5, truncate=False)

txt = f"Number of rows in raw data: {df.count()}"
print(txt)

txt = f"Computation of histogram:"
print(txt)
df_hist = computeHistogram(df, value_col="value", min_val=0, max_val=1, bins=20)
df_hist.show(5, truncate=False)
df_pd = df_hist.toPandas()

# Generate labels
df_pd["value"] = df_pd["value"].astype(str)
data = df_pd[["value", "count"]].apply(tuple, axis=1).tolist()

txt = "Plotting the histogram:"
print(txt)

asciibars.plot(data, unit='*')

# Alternatively, use matplotlib.pyplot:
#df_hist.toPandas().plot(x="value", y="count", kind="bar", rot=90)
Raw data with real-valued numbers:
value
0.8369122250888984
0.7455214018875292
0.6472126581378677
0.5901651497771481
0.4201448106630146

only showing top 5 rows

Number of rows in raw data: 500
Computation of histogram:
bucket value count
1 0.025 1
2 0.075 5
3 0.125 9
4 0.175 14
5 0.225 25

only showing top 5 rows

Plotting the histogram:
0.025 |  1
0.075 |  5 **
0.125 |  9 ***
0.175 | 14 ****
0.225 | 25 ********
0.275 | 23 *******
0.325 | 31 **********
0.375 | 41 *************
0.425 | 57 ******************
0.475 | 39 ************
0.525 | 63 ********************
0.575 | 37 ************
0.625 | 44 **************
0.675 | 30 **********
0.725 | 28 *********
0.775 | 21 *******
0.825 | 11 ***
0.875 | 12 ****
0.925 |  4 *
0.975 |  1
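
If the sparkhistogram package is not available, the same fixed-width binning can be sketched with plain DataFrame functions. The bin arithmetic below is an assumption about how buckets and bin centres should be defined, not the package's exact API:

n_bins, min_val, max_val = 20, 0.0, 1.0
bin_width = (max_val - min_val) / n_bins
df_hist_manual = (
    df.withColumn("bucket", (F.floor((F.col("value") - F.lit(min_val)) / F.lit(bin_width)) + 1).cast("int"))
      .filter(F.col("bucket").between(1, n_bins))  # values outside [min_val, max_val) are dropped
      .groupBy("bucket")
      .count()
      .withColumn("value", F.lit(min_val) + (F.col("bucket") - 0.5) * F.lit(bin_width))  # bin centre
      .orderBy("bucket")
)
df_hist_manual.show(5, truncate=False)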

8. Structures

8.1. To convert a map to a struct

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df = spark.sql("SELECT map('John', 1, 'Michael', 2) as Students_map")
df = df.withColumn("Students_struct", F.map_entries("Students_map").alias("a", "b"))
df.printSchema()
df.show(truncate=False)

Schema of df is:

root
 |-- Students_map: map (nullable = false)
 |    |-- key: string
 |    |-- value: integer (valueContainsNull = false)
 |-- Students_struct: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- key: string (nullable = false)
 |    |    |-- value: integer (nullable = false)
Students_map Students_struct
{John -> 1, Michael -> 2} [{John, 1}, {Michael, 2}]
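
The reverse conversion is also available; a minimal sketch that rebuilds the map from the array of structs with F.map_from_entries():

df = df.withColumn("Students_map_back", F.map_from_entries("Students_struct"))
df.show(truncate=False)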

8.2. To extract a field from a struct as a column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType([
        T.StructField('Student', T.StructType([
             T.StructField('First name', T.StringType(), True),
             T.StructField('Middle name', T.StringType(), True),
             T.StructField('Last name', T.StringType(), True)
             ])),
         T.StructField('ID', T.StringType(), True),
         T.StructField('Gender', T.StringType(), True),
         T.StructField('Score', T.IntegerType(), True)
         ])

data = [
    (("John", "", "Doe"),"1007", "M", 75),
    (("Adam", "Scott", "Smith"),"1008", "M", 55),
    (("Marie", "", "Carpenter"), "1004", "F", 67),
    (("Samantha", "Louise", "Herbert"),"1002", "F", 90),
    (("Craig", "", "Brown"), "1011", "M" , 88)
  ]
df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()
df = df.withColumn("First name", F.col("Student.First Name"))
df = df.withColumn("Last name", F.upper(F.col("Student").getField("Last name")))
df.show(truncate=False)

Schema of df is:

root
 |-- Student: struct (nullable = true)
 |    |-- First name: string (nullable = true)
 |    |-- Middle name: string (nullable = true)
 |    |-- Last name: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Score: integer (nullable = true)
Student ID Gender Score First name Last name
{John, , Doe} 1007 M 75 John DOE
{Adam, Scott, Smith} 1008 M 55 Adam SMITH
{Marie, , Carpenter} 1004 F 67 Marie CARPENTER
{Samantha, Louise, Herbert} 1002 F 90 Samantha HERBERT
{Craig, , Brown} 1011 M 88 Craig BROWN
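
To extract all struct fields at once, a dotted star path can be used in .select(); a minimal sketch on the same df:

df.select("ID", "Student.*").show(truncate=False)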

8.3. To process an array of structs using F.transform() and .getField()

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

# Define schema with a struct that includes an array of structs for subjects
schema = T.StructType([
    T.StructField('Student', T.StructType([
        T.StructField('First name', T.StringType(), True),
        T.StructField('Middle name', T.StringType(), True),
        T.StructField('Last name', T.StringType(), True)
    ])),
    T.StructField('ID', T.StringType(), True),
    T.StructField('Gender', T.StringType(), True),
    T.StructField('Score', T.IntegerType(), True),
    T.StructField('Subjects and scores', T.ArrayType(
        T.StructType([
            T.StructField('Subject name', T.StringType(), True),
            T.StructField('Subject score', T.IntegerType(), True)
        ])
    ), True)
])

# Define data with an array of subjects for each student
data = [
    (("John", "", "Doe"), "1007", "M", 75, [("Math", 78), ("Science", 82)]),
    (("Adam", "Scott", "Smith"), "1008", "M", 55, [("Math", 55), ("English", 65)]),
    (("Marie", "", "Carpenter"), "1004", "F", 67, [("Math", 72), ("Science", 68), ("History", 75)]),
    (("Samantha", "Louise", "Herbert"), "1002", "F", 90, [("Math", 92), ("Science", 88), ("English", 91)]),
    (("Craig", "", "Brown"), "1011", "M", 88, [("Math", 85), ("Science", 90)])
]

df = spark.createDataFrame(data=data, schema=schema)

df.printSchema()

df = df.withColumn("Subjects taken", F.transform("Subjects and scores", lambda x: x.getField("Subject name")))
df.show(truncate=False)

Schema of df is:

root
 |-- Student: struct (nullable = true)
 |    |-- First name: string (nullable = true)
 |    |-- Middle name: string (nullable = true)
 |    |-- Last name: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Score: integer (nullable = true)
 |-- Subjects and scores: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Subject name: string (nullable = true)
 |    |    |-- Subject score: integer (nullable = true)
Student ID Gender Score Subjects and scores Subjects taken
{John, , Doe} 1007 M 75 [{Math, 78}, {Science, 82}] [Math, Science]
{Adam, Scott, Smith} 1008 M 55 [{Math, 55}, {English, 65}] [Math, English]
{Marie, , Carpenter} 1004 F 67 [{Math, 72}, {Science, 68}, {History, 75}] [Math, Science, History]
{Samantha, Louise, Herbert} 1002 F 90 [{Math, 92}, {Science, 88}, {English, 91}] [Math, Science, English]
{Craig, , Brown} 1011 M 88 [{Math, 85}, {Science, 90}] [Math, Science]
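
The transformed array can be fed directly into other array functions; for example, a minimal sketch extracting the highest subject score per student:

df = df.withColumn(
    "Best subject score",
    F.array_max(F.transform("Subjects and scores", lambda x: x.getField("Subject score")))
)
df.select("ID", "Subjects taken", "Best subject score").show(truncate=False)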

8.4. To process an array of nested structs using F.transform() and .getField()

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

# Define the schema with nested structs
schema = T.StructType([
  T.StructField('Student', T.StructType([
      T.StructField('First name', T.StringType(), True),
      T.StructField('Middle name', T.StringType(), True),
      T.StructField('Last name', T.StringType(), True)
  ])),
  T.StructField('ID', T.StringType(), True),
  T.StructField('Gender', T.StringType(), True),
  T.StructField('Subjects', T.ArrayType(T.StructType([
      T.StructField('Subject Name', T.StringType(), True),
      T.StructField('Assessments', T.ArrayType(T.StructType([
          T.StructField('Assessment Name', T.StringType(), True),
          T.StructField('Score', T.IntegerType(), True)
      ])), True)
  ])), True)
])

# Sample data matching the new schema
data = [
  (("John", "", "Doe"), "1007", "M", [
      ("Math", [("Midterm", 70), ("Final", 80)]),
      ("Science", [("Midterm", 65), ("Final", 75)])
  ]),
  (("Adam", "Scott", "Smith"), "1008", "M", [
      ("Math", [("Midterm", 50), ("Final", 60)]),
      ("Science", [("Midterm", 55), ("Final", 65)])
  ]),
  (("Marie", "", "Carpenter"), "1004", "F", [
      ("Math", [("Midterm", 60), ("Final", 75)]),
      ("Science", [("Midterm", 68), ("Final", 70)])
  ]),
  (("Samantha", "Louise", "Herbert"), "1002", "F", [
      ("Math", [("Midterm", 88), ("Final", 92)]),
      ("Science", [("Midterm", 85), ("Final", 89)])
  ]),
  (("Craig", "", "Brown"), "1011", "M", [
      ("Math", [("Midterm", 78), ("Final", 85)]),
      ("Science", [("Midterm", 80), ("Final", 84)])
  ])
]

df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()

df = df.withColumn("Scores", F.transform("Subjects", lambda x: x["Assessments"].getField("Score")))
df.show(truncate=False)

Schema of df is:

root
 |-- Student: struct (nullable = true)
 |    |-- First name: string (nullable = true)
 |    |-- Middle name: string (nullable = true)
 |    |-- Last name: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Subjects: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Subject Name: string (nullable = true)
 |    |    |-- Assessments: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- Assessment Name: string (nullable = true)
 |    |    |    |    |-- Score: integer (nullable = true)
Student ID Gender Subjects Scores
{John, , Doe} 1007 M [{Math, [{Midterm, 70}, {Final, 80}]}, {Science, [{Midterm, 65}, {Final, 75}]}] [[70, 80], [65, 75]]
{Adam, Scott, Smith} 1008 M [{Math, [{Midterm, 50}, {Final, 60}]}, {Science, [{Midterm, 55}, {Final, 65}]}] [[50, 60], [55, 65]]
{Marie, , Carpenter} 1004 F [{Math, [{Midterm, 60}, {Final, 75}]}, {Science, [{Midterm, 68}, {Final, 70}]}] [[60, 75], [68, 70]]
{Samantha, Louise, Herbert} 1002 F [{Math, [{Midterm, 88}, {Final, 92}]}, {Science, [{Midterm, 85}, {Final, 89}]}] [[88, 92], [85, 89]]
{Craig, , Brown} 1011 M [{Math, [{Midterm, 78}, {Final, 85}]}, {Science, [{Midterm, 80}, {Final, 84}]}] [[78, 85], [80, 84]]
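
Since "Scores" is an array of arrays, it can be collapsed into a single flat array with F.flatten(); a minimal sketch:

df = df.withColumn("All scores", F.flatten("Scores"))
df.select("ID", "All scores").show(truncate=False)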

9. Dataframe join operations

9.1. To perform a full, outer, left, right join operations

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Name", T.StringType(), True),
        T.StructField("Score", T.IntegerType(), True),
    ]
)
data = [("Alice", 10),
        ("Bob", 11)
        ]
df_a = spark.createDataFrame(schema=schema, data=data)
print("Table A:")
df_a.show()

schema = T.StructType(
    [
        T.StructField("Name", T.StringType(), True),
        T.StructField("Surname", T.StringType(), True),
        T.StructField("Age", T.StringType(), True),
    ]
)
data = [("Alice", "Doe", 12),
        ("Alice", "Smith", 30),
        ("Jane", "Carter", 7),
]
df_b = spark.createDataFrame(schema=schema, data=data)
print("Table B:")
df_b.show()

df = df_a.join(df_b, on="Name", how="full")
print("Full join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="outer")
print("Outer join on 'Name':")
df.show()

df = df_a.join(df_b, df_a["Name"] == df_b["Name"])
print("Join on 'Name' on equal condition:")
df.show()

df = df_a.join(df_b, on="Name", how="inner")
print("Inner join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="left")
print("Left join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="left_outer")
print("Left-outer join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="left_anti")
print("Left-anti join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="left_semi")
print("Left-semi join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="right")
print("Right join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="right_outer")
print("Right-outer join on 'Name':")
df.show()

Table A:

Name Score
Alice 10
Bob 11

Table B:

Name Surname Age
Alice Doe 12
Alice Smith 30
Jane Carter 7

Full join on 'Name':

Name Score Surname Age
Alice 10 Doe 12
Alice 10 Smith 30
Bob 11 null null
Jane null Carter 7

Outer join on 'Name':

Name Score Surname Age
Alice 10 Doe 12
Alice 10 Smith 30
Bob 11 null null
Jane null Carter 7

Join on 'Name' on equal condition:

Name Score Name Surname Age
Alice 10 Alice Doe 12
Alice 10 Alice Smith 30

Inner join on 'Name':

Name Score Surname Age
Alice 10 Doe 12
Alice 10 Smith 30

Left join on 'Name':

Name Score Surname Age
Bob 11 null null
Alice 10 Smith 30
Alice 10 Doe 12

Left-outer join on 'Name':

Name Score Surname Age
Bob 11 null null
Alice 10 Smith 30
Alice 10 Doe 12

Left-anti join on 'Name':

Name Score
Bob 11

Left-semi join on 'Name':

Name Score
Alice 10

Right join on 'Name':

Name Score Surname Age
Alice 10 Doe 12
Alice 10 Smith 30
Jane null Carter 7

Right-outer join on 'Name':

Name Score Surname Age
Alice 10 Doe 12
Alice 10 Smith 30
Jane null Carter 7
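
The condition-based form also accepts composite conditions; a minimal sketch (the Age threshold is arbitrary) joining on name equality combined with a filter on Age:

df = df_a.join(df_b, (df_a["Name"] == df_b["Name"]) & (df_b["Age"] > 10), how="inner")
print("Inner join on 'Name' with an additional condition on 'Age':")
df.show()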

9.2. To drop one of the duplicate columns after join

from pyspark.sql import Row
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df_a = spark.createDataFrame([
  Row(id=1, value="A1"),
  Row(id=1, value="B1"),
  Row(id=1, value="C1"),
  Row(id=2, value="A1"),
  Row(id=2, value="X1"),
  Row(id=2, value="Y1")]
)
print("Dataframe df_a:")
df_a.show()

df_b = spark.createDataFrame([
  Row(id=1, updated="A2"),
  Row(id=1, updated="B1"),
  Row(id=1, updated="C1"),
  Row(id=2, updated="A1"),
  Row(id=2, updated="X1"),
  Row(id=2, updated="Y1")]
)
print("Dataframe df_b:")
df_b.show()

df = df_a.join(df_b, on=[df_a["id"] == df_b["id"], df_a["value"] == df_b["updated"]], how="full")
print("Full join on df_a['value'] == df_b['updated']:")
df.show()

df = df_a.join(df_b, on=[df_a["id"] == df_b["id"], df_a["value"] == df_b["updated"]], how="full").drop(df_b["id"])
print("Full join on df_a['value'] == df_b['updated'] with dropped df_b['id'] column:")
df.show()

Dataframe df_a:

id value
1 A1
1 B1
1 C1
2 A1
2 X1
2 Y1

Dataframe df_b:

id updated
1 A2
1 B1
1 C1
2 A1
2 X1
2 Y1

Full join on df_a['value'] == df_b['updated']:

id value id updated
1.0 A1 null null
null null 1.0 A2
1.0 B1 1.0 B1
1.0 C1 1.0 C1
2.0 A1 2.0 A1
2.0 X1 2.0 X1
2.0 Y1 2.0 Y1

Full join on df_a['value'] == df_b['updated'] with dropped df_b['id'] column:

id value updated
1.0 A1 null
null null A2
1.0 B1 B1
1.0 C1 C1
2.0 A1 A1
2.0 X1 X1
2.0 Y1 Y1
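
An alternative sketch that avoids the ambiguous column names altogether: alias both dataframes and select the wanted columns explicitly (the functions import is added here because this recipe does not use it otherwise):

import pyspark.sql.functions as F

dfa = df_a.alias("a")
dfb = df_b.alias("b")
df = (dfa.join(dfb, on=[F.col("a.id") == F.col("b.id"), F.col("a.value") == F.col("b.updated")], how="full")
         .select(F.col("a.id").alias("id"), F.col("a.value"), F.col("b.updated")))
df.show()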

10. Aggregation and maps

10.1. To group by and aggregate into a map using F.map_from_entries()

import pyspark.sql.functions as F
from pyspark.sql import Row
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df = spark.createDataFrame([
  Row(id=1, key='a', value="A1"),
  Row(id=1, key='b', value="B1"),
  Row(id=1, key='c', value="C1"),
  Row(id=2, key='a', value="A1"),
  Row(id=2, key='x', value="X1"),
  Row(id=2, key='y', value="Y1")]
)

print("Dataframe with keys and values:")
df.show(truncate=False)
dft = df.groupBy("id").agg(F.map_from_entries(F.collect_list(
          F.struct("key", "value"))).alias("key_value")
)
print("Dataframe with key -> value mapping")
dft.show(truncate=False)
dft.printSchema()

Dataframe with keys and values:

id key value
1 a A1
1 b B1
1 c C1
2 a A1
2 x X1
2 y Y1

Dataframe with key -> value mapping

id key_value
1 {a -> A1, b -> B1, c -> C1}
2 {a -> A1, x -> X1, y -> Y1}

Schema of dft is:

root
 |-- id: long (nullable = true)
 |-- key_value: map (nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
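
For the reverse direction, F.explode() turns the map back into one row per key/value pair; a minimal sketch on dft:

dft.select("id", F.explode("key_value").alias("key", "value")).show(truncate=False)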

10.2. To group by and aggregate into a map using UDF

import pyspark.sql.functions as F
from pyspark.sql import Row
from pyspark.sql import SparkSession
import pyspark.sql.types as T

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df = spark.createDataFrame([
  Row(id=1, key='a', value="A1"),
  Row(id=1, key='b', value="B1"),
  Row(id=1, key='c', value="C1"),
  Row(id=2, key='a', value="A1"),
  Row(id=2, key='x', value="X1"),
  Row(id=2, key='y', value="Y1")]
)

print("Dataframe with keys and values:")
df.show()

@F.udf(returnType=T.MapType(T.StringType(), T.StringType()))
def map_array(column):
    return dict(column)

dft = (df.groupBy("id")
   .agg(F.collect_list(F.struct("key", "value")).alias("key_value"))
   .withColumn('key_value', map_array('key_value')))
print("Dataframe with keys and values:")
dft.show(truncate=False)
dft.printSchema()

Dataframe with keys and values:

id key value
1 a A1
1 b B1
1 c C1
2 a A1
2 x X1
2 y Y1

Dataframe with key -> value mapping:

id key_value
1 {a -> A1, b -> B1, c -> C1}
2 {x -> X1, a -> A1, y -> Y1}

Schema of dft is:

root
 |-- id: long (nullable = true)
 |-- key_value: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

10.3. To aggregate over multiple columns and sum values of dictionaries

import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

df_schema = T.StructType([T.StructField('clid', T.StringType(), True),
                        T.StructField('coef_1', T.MapType(T.StringType(), T.DoubleType(), True), False),
                        T.StructField('coef_2', T.MapType(T.StringType(), T.DoubleType(), True), False),
                        T.StructField('coef_3', T.MapType(T.StringType(), T.DoubleType(), True), False)])
df_data = [["X", {'B': 0.4, 'C': 0.4}, {'B': 0.33, 'C': 0.5}, {'A': 0.5, 'C': 0.33}],
           ["Y", {'B': 0.67, 'C': 0.33}, {'B': 0.85}, {'A': 0.4, 'C': 0.57}],
           ]
spark = SparkSession.builder\
        .appName("Parse DataFrame Schema")\
        .getOrCreate()
df = spark.createDataFrame(data=df_data, schema=df_schema)

df = df.withColumn("coef_total", F.col("coef_1"))
for i in range(2,4):
    df = df.withColumn("coef_total", F.map_zip_with("coef_total", f"coef_{i}",
                      lambda k, v1, v2: F.when(v1.isNull(), 0).otherwise(v1) + F.when(v2.isNull(), 0).otherwise(v2)))
df.show(truncate=False)
clid coef_1 coef_2 coef_3 coef_total
X {B -> 0.4, C -> 0.4} {B -> 0.33, C -> 0.5} {A -> 0.5, C -> 0.33} {B -> 0.73, C -> 1.23, A -> 0.5}
Y {B -> 0.67, C -> 0.33} {B -> 0.85} {A -> 0.4, C -> 0.57} {B -> 1.52, C -> 0.8999999999999999, A -> 0.4}

10.4. To unpack specific key-value pairs of a map as columns

import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from typing import List

df_schema = T.StructType([T.StructField('key_val', T.MapType(T.StringType(), T.IntegerType(), True), False)])
df_data = [[{'A': 0, 'B': 1, 'C': 2}],
           [{'B': 3, 'C': 4, 'D': 5}],
           ]
spark = SparkSession.builder\
        .appName("PySpark Cookbook")\
        .getOrCreate()
df = spark.createDataFrame(data=df_data, schema=df_schema)

def unpack_key_value_pairs(df, map_clm: str, keys: List[str]):
    df = df.withColumn("tmp", F.col(map_clm))
    for key in keys:
        df = df.withColumn(key, F.when(F.map_contains_key(map_clm, key), F.col(map_clm).getField(key)
                                       ).otherwise(F.lit(None)))
        df = df.withColumn("tmp", F.map_filter("tmp", lambda k, _: k != key))
    return df.drop(map_clm).withColumnRenamed("tmp", map_clm)

txt = "Original dataframe with a map:"
print(txt)
df.show(truncate=False)
df = unpack_key_value_pairs(df, "key_val", ["A", "B"])
txt = "Dataframe with the unpacked map:"
print(txt)
df.show(truncate=False)
Original dataframe with a map:
key_val
{A -> 0, B -> 1, C -> 2}
{B -> 3, C -> 4, D -> 5}
Dataframe with the unpacked map:
key_val A B
{C -> 2} 0 1
{C -> 4, D -> 5} NULL 3
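
If the extracted keys do not need to be removed from the map, a shorter sketch is to read them directly with .getItem(), applied to the original dataframe before unpacking (missing keys simply yield NULL):

df_direct = spark.createDataFrame(data=df_data, schema=df_schema)
df_direct.select("key_val",
                 F.col("key_val").getItem("A").alias("A"),
                 F.col("key_val").getItem("B").alias("B")).show(truncate=False)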

11. Groups, aggregations, pivoting and window operations

11.1. To get 2nd smallest element in a group

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)
w = Window.partitionBy("Location").orderBy(F.asc("Quantity"))
df = df.withColumn("rank", F.rank().over(w))
df.show()
print("Products with the 2nd smallest quantity in a location:")
df = df.filter(F.col("rank") == 2)
df.show()
Location Product Quantity rank
Home Monitor 7 1
Home Mouse 8 2
Home Keyboard 9 3
Home Laptop 12 4
Office Mouse 9 1
Office Monitor 10 2
Office Laptop 23 3

Products with the 2nd smallest quantity in a location:

Location Product Quantity rank
Home Mouse 8 2
Office Monitor 10 2
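
If ties in Quantity should yield exactly one row per location, F.row_number() can be used instead of F.rank(); a minimal sketch reusing the same window and data:

df2 = spark.createDataFrame(schema=schema, data=data)
df2 = df2.withColumn("rn", F.row_number().over(w))
df2.filter(F.col("rn") == 2).show()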

11.2. To divide a group into sub-groups with number of rows not exceeding some value

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as F
import pyspark.sql.types as T
import string

def additive_decomposition(n, m):
    """
    Decompose an integer "n" into a set of possibly repeatable integers given
    a constraint for the maximal value of integers defined by "m".
    For example, additive_decomposition(10, 3) = [3, 3, 2, 2], such that
    the largest integer is 3 and the number of components is the minimum possible.
    """
    r = n % m
    t = int(n/m)
    if r == 0:
        return [m] * t
    else:
        f = (t - (m - r) + 1)
        g = (m - r)

        if f < 0:
            return additive_decomposition(n, m - 1)
        else:
            return [m] * f + [m-1] * g


@F.udf(returnType=T.ArrayType(T.IntegerType()))
def udf_additive_decomposition(n, k):
    return additive_decomposition(n, k)


spark = SparkSession.builder.appName("LabeledRandomIDs").getOrCreate()

# Character set for random ID generation
CHAR_SET = string.ascii_letters + string.digits
CHAR_LIST = list(CHAR_SET)
ID_LENGTH = 5

data = [("group_A",) for _ in range(8)] + [("group_B",) for _ in range(5)]
df = spark.createDataFrame(data, ["group"])

# Add random 5-character ID by selecting characters from CHAR_LIST
for i in range(ID_LENGTH):
    df = df.withColumn(
        f"char_{i}",
        F.element_at(
            F.array([F.lit(c) for c in CHAR_LIST]),
            (F.rand() * len(CHAR_LIST)).cast("int") + 1
        )
    )

# Concatenate characters into a single string ID
df = df.withColumn("id", F.concat_ws("", *[F.col(f"char_{i}") for i in range(ID_LENGTH)]))

txt = "Initial groups and IDs:"
print(txt)
df = df.select("group", "id")
df.show(truncate=False)

max_ids_per_group = 3
txt = f"Desired maximal number of IDs per group: {max_ids_per_group}"
print(txt)

df_stat = df.groupBy("group").agg(F.count("id").alias("n_ids"))
df_stat = df_stat.withColumn("terms", udf_additive_decomposition(F.col("n_ids"), F.lit(max_ids_per_group)))
df_stat = df_stat.withColumn("n_terms", F.size("terms"))
empty_int_array = F.array().cast(T.ArrayType(T.IntegerType()))
df_stat = df_stat.withColumn(
    "cum_sum",
    F.aggregate(
        F.col("terms"),
        empty_int_array,
        lambda acc, x: F.concat(
            acc,
            F.array(x + F.coalesce(F.element_at(acc, -1), F.lit(0).cast("int")))
        )
    )
)
txt = "Number of IDs per group:"
print(txt)
df_stat.orderBy("group").show(truncate=False)

df = df.join(df_stat, on="group", how="left")

w = Window.partitionBy("group").orderBy(F.asc("id"))
df = df.withColumn("rank", F.dense_rank().over(w))
df = df.withColumn("loc", F.transform("cum_sum", lambda x: (x < F.col("rank")).cast("int")))
df = df.withColumn("pos", F.array_position(F.col("loc"), 0))
df = df.withColumn("new_group", F.concat("group", F.col("pos").cast("string")))
txt = "New group IDs:"
print(txt)
df.show(truncate=False)
Initial groups and IDs:
group id
group_A RJk5q
group_A E0xID
group_A eVjqf
group_A XyTUy
group_A lfqSf
group_A jKvNh
group_A KdzIh
group_A rcsxp
group_B V6ZRm
group_B sUfvt
group_B aLuf6
group_B EptO8
group_B Lqhau
Desired maximal number of IDs per group: 3
Number of IDs per group:
group n_ids terms n_terms cum_sum
group_A 8 [3, 3, 2] 3 [3, 6, 8]
group_B 5 [3, 2] 2 [3, 5]
New group IDs:
group id n_ids terms n_terms cum_sum rank loc pos new_group
group_A E0xID 8 [3, 3, 2] 3 [3, 6, 8] 1 [0, 0, 0] 1 group_A1
group_A KdzIh 8 [3, 3, 2] 3 [3, 6, 8] 2 [0, 0, 0] 1 group_A1
group_A RJk5q 8 [3, 3, 2] 3 [3, 6, 8] 3 [0, 0, 0] 1 group_A1
group_A XyTUy 8 [3, 3, 2] 3 [3, 6, 8] 4 [1, 0, 0] 2 group_A2
group_A eVjqf 8 [3, 3, 2] 3 [3, 6, 8] 5 [1, 0, 0] 2 group_A2
group_A jKvNh 8 [3, 3, 2] 3 [3, 6, 8] 6 [1, 0, 0] 2 group_A2
group_A lfqSf 8 [3, 3, 2] 3 [3, 6, 8] 7 [1, 1, 0] 3 group_A3
group_A rcsxp 8 [3, 3, 2] 3 [3, 6, 8] 8 [1, 1, 0] 3 group_A3
group_B EptO8 5 [3, 2] 2 [3, 5] 1 [0, 0] 1 group_B1
group_B Lqhau 5 [3, 2] 2 [3, 5] 2 [0, 0] 1 group_B1
group_B V6ZRm 5 [3, 2] 2 [3, 5] 3 [0, 0] 1 group_B1
group_B aLuf6 5 [3, 2] 2 [3, 5] 4 [1, 0] 2 group_B2
group_B sUfvt 5 [3, 2] 2 [3, 5] 5 [1, 0] 2 group_B2

11.3. To divide a group into N sub-groups with equal or almost equal number of rows

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as F
import pyspark.sql.types as T
import string

def divide_into_similar_groups(n, k):
    """
    Divide an integer "n" into a set of possibly "k" repeatable integers given
    a constraint that all integers should be equal or differ by maximum of 1.
    """
    t = int(n/k)
    r = n % k
    return [t] * (k - r) + [t + 1] * r


@F.udf(returnType=T.ArrayType(T.IntegerType()))
def udf_divide_into_similar_groups(n, k):
    return divide_into_similar_groups(n, k)


spark = SparkSession.builder.appName("LabeledRandomIDs").getOrCreate()

# Character set for random ID generation
CHAR_SET = string.ascii_letters + string.digits
CHAR_LIST = list(CHAR_SET)
ID_LENGTH = 5

data = [("group_A",) for _ in range(8)] + [("group_B",) for _ in range(5)]
df = spark.createDataFrame(data, ["group"])

# Add random 5-character ID by selecting characters from CHAR_LIST
for i in range(ID_LENGTH):
    df = df.withColumn(
        f"char_{i}",
        F.element_at(
            F.array([F.lit(c) for c in CHAR_LIST]),
            (F.rand() * len(CHAR_LIST)).cast("int") + 1
        )
    )

# Concatenate characters into a single string ID
df = df.withColumn("id", F.concat_ws("", *[F.col(f"char_{i}") for i in range(ID_LENGTH)]))

txt = "Initial groups and IDs:"
print(txt)
df = df.select("group", "id")
df.show(truncate=False)

max_num_sub_groups = 2
txt = f"Maximal number of sub-groups desired: {max_num_sub_groups}"
print(txt)

df_stat = df.groupBy("group").agg(F.count("id").alias("n_ids"))
df_stat = df_stat.withColumn("terms", udf_divide_into_similar_groups(F.col("n_ids"), F.lit(max_num_sub_groups)))
df_stat = df_stat.withColumn("n_terms", F.size("terms"))
empty_int_array = F.array().cast(T.ArrayType(T.IntegerType()))
df_stat = df_stat.withColumn(
    "cum_sum",
    F.aggregate(
        F.col("terms"),
        empty_int_array,
        lambda acc, x: F.concat(
            acc,
            F.array(x + F.coalesce(F.element_at(acc, -1), F.lit(0).cast("int")))
        )
    )
)
txt = "Number of IDs per group:"
print(txt)
df_stat.orderBy("group").show(truncate=False)

df = df.join(df_stat, on="group", how="left")

w = Window.partitionBy("group").orderBy(F.asc("id"))
df = df.withColumn("rank", F.dense_rank().over(w))
df = df.withColumn("loc", F.transform("cum_sum", lambda x: (x < F.col("rank")).cast("int")))
df = df.withColumn("pos", F.array_position(F.col("loc"), 0))
df = df.withColumn("new_group", F.concat("group", F.col("pos").cast("string")))
txt = "New group IDs:"
print(txt)
df.show(truncate=False)
Initial groups and IDs:
group id
group_A GN0Ax
group_A grLyR
group_A eo47Y
group_A Adm4Z
group_A KCEUD
group_A I1M9Z
group_A nKb8g
group_A 3y7xZ
group_B paho5
group_B I6WmG
group_B 8IA5f
group_B Pol35
group_B RLxBg
Maximal number of sub-groups desired: 2
Number of IDs per group:
group n_ids terms n_terms cum_sum
group_A 8 [4, 4] 2 [4, 8]
group_B 5 [2, 3] 2 [2, 5]
New group IDs:
group id n_ids terms n_terms cum_sum rank loc pos new_group
group_A 3y7xZ 8 [4, 4] 2 [4, 8] 1 [0, 0] 1 group_A1
group_A Adm4Z 8 [4, 4] 2 [4, 8] 2 [0, 0] 1 group_A1
group_A GN0Ax 8 [4, 4] 2 [4, 8] 3 [0, 0] 1 group_A1
group_A I1M9Z 8 [4, 4] 2 [4, 8] 4 [0, 0] 1 group_A1
group_A KCEUD 8 [4, 4] 2 [4, 8] 5 [1, 0] 2 group_A2
group_A eo47Y 8 [4, 4] 2 [4, 8] 6 [1, 0] 2 group_A2
group_A grLyR 8 [4, 4] 2 [4, 8] 7 [1, 0] 2 group_A2
group_A nKb8g 8 [4, 4] 2 [4, 8] 8 [1, 0] 2 group_A2
group_B 8IA5f 5 [2, 3] 2 [2, 5] 1 [0, 0] 1 group_B1
group_B I6WmG 5 [2, 3] 2 [2, 5] 2 [0, 0] 1 group_B1
group_B Pol35 5 [2, 3] 2 [2, 5] 3 [1, 0] 2 group_B2
group_B RLxBg 5 [2, 3] 2 [2, 5] 4 [1, 0] 2 group_B2
group_B paho5 5 [2, 3] 2 [2, 5] 5 [1, 0] 2 group_B2

11.4. To calculate set intersection between arrays in two consecutive rows in a window

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
from typing import List
import pandas as pd
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("group", T.StringType(), True),
        T.StructField("letters", T.ArrayType(T.StringType()), True),
    ]
)
data = [("group_A", ["a", "b", "c"]),
        ("group_A", ["a", "x", "z", "y"]),
        ("group_A", ["a", "x", "z", "w"]),
        ("group_A", ["d", "x", "z"]),
        ("group_B", ["a", "c", "d"]),
        ("group_B", ["a", "c", "e"]),
        ("group_B", ["a", "g", "f", "h"])]
df = spark.createDataFrame(schema=schema, data=data)


@F.pandas_udf(returnType=T.IntegerType())
def number_of_input_rows_in_win(v: pd.Series) -> int:
    """
    Calculate number of rows in a window
    """
    return v.shape[0]


@F.pandas_udf(returnType=T.IntegerType())
def arr_intxn_size(v: pd.Series) -> int:
    """
    Calculate size of set intersection between last and penultimate elements (arrays)
    """
    if v.shape[0] <= 1:
        return 0
    else:
        x = list(set(v.iloc[-1]).intersection(set(v.iloc[-2])))
        return len(x)


@F.pandas_udf(returnType=T.ArrayType(T.StringType()))
def arr_intxn_pudf(v: pd.Series) -> List[str]:
    """
    Calculate set intersection between last and penultimate elements (arrays)
    """
    if v.shape[0] <= 1:
        return []
    else:
        return list(set(v.iloc[-1]).intersection(set(v.iloc[-2])))


def arr_intxn_lag(col_name, win):
    col_lagged = F.lag(col_name, 1).over(win)
    itxn = F.array_intersect(col_name, col_lagged)
    return F.when(itxn.isNull(), F.array([])).otherwise(itxn)


# A window with previous and current row (indices: -1, 0)
win_last_two = Window.partitionBy("group").orderBy("letters").rowsBetween(-1, 0)
df = df.withColumn("nbr_rows_in_input_win", number_of_input_rows_in_win("letters").over(win_last_two))
df = df.withColumn("intxn_w_prv_row_pudf", arr_intxn_pudf("letters").over(win_last_two))

win_unbound = Window.partitionBy("group").orderBy("letters")
df = df.withColumn("intxn_w_prv_row_lag", arr_intxn_lag("letters", win_unbound))

df = df.withColumn("size(intxn_w_prv_row_pudf)", arr_intxn_size("letters").over(win_last_two))
df = df.withColumn("size(intxn_w_prv_row)", F.size("intxn_w_prv_row_lag"))
txt = "Number of rows in input window can be \nequal to 1 (first row) or\nequal to" + \
      " 2 (consecutive two rows specified by indices (-1, 0) in window definition)"
print(txt)
df.show()
Number of rows in input window can be
equal to 1 (first row) or
equal to 2 (consecutive two rows specified by indices (-1, 0) in window definition)
group letters nbr_rows_in_input_win intxn_w_prv_row_pudf intxn_w_prv_row_lag size(intxn_w_prv_row_pudf) size(intxn_w_prv_row)
group_A [a, b, c] 1 [] [] 0 0
group_A [a, x, z, w] 2 [a] [a] 1 1
group_A [a, x, z, y] 2 [a, x, z] [a, x, z] 3 3
group_A [d, x, z] 2 [x, z] [x, z] 2 2
group_B [a, c, d] 1 [] [] 0 0
group_B [a, c, e] 2 [c, a] [a, c] 2 2
group_B [a, g, f, h] 2 [a] [a] 1 1

11.5. To pivot a dataframe by some column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

# Define schema for "name", "subject", and "grade"
schema = T.StructType([
    T.StructField("name", T.StringType(), True),
    T.StructField("subject", T.StringType(), True),
    T.StructField("grade", T.IntegerType(), True),
])

# Sample data for students, their subjects, and grades
data = [
    ("John", "Math", 85),
    ("John", "Science", 90),
    ("John", "History", 78),
    ("Marie", "Math", 88),
    ("Marie", "Science", 92),
    ("Marie", "History", 80),
    ("Adam", "Math", 75),
    ("Adam", "Science", 82),
    ("Adam", "History", 70),
]

# Create DataFrame
df = spark.createDataFrame(data=data, schema=schema)
txt = "Original dataframe:"
print(txt)
df.show()

# Pivot the table by "subject" and aggregate by taking the first value of "grade" for each subject
df_pvt = df.groupBy("name").pivot("subject").agg(F.first("grade"))

txt = "Original dataframe pivoted by column \"subject\":"
print(txt)
df_pvt.show(truncate=False)
Original dataframe:
name subject grade
John Math 85
John Science 90
John History 78
Marie Math 88
Marie Science 92
Marie History 80
Adam Math 75
Adam Science 82
Adam History 70
Original dataframe pivoted by column "subject":
name History Math Science
John 78 85 90
Adam 70 75 82
Marie 80 88 92
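
Passing the distinct pivot values explicitly saves Spark an extra job to discover them; a minimal sketch with the subjects listed by hand:

subjects = ["Math", "Science", "History"]
df_pvt = df.groupBy("name").pivot("subject", subjects).agg(F.first("grade"))
df_pvt.show(truncate=False)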

12. Sampling rows

12.1. To sample rows

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("index", T.IntegerType(), True),
        T.StructField("value", T.StringType(), True),
    ]
)
data = [(1, "Home"),
        (2, "School"),
        (3, "Home"),
        (4, "Home"),
        (5, "Office"),
        (6, "Office"),
        (7, "Office"),
        (8, "Mall"),
        (9, "Mall"),
        (10, "School")]
df = spark.createDataFrame(schema=schema, data=data).repartition(3)
df = df.withColumn("partition", F.spark_partition_id()).orderBy("index")
print("Original dataframe:")
df.show()

print("Sampled dataframe:")
dft = df.sample(fraction=0.5, seed=1).orderBy("index")
dft.show()

Original dataframe:

index value partition
1 Home 1
2 School 0
3 Home 0
4 Home 2
5 Office 2
6 Office 2
7 Office 1
8 Mall 0
9 Mall 1
10 School 0

Sampled dataframe:

index value partition
3 Home 0
4 Home 2
7 Office 1
8 Mall 0
9 Mall 1
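
For stratified sampling with a different fraction per stratum, DataFrame.sampleBy() can be used; a minimal sketch on the same df (the fractions below are arbitrary):

fractions = {"Home": 1.0, "Office": 0.5, "Mall": 0.5, "School": 0.0}
dft = df.sampleBy("value", fractions=fractions, seed=1).orderBy("index")
dft.show()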

13. Random ID generation

13.1. To generate a UUID for every row

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
import random
import uuid

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Name", T.StringType(), True),
    ]
)
data = [["Alice"],
        ["Bon"],
        ["John"],
        ["Cecile"]
        ]
df = spark.createDataFrame(schema=schema, data=data).repartition(2)

def _generate_uuid(uuid_gen, v=10):
    def _replace_byte(value: int, byte: int):
        byte = byte & 0xF
        bit_shift = 76
        mask = ~(0xF << bit_shift)
        return value & mask | (byte << bit_shift)

    uuid_ = uuid_gen.generate()
    return uuid.UUID(int=(_replace_byte(uuid_.int, v)))

class RandomDistributedUUIDGenerator:
    def generate(self):
        return uuid.uuid4()

class SeedBasedUUIDGenerator:
    def __init__(self, seed):
        self.rnd = random.Random(seed)

    def generate(self):
        return uuid.UUID(int=self.rnd.getrandbits(128), version=4)

gen = RandomDistributedUUIDGenerator()
udf_generate_uuid = F.udf(lambda: _generate_uuid(gen).__str__(), T.StringType())
df = df.withColumn("UUID_random_distributed", udf_generate_uuid())

seed_for_rng = 1
gen = SeedBasedUUIDGenerator(seed_for_rng)
udf_generate_uuid = F.udf(lambda: _generate_uuid(gen).__str__(), T.StringType())
df = df.withColumn("UUID_seed_based", udf_generate_uuid())

print("The dataframe resides in two partitions. Seed-based random UUID generator uses the same seed on both partitions, yielding identical values.")
df = df.withColumn("Spark partition", F.spark_partition_id())
df.show(truncate=False)

The dataframe resides in two partitions. Seed-based random UUID generator uses the same seed on both partitions, yielding identical values.

Name UUID_random_distributed UUID_seed_based Spark partition
John c42dc153-f07e-acba-943d-4b18c655a13c cd613e30-d8f1-aadf-91b7-584a2265b1f5 0
Bon 11538cf9-01e3-a3e8-9f79-077c61acdfb6 1e2feb89-414c-a43c-9027-c4d1c386bbc4 0
Cecile 59e11afb-887d-afbf-b388-d09ac39b8b74 cd613e30-d8f1-aadf-91b7-584a2265b1f5 1
Alice 57d1794c-6675-a3e8-931b-457d0a7f4b30 1e2feb89-414c-a43c-9027-c4d1c386bbc4 1

13.2. To generate 5 character long random IDs

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import string

spark = SparkSession.builder.appName("RandomID").getOrCreate()

# Sample data to apply ID generation to
data = [(1,), (2,), (3,), (4,),]
df = spark.createDataFrame(data, ["row_id"]).repartition(2)

# Generate 5-character random alphanumeric ID
df = df.withColumn("random_uuid_short",
        F.expr("substring(translate(uuid(), '-', ''), 1, 5)")
      )

# Characters to choose from
CHAR_SET = string.ascii_letters + string.digits  # a-zA-Z0-9
CHAR_LIST = list(CHAR_SET)
N = 5  # Length of the random ID

# Generate each character separately and concatenate
for i in range(N):
    df = df.withColumn(f"char_{i}", F.element_at(F.array([F.lit(c) for c in CHAR_LIST]), (F.rand() * len(CHAR_LIST)).cast("int") + 1))

df = df.withColumn("random_id", F.concat_ws("", *[F.col(f"char_{i}") for i in range(N)]))

df.select("row_id", "random_uuid_short", "random_id").show(truncate=False)
row_id random_uuid_short random_id
2 bca56 ecLwX
1 a1e24 k0uCt
3 05ea2 EEhnz
4 7a9fe 459LF

14. Accumulators

14.1. To use an accumulator for a basic addition operation

from pyspark.sql.types import IntegerType
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

df = spark.range(4).toDF("num")
df.createOrReplaceTempView("num_table")
df.show()

# Define an accumulator
accSum = spark.sparkContext.accumulator(0)

# Accumulator is used in addition
def add_acc(int_val):
    accSum.add(int_val)
    return int_val

# Register a user defined function
spark.udf.register("reg_addacc", add_acc, IntegerType())

spark.sql("SELECT sum(reg_addacc(num)) FROM num_table").show()

print(f"Accumulator value: {accSum.value}")
num
0
1
2
3
sum(reg_addacc(num))
6

Accumulator value: 6

Author: Altynbek Isabekov

Created: 2025-05-22 Thu 08:06
