Notes on how to get through the MapReduce course.

What is MapReduce

MapReduce is a programming framework for distributed computation programs, and the core framework for developing "Hadoop-based data analysis applications".

The core job of MapReduce is to combine the business-logic code written by the user with the framework's built-in default components into one complete distributed computation program that runs concurrently on a Hadoop cluster.

MapReduce programming conventions

A user-written program is divided into three parts: the Mapper, the Reducer, and the Driver.

1. Mapper phase

(1) A user-defined Mapper must extend the framework's Mapper parent class

(2) The Mapper's input data comes as KV pairs (the K and V types are customizable)

(3) The Mapper's business logic goes in the map() method

(4) The Mapper's output data is also KV pairs (the K and V types are customizable)

(5) The map() method (running in the MapTask process) is called once for every <K,V> pair

2. Reducer phase

(1) A user-defined Reducer must extend the framework's Reducer parent class

(2) The Reducer's input data types match the Mapper's output data types, again KV pairs

(3) The Reducer's business logic goes in the reduce() method

(4) The ReduceTask process calls reduce() once per group of <k,v> pairs that share the same k

3. Driver phase

Acts as the client of the YARN cluster: it submits the whole program to YARN as a Job object that encapsulates the MapReduce program's run-time parameters.

How to write it

A MapReduce program is generally split into three classes:

Map class: splits the raw data into k,v pairs

Reduce class: processes the k,v pairs produced by the Map class

Driver class: sets the job's parameters and submits the job

Map class

package mapreduce.car;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class CountCarMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final Text k = new Text();
    private final IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1 Read one line
        String line = value.toString();
        // 2 Split the line on tabs
        String[] words = line.split("\t");
        // 3 Emit <column 11, 1>
        k.set(words[10]);
        context.write(k, v);
    }
}


Reduce class

package mapreduce.car;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class CountCarReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    private int sum;
    private final IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Count how many values arrived for this key
        sum = 0;
        for (IntWritable i : values) {
            sum++;
        }
        v.set(sum);
        context.write(key, v);
    }
}


Driver class

package mapreduce.car;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class CountCarDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1 Get the configuration and the Job object
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2 Set the jar that contains this Driver
        job.setJarByClass(CountCarDriver.class);
        // 3 Set the Mapper and Reducer classes
        job.setMapperClass(CountCarMapper.class);
        job.setReducerClass(CountCarReduce.class);
        // 4 Set the Mapper output kv types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5 Set the final output kv types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6 Set the input and output paths (the output directory must not already exist)
        FileInputFormat.setInputPaths(job, new Path("F:\\桌面\\hadoop\\汽车车\\newcarSales.txt"));
        FileOutputFormat.setOutputPath(job, new Path("F:\\桌面\\hadoop\\汽车车\\out"));
        // 7 Submit the job and wait for it to finish
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
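
When the same job is packaged as a jar and submitted to a YARN cluster instead of being run locally, the input and output paths are usually taken from the command-line arguments rather than hard-coded. A minimal sketch of that variant, assuming the class name CountCarClusterDriver and the usual args[0]/args[1] convention (both are just for illustration, not part of the course code):

package mapreduce.car;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Illustrative variant of CountCarDriver that reads paths from args,
// which is the usual pattern when submitting with "hadoop jar" to a YARN cluster.
public class CountCarClusterDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(CountCarClusterDriver.class);
        job.setMapperClass(CountCarMapper.class);
        job.setReducerClass(CountCarReduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));   // input file or directory, e.g. on HDFS
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output directory, must not exist yet
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}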

Custom K,V types

Create a custom class, then use that class in the Mapper, Reducer, and Driver. The FlowBean below is one example; a short usage sketch follows it.

package com.mapreduce.been;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// 1 Implement the Writable interface
public class FlowBean implements Writable {
    private long upFlow;   // upstream traffic
    private long downFlow; // downstream traffic
    private long sumFlow;  // total traffic

    // 2 Provide a no-argument constructor (needed for deserialization)
    public FlowBean() {
    }

    // 3 Provide getters and setters for the three fields
    public long getUpFlow() {
        return upFlow;
    }
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }
    public long getDownFlow() {
        return downFlow;
    }
    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }
    public long getSumFlow() {
        return sumFlow;
    }
    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }
    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    // 4 Implement serialization and deserialization; the field order must be identical in both
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    }

    // 5 Override toString() (controls how the bean is written to the output file)
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
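
Once the bean exists, it is used wherever the job's KV types are declared. A minimal usage sketch, assuming a hypothetical FlowMapper whose tab-separated input lines carry a phone number plus up-flow and down-flow columns (the class name and column positions are assumptions, not part of the original example):

package com.mapreduce.been;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// Hypothetical Mapper that emits <phone number, FlowBean>
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    private final Text outK = new Text();
    private final FlowBean outV = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        outK.set(fields[1]);                                          // assumed: phone number column
        outV.setUpFlow(Long.parseLong(fields[fields.length - 3]));    // assumed: up-flow column
        outV.setDownFlow(Long.parseLong(fields[fields.length - 2]));  // assumed: down-flow column
        outV.setSumFlow();
        context.write(outK, outV);
    }
}

In the Driver, the value types then point at the bean as well, e.g. job.setMapOutputValueClass(FlowBean.class) and job.setOutputValueClass(FlowBean.class).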

Custom output

You can override the cleanup() method in your own Reducer to customize the final output; cleanup() runs once per ReduceTask, after all reduce() calls have finished.

protected void cleanup(
        org.apache.hadoop.mapreduce.Reducer<Text, LongWritable, Text, DoubleWritable>.Context context)
        throws java.io.IOException, InterruptedException {
    // maps and all are fields accumulated in reduce(): per-key counts and the grand total
    Set<String> keySet = maps.keySet();

    for (String str : keySet) {
        long value = maps.get(str);

        // cast to double, otherwise long division would truncate the percentage to 0
        double percent = (double) value / all;

        context.write(new Text(str), new DoubleWritable(percent));
    }
}
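
For the cleanup() above to work, the Reducer must have filled in the maps and all fields while reduce() was running. A minimal sketch of how they might be accumulated, assuming <Text, LongWritable> input and the illustrative class name PercentReducer (only the field names maps and all come from the snippet above):

package mapreduce.car;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Illustrative Reducer: reduce() accumulates per-key totals into maps and the grand total all;
// cleanup() (shown above) then writes each key's share of the grand total.
public class PercentReducer extends Reducer<Text, LongWritable, Text, DoubleWritable> {
    private final Map<String, Long> maps = new HashMap<>();
    private long all = 0;

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long count = 0;
        for (LongWritable v : values) {
            count += v.get();
        }
        maps.put(key.toString(), count);
        all += count;
    }
}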

Points to note

  1. Java's == compares whether two references point to the same memory address, while equals() compares object contents; in general, use equals(), as in the example below.
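
A tiny illustration (the class name EqualsDemo is just for this note):

public class EqualsDemo {
    public static void main(String[] args) {
        // == compares references (memory addresses), equals() compares contents
        String a = new String("hadoop");
        String b = new String("hadoop");
        System.out.println(a == b);      // false: a and b are different objects
        System.out.println(a.equals(b)); // true: their contents are equal
    }
}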