How to Build Data History in Hadoop with Hive: Part 2


Part 2: Growing the data

If you’ve yet to finish part one, we strongly encourage reading it. It’s not super long.

It’s time to get technical.

Hadoop can store a lot of data, so we want to keep filling it and putting it to use. With that in mind, what’s better than those 7 years of sales data?

Hint hint: Remember, we’re a retail company.

7 years of sales data… along with today’s data… and tomorrow’s data… and the day after that’s data… you get the point. All the data! We want all of it in our data lake!

To accomplish this, we’ve decided to automate the ingestion of sales data into Hadoop every single day, so the dataset grows continuously! This regular flow of data lets analysts and data scientists build models that can be fed data from the far and recent past to produce helpful output for the present and future (this is the crux of machine learning). And that’s just one of many things we can do with that much data!

So, what do we do? We structure our Hive tables such that we can add new data daily, and in doing so, make the new data as simple to access as the bulk-loaded historical data. But how?

There’s a convenient way to build history in Hive, and it’s called partitioning.


Partitioning is simply structuring a Hive table so that data is physically and logically segmented according to some value. Commonly, and in the case of this example, data is partitioned by date.

This means data in a Hive table (inside HDFS, if you wanna be technical about it) is stored in folders/directories based on a date column. If our table is partitioned on a column in the data called “transaction_date” (this is sales data, so that’s the date a transaction happened), data from 2017-04-11 is stored in a folder called transaction_date=2017-04-11, data from 2017-05-07 is stored in a folder called transaction_date=2017-05-07, and so on. These directories exist as subdirectories of the LOCATION specified when you create the Hive table.

This, again, is a way to physically and logically align data in our file system (HDFS). There are many benefits to partitioning, largely around optimizing data storage and querying, but you can read up on that with a Google. For now, know that it not only reflects good architecture, it will also make our queries much, much faster and more efficient as our data gets larger and more unwieldy.

Now, back to the partitioning method of building history: Build a table that contains all the history ever, partitioned by day.

Let’s break down what that means, and how we do it.

James and the Giant Peach Hive Table


This method has you constructing a Hive table that is partitioned by date. In our sales data example, we have a transaction_date.

First, pull the data from its source (whether that’s a Sqoop job from an RDBMS, a Flume ingest from a server, etc.) and write it into a staging Hive table. Then create a permanent, partitioned Hive table to hold all the data, separated by date. This is the table that will grow over time.

Here’s some sample data (comma separated):

0001,123456789,abc-123-def,45.00,2017-01-01
0002,123456790,abc-123-def,120.00,2017-01-01
0003,333334445,abc-123-def,65.00,2017-01-01
0004,946272312,abc-123-def,1.00,2017-01-01

Here’s an example of the temp table your Sqoop job or ingestion script can write to (make sure you read up on the differences between managed and external tables in Hive):

hive> create external table temp.sales
(
transaction_id bigint,
customer_id bigint,
product_id string,
purchase_amount float,
transaction_date date
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/tmp/sales/';

OK
Time taken: 0.249 seconds
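(If you’re following along by hand rather than with a Sqoop or Flume job, one quick way to get the sample rows above into the staging table is a LOAD DATA statement. This is just a sketch; the local file path is hypothetical and assumes you saved the rows as a CSV file.)

hive> load data local inpath '/home/myuser/sales.csv' into table temp.sales;

That copies the file into the table’s /tmp/sales/ location, so a quick select * from temp.sales; should show the four rows.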

Great! Now we’ve got a temporary/staging Hive table where our ingested data for today can live. Now let’s build the table where the history will live and grow!

hive> create table default.sales
(
transaction_id bigint,
customer_id bigint,
product_id string,
purchase_amount float
)
PARTITIONED BY (transaction_date date)
STORED AS ORC
LOCATION '/apps/hive/warehouse/default.db/sales/'
tblproperties ('orc.compress' = 'SNAPPY');

OK
Time taken: 0.198 seconds

Awesome. So, from previous tutorials (or current knowledge), we’ve loaded our temp/staging table with today’s data. Now, to get it into our history table, we use a Hive insert statement that moves all the data from the staging table into a new partition in the history table. Because the partition value comes from the data itself (a dynamic partition insert), you may need to enable dynamic partitioning first, depending on your Hive configuration. Like so:

hive> set hive.exec.dynamic.partition=true;
hive> set hive.exec.dynamic.partition.mode=nonstrict;
hive> insert into table default.sales partition(transaction_date) select transaction_id, customer_id, product_id, purchase_amount, transaction_date from temp.sales;
Query ID = myuser_20170420232635_b2b8380e-e453-40a4-8177-d6edce3069eb
Total jobs = 1
Launching Job 1 out of 1
Status: Running (Executing on YARN cluster with App id application_1491203684030_42227)

--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED      1          1        0        0       0       0
--------------------------------------------------------------------------------
VERTICES: 01/01  [==========================>>] 100%  ELAPSED TIME: 3.87 s
--------------------------------------------------------------------------------
Loading data to table default.sales partition (transaction_date=null)
Time taken for load dynamic partitions : 508
Loading partition {transaction_date=2017-01-01}
Time taken for adding to write entity : 3
Partition default.sales{transaction_date=2017-01-01} stats: [numFiles=1, numRows=4, totalSize=665, rawDataSize=460]
OK
Time taken: 7.506 seconds

When this runs, Hive launches a map-only job that scans all of the data in the temp table and creates a partition containing that data in our history table. Here’s how we prove it:

Ask Hive to show you the partitions!

hive> show partitions default.sales;
OK
transaction_date=2017-01-01
Time taken: 0.536 seconds, Fetched: 1 row(s)

Now query that table like you would if you were looking for data from that day!

hive> select * from default.sales where transaction_date='2017-01-01';
OK
1 123456789 abc-123-def 45.0 2017-01-01
2 123456790 abc-123-def 120.0 2017-01-01
3 333334445 abc-123-def 65.0 2017-01-01
4 946272312 abc-123-def 1.0 2017-01-01

Now, try that with a date that doesn’t exist yet!

hive> select * from default.sales where transaction_date='2017-01-02';
OK
Time taken: 0.184 seconds

Catch that? Not only can we add a new partition every day like this to grow our data, we can also backfill all the years of old data the same way, because a dynamic partition insert creates one partition for every distinct date it finds in the source data.
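As a rough sketch of what that backfill could look like (assuming the historical extract has been landed in a staging table called temp.sales_history, a hypothetical name with the same columns as temp.sales):

hive> set hive.exec.dynamic.partition.mode=nonstrict;
hive> insert into table default.sales partition(transaction_date)
select transaction_id, customer_id, product_id, purchase_amount, transaction_date
from temp.sales_history;

One statement, and Hive writes one partition per distinct transaction_date it finds. If you’re loading years of history in one shot, you may also need to raise limits such as hive.exec.max.dynamic.partitions, so check your cluster’s settings first.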

Now, how does the data look in HDFS?

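As a rough sketch (the file name below is illustrative and will vary; the layout assumes the single partition we loaded above), the directory tree under the table’s LOCATION looks something like this:

/apps/hive/warehouse/default.db/sales/
    transaction_date=2017-01-01/
        000000_0

Each new day’s insert adds another transaction_date=YYYY-MM-DD subdirectory alongside the first; the file(s) inside each one hold that day’s rows as ORC.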

Why is this useful? Well, when you fire off a SQL query against a Hive table, Hive has to read the necessary data files and filter through each one. If you lump all of your data together without segmenting it, Hive has to read every single piece of data you’ve ever added every time you query the table. Queries take more and more time, and eventually the table becomes unusable.

Architecting your data like this lets Hive know ahead of time which files to read before burning any time doing so. When you’ve built lots of history in Hadoop but only want to read one day’s worth of data, or a range of days, Hive reads only the necessary partitions instead of every file in the directory, which is vastly more efficient. It’s also a sensible way to lay out the data in HDFS.
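For example (a sketch against the table we built above), a query over a range of days filters on the partition column, so Hive should only read the matching partitions:

hive> select transaction_date, sum(purchase_amount) as daily_sales
from default.sales
where transaction_date between '2017-01-01' and '2017-01-31'
group by transaction_date;

Everything outside January 2017 should be pruned before any files are read; run an explain on the query if you want to confirm.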

And that’s really it. The way you go about building history in your Hadoop cluster is up to you; this is just one way that’s worked for us. If you’ve got big, big data problems, look into bucketing and sorting on insert/write. It’ll rock your world.
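If you go down that road, the table definition grows a couple of clauses. Here’s a minimal sketch (the table name, bucket column, sort column, and bucket count are made-up choices for illustration, not recommendations):

hive> create table default.sales_bucketed
(
transaction_id bigint,
customer_id bigint,
product_id string,
purchase_amount float
)
PARTITIONED BY (transaction_date date)
CLUSTERED BY (customer_id) SORTED BY (transaction_id) INTO 32 BUCKETS
STORED AS ORC
tblproperties ('orc.compress' = 'SNAPPY');

On older Hive versions you’ll also want set hive.enforce.bucketing=true; before inserting so the bucket files actually get written; newer versions enforce it automatically.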
