新旧MapReduce 的API对比

Hadoop 的版本0.20包含一个新的java MapReduce API,我们也称他为上下文对象(context object)。新的API在类型虽然不兼容先前的API,但是更容易扩展。

新增的API和旧的API之间的不同点:

1、  新的API倾向于使用抽象类,而不是接口,是为了更容易扩展。

例如:可以不需要修改类的实现而在抽象类中添加一个方法。在新的API中,mapper和reducer现在都是抽象类;

2、  新的API放在org.apache.hadoop.mapreduce包(和子包)中。老版本的API依然在org.apache.hadoop.mapred中。

3、  新的API充分使用上下文对象,使用户代码能与MapReduce系统通信。

例如,MapContext基本具备了JobConf、OutputCollector和Reporter的功能。

4、  新的API同时支持“推(push)”和“拉(pull)”式的迭代。这两类API,均可以将

键/值对记录推给mapper,但除此之外,新的API也允许把记录从map()方式中拉出。对reducer来说是一样的。“拉”式处理数据的好处是可以实现数据的批量处理,而非逐条记录的处理。

5、  新增的API实现了配置的统一。旧的API通过一个特殊的JobConf对象配置作业,该对象是Hadoop配置对象的一个扩展(用于配置守护进程)。在新的API中,我们丢弃这种区分,所有的配置都是通过Configuration来完成。

6、  新API中作业控制有Job类实现,而非JobClient类,新API中删除了JobClient类。

7、  输出文件的命名文件不同,map的输出文件名为part-m-nnnnn,而reduce的输出为part-r-nnnnn(其中nnnnn表示分块序号,为整数,且从0开始算)。

 

 

旧MapReduce代码:

 

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class WordCountApp {
public static final String INPUT_PATH = “hdfs://itcast225:9000/hello”;
//输出路径必须是不存在的
public static final String OUTPUT_PATH = “hdfs://itcast225:9000/output”;
/**
* 驱动代码
*/
public static void main(String[] args) throws IOException, InterruptedException,

ClassNotFoundException {

final JobConf job = new JobConf(WordCountApp.class);
//如果需要打成jar运行,需要下面这句
job.setJarByClass(WordCountApp.class);

job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);

//告诉job执行作业时的输入路径
FileInputFormat.setInputPaths(job, INPUT_PATH);
//告诉job执行作业时的输出路径
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

//指明输出的k3类型
job.setOutputKeyClass(Text.class);
//指明输出的v3类型
job.setOutputValueClass(IntWritable.class);

//让作业运行,直到运行结束,程序退出
JobClient.runJob(job);
}
/**
* KEYIN    即k1    表示每一行的起始字节偏移量
* VALUEIN    即v1    表示每一行的文本内容
* KEYOUT    即k2    表示每一行被拆分的单词
* VALUEOUT    即v2    表示每一行被拆分的单词次数
*/
static class MyMapper extends MapReduceBase implements Mapper<LongWritable, Text,

Text, IntWritable>{

@Override
public void map(LongWritable key,
Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter)
throws IOException {
System.out.println(“map的输入:<“+key.get()+”,”+value.toString()+”>”);

final String[] splited = value.toString().split(” “);
for (String word : splited) {
//key2    表示该行中的单词
final Text key2 = new Text(word);
//value2    表示单词在该行中的出现次数
final IntWritable value2 = new IntWritable(1);
//把k2、v2写入到context中
output.collect(key2, value2);
System.out.println(“map的输出:<“+key2.toString()+”,”+value2.get()+”>”);
}
}
}
/**
* KEYIN    即k2
* VALUEIN    即v2
* KEYOUT    即k3
* VALUEOUT    即v3
*/
static class MyReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text,

