How to use the logstash-mongodb input plugin

In general, Logstash provides an extensible data processing pipeline; for complete information, see the Logstash website ( https://www.elastic.co/products/logstash ).

How do I set up logging for z/TPF support for MongoDB?

For more information about configuring z/TPF, see Logging for z/TPF support for MongoDB in IBM Documentation.

1.) Install Logstash

Logstash runs in a JVM, so it can run on multiple hardware platforms. First, download the Logstash installation. This article was developed and tested with logstash-5.4.1 (https://www.elastic.co/downloads/past-releases/logstash-5-4-1).

After you download Logstash, extract the archive file in a directory of your choice.

2.) Install BSON plug-in (optional if you use the mongodb-output-plugin)

  1. Issue the following command from the Logstash installation directory chosen in step 1.
    env GEM_HOME=./vendor/bundle/jruby/1.9 ./vendor/jruby/bin/jruby -S gem install bson -v 4.2.1
  2. Add gem "bson" to ./Gemfile if not already present.

To install the mongodb-output-plugin instead, issue the following command from the Logstash installation directory that you chose in step 1:
 bin/logstash-plugin install logstash-output-mongodb 
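
To confirm that the bson gem is visible to the JRuby runtime that ships with Logstash before you continue, you can run a small check script similar to the following sketch (the file name check_bson.rb is only an example):

# check_bson.rb - a minimal check that the bson gem installed above loads
# under the bundled JRuby. Run it from the Logstash installation directory:
#   env GEM_HOME=./vendor/bundle/jruby/1.9 ./vendor/jruby/bin/jruby check_bson.rb
require "bson"

puts "bson gem version: #{BSON::VERSION}"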

3.) Create tpfbson.rb

You can create a TCP listener style plug-in that is similar to the standard TCP input plug-in; however, the z/TPF system encodes the contents of the transmitted data as BSON, which requires a specialized input transformation. Place this file in the following relative directory:

./logstash-core/lib/logstash/inputs/tpfbson.rb

# encoding: utf-8
require "logstash/inputs/base"
require "logstash/util/socket_peer"

require "socket"

require "bson"
require "json"
require "stringio"

# Read BSON-encoded events over a TCP socket.
#
# Each event is assumed to be one BSON document.
#
# Can either accept connections from clients or connect to a server,
# depending on `mode`.
class LogStash::Inputs::Tpfbson < LogStash::Inputs::Base
  class Interrupted < StandardError; end
  config_name "tpfbson"
  milestone 2

  # When mode is `server`, the address to listen on.
  # When mode is `client`, the address to connect to.
  config :host, :validate => :string, :default => "0.0.0.0"

  # When mode is `server`, the port to listen on.
  # When mode is `client`, the port to connect to.
  config :port, :validate => :number, :required => true

  # The 'read' timeout in seconds. If a particular bson connection is idle for
  # more than this timeout period, we will assume it is dead and close it.
  #
  # If you never want to timeout, use -1.
  config :data_timeout, :validate => :number, :default => -1

  # Mode to operate in. `server` listens for client connections,
  # `client` connects to a server.
  config :mode, :validate => ["server", "client"], :default => "server"

  def initialize(*args)
    super(*args)
    @server_socket = nil
    @connection_sockets = {}
    @socket_mutex = Mutex.new
  end # def initialize

  # Convert BSON-specific types into JSON-friendly values; all other values
  # are returned unchanged.
  def docheck(c)
    if c.is_a?(BSON::Binary)
      return c.to_json
    elsif c.is_a?(BSON::ObjectId)
      return c.as_json
    else
      return c
    end
  end

  # Recursively walk an array and convert any nested BSON values in place.
  def iarray(a)
    a.each_with_index do |item, idx|
      if item.is_a?(Hash)
        ihash(item)
      elsif item.is_a?(Array)
        iarray(item)
      else
        a[idx] = docheck(item)
      end
    end
  end

  # Recursively walk a hash and convert any nested BSON values in place.
  def ihash(h)
    h.each_pair do |k, v|
      if v.is_a?(Hash)
        ihash(v)
      elsif v.is_a?(Array)
        iarray(v)
      else
        h[k] = docheck(v)
      end
    end
  end

  public
  def register
    fix_streaming_codecs
    self.server_socket = new_server_socket
  end # def register

  private
  def handle_socket(socket, client_address, output_queue)
    while !stop?
      buf = nil
      databuf = nil
      # NOTE(petef): the timeout only hits after the line is read
      # or socket dies
      # TODO(sissel): Why do we have a timeout here? What's the point?

      if @data_timeout == -1
        # discard the 32-byte high speed connector header, then read the
        # 4-byte length prefix of the BSON document
        buf = readData(socket, 32)
        buf = read(socket)
      else
        Timeout::timeout(@data_timeout) do
          # discard the 32-byte high speed connector header, then read the
          # 4-byte length prefix of the BSON document
          buf = readData(socket, 32)
          buf = read(socket)
        end
      end
      if BSON::VERSION[0] == '4'
        readstring = BSON::ByteBuffer.new(buf)
      else
        readstring = StringIO.new(buf)
      end
      datalen = BSON::Int32.from_bson(readstring)
      databuf = readData(socket, datalen - 4)

      buf << databuf

      if BSON::VERSION[0] == '4'
        readstring = BSON::ByteBuffer.new(buf)
      else
        readstring = StringIO.new(buf)
      end
      doc = BSON::Document.from_bson(readstring)
      ihash(doc)
      event = LogStash::Event.new(doc)
      event.set("host", client_address)
      event.set("path", "bsonTcpInput")
      decorate(event)
      output_queue << event
    end # loop do
  rescue EOFError
    @logger.debug("Connection closed", :client => socket.peeraddr[3])
  rescue Errno::ECONNRESET
    @logger.debug? && @logger.debug("Connection reset by peer", :client => socket.peeraddr[3])
  rescue => e
    # if plugin is stopping, don't bother logging it as an error
    !stop? && @logger.error("An error occurred. Closing connection", :client => socket.peeraddr[3], :exception => e, :backtrace => e.backtrace)
  ensure
    # catch all rescue nil on close to discard any close errors or invalid socket
    socket.close rescue nil
  end

  private
  def read(socket)
    return socket.sysread(4)
  end # def read

  private
  def readData(socket, dataLen)
    data = ""
    if data.respond_to?(:force_encoding)
      data.force_encoding("BINARY")
    end
    read_buffer = ""
    if read_buffer.respond_to?(:force_encoding)
      read_buffer.force_encoding("BINARY")
    end
    until dataLen == 0
      socket.sysread(dataLen, read_buffer)
      dataLen = dataLen - read_buffer.length
      data << read_buffer
    end
    return data
  end # def readData

  public
  def run(output_queue)
    run_server(output_queue)
  end # def run

  def stop
    # force close all sockets which will escape any blocking read with a IO exception
    # and any thread using them will exit.
    # catch all rescue nil on close to discard any close errors or invalid socket
    server_socket.close rescue nil
    connection_sockets.each{|socket| socket.close rescue nil}
  end

  def close
    # see related comment in register: we must make sure to close the server socket here
    # because it is created in the register method and we could be in the context of having
    # register called but never run & stop, only close.
    # catch all rescue nil on close to discard any close errors or invalid socket
    server_socket.close rescue nil
  end

  def run_server(output_queue)
    while !stop?
      begin
        socket = add_connection_socket(server_socket.accept)
        server_connection_thread(output_queue,socket)
      rescue => e
        # if this exception occurred while the plugin is stopping
        # just ignore and exit
        raise e unless stop?
      end
    end
  ensure
    server_socket.close rescue nil
  end # def run_server

  def server_connection_thread(output_queue, socket)
    Thread.new(output_queue, socket) do |q, s|
      begin
        @logger.debug? && @logger.debug("Accepted connection", :client => s.peeraddr[3], :server => "#{@host}:#{@port}")
        handle_socket(s, s.peeraddr[3], q)
      ensure
        delete_connection_socket(s)
      end
    end
  end

  def new_server_socket
    @logger.info("Starting tcp input listener", :address => "#{@host}:#{@port}")

    begin
      socket = TCPServer.new(@host, @port)
    rescue Errno::EADDRINUSE
      @logger.error("Could not start TCP server: Address in use", :host => @host, :port => @port)
      raise
    end
  end

  def server_socket=(socket)
    @socket_mutex.synchronize{@server_socket = socket}
  end

  def server_socket
    @socket_mutex.synchronize{@server_socket}
  end

  def add_connection_socket(socket)
    @socket_mutex.synchronize{@connection_sockets[socket] = true}
    socket
  end

  def delete_connection_socket(socket)
    @socket_mutex.synchronize{@connection_sockets.delete(socket)}
  end

  def connection_sockets
    @socket_mutex.synchronize{@connection_sockets.keys.dup}
  end

end # class LogStash::Inputs::Tpfbson
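
Because the plug-in expects each message to be a 32-byte high speed connector header followed by a single BSON document, you can exercise the listener locally before you connect a z/TPF system. The following test client is only a sketch under that assumption; it sends 32 placeholder bytes and one BSON document to the listener port:

# test_client.rb - a local test sketch, not part of the plug-in.
# It assumes the tpfbson input is listening on port 4514 and that the
# plug-in discards the first 32 bytes (the high speed connector header).
require "socket"
require "bson"

doc = { "message" => "hello from test client", "severity" => "INFO" }
payload = ("\x00" * 32).b + doc.to_bson.to_s

TCPSocket.open("127.0.0.1", 4514) do |socket|
  socket.write(payload)
end

After Logstash is started with the configuration that is created in step 5, running this client should produce a corresponding event in the /tmp/bson.log file.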

4.) Update binary.rb

The BSON library does not transform binary content correctly, so you might need to update the following functions in the binary.rb file. If the functions do not exist, you can add them as follows:

./vendor/bundle/jruby/1.9/gems/bson-4.2.1-java/lib/bson/binary.rb

    # Get the binary as JSON hash data.
    #
    # @example Get the binary as a JSON hash.
    #   binary.as_json
    #
    # @return [ Hash ] The binary as a JSON hash.
    #
    # @since 2.0.0
    def as_json(*args)
      { "$binary" => data.unpack('H*').first, "$type" => type }
    end

    # Get the binary as JSON hash data.
    #
    # @example Get the binary as a JSON hash.
    #   binary.to_json
    #
    # @return [ Hash ] The binary as a JSON hash.
    #
    # @since 2.0.0
    def to_json(*args)
      { "$binary" => data.unpack('H*').first, "$type" => type }
    end
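
As a quick illustration of the effect of these functions (assuming the bson 4.2.1 gem with the changes above applied), a binary value is rendered as a hash that contains the hex-encoded data and the binary subtype:

require "bson"

bin = BSON::Binary.new("abc")    # default subtype is :generic
puts bin.as_json.inspect         # => {"$binary"=>"616263", "$type"=>:generic}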

5.) Create a Logstash config file

This configuration starts a BSON listener on port 4514 and writes output to the /tmp/bson.log file.

./logstash-bson.config:

input {
  tpfbson {
    port => 4514
  }
}
output {
  file { path => "/tmp/bson.log" }
}

6.) Start Logstash

bin/logstash -f ./logstash-bson.config

You can expect to see a line that contains something similar to the following:

[2017-08-23T17:06:32,089][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000}
[2017-08-23T17:06:32,107][INFO ][logstash.inputs.tpfbson  ] Automatically switching from plain to line codec {:plugin=>"tpfbson"}
[2017-08-23T17:06:32,110][INFO ][logstash.inputs.tpfbson  ] Starting tcp input listener {:address=>"0.0.0.0:4514"}
[2017-08-23T17:06:32,112][INFO ][logstash.pipeline        ] Pipeline main started
[2017-08-23T17:06:32,139][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

Your Logstash plug-in is installed correctly and is now ready for input.  You can review the configuration for your z/TPF system to route logging to the target Logstash environment and begin logging MongoDB requests.

[{"Business Unit":{"code":"BU058","label":"IBM Infrastructure w\/TPS"},"Product":{"code":"SSZL53","label":"TPF"},"Component":"","Platform":[{"code":"PF036","label":"z\/TPF"}],"Version":"All versions","Edition":"","Line of Business":{"code":"LOB35","label":"Mainframe SW"}}]