# Predictive Data Analytics With Apache Spark (Part 4: Principal Component Analysis)

##### January 10, 2019

# 1. Principal Component Analysis (PCA)

In the previous post, we generated tens of new features from the existing ones, which significantly expanded the dimensionality of our data. Before going further with predictive modeling, we need to reduce that dimensionality. Fewer dimensions let us visualize the data more efficiently, test different parameter settings of machine learning algorithms to optimize our predictive solutions, and make better use of memory and storage. One of the most common dimensionality reduction procedures is **Principal Component Analysis** (**PCA**). The following function applies PCA to a Spark Dataframe. It takes the Dataframe as input, along with the subset of numerical features to be reduced and the number of Principal Component (PC) features to generate. It returns the input Dataframe with the PCs appended to it, along with the proportion of variance explained by each generated PC.

```
from functools import reduce

import pyspark.sql.functions as F
from pyspark.ml.feature import RFormula, PCA


def extract(row):
    # NOTE: this helper is not shown in the post; the version here is an assumption.
    # It flattens a ("key", "pcaFeatures") row into a tuple (key, pc_1, ..., pc_k).
    return (row.key,) + tuple(float(x) for x in row.pcaFeatures.toArray())


def add_PCA_features(df, features, PC_num):
    '''
    This function adds PCs to a Spark dataframe.

    INPUTS:
    @df: Spark dataframe
    @features: list of numeric data features in @df
    @PC_num: number of required PCs

    OUTPUTS:
    @df_new: updated Spark dataframe
    @pca_variance: vector of the proportion of variance explained by each PC
    '''
    # create unique ID for @df
    df = df.withColumn("key", F.monotonically_increasing_id())

    # 1. Use RFormula to create the feature vector.
    # The "key" column is kept next to the assembled vector so that the PCs
    # can be joined back to @df later.
    formula = RFormula(formula="~" + "+".join(features))

    # fit the formula and assemble @features into a single "features" vector column
    pipeline = formula.fit(df).transform(df)

    # select both "key" and "features" out of the transformed data
    output = pipeline.select("key", "features")

    # 2. Build the PCA model with @PC_num components and fit it to the data.
    # init PCA object with "features" as input and "pcaFeatures" as output
    pca = PCA(k=PC_num, inputCol="features", outputCol="pcaFeatures")

    # fit PCA model on the feature vectors
    model = pca.fit(output)

    # get PCA result by projecting the features onto the PCs
    result = model.transform(output).select("key", "pcaFeatures")

    # get vector of variance proportions covered by each PC
    pca_variance = model.explainedVariance

    # 3. Convert the generated PCs to Dataframe columns and append them to @df.
    # get PCA output as new Spark Dataframe (columns after "key" are auto-named)
    pca_outcome = result.rdd.map(extract).toDF(["key"])

    # get column names of pca_outcome
    oldColumns = pca_outcome.schema.names

    # set new names for PCA features
    newColumns = ["key"]
    for i in range(1, PC_num + 1):
        newColumns.append('PCA_' + str(i))

    # rename the columns of the PCA Dataframe
    pca_result = reduce(
        lambda renamed, idx: renamed.withColumnRenamed(oldColumns[idx], newColumns[idx]),
        range(len(oldColumns)),
        pca_outcome)

    # join PCA df to data df
    df_new = df.join(pca_result, 'key', 'left')

    return df_new, pca_variance


# add PCs to data features
train_df, train_pca_variance = add_PCA_features(train_df, features_to_PCA, 10)
```

After executing the above function, we obtain the required number of PC features. The following figure shows the cumulative variance explained by the first 10 PCs in both the train and test datasets.
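The cumulative curve in such a plot is a running sum over the per-PC variance proportions returned by the function. The snippet below is a minimal sketch of that computation in plain Python; the helper name `components_needed`, the 0.99 threshold, and the example values are illustrative assumptions, not numbers taken from this dataset.

```python
from itertools import accumulate


def components_needed(variance_ratios, threshold=0.99):
    """Return (cumulative ratios, smallest number of PCs reaching the threshold)."""
    cumulative = list(accumulate(variance_ratios))
    for k, covered in enumerate(cumulative, start=1):
        if covered >= threshold:
            return cumulative, k
    # the threshold is never reached with the given components
    return cumulative, None


# Made-up variance proportions for six PCs (illustration only)
example_ratios = [0.60, 0.25, 0.10, 0.045, 0.003, 0.002]
cumulative, k = components_needed(example_ratios, threshold=0.99)

print(cumulative)  # running sum of the proportions
print(k)           # number of PCs needed to reach the threshold
```

With the real output, the list could be obtained from the returned vector, for example via `train_pca_variance.toArray().tolist()`.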

As the above figures show, PCA compresses almost all of the data variance into the first 4 PC features. This means we can reduce our data dimensionality from 24 features to 4 with almost no loss of variance. Now, let’s visualize our PC features.

As the figure shows, the generated PCs have different scales. Therefore, we need to bring those features to a common scale using normalization and standardization.

# 2. Normalize PC Features

To normalize the PC features, we apply the normalization procedure over all engines at once (i.e., we do not partition the data per engine before normalizing). The reason is that we want globally normalized features, computed with the same minimum and maximum for every engine, to be used later in the learning process. The following function adds globally normalized versions of the PCs.

```
import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType


def add_normalized_features_unpartitioned(df, features):
    '''
    This function squashes columns of a Spark dataframe into the [0, 1] domain.

    INPUTS:
    @df: Spark dataframe
    @features: list of data features in @df to be normalized

    OUTPUTS:
    @df: updated Spark Dataframe
    '''
    # loop over data features
    for f in features:
        # compute min, max values for data feature 'f' over the whole dataframe
        cur_min = df.agg({f: "min"}).collect()[0][0]
        cur_max = df.agg({f: "max"}).collect()[0][0]
        print(f, ' cur_min: ', cur_min, ' cur_max: ', cur_max)

        # create UDF instance (min-max scaling)
        normalize_Udf = F.udf(lambda value: (value - cur_min) / (cur_max - cur_min), DoubleType())

        # add the normalized feature as a new column named f + '_norm'
        df = df.withColumn(f + '_norm', normalize_Udf(df[f]))

    return df


# add normalized features to the train DF
train_df = add_normalized_features_unpartitioned(train_df, pca_features)
```
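To see why the global variant differs from a per-engine one, the following plain-Python sketch scales the same made-up values both ways. The `min_max` helper, the engine names, and the numbers are hypothetical and only serve the illustration; the handling of a constant column (mapping it to 0.0) is likewise an assumption, not something the function above does.

```python
def min_max(values):
    """Scale a list of numbers to [0, 1]; a constant list maps to 0.0."""
    lo, hi = min(values), max(values)
    span = hi - lo
    return [(v - lo) / span if span else 0.0 for v in values]


# Hypothetical PC values for two engines (made-up numbers)
readings = {"engine_1": [0.0, 5.0, 10.0], "engine_2": [10.0, 15.0, 20.0]}

# Global: one min/max shared by all engines, as in the function above
all_values = [v for vals in readings.values() for v in vals]
global_scaled = min_max(all_values)

# Per engine: each engine is scaled with its own min/max
per_engine_scaled = {name: min_max(vals) for name, vals in readings.items()}

print(global_scaled)
print(per_engine_scaled)
```

In the global result the two engines keep their relative offset, while the per-engine result maps both to the same range and hides that difference.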

# 3. Standardize PC Features

As with PC normalization, we generate standardized PC features without partitioning the data per engine. The following function does the job for us.

```
import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType


def add_standardized_features_unpartitioned(df, features):
    '''
    This function adds standardized features with 0 mean and unit variance
    for each data feature in a Spark DF.

    INPUTS:
    @df: Spark Dataframe
    @features: list of data features in @df to be standardized

    OUTPUTS:
    @df: updated Spark Dataframe
    '''
    for f in features:
        # compute mean and standard deviation for each data feature
        cur_mean = float(df.describe(f).filter("summary = 'mean'").select(f).collect()[0].asDict()[f])
        cur_std = float(df.describe(f).filter("summary = 'stddev'").select(f).collect()[0].asDict()[f])
        print(f, ' cur_mean: ', cur_mean, ' cur_std: ', cur_std)

        # create UDF instance (subtract the mean, then divide by the standard deviation)
        standardize_Udf = F.udf(lambda value: (value - cur_mean) / cur_std, DoubleType())

        # add the standardized feature as a new column named f + '_scaled'
        df = df.withColumn(f + '_scaled', standardize_Udf(df[f]))

    return df


# add standardized features to train DF
train_df = add_standardized_features_unpartitioned(train_df, pca_features)
```
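The transformation itself is the usual z-score: subtract the mean and divide by the standard deviation. Here is a minimal plain-Python sketch on made-up numbers; the `standardize` helper and the zero-deviation guard are assumptions for illustration. It uses the sample standard deviation (dividing by n - 1), which is also what the `stddev` row of Spark's `describe` reports.

```python
import statistics


def standardize(values):
    """Return z-scores: (value - mean) / sample standard deviation."""
    mean = statistics.mean(values)
    std = statistics.stdev(values)  # sample standard deviation (n - 1)
    if std == 0:
        # constant column: every value equals the mean (assumed handling)
        return [0.0 for _ in values]
    return [(v - mean) / std for v in values]


# Made-up feature values (illustration only)
values = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]
scaled = standardize(values)

print(statistics.mean(scaled))   # close to 0
print(statistics.stdev(scaled))  # close to 1
```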

The following figure shows standardized PC features of all engines.

# 4. Save data in Parquet Format

After applying all the data preparation and feature engineering procedures listed above, it is time to save our Spark Dataframes to storage. In Apache Spark, the most commonly recommended format for saving Dataframes is **Parquet**, a columnar storage format that can store large datasets with many columns in reduced storage space. The following lines save both the train and test Dataframes in *.parquet* format.

```
import os

# 'path' is the data folder (relative to the working directory) defined earlier in the code
train_df.write.mode('overwrite').parquet(os.getcwd() + '/' + path + 'train_FD001_preprocessed.parquet')
test_df.write.mode('overwrite').parquet(os.getcwd() + '/' + path + 'test_FD001_preprocessed.parquet')
```

# 5. Full Code

The code of this tutorial can be found on my GitHub under this link.
