When running an HQL query, you may notice that it reaches the 99% reduce stage quite fast and then gets stuck:
...
2014-10-07 08:46:01,149 Stage-8 map = 100%, reduce = 99%, Cumulative CPU 6905.85 sec
2014-10-07 08:47:01,361 Stage-8 map = 100%, reduce = 99%, Cumulative CPU 6999.45 sec
2014-10-07 08:48:01,441 Stage-8 map = 100%, reduce = 99%, Cumulative CPU 7065.59 sec
2014-10-07 08:49:01,670 Stage-8 map = 100%, reduce = 99%, Cumulative CPU 7125.26 sec
2014-10-07 08:50:01,808 Stage-8 map = 100%, reduce = 99%, Cumulative CPU 7188.12 sec
The problem is that Hive estimates progress based on the number of completed reducers, which does not always reflect the actual execution progress. A query can reach 99% in one minute and then spend an hour executing the remaining 1%.
The most typical reason for this behavior is skewed data. For example, assume you have a table that tracks all visits to specific sites: SITE.COM has 100M rows, while a dozen other sites (SITE.ORG, SITE.NET and so on) have just 10K visitors each.
When you join this table with another by site name, one reducer has to process 100M rows while the other reducers process just 10K rows each.
So if you have 99 sites with 10K visitors each, a single site with 100M visitors, and you specify 100 reducers, then 99% of the reducers finish their work very quickly, and you have to wait a long time for the last reducer to complete.
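The effect is easy to reproduce with a toy simulation. The sketch below uses made-up row counts matching the example above, and Python's built-in hash() stands in for Hadoop's HashPartitioner, which routes each join key to a reducer:

```python
from collections import Counter

# Hypothetical visit counts: one dominant site and 99 small ones.
visits = {"SITE.COM": 100_000_000}
visits.update({f"SITE{i}.ORG": 10_000 for i in range(99)})

NUM_REDUCERS = 100

# MapReduce assigns all rows sharing a key to one reducer by hashing
# the key; hash() here plays the role of Hadoop's HashPartitioner.
load = Counter()
for site, rows in visits.items():
    load[hash(site) % NUM_REDUCERS] += rows

# The reducer that receives SITE.COM must process at least 100M rows,
# while a typical reducer handles only tens of thousands.
print(max(load.values()))
```

However many reducers you add, the 100M rows of the skewed key still land on a single reducer, which is why increasing reducer count alone does not help.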
Not only joins
Data skew can arise not only in joins. For example, if you perform a GROUP BY SITE_NAME in our example, a single reducer has to deal with 100M rows while the others process a much smaller number of rows.
Hive on MapReduce converts an HQL statement into one or more MapReduce jobs. For example, if you have 2 subqueries joined together, Hive launches multiple jobs to execute the query:
SELECT *
FROM
  ( SELECT DISTINCT SOURCE_NAME FROM LZ_OUT WHERE QDATE = '1900-01-01' ) w
  LEFT OUTER JOIN
  ( SELECT DISTINCT SOURCE_NAME FROM LZ_OUT WHERE QDATE != '1900-01-01' ) d
    ON w.SOURCE_NAME = d.SOURCE_NAME;
Excerpt from Hive log:
Launching Job 1 out of 4
Submitted application application_1412375486094_61806
The url to track the job: http://chsxedw:8088/proxy/application_1412375486094_61806/
Stage-1: number of mappers: 67; number of reducers: 1
...
Launching Job 2 out of 4
Submitted application application_1412375486094_61842
The url to track the job: http://chsxedw:8088/proxy/application_1412375486094_61842/
...
Launching Job 4 out of 4
Submitted application application_1412375486094_61863
The url to track the job: http://chsxedw:8088/proxy/application_1412375486094_61863/
For each job, Hive asks the YARN ResourceManager to launch a separate ApplicationMaster, and each ApplicationMaster has to allocate new containers for its job from scratch. This approach adds overhead to query execution in Hive.
For long-running batch queries, however, this approach ensures that if one of the jobs fails, it can be safely restarted without re-executing the query from the beginning.
Since Hive 0.13 you can create permanent functions that are registered in the Metastore, so you no longer need to create a temporary function in each session to use a UDF.
Before Hive 0.13
create temporary function lagignorenull as 'com.edw.hive.GenericUDFLagIgnoreNull';
OK
You can reference a temporary function without specifying the database name, and it is still accessible if you change the current database:
select lagignorenull(1) from default.dual;
OK
use lz; -- Change the current database
select lagignorenull(1) from default.dual;
OK
Since Hive 0.13
You can create a permanent function using the CREATE FUNCTION statement:
create function lagignorenull as 'com.edw.hive.GenericUDFLagIgnoreNull';
OK
If you do not specify a database name, the function is created in the current database. If you then change the current database, you have to use the fully qualified name to access the function:
select lagignorenull(1) from default.dual;
OK
use lz; -- Change the current database
select lagignorenull(1) from default.dual;
FAILED: SemanticException [Error 10011]: Line 1:7 Invalid function 'LAGIGNORENULL'
-- Use fully qualified name
select default.lagignorenull(1) from default.dual;
OK
Temporary to Permanent Functions Conversion
If you want to convert all your temporary functions to permanent ones, remember that permanent functions require a fully qualified name if they do not exist in the current database. So if you use USE dbname; statements in your scripts, you may need to modify the function references.
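As a starting point for such a conversion, a naive helper might rewrite the CREATE statements themselves. The script below is a hypothetical sketch (not part of Hive): it qualifies each function with a target database, and function calls inside queries would still need to be reviewed by hand:

```python
import re

# Hypothetical helper: rewrite CREATE TEMPORARY FUNCTION statements
# into permanent CREATE FUNCTION statements, qualifying the name with
# a target database so the function stays resolvable after USE dbname;.
def make_permanent(hql, db="default"):
    pattern = re.compile(r"create\s+temporary\s+function\s+(\w+)",
                         re.IGNORECASE)
    return pattern.sub(lambda m: f"create function {db}.{m.group(1)}", hql)

script = "create temporary function lagignorenull as 'com.edw.hive.GenericUDFLagIgnoreNull';"
print(make_permanent(script))
# create function default.lagignorenull as 'com.edw.hive.GenericUDFLagIgnoreNull';
```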
Hive on MapReduce launches one or more ApplicationMasters for a single query. Each ApplicationMaster then typically launches multiple containers on cluster nodes to execute the query. What is their startup overhead?
From the client-side hive.log file you can see when Hive contacted ResourceManager to start the job:
2014-10-06 02:08:45,150 INFO mapreduce.JobSubmitter (JobSubmitter.java:submitJobInternal(396)) - number of splits:243
2014-10-06 02:08:45,264 INFO mapreduce.JobSubmitter (JobSubmitter.java:printTokens(479)) - Submitting tokens for job: job_1412375486094_37432
2014-10-06 02:08:45,704 INFO impl.YarnClientImpl (YarnClientImpl.java:submitApplication(236)) - Submitted application application_1412375486094_37432
2014-10-06 02:08:45,751 INFO mapreduce.Job (Job.java:submit(1289)) - The url to track the job: N/A
2014-10-06 02:08:45,756 INFO exec.Task (SessionState.java:printInfo(538)) - Starting Job = job_1412375486094_37432, Tracking URL = N/A
In this example, Hive submitted the YARN application at 02:08:45. This is when Hive actually started executing the query on YARN. Before that, Hive spends time on Metastore and HDFS operations: parsing the query and preparing temporary directories.
Let's check the ResourceManager logs to confirm this submission:
2014-10-06 02:08:45,577 INFO resourcemanager.ClientRMService (ClientRMService.java:submitApplication(541)) - Application with id 37432 submitted by user v-dtolpeko
2014-10-06 02:08:45,578 INFO rmapp.RMAppImpl (RMAppImpl.java:transition(911)) - Storing application with id application_1412375486094_37432
2014-10-06 02:08:45,578 INFO resourcemanager.RMAuditLogger (RMAuditLogger.java:logSuccess(142)) - USER=v-dtolpeko IP=10.188.244.156 OPERATION=Submit Application Request TARGET=ClientRMService RESULT=SUCCESS APPID=application_1412375486094_37432
After checking the queue permissions, ResourceManager changes the state of the job to SCHEDULED:
2014-10-06 02:08:46,198 INFO attempt.RMAppAttemptImpl (RMAppAttemptImpl.java:handle(667)) - appattempt_1412375486094_37432_000001 State change from SUBMITTED to SCHEDULED
Then, in this particular example, the next message appears in the ResourceManager log about 27 seconds later:
2014-10-06 02:09:13,103 INFO rmcontainer.RMContainerImpl (RMContainerImpl.java:handle(367)) - container_1412375486094_37432_01_000001 Container Transitioned from NEW to ALLOCATED
2014-10-06 02:09:13,104 INFO resourcemanager.RMAuditLogger (RMAuditLogger.java:logSuccess(100)) - USER=v-dtolpeko OPERATION=AM Allocated Container TARGET=SchedulerApp RESULT=SUCCESS APPID=application_1412375486094_37432 CONTAINERID=container_1412375486094_37432_01_000001
It took about 27 seconds to find a container in which to launch the ApplicationMaster. After further preparations, the ApplicationMaster is ready to launch:
2014-10-06 02:09:13,112 INFO attempt.RMAppAttemptImpl (RMAppAttemptImpl.java:handle(667)) - appattempt_1412375486094_37432_000001 State change from ALLOCATED to LAUNCHED
2014-10-06 02:09:14,105 INFO rmcontainer.RMContainerImpl (RMContainerImpl.java:handle(367)) - container_1412375486094_37432_01_000001 Container Transitioned from ACQUIRED to RUNNING
So the ApplicationMaster has been started, and now we can see messages in its log:
2014-10-06 02:09:15,141 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Created MRAppMaster for application appattempt_1412375486094_37432_000001
The ApplicationMaster performs initialization and connects to the ResourceManager:
2014-10-06 02:09:19,272 INFO [main] org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at chsxedw/10.188.244.160:8030
You can see this in the ResourceManager log:
2014-10-06 02:09:19,455 INFO resourcemanager.RMAuditLogger (RMAuditLogger.java:logSuccess(121)) - USER=v-dtolpeko IP=10.188.244.148 OPERATION=Register App Master TARGET=ApplicationMasterService RESULT=SUCCESS APPID=application_1412375486094_37432 APPATTEMPTID=appattempt_1412375486094_37432_000001
Then the ApplicationMaster starts allocating containers:
2014-10-06 02:09:38,710 INFO rmcontainer.RMContainerImpl (RMContainerImpl.java:handle(367)) - container_1412375486094_37432_01_000002 Container Transitioned from NEW to RESERVED
2014-10-06 02:09:38,725 INFO rmcontainer.RMContainerImpl (RMContainerImpl.java:handle(367)) - container_1412375486094_37432_01_000004 Container Transitioned from NEW to RESERVED
From this example you can see that ApplicationMaster startup and container allocation on YARN still take considerable time, which makes it impossible to complete a Hive query on the MapReduce engine within even 10 seconds.
When you use the Hive on MapReduce execution engine, you sometimes have to wait a long time before you see any execution progress. So what happens behind the scenes?
Metastore and HDFS Operations
First, Hive parses the query, performs semantic analysis, and creates temporary directories in HDFS. When this step is complete, you can see the Query ID assigned by Hive:
Query ID = v-dtolpeko_20141003075353_e51611fe-d071-4aee-aac0-a29a090f5a39
Total jobs = 1
Launching Job 1 out of 1
At this stage Hive has not yet contacted the YARN ResourceManager. If you do not see the Query ID message for a long time, there are probably performance issues with the Metastore or HDFS.
Input Split Calculation
Then Hive calculates input splits for the job. You can see the following message indicating this step:
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
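The estimate named in this message can be sketched as a simple calculation: roughly one reducer per hive.exec.reducers.bytes.per.reducer bytes of input, capped by hive.exec.reducers.max. The defaults below (1 GB per reducer, 999 reducers) are the Hive 0.13-era values; treat the exact formula as an approximation of Hive's behavior:

```python
import math

# Sketch of Hive's reducer-count estimate from input data size.
# Defaults are assumed to match Hive 0.13:
#   hive.exec.reducers.bytes.per.reducer = 1,000,000,000 (1 GB)
#   hive.exec.reducers.max = 999
def estimate_reducers(input_bytes,
                      bytes_per_reducer=1_000_000_000,
                      max_reducers=999):
    return min(max_reducers, max(1, math.ceil(input_bytes / bytes_per_reducer)))

print(estimate_reducers(500_000_000))     # 1
print(estimate_reducers(50_000_000_000))  # 50
```

Lowering hive.exec.reducers.bytes.per.reducer therefore increases the estimated reducer count for the same input size.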
At this stage Hive contacts the ResourceManager for the first time to submit an application. When the ResourceManager accepts the request, you can see the assigned application_id:
Starting Job = job_1412320486912_0486, Tracking URL = http://cheledwh:8088/proxy/application_1412320486912_0486/
Kill Command = /usr/lib/hadoop/bin/hadoop job -kill job_1412320486912_0486
Launching ApplicationMaster and Starting Execution
An assigned application_id does not mean that the query is already being executed. After accepting the application, the ResourceManager needs to start the ApplicationMaster and wait until it registers itself with the ResourceManager. This can take 5-10 seconds, mostly due to JVM startup overhead.
When the ApplicationMaster starts you can see the following message:
Hadoop job information for Stage-1: number of mappers: 66; number of reducers: 1
And then you will start receiving task progress:
2014-10-03 07:54:10,911 Stage-1 map = 0%, reduce = 0%
2014-10-03 07:54:34,615 Stage-1 map = 2%, reduce = 0%, Cumulative CPU 2.96 sec
...
When you need to join a large (fact) table with a small (dimension) table, Hive can perform a map-side join. You might assume that multiple map tasks are started to read the large table, and that each mapper reads its own full copy of the small table and performs the join locally.
In Hive 0.13 on the MapReduce engine, it is implemented in a slightly different and, unfortunately, not always optimal way:
Step 1 – Download Side-table to the Hive Client machine
First, the data file of the side table is downloaded to the local disk of the Hive client machine, which typically is not a DataNode.
You can see this in the log:
Starting to launch local task to process map join;
Dump the side-table into file: file:/tmp/v-dtolpeko/hive_2014-10-01 ...
...
End of local task; Time Taken: 2.013 sec.
Execution completed successfully
MapredLocal task succeeded
Step 2 – Create HashTable and archive the file
Then Hive transforms the side-table data into a HashTable and creates a .gz archive file:
Archive 1 hash table files to file:/tmp/v-dtolpeko/hive_2014-10-01.../Stage-4.tar.gz
Step 3 – Upload HashTable to HDFS
Hive uploads the .gz archive file to HDFS and adds it to the Distributed Cache:
Upload 1 archive file from file:/tmp/v-dtolpeko/hive_2014-10-01.../Stage-4.tar.gz to hdfs://chsxe...
Add 1 archive file to distributed cache. Archive file: hdfs://chsxe...
As you can see, the side table is downloaded from HDFS, transformed into a HashTable, archived, and finally written back to HDFS.
You should take this behavior into account when optimizing Hive queries that use MapJoin.
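The join mechanics themselves (as opposed to the HashTable distribution described in the steps above) can be illustrated with a toy sketch. The table contents below are made up; the point is that the small table becomes an in-memory hash table and each mapper streams its split of the large table against it, so no shuffle or reduce phase is needed:

```python
# Made-up dimension (small) and fact (large) tables.
small_table = [("SITE.COM", "News"), ("SITE.ORG", "Nonprofit")]
large_table = [("SITE.COM", "2014-10-01"),
               ("SITE.COM", "2014-10-02"),
               ("SITE.NET", "2014-10-01")]

# Corresponds to the HashTable Hive builds in Step 2 above.
hash_table = dict(small_table)

# Each mapper would run this probe loop over its own input split
# (inner-join semantics shown here).
joined = [(site, date, hash_table[site])
          for site, date in large_table
          if site in hash_table]
print(joined)
```

Because every mapper needs the full hash table, keeping the side table small enough to fit in mapper memory is what makes this strategy pay off.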