package com.oracle.bmc.hadoop.example;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.oracle.bmc.hdfs.BmcFilesystem;

import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
public class SampleOracleBmcHadoopJob {
    private static final String SAMPLE_JOB_PATH = "/samplehadoopjob";
    private static final String INPUT_FILE = SAMPLE_JOB_PATH + "/input.dat";
    private static final String OUTPUT_DIR = SAMPLE_JOB_PATH + "/output";

    // Non-static so the logger is created after main() sets the SimpleLogger system properties.
    private final Logger log = LoggerFactory.getLogger(SampleOracleBmcHadoopJob.class);

    /**
     * Runner for the sample Hadoop job. It expects three arguments: the path to a configuration file, the Object
     * Storage namespace, and the Object Storage bucket. Before running it, you must:
     * <ul>
     *   <li>Create a standard Hadoop configuration file.</li>
     *   <li>Create the bucket ahead of time.</li>
     * </ul>
     * The runner creates a test input file at '/samplehadoopjob/input.dat'; job results are written to
     * '/samplehadoopjob/output'.
     *
     * @param args
     *            1) path to configuration file, 2) namespace, 3) bucket
     * @throws Exception
     *             if the job setup or execution fails for any reason
     */
    public static void main(final String[] args) throws Exception {
        if (args.length != 3) {
            throw new IllegalArgumentException(
                    "Must have 3 args: 1) path to config file, 2) object storage namespace, 3) object storage bucket");
        }

        // Redirect all logs to stdout.
        System.setProperty("org.slf4j.simpleLogger.logFile", "System.out");
        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "debug");

        final SampleOracleBmcHadoopJob job = new SampleOracleBmcHadoopJob(args[0], args[1], args[2]);
        System.exit(job.execute());
    }

    private final String configurationFilePath;
    private final String namespace;
    private final String bucket;

    public int execute() throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        log.info("Creating hadoop configuration");
        final Configuration configuration = this.createConfiguration(this.configurationFilePath);

        final String authority = this.bucket + "@" + this.namespace;
        final String uri = "oraclebmc://" + authority;
        log.info("Using uri: {}", uri);

        log.info("Creating job inputs");
        this.setup(uri, configuration);

        log.info("Creating job");
        final Job job = this.createJob(configuration);

        final String in = uri + INPUT_FILE;
        final String out = uri + OUTPUT_DIR;
        log.info("Using input: {}", in);
        log.info("Using output: {}", out);

        FileInputFormat.addInputPath(job, new Path(in));
        FileOutputFormat.setOutputPath(job, new Path(out));

        log.info("Executing job...");
        final int response = job.waitForCompletion(true) ? 0 : 1;

        log.info("Attempting to read job results");
        this.tryReadResult(uri, configuration);
        return response;
    }

    private Configuration createConfiguration(final String configFilePath) {
        final Configuration configuration = new Configuration();
        configuration.addResource(new Path(configFilePath));
        return configuration;
    }

    private void setup(final String uri, final Configuration configuration) throws IOException, URISyntaxException {
        try (final BmcFilesystem fs = new BmcFilesystem()) {
            fs.initialize(new URI(uri), configuration);
            // Clear out any leftovers from a previous run, then write the test input.
            fs.delete(new Path(SAMPLE_JOB_PATH), true);
            try (final FSDataOutputStream output = fs.create(new Path(INPUT_FILE))) {
                // writeBytes keeps the file single-byte ASCII so the word count tokenizes it correctly
                // (writeChars would emit two-byte UTF-16 chars and corrupt the tokens).
                output.writeBytes("foo\nbar\ngak\ntest\nfoo\ngak\n\ngak");
            }
        }
    }

    private Job createJob(final Configuration configuration) throws IOException {
        final Job job = Job.getInstance(configuration, "word count");
        job.setJarByClass(SampleOracleBmcHadoopJob.class);
        job.setMapperClass(SimpleMapper.class);
        job.setCombinerClass(SimpleReducer.class);
        job.setReducerClass(SimpleReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        return job;
    }

    private void tryReadResult(final String uri, final Configuration configuration)
            throws IOException, URISyntaxException {
        try (final BmcFilesystem fs = new BmcFilesystem()) {
            fs.initialize(new URI(uri), configuration);
            // Default output file name for a single reducer; this could change if the job used more reducers.
            try (final FSDataInputStream input = fs.open(new Path(OUTPUT_DIR + "/part-r-00000"))) {
                final ByteArrayOutputStream baos = new ByteArrayOutputStream();
                IOUtils.copy(input, baos);
                log.info("\n=====\n" + baos.toString() + "=====");
            }
        }
    }
}
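// The configuration file passed as the first argument must supply the connector's client and
// credential settings, which createConfiguration() loads via addResource(). As a rough sketch,
// the same settings could be set programmatically; the fs.oraclebmc.* property names and values
// below are assumptions based on the connector's documented configuration keys, so verify them
// against the connector version in use (newer fs.oci-based releases use the fs.oci.* prefix):
//
//     final Configuration conf = new Configuration();
//     conf.set("fs.oraclebmc.client.hostname", "https://objectstorage.us-phoenix-1.oraclecloud.com");
//     conf.set("fs.oraclebmc.client.auth.tenantId", "ocid1.tenancy.oc1..<tenancy-ocid>");
//     conf.set("fs.oraclebmc.client.auth.userId", "ocid1.user.oc1..<user-ocid>");
//     conf.set("fs.oraclebmc.client.auth.fingerprint", "<api-key-fingerprint>");
//     conf.set("fs.oraclebmc.client.auth.pemfilepath", "/path/to/api_key.pem");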
package com.oracle.bmc.hadoop.example;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SimpleMapper extends Mapper<Object, Text, Text, IntWritable> {
    private static final IntWritable one = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(final Object key, final Text value, final Context context)
            throws IOException, InterruptedException {
        // Emit <token, 1> for every whitespace-separated token in the input line.
        final StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            this.word.set(itr.nextToken());
            context.write(this.word, one);
        }
    }
}

package com.oracle.bmc.hadoop.example;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SimpleReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    public void reduce(final Text key, final Iterable<IntWritable> values, final Context context)
            throws IOException, InterruptedException {
        // Sum the per-token counts emitted by the mapper (and pre-aggregated by the combiner).
        int sum = 0;
        for (final IntWritable val : values) {
            sum += val.get();
        }
        this.result.set(sum);
        context.write(key, this.result);
    }
}
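// A minimal sketch of driving the sample from another class, useful for local testing. The
// launcher class name and all three argument values are hypothetical placeholders; in practice
// the job would normally be packaged into a jar and submitted with the `hadoop jar` command.
//
//     class SampleJobLauncher {
//         public static void main(final String[] args) throws Exception {
//             SampleOracleBmcHadoopJob.main(new String[] {
//                     "/path/to/core-site.xml", // Hadoop configuration file with connector settings
//                     "my-namespace",           // Object Storage namespace
//                     "my-bucket"               // pre-created Object Storage bucket
//             });
//         }
//     }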