Collecting, Indexing and Searching MySQL Database Table Data in Apache Solr


Flume is designed primarily for aggregating logs, but it may also be used to aggregate database data. The advantage of collecting database data in Solr is that the data gets indexed and becomes searchable with simple input, such as the start row and the number of rows to get. In contrast, querying data with a command-line client requires SQL SELECT statements. In this tutorial we shall collect, index and search MySQL database table data in Apache Solr. This tutorial has the following sections.
 
 

Creating a MySQL Database Table

 
Download and extract the MySQL Database tar.gz file (this tutorial uses version 5.6.19) in the /mysql directory.
 
tar zxvf mysql-5.6.19-linux-glibc2.5-i686.tar.gz
 
Add the mysql group and the mysql user to the mysql group.
 
groupadd mysql
useradd -r -g mysql mysql
 
Create a symlink for the MySQL installation directory and change directory (cd) to the symlink.
 
ln -s /mysql/mysql-5.6.19-linux-glibc2.5-i686 mysql
cd mysql
 
Run the following commands to install MySQL Database.
 
  chown -R mysql .
  chgrp -R mysql .
  scripts/mysql_install_db --user=mysql
  chown -R root .
  chown -R mysql data
 
Start MySQL Database.
 
bin/mysqld_safe --user=mysql &
 
Set the password for the root user.
 
mysqladmin -u root password mysql
 
Log in to the MySQL command-line shell and select the test database.
 
mysql -u root -p
use test
 
Create the wlslog table for WebLogic server log data. An id column of type INTEGER must be included; it serves as the primary key and as the incremental column for the Flume SQL source.
 
CREATE TABLE wlslog(id INTEGER PRIMARY KEY, time_stamp VARCHAR(4000), category VARCHAR(4000), type VARCHAR(4000), servername VARCHAR(4000), code VARCHAR(4000), msg VARCHAR(4000));
 
INSERT INTO wlslog(id,time_stamp,category,type,servername,code,msg) VALUES(1,'Apr-8-2014-7:06:16-PM-PDT','Notice','WebLogicServer','AdminServer','BEA-000365','Server state changed to STANDBY');
INSERT INTO wlslog(id,time_stamp,category,type,servername,code,msg) VALUES(2,'Apr-8-2014-7:06:17-PM-PDT','Notice','WebLogicServer','AdminServer','BEA-000365','Server state changed to STARTING');
INSERT INTO wlslog(id,time_stamp,category,type,servername,code,msg) VALUES(3,'Apr-8-2014-7:06:18-PM-PDT','Notice','WebLogicServer','AdminServer','BEA-000365','Server state changed to ADMIN');
INSERT INTO wlslog(id,time_stamp,category,type,servername,code,msg) VALUES(4,'Apr-8-2014-7:06:19-PM-PDT','Notice','WebLogicServer','AdminServer','BEA-000365','Server state changed to RESUMING');
INSERT INTO wlslog(id,time_stamp,category,type,servername,code,msg) VALUES(5,'Apr-8-2014-7:06:20-PM-PDT','Notice','WebLogicServer','AdminServer','BEA-000361','Started WebLogic AdminServer');
 
The wlslog table gets created and populated with five rows.
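To verify, a quick query may be run from the MySQL shell; it should return the five rows just added.
 
SELECT id, code, msg FROM wlslog;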
 

Setting the Environment

 
The following software is required for this tutorial.
 
- MySQL Database
- Flume 1.4
- Apache Solr 4.10.3
- Java 7
- Maven 3.x
 
Hadoop and HDFS are not required to collect MySQL table data in Solr, but if Hadoop is installed for another Flume configuration, the same installation may be used. Create a directory /flume to install the software in, and set its permissions to global (777).
                                                                                                                 
mkdir /flume
chmod -R 777 /flume
cd /flume
 
Download and extract the Flume 1.4.0 tar.gz file.
 
wget http://archive-primary.cloudera.com/cdh4/cdh/4/flume-ng-1.4.0-cdh4.6.0.tar.gz
tar -xvf flume-ng-1.4.0-cdh4.6.0.tar.gz
 
Download and extract the Solr 4.10.3 tgz file.
 
wget http://apache.mirror.gtcomm.net/lucene/solr/4.10.3/solr-4.10.3.tgz
tar xvf solr-4.10.3.tgz
 
Create the morphlines.conf file in the /flume/apache-flume-1.4.0-cdh4.6.0-bin/conf directory (the path referenced later in flume.conf) using vi or the touch command. Create the flume-env.sh file from its template.
 
cp $FLUME_HOME/conf/flume-env.sh.template $FLUME_HOME/conf/flume-env.sh
 
Set the following JAVA_OPTS in the flume-env.sh file.
 
JAVA_OPTS="-Xms1000m -Xmx1000m -Xss128k -XX:MaxDirectMemorySize=256m
-XX:+UseParNewGC -XX:+UseConcMarkSweepGC"
 
Download the same jar files as for collecting Oracle Database logs, listed below with their descriptions, and copy them to the Flume lib directory.
 
lucene-core-4.10.3.jar - Apache Lucene Java Core
lucene-codecs-4.10.3.jar - Codecs and postings formats for Lucene
lucene-analyzers-common-4.10.3.jar - Lucene Common Analyzers
lucene-analyzers-kuromoji-4.10.3.jar - Lucene Kuromoji Japanese Morphological Analyzer
lucene-spatial-4.10.3.jar - Spatial Strategies for Lucene
lucene-phonetic-3.6.2.jar - Lucene Phonetic Analyzer
org.restlet-2.3.1.jar - Restlet Core, API and Engine
org.apache.servicemix.bundles.restlet-1.1.10_3.jar - OSGi bundle wrapping the org.restlet and com.noelios.restlet 1.1.10 jar files
spatial4j-0.4.1.jar - Spatial4j, a general-purpose spatial/geospatial ASL-licensed open-source Java library
org.apache.commons.fileupload-1.2.2.LIFERAY-PATCHED-1.jar - Apache Commons File Upload
lucene-analyzers-phonetic-4.10.3.jar - Lucene Phonetic Filters
lucene-queries-4.10.3.jar - Lucene Queries Module
kite-morphlines-core-0.18.0.jar - Kite Morphlines Core
metrics-core-3.0.2.jar - Metrics Core Library
metrics-healthchecks-3.0.2.jar - Metrics Health Checks
config-1.2.1.jar - Typesafe Config
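As an example, assuming the jar files listed above have been downloaded to the current directory, they may all be copied to the Flume lib directory in one command:
 
cp *.jar /flume/apache-flume-1.4.0-cdh4.6.0-bin/lib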
 
 
Copy the MySQL JDBC jar file to the Flume lib directory.
 
cp mysql-connector-java-5.1.31-bin.jar  /flume/apache-flume-1.4.0-cdh4.6.0-bin/lib
 
Download, compile and package the source code for flume-ng-sql-source from https://github.com/keedio/flume-ng-sql-source into a jar file with Maven.
 
mvn package
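A minimal sketch of the complete build, assuming git is installed; Maven generates the jar file in the target directory:
 
git clone https://github.com/keedio/flume-ng-sql-source.git
cd flume-ng-sql-source
mvn package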
 
Copy the generated flume-ng-sql-source-0.8.jar file to the Flume lib directory. Also create a plugins.d/sql-source/lib directory in $FLUME_HOME and copy the jar file to it.
 
mkdir -p $FLUME_HOME/plugins.d/sql-source/lib
cp flume-ng-sql-source-0.8.jar $FLUME_HOME/plugins.d/sql-source/lib
cp flume-ng-sql-source-0.8.jar $FLUME_HOME/lib
 
Add environment variables for Flume, Solr, Maven, MySQL, and Java.
 
vi ~/.bashrc
 
export SOLR_HOME=/flume/solr-4.10.3/example/solr/collection1
export SOLR_CONF=/flume/solr-4.10.3/example/solr/collection1/conf
export MAVEN_HOME=/flume/apache-maven-3.2.2-bin
export FLUME_HOME=/flume/apache-flume-1.4.0-cdh4.6.0-bin
export FLUME_CONF=/flume/apache-flume-1.4.0-cdh4.6.0-bin/conf
export MYSQL_HOME=/mysql/mysql-5.6.19-linux-glibc2.5-i686
export JAVA_HOME=/flume/jdk1.7.0_55
export PATH=$PATH:$MYSQL_HOME/bin:$FLUME_HOME/bin:$SOLR_HOME/bin:$MAVEN_HOME/bin
export CLASSPATH=$SOLR_HOME/lib/*:$FLUME_HOME/lib/*
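 
Apply the new environment variables to the current shell:
 
source ~/.bashrc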
 
 
Generate the Flume Morphline Solr Sink jar file (flume-ng-morphline-solr-sink-1.4.0.jar) using Maven, or download the jar file from Maven Central, and copy the jar file to the Flume lib directory.
 
cd /flume/apache-flume-1.4.0-cdh4.6.0-bin/flume-ng-sinks
cd flume-ng-morphline-solr-sink
mvn install
 
wget http://central.maven.org/maven2/org/apache/flume/flume-ng-sinks/flume-ng-morphline-solr-sink/1.4.0/flume-ng-morphline-solr-sink-1.4.0.jar
 
cp flume-ng-morphline-solr-sink-1.4.0.jar $FLUME_HOME/lib
 
 
 

Configuring Flume

 
The Flume configuration file flume.conf is the same as for collecting Oracle Database logs, except that the source is SQLSource instead of exec. SQLSource is discussed in detail in the earlier tutorial on using MySQL as a Flume source. The sink is the same, MorphlineSolrSink. The flume.conf is listed below:
 
agent.sources = sql-source
agent.sinks=sink1
agent.channels=ch1
agent.channels.ch1.type=memory
 
agent.sources.sql-source.type = org.apache.flume.source.SQLSource 
agent.sources.sql-source.channels = ch1
 
agent.sources.sql-source.connection.url = jdbc:mysql://localhost:3306/test
 
# Database connection properties
agent.sources.sql-source.user = root 
agent.sources.sql-source.password = mysql 
agent.sources.sql-source.table = wlslog
agent.sources.sql-source.database = test
 
agent.sources.sql-source.columns.to.select = * 
 
# Increment column properties
agent.sources.sql-source.incremental.column.name = id 
# Incremental value from which to start taking data from the table (0 imports the entire table)
agent.sources.sql-source.incremental.value = 0 
 
# Query delay; the query is run every configured number of milliseconds
agent.sources.sql-source.run.query.delay=10000
 
# The status file is used to save the last read row
agent.sources.sql-source.status.file.path = /var/lib/flume
agent.sources.sql-source.status.file.name = sql-source.status
 
agent.sinks.sink1.morphlineId = morphline1
agent.sinks.sink1.type= org.apache.flume.sink.solr.morphline.MorphlineSolrSink
agent.sinks.sink1.channel=ch1
agent.sinks.sink1.morphlineFile = /flume/apache-flume-1.4.0-cdh4.6.0-bin/conf/morphlines.conf
 
agent.sinks.sink1.batchSize = 1
agent.sinks.sink1.batchDurationMillis = 10000
 
agent.channels.ch1.capacity = 1000000
 
Create the /var/lib/flume directory and create the sql-source.status file in the directory.
 
mkdir -p /var/lib/flume
chmod -R 777 /var/lib/flume
cd /var/lib/flume
vi sql-source.status
 
Add the following line to the sql-source.status file.
 
jdbc:mysql://localhost:3306/test wlslog id 0
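 
Alternatively, the same initial line may be written with echo instead of vi:
 
echo "jdbc:mysql://localhost:3306/test wlslog id 0" > /var/lib/flume/sql-source.status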
 
The morphlines.conf file is the same as for collecting Oracle Database logs and is listed below:
 
SOLR_LOCATOR : {
  collection : collection1
  solrUrl : "http://localhost:8983/solr/"
  solrHomeDir : "/flume/solr-4.10.3"
}
morphlines : [
  {
    id : morphline1
    importCommands : ["com.cloudera.**", "org.apache.solr.**", "org.kitesdk.**"]
    commands : [
      {
        readLine {
          charset : UTF-8
        }
      }
      {
        generateUUID {
          field : id
        }
      }
      {
        sanitizeUnknownSolrFields {
          solrLocator : ${SOLR_LOCATOR}
        }
      }
      { logDebug { format : "output record: {}", args : ["@{}"] } }
      {
        loadSolr {
          solrLocator : ${SOLR_LOCATOR}
        }
      }
    ]
  }
]
 
 

Configuring Solr

 
 
The Solr schema file schema.xml (in /flume/solr-4.10.3/example/solr/collection1/conf) is also the same as for collecting Oracle Database logs and is listed below:
 
<?xml version="1.0" encoding="UTF-8" ?>
<schema name="example" version="1.5">
   <field name="message" type="string" indexed="true" stored="true" multiValued="false" />
   <field name="_version_" type="long" indexed="true" stored="true"/>
   <field name="id" type="string" indexed="true" stored="true" required="true" multiValued="false" />
   <uniqueKey>id</uniqueKey>
</schema>
 
Start the Solr server before collecting MySQL table data.
 
cd /flume/solr-4.10.3/example/
java -jar start.jar
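 
To confirm the server is up before starting the Flume agent, the ping request handler configured in the Solr example may be invoked (a sketch, assuming the default example configuration):
 
curl "http://localhost:8983/solr/collection1/admin/ping"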
 

Streaming MySQL Table Data to Solr

 
Before running the Flume agent, the MySQL database (the source) and the Solr server (the sink) must be running. Start the Flume agent with the following command.
 
flume-ng agent  --conf $FLUME_HOME/conf/ -f $FLUME_HOME/conf/flume.conf -n agent -Dflume.root.logger=INFO,console
 
The Flume channel, source and sink get started and the MySQL table data gets streamed to the Solr server.
 
 
The id in the sql-source.status file gets incremented to 5.
 
jdbc:mysql://localhost:3306/test wlslog id 5
 
The output from the Flume agent is listed:
 
[root@localhost flume]# flume-ng agent  --conf $FLUME_HOME/conf/ -f $FLUME_HOME/conf/flume.conf -n agent -Dflume.root.logger=INFO,console
Info: Sourcing environment configuration script /flume/apache-flume-1.4.0-cdh4.6.0-bin/conf/flume-env.sh
Configuration provider starting
2015-02-20 17:44:23,301 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:133)] Reloading configuration file:/flume/apache-flume-1.4.0-cdh4.6.0-bin/conf/flume.conf
2015-02-20 17:44:23,449 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:sink1
2015-02-20 17:44:23,793 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:140)] Post-validation flume configuration contains configuration for agents: [agent]
2015-02-20 17:44:23,807 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:150)] Creating channels
2015-02-20 17:44:23,913 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:40)] Creating instance of channel ch1 type memory
2015-02-20 17:44:23,954 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:205)] Created channel ch1
Creating instance of source sql-source, type org.apache.flume.source.SQLSource
2015-02-20 17:44:24,016 (conf-file-poller-0) [INFO - org.apache.flume.source.SQLSource.configure(SQLSource.java:71)] Reading and processing configuration values for source sql-source
2015-02-20 17:44:24,039 (conf-file-poller-0) [INFO - org.apache.flume.source.SQLSource.configure(SQLSource.java:78)] Establishing connection to database test for source  sql-source
2015-02-20 17:44:27,953 (conf-file-poller-0) [INFO - org.apache.flume.source.SQLSource.configure(SQLSource.java:81)] Source sql-source Connected to test
2015-02-20 17:44:28,048 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:40)] Creating instance of sink: sink1, type: org.apache.flume.sink.solr.morphline.MorphlineSolrSink
2015-02-20 17:44:28,181 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:119)] Channel ch1 connected to [sql-source, sink1]
org.apache.flume.node.Application.startAllComponents(Application.java:145)] Starting Channel ch1
2015-02-20 17:44:28,326 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: ch1: Successfully registered new MBean.
2015-02-20 17:44:28,336 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: ch1 started
2015-02-20 17:44:28,338 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:173)] Starting Sink sink1
2015-02-20 17:44:28,344 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.sink.solr.morphline.MorphlineSink.start(MorphlineSink.java:88)] Starting Morphline Sink sink1 (MorphlineSolrSink) ...
2015-02-20 17:44:28,352 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Starting Source sql-source
2015-02-20 17:44:28,358 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: sink1: Successfully registered new MBean.
2015-02-20 17:44:28,377 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: sink1 started
2015-02-20 17:44:28,398 (PollableSourceRunner-SQLSource-sql-source) [INFO - org.apache.flume.source.SQLSourceUtils.getStatusFileIncrement(SQLSourceUtils.java:102)] /var/lib/flume/sql-source.status correctly formed
2015-02-20 17:44:28,405 (PollableSourceRunner-SQLSource-sql-source) [INFO - org.apache.flume.source.SQLSource.process(SQLSource.java:100)] Query: SELECT * FROM wlslog WHERE id>0 ORDER BY id;
2015-02-20 17:44:28,551 (PollableSourceRunner-SQLSource-sql-source) [INFO - org.apache.flume.source.SQLSource.process(SQLSource.java:135)] Last row increment value readed: 5, updating status file...
2015-02-20 17:44:28,554 (PollableSourceRunner-SQLSource-sql-source) [INFO - org.apache.flume.source.SQLSourceUtils.updateStatusFile(SQLSourceUtils.java:76)] Updating status file
2015-02-20 17:44:29,668 (lifecycleSupervisor-1-2) [INFO - org.kitesdk.morphline.api.MorphlineContext.importCommandBuilders(MorphlineContext.java:89)] Importing commands
2015-02-20 17:44:40,615 (PollableSourceRunner-SQLSource-sql-source) [INFO - org.apache.flume.source.SQLSourceUtils.getStatusFileIncrement(SQLSourceUtils.java:102)] /var/lib/flume/sql-source.status correctly formed
2015-02-20 17:44:40,634 (PollableSourceRunner-SQLSource-sql-source) [INFO - org.apache.flume.source.SQLSource.process(SQLSource.java:100)] Query: SELECT * FROM wlslog WHERE id>5 ORDER BY id;
2015-02-20 17:44:58,867 (lifecycleSupervisor-1-2) [INFO - org.kitesdk.morphline.api.MorphlineContext.importCommandBuilders(MorphlineContext.java:106)] Done importing commands
2015-02-20 17:44:58,971 (lifecycleSupervisor-1-2) [INFO - org.apache.solr.core.SolrResourceLoader.<init>(SolrResourceLoader.java:136)] new SolrResourceLoader for directory: '/flume/solr-4.10.3/'
2015-02-20 17:45:00,658 (PollableSourceRunner-SQLSource-sql-source) [INFO - org.apache.flume.source.SQLSourceUtils.getStatusFileIncrement(SQLSourceUtils.java:102)] /var/lib/flume/sql-source.status correctly formed
2015-02-20 17:45:00,660 (PollableSourceRunner-SQLSource-sql-source) [INFO - org.apache.flume.source.SQLSource.process(SQLSource.java:100)] Query: SELECT * FROM wlslog WHERE id>5 ORDER BY id;
2015-02-20 17:45:52,429 (lifecycleSupervisor-1-2) [INFO - org.apache.solr.schema.IndexSchema.readSchema(IndexSchema.java:574)] unique key field: id
2015-02-20 17:45:57,856 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.sink.solr.morphline.MorphlineSink.start(MorphlineSink.java:101)] Morphline Sink sink1 started.
2015-02-20 17:46:00,882 (PollableSourceRunner-SQLSource-sql-source) [INFO - org.apache.flume.source.SQLSource.process(SQLSource.java:100)] Query: SELECT * FROM wlslog WHERE id>5 ORDER BY id;
 
 

Searching in Solr

 
Log in to the Solr Admin console at the URL http://localhost:8983/solr/. Specify the http://localhost:8983/solr/#/collection1/query URL in the browser and select the /select request handler.
 
 
Select the json format (also the default) in the wt (writer type) choice list for the query response. Click on Execute Query to run the /select query.
 
 
The 5 documents indexed in Solr get listed. The numFound value indicates the number of documents found, start is the index of the first document selected, and rows is the number of documents returned by the query.
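For illustration, the same query may be sent directly as a URL; the start and rows values here are example paging parameters, not required settings:
 
http://localhost:8983/solr/collection1/select?q=*:*&start=0&rows=5&wt=json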
 
 
The documents are in JSON format.
 
 
The query result may also be listed in the more compact csv format.
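 
For example, setting the wt parameter to csv returns the same result set as comma-separated values (the collection1 path is the Solr example default):
 
curl "http://localhost:8983/solr/collection1/select?q=*:*&wt=csv"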
 
 
 

Streaming, not just Importing Database Data

 
The DataImportHandler, used in an earlier tutorial on Solr, also collected database data (Oracle Database data). What is the advantage of using Flume instead of the DataImportHandler? The DataImportHandler runs with the click of a button, imports all the available table data, and then terminates; if additional table data is added, it has to be run again. With the Flume agent, the data gets streamed from the MySQL database table continuously: as new rows are added to the MySQL table, the table data gets streamed to Solr.
 
For example, run the following SQL statement to add another row to the MySQL table.
 
INSERT INTO wlslog(id,time_stamp,category,type,servername,code,msg) VALUES(6,'Apr-8-2014-7:06:21-PM-PDT','Notice','WebLogicServer','AdminServer','BEA-000365','Server state changed to RUNNING');
 
A 6th row gets added to the MySQL table wlslog.
 
 
The Flume agent streams the new row to the Solr server, which indexes it. The incremental value in the Flume SQL query that selects the MySQL table data increments from 5 to 6, and so does the id value in the sql-source.status file.
 
 
If Execute Query is clicked again in the Solr Admin console, 6 documents get listed.
 
 
The newly added row also gets listed in the query result.
 
 
 

Searching with Curl

 
The Solr server data may be queried using the curl command-line tool, as for collecting Oracle Database logs. Run the following command to list the MySQL table data indexed in Solr; the URL is quoted so that the shell does not treat the & in the query string as a background operator.
 
curl "http://localhost:8983/solr/select?q=*:*&rows=10"
 
The 6 rows of data indexed in Solr get listed.
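 
To return only the document count without the documents themselves, rows may be set to 0; numFound in the response header should be 6:
 
curl "http://localhost:8983/solr/select?q=*:*&rows=0"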
 
In this tutorial we collected and indexed MySQL table data in Solr server using Flume.
Comment, 23 Apr 2015 at 3:13pm:

Hello dvohra,

Thanks for using the Flume SQL plugin developed at Keedio by Marcelo Valle and Luis Lazaro.

We are pleased to help bring a solution for streaming ingestion from SQL databases with Flume and this plugin.

The project is alive and we are working on some new features and performance improvements in the plugin.

Some configuration parameters will be changed in future releases; you can check them in the README files in the project.

github.com/.../flume-ng-sql-source

Thanks.

Marcelo Valle

Keedio.

Comment, 23 Apr 2015 at 3:16pm:

Sorry, I pasted the project URL wrong.

Here it is:

github.com/.../flume-ng-sql-source