Transform data using Apache Spark

Problem Statement:

A thermal power plant uses 4 turbines to generate electricity. To monitor performance, multiple sensors are placed all around each turbine; on average there are 70 sensors per turbine unit, and each sensor tag generates a value every second. So every second the 4 units produce a large amount of data for checking turbine performance. This data is recorded in logs and delivered as a CSV file every 15 minutes for batch processing.

The task is to transform all tag values of each unit at a particular second into a single row, keeping a tag's value only when its sensor value is positive. For a negative sensor value, replace the tag value with null, using the Spark framework by leveraging Spark SQL/PySpark.

E.g.:
Unit_1, 31-Dec-2012 00:00:00, -799.999, -849.579, ... (till the 70th tag)
Unit_1, 31-Dec-2012 00:01:00, -789.999, -589.349, ... (till the 70th tag)

SparkSession is the entry point to Spark functionality when working with DataFrames. It internally creates a SparkConf and SparkContext with the configuration provided to it. A SparkSession is created using:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("TurbineData").getOrCreate()

getOrCreate() — returns the existing SparkSession if one already exists, otherwise creates a new one.
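A quick way to see this behavior (a minimal sketch reusing the session created above):

# Calling the builder again returns the already active session
spark2 = SparkSession.builder.appName("TurbineData").getOrCreate()
print(spark is spark2)  # True: the existing SparkSession is reused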

Spark provides an API to read delimited files (comma-separated by default), along with several options for handling headers, schema, data types, etc.

df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "false") \
    .load("path")

read — returns an instance of the DataFrameReader class, which is used to read records from CSV, Parquet, Avro, and other file formats into a DataFrame.
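The same reader handles other formats directly; for example, reading a Parquet file is a one-liner (the path is a placeholder):

df_parquet = spark.read.parquet("path")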

from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

schema = StructType([
    StructField("Unit", StringType(), True),
    StructField("Date", StringType(), True),
    StructField("tag", StringType(), True),
    StructField("tag_value", DoubleType(), True),
    StructField("sensor", IntegerType(), True)])

We can define a schema explicitly and pass it to the reader instead of inferring it, as shown below.
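A minimal sketch of applying the schema defined above while reading the CSV (the path is a placeholder):

df = spark.read.format("csv") \
    .option("header", "true") \
    .schema(schema) \
    .load("path")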

In Python we can access a DataFrame's columns by attribute or by indexing. We need to cast the sensor values to integer and then replace the tag values with null for negative sensor values.
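A short sketch of the cast, assuming the column is named sensor as in the schema above:

from pyspark.sql.functions import col

# Ensure the sensor column is an integer before comparing it with 0
df = df.withColumn("sensor", col("sensor").cast("int"))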

Spark withColumn() is a DataFrame function used to add a new column to a DataFrame, change the value of an existing column, convert the datatype of a column, or derive a new column.

We can either apply the CASE logic directly with expr(), or register the DataFrame as a temporary view and run the same logic as a SQL query (see the sketch after the code below); in both cases the result is again a Spark DataFrame.

from pyspark.sql.functions import expr

df1 = df.withColumn("tag_value", expr("CASE WHEN sensor > 0 THEN tag_value ELSE NULL END"))
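Equivalently, the same CASE expression can be run through Spark SQL after registering the DataFrame as a temporary view (the view name turbine_table is just an illustration):

df.createOrReplaceTempView("turbine_table")

df1 = spark.sql("""
    SELECT Unit, Date, tag, sensor,
           CASE WHEN sensor > 0 THEN tag_value ELSE NULL END AS tag_value
    FROM turbine_table
""")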

We can use the groupBy function with a Spark DataFrame too. It works much like the pandas groupby, except that we need to import pyspark.sql.functions. Grouping on the unit and timestamp columns, we collect the tag values (already set to null where the sensor value was negative) into a single comma-separated column.

.groupBy("unit","Date")\
.agg(concat_ws(",", func.collect_list("tag_value")).alias("Tag_Value"))
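To sanity-check the shape of the result, we can preview a few rows (an inspection step only, not part of the pipeline):

df1.show(5, truncate=False)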

Spark provides rich APIs to save DataFrames in many different file formats such as CSV, Parquet, ORC, Avro, etc. CSV is still commonly used in data applications, although binary formats are gaining momentum.

df1.repartition(1).write.format("csv") \
    .option("header", False) \
    .option("sep", ",") \
    .mode("overwrite") \
    .save("Path")

header: specifies whether to include a header row in the output file.

sep: specifies the field delimiter.

mode: specifies the behavior of the save operation when data already exists at the target path.

Spark creates a folder with multiple part files because each partition is saved individually. If you need a single output file (still inside a folder), you can call repartition(1) before writing; this is preferred when the upstream data is large, but it requires a shuffle.
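If the transformed data is already small, coalesce(1) avoids the full shuffle that repartition(1) triggers; a sketch (the output path is a placeholder):

df1.coalesce(1).write.format("csv") \
    .option("header", False) \
    .mode("overwrite") \
    .save("Path")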

df.write \
    .format("org.apache.spark.sql.cassandra") \
    .mode("overwrite") \
    .options(table=" ", keyspace=" ") \
    .save()

We can persist a Spark DataFrame into a Cassandra table using the Spark Cassandra Connector; fill in the table and keyspace options with the target table.
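The connector also needs a contact point for the cluster; a minimal sketch, assuming the Spark Cassandra Connector package is on the classpath and using a placeholder host address:

# spark.cassandra.connection.host points the connector at a Cassandra node (placeholder address)
spark = SparkSession.builder \
    .appName("TurbineData") \
    .config("spark.cassandra.connection.host", "127.0.0.1") \
    .getOrCreate()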