Create Custom Datasource With Spark 3 - Part 1
This article is the first in a series on how to create a custom data source that plugs into Spark's data source mechanism. The idea is to be able to write something like:
    Dataset<Row> df = sparkSession.read()
            .option("filename", "path-to-data-file")
            .option("other-options", "some value")
            .format("com.roizaig.mydatasource")
            .load();
The goal is to read a file, parse it according to some custom logic, and return a Dataset that represents its contents.
Why Create a Custom Data Source?
Work Flow
We will begin with the most naive approach, to keep the logic as simple as possible and concentrate on the "plumbing". By the end of this phase we should be able to create a Dataset from a file. For now the schema will be constant and string based, with minimal error handling - we'll just fail the process.
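To make "constant and string based" concrete, here is a minimal sketch of such a hard-coded schema. The holder class and the column names are my own and anticipate the log-file example in the next section; they are not necessarily the names used in the final code:

```java
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// First iteration: the schema is fixed and every column is a plain string.
public class JavaLogSchema {
    public static final StructType SCHEMA = new StructType()
            .add("time", DataTypes.StringType)
            .add("thread", DataTypes.StringType)
            .add("level", DataTypes.StringType)
            .add("class_name", DataTypes.StringType)
            .add("line_number", DataTypes.StringType)
            .add("message", DataTypes.StringType);
}
```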
Example Use Case - Java Log File
I was looking for a text-based format to parse, and it occurred to me that parsing a log file could be a nice challenge. For now let's take a fixed example of a log file. I know that log files come in different shapes and sizes, but this keeps things simple, and we'll expand the code to make the parsing more dynamic (so stay tuned for the next posts).
    2021-03-21 16:48:03.811 [ main] DEBUG [EncodingInputStream:249] - Resetting in background
    2021-03-21 16:48:03.812 [ main] ERROR [DataStreamConnector:107] - Cannot upload the file 'data_2021_2021_03_17_16_47_53.csv' to S3
    2021-03-21 16:48:03.812 [ main] INFO [DataStreamConnector:113] - Moving data to archive using bulk operation
Its anatomy, from left to right, is: the time of the event, the executing thread name in square brackets, the log level in upper case, the class name and line number, and then a dash and the log message. We can express it as a regular expression:
(.+) \[(.+)] (\w+)\s*\[(.+):(\d+)] - (.*)
If you are not familiar with regular expressions, don't worry. It is just a means to an end - we want to break the log line down into its components.
The first set of round brackets will extract the date and time, the second will extract the thread name without the square brackets, and the next will extract the log level. The brackets before the colon will extract the class name and the ones after it the line number. Finally, the last round brackets will extract the message.
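If you want to see the expression in action before wiring it into Spark, a few lines of plain Java with java.util.regex are enough. The class name and the hard-coded sample line below are just for illustration:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogLineRegexDemo {
    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("(.+) \\[(.+)] (\\w+)\\s*\\[(.+):(\\d+)] - (.*)");
        String line = "2021-03-21 16:48:03.812 [ main] ERROR [DataStreamConnector:107]"
                + " - Cannot upload the file 'data_2021_2021_03_17_16_47_53.csv' to S3";

        Matcher matcher = pattern.matcher(line);
        if (matcher.matches()) {
            System.out.println("time        = " + matcher.group(1)); // 2021-03-21 16:48:03.812
            System.out.println("thread      = " + matcher.group(2).trim()); // main
            System.out.println("log level   = " + matcher.group(3)); // ERROR
            System.out.println("class name  = " + matcher.group(4)); // DataStreamConnector
            System.out.println("line number = " + matcher.group(5)); // 107
            System.out.println("message     = " + matcher.group(6)); // Cannot upload the file ...
        }
    }
}
```

Note that the thread name is trimmed here because the regular expression also captures the padding spaces inside the square brackets.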
So our end result should look like this:
Time | Thread Name | Log Level | Class Name | Line Number | Message |
---|---|---|---|---|---|
16:48:03.811 | main | DEBUG | EncodingInputStream | 249 | Resetting in background |
16:48:03.812 | main | ERROR | DataStreamConnector | 107 | Cannot upload the file 'data_2021_2021_03_17_16_47_53.csv' to S3 |
16:48:03.812 | main | INFO | DataStreamConnector | 113 | Moving data to archive using bulk operation |
Spark Classes and Interfaces
Note that the data source API changed significantly between Spark 2.x and Spark 3.x.
Some tips:
- Naming the class DefaultSource is important so Spark will recognize it as the default implementation in this package.
- In order to flag this data source as readable we need to implement the SupportsRead interface. SupportsRead is an extension of the Table interface, which is the representation of the structured data.
- In order to use the options from the read statement (filename etc.) you need to pass the options object, an instance of CaseInsensitiveStringMap, down to the lower-level objects. This is somewhat annoying and clumsy.
- The lowest-level reading of the file is done by JavaLogReader, which implements Iterator. In this simple implementation I read the file one character at a time and return a full line - definitely room for improvement here. A sketch of how these classes fit together follows this list.
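To make the plumbing more tangible, here is a minimal sketch of how these pieces can fit together with the Spark 3 DataSource V2 API (org.apache.spark.sql.connector). Apart from DefaultSource, the class names (JavaLogTable, JavaLogScan and so on), the single input partition, and the line-by-line reading are my own simplifications for this sketch, not necessarily the exact code of the article; it reuses the regular expression and the JavaLogSchema sketch from above.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

import org.apache.spark.sql.connector.catalog.SupportsRead;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.connector.catalog.TableProvider;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.connector.read.Batch;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

// format("com.roizaig.mydatasource") resolves to com.roizaig.mydatasource.DefaultSource.
public class DefaultSource implements TableProvider {

    @Override
    public StructType inferSchema(CaseInsensitiveStringMap options) {
        return JavaLogSchema.SCHEMA; // constant, string-based schema for now
    }

    @Override
    public Table getTable(StructType schema, Transform[] partitioning, Map<String, String> properties) {
        return new JavaLogTable(schema);
    }
}

// SupportsRead extends Table and flags the table as readable in batch mode.
class JavaLogTable implements SupportsRead {
    private final StructType schema;

    JavaLogTable(StructType schema) { this.schema = schema; }

    @Override public String name() { return "java-log-table"; }
    @Override public StructType schema() { return schema; }

    @Override public Set<TableCapability> capabilities() {
        return Collections.singleton(TableCapability.BATCH_READ);
    }

    @Override public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
        // The options from the read statement are handed further down the chain from here.
        // ScanBuilder has a single build() method, so a lambda is enough in this sketch.
        return () -> new JavaLogScan(schema, options.get("filename"));
    }
}

// One class playing both Scan and Batch; the file is read as a single partition.
class JavaLogScan implements Scan, Batch {
    private final StructType schema;
    private final String filename;

    JavaLogScan(StructType schema, String filename) {
        this.schema = schema;
        this.filename = filename;
    }

    @Override public StructType readSchema() { return schema; }
    @Override public Batch toBatch() { return this; }

    @Override public InputPartition[] planInputPartitions() {
        return new InputPartition[]{ new JavaLogPartition(filename) };
    }

    @Override public PartitionReaderFactory createReaderFactory() {
        return new JavaLogReaderFactory();
    }
}
```

The executor side, where the file is actually opened and each line is broken up with the regular expression from the previous section, could then look like this:

```java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReader;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.unsafe.types.UTF8String;

// The partition only carries what the reader needs: the file name from the options.
class JavaLogPartition implements InputPartition {
    final String filename;
    JavaLogPartition(String filename) { this.filename = filename; }
}

// The factory is serialized and shipped to the executors, which use it to create readers.
class JavaLogReaderFactory implements PartitionReaderFactory {
    @Override public PartitionReader<InternalRow> createReader(InputPartition partition) {
        return new JavaLogPartitionReader(((JavaLogPartition) partition).filename);
    }
}

// The actual reading: one line per next() call, split with the regular expression.
class JavaLogPartitionReader implements PartitionReader<InternalRow> {
    private static final Pattern LINE =
            Pattern.compile("(.+) \\[(.+)] (\\w+)\\s*\\[(.+):(\\d+)] - (.*)");

    private final BufferedReader reader;
    private InternalRow current;

    JavaLogPartitionReader(String filename) {
        try {
            reader = new BufferedReader(new FileReader(filename));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // minimal error handling: just fail
        }
    }

    @Override public boolean next() throws IOException {
        String line = reader.readLine();
        if (line == null) return false;
        Matcher matcher = LINE.matcher(line);
        if (!matcher.matches()) throw new IOException("Unparsable log line: " + line);
        Object[] values = new Object[6];
        for (int i = 0; i < values.length; i++) {
            values[i] = UTF8String.fromString(matcher.group(i + 1)); // every column is a string for now
        }
        current = new GenericInternalRow(values);
        return true;
    }

    @Override public InternalRow get() { return current; }

    @Override public void close() throws IOException { reader.close(); }
}
```

For brevity this sketch uses a BufferedReader instead of the character-at-a-time reading described above.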
Conclusions and Next Steps
- Although there are multiple classes involved here, in this simple example most of them just delegate the work to the lower levels, creating plumbing that looks unnecessary. However, we should consider the more advanced features Spark uses to accelerate the processing, which is what these extra layers support.
- Next in the series we will add a schema with different data types.