PySpark Cookbook

PySpark Cookbook

1 Introduction

1.1 To create an empty dataframe

import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.StringType()), True),
        T.StructField("B", T.ArrayType(T.StringType()), True),
    ]
)
data = []
df = spark.createDataFrame(schema=schema, data=data)
df.show()
A B

1.2 To create a dataframe with columns key and value from a dictionary

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]

df = spark.createDataFrame(values, columns)
df.show()
key value
key1 value1
key2 value2

1.3 To duplicate a column

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]

df = spark.createDataFrame(values, columns)
df = df.withColumn("value_dup", F.col("value"))
df.show()
key value value_dup
key1 value1 value1
key2 value2 value2

1.4 To rename a column using .withColumnRenamed()

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]

df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()
df = df.withColumnRenamed("key", "new_key") \
        .withColumnRenamed("value","new_value")
print("Modified dataframe:")
df.show()

Original dataframe:

key value
key1 value1
key2 value2

Modified dataframe:

new_key new_value
key1 value1
key2 value2

1.5 To rename a column using .withColumnsRenamed()

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]

df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()
df = df.withColumnsRenamed({"key": "new_key", "value": "new_value"})
print("Modified dataframe:")
df.show()

Original dataframe:

key value
key1 value1
key2 value2

Modified dataframe:

new_key new_value
key1 value1
key2 value2

1.6 To rename a column using .select()

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()

df = df.select(F.col("key").alias("new_key"), F.col("value").alias("new_value"))
print("Modified dataframe:")
df.show()

Original dataframe:

key value
key1 value1
key2 value2

Modified dataframe:

new_key new_value
key1 value1
key2 value2

1.7 To rename columns by adding a prefix

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("index", T.IntegerType(), True),
        T.StructField("value", T.StringType(), True),
    ]
)
data = [(1, "Home"),
        (2, "School"),
        (3, "Home"),]
df = spark.createDataFrame(schema=schema, data=data)
print("Original dataframe:")
df.show()
print("Dataframe with renamed columns:")
df = df.select(*[F.col(k).alias(f"prefix_{k}") for k in df.columns])
df.show()

Original dataframe:

index value
1 Home
2 School
3 Home

Dataframe with renamed columns:

prefix_index prefix_value
1 Home
2 School
3 Home

1.8 To drop columns from a dataframe

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)

df = df.withColumn("const", F.lit(1))
print("Original dataframe:")
df.show()

df = df.drop("value", "const")
print("Modified dataframe:")
df.show()

Original dataframe:

key value const
key1 value1 1
key2 value2 1

Modified dataframe:

key
key1
key2

1.9 To subset columns of a dataframe

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
df = df.withColumn("const", F.lit(1))
print("Original dataframe:")
df.show()
print("Subset 'key', 'value' columns:")
df["key", "value"].show()
print("Subset 'key', 'const' columns:")
df.select("key", "const").show()

Original dataframe:

key value const
key1 value1 1
key2 value2 1

Subset 'key', 'value' columns:

key value
key1 value1
key2 value2

Subset 'key', 'const' columns:

key const
key1 1
key2 1

1.10 To add a column with a constant value using F.lit()

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()

df = df.withColumn("const_integer", F.lit(1))
df = df.withColumn("const_string", F.lit("string"))
print("Modified dataframe:")
df.show()

Original dataframe:

key value
key1 value1
key2 value2

Modified dataframe:

key value const_integer const_string
key1 value1 1 string
key2 value2 1 string

1.11 To add a column with a constant value using .select()

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()

df = df.select("key", "value", F.lit("const_str").alias("constant_value"))
print("Modified dataframe:")
df.show()

Original dataframe:

key value
key1 value1
key2 value2

Modified dataframe:

key value constant_value
key1 value1 const_str
key2 value2 const_str

1.12 To create a dataframe from a list of tuples

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
integer characters
1 [A, B]
2 [C, D]
3 [E, F]

1.13 To get the number of rows of a dataframe

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
num_rows = df.count()
print(f"df has {num_rows} rows")
integer characters
1 [A, B]
2 [C, D]
3 [E, F]

df has 3 rows

1.14 To select first N rows

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
print("These are first 2 rows:")
df.limit(2).show()
integer characters
1 [A, B]
2 [C, D]
3 [E, F]

These are first 2 rows:

integer characters
1 [A, B]
2 [C, D]

1.15 To deduplicate rows

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("key", T.IntegerType(), True),
        T.StructField("value", T.StringType(), True),
        T.StructField("comment", T.StringType(), True),
    ]
)
data = [(1, "Home", "a house"),
        (1, "Home", "a house"),
        (2, "School", "a building"),
        (2, "School", "a house"),
        (3, "Home", "a house"),]
df = spark.createDataFrame(schema=schema, data=data)

print("Original dataframe:")
df.show()

print("Dataframe with distinct rows:")
df.distinct().show()

print("Dataframe with dropped duplicate rows:")
df.dropDuplicates().show()

print("Dataframe with dropped duplicates in columns 'key' and 'value':")
df = df.dropDuplicates(subset=["key", "value"])
df.show()

