Indexing Using Map Reduce


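Generating a sequential index (a line number for every row) with MapReduce cannot be done with several reducers running in parallel, because each line number must be unique and must follow the order of the input, and no reducer knows how many rows the others have handled. To get around this, I generated the index with a job that uses a single reducer.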
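To illustrate the idea without a cluster, here is a minimal plain-Java sketch that imitates what the job does: map output keyed by byte offset is sorted by key (as the shuffle would do), and one loop standing in for the single reducer hands out 1, 2, 3, and so on. The class name `SingleReducerSketch`, the method `number`, and the tab separator are made up for this illustration and are not part of the project.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the job; no Hadoop classes are involved.
class SingleReducerSketch {

    // Sorts the (offset, row) pairs by offset, then numbers the rows from 1.
    static List<String> number(Map<Long, String> mapOutput) {
        List<String> numbered = new ArrayList<>();
        long lineNumber = 1;
        // TreeMap iterates in ascending key order, like keys arriving at a reducer.
        for (String row : new TreeMap<>(mapOutput).values()) {
            numbered.add(lineNumber + "\t" + row);
            lineNumber++;
        }
        return numbered;
    }

    public static void main(String[] args) {
        Map<Long, String> mapOutput = new HashMap<>();
        mapOutput.put(12L, "third");
        mapOutput.put(0L, "first");
        mapOutput.put(6L, "second");
        for (String line : number(mapOutput)) {
            System.out.println(line);
        }
    }
}
```

With two such loops running independently, both would start at 1, which is why the job is pinned to one reducer.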

Here are the steps you can follow to create the job:

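Create a mapper that emits the line offset as the key and the row as the value. With TextInputFormat, the input key is already the byte offset of the line within the file.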

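import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

class IndexMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // TextInputFormat supplies the line's byte offset as the key; pass it through unchanged.
        context.write(key, value);
    }
}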
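The offsets the mapper receives are byte positions of line starts, not line numbers, which is why a second step is needed to turn them into 1, 2, 3. As a rough illustration of what those keys look like, the sketch below computes line-start offsets for an in-memory string. It assumes UTF-8 text with `\n` line endings; the names `LineOffsetSketch` and `offsets` are hypothetical, and this is not how Hadoop's record reader is implemented.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: lists the byte offset at which each line starts.
class LineOffsetSketch {

    static List<Long> offsets(String content) {
        byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
        List<Long> result = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < bytes.length; i++) {
            if (bytes[i] == '\n') {
                // A line ends here, so record where it began.
                result.add((long) start);
                start = i + 1;
            }
        }
        // Last line without a trailing newline.
        if (start < bytes.length) {
            result.add((long) start);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(offsets("alpha\nbeta\ngamma\n"));
    }
}
```

The offsets grow with the file position, so sorting by them restores the original line order within one file.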

On the reducer side, initialize a counter for the line number in the setup method.

static enum Counter { COUNT }
protected void setup(Reducer<LongWritable, Text, NullWritable, Text>.Context context)
        throws IOException, InterruptedException {
    context.getCounter(Counter.COUNT).setValue(1);
    super.setup(context);
}

In the reduce method, write the current counter value as the key and the iterator's rows as the value, then increment the counter for each line. Alternatively, combine the counter with each row and emit the result as the value, which is what the reducer here does.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

class IndexReucer extends Reducer<LongWritable, Text, NullWritable, Text> {
    static enum Counter { COUNT }
    // Control character (0x01) placed between the line number and the row.
    static final char SEPARATOR = 0x001;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Line numbers start at 1.
        context.getCounter(Counter.COUNT).setValue(1);
        super.setup(context);
    }

    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Keys arrive sorted by byte offset. Loop over every value so that rows sharing
        // an offset (possible with several input files) are not dropped.
        for (Text row : values) {
            long counter = context.getCounter(Counter.COUNT).getValue();
            context.write(NullWritable.get(),
                    new Text(String.format("%d%c%s", counter, SEPARATOR, row)));
            context.getCounter(Counter.COUNT).increment(1);
        }
    }
}
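Each output record is the line number, the 0x01 separator, and the original row packed into one string. A consumer has to split on the first separator to get the two parts back. The following is a small sketch of that step, assuming records shaped exactly like the `String.format("%d%c%s", ...)` call in the reducer; the class `IndexedLineSketch` and its methods are hypothetical and not part of the project.

```java
// Hypothetical reader for records produced in the "<number><0x01><row>" layout.
class IndexedLineSketch {
    static final char SEPARATOR = 0x001;

    // Returns the line number stored before the first separator.
    static long lineNumber(String record) {
        int split = record.indexOf(SEPARATOR);
        if (split < 0) {
            throw new IllegalArgumentException("separator not found");
        }
        return Long.parseLong(record.substring(0, split));
    }

    // Returns everything after the first separator, i.e. the original row.
    static String row(String record) {
        int split = record.indexOf(SEPARATOR);
        if (split < 0) {
            throw new IllegalArgumentException("separator not found");
        }
        return record.substring(split + 1);
    }

    public static void main(String[] args) {
        String record = String.format("%d%c%s", 3L, SEPARATOR, "hello,world");
        System.out.println(lineNumber(record) + " -> " + row(record));
    }
}
```

Splitting on the first separator only means a row that happens to contain 0x01 itself still comes back intact.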

Finally, set up your driver:

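public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    // Job.getInstance replaces the deprecated Job constructor.
    Job job = Job.getInstance(conf, "Indexing Job");
    job.setJarByClass(IndexDriver.class);

    job.setMapperClass(IndexMapper.class);
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(Text.class);

    job.setReducerClass(IndexReucer.class);
    // Declare the reducer's output types explicitly.
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);
    // A single reducer sees every row in offset order, so the numbering stays sequential.
    job.setNumReduceTasks(1);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    Path output = new Path(args[1]);
    // deleteDirectory is a helper not shown in this post; it is expected to remove
    // an existing output directory so the job can be rerun.
    deleteDirectory(output);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, output);

    return job.waitForCompletion(true) ? 0 : 1;
}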

You can download the entire project from GitHub: https://github.com/rahul86s/techsquids.git
