How to use the logstash-mongodb input plugin

Logstash provides an extensible data processing pipeline. For complete information, see the Logstash website (https://www.elastic.co/products/logstash).

How do I set up logging for z/TPF support for MongoDB?

For more information about configuring z/TPF, see Logging for z/TPF support for MongoDB in IBM Documentation.

1.) Install Logstash

Logstash runs in a JVM, so it can run on multiple hardware platforms. First, download the Logstash installation. This article was developed and tested with logstash-5.4.1 (https://www.elastic.co/downloads/past-releases/logstash-5-4-1).

After you download Logstash, extract the archive file in a directory of your choice.

2.) Install BSON plug-in (optional if you use the mongodb-output-plugin)

  1. Issue the following command from the Logstash installation directory chosen in step 1.
    env GEM_HOME=./vendor/bundle/jruby/1.9 ./vendor/jruby/bin/jruby -S gem install bson -v 4.2.1
  2. Add gem "bson" to ./Gemfile if not already present.

To install the mongodb-output-plugin instead, issue the following command from the Logstash installation directory from step 1:
 bin/logstash-plugin install logstash-output-mongodb 

3.) Create tpfbson.rb

You can create a TCP listener style plug-in similar to normal TCP/IP listeners; however, the z/TPF system uses BSON to encode the contents of the transmitted data, so the plug-in requires a specialized input transformation. Place this file in the following relative directory:

./logstash-core/lib/logstash/inputs/tpfbson.rb

# encoding: utf-8
require "logstash/inputs/base"
require "logstash/util/socket_peer"

require "socket"
require "timeout"

require "bson"
require "json"
require "stringio"

# Read events over a TCP socket.
#
# Like stdin and file inputs, each event is assumed to be one line of text.
#
# Can either accept connections from clients or connect to a server,
# depending on `mode`.
class LogStash::Inputs::Tpfbson < LogStash::Inputs::Base
  class Interrupted < StandardError; end
  config_name "tpfbson"
  milestone 2

  # When mode is `server`, the address to listen on.
  # When mode is `client`, the address to connect to.
  config :host, :validate => :string, :default => "0.0.0.0"

  # When mode is `server`, the port to listen on.
  # When mode is `client`, the port to connect to.
  config :port, :validate => :number, :required => true

  # The 'read' timeout in seconds. If a particular bson connection is idle for
  # more than this timeout period, we will assume it is dead and close it.
  #
  # If you never want to timeout, use -1.
  config :data_timeout, :validate => :number, :default => -1

  # Mode to operate in. `server` listens for client connections,
  # `client` connects to a server.
  config :mode, :validate => ["server", "client"], :default => "server"

  def initialize(*args)
    super(*args)
    @server_socket = nil
    @connection_sockets = {}
    @socket_mutex = Mutex.new
  end # def initialize

def docheck(c)
  if c.is_a?(BSON::Binary)
     return c.to_json;
  elsif c.is_a?(BSON::ObjectId)
     return  c.as_json;
  else
     return c;
  end
end

def iarray(a)
  # each_with_index supplies the index that the in-place assignment needs
  a.each_with_index do |item,idx|
     if item.is_a?(Hash)
       ihash(item)
     elsif item.is_a?(Array)
       iarray(item)
     else
       a[idx] = docheck(item);
     end
  end
end

def ihash(h)
  h.each_pair do |k,v|
    if v.is_a?(Hash)
      ihash(v)
    elsif v.is_a?(Array)
      iarray(v)
    else
      h[k] = docheck(v);
    end
  end
end

  public
  def register
    fix_streaming_codecs
    self.server_socket = new_server_socket
  end # def register

  private
  def handle_socket(socket, client_address, output_queue)
    while !stop?
      buf = nil
      databuf = nil
      # NOTE(petef): the timeout only hits after the line is read
      # or socket dies
      # TODO(sissel): Why do we have a timeout here? What's the point?

      if @data_timeout == -1
        # skip the high speed connector header
        buf = readData(socket,32)
        buf = read(socket)
      else
        Timeout::timeout(@data_timeout) do
          # skip the high speed connector header
          buf = readData(socket,32)
          buf = read(socket)
        end
      end
      if BSON::VERSION[0] == '4'
       readstring = BSON::ByteBuffer.new(buf)
      else
       readstring = StringIO.new(buf);
      end
      datalen = BSON::Int32.from_bson(readstring)
      databuf = readData(socket,datalen-4)
    
      buf  socket.peeraddr[3]]
  rescue => e
    # if plugin is stopping, don't bother logging it as an error
    !stop? && @logger.error("An error occurred. Closing connection", :client => socket.peeraddr[3], :exception => e, :backtrace => e.backtrace)
  ensure
    # catch all rescue nil on close to discard any close errors or invalid socket
    socket.close rescue nil
  end

  private
  def read(socket)
    return socket.sysread(4)
  end # def read

  private
  def readData(socket,dataLen)
    data = ""
    if data.respond_to?(:force_encoding)
      data.force_encoding("BINARY")
    end
    read_buffer = ""
    if read_buffer.respond_to?(:force_encoding)
      read_buffer.force_encoding("BINARY")
    end
    until dataLen == 0
      data.force_encoding("BINARY")
      socket.sysread(dataLen,read_buffer)
      dataLen = dataLen - read_buffer.length
      data  e
        # if this exception occured while the plugin is stopping
        # just ignore and exit
        raise e unless stop?

     end
    end
  ensure
    server_socket.close rescue nil
  end # def run_server

  def server_connection_thread(output_queue, socket)
    Thread.new(output_queue, socket) do |q, s|
      begin
        @logger.debug? && @logger.debug("Accepted connection", :client => s.peeraddr[3], :server => "#{@host}:#{@port}")
        handle_socket(s, s.peeraddr[3], q)
      ensure
        delete_connection_socket(s)
      end
    end
  end

  def new_server_socket
    @logger.info("Starting tcp input listener", :address => "#{@host}:#{@port}")

    begin
      socket = TCPServer.new(@host, @port)
    rescue Errno::EADDRINUSE
      @logger.error("Could not start TCP server: Address in use", :host => @host, :port => @port)
      raise
    end
  end

  def server_socket=(socket)
    @socket_mutex.synchronize{@server_socket = socket}
  end

  def server_socket
    @socket_mutex.synchronize{@server_socket}
  end

  def add_connection_socket(socket)
    @socket_mutex.synchronize{@connection_sockets[socket] = true}
    socket
  end

  def delete_connection_socket(socket)
    @socket_mutex.synchronize{@connection_sockets.delete(socket)}
  end

  def connection_sockets
    @socket_mutex.synchronize{@connection_sockets.keys.dup}
  end

end # class LogStash::Inputs::Tpfbson
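
The read loop in handle_socket appears to assume a simple framing: a 32-byte high speed connector header that is skipped, followed by one BSON document whose first four bytes hold its total length as a little-endian 32-bit integer (the BSON specification defines this prefix). The following is a minimal sketch of that framing that uses only the Ruby standard library. The names read_exact and read_frame are hypothetical, and a StringIO or any other object with a read method stands in for the socket.

```ruby
require "stringio"

HEADER_LEN = 32 # size of the skipped header, taken from readData(socket,32) above

# Read exactly n bytes from io, looping because one read may return fewer.
def read_exact(io, n)
  data = "".b
  while data.bytesize < n
    chunk = io.read(n - data.bytesize)
    if chunk.nil? || chunk.empty?
      raise EOFError, "stream ended after #{data.bytesize} of #{n} bytes"
    end
    data << chunk
  end
  data
end

# Return the raw bytes of one BSON document, length prefix included.
def read_frame(io)
  read_exact(io, HEADER_LEN)                # skip the connector header
  prefix = read_exact(io, 4)                # BSON total length, little-endian int32
  total  = prefix.unpack("l<").first
  raise ArgumentError, "invalid BSON length #{total}" if total < 5
  prefix + read_exact(io, total - 4)        # the length counts its own 4 bytes
end
```

The bytes returned by read_frame could then be handed to a BSON decoder. Rejecting lengths below 5 guards against a corrupt prefix, because the smallest valid BSON document is five bytes long.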

4.) Update binary.rb

The BSON library does not transform binary content correctly, so you might want to update the following functions in the binary.rb file. If the functions do not exist, you can insert the new contents as follows:

./vendor/bundle/jruby/1.9/gems/bson-4.2.1-java/lib/bson/binary.rb

    # Get the binary as JSON hash data.
    #
    # @example Get the binary as a JSON hash.
    #   binary.as_json
    #
    # @return [ Hash ] The binary as a JSON hash.
    #
    # @since 2.0.0
    def as_json(*args)
      { "$binary" => data.unpack('H*').first, "$type" => type }
    end

    # Get the binary as JSON hash data.
    #
    # @example Get the binary as a JSON hash.
    #   binary.to_json
    #
    # @return [ Hash ] The binary as a JSON hash.
    #
    # @since 2.0.0
    def to_json(*args)
      { "$binary" => data.unpack('H*').first, "$type" => type }
    end
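
To illustrate what these two methods produce, and how the docheck, ihash, and iarray helpers in tpfbson.rb apply them to a decoded document, here is a minimal standalone sketch. FakeBinary is a hypothetical stand-in for BSON::Binary so that the example runs without the bson gem. The name normalize is also illustrative, and unlike the helpers above it returns a new structure instead of modifying the document in place.

```ruby
require "json"

# Hypothetical stand-in for BSON::Binary: raw bytes plus a subtype symbol.
FakeBinary = Struct.new(:data, :type) do
  # Same shape as the as_json override above: hex string plus subtype.
  def as_json(*_args)
    { "$binary" => data.unpack("H*").first, "$type" => type }
  end
end

# Walk nested hashes and arrays, replacing binary values with JSON-safe hashes.
def normalize(value)
  case value
  when Hash       then value.each_with_object({}) { |(k, v), out| out[k] = normalize(v) }
  when Array      then value.map { |item| normalize(item) }
  when FakeBinary then value.as_json
  else value
  end
end

doc = {
  "id"      => 7,
  "payload" => FakeBinary.new("\x01\xAB\xFF".b, :generic),
  "tags"    => [FakeBinary.new("hi".b, :generic)]
}
puts JSON.generate(normalize(doc))
```

Hex encoding keeps arbitrary bytes representable in JSON text, at the cost of doubling their size.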

5.) Create a Logstash config file

This configuration starts a BSON listener on port 4514 and creates output to the /tmp/bson.log file.

./logstash-bson.config:

input {
  tpfbson {
    port => 4514
  }
}
output {
  file { path => "/tmp/bson.log" }
}

 

6.) Start Logstash

bin/logstash -f ./logstash-bson.config

You can expect to see lines similar to the following:

[2017-08-23T17:06:32,089][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000}
[2017-08-23T17:06:32,107][INFO ][logstash.inputs.tpfbson  ] Automatically switching from plain to line codec {:plugin=>"tpfbson"}
[2017-08-23T17:06:32,110][INFO ][logstash.inputs.tpfbson  ] Starting tcp input listener {:address=>"0.0.0.0:4514"}
[2017-08-23T17:06:32,112][INFO ][logstash.pipeline        ] Pipeline main started
[2017-08-23T17:06:32,139][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

Your Logstash plug-in is installed correctly and is now ready for input.  You can review the configuration for your z/TPF system to route logging to the target Logstash environment and begin logging MongoDB requests.
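
If no z/TPF system is available yet, one way to smoke-test the listener is to send it a hand-built frame. The sketch below assumes the wire layout implied by tpfbson.rb: 32 header bytes that the plug-in discards, then one BSON document. The header is filled with zeros as a placeholder because its real contents are not described in this article. The names hello_world_bson, build_frame, and send_test_frame are hypothetical. The document bytes are the {"hello": "world"} example from the BSON specification.

```ruby
require "socket"

# Encode {"hello" => "world"} as BSON by hand (22 bytes in total).
def hello_world_bson
  body = "\x02".b + "hello\x00".b +          # element type 0x02 (string) and key
         [6].pack("l<") + "world\x00".b +    # string length (with NUL) and value
         "\x00".b                            # document terminator
  [body.bytesize + 4].pack("l<") + body      # total length counts its own 4 bytes
end

# Prefix the document with a zero-filled 32-byte placeholder header.
def build_frame(doc = hello_world_bson)
  ("\x00" * 32).b + doc
end

# Send one frame to the listener; the defaults match the sample config above.
def send_test_frame(host = "127.0.0.1", port = 4514)
  TCPSocket.open(host, port) { |sock| sock.write(build_frame) }
end
```

Calling send_test_frame while Logstash is running should, if the plug-in accepts the frame, result in an event being written to /tmp/bson.log. A real z/TPF sender may populate the header differently.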