Original dataframe:

key value comment
1 Home a house
1 Home a house
2 School a building
2 School a house
3 Home a house

Dataframe with distinct rows:

key value comment
2 School a house
3 Home a house
2 School a building
1 Home a house

Dataframe with dropped duplicate rows:

key value comment
2 School a house
3 Home a house
2 School a building
1 Home a house

Dataframe with dropped duplicates in columns 'key' and 'value':

key value comment
1 Home a house
2 School a building
3 Home a house

1.16 To convert a column to a list using a lambda function

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
lst = df.select("integer").rdd.map(lambda r: r[0]).collect()
print(f"Column \"integer\" has values: {lst}")
integer characters
1 [A, B]
2 [C, D]
3 [E, F]

Column "integer" has values: [1, 2, 3]

1.17 To convert a dataframe to a list of dictionaries corresponding to every row

import pprint
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
lst_dict = df.rdd.map(lambda row: row.asDict()).collect()
print(f"Dataframe is represented as:\n")
txt = pprint.pformat(lst_dict)
integer characters
1 [A, B]
2 [C, D]
3 [E, F]

Dataframe is represented as:

[{'characters': ['A', 'B'], 'integer': 1},
 {'characters': ['C', 'D'], 'integer': 2},
 {'characters': ['E', 'F'], 'integer': 3}]

1.18 To convert a column to a list using list comprehension

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
lst = [k["integer"] for k in df.select("integer").rdd.collect()]
print(f"Column \"integer\" has values: {lst}")
integer characters
1 [A, B]
2 [C, D]
3 [E, F]

Column "integer" has values: [1, 2, 3]

1.19 To convert a column to a list using Pandas

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
lst = df.select("integer").toPandas()["integer"].tolist()
print(f"Column \"integer\" has values: {lst}")
integer characters
1 [A, B]
2 [C, D]
3 [E, F]

Column "integer" has values: [1, 2, 3]

1.20 To display full width of a column (do not truncate)

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("sentence", T.ArrayType(T.StringType()), True),
    ]
)
data = [(["A", "very", "long", "sentence"],),
        (["with", "many", "words", "."],)]
df = spark.createDataFrame(schema=schema, data=data)

print("Truncated output (default behavior):")
df.show()

print("Truncated to 15 characters output:")
df.show(truncate=15)

print("Non-truncated output (show all):")
df.show(truncate=False)

Truncated output (default behavior):

sentence
[A, very, long, s…
[with, many, word…

Truncated to 15 characters output:

sentence
[A, very, lo…
[with, many,…

Non-truncated output (show all):

sentence
[A, very, long, sentence]
[with, many, words, .]

2 Filtering rows

2.1 To filter based on values of a column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", None),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", None),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)

print("Original dataframe:")
df.show()

print('Filter: F.col("Location" == "Home")')
dft = df.filter(F.col("Location") == "Home")
dft.show()

print('Filter: F.col("Quantity").isNull()')
dft = df.filter(F.col("Quantity").isNull())
dft.show()

print('Filter: F.col("Quantity").isNotNull()')
dft = df.filter(F.col("Quantity").isNotNull())
dft.show()

print('Filter: (F.col("Location") == "Home") & (F.col("Product") == "Laptop"))')
dft = df.filter((F.col("Location") == "Home") & (F.col("Product") == "Laptop"))
dft.show()

print('Filter: (F.col("Location") == "Home") & !(F.col("Product") == "Laptop"))')
dft = df.filter((F.col("Location") == "Home") & ~(F.col("Product") == "Laptop"))
dft.show()

print('Filter: (F.col("Product") == "Laptop") | (F.col("Product") == "Mouse"))')
dft = df.filter((F.col("Product") == "Laptop") | (F.col("Product") == "Mouse"))
dft.show()

print('Filter: F.col("Product").isin(["Laptop", "Mouse"])')
dft = df.filter(F.col("Product").isin(["Laptop", "Mouse"]))
dft.show()

Original dataframe:

Location Product Quantity
Home Laptop 12
Home Monitor null
Home Keyboard 9
Office Laptop null
Office Monitor 10
Office Mouse 9

Filter: F.col("Location" == "Home")

Location Product Quantity
Home Laptop 12
Home Monitor null
Home Keyboard 9

Filter: F.col("Quantity").isNull()

Location Product Quantity
Home Monitor null
Office Laptop null

Filter: F.col("Quantity").isNotNull()

Location Product Quantity
Home Laptop 12
Home Keyboard 9
Office Monitor 10
Office Mouse 9

Filter: (F.col("Location") == "Home") & (F.col("Product") == "Laptop"))

Location Product Quantity
Home Laptop 12

Filter: (F.col("Location") == "Home") & !(F.col("Product") == "Laptop"))

Location Product Quantity
Home Monitor null
Home Keyboard 9

Filter: (F.col("Product") == "Laptop") | (F.col("Product") == "Mouse"))

Location Product Quantity
Home Laptop 12
Office Laptop null
Office Mouse 9

