Categories

  • Big Data

Tags

  • hadoop
  • java

Matrix operations in MapReduce

Matrix Sum with MapReduce
Matrix sum is the operation of adding matrices by adding corresponding entries together.
Entrywise sum

The sum of two m × n (pronounced “m by n”) matrices A and B, denoted by A + B, is again an m × n matrix, computed by adding corresponding elements.
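As a small worked illustration of the definition above (the 2 × 2 numbers are made up for this example, not taken from the original post), each entry of the result is the sum of the entries at the same position:

```latex
(A + B)_{ij} = a_{ij} + b_{ij}, \qquad 1 \le i \le m,\; 1 \le j \le n

\begin{pmatrix} 1 & 2 \\ 3 & 4 \end{pmatrix}
+
\begin{pmatrix} 5 & 6 \\ 7 & 8 \end{pmatrix}
=
\begin{pmatrix} 1+5 & 2+6 \\ 3+7 & 4+8 \end{pmatrix}
=
\begin{pmatrix} 6 & 8 \\ 10 & 12 \end{pmatrix}
```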

Entrywise sum implementation using MapReduce

I have two matrices in separate files (see the blog post on indexing for how to generate the row index). Each file contains a matrix of the same size (m × n). On every line, the row index and the row's values are separated by ^A (0x001), and the n values of the row are separated by commas (,).
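To make the line format concrete, here is a minimal plain-Java sketch (no Hadoop needed) that parses one such line. The class and method names are hypothetical, and treating the values as doubles is an assumption; the post does not say which numeric type the matrices hold.

```java
import java.util.Arrays;

// Hypothetical helper, for illustration only: parses "<rowIndex>^A<v1>,<v2>,...".
class MatrixLineParser {
    static final char KEY_SEPARATOR = '\u0001';   // ^A, as described in the post
    static final String ELEMENT_SEPARATOR = ",";

    /** Returns the row index stored before the ^A separator. */
    static long rowIndex(String line) {
        int pos = line.indexOf(KEY_SEPARATOR);
        return Long.parseLong(line.substring(0, pos).trim());
    }

    /** Returns the row values stored after the ^A separator (assumed to be doubles). */
    static double[] rowValues(String line) {
        int pos = line.indexOf(KEY_SEPARATOR);
        String[] parts = line.substring(pos + 1).split(ELEMENT_SEPARATOR);
        double[] values = new double[parts.length];
        for (int i = 0; i < parts.length; i++) {
            values[i] = Double.parseDouble(parts[i].trim());
        }
        return values;
    }

    public static void main(String[] args) {
        String line = "2" + KEY_SEPARATOR + "1,2,3";
        // prints: 2 -> [1.0, 2.0, 3.0]
        System.out.println(rowIndex(line) + " -> " + Arrays.toString(rowValues(line)));
    }
}
```

The same row index appears once in each input file, which is what lets the job bring matching rows together later.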

  1. The mapper emits the row index as the key and the entire row as the value. If the matrices come in different formats, create a separate mapper to process or filter each one.
    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    class MatrixSumMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
      // Name of the input file; handy if the two matrices ever need different handling.
      String fName = null;
      char keySeparator;

      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
        fName = ((FileSplit) context.getInputSplit()).getPath().getName();
        keySeparator = (char) context.getConfiguration().getInt("matrix.key.separator", 0x001);
      }

      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        // Split the line once into the row index and the comma-separated row values.
        String[] parts = value.toString().split(String.valueOf(keySeparator), 2);
        LongWritable keyM = new LongWritable(Long.parseLong(parts[0]));
        Text val = new Text(parts[1]);
        context.write(keyM, val);
      }
    }
  1. The reducer receives the row index (m) as the key, together with that row from each matrix. It splits each row into its n values and adds the values position by position.
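The reducer code is not shown in this post, so the following is only a sketch of the summation step in plain Java, without Hadoop types. The names RowSum, sumRows and format are hypothetical, and the values are assumed to be doubles. In an actual reducer, the same loop would run over the Iterable<Text> values passed to reduce(...) for one row key, and the formatted result would be written back out with that key.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper, for illustration only: adds the rows that share one row index.
class RowSum {
    /** Adds the rows position by position; every row must have the same number of values. */
    static double[] sumRows(List<String> rows, String elementSeparator) {
        double[] sum = null;
        for (String row : rows) {
            // Quote the separator so characters such as "|" are not read as regex syntax.
            String[] parts = row.split(Pattern.quote(elementSeparator));
            if (sum == null) {
                sum = new double[parts.length];
            } else if (parts.length != sum.length) {
                throw new IllegalArgumentException("rows have different lengths");
            }
            for (int i = 0; i < parts.length; i++) {
                sum[i] += Double.parseDouble(parts[i].trim());
            }
        }
        return sum == null ? new double[0] : sum;
    }

    /** Joins the summed values back into one separator-delimited row. */
    static String format(double[] values, String elementSeparator) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < values.length; i++) {
            if (i > 0) {
                sb.append(elementSeparator);
            }
            sb.append(values[i]);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // The same row taken from matrix A and from matrix B.
        double[] sum = sumRows(Arrays.asList("1,2,3", "10,20,30"), ",");
        System.out.println(format(sum, ","));   // prints: 11.0,22.0,33.0
    }
}
```

Because addition is commutative, the order in which the two rows reach the reducer does not matter, so the mapper does not need to tag rows with their source file for this operation.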
 
  1. Driver code:
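    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.log4j.Logger;

    public class Driver extends Configured implements Tool {
      private static Logger logger = Logger.getLogger(Driver.class);

      // Recursively deletes the given path; returns true if something was deleted.
      private boolean deleteDirectory(Path path) throws IOException {
        FileSystem fs = FileSystem.get(getConf());
        return fs.delete(path, true);
      }

      public int run(String[] args) throws Exception {
        logger.info("job Matrix Sum Driver Begin");
        Configuration conf = getConf();
        conf.setInt("matrix.key.separator", 0x001);
        conf.set("matrix.element.separator", ",");
        // Job.getInstance replaces the deprecated new Job(conf, name) constructor.
        Job job = Job.getInstance(conf, "Matrix Sum");
        job.setJarByClass(Driver.class);
        Path input1 = new Path(args[0]);
        Path input2 = new Path(args[1]);
        Path output = new Path(args[2]);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setMapperClass(MatrixSumMapper.class);
        job.setReducerClass(MatrixSumReducer.class);
        job.setNumReduceTasks(1);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // Delete any previous output once, so the logged result is meaningful.
        logger.info("deleting output directory: " + deleteDirectory(output));
        FileInputFormat.setInputPaths(job, input1, input2);
        FileOutputFormat.setOutputPath(job, output);
        return job.waitForCompletion(true) ? 0 : 1;
      }

      public static void main(String[] args) throws Exception {
        for (String str : args) {
          System.out.println(str);
        }
        Configuration config = new Configuration();
        System.exit(ToolRunner.run(config, new Driver(), args));
      }
    }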
    

Check out the complete source code from techsquids.