Background

Over time the Hadoop ecosystem has evolved drastically, and from a DBA's point of view it is always interesting to look at the deployments and figure out how DBA skills can be leveraged in the Hadoop world. In the early days of Hadoop, the only way to get at the data was through MapReduce, which requires Java code. Companies like Yahoo! and Facebook used the Hadoop ecosystem extensively, deploying Java code to process the huge amounts of data they had. But they soon became overwhelmed by the amount of time spent writing Java code to get a job done; they wanted to work faster by letting non-programmers reach the data residing in the Hadoop ecosystem, and they also intended to take advantage of existing developer skills such as SQL.

Yahoo! came up with Apache Pig, and in the same way Facebook developed Hive. Both work in a similar fashion: Hive accepts SQL-like queries (HiveQL) while Pig uses its own scripting language (Pig Latin), and each internally transforms the request into MapReduce jobs to service it. This mitigated the concern, as a vast number of developers were now able to reach the data placed in Hadoop through these higher-level languages, most notably the well-known SQL.

By design, MapReduce jobs are slow to initiate, and this has its own demerits: MapReduce jobs are high-latency workloads by nature, suitable for batch processing. Both Hive and Pig inherit the design of MapReduce and thus derive the same properties. Batch processing of data through MapReduce jobs was not a problem, but there was growing demand for direct access to the data residing in Hadoop through JDBC or ODBC, the interfaces traditionally used to access relational databases, and making those work in the Hadoop world was a big deal. Later, Cloudera distributed an ODBC driver to access the Hadoop ecosystem through Hive; this let users point their reporting tools directly at Hadoop and perform all kinds of reporting, analytics and queries. This increased the demand from end users for direct access to Hadoop, but they were disappointed, as their expectation was response times on par with relational databases. The actual issue was not Hive or Pig but in fact the way MapReduce works in principle, and the issue was thus inherited by Hive and Pig.

Google came up with a new distributed query engine concept, Dremel, to run SQL directly over large datasets without involving MapReduce. Similarly, Facebook came up with its own next-generation query processing engine, Presto, for real-time access to data via SQL. Cloudera also came up with a different approach called Impala, which does not use MapReduce. The Impala service runs right next to where the data resides, on every Hadoop data node, and is thus compatible with other engines like Spark and HBase that do not use MapReduce and have access to the same data that Impala does. Since Impala does not work the way Hive does, translating SQL into MapReduce jobs, it does not suffer from the latencies of the distributed processing engine MapReduce.

Overview of Impala

Impala is a massively parallel processing (MPP) database engine. It is flexible, as it reuses existing components within the Hadoop ecosystem. Impala maintains its metadata in a central, traditional MySQL or PostgreSQL database known as the metastore. The metadata contains the table definitions as well as detailed information about the data files, such as the physical location of the blocks residing in HDFS. This metastore database is the same database where Hive stores its metadata. Since the main intention behind Impala is to improve query performance rather than data loading, data in file formats Impala can query but not write, such as Avro, RCFile, or SequenceFile, has to be loaded using Hive.

Similar to traditional databases, Impala has an optimizer that makes use of table- and column-level statistics to derive the most efficient execution plan. When there are many tables and partitions, gathering the table metadata needed to generate an execution plan is time consuming, so each Impala node caches this information locally for reuse by future queries. It is therefore important to have up-to-date metadata before issuing a query against a table. This refreshing of metadata is automatic in Impala 1.2 and higher. But when a change (DDL/DML) is made from Hive, or when files in HDFS are changed directly, the refresh is not automatic and the metadata has to be refreshed manually in Impala.
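
As a quick illustration (using a hypothetical table named sales), these are the statements, run from impala-shell, for the manual refresh cases just described and for gathering the statistics the optimizer relies on:

-- New files were added to the table's HDFS directory outside of Impala:
REFRESH sales;

-- The table was created or altered through Hive:
INVALIDATE METADATA sales;

-- Collect table- and column-level statistics for the optimizer:
COMPUTE STATS sales;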

Impala uses HDFS as its storage medium and depends on HDFS redundancy for high availability of the data. An Impala table is represented as data files residing in HDFS, in one of the supported file formats and compression codecs. In the same way, Impala considers all the data files placed in a table's directory to belong to that table. HBase can be used as an alternative storage medium for Impala. HBase is a database built on top of HDFS but without any built-in SQL support. We can map an Impala table to an HBase table and query the HBase table from Impala, so it is possible to perform a join between an Impala table and an HBase table. This ability gives room for creative ideas for balancing performance when using FACT and DIMENSION tables, by storing the FACT table as an Impala Parquet-format table and the DIMENSION table as an HBase table.
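
A sketch of that FACT/DIMENSION idea, with hypothetical table and column names. The HBase mapping itself is defined through Hive, since Impala relies on the Hive metastore definition for HBase-backed tables, and is then picked up in Impala:

-- In the Hive shell: map a dimension table onto an existing HBase table 'dim_customer'
CREATE EXTERNAL TABLE dim_customer (
  cust_id   STRING,
  cust_name STRING,
  region    STRING)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:name,info:region")
TBLPROPERTIES ("hbase.table.name" = "dim_customer");

-- In impala-shell: make the new definition visible, then join it with a Parquet fact table
INVALIDATE METADATA dim_customer;
SELECT f.order_id, d.cust_name, d.region
FROM fact_orders f JOIN dim_customer d ON f.cust_id = d.cust_id;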

Impala has three service daemons, which run on particular hosts of a Hadoop cluster. The sections below elaborate on each of these Impala processes as they run in a CDH 5.5 distribution.

Impala service

The Impala daemon is the main daemon and runs on every data node of the Hadoop cluster. This daemon performs the read/write operations and takes incoming requests from impala-shell (the command-line interface to Impala), Hue (the web interface for analyzing data) and JDBC/ODBC drivers. When we submit a query to an Impala daemon, it acts as the coordinator node for that query, and all other nodes pass their result sets back to this coordinator. The Impala daemon constantly exchanges heartbeats with the statestore daemon to keep track of which other nodes are healthy, so that the workload can be distributed evenly. It also receives notifications from the catalog service daemon whenever a node in the cluster performs a CREATE/ALTER/DROP/INSERT/LOAD DATA operation in Impala. These notifications minimize the need to refresh stale metadata cached on each Impala node through REFRESH or INVALIDATE METADATA statements. The process is physically present as the impalad daemon on the node, as shown below from one of the Hadoop data nodes.

impala@bigdata01:~> ps -ef|grep impalad
impala   10544 12118  0 Jul18 ?        01:16:27 /hadoop/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/impala/sbin-retail/impalad --flagfile=/run/cloudera-scm-agent/process/3321-impala-IMPALAD/impala-conf/impalad_flags

As you can see, the impalad daemon takes a flagfile that consists of all the configuration parameters for this impalad daemon. It may seem odd that the flagfile is placed under the Cloudera agent directory. The agent is a component of the Cloudera Manager framework, in which agent and server are tightly integrated; by default the agent heartbeats with the server every 15 seconds to figure out what it should be doing. One of the agent's main responsibilities is to start and stop the processes in the Hadoop ecosystem, and when it is directed to spawn a new process it creates a unique directory and unpacks the configuration into it, which avoids running against outdated configuration files in the Hadoop ecosystem.
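
Each restart of a role therefore leaves its own numbered directory under the agent's process path; a quick listing (directory numbers hypothetical, apart from the one shown above) illustrates this:

impala@bigdata01:~> ls -d /run/cloudera-scm-agent/process/*-impala-IMPALAD
/run/cloudera-scm-agent/process/3198-impala-IMPALAD
/run/cloudera-scm-agent/process/3321-impala-IMPALAD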

impala@bigdata01:~> cat /run/cloudera-scm-agent/process/3321-impala-IMPALAD/impala-conf/impalad_flags
-beeswax_port=21000
-fe_port=21000
-be_port=22000
-llama_callback_port=28000
-hs2_port=21050
-enable_webserver=true
-mem_limit=21474836480
-max_log_files=10
-webserver_port=25000
-max_result_cache_size=100000
-state_store_subscriber_port=23000
-statestore_subscriber_timeout_seconds=30
-scratch_dirs=/data/1/impala/scratch,/data/2/impala/scratch
-default_query_options
-kerberos_reinit_interval=60
-principal=impala/bigdata01@YAK.world.com
-keytab_file=/run/cloudera-scm-agent/process/3321-impala-IMPALAD/impala.keytab
-log_filename=impalad
-audit_event_log_dir=/var/log/impalad/audit
-max_audit_event_log_file_size=5000
-abort_on_failed_audit_event=false
-lineage_event_log_dir=/var/log/impalad/lineage
-max_lineage_log_file_size=5000
-hostname=bigdata01.world.com
-state_store_host=impa.world.com
-enable_rm=false
-state_store_port=24000
-catalog_service_host=impa.world.com
-catalog_service_port=26000
-local_library_dir=/var/lib/impala/udfs
-fair_scheduler_allocation_path=/run/cloudera-scm-agent/process/3321-impala-IMPALAD/impala-conf/fair-scheduler.xml
-llama_site_path=/run/cloudera-scm-agent/process/3321-impala-IMPALAD/impala-conf/llama-site.xml
-disable_admission_control=true
-disk_spill_encryption=false
-abort_on_config_error=true

A few of the important parameters relate to Kerberos authentication (kerberos_reinit_interval, principal, keytab_file) and to memory settings (max_result_cache_size, mem_limit) for the impalad process. Also, because Impala performs sorts, joins and aggregations, intermediate files may have to be created in a non-HDFS location, which is by default the path /tmp/impala-scratch and can be changed using the scratch_dirs parameter. A few parameters (state_store_host, catalog_service_host) provide the hostnames where the statestore and catalog server are running. Details of the other parameters will be covered in upcoming articles.
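
As a quick example of how these settings surface to a client: with Kerberos enabled, impala-shell must be told to use it (-k) and pointed at an impalad (-i), and the memory used by a single query can be capped below the daemon-wide mem_limit through a query option (the table name below is hypothetical):

impala@bigdata01:~> impala-shell -k -i bigdata01.world.com:21000
[bigdata01.world.com:21000] > SET MEM_LIMIT=2gb;
[bigdata01.world.com:21000] > SELECT COUNT(*) FROM sales;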

The log file location of the impalad daemon is configured in the file log4j.properties, located in the same Cloudera agent directory, as shown below.

impala@bigdata01:/run/cloudera-scm-agent/process/3321-impala-IMPALAD/impala-conf> cat log4j.properties
log.threshold=INFO
main.logger=FA
impala.root.logger=${log.threshold},${main.logger}
log4j.rootLogger=${impala.root.logger}
log.dir=/hadoop/log/impala/impalad
log.file=impalad.INFO
max.log.file.size=200MB
log4j.appender.FA=org.apache.log4j.FileAppender
log4j.appender.FA.File=${log.dir}/${log.file}
log4j.appender.FA.layout=org.apache.log4j.PatternLayout
log4j.appender.FA.layout.ConversionPattern=%p%d{MMdd HH:mm:ss.SSS'000'} %t %c] %m%n
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
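
Given the log.dir and log.file values above, the current impalad log can be followed directly on the node:

impala@bigdata01:~> tail -f /hadoop/log/impala/impalad/impalad.INFO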

Statestore service

The statestore daemon is responsible for keeping track of and monitoring the health of each impalad daemon running on the nodes of the Hadoop cluster. This daemon needs to run on only one node in the cluster. It helps distribute the workload when a node failure occurs and avoids routing any request to a failed node. Failure of the statestore daemon itself will not bring down the Hadoop cluster, but the cluster becomes less robust while it is down. If the failed statestore daemon comes back, it re-establishes its connection with the cluster and resumes monitoring. The process is physically present as the statestored daemon on the node, as shown below from one of the Hadoop nodes.

impala@impa:~> ps -ef|grep statestored
impala   40790  4840  0 Jul18 ?        03:11:50 /hadoop/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/impala/sbin-retail/statestored --flagfile=/run/cloudera-scm-agent/process/3314-impala-STATESTORE/impala-conf/state_store_flags

The flag file used while starting the statestored daemon holds all of its configuration information.

impala@impa:/run/cloudera-scm-agent/process/3314-impala-STATESTORE/impala-conf # cat state_store_flags
-state_store_pending_task_count_max=0
-max_log_files=10
-state_store_port=24000
-enable_webserver=true
-webserver_port=25010
-state_store_num_server_worker_threads=4
-kerberos_reinit_interval=60
-principal=impala/impa.world.com@YAK.world.com
-keytab_file=/run/cloudera-scm-agent/process/3314-impala-STATESTORE/impala.keytab
-log_filename=statestored
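
Since enable_webserver=true and webserver_port=25010 above, the statestore also exposes a built-in debug web UI that can be probed from the statestore host; a minimal check (the exact pages available vary by version, and the UI may require authentication depending on cluster security settings):

impala@impa:~> curl -s http://impa.world.com:25010/ | head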

The log file location and other properties of the statestored daemon are configured in the file cloudera-monitor.properties, placed in the same Cloudera agent directory, as shown below.

impala@impa:/run/cloudera-scm-agent/process/3314-impala-STATESTORE # cat cloudera-monitor.properties
[statestore]
host = impa.world.com
service_name = impala
service_version = 5
service_release = 5.5.2
role_name = impala-STATESTORE-fe3aac32eed034f392bc5651ce554df3
collect_interval = 60
log_dir = /hadoop/log/impala/statestore
statestore_webserver_port = 25010

Catalog service

The catalog daemon was introduced in Impala 1.2 to reduce or eliminate the need to refresh the cached metadata on each node of the Hadoop cluster before executing a query, for example so that newly added data files in HDFS, or data loaded through Hive, can be recognized. This daemon is required on only one node in the cluster, and since it passes all its requests through the statestore daemon, it is preferred to run statestored and catalogd on the same node. The catalog daemon broadcasts metadata changes (made through Impala SQL statements) to all available nodes in the cluster; on the receiving end, it is the Impala daemon that receives the relayed metadata changes. The process is physically present as the catalogd daemon on the node, as shown below from one of the Hadoop nodes.

impala@impa:~> ps -ef|grep catalogd
impala   40807  4840  0 Jul18 ?        01:09:38 /hadoop/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/impala/sbin-retail/catalogd --flagfile=/run/cloudera-scm-agent/process/3322-impala-CATALOGSERVER/impala-conf/catalogserver_flags
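
To see the catalog service at work: a DDL statement issued through one impalad becomes visible from every other impalad without any manual refresh, whereas the same change made through Hive would still require INVALIDATE METADATA. A sketch with hypothetical node and table names:

-- On the first node:
[bigdata01.world.com:21000] > CREATE TABLE demo_catalog (id INT);

-- Shortly afterwards, on a different node, without any refresh:
[bigdata02.world.com:21000] > SHOW TABLES LIKE 'demo_catalog';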

The flag file used while starting the catalogd daemon holds all of its configuration information. Note that Kerberos authentication has been enabled for inter-process communication, and thus the parameters kerberos_reinit_interval, principal and keytab_file have been defined accordingly.

impala@impa:/run/cloudera-scm-agent/process/3322-impala-CATALOGSERVER/impala-conf # cat catalogserver_flags
-catalog_service_port=26000
-max_log_files=10
-enable_webserver=true
-load_catalog_in_background=true
-webserver_port=25020
-kerberos_reinit_interval=60
-principal=impala/impa.world.com@YAK.world.com
-keytab_file=/run/cloudera-scm-agent/process/3322-impala-CATALOGSERVER/impala.keytab
-log_filename=catalogd
-statestore_subscriber_timeout_seconds=30
-state_store_host=impa.world.com
-state_store_port=24000

The log file location and other properties of the catalog daemon are configured in the file cloudera-monitor.properties, placed in the same Cloudera agent directory, as shown below.

impala@impa:/run/cloudera-scm-agent/process/3322-impala-CATALOGSERVER # cat cloudera-monitor.properties
[catalogserver]
host = impa.world.com
service_name = impala
service_version = 5
service_release = 5.5.2
role_name = impala-CATALOGSERVER-fe3aac32eed034f392bc5651ce554df3
collect_interval = 60
log_dir = /hadoop/log/impala/catalogd
catalogserver_webserver_port = 25020

Conclusion

This article has gone through the very basics of Impala, with the understanding that Impala requires Hive as a mandatory component since it uses the same metastore. Apache Hue is one of the GUI-style interfaces for interacting with Impala. From CDH 5 onwards we can integrate YARN with Impala for managing resources, as Impala has no capability of its own to control I/O for its queries; this integration of YARN and Impala requires Llama (Long-Lived Application Master), which acts as an intermediary between them. Impala queries, which are low latency in nature, can be run against HDFS, HBase and Kudu. Basically, Impala is a C++ implementation that accesses the data directly, eliminating the need for MapReduce jobs. The Impala architecture includes an optimizer with concepts similar to other RDBMS engines, using statistics on individual objects to decide the optimal execution plan. Future articles will focus on more details of the Hadoop ecosystem, particularly from a DBA's perspective.