Filter: F.col("Product").isin(["Laptop", "Mouse"])

Location Product Quantity
Home Laptop 12
Office Laptop null
Office Mouse 9

3 Array operations

3.1 To create arrays of different lengths

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
        T.StructField("B", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2], [2, 3, 4, 5]),
        ([4, 5, 6], [2, 3, 4, 5])]
df = spark.createDataFrame(schema=schema, data=data)
dft = df.select("A", "B")
dft.show()
A B
[1, 2] [2, 3, 4, 5]
[4, 5, 6] [2, 3, 4, 5]

3.2 To calculate set difference

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.StringType()), True),
        T.StructField("B", T.ArrayType(T.StringType()), True),
    ]
)
data = [(["b", "a", "c"], ["c", "d", "a", "f"])]
df = spark.createDataFrame(schema=schema, data=data)

dft = df.select("A", "B",
          F.array_except("A", "B").alias("A\B"),
          F.array_except("B", "A").alias("B\A"))
dft.show()
A B A\B B\A
[b, a, c] [c, d, a, f] [b] [d, f]

3.3 To calculate set union

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.StringType()), True),
        T.StructField("B", T.ArrayType(T.StringType()), True),
    ]
)
data = [(["b", "a", "c"], ["c", "d", "a", "f"])]
df = spark.createDataFrame(schema=schema, data=data)
dft = df.select("A", "B",
          F.array_union("A", "B").alias("A U B"))
dft.show()
A B A U B
[b, a, c] [c, d, a, f] [b, a, c, d, f]

3.4 To calculate set intersection

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.StringType()), True),
        T.StructField("B", T.ArrayType(T.StringType()), True),
    ]
)
data = [(["b", "a", "c"], ["c", "d", "a", "f"])]
df = spark.createDataFrame(schema=schema, data=data)
dft = df.select("A", "B", F.array_intersect("A", "B").alias("A \u2229 B"))
dft.show()
A B A ∩ B
[b, a, c] [c, d, a, f] [a, c]

3.5 To pad arrays with value

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
        T.StructField("B", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2], [2, 3, 4, 5]),
        ([4, 5, 6], [2, 3, 4, 5])]
df = spark.createDataFrame(schema=schema, data=data)
n = 4
fill_value = 0
df1 = df.withColumn("A_padding", F.expr(f"array_repeat({fill_value}, {n} - size(A))"))
df1 = df1.withColumn("A_padded", F.concat("A", "A_padding"))
dft = df1.select("A", "A_padding", "A_padded")
dft.show()

df2 = df.withColumn("A_padding", F.array_repeat(F.lit(fill_value), F.lit(n) - F.size("A")))
df2 = df2.withColumn("A_padded", F.concat("A", "A_padding"))
dft = df2.select("A", "A_padding", "A_padded")
dft.show()
A A_padding A_padded
[1, 2] [0, 0] [1, 2, 0, 0]
[4, 5, 6] [0] [4, 5, 6, 0]
A A_padding A_padded
[1, 2] [0, 0] [1, 2, 0, 0]
[4, 5, 6] [0] [4, 5, 6, 0]

3.6 To sum two arrays elementwise using F.element_at()

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
        T.StructField("B", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2], [2, 3, 4, 5]),
        ([4, 5, 6], [2, 3, 4, 5])]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("A_padding", F.array_repeat(F.lit(fill_value), F.lit(n) - F.size("A")))
df = df.withColumn("A_padded", F.concat("A", "A_padding"))
df = df.withColumn("AB_sum", F.expr('transform(A_padded, (element, index) -> element + element_at(B, index + 1))'))
dft = df.select("A", "A_padded", "B", "AB_sum")
dft.show()
A A_padded B AB_sum
[1, 2] [1, 2, 0, 0] [2, 3, 4, 5] [3, 5, 4, 5]
[4, 5, 6] [4, 5, 6, 0] [2, 3, 4, 5] [6, 8, 10, 5]

3.7 To sum two arrays using F.arrays_zip()

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
        T.StructField("B", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2], [2, 3, 4, 5]),
        ([4, 5, 6], [2, 3, 4, 5])]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("A_padding", F.array_repeat(F.lit(fill_value), F.lit(n) - F.size("A")))
df = df.withColumn("A_padded", F.concat("A", "A_padding"))
df = df.withColumn("AB_sum", F.expr("transform(arrays_zip(A_padded, B), x -> x.A_padded + x.B)"))
dft = df.select("A", "A_padded", "B", "AB_sum")
dft.show()
A A_padded B AB_sum
[1, 2] [1, 2, 0, 0] [2, 3, 4, 5] [3, 5, 4, 5]
[4, 5, 6] [4, 5, 6, 0] [2, 3, 4, 5] [6, 8, 10, 5]

3.8 To find mode of an array (most common element)

from collections import Counter
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2, 2, 4],),
        ([4, 5, 6, 7],),
        ([1, 1, 2, 2],)]
df = spark.createDataFrame(schema=schema, data=data)

