Notes on how to get through the MapReduce course
What is MapReduce
MapReduce is a programming framework for distributed computation and the core framework for building "Hadoop-based data analysis applications".
Its core function is to combine the business logic code you write with its built-in default components into a complete distributed program that runs concurrently on a Hadoop cluster.
MapReduce programming conventions
A user program is split into three parts: a Mapper, a Reducer, and a Driver.
1. Mapper phase
(1) A user-defined Mapper must extend the framework's Mapper parent class
(2) The Mapper's input data comes as KV pairs (the KV types can be customized)
(3) The Mapper's business logic goes in the map() method
(4) The Mapper's output data is also KV pairs (the KV types can be customized)
(5) The map() method (run by the MapTask process) is called once for each <K,V> pair
2. Reducer phase
(1) A user-defined Reducer must extend the framework's Reducer parent class
(2) The Reducer's input data types match the Mapper's output data types, and are also KV pairs
(3) The Reducer's business logic goes in the reduce() method
(4) The ReduceTask process calls reduce() once for each group of <k,v> pairs that share the same k
3. Driver phase
Acts as the client of the YARN cluster: it submits the whole program to YARN as a Job object that wraps the MapReduce program's runtime parameters.
Concrete code
A MapReduce program is usually split into three classes:
map class: splits the raw data into k,v pairs
reduce class: processes the k,v pairs produced by the map class
driver class: sets the job's parameters and submits the job
Map class
package mapreduce.car;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class CountCarMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Reuse the output key/value objects instead of allocating new ones per record
    private final Text k = new Text();
    private final IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input value is one tab-separated line; index 10 (the 11th field) is the field we count by
        String line = value.toString();
        String[] words = line.split("\t");
        k.set(words[10]);
        context.write(k, v);
    }
}
Reduce class
package mapreduce.car;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class CountCarReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    private int sum;
    private final IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Count how many records share this key (each value is a 1 emitted by the Mapper)
        sum = 0;
        for (IntWritable i : values) {
            sum++;
        }
        v.set(sum);
        context.write(key, v);
    }
}
Driver class
package mapreduce.car;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class CountCarDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. Get the job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Set the jar, mapper and reducer classes
        job.setJarByClass(CountCarDriver.class);
        job.setMapperClass(CountCarMapper.class);
        job.setReducerClass(CountCarReduce.class);

        // 3. Set the map output and final output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 4. Set the input file and output directory (the output directory must not already exist)
        FileInputFormat.setInputPaths(job, new Path("F:\\桌面\\hadoop\\汽车车\\newcarSales.txt"));
        FileOutputFormat.setOutputPath(job, new Path("F:\\桌面\\hadoop\\汽车车\\out"));

        // 5. Submit the job and wait for it to finish
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
Custom K,V pair types
Create a custom class, then use it in the Map, Reduce, and Driver classes (a usage sketch follows the FlowBean code below).
package com.mapreduce.been;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // Hadoop needs an empty constructor so it can create instances by reflection
    public FlowBean() {
    }

    public long getUpFlow() { return upFlow; }
    public void setUpFlow(long upFlow) { this.upFlow = upFlow; }

    public long getDownFlow() { return downFlow; }
    public void setDownFlow(long downFlow) { this.downFlow = downFlow; }

    public long getSumFlow() { return sumFlow; }
    public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; }

    public void setSumFlow() { this.sumFlow = this.upFlow + this.downFlow; }

    // Serialization: write the fields in a fixed order
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    // Deserialization: read the fields in exactly the same order they were written
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
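As a rough sketch of how the custom type is used (the FlowMapper class name, the input layout, and the column indexes here are assumptions, not taken from the course material), a Mapper could emit FlowBean as its value type:

package com.mapreduce.been;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// Hypothetical Mapper that emits a FlowBean value keyed by phone number;
// it assumes the phone number is in column 1 and upFlow/downFlow are the
// third- and second-to-last tab-separated columns.
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    private final Text outK = new Text();
    private final FlowBean outV = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        outK.set(fields[1]);
        outV.setUpFlow(Long.parseLong(fields[fields.length - 3]));
        outV.setDownFlow(Long.parseLong(fields[fields.length - 2]));
        outV.setSumFlow();
        context.write(outK, outV);
    }
}

In the Driver, the map output value class would then be set with job.setMapOutputValueClass(FlowBean.class), and job.setOutputValueClass(FlowBean.class) if the Reducer also emits FlowBean.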
Custom output
You can add a cleanup() method to your own Reducer to customize the output.
protected void cleanup(
        org.apache.hadoop.mapreduce.Reducer<Text, LongWritable, Text, DoubleWritable>.Context context)
        throws java.io.IOException, InterruptedException {
    // maps holds the per-key counts and all holds the grand total, both accumulated in reduce()
    Set<String> keySet = maps.keySet();
    for (String str : keySet) {
        long value = maps.get(str);
        // cast to double so the division is not truncated to an integer
        double percent = (double) value / all;
        context.write(new Text(str), new DoubleWritable(percent));
    }
}
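The snippet above assumes two fields, maps and all, that are filled in during reduce(). A minimal sketch of a complete Reducer along those lines (the class name and the accumulation logic are assumptions, not from the course material) could look like this:

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical Reducer: accumulate per-key totals in reduce(), then write each key's
// share of the grand total in cleanup(), which runs once after all reduce() calls.
public class PercentReducer extends Reducer<Text, LongWritable, Text, DoubleWritable> {

    private final Map<String, Long> maps = new HashMap<>();
    private long all = 0;

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable v : values) {
            sum += v.get();
        }
        maps.put(key.toString(), sum);
        all += sum;
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        Set<String> keySet = maps.keySet();
        for (String str : keySet) {
            long value = maps.get(str);
            double percent = (double) value / all;
            context.write(new Text(str), new DoubleWritable(percent));
        }
    }
}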
Notes
- In Java, == compares whether two references point to the same memory address, while equals() compares object contents; equals() is usually what you want.
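A quick self-contained illustration (the class name is just for this example):

public class EqualsDemo {
    public static void main(String[] args) {
        String a = new String("car");
        String b = new String("car");
        System.out.println(a == b);      // false: two distinct objects, so the addresses differ
        System.out.println(a.equals(b)); // true: the character contents are the same
    }
}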