自定义UDF。由此可见,Pig还是很值得一用的,它也支持嵌入Python代码。
package com.zhangdan.ykt;import java.io.IOException;import java.text.DateFormat;import java.text.ParseException;import java.text.SimpleDateFormat;import java.util.ArrayList;import java.util.Date;import java.util.Iterator;import org.apache.pig.EvalFunc;import org.apache.pig.data.BagFactory;import org.apache.pig.data.DataBag;import org.apache.pig.data.Tuple;import org.apache.pig.data.TupleFactory;/** * * @author xunying 为每个有关系可能有关系的学生生成数据对 */public class GetConnection extends EvalFunc{ TupleFactory tupleFactory = TupleFactory.getInstance(); BagFactory bagFactory = BagFactory.getInstance(); static long dis = 3 * 60;// 时间设定,超过三分钟就不存在联系 int maxP = 20;// 设定20个相关联的人,将会有40个相关联的人 Person[] line = new Person[maxP];// 设定maxp个相关联的人 int indexS = 0;// 标记开始和结束队列 int indexE = 0; boolean notice = false;// 标记 public class Person { String account; String jntime; } public static boolean isFriendM(String jn1, String jn2) { // 获取两个时间参数 DateFormat format = new SimpleDateFormat("yyyyMMdd HH:mm:ss"); Date date1 = null; Date date2 = null; try { date1 = format.parse(jn1); date2 = format.parse(jn2); } catch (ParseException e) { e.printStackTrace(); } long diff = (int) ((date2.getTime() - date1.getTime()) / 1000); return diff < dis ? 
true : false; } @Override public DataBag exec(Tuple input) throws IOException { ArrayList tuples = new ArrayList (); DataBag values = (DataBag) input.get(0); String account = null; String jntime = null; if (input == null || input.size() == 0) { return null; } for (int i = 0; i < maxP; i++) line[i] = new Person(); for (Iterator it = values.iterator(); it.hasNext();) { Tuple t = it.next(); account = (String) t.get(0); jntime = (String) t.get(2) + (String) t.get(3) + (String) t.get(4) + " " + (String) t.get(5); if (indexE == indexS && notice) indexS = (indexS + 1) % maxP; line[indexE].account = account; line[indexE].jntime = jntime; try { for (; indexS != indexE;) { // 修改开始索引 if (isFriendM(line[indexS].jntime, line[indexE].jntime)) break; indexS = (indexS + 1) % maxP; } } catch (Exception e) { e.printStackTrace(); } String tmp1, tmp2; for (int i = indexS; i != indexE;) { Tuple tuple = tupleFactory.newTuple(); tmp1 = line[i].account; tmp2 = line[indexE].account; try { if (Integer.parseInt(line[i].account) > Integer.parseInt(line[indexE].account)) { tmp2 = line[i].account; tmp1 = line[indexE].account; } } catch (Exception e) { e.printStackTrace(); } if (tmp1 != null && tmp2 != null && tmp1.trim() != "" && tmp2.trim() != "") { tuple.append(tmp1); tuple.append(tmp2); tuple.append(line[i].jntime); tuple.append(line[indexE].jntime); tuples.add(tuple); } i = (i + 1) % maxP; } indexE = (indexE + 1) % maxP; notice = true; } DataBag bag = bagFactory.newDefaultBag(tuples); return bag; }}