PNDA Logstash Avro codec plugin


Logstash is a lightweight, open source data collection engine organized as a simple pipeline with a large number of plugins. It has input plugins for Netflow, SNMP, collectd, syslog, etc.

Logstash is used to collect, enrich and transport data from multiple sources into PNDA.

At a very high level,

  • A Kafka producer uses this codec to Avro-encode input events.
  • A Kafka consumer uses this codec to Avro-decode events read from the data bus.

Avro encapsulation

For a description of the PNDA Avro encapsulation please read the Data Preparation Guide.


  • Install the latest stable version of Logstash (Logstash 5.3.0 at the time of writing). The procedure is described in the Logstash Guide.

  • Install the PNDA Avro codec by reading the installation instructions

Example Configuration

As an example, we will see how to read TCP data and send it to PNDA.

Note: This assumes you have created a Kafka topic named test and are ready to direct data to it.

  • Get the PNDA Avro schema

  • Create a logstash configuration file:

    input {
      tcp {
         port => 20518
         add_field => [ "source", "syslog" ]
      }
    }

    filter {
        mutate {
            rename => { "message" => "rawdata" } # Put the content of the message to the PNDA Avro 'rawdata' field
        }
        ruby {
            # Convert the Logstash timestamp to a milliseconds timestamp integer
            # You can use whatever makes sense to you as long as the timestamp is a valid timestamp integer in milliseconds
            code => "event.set('timestamp', (event.get('@timestamp').to_f * 1000).to_i)"
        }
    }

    output {
            kafka {
                    codec => pnda-avro { schema_uri => "/opt/pnda/pnda.avsc" } # The PNDA Avro schema to use
                    bootstrap_servers => "localhost:9092"
                    topic_id => 'test'
                    compression_type => "none" # "none", "gzip", "snappy", "lz4"
                    value_serializer => 'org.apache.kafka.common.serialization.ByteArraySerializer'
            }
    }
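
The timestamp conversion performed by the ruby filter can be tried outside Logstash. The following is a minimal sketch in plain Ruby, assuming a standard `Time` object as a stand-in for the value returned by `event.get('@timestamp')`; the helper name `to_epoch_millis` is hypothetical and is not part of the codec or of Logstash.

```ruby
# Hypothetical helper mirroring the expression used in the ruby filter.
# `time` stands in for event.get('@timestamp'); any object that responds
# to to_f with epoch seconds works here.
def to_epoch_millis(time)
  (time.to_f * 1000).to_i # to_i truncates any sub-millisecond remainder
end

sample = Time.at(1458740254)  # whole-second example value
puts to_epoch_millis(sample)  # prints 1458740254000
```

The result is an integer number of milliseconds since the Unix epoch, which is what the timestamp field is expected to hold.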

Then run logstash:

    bin/logstash -f logstash.conf

In another terminal:

    echo '{"router: Jun 19 09:56:14.258 EDT: %LINEPROTO-5-UPDOWN: Line protocol on Interface Tunnel1, changed state to down"}' | nc localhost 20518

Start a Kafka consumer (if you are using a local Kafka cluster):

    cd $KAFKA_HOME
    bin/ --zookeeper localhost:2181 --topic test --from-beginning

You should be able to see the syslog event we just sent as a binary PNDA Avro serialized message.

SSL Configuration

Check the Logstash Kafka output plugin documentation and the security section of the PNDA Guide for more information.

As an example, a Logstash output section would look like this:

    output {
            kafka {
                    codec => pnda-avro { schema_uri => "/opt/pnda/pnda.avsc" } # The PNDA Avro schema to use
                    bootstrap_servers => "localhost:9092"
                    topic_id => 'test'
                    compression_type => "none" # "none", "gzip", "snappy", "lz4"
                    value_serializer => 'org.apache.kafka.common.serialization.ByteArraySerializer'
                    ssl => true
                    ssl_keystore_location => "/opt/pnda/server.keystore.jks"
                    ssl_keystore_password => "test1234"
                    ssl_truststore_location => "/opt/pnda/server.truststore.jks"
                    ssl_truststore_password => "test1234"
            }
    }

Field mapping

Let's look more closely at how input fields are mapped to the fields expected by the codec.

In Logstash, message is effectively the default field: it is where most input plugins place the payload that they receive from the network, read from a file, etc.

The codec maps Logstash event fields to the Avro schema. To be correctly encoded, each Logstash event needs to have the following fields:

  • source
  • timestamp
  • rawdata

Note: The @timestamp and host fields are usually generated by Logstash itself, and @timestamp can be used to create the timestamp field.

Note that source and rawdata are not default fields of a Logstash event. They need to be set with filters or by an input plugin.

In simple terms, we expect the Logstash event after it's been through the filter section to contain the fields source, timestamp and rawdata. How they are set is up to the integrator.
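
The requirement above can be expressed as a small check. This is only an illustrative sketch in plain Ruby, assuming the event is represented as a Hash with string keys; `REQUIRED_PNDA_FIELDS` and `missing_pnda_fields` are hypothetical names and do not exist in the codec.

```ruby
# Fields the event is expected to carry once it leaves the filter section.
REQUIRED_PNDA_FIELDS = %w[source timestamp rawdata].freeze

# Hypothetical helper: returns the required fields that are absent or nil.
def missing_pnda_fields(event)
  REQUIRED_PNDA_FIELDS.reject { |field| !event[field].nil? }
end

# An event as an input plugin might leave it, before any filter has run.
raw_event = { "message" => "hello", "host" => "localhost" }
# The same event after filters have set the three expected fields.
ready_event = { "source" => "syslog", "timestamp" => 1458740254000, "rawdata" => "hello" }

p missing_pnda_fields(raw_event)   # prints ["source", "timestamp", "rawdata"]
p missing_pnda_fields(ready_event) # prints []
```

A check like this could be useful while developing a filter section, since it shows which of the three fields still needs to be set.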

For example, let's take the file input plugin. Here's a sample config:

    input {
        file {
            path => "/var/log/*"
            add_field => [ "source", "system_logs" ]
        }
    }

Here's a sample line:

    Mar 23 13:17:01 localhost CRON[25941]: (root) CMD (   cd / && run-parts --report /etc/cron.hourly)

If you want to send the whole line as is, you could do that.

The output from logstash might look something like this:

        "timestamp" => 1458740254000,
        "source" => "system_logs",
        "rawdata" => "Mar 23 13:17:01 localhost CRON[25941]: (root) CMD (   cd / && run-parts --report /etc/cron.hourly)"

These fields are sent to the Kafka output plugin which will use the PNDA Avro codec to serialize them according to the PNDA Avro Schema.
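
The mapping that produces these three fields can be sketched in plain Ruby. This is an approximation of what the filter section does, not the codec's implementation: the event is modelled as a Hash, a `Time` object stands in for the Logstash @timestamp, and the helper name `to_pnda_fields` is hypothetical.

```ruby
# Hypothetical helper: builds the three PNDA fields from a Logstash-like event.
# fetch raises KeyError if an input field is missing, which surfaces mistakes early.
def to_pnda_fields(event)
  {
    "timestamp" => (event.fetch("@timestamp").to_f * 1000).to_i, # epoch milliseconds
    "source"    => event.fetch("source"),                        # set by add_field
    "rawdata"   => event.fetch("message")                        # whole line, unchanged
  }
end

line = "Mar 23 13:17:01 localhost CRON[25941]: (root) CMD (   cd / && run-parts --report /etc/cron.hourly)"
event = { "message" => line, "source" => "system_logs", "@timestamp" => Time.at(1458740254) }

pnda = to_pnda_fields(event)
puts pnda["timestamp"] # prints 1458740254000
puts pnda["source"]    # prints system_logs
```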

You can perform quite complex logic inside a logstash filter to do whatever you need to do.

For more information, see the Logstash documentation.
