Predictive Data Analytics With Apache Spark (Part 4 Principal Component Analysis)

January 10, 2019

1. Principal Component Analysis (PCA)

In the previous post, we generated tens of new features out of existing ones, which significantly expanded the dimensionality of our data. To go further in predictive data modeling, we need to reduce that dimensionality. Reducing data dimensions enables us to visualize our data more efficiently, test different parameter settings of machine learning algorithms to optimize our predictive solutions, and make better use of memory and storage. One of the most common procedures for reducing dimensionality is the Principal Component Analysis (PCA) algorithm. The following function applies PCA to a Spark Dataframe. It takes the Dataframe as input, along with the subset of numerical features to be reduced and the number of Principal Component (PC) features to generate. The function returns the input Dataframe with the PCs appended to it, along with the variance explained by each generated PC.

from functools import reduce

from pyspark.sql import functions as F
from pyspark.ml.feature import RFormula, PCA

def extract(row):
    # helper not shown in the original listing; assumed here to flatten
    # a ("key", "pcaFeatures") row into a tuple (key, PC_1, ..., PC_k)
    return (row.key,) + tuple(row.pcaFeatures.toArray().tolist())

def add_PCA_features(df, features, PC_num):
    '''
    This function adds PCs to a Spark dataframe.

    INPUTS:
    @df: Spark dataframe
    @features: list of numeric data features in @df
    @PC_num: number of required PCs

    OUTPUTS:
    @df_new: updated Spark dataframe
    @pca_variance: vector of the proportions of variance explained by each PC
    '''

    # create unique ID for @df
    df = df.withColumn("key", F.monotonically_increasing_id())

    # 1. use RFormula to join the selected data features into one vector column;
    #    the "key" column is kept so the PCs can be joined back to @df later
    formula = RFormula(formula = "~" + "+".join(features))

    # fit the formula to the data and build the feature vector
    pipeline = formula.fit(df).transform(df)

    # select both "key" and "features" out of pipeline
    output = pipeline.select("key", "features")

    # 2. build a PCA model with PC_num components and fit it to the features
    #    ("features" is the input column, "pcaFeatures" the output column)
    pca = PCA(k=PC_num, inputCol="features", outputCol="pcaFeatures")
    model = pca.fit(output)

    # get PCA result by projecting the features onto the PCs
    result = model.transform(output).select("key", "pcaFeatures")

    # get vector of variance proportions covered by each PC
    pca_variance = model.explainedVariance

    # 3. convert the PC vector into separate Dataframe columns
    pca_outcome = result.rdd.map(extract).toDF(["key"])

    # get columns names of pca_outcome
    oldColumns = pca_outcome.schema.names

    # set new names for PCA features
    newColumns = ["key"]
    for i in range(1, PC_num + 1):
        newColumns.append('PCA_' + str(i))

    # rename the columns of the PCA Dataframe
    pca_result = reduce(lambda data, idx: data.withColumnRenamed(oldColumns[idx], newColumns[idx]), range(len(oldColumns)), pca_outcome)

    # join PCA df to data df
    df_new = df.join(pca_result, 'key', 'left')

    return df_new, pca_variance

 

# add PCs to data features
train_df, train_pca_variance = add_PCA_features(train_df, features_to_PCA, 10)

After executing the above function, we obtain the required number of PC features. The following figure describes the cumulative data variance captured by the first 10 PCs in both train and test datasets.

As we see in the above figures, the PCA algorithm managed to compress almost all of the data variance into the first 4 PC features. This means we can reduce our data dimensionality from 24 features to 4 features with very little loss of variance. Now, let’s visualize our PC features.
As the figure shows, the generated PCs have different scales. Therefore, we need to scale those features using normalization and standardization procedures.
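
To make the cumulative-variance reading above more concrete, here is a minimal sketch in plain Python. It assumes the variance vector returned by add_PCA_features has been converted to a list of per-PC proportions (for example with pca_variance.toArray().tolist()). The numbers below are invented for illustration and are not the values obtained on this dataset, and the helper name pcs_needed and the 0.99 threshold are hypothetical.

```python
from itertools import accumulate

# Hypothetical proportions of variance explained by each PC (illustrative values only)
explained = [0.60, 0.25, 0.10, 0.045, 0.002, 0.001, 0.001, 0.0005, 0.0003, 0.0002]

def pcs_needed(explained_variance, threshold=0.99):
    """Return the smallest number of leading PCs whose cumulative variance reaches threshold."""
    for count, total in enumerate(accumulate(explained_variance), start=1):
        if total >= threshold:
            return count
    # threshold never reached: all components are needed
    return len(explained_variance)

# running total of explained variance, one entry per PC
cumulative = list(accumulate(explained))
print(cumulative)
print(pcs_needed(explained))
```

A check like this is one way to decide how many PCs to carry forward instead of reading the number off the plot.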

2. Normalize PC Features

In order to normalize PC features, we apply the normalization procedure over all engines at once (i.e. we do not partition the data per engine before normalization). The reason is that we want globally normalized features, computed over all engines, to be employed later in the learning process. The following function adds globally normalized versions of the PCs.

def add_normalized_features_unpartitioned(df, features):
    '''
    this function rescales columns of a Spark dataframe into the [0,1] range.
    
    INPUTS:
    @df: Spark dataframe
    @features: list of data features in @df to be normalized
    
    OUTPUTS:
    @df: updated Spark Dataframe
    '''    
    
    # loop over data features
    for f in features:
        # compute min and max values for data feature 'f' in a single pass
        cur_min, cur_max = df.agg(F.min(f), F.max(f)).first()

        print('%s cur_min: %s cur_max: %s' % (f, cur_min, cur_max))

        # add the normalized feature using a native column expression (no UDF needed)
        df = df.withColumn(f + '_norm', (F.col(f) - cur_min) / (cur_max - cur_min))

    return df

# add normalized features to the train DF
train_df = add_normalized_features_unpartitioned(train_df, pca_features)
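
To illustrate why we normalize globally rather than per engine, here is a small sketch in plain Python (no Spark required). The engine IDs and values are invented for illustration, and min_max is a hypothetical helper, not part of the code of this post.

```python
from collections import defaultdict

# Hypothetical (engine_id, PC value) pairs; not taken from the dataset
rows = [(1, 2.0), (1, 4.0), (2, 10.0), (2, 20.0)]

def min_max(values):
    """Squash values into [0, 1]; a constant column maps to 0.0 to avoid dividing by zero."""
    lo, hi = min(values), max(values)
    span = hi - lo
    return [(v - lo) / span if span else 0.0 for v in values]

# Global normalization: one min/max shared by every engine
global_norm = min_max([value for _, value in rows])

# Per-engine normalization: each engine gets its own min/max
by_engine = defaultdict(list)
for engine, value in rows:
    by_engine[engine].append(value)
per_engine_norm = {engine: min_max(values) for engine, values in by_engine.items()}

print(global_norm)
print(per_engine_norm)
```

With per-engine scaling, the largest value of every engine maps to 1.0, so differences in magnitude between engines disappear. The global version keeps all engines on one common scale, which is what we want for the learning step.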

3. Standardize PC Features

As with PC normalization, we also generate standardized PC features without partitioning our data per engine. The following function does the job for us.

def add_standardized_features_unpartitioned(df, features):
    '''
    this function adds standardized features with zero mean and unit variance for each data feature in a Spark DF
    
    INPUTS:
    @df: Spark Dataframe
    @features: list of data features in @df to be standardized
    
    OUTPUTS:
    @df: updated Spark Dataframe 
    '''
    
    for f in features:
        # compute mean and (sample) standard deviation of data feature 'f' in a single pass
        cur_mean, cur_std = df.agg(F.mean(f), F.stddev(f)).first()

        print('%s cur_mean: %s cur_std: %s' % (f, cur_mean, cur_std))

        # add the standardized feature: subtract the mean, then divide by the standard deviation
        df = df.withColumn(f + '_scaled', (F.col(f) - cur_mean) / cur_std)
        
    return df

# add standardized features to train DF
train_df = add_standardized_features_unpartitioned(train_df, pca_features)

The following figure shows standardized PC features of all engines.
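
As a quick sanity check of what standardization does, here is a minimal sketch in plain Python. The values are invented for illustration and standardize is a hypothetical helper. It uses the sample standard deviation (n - 1 denominator), which, to my understanding, is also what Spark reports as 'stddev'.

```python
from statistics import mean, stdev

# Hypothetical values of one PC feature (illustrative only)
values = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]

def standardize(xs):
    """Return z-scores computed with the sample standard deviation."""
    mu = mean(xs)
    sigma = stdev(xs)
    return [(x - mu) / sigma for x in xs]

scaled = standardize(values)

# after standardization the mean should be about 0 and the standard deviation about 1
print(round(mean(scaled), 10), round(stdev(scaled), 10))
```

Running the same check on a '_scaled' column of the Spark Dataframe is an easy way to confirm that the function above behaved as expected.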

4. Save data in Parquet Format

After applying all data preparation and feature engineering procedures listed above, it is time to save our Spark Dataframes to storage. In Apache Spark, Parquet is a commonly recommended format for saving Dataframes: it is a columnar storage format that can store large datasets with many columns in reduced storage space. The following lines save both train and test Dataframes in .parquet format.

train_df.write.mode('overwrite').parquet(os.getcwd() + '/' + path +  'train_FD001_preprocessed.parquet')
test_df.write.mode('overwrite').parquet(os.getcwd() + '/' + path +  'test_FD001_preprocessed.parquet')
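
For later steps, the saved files can be read back and restricted to the columns actually needed. The helper below is only a sketch: load_preprocessed is a hypothetical name, spark is assumed to be an existing SparkSession, and the column names in the usage comment are assumptions based on the naming used by the functions above.

```python
import os

def load_preprocessed(spark, base_dir, file_name, columns=None):
    """Read a Parquet dataset back, optionally keeping only some columns."""
    full_path = os.path.join(base_dir, file_name)
    df = spark.read.parquet(full_path)
    # selecting a subset lets a columnar format skip the unneeded columns on read
    return df.select(*columns) if columns else df

# Example usage (assumes `spark` and `path` exist as in the rest of this post):
# train_df = load_preprocessed(spark, os.path.join(os.getcwd(), path),
#                              'train_FD001_preprocessed.parquet',
#                              ['key', 'PCA_1_scaled', 'PCA_2_scaled'])
```

Reading only a few columns is where a columnar format such as Parquet tends to pay off, since the remaining columns do not have to be loaded.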

5. Full Code

The code of this tutorial can be found on my GitHub under this link.
