Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(clusteriing): introduce incremental sync for clustering #13157

Open
wants to merge 122 commits into
base: master
Choose a base branch
from

Conversation

dndx
Copy link
Member

@dndx dndx commented Jun 4, 2024

KAG-4865
KAG-2986
KAG-2987
KAG-3502
KAG-3258
KAG-5283

Flaky (perhaps need rerun):

  • spec/02-integration/09-hybrid_mode/03-pki_spec.lua
  • spec/02-integration/02-cmd/14-vault_spec.lua
  • spec/03-plugins/09-key-auth/04-hybrid_mode_spec.lua

@chronolaw chronolaw marked this pull request as ready for review September 30, 2024 08:17


local function get_all_nodes_with_sync_cap()
local res, err = kong.db.clustering_data_planes:page(DEFAULT_PAGE_SIZE)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we not use :each() here instead of simply fetching one page? It seems that this function will only ever return DEFAULT_PAGE_SIZE entries even if there are more in the database.

end


function _M:notify_all_nodes(new_version)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This new_version param appears to be unused.

Comment on lines +152 to +172
manager.callbacks:register("kong.sync.v2.notify_new_version", function(node_id, new_versions)
-- currently only default is supported, and anything else is ignored
for namespace, new_version in pairs(new_versions) do
if namespace == "default" then
local version = new_version.new_version
if not version then
return nil, "'new_version' key does not exist"
end

local lmdb_ver = tonumber(declarative.get_current_hash()) or 0
if lmdb_ver < version then
return self:sync_once()
end

return true
end
end

return nil, "default namespace does not exist inside params"
end)
end
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible for the value of new_versions to contain multiple entries for the default namespace, like this?

{
  { namespace = "default", new_version = 1 },
  { namespace = "default", new_version = 3 },
}

If so, then this logic might be faulty. If lmdb_ver = 2, then we will exit the loop after one iteration (because lmdb_ver (2) < version (1)) instead of evaluating the next entry which has new_version = 3 and would trigger a sync operation.



function _M:sync_once(delay)
local hdl, err = ngx.timer.at(delay or 0, function(premature)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This timer handler doesn't capture any upvalues from the surrounding function scope of _M:sync_once. Should we transform it into a reusable function at the module level instead?

end

local res, err = concurrency.with_worker_mutex(SYNC_MUTEX_OPTS, function()
-- here must be 2 times
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain why 2 times in the comment?

Comment on lines +297 to +306
if ns_delta.wipe then
kong.core_cache:purge()
kong.cache:purge()

else
for _, event in ipairs(crud_events) do
-- delta_type, crud_event_type, delta.row, old_entity
db[event[1]]:post_crud_event(event[2], event[3], event[4])
end
end
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not clear to me why purging the cache and executing the event handlers are mutually-exclusive outcomes. Can you explain why we skip the crud events when ns_delta.wipe is truthy?

Comment on lines +10 to +11
-- TODO: what is the proper value?
local FIRST_SYNC_DELAY = 0.5 -- seconds
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would think we would want to sync as soon as possible, no?

Suggested change
-- TODO: what is the proper value?
local FIRST_SYNC_DELAY = 0.5 -- seconds
local FIRST_SYNC_DELAY = 0

Copy link
Contributor

@chronolaw chronolaw Oct 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In our test suits we may need to wait cp to start, here 0.5 second perhaps is a proper number, or else the next sync will be 30 seconds later.
Sure, we need to find the appropriate number.


function _M:get_latest_version()
local sql = "SELECT MAX(version) AS max_version FROM clustering_sync_version"
return self.connector:query(sql)[1].max_version
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other methods in this file all return directly from connector:query(), so I assume they will return nil, string in case of error.

On the other hand, this method may raise an error due to invalid table access if there is a failure in connector:query().

I suggest we make the error-handling more consistent:

Suggested change
return self.connector:query(sql)[1].max_version
local rows, err = self.connector:query(sql)
if not rows then
return nil, err
end
-- should we raise/return an error if no rows/max_version are returned?
-- or should we return a default value of 0?
return rows[1] and rows[1].max_version

If it's intended to raise an error when connector:query() fails, then I suggest using assert() or error() to make the intention more explicit.

We should also codify the behavior in the case that clustering_sync_version is empty. I'm seeing code elsewhere that assumes it cannot return nil. Is the caller incorrect?

-- location: kong/clustering/services/sync/rpc.lua

function _M:init_cp(manager)
-- [...]
    if default_namespace_version == 0 or
       self.strategy:get_latest_version() - default_namespace_version > FULL_SYNC_THRESHOLD
    then
-- [...]

CREATE TABLE IF NOT EXISTS clustering_sync_delta (
"version" INT NOT NULL,
"type" TEXT NOT NULL,
"id" UUID NOT NULL,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will happen if there is an entity without an id attribute or with an id that isn't a UUID type? Is it possible to create such an entity in Kong via a 3rd party plugin dao, or will the metaschema prevent this condition?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants