Written by: Juan Carlos Olamendy Turruellas

Introduction

In this article, I want to talk about connecting to a Cassandra cluster from a Java application. This is a very common scenario in companies that process very large amounts of data once a cluster has been deployed. For demo purposes, we’re going to develop a console application that uses the DataStax driver along with common CQL (Cassandra Query Language) queries against the Cassandra cluster.

Step01. Create a keyspace and a table

For this demo, let’s assume that we’ve previously deployed a Cassandra cluster. Now, we need to create the data storage structure (a table). From your shell, start the Cassandra shell (cqlsh). Once cqlsh is open, we create a keyspace and a table (also known as a column family), as shown in Listing 01.

$ cqlsh

cqlsh> CREATE KEYSPACE sampledb WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};
cqlsh> USE sampledb;

cqlsh:sampledb> CREATE TABLE sales
            ... (
            ... id int,
            ... month int,
            ... year int,
            ... description varchar,
            ... PRIMARY KEY (id)
            ... );

Listing 01
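
If you prefer to create the schema from the application itself (for example, when setting up a test environment), the same DDL can be sent through the driver with session.execute(...). The sketch below assumes a hypothetical helper class, SchemaCql, that only builds the CQL strings; it adds IF NOT EXISTS so that running it twice does not fail, and it does not connect to Cassandra on its own.

```java
import java.util.Locale;

/**
 * Hypothetical helper that builds the DDL from Listing 01 as plain CQL strings.
 * Each string could be passed to session.execute(...) of the DataStax driver;
 * this class itself does not connect to Cassandra.
 */
final class SchemaCql {

    private SchemaCql() {}

    // CREATE KEYSPACE with SimpleStrategy; IF NOT EXISTS makes it safe to re-run
    static String createKeyspace(String keyspace, int replicationFactor) {
        if (replicationFactor < 1) {
            throw new IllegalArgumentException("replication factor must be >= 1");
        }
        return String.format(Locale.ROOT,
                "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = "
                        + "{'class': 'SimpleStrategy', 'replication_factor': %d}",
                keyspace, replicationFactor);
    }

    // CREATE TABLE for the sales table, qualified with the keyspace name
    static String createSalesTable(String keyspace) {
        return "CREATE TABLE IF NOT EXISTS " + keyspace + ".sales ("
                + "id int, month int, year int, description varchar, "
                + "PRIMARY KEY (id))";
    }

    public static void main(String[] args) {
        System.out.println(createKeyspace("sampledb", 1));
        System.out.println(createSalesTable("sampledb"));
    }
}
```

With the driver, you would run these through a session opened without a keyspace (cluster.connect()), because cluster.connect(keyspace) fails if the keyspace does not exist yet.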

Step02. Add dependencies to pom.xml

The next step is to add the required dependencies to our application. There are basically two types of client (driver) libraries for Cassandra:

  • Thrift-based drivers. They give access to the low-level storage and protocol mechanisms. (Thrift has since been removed from Cassandra, as of version 4.0.)

  • CQL-based drivers. They provide a higher-level abstraction over what Thrift does, allowing you to communicate with a Cassandra cluster using CQL, an SQL-like language adapted to Cassandra’s data model (I’ve covered this language in a previous article at http://www.toadworld.com/platforms/nosql/w/wiki/11668.introduction-to-cassandra-query-language). Their interface is similar to that of JDBC drivers.

For this demo, we’re going to use the DataStax CQL driver for Java. We could download the driver (and its dependencies) manually, but I prefer to declare the dependencies in my pom.xml file so that Maven manages them for me. See an excerpt of my pom.xml in Listing 02.

<!-- Apache Cassandra: DataStax CQL driver for Java -->
<dependency>
    <groupId>com.datastax.cassandra</groupId>
    <artifactId>cassandra-driver-core</artifactId>
    <version>3.1.0</version>
</dependency>

<!-- Object mapper built on top of the core driver (not used by the listings below) -->
<dependency>
    <groupId>com.datastax.cassandra</groupId>
    <artifactId>cassandra-driver-mapping</artifactId>
    <version>3.1.0</version>
</dependency>

Listing 02

Step03. Add base elements

We need to import the required classes for our solution, as shown in Listing 03.

// DataStax driver classes used to access Apache Cassandra
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.BoundStatement;

Listing 03

Then, in the class that represents our console application, let’s add the following variables:

  • server_ip: The IP address of any node in the cluster. The driver uses it as a contact point to discover the rest of the nodes (it is not necessarily the coordinator for every request). In our case, it points to the local machine.
  • keyspace: The keyspace to which we’re connecting.
  • cluster: An object representing the Cassandra ring of nodes.
  • session: The session that we use to communicate with the Cassandra cluster.

The code is shown in Listing 04.

// Application settings
private static String server_ip = "127.0.0.1";
private static String keyspace = "sampledb";

// Application connection objects
private static Cluster cluster = null;
private static Session session = null;

Listing 04
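
Hard-coding server_ip and keyspace is fine for a demo, but you will usually want to change them without recompiling. A minimal sketch, assuming JVM system properties with the hypothetical names cassandra.host and cassandra.keyspace, falling back to the values from Listing 04:

```java
/**
 * Sketch: reads the connection settings from JVM system properties instead of
 * hard-coding them. The property names "cassandra.host" and "cassandra.keyspace"
 * are hypothetical; the defaults match Listing 04.
 */
final class AppSettings {

    static final String HOST_PROPERTY = "cassandra.host";
    static final String KEYSPACE_PROPERTY = "cassandra.keyspace";

    private AppSettings() {}

    // Returns the trimmed property value, or the default when it is missing or blank
    static String setting(String name, String defaultValue) {
        String value = System.getProperty(name);
        return (value == null || value.trim().isEmpty()) ? defaultValue : value.trim();
    }

    static String serverIp() {
        return setting(HOST_PROPERTY, "127.0.0.1");
    }

    static String keyspace() {
        return setting(KEYSPACE_PROPERTY, "sampledb");
    }

    public static void main(String[] args) {
        System.out.println("server_ip = " + serverIp());
        System.out.println("keyspace  = " + keyspace());
    }
}
```

The fields in Listing 04 could then be initialized with AppSettings.serverIp() and AppSettings.keyspace(), and the target node chosen at startup by passing, for example, -Dcassandra.host=10.0.0.5 on the java command line.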

Step04. Add methods to open and close the session

The next step is to implement the open and close mechanisms. The code that opens the connection is shown in Listing 05. Note that it also prints information about the cluster we’re connecting to: its name and, for each host, the datacenter, address, and rack.

public static void openConnection() {
    // Nothing to do if we are already connected
    if (cluster != null) return;

    // Build the Cluster object, using server_ip as the contact point
    cluster = Cluster.builder()
            .addContactPoints(server_ip)
            .build();

    // Print the cluster name and the hosts known to the driver
    final Metadata metadata = cluster.getMetadata();
    String msg = String.format("Connected to cluster: %s", metadata.getClusterName());
    System.out.println(msg);

    System.out.println("List of hosts");
    for (final Host host : metadata.getAllHosts()) {
        msg = String.format("Datacenter: %s; Host: %s; Rack: %s",
                host.getDatacenter(),
                host.getAddress(),
                host.getRack());
        System.out.println(msg);
    }

    // Open a session bound to our keyspace
    session = cluster.connect(keyspace);
}

Listing 05
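
addContactPoints accepts a variable number of addresses (String...), and the driver only needs one reachable contact point to discover the other nodes; passing several, however, lets the application start even if one of them is down. The sketch below is a hypothetical helper, ContactPoints.parse, that turns a comma-separated list (for example, read from configuration) into the array expected by addContactPoints, trimming blanks and dropping duplicates.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical helper: turns a comma-separated host list into the String[]
 * expected by Cluster.Builder.addContactPoints(String...).
 */
final class ContactPoints {

    private ContactPoints() {}

    static String[] parse(String hosts) {
        List<String> result = new ArrayList<>();
        for (String part : hosts.split(",")) {
            String host = part.trim();
            if (!host.isEmpty() && !result.contains(host)) {
                result.add(host); // keep order, skip blanks and duplicates
            }
        }
        if (result.isEmpty()) {
            throw new IllegalArgumentException("at least one contact point is required");
        }
        return result.toArray(new String[0]);
    }

    public static void main(String[] args) {
        String[] points = parse("127.0.0.1, 127.0.0.2,,127.0.0.1");
        // With the driver on the classpath, this would feed:
        // cluster = Cluster.builder().addContactPoints(points).build();
        System.out.println(Arrays.toString(points));
    }
}
```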

The code that closes the connection is shown in Listing 06.

public static void closeConnection() {
    if (cluster != null) {
        // Closing the cluster also closes the connections of every session created from it
        cluster.close();
        cluster = null;
        session = null;
    }
}

Listing 06
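
In driver 3.x, both Cluster and Session implement java.io.Closeable, so when a connection is only needed for a bounded piece of work, try-with-resources can close it for you: try (Cluster cluster = Cluster.builder().addContactPoints(server_ip).build(); Session session = cluster.connect(keyspace)) { ... }. Resources are closed in reverse declaration order, so the session is closed before the cluster. Because a runnable example cannot assume a live cluster, the sketch below uses hypothetical stand-ins, FakeCluster and FakeSession, that only record the order of events.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch of try-with-resources closing order. FakeCluster and FakeSession are
 * hypothetical stand-ins for the driver's Cluster and Session; they only record
 * what happens to them.
 */
final class CloseOrderDemo {

    static final class FakeCluster implements AutoCloseable {
        private final List<String> log;
        FakeCluster(List<String> log) { this.log = log; }
        FakeSession connect(String keyspace) { return new FakeSession(log, keyspace); }
        @Override public void close() { log.add("cluster closed"); }
    }

    static final class FakeSession implements AutoCloseable {
        private final List<String> log;
        private final String keyspace;
        FakeSession(List<String> log, String keyspace) { this.log = log; this.keyspace = keyspace; }
        void execute(String cql) { log.add("execute on " + keyspace + ": " + cql); }
        @Override public void close() { log.add("session closed"); }
    }

    // Mirrors: try (Cluster c = ...; Session s = c.connect(keyspace)) { ... }
    static List<String> run() {
        List<String> log = new ArrayList<>();
        try (FakeCluster cluster = new FakeCluster(log);
             FakeSession session = cluster.connect("sampledb")) {
            session.execute("SELECT * FROM sales");
        } // both resources are closed here, in reverse declaration order
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The static-field approach of Listings 05 and 06 remains convenient for a long-running console application; try-with-resources fits short, self-contained tasks.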

Step05. Add methods to execute statements

Now it’s time to execute some statements for inserting and updating data in the cluster.

There are two ways to execute a query:

  1. Write a concrete query (a plain string that contains everything required to execute it)
  2. Write a parameterized query, so we can bind the actual parameter values before executing it

The first scenario is covered by the code shown in Listing 07.

// Method definition: executes a plain CQL string
public static void executeStatement(String statement) {
    session.execute(statement);
}

// TEST: Method execution for inserting a row
String statement1 = "INSERT INTO sampledb.sales(id, month, year, description) " +
        "VALUES(4, 4, 2016, 'Apr sale')";
executeStatement(statement1);

// TEST: Method execution for updating a row
String statement2 = "UPDATE sampledb.sales " +
        "SET month=5, description='May sale' " +
        "WHERE id=4";
executeStatement(statement2);

Listing 07
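
Concatenating values into a CQL string, as in Listing 07, breaks as soon as a value contains a single quote (for example, June's sale). In CQL, a single quote inside a string literal is escaped by doubling it. The hypothetical helper below applies that rule; for values that come from users, binding parameters (the second approach) is generally the safer option, because the driver then sends the values separately from the query text.

```java
/**
 * Hypothetical helper: turns a Java string into a CQL string literal by
 * wrapping it in single quotes and doubling any embedded single quote.
 */
final class CqlLiterals {

    private CqlLiterals() {}

    static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    // Builds an INSERT like the one in Listing 07, with a safely quoted description
    static String insertSale(int id, int month, int year, String description) {
        return "INSERT INTO sampledb.sales(id, month, year, description) "
                + "VALUES(" + id + ", " + month + ", " + year + ", " + quote(description) + ")";
    }

    public static void main(String[] args) {
        System.out.println(insertSale(6, 6, 2016, "June's sale"));
    }
}
```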

For the second scenario, we need one method that prepares the parameterized query and another that executes the resulting bound statement, as shown in Listing 08.

// Methods definition
// Prepares a parameterized statement on the cluster
public static PreparedStatement getPreparedStatement(String statement) {
    return session.prepare(statement);
}

// Executes a statement whose parameter values have already been bound
public static void executeStatement(BoundStatement statement) {
    session.execute(statement);
}

// TEST: Method execution for inserting a row using prepared statements
String statement3 = "INSERT INTO sampledb.sales(id, month, year, description) " +
        "VALUES(?, ?, ?, ?)";
PreparedStatement prepared = getPreparedStatement(statement3);
BoundStatement bound = prepared.bind()
        .setInt(0, 5)
        .setInt(1, 5)
        .setInt(2, 2016)
        .setString(3, "May sale");
executeStatement(bound);

Listing 08
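
Preparing a statement requires a round trip to the cluster, so the usual recommendation is to prepare each query once and reuse the resulting PreparedStatement, binding new values for every execution. A common way to achieve this is a small cache keyed by the query text. The sketch below, StatementCache (a hypothetical name), is generic so that it runs without a cluster: the Function<String, T> stands in for session::prepare.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/**
 * Hypothetical cache that prepares each distinct query string only once.
 * With the DataStax driver, T would be PreparedStatement and the preparer
 * would be session::prepare.
 */
final class StatementCache<T> {

    private final Map<String, T> cache = new ConcurrentHashMap<>();
    private final Function<String, T> preparer;

    StatementCache(Function<String, T> preparer) {
        this.preparer = preparer;
    }

    // Returns the cached statement, preparing it on first use
    T get(String query) {
        return cache.computeIfAbsent(query, preparer);
    }

    int size() {
        return cache.size();
    }

    public static void main(String[] args) {
        AtomicInteger prepareCalls = new AtomicInteger();
        // Stand-in for session::prepare that only counts invocations
        StatementCache<String> statements = new StatementCache<>(q -> {
            prepareCalls.incrementAndGet();
            return "prepared[" + q + "]";
        });
        String insert = "INSERT INTO sampledb.sales(id, month, year, description) VALUES(?, ?, ?, ?)";
        for (int i = 0; i < 3; i++) {
            statements.get(insert);
        }
        System.out.println("prepare calls: " + prepareCalls.get());
    }
}
```

With the driver, the cache would be created once after openConnection(), e.g. new StatementCache<PreparedStatement>(session::prepare), and getPreparedStatement could delegate to its get method.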

Step06. Add a method to execute queries

Now let’s query the Cassandra cluster to get a simple report, as shown in Listing 09.

// Method definition: runs a query and prints every returned row
public static void executeQuery(String query) {
    for (Row row : session.execute(query)) {
        System.out.println(row.toString());
    }
}

// TEST: Execute a query to list the current rows
String query1 = "SELECT id, month, year, description FROM sampledb.sales";
executeQuery(query1);

Listing 09
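
row.toString() is handy for a quick look, but an application usually needs typed values. In driver 3.x, Row offers getters such as getInt(String) and getString(String) that read a column by name. The sketch below maps the columns of sampledb.sales into a small value class; to keep it runnable without a cluster, the column getters are passed in as functions, so with the driver you would call Sale.from(row::getInt, row::getString). The Sale class is hypothetical.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import java.util.function.ToIntFunction;

/** Hypothetical value class mirroring the columns of sampledb.sales. */
final class Sale {
    final int id;
    final int month;
    final int year;
    final String description;

    Sale(int id, int month, int year, String description) {
        this.id = id;
        this.month = month;
        this.year = year;
        this.description = description;
    }

    // Reads each column by name; with the driver: Sale.from(row::getInt, row::getString)
    static Sale from(ToIntFunction<String> intColumn, Function<String, String> textColumn) {
        return new Sale(
                intColumn.applyAsInt("id"),
                intColumn.applyAsInt("month"),
                intColumn.applyAsInt("year"),
                textColumn.apply("description"));
    }

    @Override
    public String toString() {
        return String.format(Locale.ROOT, "Sale #%d (%02d/%d): %s", id, month, year, description);
    }

    public static void main(String[] args) {
        // Stand-in for one row returned by the SELECT in Listing 09
        Map<String, Object> row = new HashMap<>();
        row.put("id", 5);
        row.put("month", 5);
        row.put("year", 2016);
        row.put("description", "May sale");
        Sale sale = Sale.from(c -> (Integer) row.get(c), c -> (String) row.get(c));
        System.out.println(sale);
    }
}
```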

Conclusion

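In this article, I’ve shown how to connect a Java application to a Cassandra cluster using the DataStax driver: opening and closing the connection, executing plain and prepared statements, and querying data. Now you can apply these ideas and this demo to your own solutions.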