Overview

In the last article, 'DBA stance in Hadoop world - Impala', we looked at a high-level architecture overview of Impala; in this article we will walk through Impala practically to get familiar with it. We will explore Impala using the interactive command-line utility impala-shell; alternatively, Hue, a web-based graphical UI tool from Cloudera, provides SQL-workbench-like functionality. Due to the architectural nature of Impala, we need to connect to any one of the data nodes of the Hadoop cluster to study the behavior of the impalad daemon that runs on every data node.

Practical look

To check the processes running on a node we can use the jps (Java Virtual Machine Process Status Tool) command, which reports information on the JVMs it has permission to access. So let us log in to one of the Hadoop data nodes as the user 'hdfs'. In a production Hadoop cluster it is best practice to run different Hadoop daemons under different users, where each set of daemons is grouped based on common product functionality. For example, on a data node we have the 'hdfs' user for HDFS filesystem functionality and, in the same way, the 'impala' user for managing Impala functionality. This is why we can't leverage a single jps command to find all the services running on a data node when they are hosted by different users.

Check HDFS services running through jps command using 'hdfs' user

hdfs@dnode10:~> jps -lv
15329 sun.tools.jps.Jps -Dapplication.home=/usr/java/jdk1.8.0_60 -Xms8m
7173  -Dproc_datanode -Xmx1000m -Dhdfs.audit.logger=INFO,RFAAUDIT -Dsecurity.audit.logger=INFO,RFAS -Djava.net.preferIPv4Stack=true -Dhadoop.log.dir=/hadoop/log/hdfs -Dhadoop.log.file=hadoop-cmf-hdfs-DATANODE-perle.int.thomsonreuters.com.log.out -Dhadoop.home.dir=/hadoop/cloudera/parcels/CDH-5.7.1-1.cdh5.7.1.p0.11/lib/hadoop -Dhadoop.id.str=hdfs -Dhadoop.root.logger=INFO,RFA -Djava.library.path=/hadoop/cloudera/parcels/GPLEXTRAS-5.7.1-1.cdh5.7.1.p0.11/lib/hadoop/lib/native:/hadoop/cloudera/parcels/CDH-5.7.1-1.cdh5.7.1.p0.11/lib/hadoop/lib/native -Dhadoop.policy.file=hadoop-policy.xml -Djava.net.preferIPv4Stack=true -Dhadoop.id.str=hdfs -Xms2147483648 -Xmx2147483648 -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-CMSConcurrentMTEnabled -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:OnOutOfMemoryError=/usr/lib64/cmf/service/common/killparent.sh -Dhadoop.security.logger=INFO,RFAS -Dcommons.daemon.process.id=7173 -Dcommons.daemon.process.parent=7027 -Dco

Check Impala services running through jps command using 'impala' user

impala@dnode10:~> jps -lv
15601 sun.tools.jps.Jps -Dapplication.home=/usr/java/jdk1.8.0_60 -Xms8m
9958  -Djava.library.path=/hadoop/cloudera/parcels/CDH-5.7.1-1.cdh5.7.1.p0.11/lib/impala/lib:/hadoop/cloudera/parcels/GPLEXTRAS-5.7.1-1.cdh5.7.1.p0.11/lib/hadoop/lib/native

So jps shows that we have an Impala process running with process id 9958, and if we search for that process id it turns out to be the impalad daemon.

impala@dnode10:~> ps -ef|grep 9958
impala    9958 12057  0 Sep21 ?        00:24:32 /hadoop/cloudera/parcels/CDH-5.7.1-1.cdh5.7.1.p0.11/lib/impala/sbin-retail/impalad --flagfile=/run/cloudera-scm-agent/process/3929-impala-IMPALAD/impala-conf/impalad_flags

Let us check what configuration information we can derive by traversing the files under the configuration directory specified in the impalad process.

impala@dnode10:~> ls -lrt /run/cloudera-scm-agent/process/3929-impala-IMPALAD/impala-conf
-rw-r----- 1 impala impala   98 Jul 18 09:11 sentry-site.xml
-rw-r----- 1 impala impala  682 Jul 18 09:11 log4j.properties
-rw-r----- 1 impala impala 1549 Jul 18 09:11 impalad_flags
-rw-r----- 1 impala impala   98 Aug 24 14:27 llama-site.xml
-rw-r----- 1 impala impala 4582 Aug 24 14:27 fair-scheduler.xml

As we discussed in our last article, all of the configuration files are placed under the Impala process directory structure, and impalad_flags is the file that contains all of the parameters related to the impalad service. This file should hold the statestore and catalog service details, such as the port numbers and the hostnames where they are hosted.

impala@dnode10:/run/cloudera-scm-agent/process/3929-impala-IMPALAD/impala-conf> cat impalad_flags | grep -P 'state_store|catalog'
-state_store_subscriber_port=23000
-state_store_host=impa.world.com
-state_store_port=24000
-catalog_service_host=impa.world.com
-catalog_service_port=26000
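The same grep can be turned into a small helper that extracts a single flag's value, which is handy when scripting health checks. Below is a sketch run against a stand-in flagfile (process directory numbers differ per cluster, so on a real node you would point FLAGFILE at the path reported by `ps -ef`); the flag names mirror the output above.

```shell
# Stand-in flagfile mimicking the impalad_flags format shown above;
# on a real node, point FLAGFILE at the path from the impalad command line.
FLAGFILE=$(mktemp)
cat > "$FLAGFILE" <<'EOF'
-state_store_subscriber_port=23000
-state_store_host=impa.world.com
-state_store_port=24000
-catalog_service_host=impa.world.com
-catalog_service_port=26000
EOF

# get_flag <name> <flagfile> -> prints the value after the '='
get_flag() {
  sed -n "s/^-$1=//p" "$2"
}

STATESTORE_HOST=$(get_flag state_store_host "$FLAGFILE")
CATALOG_PORT=$(get_flag catalog_service_port "$FLAGFILE")
echo "statestore host: $STATESTORE_HOST"
echo "catalog port:    $CATALOG_PORT"
rm -f "$FLAGFILE"
```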

Kerberos Security

Now let us find the impalad log file and go through it to familiarize ourselves with it. We can get the location of the log from the impalad daemon's log4j.properties file.

impala@dnode10:/run/cloudera-scm-agent/process/3929-impala-IMPALAD/impala-conf> cat log4j.properties | grep log
log.threshold=INFO
main.logger=FA
impala.root.logger=${log.threshold},${main.logger}
log4j.rootLogger=${impala.root.logger}
log.dir=/hadoop/log/impala/impalad
log.file=impalad.INFO
max.log.file.size=200MB
log4j.appender.FA=org.apache.log4j.FileAppender
log4j.appender.FA.File=${log.dir}/${log.file}
log4j.appender.FA.layout=org.apache.log4j.PatternLayout
log4j.appender.FA.layout.ConversionPattern=%p%d{MMdd HH:mm:ss.SSS'000'} %t %c] %m%n
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n

So we can find the log file in the location defined by the variable log.dir, and the file name is based on the variable log.file.
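Those two properties can be resolved into the effective log path with a couple of lines of shell. This is a sketch against a stand-in properties file holding just the two values we care about (real clusters may override them):

```shell
# Stand-in log4j.properties with the two properties we need;
# on a real node, point PROPS at the file under the impalad process directory.
PROPS=$(mktemp)
printf 'log.dir=/hadoop/log/impala/impalad\nlog.file=impalad.INFO\n' > "$PROPS"

# prop <key> -> prints the value of that property
prop() { sed -n "s/^$1=//p" "$PROPS"; }

LOG_PATH="$(prop log.dir)/$(prop log.file)"
echo "$LOG_PATH"
rm -f "$PROPS"
```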

impala@dnode10:/hadoop/log/impala/impalad> view impalad.dnode10.impala.log.INFO.20160718-091106.9412
Log file created at: 2016/07/18 09:11:06
Running on machine: dnode10
Log line format: [IWEF]mmdd hh:mm:ss.uuuuuu threadid file:line] msg
I0718 09:11:06.986838  9412 logging.cc:119] stdout will be logged to this file.
E0718 09:11:06.986960  9412 logging.cc:120] stderr will be logged to this file.
I0718 09:11:06.993680  9412 authentication.cc:650] Using internal kerberos principal "impala/dnode10.world.com@QA.WORLD.COM"
I0718 09:11:06.993686  9412 authentication.cc:985] Internal communication is authenticated with Kerberos
I0718 09:11:06.993875  9412 authentication.cc:770] Waiting for Kerberos ticket for principal: impala/dnode10.world.com@QA.WORLD.COM
I0718 09:11:06.993881  9471 authentication.cc:450] Registering impala/dnode10.world.com@QA.WORLD.COM, keytab file /run/cloudera-scm-agent/process/3311-impala-IMPALAD/impala.keytab
I0718 09:11:07.009990  9412 authentication.cc:772] Kerberos ticket granted to impala/dnode10.world.com@QA.WORLD.COM
I0718 09:11:07.010023  9412 authentication.cc:650] Using external kerberos principal "impala/dnode10.world.com@QA.WORLD.COM"
I0718 09:11:07.010028  9412 authentication.cc:1001] External communication is authenticated with Kerberos
I0718 09:11:07.010143  9412 init.cc:158] impalad version 2.3.0-cdh5.5.2 RELEASE (build cc1125f10419a7269366f7f950f57b24b07acd64)

It's interesting to see that all internal and external communication of the impalad daemon is authenticated with Kerberos. Kerberos originated at MIT (Massachusetts Institute of Technology) and has since evolved into a de facto industry standard. By default Hadoop runs in non-secure mode, where there is no requirement for authentication, but if we configure the Hadoop cluster in secure mode then each user and service has to be authenticated by Kerberos to use Hadoop services. Kerberos's main intention is to ensure that when someone makes a request, it validates that they really are who they say they are. In a secured Hadoop cluster every daemon interacts with Kerberos to perform mutual authentication, which means interacting daemons ensure that the other daemons are who they claim to be. A Kerberos 'principal' is an entity, which can be a user or a daemon, that gets verified by providing either a password or a keytab file. Impala daemons need a keytab file for the given principal, so that the daemon can use the principal and the keys stored in the keytab file for all of its communications.

Since we are planning to use impala-shell to explore Impala, we need to establish a secure connection with the impalad daemon using Kerberos authentication. One of the simplest ways to obtain keytab details is to leverage the keytab file used by the impalad daemon itself.

impala@dnode10:~> ps -ef|grep 9958
impala    9958 12057  0 Sep21 ?        00:24:32 /hadoop/cloudera/parcels/CDH-5.7.1-1.cdh5.7.1.p0.11/lib/impala/sbin-retail/impalad --flagfile=/run/cloudera-scm-agent/process/3929-impala-IMPALAD/impala-conf/impalad_flags

We can get the keytab file used by the impalad daemon from the keytab_file parameter defined in the impalad_flags file.

impala@dnode10:~> cat /run/cloudera-scm-agent/process/3929-impala-IMPALAD/impala-conf/impalad_flags | grep keytab
-keytab_file=/run/cloudera-scm-agent/process/3929-impala-IMPALAD/impala.keytab

After getting the keytab file, we can check the keys it holds by using the klist command: the -k argument lists the keys in a keytab file, -t adds timestamps, and -e displays the encryption type of each key in the keytab file (or, for a credential cache, the encryption types of the session key and ticket of each credential).

impala@dnode10:~> klist -kte /run/cloudera-scm-agent/process/3929-impala-IMPALAD/impala.keytab
Keytab name: FILE:/run/cloudera-scm-agent/process/3929-impala-IMPALAD/impala.keytab
KVNO Timestamp           Principal
---- ------------------- ------------------------------------------------------
   1 02/16/2016 17:08:15 impala/dnode10.world.com@QA.WORLD.COM (arcfour-hmac)

So we have the principal impala/dnode10.world.com@QA.WORLD.COM defined in the keytab file. Now we can check whether we have any credential caches, and display a table summarizing the caches present in the collection, by using the -l argument of the klist command as shown below.

impala@dnode10:~> klist -l
Principal name                 Cache name
--------------                 ----------

We don't have any cache collection yet, so we need to request a ticket using a key present in the keytab file, with the kinit command as shown below.

impala@dnode10:~> kinit -k -t /run/cloudera-scm-agent/process/3311-impala-IMPALAD/impala.keytab impala/dnode10.world.com@QA.WORLD.COM

Now if we list the cache collection we can see the principal defined in it.

impala@dnode10:~> klist -l
Principal name                 Cache name
--------------                 ----------
impala/dnode10.world.com       FILE:/tmp/krb5cc_906

Let's find the details of the ticket held in the credential cache by using klist as shown below.

impala@dnode10:~> klist
Ticket cache: FILE:/tmp/krb5cc_906
Default principal: impala/dnode10.world.com@QA.WORLD.COM

Valid starting       Expires              Service principal
08/30/2016 03:37:08  08/30/2016 13:37:08  krbtgt/QA.WORLD.COM@QA.WORLD.COM
        renew until 09/06/2016 03:37:08
08/30/2016 03:37:22  08/30/2016 13:37:08  impala/dnode10.world.com@QA.WORLD.COM
        renew until 09/06/2016 03:37:08

This confirms that we have a ticket in place and can establish a secure connection to Impala through the impala-shell interface. If we are connecting remotely, we can use the -i option of impala-shell to specify the hostname and port number where the impalad daemon is running (by default it tries to connect to an impalad daemon on the default port 21000), and we must ensure that a ticket is present in the cache to establish the secure connection.

Interactive Impala-shell

When impala-shell is invoked without any arguments, it initially tries to connect without Kerberos authentication and fails; it then retries the connection if it finds a ticket present in the credential cache. We can skip the initial insecure attempt by passing the -k argument to impala-shell, which establishes the Kerberos-authenticated connection directly.

impala@dnode10:~> impala-shell
Starting Impala Shell without Kerberos authentication
Error connecting: TTransportException, TSocket read 0 bytes
Kerberos ticket found in the credentials cache, retrying the connection with a secure transport.
Connected to dnode10.world.com:21000
Server version: impalad version 2.5.0-cdh5.7.1 RELEASE (build 8097681a0eccf6d4335ea1)
***********************************************************************************
Welcome to the Impala shell. Copyright (c) 2015 Cloudera, Inc. All rights reserved.
(Impala Shell v2.5.0-cdh5.7.1 (27a4325) built on Wed Jun  1 16:06:09 PDT 2016)

To see a summary of a query's progress that updates in real-time, run 'set
LIVE_PROGRESS=1;'.
***********************************************************************************
[dnode10.world.com:21000] >

As we can see from the impala-shell banner, it initially tries without Kerberos and then retries a secured connection when it finds a ticket in the credential cache. Now let's move ahead and explore basic details of Impala through impala-shell. You may notice that much of the look and feel of impala-shell is similar to MySQL, which makes it easier to work with.

[dnode10.world.com:21000] > show databases;
Query: show databases
+------------------+----------------------------------------------+
| name             | comment                                      |
+------------------+----------------------------------------------+
| _impala_builtins | System database for Impala builtin functions |
| default          | Default Hive database                        |
| demo_db          |                                              |
+------------------+----------------------------------------------+

There is a special database, named default, where you begin when you connect to Impala. Tables created in default are physically located one level higher in HDFS than all the user-created databases. Impala includes another predefined database, _impala_builtins, that serves as the location for the built-in functions. Let's switch to the demo_db database and check the details of the tables existing in it.

[dnode10.world.com:21000] > use demo_db;
Query: use demo_db

[dnode10.world.com:21000] > show tables;
Query: show tables
+------------+
| name       |
+------------+
| sample_07  |
| sample_08  |
+------------+

Switching to demo_db is done through the 'use <database_name>' command, which closely resembles MySQL. There are two tables here, and now the most interesting question is: where does Impala store the metadata information of these tables? Impala stores its table definitions in a database called the 'metastore', which can be either MySQL or PostgreSQL; it is the same database used by Hive to store its metadata, which is why Hive is mandatory for Impala. Hence it is possible to query tables created in Hive from Impala. In addition to the Hive metadata information, Impala keeps track of low-level information such as the physical locations of blocks in HDFS.

In Impala the synchronization of metadata (both HDFS block locations and Hive metastore contents) across all the impalad daemons on the data nodes of the Hadoop cluster is done by the catalogd daemon; this information is then used by the impalad daemons for optimizing queries. When DDL is performed through Impala, catalogd takes care of synchronizing the metadata by updating the Hive metastore and broadcasting the same information to all the impalad daemons. But when DDL is performed through Hive, we need to refresh the Impala metadata manually, as currently this is not automatic. After the manual refresh, the catalogd daemon refreshes the Impala metadata and broadcasts it to all the impalad daemons on the data nodes.
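The manual refresh mentioned above is done from impala-shell. Impala provides two statements for this; the table name below is the demo table from this database.

```sql
-- After rows were added to an existing table through Hive or LOAD DATA:
REFRESH sample_07;

-- After a table was created, dropped, or altered through Hive
-- (heavier: discards and reloads the cached metadata for the table):
INVALIDATE METADATA sample_07;
```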

To check the properties of an existing table we can use the show command.

[dnode10.world.com:21000] > show create table sample_07;
Query: show create table sample_07
+--------------------------------------------------------------------------------------------------------------------------------------+
| result                                                                                                                               |
+--------------------------------------------------------------------------------------------------------------------------------------+
| CREATE TABLE demo_db.sample_07 (                                                                                                     |
|   col1 INT,                                                                                                                          |
|   col2 INT                                                                                                                           |
| )                                                                                                                                    |
| STORED AS TEXTFILE                                                                                                                   |
| LOCATION 'hdfs://HadoopCluster/user/hive/warehouse/sample_07'                                                                        |
| TBLPROPERTIES ('transient_lastDdlTime'='1472584031', 'totalSize'='4', 'numRows'='1', 'COLUMN_STATS_ACCURATE'='true', 'numFiles'='1') |
+--------------------------------------------------------------------------------------------------------------------------------------+
Fetched 1 row(s) in 0.01s

This table is stored as a plain TEXTFILE located under the Hadoop cluster named 'HadoopCluster' in the HDFS directory /user/hive/warehouse/sample_07. Impala supports the various file formats used in the Hadoop framework, and the choice of file format has significant performance consequences. Impala also supports the various compression codecs used in the Hadoop framework, so choosing the right file format and codec plays an important role. In general Impala supports creating and reading the Text, Parquet, Avro, RCFile and SequenceFile formats, but inserting from Impala can be done only into Text and Parquet; for the other formats, insert the data through Hive and refresh the metadata, or use LOAD DATA on data files already in the right format. It is also possible to query HBase tables through Impala.
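As an example of choosing a format, a Parquet-backed copy of the text table above can be created and populated in one pass directly from Impala, since Parquet is one of the formats Impala can both read and write (a sketch; the table and database names follow the demo schema used here):

```sql
-- CTAS: create a Parquet table and load it from the text table in one step
CREATE TABLE demo_db.sample_07_parquet
  STORED AS PARQUET
AS SELECT * FROM demo_db.sample_07;
```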

Let's take an example of how the statistics associated with tables are stored in Impala's metastore. We check the statistics associated with a table and, if they don't exist, gather the statistics, which in turn will be stored in the Impala metastore (MySQL).

[dnode10.world.com:21000] > show table stats sample_07;
Query: show table stats sample_07
+-------+--------+---------+--------------+-------------------+--------+-------------------+---------------------------------------------------+
| #Rows | #Files | Size    | Bytes Cached | Cache Replication | Format | Incremental stats | Location                                          |
+-------+--------+---------+--------------+-------------------+--------+-------------------+---------------------------------------------------+
| 0     | 1      | 44.98KB | NOT CACHED   | NOT CACHED        | TEXT   | false             | hdfs://HadoopCluster/user/hive/warehouse/sample_07 |
+-------+--------+---------+--------------+-------------------+--------+-------------------+---------------------------------------------------+
Fetched 1 row(s) in 0.02s

[dnode10.world.com:21000] > select count(*) from sample_07;
Query: select count(*) from sample_07
+----------+
| count(*) |
+----------+
| 823      |
+----------+
Fetched 1 row(s) in 1.02s

Though the table has 823 rows, the statistics of this table show zero rows because statistics were not collected on this table. Let's gather the table stats and check what information is taken into account while gathering statistics.

[dnode10.world.com:21000] > compute stats sample_07;
Query: compute stats sample_07
+-----------------------------------------+
| summary                                 |
+-----------------------------------------+
| Updated 1 partition(s) and 4 column(s). |
+-----------------------------------------+
Fetched 1 row(s) in 1.56s

[dnode10.world.com:21000] > show table stats sample_07;
Query: show table stats sample_07
+-------+--------+---------+--------------+-------------------+--------+-------------------+---------------------------------------------------+
| #Rows | #Files | Size    | Bytes Cached | Cache Replication | Format | Incremental stats | Location                                          |
+-------+--------+---------+--------------+-------------------+--------+-------------------+---------------------------------------------------+
| 823   | 1      | 44.98KB | NOT CACHED   | NOT CACHED        | TEXT   | false             | hdfs://HadoopCluster/user/hive/warehouse/sample_07 |
+-------+--------+---------+--------------+-------------------+--------+-------------------+---------------------------------------------------+
Fetched 1 row(s) in 0.01s

[dnode10.world.com:21000] > show column stats sample_07;
Query: show column stats sample_07
+-------------+--------+------------------+--------+----------+-------------------+
| Column      | Type   | #Distinct Values | #Nulls | Max Size | Avg Size          |
+-------------+--------+------------------+--------+----------+-------------------+
| code        | STRING | 830              | -1     | 7        | 7                 |
| description | STRING | 832              | -1     | 105      | 34.69020080566406 |
| total_emp   | INT    | 801              | -1     | 4        | 4                 |
| salary      | INT    | 742              | -1     | 4        | 4                 |
+-------------+--------+------------------+--------+----------+-------------------+
Fetched 4 row(s) in 0.02s

So both table-level and column-level statistics are gathered, which will be used by the query planner to generate the best possible execution plan for the associated query.

Execution flow of Impala query

  1. When an Impala client connects to an impalad daemon to run queries, that impalad daemon becomes the Query Coordinator.
  2. The Query Coordinator uses the metadata information (block locations on each data node) received from the catalogd daemon as well as the information (health of all impalad daemons) received from the statestored daemon. Based on this information, the Query Coordinator distributes the work among the worker impalad daemons running on each data node of the Hadoop cluster.
  3. When each of the worker impalad daemons completes its work, the results are propagated back to the Query Coordinator, which in turn consolidates the results received from all the worker impalad daemons.
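The distribution of work described in these steps can be observed with the EXPLAIN statement, which prints the plan the coordinator will hand out to the worker daemons (the exact plan output varies by Impala version and by the statistics available):

```sql
-- Shows the distributed plan: scan fragments on the workers,
-- partial aggregation, and a final merge at the coordinator.
EXPLAIN SELECT count(*) FROM sample_07;
```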

Since the Impala daemons are co-located on the same hosts (data nodes) where the data is placed in the Hadoop ecosystem, we can leverage 'HDFS Short-Circuit Local Reads'. When this feature is enabled, an HDFS client (in this case the impalad daemons) can access files directly using a UNIX domain socket that allows the client and data node to communicate directly; these short-circuit reads substantially increase performance. In brief, short-circuit reads avoid the overhead of the data node reading the data from a file and sending it to the client over a TCP socket.
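Short-circuit reads are controlled by two HDFS client properties. On a Cloudera-managed cluster these are typically configured for you, but in hdfs-site.xml they look like the fragment below (the socket path shown is a common convention, not a fixed value):

```xml
<!-- Enable reads over a UNIX domain socket instead of the DataNode TCP path -->
<property>
  <name>dfs.client.read.shortcircuit</name>
  <value>true</value>
</property>
<!-- Path of the domain socket shared by the DataNode and its local clients -->
<property>
  <name>dfs.domain.socket.path</name>
  <value>/var/run/hdfs-sockets/dn</value>
</property>
```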

Conclusion

In this article we practically went through the basics of Impala's mechanics. There are many tools (Profile, Explain, Summary and Live_summary) available in Impala to evaluate the performance of a query; they also help in studying the flow of data within the Hadoop cluster when a query is executed. When working with Impala it's critical to consider the importance of Hive's presence, as most of the file formats support data loading only through Hive; it's also very important to consider the various file formats from a performance perspective when creating a table. In future articles we will discuss more about how Hive and Impala are related, benchmark the performance of various file formats, and tune queries using the various tools available in Impala.