Friday, May 9, 2014

Reading Mahout sequence files from Hive

Working on recommendation systems, I wanted to play with the algorithms myself, independently of Mahout, without throwing away the useful processing Mahout had already done. Mahout can compute similarities between items using various similarity measures such as cosine, Tanimoto coefficient, Pearson correlation, etc.
The matrix I am interested in maps each item to a list of similar items, ordered by descending similarity. Mahout has a sequence file dumper (seqdumper) that prints sequence files to the console. I wanted to go beyond this: query the file and potentially upload it into HBase. Once I can fetch the similarity list of an item from HBase, I can easily generate the weighted-sum recommendation score for a given item and try various tweaks of the algorithm independently of Mahout. That was my intention.
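For context, the weighted-sum score is usually computed like this: for a user u and a candidate item i, sum sim(i, j) × r(u, j) over the items j similar to i that u has rated, then divide by the sum of |sim(i, j)|. The sketch below is a minimal plain-Java version, assuming the similarity list of i has already been fetched (for example from HBase) into a map keyed by item id; `WeightedSumScorer` is a hypothetical name, and this is not Mahout's own recommender code.

```java
import java.util.Map;

/**
 * Hypothetical helper: item-based weighted-sum scoring from one row of the
 * pairwise similarity matrix. Not Mahout code.
 */
final class WeightedSumScorer {

    private WeightedSumScorer() {}

    /**
     * Predicts user u's score for item i as
     *   sum_j sim(i, j) * r(u, j)  /  sum_j |sim(i, j)|
     * where j ranges over the items similar to i that u has rated.
     *
     * @param similarToI  item id -> similarity of that item to i
     * @param userRatings item id -> rating u gave that item
     * @return the predicted score, or NaN if u rated none of i's neighbours
     */
    static double predict(Map<Long, Double> similarToI, Map<Long, Double> userRatings) {
        double weightedSum = 0.0;
        double normalizer = 0.0;
        for (Map.Entry<Long, Double> e : similarToI.entrySet()) {
            Double rating = userRatings.get(e.getKey());
            if (rating == null) {
                continue; // u has not rated this neighbour, so it contributes nothing
            }
            double sim = e.getValue();
            weightedSum += sim * rating;
            normalizer += Math.abs(sim);
        }
        return normalizer == 0.0 ? Double.NaN : weightedSum / normalizer;
    }
}
```

With similarities {2: 0.9, 3: 0.5, 4: 0.1} for item 1 and a user who rated item 2 as 4.0 and item 3 as 2.0, the score is (0.9·4 + 0.5·2) / (0.9 + 0.5) ≈ 3.29; item 4 is ignored because the user never rated it.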

So how do you read the Mahout sequence file for the PairwiseItemSimilarity matrix?
Mahout stores the PairwiseItemSimilarity matrix as key/value pairs: the key is an item and the value is a vector holding the other items' similarities to it. The key class is org.apache.hadoop.io.IntWritable and the value class is org.apache.mahout.math.VectorWritable. You need a custom Hive SerDe to read the sequence file and convert the IntWritable and VectorWritable into a Hive row. But there is a catch: when reading a sequence file, Hive considers only the value and ignores the key. That is not acceptable in our case, because the item id is the key. To make Hive read the key from the sequence file as well, we need to write a custom input format class.




How Does Hive Read from a File?

Hive's org.apache.hadoop.hive.ql.exec.FetchOperator class holds an org.apache.hadoop.mapred.RecordReader as a field, and the RecordReader reads each record as a key/value pair. The relevant part of FetchOperator.getNextRow() in the Hive source of that time looks like this:

    try {
      while (true) {
        if (currRecReader == null) {
          currRecReader = getRecordReader();
          if (currRecReader == null) {
            return null;
          }
        }

        boolean ret = currRecReader.next(key, value);
        if (ret) {
          if (this.currPart == null) {
            Object obj = serde.deserialize(value);   // line 329
            return new InspectableObject(obj, serde.getObjectInspector());
          } else {
            rowWithPart[0] = serde.deserialize(value);
            return new InspectableObject(rowWithPart, rowObjectInspector);
          }
        } else {
          currRecReader.close();
          currRecReader = null;
        }
      }
    } catch (Exception e) {
      throw new IOException(e);
    }


Note line 329, where only the value is deserialized: the key that the RecordReader has just read never reaches the SerDe. Our hack is to embed the key in the value, so that the SerDe can read the key back while deserializing and assign it to the Hive row. We achieve this with a custom InputFormat that returns a custom RecordReader: in its next(key, value) call, the reader encodes the key into the value.
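The trick can be tried out without Hadoop. The following is a minimal plain-Java model, assuming a sparse vector is represented as a `Map<Integer, Double>` (index to similarity); `KeyInValueCodec` and its methods are hypothetical names, not Hive or Mahout API. Unlike the reader below, which lets a real entry at index 0 silently overwrite the key, this sketch rejects a collision on the reserved index.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Plain-Java model of folding a record's key into its value. A sparse vector is
 * modelled as a map from index to value; the real reader works on Mahout's
 * VectorWritable instead. Class and method names are hypothetical.
 */
final class KeyInValueCodec {

    private final int keyIndex; // reserved slot that carries the key (the post uses 0)

    KeyInValueCodec(int keyIndex) {
        this.keyIndex = keyIndex;
    }

    /** Reader side, i.e. next(key, value): copy the value and write the key into the reserved slot. */
    Map<Integer, Double> encode(int key, Map<Integer, Double> value) {
        if (value.containsKey(keyIndex)) {
            // Writing the key here would clash with a real similarity at the same index.
            throw new IllegalArgumentException("index " + keyIndex + " already holds a value");
        }
        Map<Integer, Double> combined = new TreeMap<>(value);
        combined.put(keyIndex, (double) key); // every int is exactly representable as a double
        return combined;
    }

    /** SerDe side, i.e. deserialize(value): read the key back from the reserved slot. */
    int decodeKey(Map<Integer, Double> combined) {
        Double stored = combined.get(keyIndex);
        // A sparse vector may drop 0.0 entries, so a missing slot is read as key 0.
        return stored == null ? 0 : stored.intValue();
    }
}
```

Because the key now travels inside the value, a value-only consumer such as the deserialize call in FetchOperator can still recover it.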

Here is what it looks like.



import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.VectorWritable;

/**
 * Old-API (mapred) record reader for Mahout sequence files whose key is an
 * IntWritable item index and whose value is a VectorWritable. Hive hands only
 * the value to the SerDe, so next() folds the key into the value; subclasses
 * decide how by implementing combineKeyValue().
 */
public abstract class MahoutVectorSequenceFileRecordReader<KEYIN, VALUEIN>
    implements RecordReader<KEYIN, VectorWritable> {

  private final SequenceFile.Reader in;
  private final long start;
  private final long end;
  private boolean more = true;
  protected final Configuration conf;

  public MahoutVectorSequenceFileRecordReader(Configuration conf, FileSplit split)
      throws IOException {
    Path path = split.getPath();
    FileSystem fs = path.getFileSystem(conf);
    this.in = new SequenceFile.Reader(fs, path, conf);
    this.end = split.getStart() + split.getLength();
    this.conf = conf;

    if (split.getStart() > in.getPosition()) {
      in.sync(split.getStart());            // sync to the first record of this split
    }
    this.start = in.getPosition();
    more = start < end;
  }

  public Class<?> getKeyClass() { return in.getKeyClass(); }

  public Class<?> getValueClass() { return in.getValueClass(); }

  @SuppressWarnings("unchecked")
  public KEYIN createKey() {
    return (KEYIN) ReflectionUtils.newInstance(getKeyClass(), conf);
  }

  public float getProgress() throws IOException {
    if (end == start) {
      return 0.0f;
    }
    return Math.min(1.0f, (in.getPosition() - start) / (float) (end - start));
  }

  public synchronized long getPos() throws IOException {
    return in.getPosition();
  }

  protected synchronized void seek(long pos) throws IOException {
    in.seek(pos);
  }

  public synchronized void close() throws IOException {
    in.close();
  }

  @Override
  @SuppressWarnings("unchecked")
  public boolean next(KEYIN key, VectorWritable value) throws IOException {
    if (!more) {
      return false;
    }
    long pos = in.getPosition();
    // Read the real (key, value) pair from the sequence file.
    KEYIN newKey = (KEYIN) ReflectionUtils.newInstance(in.getKeyClass(), conf);
    VALUEIN trueValue = (VALUEIN) ReflectionUtils.newInstance(in.getValueClass(), conf);
    boolean remaining = in.next((Writable) newKey, (Writable) trueValue);

    if (remaining) {
      VectorWritable original = (VectorWritable) trueValue;
      // Fresh, empty vector with the same cardinality as the vector on disk.
      value.set(new RandomAccessSparseVector(Math.max(1, original.get().size())));
      // Encode the key and the value into one field.
      combineKeyValue((IntWritable) newKey, original, value);
      ((IntWritable) key).set(((IntWritable) newKey).get());
    }
    // Same end-of-split rule as Hadoop's SequenceFileRecordReader.
    if (pos >= end && in.syncSeen()) {
      more = false;
    } else {
      more = remaining;
    }
    return more;
  }

  protected abstract void combineKeyValue(IntWritable key, VectorWritable trueValue,
                                          VectorWritable newValue);
}

Subclass MahoutVectorSequenceFileRecordReader and implement combineKeyValue, which does the actual work of encoding the key and the value into one field.


import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.Vector.Element;
import org.apache.mahout.math.VectorWritable;

public class MahoutVectorWritableReader
    extends MahoutVectorSequenceFileRecordReader<IntWritable, VectorWritable> {

  public MahoutVectorWritableReader(Configuration conf, FileSplit split) throws IOException {
    super(conf, split);
  }

  @Override
  protected void combineKeyValue(IntWritable key, VectorWritable trueValue, VectorWritable newValue) {
    Vector newVector = newValue.get();
    // Index 0 carries the key. If your vectors can hold a real value at index 0,
    // reserve some other unique index so that the SerDe knows where to look for the key.
    newVector.set(0, (double) key.get());
    // Copy the original similarities next to the key.
    for (Element e : trueValue.get().nonZeroes()) {
      newVector.set(e.index(), e.get());
    }
    newValue.set(newVector);
  }

  public VectorWritable createValue() {
    return new VectorWritable();
  }
}

Next, write a custom SerDe that parses the VectorWritable, whose element at index 0 is the key. My SerDe reads the pairwise item similarities and sorts them, so that when I query through Hive I see them in descending order of similarity.
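The SerDe source is not listed in this post. As a rough idea of its core logic only, the following plain-Java sketch takes the decoded vector (modelled as a `Map<Integer, Double>` with the key at a reserved index), recovers the item, drops the key slot, and sorts the remaining entries by descending similarity. The comma-separated `item:similarity` layout of the scores column is an assumption, `SimilarityRowFormatter` is a hypothetical name, and the Hive SerDe plumbing (initialize, ObjectInspectors) is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical core of the SerDe's deserialize(): turns an encoded vector
 * (index -> value, key stored at keyIndex) into the two Hive columns
 * (item, scores). Hive SerDe plumbing is not shown.
 */
final class SimilarityRowFormatter {

    private SimilarityRowFormatter() {}

    static String[] toRow(Map<Integer, Double> encoded, int keyIndex) {
        Double key = encoded.get(keyIndex);
        String item = String.valueOf(key == null ? 0 : key.intValue());

        // Everything except the reserved slot is a real similarity.
        List<Map.Entry<Integer, Double>> sims = new ArrayList<>();
        for (Map.Entry<Integer, Double> e : encoded.entrySet()) {
            if (e.getKey() != keyIndex) {
                sims.add(e);
            }
        }
        // Highest similarity first; ties broken by item index for a stable output.
        sims.sort((a, b) -> {
            int bySim = Double.compare(b.getValue(), a.getValue());
            return bySim != 0 ? bySim : Integer.compare(a.getKey(), b.getKey());
        });

        StringBuilder scores = new StringBuilder();
        for (Map.Entry<Integer, Double> e : sims) {
            if (scores.length() > 0) {
                scores.append(',');
            }
            scores.append(e.getKey()).append(':').append(e.getValue());
        }
        return new String[] {item, scores.toString()};
    }
}
```

Sorting at read time is what lets a plain select show each item's most similar items first.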


In Hive, create the table with the custom SerDe and the custom input format. Don't forget to add the jar containing these classes (and the Mahout math jar they depend on) to Hive's classpath.

hive> add jar customSerde.jar;

hive> create external table pairwiseItemSimilarity (item string, scores string)
    > row format SERDE 'com.arun.mahout.MahoutVectorWritableSerde'
    > stored as INPUTFORMAT 'com.arun.mahout.MahoutVectorSequenceFileInputFormat'
    > OUTPUTFORMAT 'org.apache.hadoop.mapred.SequenceFileOutputFormat'
    > location '/temp/pairwiseSimilarity';

hive> select * from pairwiseItemSimilarity limit 10;


That's all; you are good to go. If you want to upload the similarities to HBase, create an external Hive table on top of the HBase table (via Hive's HBase storage handler) and then run:

hive> insert overwrite table pairwiseItemSimilarity_hbase select * from pairwiseItemSimilarity;