@F.udf
def udf_mode(x):
    return Counter(x).most_common(1)[0][0]

dft = df.withColumn("mode", udf_mode("A"))
dft.printSchema()
dft.show()

Schema of dft is:

root
 |-- A: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- mode: string (nullable = true)
A mode
[1, 2, 2, 4] 2
[4, 5, 6, 7] 4
[1, 1, 2, 2] 1

3.9 To calculate difference of two consecutive elements in an array

import numpy as np
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("id", T.StringType(), True),
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [("A", [4, 1, 0, 2]),
        ("B", [1, 0, 3, 1])]
df = spark.createDataFrame(schema=schema, data=data)

@F.udf(returnType=T.ArrayType(T.IntegerType()))
def diff_of_two_consecutive_elements(x):
    return np.ediff1d(np.array(x)).tolist()

df = df.withColumn("diff", diff_of_two_consecutive_elements(F.col("values")))
df.show()
df.printSchema()
id values diff
A [4, 1, 0, 2] [-3, -1, 2]
B [1, 0, 3, 1] [-1, 3, -2]

Schema of df is:

root
 |-- id: string (nullable = true)
 |-- values: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- diff: array (nullable = true)
 |    |-- element: integer (containsNull = true)

3.10 To apply a function to every element of an array

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("words_with_suffixes", T.ArrayType(T.StringType()), True)
    ]
)
data = [(["pen_10", "note_11", "bottle_12"],), (["apple_13", "orange_14", "lemon_15"],),]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("words", F.transform("words_with_suffixes", lambda x: F.split(x, "_").getItem(0)))
df.show(truncate=False)
words_with_suffixes words
[pen_10, note_11, bottle_12] [pen, note, bottle]
[apple_13, orange_14, lemon_15] [apple, orange, lemon]

3.11 To deduplicate elements in an array (find unique/distinct elements)

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("words", T.ArrayType(T.StringType()), True)
    ]
)
data = [(["pen", "note", "pen"],), (["apple", "apple", "lemon"],),]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("unique_words", F.array_distinct("words"))
df.show(truncate=False)
words unique_words
[pen, note, pen] [pen, note]
[apple, apple, lemon] [apple, lemon]

3.12 To create a map (dictionary) from two arrays (one with keys, one with values)

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("keys", T.ArrayType(T.IntegerType()), True),
        T.StructField("values", T.ArrayType(T.StringType()), True),
    ]
)
data = [([1, 2, 3], ["A", "B", "C"])]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("map_kv", F.map_from_arrays("keys", "values"))
df.show(truncate=False)
keys values map_kv
[1, 2, 3] [A, B, C] {1 -> A, 2 -> B, 3 -> C}

3.13 To calculate mean of an array

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2],),
        ([4, 5, 6],)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("mean", F.aggregate(
          "values",                                  # column
          F.lit(0),                                  # initialValue
          lambda acc, x: acc + x,                    # merge operation
          lambda acc: acc / F.size(F.col("values")), # finish
      ))
df.show()
values mean
[1, 2] 1.5
[4, 5, 6] 5.0

3.14 To find out whether an array has any negative elements

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, -2],),
        ([4, 5, 6],)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("any_negative", F.exists("values", lambda x: x < 0))
df.show()
values any_negative
[1, -2] true
[4, 5, 6] false

3.15 To convert elements of an array to columns

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2, 3, 4],),
        ([5, 6, 7],)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("first", F.col("A").getItem(0))
dft = df.select("A", "first", *[F.col("A").getItem(k).alias(f"element_{k+1}") for k in range(1,4)])
dft.show()
A first element_2 element_3 element_4
[1, 2, 3, 4] 1 2 3 4
[5, 6, 7] 5 6 7 null

3.16 To find location of the first occurence of an element in an array

import pyspark.sql.functions as F
import pyspark.sql.types as T
import pandas as pd
from pyspark.sql import SparkSession
import numpy as np
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 7, 5],),
        ([7, 4, 7],)]
df = spark.createDataFrame(schema=schema, data=data)

df = df.withColumn("position", F.array_position(F.col("values"), 7))
df.show()
values position
[1, 7, 5] 2
[7, 4, 7] 1

3.17 To calculate moving difference of two consecutive elements in an array

import pyspark.sql.functions as F
import pyspark.sql.types as T
import pandas as pd
from pyspark.sql import SparkSession
import numpy as np
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2, 5],),
        ([4, 4, 6],)]
df = spark.createDataFrame(schema=schema, data=data)

@F.pandas_udf(T.ArrayType(T.IntegerType()))
def diff2e(x: pd.Series) -> pd.Series:
    return x.apply(lambda x: (x[1:] - x[:-1]))

@F.udf(returnType=T.ArrayType(T.IntegerType()))
def diff_of_two_consecutive_elements(x):
    return np.ediff1d(np.array(x)).tolist()

df = df.withColumn("diff2e", diff2e(F.col("values")))
df = df.withColumn("ediff1d", diff_of_two_consecutive_elements(F.col("values")))
df.show()
values diff2e ediff1d
[1, 2, 5] [1, 3] [1, 3]
[4, 4, 6] [0, 2] [0, 2]

