From dbe62591666852257e58f8e1585b6e6575e52a96 Mon Sep 17 00:00:00 2001 From: Datong Sun Date: Mon, 9 Sep 2024 02:02:22 -0700 Subject: [PATCH] refactored import logic so it can be shared by DB-less import and incremental sync --- kong/db/declarative/import.lua | 256 ++++++++++++++----- kong/db/declarative/init.lua | 5 +- kong/db/schema/others/declarative_config.lua | 2 + kong/db/strategies/off/init.lua | 215 +++------------- 4 files changed, 237 insertions(+), 241 deletions(-) diff --git a/kong/db/declarative/import.lua b/kong/db/declarative/import.lua index f82e477cb8af..acc8c6f50caf 100644 --- a/kong/db/declarative/import.lua +++ b/kong/db/declarative/import.lua @@ -22,12 +22,47 @@ local insert = table.insert local string_format = string.format local null = ngx.null local get_phase = ngx.get_phase +local get_workspace_id = workspaces.get_workspace_id local DECLARATIVE_HASH_KEY = constants.DECLARATIVE_HASH_KEY local DECLARATIVE_EMPTY_CONFIG_HASH = constants.DECLARATIVE_EMPTY_CONFIG_HASH +-- Generates the appropriate workspace ID for current operating context +-- depends on schema settings +-- +-- Non-workspaceable entities are always placed under the "default" +-- workspace +-- +-- If the query explicitly set options.workspace == null, then "default" +-- workspace shall be used +-- +-- If the query explicitly set options.workspace == "some UUID", then +-- it will be returned +-- +-- Otherwise, the current workspace ID will be returned +local function workspace_id(schema, options) + if not schema.workspaceable then + return kong.default_workspace + end + + if options then + -- options.workspace == null must be handled by caller by querying + -- all available workspaces one by one + if options.workspace == null then + return kong.default_workspace + end + + if options.workspace then + return options.workspace + end + end + + return get_workspace_id() +end + + local function find_or_create_current_workspace(name) name = name or "default" @@ -197,12 +232,22 @@ local 
function unique_field_key(schema_name, ws_id, field, value) end +local function foreign_field_key_prefix(schema_name, ws_id, field, foreign_id) + return string_format("%s|%s|%s|%s|", schema_name, ws_id, field, foreign_id) +end + + local function foreign_field_key(schema_name, ws_id, field, foreign_id, pk) - if pk then - return string_format("%s|%s|%s|%s|%s", schema_name, ws_id, field, foreign_id, pk) - end + return foreign_field_key_prefix(schema_name, ws_id, field, foreign_id) .. pk +end - return string_format("%s|%s|%s|%s|", schema_name, ws_id, field, foreign_id) +local function item_key_prefix(schema_name, ws_id) + return string_format("%s|%s|*|", schema_name, ws_id) +end + + +local function item_key(schema_name, ws_id, pk_str) + return item_key_prefix(schema_name, ws_id) .. pk_str end @@ -238,7 +283,11 @@ local function load_into_cache(entities, meta, hash) local t = txn.begin(512) t:db_drop(false) + local phase = get_phase() + for entity_name, items in pairs(entities) do + yield(true, phase) + local dao = kong.db[entity_name] if not dao then return nil, "unknown entity: " .. 
entity_name @@ -246,72 +295,32 @@ local function load_into_cache(entities, meta, hash) local schema = dao.schema for id, item in pairs(items) do - local ws_id = default_workspace_id - - if schema.workspaceable and item.ws_id == null or item.ws_id == nil then - item.ws_id = ws_id + if schema.workspaceable and (item.ws_id == null or item.ws_id == nil) then + item.ws_id = default_workspace_id end - assert(type(ws_id) == "string") - - local pk = pk_string(schema, item) + assert(type(item.ws_id) == "string") - local item_key = string_format("%s|%s|*|%s", entity_name, ws_id, pk) + if should_transform and schema:has_transformations(item) then + local transformed_item = cycle_aware_deep_copy(item) + remove_nulls(transformed_item) - item = remove_nulls(item) - - if transform then local err - item, err = schema:transform(item) + transformed_item, err = schema:transform(transformed_item) + if not transformed_item then + return nil, err + end + + item = restore_nulls(item, transformed_item) if not item then return nil, err end end - local item_marshalled, err = marshall(item) - if not item_marshalled then + local res, err = insert_entity_for_txn(t, entity_name, item, nil) + if not res then return nil, err end - - t:set(item_key, item_marshalled) - - -- select_by_cache_key - if schema.cache_key then - local cache_key = dao:cache_key(item) - local key = unique_field_key(entity_name, ws_id, "cache_key", cache_key) - t:set(key, item_key) - end - - for fname, fdata in schema:each_field() do - local is_foreign = fdata.type == "foreign" - local fdata_reference = fdata.reference - local value = item[fname] - - if value then - if fdata.unique then - -- unique and not a foreign key, or is a foreign key, but non-composite - if type(value) == "table" then - assert(is_foreign) - value = pk_string(kong.db[fdata_reference].schema, value) - end - - if fdata.unique_across_ws then - ws_id = default_workspace_id - end - - local key = unique_field_key(entity_name, ws_id, fname, value) - 
t:set(key, item_key) - - elseif is_foreign then - -- not unique and is foreign, generate page_for_foo indexes - assert(type(value) == "table") - value = pk_string(kong.db[fdata_reference].schema, value) - - local key = foreign_field_key(entity_name, ws_id, fname, value, pk) - t:set(key, item_key) - end - end - end end end @@ -428,10 +437,141 @@ do end +-- Serialize and set keys for a single validated entity into +-- the provided LMDB txn object, this operation is only safe +-- if the entity does not already exist inside the LMDB database +-- +-- This function sets the following: +-- * <entity_name>|<ws_id>|*|<pk_string> => serialized item +-- * <entity_name>|<ws_id>|<unique_field_name>|sha256(field_value) => <entity_name>|<ws_id>|*|<pk_string> +-- * <entity_name>|<ws_id>|<foreign_field_name>|<foreign_key>|<pk_string> -> <entity_name>|<ws_id>|*|<pk_string> +local function insert_entity_for_txn(t, entity_name, item, options) + local dao = kong.db[entity_name] + local schema = dao.schema + local pk = pk_string(schema, item) + local ws_id = workspace_id(schema, options) + + local item_key = item_key(entity_name, ws_id, pk) + item = remove_nulls(item) + + local item_marshalled, err = marshall(item) + if not item_marshalled then + return nil, err + end + + t:set(item_key, item_marshalled) + + -- select_by_cache_key + if schema.cache_key then + local cache_key = dao:cache_key(item) + local key = unique_field_key(entity_name, ws_id, "cache_key", cache_key) + t:set(key, item_key) + end + + for fname, fdata in schema:each_field() do + local is_foreign = fdata.type == "foreign" + local fdata_reference = fdata.reference + local value = item[fname] + + if value then + if fdata.unique then + -- unique and not a foreign key, or is a foreign key, but non-composite + -- see: validate_foreign_key_is_single_primary_key, composite foreign + -- key is currently unsupported by the DAO + if type(value) == "table" then + assert(is_foreign) + value = pk_string(kong.db[fdata_reference].schema, value) + end + + if fdata.unique_across_ws then + ws_id = default_workspace_id + end + + local key = unique_field_key(entity_name, ws_id, fname, value) + t:set(key, item_key) + end + + if 
is_foreign then + -- is foreign, generate page_for_foreign_field indexes + assert(type(value) == "table") + value = pk_string(kong.db[fdata_reference].schema, value) + + local key = foreign_field_key(entity_name, ws_id, fname, value, pk) + t:set(key, item_key) + end + end + end + + return true +end + + +-- Serialize and remove keys for a single validated entity into +-- the provided LMDB txn object, this operation is safe whether the provided +-- entity exists inside LMDB or not, but the provided entity must contains the +-- correct field value so indexes can be deleted correctly +local function delete_entity_for_txn(t, entity_name, item, options) + local dao = kong.db[entity_name] + local schema = dao.schema + local pk = pk_string(schema, item) + local ws_id = workspace_id(schema, options) + + local item_key = item_key(entity_name, ws_id, pk) + t:set(item_key, nil) + + -- select_by_cache_key + if schema.cache_key then + local cache_key = dao:cache_key(item) + local key = unique_field_key(entity_name, ws_id, "cache_key", cache_key) + t:set(key, nil) + end + + for fname, fdata in schema:each_field() do + local is_foreign = fdata.type == "foreign" + local fdata_reference = fdata.reference + local value = item[fname] + + if value then + if fdata.unique then + -- unique and not a foreign key, or is a foreign key, but non-composite + -- see: validate_foreign_key_is_single_primary_key, composite foreign + -- key is currently unsupported by the DAO + if type(value) == "table" then + assert(is_foreign) + value = pk_string(kong.db[fdata_reference].schema, value) + end + + if fdata.unique_across_ws then + ws_id = default_workspace_id + end + + local key = unique_field_key(entity_name, ws_id, fname, value) + t:set(key, nil) + end + + if is_foreign then + -- is foreign, generate page_for_foreign_field indexes + assert(type(value) == "table") + value = pk_string(kong.db[fdata_reference].schema, value) + + local key = foreign_field_key(entity_name, ws_id, fname, value, pk) + 
t:set(key, nil) + end + end + end + + return true +end + + return { get_current_hash = get_current_hash, unique_field_key = unique_field_key, foreign_field_key = foreign_field_key, + foreign_field_key_prefix = foreign_field_key_prefix, + item_key = item_key, + item_key_prefix = item_key_prefix, + workspace_id = workspace_id, load_into_db = load_into_db, load_into_cache = load_into_cache, diff --git a/kong/db/declarative/init.lua b/kong/db/declarative/init.lua index 89514d361d84..c6a832a08894 100644 --- a/kong/db/declarative/init.lua +++ b/kong/db/declarative/init.lua @@ -252,10 +252,13 @@ _M.sanitize_output = declarative_export.sanitize_output -- import _M.get_current_hash = declarative_import.get_current_hash _M.unique_field_key = declarative_import.unique_field_key -_M.foreign_field_key = declarative_import.foreign_field_key +_M.item_key = declarative_import.item_key +_M.item_key_prefix = declarative_import.item_key_prefix +_M.foreign_field_key_prefix = declarative_import.foreign_field_key_prefix _M.load_into_db = declarative_import.load_into_db _M.load_into_cache = declarative_import.load_into_cache _M.load_into_cache_with_events = declarative_import.load_into_cache_with_events +_M.workspace_id = declarative_import.workspace_id return _M diff --git a/kong/db/schema/others/declarative_config.lua b/kong/db/schema/others/declarative_config.lua index 186fcdfa1286..844ff749536d 100644 --- a/kong/db/schema/others/declarative_config.lua +++ b/kong/db/schema/others/declarative_config.lua @@ -43,6 +43,8 @@ local foreign_children = {} do local CACHED_OUT + -- Generate a stable and unique string key from primary key defined inside + -- schema, supports both non-composite and composite primary keys function DeclarativeConfig.pk_string(schema, object) if #schema.primary_key == 1 then return tostring(object[schema.primary_key[1]]) diff --git a/kong/db/strategies/off/init.lua b/kong/db/strategies/off/init.lua index a0a1047bbbcb..1f34d6725d9f 100644 --- 
a/kong/db/strategies/off/init.lua +++ b/kong/db/strategies/off/init.lua @@ -26,7 +26,9 @@ local lmdb_get = lmdb.get local get_workspace_id = workspaces.get_workspace_id local pk_string = declarative_config.pk_string local unique_field_key = declarative.unique_field_key -local foreign_field_key = declarative.foreign_field_key +local item_key = declarative.item_key +local item_key_prefix = declarative.item_key_prefix +local workspace_id = declarative.workspace_id local PROCESS_AUTO_FIELDS_OPTS = { @@ -42,27 +44,6 @@ local _mt = {} _mt.__index = _mt -local function ws(schema, options) - if not schema.workspaceable then - return kong.default_workspace - end - - if options then - -- options.workspace == null must be handled by caller by querying - -- all available workspaces one by one - if options.workspace == null then - return kong.default_workspace - end - - if options.workspace then - return options.workspace - end - end - - return get_workspace_id() -end - - local function process_ttl_field(entity) if entity and entity.ttl and entity.ttl ~= null then local ttl_value = entity.ttl - ngx.time() @@ -97,7 +78,12 @@ local function construct_entity(schema, value) end --- select item by key, if follow is true, then one indirection will be followed +-- select item by primary key, if follow is true, then one indirection +-- will be followed indirection means the value of `key` is not the actual +-- serialized item, but rather the value is a pointer to the key where +-- actual serialized item is located. 
This way this function can be shared +-- by both primary key lookup as well as unique key lookup without needing +-- to duplicate the item content local function select_by_key(schema, key, follow) if follow then local actual_key, err = lmdb_get(key) @@ -106,15 +92,14 @@ local function select_by_key(schema, key, follow) end return select_by_key(schema, actual_key, false) + end - else - local entity, err = construct_entity(schema, lmdb_get(key)) - if not entity then - return nil, err - end - - return entity + local entity, err = construct_entity(schema, lmdb_get(key)) + if not entity then + return nil, err end + + return entity end @@ -144,25 +129,21 @@ local function page_for_prefix(self, prefix, size, offset, options, follow) if follow then item, err = select_by_key(schema, kv.value, false) - if err then - return nil, err - end else item, err = construct_entity(schema, kv.value) - if not item then - return nil, err - end end - if item then - ret_idx = ret_idx + 1 - ret[ret_idx] = item + if err then + return nil, err end + + ret_idx = ret_idx + 1 + ret[ret_idx] = item end if err_or_more then - return ret, nil, last_key .. "1" + return ret, nil, last_key .. 
"\x00" end return ret @@ -171,8 +152,8 @@ end local function page(self, size, offset, options) local schema = self.schema - local ws_id = ws(schema, options) - local prefix = string_format("%s|%s|*|", schema.name, ws_id) + local ws_id = workspace_id(schema, options) + local prefix = item_key_prefix(schema.name, ws_id) return page_for_prefix(self, prefix, size, offset, options, false) end @@ -180,20 +161,21 @@ end -- select by primary key local function select(self, pk, options) local schema = self.schema - local ws_id = ws(schema, options) + local ws_id = workspace_id(schema, options) local pk = pk_string(schema, pk) - local key = string_format("%s|%s|*|%s", schema.name, ws_id, pk) + local key = item_key(schema.name, ws_id, pk) return select_by_key(schema, key, false) end -- select by unique field (including select_by_cache_key) --- the DAO guarentees this method only gets called for unique fields +-- the DAO guarantees this method only gets called for unique fields +-- see: validate_foreign_key_is_single_primary_key local function select_by_field(self, field, value, options) local schema = self.schema if type(value) == "table" then - -- select by foreign, we only support one key for now (no composites) + -- select by foreign, DAO only support one key for now (no composites) local fdata = schema.fields[field] assert(fdata.type == "foreign") assert(#kong.db[fdata.reference].schema.primary_key == 1) @@ -202,7 +184,7 @@ local function select_by_field(self, field, value, options) _, value = next(value) end - local ws_id = ws(schema, options) + local ws_id = workspace_id(schema, options) local key local unique_across_ws = schema.fields[field].unique_across_ws @@ -219,71 +201,6 @@ local function select_by_field(self, field, value, options) end -local function delete(self, pk, options) - local schema = self.schema - - local entity, err = select(self, pk, options) - if not entity then - return nil, err - end - - local t = lmdb_transaction.begin(16) - - local pk = 
pk_string(schema, pk) - local ws_id = ws(schema, options) - local entity_name = schema.name - local item_key = string_format("%s|%s|*|%s", entity_name, ws_id, pk) - t:set(item_key, nil) - - local dao = kong.db[entity_name] - - -- select_by_cache_key - if schema.cache_key then - local cache_key = dao:cache_key(entity) - local key = unique_field_key(entity_name, ws_id, "cache_key", cache_key) - t:set(key, nil) - end - - for fname, fdata in schema:each_field() do - local is_foreign = fdata.type == "foreign" - local fdata_reference = fdata.reference - local value = entity[fname] - - if value and value ~= null then - if fdata.unique then - -- unique and not a foreign key, or is a foreign key, but non-composite - if type(value) == "table" then - assert(is_foreign) - value = pk_string(kong.db[fdata_reference].schema, value) - end - - if fdata.unique_across_ws then - ws_id = default_workspace_id - end - - local key = unique_field_key(entity_name, ws_id, fname, value) - t:set(key, nil) - - elseif is_foreign then - -- not unique and is foreign, generate page_for_foo indexes - assert(type(value) == "table") - value = pk_string(kong.db[fdata_reference].schema, value) - - local key = foreign_field_key(entity_name, ws_id, fname, value, pk) - t:set(key, nil) - end - end - end - - local res, err = t:commit() - if not res then - return nil, self.errors:database_error(err) - end - - return true -end - - local function remove_nulls(tbl) for k,v in pairs(tbl) do if v == null then @@ -297,72 +214,6 @@ local function remove_nulls(tbl) end -local function insert(self, item, options) - local schema = self.schema - local t = lmdb_transaction.begin(16) - - local pk = pk_string(schema, item) - local entity_name = schema.name - local ws_id = ws(schema, options) - local dao = kong.db[entity_name] - - local item_key = string_format("%s|%s|*|%s", entity_name, ws_id, pk) - item = remove_nulls(item) - - local item_marshalled, err = marshall(item) - if not item_marshalled then - return nil, err - 
end - - t:set(item_key, item_marshalled) - - -- select_by_cache_key - if schema.cache_key then - local cache_key = dao:cache_key(item) - local key = unique_field_key(entity_name, ws_id, "cache_key", cache_key) - t:set(key, item_key) - end - - for fname, fdata in schema:each_field() do - local is_foreign = fdata.type == "foreign" - local fdata_reference = fdata.reference - local value = item[fname] - - if value then - if fdata.unique then - -- unique and not a foreign key, or is a foreign key, but non-composite - if type(value) == "table" then - assert(is_foreign) - value = pk_string(kong.db[fdata_reference].schema, value) - end - - if fdata.unique_across_ws then - ws_id = default_workspace_id - end - - local key = unique_field_key(entity_name, ws_id, fname, value) - t:set(key, item_key) - - elseif is_foreign then - -- not unique and is foreign, generate page_for_foo indexes - assert(type(value) == "table") - value = pk_string(kong.db[fdata_reference].schema, value) - - local key = foreign_field_key(entity_name, ws_id, fname, value, pk) - t:set(key, item_key) - end - end - end - - local res, err = t:commit() - if not res then - return nil, self.errors:database_error(err) - end - - return item -end - - do local unsupported = function(operation) return function(self) @@ -383,10 +234,10 @@ do _mt.select = select _mt.page = page _mt.select_by_field = select_by_field - _mt.insert = insert + _mt.insert = unsupported("create") _mt.update = unsupported("update") _mt.upsert = unsupported("create or update") - _mt.delete = delete + _mt.delete = unsupported("remove") _mt.update_by_field = unsupported_by("update") _mt.upsert_by_field = unsupported_by("create or update") _mt.delete_by_field = unsupported_by("remove") @@ -413,8 +264,8 @@ function off.new(connector, schema, errors) if fdata.type == "foreign" then local method = "page_for_" .. 
fname self[method] = function(_, foreign_key, size, offset, options) - local ws_id = ws(schema, options) - local prefix = foreign_field_key(name, ws_id, fname, foreign_key.id) + local ws_id = workspace_id(schema, options) + local prefix = declarative.foreign_field_key_prefix(name, ws_id, fname, foreign_key.id) return page_for_prefix(self, prefix, size, offset, options, true) end end