IntWritable>{
@Override
public void reduce(    Text key,
Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output,
Reporter reporter)
throws IOException {
System.out.println(“reduce的输入:<“+key.toString()+”,”+values.toString()+”>”);
int sum = 0;
while (values.hasNext()) {
IntWritable count = (IntWritable) values.next();
sum += count.get();
System.out.println(“reduce中for循环:”+count);
}
//到这里了,sum表示该单词key出现的总次数

//key3与key2相同
final Text key3 = key;
//value3表示单词出现的总次数
final IntWritable value3 = new IntWritable(sum);
output.collect(key3, value3);
System.out.println(“reduce的输出:<“+key3.toString()+”,”+value3.get()+”>”);
}

}
}

 

新MapReduce代码:

package mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountApp {
public static final String INPUT_PATH = “hdfs://itcast221:9000/hmbbs”;
//输出路径必须是不存在的
public static final String OUTPUT_PATH = “hdfs://itcast221:9000/output”;
/**
* 驱动代码
*/
public static void main(String[] args) throws IOException, InterruptedException,

ClassNotFoundException {
final Job job = new Job(new Configuration(), WordCountApp.class.getName());
//如果需要打成jar运行,需要下面这句
job.setJarByClass(WordCountApp.class);

job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);

job.setPartitionerClass(MyPartitoner.class);
job.setNumReduceTasks(2);

//告诉job执行作业时的输入路径
FileInputFormat.setInputPaths(job, INPUT_PATH);
//告诉job执行作业时的输出路径
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

//指明输出的k3类型
job.setOutputKeyClass(Text.class);
//指明输出的v3类型
job.setOutputValueClass(IntWritable.class);

//让作业运行,直到运行结束,程序退出
job.waitForCompletion(true);
}
static class MyPartitoner extends Partitioner<Text, IntWritable>{

@Override
public int getPartition(Text key, IntWritable value, int numPartitions) {
return key.toString().startsWith(“hello”)?0:1;
}

}

/**
* KEYIN    即k1    表示每一行的起始字节偏移量
* VALUEIN    即v1    表示每一行的文本内容
* KEYOUT    即k2    表示每一行被拆分的单词
* VALUEOUT    即v2    表示每一行被拆分的单词次数
*/
static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
/**
* key        表示每一行的起始字节偏移量
* value    表示每一行的文本内容
* context    表示上下文环境,我们的输出是需要写入到该对象中的
*/
protected void map(LongWritable key, Text value, Context context) throws

java.io.IOException ,InterruptedException {
System.out.println(“map的输入:<“+key.get()+”,”+value.toString()+”>”);

final String[] splited = value.toString().split(” “);
for (String word : splited) {
//key2    表示该行中的单词
final Text key2 = new Text(word);
//value2    表示单词在该行中的出现次数
final IntWritable value2 = new IntWritable(1);
//把k2、v2写入到context中
context.write(key2, value2);
System.out.println(“map的输出:<“+key2.toString()+”,”+value2.get()+”>”);
}
};
}
/**
* KEYIN    即k2
* VALUEIN    即v2
* KEYOUT    即k3
* VALUEOUT    即v3
*/
static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
/**
* key    即k2
* values    即v2的集合
* context    上下文对象
*/
protected void reduce(Text key, java.lang.Iterable<IntWritable> values, Context context)

throws java.io.IOException ,InterruptedException {
System.out.println(“reduce的输入:<“+key.toString()+”,”+values.toString()+”>”);
int sum = 0;
for (IntWritable count : values) {
sum += count.get();
System.out.println(“reduce中for循环:”+count);
}
//到这里了,sum表示该单词key出现的总次数

//key3与key2相同
final Text key3 = key;
//value3表示单词出现的总次数
final IntWritable value3 = new IntWritable(sum);

context.write(key3, value3);
System.out.println(“reduce的输出:<“+key3.toString()+”,”+value3.get()+”>”);
};
}
}

代码上的区别,新map,reduce方法不用继承MapReduceBase类,传入的参数由原来的4个变成现在的3个。去掉

OutputCollector<Text, IntWritable> output, Reporter reporter

由一个新的类 Context封装了上述信息。

新的Job类对象,由Configuration创建的,也封装了JobClient的方法。

Leave a Reply

Your email address will not be published. Required fields are marked *