<<无图片版本>>
目录
课程大纲(MAPREDUCE详解)… 1 流量统计相关需求… 2 社交粉丝数据分析… 3 倒排索引建立… 4
3.1 需求… 14 3.2 分析… 14 3.3 实现… 14
4.1 Map端join案例… 18 4.1.1 需求… 18 4.1.2 分析… 18 4.1.3 实现… 19
11.1 资源相关参数… 23 11.2 容错相关参数… 23 11.3 本地运行mapreduce 作业… 24 11.4 效率和稳定性相关参数… 24
课程大纲(MAPREDUCE详解)
MapReduce快速入门
如何理解map、reduce计算模型
Mapreudce程序运行演示
Mapreduce编程规范及示例编写
Mapreduce程序运行模式及debug方法
MapReduce高级特性
Mapreduce程序的核心机制
MapReduce的序列化框架
MapReduce的排序实现
MapReduce的分区机制及自定义
Mapreduce的数据压缩
Mapreduce与yarn的结合
Mapreduce编程案例
Mapreduce 参数优化
目标: 掌握mapreduce分布式运算框架的编程思想 掌握mapreduce常用算法的编程套路 掌握mapreduce分布式运算框架的运行机制,具备一定自定义开发的能力
流量统计相关需求
- 对流量日志中的用户统计总上、下行流量
技术点: 自定义javaBean用来在mapreduce中充当value 注意: javaBean要实现Writable接口,实现两个方法
//序列化,将对象的字段信息写入输出流 @Override public void write(DataOutput out) throws IOException { out.writeLong(upflow); out.writeLong(downflow); out.writeLong(sumflow); } //反序列化,从输入流中读取各个字段信息 @Override public void readFields(DataInput in) throws IOException { upflow = in.readLong(); downflow = in.readLong(); sumflow = in.readLong(); }
- 统计流量且按照流量大小倒序排序
技术点:这种需求,用一个mapreduce -job 不好实现,需要两个mapreduce -job 第一个job负责流量统计,跟上题相同 第二个job读入第一个job的输出,然后做排序 要将flowBean作为map的key输出,这样mapreduce就会自动排序 此时,flowBean要实现接口WritableComparable 要实现其中的compareTo()方法,方法中,我们可以定义倒序比较的逻辑
- 统计流量且按照手机号的归属地,将结果数据输出到不同的省份文件中
技术点:自定义Partitioner
_ @Override_ _ public int getPartition(Text key, FlowBean value, int numPartitions) {_ _ String prefix = key.toString().substring(0,3);_ _ Integer partNum = pmap.get(prefix);_ _ return (partNum==null?4:partNum);_ _ }_
自定义partition后,要根据自定义partitioner的逻辑设置相应数量的reduce task
job.setNumReduceTasks(5);
注意:如果reduceTask的数量>= _getPartition的结果数_ _,则会多产生几个空的输出文件part-r-000xx_ _如果 1<reduceTask的数量<getPartition的结果数_ _,则有一部分分区数据无处安放,会Exception!!!_ _如果_ reduceTask的数量=1,则不管mapTask端输出多少个分区文件,最终结果都交给这一个reduceTask,最终也就只会产生一个结果文件 part-r-00000
社交粉丝数据分析
以下是qq的好友列表数据,冒号前是一个用,冒号后是该用户的所有好友(数据中的好友关系是单向的) A:B,C,D,F,E,O B:A,C,E,K C:F,A,D,I D:A,E,F,L E:B,C,D,M,L F:A,B,C,D,E,O,M G:A,C,D,E,F H:A,C,D,E,O I:A,O J:B,O K:A,C,D L:D,E,F M:E,F,G O:A,H,I,J 求出哪些人两两之间有共同好友,及他俩的共同好友都有谁? 解题思路:
第一步 map 读一行 A:B,C,D,F,E,O 输出
扩展:求互粉的人!!!!
倒排索引建立
需求:有大量的文本(文档、网页),需要建立搜索索引
1. 自定义inputFormat
1.1 需求
无论hdfs还是mapreduce,对于小文件都有损效率,实践中,又难免面临处理大量小文件的场景,此时,就需要有相应解决方案
1.2 分析
小文件的优化无非以下几种方式:
- 在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS
- 在业务处理之前,在HDFS上使用mapreduce程序对小文件进行合并
- 在mapreduce处理时,可采用combineInputFormat提高效率
1.3 实现
本节实现的是上述第二种方式 程序的核心机制: 自定义一个InputFormat 改写RecordReader,实现一次读取一个完整文件封装为KV 在输出时使用SequenceFileOutPutFormat输出合并文件 代码如下: 自定义InputFromat
public class WholeFileInputFormat extends FileInputFormat
自定义RecordReader
class WholeFileRecordReader extends RecordReader
定义mapreduce处理流程
public class SmallFilesToSequenceFileConverter extends Configured implements Tool { static class SequenceFileMapper extends Mapper
2. 自定义outputFormat
2.1 需求
现有一些原始日志需要做增强解析处理,流程:
- 从原始日志文件中读取数据
- 根据日志中的一个URL字段到外部知识库中获取信息增强到原始日志
- 如果成功增强,则输出到增强结果目录;如果增强失败,则抽取原始数据中URL字段输出到待爬清单目录
2.2 分析
程序的关键点是要在一个mapreduce程序中根据数据的不同输出两类结果到不同目录,这类灵活的输出需求可以通过自定义outputformat来实现
2.3 实现
实现要点:
- 在mapreduce中访问外部资源
自定义outputformat,改写其中的recordwriter,改写具体输出数据的方法write()
代码实现如下: 数据库获取数据的工具
public class DBLoader { public static void dbLoader(HashMap
自定义一个outputformat
public class LogEnhancerOutputFormat extends FileOutputFormat
开发mapreduce处理流程
/** * 这个程序是对每个小时不断产生的用户上网记录日志进行增强(将日志中的url所指向的网页内容分析结果信息追加到每一行原始日志后面) @author / public class LogEnhancer { static class LogEnhancerMapper extends Mapper
3. 自定义GroupingComparator
3.1 需求
有如下订单数据
订单id
商品id
成交金额
Order_0000001
Pdt_01
222.8
Order_0000001
Pdt_05
25.8
Order_0000002
Pdt_03
522.8
Order_0000002
Pdt_04
122.4
Order_0000003
Pdt_01
222.8
现在需要求出每一个订单中成交金额最大的一笔交易
3.2 分析
1、利用“订单id和成交金额”作为key,可以将map阶段读取到的所有订单数据按照id分区,按照金额排序,发送到reduce 2、在reduce端利用groupingcomparator将订单id相同的kv聚合成组,然后取第一个即是最大值
3.3 实现
自定义groupingcomparator
/** * 用于控制shuffle过程中reduce端对kv对的聚合逻辑 @author duanhaitao@itcast.cn */ public class ItemidGroupingComparator extends WritableComparator { protected ItemidGroupingComparator() { super(OrderBean.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { OrderBean abean = (OrderBean) a; OrderBean bbean = (OrderBean) b; //将item_id相同的bean都视为相同,从而聚合为一组 return abean.getItemid().compareTo(bbean.getItemid()); } }
定义订单信息bean
/** * 订单信息bean,实现hadoop的序列化机制 @author duanhaitao@itcast.cn */ public class OrderBean implements WritableComparable
编写mapreduce处理流程
/** * 利用secondarysort机制输出每种item订单金额最大的记录 @author duanhaitao@itcast.cn */ public class SecondarySort { static class SecondarySortMapper extends Mapper
4. Mapreduce中的DistributedCache应用
4.1 Map端join案例
4.1.1 需求
实现两个“表”的join操作,其中一个表数据量小,一个表很大,这种场景在实际中非常常见,比如“订单日志” join “产品信息”
4.1.2 分析
—原理阐述 适用于关联表中有小表的情形; 可以将小表分发到所有的map节点,这样,map节点就可以在本地对自己所读到的大表数据进行join并输出最终结果 可以大大提高join操作的并发度,加快处理速度 —示例:先在mapper类中预先定义好小表,进行join —并用distributedcache机制将小表的数据分发到每一个maptask执行节点,从而每一个maptask节点可以从本地加载到小表的数据,进而在本地即可实现join
4.1.3 实现
public class TestDistributedCache { static class TestDistributedCacheMapper extends Mapper
5. Mapreduce的其他补充
5.1 计数器应用
在实际生产代码中,常常需要将数据处理过程中遇到的不合规数据行进行全局计数,类似这种需求可以借助mapreduce框架中提供的全局计数器来实现 示例代码如下:
public class MultiOutputs { //通过枚举形式定义自定义计数器 enum MyCounter{MALFORORMED,NORMAL} static class CommaMapper extends Mapper
5.2 多job串联
一个稍复杂点的处理逻辑往往需要多个mapreduce程序串联处理,多job的串联可以借助mapreduce框架的JobControl实现 示例代码:
ControlledJob cJob1 = new ControlledJob(job1.getConfiguration()); ControlledJob cJob2 = new ControlledJob(job2.getConfiguration()); ControlledJob cJob3 = new ControlledJob(job3.getConfiguration()); // 设置作业依赖关系 cJob2.addDependingJob(cJob1); cJob3.addDependingJob(cJob2); JobControl jobControl = new JobControl("RecommendationJob"); jobControl.addJob(cJob1); jobControl.addJob(cJob2); jobControl.addJob(cJob3); cJob1.setJob(job1); cJob2.setJob(job2); cJob3.setJob(job3); // 新建一个线程来运行已加入JobControl中的作业,开始进程并等待结束 Thread jobControlThread = new Thread(jobControl); jobControlThread.start(); while (!jobControl.allFinished()) { Thread.sleep(500); } jobControl.stop(); return 0;
5.3 Configuration对象高级应用
6. mapreduce参数优化
MapReduce重要配置参数
11.1 资源相关参数
(1) mapreduce.map.memory.mb: 一个Map Task可使用的资源上限(单位:MB),默认为1024。如果Map Task实际使用的资源量超过该值,则会被强制杀死。 (2) mapreduce.reduce.memory.mb: 一个Reduce Task可使用的资源上限(单位:MB),默认为1024。如果Reduce Task实际使用的资源量超过该值,则会被强制杀死。 (3) mapreduce.map.java.opts: Map Task的JVM参数,你可以在此配置默认的java heap size等参数, e.g. “-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc” (@taskid@会被Hadoop框架自动换为相应的taskid), 默认值: “” (4) mapreduce.reduce.java.opts: Reduce Task的JVM参数,你可以在此配置默认的java heap size等参数, e.g. “-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc”, 默认值: “” (5) mapreduce.map.cpu.vcores: 每个Map task可使用的最多cpu core数目, 默认值: 1 (6) mapreduce.map.cpu.vcores: 每个Reduce task可使用的最多cpu core数目, 默认值: 1
11.2 容错相关参数
(1) mapreduce.map.maxattempts: 每个Map Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。 (2) mapreduce.reduce.maxattempts: 每个Reduce Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。 (3) mapreduce.map.failures.maxpercent: 当失败的Map Task失败比例超过该值为,整个作业则失败,默认值为0. 如果你的应用程序允许丢弃部分输入数据,则该该值设为一个大于0的值,比如5,表示如果有低于5%的Map Task失败(如果一个Map Task重试次数超过mapreduce.map.maxattempts,则认为这个Map Task失败,其对应的输入数据将不会产生任何结果),整个作业扔认为成功。 (4) mapreduce.reduce.failures.maxpercent: 当失败的Reduce Task失败比例超过该值为,整个作业则失败,默认值为0. (5) mapreduce.task.timeout: Task超时时间,经常需要设置的一个参数,该参数表达的意思为:如果一个task在一定时间内没有任何进入,即不会读取新的数据,也没有输出数据,则认为该task处于block状态,可能是卡住了,也许永远会卡主,为了防止因为用户程序永远block住不退出,则强制设置了一个该超时时间(单位毫秒),默认是300000。如果你的程序对每条输入数据的处理时间过长(比如会访问数据库,通过网络拉取数据等),建议将该参数调大,该参数过小常出现的错误提示是“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.”。
11.3 本地运行mapreduce 作业
设置以下几个参数: mapreduce.framework.name=local mapreduce.jobtracker.address=local fs.defaultFS=local
11.4 效率和稳定性相关参数
(1) mapreduce.map.speculative: 是否为Map Task打开推测执行机制,默认为false (2) mapreduce.reduce.speculative: 是否为Reduce Task打开推测执行机制,默认为false (3) mapreduce.job.user.classpath.first & mapreduce.task.classpath.user.precedence:当同一个class同时出现在用户jar包和hadoop jar中时,优先使用哪个jar包中的class,默认为false,表示优先使用hadoop jar中的class。 (4) mapreduce.input.fileinputformat.split.minsize: 每个Map Task处理的数据量(仅针对基于文件的Inputformat有效,比如TextInputFormat,SequenceFileInputFormat),默认为一个block大小,即 134217728。