MapReduce经典案例实战
MapReduce经典案例实战
实验实现过程
重要知识点:
- MapReduce是一种分布式并行编程模型,是Hadoop核心子项目之一,如果已经安装了Hadoop,就不需要另外安装MapReduce。
- 主要的理论知识点:
(1)倒排索引
倒排索引是文档检索系统中最常用的数据结构,被广泛应用于全文搜索引擎。倒排索引主要用来存储某个单词(或词组)在一组文档中的存储位置的映射,提供了可以根据内容来查找文档的方式,而不是根据文档来确定内容,因此称为倒排索引(Inverted Index)。带有倒排索引的文件我们称为倒排索引文件,简称倒排文件(Inverted File)。
(2)数据去重
数据去重主要是为了掌握利用并行化思想来对数据进行有意义的筛选,数据去重指去除重复数据的操作。在大数据开发中,统计大数据集上的多种数据指标,这些复杂的任务数据都会涉及数据去重。
(3)TopN排序
TopN分析法是指从研究对象中按照某一个指标进行倒序或正序排列,取其中所需的N个数据,并对这N个数据进行重点分析的方法。 - MapReduce的程序可以用Eclipse编译运行或使用命令行编译打包运行,本实验使用Eclipse编译运行MapReduce程序。
实验准备工作:
1、掌握Eclipse以及插件的安装与配置
2、熟悉在Eclipse中操作分布式系统HDFS 中的文件的方法
以上内容前面文章已做:
大数据实验环境准备与配置(1/4)
https://blog.csdn.net/weixin_43640161/article/details/108614907
大数据实验环境准备与配置(2/4)
https://blog.csdn.net/weixin_43640161/article/details/108619802
大数据实验环境准备与配置(3/4)
https://blog.csdn.net/weixin_43640161/article/details/108691921
大数据实验环境准备与配置(第四部分完结)
https://blog.csdn.net/weixin_43640161/article/details/108697510
Hadoop环境配置与测试
https://blog.csdn.net/weixin_43640161/article/details/108745864
分布式文件系统HDFS Shell命令和API编程
https://blog.csdn.net/weixin_43640161/article/details/108879567
MapReduce编程实践
https://blog.csdn.net/weixin_43640161/article/details/108947291
实验一:倒排索引案例实现–请根据理论课案例分析步骤实现具体的倒排索引
假设有file1.txt,file2.txt,file3.txt。它们的内容分别如下:
file1.txt文件内容:MapReduce is simple
file2.txt文件内容:MapReduce is powerful is simple
file3.txt文件内容:Hello MapReduce bye MapReduce
具体步骤如下:
一、在 Eclipse 中创建 MapReduce 项目
点击 File 菜单,选择 New -> Project…:选择 Map/Reduce Project,点击 Next。
填写 Project name 为MapReduceDemo即可,点击 Finish 就创建好了项目。
此时在左侧的 Project Explorer 就能看到刚才建立的项目了。接着右键点击刚创建的 MapReduce项目src,选择 New -> packet,在 Package 处填写 cn.com.sise.mapreduce.invertedindex;
二、Map阶段实现
在cn.com.sise.mapreduce.invertindex包下新建自定义类Mapper类InvertedIndexMapper,该类继承Mapper类,如下图所示。
该类的作用:将文本中的单词按照空格进行切割,并以冒号拼接,“单词:文档名称
”作为key,单词次数作为value,都以文本方式传输至Combine阶段。
参考代码如下:
package cn.com.sise.mapreduce.invertedindex;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class InvertedIndexMapper extends Mapper<LongWritable, Text, Text,Text> {
private static Text keyInfo = new Text();//存储单词和URL组合
private static final Text valueInfo = new Text("1");//存储词频,初始化为1
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException,InterruptedException{
String line = value.toString();
String[] fields = StringUtils.split(line, " ");//得到字段数组
FileSplit fileSplit = (FileSplit) context.getInputSplit();//得到这行数据所在的文件切片
String fileName = fileSplit.getPath().getName();//根据文件切片得到文件名
for (String field : fields){
//key值由单词和URL组成,如"MapReduce:file1"
keyInfo.set(field +":" + fileName);
context.write(keyInfo, valueInfo);
}
}
}
三、Combine阶段实现
根据Map阶段的输出结果形式,在cn.com.sise.mapreduce.invertindex包下,自定义实现Combine阶段的类InvertedIndexCombiner,该类继承Reducer类,对每个文档的单词进行词频统计,如下图所示。
该类作用:对Map阶段的单词次数聚合处理,并重新设置key值为单词,value值由文档名称和词频组成。
参考代码:
package cn.com.sise.mapreduce.invertedindex;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text>{
private static Text info = new Text();
//输入: <MapReduce:file3 {1,1..>
//输出: <MapReduce file3:2>
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException,InterruptedException{
int sum=0;//统计词频
for (Text value : values) {
sum += Integer.parseInt(value.toString());
}
int splitIndex = key.toString().indexOf(":");//重新设置value值由URL和词频组成
info.set(key.toString().substring(splitIndex +1) +":" + sum);
//重新设置key值为单词
key.set(key.toString().substring(0, splitIndex));
context.write(key, info);
}
}
四、Reduce阶段实现
根据Combine阶段的输出结果形式,同样在cn.com.sise.mapreduce.invertindex包下,自定义实现Reducer类InvertedIndexReducer,该类继承Reducer,同上步,略。
该类作用:接收Combine阶段输出的数据,按照最终案例倒排索引文件需求的样式,将单词作为key,多个文档名称和词频连接作为value,输出到目标目录。
参考代码:
package cn.com.sise.mapreduce.invertedindex;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {
private static Text result = new Text();
//输入: <MapReduce file3:2>
//输出: <MapReduce file1:1;file2:1;file3:2;>
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
//生成文档列表
String fileList = new String();
for (Text value : values) {
fileList += value.toString() +";";
}
result.set(fileList);
context.write(key, result);
}
}
五、Runner程序主类实现
在同一个包下编写MapReduce程序运行主类InvertedIndexDriver。
该类作用:设置MapReduce工作任务的相关参数,本来采用集群运行模式。因此,需要设置远程HDFS系统源文件目录(hdfs://localhost:9000/user/hadoop/inputdata)以及结果输出目录(hdfs://localhost:9000/user/hadoop/outputdata),设置完毕,运行主程序即可。
参考代码:
package cn.com.sise.mapreduce.invertedindex;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input. FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.yarn.service.api.records.Configuration;
public class InvertedIndexDriver {
/**
* @param args
* @throws InterruptedException
* @throws IOException
* @throws ClassNotFoundException
*/
public static void main(String[] args) throws ClassNotFoundException,IOException,InterruptedException{
Configuration conf = new Configuration();
Job job = Job.getInstance();
job.setJarByClass(InvertedIndexDriver.class);
job.setMapperClass(InvertedIndexMapper.class);
job.setCombinerClass(InvertedIndexCombiner.class);
job.setReducerClass(InvertedIndexReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:9000/user/hadoop/inputdata"));
//指定处理完成之后的结果所保存的位置
FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/user/hadoop/outputdata"));
//向yarn集群提交这个job
boolean res =job.waitForCompletion(true);
System.exit(res? 0: 1);
}
}
六、数据准备
(1)在linux本地目录下(/home/hadoop/workspace/MapReduceDemo)新建源文件file1.txt,file2.txt,file3.txt。
file1.txt文件内容:MapReduce is simple
file2.txt文件内容:MapReduce is powerful is simple
file3.txt文件内容:Hello MapReduce bye MapReduce
参考命令代码如下:
pwd
cd workspace/MapReduceDemo/
sudo vim file1.txt
sudo vim file2.txt
sudo vim file3.txt
ls
(2)启动hadoop(start-all.sh),在hdfs分布式文件系统目录(/user/hadoop/)中新建inputdata目录,然后将步骤(1)中的3个文件上传到该目录下,参考命令代码如下:
启动Hadoop:
操作完成后记得鼠标右键空白处刷新
把刚刚创建的三个txt文件上传到inputdata目录
七、运行并查看结果(2种方式)
(1)shell命令
终端命令:
hdfs dfs -ls /user/hadoop/outputdata
hdfs dfs -cat /user/hadoop/outputdata/part-r-00000
(2)Eclipse IDE
实验二:数据去重案例实现
假设有数据文件file4.txt和file5.txt,内容分别如下,编程实现2个文件内容去重:
file4.txt内容:
2020-9-1 a
2020-9-2 b
2020-9-3 c
2020-9-4 d
2020-9-5 a
2020-9-6 b
2020-9-7 c
2020-9-3 c
file5.txt内容:
2020-9-1 b
2020-9-2 a
2020-9-3 b
2020-9-4 d
2020-9-5 a
2020-9-6 c
2020-9-7 d
2020-9-3 c
1.在MapReduceDemo项目下新建包cn.com.sise.mapreduce.dedup,思路请参考实验一。
2.Map阶段实现
在包cn.com.sise.mapreduce.dedup下自定义类DedupMapper,该类继承Mapper。
该类作用:读取数据集文件将TextInputFormat默认组件解析的类似<0,2020-9-1 a>键值对修改为<2020-9-1 a,null>
参考代码:
package cn.com.sise.mapreduce.dedup;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class DedupMapper extends Mapper<LongWritable, Text, Text, NullWritable>
{ private static Text field = new Text();
//<0,2020-9-3 c><11,2020-9-4 d>@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
field = value;
context.write(field, NullWritable.get());
}
// <2020-9-3 c.null> <2020-9-4 d.null>
}
3.Reduce阶段实现
在相同包下自定义DedupReducer类,该类继承Reducer。
该类作用:仅接受Map阶段传递过来的数据,根据Shuffle工作原理,键值key相同的数据就会被合并,因此输出的数据就不会出现重复数据了。
参考代码:
package cn.com.sise.mapreduce.dedup;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class DedupReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
//<2020-9-3 c.null> <2020-9-4 d.null><2020-9-4 d.null>
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context)
throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
- Runner主程序实现
编写MapReduce程序运行主类DedupRunner。
该类作用:设置MapReduce工作任务的相关参数,本案例采用集群运行模式。因此,需要设置远程HDFS系统源文件目录(hdfs://localhost:9000/user/hadoop/inputdata1)以及结果输出目录(hdfs://localhost:9000/user/hadoop/outputdata1),设置完毕,运行主程序即可。
参考代码:
package cn.com.sise.mapreduce.dedup;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.yarn.service.api.records.Configuration;
public class DedupRunner {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{
Configuration conf = new Configuration();
Job job =Job.getInstance();
job.setJarByClass(DedupRunner.class);
job.setMapperClass(DedupMapper.class);
job.setReducerClass(DedupReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:9000/user/hadoop/inputdata1"));
//指定处理完成之后的结果所保存的位置
FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/user/hadoop/outputdata1"));
job.waitForCompletion(true);
}
}
- 数据准备(思路参考实验一)
在linux本地目录下(/home/hadoop/workspace/MapReduceDemo)新建源文件file4.txt,file5.txt(内容已给),然后在hdfs分布式文件系统目录(/user/hadoop)中新建inputdata1目录,将本地2个文件上传到该目录下,参考命令代码如下:
终端命令:
cd workspace/MapReduceDemo/
sudo vim file4.txt
sudo vim file5.txt
6.运行并查看结果(2种方式,请参考实验一)
鼠标右键刷新hadoop
(1)shell命令
终端命令:
hdfs dfs -ls /user/hadoop/outputdata1
hdfs dfs -cat /user/hadoop/outputdata/part-r-00000
(2)Eclipse IDE
实验三:TopN排序案例实现
假设有数据文件num.txt,文件内容如下:
10 3 8 7 6 5 1 2 9 4
11 12 17 14 15 20
19 16 18 13
要求使用MapReduce技术提取上述文本中最大的5个数据,并将最终结果汇总到一个文件中。
1.在MapReduceDemo项目下新建包cn.com.sise.mapreduce.topn,思路请参考实验一和二。
2.Map阶段实现
自定义TopNMapper类,继承Mapper。
该类作用:先将文件中的每行数据进行切割提取,并把数据保存到TreeMap中,判断TreeMap是否大于5,如果大于5就需要移除最小的数据。由于数据是逐行读取,如果这时就向外写数据,那么TreeMap就保存了每一行的最大5个数,因此需要在cleanup()方法中编写context.write()方法,这样就保证了当前MapTask中TreeMap保存了当前文件最大的5条数据后,再输出到Reduce阶段。
参考代码:
package cn.com.sise.mapreduce.topn;
import java.util.TreeMap;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class TopNMapper extends Mapper<LongWritable, Text, NullWritable, IntWritable> {
private TreeMap<Integer, String> repToRecordMap = new TreeMap<Integer, String>();
//<0,10 387651294//<xx.11 12 17 14 15 20>
@Override
public void map(LongWritable key, Text value, Context context) {
String line = value.toString();
String[] nums = line.split("");
for (String num : nums) {
repToRecordMap.put(Integer.parseInt(num), " ");
if (repToRecordMap.size() >5) {
repToRecordMap.remove(repToRecordMap.firstKey());
}
}
}
@Override
protected void cleanup(Context context) {
for (Integer i : repToRecordMap.keySet()){
try {
context.write(NullWritable.get(), new IntWritable(i));
}catch (Exception e){
e.printStackTrace();
}
}
}
}
- Reduce阶段
在同一个包下自定义TopNReducer类,该类继承Reducer。
该类作用:首先TreeMap自定义排序规则,当需求取最大值时,只需要在compare()方法中返回正数即可满足倒序排序,reduce()方法依然要满足时刻判断TreeMap中存放数据是前5个数,并最终遍历输出最大的5个数。
参考代码:
package cn.com.sise.mapreduce.topn;
import java.io.IOException;
import java.util.Comparator;
import java.util.TreeMap;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
public class TopNReducer extends Reducer<NullWritable, IntWritable, NullWritable, IntWritable> {
private TreeMap<Integer,String>repToRecordMap = new TreeMap<Integer,String>
(new Comparator<Integer>(){
//返回一个基本类型的整型,谁大谁排后面.
//返回负数表示:01小于02
//返回0表示:表示: 01和02相等
//返回正数表示: 01大于02。
public int compare(Integer a, Integer b) {
return b-a;
}
});
public void reduce(NullWritable key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
for (IntWritable value : values) {
repToRecordMap.put(value.get(),"");
if (repToRecordMap.size()> 5) {
repToRecordMap.remove(repToRecordMap.firstKey());
}
}
for (Integer i : repToRecordMap.keySet()) {
context.write(NullWritable.get(), new IntWritable(i));
}
}
}
- Runner程序主类实现
在linux本地目录下(/home/hadoop/workspace/MapReduceDemo)新建源文件num.txt(内容已给),然后在hdfs分布式文件系统目录(/user/hadoop)中新建inputdata2目录,将本地2个文件上传到该目录下,参考命令代码如下:
参考代码:
package cn.com.sise.mapreduce.topn;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.yarn.service.api.records.Configuration;
public class TopNRunner {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance();
job.setJarByClass(TopNRunner.class);
job.setMapperClass(TopNMapper.class);
job.setReducerClass(TopNReducer.class);
job.setNumReduceTasks(1);
job.setMapOutputKeyClass(NullWritable.class);// map阶段的输出的key
job.setMapOutputValueClass(IntWritable.class);// map阶段的输出的value
job.setOutputKeyClass(NullWritable.class);// reduce阶段的输出的key
job.setOutputValueClass(IntWritable.class);// reduce阶段的输出的value.
FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:9000/user/hadoop/inputdata2"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/user/hadoop/outputdata2"));
boolean res =job.waitForCompletion(true);
System.exit(res? 0: 1);
}
}
- 运行并查看结果
** 到了这一步,本次实验就完成了,你今天学会了吗?**