Predictive Data Analytics With Apache Spark (Part 7 Multi Classification)

January 27, 2019

1. Select Features

In this article, we aim to build classifiers that can distinguish between three time zones before a given engine fails. These zones are defined as the Alarm Zone (1-15 runs before the failure), the Warning Zone (1-15 runs before the Alarm Zone, i.e. 16-30 runs before the failure), and the Normal Zone (all runs before the Warning Zone).
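The multi-class target column “label2” encodes these zones as three classes (e.g. 0, 1, and 2). As a rough sketch of how such a column could be derived, assuming a hypothetical RUL column that holds the number of runs remaining before failure (computed in the earlier parts of this series):

from pyspark.sql import functions as F
# sketch only; "RUL" is a hypothetical column with the runs remaining before failure
# 2 = Alarm Zone (1-15 runs left), 1 = Warning Zone (16-30 runs left), 0 = Normal Zone
train_df = train_df.withColumn("label2",
                               F.when(F.col("RUL") <= 15, 2)
                                .when(F.col("RUL") <= 30, 1)
                                .otherwise(0))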

We use the same list of independent variables as in the previous article (Part 6 Binary Classifiers) to build our multi classifiers. The features are: s3_rollingmean_4_scaled, s4_rollingmean_4_scaled, s7_rollingmean_4_scaled, s12_rollingmean_4_scaled, s17_rollingmean_4_scaled, PCA_1_scaled, PCA_2_scaled, PCA_3_scaled, PCA_4_scaled, and PCA_5_scaled.

2. Build Features Vector of Train Dataset

The train features vector is built the same way as in the previous two Posts. The only difference is changing the target variable from “label1” to “label2”, which refers to the multi-class labeling column.

# select features to be used in the classification algorithms
train = train_df.select(train_df["s3_rollingmean_4_scaled"],train_df["s4_rollingmean_4_scaled"],
                        train_df["s7_rollingmean_4_scaled"],train_df["s12_rollingmean_4_scaled"],
                        train_df["s17_rollingmean_4_scaled"],train_df["PCA_1_scaled"],
                        train_df["PCA_2_scaled"],train_df["PCA_3_scaled"],train_df["PCA_4_scaled"],
                        train_df["PCA_5_scaled"],train_df["label2"])

from pyspark.ml.linalg import Vectors
# build train features vector
train_vector = train.rdd.map(lambda x: [x[10], Vectors.dense(x[0:10])]).toDF(['label','features'])
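The test features vector (test_vector), used later for predictions, is assumed to be built in exactly the same way from the test DataFrame:

# build test features vector (sketch; assumes test_df carries the same columns)
test = test_df.select(test_df["s3_rollingmean_4_scaled"], test_df["s4_rollingmean_4_scaled"],
                      test_df["s7_rollingmean_4_scaled"], test_df["s12_rollingmean_4_scaled"],
                      test_df["s17_rollingmean_4_scaled"], test_df["PCA_1_scaled"],
                      test_df["PCA_2_scaled"], test_df["PCA_3_scaled"], test_df["PCA_4_scaled"],
                      test_df["PCA_5_scaled"], test_df["label2"])
test_vector = test.rdd.map(lambda x: [x[10], Vectors.dense(x[0:10])]).toDF(['label','features'])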

3. Logistic Regression

In order to build a Logistic Regression multi classifier, we follow the same steps we learned in the last Post. Apache Spark distinguishes between a binary classifier and a multi classifier by detecting the number of classes in the “label” column.
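As a quick sanity check, we can confirm that the label column of the train vector indeed contains three classes:

# count the distinct classes in the label column (expected: 3)
print(train_vector.select("label").distinct().count())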

from pyspark.ml.classification import LogisticRegression
# init log regression object
lr = LogisticRegression(featuresCol='features', labelCol='label', maxIter=100)

If we want to apply cross validation within the learning process, we need to define a multi-class classification evaluator using the following lines:

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# create evaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label")

Afterwards, we create the parameter grid and cross validation objects as follows:

from pyspark.ml.tuning import ParamGridBuilder
# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.3])    # regularization parameter
             .addGrid(lr.elasticNetParam, [0.0])  # Elastic Net Parameter (Ridge = 0)
             .build())

from pyspark.ml.tuning import CrossValidator
# create cross validation object
crossval = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

We then fit our model using the CrossValidator() instance:

# run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(train_vector)
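Optionally, the cross-validation results for each parameter combination can be inspected from the fitted model (a small sketch using the standard avgMetrics attribute of CrossValidatorModel):

# print the average cross-validation metric (F1 by default) for every parameter combination
for params, metric in zip(paramGrid, cvModel.avgMetrics):
    print(params, metric)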

We make model prediction on test data using the following code:

# make predictions on test data
prediction_mul_lr = cvModel.transform(test_vector)
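The evaluator defined earlier can also report the overall test-set performance; for example (a sketch, assuming the Spark 2.x metric names):

# overall test-set metrics for the Logistic Regression classifier
accuracy = evaluator.evaluate(prediction_mul_lr, {evaluator.metricName: "accuracy"})
f1 = evaluator.evaluate(prediction_mul_lr, {evaluator.metricName: "f1"})
print("Accuracy = %.3f, F1 = %.3f" % (accuracy, f1))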

Finally, we get the confusion matrix using the confusion_matrix() function from scikit-learn, after extracting the labels and predictions as Python lists with the helper functions defined later in this Post:

from sklearn.metrics import confusion_matrix
# get labels and predictions as python lists (get_col_as_list() is defined below)
labels = list(map(int, get_col_as_list(prediction_mul_lr, "label")))
preds = list(map(int, get_col_as_list(prediction_mul_lr, "prediction")))
# get confusion matrix
logreg_matrix = confusion_matrix(labels, preds)

4. Decision Tree

The second multi classifier in this Post is the Decision Tree classifier. To build it, we follow the same steps as for the Logistic Regression classifier in the previous section.

from pyspark.ml.classification import DecisionTreeClassifier
# init decision tree object
dt = DecisionTreeClassifier(featuresCol='features', labelCol='label')

# add parameter grid
paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [4, 8])    # max depth parameter
             .addGrid(dt.maxBins, [2, 4, 6])  # max bins parameter
             .build())

# create evaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label")

# create cross validation object
crossval = CrossValidator(estimator=dt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=10)

# run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(train_vector)

# make predictions on test data
prediction_mcdt = cvModel.transform(test_vector)

# get labels, predictions, and probabilities as python lists and nparrays
labels = list(map(int, get_col_as_list(prediction_mcdt, "label")))
preds = list(map(int, get_col_as_list(prediction_mcdt, "prediction")))
probs = get_col_as_nparray(prediction_mcdt, "probability")

# get confusion matrix
dt_matrix = confusion_matrix(labels, preds)
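As a side note, the tree selected by cross validation also exposes how heavily each of the 10 features was used (a small sketch using the standard bestModel attribute):

# relative importance of each input feature in the best decision tree
print(cvModel.bestModel.featureImportances)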

The functions get_col_as_list() and get_col_as_nparray() are defined as follows:

import numpy as np

def get_col_as_list(df, col_name):
    '''
    in this function, we convert a column in Spark DF to a Python list
    
    INPUTS:
    @df: Spark DF
    @col_name: name of the column 
    '''
    
    return df.select(col_name).rdd.flatMap(lambda x: x).collect()

def get_col_as_nparray(df, col_name):
    '''
    in this function, we convert a column in Spark DF to a Numpy array
    
    INPUTS:
    @df: Spark DF
    @col_name: name of the column 
    '''
    
    return np.array(df.select(col_name).collect())

5. Multilayer Perceptron

The third classifier in this Post is the Multilayer Perceptron. To build this classifier, we follow the same steps as for the classifiers above. The layers parameter [10, 11, 10, 3] used below defines an input layer of 10 neurons (one per feature), two hidden layers of 11 and 10 neurons, and an output layer of 3 neurons (one per class).

from pyspark.ml.classification import MultilayerPerceptronClassifier
# init multilayer perceptron object
mcpc = MultilayerPerceptronClassifier(featuresCol='features', labelCol='label', maxIter=100, layers=[10, 11, 10, 3])

# add empty parameter grid
paramGrid = ParamGridBuilder().build()

# create evaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label")

# create cross validation object
crossval = CrossValidator(estimator=mcpc, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=10)

# run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(train_vector)

# make predictions on test data
prediction_mcpc = cvModel.transform(test_vector)

# get labels, predictions, and probabilities as python lists and nparrays
labels = list(map(int, get_col_as_list(prediction_mcpc, "label")))
preds = list(map(int, get_col_as_list(prediction_mcpc, "prediction")))
probs = get_col_as_nparray(prediction_mcpc, "probability")

# get confusion matrix
mcpc_matrix = confusion_matrix(labels, preds)
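If we also wanted to tune the network architecture itself, the layer structure could be added to the parameter grid instead of leaving it empty (a sketch; the alternative architecture below is only an example):

# tune the hidden-layer structure as well (input size must stay 10 and output size 3)
paramGrid = (ParamGridBuilder()
             .addGrid(mcpc.layers, [[10, 11, 10, 3], [10, 20, 3]])
             .build())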

6. Compare Classifiers

After building and evaluating the above multi classifiers, we compare them using the Accuracy, Macro Precision, Macro Recall, and Macro F1 evaluation metrics. The following figure shows the performance of each classifier over the first 10 engines.

As the figure displays, it is clear that, except for Engine #1, both the Logistic Regression and Multilayer Perceptron classifiers outperform the Decision Tree. However, it is hard to prefer one of the two over the other, as each gives the best results on some of the engines. If we take F1 as the preferred metric (since it balances Precision and Recall), the Multilayer Perceptron wins on the majority of the test engines.
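For reference, the macro-averaged metrics used in the comparison can be computed from the collected labels and predictions with scikit-learn (a minimal sketch, shown here over the whole test set rather than per engine):

from sklearn.metrics import accuracy_score, precision_recall_fscore_support

# macro averaging treats the three zones equally regardless of how frequent they are
accuracy = accuracy_score(labels, preds)
precision, recall, f1, _ = precision_recall_fscore_support(labels, preds, average='macro')
print("Accuracy=%.3f  Precision=%.3f  Recall=%.3f  F1=%.3f" % (accuracy, precision, recall, f1))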

7. Full Code

The code of this tutorial can be found on my GitHub under this link.