3.18 To slice an array

import pyspark.sql.functions as F
import pyspark.sql.types as T
import pandas as pd
from pyspark.sql import SparkSession
import numpy as np
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 7, 5, 2],),
        ([6, 4, 7, 3],)]
df = spark.createDataFrame(schema=schema, data=data)

df = df.withColumn("values[1:3]", F.slice("values", start=2, length=2))
df.show()
values values[1:3]
[1, 7, 5, 2] [7, 5]
[6, 4, 7, 3] [4, 7]

3.19 To slice an array dynamically

import pyspark.sql.functions as F
import pyspark.sql.types as T
import pandas as pd
from pyspark.sql import SparkSession
import numpy as np
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 7, 5],),
        ([6, 4, 7, 3],)]
df = spark.createDataFrame(schema=schema, data=data)
start_idx = 2
df = df.withColumn("values[1:]", F.slice("values", start=2, length=(F.size("values") - F.lit(start_idx - 1))))
df.show()
values values[1:]
[1, 7, 5] [7, 5]
[6, 4, 7, 3] [4, 7, 3]

4 Text processing

4.1 To remove prefix from a string using a UDF

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("text", T.StringType(), True),
    ]
)
data = [("id_orange",),
        ("apple",)]
df = spark.createDataFrame(schema=schema, data=data)
remove_prefix = F.udf(lambda x: x[3:] if x[:3] == "id_" else x, T.StringType())
df = df.withColumn("no_prefix", remove_prefix(F.col("text")))
df.show()
text no_prefix
id_orange orange
apple apple

4.2 To split a string into letters (characters) using regex

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("String", T.StringType(), True)
    ]
)
data = [["This is"]]
df = spark.createDataFrame(schema=schema, data=data)
dft = df.select('String', F.split('String', '(?!$)').alias("Characters"))
dft.show(truncate=False)
String Characters
This is [T, h, i, s, , i, s]

4.3 To concatenate columns with strings using a separator

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Str1", T.StringType(), True),
        T.StructField("Str2", T.StringType(), True)
    ]
)
data = [("This is", "a string"),
        ("on a", "different row")]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("Str_Concat", F.concat_ws( "_", "Str1", "Str2"))
df.show()
Str1 Str2 Str_Concat
This is a string This is_a string
on a different row on a_different row

4.4 To split a string into letters (characters) using split function

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("String", T.StringType(), True)
    ]
)
data = [["This is"]]
df = spark.createDataFrame(schema=schema, data=data)
fsplit = F.expr("split(String, '')")
dft = df.select('String', fsplit.alias("Characters"))
dft.show(truncate=False)
String Characters
This is [T, h, i, s, , i, s]

4.5 To split a string into letters (characters) and remove last character

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("String", T.StringType(), True)
    ]
)
data = [["This is_"]]
df = spark.createDataFrame(schema=schema, data=data)
print("Using split function and remove last character:")
fsplit = "split(String, '')"
fsplit = F.expr(f'slice({fsplit}, 1, size({fsplit}) - 1)')
dft = df.select('String', fsplit.alias("Characters"))
dft.show(truncate=False)

Using split function and remove last character:

String Characters
This is_ [T, h, i, s, , i, s]

4.6 To append a string to all values in a column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Str1", T.StringType(), True),
        T.StructField("Str2", T.StringType(), True)
    ]
)
data = [("This is", "a string"),
        ("on a", "different row")]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("Str1_with_prefix", F.concat(F.lit("Prefix_"), "Str1"))
dft = df.select("Str1", "Str1_with_prefix")
dft.show()
Str1 Str1_with_prefix
This is Prefix_This is
on a Prefix_on a

5 Time operations

5.1 To calculate cumulative sum of a column

import pandas as pd
from pyspark.sql import Window
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df = pd.DataFrame({'time': [0, 1, 2, 3, 4, 5],
                   'value': [False, False, True, False, True, True]})

df = spark.createDataFrame(df)
df = df.withColumn("cml_n_true", F.sum((F.col("value") == True).cast("int")).over(Window.orderBy(F.col("time").asc())))
df = df.withColumn("cml_n_false", F.sum((F.col("value") == False).cast("int")).over(Window.orderBy(F.col("time").asc())))
df.show()
time value cml_n_true cml_n_false
0 false 0 1
1 false 0 2
2 true 1 2
3 false 1 3
4 true 2 3
5 true 3 3

5.2 To convert Unix time stamp to human readable format

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("timestamp", T.LongType(), True),
    ]
)
data = [(1703224755,),
        (1703285602,)]
df = spark.createDataFrame(schema=schema, data=data)

df = df.withColumn("time_stamp_hrf", F.from_unixtime(F.col("timestamp")))
df.show()
timestamp time_stamp_hrf
1703224755 2023-12-22 06:59:15
1703285602 2023-12-22 23:53:22

6 Numerical operations

6.1 To find percentage of a column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Laptop", 12),
        ("Monitor", 7),
        ("Mouse", 8),
        ("Keyboard", 9)]
