Thursday, 15 January 2015

Data mining on Wikipedia data set

Data mining on Wikipedia data set

Use Case:
Find top titles from Wikipedia

Download the hourly dataset from the Wikipedia dumps; it is available as a .gz file of approximately 100 MB.

Dataset structure:
Data is organized into four columns:

  • First column is the project name, e.g. "en" for English, "fr" for French, etc.
  • Second column is the title of the page retrieved
  • Third column is the number of requests
  • Fourth column is the size of the content returned


  • Steps: 
    1. Start by filtering all the records with a particular project name like "en"
    2. Compute the sum of all the requests received for a particular page
    3. Arrange them in descending order
    Approach:
    Write 2 map reduce jobs.

    1st job:
    Mapper: Filter all records belonging to the English ("en") project
    Reducer: Calculate the sum of requests for a particular page

    2nd job (sorting by request count):
    Swap key and value so that the request count becomes the key, then sort the keys in descending order with a custom sort comparator

    1st Map Reduce Job:
    FilterMapper.java

    package com.mining;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class FilterMapper extends
      Mapper<LongWritable, Text, Text, LongWritable> {
    
     private String projectName;
    
     public void map(LongWritable offset, Text values, Context context)
       throws IOException, InterruptedException {
      String[] splits = values.toString().split(" ");
      if (splits.length == 4) {
       if (splits[0].equals("en")) {
        context.write(new Text(splits[1]),
          new LongWritable(Long.parseLong(splits[2])));
    
       }
      }
     }
    
    }
    Let's take a sample record from the dataset:
    "en %E2%82%AC2_commemorative_coins 9 1332897"
    splits is an array of strings obtained by splitting the record on the space character; a well-formed record yields an array of length 4.
    Since the dataset is not clean, we check that splits has exactly 4 elements and skip the record otherwise.
    In the next step we keep only the records whose project name is "en".
    We write splits[1], i.e. the page title, as the key and splits[2], i.e. the request count, as the value.
    
    
    
    
    From the above sample:
    splits[0] = en
    splits[1] = %E2%82%AC2_commemorative_coins
    splits[2] = 9
    splits[3] = 1332897
    
    
    Using the context object we write "%E2%82%AC2_commemorative_coins" as the key and 9 as the value.
    
    
    FilterReducer.java
    package com.mining;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    /**
     * Reducer of the first job: adds up all request counts received for one
     * page title and emits (page title, total requests).
     */
    public class FilterReducer extends
      Reducer<Text, LongWritable, Text, LongWritable> {
    
     /**
      * Sums the counts of a single page title.
      *
      * @param key     page title
      * @param values  request counts emitted for that title
      * @param context Hadoop context used to emit the total
      * @throws IOException          if writing the output pair fails
      * @throws InterruptedException if the task is interrupted while writing
      */
     @Override
     public void reduce(Text key, Iterable<LongWritable> values, Context context)
       throws IOException, InterruptedException {
      long total = 0L;
      for (LongWritable requestCount : values) {
       total += requestCount.get();
      }
      LongWritable result = new LongWritable(total);
      context.write(key, result);
     }
    }
    
    
    
    
    
    
    
    
    
    sortComparator.java
    package com.mining;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    /**
     * Sort comparator for the sorting job: orders {@link LongWritable} keys in
     * descending order, so the largest request counts reach the reducer first.
     */
    public class sortComparator extends WritableComparator {
    
     /**
      * Registers {@link LongWritable} as the key type. The second argument is
      * WritableComparator's "create instances" flag, needed so that keys are
      * deserialized before {@link #compare(WritableComparable, WritableComparable)}
      * is called.
      */
     protected sortComparator() {
      super(LongWritable.class, true);
     }
    
     /**
      * Compares two keys in reverse of their natural order.
      *
      * @param o1 first key, must be a {@link LongWritable}
      * @param o2 second key, must be a {@link LongWritable}
      * @return a negative value if {@code o1} is larger than {@code o2}, zero if
      *         they are equal, a positive value if {@code o1} is smaller
      */
     @Override
     @SuppressWarnings("rawtypes") // the overridden signature uses the raw type
     public int compare(WritableComparable o1, WritableComparable o2) {
      LongWritable k1 = (LongWritable) o1;
      LongWritable k2 = (LongWritable) o2;
      // Swapping the operands reverses the ascending natural order.
      return k2.compareTo(k1);
     }
    
    }
    
    
    
    sortMapper.java
    package com.mining;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Mapper.Context;
    
    public class sortMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
     public void map(LongWritable key, Text value, Context context)
       throws IOException, InterruptedException {
    
      String[] splits = value.toString().trim().split("\\s+");
      context.write(new LongWritable(Long.parseLong(splits[1])), new Text(
        splits[0]));
     }
    
    }
    
    
    
    
    
    sortReducer.java
    package com.mining;
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.Reducer.Context;
    
    /**
     * Reducer of the sorting job: turns each (count, titles) group back into
     * (title, count) output pairs, one per title.
     */
    public class sortReducer extends
      Reducer<LongWritable, Text, Text, LongWritable> {
    
     /**
      * Emits every title of the group together with the group's count.
      *
      * @param key     request count shared by all titles in the group
      * @param values  page titles that received that many requests
      * @param context Hadoop context used to emit the output pairs
      * @throws IOException          if writing an output pair fails
      * @throws InterruptedException if the task is interrupted while writing
      */
     @Override
     public void reduce(LongWritable key, Iterable<Text> values, Context context)
       throws IOException, InterruptedException {
      // Key and value swap places again on the way out.
      for (Text pageTitle : values) {
       context.write(pageTitle, key);
      }
     }
    }
    
    
    
    
    
    WikiDriver.java
    package com.mining;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    /**
     * Driver that chains the two jobs: "Wiki filter" sums the requests per page
     * title, then "Wiki Sorting" orders the titles by descending request count.
     *
     * <p>Arguments: {@code <input> <filtered output> <sorted output>}. The second
     * path is the output of the first job and the input of the second one.
     */
    public class WikiDriver extends Configured implements Tool {
    
     /**
      * Runs the driver through {@link ToolRunner} and exits with its status so
      * that a failed job is visible to the calling shell.
      *
      * @param args command-line arguments, see the class description
      * @throws Exception if the tool fails to run
      */
     public static void main(String[] args) throws Exception {
      System.exit(ToolRunner.run(new WikiDriver(), args));
     }
    
     /**
      * Configures and runs both jobs in sequence.
      *
      * @param args input path, intermediate path and final output path
      * @return 0 if both jobs succeed, 1 if either job fails, 2 if fewer than
      *         three arguments are given
      * @throws Exception if a job cannot be set up or submitted
      */
     @Override
     public int run(String[] args) throws Exception {
      if (args.length < 3) {
       System.err.println(
         "Usage: WikiDriver <input> <filtered output> <sorted output>");
       return 2;
      }
    
      Job filter = new Job(getConf(), "Wiki filter");
      filter.setJarByClass(WikiDriver.class);
    
      filter.setMapperClass(FilterMapper.class);
      filter.setReducerClass(FilterReducer.class);
    
      filter.setMapOutputKeyClass(Text.class);
      filter.setMapOutputValueClass(LongWritable.class);
    
      filter.setOutputKeyClass(Text.class);
      filter.setOutputValueClass(LongWritable.class);
    
      FileInputFormat.addInputPath(filter, new Path(args[0]));
      FileOutputFormat.setOutputPath(filter, new Path(args[1]));
    
      // The sort job reads the filter job's output, so stop if it failed.
      if (!filter.waitForCompletion(true)) {
       return 1;
      }
    
      Job sort = new Job(getConf(), "Wiki Sorting");
      sort.setJarByClass(WikiDriver.class);
      sort.setMapperClass(sortMapper.class);
      sort.setReducerClass(sortReducer.class);
      sort.setSortComparatorClass(sortComparator.class);
      // A single reducer keeps the descending order global across the output;
      // with several reducers each output file would only be sorted on its own.
      sort.setNumReduceTasks(1);
    
      sort.setMapOutputKeyClass(LongWritable.class);
      sort.setMapOutputValueClass(Text.class);
    
      sort.setOutputKeyClass(Text.class);
      sort.setOutputValueClass(LongWritable.class);
    
      FileInputFormat.addInputPath(sort, new Path(args[1]));
      FileOutputFormat.setOutputPath(sort, new Path(args[2]));
    
      return sort.waitForCompletion(true) ? 0 : 1;
     }
    
    }