Spark - How to create a DataFrame (different ways)

DataFrames are a table-like interface in Spark. A DataFrame is a distributed collection of data organized into named columns, built on top of the Dataset API. Because a DataFrame has a defined schema and column types, Spark can apply various optimizations on top of that schema and execute calculations faster. Below is the initial code to instantiate the SparkContext, SQLContext and SparkSession.

from pyspark import SparkContext
from pyspark.sql import Row, SQLContext, SparkSession

# Low-level entry point and the (legacy) SQL entry point
sc = SparkContext()
sqlContext = SQLContext(sc)

# SparkSession is the unified entry point since Spark 2.0
spark = SparkSession.builder \
    .master('local') \
    .appName('Create DataFrame') \
    .getOrCreate()

toDF method

Perhaps the simplest way to create a DataFrame is the toDF method. All you need is a list of column names (strings) and a list of tuples as data. If you don't specify the column names, Spark generates them as _1, _2, and so on.

list_of_persons = [('Arike', 28, 78.6), ('Bob', 32, 45.32), ('Corry', 65, 98.47)]
df = sc.parallelize(list_of_persons).toDF(['name', 'age', 'score'])
df.printSchema()
df.show()

The parallelize method returns an RDD, which is then converted to a DataFrame. Note that the column types are inferred from the data. Also, the columns are created with nullable = true, so nulls (None in Python) are permitted even if you would prefer otherwise.

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- score: double (nullable = true)
+-----+---+-----+
| name|age|score|
+-----+---+-----+
|Arike| 28| 78.6|
|  Bob| 32|45.32|
|Corry| 65|98.47|
+-----+---+-----+
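
If you call toDF without arguments, Spark falls back to the generated column names mentioned above. A minimal sketch, reusing the same list_of_persons:

# No column names supplied, so Spark names the columns _1, _2 and _3
df_default = sc.parallelize(list_of_persons).toDF()
df_default.printSchema()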

createDataFrame method

A different approach to creating a DataFrame is the createDataFrame API. As before, we use parallelize to create an RDD, but this time we apply a map transformation into Row objects and specify the column types explicitly.

list_of_persons = [('Arike', 28, 78.6), ('Bob', 32, 45.32), ('Corry', 65, 98.47)]
rdd = sc.parallelize(list_of_persons)
person = rdd.map(lambda x: Row(name=x[0], age=int(x[1]), score=float(x[2])))
schemaPeople = sqlContext.createDataFrame(person)
schemaPeople.printSchema()
schemaPeople.show()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
 |-- score: double (nullable = true)
+---+-----+-----+
|age| name|score|
+---+-----+-----+
| 28|Arike| 78.6|
| 32|  Bob|45.32|
| 65|Corry|98.47|
+---+-----+-----+
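
Notice that the columns come out in alphabetical order here, because Row sorts keyword arguments by name (behaviour of older Spark/Python versions). If you want full control over column order, names, types and nullability, you can also pass an explicit schema to createDataFrame. A minimal sketch, reusing the same list_of_persons tuples:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Explicit schema: fixed column order, explicit types, no nulls allowed
schema = StructType([
    StructField('name', StringType(), False),
    StructField('age', IntegerType(), False),
    StructField('score', DoubleType(), False),
])
df_explicit = spark.createDataFrame(list_of_persons, schema)
df_explicit.printSchema()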

read method

Reading a file from disk returns a DataFrame. Spark supports multiple formats, such as CSV, JSON or Parquet, to name a few. For more information on how to read a file, please see these posts:
How to read a JSON file,
How to read a CSV file

# CSV (no options, so the header row is read as a regular record)
df = spark.read.csv("data.csv")
df.printSchema()
df.show()

# JSON
df = spark.read.json("data.json")
df.printSchema()
df.show()

# Parquet
df = spark.read.parquet("data_parquet")
df.printSchema()
df.show()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
+-----+---+-----+
|  _c0|_c1|  _c2|
+-----+---+-----+
| Name|Age|Score|
|Arike| 28| 78.6|
|  Bob| 32|45.32|
|Corry| 65|98.47|
+-----+---+-----+
root
 |-- Age: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- Score: double (nullable = true)
+---+-----+-----+
|Age| Name|Score|
+---+-----+-----+
| 28|Arike| 78.6|
| 32|  Bob|45.32|
| 65|Corry|98.47|
+---+-----+-----+
root
 |-- Age: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- Score: double (nullable = true)
+---+-----+-----+
|Age| Name|Score|
+---+-----+-----+
| 28|Arike| 78.6|
| 32|  Bob|45.32|
| 65|Corry|98.47|
+---+-----+-----+
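
As the first output shows, without options the CSV header row is read as a regular record and the columns are named _c0, _c1, _c2. If your CSV file has a header, you can tell Spark to use it and to infer the column types. A short sketch, assuming the same data.csv file:

# Use the first row as column names and infer the column types
df = spark.read.csv("data.csv", header=True, inferSchema=True)
df.printSchema()
df.show()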

sql method

You can also use the sql method to write multiple SELECT queries and combine them with union.

spark.sql("(select 'Arike' as name, 28 as age, 78.6 as score) union " +
          "(select 'Bob' as name, 32 as age, 45.32 as score) union " +
          "(select 'Corry' as name, 65 as age, 98.47 as score)")
df.printSchema()
df.show()

root
 |-- Age: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- Score: double (nullable = true)
+---+-----+-----+
|Age| Name|Score|
+---+-----+-----+
| 28|Arike| 78.6|
| 32|  Bob|45.32|
| 65|Corry|98.47|
+---+-----+-----+
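
A more compact way to express the same idea, assuming a Spark version that supports inline tables (Spark 2.0+), is the VALUES syntax:

df = spark.sql("select * from values "
               "('Arike', 28, 78.6), ('Bob', 32, 45.32), ('Corry', 65, 98.47) "
               "as persons(name, age, score)")
df.show()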

Conclusion

I believe the most practical approach is to use the read method to create a DataFrame. The other ways can be useful for small reference tables you might want to join with.
