MapReduce入门

0x01 MapReduce 简介

基于该框架能够容易地编写应用程序,这些应用程序能够运行在由上千个商用机器组成的大规模集群上,并以一种可靠且具有容错能力的方式并行处理上TB级别的海量数据集。

工作机制

image-20220618003243928

这个过程中一共含有四个实体

  • 实体一:客户端,用来提交MapReduce作业。
  • 实体二:JobTracker,用来协调作业的运行。
  • 实体三:TaskTracker,用来处理作业划分后的任务。
  • 实体四:HDFS,用来在其它实体间共享作业文件。

具体的运行流程不展开分析。

输入输出

输入输出都是<key,value>键值对,Map和Reduce处理过后都是处理之后写出不同的键值对。

编程规范

image-20220618082802588

  • Map阶段

    1. 设置InputFormat类,将数据切分为key-value(k1和v1)对,输入到第二步;
    2. 自定义map逻辑,将第一步的结果转换成另外的key-value(k2和v2)对,输出结果;
  • Shuffle阶段(这个阶段编程需要借助Compartor)

    3.对输出的key-value 对进行分区;

    4.对不同分区的数据按照相同的key排序;

    5.对数据进行分组,相同key的value放入 一个集合中;

    1. (可选)对分组过的数据初步规约,降低数据的网络拷贝。
  • Reduce阶段

    1. 对多个map 任务的结果进行排序以及合并,编写reduce函数实现自己的逻辑,对输入的key-value进行处理,转为新的key-value(k3和v3)输出
  1. 设置OutputFormat 处理并保持reduce输出的key-value数据

数据类型

  • BooleanWritable :布尔型
  • ByteWritable
  • DoubleWritable
  • FloatWritable

以下是常用的数据类型:

  • IntWritable
  • LongWritable
  • Text:使用UTF8格式存储我们的文本
  • NullWritable:当<key,value>中key或者value为空时使用

0x02 常用函数

Job job = Job.getInstance();
job.setJarByClass(WordCount.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

job.setMapperClass(WordMapper.class);
job.setReducerClass(wordReducer.class);
FileInputFormat.addInputPath(job,new Path(inPath));
FileOutputFormat.setOutputPath(job, new Path(outPath));
job.waitForCompletion(true);
  • Job.getInstance();

    获取job对象的实例,可以对他进行配置

  • job.setJarByClass

    通过传入的class 找到job的jar包

  • job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class);

    设置输出的keyvalue的类型

  • setMapperClasssetReducerClass

    设置map和reduce的处理对象

  • waitForCompletion

    开始启动,并等待完成。如果成功会返回true,不然返回false

  • addCacheFile

    设置缓存文件,这里的缓存的用法和平时理解的缓存意义不同!

0x03 3个案例

1. WordCount案例

功能:统计一系列文本文件中每个单词出现的次数

image-20220618083619518

处理过程:

  • Map阶段

    我们拿到的输入{0:"Hello Word"}{10:"Hello Hadoop" }.现在我们要开始处理。shuffle阶段的作用要牢记。他会把<key,1>和<key,2>转换成为

    <key,(1,2)>。所以我们假设以单词为key,1为value.那我们在reduce阶段的时候,只需要把value全部加起来就可以了。如图所示

    image-20220618084015679

  • Shuffle阶段

    image-20220618084120093

    这个阶段,我并没有实际利用起来,因为会考虑到一个问题在于自定义的类可能更方便来实现。后面会放出例子来进一步认证。

    都是根据key来分组和进行排序的

  • Reduce阶段

    我拿到的输入<hello,(1,1,1,1)>,这样我们加起来不就行了。

    image-20220618084401092

结果:

初始数据:

image-20220618094345825

代码数据:

image-20220618094112551

生成的数据

image-20220618094320260

代码参考

