Java sample code: http://dl.huangshiyang.com/hadoopUtilTools.zip
import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {

    // Mapper: emits (word, 1) for every token in the input line.
    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                context.write(word, one);
            }
        }
    }

    // Reducer: sums the counts collected for each word.
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable s : values) {
                sum += s.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "wordcount");

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setNumReduceTasks(1);
        job.setJarByClass(WordCount.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}
Compile:
javac -cp path/to/hadoop-core-1.0.3.jar WordCount.java
jar cf WordCount.jar *.class
# unzip -l WordCount.jar
hadoop jar ../WordCount.jar WordCount ../input ../output
On Linux:
cd /opt/
mkdir wordcount
cd wordcount
wget dl.huangshiyang.com/hadoop/input.txt
wget dl.huangshiyang.com/hadoop/WordCount.jar
su hdfs    # or: su mapr (on MapR clusters)
hadoop fs -mkdir -p /wordcount/input/
hadoop fs -put input.txt /wordcount/input/
hadoop jar WordCount.jar WordCount /wordcount/input /wordcount/output1
hadoop fs -cat /wordcount/output1/part-r-00000
Run WordCount remotely from your own computer:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Logger;

import java.io.File;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.StringTokenizer;

/**
 * Created by shiyanghuang on 16/11/1.
 */
public class mapreduce {

    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private Logger logger = Logger.getLogger(mapreduce.class);

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            logger.info("Logger: " + line);
            // Decode only the valid portion of the backing byte array as UTF-8.
            line = new String(value.getBytes(), 0, value.getLength(), "UTF-8");
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable s : values) {
                sum += s.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.addResource(new Path("/path/to/conf/core-site.xml"));   // Replace with actual path
        conf.addResource(new Path("/path/to/conf/hdfs-site.xml"));   // Replace with actual path
        conf.addResource(new Path("/path/to/conf/mapred-site.xml")); // Replace with actual path
        conf.addResource(new Path("/path/to/conf/yarn-site.xml"));   // Replace with actual path
        // conf.set("tez.lib.uris", "/hdp/apps/2.3.4.0-3485/tez/tez.tar.gz");

        FileSystem fs = null;
        try {
            fs = FileSystem.get(conf);
            System.out.println("Get FS");
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        System.out.println("Home directory: " + fs.getHomeDirectory());

        // Sanity check: list the HDFS root to confirm the connection works.
        try {
            FileStatus[] status = fs.listStatus(new Path("/"));
            for (FileStatus st : status) {
                System.out.println(st.getPath());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

        final String jarPath = "/path/to/your/WordCount.jar";
        UserGroupInformation ugi = UserGroupInformation.createRemoteUser("hdfs");
        final Configuration fconf = conf;
        try {
            ugi.doAs(new PrivilegedExceptionAction<Void>() {
                public Void run() throws Exception {
                    try {
                        // Ship the job jar to HDFS and put it on the task classpath.
                        addJarToDistributedCache(jarPath, fconf);

                        Job job = new Job(fconf, "wordcount");
                        job.setOutputKeyClass(Text.class);
                        job.setOutputValueClass(IntWritable.class);
                        job.setMapperClass(Map.class);
                        job.setReducerClass(Reduce.class);
                        job.setInputFormatClass(TextInputFormat.class);
                        job.setOutputFormatClass(TextOutputFormat.class);
                        job.setNumReduceTasks(1);
                        job.setJarByClass(mapreduce.class);

                        String input = "/wordcount/input";
                        String rand = Math.random() + "";
                        System.out.println("Randnum: " + rand);
                        String output = "/wordcount/output/" + rand;

                        FileInputFormat.addInputPath(job, new Path(input));
                        FileOutputFormat.setOutputPath(job, new Path(output));

                        job.waitForCompletion(true);
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ClassNotFoundException e) {
                        e.printStackTrace();
                    }
                    return null;
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void addJarToDistributedCache(String path, Configuration conf) throws IOException {
        File jarFile = new File(path);
        // Declare new HDFS location
        Path hdfsJar = new Path("/tmp/" + jarFile.getName());
        // Mount HDFS
        FileSystem hdfs = FileSystem.get(conf);
        // Copy (overwrite) jar file to HDFS
        hdfs.copyFromLocalFile(false, true, new Path(path), hdfsJar);
        // Add jar to distributed classpath
        DistributedCache.addFileToClassPath(hdfsJar, conf);
    }
}
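Once waitForCompletion returns, the result sits in HDFS under the random output directory, so it can be read back from the same client machine instead of logging into the cluster. Below is a minimal sketch under the same assumptions as the code above (same configuration file paths); the class name PrintWordCountOutput and its argument convention are illustrative, and only standard FileSystem calls are used.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.InputStreamReader;

// Hypothetical helper (not part of the original sample): reads the single
// part file written by the one-reducer WordCount job and prints it locally.
public class PrintWordCountOutput {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.addResource(new Path("/path/to/conf/core-site.xml")); // same client config as above
        conf.addResource(new Path("/path/to/conf/hdfs-site.xml")); // same client config as above
        FileSystem fs = FileSystem.get(conf);

        // args[0] is the job's output directory, e.g. /wordcount/output/<randnum>
        Path result = new Path(args[0], "part-r-00000");
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(fs.open(result), "UTF-8"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}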
Old API for submitting a job:
package com.mapreduce;

/**
 * Created by shiyanghuang on 16/6/23.
 */

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.security.UserGroupInformation;

import java.io.File;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Iterator;
import java.util.StringTokenizer;

public class submitOldJobTest {

    public static class Map implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(LongWritable longWritable, Text text,
                        OutputCollector<Text, IntWritable> outputCollector,
                        Reporter reporter) throws IOException {
            // Decode only the valid portion of the backing byte array as UTF-8.
            String line = new String(text.getBytes(), 0, text.getLength(), "UTF-8");
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                outputCollector.collect(word, one);
            }
        }

        @Override
        public void close() throws IOException {
        }

        @Override
        public void configure(JobConf jobConf) {
        }
    }

    public static class Reduce implements Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text text, Iterator<IntWritable> iterator,
                           OutputCollector<Text, IntWritable> outputCollector,
                           Reporter reporter) throws IOException {
            int sum = 0;
            while (iterator.hasNext()) {
                sum += iterator.next().get();
            }
            outputCollector.collect(text, new IntWritable(sum));
        }

        @Override
        public void close() throws IOException {
        }

        @Override
        public void configure(JobConf jobConf) {
        }
    }

    public static void main(final String[] args) throws Exception {
        if (args.length != 4) {
            System.out.println("Wrong arguments. Please pass arguments as follows:");
            System.out.println("/path/to/hadoop/conf /path/to/submitOldJobTest.jar /hdfs/input/file/path /hdfs/output/folder");
            return;
        }
        for (String s : args) {
            System.out.println(s);
        }

        Configuration conf = new Configuration();
        conf.addResource(new Path(args[0] + "/core-site.xml"));
        conf.addResource(new Path(args[0] + "/hdfs-site.xml"));
        conf.addResource(new Path(args[0] + "/mapred-site.xml"));
        conf.addResource(new Path(args[0] + "/yarn-site.xml"));

        UserGroupInformation ugi = UserGroupInformation.createRemoteUser("hdfs");
        final Configuration fconf = conf;
        try {
            ugi.doAs(new PrivilegedExceptionAction<Void>() {
                public Void run() throws Exception {
                    addJarToDistributedCache(args[1], fconf);

                    JobConf jobConf = new JobConf(fconf);
                    jobConf.setOutputKeyClass(Text.class);
                    jobConf.setOutputValueClass(IntWritable.class);
                    jobConf.setMapperClass(Map.class);
                    jobConf.setReducerClass(Reduce.class);
                    jobConf.setInputFormat(TextInputFormat.class);
                    jobConf.setOutputFormat(TextOutputFormat.class);
                    jobConf.setNumReduceTasks(1);
                    jobConf.setJarByClass(submitOldJobTest.class);

                    FileInputFormat.addInputPath(jobConf, new Path(args[2]));
                    FileOutputFormat.setOutputPath(jobConf, new Path(args[3]));

                    // Submit asynchronously and poll until the job finishes.
                    JobClient jobClient = new JobClient(fconf);
                    RunningJob runningJob = jobClient.submitJob(jobConf);
                    while (!runningJob.isComplete()) {
                        System.out.println("Job status: " + runningJob.getJobState());
                        Thread.sleep(5000);
                    }
                    System.out.println("Job Complete");
                    return null;
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void addJarToDistributedCache(String path, Configuration conf) throws IOException {
        File jarFile = new File(path);
        // Declare new HDFS location
        Path hdfsJar = new Path("/tmp/" + jarFile.getName());
        // Mount HDFS
        FileSystem hdfs = FileSystem.get(conf);
        // Copy (overwrite) jar file to HDFS
        hdfs.copyFromLocalFile(false, true, new Path(path), hdfsJar);
        // Add jar to distributed classpath
        DistributedCache.addFileToClassPath(hdfsJar, conf);
    }
}
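The polling loop above only reports that the job has finished; it does not distinguish success from failure. A minimal optional check, sketched against the old-API RunningJob interface, could be placed right after the while loop inside run():

// Sketch (assumption: inserted right after the polling loop): report the final outcome.
if (runningJob.isSuccessful()) {
    System.out.println("Job " + runningJob.getID() + " succeeded");
} else {
    System.out.println("Job " + runningJob.getID() + " failed");
}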
Compile and run:
javac -d . -cp ".:/opt/Dataguise/DgSecure/Agents/HDFSAgent/expandedArchive/WEB-INF/lib/*" submitOldJobTest.java
jar -cvf submitOldJobTest.jar com
java -cp ".:/path/to/hadoop/lib/*" -Dhdp.version=2.3.4.0-3485 com.mapreduce.submitOldJobTest /etc/hadoop/conf /path/to/submitOldJobTest.jar /wordcount/input /wordcount/output