Tuesday, August 12, 2014

On the statistics of rolling a dice - part 2

In part 1 I wrote about the maximum likelihood estimate and the MAP estimate, along with the probability distributions involved in the process of rolling a dice. In this part I will explore the idea of prior distributions and conjugate prior distributions.

Prior and posterior probability distributions

A probability distribution tells us the probability of occurrence of each value in the domain of a random variable. The probability distribution of a coin toss tells us the probability of observing a given number of heads or tails in a series of coin tosses. In the case of a coin toss, we associate a probability p with success (a head); the distribution of the number of heads is then a binomial distribution with p as its parameter. In the case of rolling a dice we have a probability p(i) associated with each side i, and the distribution of the observed counts is a multinomial distribution. We are interested in learning the parameters of the distribution once we observe the data. Why should we care about learning the parameters? Because once we learn the parameters we have a hypothesis about the data (or about the generative process that produced it: a coin toss or a dice roll with a probability associated with each outcome), and this hypothesis can be used to predict the outcome of future events.
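
As a quick illustration of parameter learning (a minimal sketch, not from part 1; the counts are made up), the maximum likelihood estimate of each side's probability is simply its observed relative frequency:

// Minimal sketch: maximum likelihood estimate of the dice parameters from observed counts.
public class DiceMle {
    public static void main(String[] args) {
        int[] counts = {2, 2, 2, 2, 2, 2};   // how many times each side came up (12 rolls)
        int total = 0;
        for (int c : counts) {
            total += c;
        }
        for (int side = 0; side < counts.length; side++) {
            double p = (double) counts[side] / total;   // MLE: relative frequency
            System.out.printf("p(side %d) = %.3f%n", side + 1, p);
        }
    }
}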

When you roll a dice, you generally assume that each side has the same probability of being observed.
Suppose you roll the dice 12 times and observe each side exactly 2 times. Based on this observation you conclude that each side has the same probability.
But what if you were told that in the past 100 throws, side 6 came up 50 times? How should that fact affect your estimate of the probability distribution? Based on a pure maximum likelihood estimate, you concluded that each side's probability is 1/6. Once you factor in the prior observations, your estimate will change. The prior probability distribution is the distribution of the parameters before observing the current data, and the posterior probability distribution is the distribution of the parameters after observing the data. You can think of the prior as a prior belief, the observed data as new evidence that can affect that belief, and the posterior as the new conclusion drawn from the prior belief and the new evidence.

Posterior = k * likelihood * prior where k is a normalization constant such that probabilities add to 1.
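
To make the formula concrete, here is a tiny sketch with made-up numbers: two competing hypotheses about a coin, p = 0.5 with prior belief 0.7 and p = 0.8 with prior belief 0.3, updated after observing 3 heads and 1 tail.

// Sketch: Posterior = k * likelihood * prior for two competing hypotheses about a coin.
public class CoinPosterior {
    public static void main(String[] args) {
        double[] hypotheses = {0.5, 0.8};   // candidate values of p (probability of a head)
        double[] prior = {0.7, 0.3};        // prior belief in each hypothesis
        int heads = 3, tails = 1;           // observed data

        double[] unnormalized = new double[hypotheses.length];
        double sum = 0.0;
        for (int i = 0; i < hypotheses.length; i++) {
            double p = hypotheses[i];
            double likelihood = Math.pow(p, heads) * Math.pow(1 - p, tails);
            unnormalized[i] = likelihood * prior[i];
            sum += unnormalized[i];
        }
        // k = 1 / sum is the normalization constant that makes the posteriors add to 1
        for (int i = 0; i < hypotheses.length; i++) {
            System.out.printf("P(p = %.1f | data) = %.3f%n", hypotheses[i], unnormalized[i] / sum);
        }
    }
}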

Frequentist vs. Bayesian approach

A frequentist approach to determining the parameters of the distribution does not take a prior probability into account; the frequentist relies on the maximum likelihood estimate and disregards the prior. To be fair, it is not exactly like that: often there is no exact information available about the prior distribution, and in such cases the frequentist simply does not bother with it. In a strictly Bayesian approach, on the other hand, a convenient prior probability distribution is assumed when exact information is not available.
Note the term, convenient prior probability distribution. We will need it later.

What is that convenient prior probability distribution? What should it look like for it to be convenient? And what is the effect of forcing a convenient prior, as opposed to using the actual prior (which we have no idea about, because we have no information about the past)?

Idea of conjugate prior

By definition, a conjugate prior of a likelihood function is a prior such that the posterior distribution is in the same family of distributions as the prior. The likelihood function, when multiplied by the prior, gives a function which looks like the prior but with different parameter values. This is beneficial because it simplifies the mathematical computations involved. Two well known likelihood-conjugate prior pairs, frequently used in text analysis, are the Bernoulli likelihood with a Beta prior and the multinomial likelihood with a Dirichlet prior.

To show why it is convenient to use a conjugate prior as the prior distribution, let us take the Bernoulli-Beta pair and the case of a coin toss. Let n1 be the number of times we observe a head and n0 the number of times we observe a tail.

If p is the probability of a head, the probability of observing n1 heads and n0 tails is P(X | p) = p^n1 * (1 - p)^n0. This is our likelihood function.

Let us assume that there is a prior distribution on p, a beta distribution with parameters α and β: P(p) = Beta(p; α, β) ∝ p^(α - 1) * (1 - p)^(β - 1). What this means is that instead of fixing the value of p, we randomly draw the probability of getting a head from a beta distribution.

The posterior probability after observing the set X of coin tosses is

P(p | X) = k * P(X | p) * P(p)

The above equation basically says that the probability of a particular p is proportional to the probability of drawing that p from the beta distribution, multiplied by the probability of the observed data X being generated with that p.

Replacing the probabilities with their actual values,

P(p | X) ∝ p^n1 * (1 - p)^n0 * p^(α - 1) * (1 - p)^(β - 1)

Simplifying,

P(p | X) ∝ p^(n1 + α - 1) * (1 - p)^(n0 + β - 1)

This is a beta distribution with parameters (α + n1, β + n0). Notice that the posterior is in the same family of distributions as the prior (a beta distribution). The parameters of the prior are simply augmented by the number of times we observe heads and tails (n1, n0). The prior parameters α and β are also known as pseudo counts.
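
Here is a minimal sketch of this conjugate update (the prior parameters and the counts are assumed, purely for illustration): the posterior is obtained by simply adding the observed counts to the pseudo counts.

// Sketch: Beta-Bernoulli conjugate update. Prior Beta(alpha, beta), data n1 heads / n0 tails.
public class BetaPosterior {
    public static void main(String[] args) {
        double alpha = 2.0, beta = 2.0;   // assumed prior pseudo counts
        int n1 = 7, n0 = 3;               // observed heads and tails

        double postAlpha = alpha + n1;    // posterior is Beta(alpha + n1, beta + n0)
        double postBeta = beta + n0;

        double posteriorMean = postAlpha / (postAlpha + postBeta);
        double mapEstimate = (postAlpha - 1) / (postAlpha + postBeta - 2);   // mode of the Beta

        System.out.println("Posterior: Beta(" + postAlpha + ", " + postBeta + ")");
        System.out.println("Posterior mean of p = " + posteriorMean);
        System.out.println("MAP estimate of p   = " + mapEstimate);
    }
}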

In the case of rolling a dice, the likelihood is a multinomial distribution and the conjugate prior is the Dirichlet distribution; the posterior is again a Dirichlet whose parameters are the prior pseudo counts plus the observed count for each side.
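
The sketch below reuses the earlier numbers, treating the 100 past throws (50 of them showing side 6, the rest assumed to be evenly spread) as pseudo counts and the 12 new rolls as the observed data; these particular numbers are my assumption, for illustration only.

// Sketch: Multinomial-Dirichlet conjugate update for a dice.
public class DirichletPosterior {
    public static void main(String[] args) {
        // Assumed pseudo counts from the 100 past throws: 10 each for sides 1-5, 50 for side 6.
        double[] prior = {10, 10, 10, 10, 10, 50};
        // New observations: 12 rolls, each side seen exactly twice.
        int[] counts = {2, 2, 2, 2, 2, 2};

        double[] posterior = new double[6];
        double total = 0.0;
        for (int i = 0; i < 6; i++) {
            posterior[i] = prior[i] + counts[i];   // Dirichlet posterior parameter per side
            total += posterior[i];
        }
        for (int i = 0; i < 6; i++) {
            // Posterior mean estimate of p(i); compare with the MLE of 1/6 from the 12 rolls alone.
            System.out.printf("side %d: posterior mean p = %.3f%n", i + 1, posterior[i] / total);
        }
    }
}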

What if the prior is not actually the conjugate prior as we assumed?

Well, statistics is all about assumptions. We assumed a conjugate prior to simplify the math. If in reality the prior distribution is not the conjugate prior we assumed it to be, then our posterior estimate will be off in proportion to how much the actual prior differs from our assumption.

Friday, July 11, 2014

Scripting a recommendation engine pipeline using mahout

Mahout comes with a large number of machine learning algorithms implemented to run on top of Hadoop. For generating item-similarity based recommendations, Mahout has built-in algorithms to compute item similarity using multiple similarity measures, such as cosine, Pearson correlation, etc.

There are two pieces to item-similarity based recommendation:

  • Compute the item similarity matrix
  • Generate recommendations based on the latest user-item interaction data.
Generating the item similarity matrix is a complex and lengthy computation; imagine computing pairwise similarities for 100k or so products. But once the similarities are computed, generating recommendations from the user-item interaction data is relatively less demanding. Luckily, item-to-item similarity does not change fast, so you only need to recompute it once or twice a month (depending on how often you introduce new products).

Mahout has a command to generate recommendations, 'recommenditembased'. The built-in job first generates the item similarity matrix and then proceeds to generate the recommendations.

The mahout recommenditembased command looks like the one below.


mahout recommenditembased --startPhase 0 --endPhase 10 -i /input -o /output -s SIMILARITY_COSINE -mp 15 -m 300 --numRecommendations 1000 --tempDir /tempDir

You can provide startPhase and endPhase to control how far Mahout should proceed in its processing.

See the link below for more on mahout recommendation phases.

http://www.slideshare.net/vangjee/a-quick-tutorial-on-mahouts-recommendation-engine-v-04


Generating similarities (frequency: run every week or so)

The first step is to compute the preference matrix, which gathers all the user-item interactions. This is the 0th phase.

step 1 - delete tempDir if it already exists
hadoop fs -rm -r <tempDir>

step 2 - generate the preference matrix
mahout recommenditembased --startPhase 0 --endPhase 0 -i <inputDir> -o <outputDir> -s <similarityClass> --tempDir <tempDir>

The similarity computation itself is phase 1. The next step is to compute the similarities.

step 3 - compute similarities
mahout recommenditembased --startPhase 1 --endPhase 1 -i <inputDir> -o <outputDir>  -s <similarityClass>  --tempDir  <tempDir>

The job which computes the similarities needs steps 1 and 2 to have run before step 3.

Generating recommendations (frequency: run every 5-6 hours, or daily)


Start by deleting the preference matrix folder in Hadoop if it already exists. Note that we want the preference matrix to reflect the near-realtime user-item interactions.

step 1 - delete <tempDir>/preparePreferenceMatrix
hadoop fs -rm -r  <tempDir>/preparePreferenceMatrix

Note that you should not delete the entire tempDir, as it contains the item similarity matrix and other artifacts required for recommendation generation.

step 2 - generate the preference matrix
mahout recommenditembased --startPhase 0 --endPhase 0 -i <inputDir> -o <outputDir>  -s <similarityClass>  --tempDir  <tempDir>

step 3 - delete the partialMultiply directory if it already exists
hadoop fs -rm -r  <tempDir>/partialMultiply

step 4 - generate recommendations

mahout recommenditembased --startPhase 2 --endPhase 30 -i <inputDir> -o <outputDir> -s <similarityClass> -mp 15 -m 300 --numRecommendations 1000 --tempDir <tempDir>
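
To schedule these steps as a single job, the commands can be wrapped in a small driver. The sketch below is a hypothetical example using Java's ProcessBuilder; the paths and the similarity class are placeholders taken from the examples above, not anything Mahout itself provides.

import java.io.IOException;
import java.util.Arrays;

// Hypothetical driver that chains the recommendation-generation steps described above.
// The HDFS paths and the similarity class are placeholders; adjust them to your setup.
public class RecommendationPipeline {

    // Runs one shell command; optionally ignores a non-zero exit (e.g. deleting a missing dir).
    private static void run(boolean ignoreFailure, String... command)
            throws IOException, InterruptedException {
        Process p = new ProcessBuilder(command).inheritIO().start();
        int exit = p.waitFor();
        if (exit != 0 && !ignoreFailure) {
            throw new RuntimeException("Command failed (exit " + exit + "): " + Arrays.toString(command));
        }
    }

    public static void main(String[] args) throws Exception {
        String input = "/input";
        String output = "/output";
        String tempDir = "/tempDir";

        // step 1 - delete the stale preference matrix, but keep the rest of tempDir
        run(true, "hadoop", "fs", "-rm", "-r", tempDir + "/preparePreferenceMatrix");

        // step 2 - rebuild the preference matrix from the latest interactions (phase 0 only)
        run(false, "mahout", "recommenditembased", "--startPhase", "0", "--endPhase", "0",
                "-i", input, "-o", output, "-s", "SIMILARITY_COSINE", "--tempDir", tempDir);

        // step 3 - delete the partialMultiply directory if it exists
        run(true, "hadoop", "fs", "-rm", "-r", tempDir + "/partialMultiply");

        // step 4 - generate recommendations from the precomputed similarity matrix (phase 2 onwards)
        run(false, "mahout", "recommenditembased", "--startPhase", "2", "--endPhase", "30",
                "-i", input, "-o", output, "-s", "SIMILARITY_COSINE",
                "-mp", "15", "-m", "300", "--numRecommendations", "1000", "--tempDir", tempDir);
    }
}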








Friday, May 9, 2014

Reading mahout sequence files from Hive

Working on recommendation systems, I wanted to play with the algorithms myself, independent of Mahout, but I did not want to throw away a lot of useful processing already done by Mahout. Mahout can compute similarities between items using various similarity measures such as cosine, Tanimoto coefficient, Pearson correlation, etc.
The matrix I am interested in maps each item to a list of similar items, ordered in descending order of similarity. Mahout has a sequence dumper program which can print the sequence files to the console, but I wanted to go beyond this: query the file and potentially upload it into HBase. Once I can access the similarity list of an item from HBase, I can easily generate the weighted-sum recommendation score for a given item and play with various tweaks of the algorithm independently of Mahout. That was my intention.

So how do you read the Mahout sequence file for the pairwise item similarity matrix?
Mahout stores the pairwise item similarity matrix as key-value pairs of an item and a vector of its similarities to other items. The key class is org.apache.hadoop.io.IntWritable and the value class is org.apache.mahout.math.VectorWritable. You will need a custom Hive SerDe to read the sequence file and convert the IntWritable and VectorWritable into a Hive row. But there is a catch: while reading a sequence file, Hive only considers the value and ignores the key. That does not work in our case, because the item id is the key. To make Hive read the key from the sequence file as well, we need to write a custom input format class.




How does Hive read from a file?

Hive has the org.apache.hadoop.hive.ql.exec.FetchOperator class, which holds an org.apache.hadoop.mapred.RecordReader as an attribute; the RecordReader reads the records. The code for FetchOperator.getNextRow() looks roughly like the following.

       try {
318      while (true) {
319        if (currRecReader == null) {
320          currRecReader = getRecordReader();
321          if (currRecReader == null) {
322            return null;
323          }
324        }
325
326        boolean ret = currRecReader.next(key, value);
327        if (ret) {
328          if (this.currPart == null) {
329            Object obj = serde.deserialize(value);
330            return new InspectableObject(obj, serde.getObjectInspector());
331          } else {
332            rowWithPart[0] = serde.deserialize(value);
333            return new InspectableObject(rowWithPart, rowObjectInspector);
334          }
335        } else {
336          currRecReader.close();
337          currRecReader = null;
338        }
339      }
340    } catch (Exception e) {
341      throw new IOException(e);
342    }


Note line 329, where deserialization happens only for the value. Our hack is to embed the key inside the value; in the SerDe, while deserializing, we read the key along with the value and assign it to the Hive row. We can achieve this by writing a custom InputFormat with a custom record reader: in the call to next(key, value) we encode the key into the value.

Here is how it looks.



import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.VectorWritable;


public abstract class MahoutVectorSequenceFileRecordReader<KEYIN, VALUEIN> implements org.apache.hadoop.mapred.RecordReader<KEYIN, VectorWritable> {

  private SequenceFile.Reader in;
  private long start;
  private long end;
  private boolean more = true;
  protected Configuration conf;
  protected KEYIN currentKey=null;
  protected VectorWritable currentValue = null;
  protected SequenceFile.Reader sequenceFileReader = null;
  public MahoutVectorSequenceFileRecordReader(Configuration conf, org.apache.hadoop.mapred.FileSplit split)
    throws IOException {
    Path path = split.getPath();
    FileSystem fs = path.getFileSystem(conf);
    this.in = new SequenceFile.Reader(fs, path, conf);
    this.end = split.getStart() + split.getLength();
    this.conf = conf;

    if (split.getStart() > in.getPosition())
      in.sync(split.getStart());                  // sync to start

    this.start = in.getPosition();
    more = start < end;
    sequenceFileReader = new SequenceFile.Reader(fs,path,conf);
  }

  public  void initialize(InputSplit split,
          TaskAttemptContext context
          ) throws IOException, InterruptedException {
  return;
  }



    /**
   * Get the current key
   * @return the current key or null if there is no current key
   * @throws IOException
   * @throws InterruptedException
   */
  public KEYIN getCurrentKey() throws IOException, InterruptedException {
  //System.out.println("returning currentKey "+currentKey.toString());
  return currentKey;
  }
  
  
  /**
   * Get the current value.
   * @return the object that was read
   * @throws IOException
   * @throws InterruptedException
   */
  public VectorWritable getCurrentValue() throws IOException, InterruptedException {
// System.out.println("returning currentValue "+currentValue.get());
  return currentValue;
  }
  


  public Class getKeyClass() { return in.getKeyClass(); }


  public Class getValueClass() { return in.getValueClass(); }

  @SuppressWarnings("unchecked")
  public KEYIN createKey() {
    return (KEYIN) ReflectionUtils.newInstance(getKeyClass(), conf);
  }


  public float getProgress() throws IOException {
    if (end == start) {
      return 0.0f;
    } else {
      return Math.min(1.0f, (in.getPosition() - start) / (float)(end - start));
    }
  }

  public synchronized long getPos() throws IOException {
    return in.getPosition();
  }

  protected synchronized void seek(long pos) throws IOException {
    in.seek(pos);
  }
  public synchronized void close() throws IOException { in.close(); }


@Override
public boolean next(KEYIN key, VectorWritable value) throws IOException {
    if (!more) return false;
    
    long pos = in.getPosition();
    VALUEIN trueValue = (VALUEIN) ReflectionUtils.newInstance(in.getValueClass(), conf);
    KEYIN newKey = (KEYIN) ReflectionUtils.newInstance(in.getKeyClass(), conf);
    boolean remaining = in.next((Writable)newKey,(Writable)trueValue);
   
    if (remaining) {
    value.set(new RandomAccessSparseVector(751325960+1)); // cardinality must be large enough for the largest item index (hard-coded here); index 0 is reserved for the key
    
    // encode the key and value to one field.
    combineKeyValue((IntWritable)newKey, (VectorWritable)trueValue, value);
    ((IntWritable)key).set(((IntWritable)newKey).get());
    }
    if (pos >= end && in.syncSeen()) {
      more = false;
    } else {
      more = remaining;
    }
    return more;
}
protected abstract void combineKeyValue(IntWritable key, VectorWritable trueValue, VectorWritable newValue);
}

Subclass MahoutVectorSequenceFileRecordReader and implement the combineKeyValue method, which does the magic of encoding the key and the value into one field.


import java.io.IOException;
import java.util.Iterator;

import  org.apache.mahout.math.Vector;
import org.apache.mahout.math.Vector.Element;
import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import org.apache.mahout.math.VectorWritable;
public class MahoutVectorWritableReader extends MahoutVectorSequenceFileRecordReader<Text, VectorWritable>{

    public MahoutVectorWritableReader(Configuration conf, org.apache.hadoop.mapred.FileSplit split)
            throws IOException {
        super(conf, split);

    }

    @Override
    protected void combineKeyValue(IntWritable key, VectorWritable trueValue, VectorWritable newValue) {
            Vector newVector = newValue.get();
            Vector currentVector = trueValue.get();
            Iterator<Element> itr = currentVector.nonZeroes().iterator();
            double keyValue = (double)key.get();
           // System.out.println("Adding key = "+key.get());
            newVector.set(0,keyValue); // 0 to identify the key. If you already have a value at 0 find some other unique index where you keep your key, so that your Serde know where to look for the key.
            while(itr.hasNext()) {
            Element e = itr.next();
            // System.out.println("adding "+e.index()+" - "+e.get());
            newVector.set(e.index(),e.get());
            }
            newValue.set(newVector);

    }

    public VectorWritable createValue() {        
        return new VectorWritable();
    }
   
}
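
The Hive table definition further below references a custom input format class, which is not shown here. A minimal sketch of what it could look like, assuming the old org.apache.hadoop.mapred API that Hive expects for INPUTFORMAT classes, is:

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.mahout.math.VectorWritable;

// Hypothetical input format that hands Hive the custom record reader defined above.
public class MahoutVectorSequenceFileInputFormat extends FileInputFormat<Text, VectorWritable> {

    @Override
    public RecordReader<Text, VectorWritable> getRecordReader(InputSplit split, JobConf job,
            Reporter reporter) throws IOException {
        reporter.setStatus(split.toString());
        // Delegate the reading (and the key-into-value encoding) to the custom record reader.
        return new MahoutVectorWritableReader(job, (FileSplit) split);
    }

    @Override
    protected boolean isSplitable(FileSystem fs, Path file) {
        // Keep the sketch simple: read each sequence file with a single reader.
        return false;
    }
}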

Now write a custom SerDe to parse the VectorWritable (whose element at index 0 is the key). My SerDe class reads the pairwise item similarities and sorts them so that when querying through Hive I see them in descending order of similarity.
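
The SerDe itself is also not shown in the post. A rough sketch of what such a SerDe might look like, assuming Hive's AbstractSerDe base class and the two string columns (item, scores) from the table definition below, with the scores sorted in descending order of similarity:

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

// Hypothetical SerDe: turns the encoded VectorWritable into an (item, scores) row.
public class MahoutVectorWritableSerde extends AbstractSerDe {

    private ObjectInspector inspector;

    @Override
    public void initialize(Configuration conf, Properties tbl) throws SerDeException {
        // Two string columns, matching the CREATE TABLE statement below: item, scores.
        List<String> columnNames = new ArrayList<String>();
        columnNames.add("item");
        columnNames.add("scores");
        List<ObjectInspector> inspectors = new ArrayList<ObjectInspector>();
        inspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        inspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors);
    }

    @Override
    public Object deserialize(Writable blob) throws SerDeException {
        Vector vector = ((VectorWritable) blob).get();
        // Index 0 carries the item id that the record reader smuggled into the value.
        String item = String.valueOf((long) vector.get(0));

        // Collect (itemIndex, similarity) pairs, skipping the key slot at index 0.
        List<double[]> pairs = new ArrayList<double[]>();
        for (Vector.Element e : vector.nonZeroes()) {
            if (e.index() != 0) {
                pairs.add(new double[] { e.index(), e.get() });
            }
        }
        // Sort by similarity, descending.
        Collections.sort(pairs, new Comparator<double[]>() {
            public int compare(double[] a, double[] b) {
                return Double.compare(b[1], a[1]);
            }
        });

        StringBuilder scores = new StringBuilder();
        for (double[] pair : pairs) {
            if (scores.length() > 0) {
                scores.append(",");
            }
            scores.append((long) pair[0]).append(":").append(pair[1]);
        }

        List<Object> row = new ArrayList<Object>();
        row.add(item);
        row.add(scores.toString());
        return row;
    }

    @Override
    public ObjectInspector getObjectInspector() throws SerDeException {
        return inspector;
    }

    @Override
    public Class<? extends Writable> getSerializedClass() {
        return VectorWritable.class;
    }

    @Override
    public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
        throw new UnsupportedOperationException("This SerDe only supports reading");
    }

    @Override
    public SerDeStats getSerDeStats() {
        return null;
    }
}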


In Hive, create the table with the custom SerDe and custom input format. Don't forget to add the jar which contains these classes to the Hive classpath.

hive> add jar customSerde.jar;

hive> create external table pairwiseItemSimilarity (item string,scores string) row format SERDE 'com.arun.mahout.MahoutVectorWritableSerde' stored as INPUTFORMAT 'com.arun.mahout.MahoutVectorSequenceFileInputFormat' OUTPUTFORMAT 'org.apache.hadoop.mapred.SequenceFileOutputFormat'  location '/temp/pairwiseSimilarity';

hive> select * from pairwiseItemSimilarity limit 10;


That is all, you are good to go now. If you want to upload the similarities to HBase, you can create an external Hive table on top of the HBase table and then run:

hive> insert overwrite table pairwiseItemSimilarity_hbase select * from pairwiseItemSimilarity;