Skip to content

Commit

Permalink
refactored import logic so it can be shared by DB-less import and
Browse files Browse the repository at this point in the history
incremental sync
  • Loading branch information
dndx committed Sep 9, 2024
1 parent a49557b commit dbe6259
Show file tree
Hide file tree
Showing 4 changed files with 237 additions and 241 deletions.
256 changes: 198 additions & 58 deletions kong/db/declarative/import.lua
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,47 @@ local insert = table.insert
local string_format = string.format
local null = ngx.null
local get_phase = ngx.get_phase
local get_workspace_id = workspaces.get_workspace_id


local DECLARATIVE_HASH_KEY = constants.DECLARATIVE_HASH_KEY
local DECLARATIVE_EMPTY_CONFIG_HASH = constants.DECLARATIVE_EMPTY_CONFIG_HASH


-- Generates the appropriate workspace ID for current operating context
-- depends on schema settings
--
-- Non-workspaceable entities are always placed under the "default"
-- workspace
--
-- If the query explicitly set options.workspace == null, then "default"
-- workspace shall be used
--
-- If the query explicitly set options.workspace == "some UUID", then
-- it will be returned
--
-- Otherwise, the current workspace ID will be returned
local function workspace_id(schema, options)
if not schema.workspaceable then
return kong.default_workspace
end

if options then
-- options.workspace == null must be handled by caller by querying
-- all available workspaces one by one
if options.workspace == null then
return kong.default_workspace
end

if options.workspace then
return options.workspace
end
end

return get_workspace_id()
end


local function find_or_create_current_workspace(name)
name = name or "default"

Expand Down Expand Up @@ -197,12 +232,22 @@ local function unique_field_key(schema_name, ws_id, field, value)
end


local function foreign_field_key_prefix(schema_name, ws_id, field, foreign_id)
return string_format("%s|%s|%s|%s|", schema_name, ws_id, field, foreign_id)
end


local function foreign_field_key(schema_name, ws_id, field, foreign_id, pk)
if pk then
return string_format("%s|%s|%s|%s|%s", schema_name, ws_id, field, foreign_id, pk)
end
return foreign_field_key_prefix(schema_name, ws_id, field, foreign_id) .. pk
end

return string_format("%s|%s|%s|%s|", schema_name, ws_id, field, foreign_id)
local function item_key_prefix(schema_name, ws_id)
return string_format("%s|%s|*|", schema_name, ws_id)
end


local function item_key(schema_name, ws_id, pk_str)
return item_key_prefix .. pk_str
end


Expand Down Expand Up @@ -238,80 +283,44 @@ local function load_into_cache(entities, meta, hash)
local t = txn.begin(512)
t:db_drop(false)

local phase = get_phase()

for entity_name, items in pairs(entities) do
yield(true, phase)

local dao = kong.db[entity_name]
if not dao then
return nil, "unknown entity: " .. entity_name
end
local schema = dao.schema

for id, item in pairs(items) do
local ws_id = default_workspace_id

if schema.workspaceable and item.ws_id == null or item.ws_id == nil then
item.ws_id = ws_id
if schema.workspaceable and (item_ws_id == null or item_ws_id == nil) then
item.ws_id = default_workspace_id
end

assert(type(ws_id) == "string")

local pk = pk_string(schema, item)
assert(type(item.ws_id) == "string")

local item_key = string_format("%s|%s|*|%s", entity_name, ws_id, pk)
if should_transform and schema:has_transformations(item) then
local transformed_item = cycle_aware_deep_copy(item)
remove_nulls(transformed_item)

item = remove_nulls(item)

if transform then
local err
item, err = schema:transform(item)
transformed_item, err = schema:transform(transformed_item)
if not transformed_item then
return nil, err
end

item = restore_nulls(item, transformed_item)
if not item then
return nil, err
end
end

local item_marshalled, err = marshall(item)
if not item_marshalled then
local res, err = insert_entity_for_txn(t, entity_name, item, nil)
if not res then
return nil, err
end

t:set(item_key, item_marshalled)

-- select_by_cache_key
if schema.cache_key then
local cache_key = dao:cache_key(item)
local key = unique_field_key(entity_name, ws_id, "cache_key", cache_key)
t:set(key, item_key)
end

for fname, fdata in schema:each_field() do
local is_foreign = fdata.type == "foreign"
local fdata_reference = fdata.reference
local value = item[fname]

if value then
if fdata.unique then
-- unique and not a foreign key, or is a foreign key, but non-composite
if type(value) == "table" then
assert(is_foreign)
value = pk_string(kong.db[fdata_reference].schema, value)
end

if fdata.unique_across_ws then
ws_id = default_workspace_id
end

local key = unique_field_key(entity_name, ws_id, fname, value)
t:set(key, item_key)

elseif is_foreign then
-- not unique and is foreign, generate page_for_foo indexes
assert(type(value) == "table")
value = pk_string(kong.db[fdata_reference].schema, value)

local key = foreign_field_key(entity_name, ws_id, fname, value, pk)
t:set(key, item_key)
end
end
end
end
end

Expand Down Expand Up @@ -428,10 +437,141 @@ do
end


