Apache Crunch Tutorial 3: MemPipeline and Basic Functionality


This post is the third in a hopefully substantive and informative series of posts about Apache Crunch, a framework for enabling Java developers to write Map-Reduce programs more easily for Hadoop.

In my previous tutorial on Apache Crunch (the second in this series), we walked through setting up a project from scratch step by step, working with Maven, and importing the project into IntelliJ. In this tutorial, we’ll dive into some code and showcase what Crunch can really do.

In a nutshell, Crunch is useful for writing and executing Map/Reduce pipelines without the pain of writing raw Map/Reduce code. Through Java and specialized functions called DoFns (short for “do functions”), you can compile and run a jar on your cluster that executes your Java code with Map/Reduce jobs peppered in. But today, let’s focus on doing things locally to get you familiar with everything.

Let’s look at some code.

/**
 * ExampleCrunchApp.java
 *
 * @author Landon Robinson
 * @version 1.0
 */
package com.bigdata.crunch;

import org.apache.crunch.*;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.io.To;
import org.apache.crunch.types.writable.Writables;

public class ExampleCrunchApp {

    public static void main(String[] args) throws Exception {

        // Store arguments in variables
        String inputPath = args[0];
        String outputPath = args[1];

        // Create and configure the data pipeline
        Pipeline pipeline = MemPipeline.getInstance();

        // Read data from a file into a Crunch PCollection
        System.out.println("Reading file (" + inputPath + ") into PCollection...");
        PCollection<String> person_data = pipeline.readTextFile(inputPath);

        // Now we have a collection of strings, each a line from the .csv
        System.out.println("Number of lines ingested from (" + inputPath + "): "
                + person_data.length().getValue());

        // Let's add some emails to those records with a DoFn
        PCollection<String> person_data_with_emails = person_data
                .parallelDo(DoFn_AddEmailAddresses(), Writables.strings());

        System.out.println(person_data_with_emails.length().getValue()
                + " valid records made out of "
                + person_data.length().getValue()
                + " valid lines.");

        // With the new data, let's write it to a file on HDFS (as a csv)
        person_data_with_emails.write(To.textFile(outputPath));

        // End the pipeline and exit the program
        PipelineResult result = pipeline.done();
        System.exit(result.succeeded() ? 0 : 1);

    } // end main

    // ======================================
    // * CUSTOM DOFNS
    // * Custom parallel functions designed to be Map/Reduce operations.
    // ======================================

    /**
     * This DoFn takes a person in raw text and adds an email based on current
     * columns.
     */
    static DoFn<String, String> DoFn_AddEmailAddresses() {
        return new DoFn<String, String>() {
            @Override
            public void process(String input, Emitter<String> emitter) {
                String[] inputParts = input.split(",");
                String personName = inputParts[0];
                String personAge = inputParts[1];
                String personCompany = inputParts[2];

                String personEmail_Generated = personName + "." + personAge + "@"
                        + personCompany + ".com";
                String updatedPerson = input + "," + personEmail_Generated;

                emitter.emit(updatedPerson);
            }
        };
    }
}

First things first: to test your code in IntelliJ/Eclipse rather than through a jar on the command line, you need code written in a test class with a MemPipeline. This is what I’ve done in the code above. You’ll have trouble running classes in the ‘src/main’ area of the project, since those classes typically invoke Hadoop configurations and tools that expect a command-line environment. You can certainly make classes in that folder that don’t, and run them like any old Java project, but it’s easier to write your code in the test folder without all the Configuration/Tool stuff used in M/R pipeline code.
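If you’d rather exercise the same logic as a proper unit test, here’s a minimal sketch assuming JUnit 4 is on your test classpath (the test class name is hypothetical):

package com.bigdata.crunch;

import static org.junit.Assert.assertEquals;

import org.apache.crunch.PCollection;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.types.writable.Writables;
import org.junit.Test;

public class ExampleCrunchAppTest {

    @Test
    public void addsAnEmailColumnToEachRecord() {
        // Build a tiny in-memory PCollection instead of reading a file
        PCollection<String> people = MemPipeline.typedCollectionOf(
                Writables.strings(),
                "Jane,30,Initech",
                "John,45,Globex");

        PCollection<String> withEmails = people
                .parallelDo(ExampleCrunchApp.DoFn_AddEmailAddresses(), Writables.strings());

        // materialize() pulls the results back as plain Java objects
        for (String record : withEmails.materialize()) {
            assertEquals(4, record.split(",").length);
        }
    }
}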

To get to know the code and how it works, start by making a class in the ‘src/test’ folder of your project. Call it something like mine, ‘ExampleCrunchApp.java’. You’re smart, you can handle this.

Code Breakdown (short version):

If you want a longer breakdown of the simple, common code components of Crunch, go here. I’ll focus on the specifics of my code to show you what we can achieve, and get you off the ground and testing code.

Set up a standard Java class with a standard main method. Simple, right?

String inputPath = args[0];
String outputPath = args[1];

Set up two strings to store your arguments: an input file path and an output path for results, respectively. (Think of calling the code on a command line; you’ll want to give it arguments instead of hardcoding things, right?)

Pipeline pipeline = MemPipeline.getInstance();

Set up a MemPipeline that will be the foundation of our entire program. Read up on pipelines to understand when to use which (shortcut: MemPipeline is for running tests on your local machine/laptop, while the other two, MRPipeline and SparkPipeline, are for actual production execution).
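For reference, when you’re ready to run against a real cluster, this is the line that changes. A sketch, assuming you have a Hadoop Configuration available (for example via Hadoop’s Tool/Configured pattern):

import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.hadoop.conf.Configuration;

// Swap the in-memory pipeline for a real Map/Reduce pipeline
Pipeline pipeline = new MRPipeline(ExampleCrunchApp.class, new Configuration());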

PCollection<String> person_data = pipeline.readTextFile(inputPath);

Now we’re in the Crunch stuff.

A PCollection is a distributed collection of whatever you want it to be: strings, ints, whole Java objects, you name it. In this line we’re doing two things: making a PCollection called person_data, and using a Crunch convenience method called ‘readTextFile’ to ingest a file into that PCollection. The method belongs to the pipeline object we instantiated earlier, which is a MemPipeline (again, a local version of a Map/Reduce pipeline that runs in memory on our computer). Notice how we used the ‘inputPath’ variable from before? That’s ideally an absolute path to a locally stored file of some kind, a .csv in the case of this example.

When this same code runs in a real Map/Reduce pipeline, the read becomes a Map/Reduce job that gets fired off from this code to your cluster’s scheduler. Most PCollection-related operations in Crunch are, in fact, dynamically generated Map/Reduce jobs.
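For illustration, a hypothetical input .csv for this example could look like the following, one person per line with a name, age, and company:

Jane,30,Initech
John,45,Globex
Sara,28,Hooli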

Let’s keep trucking. The next important line (and where the real meat of Crunch lives) is this one:

PCollection<String> person_data_with_emails = person_data
    .parallelDo(DoFn_AddEmailAddresses(), Writables.strings());

A couple of things are happening here. We’re defining a new PCollection called person_data_with_emails, which, as you probably expected, is the same person_data from before, but now with an email added to each record. We do this through some good ol’ parallel processing (i.e. Map/Reduce/Crunch) via the PCollection object’s built-in method, .parallelDo. This method runs a piece of logic you define in the form of a DoFn, and takes two arguments: the DoFn you wish to run, and a PType describing the output that will be stored in the resulting PCollection.
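You don’t have to factor the DoFn out into a named method, by the way. Here’s a quick sketch of an inline anonymous DoFn that simply upper-cases each record (the collection name is just for illustration):

PCollection<String> person_data_uppercased = person_data.parallelDo(
    new DoFn<String, String>() {
        @Override
        public void process(String input, Emitter<String> emitter) {
            // Emit the same record, just upper-cased
            emitter.emit(input.toUpperCase());
        }
    }, Writables.strings());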

More on DoFns

In the case of this example, we’re calling a custom DoFn we wrote (you will write many of these in your time developing with Crunch) called DoFn_AddEmailAddresses(). We know it will return strings, so we tell the method to expect Writables.strings(). Writables refers to Hadoop’s native Writable serialization, which can handle all the normal Java data types you know and love, like strings and ints. Alternatively, you can use Avros.strings() if your team wants to use the Avro format by default. If you’re unsure, stick with Writables.strings().
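For example, the Avro flavor of the same call would look like this (assuming org.apache.crunch.types.avro.Avros is imported):

PCollection<String> person_data_with_emails = person_data
    .parallelDo(DoFn_AddEmailAddresses(), Avros.strings());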

When the code hits this step in execution on a real cluster (the MemPipeline just does the same work in memory), it kicks off a Map/Reduce job that takes the contents of your PCollection and spawns however many mappers and reducers it needs to process your data. If your data is small, it might be handled by a single mapper and reducer; if it’s big, it could be handled by many of each. That’s the magic of Map/Reduce, right? Anyway, this isn’t a white paper on M/R, so let’s talk about what happens with Crunch.

The contents of your DoFn will be run by the mapper(s), whether that’s one or a hundred. A DoFn takes in records from a PCollection and does to them whatever you say, then emits back whatever you tell it to. What does emit mean? Emitting is how you hand records back to Crunch to be stored in the output PCollection once processing is done. So when you call a DoFn through parallelDo on a PCollection, the contents of that PCollection are passed to the DoFn, processed, and emitted into a new PCollection.
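To see how emitting controls what ends up in the new PCollection, here’s a hypothetical DoFn that filters instead of transforms: rows that aren’t emitted simply never appear in the output.

static DoFn<String, String> DoFn_KeepOnlyThreeColumnRows() {
    return new DoFn<String, String>() {
        @Override
        public void process(String input, Emitter<String> emitter) {
            // Only well-formed rows (name, age, company) get emitted;
            // everything else is silently dropped from the output PCollection
            if (input.split(",").length == 3) {
                emitter.emit(input);
            }
        }
    };
}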

Let’s look at the DoFn code a little more closely:

/**
 * This DoFn takes a person in raw text and adds an email based on current
 * columns.
 */
static DoFn<String, String> DoFn_AddEmailAddresses() {
    return new DoFn<String, String>() {
        @Override
        public void process(String input, Emitter<String> emitter) {
            String[] inputParts = input.split(",");
            String personName = inputParts[0];
            String personAge = inputParts[1];
            String personCompany = inputParts[2];

            String personEmail_Generated = personName + "." + personAge + "@"
                    + personCompany + ".com";
            String updatedPerson = input + "," + personEmail_Generated;

            emitter.emit(updatedPerson);
        }
    };
}

Notice a few things about the structure of the DoFn. It is parameterized as <String, String>: the first type parameter is the input type (a String), and the second is the output type (also a String). These can be whatever types you like. In the next tutorial, I’ll show you how to take in a String and return a full-fledged plain old Java object. It’s super neat and immensely useful.

Since our input is a String (in this case a single line from the .csv we ingested earlier), you can see that the DoFn splits it by commas and parses out the 3 major attributes. It then generates a fourth attribute (an email composed of the person’s name, age, and company) and builds a new csv string containing all four values. The DoFn then uses the built-in Emitter object to emit that string back, and it lands in our new PCollection.

That’s how you go from a data file with records of 3 values, to a data file with records of 4 values. It’s not fancy or complicated, but it conveys the idea (I hope!).
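Concretely, with a hypothetical input line, the transformation looks like this:

Jane,30,Initech                          (input: 3 values)
Jane,30,Initech,Jane.30@Initech.com      (output: 4 values)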

You can then do what you want with the records in that new PCollection: run another DoFn on them, or write them out somewhere! In fact, that’s exactly what we do in the final lines of the code, using our outputPath to tell Crunch where to write those records.
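Since each of these operations just returns a PCollection, the whole job can also be expressed as one chain of calls. A sketch of the same pipeline, end to end:

pipeline.readTextFile(inputPath)
    .parallelDo(DoFn_AddEmailAddresses(), Writables.strings())
    .write(To.textFile(outputPath));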

And with that, the pipeline is done – so we call the .done() method and exit.

LET’S RUN IT!

[Screenshots: console output from running the example in the IDE]

Thanks for reading! As for next time, no more kiddy stuff. We’re diving into the advanced features of Crunch, and ways to avoid some of the headaches and roadblocks that we stumbled into and banged our heads against so you don’t have to. 🙂

Other notes:

  • Set up a ‘configuration’ in IntelliJ or Eclipse to allow you to pass arguments to your test (see the example below). You can follow the steps in tutorial one for more info.
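For example, the “Program arguments” field for this tutorial’s app might look like this (hypothetical paths):

/path/to/people.csv /path/to/output_folder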

Thanks again!

<< Previous Tutorial  |  Next Tutorial >>

