PySpark Cookbook
- 1. Introduction
- 1.1. To create an empty dataframe
- 1.2. To create a dataframe with columns key and value from a dictionary
- 1.3. To duplicate a column
- 1.4. To rename a column using .withColumnRenamed()
- 1.5. To rename a column using .withColumnsRenamed()
- 1.6. To rename a column using .select()
- 1.7. To rename columns by adding a prefix
- 1.8. To drop columns from a dataframe
- 1.9. To subset columns of a dataframe
- 1.10. To add a column with a constant value using F.lit()
- 1.11. To add a column with a constant value using .select()
- 1.12. To create a dataframe from a list of tuples
- 1.13. To get the number of rows of a dataframe
- 1.14. To select first N rows
- 1.15. To deduplicate rows
- 1.16. To convert a column to a list using a lambda function
- 1.17. To convert a dataframe to a list of dictionaries corresponding to every row
- 1.18. To convert a column to a list using list comprehension
- 1.19. To convert a column to a list using Pandas
- 1.20. To display full width of a column (do not truncate)
- 2. Filtering rows
- 3. Array operations
- 3.1. To create arrays of different lengths
- 3.2. To calculate set difference
- 3.3. To calculate set union
- 3.4. To calculate set intersection
- 3.5. To pad arrays with value
- 3.6. To sum two arrays elementwise using F.element_at()
- 3.7. To sum two arrays using F.arrays_zip()
- 3.8. To find mode of an array (most common element)
- 3.9. To calculate difference of two consecutive elements in an array
- 3.10. To apply a function to every element of an array
- 3.11. To deduplicate elements in an array (find unique/distinct elements)
- 3.12. To create a map (dictionary) from two arrays (one with keys, one with values)
- 3.13. To calculate mean of an array
- 3.14. To remove NULL values from an array
- 3.15. To find out whether an array has any negative elements
- 3.16. To convert elements of an array to columns
- 3.17. To find location of the first occurrence of an element in an array
- 3.18. To calculate moving difference of two consecutive elements in an array
- 3.19. To find a union of all unique elements of multiple arrays in a group
- 3.20. To slice an array
- 3.21. To slice an array dynamically
- 4. Text processing
- 4.1. To remove prefix from a string
- 4.2. To deduplicate identical consecutive characters using F.regexp_replace()
- 4.3. To deduplicate identical consecutive characters using a UDF
- 4.4. To split a string into letters (characters) using regex
- 4.5. To concatenate columns with strings using a separator
- 4.6. To split a string into letters (characters) using split function
- 4.7. To split a string into letters (characters) and remove last character
- 4.8. To append a string to all values in a column
- 5. Time operations
- 5.1. To convert Unix time stamp to human readable format
- 5.2. To detect log in and log out timestamps in a sequence of events
- 5.3. To estimate host change time based on log in and log out information in a sequence of events
- 5.4. To round down log-in and round up log-out times of a user to midnight timestamp in a sequence of events
- 6. Numerical operations
- 6.1. To find percentage of a column
- 6.2. To find percentage of a column within a group using a window
- 6.3. To find percentage of a column within a group using .groupBy() and a join
- 6.4. To add a row with total count and percentage for a column
- 6.5. To find maximum value of a column
- 6.6. To add a column with count of elements per group
- 6.7. To add a column with count of elements whose quantity is larger than some number per group
- 6.8. To calculate minimal value of two columns per row
- 6.9. To calculate cumulative sum of a column
- 6.10. To calculate difference of values of two consecutive rows for a certain column
- 6.11. To calculate difference of values of two consecutive rows for a certain column using .applyInPandas()
- 6.12. To calculate down-sampling ratios for over-represented groups of some data used in training of a machine learning algorithm
- 7. Structures
- 8. Dataframe join operations
- 9. Aggregation and maps
- 10. Groups, aggregations, pivoting and window operations
- 11. Sampling rows
- 12. UUID generation
1. Introduction
1.1. To create an empty dataframe
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.StringType()), True),
        T.StructField("B", T.ArrayType(T.StringType()), True),
    ]
)
data = []
df = spark.createDataFrame(schema=schema, data=data)
df.show()
A | B |
---|---|
1.2. To create a dataframe with columns key and value from a dictionary
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
df.show()
key | value |
---|---|
key1 | value1 |
key2 | value2 |
1.3. To duplicate a column
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
df = df.withColumn("value_dup", F.col("value"))
df.show()
key | value | value_dup |
---|---|---|
key1 | value1 | value1 |
key2 | value2 | value2 |
1.4. To rename a column using .withColumnRenamed()
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()
df = df.withColumnRenamed("key", "new_key") \
       .withColumnRenamed("value", "new_value")
print("Modified dataframe:")
df.show()
Original dataframe:
key | value |
---|---|
key1 | value1 |
key2 | value2 |
Modified dataframe:
new_key | new_value |
---|---|
key1 | value1 |
key2 | value2 |
1.5. To rename a column using .withColumnsRenamed()
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()
df = df.withColumnsRenamed({"key": "new_key", "value": "new_value"})
print("Modified dataframe:")
df.show()
Original dataframe:
key | value |
---|---|
key1 | value1 |
key2 | value2 |
Modified dataframe:
new_key | new_value |
---|---|
key1 | value1 |
key2 | value2 |
1.6. To rename a column using .select()
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()
df = df.select(F.col("key").alias("new_key"), F.col("value").alias("new_value"))
print("Modified dataframe:")
df.show()
Original dataframe:
key | value |
---|---|
key1 | value1 |
key2 | value2 |
Modified dataframe:
new_key | new_value |
---|---|
key1 | value1 |
key2 | value2 |
1.7. To rename columns by adding a prefix
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("index", T.IntegerType(), True),
        T.StructField("value", T.StringType(), True),
    ]
)
data = [(1, "Home"), (2, "School"), (3, "Home")]
df = spark.createDataFrame(schema=schema, data=data)
print("Original dataframe:")
df.show()
print("Dataframe with renamed columns:")
df = df.select(*[F.col(k).alias(f"prefix_{k}") for k in df.columns])
df.show()
Original dataframe:
index | value |
---|---|
1 | Home |
2 | School |
3 | Home |
Dataframe with renamed columns:
prefix_index | prefix_value |
---|---|
1 | Home |
2 | School |
3 | Home |
1.8. To drop columns from a dataframe
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
df = df.withColumn("const", F.lit(1))
print("Original dataframe:")
df.show()
df = df.drop("value", "const")
print("Modified dataframe:")
df.show()
Original dataframe:
key | value | const |
---|---|---|
key1 | value1 | 1 |
key2 | value2 | 1 |
Modified dataframe:
key |
---|
key1 |
key2 |
1.9. To subset columns of a dataframe
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
df = df.withColumn("const", F.lit(1))
print("Original dataframe:")
df.show()
print("Subset 'key', 'value' columns:")
df["key", "value"].show()
print("Subset 'key', 'const' columns:")
df.select("key", "const").show()
Original dataframe:
key | value | const |
---|---|---|
key1 | value1 | 1 |
key2 | value2 | 1 |
Subset 'key', 'value' columns:
key | value |
---|---|
key1 | value1 |
key2 | value2 |
Subset 'key', 'const' columns:
key | const |
---|---|
key1 | 1 |
key2 | 1 |
1.10. To add a column with a constant value using F.lit()
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()
df = df.withColumn("const_integer", F.lit(1))
df = df.withColumn("const_string", F.lit("string"))
print("Modified dataframe:")
df.show()
Original dataframe:
key | value |
---|---|
key1 | value1 |
key2 | value2 |
Modified dataframe:
key | value | const_integer | const_string |
---|---|---|---|
key1 | value1 | 1 | string |
key2 | value2 | 1 | string |
1.11. To add a column with a constant value using .select()
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()
df = df.select("key", "value", F.lit("const_str").alias("constant_value"))
print("Modified dataframe:")
df.show()
Original dataframe:
key | value |
---|---|
key1 | value1 |
key2 | value2 |
Modified dataframe:
key | value | constant_value |
---|---|---|
key1 | value1 | const_str |
key2 | value2 | const_str |
1.12. To create a dataframe from a list of tuples
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]
df = spark.createDataFrame(values, columns)
df.show()
integer | characters |
---|---|
1 | [A, B] |
2 | [C, D] |
3 | [E, F] |
1.13. To get the number of rows of a dataframe
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]
df = spark.createDataFrame(values, columns)
df.show()
num_rows = df.count()
print(f"df has {num_rows} rows")
integer | characters |
---|---|
1 | [A, B] |
2 | [C, D] |
3 | [E, F] |
df has 3 rows
1.14. To select first N rows
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]
df = spark.createDataFrame(values, columns)
df.show()
print("These are first 2 rows:")
df.limit(2).show()
integer | characters |
---|---|
1 | [A, B] |
2 | [C, D] |
3 | [E, F] |
These are first 2 rows:
integer | characters |
---|---|
1 | [A, B] |
2 | [C, D] |
1.15. To deduplicate rows
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("key", T.IntegerType(), True),
        T.StructField("value", T.StringType(), True),
        T.StructField("comment", T.StringType(), True),
    ]
)
data = [
    (1, "Home", "a house"),
    (1, "Home", "a house"),
    (2, "School", "a building"),
    (2, "School", "a house"),
    (3, "Home", "a house"),
]
df = spark.createDataFrame(schema=schema, data=data)
print("Original dataframe:")
df.show()
print("Dataframe with distinct rows:")
df.distinct().show()
print("Dataframe with dropped duplicate rows:")
df.dropDuplicates().show()
print("Dataframe with dropped duplicates in columns 'key' and 'value':")
df = df.dropDuplicates(subset=["key", "value"])
df.show()
Original dataframe:
key | value | comment |
---|---|---|
1 | Home | a house |
1 | Home | a house |
2 | School | a building |
2 | School | a house |
3 | Home | a house |
Dataframe with distinct rows:
key | value | comment |
---|---|---|
2 | School | a house |
3 | Home | a house |
2 | School | a building |
1 | Home | a house |
Dataframe with dropped duplicate rows:
key | value | comment |
---|---|---|
2 | School | a house |
3 | Home | a house |
2 | School | a building |
1 | Home | a house |
Dataframe with dropped duplicates in columns 'key' and 'value':
key | value | comment |
---|---|---|
1 | Home | a house |
2 | School | a building |
3 | Home | a house |
1.16. To convert a column to a list using a lambda function
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]
df = spark.createDataFrame(values, columns)
df.show()
lst = df.select("integer").rdd.map(lambda r: r[0]).collect()
print(f"Column \"integer\" has values: {lst}")
integer | characters |
---|---|
1 | [A, B] |
2 | [C, D] |
3 | [E, F] |
Column "integer" has values: [1, 2, 3]
1.17. To convert a dataframe to a list of dictionaries corresponding to every row
import pprint
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]
df = spark.createDataFrame(values, columns)
df.show()
lst_dict = df.rdd.map(lambda row: row.asDict()).collect()
print("Dataframe is represented as:\n")
txt = pprint.pformat(lst_dict)
print(txt)  # the formatted text must be printed to appear in the output
integer | characters |
---|---|
1 | [A, B] |
2 | [C, D] |
3 | [E, F] |
Dataframe is represented as:
[{'characters': ['A', 'B'], 'integer': 1},
 {'characters': ['C', 'D'], 'integer': 2},
 {'characters': ['E', 'F'], 'integer': 3}]
1.18. To convert a column to a list using list comprehension
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]
df = spark.createDataFrame(values, columns)
df.show()
lst = [k["integer"] for k in df.select("integer").rdd.collect()]
print(f"Column \"integer\" has values: {lst}")
integer | characters |
---|---|
1 | [A, B] |
2 | [C, D] |
3 | [E, F] |
Column "integer" has values: [1, 2, 3]
1.19. To convert a column to a list using Pandas
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]
df = spark.createDataFrame(values, columns)
df.show()
lst = df.select("integer").toPandas()["integer"].tolist()
print(f"Column \"integer\" has values: {lst}")
integer | characters |
---|---|
1 | [A, B] |
2 | [C, D] |
3 | [E, F] |
Column "integer" has values: [1, 2, 3]
1.20. To display full width of a column (do not truncate)
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("sentence", T.ArrayType(T.StringType()), True),
    ]
)
data = [(["A", "very", "long", "sentence"],), (["with", "many", "words", "."],)]
df = spark.createDataFrame(schema=schema, data=data)
print("Truncated output (default behavior):")
df.show()
print("Truncated to 15 characters output:")
df.show(truncate=15)
print("Non-truncated output (show all):")
df.show(truncate=False)
Truncated output (default behavior):
sentence |
---|
[A, very, long, s… |
[with, many, word… |
Truncated to 15 characters output:
sentence |
---|
[A, very, lo… |
[with, many,… |
Non-truncated output (show all):
sentence |
---|
[A, very, long, sentence] |
[with, many, words, .] |
2. Filtering rows
2.1. To filter based on values of a column
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [
    ("Home", "Laptop", 12),
    ("Home", "Monitor", None),
    ("Home", "Keyboard", 9),
    ("Office", "Laptop", None),
    ("Office", "Monitor", 10),
    ("Office", "Mouse", 9),
]
df = spark.createDataFrame(schema=schema, data=data)
print("Original dataframe:")
df.show()

print('Filter: F.col("Location") == "Home"')
dft = df.filter(F.col("Location") == "Home")
dft.show()

print('Filter: F.col("Quantity").isNull()')
dft = df.filter(F.col("Quantity").isNull())
dft.show()

print('Filter: F.col("Quantity").isNotNull()')
dft = df.filter(F.col("Quantity").isNotNull())
dft.show()

print('Filter: (F.col("Location") == "Home") & (F.col("Product") == "Laptop")')
dft = df.filter((F.col("Location") == "Home") & (F.col("Product") == "Laptop"))
dft.show()

print('Filter: (F.col("Location") == "Home") & ~(F.col("Product") == "Laptop")')
dft = df.filter((F.col("Location") == "Home") & ~(F.col("Product") == "Laptop"))
dft.show()

print('Filter: (F.col("Product") == "Laptop") | (F.col("Product") == "Mouse")')
dft = df.filter((F.col("Product") == "Laptop") | (F.col("Product") == "Mouse"))
dft.show()

print('Filter: F.col("Product").isin(["Laptop", "Mouse"])')
dft = df.filter(F.col("Product").isin(["Laptop", "Mouse"]))
dft.show()
Original dataframe:
Location | Product | Quantity |
---|---|---|
Home | Laptop | 12 |
Home | Monitor | null |
Home | Keyboard | 9 |
Office | Laptop | null |
Office | Monitor | 10 |
Office | Mouse | 9 |
Filter: F.col("Location") == "Home"
Location | Product | Quantity |
---|---|---|
Home | Laptop | 12 |
Home | Monitor | null |
Home | Keyboard | 9 |
Filter: F.col("Quantity").isNull()
Location | Product | Quantity |
---|---|---|
Home | Monitor | null |
Office | Laptop | null |
Filter: F.col("Quantity").isNotNull()
Location | Product | Quantity |
---|---|---|
Home | Laptop | 12 |
Home | Keyboard | 9 |
Office | Monitor | 10 |
Office | Mouse | 9 |
Filter: (F.col("Location") == "Home") & (F.col("Product") == "Laptop")
Location | Product | Quantity |
---|---|---|
Home | Laptop | 12 |
Filter: (F.col("Location") == "Home") & ~(F.col("Product") == "Laptop")
Location | Product | Quantity |
---|---|---|
Home | Monitor | null |
Home | Keyboard | 9 |
Filter: (F.col("Product") == "Laptop") | (F.col("Product") == "Mouse")
Location | Product | Quantity |
---|---|---|
Home | Laptop | 12 |
Office | Laptop | null |
Office | Mouse | 9 |
Filter: F.col("Product").isin(["Laptop", "Mouse"])
Location | Product | Quantity |
---|---|---|
Home | Laptop | 12 |
Office | Laptop | null |
Office | Mouse | 9 |
3. Array operations
3.1. To create arrays of different lengths
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
        T.StructField("B", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2], [2, 3, 4, 5]), ([4, 5, 6], [2, 3, 4, 5])]
df = spark.createDataFrame(schema=schema, data=data)
dft = df.select("A", "B")
dft.show()
A | B |
---|---|
[1, 2] | [2, 3, 4, 5] |
[4, 5, 6] | [2, 3, 4, 5] |
3.2. To calculate set difference
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.StringType()), True),
        T.StructField("B", T.ArrayType(T.StringType()), True),
    ]
)
data = [(["b", "a", "c"], ["c", "d", "a", "f"])]
df = spark.createDataFrame(schema=schema, data=data)
# Backslashes are escaped so that "\B" and "\A" are not read as (invalid) escape sequences
dft = df.select(
    "A",
    "B",
    F.array_except("A", "B").alias("A\\B"),
    F.array_except("B", "A").alias("B\\A"),
)
dft.show()
A | B | A\B | B\A |
---|---|---|---|
[b, a, c] | [c, d, a, f] | [b] | [d, f] |
3.3. To calculate set union
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.StringType()), True),
        T.StructField("B", T.ArrayType(T.StringType()), True),
    ]
)
data = [(["b", "a", "c"], ["c", "d", "a", "f"])]
df = spark.createDataFrame(schema=schema, data=data)
dft = df.select("A", "B", F.array_union("A", "B").alias("A U B"))
dft.show()
A | B | A U B |
---|---|---|
[b, a, c] | [c, d, a, f] | [b, a, c, d, f] |
3.4. To calculate set intersection
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.StringType()), True),
        T.StructField("B", T.ArrayType(T.StringType()), True),
    ]
)
data = [(["b", "a", "c"], ["c", "d", "a", "f"])]
df = spark.createDataFrame(schema=schema, data=data)
dft = df.select("A", "B", F.array_intersect("A", "B").alias("A \u2229 B"))
dft.show()
A | B | A ∩ B |
---|---|---|
[b, a, c] | [c, d, a, f] | [a, c] |
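The three set operations in 3.2 to 3.4 can be cross-checked without a Spark session. The sketch below is a plain-Python mirror of the results shown in the tables above, not Spark's implementation; the helper names `py_array_except`, `py_array_union` and `py_array_intersect` are hypothetical, and the ordering (first appearance, duplicates dropped) is an assumption taken from those outputs.

```python
def py_array_except(a, b):
    """Elements of a that are absent from b, deduplicated, in order of first appearance."""
    exclude = set(b)
    return [x for x in dict.fromkeys(a) if x not in exclude]


def py_array_union(a, b):
    """Elements of a followed by new elements of b, deduplicated."""
    return list(dict.fromkeys(a + b))


def py_array_intersect(a, b):
    """Elements of a that also occur in b, deduplicated, in order of first appearance."""
    keep = set(b)
    return [x for x in dict.fromkeys(a) if x in keep]


A = ["b", "a", "c"]
B = ["c", "d", "a", "f"]
print(py_array_except(A, B))     # ['b']
print(py_array_except(B, A))     # ['d', 'f']
print(py_array_union(A, B))      # ['b', 'a', 'c', 'd', 'f']
print(py_array_intersect(A, B))  # ['a', 'c']
```

Such helpers can serve as expected values in unit tests for small inputs; they are not meant to replace the Spark functions on real data.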
3.5. To pad arrays with value
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
        T.StructField("B", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2], [2, 3, 4, 5]), ([4, 5, 6], [2, 3, 4, 5])]
df = spark.createDataFrame(schema=schema, data=data)
n = 4           # target length of the padded array
fill_value = 0  # value used for padding

# Variant 1: build the padding with a SQL expression
df1 = df.withColumn("A_padding", F.expr(f"array_repeat({fill_value}, {n} - size(A))"))
df1 = df1.withColumn("A_padded", F.concat("A", "A_padding"))
dft = df1.select("A", "A_padding", "A_padded")
dft.show()

# Variant 2: build the padding with column functions
df2 = df.withColumn("A_padding", F.array_repeat(F.lit(fill_value), F.lit(n) - F.size("A")))
df2 = df2.withColumn("A_padded", F.concat("A", "A_padding"))
dft = df2.select("A", "A_padding", "A_padded")
dft.show()
A | A_padding | A_padded |
---|---|---|
[1, 2] | [0, 0] | [1, 2, 0, 0] |
[4, 5, 6] | [0] | [4, 5, 6, 0] |
A | A_padding | A_padded |
---|---|---|
[1, 2] | [0, 0] | [1, 2, 0, 0] |
[4, 5, 6] | [0] | [4, 5, 6, 0] |
3.6. To sum two arrays elementwise using F.element_at()
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
        T.StructField("B", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2], [2, 3, 4, 5]), ([4, 5, 6], [2, 3, 4, 5])]
df = spark.createDataFrame(schema=schema, data=data)
n = 4           # target length of the padded array (same values as in 3.5)
fill_value = 0  # value used for padding
df = df.withColumn("A_padding", F.array_repeat(F.lit(fill_value), F.lit(n) - F.size("A")))
df = df.withColumn("A_padded", F.concat("A", "A_padding"))
df = df.withColumn("AB_sum", F.expr('transform(A_padded, (element, index) -> element + element_at(B, index + 1))'))
dft = df.select("A", "A_padded", "B", "AB_sum")
dft.show()
A | A_padded | B | AB_sum |
---|---|---|---|
[1, 2] | [1, 2, 0, 0] | [2, 3, 4, 5] | [3, 5, 4, 5] |
[4, 5, 6] | [4, 5, 6, 0] | [2, 3, 4, 5] | [6, 8, 10, 5] |
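The `index + 1` in the expression above is needed because the index passed to the `transform` lambda starts at 0, while `element_at` addresses array elements starting at 1. A minimal plain-Python sketch of the same arithmetic follows; `element_at_1based` and `sum_elementwise` are hypothetical helper names used only for illustration.

```python
def element_at_1based(arr, index):
    """Return the element at a 1-based position, like element_at on arrays."""
    return arr[index - 1]


def sum_elementwise(a_padded, b):
    """Mirror of transform(A_padded, (element, index) -> element + element_at(B, index + 1))."""
    # enumerate() yields 0-based indexes, so shift by one for the 1-based lookup
    return [element + element_at_1based(b, index + 1)
            for index, element in enumerate(a_padded)]


print(sum_elementwise([1, 2, 0, 0], [2, 3, 4, 5]))  # [3, 5, 4, 5]
print(sum_elementwise([4, 5, 6, 0], [2, 3, 4, 5]))  # [6, 8, 10, 5]
```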
3.7. To sum two arrays using F.arrays_zip()
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
        T.StructField("B", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2], [2, 3, 4, 5]), ([4, 5, 6], [2, 3, 4, 5])]
df = spark.createDataFrame(schema=schema, data=data)
n = 4           # target length of the padded array (same values as in 3.5)
fill_value = 0  # value used for padding
df = df.withColumn("A_padding", F.array_repeat(F.lit(fill_value), F.lit(n) - F.size("A")))
df = df.withColumn("A_padded", F.concat("A", "A_padding"))
df = df.withColumn("AB_sum", F.expr("transform(arrays_zip(A_padded, B), x -> x.A_padded + x.B)"))
dft = df.select("A", "A_padded", "B", "AB_sum")
dft.show()
A | A_padded | B | AB_sum |
---|---|---|---|
[1, 2] | [1, 2, 0, 0] | [2, 3, 4, 5] | [3, 5, 4, 5] |
[4, 5, 6] | [4, 5, 6, 0] | [2, 3, 4, 5] | [6, 8, 10, 5] |
3.8. To find mode of an array (most common element)
from collections import Counter
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2, 2, 4],), ([4, 5, 6, 7],), ([1, 1, 2, 2],)]
df = spark.createDataFrame(schema=schema, data=data)

@F.udf
def udf_mode(x):
    return Counter(x).most_common(1)[0][0]

dft = df.withColumn("mode", udf_mode("A"))
dft.printSchema()
dft.show()
Schema of dft is:
root
 |-- A: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- mode: string (nullable = true)
A | mode |
---|---|
[1, 2, 2, 4] | 2 |
[4, 5, 6, 7] | 4 |
[1, 1, 2, 2] | 1 |
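The second and third rows above contain ties, and the reported mode is the tied value that appears first. This follows from `Counter.most_common()`, which orders elements with equal counts by first occurrence (Python 3.7 and later). A small sketch outside Spark, with the hypothetical helper name `mode_first_seen`:

```python
from collections import Counter


def mode_first_seen(values):
    """Most common element; ties are resolved in favor of the element seen first."""
    return Counter(values).most_common(1)[0][0]


print(mode_first_seen([1, 2, 2, 4]))  # 2
print(mode_first_seen([4, 5, 6, 7]))  # 4 (all counts equal, first element wins)
print(mode_first_seen([1, 1, 2, 2]))  # 1
print(mode_first_seen([2, 2, 1, 1]))  # 2 (same counts, different order)
```

Note also that the `mode` column in the schema above is a string because `@F.udf` without an explicit `returnType` defaults to a string return type; passing `returnType=T.IntegerType()` to the decorator should yield an integer column instead.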
3.9. To calculate difference of two consecutive elements in an array
import numpy as np
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("id", T.StringType(), True),
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [("A", [4, 1, 0, 2]), ("B", [1, 0, 3, 1])]
df = spark.createDataFrame(schema=schema, data=data)

@F.udf(returnType=T.ArrayType(T.IntegerType()))
def diff_of_two_consecutive_elements(x):
    return np.ediff1d(np.array(x)).tolist()

df = df.withColumn("diff", diff_of_two_consecutive_elements(F.col("values")))
df.show()
df.printSchema()
id | values | diff |
---|---|---|
A | [4, 1, 0, 2] | [-3, -1, 2] |
B | [1, 0, 3, 1] | [-1, 3, -2] |
Schema of df is:
root
 |-- id: string (nullable = true)
 |-- values: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- diff: array (nullable = true)
 |    |-- element: integer (containsNull = true)
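The body of the UDF above can also be written without NumPy. The following is a minimal sketch of the same consecutive-difference logic in plain Python; `diff_consecutive` is a hypothetical name, and whether it is preferable to `np.ediff1d` inside a UDF depends on the environment.

```python
def diff_consecutive(values):
    """Differences between each element and its predecessor."""
    # zip pairs every element with its successor: (v0, v1), (v1, v2), ...
    return [b - a for a, b in zip(values, values[1:])]


print(diff_consecutive([4, 1, 0, 2]))  # [-3, -1, 2]
print(diff_consecutive([1, 0, 3, 1]))  # [-1, 3, -2]
print(diff_consecutive([5]))           # []
```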
3.10. To apply a function to every element of an array
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("words_with_suffixes", T.ArrayType(T.StringType()), True)
    ]
)
data = [(["pen_10", "note_11", "bottle_12"],), (["apple_13", "orange_14", "lemon_15"],)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("words", F.transform("words_with_suffixes", lambda x: F.split(x, "_").getItem(0)))
df.show(truncate=False)
words_with_suffixes | words |
---|---|
[pen_10, note_11, bottle_12] | [pen, note, bottle] |
[apple_13, orange_14, lemon_15] | [apple, orange, lemon] |
3.11. To deduplicate elements in an array (find unique/distinct elements)
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("words", T.ArrayType(T.StringType()), True)
    ]
)
data = [(["pen", "note", "pen"],), (["apple", "apple", "lemon"],)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("unique_words", F.array_distinct("words"))
df.show(truncate=False)
words | unique_words |
---|---|
[pen, note, pen] | [pen, note] |
[apple, apple, lemon] | [apple, lemon] |
3.12. To create a map (dictionary) from two arrays (one with keys, one with values)
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("keys", T.ArrayType(T.IntegerType()), True),
        T.StructField("values", T.ArrayType(T.StringType()), True),
    ]
)
data = [([1, 2, 3], ["A", "B", "C"])]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("map_kv", F.map_from_arrays("keys", "values"))
df.show(truncate=False)
keys | values | map_kv |
---|---|---|
[1, 2, 3] | [A, B, C] | {1 -> A, 2 -> B, 3 -> C} |
3.13. To calculate mean of an array
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2],), ([4, 5, 6],)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn(
    "mean",
    F.aggregate(
        "values",                                   # column
        F.lit(0),                                   # initialValue
        lambda acc, x: acc + x,                     # merge operation
        lambda acc: acc / F.size(F.col("values")),  # finish
    ),
)
df.show()
values | mean |
---|---|
[1, 2] | 1.5 |
[4, 5, 6] | 5.0 |
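`F.aggregate` is a fold: it starts from the initial value, applies the merge function once per element, and then applies the finish function to the accumulated result. The same three steps can be sketched in plain Python with `functools.reduce`; `mean_via_fold` is a hypothetical name, and the sketch does not reproduce Spark's handling of empty or NULL arrays.

```python
from functools import reduce


def mean_via_fold(values):
    """Mean computed as fold (initial value + merge) followed by a finish step."""
    total = reduce(lambda acc, x: acc + x, values, 0)  # initialValue = 0, merge = acc + x
    return total / len(values)                         # finish = acc / size(values)


print(mean_via_fold([1, 2]))     # 1.5
print(mean_via_fold([4, 5, 6]))  # 5.0
```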
3.14. To remove NULL values from an array
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, -2, None],), ([4, 5, None, 6],)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("values_without_nulls", F.array_compact("values"))
df.show()
values | values_without_nulls |
---|---|
[1, -2, NULL] | [1, -2] |
[4, 5, NULL, 6] | [4, 5, 6] |
3.15. To find out whether an array has any negative elements
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, -2],), ([4, 5, 6],)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("any_negative", F.exists("values", lambda x: x < 0))
df.show()
values | any_negative |
---|---|
[1, -2] | true |
[4, 5, 6] | false |
3.16. To convert elements of an array to columns
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2, 3, 4],), ([5, 6, 7],)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("first", F.col("A").getItem(0))
dft = df.select("A", "first", *[F.col("A").getItem(k).alias(f"element_{k+1}") for k in range(1, 4)])
dft.show()
A | first | element_2 | element_3 | element_4 |
---|---|---|---|---|
[1, 2, 3, 4] | 1 | 2 | 3 | 4 |
[5, 6, 7] | 5 | 6 | 7 | null |
3.17. To find location of the first occurrence of an element in an array
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 7, 5],), ([7, 4, 7],)]
df = spark.createDataFrame(schema=schema, data=data)
# Positions are 1-based
df = df.withColumn("position", F.array_position(F.col("values"), 7))
df.show()
values | position |
---|---|
[1, 7, 5] | 2 |
[7, 4, 7] | 1 |
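`F.array_position()` counts from 1 rather than 0 and returns 0 when the element is absent. As a plain-Python sketch of that convention (the helper `array_position_py` is hypothetical and only imitates the behaviour; it is not part of PySpark):

```python
def array_position_py(values, target):
    """Return the 1-based index of the first match, or 0 if there is none."""
    for position, value in enumerate(values, start=1):
        if value == target:
            return position
    return 0


print(array_position_py([1, 7, 5], 7))  # 2
print(array_position_py([7, 4, 7], 7))  # 1
print(array_position_py([1, 2, 3], 7))  # 0
```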
3.18. To calculate moving difference of two consecutive elements in an array
import pyspark.sql.functions as F
import pyspark.sql.types as T
import pandas as pd
from pyspark.sql import SparkSession
import numpy as np

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2, 5],), ([4, 4, 6],)]
df = spark.createDataFrame(schema=schema, data=data)

@F.pandas_udf(T.ArrayType(T.IntegerType()))
def diff2e(x: pd.Series) -> pd.Series:
    return x.apply(lambda x: (x[1:] - x[:-1]))

@F.udf(returnType=T.ArrayType(T.IntegerType()))
def diff_of_two_consecutive_elements(x):
    return np.ediff1d(np.array(x)).tolist()

df = df.withColumn("diff2e", diff2e(F.col("values")))
df = df.withColumn("ediff1d", diff_of_two_consecutive_elements(F.col("values")))
df.show()
values | diff2e | ediff1d |
---|---|---|
[1, 2, 5] | [1, 3] | [1, 3] |
[4, 4, 6] | [0, 2] | [0, 2] |
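Both UDFs compute `x[i+1] - x[i]` for each neighbouring pair. For checking expected values by hand, the same calculation can be sketched in plain Python without NumPy (the function name `consecutive_diffs` is an illustrative choice):

```python
def consecutive_diffs(values):
    """Difference between each element and the one before it."""
    # zip pairs every element with its successor: (x0, x1), (x1, x2), ...
    return [b - a for a, b in zip(values, values[1:])]


print(consecutive_diffs([1, 2, 5]))  # [1, 3]
print(consecutive_diffs([4, 4, 6]))  # [0, 2]
```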
3.19. To find a union of all unique elements of multiple arrays in a group
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType([
    T.StructField("group", T.StringType(), True),
    T.StructField("id", T.StringType(), True),
    T.StructField("values", T.ArrayType(T.IntegerType()), True),
])
data = [
    ("group_1", "A", [4, 1, 2]),
    ("group_1", "B", [1, 0, 3, 1]),
    ("group_2", "C", [5, 6, 7, 5]),
    ("group_2", "D", [7, 6, 9])
]
df = spark.createDataFrame(data=data, schema=schema)
df.show(truncate=False)

# Flatten arrays within groups and calculate unique elements
dfs = (
    df.groupBy("group")
    .agg(F.collect_list("values").alias("list_of_lists"))
    .withColumn("all_values", F.flatten("list_of_lists"))
    .withColumn("unique_values", F.array_distinct("all_values"))
    .select("group", "list_of_lists", "all_values", "unique_values")
)
dfs.show(truncate=False)
group | id | values |
---|---|---|
group_1 | A | [4, 1, 2] |
group_1 | B | [1, 0, 3, 1] |
group_2 | C | [5, 6, 7, 5] |
group_2 | D | [7, 6, 9] |
group | list_of_lists | all_values | unique_values |
---|---|---|---|
group_1 | [[4, 1, 2], [1, 0, 3, 1]] | [4, 1, 2, 1, 0, 3, 1] | [4, 1, 2, 0, 3] |
group_2 | [[5, 6, 7, 5], [7, 6, 9]] | [5, 6, 7, 5, 7, 6, 9] | [5, 6, 7, 9] |
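The flatten-then-deduplicate step can be sketched in plain Python as follows. This is only an illustration of the logic, assuming the inner arrays arrive in the order shown in the table; in Spark, the order in which `F.collect_list()` gathers rows is generally not guaranteed, so the element order of `unique_values` may differ between runs. The helper name `unique_union` is made up for the example.

```python
from itertools import chain


def unique_union(list_of_lists):
    """Flatten nested lists and keep the first occurrence of each element."""
    flat = list(chain.from_iterable(list_of_lists))
    # dict preserves insertion order, so duplicates are dropped in place
    return list(dict.fromkeys(flat))


print(unique_union([[4, 1, 2], [1, 0, 3, 1]]))  # [4, 1, 2, 0, 3]
print(unique_union([[5, 6, 7, 5], [7, 6, 9]]))  # [5, 6, 7, 9]
```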
3.20. To slice an array
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 7, 5, 2],), ([6, 4, 7, 3],)]
df = spark.createDataFrame(schema=schema, data=data)
# F.slice() uses a 1-based start, so start=2, length=2 matches Python's values[1:3]
df = df.withColumn("values[1:3]", F.slice("values", start=2, length=2))
df.show()
values | values[1:3] |
---|---|
[1, 7, 5, 2] | [7, 5] |
[6, 4, 7, 3] | [4, 7] |
3.21. To slice an array dynamically
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 7, 5],), ([6, 4, 7, 3],)]
df = spark.createDataFrame(schema=schema, data=data)
start_idx = 2  # 1-based start position
# The length depends on the size of each array: everything from start_idx to the end
df = df.withColumn("values[1:]", F.slice("values", start=start_idx, length=(F.size("values") - F.lit(start_idx - 1))))
df.show()
values | values[1:] |
---|---|
[1, 7, 5] | [7, 5] |
[6, 4, 7, 3] | [4, 7, 3] |
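The two slicing recipes rely on `F.slice()` taking a 1-based `start` and a `length`, whereas Python slices take 0-based start and end indices. A minimal plain-Python sketch of the index mapping is shown below. The helper names are hypothetical, and the sketch only covers positive `start` values.

```python
def spark_style_slice(values, start, length):
    """Emulate slice(values, start, length) with a 1-based, positive start."""
    if start < 1 or length < 0:
        raise ValueError("this sketch expects start >= 1 and length >= 0")
    return values[start - 1 : start - 1 + length]


def slice_to_end(values, start):
    """Everything from the 1-based position `start` to the end of the list."""
    return spark_style_slice(values, start, len(values) - (start - 1))


print(spark_style_slice([1, 7, 5, 2], 2, 2))  # [7, 5]
print(slice_to_end([6, 4, 7, 3], 2))          # [4, 7, 3]
```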
4. Text processing
4.1. To remove prefix from a string
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("text", T.StringType(), True),
    ]
)
data = [("id_orange",), ("apple",)]
df = spark.createDataFrame(schema=schema, data=data)
# UDF variant: strip the first three characters only if they are "id_"
remove_prefix = F.udf(lambda x: x[3:] if x[:3] == "id_" else x, T.StringType())
df = df.withColumn("no_prefix_udf", remove_prefix(F.col("text")))
# Regex variant: "^" anchors the match to the start, so only a leading "id_" is removed
df = df.withColumn("no_prefix_rgx", F.regexp_replace("text", "^id_", ""))
df.show()
text | no_prefix_udf | no_prefix_rgx |
---|---|---|
id_orange | orange | orange |
apple | apple | apple |
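When removing a prefix with a regular expression, whether the pattern is anchored with `^` matters: an unanchored pattern also deletes matches in the middle of the string. The difference can be checked with Python's `re` module, as in this small sketch (both function names are illustrative):

```python
import re


def strip_prefix_anchored(text, prefix="id_"):
    """Remove `prefix` only when it occurs at the start of the string."""
    return re.sub("^" + re.escape(prefix), "", text)


def strip_all_occurrences(text, prefix="id_"):
    """Remove every occurrence of `prefix`, wherever it appears."""
    return re.sub(re.escape(prefix), "", text)


print(strip_prefix_anchored("id_grid_map"))  # grid_map
print(strip_all_occurrences("id_grid_map"))  # grmap
```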
4.2. To deduplicate identical consecutive characters using F.regexp_replace()
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("String", T.StringType(), True)
    ]
)
data = [["aaaabbccc"], ["bbbccaaa"]]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn('Shortened', F.regexp_replace("String", "([ab])\\1+", "$1"))
df.show(truncate=False)
String | Shortened |
---|---|
aaaabbccc | abccc |
bbbccaaa | bcca |
4.3. To deduplicate identical consecutive characters using a UDF
import re
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("String", T.StringType(), True)
    ]
)
data = [["aaaabbccc"], ["bbbccaaa"]]

@F.udf(returnType=T.StringType())
def remove_consecutive_duplicate_characters_ab(s):
    return re.sub(r'([ab])\1+', r'\1', s)

df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn('Shortened', remove_consecutive_duplicate_characters_ab("String"))
df.show(truncate=False)
String | Shortened |
---|---|
aaaabbccc | abccc |
bbbccaaa | bcca |
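The pattern `([ab])\1+` collapses runs of `a` and `b` only, which is why `ccc` stays untouched in the output. Replacing the character class with `.` collapses runs of any character. The sketch below compares the two patterns in plain Python (function names are illustrative):

```python
import re


def collapse_ab(s):
    """Collapse repeated 'a' or 'b' characters, as in the recipe."""
    return re.sub(r"([ab])\1+", r"\1", s)


def collapse_any(s):
    """Collapse runs of any repeated character."""
    return re.sub(r"(.)\1+", r"\1", s)


print(collapse_ab("aaaabbccc"))   # abccc
print(collapse_any("aaaabbccc"))  # abc
```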
4.4. To split a string into letters (characters) using regex
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("String", T.StringType(), True)
    ]
)
data = [["This is"]]
df = spark.createDataFrame(schema=schema, data=data)
dft = df.select('String', F.split('String', '(?!$)').alias("Characters"))
dft.show(truncate=False)
String | Characters |
---|---|
This is | [T, h, i, s, , i, s] |
4.5. To concatenate columns with strings using a separator
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Str1", T.StringType(), True),
        T.StructField("Str2", T.StringType(), True)
    ]
)
data = [("This is", "a string"), ("on a", "different row")]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("Str_Concat", F.concat_ws("_", "Str1", "Str2"))
df.show()
Str1 | Str2 | Str_Concat |
---|---|---|
This is | a string | This is_a string |
on a | different row | on a_different row |
4.6. To split a string into letters (characters) using split function
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("String", T.StringType(), True)
    ]
)
data = [["This is"]]
df = spark.createDataFrame(schema=schema, data=data)
fsplit = F.expr("split(String, '')")
dft = df.select('String', fsplit.alias("Characters"))
dft.show(truncate=False)
String | Characters |
---|---|
This is | [T, h, i, s, , i, s] |
4.7. To split a string into letters (characters) and remove last character
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("String", T.StringType(), True)
    ]
)
data = [["This is_"]]
df = spark.createDataFrame(schema=schema, data=data)
print("Using split function and remove last character:")
fsplit = "split(String, '')"
fsplit = F.expr(f'slice({fsplit}, 1, size({fsplit}) - 1)')
dft = df.select('String', fsplit.alias("Characters"))
dft.show(truncate=False)
Using split function and remove last character:
String | Characters |
---|---|
This is_ | [T, h, i, s, , i, s] |
4.8. To prepend a string to all values in a column
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Str1", T.StringType(), True),
        T.StructField("Str2", T.StringType(), True)
    ]
)
data = [("This is", "a string"), ("on a", "different row")]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("Str1_with_prefix", F.concat(F.lit("Prefix_"), "Str1"))
dft = df.select("Str1", "Str1_with_prefix")
dft.show()
Str1 | Str1_with_prefix |
---|---|
This is | Prefix_This is |
on a | Prefix_on a |
5. Time operations
5.1. To convert Unix time stamp to human readable format
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("timestamp", T.LongType(), True),
    ]
)
data = [(1703224755,), (1703285602,)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("time_stamp_hrf", F.from_unixtime(F.col("timestamp"), "yyyy-MM-dd HH:mm:ss"))
df.show()
timestamp | time_stamp_hrf |
---|---|
1703224755 | 2023-12-22 06:59:15 |
1703285602 | 2023-12-22 23:53:22 |
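The formatted string produced by `F.from_unixtime()` depends on the time zone Spark is running with (the `spark.sql.session.timeZone` setting), so the same Unix timestamp can print differently on another machine. The values in the table above are consistent with a UTC+1 zone. The plain-Python sketch below reproduces them with `datetime`; the helper `format_unix` and its `utc_offset_hours` parameter are assumptions made for illustration.

```python
from datetime import datetime, timedelta, timezone


def format_unix(ts, utc_offset_hours=0):
    """Format a Unix timestamp at a fixed offset from UTC."""
    tz = timezone(timedelta(hours=utc_offset_hours))
    return datetime.fromtimestamp(ts, tz=tz).strftime("%Y-%m-%d %H:%M:%S")


print(format_unix(1703224755))     # 2023-12-22 05:59:15 (UTC)
print(format_unix(1703224755, 1))  # 2023-12-22 06:59:15 (UTC+1)
```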
5.2. To detect log in and log out timestamps in a sequence of events
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType([
    T.StructField("tstp_unix", T.LongType(), True),
    T.StructField("computer", T.StringType(), True),
    T.StructField("user", T.StringType(), True),
])
data = [
    (1703224755, "computer_A", "user_1"),
    (1703224780, "computer_A", "user_1"),
    (1703224850, "computer_A", "user_1"),
    (1703224950, "computer_B", "user_1"),
    (1703225050, "computer_B", "user_1"),
    (1703225100, "computer_B", "user_1"),
    (1703225150, "computer_A", "user_1"),
    (1703225200, "computer_B", "user_1"),
    (1703225250, "computer_B", "user_1"),
    (1703285700, "computer_C", "user_2"),
    (1703285800, "computer_C", "user_2"),
    (1703285900, "computer_C", "user_2"),
    (1703286000, "computer_C", "user_2"),
    (1703286100, "computer_D", "user_2"),
    (1703286200, "computer_D", "user_2"),
    (1703286300, "computer_D", "user_2"),
    (1703286400, "computer_D", "user_2"),
]
df = spark.createDataFrame(data=data, schema=schema)

# Add human-readable timestamp
df = df.withColumn("tstp_hrf", F.from_unixtime(F.col("tstp_unix"), "yyyy-MM-dd HH:mm:ss"))

# Define a window to identify session changes
window_spec = Window.partitionBy("user").orderBy(F.asc("tstp_unix"))

# Previous row
prev = F.lag("computer", 1).over(window_spec)
cmp_equ_prev = (F.col("computer") != F.when(prev.isNull(), "some_random_computer").otherwise(prev))
# Detect changes in computer usage: compare with the previous row in a window
df = df.withColumn("is_first", F.when(cmp_equ_prev, 1).otherwise(0))

# Next row (a lag with a negative offset looks ahead); named nxt to avoid shadowing the built-in next()
nxt = F.lag("computer", -1).over(window_spec)
cmp_equ_next = (F.col("computer") != F.when(nxt.isNull(), "some_random_computer").otherwise(nxt))
# Detect changes in computer usage: compare with the next row in a window
df = df.withColumn("is_last", F.when(cmp_equ_next, 1).otherwise(0))

# Running sum of is_first numbers the sessions of each user
df = df.withColumn("unq_grp", F.sum("is_first").over(window_spec))

cols = ["tstp_unix", "tstp_hrf", "user", "computer", "is_first", "is_last", "unq_grp"]
df.select(cols).show()

cols = ["tstp_unix", "tstp_hrf", "user", "computer", "unq_grp",
        F.col("is_first").alias("login"), F.col("is_last").alias("logout")]
df.filter((F.col("is_first") == 1) | (F.col("is_last") == 1)).select(cols).show()
tstp_unix | tstp_hrf | user | computer | is_first | is_last | unq_grp |
---|---|---|---|---|---|---|
1703224755 | 2023-12-22 06:59:15 | user_1 | computer_A | 1 | 0 | 1 |
1703224780 | 2023-12-22 06:59:40 | user_1 | computer_A | 0 | 0 | 1 |
1703224850 | 2023-12-22 07:00:50 | user_1 | computer_A | 0 | 1 | 1 |
1703224950 | 2023-12-22 07:02:30 | user_1 | computer_B | 1 | 0 | 2 |
1703225050 | 2023-12-22 07:04:10 | user_1 | computer_B | 0 | 0 | 2 |
1703225100 | 2023-12-22 07:05:00 | user_1 | computer_B | 0 | 1 | 2 |
1703225150 | 2023-12-22 07:05:50 | user_1 | computer_A | 1 | 1 | 3 |
1703225200 | 2023-12-22 07:06:40 | user_1 | computer_B | 1 | 0 | 4 |
1703225250 | 2023-12-22 07:07:30 | user_1 | computer_B | 0 | 1 | 4 |
1703285700 | 2023-12-22 23:55:00 | user_2 | computer_C | 1 | 0 | 1 |
1703285800 | 2023-12-22 23:56:40 | user_2 | computer_C | 0 | 0 | 1 |
1703285900 | 2023-12-22 23:58:20 | user_2 | computer_C | 0 | 0 | 1 |
1703286000 | 2023-12-23 00:00:00 | user_2 | computer_C | 0 | 1 | 1 |
1703286100 | 2023-12-23 00:01:40 | user_2 | computer_D | 1 | 0 | 2 |
1703286200 | 2023-12-23 00:03:20 | user_2 | computer_D | 0 | 0 | 2 |
1703286300 | 2023-12-23 00:05:00 | user_2 | computer_D | 0 | 0 | 2 |
1703286400 | 2023-12-23 00:06:40 | user_2 | computer_D | 0 | 1 | 2 |
tstp_unix | tstp_hrf | user | computer | unq_grp | login | logout |
---|---|---|---|---|---|---|
1703224755 | 2023-12-22 06:59:15 | user_1 | computer_A | 1 | 1 | 0 |
1703224850 | 2023-12-22 07:00:50 | user_1 | computer_A | 1 | 0 | 1 |
1703224950 | 2023-12-22 07:02:30 | user_1 | computer_B | 2 | 1 | 0 |
1703225100 | 2023-12-22 07:05:00 | user_1 | computer_B | 2 | 0 | 1 |
1703225150 | 2023-12-22 07:05:50 | user_1 | computer_A | 3 | 1 | 1 |
1703225200 | 2023-12-22 07:06:40 | user_1 | computer_B | 4 | 1 | 0 |
1703225250 | 2023-12-22 07:07:30 | user_1 | computer_B | 4 | 0 | 1 |
1703285700 | 2023-12-22 23:55:00 | user_2 | computer_C | 1 | 1 | 0 |
1703286000 | 2023-12-23 00:00:00 | user_2 | computer_C | 1 | 0 | 1 |
1703286100 | 2023-12-23 00:01:40 | user_2 | computer_D | 2 | 1 | 0 |
1703286400 | 2023-12-23 00:06:40 | user_2 | computer_D | 2 | 0 | 1 |
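The session logic above reduces to three per-row rules within one user's time-ordered events: a row is a login if its computer differs from the previous row (or there is no previous row), a logout if it differs from the next row (or there is no next row), and the session number is the running sum of logins. A plain-Python sketch of these rules, with the hypothetical helper `mark_boundaries`, can be used to verify the `user_1` rows of the table:

```python
from itertools import accumulate


def mark_boundaries(computers):
    """Return (is_first, is_last, group_id) lists for one user's ordered events."""
    n = len(computers)
    is_first = [int(i == 0 or computers[i] != computers[i - 1]) for i in range(n)]
    is_last = [int(i == n - 1 or computers[i] != computers[i + 1]) for i in range(n)]
    group_id = list(accumulate(is_first))  # running sum of session starts
    return is_first, is_last, group_id


first, last, group = mark_boundaries(["A", "A", "A", "B", "B", "B", "A", "B", "B"])
print(first)  # [1, 0, 0, 1, 0, 0, 1, 1, 0]
print(last)   # [0, 0, 1, 0, 0, 1, 1, 0, 1]
print(group)  # [1, 1, 1, 2, 2, 2, 3, 4, 4]
```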
5.3. To estimate host change time based on log in and log out information in a sequence of events
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType([
    T.StructField("tstp_unix", T.LongType(), True),
    T.StructField("host", T.StringType(), True),
    T.StructField("user", T.StringType(), True),
])
data = [
    (1703224755, "host_A", "user_1"),
    (1703224852, "host_A", "user_1"),
    (1703224950, "host_B", "user_1"),
    (1703225104, "host_B", "user_1"),
    (1703225150, "host_A", "user_1"),
    (1703225200, "host_B", "user_1"),
    (1703225250, "host_B", "user_1"),
    (1703285700, "host_C", "user_2"),
    (1703286000, "host_C", "user_2"),
    (1703286100, "host_D", "user_2"),
    (1703286400, "host_D", "user_2"),
]
df = spark.createDataFrame(data=data, schema=schema)

# Add human-readable timestamp
df = df.withColumn("tstp_hrf", F.from_unixtime(F.col("tstp_unix"), "yyyy-MM-dd HH:mm:ss"))

# Define a window to identify session changes
window_spec = Window.partitionBy("user").orderBy(F.asc("tstp_unix"))

prev_tstp = F.lag("tstp_unix", 1).over(window_spec)
next_tstp = F.lag("tstp_unix", -1).over(window_spec)
prev_host = F.lag("host", 1).over(window_spec)
next_host = F.lag("host", -1).over(window_spec)

t_delta_prev = (F.col("tstp_unix") - prev_tstp)
df = df.withColumn("t_diff_prv",
    F.when(prev_host.isNull(), 0.0).otherwise(
        F.when(F.col("host") != prev_host, t_delta_prev).otherwise(None))
)
t_delta_next = (next_tstp - F.col("tstp_unix"))
df = df.withColumn("t_diff_nxt",
    F.when(next_host.isNull(), 0.0).otherwise(
        F.when(F.col("host") != next_host, t_delta_next).otherwise(None))
)

df = df.withColumn("new_tstp_prv",
    F.when(F.col("t_diff_prv").isNotNull(), F.col("tstp_unix") - F.col("t_diff_prv")/2 + 1))
df = df.withColumn("new_tstp_nxt",
    F.when(F.col("t_diff_nxt").isNotNull(), F.col("tstp_unix") + F.col("t_diff_nxt")/2))
df = df.withColumn("new_tstp_prv", F.from_unixtime("new_tstp_prv", "yyyy-MM-dd HH:mm:ss"))
df = df.withColumn("new_tstp_nxt", F.from_unixtime("new_tstp_nxt", "yyyy-MM-dd HH:mm:ss"))

txt = "Calculate time delta to previous and next timestamps:"
print(txt)
cols = ["tstp_unix", "tstp_hrf", "user", "host", "t_diff_prv", "t_diff_nxt", "new_tstp_prv", "new_tstp_nxt"]
df.select(cols).show()

txt = "Combine new timestamps calculated from adding time deltas into an array:"
print(txt)
df = df.withColumn("new_tstps", F.array_compact(F.array("new_tstp_prv", "new_tstp_nxt")))
cols = ["tstp_hrf", "user", "host", "new_tstp_prv", "new_tstp_nxt", "new_tstps"]
df.select(cols).show(truncate=False)

txt = "Explode the array with new timestamps:"
print(txt)
df = df.withColumn("new_tstp", F.explode("new_tstps"))
cols = ["user", "host", "tstp_hrf", "new_tstps", "new_tstp"]
df.select(cols).show(truncate=False)
Calculate time delta to previous and next timestamps:
tstp_unix | tstp_hrf | user | host | t_diff_prv | t_diff_nxt | new_tstp_prv | new_tstp_nxt |
---|---|---|---|---|---|---|---|
1703224755 | 2023-12-22 06:59:15 | user_1 | host_A | 0.0 | NULL | 2023-12-22 06:59:16 | NULL |
1703224852 | 2023-12-22 07:00:52 | user_1 | host_A | NULL | 98.0 | NULL | 2023-12-22 07:01:41 |
1703224950 | 2023-12-22 07:02:30 | user_1 | host_B | 98.0 | NULL | 2023-12-22 07:01:42 | NULL |
1703225104 | 2023-12-22 07:05:04 | user_1 | host_B | NULL | 46.0 | NULL | 2023-12-22 07:05:27 |
1703225150 | 2023-12-22 07:05:50 | user_1 | host_A | 46.0 | 50.0 | 2023-12-22 07:05:28 | 2023-12-22 07:06:15 |
1703225200 | 2023-12-22 07:06:40 | user_1 | host_B | 50.0 | NULL | 2023-12-22 07:06:16 | NULL |
1703225250 | 2023-12-22 07:07:30 | user_1 | host_B | NULL | 0.0 | NULL | 2023-12-22 07:07:30 |
1703285700 | 2023-12-22 23:55:00 | user_2 | host_C | 0.0 | NULL | 2023-12-22 23:55:01 | NULL |
1703286000 | 2023-12-23 00:00:00 | user_2 | host_C | NULL | 100.0 | NULL | 2023-12-23 00:00:50 |
1703286100 | 2023-12-23 00:01:40 | user_2 | host_D | 100.0 | NULL | 2023-12-23 00:00:51 | NULL |
1703286400 | 2023-12-23 00:06:40 | user_2 | host_D | NULL | 0.0 | NULL | 2023-12-23 00:06:40 |
Combine new timestamps calculated from adding time deltas into an array:
tstp_hrf | user | host | new_tstp_prv | new_tstp_nxt | new_tstps |
---|---|---|---|---|---|
2023-12-22 06:59:15 | user_1 | host_A | 2023-12-22 06:59:16 | NULL | [2023-12-22 06:59:16] |
2023-12-22 07:00:52 | user_1 | host_A | NULL | 2023-12-22 07:01:41 | [2023-12-22 07:01:41] |
2023-12-22 07:02:30 | user_1 | host_B | 2023-12-22 07:01:42 | NULL | [2023-12-22 07:01:42] |
2023-12-22 07:05:04 | user_1 | host_B | NULL | 2023-12-22 07:05:27 | [2023-12-22 07:05:27] |
2023-12-22 07:05:50 | user_1 | host_A | 2023-12-22 07:05:28 | 2023-12-22 07:06:15 | [2023-12-22 07:05:28, 2023-12-22 07:06:15] |
2023-12-22 07:06:40 | user_1 | host_B | 2023-12-22 07:06:16 | NULL | [2023-12-22 07:06:16] |
2023-12-22 07:07:30 | user_1 | host_B | NULL | 2023-12-22 07:07:30 | [2023-12-22 07:07:30] |
2023-12-22 23:55:00 | user_2 | host_C | 2023-12-22 23:55:01 | NULL | [2023-12-22 23:55:01] |
2023-12-23 00:00:00 | user_2 | host_C | NULL | 2023-12-23 00:00:50 | [2023-12-23 00:00:50] |
2023-12-23 00:01:40 | user_2 | host_D | 2023-12-23 00:00:51 | NULL | [2023-12-23 00:00:51] |
2023-12-23 00:06:40 | user_2 | host_D | NULL | 2023-12-23 00:06:40 | [2023-12-23 00:06:40] |
Explode the array with new timestamps:
user | host | tstp_hrf | new_tstps | new_tstp |
---|---|---|---|---|
user_1 | host_A | 2023-12-22 06:59:15 | [2023-12-22 06:59:16] | 2023-12-22 06:59:16 |
user_1 | host_A | 2023-12-22 07:00:52 | [2023-12-22 07:01:41] | 2023-12-22 07:01:41 |
user_1 | host_B | 2023-12-22 07:02:30 | [2023-12-22 07:01:42] | 2023-12-22 07:01:42 |
user_1 | host_B | 2023-12-22 07:05:04 | [2023-12-22 07:05:27] | 2023-12-22 07:05:27 |
user_1 | host_A | 2023-12-22 07:05:50 | [2023-12-22 07:05:28, 2023-12-22 07:06:15] | 2023-12-22 07:05:28 |
user_1 | host_A | 2023-12-22 07:05:50 | [2023-12-22 07:05:28, 2023-12-22 07:06:15] | 2023-12-22 07:06:15 |
user_1 | host_B | 2023-12-22 07:06:40 | [2023-12-22 07:06:16] | 2023-12-22 07:06:16 |
user_1 | host_B | 2023-12-22 07:07:30 | [2023-12-22 07:07:30] | 2023-12-22 07:07:30 |
user_2 | host_C | 2023-12-22 23:55:00 | [2023-12-22 23:55:01] | 2023-12-22 23:55:01 |
user_2 | host_C | 2023-12-23 00:00:00 | [2023-12-23 00:00:50] | 2023-12-23 00:00:50 |
user_2 | host_D | 2023-12-23 00:01:40 | [2023-12-23 00:00:51] | 2023-12-23 00:00:51 |
user_2 | host_D | 2023-12-23 00:06:40 | [2023-12-23 00:06:40] | 2023-12-23 00:06:40 |
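The estimate places the host change at the midpoint of the gap between the last event on the old host and the first event on the new host, with the new host's interval starting one second later. The arithmetic can be checked in plain Python; the function `split_gap` below is an illustrative helper, not part of the recipe.

```python
def split_gap(last_seen_old_host, first_seen_new_host):
    """Return (estimated logout on old host, estimated login on new host)."""
    half_gap = (first_seen_new_host - last_seen_old_host) / 2
    estimated_logout = last_seen_old_host + half_gap      # tstp_unix + t_diff_nxt / 2
    estimated_login = first_seen_new_host - half_gap + 1  # tstp_unix - t_diff_prv / 2 + 1
    return estimated_logout, estimated_login


# host_A at 1703224852 followed by host_B at 1703224950: a gap of 98 seconds
print(split_gap(1703224852, 1703224950))  # (1703224901.0, 1703224902.0)
```

These two values correspond to the `07:01:41` and `07:01:42` entries in the tables above.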
5.4. To round down log-in and round up log-out times of a user to midnight timestamp in a sequence of events
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType([
    T.StructField("tstp_unix", T.LongType(), True),
    T.StructField("computer", T.StringType(), True),
    T.StructField("user", T.StringType(), True),
])
data = [
    (1703224755, "computer_A", "user_1"),
    (1703224780, "computer_A", "user_1"),
    (1703224850, "computer_A", "user_1"),
    (1703224950, "computer_B", "user_1"),
    (1703225050, "computer_B", "user_1"),
    (1703285700, "computer_C", "user_2"),
    (1703285800, "computer_C", "user_2"),
    (1703276000, "computer_C", "user_2"),
    (1703286300, "computer_D", "user_2"),
    (1703296400, "computer_D", "user_2"),
]
df = spark.createDataFrame(data=data, schema=schema)

# Add human-readable timestamp
df = df.withColumn("tstp_hrf", F.from_unixtime(F.col("tstp_unix"), "yyyy-MM-dd HH:mm:ss"))

# Define a window to identify session changes
window_spec = Window.partitionBy("user").orderBy(F.asc("tstp_unix"))

# Previous row: the first event of a user has no previous row
prev = F.lag("computer", 1).over(window_spec)
df = df.withColumn("is_first", F.when(prev.isNull(), 1).otherwise(0))
rounded_down = F.from_unixtime("tstp_unix", "yyyy-MM-dd")
df = df.withColumn("rounded_tstp_first",
    F.when(F.col("is_first") == 1, F.unix_timestamp(rounded_down, "yyyy-MM-dd")
    ).otherwise(F.col("tstp_unix")))

# Next row: the last event of a user has no next row
# (named nxt to avoid shadowing the built-in next())
nxt = F.lag("computer", -1).over(window_spec)
df = df.withColumn("is_last", F.when(nxt.isNull(), 1).otherwise(0))
rounded_up = F.from_unixtime(F.col("tstp_unix") + 24*60*60, "yyyy-MM-dd")
df = df.withColumn("rounded_tstp_last",
    F.when(F.col("is_last") == 1, F.unix_timestamp(rounded_up, "yyyy-MM-dd")
    ).otherwise(F.col("tstp_unix")))

# Combine rounded values
df = df.withColumn("rounded_tstp",
    F.when(nxt.isNull(), F.col("rounded_tstp_last")
    ).otherwise(F.col("rounded_tstp_first")))
df = df.withColumn("rounded_tstp_hrf", F.from_unixtime("rounded_tstp", "yyyy-MM-dd HH:mm:ss"))

cols = ["user", "computer", "tstp_unix", "tstp_hrf", "is_first", "is_last", "rounded_tstp_hrf"]
df.select(cols).show(truncate=False)
user | computer | tstp_unix | tstp_hrf | is_first | is_last | rounded_tstp_hrf |
---|---|---|---|---|---|---|
user_1 | computer_A | 1703224755 | 2023-12-22 06:59:15 | 1 | 0 | 2023-12-22 00:00:00 |
user_1 | computer_A | 1703224780 | 2023-12-22 06:59:40 | 0 | 0 | 2023-12-22 06:59:40 |
user_1 | computer_A | 1703224850 | 2023-12-22 07:00:50 | 0 | 0 | 2023-12-22 07:00:50 |
user_1 | computer_B | 1703224950 | 2023-12-22 07:02:30 | 0 | 0 | 2023-12-22 07:02:30 |
user_1 | computer_B | 1703225050 | 2023-12-22 07:04:10 | 0 | 1 | 2023-12-23 00:00:00 |
user_2 | computer_C | 1703276000 | 2023-12-22 21:13:20 | 1 | 0 | 2023-12-22 00:00:00 |
user_2 | computer_C | 1703285700 | 2023-12-22 23:55:00 | 0 | 0 | 2023-12-22 23:55:00 |
user_2 | computer_C | 1703285800 | 2023-12-22 23:56:40 | 0 | 0 | 2023-12-22 23:56:40 |
user_2 | computer_D | 1703286300 | 2023-12-23 00:05:00 | 0 | 0 | 2023-12-23 00:05:00 |
user_2 | computer_D | 1703296400 | 2023-12-23 02:53:20 | 0 | 1 | 2023-12-24 00:00:00 |
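Rounding to midnight via a `yyyy-MM-dd` string depends on the time zone in which the date is rendered. The values in the table above are consistent with a UTC+1 zone. As a rough cross-check, the same rounding can be done with integer arithmetic in plain Python. The helper names and the `utc_offset_hours` parameter are assumptions for illustration, and the sketch handles whole-hour offsets only.

```python
DAY = 24 * 60 * 60


def floor_to_midnight(ts, utc_offset_hours=0):
    """Unix time of the most recent local midnight at or before `ts`."""
    offset = utc_offset_hours * 3600
    return (ts + offset) // DAY * DAY - offset


def next_midnight(ts, utc_offset_hours=0):
    """Unix time of the local midnight that starts the following day."""
    return floor_to_midnight(ts + DAY, utc_offset_hours)


print(floor_to_midnight(1703224755, 1))  # 1703199600, i.e. 2023-12-22 00:00:00 at UTC+1
print(next_midnight(1703225050, 1))      # 1703286000, i.e. 2023-12-23 00:00:00 at UTC+1
```

As in the recipe, a timestamp that already falls exactly on midnight is moved a full day forward by the round-up step.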
6. Numerical operations
6.1. To find percentage of a column
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Laptop", 12), ("Monitor", 7), ("Mouse", 8), ("Keyboard", 9)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("%", F.round(F.col("Quantity")/F.sum("Quantity").over(Window.partitionBy())*100, 2))
dft = df.select("Product", "Quantity", "%").orderBy(F.desc("Quantity"))
dft.show()
Product | Quantity | % |
---|---|---|
Laptop | 12 | 33.33 |
Keyboard | 9 | 25.0 |
Mouse | 8 | 22.22 |
Monitor | 7 | 19.44 |
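The window with an empty `partitionBy()` makes the total of the whole column available on every row, so each percentage is simply `Quantity / total * 100`. The expected values can be checked with a few lines of plain Python (the `percentages` helper is made up for this sketch):

```python
def percentages(quantities, ndigits=2):
    """Share of each item in the overall total, in percent."""
    total = sum(quantities.values())
    return {name: round(qty / total * 100, ndigits) for name, qty in quantities.items()}


print(percentages({"Laptop": 12, "Monitor": 7, "Mouse": 8, "Keyboard": 9}))
# {'Laptop': 33.33, 'Monitor': 19.44, 'Mouse': 22.22, 'Keyboard': 25.0}
```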
6.2. To find percentage of a column within a group using a window
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("%", F.round(F.col("Quantity")/F.sum("Quantity").over(Window.partitionBy("Location"))*100, 2))
dft = df.select("Location", "Product", "Quantity", "%").orderBy(F.desc("Location"), F.desc("Quantity"))
dft.show()
Location | Product | Quantity | % |
---|---|---|---|
Office | Laptop | 23 | 54.76 |
Office | Monitor | 10 | 23.81 |
Office | Mouse | 9 | 21.43 |
Home | Laptop | 12 | 33.33 |
Home | Keyboard | 9 | 25.0 |
Home | Mouse | 8 | 22.22 |
Home | Monitor | 7 | 19.44 |
6.3. To find percentage of a column within a group using .groupBy() and a join
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)
df_sum = df.groupBy("Location").agg(F.sum("Quantity").alias("Total_Quantity"))
df = df.join(df_sum, on="Location").withColumn("%", F.round(F.col("Quantity")/F.col("Total_Quantity")*100, 2))
dft = df.select("Location", "Product", "Quantity", "%").orderBy(F.desc("Location"), F.desc("Quantity"))
dft.show()
Location | Product | Quantity | % |
---|---|---|---|
Office | Laptop | 23 | 54.76 |
Office | Monitor | 10 | 23.81 |
Office | Mouse | 9 | 21.43 |
Home | Laptop | 12 | 33.33 |
Home | Keyboard | 9 | 25.0 |
Home | Mouse | 8 | 22.22 |
Home | Monitor | 7 | 19.44 |
6.4. To add a row with total count and percentage for a column
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Name", T.StringType(), True),
        T.StructField("Books read", T.IntegerType(), True),
    ]
)
data = [("Alice", 12), ("Bob", 7), ("Michael", 8), ("Kevin", 10)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("%", F.round(F.col("Books read")/F.sum("Books read").over(Window.partitionBy())*100, 6))
dft = df.select("Name", "Books read", "%").orderBy(F.desc("Books read"))
dft = dft.union(
    dft.groupBy().agg(F.lit("Total"),
                      F.sum("Books read").alias("Books read"),
                      F.sum("%").alias("%"))
)
dft.show()
Name | Books read | % |
---|---|---|
Alice | 12 | 32.432432 |
Kevin | 10 | 27.027027 |
Michael | 8 | 21.621622 |
Bob | 7 | 18.918919 |
Total | 37 | 100.0 |
6.5. To find maximum value of a column
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)
df.show()
max_val = df.select("Quantity").rdd.max()[0]
print(f"Maximum value of Quantity: {max_val}")
Location | Product | Quantity |
---|---|---|
Home | Laptop | 12 |
Home | Monitor | 7 |
Home | Mouse | 8 |
Home | Keyboard | 9 |
Office | Laptop | 23 |
Office | Monitor | 10 |
Office | Mouse | 9 |
Maximum value of Quantity: 23
6.6. To add a column with count of elements per group
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("count_per_group", F.count(F.lit(1)).over(Window.partitionBy(F.col("Location"))))
df.show()
Location | Product | Quantity | count_per_group |
---|---|---|---|
Home | Laptop | 12 | 4 |
Home | Monitor | 7 | 4 |
Home | Mouse | 8 | 4 |
Home | Keyboard | 9 | 4 |
Office | Laptop | 23 | 3 |
Office | Monitor | 10 | 3 |
Office | Mouse | 9 | 3 |
6.7. To add a column with count of elements whose quantity is larger than some number per group
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)
df.show()
dfs = df.groupBy("Location").agg(
    F.sum((F.col("Quantity") >= 10).cast("int")).alias("Count of products with quantity >= 10 per location")
)
dfs.show()
Location | Product | Quantity |
---|---|---|
Home | Laptop | 12 |
Home | Monitor | 7 |
Home | Mouse | 8 |
Home | Keyboard | 9 |
Office | Laptop | 23 |
Office | Monitor | 10 |
Office | Mouse | 9 |
Location | Count of products with quantity >= 10 per location |
---|---|
Home | 1 |
Office | 2 |
6.8. To calculate minimal value of two columns per row
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

# Schema with two separate score fields
schema = T.StructType([
    T.StructField('Student', T.StructType([
        T.StructField('First name', T.StringType(), True),
        T.StructField('Middle name', T.StringType(), True),
        T.StructField('Last name', T.StringType(), True)
    ])),
    T.StructField('ID', T.StringType(), True),
    T.StructField('Gender', T.StringType(), True),
    T.StructField('Score1', T.IntegerType(), True),
    T.StructField('Score2', T.IntegerType(), True)
])

# Sample data with two scores for each student
data = [
    (("John", "", "Doe"), "1007", "M", 75, 80),
    (("Adam", "Scott", "Smith"), "1008", "M", 55, 65),
    (("Marie", "", "Carpenter"), "1004", "F", 67, 70),
    (("Samantha", "Louise", "Herbert"), "1002", "F", 90, 85),
    (("Craig", "", "Brown"), "1011", "M", 88, 92)
]
df = spark.createDataFrame(data=data, schema=schema)

# Calculate the minimum score between Score1 and Score2 and store it in a new column 'MinScore'
df = df.withColumn("MinScore", F.least(F.col("Score1"), F.col("Score2")))
df.printSchema()

# Show the result
df.show(truncate=False)
Schema of df is:
root
 |-- Student: struct (nullable = true)
 |    |-- First name: string (nullable = true)
 |    |-- Middle name: string (nullable = true)
 |    |-- Last name: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Score1: integer (nullable = true)
 |-- Score2: integer (nullable = true)
 |-- MinScore: integer (nullable = true)
Student | ID | Gender | Score1 | Score2 | MinScore |
---|---|---|---|---|---|
{John, , Doe} | 1007 | M | 75 | 80 | 75 |
{Adam, Scott, Smith} | 1008 | M | 55 | 65 | 55 |
{Marie, , Carpenter} | 1004 | F | 67 | 70 | 67 |
{Samantha, Louise, Herbert} | 1002 | F | 90 | 85 | 85 |
{Craig, , Brown} | 1011 | M | 88 | 92 | 88 |
6.9. To calculate cumulative sum of a column
import pandas as pd
from pyspark.sql import Window
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df = pd.DataFrame({'time': [0, 1, 2, 3, 4, 5],
                   'value': [False, False, True, False, True, True]})
df = spark.createDataFrame(df)
df = df.withColumn("cml_n_true", F.sum((F.col("value") == True).cast("int")).over(Window.orderBy(F.col("time").asc())))
df = df.withColumn("cml_n_false", F.sum((F.col("value") == False).cast("int")).over(Window.orderBy(F.col("time").asc())))
df.show()
time | value | cml_n_true | cml_n_false |
---|---|---|---|
0 | false | 0 | 1 |
1 | false | 0 | 2 |
2 | true | 1 | 2 |
3 | false | 1 | 3 |
4 | true | 2 | 3 |
5 | true | 3 | 3 |
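A window that is ordered but has no explicit frame makes `F.sum` accumulate from the first row up to the current one. As a cross-check of the table above, the same running counts can be reproduced in plain Python. This sketch only mirrors the logic; it is not a Spark API.

```python
from itertools import accumulate

# The "value" column, already ordered by "time".
values = [False, False, True, False, True, True]

# Running count of True values: cast each flag to 0/1 and accumulate.
cml_n_true = list(accumulate(int(v) for v in values))
# Running count of False values: same idea on the negated flag.
cml_n_false = list(accumulate(int(not v) for v in values))

print(cml_n_true)
print(cml_n_false)
```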
6.10. To calculate difference of values of two consecutive rows for a certain column
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

# Schema with a dataset column
schema = T.StructType([
    T.StructField("dataset", T.StringType(), True),
    T.StructField("group_data", T.StringType(), True),
    T.StructField("cnt", T.IntegerType(), True),
])

# Data covering multiple datasets
data = [
    ("dataset1", "group_b", 7),
    ("dataset1", "group_c", 8),
    ("dataset1", "group_d", 9),
    ("dataset1", "group_e", 23),
    ("dataset1", "group_g", 9),
    ("dataset2", "group_a", 5),
    ("dataset2", "group_b", 15),
    ("dataset2", "group_d", 20),
    ("dataset2", "group_e", 8),
    ("dataset2", "group_g", 18),
]

# Create DataFrame
dfs = spark.createDataFrame(schema=schema, data=data)

# Define a window partitioned by dataset and ordered by cnt
window_by_dataset = Window.partitionBy("dataset").orderBy(F.asc("cnt")).rowsBetween(Window.unboundedPreceding, 0)

# Calculate cumulative sum of 'cnt' within each dataset
dfs = dfs.withColumn("cml_cnt", F.sum("cnt").over(window_by_dataset))

# Define another window for consecutive row difference, partitioned by dataset
w = Window.partitionBy("dataset").orderBy(F.col("cnt").desc(), F.col("group_data").desc())

# Define fill value for NULL and shift (row lag) value
fill_null_value = 0
shift = -1

# Calculate difference between two consecutive rows within each dataset
dfs = dfs.withColumn(
    "diff_of_two_consec_rows",
    F.col("cml_cnt") - F.lag("cml_cnt", shift, fill_null_value).over(w)
)

# Show the resulting DataFrame
dfs.show(truncate=False)
dataset | group_data | cnt | cml_cnt | diff_of_two_consec_rows |
---|---|---|---|---|
dataset1 | group_e | 23 | 56 | 23 |
dataset1 | group_g | 9 | 33 | 9 |
dataset1 | group_d | 9 | 24 | 9 |
dataset1 | group_c | 8 | 15 | 8 |
dataset1 | group_b | 7 | 7 | 7 |
dataset2 | group_d | 20 | 66 | 20 |
dataset2 | group_g | 18 | 46 | 18 |
dataset2 | group_b | 15 | 28 | 15 |
dataset2 | group_e | 8 | 13 | 8 |
dataset2 | group_a | 5 | 5 | 5 |
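In the table above the difference column equals `cnt`, because subtracting the previous running total from the current one recovers the value that was just added. The plain-Python sketch below mirrors that logic for the `dataset1` counts; it is an illustration only and does not use Spark.

```python
from itertools import accumulate

# "cnt" values of dataset1 in ascending order.
cnt = [7, 8, 9, 9, 23]

# Running total, like the cumulative-sum window.
cml_cnt = list(accumulate(cnt))
# Running total of the neighbouring (smaller) row; 0 where no such row exists,
# which plays the role of the fill value passed to F.lag().
previous = [0] + cml_cnt[:-1]
# Difference of two consecutive running totals.
diff = [current - prev for current, prev in zip(cml_cnt, previous)]

print(cml_cnt)
print(diff)
```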
6.11. To calculate difference of values of two consecutive rows for a certain column using .applyInPandas()
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType([
    T.StructField("group", T.StringType(), True),
    T.StructField("value", T.IntegerType(), True),
    T.StructField("timestamp", T.IntegerType(), True),
])
data = [
    ("A", 10, 1),
    ("A", 15, 2),
    ("A", 21, 3),
    ("B", 5, 1),
    ("B", 7, 2),
    ("B", 12, 3),
]
df = spark.createDataFrame(data=data, schema=schema)

# Define a function that receives each group as a pandas DataFrame
def calculate_diff(pdf):
    # Ensure the data is sorted by timestamp within each group
    pdf = pdf.sort_values("timestamp")
    # Calculate the difference between consecutive rows;
    # the first row has no predecessor, so it keeps its own value
    pdf["difference"] = pdf["value"].diff().fillna(pdf["value"]).astype("int32")
    return pdf

# Apply the function to each group defined by the "group" column
df_res = df.groupBy("group").applyInPandas(calculate_diff, schema="group string, timestamp int, value int, difference int")

# Show the result
df_res.show(truncate=False)
group | timestamp | value | difference |
---|---|---|---|
A | 1 | 10 | 10 |
A | 2 | 15 | 5 |
A | 3 | 21 | 6 |
B | 1 | 5 | 5 |
B | 2 | 7 | 2 |
B | 3 | 12 | 5 |
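The function handed to `.applyInPandas()` is ordinary pandas code, so it can be tried on a small pandas DataFrame without starting Spark. The sketch below is one way to write it, assuming the first row of each group should carry its own value; the input rows are deliberately out of order to show that the result does not depend on the incoming index.

```python
import pandas as pd

def calculate_diff(pdf: pd.DataFrame) -> pd.DataFrame:
    # Sort within the group, then take row-to-row differences.
    pdf = pdf.sort_values("timestamp")
    diff = pdf["value"].diff()
    # The first row has no predecessor; keep its own value there.
    pdf["difference"] = diff.fillna(pdf["value"]).astype("int32")
    return pdf

# One group, with rows given out of timestamp order.
pdf = pd.DataFrame({"group": ["A", "A", "A"], "value": [15, 10, 21], "timestamp": [2, 1, 3]})
out = calculate_diff(pdf)
print(out)
```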
6.12. To calculate down-sampling ratios for over-represented groups of some data used in training of a machine learning algorithm
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("group_data", T.StringType(), True),
        T.StructField("cnt", T.IntegerType(), True),
    ]
)
data = [("group_a", 12),
        ("group_b", 7),
        ("group_c", 8),
        ("group_d", 9),
        ("group_e", 23),
        ("group_f", 10),
        ("group_g", 9)]
dfs = spark.createDataFrame(schema=schema, data=data)

dfs = dfs.withColumn(
    "cml_cnt",
    F.sum("cnt").over(Window.orderBy(F.col("cnt").asc(), F.col("group_data").desc()).rowsBetween(Window.unboundedPreceding, 0)),
)
txt = "Initial data composition:"
print(txt)
# Total count of samples in the input dataset
dfs = dfs.withColumn("cnt_tot", F.sum("cnt").over(Window.partitionBy()))
dfs.show(truncate=False)

dfs = dfs.withColumn("rank", F.rank().over(Window.orderBy(F.desc("cnt"), F.asc("group_data"))))
cnt_total = dfs.select("cnt_tot").limit(1).collect()[0][0]
n_samples_limit = 65
txt = f"Desired number of samples as parameter: {n_samples_limit}\n"
w = Window.partitionBy().orderBy(F.asc("rank"))
dfs = dfs.withColumn("cml_cnt + (rank-1)*F.lag(cnt, 1)",
                     F.concat_ws("", F.format_string("%2d", F.col("cml_cnt")), F.lit(" + "), F.lit("("),
                                 F.col("rank").cast(T.StringType()), F.lit(" - "), F.lit(1).cast(T.StringType()),
                                 F.lit(")"), F.lit(" * "), (F.lag("cnt", 1, cnt_total).over(w)).cast(T.StringType())))
dfs = dfs.withColumn("cnt_tot_after_top_cut", F.col("cml_cnt") + (F.col("rank") - F.lit(1)) * F.lag("cnt", 1, cnt_total).over(w))
dfs = dfs.withColumn("loc_cutoff", (F.col("cnt_tot_after_top_cut") > F.lit(n_samples_limit)).cast("int"))
dfs = dfs.withColumn("loc_cutoff_last", F.last("rank").over(Window.partitionBy("loc_cutoff").orderBy("loc_cutoff")))
dfs = dfs.withColumn("loc_cutoff", F.when((F.col("loc_cutoff") == 1) & (F.col("rank") == F.col("loc_cutoff_last")), F.lit(-1)).otherwise(F.col("loc_cutoff")))
dfs = dfs.drop("loc_cutoff_last")

rank_cutoff = dfs.filter(F.col("loc_cutoff") == 1).orderBy(F.desc("rank")).limit(1).select("rank").collect()[0][0]
txt += f"Rank at cutoff: {rank_cutoff}\n"
sum_before_cutoff = dfs.filter(F.col("loc_cutoff") == -1).orderBy(F.asc("rank")).limit(1).select("cml_cnt").collect()[0][0]
cnt_new_flat = (n_samples_limit - sum_before_cutoff)/rank_cutoff
txt += f"New samples count for groups with rank 1 to {rank_cutoff} (inclusive): {cnt_new_flat}\n"
dfs = dfs.withColumn("cnt_new", F.when(F.col("loc_cutoff") == F.lit(1), F.lit(cnt_new_flat)).otherwise(F.col("cnt").cast("float")))
txt += f"Over-represented groups will be down-sampled to have a flat distribution, under-represented groups will be kept as they are.\n"
dfs = dfs.withColumn(
    "cml_cnt_new",
    F.sum("cnt_new").over(Window.orderBy(F.desc("rank")).rowsBetween(Window.unboundedPreceding, 0)),
).orderBy(F.asc("rank"))
dfs = dfs.withColumn("sampling_ratio", F.when(F.col("loc_cutoff") == F.lit(1), F.lit(cnt_new_flat)/F.col("cnt")).otherwise(F.lit(1.0)))
txt += f"Data from groups with rank 1 to {rank_cutoff} (inclusive) should be downsampled with a ratio of samples to keep shown in 'sampling_ratio' column:"
print(txt)
dfs.show(truncate=False)
Initial data composition:
group_data | cnt | cml_cnt | cnt_tot |
---|---|---|---|
group_b | 7 | 7 | 78 |
group_c | 8 | 15 | 78 |
group_g | 9 | 24 | 78 |
group_d | 9 | 33 | 78 |
group_f | 10 | 43 | 78 |
group_a | 12 | 55 | 78 |
group_e | 23 | 78 | 78 |
Desired number of samples as parameter: 65
Rank at cutoff: 2
New samples count for groups with rank 1 to 2 (inclusive): 11.0
Over-represented groups will be down-sampled to have a flat distribution, under-represented groups will be kept as they are.
Data from groups with rank 1 to 2 (inclusive) should be downsampled with a ratio of samples to keep shown in 'sampling_ratio' column:
group_data | cnt | cml_cnt | cnt_tot | rank | cml_cnt + (rank-1)*F.lag(cnt, 1) | cnt_tot_after_top_cut | loc_cutoff | cnt_new | cml_cnt_new | sampling_ratio |
---|---|---|---|---|---|---|---|---|---|---|
group_e | 23 | 78 | 78 | 1 | 78 + (1 - 1) * 78 | 78 | 1 | 11.0 | 65.0 | 0.4782608695652174 |
group_a | 12 | 55 | 78 | 2 | 55 + (2 - 1) * 23 | 78 | 1 | 11.0 | 54.0 | 0.9166666666666666 |
group_f | 10 | 43 | 78 | 3 | 43 + (3 - 1) * 12 | 67 | -1 | 10.0 | 43.0 | 1.0 |
group_d | 9 | 33 | 78 | 4 | 33 + (4 - 1) * 10 | 63 | 0 | 9.0 | 33.0 | 1.0 |
group_g | 9 | 24 | 78 | 5 | 24 + (5 - 1) * 9 | 60 | 0 | 9.0 | 24.0 | 1.0 |
group_c | 8 | 15 | 78 | 6 | 15 + (6 - 1) * 9 | 60 | 0 | 8.0 | 15.0 | 1.0 |
group_b | 7 | 7 | 78 | 7 | 7 + (7 - 1) * 8 | 55 | 0 | 7.0 | 7.0 | 1.0 |
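The idea behind the table above is to cap the largest groups at a common size so that the total matches the desired number of samples, while smaller groups stay untouched. The plain-Python sketch below restates that idea outside Spark. The helper `flat_cap` is a hypothetical name introduced for illustration, it assumes a positive sample limit, and it is not the implementation used in the recipe.

```python
def flat_cap(counts, n_samples_limit):
    """Return (k, cap): the k largest groups are capped at `cap` samples each."""
    ordered = sorted(counts, reverse=True)
    if sum(ordered) <= n_samples_limit:
        return 0, None  # already within the limit, nothing to down-sample
    for k in range(1, len(ordered) + 1):
        rest = sum(ordered[k:])              # samples of groups that stay untouched
        cap = (n_samples_limit - rest) / k   # flat size for the k largest groups
        next_largest = ordered[k] if k < len(ordered) else 0
        if cap >= next_largest:              # the cap must not fall below the next group
            return k, cap

counts = {"group_a": 12, "group_b": 7, "group_c": 8, "group_d": 9,
          "group_e": 23, "group_f": 10, "group_g": 9}
k, cap = flat_cap(counts.values(), 65)

# Fraction of samples to keep per group; groups at or below the cap are kept whole.
ratios = {g: 1.0 if cap is None else min(1.0, cap / c) for g, c in counts.items()}
print(k, cap)
print(ratios)
```

With the counts from the recipe and a limit of 65, the two largest groups are capped, which agrees with the rank at cutoff and the `sampling_ratio` column shown above.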
7. Structures
7.1. To convert a map to a struct
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df = spark.sql("SELECT map('John', 1, 'Michael', 2) as Students_map")
df = df.withColumn("Students_struct", F.map_entries("Students_map").alias("a", "b"))
df.printSchema()
df.show(truncate=False)
Schema of df is:
root
 |-- Students_map: map (nullable = false)
 |    |-- key: string
 |    |-- value: integer (valueContainsNull = false)
 |-- Students_struct: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- key: string (nullable = false)
 |    |    |-- value: integer (nullable = false)
Students_map | Students_struct |
---|---|
{John -> 1, Michael -> 2} | [{John, 1}, {Michael, 2}] |
7.2. To extract a field from a struct as a column
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType([
    T.StructField('Student', T.StructType([
        T.StructField('First name', T.StringType(), True),
        T.StructField('Middle name', T.StringType(), True),
        T.StructField('Last name', T.StringType(), True)
    ])),
    T.StructField('ID', T.StringType(), True),
    T.StructField('Gender', T.StringType(), True),
    T.StructField('Score', T.IntegerType(), True)
])
data = [
    (("John", "", "Doe"), "1007", "M", 75),
    (("Adam", "Scott", "Smith"), "1008", "M", 55),
    (("Marie", "", "Carpenter"), "1004", "F", 67),
    (("Samantha", "Louise", "Herbert"), "1002", "F", 90),
    (("Craig", "", "Brown"), "1011", "M", 88)
]
df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()
df = df.withColumn("First name", F.col("Student.First Name"))
df = df.withColumn("Last name", F.upper(F.col("Student").getField("Last name")))
df.show(truncate=False)
Schema of df is:
root
 |-- Student: struct (nullable = true)
 |    |-- First name: string (nullable = true)
 |    |-- Middle name: string (nullable = true)
 |    |-- Last name: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Score: integer (nullable = true)
Student | ID | Gender | Score | First name | Last name |
---|---|---|---|---|---|
{John, , Doe} | 1007 | M | 75 | John | DOE |
{Adam, Scott, Smith} | 1008 | M | 55 | Adam | SMITH |
{Marie, , Carpenter} | 1004 | F | 67 | Marie | CARPENTER |
{Samantha, Louise, Herbert} | 1002 | F | 90 | Samantha | HERBERT |
{Craig, , Brown} | 1011 | M | 88 | Craig | BROWN |
7.3. To process an array of structs using F.transform() and .getField()
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

# Define schema with a struct that includes an array of structs for subjects
schema = T.StructType([
    T.StructField('Student', T.StructType([
        T.StructField('First name', T.StringType(), True),
        T.StructField('Middle name', T.StringType(), True),
        T.StructField('Last name', T.StringType(), True)
    ])),
    T.StructField('ID', T.StringType(), True),
    T.StructField('Gender', T.StringType(), True),
    T.StructField('Score', T.IntegerType(), True),
    T.StructField('Subjects and scores', T.ArrayType(
        T.StructType([
            T.StructField('Subject name', T.StringType(), True),
            T.StructField('Subject score', T.IntegerType(), True)
        ])
    ), True)
])

# Define data with an array of subjects for each student
data = [
    (("John", "", "Doe"), "1007", "M", 75, [("Math", 78), ("Science", 82)]),
    (("Adam", "Scott", "Smith"), "1008", "M", 55, [("Math", 55), ("English", 65)]),
    (("Marie", "", "Carpenter"), "1004", "F", 67, [("Math", 72), ("Science", 68), ("History", 75)]),
    (("Samantha", "Louise", "Herbert"), "1002", "F", 90, [("Math", 92), ("Science", 88), ("English", 91)]),
    (("Craig", "", "Brown"), "1011", "M", 88, [("Math", 85), ("Science", 90)])
]
df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()
df = df.withColumn("Subjects taken", F.transform("Subjects and scores", lambda x: x.getField("Subject name")))
df.show(truncate=False)
Schema of df is:
root
 |-- Student: struct (nullable = true)
 |    |-- First name: string (nullable = true)
 |    |-- Middle name: string (nullable = true)
 |    |-- Last name: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Score: integer (nullable = true)
 |-- Subjects and scores: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Subject name: string (nullable = true)
 |    |    |-- Subject score: integer (nullable = true)
Student | ID | Gender | Score | Subjects and scores | Subjects taken |
---|---|---|---|---|---|
{John, , Doe} | 1007 | M | 75 | [{Math, 78}, {Science, 82}] | [Math, Science] |
{Adam, Scott, Smith} | 1008 | M | 55 | [{Math, 55}, {English, 65}] | [Math, English] |
{Marie, , Carpenter} | 1004 | F | 67 | [{Math, 72}, {Science, 68}, {History, 75}] | [Math, Science, History] |
{Samantha, Louise, Herbert} | 1002 | F | 90 | [{Math, 92}, {Science, 88}, {English, 91}] | [Math, Science, English] |
{Craig, , Brown} | 1011 | M | 88 | [{Math, 85}, {Science, 90}] | [Math, Science] |
7.4. To process an array of nested structs using F.transform() and .getField()
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

# Define the schema with nested structs
schema = T.StructType([
    T.StructField('Student', T.StructType([
        T.StructField('First name', T.StringType(), True),
        T.StructField('Middle name', T.StringType(), True),
        T.StructField('Last name', T.StringType(), True)
    ])),
    T.StructField('ID', T.StringType(), True),
    T.StructField('Gender', T.StringType(), True),
    T.StructField('Subjects', T.ArrayType(T.StructType([
        T.StructField('Subject Name', T.StringType(), True),
        T.StructField('Assessments', T.ArrayType(T.StructType([
            T.StructField('Assessment Name', T.StringType(), True),
            T.StructField('Score', T.IntegerType(), True)
        ])), True)
    ])), True)
])

# Sample data matching the schema
data = [
    (("John", "", "Doe"), "1007", "M", [
        ("Math", [("Midterm", 70), ("Final", 80)]),
        ("Science", [("Midterm", 65), ("Final", 75)])
    ]),
    (("Adam", "Scott", "Smith"), "1008", "M", [
        ("Math", [("Midterm", 50), ("Final", 60)]),
        ("Science", [("Midterm", 55), ("Final", 65)])
    ]),
    (("Marie", "", "Carpenter"), "1004", "F", [
        ("Math", [("Midterm", 60), ("Final", 75)]),
        ("Science", [("Midterm", 68), ("Final", 70)])
    ]),
    (("Samantha", "Louise", "Herbert"), "1002", "F", [
        ("Math", [("Midterm", 88), ("Final", 92)]),
        ("Science", [("Midterm", 85), ("Final", 89)])
    ]),
    (("Craig", "", "Brown"), "1011", "M", [
        ("Math", [("Midterm", 78), ("Final", 85)]),
        ("Science", [("Midterm", 80), ("Final", 84)])
    ])
]
df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()
df = df.withColumn("Scores", F.transform("Subjects", lambda x: x["Assessments"].getField("Score")))
df.show(truncate=False)
Schema of df is:
root
 |-- Student: struct (nullable = true)
 |    |-- First name: string (nullable = true)
 |    |-- Middle name: string (nullable = true)
 |    |-- Last name: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Subjects: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Subject Name: string (nullable = true)
 |    |    |-- Assessments: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- Assessment Name: string (nullable = true)
 |    |    |    |    |-- Score: integer (nullable = true)
Student | ID | Gender | Subjects | Scores |
---|---|---|---|---|
{John, , Doe} | 1007 | M | [{Math, [{Midterm, 70}, {Final, 80}]}, {Science, [{Midterm, 65}, {Final, 75}]}] | [[70, 80], [65, 75]] |
{Adam, Scott, Smith} | 1008 | M | [{Math, [{Midterm, 50}, {Final, 60}]}, {Science, [{Midterm, 55}, {Final, 65}]}] | [[50, 60], [55, 65]] |
{Marie, , Carpenter} | 1004 | F | [{Math, [{Midterm, 60}, {Final, 75}]}, {Science, [{Midterm, 68}, {Final, 70}]}] | [[60, 75], [68, 70]] |
{Samantha, Louise, Herbert} | 1002 | F | [{Math, [{Midterm, 88}, {Final, 92}]}, {Science, [{Midterm, 85}, {Final, 89}]}] | [[88, 92], [85, 89]] |
{Craig, , Brown} | 1011 | M | [{Math, [{Midterm, 78}, {Final, 85}]}, {Science, [{Midterm, 80}, {Final, 84}]}] | [[78, 85], [80, 84]] |
8. Dataframe join operations
8.1. To perform full, outer, left and right join operations
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Name", T.StringType(), True),
        T.StructField("Score", T.IntegerType(), True),
    ]
)
data = [("Alice", 10),
        ("Bob", 11)
        ]
df_a = spark.createDataFrame(schema=schema, data=data)
print("Table A:")
df_a.show()

# "Age" holds integers, so it is declared as IntegerType
schema = T.StructType(
    [
        T.StructField("Name", T.StringType(), True),
        T.StructField("Surname", T.StringType(), True),
        T.StructField("Age", T.IntegerType(), True),
    ]
)
data = [("Alice", "Doe", 12),
        ("Alice", "Smith", 30),
        ("Jane", "Carter", 7),
        ]
df_b = spark.createDataFrame(schema=schema, data=data)
print("Table B:")
df_b.show()

df = df_a.join(df_b, on="Name", how="full")
print("Full join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="outer")
print("Outer join on 'Name':")
df.show()

df = df_a.join(df_b, df_a["Name"] == df_b["Name"])
print("Join on 'Name' on equal condition:")
df.show()

df = df_a.join(df_b, on="Name", how="inner")
print("Inner join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="left")
print("Left join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="left_outer")
print("Left-outer join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="left_anti")
print("Left-anti join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="left_semi")
print("Left-semi join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="right")
print("Right join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="right_outer")
print("Right-outer join on 'Name':")
df.show()
Table A:
Name | Score |
---|---|
Alice | 10 |
Bob | 11 |
Table B:
Name | Surname | Age |
---|---|---|
Alice | Doe | 12 |
Alice | Smith | 30 |
Jane | Carter | 7 |
Full join on 'Name':
Name | Score | Surname | Age |
---|---|---|---|
Alice | 10 | Doe | 12 |
Alice | 10 | Smith | 30 |
Bob | 11 | null | null |
Jane | null | Carter | 7 |
Outer join on 'Name':
Name | Score | Surname | Age |
---|---|---|---|
Alice | 10 | Doe | 12 |
Alice | 10 | Smith | 30 |
Bob | 11 | null | null |
Jane | null | Carter | 7 |
Join on 'Name' on equal condition:
Name | Score | Name | Surname | Age |
---|---|---|---|---|
Alice | 10 | Alice | Doe | 12 |
Alice | 10 | Alice | Smith | 30 |
Inner join on 'Name':
Name | Score | Surname | Age |
---|---|---|---|
Alice | 10 | Doe | 12 |
Alice | 10 | Smith | 30 |
Left join on 'Name':
Name | Score | Surname | Age |
---|---|---|---|
Bob | 11 | null | null |
Alice | 10 | Smith | 30 |
Alice | 10 | Doe | 12 |
Left-outer join on 'Name':
Name | Score | Surname | Age |
---|---|---|---|
Bob | 11 | null | null |
Alice | 10 | Smith | 30 |
Alice | 10 | Doe | 12 |
Left-anti join on 'Name':
Name | Score |
---|---|
Bob | 11 |
Left-semi join on 'Name':
Name | Score |
---|---|
Alice | 10 |
Right join on 'Name':
Name | Score | Surname | Age |
---|---|---|---|
Alice | 10 | Doe | 12 |
Alice | 10 | Smith | 30 |
Jane | null | Carter | 7 |
Right-outer join on 'Name':
Name | Score | Surname | Age |
---|---|---|---|
Alice | 10 | Doe | 12 |
Alice | 10 | Smith | 30 |
Jane | null | Carter | 7 |
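Which `Name` values survive each join type in the tables above can be summarised with set operations on the join keys. The plain-Python sketch below is only a mental model: it ignores row multiplicity (Alice matches two rows in Table B) and the columns that each join returns.

```python
keys_a = {"Alice", "Bob"}    # Name values in Table A
keys_b = {"Alice", "Jane"}   # Name values in Table B

surviving_keys = {
    "inner": keys_a & keys_b,
    "left / left_outer": keys_a,
    "right / right_outer": keys_b,
    "full / outer": keys_a | keys_b,
    "left_semi": keys_a & keys_b,   # only Table A columns are returned
    "left_anti": keys_a - keys_b,   # only Table A columns are returned
}
for how, keys in surviving_keys.items():
    print(f"{how:20s} {sorted(keys)}")
```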
8.2. To drop one of the duplicate columns after join
from pyspark.sql import Row
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df_a = spark.createDataFrame([
    Row(id=1, value="A1"),
    Row(id=1, value="B1"),
    Row(id=1, value="C1"),
    Row(id=2, value="A1"),
    Row(id=2, value="X1"),
    Row(id=2, value="Y1")]
)
print("Dataframe df_a:")
df_a.show()

df_b = spark.createDataFrame([
    Row(id=1, updated="A2"),
    Row(id=1, updated="B1"),
    Row(id=1, updated="C1"),
    Row(id=2, updated="A1"),
    Row(id=2, updated="X1"),
    Row(id=2, updated="Y1")]
)
print("Dataframe df_b:")
df_b.show()

df = df_a.join(df_b, on=[df_a["id"] == df_b["id"], df_a["value"] == df_b["updated"]], how="full")
print("Full join on df_a['value'] == df_b['updated']:")
df.show()

# Drop the duplicate "id" column that comes from df_b
df = df_a.join(df_b, on=[df_a["id"] == df_b["id"], df_a["value"] == df_b["updated"]], how="full").drop(df_b["id"])
print("Full join on df_a['value'] == df_b['updated'] with dropped df_b['id'] column:")
df.show()
Dataframe df_a:
id | value |
---|---|
1 | A1 |
1 | B1 |
1 | C1 |
2 | A1 |
2 | X1 |
2 | Y1 |
Dataframe df_b:
id | updated |
---|---|
1 | A2 |
1 | B1 |
1 | C1 |
2 | A1 |
2 | X1 |
2 | Y1 |
Full join on df_a['value'] == df_b['updated']:
id | value | id | updated |
---|---|---|---|
1 | A1 | null | null |
null | null | 1 | A2 |
1 | B1 | 1 | B1 |
1 | C1 | 1 | C1 |
2 | A1 | 2 | A1 |
2 | X1 | 2 | X1 |
2 | Y1 | 2 | Y1 |
Full join on df_a['value'] == df_b['updated'] with dropped df_b['id'] column:
id | value | updated |
---|---|---|
1 | A1 | null |
null | null | A2 |
1 | B1 | B1 |
1 | C1 | C1 |
2 | A1 | A1 |
2 | X1 | X1 |
2 | Y1 | Y1 |
9. Aggregation and maps
9.1. To group by and aggregate into a map using F.map_from_entries()
import pyspark.sql.functions as F
from pyspark.sql import Row
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df = spark.createDataFrame([
    Row(id=1, key='a', value="A1"),
    Row(id=1, key='b', value="B1"),
    Row(id=1, key='c', value="C1"),
    Row(id=2, key='a', value="A1"),
    Row(id=2, key='x', value="X1"),
    Row(id=2, key='y', value="Y1")]
)
print("Dataframe with keys and values:")
df.show(truncate=False)
dft = df.groupBy("id").agg(F.map_from_entries(F.collect_list(
    F.struct("key", "value"))).alias("key_value")
)
print("Dataframe with key -> value mapping")
dft.show(truncate=False)
dft.printSchema()
Dataframe with keys and values:
id | key | value |
---|---|---|
1 | a | A1 |
1 | b | B1 |
1 | c | C1 |
2 | a | A1 |
2 | x | X1 |
2 | y | Y1 |
Dataframe with key -> value mapping
id | key_value |
---|---|
1 | {a -> A1, b -> B1, c -> C1} |
2 | {a -> A1, x -> X1, y -> Y1} |
Schema of dft is:
root
 |-- id: long (nullable = true)
 |-- key_value: map (nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
9.2. To group by and aggregate into a map using UDF
import pyspark.sql.functions as F
from pyspark.sql import Row
from pyspark.sql import SparkSession
import pyspark.sql.types as T

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df = spark.createDataFrame([
    Row(id=1, key='a', value="A1"),
    Row(id=1, key='b', value="B1"),
    Row(id=1, key='c', value="C1"),
    Row(id=2, key='a', value="A1"),
    Row(id=2, key='x', value="X1"),
    Row(id=2, key='y', value="Y1")]
)
print("Dataframe with keys and values:")
df.show()

# Each collected element is a (key, value) struct, so dict() builds the mapping
@F.udf(returnType=T.MapType(T.StringType(), T.StringType()))
def map_array(column):
    return dict(column)

dft = (df.groupBy("id")
       .agg(F.collect_list(F.struct("key", "value")).alias("key_value"))
       .withColumn('key_value', map_array('key_value')))
print("Dataframe with key -> value mapping")
dft.show(truncate=False)
dft.printSchema()
Dataframe with keys and values:
id | key | value |
---|---|---|
1 | a | A1 |
1 | b | B1 |
1 | c | C1 |
2 | a | A1 |
2 | x | X1 |
2 | y | Y1 |
Dataframe with key -> value mapping
id | key_value |
---|---|
1 | {a -> A1, b -> B1, c -> C1} |
2 | {x -> X1, a -> A1, y -> Y1} |
Schema of dft is:
root
 |-- id: long (nullable = true)
 |-- key_value: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
9.3. To aggregate over multiple columns and sum values of dictionaries
import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

df_schema = T.StructType([T.StructField('clid', T.StringType(), True),
                          T.StructField('coef_1', T.MapType(T.StringType(), T.DoubleType(), True), False),
                          T.StructField('coef_2', T.MapType(T.StringType(), T.DoubleType(), True), False),
                          T.StructField('coef_3', T.MapType(T.StringType(), T.DoubleType(), True), False)])
df_data = [["X", {'B': 0.4, 'C': 0.4}, {'B': 0.33, 'C': 0.5}, {'A': 0.5, 'C': 0.33}],
           ["Y", {'B': 0.67, 'C': 0.33}, {'B': 0.85}, {'A': 0.4, 'C': 0.57}],
           ]
spark = SparkSession.builder\
    .appName("Parse DataFrame Schema")\
    .getOrCreate()
df = spark.createDataFrame(data=df_data, schema=df_schema)

# Start from coef_1 and merge the remaining maps one by one;
# a key missing from one of the two maps contributes 0
df = df.withColumn("coef_total", F.col("coef_1"))
for i in range(2, 4):
    df = df.withColumn("coef_total", F.map_zip_with("coef_total", f"coef_{i}",
                       lambda k, v1, v2: F.when(v1.isNull(), 0).otherwise(v1) + F.when(v2.isNull(), 0).otherwise(v2)))
df.show(truncate=False)
clid | coef_1 | coef_2 | coef_3 | coef_total |
---|---|---|---|---|
X | {B -> 0.4, C -> 0.4} | {B -> 0.33, C -> 0.5} | {A -> 0.5, C -> 0.33} | {B -> 0.73, C -> 1.23, A -> 0.5} |
Y | {B -> 0.67, C -> 0.33} | {B -> 0.85} | {A -> 0.4, C -> 0.57} | {B -> 1.52, C -> 0.8999999999999999, A -> 0.4} |
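`F.map_zip_with` passes NULL for a key that is present in only one of the two maps, and the lambda above replaces that NULL with 0 before adding. The same key-wise sum can be written in plain Python to check the numbers; `sum_maps` is a hypothetical helper used only for this illustration.

```python
import math

def sum_maps(*maps):
    """Key-wise sum of several dictionaries; a missing key counts as 0."""
    total = {}
    for m in maps:
        for key, value in m.items():
            total[key] = total.get(key, 0.0) + value
    return total

# Row "X" from the recipe above.
coef_total_x = sum_maps({'B': 0.4, 'C': 0.4}, {'B': 0.33, 'C': 0.5}, {'A': 0.5, 'C': 0.33})
# Row "Y" from the recipe above.
coef_total_y = sum_maps({'B': 0.67, 'C': 0.33}, {'B': 0.85}, {'A': 0.4, 'C': 0.57})

print(coef_total_x)
print(coef_total_y)
```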
10. Groups, aggregations, pivoting and window operations
10.1. To get 2nd smallest element in a group
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)
w = Window.partitionBy("Location").orderBy(F.asc("Quantity"))
df = df.withColumn("rank", F.rank().over(w))
df.show()
print("Products with the 2nd smallest quantity in a location:")
df = df.filter(F.col("rank") == 2)
df.show()
Location | Product | Quantity | rank |
---|---|---|---|
Home | Monitor | 7 | 1 |
Home | Mouse | 8 | 2 |
Home | Keyboard | 9 | 3 |
Home | Laptop | 12 | 4 |
Office | Mouse | 9 | 1 |
Office | Monitor | 10 | 2 |
Office | Laptop | 23 | 3 |
Products with the 2nd smallest quantity in a location:
Location | Product | Quantity | rank |
---|---|---|---|
Home | Mouse | 8 | 2 |
Office | Monitor | 10 | 2 |
10.2. To calculate set intersection between arrays in two consecutive rows in a window
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
from typing import List
import pandas as pd

spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("group", T.StringType(), True),
        T.StructField("letters", T.ArrayType(T.StringType()), True),
    ]
)
data = [("group_A", ["a", "b", "c"]),
        ("group_A", ["a", "x", "z", "y"]),
        ("group_A", ["a", "x", "z", "w"]),
        ("group_A", ["d", "x", "z"]),
        ("group_B", ["a", "c", "d"]),
        ("group_B", ["a", "c", "e"]),
        ("group_B", ["a", "g", "f", "h"])]
df = spark.createDataFrame(schema=schema, data=data)

@F.pandas_udf(returnType=T.IntegerType())
def number_of_input_rows_in_win(v: pd.Series) -> int:
    """
    Calculate number of rows in a window
    """
    return v.shape[0]

@F.pandas_udf(returnType=T.IntegerType())
def arr_intxn_size(v: pd.Series) -> int:
    """
    Calculate size of set intersection between last and penultimate elements (arrays)
    """
    if v.shape[0] <= 1:
        return 0
    else:
        x = list(set(v.iloc[-1]).intersection(set(v.iloc[-2])))
        return len(x)

@F.pandas_udf(returnType=T.ArrayType(T.StringType()))
def arr_intxn_pudf(v: pd.Series) -> List[str]:
    """
    Calculate set intersection between last and penultimate elements (arrays)
    """
    if v.shape[0] <= 1:
        return []
    else:
        return list(set(v.iloc[-1]).intersection(set(v.iloc[-2])))

def arr_intxn_lag(col_name, win):
    col_lagged = F.lag(col_name, 1).over(win)
    itxn = F.array_intersect(col_name, col_lagged)
    return F.when(itxn.isNull(), F.array([])).otherwise(itxn)

# A window with previous and current row (indices: -1, 0)
win_last_two = Window.partitionBy("group").orderBy("letters").rowsBetween(-1, 0)
df = df.withColumn("nbr_rows_in_input_win", number_of_input_rows_in_win("letters").over(win_last_two))
df = df.withColumn("intxn_w_prv_row_pudf", arr_intxn_pudf("letters").over(win_last_two))

win_unbound = Window.partitionBy("group").orderBy("letters")
df = df.withColumn("intxn_w_prv_row_lag", arr_intxn_lag("letters", win_unbound))
df = df.withColumn("size(intxn_w_prv_row_pudf)", arr_intxn_size("letters").over(win_last_two))
df = df.withColumn("size(intxn_w_prv_row)", F.size("intxn_w_prv_row_lag"))

txt = "Number of rows in input window can be \nequal to 1 (first row) or\nequal to" + \
      " 2 (consecutive two rows specified by indices (-1, 0) in window definition)"
print(txt)
df.show()
Number of rows in input window can be equal to 1 (first row) or equal to 2 (consecutive two rows specified by indices (-1, 0) in window definition)
group | letters | nbr_rows_in_input_win | intxn_w_prv_row_pudf | intxn_w_prv_row_lag | size(intxn_w_prv_row_pudf) | size(intxn_w_prv_row) |
---|---|---|---|---|---|---|
group_A | [a, b, c] | 1 | [] | [] | 0 | 0 |
group_A | [a, x, z, w] | 2 | [a] | [a] | 1 | 1 |
group_A | [a, x, z, y] | 2 | [a, x, z] | [a, x, z] | 3 | 3 |
group_A | [d, x, z] | 2 | [x, z] | [x, z] | 2 | 2 |
group_B | [a, c, d] | 1 | [] | [] | 0 | 0 |
group_B | [a, c, e] | 2 | [c, a] | [a, c] | 2 | 2 |
group_B | [a, g, f, h] | 2 | [a] | [a] | 1 | 1 |
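To cross-check the `intxn_w_prv_row_*` columns without a Spark session, the same logic can be sketched in plain Python. This only illustrates what the window computes, not how Spark evaluates it. The helper name `intersect_with_previous` is made up for this sketch, and the intersections are sorted for readability (the pandas UDF above returns them in arbitrary set order, which is why one row shows `[c, a]`).

```python
from itertools import groupby

rows = [
    ("group_A", ["a", "b", "c"]),
    ("group_A", ["a", "x", "z", "y"]),
    ("group_A", ["a", "x", "z", "w"]),
    ("group_A", ["d", "x", "z"]),
    ("group_B", ["a", "c", "d"]),
    ("group_B", ["a", "c", "e"]),
    ("group_B", ["a", "g", "f", "h"]),
]


def intersect_with_previous(rows):
    """Intersect each row's array with the previous row's array in the same group."""
    result = []
    # Sorting tuples orders by group first, then by letters,
    # mimicking partitionBy("group").orderBy("letters").
    ordered = sorted(rows)
    for group, members in groupby(ordered, key=lambda r: r[0]):
        previous = None  # the first row of a group has no previous row
        for _, letters in members:
            if previous is None:
                common = []
            else:
                common = sorted(set(letters) & set(previous))
            result.append((group, letters, common))
            previous = letters
    return result


for group, letters, common in intersect_with_previous(rows):
    print(group, letters, common, len(common))
```

The printed intersections and sizes should agree with the table above, up to element order inside each intersection.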
10.3. To pivot a dataframe by some column
    import pyspark.sql.functions as F
    import pyspark.sql.types as T
    from pyspark.sql import SparkSession

    # Initialize Spark session
    spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

    # Define schema for "name", "subject", and "grade"
    schema = T.StructType([
        T.StructField("name", T.StringType(), True),
        T.StructField("subject", T.StringType(), True),
        T.StructField("grade", T.IntegerType(), True),
    ])

    # Sample data for students, their subjects, and grades
    data = [
        ("John", "Math", 85),
        ("John", "Science", 90),
        ("John", "History", 78),
        ("Marie", "Math", 88),
        ("Marie", "Science", 92),
        ("Marie", "History", 80),
        ("Adam", "Math", 75),
        ("Adam", "Science", 82),
        ("Adam", "History", 70),
    ]

    # Create DataFrame
    df = spark.createDataFrame(data=data, schema=schema)
    txt = "Original dataframe:"
    print(txt)
    df.show()

    # Pivot the table by "subject" and aggregate by taking the first value of "grade" for each subject
    df_pvt = df.groupBy("name").pivot("subject").agg(F.first("grade"))
    txt = "Original dataframe pivoted by column \"subject\":"
    print(txt)
    df_pvt.show(truncate=False)
Original dataframe:
name | subject | grade |
---|---|---|
John | Math | 85 |
John | Science | 90 |
John | History | 78 |
Marie | Math | 88 |
Marie | Science | 92 |
Marie | History | 80 |
Adam | Math | 75 |
Adam | Science | 82 |
Adam | History | 70 |
Original dataframe pivoted by column "subject":
name | History | Math | Science |
---|---|---|---|
John | 78 | 85 | 90 |
Adam | 70 | 75 | 82 |
Marie | 80 | 88 | 92 |
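As a rough mental model of what `groupBy("name").pivot("subject").agg(F.first("grade"))` produces, the reshaping can be sketched in plain Python. This is an illustration of the semantics only, not of Spark's execution. The function name `pivot_first` is hypothetical, and "first" here simply means the first value encountered in list order.

```python
records = [
    ("John", "Math", 85), ("John", "Science", 90), ("John", "History", 78),
    ("Marie", "Math", 88), ("Marie", "Science", 92), ("Marie", "History", 80),
    ("Adam", "Math", 75), ("Adam", "Science", 82), ("Adam", "History", 70),
]


def pivot_first(records):
    """Turn (name, subject, grade) rows into one row per name, one column per subject."""
    # The distinct pivot values become the new column names (sorted, as in the output above).
    subjects = sorted({subject for _, subject, _ in records})
    seen = {}
    for name, subject, grade in records:
        # setdefault keeps the first grade seen for a (name, subject) pair.
        seen.setdefault(name, {}).setdefault(subject, grade)
    # Fill combinations that never occurred with None (NULL in Spark).
    table = {name: {s: row.get(s) for s in subjects} for name, row in seen.items()}
    return subjects, table


subjects, table = pivot_first(records)
print(["name"] + subjects)
for name, row in table.items():
    print([name] + [row[s] for s in subjects])
```

Each distinct value of the pivot column becomes a column, and the aggregate decides what goes into a cell when several input rows map to it.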
11. Sampling rows
11.1. To sample rows
    import pyspark.sql.functions as F
    import pyspark.sql.types as T
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
    schema = T.StructType(
        [
            T.StructField("index", T.IntegerType(), True),
            T.StructField("value", T.StringType(), True),
        ]
    )
    data = [(1, "Home"),
            (2, "School"),
            (3, "Home"),
            (4, "Home"),
            (5, "Office"),
            (6, "Office"),
            (7, "Office"),
            (8, "Mall"),
            (9, "Mall"),
            (10, "School")]
    # Spread the rows over 3 partitions and record which partition each row landed in
    df = spark.createDataFrame(schema=schema, data=data).repartition(3)
    df = df.withColumn("partition", F.spark_partition_id()).orderBy("index")
    print("Original dataframe:")
    df.show()

    print("Sampled dataframe:")
    # Sample without replacement; the seed makes the result reproducible
    dft = df.sample(fraction=0.5, seed=1).orderBy("index")
    dft.show()
Original dataframe:
index | value | partition |
---|---|---|
1 | Home | 1 |
2 | School | 0 |
3 | Home | 0 |
4 | Home | 2 |
5 | Office | 2 |
6 | Office | 2 |
7 | Office | 1 |
8 | Mall | 0 |
9 | Mall | 1 |
10 | School | 0 |
Sampled dataframe:
index | value | partition |
---|---|---|
3 | Home | 0 |
4 | Home | 2 |
7 | Office | 1 |
8 | Mall | 0 |
9 | Mall | 1 |
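Note that `fraction` is a per-row inclusion probability, not an exact share: `df.sample(fraction=0.5)` is not guaranteed to return exactly half of the rows. The plain-Python sketch below illustrates this kind of Bernoulli sampling. It is an illustration only: `bernoulli_sample` is a made-up helper, and Spark uses its own sampler and random number generator, so the rows picked here will not match the output above.

```python
import random


def bernoulli_sample(rows, fraction, seed):
    """Keep each row independently with probability `fraction`."""
    rng = random.Random(seed)  # fixed seed gives a reproducible sample
    return [row for row in rows if rng.random() < fraction]


rows = list(range(1, 11))

# The sample size varies from seed to seed; it is only 5 on average.
for seed in range(5):
    sample = bernoulli_sample(rows, fraction=0.5, seed=seed)
    print(seed, len(sample), sample)
```

If an exact number of rows is required, one option is to order by a random column and take the first N rows, at the cost of a sort.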
12. UUID generation
12.1. To generate a UUID for every row
    import pyspark.sql.functions as F
    import pyspark.sql.types as T
    from pyspark.sql import SparkSession
    import random
    import uuid

    spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
    schema = T.StructType(
        [
            T.StructField("Name", T.StringType(), True),
        ]
    )
    data = [["Alice"],
            ["Bon"],
            ["John"],
            ["Cecile"]
           ]
    df = spark.createDataFrame(schema=schema, data=data).repartition(2)

    def _generate_uuid(uuid_gen, v=10):
        def _replace_byte(value: int, byte: int):
            # Overwrite the 4-bit version field (bits 76-79 of the 128-bit integer) with `byte`
            byte = byte & 0xF
            bit_shift = 76
            mask = ~(0xF << bit_shift)
            return value & mask | (byte << bit_shift)

        uuid_ = uuid_gen.generate()
        return uuid.UUID(int=(_replace_byte(uuid_.int, v)))

    class RandomDistributedUUIDGenerator:
        def generate(self):
            return uuid.uuid4()

    class SeedBasedUUIDGenerator:
        def __init__(self, seed):
            self.rnd = random.Random(seed)

        def generate(self):
            return uuid.UUID(int=self.rnd.getrandbits(128), version=4)

    gen = RandomDistributedUUIDGenerator()
    udf_generate_uuid = F.udf(lambda: str(_generate_uuid(gen)), T.StringType())
    df = df.withColumn("UUID_random_distributed", udf_generate_uuid())

    seed_for_rng = 1
    gen = SeedBasedUUIDGenerator(seed_for_rng)
    udf_generate_uuid = F.udf(lambda: str(_generate_uuid(gen)), T.StringType())
    df = df.withColumn("UUID_seed_based", udf_generate_uuid())

    print("The dataframe resides in two partitions. Seed-based random UUID generator uses the same seed on both partitions, yielding identical values.")
    df.show(truncate=False)
The dataframe resides in two partitions. Seed-based random UUID generator uses the same seed on both partitions, yielding identical values.
Name | UUID_random_distributed | UUID_seed_based |
---|---|---|
John | 4e9a3bb1-a189-a25e-8389-7f8382635b09 | cd613e30-d8f1-aadf-91b7-584a2265b1f5 |
Bon | 16cd1549-0c74-a483-9bbe-707e59e0796f | 1e2feb89-414c-a43c-9027-c4d1c386bbc4 |
Cecile | b8b05619-6004-aa75-b98b-7e1c83c9f301 | cd613e30-d8f1-aadf-91b7-584a2265b1f5 |
Alice | b1f1a9fb-feb9-a946-9171-3e7cb577fdaa | 1e2feb89-414c-a43c-9027-c4d1c386bbc4 |
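The duplicated values in `UUID_seed_based` can be reproduced without Spark. The sketch below assumes that each partition works on its own copy of the generator with identical initial state, and models that by building the generator twice from the same seed. The helper `uuids_for_partition` and the `"<seed>-<partition index>"` seeding scheme are hypothetical illustrations, not part of the recipe above; in a real job the partition index could, for example, come from `pyspark.TaskContext.get().partitionId()`. The version-field rewrite done by `_generate_uuid` is omitted here for brevity.

```python
import random
import uuid


class SeedBasedUUIDGenerator:
    """Same generator as in the recipe above."""

    def __init__(self, seed):
        self.rnd = random.Random(seed)

    def generate(self):
        return uuid.UUID(int=self.rnd.getrandbits(128), version=4)


def uuids_for_partition(seed, n_rows):
    # Hypothetical stand-in for one Spark task: it starts from a fresh
    # copy of the generator and produces one UUID per row.
    gen = SeedBasedUUIDGenerator(seed)
    return [str(gen.generate()) for _ in range(n_rows)]


# Same seed on both "partitions": the two sequences coincide.
same_0 = uuids_for_partition(1, 2)
same_1 = uuids_for_partition(1, 2)
print(same_0 == same_1)  # True

# Hypothetical workaround: mix the partition index into the seed,
# so every partition draws from a different random stream.
mixed_0 = uuids_for_partition("1-0", 2)
mixed_1 = uuids_for_partition("1-1", 2)
print(mixed_0)
print(mixed_1)
```

With partition-dependent seeds the values stay reproducible from run to run as long as the partitioning itself is stable, which is an additional assumption to verify for a given job.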