5 ways to calculate max of row in Spark/PySpark
In this post I tested the performance of 5 different methods for calculating the maximum value of a row.
You might have encountered this situation before: you have multiple columns holding some value, integers or floats for example,
and you want to calculate the maximum value per row for the whole data frame and append it as an additional column.
We will test 5 different ways to calculate the maximum value:
- udf - user-defined function
- 'array' and 'array_max' functions
- 'greatest' function
- rdd 'map' transformation
- data frame groupBy and max aggregation
For the data, we are going to generate 10 columns of integer values over 1 million rows via Python code.
The values will be in the range of 1 to 1000.
```python
import random


def gen_csv():
    f = open("../data_10x1m.csv", "w")
    f.write("c1,c2,c3,c4,c5,c6,c7,c8,c9,c10\n")
    f.flush()
    i = 1
    while i <= 1000000:
        if i % 1000 == 0:
            print(i)
            f.flush()
        f.write(
            f"{i},{random.randint(1, 1000)},{random.randint(1, 1000)},{random.randint(1, 1000)},"
            f"{random.randint(1, 1000)},{random.randint(1, 1000)},{random.randint(1, 1000)},{random.randint(1, 1000)},"
            f"{random.randint(1, 1000)},{random.randint(1, 1000)}\n")
        i += 1
    f.flush()
    f.close()
```
The generated file is a simple CSV that weighs 409MB and looks like this:
```
c1,c2,c3,c4,c5,c6,c7,c8,c9,c10
574,124,734,748,249,461,392,492,19,375
336,5,392,447,208,564,592,823,698,335
839,560,907,920,699,389,982,327,43,530
631,846,597,7,176,482,390,149,489,442
362,375,686,367,852,959,297,433,892,400
```
When Spark infers the schema, all columns are read as integers:
```
root
 |-- c1: integer (nullable = true)
 |-- c2: integer (nullable = true)
 |-- c3: integer (nullable = true)
 |-- c4: integer (nullable = true)
 |-- c5: integer (nullable = true)
 |-- c6: integer (nullable = true)
 |-- c7: integer (nullable = true)
 |-- c8: integer (nullable = true)
 |-- c9: integer (nullable = true)
 |-- c10: integer (nullable = true)
```
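The schema above can be printed by reading the file with inferSchema=True and calling printSchema(); a minimal sketch (the exact snippet is not shown here):

```python
from pyspark.sql import SparkSession

# Minimal sketch: read the CSV with schema inference and print the inferred schema.
spark = SparkSession.builder.appName("row-max").getOrCreate()
df = spark.read.csv("../data_10x1m.csv", header=True, inferSchema=True)
df.printSchema()  # prints the "root |-- c1: integer ..." tree shown above
```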
```python
import time

import pyspark.sql.functions as f
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, Row, StructType, StructField
from pyspark.sql.functions import col, greatest
```
1. UDF - User-Defined Function
Using a UDF, or user-defined function, is a convenient way to do this kind of thing. However, a UDF is generally not recommended, since it "lowers" Spark into running native Python functions, which may not be optimal. In the code below, the UDF is a Python lambda that receives a struct column built from all the columns in the schema and calculates the maximum value using Python's max function.

```python
def max_udf(spark) -> float:
    t1 = time.time()
    df = spark.read.csv("../data_10x1m.csv", header=True, inferSchema=True)
    cols = list(col(cname) for cname in df.schema.names)
    udf_max = f.udf(lambda row: max(row), IntegerType())
    df = df.select("c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10",
                   udf_max(f.struct([x for x in cols])).alias("max"))
    df.write.csv("data_10x1m_processed.csv", header=True)
    df.explain(mode="formatted")
    t2 = time.time()
    return t2 - t1
```
To get the most accurate result I ran this code 5 times, resetting to the same conditions as much as possible before each run.
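The exact measurement harness is not important here, but a minimal driver could look something like this (the benchmark helper and the cleanup between runs are illustrative assumptions, not the original code):

```python
import shutil

def benchmark(fn, runs=5):
    # Hypothetical driver: run the measured function several times and print
    # the elapsed time of each run.
    spark = SparkSession.builder.appName("row-max-benchmark").getOrCreate()
    for i in range(1, runs + 1):
        # Assumption: the output directory is removed between runs so the
        # CSV write does not fail on the second iteration.
        shutil.rmtree("data_10x1m_processed.csv", ignore_errors=True)
        print(f"{fn.__name__} {i} {fn(spark)} sec")

benchmark(max_udf)
```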
```
max_udf 1 13.085139751434326 sec <<< initializing SparkSession for the first time!
max_udf 2 9.243947505950928 sec
max_udf 3 9.129037141799927 sec
max_udf 4 9.652921438217163 sec
max_udf 5 8.511781930923462 sec
```
As you can see, the first iteration takes a few seconds longer than all the rest. This is due to the initialization of
the Spark session. In order to keep the comparison consistent I will ignore the first iteration. Averaging the remaining 4 measurements
gives 9.13442200422287 seconds, so we can say:
max_udf runtime is 9.134 seconds
2. Array function
In this function the code "packages" the values of the row into an array using Spark's array function. The array function takes a list of columns and returns a new column whose values are the input columns collected into an array. On that array I use Spark's array_max function, which returns the maximum value of the array.

```python
def max_array(spark) -> float:
    t1 = time.time()
    df = spark.read.csv("../data_10x1m.csv", header=True, inferSchema=True)
    cols = list(col(cname) for cname in df.schema.names)
    df = df.select("c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10",
                   f.array_max(f.array(*cols)).alias("max"))
    df.write.csv("data_10x1m_processed.csv", header=True)
    t2 = time.time()
    return t2 - t1
```
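As a side note, the column names do not have to be hard-coded in the select; a small variant using df.columns (shown only as a sketch, not part of the measured runs):

```python
# Sketch only: same computation as max_array, but the original columns are
# taken from df.columns instead of being listed by hand.
df = df.select(*df.columns,
               f.array_max(f.array(*[col(c) for c in df.columns])).alias("max"))
```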
Just like before, the first measurement contains the time it took to initialize the Spark session, so I will ignore it.
```
max_array 1 6.332355260848999 sec <<< initializing SparkSession for the first time!
max_array 2 3.4596095085144043 sec
max_array 3 3.4937448501586914 sec
max_array 4 3.456946611404419 sec
max_array 5 3.478461503982544 sec
```
As you can see from the raw results, max_array outperforms max_udf. Ignoring the first iteration, measurements 2 to 5 give an average runtime of 3.4721906185150146 seconds.
This is a really good example of how UDFs are slower than native Spark code: even though it might be simpler to
skip the Spark-code headache and go with plain Python, the performance cost is significant.
max_array runtime is 3.472 seconds
3. Greatest function
The next method comes from a Stack Overflow answer by zero323. It uses the built-in Spark function greatest, which returns the largest value from a list of columns while skipping null values. The original answer also uses coalesce, which returns the first column that is not null, but I did not see a justification to use it here. Feel free to add it if your use case differs from what I'm showing.

```python
def max_greatest(spark) -> float:
    t1 = time.time()
    df = spark.read.csv("../data_10x1m.csv", header=True, inferSchema=True)
    row_max = greatest(*[col(x) for x in df.schema.names])
    df = df.withColumn("max", row_max)
    df.write.csv("data_10x1m_processed.csv", header=True)
    df.explain(mode="formatted")
    t2 = time.time()
    return t2 - t1
```
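If nulls did need special handling, a coalesce-per-column variant could look roughly like this (the sentinel value -1 is an illustrative assumption, not taken from the original answer):

```python
from pyspark.sql.functions import coalesce, lit

# Sketch only: replace a null in any column with a sentinel before taking the
# row maximum. The sentinel value (-1) is an assumption for illustration.
row_max_with_default = greatest(*[coalesce(col(x), lit(-1)) for x in df.schema.names])
df = df.withColumn("max", row_max_with_default)
```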
```
max_greatest 1 6.922237873077393 sec <<< initializing SparkSession for the first time!
max_greatest 2 3.678072214126587 sec
max_greatest 3 3.6312289237976074 sec
max_greatest 4 3.608497381210327 sec
max_greatest 5 3.678683042526245 sec
```
Similar to the array functions, greatest works very fast.
As before, I will ignore the first iteration; measurements 2 to 5 give an average runtime of 3.6491203904151917 seconds.
The greatest function works slightly slower than the array approach.
max_greatest runtime is 3.649 seconds
4. Map transformation and Python's max function
Let's use the map transformation on the underlying RDD. The code below maps each row to its original values (the entire row) plus one additional value: the maximum of the row's values, found with Python's max function. To match the output of the other functions, I created a data frame from the RDD using createDataFrame and supplied its schema explicitly.

```python
def max_map(spark) -> float:
    t1 = time.time()
    df = spark.read.csv("../data_10x1m.csv", header=True, inferSchema=True)
    rdd = df.rdd.map(lambda row: Row(*row, max(*row)))
    schema = StructType([
        StructField("c1", IntegerType(), False),
        StructField("c2", IntegerType(), False),
        StructField("c3", IntegerType(), False),
        StructField("c4", IntegerType(), False),
        StructField("c5", IntegerType(), False),
        StructField("c6", IntegerType(), False),
        StructField("c7", IntegerType(), False),
        StructField("c8", IntegerType(), False),
        StructField("c9", IntegerType(), False),
        StructField("c10", IntegerType(), False),
        StructField("max", IntegerType(), False)
    ])
    df = spark.createDataFrame(rdd, schema=schema)
    df.write.csv("data_10x1m_processed.csv", header=True)
    t2 = time.time()
    return t2 - t1
```
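A shorter variant could skip the explicit schema and let Spark re-infer the types with toDF, though I have not measured it; a sketch:

```python
# Sketch only (not measured): append the row maximum and let toDF name the
# columns. Type inference on the RDD adds its own cost.
rdd = df.rdd.map(lambda row: tuple(row) + (max(*row),))
df_with_max = rdd.toDF(df.columns + ["max"])
```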
```
max_map 1 18.7514066696167 sec <<< initializing SparkSession for the first time!
max_map 2 16.380369186401367 sec
max_map 3 15.212764739990234 sec
max_map 4 15.614974737167358 sec
max_map 5 14.794058799743652 sec
```
Skipping the first iteration, which holds the Spark session initialization time, the average runtime for iterations 2 to 5 is 15.500541865825653 seconds.
Clearly, these results are worse than those of all 3 methods above.
The physical plan, shown below (it can be printed with df.explain(mode="formatted"), as in the earlier functions), tells us that Spark has only one stage for this code: an RDD scan, which is relatively
slow compared to the equivalent operation on a DataFrame.
max_map runtime is 15.5 seconds
```
== Physical Plan ==
* Scan ExistingRDD (1)

(1) Scan ExistingRDD [codegen id : 1]
Output [11]: [c1#256, c2#257, c3#258, c4#259, c5#260, c6#261, c7#262, c8#263, c9#264, c10#265, max#266]
Arguments: [c1#256, c2#257, c3#258, c4#259, c5#260, c6#261, c7#262, c8#263, c9#264, c10#265, max#266], MapPartitionsRDD[65] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)
```
5. GroupBy and Max aggregation
This is the most complex way to do this calculation. In the code below I add a row id using monotonically_increasing_id and an array of all the original values. Next, I create a new row for each element of the array using explode. The result is that each line in the original data frame becomes multiple lines, one per value of the array, each repeating the rest of the row's values; in other words, all the values that originated from the same line share a single row_id.

```
+---+---+---+---+---+---+---+---+---+---+------+--------------------+--------------+
| c1| c2| c3| c4| c5| c6| c7| c8| c9|c10|row_id|                 arr|exploded_value|
+---+---+---+---+---+---+---+---+---+---+------+--------------------+--------------+
|  1|187| 55| 64|354|754|909|966|882|976|     0|[1, 187, 55, 64, ...|             1|  -+
|  1|187| 55| 64|354|754|909|966|882|976|     0|[1, 187, 55, 64, ...|           187|   |
|  1|187| 55| 64|354|754|909|966|882|976|     0|[1, 187, 55, 64, ...|            55|   |
|  1|187| 55| 64|354|754|909|966|882|976|     0|[1, 187, 55, 64, ...|            64|   |
|  1|187| 55| 64|354|754|909|966|882|976|     0|[1, 187, 55, 64, ...|           354|   |
|  1|187| 55| 64|354|754|909|966|882|976|     0|[1, 187, 55, 64, ...|           754|   +-> single row in the
|  1|187| 55| 64|354|754|909|966|882|976|     0|[1, 187, 55, 64, ...|           909|   |   original data,
|  1|187| 55| 64|354|754|909|966|882|976|     0|[1, 187, 55, 64, ...|           966|   |   marked as row_id = 0
|  1|187| 55| 64|354|754|909|966|882|976|     0|[1, 187, 55, 64, ...|           882|   |
|  1|187| 55| 64|354|754|909|966|882|976|     0|[1, 187, 55, 64, ...|           976|  -+
|  2|576|781|708|788|965|689|239| 50|394|     1|[2, 576, 781, 708...|             2|
|  2|576|781|708|788|965|689|239| 50|394|     1|[2, 576, 781, 708...|           576|
...
```
Now I can group by row_id and aggregate using the max function.
To get the rest of the values, just like in the previous cases, I used the first function.
```python
def max_groupby(spark) -> float:
    t1 = time.time()
    df = spark.read.csv("../data_10x1m.csv", header=True, inferSchema=True)
    cols = list(f.col(cname) for cname in df.schema.names)
    name_col_list = list((cname, f.col(cname)) for cname in df.schema.names)
    df = df.withColumn("row_id", f.monotonically_increasing_id()) \
        .withColumn("arr", f.array(*cols))
    df = df.withColumn("exploded_value", f.explode(df.arr))
    firsts = list(f.first(name_col[1]).alias(name_col[0]) for name_col in name_col_list)
    df = df.groupBy("row_id") \
        .agg(f.max("exploded_value").alias("max"), *firsts) \
        .drop("row_id") \
        .coalesce(1)
    df.write.csv("data_10x1m_processed.csv", header=True)
    t2 = time.time()
    return t2 - t1
```
```
max_groupby 1 13.38654112815857 sec <<< initializing SparkSession for the first time!
max_groupby 2 9.496947050094604 sec
max_groupby 3 8.984785318374634 sec
max_groupby 4 8.526089429855347 sec
max_groupby 5 8.554388284683228 sec
Avg runtime: 8.890552520751953
```
While this method is not the best, it is surprisingly not the worst.
Despite its complexity, this method outperforms max_map by almost a factor of two. The average runtime (iterations 2 to 5) is 8.890552520751953 seconds.
max_groupby runtime is 8.89 seconds
Summary And Conclusions
To sum up, the winner was max_array, which uses the Spark built-in functions 'array' to create the array and 'array_max' to find the maximum value in that array. Right next to it is max_greatest, which also uses the native Spark greatest function. We can safely assume that these built-in functions, together with the Spark optimizer, deliver great performance.

As for our tail trio: max_udf, max_groupby and max_map. It was no surprise that the UDF worked significantly slower. However, I was surprised by how badly max_map performed. max_map uses the RDD to map each row to itself plus its maximum value.
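As a closing note, if you want to double-check that the two fastest methods agree, a quick sanity check could look like this (a sketch, not part of the benchmark):

```python
# Sketch only: confirm that the array_max approach and the greatest approach
# produce the same multiset of per-row maximum values for this dataset.
df = spark.read.csv("../data_10x1m.csv", header=True, inferSchema=True)
via_array = df.select(f.array_max(f.array(*[col(c) for c in df.columns])).alias("max"))
via_greatest = df.select(greatest(*[col(c) for c in df.columns]).alias("max"))
assert via_array.exceptAll(via_greatest).count() == 0
assert via_greatest.exceptAll(via_array).count() == 0
```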
That's it; hopefully this post will be useful. If you have comments or questions, feel free to write something below.