Reduce = 99% or Skewed Joins in Hive

Often, when running an HQL query, you may notice that it reaches the 99% reduce stage quite quickly and then gets stuck:

...
2014-10-07 08:46:01,149 Stage-8 map = 100%,  reduce = 99%, Cumulative CPU 6905.85 sec
2014-10-07 08:47:01,361 Stage-8 map = 100%,  reduce = 99%, Cumulative CPU 6999.45 sec
2014-10-07 08:48:01,441 Stage-8 map = 100%,  reduce = 99%, Cumulative CPU 7065.59 sec
2014-10-07 08:49:01,670 Stage-8 map = 100%,  reduce = 99%, Cumulative CPU 7125.26 sec
2014-10-07 08:50:01,808 Stage-8 map = 100%,  reduce = 99%, Cumulative CPU 7188.12 sec

The problem is that Hive estimates the progress based on the number of reducers completed, and this does not always reflect the actual execution progress. A query can reach 99% in 1 minute and then spend an hour on the remaining 1%.

The most typical reason for this behavior is skewed data. For example, assume that you have a table that tracks all visits to specific sites, and SITE.COM has 100M rows while a dozen other sites (SITE.ORG, SITE.NET, etc.) have just 10K rows each.

Then when you join this table with another by site name, one reducer has to process 100M rows while other reducers process just 10K rows each.

So if you have 99 sites with 10K rows each and a single site with 100M rows, and you specify 100 reducers, then 99% of the reducers finish their work very quickly, and you have to wait a long time for the last reducer to finish.
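
The effect is easy to reproduce with a small simulation. The Python sketch below is an illustration only, not how Hive computes progress internally. It assumes that each site lands on its own reducer and uses a made-up throughput of 100,000 rows per second per reducer (`ROWS_PER_SEC` is hypothetical), then compares the share of finished reducers with the share of rows actually processed.

```python
# Illustration only: models the example above, not Hive's real progress reporting.
# Assumptions: each site lands on its own reducer, and every reducer processes
# ROWS_PER_SEC rows per second (a made-up figure).
ROWS_PER_SEC = 100_000

# 99 sites with 10K rows each and one site with 100M rows, 100 reducers in total
reducer_rows = [10_000] * 99 + [100_000_000]


def reducers_completed_pct(elapsed_sec):
    """Share of reducers that have finished after elapsed_sec seconds."""
    capacity = elapsed_sec * ROWS_PER_SEC
    done = sum(1 for rows in reducer_rows if rows <= capacity)
    return 100.0 * done / len(reducer_rows)


def rows_processed_pct(elapsed_sec):
    """Share of all rows that have been processed after elapsed_sec seconds."""
    capacity = elapsed_sec * ROWS_PER_SEC
    processed = sum(min(rows, capacity) for rows in reducer_rows)
    return 100.0 * processed / sum(reducer_rows)


if __name__ == "__main__":
    for t in (1, 60, 600, 1000):
        print(f"t={t:>4}s  reducers done: {reducers_completed_pct(t):5.1f}%  "
              f"rows processed: {rows_processed_pct(t):5.1f}%")
```

Under these assumptions the reducer-based figure reaches 99% after one second, while only about 1% of the rows have been processed.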

Not only joins

Data skew can arise not only in joins. For example, if you perform a GROUP BY SITE_NAME in our example, a single reducer has to deal with 100M rows while the others process far fewer rows.
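
The reason is that all rows with the same grouping key go to the same reducer. The Python sketch below illustrates this with the row counts from the example. It is a simplification: CRC32 stands in for whatever hash function Hive actually uses, and the site names other than SITE.COM, SITE.ORG and SITE.NET are made up.

```python
import zlib
from collections import Counter

NUM_REDUCERS = 100

# Row counts per site from the example: one hot key and a dozen small ones.
# The small site names other than SITE.ORG and SITE.NET are made up.
rows_per_site = {"SITE.COM": 100_000_000, "SITE.ORG": 10_000, "SITE.NET": 10_000}
rows_per_site.update({f"SITE{i}.EXAMPLE": 10_000 for i in range(10)})


def reducer_for(key, num_reducers=NUM_REDUCERS):
    """Pick a reducer from a hash of the key (CRC32 stands in for Hive's hash)."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def reducer_loads(rows_by_key, num_reducers=NUM_REDUCERS):
    """Total number of rows each reducer receives."""
    loads = Counter()
    for key, rows in rows_by_key.items():
        loads[reducer_for(key, num_reducers)] += rows
    return loads


if __name__ == "__main__":
    loads = reducer_loads(rows_per_site)
    hot = reducer_for("SITE.COM")
    print(f"reducer {hot} receives {loads[hot]:,} rows")
    print(f"busy reducers: {len(loads)} of {NUM_REDUCERS}")
```

Whichever reducer SITE.COM hashes to receives at least 100M rows, no matter how many reducers you configure.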

Hive on MapReduce Can Launch Multiple ApplicationMasters per Single Query

Hive on MapReduce converts an HQL statement to one or more MapReduce jobs. For example, if you have 2 subqueries joined together, Hive launches multiple jobs to execute this query:

SELECT * FROM
(
  SELECT DISTINCT SOURCE_NAME
  FROM LZ_OUT
  WHERE QDATE = '1900-01-01'
) w LEFT OUTER JOIN
(
  SELECT DISTINCT SOURCE_NAME
  FROM LZ_OUT
  WHERE QDATE != '1900-01-01'
) d ON w.SOURCE_NAME = d.SOURCE_NAME; 

Excerpt from Hive log:

Launching Job 1 out of 4
Submitted application application_1412375486094_61806
The url to track the job: http://chsxedw:8088/proxy/application_1412375486094_61806/
Stage-1: number of mappers: 67; number of reducers: 1
...
Launching Job 2 out of 4
Submitted application application_1412375486094_61842
The url to track the job: http://chsxedw:8088/proxy/application_1412375486094_61842/
...
Launching Job 4 out of 4
Submitted application application_1412375486094_61863
The url to track the job: http://chsxedw:8088/proxy/application_1412375486094_61863/

For each job, Hive asks the YARN ResourceManager to launch a separate ApplicationMaster. Each ApplicationMaster has to allocate new containers for its job from scratch. This approach adds overhead to query execution in Hive.

But for long-running batch queries this approach ensures that if one of the jobs fails, it can be safely restarted and there is no need to start the query execution from the beginning.
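
If you want to look up each job in the ResourceManager, you first need the application ID that belongs to it. The Python sketch below pulls that mapping out of the log excerpt shown above. The function name `applications_per_job` is hypothetical, and the patterns only cover the two message formats that appear in this excerpt.

```python
import re

# Log excerpt copied from the example above
HIVE_LOG = """\
Launching Job 1 out of 4
Submitted application application_1412375486094_61806
The url to track the job: http://chsxedw:8088/proxy/application_1412375486094_61806/
Stage-1: number of mappers: 67; number of reducers: 1
Launching Job 2 out of 4
Submitted application application_1412375486094_61842
The url to track the job: http://chsxedw:8088/proxy/application_1412375486094_61842/
Launching Job 4 out of 4
Submitted application application_1412375486094_61863
The url to track the job: http://chsxedw:8088/proxy/application_1412375486094_61863/
"""

JOB_RE = re.compile(r"^Launching Job (\d+) out of (\d+)")
APP_RE = re.compile(r"^Submitted application (application_\d+_\d+)")


def applications_per_job(log_text):
    """Map each launched job number to the YARN application submitted for it."""
    apps = {}
    current_job = None
    for line in log_text.splitlines():
        job = JOB_RE.match(line)
        if job:
            current_job = int(job.group(1))
            continue
        app = APP_RE.match(line)
        if app and current_job is not None:
            apps[current_job] = app.group(1)
    return apps


if __name__ == "__main__":
    for job, app in sorted(applications_per_job(HIVE_LOG).items()):
        print(f"Job {job}: {app}")
```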

Permanent Functions in Hive 0.13

Since Hive 0.13 you can create permanent functions that are registered in the metastore. Now to use a UDF you do not need to create a temporary function in each session.

Before Hive 0.13

create temporary function lagignorenull as 'com.edw.hive.GenericUDFLagIgnoreNull';
OK

You can reference a temporary function without specifying the database name, and it is still accessible if you change the current database:

select lagignorenull(1) from default.dual;
OK

use lz;  -- Change the current database
select lagignorenull(1) from default.dual;
OK

Since Hive 0.13

You can create a permanent function using the CREATE FUNCTION statement:

create function lagignorenull as 'com.edw.hive.GenericUDFLagIgnoreNull';
OK

If you do not specify a database name, the function is created in the current database. Then if you change the current database you have to use the fully qualified name to access the function:

select lagignorenull(1) from default.dual;
OK

use lz;  -- Change the current database
select lagignorenull(1) from default.dual;
FAILED: SemanticException [Error 10011]: Line 1:7 Invalid function 'LAGIGNORENULL'

-- Use fully qualified name
select default.lagignorenull(1) from default.dual;
OK

Temporary to Permanent Functions Conversion

If you want to convert all your temporary functions to permanent ones, remember that permanent functions require a fully qualified name if they do not exist in the current database. So if you use USE dbname; statements in your scripts, you may need to modify the function references.
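
For a large number of scripts you may want to automate this change. The Python sketch below is a naive text rewrite, not an HQL parser: the helper `qualify_function` is hypothetical, and it will also rewrite matches inside string literals and comments, so review the result before using it.

```python
import re


def qualify_function(script, function_name, database):
    """Prefix unqualified calls of function_name with 'database.'.

    A naive text rewrite: it does not parse HQL, so calls inside string
    literals or comments are rewritten too.
    """
    # Match the name only when it is not part of a longer identifier, is not
    # already qualified (no preceding dot), and is followed by "(".
    pattern = re.compile(
        r"(?<![\w.])" + re.escape(function_name) + r"(?=\s*\()",
        re.IGNORECASE,
    )
    return pattern.sub(lambda m: f"{database}.{m.group(0)}", script)


if __name__ == "__main__":
    script = "use lz;\nselect lagignorenull(1) from default.dual;"
    print(qualify_function(script, "lagignorenull", "default"))
```

Calls that are already qualified, such as default.lagignorenull(1), are left unchanged, and so is the CREATE FUNCTION statement itself because the name is not followed by a parenthesis there.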

Hive Internals – ApplicationMaster 10-20+ Seconds Startup Overhead

Hive on MapReduce launches one or more ApplicationMasters per single query. The ApplicationMaster then typically launches multiple containers on cluster nodes to execute the query. What is their startup overhead?

From the client-side hive.log file you can see when Hive contacted ResourceManager to start the job:

2014-10-06 02:08:45,150 INFO  mapreduce.JobSubmitter (JobSubmitter.java:submitJobInternal(396)) - number of splits:243
2014-10-06 02:08:45,264 INFO  mapreduce.JobSubmitter (JobSubmitter.java:printTokens(479)) - Submitting tokens for job: job_1412375486094_37432
2014-10-06 02:08:45,704 INFO  impl.YarnClientImpl (YarnClientImpl.java:submitApplication(236)) - Submitted application application_1412375486094_37432
2014-10-06 02:08:45,751 INFO  mapreduce.Job (Job.java:submit(1289)) - The url to track the job: N/A
2014-10-06 02:08:45,756 INFO  exec.Task (SessionState.java:printInfo(538)) - Starting Job = job_1412375486094_37432, Tracking URL = N/A

In this example, Hive submitted the YARN application at 02:08:45. This is the time when Hive actually started the execution of the query on YARN. Before that, Hive can spend time on Metastore and HDFS operations while parsing the query and preparing temporary directories.

Let’s see ResourceManager logs to confirm this submission:

2014-10-06 02:08:45,577 INFO  resourcemanager.ClientRMService (ClientRMService.java:submitApplication(541)) - Application with id 37432 submitted by user v-dtolpeko
2014-10-06 02:08:45,578 INFO  rmapp.RMAppImpl (RMAppImpl.java:transition(911)) - Storing application with id application_1412375486094_37432
2014-10-06 02:08:45,578 INFO  resourcemanager.RMAuditLogger (RMAuditLogger.java:logSuccess(142)) - USER=v-dtolpeko IP=10.188.244.156 OPERATION=Submit Application Request TARGET=ClientRMService RESULT=SUCCESS APPID=application_1412375486094_37432

After checking the queue permissions, ResourceManager changes the state of the job to SCHEDULED:

2014-10-06 02:08:46,198 INFO  attempt.RMAppAttemptImpl (RMAppAttemptImpl.java:handle(667)) - appattempt_1412375486094_37432_000001 State change from SUBMITTED to SCHEDULED

Then, in this particular example, the next message appears in the ResourceManager log about 27 seconds later:

2014-10-06 02:09:13,103 INFO  rmcontainer.RMContainerImpl (RMContainerImpl.java:handle(367)) - container_1412375486094_37432_01_000001 Container Transitioned from NEW to ALLOCATED
2014-10-06 02:09:13,104 INFO  resourcemanager.RMAuditLogger (RMAuditLogger.java:logSuccess(100)) - USER=v-dtolpeko OPERATION=AM Allocated Container TARGET=SchedulerApp RESULT=SUCCESS APPID=application_1412375486094_37432 CONTAINERID=container_1412375486094_37432_01_000001

It took about 27 seconds to find a container in which to launch the ApplicationMaster. After further preparations, the ApplicationMaster is ready to launch:

2014-10-06 02:09:13,112 INFO  attempt.RMAppAttemptImpl (RMAppAttemptImpl.java:handle(667)) - appattempt_1412375486094_37432_000001 State change from ALLOCATED to LAUNCHED
2014-10-06 02:09:14,105 INFO  rmcontainer.RMContainerImpl (RMContainerImpl.java:handle(367)) - container_1412375486094_37432_01_000001 Container Transitioned from ACQUIRED to RUNNING

So the ApplicationMaster has been started and now we can see messages in its log:

2014-10-06 02:09:15,141 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Created MRAppMaster for application appattempt_1412375486094_37432_000001

ApplicationMaster performs initialization, and connects to ResourceManager:

2014-10-06 02:09:19,272 INFO [main] org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at chsxedw/10.188.244.160:8030

You can see this from ResourceManager log:

2014-10-06 02:09:19,455 INFO  resourcemanager.RMAuditLogger (RMAuditLogger.java:logSuccess(121)) - USER=v-dtolpeko	IP=10.188.244.148	OPERATION=Register App Master	TARGET=ApplicationMasterService	RESULT=SUCCESS	APPID=application_1412375486094_37432	APPATTEMPTID=appattempt_1412375486094_37432_000001

Then ApplicationMaster starts allocating containers:

2014-10-06 02:09:38,710 INFO  rmcontainer.RMContainerImpl (RMContainerImpl.java:handle(367)) - container_1412375486094_37432_01_000002 Container Transitioned from NEW to RESERVED
2014-10-06 02:09:38,725 INFO  rmcontainer.RMContainerImpl (RMContainerImpl.java:handle(367)) - container_1412375486094_37432_01_000004 Container Transitioned from NEW to RESERVED

From this example you can see that ApplicationMaster startup and container allocation still take considerable time on YARN, which makes it impossible to complete Hive queries within even 10 seconds using the MapReduce engine.
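
To see where the time goes, you can compute the gaps between the log timestamps quoted above. The Python sketch below does this for the key events of this run. The event labels are my own shorthand, and because the lines come from the client, ResourceManager and ApplicationMaster logs, the gaps are only as accurate as the clock synchronization between those hosts.

```python
from datetime import datetime

# (event, timestamp) pairs copied from the log excerpts above
EVENTS = [
    ("application submitted (hive.log)", "2014-10-06 02:08:45,704"),
    ("attempt SCHEDULED", "2014-10-06 02:08:46,198"),
    ("AM container ALLOCATED", "2014-10-06 02:09:13,103"),
    ("MRAppMaster created", "2014-10-06 02:09:15,141"),
    ("AM registered with ResourceManager", "2014-10-06 02:09:19,455"),
    ("first task container RESERVED", "2014-10-06 02:09:38,710"),
]


def parse_ts(text):
    """Parse a log timestamp such as '2014-10-06 02:08:45,704'."""
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S,%f")


def step_durations(events):
    """Seconds elapsed between each pair of consecutive events."""
    times = [(name, parse_ts(ts)) for name, ts in events]
    return [
        (prev_name, name, (t - prev_t).total_seconds())
        for (prev_name, prev_t), (name, t) in zip(times, times[1:])
    ]


def total_seconds(events):
    """Seconds between the first and the last event."""
    return (parse_ts(events[-1][1]) - parse_ts(events[0][1])).total_seconds()


if __name__ == "__main__":
    for start, end, seconds in step_durations(EVENTS):
        print(f"{seconds:7.3f}s  {start} -> {end}")
    print(f"{total_seconds(EVENTS):7.3f}s  total")
```

For this run the steps add up to about 53 seconds between submission and the first task container, of which about 27 seconds were spent waiting for the ApplicationMaster container.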

Hive Internals: Query Pre-Execution Steps

When you use Hive on the MapReduce execution engine, you sometimes have to wait a long time before you see any execution progress. So what happens behind the scenes?

Metastore and HDFS Operations

First, Hive parses the query, performs semantic analysis, and creates temporary directories in HDFS. When this step is complete, you can see the Query ID assigned by Hive:

Query ID = v-dtolpeko_20141003075353_e51611fe-d071-4aee-aac0-a29a090f5a39
Total jobs = 1
Launching Job 1 out of 1

At this stage Hive has not yet contacted the YARN ResourceManager. If you do not see the Query ID message for a long time, there are probably performance issues with the Metastore or HDFS.

Input Split Calculation

Then Hive calculates input splits for the job. You can see the following message indicating this step:

Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
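
The message says the number of reducers was estimated from the input data size. As a rough sketch (a simplification, not Hive's actual code), you can think of the estimate as the input size divided by hive.exec.reducers.bytes.per.reducer, rounded up and capped by hive.exec.reducers.max. The function name and the values in the example calls are made up and are not Hive defaults.

```python
def estimate_reducers(input_bytes, bytes_per_reducer, max_reducers):
    """Rough reducer estimate: ceil(input / bytes_per_reducer), between 1 and max."""
    if bytes_per_reducer <= 0 or max_reducers <= 0:
        raise ValueError("bytes_per_reducer and max_reducers must be positive")
    wanted = -(-input_bytes // bytes_per_reducer)  # integer ceiling division
    return max(1, min(max_reducers, wanted))


if __name__ == "__main__":
    # Example values only, chosen for illustration
    print(estimate_reducers(500_000_000, 1_000_000_000, 999))
    print(estimate_reducers(10_000_000_000, 256_000_000, 999))
```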

Contacting ResourceManager

At this stage Hive contacts the ResourceManager for the first time to submit an application. When the ResourceManager accepts the request, you can see the assigned application ID:

Starting Job = job_1412320486912_0486, Tracking URL = http://cheledwh:8088/proxy/application_1412320486912_0486/
Kill Command = /usr/lib/hadoop/bin/hadoop job  -kill job_1412320486912_0486

Launching ApplicationMaster and Starting Execution

An assigned application ID does not mean that the query is already being executed. After accepting the application, the ResourceManager needs to start the ApplicationMaster and wait until it registers itself with the ResourceManager. This can take 5-10 seconds, mostly due to JVM startup overhead.

When the ApplicationMaster starts you can see the following message:

Hadoop job information for Stage-1: number of mappers: 66; number of reducers: 1

And then you will start receiving task progress:

2014-10-03 07:54:10,911 Stage-1 map = 0%,  reduce = 0%
2014-10-03 07:54:34,615 Stage-1 map = 2%,  reduce = 0%, Cumulative CPU 2.96 sec
...
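
To get a rough figure for the total pre-execution time, you can compare the timestamp embedded in the Query ID with the first progress line. The Python sketch below assumes that the middle part of the Query ID is a yyyyMMddHHmmss timestamp taken when the query started, and that the excerpts above come from the same run. Both are assumptions, so treat the result as an estimate.

```python
import re
from datetime import datetime

# Values copied from the excerpts above
QUERY_ID = "v-dtolpeko_20141003075353_e51611fe-d071-4aee-aac0-a29a090f5a39"
FIRST_PROGRESS_LINE = "2014-10-03 07:54:10,911 Stage-1 map = 0%,  reduce = 0%"


def query_id_time(query_id):
    """Extract the timestamp embedded in a Query ID (assumed yyyyMMddHHmmss)."""
    match = re.search(r"_(\d{14})_", query_id)
    if match is None:
        raise ValueError(f"no timestamp found in {query_id!r}")
    return datetime.strptime(match.group(1), "%Y%m%d%H%M%S")


def progress_line_time(line):
    """Parse the leading 23-character timestamp of a progress line."""
    return datetime.strptime(line[:23], "%Y-%m-%d %H:%M:%S,%f")


def seconds_before_first_progress(query_id, progress_line):
    """Seconds between the Query ID timestamp and the first progress line."""
    delta = progress_line_time(progress_line) - query_id_time(query_id)
    return delta.total_seconds()


if __name__ == "__main__":
    wait = seconds_before_first_progress(QUERY_ID, FIRST_PROGRESS_LINE)
    print(f"{wait:.3f} seconds before the first progress line")
```

With the values above this gives about 18 seconds between the start of the query and the first progress line.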

Implementation Limitations of MapJoin in Hive 0.13 on MR

When you need to join a large table (fact) with a small table (dimension), Hive can perform a map-side join. You might assume that multiple map tasks are started to read the large table, and that each mapper reads its own full copy of the small table and performs the join locally.

In Hive 0.13 on MapReduce engine it is implemented in a slightly different and unfortunately not always optimal way:

Step 1 – Download Side-table to the Hive Client machine

First, the data file of the side table is downloaded to the local disk of the Hive client machine which typically is not a Data Node.

You can see this from the log:

Starting to launch local task to process map join;
Dump the side-table into file: file:/tmp/v-dtolpeko/hive_2014-10-01 ...
...
End of local task; Time Taken: 2.013 sec.
Execution completed successfully
MapredLocal task succeeded

Step 2 – Create HashTable and archive the file

Then Hive transforms the side-table data into a hash table and creates a .gz archive file:

Archive 1 hash table files to file:/tmp/v-dtolpeko/hive_2014-10-01.../Stage-4.tar.gz

Step 3 – Upload HashTable to HDFS

Hive uploads the .gz archive file to HDFS and adds it to the Distributed Cache.

Upload 1 archive file  from file:/tmp/v-dtolpeko/hive_2014-10-01.../Stage-4.tar.gz to 
hdfs://chsxe...
Add 1 archive file to distributed cache. Archive file: hdfs://chsxe... 

As you can see, the side table is downloaded from HDFS, transformed into a hash table and archived, and finally written back to HDFS.

You should take this behavior into account when optimizing Hive queries using MapJoin.
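
For reference, the join that each mapper performs once the hash table is available can be sketched in a few lines of Python. This is a conceptual illustration of a map-side hash join, not Hive's implementation, and the table contents and column names are made up.

```python
from collections import defaultdict


def build_hash_table(small_rows, key):
    """Build an in-memory hash table from the small (dimension) table."""
    table = defaultdict(list)
    for row in small_rows:
        table[row[key]].append(row)
    return table


def map_side_join(big_rows, hash_table, key):
    """Stream the large (fact) table and probe the hash table; no reducer needed."""
    for row in big_rows:
        for match in hash_table.get(row[key], []):
            yield {**match, **row}


if __name__ == "__main__":
    # Made-up sample data
    sites = [{"site": "SITE.COM", "owner": "A"}, {"site": "SITE.ORG", "owner": "B"}]
    visits = [{"site": "SITE.COM", "visitor": 1}, {"site": "SITE.NET", "visitor": 2}]
    table = build_hash_table(sites, "site")
    print(list(map_side_join(visits, table, "site")))
```

The sketch performs an inner join: rows of the large table without a match in the hash table are dropped.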