Thursday, December 8, 2016

Movie Recomendation System with Python



The objective in this post is to generate some movie recommendations for a user, given movies they have already watched, and the ratings they gave for those movies

 We will do this a few different ways. We'll also use Pandas, a data analysis library, for most of the
 data preparation and analysis.

 First let's start by downloading the dataset we'll be using. This is the  MovieLens dataset
 which is maintained by the Department of Computer Science at the University of Minnesota

 There are several datasets available of varying sizes. Let's download the 100K dataset.
This has 100K data points, each row is a rating given by 1 user for 1 movie at a particular date and time
Check out the readme that comes with the data to see all the files that are provided

 There are 2 files that we are interested in u.data - this has the userId, the movieId, the rating and the date that rating was given
u.item has a bunch of movie related details, like the title, genre, imdb url etc. We'll just use this file for the movie titles

 Pandas is a python library for data analysis in a way that's similar to dataframe manipulation in R. We can read the data from a csv, write to a csv, manipulate it into different shapes , subset the data based on conditions etc

dataFile='/Users/swethakolalapudi/Downloads/ml-100k/u.data'
data=pd.read_csv(dataFile,sep="\t",header=None,
                 names=['userId','itemId','rating','timestamp'])


 This line will read the data file, it will treat it as a tab delimited file, i.e; the columns (or values) are separated by \t
There is no header in the file, (this is specified to Pandas by header=None)
the names list will be used as the column names for the data the first column will be checked to see if it's a serial number, if yes it will be automatically used as a row index. Else a row index which starts from 0 will be assigned



 In[2]: data.head()

 data is a pandas DataFrame object. There are many complex ways of indexing this DataFrame and manipulating it, subsetting it etc..
 head() will print the first few rows in the DataFrame


 In[3]: movieInfoFile='/Users/swethakolalapudi/Downloads/ml-100k/u.item'
movieInfo=pd.read_csv(movieInfoFile,sep="|", header=None, index_col=False,
                     names=['itemId','title'], usecols=[0,1])

Here we are reading the movie data. We just care about the itemId (movieId) and  the title, so we are only reading the first two columns - this is specified in usecols. We are explicitly passing the column names in names. Note that index_col is set to false. This will explicitly make sure that none of the columns in the file are used to create a row index

 In[4]: movieInfo.head()
data=pd.merge(data,movieInfo,left_on='itemId',right_on="itemId")

the result will be that a column 'title' will be added to our data object. This line is very much like and SQL join. We are specifying the columns from each table(dataframe) to join on 

 In[5]: data.head()
 Let's now see how we can index the data in the dataframe.
 All the values in a column can simply be indexed by the column name
userIds=data.userId  - a Pandas series object
userIds2=data[['userId']] - a Pandas DataFrame object

 
 In[6]:userIds.head()
 In[7]:userIds2.head()
 In[8]:type(userIds)
 In[9]:type(userIds2)
 In[10]: data.loc[0:10,['userId']]

loc is a function we'll use very heavily for indexing. You can give it column
 and row indices , or use boolean indexing.
Give loc a list of row indices and a list of column names

 In[11]:toyStoryUsers=data[data.title=="Toy Story (1995)"]

 This will give us a subset dataframe with only the users who have rated Toy Story
In[17]: toyStoryUsers.head()

 You can sort values in the dataframe using the sort_values function ,This function will take in the dataframe, the columns to sort on and whether to sort ascending or not
data=pd.DataFrame.sort_values(data,['userId','itemId'],ascending=[0,1])

Let's see how many users and how  many movies there are

numUsers=max(data.userId)
numMovies=max(data.itemId)

 WE can also see how many movies were rated by each user, and the number of users that rated each movie 

In[18]: moviesPerUser=data.userId.value_counts()
usersPerMovie=data.title.value_counts()
usersPerMovie
 numUsers


 Let's write a function to find the top N favorite movies of a user def favoriteMovies(activeUser,N):
    1. subset the dataframe to have the rows corresponding to the active user
     2. sort by the rating in descending order
     3. pick the top N rows
     
 In[19]: topMovies=pd.DataFrame.sort_values(
        data[data.userId==activeUser],['rating'],ascending=[0])[:N]
        return list(topMovies.title)
        print favoriteMovies(5,3)

 Let's get down to finding some recommendations now!

 We'll start by using a neigbour based collaborative filtering model .The idea is to find the K Nearest neighbours of a user and  use their ratings to predict ratings of the active user for movies  they haven't rated.

 First we'll represent each user as a vector - each element of the vector  will be their rating for 1 movie. Since there are 1600 odd movies in all  Each user will be represented by a vector that has 1600 odd values When the user doesn't have any rating for a movie - the corresponding
 element will be blank. NaN is a value in numpy that represents numbers that don't exist. This is a little tricky - any operation of any other number with NaN will give us NaN. So, we'll keep this mind as we manipulate the vectors

 In[20]: userItemRatingMatrix=pd.pivot_table(data, values='rating',
                                    index=['userId'], columns=['itemId'])