package com.dem0.MapReduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import com.dem0.hdfs.FileManager;
import java.util.StringTokenizer;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class WordCount {
public static class WordMapper extends Mapper<Object, Text, Text, IntWritable>{
//第一个Object表示输入key的类型;第二个Text表示输入value的类型;第三个Text表示表示输出键的类型;第四个IntWritable表示输出值的类型
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException //抛出异常
{
// 将每个单词进行统计 出现的单词都记录为1
StringTokenizer tokenizer = new StringTokenizer(value.toString()," ");
System.out.println("split:<" + key + ","+ value + ">" );
while(tokenizer.hasMoreTokens()){
context.write(new Text(tokenizer.nextToken()),new IntWritable(1));
}
}
}

public static class wordReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
//参数同Map一样,依次表示是输入键类型,输入值类型,输出键类型,输出值类型
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
System.out.println("reduce:<" + key + "," + sum + ">");
//for循环遍历,将得到的values值累加
context.write(key, new IntWritable(sum));
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance();
String inPath= "hdfs://"+FileManager.nameNode+"/data/wordcount/datain";
String outPath = "hdfs://"+FileManager.nameNode+"/data/wordcount/dataout";
Configuration cf = new Configuration();
FileSystem fs =FileSystem.get(FileManager.hdfsHost,cf);
if(fs.exists(new Path(outPath))){
fs.delete(new Path(outPath), true);
}
job.setJarByClass(WordCount.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(WordMapper.class);
job.setReducerClass(wordReducer.class);
FileInputFormat.addInputPath(job,new Path(inPath));
FileOutputFormat.setOutputPath(job, new Path(outPath));
job.waitForCompletion(true);
}
}

2. 二度好友推荐

功能为每个用户推荐系数top10的二度好友

处理过程

​ 输入: source_user_id,target_user_id

这里如果如果我们不使用自定义的数据类型,很容易知道,一步不能完成所有的数据处理。这里的处理过程,我们就以job为单位来介绍,在代码部分,大家再参考学习具体的实现部分。

  • first job

    这一步我们要完成:这个用户到底有多少的直接好友?也就是现在不要说我的二度好友是谁了,我连我自己有多少好友都不知道。所以这个很容易完成。直接写入<source_user_id,target_user_id>,就可以了,实现过程就像上面一样简单

  • second job

    这一步我们要完成:哪些用户之间是二度好友?我们上面已经拿到了一个用户的所有的好友了,所以,哪些用户之间可能成为二度好友呢?,我的所有的好朋友之间是都有可能成为二度好友的。但是,这里有一个问题,我的好友之间有可能已经是好友了怎么呢?

    这里的处理是,将a和b(注意a和b与b和a是一个关系)的关系作为key。value为1(可能是二度好友),如果我们已经是好友,我们就打上标签0.那么我们在reduce阶段,汇总的时候,如果有0的value时,就可以很正常踢出去。而其他的value相加,刚好是推荐系数。

  • third job

    这一步,我们拿到的数据是a,b value。我们要完成的任务获取一个用户前十的推介系数的用户。那么这里的key肯定就是用户了,

    注意上面我们进行了折叠,所以这一步要写入key=a和key=b.所以我们在map阶段写入a,<b,value>.这样在reduce就能拿到该用户的所有二度好友,再进行排序即可。

  • 题外话

    这里我们在实现的时候,reduce阶段和map阶段的计算偏重太大,其中的shuffle阶段几乎没有用。但是在这里,我们很容易可以看出来,假如我们有一个类,记录a和b和他们的推介系数

    那么我们在第二步的时候,<bean,n>,设定分组的判断标准就是a相同,b相同。排序设置为按推介系数。那么我们在第三步拿到的value就只有推介系数,但是key是已经排好序的数据。那么我们在第三步的第三步就不用再排序了,可以直接输出。

结果

数据量大,都只做部分截取。输入数据

image-20220618094513150

输出结果

image-20220618094604213

image-20220618095625172

代码

package com.dem0.MapReduce;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.commons.text.TextRandomProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import com.dem0.hdfs.FileManager;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class FriendRecom {
public static class friendMapper1 extends Mapper<Object, Text, Text, Text> {
// 第一个Object表示输入key的类型;第二个Text表示输入value的类型;第三个Text表示表示输出键的类型;第四个IntWritable表示输出值的类型
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
System.out.println(value.toString());
String[] values = value.toString().split("\t");
// context.write(new Text("0"), new Text("1"));
System.out.println("Test Map <key:" + values[0] + "," + values[1] + ">");
context.write(new Text(values[0]), new Text(values[1]));
}
}

public static class friendReducer1 extends Reducer<Text, Text, Text, Text> {

// 参数同Map一样,依次表示是输入键类型,输入值类型,输出键类型,输出值类型
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
// 现在已经拿到一个用户的所有好友了~~~~
String friends = "";
for (Text value : values) {
friends = friends + value.toString() + "\t";
}
context.write(key, new Text(friends.trim()));
}
}

public static class friendMapper2 extends Mapper<Object, Text, Text, IntWritable>{
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
// 0 (a\t b,c )
String[] values = value.toString().split("\t");
for (int i = 1; i < values.length; i++) {
System.out.println(concat(values[0], values[i]));
String tmp = concat(values[0], values[i]);
context.write(new Text(tmp), new IntWritable(0));
for (int j = i + 1; j < values.length; j++) {
context.write(new Text(concat(values[i], values[j])), new IntWritable(1));
}
}
}

