In this post, you will learn how to create an Apache Spark cluster in HDInsight and then use a Jupyter notebook to run Spark SQL interactive queries on the Spark cluster.
The process involves two simple steps:
1. Provision an Apache Spark cluster in HDInsight
2. Run Spark statements using a Jupyter notebook
In this post, you use a Jupyter notebook to run Spark SQL queries against the Spark cluster.
HDInsight Spark clusters provide two kernels that you can use with the Jupyter notebook. These are:
1. PySpark (for applications written in Python)
2. Spark (for applications written in Scala)
Create a Jupyter notebook with the PySpark kernel
- From the Azure Portal startboard, click the tile for your Spark cluster (if you pinned it to the startboard). You can also navigate to your cluster under Browse All > HDInsight Clusters.
- From the Spark cluster blade, click Cluster Dashboard, and then click Jupyter Notebook. If prompted, enter the admin credentials for the cluster.
- Create a new notebook. Click New, and then click PySpark.
- A new notebook is created and opened with the name Untitled.ipynb. Click the notebook name at the top, and enter a friendly name.
- Because you created a notebook using the PySpark kernel, you do not need to create any contexts explicitly. The Spark and Hive contexts will be automatically created for you when you run the first code cell.
This post demonstrates how to use MLlib, Spark's built-in machine learning library, to perform a simple predictive analysis on an open dataset. MLlib is a core Spark library that provides several utilities useful for machine learning tasks. This article presents a simple approach to classification through logistic regression.
Notebook setup
When using PySpark kernel notebooks on HDInsight, there is no need to create a SparkContext or a HiveContext; those are created for you automatically when you run the first code cell, and you can see the progress printed. The contexts are created with the following variable names:
- SparkContext (sc)
- HiveContext (sqlContext)
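Once the first cell has run, you can sanity-check that both contexts are available. The following is just an illustrative sketch; the exact version string printed depends on your cluster.

# Both objects are created automatically by the PySpark kernel; no setup code is needed.
print sc.version        # the pre-created SparkContext
print type(sqlContext)  # the pre-created HiveContext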
Construct an input dataframe
We can use sqlContext to perform transformations on structured data. The first task is to load the sample data (Food_Inspections1.csv) into a Spark SQL dataframe. The sample data is available on the cluster by default. Because the raw data is in CSV format, we need to use the Spark context to pull every line of the file into memory as unstructured text, and then use Python's csv library to parse each line individually. The schema of the input file can then be checked with inspections.take(1), as sketched below.
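Here is a minimal sketch of that loading step. The wasb:/// path and the csvParse helper mirror the ones used with the second dataset later in this post; treat them as assumptions about where the sample file lives on your cluster.

def csvParse(s):
    # Parse a single CSV-formatted line into a list of fields (Python 2 csv/StringIO).
    import csv
    from StringIO import StringIO
    sio = StringIO(s)
    value = csv.reader(sio).next()
    sio.close()
    return value

# Pull every line of the file into memory as unstructured text, then parse it.
inspections = sc.textFile('wasb:///HdiSamples/HdiSamples/FoodInspectionData/Food_Inspections1.csv') \
                .map(csvParse)

# Peek at the first parsed record to get a sense of the schema.
inspections.take(1)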
The output of inspections.take(1) gives us an idea of the schema of the input file; the file includes the name of every establishment, the type of establishment, the address, the date of the inspection, and the location, among other things. Let's select a few columns that will be useful for our predictive analysis and group the results as a dataframe.
We now have a dataframe, df, on which we can perform our analysis. We've included 4 columns of interest in the dataframe: id, name, results, and violations, using the code sketched below.
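A sketch of how that dataframe can be built is shown here. The StructType column names are assumptions based on the columns just described, and the positional indices (0, 1, 12, 13) mirror the ones used with the second dataset later in this post.

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Assumed schema: the four columns of interest described above.
schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), False),
    StructField("results", StringType(), False),
    StructField("violations", StringType(), True)])

# Keep only the columns we care about and build the dataframe.
df = sqlContext.createDataFrame(
    inspections.map(lambda l: (int(l[0]), l[1], l[12], l[13])), schema)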
Understand the data
Let's start by getting a sense of what our dataset contains. You can see that there are 5 distinct results that an inspection can have (a sketch of the query is shown after this list):
- Business not located
- Fail
- Pass
- Pass w/ Conditions
- Out of Business
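One quick way to list those distinct values, sketched here against the df built above:

# List the distinct inspection results present in the data.
df.select('results').distinct().show()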
Let us develop a model that can guess the outcome of a food inspection, given the violations. Since logistic regression is a binary classification method, it makes sense to group our data into two categories: Fail and Pass. A "Pass w/ Conditions" is still a Pass, so when we train the model, we will consider the two results equivalent. Data with the other results ("Business Not Located", "Out of Business") are not useful, so we will remove them from our training set. This should be okay since these two categories make up a very small percentage of the results anyway.
Let us go ahead and convert our existing dataframe (df) into a new dataframe where each inspection is represented as a label-violations pair. In our case, a label of 0.0 represents a failure, a label of 1.0 represents a success, and a label of -1.0 represents some result besides those two. We will filter those other results out when computing the new dataframe.
The code is given below:
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import DoubleType

def labelForResults(s):
    # Map an inspection result string to a numeric label.
    if s == 'Fail':
        return 0.0
    elif s == 'Pass w/ Conditions' or s == 'Pass':
        return 1.0
    else:
        return -1.0

# Wrap the Python function as a Spark SQL UDF that returns a double.
label = UserDefinedFunction(labelForResults, DoubleType())

# Keep only Pass/Fail rows, as (label, violations) pairs.
labeledData = df.select(label(df.results).alias('label'), df.violations).where('label >= 0')
Create a logistic regression model from the input dataframe
Our final task is to convert the labeled data into a format that can be analyzed by logistic regression. The input to a logistic regression algorithm should be a set of label-feature vector pairs, where the "feature vector" is a vector of numbers that represents the input point in some way. So, we need a way to convert the "violations" column, which is semi-structured and contains many free-text comments, into an array of real numbers that a machine could easily understand. One standard machine learning approach for processing natural language is to assign each distinct word an "index", and then pass a vector to the machine learning algorithm such that each index's value contains the relative frequency of that word in the text string.
Code:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

# Split the free-text violations into words, hash the words into a feature
# vector, and feed the result into a logistic regression classifier.
tokenizer = Tokenizer(inputCol="violations", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# Train the model on the labeled training data.
model = pipeline.fit(labeledData)
model
Evaluate the model on a separate test dataset
We can use the model we created earlier to predict what the results of new inspections will be, based on the violations that were observed. We trained this model on the dataset Food_Inspections1.csv. Let us use a second dataset, Food_Inspections2.csv, to evaluate the strength of this model on new data. This second dataset should already be in the default storage container associated with the cluster. The snippet below creates a new dataframe, predictionsDf, that contains the predictions generated by the model.
# Load and parse the second dataset, keeping the same four columns as before.
testData = sc.textFile('wasb:///HdiSamples/HdiSamples/FoodInspectionData/Food_Inspections2.csv') \
    .map(csvParse) \
    .map(lambda l: (int(l[0]), l[1], l[12], l[13]))
testDf = sqlContext.createDataFrame(testData, schema).where("results = 'Fail' OR results = 'Pass' OR results = 'Pass w/ Conditions'")
predictionsDf = model.transform(testDf)
predictionsDf.registerTempTable('Predictions')
predictionsDf.columns
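Since predictionsDf has been registered as the temp table Predictions, it can also be explored with Spark SQL. For example, a quick illustrative sketch:

# Count how many rows fall into each predicted class.
sqlContext.sql("SELECT prediction, count(*) AS cnt FROM Predictions GROUP BY prediction").show()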
The model.transform() method will apply the same transformation to any new data with the same schema, and arrive at a prediction of how to classify the data. We can do some simple statistics to get a sense of how accurate our predictions were:
numSuccesses = predictionsDf.where("""(prediction = 0 AND results = 'Fail') OR
(prediction = 1 AND (results = 'Pass' OR
results = 'Pass w/ Conditions'))""").count()
numInspections = predictionsDf.count()
print "There were", numInspections, "inspections and there were", numSuccesses, "successful predictions"
print "This is a", str((float(numSuccesses) / float(numInspections)) * 100) + "%", "success rate"
As you can see, the model predicted with a success rate of ~86.81%.
Further improvements can be made to the model using more advanced machine learning concepts, but for now this is how a basic machine learning problem can be solved on a Spark cluster in Microsoft Azure.