Wordcount for Hadoop

import java.io.IOException;
import java.util.*;
        
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
        
public class WordCount {
        
 public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
        
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, one);
        }
    }
 } 
        
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable s : values) {
                sum += s.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }
        
 public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
        
        Job job = new Job(conf, "wordcount");
    
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
        
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
        
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    job.setNumReduceTasks(1);
    job.setJarByClass(WordCount.class);
        
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
    job.waitForCompletion(true);
 }
        
}

Compile:

javac -cp path/to/hadoop-core-1.0.3.jar WordCount.java 
jar cf WordCount.jar *.class
# unzip -l WordCount.jar
hadoop jar ../WordCount.jar WordCount ../input ../output

On Linux:

cd /opt/
mkdir wordcount
cd wordcount
wget dl.huangshiyang.com/hadoop/input.txt
wget dl.huangshiyang.com/hadoop/WordCount.jar
# Switch to the Hadoop superuser — use whichever matches your distribution
# (hdfs on HDP/CDH, mapr on MapR):
su hdfs
su mapr
hadoop fs -mkdir -p /wordcount/input/
hadoop fs -put input.txt /wordcount/input/
hadoop jar WordCount.jar WordCount /wordcount/input /wordcount/output1
hadoop fs -cat /wordcount/output1/part-r-00000

Running WordCount remotely from your own computer

import com.classLoader.classLoaderJar;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Logger;

import java.io.File;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.StringTokenizer;

/**
 * Created by shiyanghuang on 16/11/1.
 */
public class mapreduce {

    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private Logger logger = Logger.getLogger(mapreduce.class);
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            logger.info("Logger: " + line);
            byte[] bytes = value.getBytes();
            line = new String(bytes, "UTF-8");
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable s : values) {
                sum += s.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.addResource(new Path("/path/to/conf/core-site.xml")); // Replace with actual path
        conf.addResource(new Path("/path/to/conf/hdfs-site.xml")); // Replace with actual path
        conf.addResource(new Path("/path/to/conf/mapred-site.xml")); // Replace with actual path
        conf.addResource(new Path("/path/to/conf/yarn-site.xml")); // Replace with actual path

        // conf.set("tez.lib.uris","/hdp/apps/2.3.4.0-3485/tez/tez.tar.gz");
        FileSystem fs = null;
        try {
            fs = FileSystem.get(conf);
            // fs = FileSystem.getLocal(conf);
            System.out.println("Get FS");
        } catch (IOException e) {
            e.printStackTrace();
        }

        System.out.println("Home directory :"+fs.getHomeDirectory());

        try {
            FileStatus[] status = fs.listStatus( new Path("/") );
            for (FileStatus st : status) {
                System.out.println(st.getPath());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

        
        final String jarPath = "/path/to/your/WordCount.jar";
        final String hdfsJarPath = "/tmp/leetcode.jar";

        UserGroupInformation ugi = UserGroupInformation.createRemoteUser("hdfs");
        final Configuration fconf = conf;
        try {

            ugi.doAs(new PrivilegedExceptionAction() {

                public Void run() throws Exception {

                    Job job = null;
                    try {
                        addJarToDistributedCache(jarPath, fconf);

                        job = new Job(fconf, "wordcount");

                        job.setOutputKeyClass(Text.class);
                        job.setOutputValueClass(IntWritable.class);

                        job.setMapperClass(Map.class);
                        job.setReducerClass(Reduce.class);

                        job.setInputFormatClass(TextInputFormat.class);
                        job.setOutputFormatClass(TextOutputFormat.class);

                        job.setNumReduceTasks(1);
                        job.setJarByClass(mapreduce.class);

                        String input = "/wordcount/input";
                        String rand = Math.random() + "";
                        System.out.println("Randnum: " + rand);
                        String output = "/wordcount/output/" + rand;

                        FileInputFormat.addInputPath(job, new Path(input));
                        FileOutputFormat.setOutputPath(job, new Path(output));

                        job.waitForCompletion(true);

                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ClassNotFoundException e) {
                        e.printStackTrace();
                    }
                    return null;
                }
            });

        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public static void addJarToDistributedCache(
            String path, Configuration conf)
            throws IOException {

        File jarFile = new File(path);

        // Declare new HDFS location
        Path hdfsJar = new Path("/tmp/"
                + jarFile.getName());

        // Mount HDFS
        FileSystem hdfs = FileSystem.get(conf);

        // Copy (override) jar file to HDFS
        hdfs.copyFromLocalFile(false, true, new Path(path), hdfsJar);

        // Add jar to distributed classPath
        DistributedCache.addFileToClassPath(hdfsJar, conf);
    }
}



Old api for submitting job:

final JobConf job = new JobConf(mapreduceOldAPI.class);
// Required if the job is packaged and run as a jar: tells Hadoop which jar
// contains the job classes.
job.setJarByClass(mapreduceOldAPI.class);

job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);

// Input path(s) the job reads from.
FileInputFormat.setInputPaths(job, input);
// Output path the job writes to.
FileOutputFormat.setOutputPath(job, new Path(output));

// Key type (k3) of the final output.
job.setOutputKeyClass(Text.class);
// Value type (v3) of the final output.
job.setOutputValueClass(IntWritable.class);

// Submit the job. NOTE(review): submitJob returns immediately without
// waiting; use JobClient.runJob(job) to block until completion.
JobClient jobClient = new JobClient(job);
jobClient.submitJob(job);

1 thought on “Wordcount for Hadoop”

  1. Very nice post. I just stumbled upon your blog and wanted to say that I have really enjoyed browsing your blog posts. In any case I will be subscribing to your rss feed and I hope you write again soon!

Leave a Reply

Your email address will not be published. Required fields are marked *