Apache Crunch Tutorial 1: Getting Started


This post is the first in a hopefully substantive and informative series of posts about Apache Crunch, a framework that makes it easier for Java developers to write Map-Reduce programs for Hadoop.

If you’re a Hadoop developer, you know how painful writing Map-Reduce code can be. It requires a lot of serialization, knowledge of what goes in a Mapper versus what goes in a Reducer, and potentially a lot of debugging. Thankfully, there are frameworks out there to make your code-writing life a little simpler — like Apache Crunch!

Crunch, in a nutshell, is a set of Java libraries for writing, testing, and executing Map-Reduce jobs and pipelines. It comes without the hassle of writing generic mappers, reducers, or Hadoop configurations. Its goal is to help you craft Map-Reduce-driven programs composed of user-defined functions that can run in parallel (i.e., as map and reduce tasks) and are easy to write and test on or off your Hadoop cluster.

Writing Crunch, Explained

Java! You write Strings, ints, loops, and everything else you’re used to programming with; you just use Crunch’s built-in pipelines and data constructs (called PCollections) to run that code on distributed execution engines like MapReduce and Spark. Most of your time with Apache Crunch will be spent writing what the community calls DoFns (short for Do Functions).

These are function-like classes that can (and are designed to) run in parallel (as mappers/reducers). In fact, they are most commonly passed to a PCollection’s built-in parallelDo method (which takes a DoFn as input, plus a PType describing the expected output type).
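To make that concrete, here’s a minimal sketch of a DoFn passed to parallelDo. The pipeline, input path, and uppercasing logic are placeholders for illustration only, not part of the example program later in this post:

PCollection<String> lines = pipeline.readTextFile("/tmp/input.txt");

// parallelDo takes the DoFn plus a PType that tells Crunch how to serialize the output
PCollection<String> upper = lines.parallelDo(new DoFn<String, String>() {
    @Override
    public void process(String input, Emitter<String> emitter) {
        // runs once per record, on however many mappers Crunch spins up
        emitter.emit(input.toUpperCase());
    }
}, Writables.strings());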

DoFns Explained

Each DoFn is a class that closely resembles a standard static Java class that can be called to do some form of work. The difference between a DoFn and a standard Java class is that, in Crunch, DoFns are inherently parallel: they run inside the map and reduce tasks of the MapReduce jobs that Crunch plans for your pipeline. What really happens in a DoFn is actually pretty simple:

A DoFn has one input (a record) and one output (an emitter). If the data being processed is, for example, a comma-separated text file (.csv), each instance of a DoFn will receive its own chunk of records (this is a distributed computing system after all; we don’t do sequential work if we don’t have to). Each record is treated exactly the same, regardless of which instance handles it: any logic you ask to be applied to a line (which is, again, the input for every DoFn instance) will be applied to all lines. It’s the Map part of Map/Reduce: you apply a transformation or process to every record in your data set, and then aggregate the results in the Reduce phase later.
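Because each call to process sees exactly one record and writes through an emitter, a DoFn is free to emit zero, one, or many outputs per input. Here’s a quick sketch, reusing the lines PCollection from the sketch above (splitting fields is purely illustrative):

PCollection<String> fields = lines.parallelDo(new DoFn<String, String>() {
    @Override
    public void process(String line, Emitter<String> emitter) {
        // one input line in, one emitted output per comma-separated field out
        for (String field : line.split(",")) {
            emitter.emit(field);
        }
    }
}, Writables.strings());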

In any Crunch program with a MapReduce pipeline, when a DoFn is called to act on a data set, the load is distributed across mappers. An example would be reading a text file into a PCollection (a Crunchified version of the Java Collection), using a DoFn to parse out bad lines and keep the good ones, using another DoFn to convert the string lines into classed records (like a normal Java class, e.g. Person, Employee, House), using another DoFn to retrieve a specific value from each record, and using a built-in Crunch aggregation to sum all those values.
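Here’s a rough sketch of a chain like that. It’s simplified (no conversion to a full record class), and the file path, column positions, and "total" key are assumptions made for illustration. It assumes a pipeline like the one in the program below; FilterFn, PTable, and Pair live in org.apache.crunch, and Aggregators lives in org.apache.crunch.fn:

// Read the raw lines
PCollection<String> rawLines = pipeline.readTextFile("/tmp/employees.csv");

// Keep only well-formed lines (FilterFn is a DoFn specialized for keeping or dropping records)
PCollection<String> goodLines = rawLines.filter(new FilterFn<String>() {
    @Override
    public boolean accept(String line) {
        return line.split(",").length == 3;
    }
});

// Pull out the numeric value we care about (here, the third column),
// keyed by a constant so everything rolls up into one total
PTable<String, Long> values = goodLines.parallelDo(new DoFn<String, Pair<String, Long>>() {
    @Override
    public void process(String line, Emitter<Pair<String, Long>> emitter) {
        emitter.emit(Pair.of("total", Long.parseLong(line.split(",")[2])));
    }
}, Writables.tableOf(Writables.strings(), Writables.longs()));

// Group by the constant key and sum the values
PTable<String, Long> total = values.groupByKey().combineValues(Aggregators.SUM_LONGS());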

Once you start playing around with the convenience functions of Crunch, you’ll start to get a feel for what it can do.

Pipelines Explained

Pipelines are at the center of what makes Crunch work. Any DoFns, PCollections, and data sets must report to the same pipeline for your workflow to work. Thankfully, Crunch offers three different but equally easy-to-use pipeline implementations (there’s a quick construction sketch after the list below): MemPipeline, MRPipeline, and SparkPipeline.

  • MemPipeline is a completely in-memory option for running tests and development on your local machine (like a laptop). It should only be used for unit tests and checking that functionality works; it shouldn’t be used as a production pipeline (use one of the two below instead). It’s really just a convenience.
  • MRPipeline is a MapReduce option for running your workflow in the form of Map/Reduce jobs from your Java Crunch code. This is the most popular option.
  • SparkPipeline is a Spark option for running your workflow in the form of in-memory Spark jobs, which suit clusters with plenty of memory. This is a newer option for Spark-enabled Hadoop clusters.
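All three implement the same Pipeline interface, so swapping one for another is mostly a one-line change. Here’s a quick sketch of constructing each; the class reference, Configuration, Spark master, and app name are placeholder choices, and the Spark option also needs the crunch-spark module on your classpath:

// In-memory, for local tests
Pipeline memPipeline = MemPipeline.getInstance();

// MapReduce: the class argument tells Hadoop which jar to ship to the cluster
// (ExampleCrunchApp is the example app from later in this post)
Pipeline mrPipeline = new MRPipeline(ExampleCrunchApp.class, new Configuration());

// Spark: a Spark master URL (or "local") plus an application name
Pipeline sparkPipeline = new SparkPipeline("local", "example-crunch-app");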

Let’s Write a Simple Crunch Program!

To get started with Crunch, you need an IDE like Eclipse or IntelliJ. You’ll also need the Crunch libraries installed and a Crunch project started. Thankfully, this step can be done pretty quickly and easily. Just follow the steps to get a fresh Crunch project set up here. When you’re done, come back here!

That same link above (also here) also details a simple WordCount example in Crunch. While that example is all well and good, we’re going to ignore whether you’ve done it or not and jump into a simple example of how common Crunch code works. Let’s take a common problem: we have a data set, but we need to parse what’s there and add something to it. For this example specifically, let’s take a .csv file that contains names, ages, and places of employment, and construct a fourth column for each record: an email address built from that data.

Let’s break it down!

Sample Crunch Program


/*
 * ExampleCrunchApp.java
 * @version 1.0
 * @author Landon Robinson
 */
package com.bigdata.crunch;

import org.apache.crunch.*;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.io.To;
import org.apache.crunch.types.writable.Writables;

public class ExampleCrunchApp {

    public static void main(String[] args) throws Exception {

        // Store arguments in variables
        String inputPath = args[0];
        String outputPath = args[1];

        // Create and configure the data pipeline
        Pipeline pipeline = MemPipeline.getInstance();

        // Read data from the file into a Crunch PCollection
        System.out.println("Reading file (" + inputPath + ") into PCollection...");
        PCollection<String> person_data = pipeline.readTextFile(inputPath);

        // Now we have a collection of Strings, each a line from the .csv
        System.out.println("Number of lines ingested from (" + inputPath + "): "
            + person_data.length().getValue());

        // Let's add some emails to those records with a DoFn
        PCollection<String> person_data_with_emails = person_data
            .parallelDo(DoFn_AddEmailAddresses(), Writables.strings());

        System.out.println(person_data_with_emails.length().getValue()
            + " records with emails made out of "
            + person_data.length().getValue()
            + " input lines.");

        // With the new data, write it to a file on HDFS (as a csv)
        person_data_with_emails.write(To.textFile(outputPath));

        // End the pipeline and exit the program
        PipelineResult result = pipeline.done();
        System.exit(result.succeeded() ? 0 : 1);

    } // end main

    // ======================================
    // CUSTOM DOFNS
    // Custom parallel functions designed to be Map/Reduce operations.
    // ======================================

    /**
     * This DoFn takes a person record as raw text and adds an email address
     * built from its existing columns.
     */
    static DoFn<String, String> DoFn_AddEmailAddresses() {
        return new DoFn<String, String>() {
            @Override
            public void process(String input, Emitter<String> emitter) {
                String[] inputParts = input.split(",");
                String personName = inputParts[0];
                String personAge = inputParts[1];
                String personCompany = inputParts[2];

                String personEmail_Generated = personName + "." + personAge + "@"
                    + personCompany + ".com";
                String updatedPerson = input + "," + personEmail_Generated;

                emitter.emit(updatedPerson);
            }
        };
    }
}

So, what does our input (people.csv) look like?

landon,23,walmart
joey,29,lowes
jessica,18,apple
mary,44,ibm

In our IDE, the run configuration has two program arguments (the input file and a path for the output file). From the command line on our Hadoop cluster, we run it like this:

hadoop jar ExampleCrunchAppJar com.bigdata.crunch.ExampleCrunchApp /tmp/people.csv /tmp/example-crunch-output/

And what does our output look like? Exactly what we expected!

landon,23,walmart,landon.23@walmart.com
joey,29,lowes,joey.29@lowes.com
jessica,18,apple,jessica.18@apple.com
mary,44,ibm,mary.44@ibm.com

And that’s it for the first post on Apache Crunch. Tune in soon for part two, where I’ll talk about converting to objects, materializing, output formats, and more!

Next Tutorial >>