public String concat(String a, String b) {
int tmp = a.compareTo(b);
int tmpa = Integer.parseInt(a);
int tmpb = Integer.parseInt(b);
return tmp < 0 ? (a + ":" + b) : (b + ":" + a);
}
}

public static class friendReducer2 extends Reducer<Text, IntWritable, Text, IntWritable> {
private final IntWritable rval = new IntWritable();

// 参数同Map一样,依次表示是输入键类型,输入值类型,输出键类型,输出值类型
// a,b 3
// a.c 3
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int num = 0;
for (IntWritable value : values) {
if (value.get() == 0) {
return;
}
num += 1;
}
rval.set(num);
context.write(key, rval);
}
}
public static class friendMapper3 extends Mapper<Object, Text, Text, Text>{
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
//
String[] values = value.toString().split("\t");
String[] keys = values[0].split(":");

context.write(new Text(keys[0]), new Text(keys[1]+":" + values[1]));
context.write(new Text(keys[1]), new Text(keys[0]+":" + values[1]));
}
}

public static class friendReducer3 extends Reducer<Text, Text, Text, Text> {
// 参数同Map一样,依次表示是输入键类型,输入值类型,输出键类型,输出值类型
@Override
// a b:3
protected void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {
Map<String, String> map = new HashMap<>();
for(Text value : values){
String[] tmp = value.toString().split(":");
map.put(tmp[0], tmp[1]);
}
map = map.entrySet().stream().sorted(Map.Entry.comparingByValue()).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (s, s2) -> s,HashMap::new));
String res = "";
int cnt = 0;
for (String tmp : map.keySet()) {
if(cnt >= 10){
break;
}
res = res + tmp + " ";
cnt++;
}
context.write(key, new Text(res.trim()));
System.out.println("<res key==>" + key.toString() + ":" + res.trim() + ">" );
}
}



public static void main(String[] args) throws Exception {
Configuration cf = new Configuration();
FileSystem fs = FileSystem.get(FileManager.hdfsHost, cf);
String inPath = "hdfs://" + FileManager.nameNode + "/data/friends/datain";
String outPath = "hdfs://" + FileManager.nameNode + "/data/friends/dataout";
Job job = Job.getInstance();
if (fs.exists(new Path(outPath))) {
fs.delete(new Path(outPath), true);
}
job.setJarByClass(FriendRecom.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(friendMapper1.class);
job.setReducerClass(friendReducer1.class);
FileInputFormat.addInputPath(job, new Path(inPath));
FileOutputFormat.setOutputPath(job, new Path(outPath));
if(job.waitForCompletion(true)){
Job job1 = Job.getInstance();
job1.setJarByClass(FriendRecom.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(IntWritable.class);
job1.setMapperClass(friendMapper2.class);
job1.setReducerClass(friendReducer2.class);
FileInputFormat.addInputPath(job1, new Path(outPath + "/part-r-00000"));
if (fs.exists(new Path(outPath + "b"))) {
fs.delete(new Path(outPath + "b"), true);
}
FileOutputFormat.setOutputPath(job1, new Path(outPath + "b"));
if(job1.waitForCompletion(true)){
Job job2 = Job.getInstance();
job2.setJarByClass(FriendRecom.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(Text.class);
job2.setMapperClass(friendMapper3.class);
job2.setReducerClass(friendReducer3.class);
FileInputFormat.addInputPath(job2, new Path(outPath + "b" + "/part-r-00000"));
if (fs.exists(new Path(outPath + "c"))) {
fs.delete(new Path(outPath + "c"), true);
}
FileOutputFormat.setOutputPath(job2, new Path(outPath + "c"));
job2.waitForCompletion(true);
}
}

}
}

3. SQL任务

功能:找出每类电影的观看人群的年龄段分布情况,并进行排序

处理过程

  • Map阶段

    数据格式:
    Rating.txt userid , movieid , rating, genre
    User.txt userid , gender , age , occupation

    首先,我们要明确,这一步是有两个输入文件的!所以我们需要在这一步将两个文件进行合并的同时,获取出我们需要的数据。

    可以发现两个文件之间通过userid可以进行连接,那么我们在处理的时候,可以在map对象里面维护一个map的私有变量,以userid为key,gender , age , occupation为value,来存储user。在map的时候,再根据rating的userid取出来。最后<genre,age>,1写入

  • Reduce阶段

    我们拿到的输入数据<genre,age>,n,这一步就和二度好友的排序一样了,不再展开。

结果:

image-20220618095909085

代码

package com.dem0.MapReduce;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class SQLReduce {

public static class MapJoinMapper extends Mapper<Object, Text, Text, IntWritable> {
private Map<String, String> pdMap = new HashMap<String, String>();
private IntWritable cnt = new IntWritable(1);

@Override
// 任务开始前将pd数据缓存进来
protected void setup(Context context) throws IOException {
// 获取缓存路径
String nameNode = "192.168.239.28:9000";
URI hdfsHost = null;
try {
hdfsHost = new URI("hdfs://"+nameNode);
} catch (URISyntaxException e) {
e.printStackTrace();
}
URI[] cacheFiles = context.getCacheFiles();
// 获取文件对象,并开流
FileSystem fs = FileSystem.get(hdfsHost,context.getConfiguration());
FSDataInputStream fis = fs.open(new Path(cacheFiles[0]));
// 通过包装流转换为reader,方便按行读取
BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));
// 逐行读取按行处理
String line = reader.readLine();
while (line != null && line != "") {
String[] split = line.split("\t");
pdMap.put(split[0], split[1] + "\t" + split[2] + "\t" + split[3]);
line = reader.readLine();
}
reader.close();
}

@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
System.out.println(0);
// userid , movieid , rating, genre
String[] split = value.toString().split("\t");
// 获取user gender age , occupation
String pName = pdMap.get(split[0]);
// 获取user gender age , occupation
String[] tmp = pName.split("\t");
// 写出 (genre,age) 1
context.write(new Text(split[3] + "," + tmp[1]), cnt);
System.out.println("Insert (" + split[3] + "," + tmp[1] + ")" + "\t" + "1");
}
}

