PySpark Cookbook
- 1. Introduction
- 1.1. To create an empty dataframe
- 1.2. To create a dataframe with columns key and value from a dictionary
- 1.3. To duplicate a column
- 1.4. To rename a column using .withColumnRenamed()
- 1.5. To rename a column using .withColumnsRenamed()
- 1.6. To rename a column using .select()
- 1.7. To rename columns by adding a prefix
- 1.8. To drop columns from a dataframe
- 1.9. To subset columns of a dataframe
- 1.10. To add a column with a constant value using F.lit()
- 1.11. To add a column with a constant value using .select()
- 1.12. To create a dataframe from a list of tuples
- 1.13. To get the number of rows of a dataframe
- 1.14. To select first N rows
- 1.15. To deduplicate rows
- 1.16. To convert a column to a list using a lambda function
- 1.17. To convert a dataframe to a list of dictionaries corresponding to every row
- 1.18. To convert a column to a list using list comprehension
- 1.19. To convert a column to a list using Pandas
- 1.20. To display full width of a column (do not truncate)
- 2. Filtering rows
- 3. Array operations
- 3.1. To create arrays of different lengths
- 3.2. To calculate set difference
- 3.3. To calculate set union
- 3.4. To calculate set intersection
- 3.5. To pad arrays with value
- 3.6. To sum two arrays elementwise using F.element_at()
- 3.7. To sum two arrays using F.arrays_zip()
- 3.8. To find mode of an array (most common element)
- 3.9. To calculate difference of two consecutive elements in an array
- 3.10. To apply a function to every element of an array
- 3.11. To deduplicate elements in an array (find unique/distinct elements)
- 3.12. To create a map (dictionary) from two arrays (one with keys, one with values)
- 3.13. To calculate mean of an array
- 3.14. To find out whether an array has any negative elements
- 3.15. To convert elements of an array to columns
- 3.16. To find location of the first occurrence of an element in an array
- 3.17. To calculate moving difference of two consecutive elements in an array
- 3.18. To slice an array
- 3.19. To slice an array dynamically
- 4. Text processing
- 4.1. To remove prefix from a string using a UDF
- 4.2. To split a string into letters (characters) using regex
- 4.3. To concatenate columns with strings using a separator
- 4.4. To split a string into letters (characters) using split function
- 4.5. To split a string into letters (characters) and remove last character
- 4.6. To append a string to all values in a column
- 5. Time operations
- 6. Numerical operations
- 7. Dataframe join operations
- 8. Aggregation and maps
- 9. Sampling rows
- 10. UUID generation
1 Introduction
1.1 To create an empty dataframe
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.StringType()), True),
        T.StructField("B", T.ArrayType(T.StringType()), True),
    ]
)
data = []
df = spark.createDataFrame(schema=schema, data=data)
df.show()
A | B |
1.2 To create a dataframe with columns key and value from a dictionary
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
df.show()
key | value |
---|---|
key1 | value1 |
key2 | value2 |
1.3 To duplicate a column
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
df = df.withColumn("value_dup", F.col("value"))
df.show()
key | value | value_dup |
---|---|---|
key1 | value1 | value1 |
key2 | value2 | value2 |
1.4 To rename a column using .withColumnRenamed()
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()
df = df.withColumnRenamed("key", "new_key") \
       .withColumnRenamed("value", "new_value")
print("Modified dataframe:")
df.show()
Original dataframe:
key | value |
---|---|
key1 | value1 |
key2 | value2 |
Modified dataframe:
new_key | new_value |
---|---|
key1 | value1 |
key2 | value2 |
1.5 To rename a column using .withColumnsRenamed()
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()
df = df.withColumnsRenamed({"key": "new_key", "value": "new_value"})
print("Modified dataframe:")
df.show()
Original dataframe:
key | value |
---|---|
key1 | value1 |
key2 | value2 |
Modified dataframe:
new_key | new_value |
---|---|
key1 | value1 |
key2 | value2 |
1.6 To rename a column using .select()
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()
df = df.select(F.col("key").alias("new_key"), F.col("value").alias("new_value"))
print("Modified dataframe:")
df.show()
Original dataframe:
key | value |
---|---|
key1 | value1 |
key2 | value2 |
Modified dataframe:
new_key | new_value |
---|---|
key1 | value1 |
key2 | value2 |
1.7 To rename columns by adding a prefix
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("index", T.IntegerType(), True),
        T.StructField("value", T.StringType(), True),
    ]
)
data = [(1, "Home"), (2, "School"), (3, "Home")]
df = spark.createDataFrame(schema=schema, data=data)
print("Original dataframe:")
df.show()
print("Dataframe with renamed columns:")
df = df.select(*[F.col(k).alias(f"prefix_{k}") for k in df.columns])
df.show()
Original dataframe:
index | value |
---|---|
1 | Home |
2 | School |
3 | Home |
Dataframe with renamed columns:
prefix_index | prefix_value |
---|---|
1 | Home |
2 | School |
3 | Home |
1.8 To drop columns from a dataframe
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
df = df.withColumn("const", F.lit(1))
print("Original dataframe:")
df.show()
df = df.drop("value", "const")
print("Modified dataframe:")
df.show()
Original dataframe:
key | value | const |
---|---|---|
key1 | value1 | 1 |
key2 | value2 | 1 |
Modified dataframe:
key |
---|
key1 |
key2 |
1.9 To subset columns of a dataframe
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
df = df.withColumn("const", F.lit(1))
print("Original dataframe:")
df.show()
print("Subset 'key', 'value' columns:")
df["key", "value"].show()
print("Subset 'key', 'const' columns:")
df.select("key", "const").show()
Original dataframe:
key | value | const |
---|---|---|
key1 | value1 | 1 |
key2 | value2 | 1 |
Subset 'key', 'value' columns:
key | value |
---|---|
key1 | value1 |
key2 | value2 |
Subset 'key', 'const' columns:
key | const |
---|---|
key1 | 1 |
key2 | 1 |
1.10 To add a column with a constant value using F.lit()
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()
df = df.withColumn("const_integer", F.lit(1))
df = df.withColumn("const_string", F.lit("string"))
print("Modified dataframe:")
df.show()
Original dataframe:
key | value |
---|---|
key1 | value1 |
key2 | value2 |
Modified dataframe:
key | value | const_integer | const_string |
---|---|---|---|
key1 | value1 | 1 | string |
key2 | value2 | 1 | string |
1.11 To add a column with a constant value using .select()
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()
df = df.select("key", "value", F.lit("const_str").alias("constant_value"))
print("Modified dataframe:")
df.show()
Original dataframe:
key | value |
---|---|
key1 | value1 |
key2 | value2 |
Modified dataframe:
key | value | constant_value |
---|---|---|
key1 | value1 | const_str |
key2 | value2 | const_str |
1.12 To create a dataframe from a list of tuples
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]
df = spark.createDataFrame(values, columns)
df.show()
integer | characters |
---|---|
1 | [A, B] |
2 | [C, D] |
3 | [E, F] |
1.13 To get the number of rows of a dataframe
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]
df = spark.createDataFrame(values, columns)
df.show()
num_rows = df.count()
print(f"df has {num_rows} rows")
integer | characters |
---|---|
1 | [A, B] |
2 | [C, D] |
3 | [E, F] |
df has 3 rows
1.14 To select first N rows
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]
df = spark.createDataFrame(values, columns)
df.show()
print("These are first 2 rows:")
df.limit(2).show()
integer | characters |
---|---|
1 | [A, B] |
2 | [C, D] |
3 | [E, F] |
These are first 2 rows:
integer | characters |
---|---|
1 | [A, B] |
2 | [C, D] |
1.15 To deduplicate rows
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("key", T.IntegerType(), True),
        T.StructField("value", T.StringType(), True),
        T.StructField("comment", T.StringType(), True),
    ]
)
data = [
    (1, "Home", "a house"),
    (1, "Home", "a house"),
    (2, "School", "a building"),
    (2, "School", "a house"),
    (3, "Home", "a house"),
]
df = spark.createDataFrame(schema=schema, data=data)
print("Original dataframe:")
df.show()
print("Dataframe with distinct rows:")
df.distinct().show()
print("Dataframe with dropped duplicate rows:")
df.dropDuplicates().show()
print("Dataframe with dropped duplicates in columns 'key' and 'value':")
df = df.dropDuplicates(subset=["key", "value"])
df.show()
Original dataframe:
key | value | comment |
---|---|---|
1 | Home | a house |
1 | Home | a house |
2 | School | a building |
2 | School | a house |
3 | Home | a house |
Dataframe with distinct rows:
key | value | comment |
---|---|---|
2 | School | a house |
3 | Home | a house |
2 | School | a building |
1 | Home | a house |
Dataframe with dropped duplicate rows:
key | value | comment |
---|---|---|
2 | School | a house |
3 | Home | a house |
2 | School | a building |
1 | Home | a house |
Dataframe with dropped duplicates in columns 'key' and 'value':
key | value | comment |
---|---|---|
1 | Home | a house |
2 | School | a building |
3 | Home | a house |
1.16 To convert a column to a list using a lambda function
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]
df = spark.createDataFrame(values, columns)
df.show()
lst = df.select("integer").rdd.map(lambda r: r[0]).collect()
print(f"Column \"integer\" has values: {lst}")
integer | characters |
---|---|
1 | [A, B] |
2 | [C, D] |
3 | [E, F] |
Column "integer" has values: [1, 2, 3]
1.17 To convert a dataframe to a list of dictionaries corresponding to every row
import pprint

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]
df = spark.createDataFrame(values, columns)
df.show()

# Every row becomes a dictionary keyed by column name
lst_dict = df.rdd.map(lambda row: row.asDict()).collect()
print("Dataframe is represented as:\n")
txt = pprint.pformat(lst_dict)
print(txt)
integer | characters |
---|---|
1 | [A, B] |
2 | [C, D] |
3 | [E, F] |
Dataframe is represented as:
[{'characters': ['A', 'B'], 'integer': 1},
 {'characters': ['C', 'D'], 'integer': 2},
 {'characters': ['E', 'F'], 'integer': 3}]
1.18 To convert a column to a list using list comprehension
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]
df = spark.createDataFrame(values, columns)
df.show()
lst = [k["integer"] for k in df.select("integer").rdd.collect()]
print(f"Column \"integer\" has values: {lst}")
integer | characters |
---|---|
1 | [A, B] |
2 | [C, D] |
3 | [E, F] |
Column "integer" has values: [1, 2, 3]
1.19 To convert a column to a list using Pandas
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]
df = spark.createDataFrame(values, columns)
df.show()
lst = df.select("integer").toPandas()["integer"].tolist()
print(f"Column \"integer\" has values: {lst}")
integer | characters |
---|---|
1 | [A, B] |
2 | [C, D] |
3 | [E, F] |
Column "integer" has values: [1, 2, 3]
1.20 To display full width of a column (do not truncate)
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("sentence", T.ArrayType(T.StringType()), True),
    ]
)
data = [(["A", "very", "long", "sentence"],), (["with", "many", "words", "."],)]
df = spark.createDataFrame(schema=schema, data=data)
print("Truncated output (default behavior):")
df.show()
print("Truncated to 15 characters output:")
df.show(truncate=15)
print("Non-truncated output (show all):")
df.show(truncate=False)
Truncated output (default behavior):
sentence |
---|
[A, very, long, s… |
[with, many, word… |
Truncated to 15 characters output:
sentence |
---|
[A, very, lo… |
[with, many,… |
Non-truncated output (show all):
sentence |
---|
[A, very, long, sentence] |
[with, many, words, .] |
2 Filtering rows
2.1 To filter based on values of a column
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [
    ("Home", "Laptop", 12),
    ("Home", "Monitor", None),
    ("Home", "Keyboard", 9),
    ("Office", "Laptop", None),
    ("Office", "Monitor", 10),
    ("Office", "Mouse", 9),
]
df = spark.createDataFrame(schema=schema, data=data)
print("Original dataframe:")
df.show()

print('Filter: F.col("Location") == "Home"')
dft = df.filter(F.col("Location") == "Home")
dft.show()

print('Filter: F.col("Quantity").isNull()')
dft = df.filter(F.col("Quantity").isNull())
dft.show()

print('Filter: F.col("Quantity").isNotNull()')
dft = df.filter(F.col("Quantity").isNotNull())
dft.show()

print('Filter: (F.col("Location") == "Home") & (F.col("Product") == "Laptop")')
dft = df.filter((F.col("Location") == "Home") & (F.col("Product") == "Laptop"))
dft.show()

print('Filter: (F.col("Location") == "Home") & ~(F.col("Product") == "Laptop")')
dft = df.filter((F.col("Location") == "Home") & ~(F.col("Product") == "Laptop"))
dft.show()

print('Filter: (F.col("Product") == "Laptop") | (F.col("Product") == "Mouse")')
dft = df.filter((F.col("Product") == "Laptop") | (F.col("Product") == "Mouse"))
dft.show()

print('Filter: F.col("Product").isin(["Laptop", "Mouse"])')
dft = df.filter(F.col("Product").isin(["Laptop", "Mouse"]))
dft.show()
Original dataframe:
Location | Product | Quantity |
---|---|---|
Home | Laptop | 12 |
Home | Monitor | null |
Home | Keyboard | 9 |
Office | Laptop | null |
Office | Monitor | 10 |
Office | Mouse | 9 |
Filter: F.col("Location" == "Home")
Location | Product | Quantity |
---|---|---|
Home | Laptop | 12 |
Home | Monitor | null |
Home | Keyboard | 9 |
Filter: F.col("Quantity").isNull()
Location | Product | Quantity |
---|---|---|
Home | Monitor | null |
Office | Laptop | null |
Filter: F.col("Quantity").isNotNull()
Location | Product | Quantity |
---|---|---|
Home | Laptop | 12 |
Home | Keyboard | 9 |
Office | Monitor | 10 |
Office | Mouse | 9 |
Filter: (F.col("Location") == "Home") & (F.col("Product") == "Laptop"))
Location | Product | Quantity |
---|---|---|
Home | Laptop | 12 |
Filter: (F.col("Location") == "Home") & !(F.col("Product") == "Laptop"))
Location | Product | Quantity |
---|---|---|
Home | Monitor | null |
Home | Keyboard | 9 |
Filter: (F.col("Product") == "Laptop") | (F.col("Product") == "Mouse"))
Location | Product | Quantity |
---|---|---|
Home | Laptop | 12 |
Office | Laptop | null |
Office | Mouse | 9 |
Filter: F.col("Product").isin(["Laptop", "Mouse"])
Location | Product | Quantity |
---|---|---|
Home | Laptop | 12 |
Office | Laptop | null |
Office | Mouse | 9 |
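.filter() and .where() are interchangeable, and both also accept a SQL expression string; a short sketch of the same conditions written in that form (column names taken from the example above):

# Equivalent filters expressed as SQL strings
df.where("Location = 'Home'").show()
df.filter("Location = 'Home' AND Quantity IS NOT NULL").show()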
3 Array operations
3.1 To create arrays of different lengths
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
        T.StructField("B", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2], [2, 3, 4, 5]), ([4, 5, 6], [2, 3, 4, 5])]
df = spark.createDataFrame(schema=schema, data=data)
dft = df.select("A", "B")
dft.show()
A | B |
---|---|
[1, 2] | [2, 3, 4, 5] |
[4, 5, 6] | [2, 3, 4, 5] |
3.2 To calculate set difference
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.StringType()), True),
        T.StructField("B", T.ArrayType(T.StringType()), True),
    ]
)
data = [(["b", "a", "c"], ["c", "d", "a", "f"])]
df = spark.createDataFrame(schema=schema, data=data)
dft = df.select(
    "A",
    "B",
    F.array_except("A", "B").alias("A\B"),
    F.array_except("B", "A").alias("B\A"),
)
dft.show()
A | B | A\B | B\A |
---|---|---|---|
[b, a, c] | [c, d, a, f] | [b] | [d, f] |
3.3 To calculate set union
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.StringType()), True),
        T.StructField("B", T.ArrayType(T.StringType()), True),
    ]
)
data = [(["b", "a", "c"], ["c", "d", "a", "f"])]
df = spark.createDataFrame(schema=schema, data=data)
dft = df.select("A", "B", F.array_union("A", "B").alias("A U B"))
dft.show()
A | B | A U B |
---|---|---|
[b, a, c] | [c, d, a, f] | [b, a, c, d, f] |
3.4 To calculate set intersection
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.StringType()), True),
        T.StructField("B", T.ArrayType(T.StringType()), True),
    ]
)
data = [(["b", "a", "c"], ["c", "d", "a", "f"])]
df = spark.createDataFrame(schema=schema, data=data)
dft = df.select("A", "B", F.array_intersect("A", "B").alias("A \u2229 B"))
dft.show()
A | B | A ∩ B |
---|---|---|
[b, a, c] | [c, d, a, f] | [a, c] |
3.5 To pad arrays with value
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
        T.StructField("B", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2], [2, 3, 4, 5]), ([4, 5, 6], [2, 3, 4, 5])]
df = spark.createDataFrame(schema=schema, data=data)

n = 4
fill_value = 0

# Option 1: build the padding array with a SQL expression
df1 = df.withColumn("A_padding", F.expr(f"array_repeat({fill_value}, {n} - size(A))"))
df1 = df1.withColumn("A_padded", F.concat("A", "A_padding"))
dft = df1.select("A", "A_padding", "A_padded")
dft.show()

# Option 2: build the padding array with column expressions
df2 = df.withColumn("A_padding", F.array_repeat(F.lit(fill_value), F.lit(n) - F.size("A")))
df2 = df2.withColumn("A_padded", F.concat("A", "A_padding"))
dft = df2.select("A", "A_padding", "A_padded")
dft.show()
A | A_padding | A_padded |
---|---|---|
[1, 2] | [0, 0] | [1, 2, 0, 0] |
[4, 5, 6] | [0] | [4, 5, 6, 0] |
A | A_padding | A_padded |
---|---|---|
[1, 2] | [0, 0] | [1, 2, 0, 0] |
[4, 5, 6] | [0] | [4, 5, 6, 0] |
3.6 To sum two arrays elementwise using F.element_at()
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
        T.StructField("B", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2], [2, 3, 4, 5]), ([4, 5, 6], [2, 3, 4, 5])]
df = spark.createDataFrame(schema=schema, data=data)

# Padding parameters as in recipe 3.5: pad A to length 4 with zeros
n = 4
fill_value = 0
df = df.withColumn("A_padding", F.array_repeat(F.lit(fill_value), F.lit(n) - F.size("A")))
df = df.withColumn("A_padded", F.concat("A", "A_padding"))

# element_at() uses 1-based indexing, hence index + 1
df = df.withColumn(
    "AB_sum",
    F.expr("transform(A_padded, (element, index) -> element + element_at(B, index + 1))"),
)
dft = df.select("A", "A_padded", "B", "AB_sum")
dft.show()
A | A_padded | B | AB_sum |
---|---|---|---|
[1, 2] | [1, 2, 0, 0] | [2, 3, 4, 5] | [3, 5, 4, 5] |
[4, 5, 6] | [4, 5, 6, 0] | [2, 3, 4, 5] | [6, 8, 10, 5] |
3.7 To sum two arrays using F.arrays_zip()
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
        T.StructField("B", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2], [2, 3, 4, 5]), ([4, 5, 6], [2, 3, 4, 5])]
df = spark.createDataFrame(schema=schema, data=data)

# Padding parameters as in recipe 3.5: pad A to length 4 with zeros
n = 4
fill_value = 0
df = df.withColumn("A_padding", F.array_repeat(F.lit(fill_value), F.lit(n) - F.size("A")))
df = df.withColumn("A_padded", F.concat("A", "A_padding"))

# arrays_zip pairs elements by position; the sum is taken per pair
df = df.withColumn("AB_sum", F.expr("transform(arrays_zip(A_padded, B), x -> x.A_padded + x.B)"))
dft = df.select("A", "A_padded", "B", "AB_sum")
dft.show()
A | A_padded | B | AB_sum |
---|---|---|---|
[1, 2] | [1, 2, 0, 0] | [2, 3, 4, 5] | [3, 5, 4, 5] |
[4, 5, 6] | [4, 5, 6, 0] | [2, 3, 4, 5] | [6, 8, 10, 5] |
3.8 To find mode of an array (most common element)
from collections import Counter

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2, 2, 4],), ([4, 5, 6, 7],), ([1, 1, 2, 2],)]
df = spark.createDataFrame(schema=schema, data=data)

@F.udf
def udf_mode(x):
    return Counter(x).most_common(1)[0][0]

dft = df.withColumn("mode", udf_mode("A"))
dft.printSchema()
dft.show()
Schema of dft is:
root
 |-- A: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- mode: string (nullable = true)
A | mode |
---|---|
[1, 2, 2, 4] | 2 |
[4, 5, 6, 7] | 4 |
[1, 1, 2, 2] | 1 |
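Note that "mode" comes back as a string because @F.udf without an explicit returnType defaults to StringType. A minimal sketch that keeps the result as an integer (the function name udf_mode_int is illustrative):

from collections import Counter
import pyspark.sql.functions as F
import pyspark.sql.types as T

# Declare the return type explicitly so the mode stays an integer
@F.udf(returnType=T.IntegerType())
def udf_mode_int(x):
    return Counter(x).most_common(1)[0][0]

dft = df.withColumn("mode", udf_mode_int("A"))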
3.9 To calculate difference of two consecutive elements in an array
import numpy as np
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("id", T.StringType(), True),
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [("A", [4, 1, 0, 2]), ("B", [1, 0, 3, 1])]
df = spark.createDataFrame(schema=schema, data=data)

@F.udf(returnType=T.ArrayType(T.IntegerType()))
def diff_of_two_consecutive_elements(x):
    return np.ediff1d(np.array(x)).tolist()

df = df.withColumn("diff", diff_of_two_consecutive_elements(F.col("values")))
df.show()
df.printSchema()
id | values | diff |
---|---|---|
A | [4, 1, 0, 2] | [-3, -1, 2] |
B | [1, 0, 3, 1] | [-1, 3, -2] |
Schema of df is:
root
 |-- id: string (nullable = true)
 |-- values: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- diff: array (nullable = true)
 |    |-- element: integer (containsNull = true)
3.10 To apply a function to every element of an array
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("words_with_suffixes", T.ArrayType(T.StringType()), True),
    ]
)
data = [
    (["pen_10", "note_11", "bottle_12"],),
    (["apple_13", "orange_14", "lemon_15"],),
]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("words", F.transform("words_with_suffixes", lambda x: F.split(x, "_").getItem(0)))
df.show(truncate=False)
words_with_suffixes | words |
---|---|
[pen_10, note_11, bottle_12] | [pen, note, bottle] |
[apple_13, orange_14, lemon_15] | [apple, orange, lemon] |
3.11 To deduplicate elements in an array (find unique/distinct elements)
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("words", T.ArrayType(T.StringType()), True),
    ]
)
data = [
    (["pen", "note", "pen"],),
    (["apple", "apple", "lemon"],),
]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("unique_words", F.array_distinct("words"))
df.show(truncate=False)
words | unique_words |
---|---|
[pen, note, pen] | [pen, note] |
[apple, apple, lemon] | [apple, lemon] |
3.12 To create a map (dictionary) from two arrays (one with keys, one with values)
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("keys", T.ArrayType(T.IntegerType()), True),
        T.StructField("values", T.ArrayType(T.StringType()), True),
    ]
)
data = [([1, 2, 3], ["A", "B", "C"])]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("map_kv", F.map_from_arrays("keys", "values"))
df.show(truncate=False)
keys | values | map_kv |
---|---|---|
[1, 2, 3] | [A, B, C] | {1 -> A, 2 -> B, 3 -> C} |
3.13 To calculate mean of an array
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2],), ([4, 5, 6],)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn(
    "mean",
    F.aggregate(
        "values",                                    # column
        F.lit(0),                                    # initial value
        lambda acc, x: acc + x,                      # merge operation
        lambda acc: acc / F.size(F.col("values")),   # finish
    ),
)
df.show()
values | mean |
---|---|
[1, 2] | 1.5 |
[4, 5, 6] | 5.0 |
3.14 To find out whether an array has any negative elements
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, -2],), ([4, 5, 6],)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("any_negative", F.exists("values", lambda x: x < 0))
df.show()
values | any_negative |
---|---|
[1, -2] | true |
[4, 5, 6] | false |
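The complementary check, that every element satisfies a condition, can be written with F.forall (assuming a reasonably recent Spark release, 3.1 or later); a minimal sketch:

# True only if all elements of the array are non-negative
df = df.withColumn("all_non_negative", F.forall("values", lambda x: x >= 0))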
3.15 To convert elements of an array to columns
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2, 3, 4],), ([5, 6, 7],)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("first", F.col("A").getItem(0))
dft = df.select("A", "first", *[F.col("A").getItem(k).alias(f"element_{k + 1}") for k in range(1, 4)])
dft.show()
A | first | element_2 | element_3 | element_4 |
---|---|---|---|---|
[1, 2, 3, 4] | 1 | 2 | 3 | 4 |
[5, 6, 7] | 5 | 6 | 7 | null |
3.16 To find location of the first occurrence of an element in an array
import pyspark.sql.functions as F
import pyspark.sql.types as T
import pandas as pd
from pyspark.sql import SparkSession
import numpy as np

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 7, 5],), ([7, 4, 7],)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("position", F.array_position(F.col("values"), 7))
df.show()
values | position |
---|---|
[1, 7, 5] | 2 |
[7, 4, 7] | 1 |
3.17 To calculate moving difference of two consecutive elements in an array
import pyspark.sql.functions as F
import pyspark.sql.types as T
import pandas as pd
from pyspark.sql import SparkSession
import numpy as np

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2, 5],), ([4, 4, 6],)]
df = spark.createDataFrame(schema=schema, data=data)

@F.pandas_udf(T.ArrayType(T.IntegerType()))
def diff2e(x: pd.Series) -> pd.Series:
    return x.apply(lambda x: (x[1:] - x[:-1]))

@F.udf(returnType=T.ArrayType(T.IntegerType()))
def diff_of_two_consecutive_elements(x):
    return np.ediff1d(np.array(x)).tolist()

df = df.withColumn("diff2e", diff2e(F.col("values")))
df = df.withColumn("ediff1d", diff_of_two_consecutive_elements(F.col("values")))
df.show()
values | diff2e | ediff1d |
---|---|---|
[1, 2, 5] | [1, 3] | [1, 3] |
[4, 4, 6] | [0, 2] | [0, 2] |
3.18 To slice an array
import pyspark.sql.functions as F
import pyspark.sql.types as T
import pandas as pd
from pyspark.sql import SparkSession
import numpy as np

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 7, 5, 2],), ([6, 4, 7, 3],)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("values[1:3]", F.slice("values", start=2, length=2))
df.show()
values | values[1:3] |
---|---|
[1, 7, 5, 2] | [7, 5] |
[6, 4, 7, 3] | [4, 7] |
3.19 To slice an array dynamically
import pyspark.sql.functions as F
import pyspark.sql.types as T
import pandas as pd
from pyspark.sql import SparkSession
import numpy as np

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 7, 5],), ([6, 4, 7, 3],)]
df = spark.createDataFrame(schema=schema, data=data)
start_idx = 2
df = df.withColumn(
    "values[1:]",
    F.slice("values", start=2, length=(F.size("values") - F.lit(start_idx - 1))),
)
df.show()
values | values[1:] |
---|---|
[1, 7, 5] | [7, 5] |
[6, 4, 7, 3] | [4, 7, 3] |
4 Text processing
4.1 To remove prefix from a string using a UDF
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("text", T.StringType(), True),
    ]
)
data = [("id_orange",), ("apple",)]
df = spark.createDataFrame(schema=schema, data=data)
remove_prefix = F.udf(lambda x: x[3:] if x[:3] == "id_" else x, T.StringType())
df = df.withColumn("no_prefix", remove_prefix(F.col("text")))
df.show()
text | no_prefix |
---|---|
id_orange | orange |
apple | apple |
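The same prefix removal can also be done without a UDF, which usually performs better because it stays inside Spark's built-in functions; a minimal sketch with an anchored regular expression:

# Strip the literal prefix "id_" only when it appears at the start of the string
df = df.withColumn("no_prefix", F.regexp_replace("text", "^id_", ""))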
4.2 To split a string into letters (characters) using regex
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("String", T.StringType(), True),
    ]
)
data = [["This is"]]
df = spark.createDataFrame(schema=schema, data=data)
dft = df.select("String", F.split("String", "(?!$)").alias("Characters"))
dft.show(truncate=False)
String | Characters |
---|---|
This is | [T, h, i, s, , i, s] |
4.3 To concatenate columns with strings using a separator
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("Str1", T.StringType(), True),
        T.StructField("Str2", T.StringType(), True),
    ]
)
data = [("This is", "a string"), ("on a", "different row")]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("Str_Concat", F.concat_ws("_", "Str1", "Str2"))
df.show()
Str1 | Str2 | Str_Concat |
---|---|---|
This is | a string | This is_a string |
on a | different row | on a_different row |
4.4 To split a string into letters (characters) using split function
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("String", T.StringType(), True),
    ]
)
data = [["This is"]]
df = spark.createDataFrame(schema=schema, data=data)
fsplit = F.expr("split(String, '')")
dft = df.select("String", fsplit.alias("Characters"))
dft.show(truncate=False)
String | Characters |
---|---|
This is | [T, h, i, s, , i, s] |
4.5 To split a string into letters (characters) and remove last character
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("String", T.StringType(), True),
    ]
)
data = [["This is_"]]
df = spark.createDataFrame(schema=schema, data=data)
print("Using split function and remove last character:")
fsplit = "split(String, '')"
fsplit = F.expr(f"slice({fsplit}, 1, size({fsplit}) - 1)")
dft = df.select("String", fsplit.alias("Characters"))
dft.show(truncate=False)
Using split function and remove last character:
String | Characters |
---|---|
This is_ | [T, h, i, s, , i, s] |
4.6 To append a string to all values in a column
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("Str1", T.StringType(), True),
        T.StructField("Str2", T.StringType(), True),
    ]
)
data = [("This is", "a string"), ("on a", "different row")]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("Str1_with_prefix", F.concat(F.lit("Prefix_"), "Str1"))
dft = df.select("Str1", "Str1_with_prefix")
dft.show()
Str1 | Str1_with_prefix |
---|---|
This is | Prefix_This is |
on a | Prefix_on a |
5 Time operations
5.1 To calculate cumulative sum of a column
import pandas as pd
from pyspark.sql import Window
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

df = pd.DataFrame({
    "time": [0, 1, 2, 3, 4, 5],
    "value": [False, False, True, False, True, True],
})
df = spark.createDataFrame(df)
df = df.withColumn(
    "cml_n_true",
    F.sum((F.col("value") == True).cast("int")).over(Window.orderBy(F.col("time").asc())),
)
df = df.withColumn(
    "cml_n_false",
    F.sum((F.col("value") == False).cast("int")).over(Window.orderBy(F.col("time").asc())),
)
df.show()
time | value | cml_n_true | cml_n_false |
---|---|---|---|
0 | false | 0 | 1 |
1 | false | 0 | 2 |
2 | true | 1 | 2 |
3 | false | 1 | 3 |
4 | true | 2 | 3 |
5 | true | 3 | 3 |
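The ordered window above relies on Spark's default frame. Making the frame explicit with .rowsBetween() keeps the cumulative count well defined even if several rows were to share the same "time" value; a sketch under that assumption (not needed for the unique timestamps in this example):

w = (Window.orderBy(F.col("time").asc())
     .rowsBetween(Window.unboundedPreceding, Window.currentRow))
# Count of True values seen so far, row by row
df = df.withColumn("cml_n_true", F.sum(F.col("value").cast("int")).over(w))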
5.2 To convert Unix time stamp to human readable format
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("timestamp", T.LongType(), True),
    ]
)
data = [(1703224755,), (1703285602,)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("time_stamp_hrf", F.from_unixtime(F.col("timestamp")))
df.show()
timestamp | time_stamp_hrf |
---|---|
1703224755 | 2023-12-22 06:59:15 |
1703285602 | 2023-12-22 23:53:22 |
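F.from_unixtime also accepts a date format string when only part of the timestamp is needed; a short sketch (the column name "date_only" is illustrative):

# Keep only the calendar date, formatted as year-month-day
df = df.withColumn("date_only", F.from_unixtime(F.col("timestamp"), "yyyy-MM-dd"))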
6 Numerical operations
6.1 To find percentage of a column
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Laptop", 12), ("Monitor", 7), ("Mouse", 8), ("Keyboard", 9)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("%", F.round(F.col("Quantity") / F.sum("Quantity").over(Window.partitionBy()) * 100, 2))
dft = df.select("Product", "Quantity", "%").orderBy(F.desc("Quantity"))
dft.show()
Product | Quantity | % |
---|---|---|
Laptop | 12 | 33.33 |
Keyboard | 9 | 25.0 |
Mouse | 8 | 22.22 |
Monitor | 7 | 19.44 |
6.2 To find percentage of a column within a group using a window
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [
    ("Home", "Laptop", 12),
    ("Home", "Monitor", 7),
    ("Home", "Mouse", 8),
    ("Home", "Keyboard", 9),
    ("Office", "Laptop", 23),
    ("Office", "Monitor", 10),
    ("Office", "Mouse", 9),
]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("%", F.round(F.col("Quantity") / F.sum("Quantity").over(Window.partitionBy("Location")) * 100, 2))
dft = df.select("Location", "Product", "Quantity", "%").orderBy(F.desc("Location"), F.desc("Quantity"))
dft.show()
Location | Product | Quantity | % |
---|---|---|---|
Office | Laptop | 23 | 54.76 |
Office | Monitor | 10 | 23.81 |
Office | Mouse | 9 | 21.43 |
Home | Laptop | 12 | 33.33 |
Home | Keyboard | 9 | 25.0 |
Home | Mouse | 8 | 22.22 |
Home | Monitor | 7 | 19.44 |
6.3 To find percentage of a column within a group using .groupBy() and a join
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [
    ("Home", "Laptop", 12),
    ("Home", "Monitor", 7),
    ("Home", "Mouse", 8),
    ("Home", "Keyboard", 9),
    ("Office", "Laptop", 23),
    ("Office", "Monitor", 10),
    ("Office", "Mouse", 9),
]
df = spark.createDataFrame(schema=schema, data=data)
df_sum = df.groupBy("Location").agg(F.sum("Quantity").alias("Total_Quantity"))
df = df.join(df_sum, on="Location").withColumn("%", F.round(F.col("Quantity") / F.col("Total_Quantity") * 100, 2))
dft = df.select("Location", "Product", "Quantity", "%").orderBy(F.desc("Location"), F.desc("Quantity"))
dft.show()
Location | Product | Quantity | % |
---|---|---|---|
Office | Laptop | 23 | 54.76 |
Office | Monitor | 10 | 23.81 |
Office | Mouse | 9 | 21.43 |
Home | Laptop | 12 | 33.33 |
Home | Keyboard | 9 | 25.0 |
Home | Mouse | 8 | 22.22 |
Home | Monitor | 7 | 19.44 |
6.4 To find maximum value of a column
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [
    ("Home", "Laptop", 12),
    ("Home", "Monitor", 7),
    ("Home", "Mouse", 8),
    ("Home", "Keyboard", 9),
    ("Office", "Laptop", 23),
    ("Office", "Monitor", 10),
    ("Office", "Mouse", 9),
]
df = spark.createDataFrame(schema=schema, data=data)
df.show()
max_val = df.select("Quantity").rdd.max()[0]
print(f"Maximum value of Quantity: {max_val}")
Location | Product | Quantity |
---|---|---|
Home | Laptop | 12 |
Home | Monitor | 7 |
Home | Mouse | 8 |
Home | Keyboard | 9 |
Office | Laptop | 23 |
Office | Monitor | 10 |
Office | Mouse | 9 |
Maximum value of Quantity: 23
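A more conventional way to get a single maximum, assuming the value is all that is needed on the driver, avoids the RDD API entirely (the alias "max_quantity" is illustrative):

# Aggregate in Spark and pull back a single row
max_val = df.agg(F.max("Quantity").alias("max_quantity")).first()["max_quantity"]
print(f"Maximum value of Quantity: {max_val}")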
6.5 To add a column with count of elements per group
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [
    ("Home", "Laptop", 12),
    ("Home", "Monitor", 7),
    ("Home", "Mouse", 8),
    ("Home", "Keyboard", 9),
    ("Office", "Laptop", 23),
    ("Office", "Monitor", 10),
    ("Office", "Mouse", 9),
]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("count_per_group", F.count(F.lit(1)).over(Window.partitionBy(F.col("Location"))))
df.show()
Location | Product | Quantity | count_per_group |
---|---|---|---|
Home | Laptop | 12 | 4 |
Home | Monitor | 7 | 4 |
Home | Mouse | 8 | 4 |
Home | Keyboard | 9 | 4 |
Office | Laptop | 23 | 3 |
Office | Monitor | 10 | 3 |
Office | Mouse | 9 | 3 |
7 Dataframe join operations
7.1 To perform a full, outer, left, right join operations
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("Name", T.StringType(), True),
        T.StructField("Score", T.IntegerType(), True),
    ]
)
data = [("Alice", 10), ("Bob", 11)]
df_a = spark.createDataFrame(schema=schema, data=data)
print("Table A:")
df_a.show()

schema = T.StructType(
    [
        T.StructField("Name", T.StringType(), True),
        T.StructField("Surname", T.StringType(), True),
        T.StructField("Age", T.IntegerType(), True),
    ]
)
data = [("Alice", "Doe", 12), ("Alice", "Smith", 30), ("Jane", "Carter", 7)]
df_b = spark.createDataFrame(schema=schema, data=data)
print("Table B:")
df_b.show()

df = df_a.join(df_b, on="Name", how="full")
print("Full join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="outer")
print("Outer join on 'Name':")
df.show()

df = df_a.join(df_b, df_a["Name"] == df_b["Name"])
print("Join on 'Name' on equal condition:")
df.show()

df = df_a.join(df_b, on="Name", how="inner")
print("Inner join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="left")
print("Left join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="left_outer")
print("Left-outer join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="left_anti")
print("Left-anti join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="left_semi")
print("Left-semi join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="right")
print("Right join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="right_outer")
print("Right-outer join on 'Name':")
df.show()
Table A:
Name | Score |
---|---|
Alice | 10 |
Bob | 11 |
Table B:
Name | Surname | Age |
---|---|---|
Alice | Doe | 12 |
Alice | Smith | 30 |
Jane | Carter | 7 |
Full join on 'Name':
Name | Score | Surname | Age |
---|---|---|---|
Alice | 10 | Doe | 12 |
Alice | 10 | Smith | 30 |
Bob | 11 | null | null |
Jane | null | Carter | 7 |
Outer join on 'Name':
Name | Score | Surname | Age |
---|---|---|---|
Alice | 10 | Doe | 12 |
Alice | 10 | Smith | 30 |
Bob | 11 | null | null |
Jane | null | Carter | 7 |
Join on 'Name' on equal condition:
Name | Score | Name | Surname | Age |
---|---|---|---|---|
Alice | 10 | Alice | Doe | 12 |
Alice | 10 | Alice | Smith | 30 |
Inner join on 'Name':
Name | Score | Surname | Age |
---|---|---|---|
Alice | 10 | Doe | 12 |
Alice | 10 | Smith | 30 |
Left join on 'Name':
Name | Score | Surname | Age |
---|---|---|---|
Bob | 11 | null | null |
Alice | 10 | Smith | 30 |
Alice | 10 | Doe | 12 |
Left-outer join on 'Name':
Name | Score | Surname | Age |
---|---|---|---|
Bob | 11 | null | null |
Alice | 10 | Smith | 30 |
Alice | 10 | Doe | 12 |
Left-anti join on 'Name':
Name | Score |
---|---|
Bob | 11 |
Left-semi join on 'Name':
Name | Score |
---|---|
Alice | 10 |
Right join on 'Name':
Name | Score | Surname | Age |
---|---|---|---|
Alice | 10 | Doe | 12 |
Alice | 10 | Smith | 30 |
Jane | null | Carter | 7 |
Right-outer join on 'Name':
Name | Score | Surname | Age |
---|---|---|---|
Alice | 10 | Doe | 12 |
Alice | 10 | Smith | 30 |
Jane | null | Carter | 7 |
7.2 To drop one of the duplicate columns after join
from pyspark.sql import Row
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

df_a = spark.createDataFrame([
    Row(id=1, value="A1"),
    Row(id=1, value="B1"),
    Row(id=1, value="C1"),
    Row(id=2, value="A1"),
    Row(id=2, value="X1"),
    Row(id=2, value="Y1"),
])
print("Dataframe df_a:")
df_a.show()

df_b = spark.createDataFrame([
    Row(id=1, updated="A2"),
    Row(id=1, updated="B1"),
    Row(id=1, updated="C1"),
    Row(id=2, updated="A1"),
    Row(id=2, updated="X1"),
    Row(id=2, updated="Y1"),
])
print("Dataframe df_b:")
df_b.show()

df = df_a.join(df_b, on=[df_a["id"] == df_b["id"], df_a["value"] == df_b["updated"]], how="full")
print("Full join on df_a['value'] == df_b['updated']:")
df.show()

df = df_a.join(df_b, on=[df_a["id"] == df_b["id"], df_a["value"] == df_b["updated"]], how="full").drop(df_b["id"])
print("Full join on df_a['value'] == df_b['updated'] with dropped df_b['id'] column:")
df.show()
Dataframe df_a:
id | value |
---|---|
1 | A1 |
1 | B1 |
1 | C1 |
2 | A1 |
2 | X1 |
2 | Y1 |
Dataframe df_b:
id | updated |
---|---|
1 | A2 |
1 | B1 |
1 | C1 |
2 | A1 |
2 | X1 |
2 | Y1 |
Full join on df_a['value'] == df_b['updated']:
id | value | id | updated |
---|---|---|---|
1.0 | A1 | null | null |
null | null | 1.0 | A2 |
1.0 | B1 | 1.0 | B1 |
1.0 | C1 | 1.0 | C1 |
2.0 | A1 | 2.0 | A1 |
2.0 | X1 | 2.0 | X1 |
2.0 | Y1 | 2.0 | Y1 |
Full join on df_a['value'] == df_b['updated'] with dropped df_b['id'] column:
id | value | updated |
---|---|---|
1.0 | A1 | null |
null | null | A2 |
1.0 | B1 | B1 |
1.0 | C1 | C1 |
2.0 | A1 | A1 |
2.0 | X1 | X1 |
2.0 | Y1 | Y1 |
8 Aggregation and maps
8.1 To group by and aggregate into a map using F.map_from_entries()
import pyspark.sql.functions as F
from pyspark.sql import Row
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

df = spark.createDataFrame([
    Row(id=1, key="a", value="A1"),
    Row(id=1, key="b", value="B1"),
    Row(id=1, key="c", value="C1"),
    Row(id=2, key="a", value="A1"),
    Row(id=2, key="x", value="X1"),
    Row(id=2, key="y", value="Y1"),
])
print("Dataframe with keys and values:")
df.show(truncate=False)

dft = df.groupBy("id").agg(
    F.map_from_entries(F.collect_list(F.struct("key", "value"))).alias("key_value")
)
print("Dataframe with key -> value mapping")
dft.show(truncate=False)
dft.printSchema()
Dataframe with keys and values:
id | key | value |
---|---|---|
1 | a | A1 |
1 | b | B1 |
1 | c | C1 |
2 | a | A1 |
2 | x | X1 |
2 | y | Y1 |
Dataframe with key -> value mapping
id | key_value |
---|---|
1 | {a -> A1, b -> B1, c -> C1} |
2 | {a -> A1, x -> X1, y -> Y1} |
Schema of dft is:
root
 |-- id: long (nullable = true)
 |-- key_value: map (nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
8.2 To group by and aggregate into a map using UDF
import pyspark.sql.functions as F
from pyspark.sql import Row
from pyspark.sql import SparkSession
import pyspark.sql.types as T

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

df = spark.createDataFrame([
    Row(id=1, key="a", value="A1"),
    Row(id=1, key="b", value="B1"),
    Row(id=1, key="c", value="C1"),
    Row(id=2, key="a", value="A1"),
    Row(id=2, key="x", value="X1"),
    Row(id=2, key="y", value="Y1"),
])
print("Dataframe with keys and values:")
df.show()

@F.udf(returnType=T.MapType(T.StringType(), T.StringType()))
def map_array(column):
    return dict(column)

dft = (
    df.groupBy("id")
    .agg(F.collect_list(F.struct("key", "value")).alias("key_value"))
    .withColumn("key_value", map_array("key_value"))
)
print("Dataframe with keys and values:")
dft.show(truncate=False)
dft.printSchema()
Dataframe with keys and values:
id | key | value |
---|---|---|
1 | a | A1 |
1 | b | B1 |
1 | c | C1 |
2 | a | A1 |
2 | x | X1 |
2 | y | Y1 |
Dataframe with keys and values:
id | key_value |
---|---|
1 | {a -> A1, b -> B1, c -> C1} |
2 | {x -> X1, a -> A1, y -> Y1} |
Schema of dft is:
root
 |-- id: long (nullable = true)
 |-- key_value: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
8.3 To aggregate over multiple columns and sum values of dictionaries
import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

df_schema = T.StructType(
    [
        T.StructField("clid", T.StringType(), True),
        T.StructField("coef_1", T.MapType(T.StringType(), T.DoubleType(), True), False),
        T.StructField("coef_2", T.MapType(T.StringType(), T.DoubleType(), True), False),
        T.StructField("coef_3", T.MapType(T.StringType(), T.DoubleType(), True), False),
    ]
)
df_data = [
    ["X", {"B": 0.4, "C": 0.4}, {"B": 0.33, "C": 0.5}, {"A": 0.5, "C": 0.33}],
    ["Y", {"B": 0.67, "C": 0.33}, {"B": 0.85}, {"A": 0.4, "C": 0.57}],
]

spark = SparkSession.builder.appName("Parse DataFrame Schema").getOrCreate()
df = spark.createDataFrame(data=df_data, schema=df_schema)

df = df.withColumn("coef_total", F.col("coef_1"))
for i in range(2, 4):
    df = df.withColumn(
        "coef_total",
        F.map_zip_with(
            "coef_total",
            f"coef_{i}",
            lambda k, v1, v2: F.when(v1.isNull(), 0).otherwise(v1) + F.when(v2.isNull(), 0).otherwise(v2),
        ),
    )
df.show(truncate=False)
clid | coef_1 | coef_2 | coef_3 | coef_total |
---|---|---|---|---|
X | {B -> 0.4, C -> 0.4} | {B -> 0.33, C -> 0.5} | {A -> 0.5, C -> 0.33} | {B -> 0.73, C -> 1.23, A -> 0.5} |
Y | {B -> 0.67, C -> 0.33} | {B -> 0.85} | {A -> 0.4, C -> 0.57} | {B -> 1.52, C -> 0.8999999999999999, A -> 0.4} |
9 Sampling rows
9.1 To sample rows
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("index", T.IntegerType(), True),
        T.StructField("value", T.StringType(), True),
    ]
)
data = [
    (1, "Home"), (2, "School"), (3, "Home"), (4, "Home"), (5, "Office"),
    (6, "Office"), (7, "Office"), (8, "Mall"), (9, "Mall"), (10, "School"),
]
df = spark.createDataFrame(schema=schema, data=data).repartition(3)
df = df.withColumn("partition", F.spark_partition_id()).orderBy("index")
print("Original dataframe:")
df.show()
print("Sampled dataframe:")
dft = df.sample(fraction=0.5, seed=1).orderBy("index")
dft.show()
Original dataframe:
index | value | partition |
---|---|---|
1 | Home | 1 |
2 | School | 0 |
3 | Home | 0 |
4 | Home | 2 |
5 | Office | 2 |
6 | Office | 2 |
7 | Office | 1 |
8 | Mall | 0 |
9 | Mall | 1 |
10 | School | 0 |
Sampled dataframe:
index | value | partition |
---|---|---|
3 | Home | 0 |
4 | Home | 2 |
7 | Office | 1 |
8 | Mall | 0 |
9 | Mall | 1 |
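For stratified sampling, where each group keeps its own sampling fraction, .sampleBy() can be used instead; a minimal sketch with illustrative per-value fractions (values not listed in the dictionary default to a fraction of 0):

# Keep roughly 50% of "Home" rows and 25% of "Office" rows
dft = df.sampleBy("value", fractions={"Home": 0.5, "Office": 0.25}, seed=1)
dft.orderBy("index").show()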
10 UUID generation
10.1 To generate a UUID for every row
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
import random
import uuid

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("Name", T.StringType(), True),
    ]
)
data = [["Alice"], ["Bon"], ["John"], ["Cecile"]]
df = spark.createDataFrame(schema=schema, data=data).repartition(2)

def _generate_uuid(uuid_gen, v=10):
    def _replace_byte(value: int, byte: int):
        # Overwrite the 4-bit version field of the 128-bit UUID integer
        byte = byte & 0xF
        bit_shift = 76
        mask = ~(0xF << bit_shift)
        return value & mask | (byte << bit_shift)

    uuid_ = uuid_gen.generate()
    return uuid.UUID(int=_replace_byte(uuid_.int, v))

class RandomDistributedUUIDGenerator:
    def generate(self):
        return uuid.uuid4()

class SeedBasedUUIDGenerator:
    def __init__(self, seed):
        self.rnd = random.Random(seed)

    def generate(self):
        return uuid.UUID(int=self.rnd.getrandbits(128), version=4)

gen = RandomDistributedUUIDGenerator()
udf_generate_uuid = F.udf(lambda: _generate_uuid(gen).__str__(), T.StringType())
df = df.withColumn("UUID_random_distributed", udf_generate_uuid())

seed_for_rng = 1
gen = SeedBasedUUIDGenerator(seed_for_rng)
udf_generate_uuid = F.udf(lambda: _generate_uuid(gen).__str__(), T.StringType())
df = df.withColumn("UUID_seed_based", udf_generate_uuid())

print("The dataframe resides in two partitions. Seed-based random UUID generator uses the same seed on both partitions, yielding identical values.")
df.show(truncate=False)
The dataframe resides in two partitions. Seed-based random UUID generator uses the same seed on both partitions, yielding identical values.
Name | UUID_random_distributed | UUID_seed_based |
---|---|---|
John | 4e9a3bb1-a189-a25e-8389-7f8382635b09 | cd613e30-d8f1-aadf-91b7-584a2265b1f5 |
Bon | 16cd1549-0c74-a483-9bbe-707e59e0796f | 1e2feb89-414c-a43c-9027-c4d1c386bbc4 |
Cecile | b8b05619-6004-aa75-b98b-7e1c83c9f301 | cd613e30-d8f1-aadf-91b7-584a2265b1f5 |
Alice | b1f1a9fb-feb9-a946-9171-3e7cb577fdaa | 1e2feb89-414c-a43c-9027-c4d1c386bbc4 |
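If per-row randomness is all that is required and seeding is not a concern, Spark's built-in SQL function uuid() avoids the Python UDF round trip; a minimal sketch (the column name "UUID_builtin" is illustrative, and the function is non-deterministic by design):

# Generate a random UUID string for every row using the built-in SQL function
df = df.withColumn("UUID_builtin", F.expr("uuid()"))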