[Hadoop] MapReduce Case Study: Friend Recommendation
I. Preliminary Preparation
Refer to the preliminary preparation stage of the "Word Count" case study.
II. Data Preparation
Generate the friend-relationship data and upload it to HDFS (a small upload sketch follows the generator code below).
package com.hdtrain;

import java.util.HashMap;
import java.util.Map;

public class HelloFriend {
    public static void main(String[] args) {
        Map<Integer, String> friendMap = new HashMap<>();
        // users 100-200 each receive four pseudo-random friends with ids in the 220-300 range
        for (int i = 100; i <= 200; i++) {
            friendMap.put(i, "");
            for (int j = 1; j <= 4; j++) {
                int friend = (int) (Math.random() * 21 + 200 + 20 * j);
                if (!friendMap.containsKey(friend)) {
                    friendMap.put(friend, "");
                }
                // record the friendship in both directions
                friendMap.put(i, friendMap.get(i) + " " + friend);
                friendMap.put(friend, friendMap.get(friend) + " " + i);
            }
        }
        // print each user and its friend list once, after all relationships have been built
        for (Integer key : friendMap.keySet()) {
            System.out.println(key + "----" + friendMap.get(key));
        }
    }
}
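The generator only writes the relationships to standard output; to feed them to the job they first need to land in HDFS at /data/friend.txt, the path FriendJob reads from. Below is a minimal sketch, assuming the printed lines have been saved to a local file called friend.txt (the local file name and the helper class name are only illustrative):

package com.hdtrain;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UploadFriendData {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration(true);
        FileSystem fs = FileSystem.get(configuration);
        fs.mkdirs(new Path("/data"));
        // copy the locally generated friend list into the location FriendJob expects
        fs.copyFromLocalFile(new Path("friend.txt"), new Path("/data/friend.txt"));
        fs.close();
    }
}

Equivalently, hdfs dfs -mkdir -p /data followed by hdfs dfs -put friend.txt /data/ does the same from the shell.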
III. Friend Recommendation Case
1. FriendJob.class
package com.hdtrain;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FriendJob {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration(true);
        // run with the local job runner instead of submitting to YARN
        configuration.set("mapreduce.framework.name", "local");
        Job job = Job.getInstance(configuration);
        job.setJobName("Friend--" + System.currentTimeMillis());
        job.setJarByClass(FriendJob.class);
        job.setNumReduceTasks(2);
        // input: the friend list uploaded earlier; output: a fresh, timestamped results directory
        FileInputFormat.setInputPaths(job, new Path("/data/friend.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/results/Friend-" + System.currentTimeMillis()));
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // declare the reducer's output types as well
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        job.setMapperClass(FriendMapper.class);
        job.setReducerClass(FriendReducer.class);
        job.waitForCompletion(true);
    }
}
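Because the job uses two reduce tasks, the shuffled keys are split across two reducers by Hadoop's default HashPartitioner, so the results end up in two part files. A small sketch (the sample key and class name are hypothetical) showing how a pair key is assigned to one of the two partitions:

package com.hdtrain;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class PartitionDemo {
    public static void main(String[] args) {
        HashPartitioner<Text, IntWritable> partitioner = new HashPartitioner<>();
        // a hypothetical pair key produced by FriendMapper
        Text key = new Text("223 245");
        // with numReduceTasks = 2 this prints 0 or 1: the reduce task that will receive the key
        System.out.println(partitioner.getPartition(key, new IntWritable(1), 2));
    }
}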
2. FriendMapper.class
package com.hdtrain;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FriendMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // a line looks like "100---- 223 245 267 289": drop the separator and split on spaces
        String[] friends = value.toString().replaceAll("----", "").split(" ");
        if (friends.length > 1) {
            String person = friends[0];
            for (int i = 1; i < friends.length; i++) {
                // direct friendship: mark with -1 so the reducer can exclude the pair
                context.write(new Text(friendorder(person, friends[i])), new IntWritable(-1));
                for (int j = i + 1; j < friends.length; j++) {
                    // two friends of the same person: a potential recommendation, weight 1
                    context.write(new Text(friendorder(friends[i], friends[j])), new IntWritable(1));
                }
            }
        }
    }

    // order the two user ids so that (A,B) and (B,A) produce the same key
    private String friendorder(String personA, String personB) {
        return personA.compareTo(personB) > 0 ? (personB + " " + personA) : (personA + " " + personB);
    }
}
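To see what the mapper emits, here is a standalone sketch (the input line and the class name are hypothetical) that reproduces the same pair-generation logic outside of Hadoop and prints each key with its value:

package com.hdtrain;

public class FriendMapperDemo {
    public static void main(String[] args) {
        // hypothetical input record in the same format as friend.txt
        String line = "100---- 223 245 267";
        String[] friends = line.replaceAll("----", "").split(" ");
        String person = friends[0];
        for (int i = 1; i < friends.length; i++) {
            // direct friendship: value -1
            System.out.println(order(person, friends[i]) + "\t-1");
            for (int j = i + 1; j < friends.length; j++) {
                // two friends of the same person: candidate recommendation, value 1
                System.out.println(order(friends[i], friends[j]) + "\t1");
            }
        }
    }

    private static String order(String a, String b) {
        return a.compareTo(b) > 0 ? (b + " " + a) : (a + " " + b);
    }
}

For this line it prints the direct pairs 100 223, 100 245, 100 267 with -1 and the candidate pairs 223 245, 223 267, 245 267 with 1, which are exactly the keys and values FriendMapper would write to the context.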
3. FriendReducer.class
package com.hdtrain;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

public class FriendReducer extends Reducer<Text, IntWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        Iterator<IntWritable> iterator = values.iterator();
        while (iterator.hasNext()) {
            int value = iterator.next().get();
            if (value == -1) {
                // the two users are already direct friends, so no recommendation is emitted
                return;
            } else {
                count += value;
            }
        }
        // count is the number of common friends shared by the pair of users in the key
        context.write(key, new LongWritable(count));
    }
}
4. Computation Results
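With two reduce tasks the job writes its results to part-r-00000 and part-r-00001 inside the timestamped output directory; each line is a pair of user ids followed by the number of friends they have in common. A minimal sketch for dumping those files (the class name is made up, and the output directory is passed as an argument because FriendJob names it with System.currentTimeMillis()):

package com.hdtrain;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class PrintFriendResults {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration(true);
        FileSystem fs = FileSystem.get(configuration);
        // the directory created by FriendJob, e.g. /results/Friend-<timestamp>
        Path outputDir = new Path(args[0]);
        for (FileStatus status : fs.globStatus(new Path(outputDir, "part-r-*"))) {
            IOUtils.copyBytes(fs.open(status.getPath()), System.out, configuration, false);
        }
        fs.close();
    }
}

The same files can also be inspected from the command line with hdfs dfs -cat.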