Predictive Data Analytics With Apache Spark (Part 2 Data Preparation)

January 4, 2019

1. Download Data and Create Project Directory

We start off by downloading the Turbofan Engine Degradation Simulation Data Set from this link. Extract the .zip file and add the expanded folder (/CMAPSSData) to your project home directory.

The data folder consists of 12 .txt files, representing four separate training datasets along with their corresponding test datasets and ground-truth files. This tutorial analyses the first portion of the data, but you are free to add additional data portions and test them with the same code.

The code in this tutorial runs under Python 2.7 and Spark 2.3.2.

2. Clean Home Directory

Before we start Spark coding, we need to clean the project home directory from auto-generated components left over from previous Spark executions. Starting from Spark 2.1.x, and due to default settings in the spark-defaults.conf.template file, Spark automatically generates a metastore_db directory and a derby.log file in the home directory of your code. Data residing in these auto-generated components can interrupt the execution of your current Spark session. Therefore, we need to delete both the metastore_db directory and the derby.log file so that the Spark session launches properly. The following function does this job.

import shutil
import os

def clean_project_directory():
    """
    This function deletes both '/metastore_db' folder and 'derby.log' file before
    initializing Apache Spark session.
    The goal is to avoid any inconsistent startup of Spark session
    """
    # delete metastore_db folder if found
    if os.path.isdir(os.getcwd() + '/metastore_db'):
        shutil.rmtree(os.getcwd() + '/metastore_db', ignore_errors=True)

    # delete derby.log file if found
    if os.path.exists(os.getcwd() + '/derby.log'):
        os.remove(os.getcwd() + '/derby.log')
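To sanity-check the cleanup logic without touching a real project, the same deletions can be exercised in a throwaway directory (a sketch; the temporary directory and the simulated leftovers are illustrative only):

```python
import os
import shutil
import tempfile

# work inside a throwaway directory so no real project files are touched
workdir = tempfile.mkdtemp()
os.chdir(workdir)

# simulate leftovers from a previous Spark run
os.mkdir('metastore_db')
open('derby.log', 'w').close()

# same deletions as clean_project_directory()
if os.path.isdir(os.getcwd() + '/metastore_db'):
    shutil.rmtree(os.getcwd() + '/metastore_db', ignore_errors=True)
if os.path.exists(os.getcwd() + '/derby.log'):
    os.remove(os.getcwd() + '/derby.log')

print(os.path.isdir('metastore_db'), os.path.exists('derby.log'))  # False False
```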

3. Launch Spark Session

After cleaning the home directory, we create a new Spark session. The module pyspark.sql includes the SparkSession class, whose builder enables us to create a new Spark session and set some useful properties, such as the session's master URL, the Spark application name, and the maximum amount of memory reserved for each executor process. In the function below, we create a Spark session that runs on the local machine, with a maximum reserved memory of 1 gigabyte.

from pyspark.sql import SparkSession

def create_spark_session(app_name, exe_memory):
    """
    This function creates Spark session with application name and available memory
    to the session
    @app_name: name of Spark application
    @exe_memory: value of reserved memory for Spark session
    @return: SparkSession instance
    """
    # create Spark session
    return SparkSession.builder \
        .master("local") \
        .appName(app_name) \
        .config("spark.executor.memory", exe_memory) \
        .getOrCreate()

# create Spark session    
spark = create_spark_session('Predictive Maintenance', '1gb')

4. Import .CSV Data Files to Spark Session

In this step, we import the train and test data files (in .CSV format) into the Spark session as RDD objects. RDDs (Resilient Distributed Datasets) are the core data structure of Apache Spark and can be invoked through all Spark-supported languages (Scala, Python, Java, and SQL). RDDs are largely responsible for the significant speed of Apache Spark when compared with Hadoop MapReduce.

The easiest way to import CSV files as an RDD object is via the Spark Context. We append all CSV files into one RDD object using the union() function, then split each line of the RDD object into data fields using the map() function.

def read_csv_as_rdd(sc, path, files_list, sep):
    """
    This function reads .CSV data files into RDD object and returns the RDD object
    @sc: Spark Context object
    @path: path of .CSV files
    @files_list: list of .CSV data files
    @sep: fields separator
    @return: RDD object contains data observations
    """
    # read files and append them to RDD
    rdd = sc.union([sc.textFile(path + f) for f in files_list])

    # split lines on separator
    rdd = rdd.map(lambda line: line.split(sep))

    # return RDD object
    return rdd

# get Spark Context object
sc = spark.sparkContext

# path of the data folder
path = os.getcwd() + '/CMAPSSData/'

# get files of training data
train_files = sorted([filename for filename in os.listdir(path) if (filename.startswith('train')
                            and filename.endswith('.txt'))])

# get files of test data
test_files = sorted([filename for filename in os.listdir(path) if (filename.startswith('test')
                            and filename.endswith('.txt'))])

# read training data in RDD
train_rdd = read_csv_as_rdd(sc, path, [train_files[0]], " ")

# read test data in RDD
test_rdd = read_csv_as_rdd(sc, path, [test_files[0]], " ")

5. Convert RDD to Spark Dataframe

The basic advantage of PySpark is the ability to convert RDD objects into Dataframes. For readers who are familiar with R or Python Dataframes, working with Spark Dataframes makes Spark coding much easier. Similar to R and Python Dataframes, Spark Dataframes are groups of data objects organized into named fields (i.e. columns). In order to convert an RDD object into a Spark Dataframe, all you need is to define the list of column names you want to assign to your data. The function toDF() will do the rest for you.

def convert_rdd_to_df(rdd, header):
    """
    This function converts data from RDD format to Spark Dataframe, and adds header to the dataframe
    @rdd: RDD object contains data features
    @header: list of column names
    @return: PySpark DF version of @rdd
    """
    # convert RDD to DF with header
    return rdd.toDF(header)

# set data header, contains list of names of data columns
header = ["id", "cycle", "setting1", "setting2", "setting3",
          "s1", "s2", "s3", "s4", "s5", "s6", "s7", "s8",
          "s9", "s10", "s11", "s12", "s13", "s14", "s15",
          "s16", "s17", "s18", "s19", "s20", "s21"]

# get df of both train and test data out of corresponded RDD objects
train_df = convert_rdd_to_df(train_rdd, header)
test_df = convert_rdd_to_df(test_rdd, header)

6. Remove NULL Values

After getting our datasets as Spark Dataframes, we want to remove NULL observations (i.e. data observations with no values). To do so, all we need is to apply na.drop() to our Dataframes.

def remove_na_rows(df):
    """
    This function removes rows with empty values from Spark Dataframe
    @df: Spark Dataframe with possible NULL values
    @return: Spark Dataframe without NULL values
    """
    # drop rows containing NULL values
    return df.na.drop()

# remove empty rows
train_df = remove_na_rows(train_df)
test_df = remove_na_rows(test_df)

7. Set Data Types

The next step is to assign a data type to each column in our Dataframes. For this purpose, we will cast each column to its proper data type (e.g. Integer, Double, String, …). The following lines of code cast a list of columns (int_list) to the Integer data type.

from pyspark.sql.types import IntegerType

if len(int_list) > 0:
    for f in int_list:
        df = df.withColumn(f, df[f].cast(IntegerType()))

8. Visualize Data

Apache Spark has no native module for data visualization. Accordingly, in order to visualize our data, we need to convert Spark Dataframes to another format. The most direct approach in PySpark is to convert PySpark Dataframes into Pandas Dataframes. To do such conversion in an efficient way, we select only the set of data features we want to visualize, and retrieve it as a Pandas DF. The following function gets a Pandas DF using an SQL query.

def get_pandasdf_from_sparkdf(spark_df, view_name, query):
    """
    This function queries Spark DF, and returns the result table as Pandas DF
    @spark_df: Spark Dataframe to be queried
    @view_name: name of SQL view to be created from Spark Dataframe
    @query: SQL query to be run on @spark_df
    @return: SQL view of @query in Pandas format
    """
    # register the dataframe as a temporary SQL view
    spark_df.createOrReplaceTempView(view_name)

    # run the query and convert the result table to Pandas
    return spark.sql(query).toPandas()

After getting the Pandas version of the data, we can easily visualize it using the matplotlib library.

# set SQL query for view df1
sqlQuery = """
    SELECT cycle, setting1, setting2, setting3,
           s1, s2, s3, s4, s5, s6, s7, s8,
           s9, s10, s11, s12, s13, s14, s15,
           s16, s17, s18, s19, s20, s21
    FROM df1
    """

# get SQL query result as Pandas DF
plotdata1 = get_pandasdf_from_sparkdf(train_df, "df1", sqlQuery)
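With plotdata1 in Pandas format, a per-engine plot can be produced with matplotlib along these lines (a sketch using synthetic stand-in data, since the real plotdata1 depends on the downloaded files; the file name and sensor values are illustrative):

```python
import matplotlib
matplotlib.use('Agg')  # headless backend so the sketch runs without a display
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

# synthetic stand-in for plotdata1: one drifting sensor and one flat sensor
np.random.seed(0)
plotdata1 = pd.DataFrame({
    'cycle': np.arange(1, 51),
    's2': 641.8 + 0.02 * np.arange(50) + np.random.normal(0, 0.05, 50),
    's5': np.full(50, 14.62),
})

# plot each selected feature against the operating cycle
fig, ax = plt.subplots(figsize=(8, 3))
for col in ['s2', 's5']:
    ax.plot(plotdata1['cycle'], plotdata1[col], label=col)
ax.set_xlabel('cycle')
ax.set_ylabel('sensor reading')
ax.legend()
fig.savefig('engine_sensors.png')
```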

The following plot shows the features' changes over time for engine #15 in the training dataset.

As we note from the above plot, the training data has features with no or very little variance over time. Such features are almost useless in building predictive models. Therefore, we will start the next post of this tutorial by removing low-variance features.

9. Full Code

The code of this tutorial can be found in my Github under this link.
