Modify the WordCount program so that it outputs the word count for each distinct word in each file. The output of this DocWordCount program should be of the form 'word#####filename count', where '#####' serves as the delimiter between word and filename and a tab serves as the delimiter between filename and count. Submit your source code in a file named DocWordCount.java.

Explanation: Consider two simple files, file1.txt and file2.txt.

$ echo "Hadoop is yellow Hadoop" > file1.txt
$ echo "yellow Hadoop is an elephant" > file2.txt

Running DocWordCount.java on these two files will give an output similar to the one below, where ##### is the delimiter.

Output of DocWordCount.java:

yellow#####file2.txt	1
Hadoop#####file2.txt	1
is#####file2.txt	1
elephant#####file2.txt	1
yellow#####file1.txt	1
Hadoop#####file1.txt	2
is#####file1.txt	1
an#####file2.txt	1

Initial code that needs to be modified:

package org.myorg;

import java.io.IOException;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;


public class WordCount extends Configured implements Tool {

  private static final Logger LOG = Logger.getLogger(WordCount.class);

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new WordCount(), args);
    System.exit(res);
  }

  public int run(String[] args) throws Exception {
    Job job = Job.getInstance(getConf(), "wordcount");
    job.setJarByClass(this.getClass());

    FileInputFormat.addInputPaths(job, args[0]);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);

    private static final Pattern WORD_BOUNDARY = Pattern.compile("\\s*\\b\\s*");

    public void map(LongWritable offset, Text lineText, Context context)
        throws IOException, InterruptedException {
      String line = lineText.toString();
      // Split the line on word boundaries and emit (word, 1) for each token.
      for (String word : WORD_BOUNDARY.split(line)) {
        if (word.isEmpty()) {
          continue;
        }
        context.write(new Text(word), one);
      }
    }
  }

  public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text word, Iterable<IntWritable> counts, Context context)
        throws IOException, InterruptedException {
      // Sum all the 1s emitted for this word across every input split.
      int sum = 0;
      for (IntWritable count : counts) {
        sum += count.get();
      }
      context.write(word, new IntWritable(sum));
    }
  }
}
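
The key change needed is in the mapper: instead of emitting the bare word, emit a composite key of the form word#####filename, leaving the reducer untouched. A minimal sketch of the modified map method, assuming the new-style org.apache.hadoop.mapreduce API, where the name of the file being processed can be read from the input split (this needs one extra import, org.apache.hadoop.mapreduce.lib.input.FileSplit):

public void map(LongWritable offset, Text lineText, Context context)
    throws IOException, InterruptedException {
  // Name of the file this input split was read from.
  String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
  for (String word : WORD_BOUNDARY.split(lineText.toString())) {
    if (word.isEmpty()) {
      continue;
    }
    // Composite key: the reducer now sums counts per (word, file)
    // pair instead of per word.
    context.write(new Text(word + "#####" + fileName), one);
  }
}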

asked by WoJ (4.0k points)

1 Answer


Answer and Explanation:

package PackageDemo;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class DocWordCount {

  public static void main(String[] args) throws Exception {
    Configuration c = new Configuration();
    String[] files = new GenericOptionsParser(c, args).getRemainingArgs();
    Path input = new Path(files[0]);
    Path output = new Path(files[1]);
    Job j = Job.getInstance(c, "docwordcount");
    j.setJarByClass(DocWordCount.class);
    j.setMapperClass(MapForDocWordCount.class);
    j.setReducerClass(ReduceForDocWordCount.class);
    j.setOutputKeyClass(Text.class);
    j.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(j, input);
    FileOutputFormat.setOutputPath(j, output);
    System.exit(j.waitForCompletion(true) ? 0 : 1);
  }

  public static class MapForDocWordCount extends Mapper<LongWritable, Text, Text, IntWritable> {

    public void map(LongWritable key, Text value, Context con)
        throws IOException, InterruptedException {
      // Name of the file the current split belongs to.
      String fileName = ((FileSplit) con.getInputSplit()).getPath().getName();
      String line = value.toString();
      // Split on whitespace and emit one record per word, keyed by
      // word#####filename so counts are kept separate per file.
      for (String word : line.split("\\s+")) {
        if (word.isEmpty()) {
          continue;
        }
        con.write(new Text(word + "#####" + fileName), new IntWritable(1));
      }
    }
  }

  public static class ReduceForDocWordCount extends Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text wordAndFile, Iterable<IntWritable> values, Context con)
        throws IOException, InterruptedException {
      // Sum the occurrences of each (word, file) pair.
      int sum = 0;
      for (IntWritable value : values) {
        sum += value.get();
      }
      con.write(wordAndFile, new IntWritable(sum));
    }
  }
}
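
One possible way to compile and run the job, assuming a working Hadoop installation and that the input files have already been copied into HDFS (the /user/input and /user/output paths below are placeholders):

$ mkdir build
$ javac -cp $(hadoop classpath) -d build DocWordCount.java
$ jar -cvf docwordcount.jar -C build/ .
$ hadoop jar docwordcount.jar PackageDemo.DocWordCount /user/input /user/output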

answered by Mahesh Jamdade (4.9k points)