Create Custom Datasource With Spark 3 - Part 1

This article is the first in a series on how to create a custom data source that plugs into Spark's data source mechanism. The idea is to write something like:

Dataset<Row> df = sparkSession.read()
                .option("filename", "path-to-data-file")
                .option("other-options", "some value")
                .format("com.roizaig.mydatasource")
                .load();

That is, read a file, parse it according to some custom logic, and return a Dataset that represents it.

Why Create a Custom Data Source?

Spark supports Parquet, CSV, JSON, ORC, and more out of the box. In most cases these formats will be more than okay, and if not, you can always read the data as a text file and do some string manipulation in Spark. However, there are some cases where we would like to "teach" Spark to read our own custom format.
  • Flexibility is a key point: with a custom data source you can read every format you can think of and parse it in any way. You are not bound to the built-in formats. For example, you can connect to any storage engine, like Tibero, FrontBase, or other exotic storage systems.
  • Outdated formats can be supported too, for example an old version of a format that was dropped from the product.
  • Better performance can be achieved if the data source you are writing uses features like partitioning and data locality. These are more advanced mechanisms that I will not show in this post.

Work Flow

We will begin with the most naive approach, to keep the logic as simple as possible and concentrate on the "plumbing". By the end of this phase we should be able to create a Dataset from a file. For now the schema will be constant and string based, with minimal error handling - we'll just fail the process.
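
To make "constant and string based" concrete, here is a minimal sketch of such a fixed schema. The column names are my own choice for the log fields, not necessarily the ones used in the final code:

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Fixed, all-string schema for the first iteration: every log field is
// exposed as a plain string; typed columns come in a later post.
StructType logSchema = new StructType()
        .add("time", DataTypes.StringType)
        .add("thread", DataTypes.StringType)
        .add("level", DataTypes.StringType)
        .add("class", DataTypes.StringType)
        .add("line", DataTypes.StringType)
        .add("message", DataTypes.StringType);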

Example Use Case - Java Log File

I was looking for a text-based format to parse, and it occurred to me that parsing a log file could be a nice challenge. For now, let's take a fixed example of a log file. I know that log files come in different shapes and sizes, but this will keep things simple, and we'll expand the code to make parsing more dynamic (so stay tuned for the next posts).

2021-03-21 16:48:03.811 [        main] DEBUG [EncodingInputStream:249] - Resetting in background
2021-03-21 16:48:03.812 [        main] ERROR [DataStreamConnector:107] - Cannot upload the file 'data_2021_2021_03_17_16_47_53.csv' to S3
2021-03-21 16:48:03.812 [        main] INFO  [DataStreamConnector:113] - Moving data to archive using bulk operation

Its anatomy, from left to right, is: the time of the event, the executing thread name in square brackets, the log level in upper case, the class name and line number, and then a dash followed by the log message. We can express it as a regular expression.

(.+) \[(.+)] (\w+)\s*\[(.+):(\d+)] - (.*)

If you are not familiar with regular expressions, don't worry. It is just a means to an end - we want to break the log line down into its components.

The first set of round brackets extracts the date and time, the second extracts the thread name without the square brackets, and the next one the log level. The brackets before the colon extract the class name and the ones after it the line number. Finally, the last set of round brackets extracts the message.
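
To see the breakdown in action, here is a small standalone sketch (not part of the data source itself; the class name LogLineParser is just for illustration) that applies the regular expression above to the first sample line:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogLineParser {
    // The same expression as above: date-time, thread, level, class, line number, message.
    private static final Pattern LOG_LINE =
            Pattern.compile("(.+) \\[(.+)] (\\w+)\\s*\\[(.+):(\\d+)] - (.*)");

    public static void main(String[] args) {
        String line = "2021-03-21 16:48:03.811 [        main] DEBUG "
                + "[EncodingInputStream:249] - Resetting in background";
        Matcher matcher = LOG_LINE.matcher(line);
        if (matcher.matches()) {
            System.out.println("time:    " + matcher.group(1));
            System.out.println("thread:  " + matcher.group(2).trim()); // strip the padding around the thread name
            System.out.println("level:   " + matcher.group(3));
            System.out.println("class:   " + matcher.group(4));
            System.out.println("line:    " + matcher.group(5));
            System.out.println("message: " + matcher.group(6));
        }
    }
}

Running it prints the six captured groups, which map one to one to the columns in the table below.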

So our end result should look like this:

Time         | Thread Name | Log Level | Class Name          | Line Number | Message
16:48:03.811 | main        | DEBUG     | EncodingInputStream | 249         | Resetting in background
16:48:03.812 | main        | ERROR     | DataStreamConnector | 107         | Cannot upload the file 'data_2021_2021_03_17_16_47_53.csv' to S3
16:48:03.812 | main        | INFO      | DataStreamConnector | 113         | Moving data to archive using bulk operation


Spark Classes and Interfaces 

In order not to burden this post with too many explanations, I will not cover each interface or class here. I think this article by Amar Gajbhiye does a great job, and I built my data source from the code in it. I will equip you with the diagram below, showing the relationships between the different entities.

Note that in Spark 3.x the data source interfaces changed a lot compared to Spark 2.x.


Some tips:

  • Naming the class DefaultSource is important so that Spark recognizes it as the default implementation in this package.
  • In order to flag this data source as readable we need to implement the SupportsRead interface. SupportsRead is an extension of the Table interface, which is the representation of the structured data (see the sketch after this list).
  • In order to use the options from the read statement, like filename, you need to pass the options object (an instance of CaseInsensitiveStringMap) down to the lower-level objects, which is somewhat annoying and clumsy.
  • The lowest-level read of the file is done by JavaLogReader, which implements Iterator. In this simple implementation I read the file one character at a time and return a full line - there is definitely room for improvement here.
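
To give a feel for how these pieces connect, below is a heavily trimmed sketch of the top of the hierarchy, roughly following the Spark 3 connector interfaces. JavaLogTable is a placeholder name of mine and the scan-side classes are left out, so treat it as an outline rather than the full implementation:

package com.roizaig.mydatasource;

import java.util.Collections;
import java.util.Map;
import java.util.Set;

import org.apache.spark.sql.connector.catalog.SupportsRead;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.connector.catalog.TableProvider;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

// Entry point: Spark resolves "com.roizaig.mydatasource" and looks for a class
// named DefaultSource in that package.
public class DefaultSource implements TableProvider {

    @Override
    public StructType inferSchema(CaseInsensitiveStringMap options) {
        // The same fixed, all-string schema shown in the Work Flow section.
        return new StructType()
                .add("time", DataTypes.StringType).add("thread", DataTypes.StringType)
                .add("level", DataTypes.StringType).add("class", DataTypes.StringType)
                .add("line", DataTypes.StringType).add("message", DataTypes.StringType);
    }

    @Override
    public Table getTable(StructType schema, Transform[] partitioning,
                          Map<String, String> properties) {
        // The read options (filename, ...) arrive as plain properties and must be
        // handed down to the table explicitly.
        return new JavaLogTable(schema, properties);
    }
}

// The table is the representation of the structured data; implementing
// SupportsRead flags it as readable and obliges us to provide a ScanBuilder.
class JavaLogTable implements SupportsRead {

    private final StructType schema;
    private final Map<String, String> properties;

    JavaLogTable(StructType schema, Map<String, String> properties) {
        this.schema = schema;
        this.properties = properties;
    }

    @Override
    public String name() {
        return "java-log-table";
    }

    @Override
    public StructType schema() {
        return schema;
    }

    @Override
    public Set<TableCapability> capabilities() {
        return Collections.singleton(TableCapability.BATCH_READ);
    }

    @Override
    public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
        // The same options object keeps travelling down to the scan, batch and
        // partition reader; that chain is omitted from this sketch.
        throw new UnsupportedOperationException("scan-side classes omitted from this sketch");
    }
}

From newScanBuilder onwards the chain continues through the scan, batch and partition reader classes, which is where JavaLogReader eventually does the line-by-line reading.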

Conclusions And Next Step

  • Although there are multiple classes involved here, in this simple example most of them just delegate the work to the lower levels, creating some plumbing that looks unnecessary. However, we should keep in mind the more advanced features Spark uses to accelerate the processing.
  • Next in the series we will add a schema with different data types.