userItemRatingMatrix.head()

 Now each user has been represented using their ratings. Let's write a function to find the similarity between 2 users. We'll user a correlation to do so from scipy.spatial.distance import correlation

 In[24]: def similarity(user1,user2):
    user1=np.array(user1)-np.nanmean(user1)  
    user2=np.array(user2)-np.nanmean(user2)
    commonItemIds=[i for i in range(len(user1)) if user1[i]>0 and user2[i]>0]
    if len(commonItemIds)==0:
            return 0
    else:
        user1=np.array([user1[i] for i in commonItemIds])
        user2=np.array([user2[i] for i in commonItemIds])
        return correlation(user1,user2)
   
 Using this similarity function, let's find the nearest neighbours of the active user
def nearestNeighbourRatings(activeUser,K):
    similarityMatrix=pd.DataFrame(index=userItemRatingMatrix.index,
                                  columns=['Similarity'])
    for i in userItemRatingMatrix.index:
        similarityMatrix.loc[i]=similarity(userItemRatingMatrix.loc[activeUser],
                                          userItemRatingMatrix.loc[i])
    similarityMatrix=pd.DataFrame.sort_values(similarityMatrix,
                                              ['Similarity'],ascending=[0])
    nearestNeighbours=similarityMatrix[:K]

The above line will give us the K Nearest neighbours .We'll now take the nearest neighbours and use their ratings to predict the active user's rating for every movie
neighbourItemRatings=userItemRatingMatrix.loc[nearestNeighbours.index]
    predictItemRating=pd.DataFrame(index=userItemRatingMatrix.columns, columns=['Rating'])
     A placeholder for the predicted item ratings. It's row index is the
     list of itemIds which is the same as the column index of userItemRatingMatrix
    Let's fill this up now

    for i in userItemRatingMatrix.columns:
     predictedRating=np.nanmean(userItemRatingMatrix.loc[activeUser])
        for j in neighbourItemRatings.index:
            if userItemRatingMatrix.loc[j,i]>0:
                predictedRating += (userItemRatingMatrix.loc[j,i]
                                    -np.nanmean(userItemRatingMatrix.loc[j]))*nearestNeighbours.loc[j,'Similarity']
        predictItemRating.loc[i,'Rating']=predictedRating
    return predictItemRating

 Let's now use these predicted Ratings to find the top N Recommendations for the
 active user

def topNRecommendations(activeUser,N):
    predictItemRating=nearestNeighbourRatings(activeUser,10)
    moviesAlreadyWatched=list(userItemRatingMatrix.loc[activeUser]
                              .loc[userItemRatingMatrix.loc[activeUser]>0].index)
    predictItemRating=predictItemRating.drop(moviesAlreadyWatched)
    topRecommendations=pd.DataFrame.sort_values(predictItemRating,
                                                ['Rating'],ascending=[0])[:N]

     This will give us the list of itemIds which are the top recommendations
     Let's find the corresponding movie titles
    topRecommendationTitles=(movieInfo.loc[movieInfo.itemId.isin(topRecommendations.index)])
    return list(topRecommendationTitles.title)

 Let's take this for a spin
activeUser=5
print favoriteMovies(activeUser,5),"\n",topNRecommendations(activeUser,3)

The above code will print the predicted values of  favorite movies of the users

So thus using python we have created a movie recomendation system



Machine learning on spark with Microsoft Azure - Apache Spark cluster on HDInsight


In this post, you will learn how to create an Apache Spark cluster in HDInsight and then use Jupyter notebook to run Spark SQL interactive queries on the Spark cluster.
The process starts with 3 simple steps: - 
       i.          Create an Azure storage account

     ii.          Provision Apache Spark cluster in HDInsight

   iii.          Run Spark statements using Jupyter notebook

Once setup, The HDInsight Spark cluster dashboard looks something like this:


In this post, you use Jupyter notebook to run Spark SQL queries against the Spark cluster. HDInsight Spark clusters provide two kernels that you can use with the Jupyter notebook. These are:
1.     PySpark (for applications written in Python)
2.     Spark (for applications written in Scala)

Create Jupyter notebook with PySpark kernel

  1. From the Azure Portal, from the startboard, click the tile for your Spark cluster (if you pinned it to the startboard). You can also navigate to your cluster under Browse All > HDInsight Clusters.
  1. From the Spark cluster blade, click Cluster Dashboard, and then click Jupyter Notebook. If prompted, enter the admin credentials for the cluster.
  1. Create a new notebook. Click New, and then click PySpark.
  1. A new notebook is created and opened with the name Untitled.pynb. Click the notebook name at the top, and enter a friendly name.
  1. Because you created a notebook using the PySpark kernel, you do not need to create any contexts explicitly. The Spark and Hive contexts will be automatically created for you when you run the first code cell.

