Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added zookeeper as sql_last_value storage #316

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
Gemfile.lock
.bundle
vendor
derby.log
derby.log
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ Reading data from MySQL:
jdbc_password => "password"
# or jdbc_password_filepath => "/path/to/my/password_file"
statement => "SELECT ..."
use_column_value => true
tracking_column => tracking_number
last_run_storage => "zookeeper"
# or last_run_storage => "file"
# last_run_metadata_path => "/path/to/last_run_metadata_path"
last_run_zookeeper_path => "/last_run_zookeeper_path"
zk_ip_list => "zookeeper_host:zookeeper_port"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
}
Expand Down
43 changes: 40 additions & 3 deletions lib/logstash/inputs/jdbc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,21 @@ module LogStash module Inputs class Jdbc < LogStash::Inputs::Base
# exactly once.
config :schedule, :validate => :string

# last run time storage ('file', 'zookeeper')
config :last_run_storage, :validate => :string, :default => "file"

# Path to file with last run time
config :last_run_metadata_path, :validate => :string, :default => "#{ENV['HOME']}/.logstash_jdbc_last_run"

# Path to zookeeper node with last run time
config :last_run_zookeeper_path, :validate => :string, :default => "/logstash_input_jdbc_last_run"

# Zookeeper ip list
config :zk_ip_list, :validate => :string, :default => "localhost:2181"

# Znode we created is permanent or ephemeral.
config :zk_ephemeral, :validate => :boolean, :default => false

# Use an incremental column value rather than a timestamp
config :use_column_value, :validate => :boolean, :default => false

Expand Down Expand Up @@ -213,7 +225,8 @@ def register
end
end

set_value_tracker(LogStash::PluginMixins::Jdbc::ValueTracking.build_last_value_tracker(self))
init_value_tracker

set_statement_logger(LogStash::PluginMixins::Jdbc::CheckedCountLogger.new(@logger))

@enable_encoding = [email protected]? || !@columns_charset.empty?
Expand Down Expand Up @@ -242,6 +255,16 @@ def register
end
end # def register

def init_value_tracker
if @last_run_storage.downcase == "file"
set_value_tracker(LogStash::PluginMixins::Jdbc::ValueTracking.build_last_value_tracker(self))
else
if @last_run_storage.downcase == "zookeeper"
set_value_tracker(LogStash::PluginMixins::Jdbc::ValueTrackingZookeeper.build_last_value_tracker(self))
end
end
end

# test injection points
def set_statement_logger(instance)
@statement_logger = instance
Expand Down Expand Up @@ -273,7 +296,14 @@ def stop

def execute_query(queue)
# update default parameters
@parameters['sql_last_value'] = @value_tracker.value
if @last_run_storage.downcase == "file"
@parameters['sql_last_value'] = @value_tracker.value
else
if @last_run_storage.downcase == "zookeeper"
@parameters['sql_last_value'] = @value_tracker.read_value
end
end
@event_sent = false
execute_statement(@statement, @parameters) do |row|
if enable_encoding?
## do the necessary conversions to string elements
Expand All @@ -282,8 +312,15 @@ def execute_query(queue)
event = LogStash::Event.new(row)
decorate(event)
queue << event
@event_sent = true
end
begin
# save value if it's not the same as previous
@value_tracker.write if @parameters['sql_last_value'] != @value_tracker.value && @event_sent
rescue => e
@logger.error("Failed to write last value", :exception => e)
stop
end
@value_tracker.write
end

private
Expand Down
1 change: 1 addition & 0 deletions lib/logstash/plugin_mixins/jdbc/jdbc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
require "time"
require "date"
require_relative "value_tracking"
require_relative "value_tracking_zookeeper"
require_relative "checked_count_logger"

java_import java.util.concurrent.locks.ReentrantLock
Expand Down
149 changes: 149 additions & 0 deletions lib/logstash/plugin_mixins/jdbc/value_tracking_zookeeper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
# encoding: utf-8
require "zk"

module LogStash module PluginMixins module Jdbc
class ValueTrackingZookeeper

def self.build_last_value_tracker(plugin)
if plugin.use_column_value && plugin.tracking_column_type == "numeric"
# use this irrespective of the jdbc_default_timezone setting
klass = NumericValueTrackerZK
else
if plugin.jdbc_default_timezone.nil? || plugin.jdbc_default_timezone.empty?
# no TZ stuff for Sequel, use Time
klass = TimeValueTrackerZK
else
# Sequel does timezone handling on DateTime only
klass = DateTimeValueTrackerZK
end
end

handler = NullNodeHandler.new(plugin.last_run_zookeeper_path)
if plugin.record_last_run
handler = NodeHandler.new(plugin)
end
if plugin.clean_run
handler.clean
end
instance = klass.new(handler)
return instance
end

attr_reader :value

def initialize(handler)
@node_handler = handler
set_value(read_value)
end

def read_value
# override in subclass
end

def set_value(value)
# override in subclass
end

def write
@node_handler.write(@value)
end
end


class NumericValueTrackerZK < ValueTrackingZookeeper
def read_value
@val = @node_handler.read
return 0 if @val.nil?
@val.to_f.round(0)
end

def set_value(value)
return unless value.is_a?(Numeric)
@value = value
end
end

class DateTimeValueTrackerZK < ValueTrackingZookeeper
def read_value
@node_handler.read || DateTime.new(1970)
end

def set_value(value)
if value.respond_to?(:to_datetime)
@value = value.to_datetime
else
@value = DateTime.parse(value)
end
end
end

class TimeValueTrackerZK < ValueTrackingZookeeper
def read_value
@node_handler.read || Time.at(0).utc
end

def set_value(value)
if value.respond_to?(:to_time)
@value = value.to_time
else
@value = DateTime.parse(value).to_time
end
end
end

class NodeHandler
def initialize(plugin)
@path = plugin.last_run_zookeeper_path
@zk_ip_list = plugin.zk_ip_list
@zk_ephemeral = plugin.zk_ephemeral

@zk = ZK.new(@zk_ip_list)
@exists = @zk.exists?(@path)
create_node
end

def clean
return unless @exists
@zk.delete(@path)
@exists = false
end

def read
return unless @exists
@zk.get(@path).first
end

def set_initial(initial)
@initial = initial
end

def create_node
unless @exists
if @zk_ephemeral
@zk.create(@path, :ephemeral => true)
else
@zk.create(@path)
end
@exists = true
end
end

def write(value)
@zk.set(@path, value.to_s)
end
end

class NullNodeHandler
def initialize(path)
end

def clean
end

def read
end

def write(value)
end
end
end end end
1 change: 1 addition & 0 deletions logstash-input-jdbc.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Gem::Specification.new do |s|
s.add_runtime_dependency 'tzinfo'
s.add_runtime_dependency 'tzinfo-data'
s.add_runtime_dependency 'rufus-scheduler'
s.add_runtime_dependency "zk", ">= 1.9.6"

s.add_development_dependency 'logstash-devutils'
s.add_development_dependency 'timecop'
Expand Down