5 ways to calculate max of row in Spark/PySpark

In this post I test the performance of 5 different methods for calculating the maximum value of a row. You may have run into this situation before: you have multiple columns holding some value, integers or floats for example, and you want to compute the per-row maximum across the whole data frame and append it as an additional column. We will test 5 different ways to calculate the maximum value:
  1. udf - user-defined function
  2. 'array' and 'array_max' functions
  3. 'greatest' function
  4. rdd 'map' transformation
  5. data frame groupBy and max aggregation
For each of these methods we will measure the time it takes to read the CSV file, calculate the maximum value, and add it as a 'max' column.

For the data, we are going to generate a CSV with 10 integer columns and 1 million rows using plain Python. The first column holds a running index; the remaining values are random integers in the range of 1 to 1000.

import random

def gen_csv():
    f = open("../data_10x1m.csv", "w")
    f.write("c1,c2,c3,c4,c5,c6,c7,c8,c9,c10\n")
    f.flush()
    i = 1
    while i <= 1000000:
        # report progress and flush every 1,000 rows
        if i % 1000 == 0:
            print(i)
            f.flush()
        # c1 is a running index; c2..c10 are random integers in [1, 1000]
        f.write(
            f"{i},{random.randint(1, 1000)},{random.randint(1, 1000)},{random.randint(1, 1000)},"
            f"{random.randint(1, 1000)},{random.randint(1, 1000)},{random.randint(1, 1000)},{random.randint(1, 1000)},"
            f"{random.randint(1, 1000)},{random.randint(1, 1000)}\n")
        i += 1
    f.flush()
    f.close()
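
For completeness, the generator can be invoked as a plain script; a minimal usage sketch (the __main__ guard is my addition, not part of the original code):

if __name__ == "__main__":
    gen_csv()  # writes ../data_10x1m.csv one directory above the script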

The generated file is a simple CSV weighing 409MB that looks like this:

c1,c2,c3,c4,c5,c6,c7,c8,c9,c10
574,124,734,748,249,461,392,492,19,375
336,5,392,447,208,564,592,823,698,335
839,560,907,920,699,389,982,327,43,530
631,846,597,7,176,482,390,149,489,442
362,375,686,367,852,959,297,433,892,400

When Spark infers the schema, all columns come out as integers:

root
|-- c1: integer (nullable = true)
|-- c2: integer (nullable = true)
|-- c3: integer (nullable = true)
|-- c4: integer (nullable = true)
|-- c5: integer (nullable = true)
|-- c6: integer (nullable = true)
|-- c7: integer (nullable = true)
|-- c8: integer (nullable = true)
|-- c9: integer (nullable = true)
|-- c10: integer (nullable = true)
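
For reference, the schema above can be printed by reading the file with the same options used in every method below; a minimal sketch (the appName is arbitrary):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("row-max").getOrCreate()
# inferSchema samples the file so Spark types every column as integer
df = spark.read.csv("../data_10x1m.csv", header=True, inferSchema=True)
df.printSchema()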

The following imports are used by all the measurement functions below:

import time
import pyspark.sql.functions as f
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, Row, StructType, StructField
from pyspark.sql.functions import col, greatest

1. UDF - User Defined Function

Using a UDF, or user-defined function, is a convenient way to do this kind of thing; however, a UDF is generally not recommended, since it "lowers" Spark to run native Python functions, which are usually not optimal. In the code below we use a UDF together with Python's max function: the UDF is a Python lambda that receives a struct column built from all the column names in the schema and calculates the maximum value with Python's max.

def max_udf(spark) -> float:
    t1 = time.time()
    df = spark.read.csv("../data_10x1m.csv", header=True, inferSchema=True)
    cols = list(col(cname) for cname in df.schema.names)
    # the UDF receives a struct of all columns and applies Python's max to it
    udf_max = f.udf(lambda row: max(row), IntegerType())
    df = df.select("c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10",
                   udf_max(f.struct([x for x in cols])).alias("max"))
    df.write.csv("data_10x1m_processed.csv", header=True)
    df.explain(mode="formatted")
    t2 = time.time()
    return t2 - t1

To get the most accurate result, I ran this code 5 times, resetting to the same starting conditions as much as possible each time; a sketch of what such a timing harness might look like follows.
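
The exact harness is not shown here, but a minimal sketch of the idea might look like the following; the benchmark helper and the use of shutil and statistics are my assumptions. One SparkSession is reused across runs, the previous output directory is removed before each run so every iteration starts from the same state, and the first run, which pays the session start-up cost, is excluded from the average.

import shutil
import statistics

def benchmark(fn, spark, runs=5, out_path="data_10x1m_processed.csv"):
    timings = []
    for i in range(1, runs + 1):
        # remove the previous output directory so each run starts from the same state
        shutil.rmtree(out_path, ignore_errors=True)
        elapsed = fn(spark)
        timings.append(elapsed)
        print(f"{fn.__name__}\t{i}\t{elapsed} sec")
    # the first run is consistently slower (Spark warm-up), so drop it from the average
    print(f"avg (runs 2-{runs}): {statistics.mean(timings[1:])}")
    return timings

# usage: benchmark(max_udf, spark)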

max_udf    1    13.085139751434326 sec <<< initializing SparkSession for the first time!
max_udf    2    9.243947505950928 sec
max_udf    3    9.129037141799927 sec
max_udf    4    9.652921438217163 sec
max_udf    5    8.511781930923462 sec

As you can see, the first iteration takes a few seconds longer than all the rest. This is due to the initialization of the Spark session. For consistency, I will ignore the first iteration. Averaging over the remaining 4 measurements gives 9.13442200422287 seconds, so we can say:
max_udf runtime is 9.134 seconds

2. Array function

In this method the code "packages" the row's values into an array using Spark's array function. The array function takes a list of columns and returns a new column whose values are the input columns combined into an array. On that array I use Spark's array_max function, which returns the maximum value of the array.

def max_array(spark) -> float:
    t1 = time.time()
    df = spark.read.csv("../data_10x1m.csv", header=True, inferSchema=True)
    cols = list(col(cname) for cname in df.schema.names)
    # pack all columns into an array and take its maximum with array_max
    df = df.select("c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10",
                   f.array_max(f.array(*cols)).alias("max"))
    df.write.csv("data_10x1m_processed.csv", header=True)
    t2 = time.time()
    return t2 - t1

Just like before, the first measurement contains the time it took to initialize the Spark session, so I will ignore it.

max_array    1    6.332355260848999 sec <<< initializing SparkSession for the first time!
max_array    2    3.4596095085144043 sec
max_array    3    3.4937448501586914 sec
max_array    4    3.456946611404419 sec
max_array    5    3.478461503982544 sec

As you can see from the raw results, max_array clearly outperforms max_udf. Ignoring the first iteration, measurements 2 to 5 give an average runtime of 3.4721906185150146 seconds.
max_array runtime is 3.472 seconds
This is a really good example of how UDFs are slower than native Spark code: even though it might be simpler to skip the Spark-code headache and go with plain Python, the performance cost is significant.

3. Greatest function

The next method I found in a Stack Overflow answer by zero323. It uses the built-in Spark function greatest, which returns the largest value from a list of columns while skipping null values. While the original answer uses coalesce, which returns the first non-null column, I did not see a justification for using it here. Feel free to use it if your use case is different from what I'm showing (a rough sketch of such a variant appears at the end of this section).

def max_greatest(spark) -> float:
    t1 = time.time()
    df = spark.read.csv("../data_10x1m.csv", header=True, inferSchema=True)
    # greatest compares all the columns at once, skipping nulls
    row_max = greatest(*[col(x) for x in df.schema.names])
    df = df.withColumn("max", row_max)
    df.write.csv("data_10x1m_processed.csv", header=True)
    df.explain(mode="formatted")
    t2 = time.time()
    return t2 - t1

max_greatest	1	6.922237873077393 sec <<< initializing SparkSession for the first time!
max_greatest	2	3.678072214126587 sec
max_greatest	3	3.6312289237976074 sec
max_greatest	4	3.608497381210327 sec
max_greatest	5	3.678683042526245 sec

Similar to the array functions, greatest works very fast. As before, I will ignore the first iteration; measurements 2 to 5 give an average runtime of 3.6491203904151917 seconds.
max_greatest runtime is 3.649 seconds
The greatest function works slightly slower than the array approach.
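
For completeness, here is a rough sketch of what a coalesce-based variant, in the spirit of the original answer, might look like; this is an assumption rather than a copy of that answer. coalesce substitutes a sentinel for nulls before greatest compares the columns, and since the generated values are all at least 1, a sentinel of 0 is safe here.

from pyspark.sql.functions import coalesce, col, greatest, lit

def max_greatest_coalesce(df):
    # replace nulls with 0 so every column contributes a value to greatest;
    # 0 is a safe floor because the generated data is always >= 1
    row_max = greatest(*[coalesce(col(c), lit(0)) for c in df.schema.names])
    return df.withColumn("max", row_max)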

4. Map transformation and Python's max function

Let's use a map transformation on the underlying RDD. The code below maps each row to its original values (the entire row) plus an additional value holding the maximum of the row. To find that maximum I use Python's max function. To match the other methods, I then create a data frame from the RDD using createDataFrame, supplying its schema.

def max_map(spark) -> float:
    t1 = time.time()
    df = spark.read.csv("../data_10x1m.csv", header=True, inferSchema=True)
    # append the row maximum, computed with Python's max, to every row of the RDD
    rdd = df.rdd.map(lambda row: Row(*row, max(*row)))
    # an explicit schema is needed to turn the RDD back into a data frame
    schema = StructType([
        StructField("c1", IntegerType(), False),
        StructField("c2", IntegerType(), False),
        StructField("c3", IntegerType(), False),
        StructField("c4", IntegerType(), False),
        StructField("c5", IntegerType(), False),
        StructField("c6", IntegerType(), False),
        StructField("c7", IntegerType(), False),
        StructField("c8", IntegerType(), False),
        StructField("c9", IntegerType(), False),
        StructField("c10", IntegerType(), False),
        StructField("max", IntegerType(), False)
    ])
    df = spark.createDataFrame(rdd, schema=schema)
    df.write.csv("data_10x1m_processed.csv", header=True)
    df.explain(mode="formatted")
    t2 = time.time()
    return t2 - t1

max_map	1	18.7514066696167 sec <<< initializing SparkSession for the first time!
max_map	2	16.380369186401367 sec
max_map	3	15.212764739990234 sec
max_map	4	15.614974737167358 sec
max_map	5	14.794058799743652 sec

Skipping the first iteration, which holds the Spark session initialization time, the average runtime for iterations 2 to 5 is 15.500541865825653 seconds.
max_map runtime is 15.5 seconds
Clearly, these results are worse than all 3 methods above. The physical plan, shown below, tells us that Spark has only one stage for this code, a scan over an existing RDD, which is relatively slow compared to the equivalent operation on a DataFrame.

== Physical Plan ==
* Scan ExistingRDD (1)
(1) Scan ExistingRDD [codegen id : 1]
Output [11]: [c1#256, c2#257, c3#258, c4#259, c5#260, c6#261, c7#262, c8#263, c9#264, c10#265, max#266]
Arguments: [c1#256, c2#257, c3#258, c4#259, c5#260, c6#261, c7#262, c8#263, c9#264, c10#265, max#266], MapPartitionsRDD[65] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)

5. GroupBy and Max aggregation

This is the most complex way to do this calculation. In the code below I add a row id using monotonically_increasing_id and an array of all the original values. Next, I create a new row for each element of the array using explode. As a result, each line in the original data frame becomes multiple lines, one per array value, with the rest of the row's values repeated, so all the exploded values that originated from the same line share a single row_id.

+---+---+---+---+---+---+---+---+---+---+------+--------------------+--------------+
| c1| c2| c3| c4| c5| c6| c7| c8| c9|c10|row_id|                 arr|exploded_value|
+---+---+---+---+---+---+---+---+---+---+------+--------------------+--------------+
|  1|187| 55| 64|354|754|909|966|882|976|     0|[1, 187, 55, 64, ...|             1| -+
|  1|187| 55| 64|354|754|909|966|882|976|     0|[1, 187, 55, 64, ...|           187|  |
|  1|187| 55| 64|354|754|909|966|882|976|     0|[1, 187, 55, 64, ...|            55|  |
|  1|187| 55| 64|354|754|909|966|882|976|     0|[1, 187, 55, 64, ...|            64|  |
|  1|187| 55| 64|354|754|909|966|882|976|     0|[1, 187, 55, 64, ...|           354|  |
|  1|187| 55| 64|354|754|909|966|882|976|     0|[1, 187, 55, 64, ...|           754|  +-> single row in the
|  1|187| 55| 64|354|754|909|966|882|976|     0|[1, 187, 55, 64, ...|           909|  |   original data,
|  1|187| 55| 64|354|754|909|966|882|976|     0|[1, 187, 55, 64, ...|           966|  |   marked as row_id = 1
|  1|187| 55| 64|354|754|909|966|882|976|     0|[1, 187, 55, 64, ...|           882|  |
|  1|187| 55| 64|354|754|909|966|882|976|     0|[1, 187, 55, 64, ...|           976| -+
|  2|576|781|708|788|965|689|239| 50|394|     1|[2, 576, 781, 708...|             2|
|  2|576|781|708|788|965|689|239| 50|394|     1|[2, 576, 781, 708...|           576|
...

Now I can group by row_id and aggregate using the max function. To keep the rest of the values, just like in the previous cases, I use the first function.

def max_groupby(spark) -> float:
    t1 = time.time()
    df = spark.read.csv("../data_10x1m.csv", header=True, inferSchema=True)
    cols = list(f.col(cname) for cname in df.schema.names)
    name_col_list = list((cname, f.col(cname)) for cname in df.schema.names)
    # tag every row with an id and pack its values into an array
    df = df.withColumn("row_id", f.monotonically_increasing_id()) \
        .withColumn("arr", f.array(*cols))
    # explode the array: one output row per value, all sharing the same row_id
    df = df.withColumn("exploded_value", f.explode(df.arr))
    # first() recovers the original column values after the group by
    firsts = list(f.first(name_col[1]).alias(name_col[0]) for name_col in name_col_list)
    df = df.groupBy("row_id") \
        .agg(f.max("exploded_value").alias("max"), *firsts) \
        .drop("row_id") \
        .coalesce(1)
    df.write.csv("data_10x1m_processed.csv", header=True)
    t2 = time.time()
    return t2 - t1

max_groupby	1	13.38654112815857 sec <<< initializing SparkSession for the first time!
max_groupby	2	9.496947050094604 sec
max_groupby	3	8.984785318374634 sec
max_groupby	4	8.526089429855347 sec
max_groupby	5	8.554388284683228 sec
Avg runtime (iterations 2 to 5): 8.890552520751953 sec

While this method is not the best, it is surprisingly not the worst. Despite its complexity, it outperforms max_map by almost a factor of two. The average runtime, again ignoring the first iteration, is 8.890552520751953 seconds.
max_groupby runtime is 8.89 seconds

Summary And Conclusions

To sum up, the winner was max_array (3.472 seconds), which uses the Spark built-in functions 'array' to create the array and 'array_max' to find the maximal value in that array. Right next to it is max_greatest (3.649 seconds), which also uses the native Spark greatest function. We can safely assume that these built-in functions, together with the Spark optimizer, will give great performance.


As for our trailing trio, max_udf (9.134 seconds), max_groupby (8.89 seconds) and max_map (15.5 seconds): it was no surprise that the UDF works significantly slower, but I was surprised by how badly max_map performed. max_map uses the RDD API to map each row to itself plus its maximal value.

That's it; hopefully this post will be useful. If you have comments or questions, feel free to write something below.
