PySpark Cookbook

1. Introduction

1.1. To create an empty dataframe

import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.StringType()), True),
        T.StructField("B", T.ArrayType(T.StringType()), True),
    ]
)
data = []
df = spark.createDataFrame(schema=schema, data=data)
df.show()
A B

1.2. To create a dataframe with columns key and value from a dictionary

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]

df = spark.createDataFrame(values, columns)
df.show()
key value
key1 value1
key2 value2

1.3. To duplicate a column

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]

df = spark.createDataFrame(values, columns)
df = df.withColumn("value_dup", F.col("value"))
df.show()
key value value_dup
key1 value1 value1
key2 value2 value2

1.4. To rename a column using .withColumnRenamed()

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]

df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()
df = df.withColumnRenamed("key", "new_key") \
        .withColumnRenamed("value","new_value")
print("Modified dataframe:")
df.show()

Original dataframe:

key value
key1 value1
key2 value2

Modified dataframe:

new_key new_value
key1 value1
key2 value2

1.5. To rename a column using .withColumnsRenamed()

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]

df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()
df = df.withColumnsRenamed({"key": "new_key", "value": "new_value"})
print("Modified dataframe:")
df.show()

Original dataframe:

key value
key1 value1
key2 value2

Modified dataframe:

new_key new_value
key1 value1
key2 value2

1.6. To rename a column using .select()

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()

df = df.select(F.col("key").alias("new_key"), F.col("value").alias("new_value"))
print("Modified dataframe:")
df.show()

Original dataframe:

key value
key1 value1
key2 value2

Modified dataframe:

new_key new_value
key1 value1
key2 value2

1.7. To rename columns by adding a prefix

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("index", T.IntegerType(), True),
        T.StructField("value", T.StringType(), True),
    ]
)
data = [(1, "Home"),
        (2, "School"),
        (3, "Home"),]
df = spark.createDataFrame(schema=schema, data=data)
print("Original dataframe:")
df.show()
print("Dataframe with renamed columns:")
df = df.select(*[F.col(k).alias(f"prefix_{k}") for k in df.columns])
df.show()

Original dataframe:

index value
1 Home
2 School
3 Home

Dataframe with renamed columns:

prefix_index prefix_value
1 Home
2 School
3 Home

1.8. To drop columns from a dataframe

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)

df = df.withColumn("const", F.lit(1))
print("Original dataframe:")
df.show()

df = df.drop("value", "const")
print("Modified dataframe:")
df.show()

Original dataframe:

key value const
key1 value1 1
key2 value2 1

Modified dataframe:

key
key1
key2

1.9. To subset columns of a dataframe

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
df = df.withColumn("const", F.lit(1))
print("Original dataframe:")
df.show()
print("Subset 'key', 'value' columns:")
df["key", "value"].show()
print("Subset 'key', 'const' columns:")
df.select("key", "const").show()

Original dataframe:

key value const
key1 value1 1
key2 value2 1

Subset 'key', 'value' columns:

key value
key1 value1
key2 value2

Subset 'key', 'const' columns:

key const
key1 1
key2 1

1.10. To add a column with a constant value using F.lit()

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()

df = df.withColumn("const_integer", F.lit(1))
df = df.withColumn("const_string", F.lit("string"))
print("Modified dataframe:")
df.show()

Original dataframe:

key value
key1 value1
key2 value2

Modified dataframe:

key value const_integer const_string
key1 value1 1 string
key2 value2 1 string

1.11. To add a column with a constant value using .select()

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()

df = df.select("key", "value", F.lit("const_str").alias("constant_value"))
print("Modified dataframe:")
df.show()

Original dataframe:

key value
key1 value1
key2 value2

Modified dataframe:

key value constant_value
key1 value1 const_str
key2 value2 const_str

1.12. To create a dataframe from a list of tuples

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
integer characters
1 [A, B]
2 [C, D]
3 [E, F]

1.13. To get the number of rows of a dataframe

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
num_rows = df.count()
print(f"df has {num_rows} rows")
integer characters
1 [A, B]
2 [C, D]
3 [E, F]

df has 3 rows

1.14. To select first N rows

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
print("These are first 2 rows:")
df.limit(2).show()
integer characters
1 [A, B]
2 [C, D]
3 [E, F]

These are first 2 rows:

integer characters
1 [A, B]
2 [C, D]

1.15. To deduplicate rows

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("key", T.IntegerType(), True),
        T.StructField("value", T.StringType(), True),
        T.StructField("comment", T.StringType(), True),
    ]
)
data = [(1, "Home", "a house"),
        (1, "Home", "a house"),
        (2, "School", "a building"),
        (2, "School", "a house"),
        (3, "Home", "a house"),]
df = spark.createDataFrame(schema=schema, data=data)

print("Original dataframe:")
df.show()

print("Dataframe with distinct rows:")
df.distinct().show()

print("Dataframe with dropped duplicate rows:")
df.dropDuplicates().show()

print("Dataframe with dropped duplicates in columns 'key' and 'value':")
df = df.dropDuplicates(subset=["key", "value"])
df.show()

Original dataframe:

key value comment
1 Home a house
1 Home a house
2 School a building
2 School a house
3 Home a house

Dataframe with distinct rows:

key value comment
2 School a house
3 Home a house
2 School a building
1 Home a house

Dataframe with dropped duplicate rows:

key value comment
2 School a house
3 Home a house
2 School a building
1 Home a house

Dataframe with dropped duplicates in columns 'key' and 'value':

key value comment
1 Home a house
2 School a building
3 Home a house

1.16. To convert a column to a list using a lambda function

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
lst = df.select("integer").rdd.map(lambda r: r[0]).collect()
print(f"Column \"integer\" has values: {lst}")
integer characters
1 [A, B]
2 [C, D]
3 [E, F]

Column "integer" has values: [1, 2, 3]

1.17. To convert a dataframe to a list of dictionaries corresponding to every row

import pprint
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
lst_dict = df.rdd.map(lambda row: row.asDict()).collect()
print(f"Dataframe is represented as:\n")
txt = pprint.pformat(lst_dict)
integer characters
1 [A, B]
2 [C, D]
3 [E, F]

Dataframe is represented as:

[{'characters': ['A', 'B'], 'integer': 1},
 {'characters': ['C', 'D'], 'integer': 2},
 {'characters': ['E', 'F'], 'integer': 3}]

1.18. To convert a column to a list using list comprehension

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
lst = [k["integer"] for k in df.select("integer").rdd.collect()]
print(f"Column \"integer\" has values: {lst}")
integer characters
1 [A, B]
2 [C, D]
3 [E, F]

Column "integer" has values: [1, 2, 3]

1.19. To convert a column to a list using Pandas

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
lst = df.select("integer").toPandas()["integer"].tolist()
print(f"Column \"integer\" has values: {lst}")
integer characters
1 [A, B]
2 [C, D]
3 [E, F]

Column "integer" has values: [1, 2, 3]

1.20. To display full width of a column (do not truncate)

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("sentence", T.ArrayType(T.StringType()), True),
    ]
)
data = [(["A", "very", "long", "sentence"],),
        (["with", "many", "words", "."],)]
df = spark.createDataFrame(schema=schema, data=data)

print("Truncated output (default behavior):")
df.show()

print("Truncated to 15 characters output:")
df.show(truncate=15)

print("Non-truncated output (show all):")
df.show(truncate=False)

Truncated output (default behavior):

sentence
[A, very, long, s…
[with, many, word…

Truncated to 15 characters output:

sentence
[A, very, lo…
[with, many,…

Non-truncated output (show all):

sentence
[A, very, long, sentence]
[with, many, words, .]
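
For very wide rows it can also help to print one record per block; .show() accepts a vertical flag for this (a small sketch reusing the df from above):

print("Vertical output:")
df.show(truncate=False, vertical=True)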

2. Filtering rows

2.1. To filter based on values of a column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", None),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", None),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)

print("Original dataframe:")
df.show()

print('Filter: F.col("Location" == "Home")')
dft = df.filter(F.col("Location") == "Home")
dft.show()

print('Filter: F.col("Quantity").isNull()')
dft = df.filter(F.col("Quantity").isNull())
dft.show()

print('Filter: F.col("Quantity").isNotNull()')
dft = df.filter(F.col("Quantity").isNotNull())
dft.show()

print('Filter: (F.col("Location") == "Home") & (F.col("Product") == "Laptop"))')
dft = df.filter((F.col("Location") == "Home") & (F.col("Product") == "Laptop"))
dft.show()

print('Filter: (F.col("Location") == "Home") & !(F.col("Product") == "Laptop"))')
dft = df.filter((F.col("Location") == "Home") & ~(F.col("Product") == "Laptop"))
dft.show()

print('Filter: (F.col("Product") == "Laptop") | (F.col("Product") == "Mouse"))')
dft = df.filter((F.col("Product") == "Laptop") | (F.col("Product") == "Mouse"))
dft.show()

print('Filter: F.col("Product").isin(["Laptop", "Mouse"])')
dft = df.filter(F.col("Product").isin(["Laptop", "Mouse"]))
dft.show()

Original dataframe:

Location Product Quantity
Home Laptop 12
Home Monitor null
Home Keyboard 9
Office Laptop null
Office Monitor 10
Office Mouse 9

Filter: F.col("Location" == "Home")

Location Product Quantity
Home Laptop 12
Home Monitor null
Home Keyboard 9

Filter: F.col("Quantity").isNull()

Location Product Quantity
Home Monitor null
Office Laptop null

Filter: F.col("Quantity").isNotNull()

Location Product Quantity
Home Laptop 12
Home Keyboard 9
Office Monitor 10
Office Mouse 9

Filter: (F.col("Location") == "Home") & (F.col("Product") == "Laptop"))

Location Product Quantity
Home Laptop 12

Filter: (F.col("Location") == "Home") & !(F.col("Product") == "Laptop"))

Location Product Quantity
Home Monitor null
Home Keyboard 9

Filter: (F.col("Product") == "Laptop") | (F.col("Product") == "Mouse"))

Location Product Quantity
Home Laptop 12
Office Laptop null
Office Mouse 9

Filter: F.col("Product").isin(["Laptop", "Mouse"])

Location Product Quantity
Home Laptop 12
Office Laptop null
Office Mouse 9
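
The same filters can also be written as SQL expression strings passed to .filter() (or its alias .where()); a minimal sketch reusing the df defined above:

# Equivalent filters expressed as SQL strings
df.filter("Location = 'Home'").show()
df.filter("Quantity IS NULL").show()
df.filter("Location = 'Home' AND Product <> 'Laptop'").show()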

3. Array operations

3.1. To create arrays of different lengths

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
        T.StructField("B", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2], [2, 3, 4, 5]),
        ([4, 5, 6], [2, 3, 4, 5])]
df = spark.createDataFrame(schema=schema, data=data)
dft = df.select("A", "B")
dft.show()
A B
[1, 2] [2, 3, 4, 5]
[4, 5, 6] [2, 3, 4, 5]

3.2. To calculate set difference

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.StringType()), True),
        T.StructField("B", T.ArrayType(T.StringType()), True),
    ]
)
data = [(["b", "a", "c"], ["c", "d", "a", "f"])]
df = spark.createDataFrame(schema=schema, data=data)

dft = df.select("A", "B",
          F.array_except("A", "B").alias("A\B"),
          F.array_except("B", "A").alias("B\A"))
dft.show()
A B A\B B\A
[b, a, c] [c, d, a, f] [b] [d, f]

3.3. To calculate set union

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.StringType()), True),
        T.StructField("B", T.ArrayType(T.StringType()), True),
    ]
)
data = [(["b", "a", "c"], ["c", "d", "a", "f"])]
df = spark.createDataFrame(schema=schema, data=data)
dft = df.select("A", "B",
          F.array_union("A", "B").alias("A U B"))
dft.show()
A B A U B
[b, a, c] [c, d, a, f] [b, a, c, d, f]

3.4. To calculate set intersection

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.StringType()), True),
        T.StructField("B", T.ArrayType(T.StringType()), True),
    ]
)
data = [(["b", "a", "c"], ["c", "d", "a", "f"])]
df = spark.createDataFrame(schema=schema, data=data)
dft = df.select("A", "B", F.array_intersect("A", "B").alias("A \u2229 B"))
dft.show()
A B A ∩ B
[b, a, c] [c, d, a, f] [a, c]

3.5. To pad arrays with value

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
        T.StructField("B", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2], [2, 3, 4, 5]),
        ([4, 5, 6], [2, 3, 4, 5])]
df = spark.createDataFrame(schema=schema, data=data)
n = 4
fill_value = 0
df1 = df.withColumn("A_padding", F.expr(f"array_repeat({fill_value}, {n} - size(A))"))
df1 = df1.withColumn("A_padded", F.concat("A", "A_padding"))
dft = df1.select("A", "A_padding", "A_padded")
dft.show()

df2 = df.withColumn("A_padding", F.array_repeat(F.lit(fill_value), F.lit(n) - F.size("A")))
df2 = df2.withColumn("A_padded", F.concat("A", "A_padding"))
dft = df2.select("A", "A_padding", "A_padded")
dft.show()
A A_padding A_padded
[1, 2] [0, 0] [1, 2, 0, 0]
[4, 5, 6] [0] [4, 5, 6, 0]
A A_padding A_padded
[1, 2] [0, 0] [1, 2, 0, 0]
[4, 5, 6] [0] [4, 5, 6, 0]

3.6. To sum two arrays elementwise using F.element_at()

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
        T.StructField("B", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2], [2, 3, 4, 5]),
        ([4, 5, 6], [2, 3, 4, 5])]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("A_padding", F.array_repeat(F.lit(fill_value), F.lit(n) - F.size("A")))
df = df.withColumn("A_padded", F.concat("A", "A_padding"))
df = df.withColumn("AB_sum", F.expr('transform(A_padded, (element, index) -> element + element_at(B, index + 1))'))
dft = df.select("A", "A_padded", "B", "AB_sum")
dft.show()
A A_padded B AB_sum
[1, 2] [1, 2, 0, 0] [2, 3, 4, 5] [3, 5, 4, 5]
[4, 5, 6] [4, 5, 6, 0] [2, 3, 4, 5] [6, 8, 10, 5]

3.7. To sum two arrays using F.arrays_zip()

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
        T.StructField("B", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2], [2, 3, 4, 5]),
        ([4, 5, 6], [2, 3, 4, 5])]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("A_padding", F.array_repeat(F.lit(fill_value), F.lit(n) - F.size("A")))
df = df.withColumn("A_padded", F.concat("A", "A_padding"))
df = df.withColumn("AB_sum", F.expr("transform(arrays_zip(A_padded, B), x -> x.A_padded + x.B)"))
dft = df.select("A", "A_padded", "B", "AB_sum")
dft.show()
A A_padded B AB_sum
[1, 2] [1, 2, 0, 0] [2, 3, 4, 5] [3, 5, 4, 5]
[4, 5, 6] [4, 5, 6, 0] [2, 3, 4, 5] [6, 8, 10, 5]

3.8. To find mode of an array (most common element)

from collections import Counter
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2, 2, 4],),
        ([4, 5, 6, 7],),
        ([1, 1, 2, 2],)]
df = spark.createDataFrame(schema=schema, data=data)

@F.udf
def udf_mode(x):
    return Counter(x).most_common(1)[0][0]

dft = df.withColumn("mode", udf_mode("A"))
print("Schema of dft is:")
dft.printSchema()
dft.show()

Schema of dft is:

root
 |-- A: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- mode: string (nullable = true)
A mode
[1, 2, 2, 4] 2
[4, 5, 6, 7] 4
[1, 1, 2, 2] 1
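
Note that the bare @F.udf decorator defaults to a StringType return value, which is why the "mode" column above appears as a string in the schema. A minimal variant that keeps the result as an integer (the udf_mode_int name is only illustrative; df, F, T and Counter are reused from the recipe above):

@F.udf(returnType=T.IntegerType())
def udf_mode_int(x):
    return Counter(x).most_common(1)[0][0]

dft = df.withColumn("mode", udf_mode_int("A"))
dft.printSchema()  # "mode" is now reported as integer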

3.9. To calculate difference of two consecutive elements in an array

import numpy as np
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("id", T.StringType(), True),
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [("A", [4, 1, 0, 2]),
        ("B", [1, 0, 3, 1])]
df = spark.createDataFrame(schema=schema, data=data)

@F.udf(returnType=T.ArrayType(T.IntegerType()))
def diff_of_two_consecutive_elements(x):
    return np.ediff1d(np.array(x)).tolist()

df = df.withColumn("diff", diff_of_two_consecutive_elements(F.col("values")))
df.show()
print("Schema of df is:")
df.printSchema()
id values diff
A [4, 1, 0, 2] [-3, -1, 2]
B [1, 0, 3, 1] [-1, 3, -2]

Schema of df is:

root
 |-- id: string (nullable = true)
 |-- values: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- diff: array (nullable = true)
 |    |-- element: integer (containsNull = true)

3.10. To apply a function to every element of an array

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("words_with_suffixes", T.ArrayType(T.StringType()), True)
    ]
)
data = [(["pen_10", "note_11", "bottle_12"],), (["apple_13", "orange_14", "lemon_15"],),]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("words", F.transform("words_with_suffixes", lambda x: F.split(x, "_").getItem(0)))
df.show(truncate=False)
words_with_suffixes words
[pen_10, note_11, bottle_12] [pen, note, bottle]
[apple_13, orange_14, lemon_15] [apple, orange, lemon]

3.11. To deduplicate elements in an array (find unique/distinct elements)

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("words", T.ArrayType(T.StringType()), True)
    ]
)
data = [(["pen", "note", "pen"],), (["apple", "apple", "lemon"],),]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("unique_words", F.array_distinct("words"))
df.show(truncate=False)
words unique_words
[pen, note, pen] [pen, note]
[apple, apple, lemon] [apple, lemon]

3.12. To create a map (dictionary) from two arrays (one with keys, one with values)

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("keys", T.ArrayType(T.IntegerType()), True),
        T.StructField("values", T.ArrayType(T.StringType()), True),
    ]
)
data = [([1, 2, 3], ["A", "B", "C"])]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("map_kv", F.map_from_arrays("keys", "values"))
df.show(truncate=False)
keys values map_kv
[1, 2, 3] [A, B, C] {1 -> A, 2 -> B, 3 -> C}
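
To read values back out of the resulting map column, F.map_keys(), F.map_values() and Column.getItem() can be used; a short sketch continuing from the df above:

dft = df.select(
    F.col("map_kv").getItem(2).alias("value_for_key_2"),  # look up the value stored under key 2
    F.map_keys("map_kv").alias("keys_only"),
    F.map_values("map_kv").alias("values_only"),
)
dft.show(truncate=False)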

3.13. To calculate mean of an array

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2],),
        ([4, 5, 6],)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("mean", F.aggregate(
          "values",                                  # column
          F.lit(0),                                  # initialValue
          lambda acc, x: acc + x,                    # merge operation
          lambda acc: acc / F.size(F.col("values")), # finish
      ))
df.show()
values mean
[1, 2] 1.5
[4, 5, 6] 5.0
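
F.aggregate() requires Spark 3.1 or later; on older versions the same computation can be written with the SQL aggregate() higher-order function via F.expr(). A sketch on the same df (the column name mean_expr is only illustrative):

df = df.withColumn("mean_expr", F.expr(
    "aggregate(values, 0D, (acc, x) -> acc + x, acc -> acc / size(values))"
))
df.show()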

3.14. To remove NULL values from an array

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, -2, None],),
        ([4, 5, None, 6],)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("values_without_nulls", F.array_compact("values"))
df.show()
values values_without_nulls
[1, -2, NULL] [1, -2]
[4, 5, NULL, 6] [4, 5, 6]

3.15. To find out whether an array has any negative elements

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, -2],),
        ([4, 5, 6],)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("any_negative", F.exists("values", lambda x: x < 0))
df.show()
values any_negative
[1, -2] true
[4, 5, 6] false
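
The complementary check, whether all elements satisfy a predicate, is F.forall() (available since Spark 3.1, like F.exists()); a brief sketch on the same df:

df = df.withColumn("all_non_negative", F.forall("values", lambda x: x >= 0))
df.show()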

3.16. To convert elements of an array to columns

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2, 3, 4],),
        ([5, 6, 7],)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("first", F.col("A").getItem(0))
dft = df.select("A", "first", *[F.col("A").getItem(k).alias(f"element_{k+1}") for k in range(1,4)])
dft.show()
A first element_2 element_3 element_4
[1, 2, 3, 4] 1 2 3 4
[5, 6, 7] 5 6 7 null

3.17. To find location of the first occurrence of an element in an array

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 7, 5],),
        ([7, 4, 7],)]
df = spark.createDataFrame(schema=schema, data=data)

df = df.withColumn("position", F.array_position(F.col("values"), 7))
df.show()
values position
[1, 7, 5] 2
[7, 4, 7] 1

3.18. To calculate moving difference of two consecutive elements in an array

import pyspark.sql.functions as F
import pyspark.sql.types as T
import pandas as pd
from pyspark.sql import SparkSession
import numpy as np
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2, 5],),
        ([4, 4, 6],)]
df = spark.createDataFrame(schema=schema, data=data)

@F.pandas_udf(T.ArrayType(T.IntegerType()))
def diff2e(x: pd.Series) -> pd.Series:
    return x.apply(lambda arr: arr[1:] - arr[:-1])

@F.udf(returnType=T.ArrayType(T.IntegerType()))
def diff_of_two_consecutive_elements(x):
    return np.ediff1d(np.array(x)).tolist()

df = df.withColumn("diff2e", diff2e(F.col("values")))
df = df.withColumn("ediff1d", diff_of_two_consecutive_elements(F.col("values")))
df.show()
values diff2e ediff1d
[1, 2, 5] [1, 3] [1, 3]
[4, 4, 6] [0, 2] [0, 2]

3.19. To find a union of all unique elements of multiple arrays in a group

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("group", T.StringType(), True),
    T.StructField("id", T.StringType(), True),
    T.StructField("values", T.ArrayType(T.IntegerType()), True),
])

data = [
    ("group_1", "A", [4, 1, 2]),
    ("group_1", "B", [1, 0, 3, 1]),
    ("group_2", "C", [5, 6, 7, 5]),
    ("group_2", "D", [7, 6, 9])
]

df = spark.createDataFrame(data=data, schema=schema)
df.show(truncate=False)

# Flatten arrays within groups and calculate unique elements
dfs = (
    df.groupBy("group")
    .agg(F.collect_list("values").alias("list_of_lists"))
    .withColumn("all_values", F.flatten("list_of_lists"))
    .withColumn("unique_values", F.array_distinct("all_values"))
    .select("group", "list_of_lists", "all_values", "unique_values")
)

dfs.show(truncate=False)
group id values
group_1 A [4, 1, 2]
group_1 B [1, 0, 3, 1]
group_2 C [5, 6, 7, 5]
group_2 D [7, 6, 9]
group list_of_lists all_values unique_values
group_1 [[4, 1, 2], [1, 0, 3, 1]] [4, 1, 2, 1, 0, 3, 1] [4, 1, 2, 0, 3]
group_2 [[5, 6, 7, 5], [7, 6, 9]] [5, 6, 7, 5, 7, 6, 9] [5, 6, 7, 9]

3.20. To slice an array

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 7, 5, 2],),
        ([6, 4, 7, 3],)]
df = spark.createDataFrame(schema=schema, data=data)

df = df.withColumn("values[1:3]", F.slice("values", start=2, length=2))
df.show()
values values[1:3]
[1, 7, 5, 2] [7, 5]
[6, 4, 7, 3] [4, 7]

3.21. To slice an array dynamically

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 7, 5],),
        ([6, 4, 7, 3],)]
df = spark.createDataFrame(schema=schema, data=data)
start_idx = 2
df = df.withColumn("values[1:]", F.slice("values", start=2, length=(F.size("values") - F.lit(start_idx - 1))))
df.show()
values values[1:]
[1, 7, 5] [7, 5]
[6, 4, 7, 3] [4, 7, 3]

4. Text processing

4.1. To remove prefix from a string

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("text", T.StringType(), True),
    ]
)
data = [("id_orange",),
        ("apple",)]
df = spark.createDataFrame(schema=schema, data=data)
remove_prefix = F.udf(lambda x: x[3:] if x[:3] == "id_" else x, T.StringType())
df = df.withColumn("no_prefix_udf", remove_prefix(F.col("text")))
df = df.withColumn("no_prefix_rgx", F.regexp_replace("text", "id_", ""))
df.show()
text no_prefix_udf no_prefix_rgx
id_orange orange orange
apple apple apple
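
Note that F.regexp_replace("text", "id_", "") would also remove "id_" occurring in the middle of a string; anchoring the pattern with "^" restricts the replacement to a leading prefix. A minimal sketch on the same df (the column name no_prefix_anchored is only illustrative):

df = df.withColumn("no_prefix_anchored", F.regexp_replace("text", "^id_", ""))
df.show()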

4.2. To deduplicate identical consecutive characters using F.regexp_replace()

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("String", T.StringType(), True)
    ]
)
data = [["aaaabbccc"], ["bbbccaaa"]]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn('Shortened', F.regexp_replace("String", "([ab])\\1+", "$1"))
df.show(truncate=False)
String Shortened
aaaabbccc abccc
bbbccaaa bcca

4.3. To deduplicate identical consecutive characters using a UDF

import re
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("String", T.StringType(), True)
    ]
)
data = [["aaaabbccc"], ["bbbccaaa"]]

@F.udf(returnType=T.StringType())
def remove_consecutive_duplicate_characters_ab(s):
    return re.sub(r'([ab])\1+', r'\1', s)

df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn('Shortened', remove_consecutive_duplicate_characters_ab("String"))
df.show(truncate=False)
String Shortened
aaaabbccc abccc
bbbccaaa bcca

4.4. To split a string into letters (characters) using regex

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("String", T.StringType(), True)
    ]
)
data = [["This is"]]
df = spark.createDataFrame(schema=schema, data=data)
dft = df.select('String', F.split('String', '(?!$)').alias("Characters"))
dft.show(truncate=False)
String Characters
This is [T, h, i, s, , i, s]

4.5. To concatenate columns with strings using a separator

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Str1", T.StringType(), True),
        T.StructField("Str2", T.StringType(), True)
    ]
)
data = [("This is", "a string"),
        ("on a", "different row")]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("Str_Concat", F.concat_ws( "_", "Str1", "Str2"))
df.show()
Str1 Str2 Str_Concat
This is a string This is_a string
on a different row on a_different row

4.6. To split a string into letters (characters) using split function

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("String", T.StringType(), True)
    ]
)
data = [["This is"]]
df = spark.createDataFrame(schema=schema, data=data)
fsplit = F.expr("split(String, '')")
dft = df.select('String', fsplit.alias("Characters"))
dft.show(truncate=False)
String Characters
This is [T, h, i, s, , i, s]

4.7. To split a string into letters (characters) and remove last character

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("String", T.StringType(), True)
    ]
)
data = [["This is_"]]
df = spark.createDataFrame(schema=schema, data=data)
print("Using split function and remove last character:")
fsplit = "split(String, '')"
fsplit = F.expr(f'slice({fsplit}, 1, size({fsplit}) - 1)')
dft = df.select('String', fsplit.alias("Characters"))
dft.show(truncate=False)

Using the split function and removing the last character:

String Characters
This is_ [T, h, i, s, , i, s]

4.8. To append a string to all values in a column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Str1", T.StringType(), True),
        T.StructField("Str2", T.StringType(), True)
    ]
)
data = [("This is", "a string"),
        ("on a", "different row")]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("Str1_with_prefix", F.concat(F.lit("Prefix_"), "Str1"))
dft = df.select("Str1", "Str1_with_prefix")
dft.show()
Str1 Str1_with_prefix
This is Prefix_This is
on a Prefix_on a

5. Time operations

5.1. To convert Unix time stamp to human readable format

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("timestamp", T.LongType(), True),
    ]
)
data = [(1703224755,),
        (1703285602,)]
df = spark.createDataFrame(schema=schema, data=data)

df = df.withColumn("time_stamp_hrf", F.from_unixtime(F.col("timestamp"), "yyyy-MM-dd HH:mm:ss"))
df.show()
timestamp time_stamp_hrf
1703224755 2023-12-22 06:59:15
1703285602 2023-12-22 23:53:22
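
The reverse conversion, from the human-readable string back to a Unix timestamp or a proper timestamp type, can be done with F.unix_timestamp() and F.to_timestamp(); a short sketch continuing from the df above (results depend on the session time zone):

df = df.withColumn("back_to_unix", F.unix_timestamp("time_stamp_hrf", "yyyy-MM-dd HH:mm:ss"))
df = df.withColumn("as_timestamp", F.to_timestamp("time_stamp_hrf", "yyyy-MM-dd HH:mm:ss"))
df.show()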

5.2. To detect log in and log out timestamps in a sequence of events

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("tstp_unix", T.LongType(), True),
    T.StructField("computer", T.StringType(), True),
    T.StructField("user", T.StringType(), True),
])

data = [
    (1703224755, "computer_A", "user_1"),
    (1703224780, "computer_A", "user_1"),
    (1703224850, "computer_A", "user_1"),
    (1703224950, "computer_B", "user_1"),
    (1703225050, "computer_B", "user_1"),
    (1703225100, "computer_B", "user_1"),
    (1703225150, "computer_A", "user_1"),
    (1703225200, "computer_B", "user_1"),
    (1703225250, "computer_B", "user_1"),
    (1703285700, "computer_C", "user_2"),
    (1703285800, "computer_C", "user_2"),
    (1703285900, "computer_C", "user_2"),
    (1703286000, "computer_C", "user_2"),
    (1703286100, "computer_D", "user_2"),
    (1703286200, "computer_D", "user_2"),
    (1703286300, "computer_D", "user_2"),
    (1703286400, "computer_D", "user_2"),
]

df = spark.createDataFrame(data=data, schema=schema)

# Add human-readable timestamp
df = df.withColumn("tstp_hrf", F.from_unixtime(F.col("tstp_unix"), "yyyy-MM-dd HH:mm:ss"))


# Define a window to identify session changes
window_spec = Window.partitionBy("user").orderBy(F.asc("tstp_unix"))


# Previous row
prev = F.lag("computer", 1).over(window_spec)
cmp_equ_prev = (F.col("computer") != F.when(prev.isNull(), "some_random_computer").otherwise(prev))
# Detect changes in computer usage: compare with the previous row in a window
df = df.withColumn("is_first", F.when(cmp_equ_prev, 1).otherwise(0))

next = F.lag("computer", -1).over(window_spec)
cmp_equ_next = (F.col("computer") != F.when(next.isNull(), "some_random_computer").otherwise(next))
# Detect changes in computer usage: compare with the next row in a window
df = df.withColumn("is_last", F.when(cmp_equ_next, 1).otherwise(0))

df = df.withColumn("unq_grp", F.sum("is_first").over(window_spec))

cols = ["tstp_unix", "tstp_hrf", "user", "computer", "is_first", "is_last", "unq_grp"]
df.select(cols).show()

cols = ["tstp_unix", "tstp_hrf", "user", "computer", "unq_grp",
        F.col("is_first").alias("login"), F.col("is_last").alias("logout")]
df.filter((F.col("is_first") == 1) | (F.col("is_last") == 1)).select(cols).show()
tstp_unix tstp_hrf user computer is_first is_last unq_grp
1703224755 2023-12-22 06:59:15 user_1 computer_A 1 0 1
1703224780 2023-12-22 06:59:40 user_1 computer_A 0 0 1
1703224850 2023-12-22 07:00:50 user_1 computer_A 0 1 1
1703224950 2023-12-22 07:02:30 user_1 computer_B 1 0 2
1703225050 2023-12-22 07:04:10 user_1 computer_B 0 0 2
1703225100 2023-12-22 07:05:00 user_1 computer_B 0 1 2
1703225150 2023-12-22 07:05:50 user_1 computer_A 1 1 3
1703225200 2023-12-22 07:06:40 user_1 computer_B 1 0 4
1703225250 2023-12-22 07:07:30 user_1 computer_B 0 1 4
1703285700 2023-12-22 23:55:00 user_2 computer_C 1 0 1
1703285800 2023-12-22 23:56:40 user_2 computer_C 0 0 1
1703285900 2023-12-22 23:58:20 user_2 computer_C 0 0 1
1703286000 2023-12-23 00:00:00 user_2 computer_C 0 1 1
1703286100 2023-12-23 00:01:40 user_2 computer_D 1 0 2
1703286200 2023-12-23 00:03:20 user_2 computer_D 0 0 2
1703286300 2023-12-23 00:05:00 user_2 computer_D 0 0 2
1703286400 2023-12-23 00:06:40 user_2 computer_D 0 1 2
tstp_unix tstp_hrf user computer unq_grp login logout
1703224755 2023-12-22 06:59:15 user_1 computer_A 1 1 0
1703224850 2023-12-22 07:00:50 user_1 computer_A 1 0 1
1703224950 2023-12-22 07:02:30 user_1 computer_B 2 1 0
1703225100 2023-12-22 07:05:00 user_1 computer_B 2 0 1
1703225150 2023-12-22 07:05:50 user_1 computer_A 3 1 1
1703225200 2023-12-22 07:06:40 user_1 computer_B 4 1 0
1703225250 2023-12-22 07:07:30 user_1 computer_B 4 0 1
1703285700 2023-12-22 23:55:00 user_2 computer_C 1 1 0
1703286000 2023-12-23 00:00:00 user_2 computer_C 1 0 1
1703286100 2023-12-23 00:01:40 user_2 computer_D 2 1 0
1703286400 2023-12-23 00:06:40 user_2 computer_D 2 0 1

5.3. To estimate host change time based on log in and log out information in a sequence of events

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("tstp_unix", T.LongType(), True),
    T.StructField("host", T.StringType(), True),
    T.StructField("user", T.StringType(), True),
])

data = [
    (1703224755, "host_A", "user_1"),
    (1703224852, "host_A", "user_1"),
    (1703224950, "host_B", "user_1"),
    (1703225104, "host_B", "user_1"),
    (1703225150, "host_A", "user_1"),
    (1703225200, "host_B", "user_1"),
    (1703225250, "host_B", "user_1"),
    (1703285700, "host_C", "user_2"),
    (1703286000, "host_C", "user_2"),
    (1703286100, "host_D", "user_2"),
    (1703286400, "host_D", "user_2"),
]

df = spark.createDataFrame(data=data, schema=schema)

# Add human-readable timestamp
df = df.withColumn("tstp_hrf", F.from_unixtime(F.col("tstp_unix"), "yyyy-MM-dd HH:mm:ss"))


# Define a window to identify session changes
window_spec = Window.partitionBy("user").orderBy(F.asc("tstp_unix"))

prev_tstp = F.lag("tstp_unix", 1).over(window_spec)
next_tstp = F.lag("tstp_unix", -1).over(window_spec)
prev_host = F.lag("host", 1).over(window_spec)
next_host = F.lag("host", -1).over(window_spec)

t_delta_prev = (F.col("tstp_unix") - prev_tstp)
df = df.withColumn("t_diff_prv",
                   F.when(prev_host.isNull(), 0.0).otherwise(
                         F.when(F.col("host") != prev_host, t_delta_prev).otherwise(None))
                   )

t_delta_next = (next_tstp - F.col("tstp_unix"))
df = df.withColumn("t_diff_nxt",
                   F.when(next_host.isNull(), 0.0).otherwise(
                         F.when(F.col("host") != next_host, t_delta_next).otherwise(None))
                   )

df = df.withColumn("new_tstp_prv", F.when(F.col("t_diff_prv").isNotNull(), F.col("tstp_unix") - F.col("t_diff_prv")/2 + 1))
df = df.withColumn("new_tstp_nxt", F.when(F.col("t_diff_nxt").isNotNull(), F.col("tstp_unix") + F.col("t_diff_nxt")/2))
df = df.withColumn("new_tstp_prv", F.from_unixtime("new_tstp_prv", "yyyy-MM-dd HH:mm:ss"))
df = df.withColumn("new_tstp_nxt", F.from_unixtime("new_tstp_nxt", "yyyy-MM-dd HH:mm:ss"))

txt = "Calculate time delta to previous and next timestamps:"
print(txt)
cols = ["tstp_unix", "tstp_hrf", "user", "host", "t_diff_prv", "t_diff_nxt", "new_tstp_prv", "new_tstp_nxt"]
df.select(cols).show()

txt = "Combine new timestamps calculated from adding time deltas into an array:"
print(txt)
df = df.withColumn("new_tstps", F.array_compact(F.array("new_tstp_prv", "new_tstp_nxt")))
cols = ["tstp_hrf", "user", "host", "new_tstp_prv", "new_tstp_nxt", "new_tstps"]
df.select(cols).show(truncate=False)

txt = "Explode the array with new timestamps:"
print(txt)
df = df.withColumn("new_tstp", F.explode("new_tstps"))
cols = ["user", "host", "tstp_hrf", "new_tstps", "new_tstp"]
df.select(cols).show(truncate=False)
Calculate time delta to previous and next timestamps:
tstp_unix tstp_hrf user host t_diff_prv t_diff_nxt new_tstp_prv new_tstp_nxt
1703224755 2023-12-22 06:59:15 user_1 host_A 0.0 NULL 2023-12-22 06:59:16 NULL
1703224852 2023-12-22 07:00:52 user_1 host_A NULL 98.0 NULL 2023-12-22 07:01:41
1703224950 2023-12-22 07:02:30 user_1 host_B 98.0 NULL 2023-12-22 07:01:42 NULL
1703225104 2023-12-22 07:05:04 user_1 host_B NULL 46.0 NULL 2023-12-22 07:05:27
1703225150 2023-12-22 07:05:50 user_1 host_A 46.0 50.0 2023-12-22 07:05:28 2023-12-22 07:06:15
1703225200 2023-12-22 07:06:40 user_1 host_B 50.0 NULL 2023-12-22 07:06:16 NULL
1703225250 2023-12-22 07:07:30 user_1 host_B NULL 0.0 NULL 2023-12-22 07:07:30
1703285700 2023-12-22 23:55:00 user_2 host_C 0.0 NULL 2023-12-22 23:55:01 NULL
1703286000 2023-12-23 00:00:00 user_2 host_C NULL 100.0 NULL 2023-12-23 00:00:50
1703286100 2023-12-23 00:01:40 user_2 host_D 100.0 NULL 2023-12-23 00:00:51 NULL
1703286400 2023-12-23 00:06:40 user_2 host_D NULL 0.0 NULL 2023-12-23 00:06:40
Combine new timestamps calculated from adding time deltas into an array:
tstp_hrf user host new_tstp_prv new_tstp_nxt new_tstps
2023-12-22 06:59:15 user_1 host_A 2023-12-22 06:59:16 NULL [ 2023-12-22 06:59:16 ]
2023-12-22 07:00:52 user_1 host_A NULL 2023-12-22 07:01:41 [ 2023-12-22 07:01:41 ]
2023-12-22 07:02:30 user_1 host_B 2023-12-22 07:01:42 NULL [ 2023-12-22 07:01:42 ]
2023-12-22 07:05:04 user_1 host_B NULL 2023-12-22 07:05:27 [ 2023-12-22 07:05:27 ]
2023-12-22 07:05:50 user_1 host_A 2023-12-22 07:05:28 2023-12-22 07:06:15 [ 2023-12-22 07:05:28, 2023-12-22 07:06:15 ]
2023-12-22 07:06:40 user_1 host_B 2023-12-22 07:06:16 NULL [ 2023-12-22 07:06:16 ]
2023-12-22 07:07:30 user_1 host_B NULL 2023-12-22 07:07:30 [ 2023-12-22 07:07:30 ]
2023-12-22 23:55:00 user_2 host_C 2023-12-22 23:55:01 NULL [ 2023-12-22 23:55:01 ]
2023-12-23 00:00:00 user_2 host_C NULL 2023-12-23 00:00:50 [ 2023-12-23 00:00:50 ]
2023-12-23 00:01:40 user_2 host_D 2023-12-23 00:00:51 NULL [ 2023-12-23 00:00:51 ]
2023-12-23 00:06:40 user_2 host_D NULL 2023-12-23 00:06:40 [ 2023-12-23 00:06:40 ]
Explode the array with new timestamps:
user host tstp_hrf new_tstps new_tstp
user_1 host_A 2023-12-22 06:59:15 [ 2023-12-22 06:59:16 ] 2023-12-22 06:59:16
user_1 host_A 2023-12-22 07:00:52 [ 2023-12-22 07:01:41 ] 2023-12-22 07:01:41
user_1 host_B 2023-12-22 07:02:30 [ 2023-12-22 07:01:42 ] 2023-12-22 07:01:42
user_1 host_B 2023-12-22 07:05:04 [ 2023-12-22 07:05:27 ] 2023-12-22 07:05:27
user_1 host_A 2023-12-22 07:05:50 [ 2023-12-22 07:05:28, 2023-12-22 07:06:15 ] 2023-12-22 07:05:28
user_1 host_A 2023-12-22 07:05:50 [ 2023-12-22 07:05:28, 2023-12-22 07:06:15 ] 2023-12-22 07:06:15
user_1 host_B 2023-12-22 07:06:40 [ 2023-12-22 07:06:16 ] 2023-12-22 07:06:16
user_1 host_B 2023-12-22 07:07:30 [ 2023-12-22 07:07:30 ] 2023-12-22 07:07:30
user_2 host_C 2023-12-22 23:55:00 [ 2023-12-22 23:55:01 ] 2023-12-22 23:55:01
user_2 host_C 2023-12-23 00:00:00 [ 2023-12-23 00:00:50 ] 2023-12-23 00:00:50
user_2 host_D 2023-12-23 00:01:40 [ 2023-12-23 00:00:51 ] 2023-12-23 00:00:51
user_2 host_D 2023-12-23 00:06:40 [ 2023-12-23 00:06:40 ] 2023-12-23 00:06:40

5.4. To round down log-in and round up log-out times of a user to midnight timestamp in a sequence of events

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("tstp_unix", T.LongType(), True),
    T.StructField("computer", T.StringType(), True),
    T.StructField("user", T.StringType(), True),
])

data = [
    (1703224755, "computer_A", "user_1"),
    (1703224780, "computer_A", "user_1"),
    (1703224850, "computer_A", "user_1"),
    (1703224950, "computer_B", "user_1"),
    (1703225050, "computer_B", "user_1"),
    (1703285700, "computer_C", "user_2"),
    (1703285800, "computer_C", "user_2"),
    (1703276000, "computer_C", "user_2"),
    (1703286300, "computer_D", "user_2"),
    (1703296400, "computer_D", "user_2"),
]

df = spark.createDataFrame(data=data, schema=schema)

# Add human-readable timestamp
df = df.withColumn("tstp_hrf", F.from_unixtime(F.col("tstp_unix"), "yyyy-MM-dd HH:mm:ss"))


# Define a window to identify session changes
window_spec = Window.partitionBy("user").orderBy(F.asc("tstp_unix"))


# Previous row
prev = F.lag("computer", 1).over(window_spec)
df = df.withColumn("is_first", F.when(prev.isNull(), 1).otherwise(0))


rounded_down = F.from_unixtime("tstp_unix", "yyyy-MM-dd")
df = df.withColumn("rounded_tstp_first",
                   F.when(F.col("is_first") == 1, F.unix_timestamp(rounded_down, "yyyy-MM-dd")
                   ).otherwise(F.col("tstp_unix")))

# Next row
next = F.lag("computer", -1).over(window_spec)
df = df.withColumn("is_last", F.when(next.isNull(), 1).otherwise(0))

rounded_up = F.from_unixtime(F.col("tstp_unix") + 24*60*60, "yyyy-MM-dd")
df = df.withColumn("rounded_tstp_last",
                   F.when(F.col("is_last") == 1, F.unix_timestamp(rounded_up, "yyyy-MM-dd")
                   ).otherwise(F.col("tstp_unix")))

# Combine rounded values
df = df.withColumn("rounded_tstp",
                   F.when(next.isNull(), F.col("rounded_tstp_last")
                   ).otherwise(F.col("rounded_tstp_first")))
df = df.withColumn("rounded_tstp_hrf",
                   F.from_unixtime("rounded_tstp", "yyyy-MM-dd HH:mm:ss"))

cols = ["user", "computer", "tstp_unix", "tstp_hrf", "is_first", "is_last", "rounded_tstp_hrf"]
df.select(cols).show(truncate=False)
user computer tstp_unix tstp_hrf is_first is_last rounded_tstp_hrf
user_1 computer_A 1703224755 2023-12-22 06:59:15 1 0 2023-12-22 00:00:00
user_1 computer_A 1703224780 2023-12-22 06:59:40 0 0 2023-12-22 06:59:40
user_1 computer_A 1703224850 2023-12-22 07:00:50 0 0 2023-12-22 07:00:50
user_1 computer_B 1703224950 2023-12-22 07:02:30 0 0 2023-12-22 07:02:30
user_1 computer_B 1703225050 2023-12-22 07:04:10 0 1 2023-12-23 00:00:00
user_2 computer_C 1703276000 2023-12-22 21:13:20 1 0 2023-12-22 00:00:00
user_2 computer_C 1703285700 2023-12-22 23:55:00 0 0 2023-12-22 23:55:00
user_2 computer_C 1703285800 2023-12-22 23:56:40 0 0 2023-12-22 23:56:40
user_2 computer_D 1703286300 2023-12-23 00:05:00 0 0 2023-12-23 00:05:00
user_2 computer_D 1703296400 2023-12-23 02:53:20 0 1 2023-12-24 00:00:00

6. Numerical operations

6.1. To find percentage of a column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Laptop", 12),
        ("Monitor", 7),
        ("Mouse", 8),
        ("Keyboard", 9)]
df = spark.createDataFrame(schema=schema, data=data)

df = df.withColumn("%", F.round(F.col("Quantity")/F.sum("Quantity").over(Window.partitionBy())*100, 2))
dft = df.select("Product", "Quantity", "%").orderBy(F.desc("Quantity"))
dft.show()
Product Quantity %
Laptop 12 33.33
Keyboard 9 25.0
Mouse 8 22.22
Monitor 7 19.44

6.2. To find percentage of a column within a group using a window

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)

df = df.withColumn("%", F.round(F.col("Quantity")/F.sum("Quantity").over(Window.partitionBy("Location"))*100, 2))
dft = df.select("Location", "Product", "Quantity", "%").orderBy(F.desc("Location"), F.desc("Quantity"))
dft.show()
Location Product Quantity %
Office Laptop 23 54.76
Office Monitor 10 23.81
Office Mouse 9 21.43
Home Laptop 12 33.33
Home Keyboard 9 25.0
Home Mouse 8 22.22
Home Monitor 7 19.44

6.3. To find percentage of a column within a group using .groupBy() and a join

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)

df_sum = df.groupBy("Location").agg(F.sum("Quantity").alias("Total_Quantity"))
df = df.join(df_sum, on="Location").withColumn("%", F.round(F.col("Quantity")/F.col("Total_Quantity")*100, 2))
dft = df.select("Location", "Product", "Quantity", "%").orderBy(F.desc("Location"), F.desc("Quantity"))
dft.show()
Location Product Quantity %
Office Laptop 23 54.76
Office Monitor 10 23.81
Office Mouse 9 21.43
Home Laptop 12 33.33
Home Keyboard 9 25.0
Home Mouse 8 22.22
Home Monitor 7 19.44

6.4. To add a row with total count and percentage for a column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Name", T.StringType(), True),
        T.StructField("Books read", T.IntegerType(), True),
    ]
)
data = [("Alice", 12),
        ("Bob", 7),
        ("Michael", 8),
        ("Kevin", 10)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("%", F.round( F.col("Books read")/F.sum("Books read").over(Window.partitionBy())*100, 6))
dft = df.select("Name", "Books read", "%").orderBy(F.desc("Books read"))
dft = dft.union(
               dft.groupBy().agg(F.lit("Total"),
                                     F.sum("Books read").alias("Books read"),
                                     F.sum("%").alias("%"))
               )
dft.show()
Name Books read %
Alice 12 32.432432
Kevin 10 27.027027
Michael 8 21.621622
Bob 7 18.918919
Total 37 100.0

6.5. To find maximum value of a column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)
df.show()
max_val = df.select("Quantity").rdd.max()[0]
print(f"Maximum value of Quantity: {max_val}")
Location Product Quantity
Home Laptop 12
Home Monitor 7
Home Mouse 8
Home Keyboard 9
Office Laptop 23
Office Monitor 10
Office Mouse 9

Maximum value of Quantity: 23
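
The RDD-based lookup above works, but the same value can be obtained without leaving the DataFrame API, for example with F.max(). A minimal sketch, assuming the same df and imports as above:

max_val = df.agg(F.max("Quantity").alias("max_quantity")).collect()[0]["max_quantity"]
print(f"Maximum value of Quantity: {max_val}")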

6.6. To add a column with count of elements per group

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("count_per_group", F.count(F.lit(1)).over(Window.partitionBy(F.col("Location"))))
df.show()
Location Product Quantity count_per_group
Home Laptop 12 4
Home Monitor 7 4
Home Mouse 8 4
Home Keyboard 9 4
Office Laptop 23 3
Office Monitor 10 3
Office Mouse 9 3

6.7. To count elements per group whose quantity is at least some number

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)
df.show()
dfs = df.groupBy("Location").agg(F.sum((F.col("Quantity") >= 10).cast("int")).alias("Count of products with quantity >= 10 per location"))
dfs.show()
Location Product Quantity
Home Laptop 12
Home Monitor 7
Home Mouse 8
Home Keyboard 9
Office Laptop 23
Office Monitor 10
Office Mouse 9
Location Count of products with quantity >= 10 per location
Home 1
Office 2
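
An equivalent formulation on the same df: instead of casting the boolean condition to an integer, F.when() can map matching rows to 1 and the rest to 0 before summing.

dfs = df.groupBy("Location").agg(
    F.sum(F.when(F.col("Quantity") >= 10, 1).otherwise(0))
     .alias("Count of products with quantity >= 10 per location"))
dfs.show()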

6.8. To calculate minimal value of two columns per row

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

# Schema with two separate score fields
schema = T.StructType([
    T.StructField('Student', T.StructType([
        T.StructField('First name', T.StringType(), True),
        T.StructField('Middle name', T.StringType(), True),
        T.StructField('Last name', T.StringType(), True)
    ])),
    T.StructField('ID', T.StringType(), True),
    T.StructField('Gender', T.StringType(), True),
    T.StructField('Score1', T.IntegerType(), True),
    T.StructField('Score2', T.IntegerType(), True)
])

# Sample data with two scores for each student
data = [
    (("John", "", "Doe"), "1007", "M", 75, 80),
    (("Adam", "Scott", "Smith"), "1008", "M", 55, 65),
    (("Marie", "", "Carpenter"), "1004", "F", 67, 70),
    (("Samantha", "Louise", "Herbert"), "1002", "F", 90, 85),
    (("Craig", "", "Brown"), "1011", "M", 88, 92)
]

df = spark.createDataFrame(data=data, schema=schema)
# Calculate the minimum score between Score1 and Score2 and store it in a new column 'MinScore'
df = df.withColumn("MinScore", F.least(F.col("Score1"), F.col("Score2")))
df.printSchema()
# Show the result
df.show(truncate=False)

Schema of df is:

root
 |-- Student: struct (nullable = true)
 |    |-- First name: string (nullable = true)
 |    |-- Middle name: string (nullable = true)
 |    |-- Last name: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Score1: integer (nullable = true)
 |-- Score2: integer (nullable = true)
 |-- MinScore: integer (nullable = true)
Student ID Gender Score1 Score2 MinScore
{John, , Doe} 1007 M 75 80 75
{Adam, Scott, Smith} 1008 M 55 65 55
{Marie, , Carpenter} 1004 F 67 70 67
{Samantha, Louise, Herbert} 1002 F 90 85 85
{Craig, , Brown} 1011 M 88 92 88
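
The row-wise maximum works the same way on the same df: F.greatest() is the counterpart of F.least().

df = df.withColumn("MaxScore", F.greatest(F.col("Score1"), F.col("Score2")))
df.select("ID", "Score1", "Score2", "MinScore", "MaxScore").show(truncate=False)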

6.9. To calculate cumulative sum of a column

import pandas as pd
from pyspark.sql import Window
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df = pd.DataFrame({'time': [0, 1, 2, 3, 4, 5],
                   'value': [False, False, True, False, True, True]})

df = spark.createDataFrame(df)
df = df.withColumn("cml_n_true", F.sum((F.col("value") == True).cast("int")).over(Window.orderBy(F.col("time").asc())))
df = df.withColumn("cml_n_false", F.sum((F.col("value") == False).cast("int")).over(Window.orderBy(F.col("time").asc())))
df.show()
time value cml_n_true cml_n_false
0 false 0 1
1 false 0 2
2 true 1 2
3 false 1 3
4 true 2 3
5 true 3 3
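
Since booleans cast directly to 0/1, the explicit comparisons with True and False are not strictly needed. A shorter variant, assuming the same df and imports:

w = Window.orderBy(F.col("time").asc())
df = df.withColumn("cml_n_true", F.sum(F.col("value").cast("int")).over(w))
df = df.withColumn("cml_n_false", F.sum((~F.col("value")).cast("int")).over(w))
df.show()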

6.10. To calculate difference of values of two consecutive rows for a certain column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

# Schema with a dataset column
schema = T.StructType([
    T.StructField("dataset", T.StringType(), True),
    T.StructField("group_data", T.StringType(), True),
    T.StructField("cnt", T.IntegerType(), True),
])

# Data spanning multiple datasets
data = [
    ("dataset1", "group_b", 7),
    ("dataset1", "group_c", 8),
    ("dataset1", "group_d", 9),
    ("dataset1", "group_e", 23),
    ("dataset1", "group_g", 9),
    ("dataset2", "group_a", 5),
    ("dataset2", "group_b", 15),
    ("dataset2", "group_d", 20),
    ("dataset2", "group_e", 8),
    ("dataset2", "group_g", 18),
]

# Create DataFrame
dfs = spark.createDataFrame(schema=schema, data=data)

# Define a window partitioned by dataset and ordered by cnt
window_by_dataset = Window.partitionBy("dataset").orderBy(F.asc("cnt")).rowsBetween(Window.unboundedPreceding, 0)

# Calculate cumulative sum of 'cnt' within each dataset
dfs = dfs.withColumn("cml_cnt", F.sum("cnt").over(window_by_dataset))

# Define another window for consecutive row difference, partitioned by dataset
w = Window.partitionBy("dataset").orderBy(F.col("cnt").desc(), F.col("group_data").desc())

# Define fill value for NULL and shift (row lag) value
fill_null_value = 0
shift = -1

# Calculate difference between two consecutive rows within each dataset
dfs = dfs.withColumn(
    "diff_of_two_consec_rows",
    F.col("cml_cnt") - F.lag("cml_cnt", shift, fill_null_value).over(w)
)

# Show the resulting DataFrame
dfs.show(truncate=False)
dataset group_data cnt cml_cnt diff_of_two_consec_rows
dataset1 group_e 23 56 23
dataset1 group_g 9 33 9
dataset1 group_d 9 24 9
dataset1 group_c 8 15 8
dataset1 group_b 7 7 7
dataset2 group_d 20 66 20
dataset2 group_g 18 46 18
dataset2 group_b 15 28 15
dataset2 group_e 8 13 8
dataset2 group_a 5 5 5
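
If only the difference of the raw cnt values between consecutive rows is needed, a plain F.lag() with a positive offset is usually enough. A minimal sketch on the same dfs (the window name w_asc is introduced here only for illustration):

w_asc = Window.partitionBy("dataset").orderBy(F.asc("cnt"), F.asc("group_data"))
dfs = dfs.withColumn("diff_from_prev_row", F.col("cnt") - F.lag("cnt", 1, 0).over(w_asc))
dfs.show(truncate=False)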

6.11. To calculate difference of values of two consecutive rows for a certain column using .applyInPandas()

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("group", T.StringType(), True),
    T.StructField("value", T.IntegerType(), True),
    T.StructField("timestamp", T.IntegerType(), True),
])

data = [
    ("A", 10, 1),
    ("A", 15, 2),
    ("A", 21, 3),
    ("B", 5, 1),
    ("B", 7, 2),
    ("B", 12, 3),
]

df = spark.createDataFrame(data=data, schema=schema)

# Define a function that is applied to the pandas DataFrame of each group
def calculate_diff(pdf):
    # Ensure the data is sorted by timestamp within each group
    pdf = pdf.sort_values("timestamp")
    # Calculate the difference between consecutive rows;
    # the first row has no predecessor, so its difference falls back to its own value
    pdf["difference"] = pdf["value"].diff().fillna(pdf["value"])
    return pdf


# Apply the function to each group defined by the "group" column
df_res = df.groupBy("group").applyInPandas(calculate_diff, schema="group string, timestamp int, value int, difference int")

# Show the result
df_res.show(truncate=False)
group timestamp value difference
A 1 10 10
A 2 15 5
A 3 21 6
B 1 5 5
B 2 7 2
B 3 12 5

6.12. To calculate down-sampling ratios for over-represented groups of some data used in training of a machine learning algorithm

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("group_data", T.StringType(), True),
        T.StructField("cnt", T.IntegerType(), True),
    ]
)
data = [("group_a", 12),
        ("group_b", 7),
        ("group_c", 8),
        ("group_d", 9),
        ("group_e", 23),
        ("group_f", 10),
        ("group_g", 9)]
dfs = spark.createDataFrame(schema=schema, data=data)

dfs = dfs.withColumn(
        "cml_cnt",
        F.sum("cnt").over(Window.orderBy(F.col("cnt").asc(), F.col("group_data").desc()).rowsBetween(Window.unboundedPreceding, 0)),
    )
txt = "Initial data composition:"
print(txt)
# Total count of samples in the input dataset
dfs = dfs.withColumn("cnt_tot", F.sum("cnt").over(Window.partitionBy()))
dfs.show(truncate=False)
dfs = dfs.withColumn("rank", F.rank().over(Window.orderBy(F.desc("cnt"), F.asc("group_data"))))

cnt_total = dfs.select("cnt_tot").limit(1).collect()[0][0]
n_samples_limit = 65
txt = f"Desired number of samples as parameter: {n_samples_limit}\n"
w = Window.partitionBy().orderBy(F.asc("rank"))
dfs = dfs.withColumn("cml_cnt + (rank-1)*F.lag(cnt, 1)",
                      F.concat_ws("", F.format_string("%2d", F.col("cml_cnt")), F.lit(" + "),
                                      F.lit("("), F.col("rank").cast(T.StringType()), F.lit(" - "), F.lit(1).cast(T.StringType()),
                                      F.lit(")"), F.lit(" * "), (F.lag("cnt", 1, cnt_total).over(w)).cast(T.StringType())))
dfs = dfs.withColumn("cnt_tot_after_top_cut", F.col("cml_cnt") + (F.col("rank") - F.lit(1)) * F.lag("cnt", 1, cnt_total).over(w))
dfs = dfs.withColumn("loc_cutoff", (F.col("cnt_tot_after_top_cut") > F.lit(n_samples_limit)).cast("int"))
dfs = dfs.withColumn("loc_cutoff_last", F.last("rank").over(Window.partitionBy("loc_cutoff").orderBy("loc_cutoff")))
dfs = dfs.withColumn("loc_cutoff", F.when((F.col("loc_cutoff") == 1) & (F.col("rank") == F.col("loc_cutoff_last")), F.lit(-1)).otherwise(F.col("loc_cutoff")))
dfs = dfs.drop("loc_cutoff_last")

rank_cutoff = dfs.filter(F.col("loc_cutoff") == 1).orderBy(F.desc("rank")).limit(1).select("rank").collect()[0][0]
txt += f"Rank at cutoff: {rank_cutoff}\n"
sum_before_cutoff = dfs.filter(F.col("loc_cutoff") == -1).orderBy(F.asc("rank")).limit(1).select("cml_cnt").collect()[0][0]
cnt_new_flat = (n_samples_limit - sum_before_cutoff)/rank_cutoff
txt += f"New samples count for groups with rank 1 to {rank_cutoff} (inclusive): {cnt_new_flat}\n"
dfs = dfs.withColumn("cnt_new", F.when(F.col("loc_cutoff") == F.lit(1), F.lit(cnt_new_flat)).otherwise(F.col("cnt").cast("float")))
txt += f"Over-represented groups will be down-sampled to have a flat distribution, under-represented groups will be kept as they are.\n"
dfs = dfs.withColumn(
        "cml_cnt_new",
        F.sum("cnt_new").over(Window.orderBy(F.desc("rank")).rowsBetween(Window.unboundedPreceding, 0)),
).orderBy(F.asc("rank"))

dfs = dfs.withColumn("sampling_ratio", F.when(F.col("loc_cutoff") == F.lit(1), F.lit(cnt_new_flat)/F.col("cnt")).otherwise(F.lit(1.0)))
txt += f"Data from groups with rank 1 to {rank_cutoff} (inclusive) should be downsampled with a ratio of samples to keep shown in 'sampling_ratio' column:"
print(txt)
dfs.show(truncate=False)
Initial data composition:
group_data cnt cml_cnt cnt_tot
group_b 7 7 78
group_c 8 15 78
group_g 9 24 78
group_d 9 33 78
group_f 10 43 78
group_a 12 55 78
group_e 23 78 78
Desired number of samples as parameter: 65
Rank at cutoff: 2
New samples count for groups with rank 1 to 2 (inclusive): 11.0
Over-represented groups will be down-sampled to have a flat distribution, under-represented groups will be kept as they are.
Data from groups with rank 1 to 2 (inclusive) should be downsampled with a ratio of samples to keep shown in 'sampling_ratio' column:
group_data cnt cml_cnt cnt_tot rank cml_cnt + (rank-1)*F.lag(cnt, 1) cnt_tot_after_top_cut loc_cutoff cnt_new cml_cnt_new sampling_ratio
group_e 23 78 78 1 78 + (1 - 1) * 78 78 1 11.0 65.0 0.4782608695652174
group_a 12 55 78 2 55 + (2 - 1) * 23 78 1 11.0 54.0 0.9166666666666666
group_f 10 43 78 3 43 + (3 - 1) * 12 67 -1 10.0 43.0 1.0
group_d 9 33 78 4 33 + (4 - 1) * 10 63 0 9.0 33.0 1.0
group_g 9 24 78 5 24 + (5 - 1) * 9 60 0 9.0 24.0 1.0
group_c 8 15 78 6 15 + (6 - 1) * 9 60 0 8.0 15.0 1.0
group_b 7 7 78 7 7 + (7 - 1) * 8 55 0 7.0 7.0 1.0
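
To actually apply these ratios, the per-group fractions can be collected into a dictionary and passed to DataFrame.sampleBy() on the row-level dataframe from which the group counts were derived. That dataframe is not part of the example above, so the df_rows name below is hypothetical and this is only a sketch:

# df_rows: hypothetical row-level dataframe with a "group_data" column
fractions = {row["group_data"]: row["sampling_ratio"]
             for row in dfs.select("group_data", "sampling_ratio").collect()}
df_downsampled = df_rows.sampleBy("group_data", fractions=fractions, seed=42)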

7. Structures

7.1. To convert a map to a struct

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df = spark.sql("SELECT map('John', 1, 'Michael', 2) as Students_map")
df = df.withColumn("Students_struct", F.map_entries("Students_map").alias("a", "b"))
df.printSchema()
df.show(truncate=False)

Schema of df is:

root
 |-- Students_map: map (nullable = false)
 |    |-- key: string
 |    |-- value: integer (valueContainsNull = false)
 |-- Students_struct: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- key: string (nullable = false)
 |    |    |-- value: integer (nullable = false)
Students_map Students_struct
{John -> 1, Michael -> 2} [{John, 1}, {Michael, 2}]

7.2. To extract a field from a struct as a column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType([
        T.StructField('Student', T.StructType([
             T.StructField('First name', T.StringType(), True),
             T.StructField('Middle name', T.StringType(), True),
             T.StructField('Last name', T.StringType(), True)
             ])),
         T.StructField('ID', T.StringType(), True),
         T.StructField('Gender', T.StringType(), True),
         T.StructField('Score', T.IntegerType(), True)
         ])

data = [
    (("John", "", "Doe"),"1007", "M", 75),
    (("Adam", "Scott", "Smith"),"1008", "M", 55),
    (("Marie", "", "Carpenter"), "1004", "F", 67),
    (("Samantha", "Louise", "Herbert"),"1002", "F", 90),
    (("Craig", "", "Brown"), "1011", "M" , 88)
  ]
df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()
df = df.withColumn("First name", F.col("Student.First Name"))
df = df.withColumn("Last name", F.upper(F.col("Student").getField("Last name")))
df.show(truncate=False)

Schema of df is:

root
 |-- Student: struct (nullable = true)
 |    |-- First name: string (nullable = true)
 |    |-- Middle name: string (nullable = true)
 |    |-- Last name: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Score: integer (nullable = true)
Student ID Gender Score First name Last name
{John, , Doe} 1007 M 75 John DOE
{Adam, Scott, Smith} 1008 M 55 Adam SMITH
{Marie, , Carpenter} 1004 F 67 Marie CARPENTER
{Samantha, Louise, Herbert} 1002 F 90 Samantha HERBERT
{Craig, , Brown} 1011 M 88 Craig BROWN
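
To pull out all fields of the struct at once, the struct can be expanded with a star. A short variant on the same df:

df_flat = df.select("ID", "Gender", "Score", "Student.*")
df_flat.show(truncate=False)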

7.3. To process an array of structs using F.transform() and .getField()

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

# Define schema with a struct that includes an array of structs for subjects
schema = T.StructType([
    T.StructField('Student', T.StructType([
        T.StructField('First name', T.StringType(), True),
        T.StructField('Middle name', T.StringType(), True),
        T.StructField('Last name', T.StringType(), True)
    ])),
    T.StructField('ID', T.StringType(), True),
    T.StructField('Gender', T.StringType(), True),
    T.StructField('Score', T.IntegerType(), True),
    T.StructField('Subjects and scores', T.ArrayType(
        T.StructType([
            T.StructField('Subject name', T.StringType(), True),
            T.StructField('Subject score', T.IntegerType(), True)
        ])
    ), True)
])

# Define data with an array of subjects for each student
data = [
    (("John", "", "Doe"), "1007", "M", 75, [("Math", 78), ("Science", 82)]),
    (("Adam", "Scott", "Smith"), "1008", "M", 55, [("Math", 55), ("English", 65)]),
    (("Marie", "", "Carpenter"), "1004", "F", 67, [("Math", 72), ("Science", 68), ("History", 75)]),
    (("Samantha", "Louise", "Herbert"), "1002", "F", 90, [("Math", 92), ("Science", 88), ("English", 91)]),
    (("Craig", "", "Brown"), "1011", "M", 88, [("Math", 85), ("Science", 90)])
]

df = spark.createDataFrame(data=data, schema=schema)

df.printSchema()

df = df.withColumn("Subjects taken", F.transform("Subjects and scores", lambda x: x.getField("Subject name")))
df.show(truncate=False)

Schema of df is:

root
 |-- Student: struct (nullable = true)
 |    |-- First name: string (nullable = true)
 |    |-- Middle name: string (nullable = true)
 |    |-- Last name: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Score: integer (nullable = true)
 |-- Subjects and scores: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Subject name: string (nullable = true)
 |    |    |-- Subject score: integer (nullable = true)
Student ID Gender Score Subjects and scores Subjects taken
{John, , Doe} 1007 M 75 [{Math, 78}, {Science, 82}] [Math, Science]
{Adam, Scott, Smith} 1008 M 55 [{Math, 55}, {English, 65}] [Math, English]
{Marie, , Carpenter} 1004 F 67 [{Math, 72}, {Science, 68}, {History, 75}] [Math, Science, History]
{Samantha, Louise, Herbert} 1002 F 90 [{Math, 92}, {Science, 88}, {English, 91}] [Math, Science, English]
{Craig, , Brown} 1011 M 88 [{Math, 85}, {Science, 90}] [Math, Science]

7.4. To process an array of nested structs using F.transform() and .getField()

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

# Define the schema with nested structs
schema = T.StructType([
  T.StructField('Student', T.StructType([
      T.StructField('First name', T.StringType(), True),
      T.StructField('Middle name', T.StringType(), True),
      T.StructField('Last name', T.StringType(), True)
  ])),
  T.StructField('ID', T.StringType(), True),
  T.StructField('Gender', T.StringType(), True),
  T.StructField('Subjects', T.ArrayType(T.StructType([
      T.StructField('Subject Name', T.StringType(), True),
      T.StructField('Assessments', T.ArrayType(T.StructType([
          T.StructField('Assessment Name', T.StringType(), True),
          T.StructField('Score', T.IntegerType(), True)
      ])), True)
  ])), True)
])

# Sample data matching the new schema
data = [
  (("John", "", "Doe"), "1007", "M", [
      ("Math", [("Midterm", 70), ("Final", 80)]),
      ("Science", [("Midterm", 65), ("Final", 75)])
  ]),
  (("Adam", "Scott", "Smith"), "1008", "M", [
      ("Math", [("Midterm", 50), ("Final", 60)]),
      ("Science", [("Midterm", 55), ("Final", 65)])
  ]),
  (("Marie", "", "Carpenter"), "1004", "F", [
      ("Math", [("Midterm", 60), ("Final", 75)]),
      ("Science", [("Midterm", 68), ("Final", 70)])
  ]),
  (("Samantha", "Louise", "Herbert"), "1002", "F", [
      ("Math", [("Midterm", 88), ("Final", 92)]),
      ("Science", [("Midterm", 85), ("Final", 89)])
  ]),
  (("Craig", "", "Brown"), "1011", "M", [
      ("Math", [("Midterm", 78), ("Final", 85)]),
      ("Science", [("Midterm", 80), ("Final", 84)])
  ])
]

df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()

df = df.withColumn("Scores", F.transform("Subjects", lambda x: x["Assessments"].getField("Score")))
df.show(truncate=False)

Schema of df is:

root
 |-- Student: struct (nullable = true)
 |    |-- First name: string (nullable = true)
 |    |-- Middle name: string (nullable = true)
 |    |-- Last name: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Subjects: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Subject Name: string (nullable = true)
 |    |    |-- Assessments: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- Assessment Name: string (nullable = true)
 |    |    |    |    |-- Score: integer (nullable = true)
Student ID Gender Subjects Scores
{John, , Doe} 1007 M [{Math, [{Midterm, 70}, {Final, 80}]}, {Science, [{Midterm, 65}, {Final, 75}]}] [[70, 80], [65, 75]]
{Adam, Scott, Smith} 1008 M [{Math, [{Midterm, 50}, {Final, 60}]}, {Science, [{Midterm, 55}, {Final, 65}]}] [[50, 60], [55, 65]]
{Marie, , Carpenter} 1004 F [{Math, [{Midterm, 60}, {Final, 75}]}, {Science, [{Midterm, 68}, {Final, 70}]}] [[60, 75], [68, 70]]
{Samantha, Louise, Herbert} 1002 F [{Math, [{Midterm, 88}, {Final, 92}]}, {Science, [{Midterm, 85}, {Final, 89}]}] [[88, 92], [85, 89]]
{Craig, , Brown} 1011 M [{Math, [{Midterm, 78}, {Final, 85}]}, {Science, [{Midterm, 80}, {Final, 84}]}] [[78, 85], [80, 84]]
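
The "Scores" column is an array of arrays. If a single flat list of scores per student is preferred, F.flatten() can be applied on top of the same df:

df = df.withColumn("All scores", F.flatten("Scores"))
df.select("ID", "All scores").show(truncate=False)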

8. Dataframe join operations

8.1. To perform a full, outer, left, right join operations

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Name", T.StringType(), True),
        T.StructField("Score", T.IntegerType(), True),
    ]
)
data = [("Alice", 10),
        ("Bob", 11)
        ]
df_a = spark.createDataFrame(schema=schema, data=data)
print("Table A:")
df_a.show()

schema = T.StructType(
    [
        T.StructField("Name", T.StringType(), True),
        T.StructField("Surname", T.StringType(), True),
        T.StructField("Age", T.StringType(), True),
    ]
)
data = [("Alice", "Doe", 12),
        ("Alice", "Smith", 30),
        ("Jane", "Carter", 7),
]
df_b = spark.createDataFrame(schema=schema, data=data)
print("Table B:")
df_b.show()

df = df_a.join(df_b, on="Name", how="full")
print("Full join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="outer")
print("Outer join on 'Name':")
df.show()

df = df_a.join(df_b, df_a["Name"] == df_b["Name"])
print("Join on 'Name' on equal condition:")
df.show()

df = df_a.join(df_b, on="Name", how="inner")
print("Inner join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="left")
print("Left join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="left_outer")
print("Left-outer join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="left_anti")
print("Left-anti join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="left_semi")
print("Left-semi join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="right")
print("Right join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="right_outer")
print("Right-outer join on 'Name':")
df.show()

Table A:

Name Score
Alice 10
Bob 11

Table B:

Name Surname Age
Alice Doe 12
Alice Smith 30
Jane Carter 7

Full join on 'Name':

Name Score Surname Age
Alice 10 Doe 12
Alice 10 Smith 30
Bob 11 null null
Jane null Carter 7

Outer join on 'Name':

Name Score Surname Age
Alice 10 Doe 12
Alice 10 Smith 30
Bob 11 null null
Jane null Carter 7

Join on 'Name' on equal condition:

Name Score Name Surname Age
Alice 10 Alice Doe 12
Alice 10 Alice Smith 30

Inner join on 'Name':

Name Score Surname Age
Alice 10 Doe 12
Alice 10 Smith 30

Left join on 'Name':

Name Score Surname Age
Bob 11 null null
Alice 10 Smith 30
Alice 10 Doe 12

Left-outer join on 'Name':

Name Score Surname Age
Bob 11 null null
Alice 10 Smith 30
Alice 10 Doe 12

Left-anti join on 'Name':

Name Score
Bob 11

Left-semi join on 'Name':

Name Score
Alice 10

Right join on 'Name':

Name Score Surname Age
Alice 10 Doe 12
Alice 10 Smith 30
Jane null Carter 7

Right-outer join on 'Name':

Name Score Surname Age
Alice 10 Doe 12
Alice 10 Smith 30
Jane null Carter 7

8.2. To drop one of the duplicate columns after join

from pyspark.sql import Row
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df_a = spark.createDataFrame([
  Row(id=1, value="A1"),
  Row(id=1, value="B1"),
  Row(id=1, value="C1"),
  Row(id=2, value="A1"),
  Row(id=2, value="X1"),
  Row(id=2, value="Y1")]
)
print("Dataframe df_a:")
df_a.show()

df_b = spark.createDataFrame([
  Row(id=1, updated="A2"),
  Row(id=1, updated="B1"),
  Row(id=1, updated="C1"),
  Row(id=2, updated="A1"),
  Row(id=2, updated="X1"),
  Row(id=2, updated="Y1")]
)
print("Dataframe df_b:")
df_b.show()

df = df_a.join(df_b, on=[df_a["id"] == df_b["id"], df_a["value"] == df_b["updated"]], how="full")
print("Full join on df_a['value'] == df_b['updated']:")
df.show()

df = df_a.join(df_b, on=[df_a["id"] == df_b["id"], df_a["value"] == df_b["updated"]], how="full").drop(df_b["id"])
print("Full join on df_a['value'] == df_b['updated'] with dropped df_b['id'] column:")
df.show()

Dataframe df_a:

id value
1 A1
1 B1
1 C1
2 A1
2 X1
2 Y1

Dataframe df_b:

id updated
1 A2
1 B1
1 C1
2 A1
2 X1
2 Y1

Full join on df_a['value'] == df_b['updated']:

id value id updated
1.0 A1 null null
null null 1.0 A2
1.0 B1 1.0 B1
1.0 C1 1.0 C1
2.0 A1 2.0 A1
2.0 X1 2.0 X1
2.0 Y1 2.0 Y1

Full join on df_a['value'] == df_b['updated'] with dropped df_b['id'] column:

id value updated
1.0 A1 null
null null A2
1.0 B1 B1
1.0 C1 C1
2.0 A1 A1
2.0 X1 X1
2.0 Y1 Y1
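
When the join keys carry the same name on both sides, passing the names as strings avoids producing a duplicate column in the first place. One way to reuse that here, assuming the same df_a and df_b, is to rename df_b's "updated" column before joining:

df = df_a.join(df_b.withColumnRenamed("updated", "value"), on=["id", "value"], how="full")
df.show()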

9. Aggregation and maps

9.1. To group by and aggregate into a map using F.map_from_entries()

import pyspark.sql.functions as F
from pyspark.sql import Row
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df = spark.createDataFrame([
  Row(id=1, key='a', value="A1"),
  Row(id=1, key='b', value="B1"),
  Row(id=1, key='c', value="C1"),
  Row(id=2, key='a', value="A1"),
  Row(id=2, key='x', value="X1"),
  Row(id=2, key='y', value="Y1")]
)

print("Dataframe with keys and values:")
df.show(truncate=False)
dft = df.groupBy("id").agg(F.map_from_entries(F.collect_list(
          F.struct("key", "value"))).alias("key_value")
)
print("Dataframe with key -> value mapping")
dft.show(truncate=False)
dft.printSchema()

Dataframe with keys and values:

id key value
1 a A1
1 b B1
1 c C1
2 a A1
2 x X1
2 y Y1

Dataframe with key -> value mapping

id key_value
1 {a -> A1, b -> B1, c -> C1}
2 {a -> A1, x -> X1, y -> Y1}

Schema of dft is:

root
 |-- id: long (nullable = true)
 |-- key_value: map (nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

9.2. To group by and aggregate into a map using UDF

import pyspark.sql.functions as F
from pyspark.sql import Row
from pyspark.sql import SparkSession
import pyspark.sql.types as T

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df = spark.createDataFrame([
  Row(id=1, key='a', value="A1"),
  Row(id=1, key='b', value="B1"),
  Row(id=1, key='c', value="C1"),
  Row(id=2, key='a', value="A1"),
  Row(id=2, key='x', value="X1"),
  Row(id=2, key='y', value="Y1")]
)

print("Dataframe with keys and values:")
df.show()

@F.udf(returnType=T.MapType(T.StringType(), T.StringType()))
def map_array(column):
    return dict(column)

dft = (df.groupBy("id")
   .agg(F.collect_list(F.struct("key", "value")).alias("key_value"))
   .withColumn('key_value', map_array('key_value')))
print("Dataframe with keys and values:")
dft.show(truncate=False)
dft.printSchema()

Dataframe with keys and values:

id key value
1 a A1
1 b B1
1 c C1
2 a A1
2 x X1
2 y Y1

Dataframe with key -> value mapping:

id key_value
1 {a -> A1, b -> B1, c -> C1}
2 {x -> X1, a -> A1, y -> Y1}

Schema of dft is:

root
 |-- id: long (nullable = true)
 |-- key_value: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

9.3. To aggregate over multiple columns and sum values of dictionaries

import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

df_schema = T.StructType([T.StructField('clid', T.StringType(), True),
                        T.StructField('coef_1', T.MapType(T.StringType(), T.DoubleType(), True), False),
                        T.StructField('coef_2', T.MapType(T.StringType(), T.DoubleType(), True), False),
                        T.StructField('coef_3', T.MapType(T.StringType(), T.DoubleType(), True), False)])
df_data = [["X", {'B': 0.4, 'C': 0.4}, {'B': 0.33, 'C': 0.5}, {'A': 0.5, 'C': 0.33}],
           ["Y", {'B': 0.67, 'C': 0.33}, {'B': 0.85}, {'A': 0.4, 'C': 0.57}],
           ]
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df = spark.createDataFrame(data=df_data, schema=df_schema)

df = df.withColumn("coef_total", F.col("coef_1"))
for i in range(2,4):
    df = df.withColumn("coef_total", F.map_zip_with("coef_total", f"coef_{i}",
                      lambda k, v1, v2: F.when(v1.isNull(), 0).otherwise(v1) + F.when(v2.isNull(), 0).otherwise(v2)))
df.show(truncate=False)
clid coef_1 coef_2 coef_3 coef_total
X {B -> 0.4, C -> 0.4} {B -> 0.33, C -> 0.5} {A -> 0.5, C -> 0.33} {B -> 0.73, C -> 1.23, A -> 0.5}
Y {B -> 0.67, C -> 0.33} {B -> 0.85} {A -> 0.4, C -> 0.57} {B -> 1.52, C -> 0.8999999999999999, A -> 0.4}
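
The null handling inside F.map_zip_with() can also be written with F.coalesce(), which some may find easier to read. A sketch on the same df:

df = df.withColumn("coef_total", F.col("coef_1"))
for i in range(2, 4):
    df = df.withColumn("coef_total",
                       F.map_zip_with("coef_total", f"coef_{i}",
                                      lambda k, v1, v2: F.coalesce(v1, F.lit(0.0)) + F.coalesce(v2, F.lit(0.0))))
df.show(truncate=False)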

10. Groups, aggregations, pivoting and window operations

10.1. To get 2nd smallest element in a group

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)
w = Window.partitionBy("Location").orderBy(F.asc("Quantity"))
df = df.withColumn("rank", F.rank().over(w))
df.show()
print("Products with the 2nd smallest quantity in a location:")
df = df.filter(F.col("rank") == 2)
df.show()
Location Product Quantity rank
Home Monitor 7 1
Home Mouse 8 2
Home Keyboard 9 3
Home Laptop 12 4
Office Mouse 9 1
Office Monitor 10 2
Office Laptop 23 3

Products with the 2nd smallest quantity in a location:

Location Product Quantity rank
Home Mouse 8 2
Office Monitor 10 2
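
Note that F.rank() leaves gaps after ties, so filtering on rank == 2 can return no rows for a location where two products share the smallest quantity. Depending on the intended semantics, F.dense_rank() (no gaps) or F.row_number() (unique, arbitrary among ties) may be the better choice. A sketch using the same window w, applied to the dataframe before the .filter() step:

df = df.withColumn("dense_rank", F.dense_rank().over(w))
df = df.withColumn("row_number", F.row_number().over(w))
df.show()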

10.2. To calculate set intersection between arrays in two consecutive rows in a window

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
from typing import List
import pandas as pd
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("group", T.StringType(), True),
        T.StructField("letters", T.ArrayType(T.StringType()), True),
    ]
)
data = [("group_A", ["a", "b", "c"]),
        ("group_A", ["a", "x", "z", "y"]),
        ("group_A", ["a", "x", "z", "w"]),
        ("group_A", ["d", "x", "z"]),
        ("group_B", ["a", "c", "d"]),
        ("group_B", ["a", "c", "e"]),
        ("group_B", ["a", "g", "f", "h"])]
df = spark.createDataFrame(schema=schema, data=data)


@F.pandas_udf(returnType=T.IntegerType())
def number_of_input_rows_in_win(v: pd.Series) -> int:
    """
    Calculate number of rows in a window
    """
    return v.shape[0]


@F.pandas_udf(returnType=T.IntegerType())
def arr_intxn_size(v: pd.Series) -> int:
    """
    Calculate size of set intersection between last and penultimate elements (arrays)
    """
    if v.shape[0] <= 1:
        return 0
    else:
        x = list(set(v.iloc[-1]).intersection(set(v.iloc[-2])))
        return len(x)


@F.pandas_udf(returnType=T.ArrayType(T.StringType()))
def arr_intxn_pudf(v: pd.Series) -> List[str]:
    """
    Calculate set intersection between last and penultimate elements (arrays)
    """
    if v.shape[0] <= 1:
        return []
    else:
        return list(set(v.iloc[-1]).intersection(set(v.iloc[-2])))


def arr_intxn_lag(col_name, win):
    col_lagged = F.lag(col_name, 1).over(win)
    itxn = F.array_intersect(col_name, col_lagged)
    return F.when(itxn.isNull(), F.array([])).otherwise(itxn)


# A window with previous and current row (indices: -1, 0)
win_last_two = Window.partitionBy("group").orderBy("letters").rowsBetween(-1, 0)
df = df.withColumn("nbr_rows_in_input_win", number_of_input_rows_in_win("letters").over(win_last_two))
df = df.withColumn("intxn_w_prv_row_pudf", arr_intxn_pudf("letters").over(win_last_two))

win_unbound = Window.partitionBy("group").orderBy("letters")
df = df.withColumn("intxn_w_prv_row_lag", arr_intxn_lag("letters", win_unbound))

df = df.withColumn("size(intxn_w_prv_row_pudf)", arr_intxn_size("letters").over(win_last_two))
df = df.withColumn("size(intxn_w_prv_row)", F.size("intxn_w_prv_row_lag"))
txt = "Number of rows in input window can be \nequal to 1 (first row) or\nequal to" + \
      " 2 (consecutive two rows specified by indices (-1, 0) in window definition)"
print(txt)
df.show()
Number of rows in input window can be
equal to 1 (first row) or
equal to 2 (consecutive two rows specified by indices (-1, 0) in window definition)
group letters nbr_rows_in_input_win intxn_w_prv_row_pudf intxn_w_prv_row_lag size(intxn_w_prv_row_pudf) size(intxn_w_prv_row)
group_A [a, b, c] 1 [] [] 0 0
group_A [a, x, z, w] 2 [a] [a] 1 1
group_A [a, x, z, y] 2 [a, x, z] [a, x, z] 3 3
group_A [d, x, z] 2 [x, z] [x, z] 2 2
group_B [a, c, d] 1 [] [] 0 0
group_B [a, c, e] 2 [c, a] [a, c] 2 2
group_B [a, g, f, h] 2 [a] [a] 1 1

10.3. To pivot a dataframe by some column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

# Define schema for "name", "subject", and "grade"
schema = T.StructType([
    T.StructField("name", T.StringType(), True),
    T.StructField("subject", T.StringType(), True),
    T.StructField("grade", T.IntegerType(), True),
])

# Sample data for students, their subjects, and grades
data = [
    ("John", "Math", 85),
    ("John", "Science", 90),
    ("John", "History", 78),
    ("Marie", "Math", 88),
    ("Marie", "Science", 92),
    ("Marie", "History", 80),
    ("Adam", "Math", 75),
    ("Adam", "Science", 82),
    ("Adam", "History", 70),
]

# Create DataFrame
df = spark.createDataFrame(data=data, schema=schema)
txt = "Original dataframe:"
print(txt)
df.show()

# Pivot the table by "subject" and aggregate by taking the first value of "grade" for each subject
df_pvt = df.groupBy("name").pivot("subject").agg(F.first("grade"))

txt = "Original dataframe pivoted by column \"subject\":"
print(txt)
df_pvt.show(truncate=False)
Original dataframe:
name subject grade
John Math 85
John Science 90
John History 78
Marie Math 88
Marie Science 92
Marie History 80
Adam Math 75
Adam Science 82
Adam History 70
Original dataframe pivoted by column "subject":
name History Math Science
John 78 85 90
Adam 70 75 82
Marie 80 88 92
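
If the set of pivot values is known in advance, passing it explicitly avoids an extra pass over the data to discover the distinct values and fixes the column order. A variant on the same df:

df_pvt = df.groupBy("name").pivot("subject", ["Math", "Science", "History"]).agg(F.first("grade"))
df_pvt.show(truncate=False)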

11. Sampling rows

11.1. To sample rows

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("index", T.IntegerType(), True),
        T.StructField("value", T.StringType(), True),
    ]
)
data = [(1, "Home"),
        (2, "School"),
        (3, "Home"),
        (4, "Home"),
        (5, "Office"),
        (6, "Office"),
        (7, "Office"),
        (8, "Mall"),
        (9, "Mall"),
        (10, "School")]
df = spark.createDataFrame(schema=schema, data=data).repartition(3)
df = df.withColumn("partition", F.spark_partition_id()).orderBy("index")
print("Original dataframe:")
df.show()

print("Sampled dataframe:")
dft = df.sample(fraction=0.5, seed=1).orderBy("index")
dft.show()

Original dataframe:

index value partition
1 Home 1
2 School 0
3 Home 0
4 Home 2
5 Office 2
6 Office 2
7 Office 1
8 Mall 0
9 Mall 1
10 School 0

Sampled dataframe:

index value partition
3 Home 0
4 Home 2
7 Office 1
8 Mall 0
9 Mall 1
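
Note that .sample() draws each row independently, so the returned fraction is only approximate. To split a dataframe into disjoint random subsets (for example train and test sets), .randomSplit() is the usual tool. A sketch on the same df:

df_train, df_test = df.randomSplit([0.8, 0.2], seed=1)
print(f"Train rows: {df_train.count()}, test rows: {df_test.count()}")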

12. UUID generation

12.1. To generate a UUID for every row

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
import random
import uuid

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Name", T.StringType(), True),
    ]
)
data = [["Alice"],
        ["Bon"],
        ["John"],
        ["Cecile"]
        ]
df = spark.createDataFrame(schema=schema, data=data).repartition(2)

def _generate_uuid(uuid_gen, v=10):
    def _replace_byte(value: int, byte: int):
        byte = byte & 0xF
        bit_shift = 76
        mask = ~(0xF << bit_shift)
        return value & mask | (byte << bit_shift)

    uuid_ = uuid_gen.generate()
    return uuid.UUID(int=(_replace_byte(uuid_.int, v)))

class RandomDistributedUUIDGenerator:
    def generate(self):
        return uuid.uuid4()

class SeedBasedUUIDGenerator:
    def __init__(self, seed):
        self.rnd = random.Random(seed)

    def generate(self):
        return uuid.UUID(int=self.rnd.getrandbits(128), version=4)

gen = RandomDistributedUUIDGenerator()
udf_generate_uuid = F.udf(lambda: _generate_uuid(gen).__str__(), T.StringType())
df = df.withColumn("UUID_random_distributed", udf_generate_uuid())

seed_for_rng = 1
gen = SeedBasedUUIDGenerator(seed_for_rng)
udf_generate_uuid = F.udf(lambda: _generate_uuid(gen).__str__(), T.StringType())
df = df.withColumn("UUID_seed_based", udf_generate_uuid())

print("The dataframe resides in two partitions. Seed-based random UUID generator uses the same seed on both partitions, yielding identical values.")
df.show(truncate=False)

The dataframe resides in two partitions. Seed-based random UUID generator uses the same seed on both partitions, yielding identical values.

Name UUID_random_distributed UUID_seed_based
John 4e9a3bb1-a189-a25e-8389-7f8382635b09 cd613e30-d8f1-aadf-91b7-584a2265b1f5
Bon 16cd1549-0c74-a483-9bbe-707e59e0796f 1e2feb89-414c-a43c-9027-c4d1c386bbc4
Cecile b8b05619-6004-aa75-b98b-7e1c83c9f301 cd613e30-d8f1-aadf-91b7-584a2265b1f5
Alice b1f1a9fb-feb9-a946-9171-3e7cb577fdaa 1e2feb89-414c-a43c-9027-c4d1c386bbc4
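
If reproducibility across partitions is not a concern, Spark's built-in uuid() SQL function generates a random UUID per row without a Python UDF. A sketch on the same df (uuid() is non-deterministic and not seedable):

df = df.withColumn("UUID_builtin", F.expr("uuid()"))
df.show(truncate=False)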

Author: Altynbek Isabekov

Created: 2024-12-17 Tue 15:14
