PNDA Logstash Avro codec plugin


Logstash is a lightweight, open source data collection engine organized as simple pipeline with a large number of plugins. It has input plugins for Netflow, SNMP, collectd, syslog, etc.

Logstash is used as to collect, enrich and transport data from multiple sources into PNDA.

This codec plugin is used as a stream filter for deserializing or serializing inidvidual logstash events in accordance with the PNDA avro schema.

At a very high level,

  • Kafka Producer will use this codec to avro encode input events.
  • Kafka Consumer will use this codec to avro decode events read from the data bus.

Avro encapsulation

The Avro schema is as follows:

        "namespace": "pnda.entity",
        "type": "record",
        "name": "event",
        "fields": [
            {"name": "timestamp", "type": "long"},
            {"name": "src",       "type": "string"},
            {"name": "host_ip",   "type": "string"},
            {"name": "rawdata",   "type": "bytes"}

Events should be encoded as Avro fragments. This means that the schema is not included in each event, but is instead made available at encode/decode time.

Explanation of the fields:

  • timestamp: Timestamp of when the event was generated/ingested by PNDA.
  • src: source of the event. Where does the event come from? (e.g. syslog, collectd)
  • host_ip: source IP address of the machine where event is generated/ingested by PNDA
  • rawdata: The 'raw' data goes here.


We recommend logstash 2.3.4, the latest version at the time of writing.


Install Java (if you haven't done so):

    sudo add-apt-repository ppa:webupd8team/java
    sudo apt-get update
    sudo apt-get install oracle-java8-installer

Important: Install ruby 2.x using rvm.

    gpg --keyserver hkp:// --recv-keys 409B6B1796C275462A1703113804BB82D39DC0E3
    curl -sSL | bash -s stable --ruby

Follow the instruction this produces (in your case the location will be different):

  • To start using RVM you need to run source /home/cloud-user/.rvm/scripts/rvm in all your open shell windows, in rare cases you need to reopen all shell windows.

Create the source directory:

    mkdir -p /opt/logstash
    cd /opt/logstash

Build avro codec

First, clone this repository into the source directory.

    cd /opt/logstash
    git clone [email protected]:pndaproject/prod-logstash-codec-avro.git

Build the gem:

    cd /opt/logstash/prod-logstash-codec-avro
    gem build pnda-logstash-codec.gemspec

You should see a Successfully built RubyGem message.

Install logstash 2.3.4

    cd /opt/logstash
    tar -zxvf logstash-2.3.4.tar.gz

Add avro codec to logstash

    cd /opt/logstash/logstash-2.3.4
    bin/logstash-plugin install /opt/logstash/prod-logstash-codec-avro/logstash-codec-platformavro-0.1.0.gem

You should see an Installation successful message.

Finally, check if everything went well:

    bin/logstash-plugin list | grep platformavro

Which should display:

    logstash-codec-platformavro (0.1.0)

If you were already running Logstash, restart it. Now you are ready to proceed.

Example Configuration

As an example, we will see how to read TCP data and send it to PNDA.

Note: This assumes you have created a kafka topic named test and you are ready to direct data towards this topic.

    input {
      tcp {
         port => 20518
         add_field => [ "src", "syslog" ]

    filter {

    output {
      stdout { codec => rubydebug }
      kafka {
        bootstrap_servers => ''
        topic_id => 'test'
        value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
        codec => platformavro {}

Save the above configuration in logstash.conf.

Then run logstash:

    cd logstash-2.3.4/
    bin/logstash agent -f logstash.conf

You should see a Logstash startup completed message printed on the console. If you don't see this message, something went wrong in your setup or there's an error in your config.

In another terminal:

    echo '{"router: Jun 19 09:56:14.258 EDT: %LINEPROTO-5-UPDOWN: Line protocol on Interface Tunnel1, changed state to down"}' | nc localhost 20518

Start kafka consumer (if you are using local kafka cluster):

    cd $KAFKA_HOME
    bin/ --zookeeper localhost:2181 --topic test --from-beginning

You should be able to see the syslog event we just sent.

SSL Configuration

In case you would like to work with a secured Kafka cluster, you will need to install a newser version of the kafka output plugin by running:

    bin/logstash-plugin install --version 3.0.0 logstash-output-kafka

The logstash.conf configuration file should be for example:

input {
  tcp {
     port => 20518
     add_field => [ "src", "syslog" ]

filter {

output {
  stdout { codec => rubydebug }
  kafka {
    bootstrap_servers => ''
    topic_id => 'avro.log.localtest'
    value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
    codec => platformavro {}
    ssl => true
    ssl_keystore_location => "/òpt/pnda/server.keystore.jks"
    ssl_keystore_password => "test1234"
    ssl_truststore_location => "/opt/pnda/server.truststore.jks"
    ssl_truststore_password => "test1234"

Check the Logstash Kafka output plugin documentation and the security part on the PNDA Guide for more information.

Field mapping

Let's understand a bit more about how input fields are mapped to fields expected by the codec.

In Logstash world, the message field is like a default field. It's where most input plugins place the payload that they receive from the network, read from a file, etc.

The way our codec works is to map logstash event fields to the avro schema:

  • avromsg["src"] is the value from event["src"]
  • avromsg["host_ip"] is the value from event["host"]
  • avromsg["timestamp"] is the value from event["@timestamp"] in milliseconds
  • avromsg["rawdata"] is the value from event["message"]

Note : timestamp and host fields are likely generated by logstash itself.

The event["host"] value is parsed by splitting IP address and port (v4, v6). The method parse_addr method in lib/logstash/codecs/platformavro.rb handles cases where the host field might be a combination of IP address and/or port.

Note that src and message are not default fields in logstash event. They need to be set with filters or by an input plugin.

In simple terms, we expect the Logstash event after it's been through the filter section to contain the fields src, host, @timestamp and message. How they are set is up to the integrator.

For example, let's take the file input plugin. Here's a sample config:

    input {
        file {
            path => "/var/log/*"
            add_field => [ "src", "system_logs" ]

Here's a sample line:

Mar 23 13:17:01 localhost CRON[25941]: (root) CMD (   cd / && run-parts --report /etc/cron.hourly)

If you want to send the whole line as is, you could do that.

The output from logstash might look something like this:

        "timestamp" => 1458740254000,
        "src" => "system_logs",
        "host_ip" => "",
        "rawdata" => "Mar 23 13:17:01 localhost CRON[25941]: (root) CMD (   cd / && run-parts --report /etc/cron.hourly)"

As you may have guessed, the whole message field becomes part of rawdata.

These fields are sent to the Kafka output plugin.

You can perform quite complex logic inside a logstash filter to do whatever you need to do.

For more information, see the logstash documentation.


To run unit tests, download jruby via rvm:

    cd prod-logstash-codec-avro
    curl -sSL | bash -s stable --ruby=jruby

Install bundle gem:

    jruby -S gem install bundler

Update dependencies and run tests:

    jruby -S bundle install
    bundle exec rspec

results matching ""

    No results matching ""