This post is the fifth in a hopefully substantive and informative series of posts about Apache Crunch, a framework for enabling Java developers to write Map-Reduce programs more easily for Hadoop.
In my previous and fourth tutorial on Apache Crunch, we touched on some cool features of Crunch like Distincts, Materialization, and Data as Objects. Today is focused on Configurations. Hadoop Configurations, if I’m to be specific.
Hadoop Configurations…. WHAT ARE THOSE?
Though you’ve likely already noticed in your journey with Apache Crunch, Crunch takes in a Hadoop/Map-Reduce Conf(iguration) at the start of its pipeline. Why? Because it’s running Map-Reduce jobs for you, so it needs to know the “standards” of your system, so to speak.
Simply leveraging the built-in getConf() method and passing that to your pipeline is sufficient when you have a simple workflow to accomplish. When you start leveraging Crunch for more powerful and complex operations, you'll want to pass in and override certain parameters, likely many of them. Here's how we currently get a default configuration to give to Crunch:
```java
Configuration crunchConf = getConf();
Pipeline pipeline = new MRPipeline(ExampleCrunchApp.class, "Crunch Pipeline", crunchConf);
```
Now this is the standard, formalized approach in Crunch. In fact, most people just inline the call to be even more concise:
```java
Pipeline pipeline = new MRPipeline(ExampleCrunchApp.class, "Crunch Pipeline", getConf());
```
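For context, getConf() is available because a typical driver class extends Hadoop's Configured and is launched through ToolRunner, which loads your cluster's site files (and any generic options passed at launch) into the configuration for you. Here's a minimal sketch of what such a driver might look like; the class name, job name, and return-code handling are illustrative, not prescribed by Crunch:

```java
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Illustrative driver skeleton: extending Configured is what provides
// getConf(), and ToolRunner populates that configuration from the
// cluster's site files plus any generic options on the command line.
public class ExampleCrunchApp extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    Pipeline pipeline =
        new MRPipeline(ExampleCrunchApp.class, "Crunch Pipeline", getConf());
    // ... build your reads, DoFns, and writes here ...
    return pipeline.done().succeeded() ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new ExampleCrunchApp(), args));
  }
}
```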
But while that’s well and good to get us off the ground, we might want a little more customization. Take, for example, the Map-Reduce queue setting. If your Hadoop environment is set up with multiple user and batch queues to distribute resources among different groupings of jobs, then you’re already familiar with how useful queues are with tools like Map-Reduce, Oozie, and Falcon.
With Crunch, as with any Map-Reduce program, your code/jobs will run in Hadoop’s default queue unless told otherwise. Maybe we want this series of jobs to always run in a queue called batch. Crunch doesn’t know anything about your environment unless you tell it specifically, so we have to tell it what we want it to know… which we can accomplish with a few brief and simple lines of code.
Take for example the code below, which generates a Hadoop Configuration and stores it in a variable near the top of our program. Normally, we’d pass this conf straight to Crunch’s pipeline instantiation and get rocking and rolling (as shown earlier above), but here we want to specify the queue in which this pipeline’s jobs will run, that queue being batch. So in the code below, we’ll add that parameter as a key-value pair on the conf, and then give it to the pipeline.
```java
Configuration crunchConf = getConf();

// Designate queue for pipeline
crunchConf.set("mapred.job.queue.name", "batch");

// Establish pipeline with modified configuration
Pipeline pipeline = new MRPipeline(ExampleCrunchApp.class, "Crunch Pipeline", crunchConf);
```
Now, all Map-Reduce jobs that execute under this pipeline will run in the batch queue, provided your friendly administrator has set it up for you. If you need to set up a capacity scheduler and queues, check out a few tutorials on the topic (though that’s hardly the point of this tutorial and certainly not a requirement).
But what if we want to set other Map-Reduce settings, like map-side compression and its codec, or even more obscure settings like io.sort.mb? Well, you do it the exact same way – just include those parameters in your conf object before you pass it to the pipeline, and the settings you’d normally pass with -D on the command line will be included in your Crunch pipeline. Like so:
```java
Configuration crunchConf = getConf();

// Designate queue for pipeline
crunchConf.set("mapred.job.queue.name", "batch");

// Tell Crunch to show progress of jobs in output
crunchConf.setBoolean("crunch.log.job.progress", true);

// Tell mappers to compress their output before the reduce
crunchConf.set("mapred.compress.map.output", "true");

// Set compression codec to Snappy
crunchConf.set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.SnappyCodec");

// Set higher memory limit while sorting data
crunchConf.setInt("mapreduce.task.io.sort.mb", 1792);

// Establish pipeline with modified configuration
Pipeline pipeline = new MRPipeline(ExampleCrunchApp.class, "Crunch Pipeline", crunchConf);
```
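For comparison, here's roughly what supplying those same settings at launch time (rather than in code) could look like, since ToolRunner folds generic -D options into the configuration returned by getConf(). The jar name, main class, and paths below are placeholders, not from this project:

```shell
# Hypothetical invocation: the same overrides supplied as -D options
hadoop jar example-crunch-app.jar ExampleCrunchApp \
  -Dmapred.job.queue.name=batch \
  -Dcrunch.log.job.progress=true \
  -Dmapred.compress.map.output=true \
  -Dmapred.map.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec \
  -Dmapreduce.task.io.sort.mb=1792 \
  /input/path /output/path
```

Setting the values in code, as above, has the advantage that they travel with the program rather than depending on whoever types the launch command.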
And… presto! Pretty neat huh, how Crunch honors the standard Hadoop configuration?
So, to recap: Crunch doesn’t pick up all of your Hadoop/Map-Reduce defaults just by calling getConf(), so you have to set them explicitly with the steps above. I don’t have a running list of what all those defaults are, but referring back to the io.sort.mb example from before: Crunch defaults that value to 100 (MB) when I run, which limits the amount of memory allocated to the sort phase of Map-Reduce (mostly on the map side). You might have a higher default already set in your cluster (through Ambari or a manual change), or you might simply want a higher value, which you can set with the previous code. For my team, our value is set to 1792, not 100. Big difference there. And since we talked about compression and memory/storage usage earlier, here’s a nifty chart:
Now, that’s how you set global parameters for an entire pipeline. But since Crunch is really just a shell that runs a series of Map-Reduce jobs for you… shouldn’t you be able to set custom options on a per-job basis?!
Well, thankfully you can! On a per-DoFn basis! But we’ll talk about that… next time. Had you fooled, didn’t I?
Thanks for reading today’s tutorial! Hope you can use one or more of these cool configuration options in Crunch. Let me know what you’d like to learn about next!