Categories

  • Big Data

Tags

  • hadoop
  • map-reduce

Generating a line-number index with MapReduce can't be done in fully distributed mode, because each line number must be sequential and globally unique. To achieve this I generated the index using a single-reducer job.
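To see why the mapper's input keys can't serve as line numbers directly: with TextInputFormat, each line arrives keyed by its starting byte offset, which is unique and ordered but not dense. The following plain-Java sketch (no Hadoop required; `OffsetDemo` and `lineOffsets` are illustrative names, not part of the project) shows the offsets such a mapper would see:

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetDemo {
    // Compute the starting byte offset of each line, the way TextInputFormat keys them.
    static List<Long> lineOffsets(String content) {
        List<Long> offsets = new ArrayList<>();
        long offset = 0;
        for (String line : content.split("\n", -1)) {
            offsets.add(offset);
            offset += line.length() + 1; // +1 for the '\n' byte
        }
        offsets.remove(offsets.size() - 1); // drop the phantom entry after the trailing newline
        return offsets;
    }

    public static void main(String[] args) {
        // Offsets are 0, 6, 12 -- not the line numbers 1, 2, 3 we want.
        System.out.println(lineOffsets("alpha\nbravo\ncharlie\n"));
    }
}
```

Because the offsets still sort in file order, they make a good map-output key: the shuffle delivers the lines to the reducer in order, where a counter can assign the real line numbers.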

Here are the steps you can follow to create the job:

Create a mapper that emits the line's byte offset as the key and the row as the value.

class IndexMapper extends Mapper<Object, Text, LongWritable, Text> {
    @Override
    protected void map(Object key, Text value, Mapper<Object, Text, LongWritable, Text>.Context context)
            throws IOException, InterruptedException {
        // TextInputFormat supplies the line's starting byte offset as the key
        context.write(new LongWritable(Long.parseLong(key.toString())), value);
    }
}

On the reducer side, initialize a counter for the line number in the setup method.

static enum Counter{COUNT}

@Override
protected void setup(Reducer<LongWritable, Text, NullWritable, Text>.Context context)
        throws IOException, InterruptedException {
    context.getCounter(Counter.COUNT).setValue(1);
    super.setup(context);
}

In the reduce method, increment the counter for each line and write the current counter value as the key with the iterator's rows as the value; or club the counter together with the iterator's rows and emit that as the value.

class IndexReucer extends Reducer<LongWritable, Text, NullWritable, Text> {
    static enum Counter{COUNT}
    static final char SEPARATOR = 0x001;

    @Override
    protected void setup(Reducer<LongWritable, Text, NullWritable, Text>.Context context)
            throws IOException, InterruptedException {
        context.getCounter(Counter.COUNT).setValue(1);
        super.setup(context);
    }

    @Override
    protected void reduce(LongWritable key, Iterable<Text> value,
            Reducer<LongWritable, Text, NullWritable, Text>.Context context)
            throws IOException, InterruptedException {
        long counter = context.getCounter(Counter.COUNT).getValue();
        // club the line number and the row into one value, separated by SEPARATOR
        context.write(NullWritable.get(),
                new Text(String.format("%d%c%s", counter, SEPARATOR, value.iterator().next())));
        context.getCounter(Counter.COUNT).increment(1);
    }
}
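Because the single reducer receives the byte-offset keys in ascending sorted order, the counter assigns consecutive line numbers in file order. A plain-Java sketch of that reduce-side logic (no Hadoop; `ReduceIndexSketch` and `index` are illustrative names):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

public class ReduceIndexSketch {
    static final char SEPARATOR = 0x001;

    // Mimics the single reducer: offset keys arrive sorted, the counter starts at 1,
    // and each output row is "lineNumber<SEPARATOR>row".
    static List<String> index(SortedMap<Long, String> shuffled) {
        List<String> out = new ArrayList<>();
        long counter = 1;
        for (String row : shuffled.values()) {
            out.add(String.format("%d%c%s", counter, SEPARATOR, row));
            counter++;
        }
        return out;
    }

    public static void main(String[] args) {
        // Insertion order doesn't matter; a TreeMap sorts by offset like the shuffle does.
        SortedMap<Long, String> shuffled = new TreeMap<>();
        shuffled.put(12L, "charlie");
        shuffled.put(0L, "alpha");
        shuffled.put(6L, "bravo");
        for (String row : index(shuffled)) {
            System.out.println(row);
        }
    }
}
```

This also shows why `setNumReduceTasks(1)` is essential: with more than one reducer, each would number its own partition starting at 1.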

Finally, set up your driver:

public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    Job job = new Job(conf, "Indexing Job");
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(Text.class);
    job.setMapperClass(IndexMapper.class);
    job.setReducerClass(IndexReucer.class);
    // a single reducer is what makes the line numbers globally sequential
    job.setNumReduceTasks(1);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    Path output = new Path(args[1]);
    deleteDirectory(output);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, output);
    job.setJarByClass(IndexDriver.class);
    return job.waitForCompletion(true) ? 0 : 1;
}

You can download the entire project from GitHub: https://github.com/rahul86s/techsquids.git