Testing and debugging multithreaded programs is hard. Now take the same programs, massively distribute them across multiple JVMs deployed on a cluster of machines, and the complexity goes through the roof. One way to manage this complexity is to test in isolation and catch as many bugs as possible locally. MRUnit is a testing framework that lets you test and debug MapReduce jobs in isolation without spinning up a Hadoop cluster. In this blog post we will cover various features of MRUnit by walking through a simple MapReduce job.

Let's say we want to take the input below and create an inverted index using MapReduce.

Input
www.kohls.com,clothes,shoes,beauty,toys
www.amazon.com,books,music,toys,ebooks,movies,computers
www.ebay.com,auctions,cars,computers,books,antiques
www.macys.com,shoes,clothes,toys,jeans,sweaters
www.kroger.com,groceries

Expected output

antiques      www.ebay.com
auctions      www.ebay.com
beauty        www.kohls.com
books         www.ebay.com,www.amazon.com
cars          www.ebay.com
clothes       www.kohls.com,www.macys.com
computers     www.amazon.com,www.ebay.com
ebooks        www.amazon.com
groceries     www.kroger.com
jeans         www.macys.com
movies        www.amazon.com
music         www.amazon.com
shoes         www.kohls.com,www.macys.com
sweaters      www.macys.com
toys          www.macys.com,www.amazon.com,www.kohls.com
Below are the Mapper and Reducer that perform the transformation.
public class InvertedIndexMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {

  public static final int RETAILER_INDEX = 0;

  @Override
  public void map(LongWritable longWritable, Text text, OutputCollector<Text, Text> outputCollector, Reporter reporter) throws IOException {
    final String[] record = StringUtils.split(text.toString(), ",");
    final String retailer = record[RETAILER_INDEX];
    for (int i = 1; i < record.length; i++) {
      final String keyword = record[i];
      outputCollector.collect(new Text(keyword), new Text(retailer));
    }
  }
}

public class InvertedIndexReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> {

  @Override
  public void reduce(Text text, Iterator<Text> textIterator, OutputCollector<Text, Text> outputCollector, Reporter reporter) throws IOException {
    final String retailers = StringUtils.join(textIterator, ',');
    outputCollector.collect(text, new Text(retailers));
  }
}

The implementation details are not all that important: the Mapper gets one line at a time, splits it, and emits key/value pairs where the key is a product category and the value is the website selling that product. For example, the line retailer,category1,category2 is emitted as (category1, retailer) and (category2, retailer). The Reducer gets a key and a list of values, joins the values into a comma-delimited String, and emits the key along with the joined value.

Now let's use MRUnit to write various tests for this job. The three key classes in MRUnit are MapDriver for Mapper testing, ReduceDriver for Reducer testing and MapReduceDriver for end-to-end MapReduce job testing. This is how we set up the test class.
public class InvertedIndexJobTest {

  private MapDriver<LongWritable, Text, Text, Text> mapDriver;
  private ReduceDriver<Text, Text, Text, Text> reduceDriver;
  private MapReduceDriver<LongWritable, Text, Text, Text, Text, Text> mapReduceDriver;

  @Before
  public void setUp() throws Exception {
    final InvertedIndexMapper mapper = new InvertedIndexMapper();
    final InvertedIndexReducer reducer = new InvertedIndexReducer();

    mapDriver = MapDriver.newMapDriver(mapper);
    reduceDriver = ReduceDriver.newReduceDriver(reducer);
    mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
  }
}
MRUnit supports two styles of testing. The first style is to give the framework both the input and the expected output and let the framework do the assertions; the second is the more traditional approach where you run the code and do the assertions yourself. Let's write a test using the first approach.
@Test
public void testMapperWithSingleKeyAndValue() throws Exception {
  final LongWritable inputKey = new LongWritable(0);
  final Text inputValue = new Text("www.kroger.com,groceries");

  final Text outputKey = new Text("groceries");
  final Text outputValue = new Text("www.kroger.com");

  mapDriver.withInput(inputKey, inputValue);
  mapDriver.withOutput(outputKey, outputValue);
  mapDriver.runTest();
}
In the test above we give the framework both the input and the expected output key/value pairs, and the framework does the assertion for us. The same test can be written in a more traditional way as follows.
@Test
public void testMapperWithSingleKeyAndValueWithAssertion() throws Exception {
  final LongWritable inputKey = new LongWritable(0);
  final Text inputValue = new Text("www.kroger.com,groceries");
  final Text outputKey = new Text("groceries");
  final Text outputValue = new Text("www.kroger.com");

  mapDriver.withInput(inputKey, inputValue);
  final List<Pair<Text, Text>> result = mapDriver.run();

  assertThat(result)
      .isNotNull()
      .hasSize(1)
      .containsExactly(new Pair<Text, Text>(outputKey, outputValue));
}
Sometimes the Mapper emits multiple key/value pairs for a single input. MRUnit provides a fluent API to support this use case. Here is an example.
@Test
public void testMapperWithSingleInputAndMultipleOutput() throws Exception {
  final LongWritable key = new LongWritable(0);
  mapDriver.withInput(key, new Text("www.amazon.com,books,music,toys,ebooks,movies,computers"));
  final List<Pair<Text, Text>> result = mapDriver.run();

  final Pair<Text, Text> books = new Pair<Text, Text>(new Text("books"), new Text("www.amazon.com"));
  final Pair<Text, Text> toys = new Pair<Text, Text>(new Text("toys"), new Text("www.amazon.com"));

  assertThat(result)
      .isNotNull()
      .hasSize(6)
      .contains(books, toys);
}
You write the test for the Reducer in exactly the same way.
@Test
public void testReducer() throws Exception {
  final Text inputKey = new Text("books");
  final ImmutableList<Text> inputValue = ImmutableList.of(new Text("www.amazon.com"), new Text("www.ebay.com"));

  reduceDriver.withInput(inputKey, inputValue);
  final List<Pair<Text, Text>> result = reduceDriver.run();
  final Pair<Text, Text> expected = new Pair<Text, Text>(inputKey, new Text("www.amazon.com,www.ebay.com"));

  assertThat(result)
      .isNotNull()
      .hasSize(1)
      .containsExactly(expected);
}

Finally, you can use MapReduceDriver to test your Mapper, Combiner and Reducer together as a single job. You can also pass multiple key/value pairs as input to the job. The test below demonstrates MapReduceDriver in action.
@Test
public void testMapReduce() throws Exception {
  mapReduceDriver.withInput(new LongWritable(0), new Text("www.kohls.com,clothes,shoes,beauty,toys"));
  mapReduceDriver.withInput(new LongWritable(1), new Text("www.macys.com,shoes,clothes,toys,jeans,sweaters"));

  final List<Pair<Text, Text>> result = mapReduceDriver.run();

  final Pair<Text, Text> clothes = new Pair<Text, Text>(new Text("clothes"), new Text("www.kohls.com,www.macys.com"));
  final Pair<Text, Text> jeans = new Pair<Text, Text>(new Text("jeans"), new Text("www.macys.com"));

  assertThat(result)
      .isNotNull()
      .hasSize(6)
      .contains(clothes, jeans);
}
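The job above does not define a combiner, but if it did, MapReduceDriver could exercise it as well. Below is a minimal sketch, assuming an MRUnit version whose newMapReduceDriver factory accepts a combiner as a third argument; since this Reducer's input and output types match, it can double as the combiner here.
// Hedged sketch: wire the reducer in as the combiner too. The three-argument
// newMapReduceDriver(mapper, reducer, combiner) overload is assumed to exist in
// your MRUnit version; check the MapReduceDriver javadoc before relying on it.
final InvertedIndexMapper mapper = new InvertedIndexMapper();
final InvertedIndexReducer reducer = new InvertedIndexReducer();
final MapReduceDriver<LongWritable, Text, Text, Text, Text, Text> driver =
    MapReduceDriver.newMapReduceDriver(mapper, reducer, reducer);

driver.withInput(new LongWritable(0), new Text("www.kroger.com,groceries"));
driver.withOutput(new Text("groceries"), new Text("www.kroger.com"));
driver.runTest();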

This covers the basic features of MRUnit. It has plenty of other features that we have not covered here, such as testing counters and passing in a mock configuration, but my favorite is that it lets me put a breakpoint in my Mapper or Reducer and step through the code when I need to. The bottom line is that MRUnit is an invaluable tool for anyone working with MapReduce and Hadoop, and it takes a lot of the pain out of testing MapReduce jobs.
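As an example of counter testing, here is a hedged sketch. It assumes the Mapper were extended to call reporter.incrCounter("InvertedIndex", "RECORDS", 1) for every line it processes (the Mapper shown earlier does not do this); the counter is then read back through the driver's getCounters() accessor after the run.
// Hypothetical counter check: "InvertedIndex"/"RECORDS" is a made-up counter that the
// mapper would have to increment via reporter.incrCounter(...) for this to pass.
mapDriver.withInput(new LongWritable(0), new Text("www.kroger.com,groceries"));
mapDriver.withOutput(new Text("groceries"), new Text("www.kroger.com"));
mapDriver.runTest();

// getValue() may be called getCounter() on very old Hadoop releases
assertThat(mapDriver.getCounters().findCounter("InvertedIndex", "RECORDS").getValue())
    .isEqualTo(1L);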

UPDATED 04/03/2013
Please note that this post uses the MapReduce 1 (mapred) APIs, so you will have to import the org.apache.hadoop.mrunit.MapDriver class (not its org.apache.hadoop.mrunit.mapreduce counterpart) in your test to make the test run.
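For reference, these are roughly the imports the tests above rely on. The driver and Pair classes come from MRUnit's top-level package, which wraps the old mapred API; drivers for the new org.apache.hadoop.mapreduce API live under org.apache.hadoop.mrunit.mapreduce instead. The fluent assertThat(...) calls are assumed to come from FEST-Assert (or a similar library such as AssertJ); the post never names the assertion library explicitly.
import java.util.List;

import com.google.common.collect.ImmutableList; // Guava, used for the reducer test input

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

// MRUnit drivers for the old (mapred) API; use org.apache.hadoop.mrunit.mapreduce.*
// instead if your job is written against the new org.apache.hadoop.mapreduce API
import org.apache.hadoop.mrunit.MapDriver;
import org.apache.hadoop.mrunit.MapReduceDriver;
import org.apache.hadoop.mrunit.ReduceDriver;
import org.apache.hadoop.mrunit.types.Pair;

import org.junit.Before;
import org.junit.Test;

// Assumption: FEST-Assert 2.x; adjust the static import if you use another assertion library
import static org.fest.assertions.api.Assertions.assertThat;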

  1. As you sink deeper into the big data world and your collection of MapReduce jobs grows, you realize that you need a workflow scheduler to chain all your MapReduce jobs together. At the time of this writing there are two leading job schedulers in the Hadoop world: Apache Oozie, which comes bundled with pretty much every Hadoop distribution (except Pivotal HD), and LinkedIn's Azkaban. This blog post is not about comparing Oozie and Azkaban, so I will cut to the chase and tell you upfront that in my opinion Azkaban is much better than Oozie in pretty much every aspect. If you are starting a new cluster, I would recommend seriously taking a look at Azkaban, but if you are stuck with a distribution that comes with Oozie, or if your company only uses Oracle (Azkaban uses MySQL for storing workflows) and will not allow MySQL in production (even though MySQL is owned by Oracle), then your best bet is to learn to love Apache Oozie.

    Apache Oozie, like a bunch of other first-generation Hadoop components, uses an ungodly amount of XML for configuration. There is a project from Cloudera called Hue that eases some of the pain of creating Oozie workflows, but if you cannot use a UI for creating workflows (for version control and various other reasons), you may need a different solution to avoid XML hell. In this blog post we will create a simple Oozie workflow without writing a single line of XML and compare it to the traditional approach of creating flows in Oozie.

    The example below is taken from 'Introduction to Oozie' by Boris Lublinsky and Michael Segel. Let's say you have two MapReduce jobs: one that does the initial ingestion of the data, and a second one that merges data of a given type. The actual ingestion needs to execute the initial ingestion and then merge data for two of the types, Lidar and Multicam. To automate this process we have created a simple Oozie workflow (see the article by Boris Lublinsky and Michael Segel for the workflow in all its XML glory). Let's take a stab at creating the same workflow without using any XML. We will be using Gradle and the gradle-oozie-plugin to create our workflow. This is how our flow looks using the Groovy DSL:
    oozie {
    
    
        def common_props = [
                jobTracker: '${jobTracker}',
                namenode: '${nameNode}',
                configuration: ["mapred.job.queue.name": "default"]
        ]
    
    
        def ingestor = [
                name: "ingestor",
                type: "java",
                mainClass: "com.navteq.assetmgmt.MapReduce.ips.IPSLoader",
                ok: "merging",
                error: "fail",
                args: ['${driveID}'],
    
        ]
    
        def merging = [
                name: "merging",
                type: "fork",
                paths: [
                        "mergeLidar",
                        "mergeSignage"
                ]
        ]
    
        def mergeLidar = [
                name: "mergeLidar",
                type: "java",
                mainClass: "com.navteq.assetmgmt.hdfs.merge.MergerLoader",
                ok: "completed",
                error: "fail",
                args: ['-drive',
                        '${driveID}',
                        '-type',
                        'Lidar',
                        '-chunk',
                        '${lidarChunk}'
                ],
                javaOpts: "-Xmx2048m"
        ]
    
    
        def mergeSignage = [
                name: "mergeSignage",
                type: "java",
                mainClass: "com.navteq.assetmgmt.hdfs.merge.MergerLoader",
                ok: "completed",
                error: "fail",
                args: ['-drive',
                        '${driveID}',
                        '-type',
                        'Lidar',
                        '-chunk',
                        '${signageChunk}'
                ],
                javaOpts: "-Xmx2048m"
        ]
    
        def completed = [
                name: "completed",
                type: "join",
                to: "end"
        ]
    
        def fail = [
                name: "fail",
                type: "kill",
                message: "Java failed, error message[\${wf:errorMessage(wf:lastErrorNode())}]"
        ]
    
    
        actions = [
                ingestor,
                merging,
                mergeLidar,
                mergeSignage,
                completed,
                fail]
    
        common = common_props
        start = "ingestor"
        end = "end"
        name = 'oozie_flow'
        namespace = 'uri:oozie:workflow:0.1'
        outputDir = file("$projectDir/workflow2")
    }
    

    Once we have the flow defined, we can generate the XML workflow by running 'gradle oozieWorkflow'. This approach is a lot cleaner, not only because Groovy removes a lot of boilerplate XML but also because we can define all the common properties in one place instead of repeating them in every node. Here are the complete build.gradle file and the generated workflow. Hopefully this approach will take some of the pain out of working with Apache Oozie.

  2. Imagine you run a popular e-commerce site focused on hip and trendy clothing for women, and you have a whiz-bang recommendation engine that recommends products based on a customer's previous shopping history. While the recommendation engine does a decent job recommending to regular customers, it suffers from the cold start problem when new or anonymous users browse the site catalog. In this blog post we will attempt to solve the cold start problem by recommending products similar to the one the customer is viewing, leveraging Scalding and a few machine learning algorithms. Scalding is a Scala-based abstraction over raw MapReduce from Twitter. See this slide deck from LinkedIn/eBay for an awesome introduction to Scalding.

    Before we start cranking out code, let's get some business rules out of the way.
    • Recommended products must be in the same category. For example, if someone is viewing a waffle maker and we recommend them a towel, that would be considered a bad recommendation.
    • Recommended products must be in the same sub-category as well. Our imaginary site sells twenty different types of jeans; if a customer is looking at Slimming Bootcut, Rinse Wash jeans we should try to recommend other bootcut jeans.
    • Recommended products must be in the same price range. For example, if someone is looking at $20 shoes, we won't recommend her $100 shoes.
    For simplicity's sake we will assume that our product catalog is available in CSV format and that our products have six attributes: DEPARTMENT, SUB_DEPARTMENT, PRODUCT, DESCRIPTION, REG_PRICE and SALE_PRICE. A sample of the product catalog is available here. Let's start by reading the product catalog in a Scalding job and doing a self join on DEPARTMENT and SUB_DEPARTMENT. This partitioning of the catalog takes care of the first business rule of recommending products only from the same category.
     
    
        /* Schema of our product catalog */
        val inputSchema = ('DEPARTMENT, 'SUB_DEPARTMENT, 'PRODUCT, 'DESCRIPTION, 'REG_PRICE, 'SALE_PRICE)
        
        /* Duplicate schema used for self joining */ 
        val renameSchema = ('DEPARTMENT1, 'SUB_DEPARTMENT1, 'PRODUCT1, 'DESCRIPTION1, 'REG_PRICE1, 'SALE_PRICE1)
        
        /* Read in the catalog */  
        val productMatrix = Csv("input", separator = ",", fields = inputSchema, quote = "\"").read
        
        /* Read in the catalog a second time for joining */ 
        val productMatrixDuplicate = Csv("input", separator = ",", fields = inputSchema, quote = "\"").read.rename(inputSchema -> renameSchema)
        
        /* Do a self join based on DEPARTMENT and SUB_DEPARTMENT */
        productMatrix.joinWithSmaller(('DEPARTMENT,'SUB_DEPARTMENT) -> ('DEPARTMENT1,'SUB_DEPARTMENT1), productMatrixDuplicate)
         
    
    
    Once we have read in the catalog, we need to figure out how to define similarity between products. From our business rules we know that a product is similar if it is in the same price range and in the same category and sub-category. Additionally, we would like to recommend products of similar styles even within a sub-category. Let's take a look at a few of the records to figure out which columns are useful for the similarity calculation.
         
        "DEPARTMENT","SUB_DEPARTMENT","PRODUCT","DESCRIPTION","REG_PRICE","SALE_PRICE"
        "women","shoes","Marc Fisher Shoes","Pacca Pumps shoes","75.00","64.99"
        "women","shoes","Bandolino Shoes","Bayard Wedge Sandals shoes","59.00","49.99"
        "women","shoes","Nine West Shoes","Rocha Platform Pumps shoes","79.00","59.99"
        "women","shoes","MICHAEL Michael Kors Shoes","Fulton Moc Flats shoes","110.00","79.99"
         
    
    Looking at the data, we can use REG_PRICE and SALE_PRICE to calculate similarity based on price range. We will calculate the Tanimoto distance between products using their REG_PRICE and SALE_PRICE. A full discussion of Tanimoto distance is out of scope, but in practical terms, when two items have the same regular price and sale price the result is 0.0, and when they have nothing in common it is 1.0.

    Looking at the data again, we can see that the style information is embedded inside the DESCRIPTION column. Unfortunately we cannot use Tanimoto distance for description-based similarity without converting and normalizing the descriptions into numerical values. Instead we will calculate the NGram distance between product descriptions. A good description of NGram distance is provided here (pdf); for our application, NGram distance returns a value between 1 and 0, where 1 means the specified strings are identical and 0 means the strings are maximally different. Luckily, Tanimoto distance and NGram distance implementations are provided by Apache Mahout and Lucene respectively, so we won't have to hand-roll our own.

    Once we have calculated the Tanimoto and NGram distances, we can combine them by calculating how far each is from a perfect match and adding the differences together. An easy way to make sure that we have calculated everything correctly is to compute the distance between a product and itself and verify that it is 0.0.
         
        /* Output schema */
        val outputSChema = ('PRODUCT, 'PRODUCT1, 'Distance)
        
        /* Map over the grouped fields and calculate the distance */
        .mapTo('* -> outputSChema) {
        in: (String, String, String, String, Double, Double, String, String, String, String, Double, Double) => calculateDistance(in)
        }
        
        /**
         * Calculates Tanimoto and NGram distance based on different product features
         * and combines them together.
         */
        def calculateDistance(in: (String, String, String, String, Double, Double, String, String, String, String, Double, Double)) = {
        val (_, _, p1_product, p1_description, p1_regPrice, p1_salePrice, _, _, p2_product, p2_description, p2_regPrice, p2_salePrice) = in
        
        val ngramDistance = 1 - MathUtils.round(ngram.getDistance(p1_description, p2_description).toDouble, SCALE)
        
        val p1_vector = new DenseVector(Array(p1_regPrice, p1_salePrice)) 
        val p2_vector = new DenseVector(Array(p2_regPrice, p2_salePrice)) 
        val tanimotoDistance = MathUtils.round(tanimotoDistanceMeasure.distance(p1_vector, p2_vector), SCALE) 
        
        val distance = MathUtils.round(tanimotoDistance + ngramDistance, SCALE)
        val result = (p1_product, p2_product, distance)
        result
        }
         
    
    In the end we can sort the results by distance and take the top K similar products.
         
        .groupBy('PRODUCT) { g=> 
         g.sortBy('Distance).take(3) }
        
         .write(Csv("output", separator = ",", fields = outputSChema))
     
    
    The complete recommendation list based on the sample product catalog is here, but let's look at the recommendations for one product and verify that all our business goals are met.
          
        //products
        "women","shoes","Rampage Shoes","Weatherly Booties shoes","59.99","35.99"
        "women","shoes","Marc Fisher Shoes","Pacca Pumps shoes","75.00","64.99"
        "women","shoes","Bandolino Shoes","Bayard Wedge Sandals shoes","59.00","49.99"
        "women","shoes","Nine West Shoes","Rocha Platform Pumps shoes","79.00","59.99"
        "women","shoes","MICHAEL Michael Kors Shoes","Fulton Moc Flats shoes","110.00","79.99"
    
        //top 3 recommendations for "Marc Fisher Shoes Pacca Pumps shoes"
         Nine West Shoes,0.48493
         Bandolino Shoes,0.73206
         MICHAEL Michael Kors Shoes,0.75641
        
    
    As we can see, the top recommendation for "Marc Fisher Shoes Pacca Pumps" is "Nine West Shoes Rocha Platform Pumps". These two shoes are not only in a similar price range but are also of the same style. Based on our results, it seems we are able to meet all three of our business objectives even with a very small sample of data. The complete source code for the product recommendation job is located here.

  3. I first ran into monoids while searching for monads on Google. Monoids are ubiquitous in programming, and chances are that you have already used them without being aware of it. Wikipedia describes a monoid as an algebraic structure with a single associative binary operation and an identity element. That definition is a mouthful and does not make much sense unless your college algebra is fresh. It is much easier to understand monoids by looking at some examples.
    Before we delve into code, let's get some math out of the way. Let's look at the addition operation on integers:

       1) Addition is associative, i.e. 1+(2+3) = (1+2)+3
       2) There is an identity element (e) for addition such that e+i = i+e = i, e.g. 5+0 = 0+5 = 5

    Any data structure that obeys these two laws is a monoid, and there are a handful of them. Let's look at some more examples:

      * Multiplication operation on integers
          i) Associative => (1*5)*2 = 1*(5*2)
         ii) Identity => 5*1 = 1*5 = 5

      * Concatenation operation on Strings
          i) Associative => "monoids " + ("are " + "cool") = ("monoids " + "are ") + "cool" = "monoids are cool"
         ii) Identity => "monoids" + "" = "" + "monoids" = "monoids", where "" is the empty string

    There are more examples of monoids, such as the append operation on lists, but I think we get the picture. At this point you may be wondering how they are useful in day-to-day programming. Before I show you some real-world examples, let me introduce another operation that goes hand in glove with monoids. Originating in the functional world, this function has already been incorporated into most mainstream languages. It is called Aggregate in C#, inject in Ruby and fold in Scala, with variations such as foldLeft and foldRight. Even Java is finally getting it in Java 8 as part of Project Lambda (a short Java 8 sketch follows the Scala examples below). Regardless of what it is called in your language, this function applies a binary operator to a start value and all elements of a collection. That is not a very helpful definition, so let's look at an example in Scala which calculates the sum of all the elements in a list.

      val ints = List(1, 2, 3)
      ints.foldLeft(0){ (a, b) => a + b }
      // 6
    Basically, foldLeft takes a seed value, 0 in our case, and a lambda that applies some binary operation to consecutive elements of the list. If you understand fold, and if you squint slightly, you will notice that the seed value in this example is nothing more than the identity element from our first monoid example, and the lambda we passed is simply the associative operation 'addition' on integers. I could have written the above example as follows.

    class AdditionMonoid {
      def identity = 0
      def op(a1: Int, a2: Int) = a1 + a2
    }

    val monoid = new AdditionMonoid
    val ints = List(1, 2, 3)
    ints.foldLeft(monoid.identity)(monoid.op)
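    Since Java 8's Project Lambda brings the same operation to Java, here is a hedged sketch of the addition monoid expressed with the Java 8 Stream API (not yet final at the time of writing); reduce takes exactly an identity element and an associative operation, the two ingredients of a monoid.

    import java.util.Arrays;
    import java.util.List;

    public class AdditionMonoidExample {
      public static void main(String[] args) {
        final List<Integer> ints = Arrays.asList(1, 2, 3);
        // reduce takes an identity element (0) and an associative operation (+),
        // i.e. exactly the two pieces of the addition monoid
        final int sum = ints.stream().reduce(0, (a, b) -> a + b);
        System.out.println(sum); // prints 6
      }
    }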
    If you have ever looped over a collection to aggregate a value, chances are you were dealing with a monoid, and since monoids are associative they play very well with parallelism: you can chunk the data into small pieces and fold over the chunks in parallel. In addition, the product of two monoids is also a monoid, which lets you compose more complex monoids; you can calculate both the sum and the length of a list in one fold, for example.

    Now let's look at a semi-real example of a monoid. Say you are calling a web service that returns user information, including a social security number, credit card number and so on, as a JSON response, and you want to scrub the sensitive information before logging the response. One way to do it, using a monoid and foldLeft, is as follows.

    JSON response
    {
    "first": "John",
    "last": "Doe", 
    "credit_card": 5105105105105100,
    "ssn": "123-45-6789",
    "salary": 70000, 
    "registered": true }
    A JSON obfuscation monoid
    trait Monoid[A] {
      def identity: A
      def op(a1: A, a2: A): A
    }

    class JsonObfuscator extends Monoid[String] {
      val CREDIT_CARD_REGEX = "\\b\\d{13,16}\\b"
      val SSN_REGEX = "\\b[0-9]{3}-[0-9]{2}-[0-9]{4}\\b"
      val BLOCK_TEXT = "*********"

      def identity = ""
      def op(a1: String, a2: String) =
        a1 + a2.replaceAll(CREDIT_CARD_REGEX, BLOCK_TEXT).replaceAll(SSN_REGEX, BLOCK_TEXT) + ","
    }
    and the application

    val monoid = new JsonObfuscator
    val result = json.split(',').foldLeft(monoid.identity)(monoid.op)
    println(result)
    finally the result

    {
    "first": "John",
    "last": "Doe",
    "credit_card": *********,
    "ssn": "*********",
    "salary": 70000,
    "registered": true
    }

    A very trivial example, but it shows the power of monoids and fold: no need to explicitly loop and mutate state. Monoids are a very simple abstraction, and if you like monoids you will love monads, which bring an even higher order of abstraction to various data types. Finally, if you are getting into functional programming, I would highly encourage reading Functional Programming in Scala, which was the main inspiration for this post.

  4. “There's no sense being exact about something if you don't even know what you're talking about” -- John von Neumann

    I read the above quote a few months ago in Steve McConnell's excellent book on software estimation while researching different estimation methodologies for a new project. I was about to start a new skunkworks project, and a friend had suggested Steve McConnell's book, along with Mike Cohn's Agile Estimating and Planning, as an excellent resource for avoiding common mistakes while developing a large and complex project. Now that the project is well underway, this is my reflection on some of the techniques we have applied to it.

    A project is typically a compromise between cost, schedule and feature set. As the project progresses, the project lead has to make many decisions about cost, schedule and features, and uncertainty in the project estimate arises from uncertainty in how these decisions will be resolved. The expectation is that as you make a greater number of these decisions, you reduce the estimation uncertainty. Steve McConnell depicts this graphically as the great "Cone of Uncertainty". It is based on a very simple but powerful idea: as you make the right decisions at the various stages of the project, the uncertainty decreases as the project progresses.



    Once you understand that a project is an exercise in removing uncertainty, the next logical step is to break down all the known risks associated with the project. The challenge after that is to figure out in which order the team should remove these risks and uncertainties. This is a crucial decision that can easily cause the project to fail. If your team is asked to demonstrate progress at the end of every sprint, there is a tendency to pick up low-risk features early on in the project. This approach is a recipe for disaster, as it gives a false sense of progress early on while in reality the cone of uncertainty does not narrow, since the team has not tackled any high-risk features. Mike Cohn suggests a better approach to solving this problem: dividing the risk into the following three categories:

    • Schedule risk ("We might not be done by October")
    • Cost risk ("We might not be able to buy hardware for the right price")
    • Functionality risk ("We might not be able to get that to work")

    Once you have all the known risks categorized, a better sequence of development is to do high-value, high-risk features first; these deliver the most value and eliminate significant risks. High-value, low-risk features should be developed next: they offer as much value as the first set, but they are less risky. Low-value, low-risk features are sequenced third because they have less impact on the total value of the project. Finally, features that deliver low value but carry high risk are best left until later.



    This is a very sensible risk-eliminating approach; however, it is not always easy to follow in practice. For example, it is not always possible to do high-value, high-risk features early in the project. In my project, one of the high-risk, high-value features was integrating with two other projects that were being developed alongside ours. Since our timelines did not match theirs exactly, we received their service contracts late in our development cycle, and it was impossible for us to eliminate the integration risk early on. We tried to mitigate this by mocking their service contracts as best we could, but that was not sufficient to completely eliminate the risk.

    Once you have planned for all the known risks, it is important to remember that there may be unknown risks you are not aware of early on in the project. Since these risks are not known, you cannot plan for them and will have to deal with them as you encounter them. If one of these unknown risks is a Black Swan event and you encounter it late in the project, there is a very high chance it will cause the project to fail. Black Swan events are low-probability, high-impact events that Wikipedia describes as follows:

    First it is an outlier, as it lies outside the realm of regular expectations, because nothing in the past can convincingly point to its possibility. Second, it carries an extreme impact. Third, in spite of its outlier status, human nature makes us concoct explanations for its occurrence after the fact, making it explainable and predictable.

    If your project is destined to run into a Black Swan type of problem, it is much preferable to hit that iceberg early on, because even if it totally destroys your project, the cost will be low compared to running into it much later in the game.

    The unfortunate thing is that even after planning for all the known risks and keeping an eye out for unmitigated ones, there is no guarantee that your project will succeed. The reality is that projects rarely fail due to technical decisions; at the end of the day, numerous other factors that are not in your control affect the outcome. The culture of the organization is a key contributing factor: a large, conservative organization can slow down an agile and nimble team with its bureaucracy and status quo, and an organization lacking the infrastructure to support rapid iterative development can slow down even a skilled team. Successful project execution is as dependent on strategic management, organizational culture and values as it is on technical design choices. The bottom line is that regardless of which project management methodology you use, there are no silver bullets, and the only way to survive and succeed is to stay nimble and adapt quickly to the challenges your project will face throughout its lifecycle.

    This post contains copyrighted material the use of which has not always been specifically authorized by the copyright owner. The material is being made available for non-profit research and educational purposes only. I believe this constitutes a 'fair use' of any such copyrighted material as provided for in section 107 of the US Copyright Law. If you wish to use this copyrighted material for purposes of your own that go beyond 'fair use,' you must obtain permission from the copyright owner. This is a completely non-commercial site for private personal use. No fee is charged, and no money is made off of the operation of this site.

  5. Against the current popular wisdom on the interweb, the Supreme Court of my home state of Wisconsin is porting its legacy applications to Scala and Lift. Either the State of Wisconsin is employing rockstar developers who would rather work for local government than a Silicon Valley startup, or Scala is slowly gaining enough critical mass to creep into mainstream, conservative development shops.


    http://wiscjobs.state.wi.us/public/job_view.asp?annoid=54048&jobid=53563



  6. Scala for the Impatient

    Recently I got hold of a free sample of Cay Horstmann's upcoming Scala book, "Scala for the Impatient", and so far I have been fairly impressed with the quality of the content. The book is not finished yet and the first few chapters only cover the basics, so there are no earth-shattering discoveries as far as knowledge of Scala is concerned, but a few things already make this book stand out compared to other Scala books. Chapters are small, compact and to the point, and the breadth and depth of the topics covered are ideal for someone just starting Scala. My favorite part of the book so far is the exercise section at the end of every chapter. This alone makes the book a lot more engaging, as the author not only wants readers to understand the topics covered but also expects them to demonstrate their newfound knowledge by solving problems at the end of each chapter. I am thinking of using this book to introduce Scala at work, as it is ideal for group study sessions.


  7. I have been evaluating Scala for quite some time now as an alternative to Java for enterprise development, and one of the most common criticisms I have seen leveled against Scala is that it is too complex for average developers. A quick Google search on Scala complexity returns numerous links to blogs both defending and criticizing Scala for its perceived complexity. In my opinion, one key aspect that has been ignored in this debate is that adoption of a framework/language/whatever in a large enterprise is almost never about the technology per se; it has much more to do with the culture of the organization.

    To better understand the statement 'Scala is too complex for average developers', we have to dig a little deeper into the culture prevalent in most IT departments of large enterprises today. This can be done by slightly tweaking the Theory X and Theory Y framework created and developed by Douglas McGregor at MIT in the 1960s. In a nutshell, Type X developers and Type Y developers sit at the two opposite extremes of a spectrum.

    Type X programmers could be described as:

    • Individuals who lack ambition, dislike responsibility and prefer to be led.
    • Individuals who desire job security and try to maintain the status quo by resisting change.
    • Individuals who dislike work and avoid it where possible.

    Type Y programmers could be described as:

    • Individuals who are ambitious, self-motivated and exercise self-control.
    • Individuals who consider effort at work as just like rest or play.
    • Individuals who possess the ability for creative problem solving, but their talents are underused in most organizations.

    Now with this new classification in mind, let’s see how our two programmer types fare against Scala and its perceived complexity.

    Is Scala too complex for Type X programmers?

    Scala is definitely too complex for Type X programmers. However

    • Java 5 and beyond is also too complex for Type X programmers.
    • Spring and Hibernate are also too complex for Type X programmers.
    • Multithreading and thread pools are also too complex for Type X programmers.

    In a nutshell, anything and everything that is new and requires learning is too complex for Type X programmers.

    Is Scala too complex for Type Y programmers?

    Scala is definitely NOT too complex for programmers who are driven and self-motivated. The seduction of Scala for Type Y programmers in a large enterprise is that it solves real-world problems much more efficiently than Java. Some of the key benefits of Scala in a large enterprise are:

    • Much more elegant solutions to Multithreading problems than error prone Java alternatives.
    • A lot less boilerplate code, thanks to type inference, case classes and higher-order functions.
    • Ability to use existing Java libraries and infrastructure.

    Lastly, Is Scala too complex for the management?

    Management does not care about the complexity of a programming language as long as it enables faster time to market. In my opinion, Scala does not require any additional investment from management, since it can leverage all the existing Java infrastructure and is far more productive for the development team, requiring a lot less code than Java to solve similar problems.

    So, is Scala really too complex for average developers?

    In a large and conservative enterprise, a few Type Y programmers act as change agents by influencing both executive management and hordes of Type X programmers. A few years ago they evangelized Spring and Hibernate in their companies, and there is no reason the same cannot be done with Scala. I believe Scala in particular, and any new technology in general, should just focus on Type Y programmers, since Type X programmers will always resist change no matter what the change is. Scala is probably too complex for average developers, but the opinions of average developers generally don't matter. Scala needs to convert the small minority of enterprise developers who are typically responsible for bringing about organization-wide change. In this regard Scala is doing a fine job attracting intelligent and self-motivated programmers, and I have no doubt that it will continue to gain traction in the enterprise.

