Data mining on the Wikipedia dataset
Use Case:
Find the top page titles on Wikipedia by number of requests
Download the hourly dataset from the Wikipedia dumps; it is distributed as a .gz file of roughly 100 MB
Dataset structure:
Each record has four space-separated columns: the project name (e.g. "en" for English Wikipedia), the page title, the number of requests received in that hour, and the total bytes transferred.
Steps:
- Start by filtering all the records with a particular project name, such as "en"
- Compute the sum of all the requests received for each page
- Arrange the pages in descending order of total requests (a small worked example follows)
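To make the steps concrete, here is a small worked example with invented records (page names and counts are purely illustrative). Suppose the input, possibly spanning several hourly files, contains:

    en Page_A 12 45678
    en Page_A 8 30123
    fr Page_A 5 20000

Step 1 drops the "fr" record, step 2 adds up the two "en" records to give Page_A a total of 20 requests, and step 3 places Page_A ahead of any page with a smaller total.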
Approach:
Write two MapReduce jobs.
1st job:
Mapper: Filter all records belonging to the English ("en") project
Reducer: Calculate the sum of requests for each page
2nd job (sort by count):
Swap the page title and its total count so that the count becomes the key, then sort the keys in descending order with a custom comparator
1st MapReduce Job:
FilterMapper.java
package com.mining;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FilterMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    @Override
    public void map(LongWritable offset, Text value, Context context)
            throws IOException, InterruptedException {
        // Each record: project page_title request_count bytes_transferred
        String[] splits = value.toString().split(" ");
        // The dataset is not perfectly clean, so skip malformed records
        if (splits.length == 4 && splits[0].equals("en")) {
            // Emit the page title as the key and its request count as the value
            context.write(new Text(splits[1]), new LongWritable(Long.parseLong(splits[2])));
        }
    }
}
Let's take a sample record from the dataset:
"en %E2%82%AC2_commemorative_coins 9 1332897"
splits is an array of strings obtained by splitting the record on spaces, which should yield four fields
Since the dataset is not perfectly clean, we first check that splits has exactly four fields
Next, we keep only the records whose project name is "en"
We write splits[1], i.e. the page title, as the key and splits[2], i.e. the request count, as the value
From the above sample:
splits[0] = en
splits[1] = %E2%82%AC2_commemorative_coins
splits[2] = 9
splits[3] = 1332897
Using the context object we set "%E2%82%AC2_commemorative_coins" as the key and "9" as the value.
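To see the parsing in isolation, here is a minimal standalone sketch (not part of the MapReduce job; the class name SplitDemo is only for illustration) that splits the sample record and prints its four fields:

    public class SplitDemo {
        public static void main(String[] args) {
            String record = "en %E2%82%AC2_commemorative_coins 9 1332897";
            String[] splits = record.split(" ");
            // A well-formed record yields exactly four fields
            if (splits.length == 4 && splits[0].equals("en")) {
                System.out.println("project  = " + splits[0]); // en
                System.out.println("title    = " + splits[1]); // %E2%82%AC2_commemorative_coins
                System.out.println("requests = " + splits[2]); // 9
                System.out.println("bytes    = " + splits[3]); // 1332897
            }
        }
    }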
FilterReducer.java
package com.mining;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FilterReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    public void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum all the request counts recorded for this page title
        long sum = 0;
        for (LongWritable val : values) {
            sum += val.get();
        }
        context.write(key, new LongWritable(sum));
    }
}
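Since the per-page summation is associative and commutative, the same FilterReducer class could also be reused as a combiner, so that partial sums are computed on the map side and less data is shuffled to the reducers. This is an optional tweak, not part of the original jobs; the single extra line below would go into WikiDriver (shown later), next to the other settings of the filter job:

    // Optional: compute partial per-page sums on the map side to cut shuffle volume
    filter.setCombinerClass(FilterReducer.class);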
2nd MapReduce Job:
SortComparator.java
package com.mining;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class SortComparator extends WritableComparator {

    protected SortComparator() {
        // Register LongWritable as the key type and create instances for comparison
        super(LongWritable.class, true);
    }

    @Override
    public int compare(WritableComparable o1, WritableComparable o2) {
        LongWritable k1 = (LongWritable) o1;
        LongWritable k2 = (LongWritable) o2;
        // Invert the natural ordering so that larger counts come first
        return -1 * k1.compareTo(k2);
    }
}
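As a quick sanity check, the following standalone sketch (the class SortComparatorDemo is only for illustration and lives in the same package so it can call the protected constructor) shows that the comparator places larger counts first:

    package com.mining;

    import org.apache.hadoop.io.LongWritable;

    public class SortComparatorDemo {
        public static void main(String[] args) {
            SortComparator cmp = new SortComparator();
            // A negative result means the first key sorts before the second,
            // so a count of 100 is placed ahead of 9, i.e. descending order.
            int result = cmp.compare(new LongWritable(100), new LongWritable(9));
            System.out.println(result < 0); // prints: true
        }
    }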
SortMapper.java
package com.mining;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SortMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Input lines from the first job look like: page_title<TAB>total_requests
        String[] splits = value.toString().trim().split("\\s+");
        // Swap key and value: the total count becomes the key so Hadoop sorts by it
        context.write(new LongWritable(Long.parseLong(splits[1])), new Text(splits[0]));
    }
}
SortReducer.java
package com.mining;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SortReducer extends Reducer<LongWritable, Text, Text, LongWritable> {

    @Override
    public void reduce(LongWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Keys (total request counts) arrive in descending order thanks to SortComparator;
        // emit each page title with its count so the output reads title first
        for (Text val : values) {
            context.write(val, key);
        }
    }
}
WikiDriver.java
package com.mining;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WikiDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new WikiDriver(), args);
        System.exit(exitCode);
    }

    @Override
    public int run(String[] args) throws Exception {
        // Job 1: filter the "en" records and sum the requests per page title
        Job filter = new Job(getConf(), "Wiki filter");
        filter.setJarByClass(WikiDriver.class);
        filter.setMapperClass(FilterMapper.class);
        filter.setReducerClass(FilterReducer.class);
        filter.setMapOutputKeyClass(Text.class);
        filter.setMapOutputValueClass(LongWritable.class);
        filter.setOutputKeyClass(Text.class);
        filter.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(filter, new Path(args[0]));
        FileOutputFormat.setOutputPath(filter, new Path(args[1]));
        if (!filter.waitForCompletion(true)) {
            return 1;
        }

        // Job 2: read the first job's output and sort by total requests, descending
        Job sort = new Job(getConf(), "Wiki Sorting");
        sort.setJarByClass(WikiDriver.class);
        sort.setMapperClass(SortMapper.class);
        sort.setReducerClass(SortReducer.class);
        sort.setSortComparatorClass(SortComparator.class);
        sort.setMapOutputKeyClass(LongWritable.class);
        sort.setMapOutputValueClass(Text.class);
        sort.setOutputKeyClass(Text.class);
        sort.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(sort, new Path(args[1]));
        FileOutputFormat.setOutputPath(sort, new Path(args[2]));
        return sort.waitForCompletion(true) ? 0 : 1;
    }
}
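Assuming the classes are packaged into a jar (the jar name and paths below are placeholders), the two chained jobs can be launched with a single command, passing the input directory with the raw hourly dump files, an intermediate directory for the first job's per-page totals, and a final output directory for the sorted result:

    hadoop jar wikimining.jar com.mining.WikiDriver /path/to/input /path/to/intermediate /path/to/output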