Showing posts with label Apache Pig. Show all posts
Showing posts with label Apache Pig. Show all posts

Tuesday, March 12, 2013

Polymorphism of Apache Pig's Eval Function

Polymorphism is not necessary for a programming language, but it will make our code more beautiful and clean. At least it will save us several lines of code.
In Apache Pig, a Eval Function is a class which extends EvalFunc, rather than a function, so we can't leverage java's polymorphism for function. But there are 2 back doors left/designed by EvalFunc designer:
1. The input of EvalFunc is a tuple which makes Input Polymorphism possible.
2. EvalFunc is generic which makes Output Polymorphism possible.

Input Polymorphism

Input Polymorphism is referring to the variance of input.
As the input of EvalFunc is a tuple, and the element of tuple is object which means you can embed any object to tuple and pass to EvalFunc.
For example:

public class Add extends EvalFunc<Double> {

@Override
public Double exec(Tuple input) throws IOException {
Object a = input.get(0);
Object b = input.get(1);
Double da, db;
if(a instanceof String){
da = Double.parseDouble(a);
}
else{
da = a;
}

if(b instanceof String){
db = Double.parseDouble(b);
}
else{
db = b;
}
return da+db;

}
}
In the previous example, the Add function tries to parse a string into double so that add between strings or between string and double is ok.

Output Polymorphism

Output Polymorphism is referring to the variance of output.
Usually you have to designate the output type of Eval Function. In the example above, Double is the return type. But if you want the return type to vary, you could just use Object as the return type.
For example:

public class AorB extends EvalFunc<Object> {

@Override
public Object exec(Tuple input) throws IOException {
Object a = input.get(0);
Object b = input.get(1);
if(a != null){
return a;
}
else{
return b;
}
}
}
In the example above, AorB returns a if a is not null or b otherwise.

Of course, the combination of input and output polymorphism make Eval Function more flexible and powerful.



Tuesday, February 26, 2013

Practice on Matrix Factorization Using Pig

  1. Background

Recently I'm thinking about optimize my recommender system. The online version is using Item-based Collaborative Filtering, which basically recommend other items when user is viewing an item.
The Original usage of Matrix Factorization by Netflix-prize-winning algorithm is to predict rating for not viewed items and choose the highest rating items to recommend. But it take a lot of effort to predict because you have to calculate for every single item for every single user which is O(m*n). This is not practical for large system having 10s of million users and millions of items.
My variation of using Matrix Factorization is to add it on top of Item-based Collaborative Filtering. Which means:
first, using cf to generate recommend candidates, usually 10s or 100
then, for each single user, re-rank candidates according to features learned by mf.
this usage requires features learned by mf can be retrieved in real-time.  the best to do is to minimize the size of features, which means the features learned have to be as sparse as possible. I'm thinking keep about 10 significant features for each user or item.

2. Practice

according to  Matrix Factorization Techniques for Recommender System, I choose Stochastic Gradient Descend algorithm to learn the features which is:
1. easy to implement
2. relative fast running time
3. easy to parallel for large data set

the lost function is give below
lost = sum(r_ui - q_i*p_u)^2) for (i, u) + lambda*( sum(||q_i||) for i + sum(||p_u||) for u )

Problem 1, which norm should be used as regularization ?  

According to Sparse methods for machine learning Theory and algorithms
using l1-norm rather than l2-norm used in the previous paper is better as it performs regularization as well as model selection.

define: e_iu = r_ui - q_i*p_u
take partial derivative for q_i and p_u for lost function then the update for q_i and p_u are as below:

q_i = q_i + LearningRate*(sum(e_ui*p_u) for u - lambda*sign(q_i) )
p_u = p_u + LearningRate*(sum(e_ui*q_i) for i - lambda*sign(p_u) )

Problem 2, how to initiate q_i and p_u?