df = spark.createDataFrame(schema=schema, data=data)

df = df.withColumn("%", F.round(F.col("Quantity")/F.sum("Quantity").over(Window.partitionBy())*100, 2))
dft = df.select("Product", "Quantity", "%").orderBy(F.desc("Quantity"))
dft.show()
Product Quantity %
Laptop 12 33.33
Keyboard 9 25.0
Mouse 8 22.22
Monitor 7 19.44

6.2 To find percentage of a column within a group using a window

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)

df = df.withColumn("%", F.round(F.col("Quantity")/F.sum("Quantity").over(Window.partitionBy("Location"))*100, 2))
dft = df.select("Location", "Product", "Quantity", "%").orderBy(F.desc("Location"), F.desc("Quantity"))
dft.show()
Location Product Quantity %
Office Laptop 23 54.76
Office Monitor 10 23.81
Office Mouse 9 21.43
Home Laptop 12 33.33
Home Keyboard 9 25.0
Home Mouse 8 22.22
Home Monitor 7 19.44

6.3 To find percentage of a column within a group using .groupBy() and a join

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)

df_sum = df.groupBy("Location").agg(F.sum("Quantity").alias("Total_Quantity"))
df = df.join(df_sum, on="Location").withColumn("%", F.round(F.col("Quantity")/F.col("Total_Quantity")*100, 2))
dft = df.select("Location", "Product", "Quantity", "%").orderBy(F.desc("Location"), F.desc("Quantity"))
dft.show()
Location Product Quantity %
Office Laptop 23 54.76
Office Monitor 10 23.81
Office Mouse 9 21.43
Home Laptop 12 33.33
Home Keyboard 9 25.0
Home Mouse 8 22.22
Home Monitor 7 19.44

6.4 To find maximum value of a column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)
df.show()
max_val = df.select("Quantity").rdd.max()[0]
print(f"Maximum value of Quantity: {max_val}")
Location Product Quantity
Home Laptop 12
Home Monitor 7
Home Mouse 8
Home Keyboard 9
Office Laptop 23
Office Monitor 10
Office Mouse 9

Maximum value of Quantity: 23

6.5 To add a column with count of elements per group

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("count_per_group", F.count(F.lit(1)).over(Window.partitionBy(F.col("Location"))))
df.show()
Location Product Quantity count_per_group
Home Laptop 12 4
Home Monitor 7 4
Home Mouse 8 4
Home Keyboard 9 4
Office Laptop 23 3
Office Monitor 10 3
Office Mouse 9 3

7 Dataframe join operations

7.1 To perform a full, outer, left, right join operations

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Name", T.StringType(), True),
        T.StructField("Score", T.IntegerType(), True),
    ]
)
data = [("Alice", 10),
        ("Bob", 11)
        ]
df_a = spark.createDataFrame(schema=schema, data=data)
print("Table A:")
df_a.show()

schema = T.StructType(
    [
        T.StructField("Name", T.StringType(), True),
        T.StructField("Surname", T.StringType(), True),
        T.StructField("Age", T.StringType(), True),
    ]
)
data = [("Alice", "Doe", 12),
        ("Alice", "Smith", 30),
        ("Jane", "Carter", 7),
]
df_b = spark.createDataFrame(schema=schema, data=data)
print("Table B:")
df_b.show()

