Predictive Data Analytics With Apache Spark (Part 2: Data Preparation)
January 4, 2019
1. Download Data and Create Project Directory
We start off by downloading the Turbofan Engine Degradation Simulation Data Set from this link. Extract the .zip file and add the expanded folder (/CMAPSSData) to your project home directory.
The data folder consists of 12 .txt files, representing four separate portions of training data, along with their corresponding test datasets and ground-truth files. This tutorial analyses the first portion of the data, but you are free to add additional data portions and test them with the same code.
2. Clean Home Directory
Before we start coding in Spark, we need to clean the Spark home directory of auto-generated components that may have been added during previous Spark executions. Starting from Spark 2.1.x, and due to default settings in the spark-defaults.conf.template file, Spark automatically generates a metastore_db directory and a derby.log file in the home directory of your code. Data residing in these auto-generated components can interrupt the execution of your current Spark session. Therefore, we need to delete both the metastore_db directory and the derby.log file so that the Spark session can launch properly. The following function does this job.
```python
import os
import shutil

def clean_project_directory():
    '''
    This function deletes both the '/metastore_db' folder and the 'derby.log'
    file before initializing the Apache Spark session. The goal is to avoid
    any inconsistent startup of the Spark session
    '''
    # delete metastore_db folder if found
    if os.path.isdir(os.getcwd() + '/metastore_db'):
        shutil.rmtree(os.getcwd() + '/metastore_db', ignore_errors=True)
    # delete derby.log file if found
    if os.path.exists(os.getcwd() + '/derby.log'):
        os.remove(os.getcwd() + '/derby.log')

# clean the project directory before launching Spark
clean_project_directory()
```
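To see the cleanup logic in action without touching a real project directory, here is a small plain-Python exercise; the throwaway directory and the files inside it are created purely for this demonstration:

```python
import os
import shutil
import tempfile

# work inside a throwaway directory so nothing real is touched
workdir = tempfile.mkdtemp()
os.makedirs(os.path.join(workdir, 'metastore_db'))
open(os.path.join(workdir, 'derby.log'), 'w').close()

# same cleanup steps as clean_project_directory(), relative to workdir
if os.path.isdir(os.path.join(workdir, 'metastore_db')):
    shutil.rmtree(os.path.join(workdir, 'metastore_db'), ignore_errors=True)
if os.path.exists(os.path.join(workdir, 'derby.log')):
    os.remove(os.path.join(workdir, 'derby.log'))

print(os.listdir(workdir))  # -> []
```

After the two deletions the directory is empty again, which is exactly the state we want before starting a fresh Spark session.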
3. Launch Spark Session
After cleaning the Spark home directory, we create a new Spark session. The pyspark.sql module includes the SparkSession class, whose builder enables us to create a new Spark session. Using the builder we can set some useful properties, such as the session’s master URL, the Spark application name, and the maximum amount of memory reserved for each executor process. In the function below, we create a Spark session that runs on the local machine, with a maximum reserved memory of 1 gigabyte.
```python
from pyspark.sql import SparkSession

def create_spark_session(app_name, exe_memory):
    '''
    This function creates a Spark session with an application name
    and the amount of memory available to the session
    INPUTS:
    @app_name: name of the Spark application
    @exe_memory: amount of memory reserved for the Spark session
    OUTPUTS:
    @SparkSession instance
    '''
    # create Spark session
    return SparkSession.builder \
        .master("local") \
        .appName(app_name) \
        .config("spark.executor.memory", exe_memory) \
        .getOrCreate()

# create Spark session
spark = create_spark_session('Predictive Maintenance', '1gb')
```
4. Import .CSV Data Files to Spark Session
In this step, we import the train and test data files in .CSV format into the Spark session as RDD objects. RDDs (Resilient Distributed Datasets) are the core data format of Apache Spark and can be manipulated through all Spark-supported languages (Scala, Python, Java, and SQL). RDDs are largely responsible for the significant speed of Apache Spark when compared with Hadoop MapReduce.
The easiest way to import CSV files as an RDD object is through the SparkContext object. We append all CSV files into one RDD object using the union() function, then split each line into data fields using the map() function.
```python
import os

def read_csv_as_rdd(sc, path, files_list, sep):
    '''
    This function reads .CSV data files into an RDD object
    and returns the RDD object
    INPUTS:
    @sc: SparkContext object
    @path: path of the .CSV files
    @files_list: list of .CSV data files
    @sep: field separator
    OUTPUTS:
    @rdd: RDD object containing the data observations
    '''
    # read files and append them to one RDD
    rdd = sc.union([sc.textFile(path + f) for f in files_list])
    # split lines on the separator
    rdd = rdd.map(lambda line: line.split(sep))
    # return RDD object
    return rdd

# get SparkContext object
sc = spark.sparkContext

# path of the extracted data folder
path = os.getcwd() + '/CMAPSSData/'

# get files of training data
train_files = sorted([filename for filename in os.listdir(path)
                      if (filename.startswith('train') and filename.endswith('.txt'))])

# get files of test data
test_files = sorted([filename for filename in os.listdir(path)
                     if (filename.startswith('test') and filename.endswith('.txt'))])

# read training data into an RDD
train_rdd = read_csv_as_rdd(sc, path, train_files, " ")

# read test data into an RDD
test_rdd = read_csv_as_rdd(sc, path, test_files, " ")
```
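What the map() step does to each record is easiest to see on a single sample line. The values below are invented for illustration in the style of the data files, not taken from the real dataset:

```python
# one space-separated record, in the style of the data files
# (values are invented for this illustration)
sample_line = "1 1 -0.0007 -0.0004 100.0 518.67 641.82"

# the same transformation read_csv_as_rdd applies through rdd.map()
fields = sample_line.split(" ")

print(fields)
# note: every field is still a string at this point; casting comes later
```

Each line becomes a list of string fields; assigning real data types to these strings is handled in a later step.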
5. Convert RDD to Spark Dataframe
A basic advantage of PySpark is the ability to convert RDD objects into Dataframes. For readers who are familiar with R or Python Dataframes, working with Spark Dataframes makes Spark coding much easier. Similar to R and Python Dataframes, Spark Dataframes are groups of data objects organized into named fields (i.e. columns). To convert an RDD object into a Spark Dataframe, all you need is to define the list of column names you want to assign to your data. The function toDF() will do the rest for you.
```python
def convert_rdd_to_df(rdd, header):
    '''
    This function converts data from RDD format to a Spark Dataframe,
    and adds a header to the dataframe
    INPUTS:
    @rdd: RDD object containing the data features
    @header: list of column names
    OUTPUTS:
    PySpark DF version of @rdd
    '''
    # convert RDD to DF with header
    return rdd.toDF(header)

# set data header, containing the list of data column names
header = ["id", "cycle", "setting1", "setting2", "setting3",
          "s1", "s2", "s3", "s4", "s5", "s6", "s7", "s8", "s9", "s10",
          "s11", "s12", "s13", "s14", "s15", "s16", "s17", "s18", "s19",
          "s20", "s21"]

# get DFs of both train and test data out of the corresponding RDD objects
train_df = convert_rdd_to_df(train_rdd, header)
test_df = convert_rdd_to_df(test_rdd, header)
```
6. Remove NULL Values
After getting our datasets as Spark Dataframes, we want to remove NULL observations (i.e. data observations with no values). To do so, all we need is to apply na.drop() to our Dataframes.
```python
def remove_na_rows(df):
    '''
    This function removes rows with empty values from a Spark Dataframe
    INPUTS:
    @df: Spark Dataframe with possible NULL values
    OUTPUTS:
    @df: Spark Dataframe without NULL values
    '''
    return df.na.drop()

# remove empty rows
train_df = remove_na_rows(train_df)
test_df = remove_na_rows(test_df)
```
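Conceptually, na.drop() keeps only the rows in which every field is present. A plain-Python sketch of that behavior on a few hand-made rows (the values are invented for this illustration):

```python
# hand-made rows: the second one has a missing (None) value
rows = [
    ["1", "1", "-0.0007", "518.67"],
    ["1", "2", None, "518.67"],
    ["1", "3", "0.0003", "518.67"],
]

# keep only rows with no missing values, mirroring df.na.drop()
clean_rows = [r for r in rows if all(v is not None for v in r)]

print(len(clean_rows))  # -> 2
```

The row containing a None value is dropped, and only complete observations survive, just as in the Spark Dataframes above.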
7. Set Data Types
The next step is to assign a data type to each column in our Dataframes. For this purpose, we cast each column to its proper data type (e.g. Integer, Double, String, …). The following lines of code cast a list of columns to the Integer data type.
```python
from pyspark.sql.types import IntegerType

# cast every column named in int_list to the Integer data type
if len(int_list) > 0:
    for f in int_list:
        df = df.withColumn(f, df[f].cast(IntegerType()))
```
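In plain Python terms, the cast turns the string fields produced by split() into numbers. A minimal stand-in for what the column-wise casts do per value (the record below is invented for this illustration):

```python
# string fields as they come out of line.split(" ")
record = ["1", "42", "-0.0007"]

# cast the first two fields (id, cycle) to int and the last to float,
# mirroring the column-wise casts applied to the Spark Dataframe
typed = [int(record[0]), int(record[1]), float(record[2])]

print(typed)  # -> [1, 42, -0.0007]
```

The same pattern extends to the sensor columns, which would be cast to Double (i.e. Python float) rather than Integer.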
8. Visualize Data
Apache Spark has no native module for data visualization. Accordingly, in order to visualize our data, we need to convert the Spark Dataframes to another format. The most direct approach in PySpark is to convert PySpark Dataframes into Pandas Dataframes. To do such a conversion efficiently, we select the set of data features we want to visualize and retrieve it as a Pandas DF. The following function gets a Pandas DF using a SQL query.
```python
def get_pandasdf_from_sparkdf(spark_df, view_name, query):
    '''
    This function queries a Spark DF, and returns the result table
    as a Pandas DF
    INPUTS:
    @spark_df: Spark Dataframe to be queried
    @view_name: name of the SQL view to be created from the Spark Dataframe
    @query: SQL query to be run on @spark_df
    OUTPUTS:
    SQL view of @query in Pandas format
    '''
    spark_df.createOrReplaceTempView(view_name)
    return spark.sql(query).toPandas()
```
After getting the Pandas version of the data, we can easily visualize it using the matplotlib library.
```python
# set SQL query for view df1
sqlQuery = """
           SELECT cycle, setting1, setting2, setting3,
                  s1, s2, s3, s4, s5, s6, s7, s8, s9, s10, s11, s12,
                  s13, s14, s15, s16, s17, s18, s19, s20, s21
           FROM df1
           WHERE df1.id = 15
           """

# get SQL query result as Pandas DF
plotdata1 = get_pandasdf_from_sparkdf(train_df, "df1", sqlQuery)
```
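The effect of the WHERE clause above is easy to mimic on plain-Python rows; the toy (id, cycle, s1) triples below are invented for this illustration:

```python
# toy rows: (id, cycle, s1) triples invented for this illustration
rows = [
    (14, 1, 518.67),
    (15, 1, 641.82),
    (15, 2, 642.15),
    (16, 1, 518.67),
]

# equivalent of: SELECT cycle, s1 FROM df1 WHERE df1.id = 15
result = [(cycle, s1) for (eng_id, cycle, s1) in rows if eng_id == 15]

print(result)  # -> [(1, 641.82), (2, 642.15)]
```

Only the observations belonging to engine 15 are kept, which is exactly the subset that ends up in the Pandas DF for plotting.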
The following plot shows how the features change over time for engine #15 in the training dataset.
As we can note from the above plot, the training data has features with no or very little variance over time. Such features are almost useless for building predictive data models. Therefore, we will start the next post of this tutorial by removing low-variance features.
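As a preview of that step, low-variance features can be flagged with a simple per-column variance check. A stdlib sketch on two invented columns; the threshold 1e-6 is an arbitrary choice for illustration, not a value from the tutorial:

```python
from statistics import pvariance

# two invented feature columns: one constant, one varying over time
columns = {
    "setting3": [100.0, 100.0, 100.0, 100.0],   # no variance
    "s2": [641.82, 642.15, 642.35, 642.37],     # varies over time
}

# flag columns whose population variance falls below a small threshold
low_variance = [name for name, values in columns.items()
                if pvariance(values) < 1e-6]

print(low_variance)  # -> ['setting3']
```

The constant column is flagged while the varying sensor survives, which is the behavior we will want when pruning features in the next post.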