Apache Spark 2:Data Processing and Real-Time Analytics
上QQ阅读APP看书,第一时间看更新

Feature engineering

Now it is time to run the first transformer (which is actually an estimator). It is StringIndexer and needs to keep track of an internal mapping table between strings and indexes. Therefore, it is not a transformer but an estimator:

import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}

var indexer = new StringIndexer()
.setHandleInvalid("skip")
.setInputCol("L0_S22_F545")
.setOutputCol("L0_S22_F545Index")

var indexed = indexer.fit(df_notnull).transform(df_notnull)
indexed.printSchema

As we can see clearly in the following image, an additional column called L0_S22_F545Index has been created:

Finally, let's examine some content of the newly created column and compare it with the source column.

We can clearly see how the category string gets transformed into a float index:

Now we want to apply OneHotEncoder, which is a transformer, in order to generate better features for our machine learning model:

var encoder = new OneHotEncoder()
.setInputCol("L0_S22_F545Index")
.setOutputCol("L0_S22_F545Vec")

var encoded = encoder.transform(indexed)

As you can see in the following figure, the newly created column L0_S22_F545Vec contains org.apache.spark.ml.linalg.SparseVector objects, which is a compressed representation of a sparse vector:

Sparse vector representations: The OneHotEncoder, as many other algorithms, returns a sparse vector of the org.apache.spark.ml.linalg.SparseVector type as, according to the definition, only one element of the vector can be one, the rest has to remain zero. This gives a lot of opportunity for compression as only the position of the elements that are non-zero has to be known. Apache Spark uses a sparse vector representation in the following format: (l,[p],[v]), where l stands for length of the vector, p for position (this can also be an array of positions), and v for the actual values (this can be an array of values). So if we get (13,[10],[1.0]), as in our earlier example, the actual sparse vector looks like this: (0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0).

So now that we are done with our feature engineering, we want to create one overall sparse vector containing all the necessary columns for our machine learner. This is done using VectorAssembler:

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors

var vectorAssembler = new VectorAssembler()
.setInputCols(Array("L0_S22_F545Vec", "L0_S0_F0", "L0_S0_F2","L0_S0_F4"))
.setOutputCol("features")

var assembled = vectorAssembler.transform(encoded)

We basically just define a list of column names and a target column, and the rest is done for us:

As the view of the features column got a bit squashed, let's inspect one instance of the feature field in more detail:

We can clearly see that we are dealing with a sparse vector of length 16 where positions 0, 13, 14, and 15 are non-zero and contain the following values: 1.0, 0.03, -0.034, and -0.197. Done! Let's create a Pipeline out of these components.