Wordcount for Hadoop

Java sample code: http://dl.huangshiyang.com/hadoopUtilTools.zip

import java.io.IOException;
import java.util.*;
        
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
        
public class WordCount {
        
 public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
        
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, one);
        }
    }
 } 
        
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable s : values) {
                sum += s.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }
        
 public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
        
        Job job = new Job(conf, "wordcount");
    
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
        
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
        
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    job.setNumReduceTasks(1);
    job.setJarByClass(WordCount.class);
        
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
    job.waitForCompletion(true);
 }
        
}

Compile:

javac -cp path/to/hadoop-core-1.0.3.jar WordCount.java 
jar cf WordCount.jar *.class
# unzip -l WordCount.jar
hadoop jar ../WordCount.jar WordCount ../input ../output

On Linux:

cd /opt/
mkdir wordcount
cd wordcount
wget dl.huangshiyang.com/hadoop/input.txt
wget dl.huangshiyang.com/hadoop/WordCount.jar
su hdfs
su mapr
hadoop fs -mkdir -p /wordcount/input/
hadoop fs -put input.txt /wordcount/input/
hadoop jar WordCount.jar WordCount /wordcount/input /wordcount/output1
hadoop fs -cat /wordcount/output1/part-r-00000

Remote run wordcount on your computer

import com.classLoader.classLoaderJar;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Logger;

import java.io.File;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.StringTokenizer;

/**
 * Created by shiyanghuang on 16/11/1.
 */
