Posts

Create Custom Datasource With Spark 3 - Part 1

This article is the first in a series on how to create a custom data source that plugs into Spark's data source mechanism. The idea is to write something like:

    Dataset<Row> df = sparkSession.read()
        .option("filename", "path-to-data-file")
        .option("other-options", "some value")
        .format("com.roizaig.mydatasource")
        .load();

to read a file, parse it according to some custom logic, and return a dataset that represents it.

Why Create Custom Data Source?

Spark supports Parquet, CSV, JSON, ORC and more out of the box. In most cases these formats are more than enough, and if not, you can always read the data as a text file and do some string manipulation in Spark. However, there are some cases where we would like to "teach" Spark to read our own custom format. Flexibility is a key point: with a custom datasource you can read every format y...

5 ways to calculate max of row in Spark

In this post I test the performance of 5 different methods of calculating the maximal value of a single row in Spark/PySpark. You might have encountered this situation before: you have multiple columns holding some value, integer or float for example, and you want to calculate the maximum value per row across the whole data frame and append it as an additional column.

We will test 5 different ways to calculate the maximum value:

- udf (user defined function)
- 'array' and 'array_max' functions
- 'greatest' function
- rdd 'map' transformation
- data frame groupBy and max-aggregation

For each of these methods we will measure the time it takes to read a CSV file, calculate the maximal value and add it as a 'max' column. For the data, we are going to generate 10 columns of integer values over 1 million rows via Python code. The values will be in the range of 1 to 1000.

    import random

    def gen_csv():
        f = open("../data_10x1m.csv...
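To make the target result concrete, here is a small plain-Python sketch (standard library only, no Spark) that builds a few rows of random integers in the 1 to 1000 range and appends each row's maximum as a trailing 'max' value. The helper names `generate_rows` and `add_max_column`, the seed, and the row count are assumptions for illustration; this is not one of the five Spark methods timed in the post.

```python
import random

def generate_rows(n_rows, n_cols=10, low=1, high=1000, seed=0):
    """Return n_rows lists of n_cols random integers in [low, high]."""
    rng = random.Random(seed)
    return [[rng.randint(low, high) for _ in range(n_cols)] for _ in range(n_rows)]

def add_max_column(rows):
    """Return new rows, each with its own maximum appended as the last value."""
    return [row + [max(row)] for row in rows]

# Hypothetical tiny data set: 5 rows instead of the post's 1 million.
rows = generate_rows(n_rows=5)
with_max = add_max_column(rows)

for row in with_max:
    print(row)
```

Each printed row has 11 values: the 10 generated integers followed by their maximum, which is the shape the Spark methods are expected to produce in the 'max' column.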

Apache Zeppelin

In this post I would like to show you a tool called Apache Zeppelin. Zeppelin is an Apache project meant to be a live notebook environment that can run multiple languages like Python and Scala, or tools like Apache Spark, Ignite, etc. Each of these capabilities is called an Interpreter. In their own words: "Web-based notebook that enables data-driven, interactive data analytics and collaborative documents with SQL, Scala and more".

Running Apache Zeppelin is very simple: just download the zipped binaries from the download section of the Zeppelin project page. On Linux, in a terminal, run the following from the Zeppelin home folder:

    zeppelin-0.8.2-bin-all$ ./bin/zeppelin.sh

This will start the web server that serves the Zeppelin GUI. The main screen shows you the available notebooks. Let's create a new notebook by clicking Create new note, entering the notebook name and pressing Create. Alternatively, you can click on one of the existing notebooks to open a...

Spark - How to create a DataFrame (different ways)

DataFrames are a table-like interface in Spark. The DataFrame structure is a distributed collection of data, built on Dataset. Since a DataFrame has a defined schema and types, Spark can apply different optimizations on top of the schema and execute calculations faster. Below is the initial code to instantiate SparkContext, SQLContext and SparkSession.

    from pyspark.sql import Row
    from pyspark import SparkContext
    from pyspark import SQLContext

    sc = SparkContext()
    sqlContext = SQLContext(sc)

    from pyspark.sql import SparkSession

    spark = SparkSession.builder \
        .master('local') \
        .appName('Create DataFrame') \
        .getOrCreate()

toDF method

Perhaps the simplest way to create a DataFrame is to use the toDF method. All you need is to supply a list of column names (strings) and a list of tuples as data. If you don't specify the column names Spark generates the...

Recommended order of reading

1. Spark overview
2. Get Spark Session
3. Read CSV File
4. How to read JSON file
5. How to create a DataFrame (different ways)

Other posts:

- Calculate Standard deviation incrementally one batch at a time

Spark - How to read a JSON file

The first thing to notice is the file structure. It's actually a collection of JSON objects, *each one on a single line*.

    { "field_name": "value", "field_name": "value", ... }
    { "field_name": "value", "field_name": "value", ... }
    { "field_name": "value", "field_name": "value", ... }

Unlike the csv() method, the json() method does not have an inferSchema parameter. The schema is inferred from the JSON types.

    from pyspark.sql import SparkSession
    import pyspark.sql.types as t
    import pyspark.sql.functions as f

    spark = SparkSession.builder \
        .master('local') \
        .appName('Read JSON file') \
        .getOrCreate()

    df = spark.read \
        .json('data.json', multiLine=True)

    df.printSchema()
    df.show(truncate=False)

    root
     |-- Address: string (nullable = true)
     |-- Age: long (nullable = true)
     |-- Birth Date: string (nu...
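To make the one-object-per-line layout concrete, here is a minimal sketch in plain Python (standard library only, no Spark) that writes a tiny file in that layout and parses it line by line. The file name `people.jsonl`, the helper `read_json_lines` and the sample values are assumptions for illustration; only the field names Address and Age are taken from the schema output in the post.

```python
import json
import os
import tempfile

# Hypothetical sample records; only the field names come from the post.
records = [
    {"Address": "1 Main St", "Age": 30},
    {"Address": "2 Oak Ave", "Age": 41},
]

# Write to a temporary directory so the sketch leaves no files behind in the project.
path = os.path.join(tempfile.mkdtemp(), "people.jsonl")

# Write one complete JSON object per line.
with open(path, "w", encoding="utf-8") as f:
    for record in records:
        f.write(json.dumps(record) + "\n")

def read_json_lines(file_path):
    """Parse a file holding one JSON object per line, skipping blank lines."""
    with open(file_path, encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]

parsed = read_json_lines(path)
print(parsed)
```

Note that a file in this layout is not a single valid JSON document as a whole, which is why the sketch parses each line separately instead of loading the entire file at once.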