Predictive Data Analytics With Apache Spark (Part 2 Data Preparation)
January 4, 2019
1. Download Data and Create Project Directory
We start off by downloading the Turbofan Engine Degradation Simulation Data Set from this link. Extract the .zip file and add the expanded folder (/CMAPSSData) to your project home directory.
The data folder consists of 12 .txt files, representing four separate training datasets along with their corresponding test datasets and ground-truth files. This tutorial analyses the first portion of the data, but you are free to add additional portions and run them through the same code.
The code in this tutorial runs under Python 2.7 and Spark 2.3.2.
2. Clean Home Directory
Before we start coding against Spark, we need to clean the project home directory of components that may have been auto-generated during previous Spark executions. Starting from Spark 2.1.x, and due to default settings in the spark-defaults.conf.template file, Spark automatically creates a metastore_db directory and a derby.log file in the home directory of your code. Data residing in these auto-generated components can interrupt the execution of your current Spark session. Therefore, we need to delete both the metastore_db directory and the derby.log file so that the Spark session can launch properly. The following function does this job.
import shutil
import os
def clean_project_directory():
    '''
    This function deletes both the '/metastore_db' folder and the 'derby.log' file before
    initializing the Apache Spark session.
    The goal is to avoid an inconsistent startup of the Spark session
    '''
    # delete metastore_db folder if found
    if os.path.isdir(os.getcwd() + '/metastore_db'):
        shutil.rmtree(os.getcwd() + '/metastore_db', ignore_errors=True)
    # delete derby.log file if found
    if os.path.exists(os.getcwd() + '/derby.log'):
        os.remove(os.getcwd() + '/derby.log')
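We simply call this function once before creating the Spark session in the next step; a minimal usage sketch:
# remove leftovers of previous Spark runs so the new session starts cleanly
clean_project_directory()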
3. Launch Spark Session
After cleaning the home directory, we create a new Spark session. The pyspark.sql module includes the SparkSession builder, which enables us to create a new Spark session. Using it, we can set some useful properties, such as the session’s master URL, the Spark application name, and the maximum amount of memory reserved for each executor process. In the function below, we create a Spark session that runs on the local machine, with a maximum of 1 gigabyte of reserved executor memory.
from pyspark.sql import SparkSession

def create_spark_session(app_name, exe_memory):
    '''
    This function creates a Spark session with the given application name and
    the amount of memory available to the session
    INPUTS:
    @app_name: name of Spark application
    @exe_memory: value of reserved memory for Spark session
    OUTPUTS:
    @SparkSession instance
    '''
    # create Spark Session
    return SparkSession.builder \
        .master("local") \
        .appName(app_name) \
        .config("spark.executor.memory", exe_memory) \
        .getOrCreate()

# create Spark session
spark = create_spark_session('Predictive Maintenance', '1gb')
4. Import .CSV Data Files to Spark Session
In this step, we import the train and test data files, which are in CSV format, into the Spark session as RDD objects. RDDs (Resilient Distributed Datasets) are the core data abstraction of Apache Spark and can be used from all Spark-supported languages (Scala, Python, Java, and SQL). RDDs are largely responsible for Apache Spark’s significant speed advantage over Hadoop MapReduce, which repeatedly reads from and writes to the Hadoop Distributed File System (HDFS).
The easiest way to import CSV files as an RDD object is through the Spark Context. We append all CSV files into one RDD object using the union() function, then split each line into fields using the map() function.
def read_csv_as_rdd(sc, path, files_list, sep):
    '''
    This function reads .CSV data files into an RDD object and returns the RDD object
    INPUTS:
    @sc: Spark Context object
    @path: path of .CSV files
    @files_list: list of .CSV data files
    @sep: fields separator
    OUTPUTS:
    @rdd: RDD object containing the data observations
    '''
    # Read files and append them to one RDD
    rdd = sc.union([sc.textFile(path + f) for f in files_list])
    # Split each line on the separator
    rdd = rdd.map(lambda line: line.split(sep))
    # return RDD object
    return rdd
# get Spark Context object
sc = spark.sparkContext
# path of the extracted data folder
path = os.getcwd() + '/CMAPSSData/'
# get files of training data
train_files = sorted([filename for filename in os.listdir(path) if (filename.startswith('train')
                                                                    and filename.endswith('.txt'))])
# get files of test data
test_files = sorted([filename for filename in os.listdir(path) if (filename.startswith('test')
                                                                   and filename.endswith('.txt'))])
# read training data in RDD
train_rdd = read_csv_as_rdd(sc, path, [train_files[0]], " ")
# read test data in RDD
test_rdd = read_csv_as_rdd(sc, path, [test_files[0]], " ")
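As an optional sanity check (not part of the original script), we can peek at the first parsed record to confirm that each line was split into a list of string fields:
# inspect the first parsed record of the training RDD
print(train_rdd.take(1))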
5. Convert RDD to Spark Dataframe
A basic advantage of PySpark is the ability to convert RDD objects into Dataframes. For readers who are familiar with R or Python Dataframes, working with Spark Dataframes makes Spark coding much easier. Similar to R and Python Dataframes, Spark Dataframes are groups of data objects organized into named fields (i.e. columns). To convert an RDD object into a Spark Dataframe, all you need to do is define the list of column names you want to assign to your data; the toDF() function does the rest for you.
def convert_rdd_to_df(rdd, header):
    '''
    This function converts data from RDD format to a Spark Dataframe, and adds a header to the dataframe
    INPUTS:
    @rdd: RDD object containing the data features
    @header: list of column names
    OUTPUTS:
    PySpark DF version of @rdd
    '''
    # convert RDD to DF with header
    return rdd.toDF(header)
# set data header, contains list of names of data columns
header = ["id", "cycle", "setting1", "setting2", "setting3",
"s1", "s2", "s3", "s4", "s5", "s6", "s7", "s8",
"s9", "s10", "s11", "s12", "s13", "s14", "s15",
"s16","s17", "s18", "s19", "s20", "s21"]
# get df of both train and test data out of corresponded RDD objects
train_df = convert_rdd_to_df(train_rdd, header)
test_df = convert_rdd_to_df(test_rdd, header)
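Optionally, a quick look at the first rows confirms the conversion worked; a small sketch not shown in the original post:
# print the first 5 rows of the training Dataframe
train_df.show(5)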
6. Remove NULL Values
After getting our datasets as Spark Dataframes, we want to remove NULL observations (i.e. data observations with no values). To do so, all we need is to apply na.drop() to our Dataframes.
def remove_na_rows(df):
    '''
    This function removes rows with empty values from a Spark Dataframe
    INPUTS:
    @df: Spark Dataframe with possible NULL values
    OUTPUTS:
    @df: Spark Dataframe without NULL values
    '''
    return df.na.drop()
# remove empty rows
train_df = remove_na_rows(train_df)
test_df = remove_na_rows(test_df)
7. Set Data Types
The next step is to assign a data type to each column in our Dataframes. For this purpose, we cast each column to its proper data type (e.g. Integer, Double, String, …). The following lines of code cast a list of columns to the Integer data type.
from pyspark.sql.types import IntegerType

if len(int_list) > 0:
    for f in int_list:
        df = df.withColumn(f, df[f].cast(IntegerType()))
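To apply this to our train and test Dataframes, a sketch along the following lines could be used. The helper name set_column_types and the choice to cast id and cycle to Integer and all remaining columns to Double are assumptions for illustration, not part of the original post:

from pyspark.sql.types import IntegerType, DoubleType

def set_column_types(df, int_cols, double_cols):
    '''
    Cast the given columns of a Spark Dataframe to Integer and Double types (illustrative helper)
    '''
    for f in int_cols:
        df = df.withColumn(f, df[f].cast(IntegerType()))
    for f in double_cols:
        df = df.withColumn(f, df[f].cast(DoubleType()))
    return df

# assumption: engine id and cycle number are integers; settings and sensor readings are continuous
int_cols = ["id", "cycle"]
double_cols = [c for c in header if c not in int_cols]
train_df = set_column_types(train_df, int_cols, double_cols)
test_df = set_column_types(test_df, int_cols, double_cols)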
8. Visualize Data
Apache Spark has no native module for data visualization. Accordingly, in order to visualize our data, we need to convert the Spark Dataframes to another format. The most direct approach in PySpark is to convert PySpark Dataframes into Pandas Dataframes. To make this conversion efficient, we select only the set of data features we want to visualize and retrieve it as a Pandas DF. The following function gets a Pandas DF using an SQL query.
def get_pandasdf_from_sparkdf(spark_df, view_name, query):
    '''
    This function queries a Spark DF, and returns the result table as a Pandas DF
    INPUTS:
    @spark_df: Spark Dataframe to be queried
    @view_name: name of SQL view to be created from Spark Dataframe
    @query: SQL query to be run on @spark_df
    OUTPUTS:
    SQL view of @query in Pandas format
    '''
    # register the Dataframe as a temporary SQL view, then run the query and convert the result
    spark_df.createOrReplaceTempView(view_name)
    return spark.sql(query).toPandas()
After getting the Pandas version of the data, we can easily visualize it using the matplotlib library.
# set SQL query for dataframe df1
sqlQuery = """ SELECT cycle, setting1, setting2, setting3,
                      s1, s2, s3, s4, s5, s6, s7, s8, s9, s10, s11, s12, s13,
                      s14, s15, s16, s17, s18, s19, s20, s21
               FROM df1
               WHERE df1.id=15 """
# get SQL query result as Pandas DF
plotdata1 = get_pandasdf_from_sparkdf(train_df, "df1", sqlQuery)
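From plotdata1 we can draw the per-feature curves with matplotlib; a minimal sketch (the figure size and the one-subplot-per-feature layout are assumptions, not taken from the original post):

import matplotlib.pyplot as plt

# plot each selected feature of engine #15 against its operating cycle
features = [c for c in plotdata1.columns if c != 'cycle']
plt.figure(figsize=(12, 24))
for i, feature in enumerate(features, start=1):
    plt.subplot(len(features), 1, i)
    plt.plot(plotdata1['cycle'], plotdata1[feature])
    plt.ylabel(feature)
plt.xlabel('cycle')
plt.show()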
The following plot shows the features’ changes over time for engine #15 in the training dataset.
As we can see from the above plot, the training data contains features with no or very little variance over time. Such features are almost useless for building predictive models. Therefore, we will start the next post of this tutorial by removing low-variance features.
9. Full Code
The full code of this tutorial can be found on my GitHub under this link.