Optimize the pagination under big data. #387

Open
wants to merge 1 commit into
base: main
30 changes: 30 additions & 0 deletions docs/index.asciidoc
@@ -226,6 +226,7 @@ This plugin supports the following configuration options plus the <<plugins-{typ
| <<plugins-{type}s-{plugin}-tracking_column_type>> |<<string,string>>, one of `["numeric", "timestamp"]`|No
| <<plugins-{type}s-{plugin}-use_column_value>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-use_prepared_statements>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-paging_wrapper_enabled>> |<<boolean,boolean>>|No
|=======================================================================

Also see <<plugins-{type}s-{plugin}-common-options>> for a list of options supported by all
@@ -579,6 +580,35 @@ to `false`, `:sql_last_value` reflects the last time the query was executed.

When set to `true`, enables prepare statement usage

[id="plugins-{type}s-{plugin}-paging_wrapper_enabled"]
===== `paging_wrapper_enabled`

* Value type is <<boolean,boolean>>
* Default value is `true`

By default, when `jdbc_paging_enabled` is `true`, the plugin wraps the configured statement in a paging query like this:
[source,sql]
-------------------------------------------------------
SELECT * FROM ( SELECT ... ) AS `t1` LIMIT 10000 OFFSET 10000
-------------------------------------------------------

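When set to `false` (and `jdbc_paging_enabled` is `true`), the plugin does not wrap the statement. Instead it re-runs the statement until a run returns no rows, updating `:sql_last_value` after each run that returns rows, so the statement itself must select one page at a time using `:sql_last_value`.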

Example:
[source,ruby]
-------------------------------------------------------
input {
  jdbc {
    ...
    statement => "SELECT id, mycolumn1, mycolumn2 FROM my_table WHERE id > :sql_last_value AND id <= :sql_last_value + 10000"
    ...
  }
}
-------------------------------------------------------
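This is useful when paging through very large tables, where large `OFFSET` values can make the wrapped query slow.

Warning: avoid an infinite loop. The plugin re-runs the statement until it returns no rows, so `:sql_last_value` must advance on every run (for example by tracking `id` with `use_column_value` and `tracking_column`) and the statement must eventually return an empty result.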

[id="plugins-{type}s-{plugin}-common-options"]
include::{include_path}/{type}.asciidoc[]
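
To make the documented example concrete, here is a minimal Ruby sketch of how a windowed statement such as `id > :sql_last_value AND id <= :sql_last_value + 10000` behaves when it is re-run until it returns no rows. It is only a simulation: `Row`, `run_statement`, and `page_through` are hypothetical names, the table is an in-memory array, and it assumes the tracked value is the largest `id` returned by the previous run. It does not call the plugin.

```ruby
# Hypothetical in-memory stand-in for my_table; only the id column matters here.
Row = Struct.new(:id)

# Mimics: SELECT ... WHERE id > :sql_last_value AND id <= :sql_last_value + page_size
def run_statement(table, sql_last_value, page_size)
  table.select { |r| r.id > sql_last_value && r.id <= sql_last_value + page_size }
       .sort_by(&:id)
end

# Re-runs the statement, advancing the tracked value to the last id seen,
# and stops at the first run that returns no rows.
def page_through(table, page_size, start = 0)
  last = start
  seen = []
  loop do
    rows = run_statement(table, last, page_size)
    break if rows.empty?
    seen.concat(rows.map(&:id))
    last = rows.last.id
  end
  seen
end

dense  = (1..25).map { |i| Row.new(i) }
sparse = [1, 2, 3, 40, 41].map { |i| Row.new(i) }

puts page_through(dense, 10).length  # => 25
p page_through(sparse, 10)           # => [1, 2, 3]
```

In this sketch the sparse table stops after id 3, because the next window (ids 4 to 13) is empty even though ids 40 and 41 exist. A fixed-width window on `id` therefore appears to rely on the gaps between ids being smaller than the window.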

24 changes: 24 additions & 0 deletions lib/logstash/inputs/jdbc.rb
@@ -207,6 +207,30 @@ module LogStash module Inputs class Jdbc < LogStash::Inputs::Base

config :prepared_statement_bind_values, :validate => :array, :default => []

# By default, when `jdbc_paging_enabled` is `true`, the plugin wraps the configured statement in a paging query like this:
# [source,sql]
# -------------------------------------------------------
# SELECT * FROM ( SELECT ... ) AS `t1` LIMIT 10000 OFFSET 10000
# -------------------------------------------------------
#
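# When set to `false` (and `jdbc_paging_enabled` is `true`), the plugin does not wrap the statement. Instead it re-runs the statement until a run returns no rows, updating `:sql_last_value` after each run that returns rows, so the statement itself must select one page at a time using `:sql_last_value`.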
#
# Example:
# [source,ruby]
# -------------------------------------------------------
# input {
#   jdbc {
#     ...
#     statement => "SELECT id, mycolumn1, mycolumn2 FROM my_table WHERE id > :sql_last_value AND id <= :sql_last_value + 10000"
#     ...
#   }
# }
# -------------------------------------------------------
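# This is useful when paging through very large tables, where large `OFFSET` values can make the wrapped query slow.
#
# Warning: avoid an infinite loop. The plugin re-runs the statement until it returns no rows, so `:sql_last_value` must advance on every run (for example by tracking `id` with `use_column_value` and `tracking_column`) and the statement must eventually return an empty result.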
config :paging_wrapper_enabled, :validate => :boolean, :default => true

attr_reader :database # for test mocking/stubbing

public
19 changes: 16 additions & 3 deletions lib/logstash/plugin_mixins/jdbc/jdbc.rb
@@ -244,9 +244,22 @@ def execute_statement
       begin
         sql_last_value = @use_column_value ? @value_tracker.value : Time.now.utc
         @tracking_column_warning_sent = false
-        @statement_handler.perform_query(@database, @value_tracker.value, @jdbc_paging_enabled, @jdbc_page_size) do |row|
-          sql_last_value = get_column_value(row) if @use_column_value
-          yield extract_values_from(row)
+        if !@paging_wrapper_enabled and @jdbc_paging_enabled
+          continue = true
+          while continue
+            continue = false
+            @statement_handler.perform_query(@database, @value_tracker.value, false, @jdbc_page_size) do |row|
+              sql_last_value = get_column_value(row) if @use_column_value
+              continue = true if !continue
+              yield extract_values_from(row)
+            end
+            @value_tracker.set_value(sql_last_value) if continue
+          end
+        else
+          @statement_handler.perform_query(@database, @value_tracker.value, @jdbc_paging_enabled, @jdbc_page_size) do |row|
+            sql_last_value = get_column_value(row) if @use_column_value
+            yield extract_values_from(row)
+          end
         end
         success = true
       rescue Sequel::DatabaseConnectionError, Sequel::DatabaseError, Java::JavaSql::SQLException => e
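
The `while continue` loop in `execute_statement` keeps re-running the statement for as long as a run returns rows, which is where the infinite-loop warning in the docs comes from. One possible guard, sketched below as standalone Ruby, is to stop with an error when a run returns rows but the tracked value has not moved forward. This is an illustration only and not part of the PR: `paged_each` is a hypothetical helper, the block stands in for `perform_query`, and the tracked values are assumed to be comparable with `>`.

```ruby
# Hypothetical guard around a manual paging loop: stop when a run returns no
# rows, and raise if a run returns rows without advancing the tracked value.
def paged_each(start_value)
  last = start_value
  loop do
    newest = last
    got_rows = false
    # The caller's block plays the role of perform_query: it receives the
    # current tracked value and returns the tracked values of that run's rows.
    rows = yield(last)
    rows.each do |value|
      got_rows = true
      newest = value
    end
    break unless got_rows
    raise "tracking value did not advance past #{last.inspect}" unless newest > last
    last = newest
  end
  last
end

ids = [1, 2, 3, 4, 5]
final = paged_each(0) { |last| ids.select { |i| i > last && i <= last + 2 } }
puts final  # => 5
```

A statement that keeps returning the same rows, for example a block that always returns `[0]` when started at `0`, raises on the first run instead of looping forever.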