Choosing a right initial point is very import for stochastic gradient descent. even for a convex problem, choose a good initial point will speed up the learning process. for non-convex problem like this, a good choice also decide the quality of local optimum.
Unfortunately, there is not general rule to choose a good initial point that make a local optimum close to global optimum, but there is still way to speed up the learning process that save up a lot time.
  • Is choosing all 0 a good choice? Take a look at the update formula above, if the initial is zero, the square error part would become zero, and the regularization part will become 1. after the update, all elements of q_i and p_u will be -LearningRate*lambda. if that is what   we want, we can just initiate q_i and p_u with -LearningRate*lambda and save all calculation.
  • Is initiate every elements with a small value like -LearningRate*lambda a good choice? I don't think so, if q_i and p_u is very small, then the update of them will also be very small for each round of training, then it will take several more step to came to a point close to optimum.
My intuitive is choose the initial so that q_i*p_u is close to mean(r_ui) so that the initial is close to optimum.
Assume mean(r_ui) is 4, and q_i and p_u both are vectors with 100 elements. E() is for Expectation in statistics
E(q_i*p_u) = 100*E(q_i_k) * E(p_u_k) as q_i_k and p_u_k are independent.
E(q_i*p_u) = 100*E(q_i_k) ^2 as assuming q_i_k and p_u_k have the same distribution.
then  E(q_i_k)  = 0.2 
The question left is how to assign every q_i_k so that E(q_i_k) is 0.2.
Constant value obviously is not a good choice, because in this case, every element in a vector will update with the same difference and keep all elements the same forever.
A simple alternative is randomly pick from [0, 0.4],  or use other distribution like Gaussian Distribution with mean = 0.2. I didn't try but blindly choose random assign.

Problem 3, how to choose Learning Rate ?

Theoretically as long as the Learning Rate is small enough, stochastic gradient descent will eventually get to local optimum. so a brute force way to do it is to start from 0.1 and exponentially shrink like 0.01, 0.001... until every step lost function result decreases.
In my experiment, I have to choose 10E-5 to make lost function result decrease. and with this small learning rate, the decrease of lost function result is very very slow, after run about 100 round, it only decrease about 10%.
After think about this for a few days. I realized that the algorithm only take account of observed data, which lead to huge unbalance. As the hotness of items coincides with Zipf's Law, there is huge unbalance between items or users.
For example, item A may be viewed by 1000 users, but item B may only be viewed by 1 user.
then the square error part of update function of q_i for A is supposed to be 1000 times larger then B. In this case, a uniform learning rate for both item A and B would be inefficient. That is also why only such little learning rate can decrease lost function result. It's because after sum errors from all users view a popular item, the update difference could be very large.
To tackle the problem, I use avg instead of sum so that the difference of update would be the same size as q_i or p_u. Then the update function becomes to:

q_i = q_i + LearningRate*(avg(e_ui*p_u) for u - lambda*sign(q_i) )
p_u = p_u + LearningRate*(avg(e_ui*q_i) for i - lambda*sign(p_u) )

In this case, I only need a 0.01 learning rate and the learning speed is good, after taken 60 round, more than half error is reduced.

Problem 4, how to make feature vector sparse?


As described in background section, I hope the feature vector of user and item to be as sparse as possible so that it will require less space to store those vectors. Further, in reality, a user usually has only several interests and an item is also belong to several categories, so spareness is necessary.
The idea is simple, to make the result spare, we just have to add a regularization element that evaluate spareness to lost function.
The first thing poped into my head is Entropy.
H=sum(-p_i*log(p_i)) 
where p is the probability of each element, which is for each element i in a vector
p_i = abs(x_i)/sum(x_i)
According to information theory, Entropy is a metrics for how many bits needed to describe the vector, the more sparse the vector is the less entropy is has. then after adding this into lost function, it becomes:
lost = sum(r_ui - q_i*p_u)^2) for (i, u) + lambda_1*( sum(||q_i||) for i + sum(||p_u||) for u ) + lambda_2*( sum(H(q_i)) for i + sum(H(p_j)) for j )
let's change update function next.
to simplify, we just assume sum(x_i) is a constant then it can be ignored because magnitude is not important in gradient descend.
then
d H/d p_i = -( log(p_i) + 1 )
d H/ d x_i = -( log(x_i) + 1 ) if x_i >=0
                = log(-x_i) + 1 if x_i <0