public class mapreduce {

    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private Logger logger = Logger.getLogger(mapreduce.class);
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            logger.info("Logger: " + line);
            byte[] bytes = value.getBytes();
            line = new String(bytes, "UTF-8");
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable s : values) {
                sum += s.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.addResource(new Path("/path/to/conf/core-site.xml")); // Replace with actual path
        conf.addResource(new Path("/path/to/conf/hdfs-site.xml")); // Replace with actual path
        conf.addResource(new Path("/path/to/conf/mapred-site.xml")); // Replace with actual path
        conf.addResource(new Path("/path/to/conf/yarn-site.xml")); // Replace with actual path

        // conf.set("tez.lib.uris","/hdp/apps/2.3.4.0-3485/tez/tez.tar.gz");
        FileSystem fs = null;
        try {
            fs = FileSystem.get(conf);
            // fs = FileSystem.getLocal(conf);
            System.out.println("Get FS");
        } catch (IOException e) {
            e.printStackTrace();
        }

        System.out.println("Home directory :"+fs.getHomeDirectory());

        try {
            FileStatus[] status = fs.listStatus( new Path("/") );
            for (FileStatus st : status) {
                System.out.println(st.getPath());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

        
        final String jarPath = "/path/to/your/WordCount.jar";
        final String hdfsJarPath = "/tmp/leetcode.jar";

        UserGroupInformation ugi = UserGroupInformation.createRemoteUser("hdfs");
        final Configuration fconf = conf;
        try {

            ugi.doAs(new PrivilegedExceptionAction() {

                public Void run() throws Exception {

                    Job job = null;
                    try {
                        addJarToDistributedCache(jarPath, fconf);

                        job = new Job(fconf, "wordcount");

                        job.setOutputKeyClass(Text.class);
                        job.setOutputValueClass(IntWritable.class);

                        job.setMapperClass(Map.class);
                        job.setReducerClass(Reduce.class);

                        job.setInputFormatClass(TextInputFormat.class);
                        job.setOutputFormatClass(TextOutputFormat.class);

                        job.setNumReduceTasks(1);
                        job.setJarByClass(mapreduce.class);

                        String input = "/wordcount/input";
                        String rand = Math.random() + "";
                        System.out.println("Randnum: " + rand);
                        String output = "/wordcount/output/" + rand;

                        FileInputFormat.addInputPath(job, new Path(input));
                        FileOutputFormat.setOutputPath(job, new Path(output));

                        job.waitForCompletion(true);

                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ClassNotFoundException e) {
                        e.printStackTrace();
                    }
                    return null;
                }
            });

        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public static void addJarToDistributedCache(
            String path, Configuration conf)
            throws IOException {

        File jarFile = new File(path);

        // Declare new HDFS location
        Path hdfsJar = new Path("/tmp/"
                + jarFile.getName());

        // Mount HDFS
        FileSystem hdfs = FileSystem.get(conf);

        // Copy (override) jar file to HDFS
        hdfs.copyFromLocalFile(false, true, new Path(path), hdfsJar);

        // Add jar to distributed classPath
        DistributedCache.addFileToClassPath(hdfsJar, conf);
    }
}



Old api for submitting job:

package com.mapreduce;

/**
 * Created by shiyanghuang on 16/6/23.
 */

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.security.UserGroupInformation;

import java.io.File;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Iterator;
import java.util.StringTokenizer;

public class submitOldJobTest {

    public static class Map implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(LongWritable longWritable, Text text, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException {
            String line = text.toString();
            byte[] bytes = text.getBytes();
            line = new String(bytes, "UTF-8");
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                outputCollector.collect(word, one);
            }
        }

        @Override
        public void close() throws IOException {

        }

        @Override
        public void configure(JobConf jobConf) {

        }
    }

    public static class Reduce implements Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text text, Iterator<IntWritable> iterator, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException {
            int sum = 0;
            while (iterator.hasNext()) {
                sum += Integer.parseInt(iterator.next().toString());
            }
            outputCollector.collect(text, new IntWritable(sum));
        }

        @Override
        public void close() throws IOException {

        }

        @Override
        public void configure(JobConf jobConf) {

        }
    }

    public static void main(final String[] args) throws Exception {
        if (args.length != 4) {
            System.out.println("Wrong args. Please add args as following:");
            System.out.println("/path/to/hadoop/conf /path/to/submitOldJobTest.jar /hfds/input/file/path /hdfs/output/folder");
            return;
        }
        
        for (String s : args) {
            System.out.println(s);
        }

        Configuration conf = new Configuration();

        conf.addResource(new Path(args[0] + "/core-site.xml")); // Replace with actual path
        conf.addResource(new Path(args[0] + "/hdfs-site.xml")); // Replace with actual path
        conf.addResource(new Path(args[0] + "/mapred-site.xml")); // Replace with actual path
        conf.addResource(new Path(args[0] + "/yarn-site.xml")); // Replace with actual path

        UserGroupInformation ugi = UserGroupInformation.createRemoteUser("hdfs");
        final Configuration fconf = conf;
        try {

            ugi.doAs(new PrivilegedExceptionAction() {

                public Void run() throws Exception {
                    addJarToDistributedCache(args[1], fconf);

                    JobConf jobConf = new JobConf(fconf);

                    jobConf.setOutputKeyClass(Text.class);
                    jobConf.setOutputValueClass(IntWritable.class);

                    jobConf.setMapperClass(Map.class);
                    jobConf.setReducerClass(Reduce.class);

                    jobConf.setInputFormat(TextInputFormat.class);
                    jobConf.setOutputFormat(TextOutputFormat.class);

                    jobConf.setNumReduceTasks(1);
                    jobConf.setJarByClass(submitOldJobTest.class);

                    FileInputFormat.addInputPath(jobConf, new Path(args[2]));
                    FileOutputFormat.setOutputPath(jobConf, new Path(args[3]));

                    JobClient jobClient = new JobClient(fconf);

                    RunningJob runningJob = jobClient.submitJob(jobConf);

                    while (!runningJob.isComplete()) {
                        System.out.println("Job status: " + runningJob.getJobState());
                        Thread.sleep(5000);
                    }

                    System.out.println("Job Complete");

                    return null;
                }
            });

        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public static void addJarToDistributedCache(
            String path, Configuration conf)
            throws IOException {

        File jarFile = new File(path);

        // Declare new HDFS location
        Path hdfsJar = new Path("/tmp/"
                + jarFile.getName());

        // Mount HDFS
        FileSystem hdfs = FileSystem.get(conf);

        // Copy (override) jar file to HDFS
        hdfs.copyFromLocalFile(false, true, new Path(path), hdfsJar);

        // Add jar to distributed classPath
        DistributedCache.addFileToClassPath(hdfsJar, conf);
    }

}

Compile:

javac -cp .:/opt/Dataguise/DgSecure/Agents/HDFSAgent/expandedArchive/WEB-INF/lib/* submitOldJobTest.java
jar -cvf submitOldJobTest.jar *.class
java -cp .:/path/to/hadoop/lib/* -Dhdp.version=2.3.4.0-3485 submitOldJobTest /etc/hadoop/conf /path/to/submitOldJobTest.jar /wordcount/input /wordcount/output

One Response so far.

  1. Ardis Yohn says:
    Very nice post. I just stumbled upon your blog and wanted to say that I have really enjoyed browsing your blog posts. In any case I will be subscribing to your rss feed and I hope you write again soon!

LEAVE A COMMENT