Build Your Operator
Overview
Simplan framework is built around the concept of operators. An operator is intended to perform an operation which is a unit of work the framework can perform. Learn more about operators in the Operators section.
In this section, we will learn how to build an operator for SparkAppContext. We will build an operator that will execute a Spark SQL statement on a DataFrame. It is a wrapper around the Spark SQL API. Any Spark SQL statement can be executed using this transformation. Any existing table in Hive metastore can be queried using this transformation. The transformation can also be used to create a new table in Hive metastore.
If you just like to see the code, you can find it here.
More examples of operators can be found here.
Defining a configuration class
The config class is used to define the configuration parameters for the operator. A case class can be defined in a specific structure to emulate the configuration parameters.
| Parameter | Description | Required | Default | |-----------|-------------|----------|---------| | source | Source on which the filteration is to be performed | Yes | NA | | condition | Condition to Filter | Yes | NA |Defining the operator
To create an operator targetted for SparkAppContext in Simplan you need to extend SparkOperator class provided by Simplan framework and implements process(SparkOperatorRequest) method. The process method takes a SparkOperatorRequest
as input and returns a SparkOperatorResponse
. The Operator accepts a SparkAppContext
as a constructor arguement which contains all the context information required to execute the operator.
package com.intuit.data.simplan.spark.core.operators.transformations
imports ...
class FilteringOperator(appContext: SparkAppContext) extends SparkOperator(appContext) {
override def process(request: SparkOperatorRequest): SparkOperatorResponse = {
val config: WhereConditionOperatorConfig = request.parseConfigAs[WhereConditionOperatorConfig]
val sourceDataframe: DataFrame = request.dataframes(config.source)
val filteredDataframe = sourceDataframe.where(config.condition)
SparkOperatorResponse(request.taskName, filteredDataframe)
}
}
SparkOperatorRequest
contains information of all the dataframes that are available in the context. The operator can use the dataframes to perform the operation. The operator can also use the config to perform the operation. The operator can return a SparkOperatorResponse which contains the name of the task and the dataframe that is generated as a result of the operation.
Operator Configuration
The above operator can be configured in the task as shown below: