Spark - How to create a DataFrame (different ways)
DataFrames are a table-like interface in Spark. A DataFrame is a distributed collection of data, built on top of the Dataset API.
Since a DataFrame has a defined schema and column types, Spark can apply optimizations based on that schema
and execute computations faster.
Below is the initial code to instantiate the SparkContext, SQLContext and SparkSession.
from pyspark.sql import Row
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext()
sqlContext = SQLContext(sc)

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local') \
    .appName('Create DataFrame') \
    .getOrCreate()
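On Spark 2.x and later the SparkSession already wraps a SparkContext, so as a small sketch (reusing the spark session created above), you can get the context from the session instead of constructing it separately:

# The SparkSession created above already carries a SparkContext,
# so it can be reused instead of building one manually
sc = spark.sparkContext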
toDF method
Perhaps the simplest way to create a DataFrame is to use the toDF method. All you need is to supply a list of column names (strings) and a list of tuples as data. If you don't specify the column names, Spark generates them, like _1, _2, etc.

list_of_persons = [('Arike', 28, 78.6), ('Bob', 32, 45.32), ('Corry', 65, 98.47)]
df = sc.parallelize(list_of_persons).toDF(['name', 'age', 'score'])
df.printSchema()
df.show()
The parallelize method returns an RDD, which is then converted to a DataFrame.
Note that the column types are inferred from the data. Also, the columns are created with nullable = true, so nulls (None values) are
permitted even if you would prefer otherwise.
root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- score: double (nullable = true)

+-----+---+-----+
| name|age|score|
+-----+---+-----+
|Arike| 28| 78.6|
|  Bob| 32|45.32|
|Corry| 65|98.47|
+-----+---+-----+
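To see the generated column names mentioned above, here is a minimal sketch calling toDF without arguments on the same data:

# Same data, but letting Spark generate the column names (_1, _2, _3)
df_auto = sc.parallelize(list_of_persons).toDF()
df_auto.printSchema()   # columns _1 (string), _2 (long), _3 (double)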
createDataFrame method
A different approach is to use the createDataFrame API. As before, we use parallelize to create an RDD, but this time we perform a map transformation and specify the column types.

list_of_persons = [('Arike', 28, 78.6), ('Bob', 32, 45.32), ('Corry', 65, 98.47)]
rdd = sc.parallelize(list_of_persons)
person = rdd.map(lambda x: Row(name=x[0], age=int(x[1]), score=float(x[2])))
schemaPeople = sqlContext.createDataFrame(person)
schemaPeople.printSchema()
schemaPeople.show()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
 |-- score: double (nullable = true)

+---+-----+-----+
|age| name|score|
+---+-----+-----+
| 28|Arike| 78.6|
| 32|  Bob|45.32|
| 65|Corry|98.47|
+---+-----+-----+
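If you want full control over column types and nullability (for example to disallow nulls, which toDF does not), createDataFrame also accepts an explicit schema. A minimal sketch, reusing list_of_persons from above:

from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType

# Explicit schema: column types are fixed and nulls are disallowed (nullable=False)
schema = StructType([
    StructField('name', StringType(), False),
    StructField('age', LongType(), False),
    StructField('score', DoubleType(), False)
])

df_typed = sqlContext.createDataFrame(list_of_persons, schema)
df_typed.printSchema()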
read method
Reading a file from disk also returns a DataFrame. Spark supports multiple formats, like CSV, JSON or Parquet, to name a few. For more information on how to read a file please see these posts: How to read a JSON file,
How to read a CSV file
df = spark.read.csv("data.csv")
df.printSchema()
df.show()

df = spark.read.json("data.json")
df.printSchema()
df.show()

df = spark.read.parquet("data_parquet")
df.printSchema()
df.show()
root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)

+-----+---+-----+
|  _c0|_c1|  _c2|
+-----+---+-----+
| Name|Age|Score|
|Arike| 28| 78.6|
|  Bob| 32|45.32|
|Corry| 65|98.47|
+-----+---+-----+

root
 |-- Age: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- Score: double (nullable = true)

+---+-----+-----+
|Age| Name|Score|
+---+-----+-----+
| 28|Arike| 78.6|
| 32|  Bob|45.32|
| 65|Corry|98.47|
+---+-----+-----+

root
 |-- Age: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- Score: double (nullable = true)

+---+-----+-----+
|Age| Name|Score|
+---+-----+-----+
| 28|Arike| 78.6|
| 32|  Bob|45.32|
| 65|Corry|98.47|
+---+-----+-----+
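Notice that the CSV output above keeps the header row as data and reads every column as a string. Assuming data.csv has a header row, a short sketch asking Spark to use it and infer the column types:

# Treat the first line as column names and infer column types from the data
df = spark.read.csv("data.csv", header=True, inferSchema=True)
df.printSchema()
df.show()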
sql method
You can also use the sql method to write multiple SELECT queries and concatenate them with UNION.

df = spark.sql("(select 'Arike' as name, 28 as age, 78.6 as score) union " +
               "(select 'Bob' as name, 32 as age, 45.32 as score) union " +
               "(select 'Corry' as name, 65 as age, 98.47 as score)")
df.printSchema()
df.show()
root
 |-- Age: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- Score: double (nullable = true)

+---+-----+-----+
|Age| Name|Score|
+---+-----+-----+
| 28|Arike| 78.6|
| 32|  Bob|45.32|
| 65|Corry|98.47|
+---+-----+-----+
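Another common pattern with the sql method is to register an existing DataFrame as a temporary view and then query it with SQL. A minimal sketch, assuming the df created above (with name, age and score columns) and a hypothetical view name "persons":

# Register the DataFrame as a temporary view and query it with SQL
df.createOrReplaceTempView("persons")
adults = spark.sql("SELECT name, score FROM persons WHERE age > 30")
adults.show()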
Conclusion
I believe the practical approach is to use the read method to create a DataFrame. The other ways can be useful for small reference tables you might join with.

Additional posts: 5 ways to calculate max of row in Spark or How to read a JSON file