From 6a2dbf4d61b76999bd7744be3b63c4e1b494672f Mon Sep 17 00:00:00 2001 From: TanShun Date: Sun, 11 Apr 2021 11:27:18 +0800 Subject: [PATCH] Optimize the pagination under big data. --- docs/index.asciidoc | 30 +++++++++++++++++++++++++ lib/logstash/inputs/jdbc.rb | 24 ++++++++++++++++++++ lib/logstash/plugin_mixins/jdbc/jdbc.rb | 19 +++++++++++++--- 3 files changed, 70 insertions(+), 3 deletions(-) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index f358027..75d909f 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -226,6 +226,7 @@ This plugin supports the following configuration options plus the <> |<>, one of `["numeric", "timestamp"]`|No | <> |<>|No | <> |<>|No +| <> |<>|No |======================================================================= Also see <> for a list of options supported by all @@ -579,6 +580,35 @@ to `false`, `:sql_last_value` reflects the last time the query was executed. When set to `true`, enables prepare statement usage +[id="plugins-{type}s-{plugin}-paging_wrapper_enabled"] +===== `paging_wrapper_enabled` + + * Value type is <> + * Default value is `true` + +By default, the query SQL with pagination will be wrapped like this: +[source,ruby] +------------------------------------------------------- +SELECT * FROM ( SELECT ... ) AS `t1` LIMIT 10000 OFFSET 10000 +------------------------------------------------------- + +When set to false, you should control the pagination by yourself with `:sql_last_value` + +Example: +[source,ruby] +------------------------------------------------------- +input { + jdbc { + ... + statement => "SELECT id, mycolumn1, mycolumn2 FROM my_table WHERE id > :sql_last_value AND id <= :sql_last_value + 10000" + ... + } +} +------------------------------------------------------- +This is useful when paging under big data. + +Warning: you should avoid sinking into dead circulation! + [id="plugins-{type}s-{plugin}-common-options"] include::{include_path}/{type}.asciidoc[] diff --git a/lib/logstash/inputs/jdbc.rb b/lib/logstash/inputs/jdbc.rb index b037b3c..2cb78cb 100755 --- a/lib/logstash/inputs/jdbc.rb +++ b/lib/logstash/inputs/jdbc.rb @@ -207,6 +207,30 @@ module LogStash module Inputs class Jdbc < LogStash::Inputs::Base config :prepared_statement_bind_values, :validate => :array, :default => [] + # By default, the query SQL with pagination will be wrapped like this: + # [source,ruby] + # ------------------------------------------------------- + # SELECT * FROM ( SELECT ... ) AS `t1` LIMIT 10000 OFFSET 10000 + # ------------------------------------------------------- + # + # When set to false, you should control the pagination by yourself with `:sql_last_value` + # + # Example: + # [source,ruby] + # ------------------------------------------------------- + # input { + # jdbc { + # ... + # statement => "SELECT id, mycolumn1, mycolumn2 FROM my_table WHERE id > :sql_last_value AND id <= :sql_last_value + 10000" + # ... + # } + # } + # ------------------------------------------------------- + # This is useful when paging under big data. + # + # Warning: you should avoid sinking into dead circulation! + config :paging_wrapper_enabled, :validate => :boolean, :default => true + attr_reader :database # for test mocking/stubbing public diff --git a/lib/logstash/plugin_mixins/jdbc/jdbc.rb b/lib/logstash/plugin_mixins/jdbc/jdbc.rb index 341cd58..55199ae 100644 --- a/lib/logstash/plugin_mixins/jdbc/jdbc.rb +++ b/lib/logstash/plugin_mixins/jdbc/jdbc.rb @@ -244,9 +244,22 @@ def execute_statement begin sql_last_value = @use_column_value ? @value_tracker.value : Time.now.utc @tracking_column_warning_sent = false - @statement_handler.perform_query(@database, @value_tracker.value, @jdbc_paging_enabled, @jdbc_page_size) do |row| - sql_last_value = get_column_value(row) if @use_column_value - yield extract_values_from(row) + if !@paging_wrapper_enabled and @jdbc_paging_enabled + continue = true + while continue + continue = false + @statement_handler.perform_query(@database, @value_tracker.value, false, @jdbc_page_size) do |row| + sql_last_value = get_column_value(row) if @use_column_value + continue = true if !continue + yield extract_values_from(row) + end + @value_tracker.set_value(sql_last_value) if continue + end + else + @statement_handler.perform_query(@database, @value_tracker.value, @jdbc_paging_enabled, @jdbc_page_size) do |row| + sql_last_value = get_column_value(row) if @use_column_value + yield extract_values_from(row) + end end success = true rescue Sequel::DatabaseConnectionError, Sequel::DatabaseError, Java::JavaSql::SQLException => e