Logstash provides an extensible data processing pipeline. For complete information, see the Logstash website ( https://www.elastic.co/products/logstash ).
How do I set up logging for z/TPF support for MongoDB?
For more information about configuring z/TPF, see Logging for z/TPF support for MongoDB in IBM Documentation.
1.) Install Logstash
Logstash runs in a JVM, so it can run on multiple hardware platforms. First, download the Logstash installation. This article was developed and tested with logstash-5.4.1 (https://www.elastic.co/downloads/past-releases/logstash-5-4-1).
After you download Logstash, extract the archive file in a directory of your choice.
2.) Install BSON plug-in (optional if you use the mongodb-output-plugin)
- Issue the following command from the Logstash installation directory chosen in step 1.
env GEM_HOME=./vendor/bundle/jruby/1.9 ./vendor/jruby/bin/jruby -S gem install bson -v 4.2.1
- Add gem "bson" to ./Gemfile if not already present.
To install the mongodb-output-plugin instead, issue the following command from the Logstash installation directory from step 1:
bin/logstash-plugin install logstash-output-mongodb
3.) Create tpfbson.rb
You can create a TCP listener style plug-in similar to normal TCP/IP listeners; however, the z/TPF system uses BSON to encode the contents of the transmitted data, so a specialized input transformation is required. Place this file in the following relative directory:
./logstash-core/lib/logstash/inputs/tpfbson.rb
# encoding: utf-8
require "logstash/inputs/base"
require "logstash/util/socket_peer"
require "socket"
require "bson"
require "json"
require "stringio"
require "timeout"
# Read BSON events from a z/TPF system over a TCP socket.
#
# Each event is one BSON document that is preceded by a 32-byte
# connector header, which this plug-in skips.
#
# Only `server` mode is implemented: the plug-in listens for client
# connections.
class LogStash::Inputs::Tpfbson < LogStash::Inputs::Base
class Interrupted < StandardError; end
config_name "tpfbson"
milestone 2
# When mode is `server`, the address to listen on.
# When mode is `client`, the address to connect to.
config :host, :validate => :string, :default => "0.0.0.0"
# When mode is `server`, the port to listen on.
# When mode is `client`, the port to connect to.
config :port, :validate => :number, :required => true
# The 'read' timeout in seconds. If a particular bson connection is idle for
# more than this timeout period, we will assume it is dead and close it.
#
# If you never want to timeout, use -1.
config :data_timeout, :validate => :number, :default => -1
# Mode to operate in. `server` listens for client connections,
# `client` connects to a server.
config :mode, :validate => ["server", "client"], :default => "server"
def initialize(*args)
super(*args)
@server_socket = nil
@connection_sockets = {}
@socket_mutex = Mutex.new
end # def initialize
def docheck(c)
if c.is_a?(BSON::Binary)
return c.to_json;
elsif c.is_a?(BSON::ObjectId)
return c.as_json;
else
return c;
end
end
def iarray(a)
# each_with_index supplies the index that is needed to update the array in place
a.each_with_index do |item,idx|
if item.is_a?(Hash)
ihash(item)
elsif item.is_a?(Array)
iarray(item)
else
a[idx] = docheck(item);
end
end
end
def ihash(h)
h.each_pair do |k,v|
if v.is_a?(Hash)
ihash(v)
elsif v.is_a?(Array)
iarray(v)
else
h[k] = docheck(v);
end
end
end
public
def register
fix_streaming_codecs
self.server_socket = new_server_socket
end # def register
private
def handle_socket(socket, client_address, output_queue)
while !stop?
buf = nil
databuf = nil
# NOTE(petef): the timeout only hits after the line is read
# or socket dies
# TODO(sissel): Why do we have a timeout here? What's the point?
if @data_timeout == -1
#skipped the high speed connector header
buf = readData(socket,32)
buf = read(socket)
else
Timeout::timeout(@data_timeout) do
#skipped the high speed connector header
buf = readData(socket,32)
buf = read(socket)
end
end
if BSON::VERSION[0] == '4'
readstring = BSON::ByteBuffer.new(buf)
else
readstring = StringIO.new(buf);
end
datalen = BSON::Int32.from_bson(readstring)
databuf = readData(socket,datalen-4)
buf << databuf
if BSON::VERSION[0] == '4'
readstring = BSON::ByteBuffer.new(buf)
else
readstring = StringIO.new(buf);
end
doc = BSON::Document.from_bson(readstring)
ihash(doc);
event = LogStash::Event.new(doc);
event.set("host",client_address);
event.set("path","bsonTcpInput");
decorate(event)
output_queue << event
end # while !stop?
rescue EOFError
@logger.debug("Connection closed", :client => socket.peeraddr[3])
rescue Errno::ECONNRESET
@logger.debug? && @logger.debug("Connection reset by peer", :client => socket.peeraddr[3])
rescue => e
# if plugin is stopping, don't bother logging it as an error
!stop? && @logger.error("An error occurred. Closing connection", :client => socket.peeraddr[3], :exception => e, :backtrace => e.backtrace)
ensure
# catch all rescue nil on close to discard any close errors or invalid socket
socket.close rescue nil
end
private
def read(socket)
return socket.sysread(4)
end # def read
private
def readData(socket,dataLen)
data = ""
if data.respond_to?(:force_encoding)
data.force_encoding("BINARY")
end
read_buffer = ""
if read_buffer.respond_to?(:force_encoding)
read_buffer.force_encoding("BINARY")
end
until dataLen == 0
data.force_encoding("BINARY")
socket.sysread(dataLen,read_buffer)
dataLen = dataLen - read_buffer.length
data << read_buffer
end
return data
end # def readData
public
def run(output_queue)
run_server(output_queue)
end # def run
def stop
# force close all sockets which will escape any blocking read with a IO exception
# and any thread using them will exit.
# catch all rescue nil on close to discard any close errors or invalid socket
server_socket.close rescue nil
connection_sockets.each{|socket| socket.close rescue nil}
end
def close
# see related comment in register: we must make sure to close the server socket here
# because it is created in the register method and we could be in the context of having
# register called but never run & stop, only close.
# catch all rescue nil on close to discard any close errors or invalid socket
server_socket.close rescue nil
end
def run_server(output_queue)
while !stop?
begin
socket = add_connection_socket(server_socket.accept)
server_connection_thread(output_queue,socket)
rescue => e
# if this exception occurred while the plugin is stopping
# just ignore and exit
raise e unless stop?
end
end
ensure
server_socket.close rescue nil
end # def run_server
def server_connection_thread(output_queue, socket)
Thread.new(output_queue, socket) do |q, s|
begin
@logger.debug? && @logger.debug("Accepted connection", :client => s.peeraddr[3], :server => "#{@host}:#{@port}")
handle_socket(s, s.peeraddr[3], q)
ensure
delete_connection_socket(s)
end
end
end
def new_server_socket
@logger.info("Starting tcp input listener", :address => "#{@host}:#{@port}")
begin
socket = TCPServer.new(@host, @port)
rescue Errno::EADDRINUSE
@logger.error("Could not start TCP server: Address in use", :host => @host, :port => @port)
raise
end
end
def server_socket=(socket)
@socket_mutex.synchronize{@server_socket = socket}
end
def server_socket
@socket_mutex.synchronize{@server_socket}
end
def add_connection_socket(socket)
@socket_mutex.synchronize{@connection_sockets[socket] = true}
socket
end
def delete_connection_socket(socket)
@socket_mutex.synchronize{@connection_sockets.delete(socket)}
end
def connection_sockets
@socket_mutex.synchronize{@connection_sockets.keys.dup}
end
end # class LogStash::Inputs::Tpfbson
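The read loop in handle_socket relies on a simple framing convention: a 32-byte connector header that is skipped, followed by a BSON document whose first 4 bytes hold the total document length as a little-endian 32-bit integer. The following sketch illustrates that framing by using only the Ruby standard library, with a StringIO object in place of a socket. The names HEADER_LEN, read_exact, and next_bson_frame are hypothetical and are not part of the plug-in or the bson gem.

```ruby
require "stringio"

HEADER_LEN = 32 # connector header that the plug-in skips (contents not examined)

# Read exactly len bytes from io, or raise EOFError if the stream ends first.
def read_exact(io, len)
  data = "".b
  while data.bytesize < len
    chunk = io.read(len - data.bytesize)
    raise EOFError, "stream ended early" if chunk.nil? || chunk.empty?
    data << chunk
  end
  data
end

# Return the raw bytes of the next BSON document in the stream.
def next_bson_frame(io)
  read_exact(io, HEADER_LEN)                    # discard the header
  len_bytes = read_exact(io, 4)                 # BSON length prefix
  total_len = len_bytes.unpack("l<").first      # little-endian int32, counts itself
  len_bytes + read_exact(io, total_len - 4)     # prefix plus the rest of the document
end
```

The returned bytes are what the plug-in passes to BSON::Document.from_bson. Because the length prefix counts its own 4 bytes, only total_len - 4 more bytes are read, which mirrors the readData(socket,datalen-4) call in the plug-in.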
4.) Update binary.rb
The BSON library does not transform binary content correctly, so you might want to update the following functions in the binary.rb file. If the functions do not exist, insert them as follows:
./vendor/bundle/jruby/1.9/gems/bson-4.2.1-java/lib/bson/binary.rb
# Get the binary as JSON hash data.
#
# @example Get the binary as a JSON hash.
# binary.as_json
#
# @return [ Hash ] The binary as a JSON hash.
#
# @since 2.0.0
def as_json(*args)
{ "$binary" => data.unpack('H*').first, "$type" => type }
end
# Get the binary as JSON hash data when to_json is called.
#
# @example Get the binary as a JSON hash.
# binary.to_json
#
# @return [ Hash ] The binary as a JSON hash.
#
# @since 2.0.0
def to_json(*args)
{ "$binary" => data.unpack('H*').first, "$type" => type }
end
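To see the effect of these functions together with the recursive walk in tpfbson.rb, consider the following sketch. It uses a stand-in class, FakeBinary, instead of BSON::Binary so that it runs without the bson gem. FakeBinary and jsonify are hypothetical names for illustration only. Unlike the plug-in, which updates the document in place, this sketch returns a new structure.

```ruby
# Stand-in for BSON::Binary (assumption: the real class comes from the bson gem).
FakeBinary = Struct.new(:data, :type) do
  # Same body as the as_json function shown above: hex-encode the raw bytes.
  def as_json(*_args)
    { "$binary" => data.unpack("H*").first, "$type" => type }
  end
end

# Recursively replace every FakeBinary in a nested Hash/Array structure
# with its JSON-friendly hash form.
def jsonify(value)
  case value
  when Hash       then value.each_with_object({}) { |(k, v), out| out[k] = jsonify(v) }
  when Array      then value.map { |item| jsonify(item) }
  when FakeBinary then value.as_json
  else value
  end
end

doc = {
  "name"    => "req1",
  "payload" => FakeBinary.new("\x01\xAB".b, :generic),
  "parts"   => [FakeBinary.new("\xFF".b, :generic), 7]
}
result = jsonify(doc)
```

In this sketch, result["payload"] is a hash whose "$binary" value is the hexadecimal string "01ab", which the file output can serialize as ordinary JSON text.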
5.) Create a Logstash config file
This configuration starts a BSON listener on port 4514 and writes the received events to the /tmp/bson.log file.
./logstash-bson.config:
input {
tpfbson {
port => 4514
}
}
output {
file { path => "/tmp/bson.log" }
}
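After Logstash is running with this configuration and events arrive, you can inspect the output file with a short script. The following sketch assumes that the file output plug-in writes one JSON object per line, which is its default behavior as far as the author of this sketch is aware; adjust the parsing if you configure a different codec. The function name tpf_events is hypothetical. It keeps only the events that the tpfbson input tagged with the path value bsonTcpInput.

```ruby
require "json"

# Parse newline-delimited JSON events and keep those that came from the
# tpfbson input, which sets "path" to "bsonTcpInput" on every event.
def tpf_events(lines)
  lines.map(&:strip)
       .reject(&:empty?)
       .map { |line| JSON.parse(line) }
       .select { |event| event["path"] == "bsonTcpInput" }
end
```

For example, tpf_events(File.foreach("/tmp/bson.log")) returns an array of hashes, one for each logged z/TPF event.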
6.) Start Logstash
bin/logstash -f ./logstash-bson.config
You can expect to see output similar to the following:
[2017-08-23T17:06:32,089][INFO ][logstash.pipeline ] Starting pipeline {"id"=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000}
[2017-08-23T17:06:32,107][INFO ][logstash.inputs.tpfbson ] Automatically switching from plain to line codec {:plugin=>"tpfbson"}
[2017-08-23T17:06:32,110][INFO ][logstash.inputs.tpfbson ] Starting tcp input listener {:address=>"0.0.0.0:4514"}
[2017-08-23T17:06:32,112][INFO ][logstash.pipeline ] Pipeline main started
[2017-08-23T17:06:32,139][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
Your Logstash plug-in is installed correctly and is now ready for input. You can review the configuration for your z/TPF system to route logging to the target Logstash environment and begin logging MongoDB requests.
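Before you route real z/TPF traffic, you might want to check the listener with a hand-built test message. The following sketch encodes a flat hash of string fields as a BSON document by following the BSON specification, and prepends a 32-byte placeholder header because the plug-in skips the first 32 bytes of each message. The header content that a real z/TPF system sends is not described here, so the zero-filled header is an assumption that is suitable only for testing. The function names encode_string_doc, build_frame, and send_frame are hypothetical.

```ruby
require "socket"

# Encode a flat Hash of String keys and String values as a BSON document.
def encode_string_doc(fields)
  body = "".b
  fields.each do |key, value|
    v = value.b
    body << "\x02".b << key.b << "\x00".b                 # type 0x02 (string) + field name
    body << [v.bytesize + 1].pack("l<") << v << "\x00".b  # length, bytes, terminator
  end
  body << "\x00".b                                         # end of document
  [body.bytesize + 4].pack("l<") + body                    # total length counts itself
end

# Prepend the placeholder header (assumption: zero bytes, ignored by the plug-in).
def build_frame(fields, header = "\x00".b * 32)
  header + encode_string_doc(fields)
end

# Send one frame to the listener, for example on localhost port 4514.
def send_frame(host, port, frame)
  TCPSocket.open(host, port) { |sock| sock.write(frame) }
end

frame = build_frame("msg" => "hi")
```

Calling send_frame("localhost", 4514, frame) while Logstash is running is expected to produce one event in /tmp/bson.log that contains the msg field along with the host and path fields that the plug-in adds.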