This Post demonstrates how to use MLLib, Spark's built-in machine learning libraries, to perform a simple predictive analysis on an open dataset. MLLib is a core Spark library that provides several utilities that are useful for machine learning tasks,
This article presents a simple approach to classification through logistic regression.

Notebook setup

When using PySpark kernel notebooks on HDInsight, there is no need to create a SparkContext or a HiveContext; those are all created for you automatically when you run the first code cell, and you'll be able to see the progress printed. The contexts are created with the following variable names:
  • SparkContext (sc) 
  • HiveContext (sqlContext)

Construct an input dataframe

We can use sqlContext to perform transformations on structured data. The first task is to load the sample data (Food_Inspections1.csv) into a Spark SQL dataframe. The sample data is by default available on the cluster. Because the raw data is in a CSV format, we need to use the Spark context to pull every line of the file into memory as unstructured text; then, you use Python's CSV library to parse each line individually.
The schema of the input file can be checked with the following code – “inspections.take(1)
The following output will be observed:



The output of the above code gives us an idea of the schema of the input file; the file includes the name of every establishment, the type of establishment, the address, the data of the inspections, and the location, among other things. Let's select a few columns that will be useful for our predictive analysis and group the results as a dataframe.
We now have a dataframe, df on which we can perform our analysis. We've included 4 columns of interest in the dataframe: id, name, results, and violations using the following codes:

Understand the data

Let's start to get a sense of what our dataset contains


You can see that there are 5 distinct results that an inspection can have
  • Business not located
  • Fail
  • Pass
  • Pass w/ conditions, and
  • Out of Business
Let us develop a model that can guess the outcome of a food inspection, given the violations. Since logistic regression is a binary classification method, it makes sense to group our data into two categories: Fail and Pass. A "Pass w/ Conditions" is still a Pass, so when we train the model, we will consider the two results equivalent. Data with the other results ("Business Not Located", "Out of Business") are not useful so we will remove them from our training set. This should be okay since these two categories make up a very small percentage of the results anyway.
Let us go ahead and convert our existing dataframe(df) into a new dataframe where each inspection is represented as a label-violations pair. In our case, a label of 0.0 represents a failure, a label of 1.0 represents a success, and a label of -1.0 represents some results besides those two. We will filter those other results out when computing the new data frame.
The codes are given below:

def labelForResults(s):
    if s == 'Fail':
        return 0.0
    elif s == 'Pass w/ Conditions' or s == 'Pass':
        return 1.0
    else:
        return -1.0
label = UserDefinedFunction(labelForResults, DoubleType())
labeledData = df.select(label(df.results).alias('label'), df.violations).where('label >= 0')

Create a logistic regression model from the input dataframe

Our final task is to convert the labeled data into a format that can be analyzed by logistic regression. The input to a logistic regression algorithm should be a set of label-feature vector pairs, where the "feature vector" is a vector of numbers that represents the input point in some way. So, we need a way to convert the "violations" column, which is semi-structured and contains a lot of comments in free-text, to an array of real numbers that a machine could easily understand.
One standard machine learning approach for processing natural language is to assign each distinct word an "index", and then pass a vector to the machine learning algorithm such that each index's value contains the relative frequency of that word in the text string.

Codes:

tokenizer = Tokenizer(inputCol="violations", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(labeledData)
model

Evaluate the model on a separate test dataset

We can use the model we created earlier to predict what the results of new inspections will be, based on the violations that were observed. We trained this model on the dataset Food_Inspections1.csv. Let us use a second dataset, Food_Inspections2.csv, to evaluate the strength of this model on new data. This second data set (Food_Inspections2.csv) should already be in the default storage container associated with the cluster.
The snippet below creates a new dataframe, predictionsDf that contains the prediction generated by the model.

testData=sc.textFile('wasb:///HdiSamples/HdiSamples/FoodInspectionData/Food_Inspections2.csv')\
.map(csvParse) \
.map(lambda l: (int(l[0]), l[1], l[12], l[13]))
testDf = sqlContext.createDataFrame(testData, schema).where("results = 'Fail' OR results = 'Pass' OR results = 'Pass w/ Conditions'")
predictionsDf = model.transform(testDf)
predictionsDf.registerTempTable('Predictions')
predictionsDf.columns


The model.transform() method will apply the same transformation to any new data with the same schema, and arrive at a prediction of how to classify the data. We can do some simple statistics to get a sense of how accurate our predictions were:
numSuccesses = predictionsDf.where("""(prediction = 0 AND results = 'Fail') OR
                                      (prediction = 1 AND (results = 'Pass' OR
                                                           results = 'Pass w/ Conditions'))""").count()
numInspections = predictionsDf.count()
print "There were", numInspections, "inspections and there were", numSuccesses, "successful predictions"
print "This is a", str((float(numSuccesses) / float(numInspections)) * 100) + "%", "success rate"


As it can be noticed the model thus created predicted with a success rate of ~86.81 %
Further improvements can be made to the model using more advanced machine learning concepts, but for now this is how a basic machine learning problem can be solved on a spark cluster in Microsoft Azure.