Spark Streaming and HBase tutorial
Applications that run on PNDA are packaged as tar.gz archives and pushed to an application repository. The tar archive contains all the binaries and configuration required to run the application.
You'll learn how to use the Deployment Manager API to deploy a package from the package repository and run it on the cluster.
Make sure you are familiar with the following before proceeding with the lab:
- The getting started guide will introduce you to many concepts used in this lab.
- The console is your starting point for interacting with a PNDA cluster. All of the main features of PNDA can be accessed from the console.
- The Deployment Manager API provides a mechanism for discovering available packages and deploying & undeploying functionality. It works with a well-defined package structure to make it fast and straightforward to get real scalable analytics running on the platform.
- Make sure that your package repository is correctly configured as described in the PREPARE phase for your infrastructure: AWS, OpenStack or server clusters.
- The platform-package-repository tool lets you upload packages to OpenStack Swift.
- Read the technical notes in the example repository.
Make sure you have the following installed on your development computer:
Running the application
Download the application code base
The PNDA distribution is available on GitHub at:
Clone the spark-streaming repository.
Compile with Maven
mvn clean package
Upload the app package to the PNDA app repository
Use the platform-package-repository tool to upload the application tar.gz file to your application repository.
Run the application
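As a rough sketch of what driving the Deployment Manager API from code could look like, the helper below builds (but does not send) the three HTTP requests typically involved: deploy a package, create an application from it, and start the application. The endpoint paths, the base URL, the package and application names, and the JSON body are all assumptions made for illustration; check them against the Deployment Manager API documentation for your PNDA release before relying on them. The class `DeploymentManagerRequests` is hypothetical and is not part of PNDA.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper; paths and names are assumptions, not confirmed by this tutorial.
final class DeploymentManagerRequests {

    // Assumed endpoint: PUT <base>/packages/<package> deploys a package from the repository.
    static HttpRequest deployPackage(String baseUrl, String packageName) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/packages/" + packageName))
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    // Assumed endpoint: PUT <base>/applications/<application> creates an application.
    // The JSON body is where settings such as hbase_shards would be supplied.
    static HttpRequest createApplication(String baseUrl, String application, String jsonBody) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/applications/" + application))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    // Assumed endpoint: POST <base>/applications/<application>/start starts the application.
    static HttpRequest startApplication(String baseUrl, String application) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/applications/" + application + "/start"))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }
}
```

Each request can then be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())` from `java.net.http` (Java 11 or later). Building the requests separately from sending them makes it easy to inspect the method and URL before anything touches the cluster.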
Producing test data
The technical notes in the example repository describe how to set up a test producer that will create suitable test data for consumption by this example application.
Understanding what the application is doing
The messages on Kafka are Avro encoded and look like this:
src: "test-src"
timestamp: 1453480869657
host_ip: "0.0.0.0"
rawdata: "a=1;b=2;c=?;gen_ts=?"
- a and b are fixed
- c is a varying integer value
- gen_ts is the same as timestamp
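To make the rawdata layout concrete, here is a minimal sketch of splitting the `key=value;key=value` string into named fields. It is not the parsing code from the example repository; the class `RawDataParser` and its behaviour on malformed segments (they are skipped) are assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not taken from the example repository.
final class RawDataParser {

    // Splits a string such as "a=1;b=2;c=7;gen_ts=1453480869657"
    // into a map that preserves the order of the fields.
    static Map<String, String> parse(String rawdata) {
        Map<String, String> fields = new LinkedHashMap<>();
        for (String pair : rawdata.split(";")) {
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                continue; // skip empty segments and segments without a key
            }
            fields.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
        }
        return fields;
    }

    public static void main(String[] args) {
        Map<String, String> fields = parse("a=1;b=2;c=7;gen_ts=1453480869657");
        System.out.println(fields.get("c") + " generated at " + fields.get("gen_ts"));
    }
}
```

The values are kept as strings here; converting c and gen_ts to numbers is left to the caller.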
The Spark Streaming app copies each message into an HBase row:
rowkey - shard_gen_ts_c
cf:a
cf:b
cf:c
cf:gen_ts
and inserts an extra column, proc_ts, with the time at which the message was processed.
If you were to calculate proc_ts - gen_ts you would learn how long each message sat in the Kafka queue before being processed.
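The proc_ts - gen_ts calculation can be sketched as follows. The `Row` type is a hypothetical stand-in for a row read back from HBase, and the sketch assumes both timestamps are epoch milliseconds, as the sample timestamp value suggests.

```java
import java.util.List;

// Hypothetical helper for illustration; not part of the example application.
final class ProcessingLatency {

    // The two timestamp columns of one row, assumed to be epoch milliseconds.
    static final class Row {
        final long genTs;
        final long procTs;

        Row(long genTs, long procTs) {
            this.genTs = genTs;
            this.procTs = procTs;
        }

        // Time between message generation and processing.
        long latencyMillis() {
            return procTs - genTs;
        }
    }

    // Mean latency over a set of rows; returns 0.0 when there are no rows.
    static double averageLatencyMillis(List<Row> rows) {
        return rows.stream().mapToLong(Row::latencyMillis).average().orElse(0.0);
    }
}
```

Note that the result is only meaningful if the producer and the Spark workers have reasonably synchronised clocks.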
The shard part of the rowkey cycles between 0 and the setting hbase_shards used in the PUT body. This ensures that writes to HBase are evenly distributed across the region servers that make up the HBase cluster.
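A minimal sketch of the sharded rowkey idea is shown below. It assumes the shard prefix takes the values 0 up to hbase_shards - 1 in round-robin order; whether the real application treats the upper bound as inclusive or exclusive is not stated here, so treat that as an assumption. The class `ShardedRowKey` is hypothetical.

```java
// Hypothetical helper for illustration; not the code used by the example application.
final class ShardedRowKey {
    private final int shards; // value of the hbase_shards setting
    private int next = 0;     // shard prefix to use for the next row

    ShardedRowKey(int shards) {
        if (shards <= 0) {
            throw new IllegalArgumentException("shards must be positive");
        }
        this.shards = shards;
    }

    // Builds a key of the form shard_gen_ts_c and advances the shard round-robin.
    String rowKey(long genTs, int c) {
        int shard = next;
        next = (next + 1) % shards;
        return shard + "_" + genTs + "_" + c;
    }
}
```

Without the shard prefix, keys that start with a steadily increasing timestamp sort next to each other, so consecutive writes would tend to land on the same region. This sketch is not thread-safe; a real writer would need one instance per partition or an atomic counter.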
The Spark Streaming code is located in:
streaming-app/src/main/scala/com/cisco/pnda/examples/spark/KafkaToHbaseApp.scala **TODO** check path
createPipeline sets up the processing graph with three main parts:
- KafkaUtils.createDirectStream to read from Kafka
- DStream.flatMap to apply parsing code to decode each message as read from Kafka
- DStream.mapPartitions to write the parsed messages into HBase
You can learn more about these Spark Streaming API calls and the rest of the API by reading the Spark Streaming documentation.
The number of messages written in each partition is then summed with a reduce call, and the total number of messages written to HBase in a given batch is printed to the driver output:
writeCounts.reduce(_ + _).print(1);
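To illustrate what the reduce step computes, here is a plain-Java analogue that does not use Spark at all: each inner list stands in for one partition, `writePartition` is a hypothetical stand-in for the HBase write that returns a per-partition count, and the counts are then summed into a batch total.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java analogue for illustration; the real application uses Spark DStreams.
final class BatchWriteCount {

    // Stand-in for writing one partition to HBase: returns how many messages it handled.
    static int writePartition(List<String> messages) {
        return messages.size();
    }

    // One count per partition, then a sum over all counts,
    // which is the role writeCounts.reduce(_ + _) plays in the Scala code.
    static int totalWritten(List<List<String>> partitions) {
        List<Integer> writeCounts = partitions.stream()
                .map(BatchWriteCount::writePartition)
                .collect(Collectors.toList());
        return writeCounts.stream().reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        List<List<String>> batch = Arrays.asList(
                Arrays.asList("m1", "m2"),
                Arrays.asList("m3"),
                Collections.<String>emptyList());
        System.out.println(totalWritten(batch));
    }
}
```

Counting per partition and summing afterwards means only one small number per partition has to travel back to the driver, rather than the messages themselves.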
You can see these log messages at "http://logstash-ip:logstash-port" with a query "logstash-query".
Checking the output
Compile the Client
Edit the application.properties file in src/main/resources with the Impala endpoint IP address and port number, then compile the client with Maven:
cd impala-jdbc-client
mvn clean package
Query the data
The supplied sample application executes the following SQL query to compute the average value of cf:c between two timestamps x and y:
select round(avg(cast(col as int)),2) as average from luke-skywalker-example_table where id between x and y
When you run the client you will see the query printed to the console. Try to work out why the where clause is more complex in reality. Hint: it is to do with the shard part of the rowkey.
Metadata defined in the Hive metastore maps the HBase columns onto SQL fields. This metadata is provisioned as part of the application package in the hbase.json file.
Only the SQL field that corresponds to the HBase rowkey can be considered indexed, and it is generally good practice to limit the quantity of data being considered with a where clause on that field.
java -cp target/impala-jdbc-client-1.0.jar:driver/Cloudera_ImpalaJDBC4_22.214.171.1247/* com.cisco.pnda.examples.impalaclient.Client example_table 1453480869657 1453480869942
If you no longer need it, you can stop the application and undeploy the package using the console.