df = df_a.join(df_b, on="Name", how="full")
print("Full join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="outer")
print("Outer join on 'Name':")
df.show()

df = df_a.join(df_b, df_a["Name"] == df_b["Name"])
print("Join on 'Name' on equal condition:")
df.show()

df = df_a.join(df_b, on="Name", how="inner")
print("Inner join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="left")
print("Left join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="left_outer")
print("Left-outer join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="left_anti")
print("Left-anti join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="left_semi")
print("Left-semi join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="right")
print("Right join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="right_outer")
print("Right-outer join on 'Name':")
df.show()

Table A:

Name Score
Alice 10
Bob 11

Table B:

Name Surname Age
Alice Doe 12
Alice Smith 30
Jane Carter 7

Full join on 'Name':

Name Score Surname Age
Alice 10 Doe 12
Alice 10 Smith 30
Bob 11 null null
Jane null Carter 7

Outer join on 'Name':

Name Score Surname Age
Alice 10 Doe 12
Alice 10 Smith 30
Bob 11 null null
Jane null Carter 7

Join on 'Name' on equal condition:

Name Score Name Surname Age
Alice 10 Alice Doe 12
Alice 10 Alice Smith 30

Inner join on 'Name':

Name Score Surname Age
Alice 10 Doe 12
Alice 10 Smith 30

Left join on 'Name':

Name Score Surname Age
Bob 11 null null
Alice 10 Smith 30
Alice 10 Doe 12

Left-outer join on 'Name':

Name Score Surname Age
Bob 11 null null
Alice 10 Smith 30
Alice 10 Doe 12

Left-anti join on 'Name':

Name Score
Bob 11

Left-semi join on 'Name':

Name Score
Alice 10

Right join on 'Name':

Name Score Surname Age
Alice 10 Doe 12
Alice 10 Smith 30
Jane null Carter 7

Right-outer join on 'Name':

Name Score Surname Age
Alice 10 Doe 12
Alice 10 Smith 30
Jane null Carter 7

7.2 To drop one of the duplicate columns after join

from pyspark.sql import Row
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df_a = spark.createDataFrame([
  Row(id=1, value="A1"),
  Row(id=1, value="B1"),
  Row(id=1, value="C1"),
  Row(id=2, value="A1"),
  Row(id=2, value="X1"),
  Row(id=2, value="Y1")]
)
print("Dataframe df_a:")
df_1.show()

df_b = spark.createDataFrame([
  Row(id=1, updated="A2"),
  Row(id=1, updated="B1"),
  Row(id=1, updated="C1"),
  Row(id=2, updated="A1"),
  Row(id=2, updated="X1"),
  Row(id=2, updated="Y1")]
)
print("Dataframe df_b:")
df_2.show()

df = df_a.join(df_b, on=[df_a["id"] == df_b["id"], df_a["value"] == df_b["updated"]], how="full")
print("Full join on df_a['value'] == df_b['updated']:")
df.show()

df = df_a.join(df_b, on=[df_a["id"] == df_b["id"], df_a["value"] == df_b["updated"]], how="full").drop(df_b["id"])
print("Full join on df_a['value'] == df_b['updated'] with dropped df_b['id'] column:")
df.show()

Dataframe df_a:

id value
1 A1
1 B1
1 C1
2 A1
2 X1
2 Y1

Dataframe df_b:

id updated
1 A2
1 B1
1 C1
2 A1
2 X1
2 Y1

Full join on df_a['value'] == df_b['updated']:

id value id updated
1.0 A1 null null
null null 1.0 A2
1.0 B1 1.0 B1
1.0 C1 1.0 C1
2.0 A1 2.0 A1
2.0 X1 2.0 X1
2.0 Y1 2.0 Y1

Full join on df_a['value'] == df_b['updated'] with dropped df_b['id'] column:

id value updated
1.0 A1 null
null null A2
1.0 B1 B1
1.0 C1 C1
2.0 A1 A1
2.0 X1 X1
2.0 Y1 Y1

8 Aggregation and maps

8.1 To group by and aggregate into a map using F.map_from_entries()

import pyspark.sql.functions as F
from pyspark.sql import Row
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df = spark.createDataFrame([
  Row(id=1, key='a', value="A1"),
  Row(id=1, key='b', value="B1"),
  Row(id=1, key='c', value="C1"),
  Row(id=2, key='a', value="A1"),
  Row(id=2, key='x', value="X1"),
  Row(id=2, key='y', value="Y1")]
)

print("Dataframe with keys and values:")
df.show(truncate=False)
dft = df.groupBy("id").agg(F.map_from_entries(F.collect_list(
          F.struct("key", "value"))).alias("key_value")
)
print("Dataframe with key -> value mapping")
dft.show(truncate=False)
dft.printSchema()

Dataframe with keys and values:

id key value
1 a A1
1 b B1
1 c C1
2 a A1
2 x X1
2 y Y1

Dataframe with key -> value mapping

id key_value
1 {a -> A1, b -> B1, c -> C1}
2 {a -> A1, x -> X1, y -> Y1}

Schema of dft is:

root
 |-- id: long (nullable = true)
 |-- key_value: map (nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

8.2 To group by and aggregate into a map using UDF

import pyspark.sql.functions as F
from pyspark.sql import Row
from pyspark.sql import SparkSession
import pyspark.sql.types as T

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df = spark.createDataFrame([
  Row(id=1, key='a', value="A1"),
  Row(id=1, key='b', value="B1"),
  Row(id=1, key='c', value="C1"),
  Row(id=2, key='a', value="A1"),
  Row(id=2, key='x', value="X1"),
  Row(id=2, key='y', value="Y1")]
)

print("Dataframe with keys and values:")
df.show()

@F.udf(returnType=T.MapType(T.StringType(), T.StringType()))
def map_array(column):
    return dict(column)

dft = (df.groupBy("id")
   .agg(F.collect_list(F.struct("key", "value")).alias("key_value"))
   .withColumn('key_value', map_array('key_value')))
print("Dataframe with keys and values:")
dft.show(truncate=False)
dft.printSchema()

Dataframe with keys and values:

id key value
1 a A1
1 b B1
1 c C1
2 a A1
2 x X1
2 y Y1

Dataframe with keys and values:

id key_value
1 {a -> A1, b -> B1, c -> C1}
2 {x -> X1, a -> A1, y -> Y1}

Schema of dft is:

root
 |-- id: long (nullable = true)
 |-- key_value: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

8.3 To agregate over multiple columns and sum values of dictionaries

import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

df_schema = T.StructType([T.StructField('clid', T.StringType(), True),
                        T.StructField('coef_1', T.MapType(T.StringType(), T.DoubleType(), True), False),
                        T.StructField('coef_2', T.MapType(T.StringType(), T.DoubleType(), True), False),
                        T.StructField('coef_3', T.MapType(T.StringType(), T.DoubleType(), True), False)])
df_data = [["X", {'B': 0.4, 'C': 0.4}, {'B': 0.33, 'C': 0.5}, {'A': 0.5, 'C': 0.33}],
           ["Y", {'B': 0.67, 'C': 0.33}, {'B': 0.85}, {'A': 0.4, 'C': 0.57}],
           ]
spark = SparkSession.builder\
        .appName("Parse DataFrame Schema")\
        .getOrCreate()
df = spark.createDataFrame(data=df_data, schema=df_schema)

df = df.withColumn("coef_total", F.col("coef_1"))
for i in range(2,4):
    df = df.withColumn("coef_total", F.map_zip_with("coef_total", f"coef_{i}",
                      lambda k, v1, v2: F.when(v1.isNull(), 0).otherwise(v1) + F.when(v2.isNull(), 0).otherwise(v2)))
df.show(truncate=False)
clid coef_1 coef_2 coef_3 coef_total
X {B -> 0.4, C -> 0.4} {B -> 0.33, C -> 0.5} {A -> 0.5, C -> 0.33} {B -> 0.73, C -> 1.23, A -> 0.5}
Y {B -> 0.67, C -> 0.33} {B -> 0.85} {A -> 0.4, C -> 0.57} {B -> 1.52, C -> 0.8999999999999999, A -> 0.4}

9 Sampling rows

9.1 To sample rows

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("index", T.IntegerType(), True),
        T.StructField("value", T.StringType(), True),
    ]
)
data = [(1, "Home"),
        (2, "School"),
        (3, "Home"),
        (4, "Home"),
        (5, "Office"),
        (6, "Office"),
        (7, "Office"),
        (8, "Mall"),
        (9, "Mall"),
        (10, "School")]
df = spark.createDataFrame(schema=schema, data=data).repartition(3)
df = df.withColumn("partition", F.spark_partition_id()).orderBy("index")
print("Original dataframe:")
df.show()

print("Sampled dataframe:")
dft = df.sample(fraction=0.5, seed=1).orderBy("index")
dft.show()

Original dataframe:

index value partition
1 Home 1
2 School 0
3 Home 0
4 Home 2
5 Office 2
6 Office 2
7 Office 1
8 Mall 0
9 Mall 1
10 School 0

Sampled dataframe:

index value partition
3 Home 0
4 Home 2
7 Office 1
8 Mall 0
9 Mall 1

10 UUID generation

10.1 To generate a UUID for every row

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
import random
import uuid

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Name", T.StringType(), True),
    ]
)
data = [["Alice"],
        ["Bon"],
        ["John"],
        ["Cecile"]
        ]
df = spark.createDataFrame(schema=schema, data=data).repartition(2)

def _generate_uuid(uuid_gen, v=10):
    def _replace_byte(value: int, byte: int):
        byte = byte & 0xF
        bit_shift = 76
        mask = ~(0xF << bit_shift)
        return value & mask | (byte << bit_shift)

    uuid_ = uuid_gen.generate()
    return uuid.UUID(int=(_replace_byte(uuid_.int, v)))

class RandomDistributedUUIDGenerator:
    def generate(self):
        return uuid.uuid4()

class SeedBasedUUIDGenerator:
    def __init__(self, seed):
        self.rnd = random.Random(seed)

    def generate(self):
        return uuid.UUID(int=self.rnd.getrandbits(128), version=4)

gen = RandomDistributedUUIDGenerator()
udf_generate_uuid = F.udf(lambda: _generate_uuid(gen).__str__(), T.StringType())
df = df.withColumn("UUID_random_distributed", udf_generate_uuid())

seed_for_rng = 1
gen = SeedBasedUUIDGenerator(seed_for_rng)
udf_generate_uuid = F.udf(lambda: _generate_uuid(gen).__str__(), T.StringType())
df = df.withColumn("UUID_seed_based", udf_generate_uuid())

print("The dataframe resides in two partitions. Seed-based random UUID generator uses the same seed on both partitions, yielding identical values.")
df.show(truncate=False)

The dataframe resides in two partitions. Seed-based random UUID generator uses the same seed on both partitions, yielding identical values.

Name UUID_random_distributed UUID_seed_based
John 4e9a3bb1-a189-a25e-8389-7f8382635b09 cd613e30-d8f1-aadf-91b7-584a2265b1f5
Bon 16cd1549-0c74-a483-9bbe-707e59e0796f 1e2feb89-414c-a43c-9027-c4d1c386bbc4
Cecile b8b05619-6004-aa75-b98b-7e1c83c9f301 cd613e30-d8f1-aadf-91b7-584a2265b1f5
Alice b1f1a9fb-feb9-a946-9171-3e7cb577fdaa 1e2feb89-414c-a43c-9027-c4d1c386bbc4

Author: Altynbek Isabekov

Created: 2024-04-18 Thu 19:50

Validate