-- Serialize and set keys for a single validated entity into
-- the provided LMDB txn object, this operation is only safe
-- is the entity does not already exist inside the LMDB database
--
-- This function sets the following:
-- * <entity_name>|<ws_id>|*|<pk_string> => serialized item
-- * <entity_name>|<ws_id>|<unique_field_name>|sha256(field_value) => <entity_name>|<ws_id>|*|<pk_string>
-- * <entity_name>|<ws_id>|<foreign_field_name>|<foreign_key>|<pk_string> -> <entity_name>|<ws_id>|*|<pk_string>
local function insert_entity_for_txn(t, entity_name, item, options)
local dao = kong.db[entity_name]
local schema = dao.schema
local pk = pk_string(schema, item)
local ws_id = workspace_id(schema, options)

local item_key = item_key(entity_name, ws_id, pk)
item = remove_nulls(item)

local item_marshalled, err = marshall(item)
if not item_marshalled then
return nil, err
end

t:set(item_key, item_marshalled)

-- select_by_cache_key
if schema.cache_key then
local cache_key = dao:cache_key(item)
local key = unique_field_key(entity_name, ws_id, "cache_key", cache_key)
t:set(key, item_key)
end

for fname, fdata in schema:each_field() do
local is_foreign = fdata.type == "foreign"
local fdata_reference = fdata.reference
local value = item[fname]

if value then
if fdata.unique then
-- unique and not a foreign key, or is a foreign key, but non-composite
-- see: validate_foreign_key_is_single_primary_key, composite foreign
-- key is currently unsupported by the DAO
if type(value) == "table" then
assert(is_foreign)
value = pk_string(kong.db[fdata_reference].schema, value)
end

if fdata.unique_across_ws then
ws_id = default_workspace_id
end

local key = unique_field_key(entity_name, ws_id, fname, value)
t:set(key, item_key)
end

if is_foreign then
-- is foreign, generate page_for_foreign_field indexes
assert(type(value) == "table")
value = pk_string(kong.db[fdata_reference].schema, value)

local key = foreign_field_key(entity_name, ws_id, fname, value, pk)
t:set(key, item_key)
end
end
end

return true
end


-- Serialize and remove keys for a single validated entity into
-- the provided LMDB txn object, this operation is safe whether the provided
-- entity exists inside LMDB or not, but the provided entity must contains the
-- correct field value so indexes can be deleted correctly
local function delete_entity_for_txn(t, entity_name, item, options)
local dao = kong.db[entity_name]
local schema = dao.schema
local pk = pk_string(schema, item)
local ws_id = workspace_id(schema, options)

local item_key = item_key(entity_name, ws_id, pk)
t:set(item_key, nil)

-- select_by_cache_key
if schema.cache_key then
local cache_key = dao:cache_key(item)
local key = unique_field_key(entity_name, ws_id, "cache_key", cache_key)
t:set(key, nil)
end

for fname, fdata in schema:each_field() do
local is_foreign = fdata.type == "foreign"
local fdata_reference = fdata.reference
local value = item[fname]

if value then
if fdata.unique then
-- unique and not a foreign key, or is a foreign key, but non-composite
-- see: validate_foreign_key_is_single_primary_key, composite foreign
-- key is currently unsupported by the DAO
if type(value) == "table" then
assert(is_foreign)
value = pk_string(kong.db[fdata_reference].schema, value)
end

if fdata.unique_across_ws then
ws_id = default_workspace_id
end

local key = unique_field_key(entity_name, ws_id, fname, value)
t:set(key, nil)
end

if is_foreign then
-- is foreign, generate page_for_foreign_field indexes
assert(type(value) == "table")
value = pk_string(kong.db[fdata_reference].schema, value)

local key = foreign_field_key(entity_name, ws_id, fname, value, pk)
t:set(key, nil)
end
end
end

return true
end


return {
get_current_hash = get_current_hash,
unique_field_key = unique_field_key,
foreign_field_key = foreign_field_key,
foreign_field_key_prefix = foreign_field_key_prefix,
item_key = item_key,
item_key_prefix = item_key_prefix,
workspace_id = workspace_id,

load_into_db = load_into_db,
load_into_cache = load_into_cache,
Expand Down
5 changes: 4 additions & 1 deletion kong/db/declarative/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -252,10 +252,13 @@ _M.sanitize_output = declarative_export.sanitize_output
-- import
_M.get_current_hash = declarative_import.get_current_hash
_M.unique_field_key = declarative_import.unique_field_key
_M.foreign_field_key = declarative_import.foreign_field_key
_M.item_key = declarative_import.item_key
_M.item_key_prefix = declarative_import.item_key_prefix
_M.foreign_field_key_prefix = declarative_import.foreign_field_key_prefix
_M.load_into_db = declarative_import.load_into_db
_M.load_into_cache = declarative_import.load_into_cache
_M.load_into_cache_with_events = declarative_import.load_into_cache_with_events
_M.workspace_id = declarative_import.workspace_id


return _M
2 changes: 2 additions & 0 deletions kong/db/schema/others/declarative_config.lua
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ local foreign_children = {}
do
local CACHED_OUT

-- Generate a stable and unique string key from primary key defined inside
-- schema, supports both non-composite and composite primary keys
function DeclarativeConfig.pk_string(schema, object)
if #schema.primary_key == 1 then
return tostring(object[schema.primary_key[1]])
Expand Down
Loading

0 comments on commit dbe6259

Please sign in to comment.