Predictive Data Analytics With Apache Spark (Part 2 Data Preparation)

January 4, 2019

1. Download Data and Create Project Directory

We start off by downloading the Turbofan Engine Degradation Simulation Data Set from this link. Extract the .zip file and add the expanded folder (/CMAPSSData) to your project home directory.

The data folder consists of 12 .txt files, representing four separate training datasets along with their corresponding test datasets and ground-truth files. This tutorial analyses the first portion of the data, but you are free to add the additional portions and test them with the same code.

The code in this tutorial runs under Python 2.7 and Spark 2.3.2.

2. Clean Home Directory

Before we write any Spark code, we need to clean the project home directory of auto-generated components left over from previous Spark executions. Starting from Spark 2.1.x, and due to default settings in the spark-defaults.conf.template file, Spark automatically generates a metastore_db directory and a derby.log file in the home directory of your code. Data residing in these auto-generated components can interrupt the execution of your current Spark session. Therefore, we delete both the metastore_db directory and the derby.log file so that the Spark session can launch properly. The following function does this job.

import shutil
import os

def clean_project_directory():
    '''
    This function deletes both '/metastore_db' folder and 'derby.log' file before 
    initializing Apache Spark session.
    The goal is to avoid any inconsistent startup of the Spark session
    '''
    # delete metastore_db folder if found
    if os.path.isdir(os.getcwd() + '/metastore_db'):
        shutil.rmtree(os.getcwd() + '/metastore_db', ignore_errors=True)

    # delete derby.log file if found
    if os.path.exists(os.getcwd() + '/derby.log'):
        os.remove(os.getcwd() + '/derby.log')

3. Launch Spark Session

After cleaning the home directory, we create a new Spark session. The module pyspark.sql includes the SparkSession class, whose builder enables us to create a new Spark session and set some useful properties, such as the session’s master URL, the Spark application name, and the maximum amount of memory reserved for each executor process. In the function below, we create a Spark session that runs on the local machine, with a maximum reserved memory of 1 gigabyte.

from pyspark.sql import SparkSession
def create_spark_session(app_name, exe_memory):
    '''
    This function creates Spark session with application name and available memory 
    to the session
    
    INPUTS:
    @app_name: name of Spark application
    @exe_memory: value of reserved memory for Spark session
    
    OUTPUTS:
    @SparkSession instance
    '''
    
    # create Spark Session
    return SparkSession.builder \
       .master("local") \
       .appName(app_name) \
       .config("spark.executor.memory", exe_memory) \
       .getOrCreate()

# create Spark session    
spark = create_spark_session('Predictive Maintenance', '1gb')

4. Import .CSV Data Files to Spark Session

In this step, we import the train and test data files, in .CSV format, into the Spark session as RDD objects. RDDs (Resilient Distributed Datasets) are the core data format of Apache Spark and can be used from all Spark-supported languages (Scala, Python, Java, and SQL). RDDs are largely responsible for the significant speed of Apache Spark when compared with Hadoop MapReduce.

The easiest way to import CSV files as an RDD object is through the SparkContext. We append all CSV files into one RDD object using the union() function, then split each line of the RDD object into data fields using the map() function.

def read_csv_as_rdd(sc, path, files_list, sep):
    '''
    This function reads .CSV data files into RDD object and returns the RDD object
    
    INPUTS:
    @sc: Spark Context object
    @path: path of .CSV files
    @files_list: list of .CSV data files
    @sep: fields separator
    
    OUTPUTS:
    @rdd: RDD object contains data observations
    '''
    
    # Read files and append them to RDD
    rdd = sc.union([sc.textFile(path + f) for f in files_list])

    # Split lines on spaces
    rdd = rdd.map(lambda line: line.split(sep))
    
    # return RDD object
    return rdd   

# get Spark Context object
sc = spark.sparkContext

# path of the data folder added to the project home directory in step 1
path = os.getcwd() + '/CMAPSSData/'

# get files of training data
train_files = sorted([filename for filename in os.listdir(path) if (filename.startswith('train') 
                            and filename.endswith('.txt'))])

# get files of test data
test_files = sorted([filename for filename in os.listdir(path) if (filename.startswith('test') 
                            and filename.endswith('.txt'))])

# read training data in RDD
train_rdd = read_csv_as_rdd(sc, path, [train_files[0]], " ")

# read test data in RDD
test_rdd = read_csv_as_rdd(sc, path, [test_files[0]], " ")

5. Convert RDD to Spark Dataframe

The basic advantage of PySpark is the ability to convert RDD objects into Dataframes. For those readers who are familiar with R or Python Dataframes, working with Spark Dataframes makes Spark coding much easier. Similar to R and Python Dataframes, Spark Dataframes are groups of data objects organized into named fields (i.e. columns). In order to convert an RDD object into a Spark Dataframe, all you need to do is define the list of column names you want to assign to your data. The function toDF() will do the rest for you.

def convert_rdd_to_df(rdd, header):
    '''
    This function converts data from RDD format to Spark Dataframe, and adds header to the dataframe
    
    INPUTS:
    @rdd: RDD object contains data features
    @header: list of column names  
    
    OUTPUTS:
    PySpark DF version of @rdd
    '''
        
    # convert RDD to DF with header
    return rdd.toDF(header)

# set data header, contains list of names of data columns
header = ["id", "cycle", "setting1", "setting2", "setting3",
                    "s1", "s2", "s3", "s4", "s5", "s6", "s7", "s8",
                    "s9", "s10", "s11", "s12", "s13", "s14", "s15", 
                    "s16","s17", "s18", "s19", "s20", "s21"]

# get df of both train and test data out of corresponded RDD objects
train_df = convert_rdd_to_df(train_rdd, header)
test_df = convert_rdd_to_df(test_rdd, header)

6. Remove NULL Values

After getting our datasets as Spark Dataframes, we want to remove NULL observations (i.e. data observations with no values). To do so, all we need to do is apply na.drop() to our Dataframes.

def remove_na_rows(df):
    '''
    This function removes rows with empty values from Spark Dataframe
    
    INPUTS:
    @df: Spark Dataframe with possible NULL values
    
    OUTPUTS:
    @df: Spark Dataframe without NULL values
    '''
    
    return df.na.drop()

# remove empty rows
train_df = remove_na_rows(train_df)
test_df = remove_na_rows(test_df)

7. Set Data Types

The next step is to assign a data type to each column in our Dataframes. For this purpose, we will cast each column to its proper data type (e.g. Integer, Double, String, …). The following lines of code cast a list of columns into an Integer data type.

from pyspark.sql.types import IntegerType

# cast every column named in int_list to Integer
if len(int_list) > 0:
    for f in int_list:
        df = df.withColumn(f, df[f].cast(IntegerType()))

8. Visualize Data

Apache Spark has no native module for data visualization. Accordingly, in order to visualize our data, we need to convert Spark Dataframes to another format. The most direct approach in PySpark is to convert PySpark Dataframes into Pandas Dataframes. To do such a conversion in an efficient way, we select the set of data features we want to visualize and retrieve it as a Pandas DF. The following function gets a Pandas DF using an SQL query.

def get_pandasdf_from_sparkdf(spark_df, view_name, query):
    '''
    This function queries Spark DF, and returns the result table as Pandas DF
    
    INPUTS:
    @spark_df: Spark Dataframe to be queried
    @view_name: name of SQL view to be created from Spark Dataframe
    @query: SQL query to be run on @spark_df 
    
    OUTPUTS:
    SQL view of @query in Pandas format
    '''
    
    spark_df.createOrReplaceTempView(view_name)
    return spark.sql(query).toPandas()

After getting the Pandas version of data, we can easily visualize it using matplotlib library.

# set SQL query for dataframe df
sqlQuery = """
    SELECT cycle, setting1, setting2, setting3,
                    s1, s2, s3, s4, s5, s6, s7, s8,
                    s9, s10, s11, s12, s13, s14, s15, 
                    s16,s17, s18, s19, s20, s21
    FROM df1 
    WHERE df1.id=15
    """

# get SQL query result as Pandas DF
plotdata1 = get_pandasdf_from_sparkdf(train_df, "df1", sqlQuery)
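As a minimal matplotlib sketch (the function name and output filename here are illustrative, not from the original post), we can plot every feature column of the Pandas DF against the cycle counter:

```python
import matplotlib
matplotlib.use("Agg")  # render to a file; no display needed
import matplotlib.pyplot as plt

def plot_features_over_time(pdf, time_col="cycle", out_file="engine_features.png"):
    '''Plot each feature column of a Pandas DF against the time column.'''
    feature_cols = [c for c in pdf.columns if c != time_col]
    fig, ax = plt.subplots(figsize=(12, 8))
    for c in feature_cols:
        ax.plot(pdf[time_col], pdf[c], label=c)
    ax.set_xlabel(time_col)
    ax.set_ylabel("feature value")
    ax.legend(loc="upper right", ncol=3, fontsize="small")
    fig.savefig(out_file)
    plt.close(fig)
```

Calling plot_features_over_time(plotdata1) writes the feature plot for the selected engine to disk.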

The following plot shows the features’ changes over time for engine #15 in the training dataset.

As we note from the above plot, the training data contains features with no or very little variance over time. Such features are almost useless for building predictive models. Therefore, we will start the next post of this tutorial by removing low-variance features.

9. Full Code

The full code of this tutorial can be found on my GitHub under this link.