public static class MapJoinReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
// genre,age n
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
System.out.println("reduce:<" + key.toString() + "," + sum + ">");
context.write(key, new IntWritable(sum));
}
}

public static class MapJoinMapper1 extends Mapper<Object, Text, Text, Text>{
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
//0, (Western,18 95)
String[] values = value.toString().split("\t");
String[] tmp = values[0].split(",");
context.write(new Text(tmp[0]), new Text(tmp[1]+":"+ values[1]));
}
}
public static class MapJoinReducer1 extends Reducer<Text, Text, Text, Text> {
// 参数同Map一样,依次表示是输入键类型,输入值类型,输出键类型,输出值类型
@Override
// western 18:95
protected void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {
Map<String, String> map = new HashMap<>();
for(Text value : values){
String[] tmp = value.toString().split(":");
map.put(tmp[0], tmp[1]);
}
map = map.entrySet().stream().sorted(Map.Entry.comparingByValue()).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (s, s2) -> s,HashMap::new));
String res = "";
for (String tmp : map.keySet()) {
res = res + tmp + " ";
}
context.write(key, new Text(res.trim()));
System.out.println("<res key==>" + key.toString() + ":" + res.trim() + ">" );

}
}

public static void main(String[] args) throws Exception{
String nameNode = "192.168.239.28:9000";
String inPath= "hdfs://"+nameNode+"/data/sql/datain/";
String outPath = "hdfs://"+nameNode+"/data/sql/dataout";
URI hdfsHost = new URI("hdfs://"+nameNode);
Job job = Job.getInstance();
Configuration cf = new Configuration();
FileSystem fs =FileSystem.get(hdfsHost,cf);
if(fs.exists(new Path(outPath))){
fs.delete(new Path(outPath), true);
}
job.setJarByClass(SQLReduce.class);
job.addCacheFile(new URI(inPath +"user.txt"));
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(MapJoinMapper.class);
job.setReducerClass(MapJoinReducer.class);
FileInputFormat.addInputPath(job,new Path(inPath + "rating.txt"));
FileOutputFormat.setOutputPath(job, new Path(outPath));
if(job.waitForCompletion(true)){
Job job2 = Job.getInstance();
job2.setJarByClass(SQLReduce.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(Text.class);
job2.setMapperClass(MapJoinMapper1.class);
job2.setReducerClass(MapJoinReducer1.class);
FileInputFormat.addInputPath(job2, new Path(outPath + "/part-r-00000"));
if (fs.exists(new Path(outPath + "c"))) {
fs.delete(new Path(outPath + "c"), true);
}
FileOutputFormat.setOutputPath(job2, new Path(outPath + "c"));
job2.waitForCompletion(true);
}
}
}

0x04 心得体会

MapReduce大大降低了并行编程的门槛(点名批评MPI),通过这三个案例,可以大概入门MapReduce的开发,但是更多的数据开发,需要大家再进行深一步的开发和探索了。注意输入和输出的类型一定要匹配