Your task is to compute the page visit counts of Wikipedia pages over a period of time. We have provided an input file input.csv that contains page titles and visit counts. Each row of the file has a page title, a page visit count, and a content size, separated by spaces. There can be multiple rows for a single page. The following is a fragment of a sample input file:
Ace_of_Swords 2 17276
Law_school 29 539143
Ace_of_Swords 3 17705
You should write a program that computes the total page visits for each page. For example, the total page visits for Ace_of_Swords in the sample above is 2 + 3 = 5. Each line of the output file should contain a page title and its total count, separated by a single space. The following is the output for the sample input:
Law_school 29
Ace_of_Swords 5
Note that the rows of the output file do not need to be in the same order as the input file.
Implementation
You are given a skeleton code PageCount.java and you need to complete the map and reduce functions in this file.
Mapper
The map function is called once for each key/value pair in the input split. The value argument of the map function is of type Text and contains one line of the input file. You can call toString() on it to get the line as a String object. Use value to produce the needed key-value pairs, then call context.write(new_key, new_value) to send each pair to the reducer.
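For example, a minimal sketch of a map body, assuming each line has the form "title count size" (splitting on whitespace and skipping malformed lines are choices of this sketch, not requirements of the assignment):

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      // Sketch only: assumes each line looks like "Ace_of_Swords 2 17276".
      String[] fields = value.toString().split("\\s+");
      if (fields.length < 2) {
        return; // skip malformed lines
      }
      // Emit (page title, visit count); the reducer sums the counts per title.
      context.write(new Text(fields[0]),
                    new IntWritable(Integer.parseInt(fields[1])));
    }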
Reducer
The reduce function is called once for each key received from the Mapper. You can iterate over the values for that key to do your computation. At the end of the method, call context.write(another_key, another_value) to write the result for that key to the output file.
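For example, one possible reduce body (a sketch that sums the counts emitted by the mapper above; result is the IntWritable field declared in the skeleton):

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      // Sum all visit counts received for one page title.
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result); // e.g. "Ace_of_Swords 5"
    }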
Given code (PageCount.java):
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class PageCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable> {

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      // TODO: parse the line and emit (page title, visit count) pairs.
    }
  }

  public static class IntSumReducer
       extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      // TODO: sum the counts for this page title and write the total.
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "page count");
    job.setJarByClass(PageCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
Solution
A)
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PageCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable> {

    private Text title = new Text();
    private IntWritable count = new IntWritable();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      // Each input line has the form "page_title visit_count content_size".
      String[] fields = value.toString().split("\\s+");
      if (fields.length < 2) {
        return; // skip malformed lines
      }
      title.set(fields[0]);
      count.set(Integer.parseInt(fields[1]));
      context.write(title, count);
    }
  }

  public static class IntSumReducer
       extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      // Sum all visit counts emitted for this page title.
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "page count");
    job.setJarByClass(PageCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
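One way to compile and run the job (a sketch following the standard Hadoop MapReduce tutorial workflow; it assumes a working Hadoop installation with HADOOP_CLASSPATH set, and the input/output HDFS paths here are hypothetical):

    hadoop com.sun.tools.javac.Main PageCount.java
    jar cf pagecount.jar PageCount*.class
    hadoop jar pagecount.jar PageCount /user/hadoop/input /user/hadoop/output

Note that the output directory must not already exist, or the job will fail at startup.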

