Apache Crunch Tutorial #7: Scaling Crunch with your Data (ScaleFactor)


This post is the seventh in a hopefully substantive and informative series of posts about Apache Crunch, a framework that makes it easier for Java developers to write MapReduce programs for Hadoop.

In my previous (sixth) tutorial on Apache Crunch, we covered setting custom Hadoop configuration options on a per-job basis. This time, let's talk about how we can scale Crunch's performance with our data, beyond what it already does out of the box.

I’m talking about scaleFactor(). And it’s not the factor of how many reptilian scales cover your data — no, I mean the factor by which your data grows or shrinks when it passes through Crunch. Specifically, how your data grows or shrinks as it passes through a specific DoFn (do function).

ScaleFactor, Explained

In most scenarios, you’re going to do the following in Crunch:

  • Ingest data into a PCollection
  • Process and enhance the data
  • Write it out somewhere on Hadoop/HDFS

But what if you have a data set whose size, once Crunch has processed and written it, doesn't match the size of the data you fed in? That's called scaling: your data either grew or shrank based on your processing. Here's a common example: say the data you ingest into a Crunch PCollection contains records of purchases/transactions in a store. As part of processing, your goal is to return only the records that match a certain criterion. Let's say that criterion is "records that are product returns", meaning your Crunch pipeline will read in data containing all transactions and return/write only the records logged as returns to the store by a customer. Assume in this scenario that the input file contains all forms of transactions, such as purchases and returns.

Let's say your store's return rate is 50%. While that's a disastrous customer satisfaction rate and you're likely to be out of business soon, let's run with it. Crunch's FilterFns, for example, by default assume you will return (otherwise known as "emit") about half of the records you bring into them from a PCollection. A 50% return rate is great in this situation because it matches that default value, but we won't always return half of what we run through a DoFn. In fact, we'll often return more than we ingested, because our processing frequently creates new data from the data it reads.

You can return as much or as little as you want, and Crunch usually handles it pretty well. But if you let it know ahead of time that the data is going to expand or shrink by a significant margin, you can save Crunch (and yourself) some optimization headaches down the road.

See this excerpt from Hadoop: The Definitive Guide, 4th Edition (full attribution at the bottom of this page):

“If your DoFn significantly changes the size of the PCollection it is operating on then you can override its scaleFactor() method to give a hint to the Crunch planner about the estimated relative size of the output, which may improve its efficiency.

FilterFn’s scaleFactor() method returns 0.5; in other words, the assumption is that implementations will filter out about half of the elements in a PCollection. You can override this method if your filter function is significantly more or less selective than this.”
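Going back to our store scenario: if only about 10% of transactions are returns, that default of 0.5 would overestimate the output size. Here's a minimal sketch of what such an override could look like, assuming (purely for illustration) that each transaction is a String record whose type field starts the line, e.g. return records begin with "RETURN":

    import org.apache.crunch.FilterFn;

    // Keeps only return transactions. The "RETURN" prefix is an assumed,
    // made-up record format used only for illustration.
    static FilterFn<String> returnsOnly() {
        return new FilterFn<String>() {
            @Override
            public boolean accept(String transaction) {
                return transaction.startsWith("RETURN");
            }

            @Override
            public float scaleFactor() {
                return 0.1f; // expect only ~10% of records to survive this filter
            }
        };
    }

You'd apply it with PCollection.filter(returnsOnly()), and the planner then knows to expect the output to be roughly a tenth of the input rather than half of it.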

So if I know that my data will be 4x (four times) larger when it leaves a DoFn to become a new PCollection, I can tell Crunch that with the scaleFactor method in my DoFn(s):

 
    @Override
    public float scaleFactor() {
        return 4.0f;
    }

But wait, where does this cool little function go? Inside your DoFn class, and right after your process method:

 
    import org.apache.crunch.DoFn;
    import org.apache.crunch.Emitter;

    // A DoFn that emits four output records for every input record.
    static DoFn<String, String> DoFn_CreateRecords() {
        return new DoFn<String, String>() {
            @Override
            public void process(String input, Emitter<String> emitter) {
                // Each input record becomes four output records.
                emitter.emit(input + "1");
                emitter.emit(input + "2");
                emitter.emit(input + "3");
                emitter.emit(input + "4");
            }

            @Override
            public float scaleFactor() {
                // Hint to the Crunch planner: output will be ~4x the input size.
                return 4.0f;
            }
        };
    }
 

So this isn't exactly a quality example, because the above DoFn just returns every record 4 times with a number appended to each… but you get the point, yeah? The idea is that our data, starting in PCollection 1, will grow as it passes through this DoFn on its way to PCollection 2. Setting scaleFactor() in our DoFn lets us hint to the Crunch planner that our data will grow, so it can set things like the number of reduce tasks accordingly and potentially improve efficiency.
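For completeness, here's a rough sketch of how that DoFn might be wired into a pipeline with parallelDo(). The pipeline variable, the input/output paths, and the step name are assumptions standing in for whatever your own pipeline actually reads and writes:

    // Assumes "pipeline" is an existing Pipeline (e.g. an MRPipeline) and that
    // the paths are placeholders for your own data.
    PCollection<String> lines = pipeline.readTextFile("/data/in/transactions");
    PCollection<String> expanded = lines.parallelDo(
            "create-records",         // a name for this step in the plan
            DoFn_CreateRecords(),     // the DoFn above, with scaleFactor() = 4.0f
            Writables.strings());     // PType describing the output elements
    pipeline.writeTextFile(expanded, "/data/out/expanded");

With the 4.0f hint in place, the planner's size estimate for the expanded collection reflects the growth before it decides how to configure the downstream work.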

You can do this for any and all DoFns you wish. Pretty useful, huh?

Thanks for reading today's tutorial! Hope you can apply scaleFactor() to your work. Let me know what you'd like to learn about next!

<< Previous Tutorial  |  Next Tutorial >>

