When writing MapReduce jobs you frequently run into joins across multiple data sources. For simple analysis tasks Hive is good enough; for anything more involved you end up writing the MapReduce job yourself.
Multiple sources are wired in with MultipleInputs.addInputPath, one call per input path and mapper.
Job class
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
// ConfUtil and JobChainHandler are project-internal utility classes (not shown here).

public class EfcOrderProRangeOdJob extends Configured implements Tool {

    // TODO paths
    private final static String INPUT_A = "D:/order/order/";
    private final static String INPUT_B = "D:/order/address/";
    private final static String OUTPUT = "D:/testAAAAA/";
    // private final static String OUTPUT = "/warehouse/tmp/pt_eft_order_pro_range/";
    private final static String OUTPUT_TABLE = "fct_pt_icr_trade_day";

    public static void main(String[] args) {
        try {
            int res = ToolRunner.run(new Configuration(), new EfcOrderProRangeOdJob(), args);
            System.exit(res);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        try {
            String start = "20130217"; // TODO
            Configuration conf = ConfUtil.getConf(getConf());
            conf.set("start", start);

            // Job 1: reduce-side join of the order file and the address file.
            Job job1 = Job.getInstance(conf, "pt_eft_order_pro_range_first");
            Path pathOrder = new Path(INPUT_A);
            Path pathAddress = new Path(INPUT_B);
            Path output = new Path(OUTPUT + start + "/");
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            job1.setMapOutputKeyClass(TextPair.class);
            job1.setMapOutputValueClass(Text.class);
            FileOutputFormat.setOutputPath(job1, output);
            // Each input directory gets its own mapper via MultipleInputs.
            MultipleInputs.addInputPath(job1, pathOrder, TextInputFormat.class, EfcOrderProRangeOrderMapper.class);
            MultipleInputs.addInputPath(job1, pathAddress, TextInputFormat.class, EfcOrderProRangeAddressMapper.class);
            job1.setReducerClass(EfcOrderProRangeReducer.class);
            job1.setJarByClass(EfcOrderProRangeOdJob.class);

            // Job 2: aggregate the joined output and write it to the HBase table.
            Job job2 = Job.getInstance(conf, "pt_eft_order_pro_range_second");
            FileInputFormat.setInputPaths(job2, output);
            job2.setMapperClass(EfcOrderProRangeSecondMapper.class);
            job2.setMapOutputKeyClass(Text.class);
            job2.setMapOutputValueClass(IntWritable.class);
            TableMapReduceUtil.initTableReducerJob(OUTPUT_TABLE, EfcOrderProRangeSecondReducer.class, job2);

            return JobChainHandler.handleJobChain(job1, job2, "pt_eft_order_pro_range");
        } catch (Exception e) {
            e.printStackTrace();
            return 1; // signal failure to ToolRunner instead of a success code
        }
    }

    public static class TextPair implements WritableComparable<TextPair> {

        private Text first;   // join key: order id
        private Text second;  // source tag: "order" or "address"

        public TextPair() {
            set(new Text(), new Text());
        }

        public TextPair(String first, String second) {
            set(new Text(first), new Text(second));
        }

        public TextPair(Text first, Text second) {
            set(first, second);
        }

        public void set(Text first, Text second) {
            this.first = first;
            this.second = second;
        }

        public Text getFirst() {
            return first;
        }

        public Text getSecond() {
            return second;
        }

        public void write(DataOutput out) throws IOException {
            first.write(out);
            second.write(out);
        }

        public void readFields(DataInput in) throws IOException {
            first.readFields(in);
            second.readFields(in);
        }

        // Sort and group on the order id only, so the "order" and "address"
        // records of the same order end up in the same reduce() call.
        public int compareTo(TextPair tp) {
            return first.compareTo(tp.first);
        }

        // HashPartitioner partitions by hashCode(); hash on the order id only,
        // otherwise records with the same order id but different tags could be
        // sent to different reducers and the join would miss them.
        @Override
        public int hashCode() {
            return first.hashCode();
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof TextPair && first.equals(((TextPair) o).first);
        }
    }
}
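JobChainHandler (like ConfUtil) is a project-internal helper that the post never shows. As an assumption about what it presumably does — run job1 and submit job2 only if job1 succeeded — a minimal sketch that fits the call above could look like this; the body is hypothetical, not the author's actual implementation:

import org.apache.hadoop.mapreduce.Job;

public class JobChainHandler {

    // Hypothetical implementation: run the jobs one after another and stop the
    // chain as soon as one of them fails. Returns 0 on success and 1 on failure,
    // matching the int result that ToolRunner/System.exit expects.
    public static int handleJobChain(Job job1, Job job2, String chainName) throws Exception {
        if (!job1.waitForCompletion(true)) {
            System.err.println(chainName + ": first job failed");
            return 1;
        }
        if (!job2.waitForCompletion(true)) {
            System.err.println(chainName + ": second job failed");
            return 1;
        }
        return 0;
    }
}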
Mapper 1 class (order input)
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// TextPair is the static inner class defined in EfcOrderProRangeOdJob.

public class EfcOrderProRangeOrderMapper extends Mapper<LongWritable, Text, TextPair, Text> {

    private static final int ORDER_ID_INDEX = 2;
    private static final int ORDER_STATUS_INDEX = 5;
    private static final String EFFECTIVE_STATUS = "3";
    private static final String COL_SPLITER = "\001";

    @Override
    public void map(LongWritable key, Text value, Context context) {
        try {
            String[] order = value.toString().split(COL_SPLITER);
            String orderId = order[ORDER_ID_INDEX];
            String status = order[ORDER_STATUS_INDEX];
            // Only effective orders (status = 3) take part in the join.
            if (!EFFECTIVE_STATUS.equals(status)) {
                return;
            }
            // Tag the record with "order" so the reducer knows its source.
            TextPair textPair = new TextPair(new Text(orderId), new Text("order"));
            context.write(textPair, new Text(status));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Mapper 2 class (address input)
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// TextPair is the static inner class defined in EfcOrderProRangeOdJob.

public class EfcOrderProRangeAddressMapper extends Mapper<LongWritable, Text, TextPair, Text> {

    // TODO read the column indexes from the Hive metastore instead of hard-coding them
    private static final int ORDER_ID_INDEX = 0;
    private static final int PROVINCE_ID_INDEX = 1;
    private static final String COL_SPLITER = "\001";

    @Override
    public void map(LongWritable key, Text value, Context context) {
        try {
            String[] address = value.toString().split(COL_SPLITER);
            String orderId = address[ORDER_ID_INDEX];
            String province = address[PROVINCE_ID_INDEX];
            // Tag the record with "address" so the reducer knows its source.
            TextPair textPair = new TextPair(new Text(orderId), new Text("address"));
            context.write(textPair, new Text(province));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
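Both mappers assume \001-delimited Hive export files with the column positions hard-coded above. To make that concrete, here is a small stand-alone check with made-up sample lines (the field values are purely illustrative, not taken from the original data):

public class MapperOutputCheck {
    // Made-up sample lines in the \001-delimited layout assumed above:
    //   order file   : column 2 = order id, column 5 = order status
    //   address file : column 0 = order id, column 1 = province id
    public static void main(String[] args) {
        String orderLine = "1\00120130217\0011001\001-\001-\0013";
        String addressLine = "1001\00131";

        String[] order = orderLine.split("\001");
        String[] address = addressLine.split("\001");

        // EfcOrderProRangeOrderMapper would emit   key=(1001, "order")   value="3"
        System.out.println("(" + order[2] + ", order) -> " + order[5]);
        // EfcOrderProRangeAddressMapper would emit key=(1001, "address") value="31"
        System.out.println("(" + address[0] + ", address) -> " + address[1]);
    }
}

Because both records share the order id 1001, the reducer shown next receives them in the same group and can join them into one output line.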
The join is done on the reduce side: the second field of the TextPair tags each record with its source, so the reducer knows which dimension to pull out of each value.
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
// TextPair is the static inner class defined in EfcOrderProRangeOdJob.

public class EfcOrderProRangeReducer extends Reducer<TextPair, Text, Text, Text> {

    private static final String COL_SPLITER = "\001";

    @Override
    protected void reduce(TextPair key, Iterable<Text> values, Context context) {
        try {
            // Hadoop reuses the key object while iterating over the values, and
            // TextPair.readFields refills the same Text instances, so "tag" always
            // reflects the source tag of the current record in the loop.
            Text tag = key.getSecond();
            Text orderId = key.getFirst();
            String status = null;
            String province = null;
            StringBuilder out = new StringBuilder();
            for (Text value : values) {
                if (tag.toString().equals("order")) {
                    status = value.toString();
                }
                if (tag.toString().equals("address")) {
                    province = value.toString();
                }
            }
            // Inner join: emit a row only when both sources contributed a record.
            if (province != null && status != null) {
                out.append(orderId.toString()).append(COL_SPLITER)
                   .append(status).append(COL_SPLITER)
                   .append(province);
                // With a null key, TextOutputFormat writes only the value.
                context.write(null, new Text(out.toString()));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
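The second job's mapper and reducer (EfcOrderProRangeSecondMapper / EfcOrderProRangeSecondReducer) are not included in the post. Judging only from the job setup — Text/IntWritable map output and an HBase table reducer targeting fct_pt_icr_trade_day — the second stage most likely counts effective orders per province. The sketch below is an assumption of what that could look like; the row key, the column family cf, and the qualifier cnt are placeholders, not details from the original code:

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical second-stage mapper: reads the joined rows (orderId\001status\001province)
// produced by job1 and emits (province, 1) for every effective order.
public class EfcOrderProRangeSecondMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final String COL_SPLITER = "\001";
    private static final IntWritable ONE = new IntWritable(1);

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] cols = value.toString().split(COL_SPLITER);
        if (cols.length < 3) {
            return;
        }
        context.write(new Text(cols[2]), ONE); // cols[2] = province id
    }
}

// Hypothetical second-stage reducer: sums the counts per province and writes one
// HBase row per province into fct_pt_icr_trade_day.
class EfcOrderProRangeSecondReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        byte[] rowKey = Bytes.toBytes(key.toString());
        Put put = new Put(rowKey);
        // Put.add is the 0.9x-era API; on HBase 1.x+ use put.addColumn instead.
        put.add(Bytes.toBytes("cf"), Bytes.toBytes("cnt"), Bytes.toBytes(String.valueOf(sum)));
        context.write(new ImmutableBytesWritable(rowKey), put);
    }
}

In a real project the two classes would of course live in separate source files.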
The description and code in this article draw on and adapt material from: http://my.oschina.net/BreathL/blog/75112