Java sample code: http://dl.huangshiyang.com/hadoopUtilTools.zip
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class WordCount {
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
context.write(word, one);
}
}
}
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable s : values) {
sum += s.get();
}
context.write(key, new IntWritable(sum));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "wordcount");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setNumReduceTasks(1);
job.setJarByClass(WordCount.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
Compile and run:
javac -cp path/to/hadoop-core-1.0.3.jar WordCount.java
jar cf WordCount.jar *.class
# unzip -l WordCount.jar
hadoop jar ../WordCount.jar WordCount ../input ../output
On Linux:
cd /opt/
mkdir wordcount
cd wordcount
wget dl.huangshiyang.com/hadoop/input.txt
wget dl.huangshiyang.com/hadoop/WordCount.jar
su hdfs
su mapr
hadoop fs -mkdir -p /wordcount/input/
hadoop fs -put input.txt /wordcount/input/
hadoop jar WordCount.jar WordCount /wordcount/input /wordcount/output1
hadoop fs -cat /wordcount/output1/part-r-00000
Run WordCount remotely from your computer:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Logger;
import java.io.File;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.StringTokenizer;
/**
* Created by shiyanghuang on 16/11/1.
*/
public class mapreduce {
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
private Logger logger = Logger.getLogger(mapreduce.class);
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
logger.info("Logger: " + line);
byte[] bytes = value.getBytes();
// Text.getBytes() returns the backing buffer, which can be longer than the
// actual content, so respect getLength() when decoding
line = new String(bytes, 0, value.getLength(), "UTF-8");
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
context.write(word, one);
}
}
}
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable s : values) {
sum += s.get();
}
context.write(key, new IntWritable(sum));
}
}
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.addResource(new Path("/path/to/conf/core-site.xml")); // Replace with actual path
conf.addResource(new Path("/path/to/conf/hdfs-site.xml")); // Replace with actual path
conf.addResource(new Path("/path/to/conf/mapred-site.xml")); // Replace with actual path
conf.addResource(new Path("/path/to/conf/yarn-site.xml")); // Replace with actual path
// conf.set("tez.lib.uris","/hdp/apps/2.3.4.0-3485/tez/tez.tar.gz");
FileSystem fs;
try {
fs = FileSystem.get(conf);
// fs = FileSystem.getLocal(conf);
System.out.println("Get FS");
} catch (IOException e) {
e.printStackTrace();
return;
}
System.out.println("Home directory: " + fs.getHomeDirectory());
try {
FileStatus[] status = fs.listStatus( new Path("/") );
for (FileStatus st : status) {
System.out.println(st.getPath());
}
} catch (IOException e) {
e.printStackTrace();
}
final String jarPath = "/path/to/your/WordCount.jar";
UserGroupInformation ugi = UserGroupInformation.createRemoteUser("hdfs");
final Configuration fconf = conf;
try {
ugi.doAs(new PrivilegedExceptionAction<Void>() {
public Void run() throws Exception {
Job job = null;
try {
addJarToDistributedCache(jarPath, fconf);
job = new Job(fconf, "wordcount");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setNumReduceTasks(1);
job.setJarByClass(mapreduce.class);
String input = "/wordcount/input";
String rand = Math.random() + "";
System.out.println("Randnum: " + rand);
String output = "/wordcount/output/" + rand;
FileInputFormat.addInputPath(job, new Path(input));
FileOutputFormat.setOutputPath(job, new Path(output));
job.waitForCompletion(true);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
return null;
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
public static void addJarToDistributedCache(
String path, Configuration conf)
throws IOException {
File jarFile = new File(path);
// Declare new HDFS location
Path hdfsJar = new Path("/tmp/"
+ jarFile.getName());
// Mount HDFS
FileSystem hdfs = FileSystem.get(conf);
// Copy (override) jar file to HDFS
hdfs.copyFromLocalFile(false, true, new Path(path), hdfsJar);
// Add jar to distributed classPath
DistributedCache.addFileToClassPath(hdfsJar, conf);
}
}
Old API for submitting a job:
package com.mapreduce;
/**
* Created by shiyanghuang on 16/6/23.
*/
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.File;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Iterator;
import java.util.StringTokenizer;
public class submitOldJobTest {
public static class Map implements Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
public void map(LongWritable longWritable, Text text, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException {
String line = text.toString();
byte[] bytes = text.getBytes();
// respect getLength(): Text.getBytes() may return a padded buffer
line = new String(bytes, 0, text.getLength(), "UTF-8");
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
outputCollector.collect(word, one);
}
}
@Override
public void close() throws IOException {
}
@Override
public void configure(JobConf jobConf) {
}
}
public static class Reduce implements Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text text, Iterator<IntWritable> iterator, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException {
int sum = 0;
while (iterator.hasNext()) {
sum += iterator.next().get();
}
outputCollector.collect(text, new IntWritable(sum));
}
@Override
public void close() throws IOException {
}
@Override
public void configure(JobConf jobConf) {
}
}
public static void main(final String[] args) throws Exception {
if (args.length != 4) {
System.out.println("Wrong args. Please add args as following:");
System.out.println("/path/to/hadoop/conf /path/to/submitOldJobTest.jar /hfds/input/file/path /hdfs/output/folder");
return;
}
for (String s : args) {
System.out.println(s);
}
Configuration conf = new Configuration();
conf.addResource(new Path(args[0] + "/core-site.xml")); // Replace with actual path
conf.addResource(new Path(args[0] + "/hdfs-site.xml")); // Replace with actual path
conf.addResource(new Path(args[0] + "/mapred-site.xml")); // Replace with actual path
conf.addResource(new Path(args[0] + "/yarn-site.xml")); // Replace with actual path
UserGroupInformation ugi = UserGroupInformation.createRemoteUser("hdfs");
final Configuration fconf = conf;
try {
ugi.doAs(new PrivilegedExceptionAction<Void>() {
public Void run() throws Exception {
addJarToDistributedCache(args[1], fconf);
JobConf jobConf = new JobConf(fconf);
jobConf.setOutputKeyClass(Text.class);
jobConf.setOutputValueClass(IntWritable.class);
jobConf.setMapperClass(Map.class);
jobConf.setReducerClass(Reduce.class);
jobConf.setInputFormat(TextInputFormat.class);
jobConf.setOutputFormat(TextOutputFormat.class);
jobConf.setNumReduceTasks(1);
jobConf.setJarByClass(submitOldJobTest.class);
FileInputFormat.addInputPath(jobConf, new Path(args[2]));
FileOutputFormat.setOutputPath(jobConf, new Path(args[3]));
JobClient jobClient = new JobClient(fconf);
RunningJob runningJob = jobClient.submitJob(jobConf);
while (!runningJob.isComplete()) {
System.out.println("Job status: " + runningJob.getJobState());
Thread.sleep(5000);
}
System.out.println("Job Complete");
return null;
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
public static void addJarToDistributedCache(
String path, Configuration conf)
throws IOException {
File jarFile = new File(path);
// Declare new HDFS location
Path hdfsJar = new Path("/tmp/"
+ jarFile.getName());
// Mount HDFS
FileSystem hdfs = FileSystem.get(conf);
// Copy (override) jar file to HDFS
hdfs.copyFromLocalFile(false, true, new Path(path), hdfsJar);
// Add jar to distributed classPath
DistributedCache.addFileToClassPath(hdfsJar, conf);
}
}
Compile and run:
javac -cp .:/opt/Dataguise/DgSecure/Agents/HDFSAgent/expandedArchive/WEB-INF/lib/* submitOldJobTest.java
jar -cvf submitOldJobTest.jar *.class
java -cp .:/path/to/hadoop/lib/* -Dhdp.version=2.3.4.0-3485 submitOldJobTest /etc/hadoop/conf /path/to/submitOldJobTest.jar /wordcount/input /wordcount/output