Hadoop中的NLineInputFormat

http://blog.csdn.net/lzm1340458776/article/details/42747443

一:背景

NLineInputFormat也是FileInputFormat的子类,它是根据行数来划分InputSplit的,而不是像TextInputFormat那样依赖分片大小和行的长度。也就是说,TextInputFormat当一行很长或分片很小时,获取的分片很可能只包含很少的K-V对,这样一个MapTask处理的K-V对就很少,这是不太理想的。因此我们可以使用NLineInputFormat来控制一个MapTask处理的K-V对,这是通过分割InputSplit时按行数分割的方法来实现的,关键是通过mapreduce.input.lineinputformat.linespermap来设置这个行数。

二:技术实现

代码如下:

 

/**
 * TextInputFormat处理的数据来自一个InputSplit,InputSplit是根据大小划分的。
 * NLineInputFormat可以决定每个Mapper处理的记录数是相同的。
 * @author 廖钟民
 * time : 2015年1月15日下午8:40:43
 * @version
 */
public class MyNLineInputFormat {
	
	// 定义输入路径
		private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/hello";
		// 定义输出路径
		private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";

		public static void main(String[] args) {

			try {
				// 创建配置信息
				Configuration conf = new Configuration();
				//设置每个Map可以是处理多少条记录
				conf.setInt("mapreduce.input.lineinputformat.linespermap", 2);
				/**********************************************/
				//对Map端输出进行压缩
				//conf.setBoolean("mapred.compress.map.output", true);
				//设置map端输出使用的压缩类
				//conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);
				//对reduce端输出进行压缩
				//conf.setBoolean("mapred.output.compress", true);
				//设置reduce端输出使用的压缩类
				//conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);
				// 添加配置文件(我们可以在编程的时候动态配置信息,而不需要手动去改变集群)
				/*
				 * conf.addResource("classpath://hadoop/core-site.xml"); 
				 * conf.addResource("classpath://hadoop/hdfs-site.xml");
				 * conf.addResource("classpath://hadoop/hdfs-site.xml");
				 */

				// 创建文件系统
				FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
				// 如果输出目录存在,我们就删除
				if (fileSystem.exists(new Path(OUT_PATH))) {
					fileSystem.delete(new Path(OUT_PATH), true);
				}

				// 创建任务
				Job job = new Job(conf, WordCountTest.class.getName());

				// 天龙八部1.1	设置输入目录和设置输入数据格式化的类
				FileInputFormat.setInputPaths(job, INPUT_PATH);
				job.setInputFormatClass(NLineInputFormat.class);

				// 天龙八部1.2	设置自定义Mapper类和设置map函数输出数据的key和value的类型
				job.setMapperClass(MyNLineInputFormatMapper.class);
				job.setMapOutputKeyClass(Text.class);
				job.setMapOutputValueClass(LongWritable.class);

				// 天龙八部1.3	设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个)
				job.setPartitionerClass(HashPartitioner.class);
				job.setNumReduceTasks(1);

				// 天龙八部1.4	排序、分组
				// 天龙八部1.5	归约
				// 天龙八部2.1	Shuffle把数据从Map端拷贝到Reduce端。
				// 天龙八部2.2	指定Reducer类和输出key和value的类型
				job.setReducerClass(MyNLineInputFormatReducer.class);
				job.setOutputKeyClass(Text.class);
				job.setOutputValueClass(LongWritable.class);

				// 天龙八部2.3	指定输出的路径和设置输出的格式化类
				FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
				job.setOutputFormatClass(TextOutputFormat.class);


				// 提交作业 退出
				System.exit(job.waitForCompletion(true) ? 0 : 1);
			
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
		
	public static class MyNLineInputFormatMapper extends Mapper {

		// 定义一个LongWritable对象作为map输出的value类型
		LongWritable oneTime = new LongWritable(1);
		// 定义一个Text对象作为map输出的key类型
		Text word = new Text();

		protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException,
				InterruptedException {

			// 对每一行记录采用制表符(\t)进行分割
			String[] splits = value.toString().split("\t");

			// 遍历字符串数组输出每一个单词
			for (String str : splits) {

				// 设置word
				word.set(str);
				// 把结果写出去
				context.write(word, oneTime);
			}
		}
	}

	public static class MyNLineInputFormatReducer extends Reducer {

		// 定义LongWritable对象最为Reduce输出的value类型
		LongWritable result = new LongWritable();

		protected void reduce(Text key, Iterable values, Reducer.Context context) throws IOException,
				InterruptedException {

			int sum = 0;

			// 遍历集合,计算每个单词出现的和
			for (LongWritable s : values) {

				sum += s.get();
			}
			// 设置result
			result.set(sum);
			// 把结果写出去
			context.write(key, result);
		}
	}
}

LEAVE A COMMENT