Monday, March 31, 2014

Note to myself on S3DistCp


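Note on using --groupBy

Say you have 4 files named 01-05-2013, 02-05-2013, 01-06-2013 and 02-06-2013, that is, files named in DD-MM-YYYY format.
Use parentheses in the regular expression to mark the part of the name that the files of a group have in common. Files whose captured text is identical are concatenated into one output file.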

Piping to one file

If you want to concatenate all files, use --groupBy .*(-0).(-2013).*
hadoop@ip-172-31-34-171:~$ hadoop fs -ls /test/3
Found 1 items
-rw-r--r-- 3 hadoop supergroup 44 2013-12-27 22:53 /test/3/-0-20130   --> only one file
hadoop@ip-172-31-34-171:~$ hadoop fs -cat /test/3/-0-20130
1
2
3
4
5
6
10
11
6
7
12
13
8
9
14
15
17
18

Piping to different files for each group

If you want to separate out the concatenation of files for each month, use --groupBy .*(-0.-2013).* as in the command below:
 ./elastic-mapreduce --jobflow j-14HZLRKA2CUZ7  --jar s3n://arun-emr-files/emr-s3distcp-1.0.jar --args '--src,s3n://arum-test-bucket/,--dest,hdfs:///test/2,--groupBy,.*(-0.-2013).*'
hadoop@ip-172-31-34-171:~$ hadoop fs -ls /test/2
Found 2 items
-rw-r--r-- 3 hadoop supergroup 20 2013-12-27 22:50 /test/2/-05-20130
-rw-r--r-- 3 hadoop supergroup 24 2013-12-27 22:50 /test/2/-06-20130
hadoop@ip-172-31-34-171:~$ hadoop fs -cat /test/1/-05-20130
1
2
3
4
5
6
6
7
8
9
hadoop@ip-172-31-34-171:~$ hadoop fs -cat /test/1/-06-20130
10
11
12
13
14
15
17
18

Notes from AWS support


Wildcards inside the parentheses are usable, and whatever they match becomes the potential groups.

For example, say you have the filenames:
hosta-subprocess1-2013-06-01.log 
hosta-subprocess2-2013-06-02.log
hostb-subprocess1-2013-06-01.log
hostb-subprocess2-2013-06-02.log
hostc-subprocess1-2013-06-01.log

Using group by .*(subprocess).* would result in a concatenation of all files into a single file.

Using group by (.*subprocess).* would result in 3 files:
hosta-subprocess (which includes hosta-subprocess1-2013-06-01.log and hosta-subprocess2-2013-06-02.log)
hostb-subprocess (which includes hostb-subprocess1-2013-06-01.log and hostb-subprocess2-2013-06-02.log)
hostc-subprocess (only hostc-subprocess1-2013-06-01.log)

Using group by .*(\d+-\d+-\d+).* would result in 2 files:
2013-06-01 (which includes hosta-subprocess1-2013-06-01.log, hostb-subprocess1-2013-06-01.log, hostc-subprocess1-2013-06-01.log)
2013-06-02 (which includes hosta-subprocess2-2013-06-02.log and hostb-subprocess2-2013-06-02.log)
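
The grouping rules above can be tried out offline before launching a job. The following is a minimal sketch in Python, not S3DistCp itself: the helper name group_by is made up, and it assumes that the pattern has to match the whole file name, that non-matching files are skipped, and that the text of all capture groups is concatenated to form the group key.

```python
import re
from collections import defaultdict

def group_by(pattern, filenames):
    """Map each group key to the files that would be concatenated under it."""
    regex = re.compile(pattern)
    groups = defaultdict(list)
    for name in filenames:
        match = regex.fullmatch(name)  # assumption: the pattern must match the whole name
        if match is None:
            continue                   # assumption: files that do not match are skipped
        key = "".join(match.groups())  # assumption: capture groups are concatenated
        groups[key].append(name)
    return dict(groups)

dated = ["01-05-2013", "02-05-2013", "01-06-2013", "02-06-2013"]
logs = [
    "hosta-subprocess1-2013-06-01.log",
    "hosta-subprocess2-2013-06-02.log",
    "hostb-subprocess1-2013-06-01.log",
    "hostb-subprocess2-2013-06-02.log",
    "hostc-subprocess1-2013-06-01.log",
]

print(group_by(r".*(-0).(-2013).*", dated))    # one group for all four files
print(group_by(r".*(-0.-2013).*", dated))      # one group per month
print(group_by(r"(.*subprocess).*", logs))     # one group per host
print(group_by(r".*(\d+-\d+-\d+).*", logs))    # one group per date
```

One thing the sketch surfaces: in Python's re module the leading .* is greedy, so the last pattern captures "3-06-01" and "3-06-02" rather than the full dates. The files still fall into the same two groups, but the group names may differ from the listing above. A stricter capture such as .*(\d{4}-\d{2}-\d{2}).* keeps the whole date. Whether S3DistCp names its output the same way has not been checked here.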

Sunday, March 30, 2014

Parallelizing Gradient Descent

Parallelizing any non-trivial machine learning algorithm is hard. I am evaluating a product called Alpine Data Labs and came across some interesting ideas on MapReducing a stochastic gradient descent (SGD) algorithm.

Here is the paper I am referring to. Thanks to Alpine Data Labs for sharing the information.
http://www.research.rutgers.edu/~lihong/pub/Zinkevich11Parallelized.pdf. Basically, each node runs SGD on its own share of the data, and the final model is the average of the optimized values the individual nodes arrive at.
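
The averaging idea can be sketched in a few lines of Python. This is a toy illustration under stated assumptions, not the paper's exact procedure: the model is a single weight w in y = w * x, the data is synthetic with a made-up slope of 3.0, the learning rate and epoch count are arbitrary, and the "nodes" are simulated by running the shards one after another in a single process.

```python
import random

def sgd(shard, lr=0.1, epochs=5):
    """Plain SGD for the 1-D model y = w * x with squared loss."""
    w = 0.0
    for _ in range(epochs):
        for x, y in shard:
            w -= lr * (w * x - y) * x  # gradient of 0.5 * (w*x - y)^2 with respect to w
    return w

def parallel_sgd(data, n_nodes):
    """Run SGD independently on each shard, then average the results."""
    shards = [data[i::n_nodes] for i in range(n_nodes)]  # "map": one shard per node
    weights = [sgd(shard) for shard in shards]
    return sum(weights) / len(weights)                   # "reduce": average the weights

rng = random.Random(0)
TRUE_W = 3.0  # made-up slope used to generate the synthetic data
data = []
for _ in range(1000):
    x = rng.uniform(-1.0, 1.0)
    data.append((x, TRUE_W * x + rng.gauss(0.0, 0.1)))

print(parallel_sgd(data, n_nodes=4))  # should land close to TRUE_W
```

The appeal for MapReduce is that the nodes never talk to each other while training; the only communication is the final averaging step.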









On the variations of AdaBoost

AdaBoost Intuition

AdaBoost is an ensemble method (meaning it is an ensemble of classifiers instead of a single classifier). The intuition behind AdaBoost is to form a committee of expert learners and take the final output from the committee instead of from a single learner. Each expert in the committee is a learning algorithm. Each learner is trained specifically to learn about instances which the other learners could not classify correctly (not exactly, but each learner gives high importance to instances which the others could not classify, and "focuses" more on those instances). So in a sense, the committee is formed by expert learners who each specialize in classifying specific sets of instances. Each learner in the committee complements the others, because each of them is specifically focused on learning from the instances the others could not classify correctly.

Algorithm


 ADABOOST(S, Learn, k)
   S: training set {(x1,y1), (x2,y2), ..., (xm,ym)}, yi in Y
   Learn: Learner(S, weights)
   k: number of rounds

For all i in S: w(1,i) = 1/m

For r = 1 to k, do
     For all i: p(r,i) = w(r,i) / Σj w(r,j)

     h(r) = Learn(S, p(r))

     ε(r) = Σi p(r,i) 1[h(r)(xi) ≠ yi]

     if ε(r) > ½
        k = r-1
        exit
     β(r) = ε(r) / (1 - ε(r))

     For all i: w(r+1,i) = w(r,i) * β(r)^(1 - 1[h(r)(xi) ≠ yi])
Output: h(x) = argmax(y ∈ Y) Σ(r=1 to k) log(1/β(r)) 1[h(r)(x) = y]


Explanation

Inputs to the AdaBoost algorithm are a training set S, a learner function Learn and the number of iterations k. The learner function should be capable of learning from a training set with weighted instances. How to learn from a weighted-instance training set? We will come to that later. For now, assume that the learner can learn from a set of weighted instances. The number k indicates the number of iterations the algorithm has to go through. This number is also the number of experts in the committee. At each iteration the algorithm trains the candidates, picks the best one and elects it to the committee.

First we initialize the weights of all the instances equally as 1/m, where m is the number of instances. Then the expert selection rounds begin (r=1 to k). In each round the first step is to calculate the normalized weights p(r,i) so that they all sum to one. Note that we do not overwrite the weights with the normalized weights, but assign the normalized weights to a new variable. After that we call the learn function, which outputs a hypothesis function. Note that the learn function outputs the best learner, the one which minimizes the error (cost function). We select this learner as the selected learner for round r. Then we compute the aggregate weighted error of the learner. If the aggregate error is > 1/2 we stop and exit (why? see below). Otherwise we update the weights of the instances for the next round, such that the weight decreases for those instances which the current learner classified correctly.

The weight for each learner h(r) in the final vote is log(1/β(r)). Since β(r) = ε(r)/(1-ε(r)) grows with the error, log(1/β(r)) shrinks as the error level of the particular learner h(r) grows. That means each learner in the committee is weighted according to how well it classifies the instances.
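
To make the steps concrete, here is a minimal Python sketch of the procedure described above. Several things in it are assumptions made for illustration: the weak learner is a one-dimensional threshold rule (learn_stump), the labels are +1 and -1, the ten-point data set is made up, and the eps == 0 guard is an addition to avoid taking log(1/0).

```python
import math

def learn_stump(xs, ys, p):
    """Weak learner: the threshold rule on 1-D inputs with the lowest weighted error."""
    best = None
    for t in [min(xs) - 1] + sorted(set(xs)):
        for left, right in ((1, -1), (-1, 1)):
            err = sum(pi for x, y, pi in zip(xs, ys, p)
                      if (left if x <= t else right) != y)
            if best is None or err < best[0]:
                best = (err, t, left, right)
    _, t, left, right = best
    return lambda x: left if x <= t else right

def adaboost(xs, ys, learn, k):
    """Return the committee as a list of (hypothesis, log(1/beta)) pairs."""
    m = len(xs)
    w = [1.0 / m] * m
    committee = []
    for _ in range(k):
        total = sum(w)
        p = [wi / total for wi in w]        # normalized copy; w itself is not overwritten
        h = learn(xs, ys, p)
        miss = [h(x) != y for x, y in zip(xs, ys)]
        eps = sum(pi for pi, wrong in zip(p, miss) if wrong)
        if eps > 0.5:
            break                           # worse than a coin toss: stop
        if eps == 0:
            committee = [(h, 1.0)]          # added guard: a perfect learner needs no committee
            break
        beta = eps / (1.0 - eps)
        committee.append((h, math.log(1.0 / beta)))
        # correctly classified instances are scaled by beta; misses keep their weight
        w = [wi if wrong else wi * beta for wi, wrong in zip(w, miss)]
    return committee

def predict(committee, x, labels=(1, -1)):
    """Weighted vote: the label with the largest total log(1/beta)."""
    return max(labels, key=lambda y: sum(a for h, a in committee if h(x) == y))

xs = list(range(10))
ys = [1, 1, 1, -1, -1, -1, 1, 1, 1, -1]    # no single threshold separates these labels
committee = adaboost(xs, ys, learn_stump, k=3)
print([predict(committee, x) for x in xs])
```

On this toy set the best single threshold rule gets three of the ten points wrong, while the committee of three classifies all ten training points correctly.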

Why stop when   ε(r) > ½?

When ε(r) > ½, the weighted aggregate error is more than 0.5, so the probability of getting a correct answer from the classifier is less than 0.5, which is worse than a plain coin toss. Simply put, if we toss a coin and choose the decision that way, the coin will do better than our candidate classifier. That means we can simply throw away the candidate classifier and exit.

How to learn from weighted instances?

What should be the nature of the learner function Learn(S,Weights)? It is supposed to learn from a training set where each instance has a weight associated with it. Depending on the individual classifier, you can incorporate the weights into the classifier itself. For example, for a naive Bayes classifier, you can incorporate the weights when building the conditional probability terms. Another approach is to create a new training set by sampling from the original training set according to the weights.
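
The sampling approach can be sketched with the standard library alone. The helper name resample and the three-instance training set are made up for illustration; random.choices draws with replacement, so heavily weighted instances tend to appear several times and lightly weighted ones may not appear at all.

```python
import random

def resample(training_set, weights, rng=random):
    """Draw a same-sized training set, picking instances in proportion to their weights."""
    return rng.choices(training_set, weights=weights, k=len(training_set))

training_set = [("x1", 1), ("x2", -1), ("x3", 1)]   # made-up (instance, label) pairs
sample = resample(training_set, [0.1, 0.8, 0.1], random.Random(0))
print(sample)
```

Any learner that knows nothing about weights can then be trained on the resampled set as usual.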

How to pick the number of iterations

It is done empirically, by plotting the training and test errors against the number of iterations. As you keep increasing the number of iterations, the training error keeps decreasing, and at first the test error keeps decreasing too. At some point the algorithm starts overfitting and the test error starts increasing. Pick the number of iterations immediately before this point, where the test error is at its lowest and the overfitting is yet to happen.

What if we use strong classifiers instead of weak classifiers

AdaBoost is used to combine a number of weak classifiers (error rate < 0.5) into one stronger classifier. Naturally the question arises: what if we use strong classifiers instead of weak classifiers? Will we get an even stronger classifier as the final ensemble? The answer is no. When you use strong classifiers, and particularly when one of the classifiers is far stronger than the others, the strong classifier dominates the other classifiers, and because of this the small number of outliers will not be picked up correctly. If all the classifiers are equally strong, the final classifier will be of about the same strength as its constituent classifiers.

Feature selection using AdaBoost

One interesting use of AdaBoost is in face detection in images. Viola and Jones proposed this interesting face detection algorithm using machine learning. They used AdaBoost as their ensemble method. Read about the Viola-Jones paper below.

In this paper, the authors argue that features can be learned in the same way that classifiers are learned during AdaBoost. Each classifier is based on a single feature. At each round of boosting, the classifier which has the minimum weighted error rate is selected. This differs from classical AdaBoost in that the classifiers are pre-formed rather than generated during the boosting process. Since each classifier depends on only one feature, this can also be seen as a feature selection process.


Friday, March 21, 2014

Is your map reduce job running locally?

I spent almost half the day figuring out why my Mahout job was running locally, even though I had not set the MAHOUT_LOCAL flag. Then I figured out that if mapreduce.framework.name is not set to yarn in mapred-site.xml (its default is local), your job is going to run locally.


<property>
     <name>mapreduce.framework.name</name>
     <value>yarn</value>
     <description>The runtime framework for executing MapReduce jobs. Set it to yarn; the default value is local.</description>
</property>


On another note, also set yarn.application.classpath so that your MapReduce-related jars are on the classpath.



<property>
    <name>yarn.application.classpath</name>
    <value>/etc/hadoop/conf,/usr/lib/hadoop/*,/usr/lib/hadoop/lib/*,/usr/lib/hadoop-hdfs/*,/usr/lib/hadoop-hdfs/lib/*,/usr/lib/hadoop-yarn/*,/usr/lib/hadoop-yarn/lib/*,/usr/lib/hadoop-mapreduce/*,/usr/lib/hadoop-mapreduce/lib/*</value>
  </property>
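
When a job ignores a setting, it helps to check what a site file really contains. Below is a small Python sketch for that; the helper name read_site_properties is made up, and the sample document is a shortened, hypothetical stand-in for a real *-site.xml (on a cluster you would read the file from the Hadoop configuration directory instead).

```python
import xml.etree.ElementTree as ET

def read_site_properties(xml_text):
    """Return {name: value} for every <property> in a Hadoop *-site.xml document."""
    root = ET.fromstring(xml_text)
    return {p.findtext("name"): p.findtext("value") for p in root.iter("property")}

# Shortened sample for illustration only.
SAMPLE = """<configuration>
  <property>
    <name>yarn.application.classpath</name>
    <value>/etc/hadoop/conf,/usr/lib/hadoop/*</value>
  </property>
</configuration>"""

props = read_site_properties(SAMPLE)
print(props.get("yarn.application.classpath"))
print(props.get("some.unset.property"))  # None means the file does not set it
```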

Monday, March 17, 2014

Hive joins

Hive is a beast if you tune it for your queries, but it can be painfully slow if you do not optimize hive for your queries, especially join operations.


Read about Hive join optimization here.

https://www.facebook.com/notes/facebook-engineering/join-optimization-in-apache-hive/470667928919



A word about bucket map join

----------------------------------------

Bucket map join is helpful if you know the keys you are going to use in your queries. Basically, Hive hashes on the key (specified for clustering at table creation) and then clusters the rows into n buckets (again, n is specified at table creation). You create the two tables that are to be joined, specifying the same key and the same number of buckets. While joining, Hive loads the buckets with the same number (any of 0 to n-1) from both tables and performs an in-memory join.
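
The mechanics can be illustrated with a toy Python sketch. It is not how Hive is implemented: the hash function (CRC32 of the key's string form) is a stand-in chosen only because it is deterministic, the tables are made-up lists of tuples with the join key in the first column, and all helper names are hypothetical.

```python
import zlib
from collections import defaultdict

def bucket_of(key, n):
    """Stand-in hash: the same key always lands in the same bucket number."""
    return zlib.crc32(str(key).encode("utf-8")) % n

def bucketize(rows, n):
    """Cluster rows into n buckets by their join key (first column)."""
    buckets = [[] for _ in range(n)]
    for row in rows:
        buckets[bucket_of(row[0], n)].append(row)
    return buckets

def bucket_map_join(left_buckets, right_buckets):
    """Join bucket i of the left table only against bucket i of the right table."""
    joined = []
    for left, right in zip(left_buckets, right_buckets):
        lookup = defaultdict(list)          # in-memory hash table for one small bucket
        for row in right:
            lookup[row[0]].append(row)
        for row in left:
            for match in lookup.get(row[0], []):
                joined.append(row + match[1:])
    return joined

orders = [(1, "book"), (2, "pen"), (3, "ink"), (1, "cup")]
users = [(1, "ann"), (2, "bob"), (4, "cy")]
N = 4  # both tables must use the same key and the same number of buckets
result = bucket_map_join(bucketize(orders, N), bucketize(users, N))
print(sorted(result))
```

Because matching keys are guaranteed to sit in buckets with the same number, each bucket pair can be joined on its own and only one bucket has to fit in memory at a time.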

There are three issues I find with bucket map joins.

1) The tables have to be Hive managed. Doing a bucket map join on an external table is tedious. If you are not willing to make the table Hive managed (or INTERNAL), you will have to hash the key yourself and partition the data into n buckets. I have not tried this myself. I guess the easier way is to create a Hive managed table and then run an insert overwrite ... select ... query into the Hive managed table from your external table. Remember to set hive.enforce.bucketing=true. You can do this from the hive prompt.

hive > set hive.enforce.bucketing=true;

That means you are duplicating the data into a Hive managed table. My data pipeline is: data flows into an HDFS location and is then copied to a Hive managed table for joining with other tables.


2) If you have two keys which are candidates for joins to two separate tables, then you will have to have two separate Hive managed tables with the same data, but with different keys used for clustering. If you specify two keys at the same time for clustering, Hive uses a hash of both keys together, so the hash is a combination of both keys. If you are doing a join on both keys together, well and good. But if you have to join on one key at one time and on the other key at another time, this won't work, as the hash was created using both keys.

3) If you do a load data inpath ..., then Hive simply moves the files from the external location to the Hive warehouse (/user/hive/warehouse) in HDFS as they are. It does not do the bucketing and sorting, even if you enforce it. So the way to have your tables bucketed is to do an insert overwrite into the table, as in

hive > insert overwrite table foo select * from bar;

That means creating two Hive tables, each clustered by a different key.

If anyone knows a better kung fu, then please post it as a comment.


Yarn memory requirements and tuning it for your hive joins.

  If you run into error messages like below

Container [pid=19699,containerID=container_1394473937156_0074_01_000122] is running beyond virtual memory limits. Current usage: 34.7 MB of 1 GB physical memory used; 11.1 GB of 2.1 GB virtual memory used. Killing container.

It is an indication that you have not tuned your YARN memory configuration.
For YARN optimization, go here:
yarn optimization
YARN has the notion of a container, which is associated with each Hadoop node. Resources are allocated to each container. You have to specify the memory allocation for each container and for each JVM inside the container. YARN typically allows 2.1 times (the default configuration) the physical memory as virtual memory. The error message above indicates that the JVM was given more memory through the mapred.child.java.opts setting (around 9 GB) than the container can handle. The JVM memory allocation should be less than what is allocated to the YARN container. When you do a bucket map join (or even a regular map join), Hive reads the data into memory. You have to configure your YARN and JVM memory carefully, considering the size of your data.

For more details on configuring yarn memory read the link mentioned above.

Just in: I figured out that YARN does not simply use the configuration yarn.scheduler.minimum-allocation-mb to allocate a minimum amount of memory for the container. Instead, it uses this value as the base chunk of memory allocation, and a container gets the multiple of it needed to cover the memory specified in mapreduce.map.memory.mb and mapreduce.reduce.memory.mb. So if you run into physical memory limits for the YARN container, configure the above params accordingly.
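
The arithmetic behind both observations fits in a few lines of Python. This is a back-of-the-envelope sketch, not YARN code: the function names are made up, the 1536 MB request is an invented example, and the rounding rule is the behavior described in this note. The 2.1 factor is the default of yarn.nodemanager.vmem-pmem-ratio.

```python
import math

def container_size_mb(requested_mb, min_allocation_mb):
    """Round a memory request up to the next multiple of the minimum allocation."""
    return math.ceil(requested_mb / min_allocation_mb) * min_allocation_mb

def virtual_limit_mb(container_mb, vmem_pmem_ratio=2.1):
    """Virtual memory a container may use before it gets killed."""
    return container_mb * vmem_pmem_ratio

# A 1536 MB map task with a 1024 MB minimum allocation gets a 2048 MB container.
print(container_size_mb(1536, 1024))
# A 1 GB container is allowed about 2.1 GB of virtual memory, as in the error above.
print(virtual_limit_mb(1024) / 1024)
```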