the derivative is colored in red and orange in the picture below:
Derivative of Entropy
Let's take a look of the derivative. It becomes extreme large when the absolute value of x_i is near zero. If we apply this into gradient descend, we will subtract(gradient descend use the negative of derivative) a big number when x_i is just a little bigger then 0, which make the next x_i a negative number with a very big absolute value. This will make x_i very unstable.
Let's think about how entropy makes the vector sparse when applying gradient descend. When x_i is bigger than 0.1 which makes H(x_i)=0, it add a little bit to x_i, and the bigger x_i is, the bigger the addition is. When x_i is smaller than 0.1, it subtract a little bit to x_i, and the smaller x_i is, the bigger the subtraction is. Then we can find a function whose derivative has similar attributes and the derivative of it won't be extreme large when the absolute value is small.
The simplest form is
H(p_i) = -x^2+x 
d H/d x_i = -2x+1 if x>=0
               = -2x-1 if x<0
whose derivative is simply a linear function as showed in blue and black in the above picture.
In this case, the update functions become:

q_i = q_i + LearningRate*(avg(e_ui*p_u) for u - lambda_1*sign(q_i) - lambda_2*(-2x+sign(q_i)) )
p_u = p_u + LearningRate*(avg(e_ui*q_i) for i - lambda*sign(p_u)  - lambda_2*(-2x+sign(p_j)) )

In my experiment, the entropy really make the output vector more sparse. In the picture below is the same vector trained with and without entropy regularization. Elements are displayed from small to big. As you can see the blue one is more steeper.



Thursday, November 22, 2012

Apache Pig, a better way to map reduce

中文版:Apache Pig, 便捷的map reduce之道
Have used Apache Pig for about a year before I start to write anything about it. 
First, you won't want to use pig or map reduce if your data is in the scale of MB or even GB, it will no more efficient than python. But for big data, it's beyond the ability of single machine, so you have to think about parallel process, which is map reduce used for, and pig is the tool that make MR more easier and efficient.  
Second, I have to say it's very handy and elegant that you can make simple statistical or data transformation with only several lines of code. eg. you want to count how many query view for each query through a search log, the code looks like below:
---pig example for qv---
log = load 'xxx' as (query);
grp = group log by query;
qv = for each grp generate group as query, COUNT(log) as qv;
------
If you write map reduce, you have to follow the routines and copy-paste tens of lines of code before you start to write your logic. eg. in java on hadoop is append at the end, because it's really too long.
Third, pig like any other script languages, is very convenient to test or run, just modify the script and run, but with java, you have to modify, compile, package src as jar and then execute.
Fourth, when using java or even streaming on hadoop, you have to run you code on real cluster to test, but with pig, you can use "-x local" to run locally with local file system, which is not only faster but also more easier to check out results than using hdfs. you can use this feature to process small data if you're willing to.
Fifth, grammar check with "-c" makes me more comfortable while coding pig, you don't have to be worried about yourself missing one letter will cause you test again or even miss to tell the bug because of lack of unit test like with python.
Sixth, pig has a very extendable API, with which you can implement your own UDF(user defined function) to deal with more complex data element process. eg. make string transform, calculate variance of a bag of data, or read/store data into/from a DIY format. further, java functions from jdk can be directly used without additional code.
---mr example for qv---
import xxxx;
...
import xxxx;
public class QueryCount extends Configured implements Tool{
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {

@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{

//map logic
}
}
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
//reduce logic
}
}

@Override
public int run(String[] args) throws Exception {
Job job = new Job(getConf());
//some settings on job

boolean success = job.waitForCompletion(true);
return success?0:1;

}

public static void main(String[] args) throws Exception{
int ret = ToolRunner.run(new QueryCount(), args);
System.exit(ret);
}


}
------