MapReduce三个入门案例
MapReduce入门
0x01 MapReduce 简介
基于该框架能够容易地编写应用程序,这些应用程序能够运行在由上千个商用机器组成的大规模集群上,并以一种可靠且具有容错能力的方式并行处理上TB级别的海量数据集。
工作机制
这个过程中一共含有四个实体
- 实体一:客户端,用来提交MapReduce作业。
- 实体二:JobTracker,用来协调作业的运行。
- 实体三:TaskTracker,用来处理作业划分后的任务。
- 实体四:HDFS,用来在其它实体间共享作业文件。
具体的运行流程不展开分析。
输入输出
输入输出都是<key,value>键值对,Map和Reduce处理过后都是处理之后写出不同的键值对。
编程规范
Map阶段
- 设置InputFormat类,将数据切分为key-value(k1和v1)对,输入到第二步;
- 自定义map逻辑,将第一步的结果转换成另外的key-value(k2和v2)对,输出结果;
Shuffle阶段(这个阶段编程需要借助Compartor)
3.对输出的key-value 对进行分区;
4.对不同分区的数据按照相同的key排序;
5.对数据进行分组,相同key的value放入 一个集合中;
- (可选)对分组过的数据初步规约,降低数据的网络拷贝。
Reduce阶段
- 对多个map 任务的结果进行排序以及合并,编写reduce函数实现自己的逻辑,对输入的key-value进行处理,转为新的key-value(k3和v3)输出
- 设置OutputFormat 处理并保持reduce输出的key-value数据
数据类型
- BooleanWritable :布尔型
- ByteWritable
- DoubleWritable
- FloatWritable
以下是常用的数据类型:
- IntWritable
- LongWritable
- Text:使用UTF8格式存储我们的文本
- NullWritable:当<key,value>中key或者value为空时使用
0x02 常用函数
Job job = Job.getInstance(); |
Job.getInstance();
获取job对象的实例,可以对他进行配置
job.setJarByClass
通过传入的class 找到job的jar包
job.setOutputKeyClass(Text.class)
;job.setOutputValueClass(IntWritable.class);
设置输出的
key
和value
的类型setMapperClass
和setReducerClass
设置map和reduce的处理对象
waitForCompletion
开始启动,并等待完成。如果成功会返回
true
,不然返回false
addCacheFile
设置缓存文件,这里的缓存的用法和平时理解的缓存意义不同!
0x03 3个案例
1. WordCount案例
功能:统计一系列文本文件中每个单词出现的次数
处理过程:
Map阶段
我们拿到的输入
{0:"Hello Word"}
和{10:"Hello Hadoop" }
.现在我们要开始处理。shuffle
阶段的作用要牢记。他会把<key,1>和<key,2>转换成为<key,(1,2)>。所以我们假设以单词为
key
,1为value
.那我们在reduce阶段的时候,只需要把value全部加起来就可以了。如图所示Shuffle阶段
这个阶段,我并没有实际利用起来,因为会考虑到一个问题在于自定义的类可能更方便来实现。后面会放出例子来进一步认证。
都是根据
key
来分组和进行排序的Reduce阶段
我拿到的输入
<hello,(1,1,1,1)>
,这样我们加起来不就行了。
结果:
初始数据:
代码数据:
生成的数据
代码参考:
package com.dem0.MapReduce; |
2. 二度好友推荐
功能:为每个用户推荐系数top10的二度好友
处理过程:
输入: source_user_id,target_user_id
这里如果如果我们不使用自定义的数据类型,很容易知道,一步不能完成所有的数据处理。这里的处理过程,我们就以job
为单位来介绍,在代码部分,大家再参考学习具体的实现部分。
first job
这一步我们要完成:这个用户到底有多少的直接好友?也就是现在不要说我的二度好友是谁了,我连我自己有多少好友都不知道。所以这个很容易完成。直接写入
<source_user_id,target_user_id>
,就可以了,实现过程就像上面一样简单second job
这一步我们要完成:
哪些用户之间是二度好友?
我们上面已经拿到了一个用户的所有的好友了,所以,哪些用户之间可能成为二度好友呢?
,我的所有的好朋友之间是都有可能成为二度好友的
。但是,这里有一个问题,我的好友之间有可能已经是好友了怎么呢?这里的处理是,将a和b(注意a和b与b和a是一个关系)的关系作为
key
。value为1(可能是二度好友)
,如果我们已经是好友,我们就打上标签0
.那么我们在reduce阶段,汇总的时候,如果有0
的value时,就可以很正常踢出去。而其他的value相加,刚好是推荐系数。third job
这一步,我们拿到的数据是
a,b value
。我们要完成的任务获取一个用户前十的推介系数的用户
。那么这里的key肯定就是用户了,注意上面我们进行了折叠,所以这一步要写入key=
a
和key=b
.所以我们在map阶段写入a,<b,value>
.这样在reduce就能拿到该用户的所有二度好友,再进行排序即可。题外话
这里我们在实现的时候,reduce阶段和map阶段的计算偏重太大,其中的shuffle阶段几乎没有用。但是在这里,我们很容易可以看出来,假如我们有一个类,记录
a和b和他们的推介系数
。那么我们在第二步的时候,<
bean
,n>,设定分组的判断标准就是a相同,b相同
。排序设置为按推介系数
。那么我们在第三步拿到的value就只有推介系数,但是key是已经排好序的数据。那么我们在第三步的第三步就不用再排序了,可以直接输出。
结果:
数据量大,都只做部分截取。输入数据
输出结果
代码
package com.dem0.MapReduce; |
3. SQL任务
功能:找出每类电影的观看人群的年龄段分布情况,并进行排序
处理过程:
Map阶段
数据格式:
Rating.txt userid , movieid , rating, genre
User.txt userid , gender , age , occupation首先,我们要明确,这一步是有两个输入文件的!所以我们需要在这一步将两个文件进行合并的同时,获取出我们需要的数据。
可以发现两个文件之间通过
userid
可以进行连接,那么我们在处理的时候,可以在map对象里面维护一个map的私有变量,以userid
为key,gender , age , occupation
为value,来存储user。在map的时候,再根据rating的userid
取出来。最后<genre,age>,1
写入Reduce阶段
我们拿到的输入数据
<genre,age>,n
,这一步就和二度好友的排序一样了,不再展开。
结果:
代码
package com.dem0.MapReduce; |
0x04 心得体会
MapReduce大大降低了并行编程的门槛(点名批评MPI),通过这三个案例,可以大概入门MapReduce的开发,但是更多的数据开发,需要大家再进行深一步的开发和探索了。注意输入和输出的类型一定要匹配。