Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Runtime: implement the basic functions of Kong API Gateway #1813

Merged
merged 2 commits into from
Aug 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions python/cloudtik/core/_private/cluster/cluster_scaler.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@
from cloudtik.core._private.utils import validate_config, \
hash_launch_conf, hash_runtime_conf, \
format_info_string, get_commands_to_run, with_head_node_ip_environment_variables, \
encode_cluster_secrets, _get_node_specific_commands, _get_node_specific_config, \
_get_node_specific_commands, _get_node_specific_config, \
_get_node_specific_docker_config, _get_node_specific_runtime_config, \
_has_node_type_specific_runtime_config, get_runtime_config_key, RUNTIME_CONFIG_KEY, \
process_config_with_privacy, decrypt_config, CLOUDTIK_CLUSTER_SCALING_STATUS, get_runtime_encryption_key, \
with_runtime_encryption_key
with_runtime_encryption_key, PROVIDER_STORAGE_CONFIG_KEY, PROVIDER_DATABASE_CONFIG_KEY
from cloudtik.core._private.constants import CLOUDTIK_MAX_NUM_FAILURES, \
CLOUDTIK_MAX_LAUNCH_BATCH, CLOUDTIK_MAX_CONCURRENT_LAUNCHES, \
CLOUDTIK_UPDATE_INTERVAL_S, CLOUDTIK_HEARTBEAT_TIMEOUT_S, CLOUDTIK_RUNTIME_ENV_SECRETS, \
CLOUDTIK_UPDATE_INTERVAL_S, CLOUDTIK_HEARTBEAT_TIMEOUT_S, \
CLOUDTIK_SCALER_PERIODIC_STATUS_LOG

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -981,7 +981,8 @@ def _update_runtime_hashes(self, new_config):
"worker_setup_commands": get_commands_to_run(new_config, "worker_setup_commands"),
"worker_start_commands": get_commands_to_run(new_config, "worker_start_commands"),
"runtime": new_config.get(RUNTIME_CONFIG_KEY, {}),
"storage": new_config["provider"].get("storage", {})
"storage": new_config["provider"].get(PROVIDER_STORAGE_CONFIG_KEY, {}),
"database": new_config["provider"].get(PROVIDER_DATABASE_CONFIG_KEY, {})
}
(new_runtime_hash,
new_file_mounts_contents_hash,
Expand Down
7 changes: 7 additions & 0 deletions python/cloudtik/core/_private/runtime_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
BUILT_IN_RUNTIME_DNSMASQ = "dnsmasq"
BUILT_IN_RUNTIME_BIND = "bind"
BUILT_IN_RUNTIME_COREDNS = "coredns"
BUILT_IN_RUNTIME_KONG = "kong"
BUILT_IN_RUNTIME_APISIX = "apisix"

DEFAULT_RUNTIMES = [BUILT_IN_RUNTIME_PROMETHEUS, BUILT_IN_RUNTIME_NODE_EXPORTER, BUILT_IN_RUNTIME_SPARK]
Expand Down Expand Up @@ -164,6 +165,11 @@ def _import_coredns():
return CoreDNSRuntime


def _import_kong():
from cloudtik.runtime.kong.runtime import KongRuntime
return KongRuntime


def _import_apisix():
from cloudtik.runtime.apisix.runtime import APISIXRuntime
return APISIXRuntime
Expand Down Expand Up @@ -193,6 +199,7 @@ def _import_apisix():
BUILT_IN_RUNTIME_DNSMASQ: _import_dnsmasq,
BUILT_IN_RUNTIME_BIND: _import_bind,
BUILT_IN_RUNTIME_COREDNS: _import_coredns,
BUILT_IN_RUNTIME_KONG: _import_kong,
BUILT_IN_RUNTIME_APISIX: _import_apisix,
}

Expand Down
6 changes: 5 additions & 1 deletion python/cloudtik/core/_private/runtime_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import yaml

from cloudtik.core._private.constants import CLOUDTIK_RUNTIME_ENV_NODE_TYPE, CLOUDTIK_RUNTIME_ENV_NODE_IP, \
CLOUDTIK_RUNTIME_ENV_SECRETS, CLOUDTIK_RUNTIME_ENV_HEAD_IP
CLOUDTIK_RUNTIME_ENV_SECRETS, CLOUDTIK_RUNTIME_ENV_HEAD_IP, env_bool
from cloudtik.core._private.crypto import AESCipher
from cloudtik.core._private.utils import load_head_cluster_config, _get_node_type_specific_runtime_config, \
get_runtime_config_key, _get_key_from_kv, decode_cluster_secrets, CLOUDTIK_CLUSTER_NODES_INFO_NODE_TYPE
Expand All @@ -22,6 +22,10 @@ def get_runtime_value(name):
return os.environ.get(name)


def get_runtime_bool(name, default=False):
return env_bool(name, default)


def get_runtime_node_type():
# Node type should always be set as env
node_type = get_runtime_value(CLOUDTIK_RUNTIME_ENV_NODE_TYPE)
Expand Down
9 changes: 7 additions & 2 deletions python/cloudtik/core/_private/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@
RUNTIME_TYPES_CONFIG_KEY = "types"
ENCRYPTION_KEY_CONFIG_KEY = "encryption.key"

PROVIDER_STORAGE_CONFIG_KEY = "storage"
PROVIDER_DATABASE_CONFIG_KEY = "database"

PRIVACY_CONFIG_KEYS = ["credentials", "account.key", "secret", "access.key", "private.key", "encryption.key"]

NODE_INFO_NODE_ID = "node_id"
Expand Down Expand Up @@ -3065,11 +3068,13 @@ def convert_nodes_to_resource(


def get_storage_config_for_update(provider_config):
return get_config_for_update(provider_config, "storage")
return get_config_for_update(
provider_config, PROVIDER_STORAGE_CONFIG_KEY)


def get_database_config_for_update(provider_config):
return get_config_for_update(provider_config, "database")
return get_config_for_update(
provider_config, PROVIDER_DATABASE_CONFIG_KEY)


def print_json_formatted(json_bytes):
Expand Down
30 changes: 30 additions & 0 deletions python/cloudtik/core/config-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1822,6 +1822,36 @@
"description": "Explicit set ETCD service to use."
}
}
},
"kong": {
"type": "object",
"description": "Kong runtime configurations",
"additionalProperties": true,
"properties": {
"port": {
"type": "integer",
"default": 8000,
"description": "Kong service port."
},
"ssl_port": {
"type": "integer",
"default": 8443,
"description": "Kong service SSL port."
},
"database": {
"$ref": "#/definitions/database_connect",
"description": "The database parameters. Engine, address, port are optional if using service discovery."
},
"database_service_discovery": {
"type": "boolean",
"description": "Whether to discover and use database service in the same workspace.",
"default": true
},
"database_service_selector": {
"$ref": "#/definitions/service_selector",
"description": "The selector for database service if service discovery is enabled."
}
}
}
}
}
Expand Down
16 changes: 9 additions & 7 deletions python/cloudtik/providers/_private/_azure/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from cloudtik.core._private.util.database_utils import get_database_engine, get_database_port, DATABASE_ENV_ENGINE, \
DATABASE_ENV_ENABLED, DATABASE_ENV_PASSWORD, DATABASE_ENV_USERNAME, DATABASE_ENV_PORT, DATABASE_ENV_HOST
from cloudtik.core._private.utils import get_storage_config_for_update, get_database_config_for_update, \
get_config_for_update
get_config_for_update, PROVIDER_DATABASE_CONFIG_KEY, PROVIDER_STORAGE_CONFIG_KEY
from cloudtik.providers._private._azure.azure_identity_credential_adapter import AzureIdentityCredentialAdapter

AZURE_DATABASE_ENDPOINT = "address"
Expand Down Expand Up @@ -196,10 +196,11 @@ def _construct_private_dns_client(provider_config):


def get_azure_cloud_storage_config(provider_config: Dict[str, Any]):
if "storage" in provider_config and "azure_cloud_storage" in provider_config["storage"]:
return provider_config["storage"]["azure_cloud_storage"]
storage_config = provider_config.get(PROVIDER_STORAGE_CONFIG_KEY)
if not storage_config:
return None

return None
return storage_config.get("azure_cloud_storage")


def get_azure_cloud_storage_config_for_update(provider_config: Dict[str, Any]):
Expand Down Expand Up @@ -270,10 +271,11 @@ def get_default_azure_cloud_storage(provider_config):


def get_azure_database_config(provider_config: Dict[str, Any], default=None):
if "database" in provider_config and "azure.database" in provider_config["database"]:
return provider_config["database"]["azure.database"]
database_config = provider_config.get(PROVIDER_DATABASE_CONFIG_KEY)
if not database_config:
return default

return default
return database_config.get("azure.database", default)


def get_azure_database_engine(database_config):
Expand Down
9 changes: 5 additions & 4 deletions python/cloudtik/providers/_private/_kubernetes/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
from cloudtik.core._private.docker import get_versioned_image
from cloudtik.core._private.providers import _get_node_provider
from cloudtik.core._private.utils import is_use_internal_ip, get_running_head_node, binary_to_hex, hex_to_binary, \
get_head_service_ports, _is_use_managed_cloud_storage, _is_use_internal_ip, is_gpu_runtime
get_head_service_ports, _is_use_managed_cloud_storage, _is_use_internal_ip, is_gpu_runtime, \
PROVIDER_DATABASE_CONFIG_KEY, PROVIDER_STORAGE_CONFIG_KEY
from cloudtik.core.tags import CLOUDTIK_TAG_CLUSTER_NAME, CLOUDTIK_TAG_NODE_KIND, NODE_KIND_HEAD, \
CLOUDTIK_GLOBAL_VARIABLE_KEY, CLOUDTIK_GLOBAL_VARIABLE_KEY_PREFIX
from cloudtik.core.workspace_provider import Existence
Expand Down Expand Up @@ -1255,7 +1256,7 @@ def _parse_cpu_or_gpu_resource(resource):


def get_default_kubernetes_cloud_storage(provider_config):
storage_config = provider_config.get("storage", {})
storage_config = provider_config.get(PROVIDER_STORAGE_CONFIG_KEY, {})

if "aws_s3_storage" in storage_config:
from cloudtik.providers._private._kubernetes.aws_eks.config import get_default_kubernetes_cloud_storage_for_aws
Expand All @@ -1272,7 +1273,7 @@ def get_default_kubernetes_cloud_storage(provider_config):


def get_default_kubernetes_cloud_database(provider_config):
database_config = provider_config.get("database", {})
database_config = provider_config.get(PROVIDER_DATABASE_CONFIG_KEY, {})

if "aws.database" in database_config:
from cloudtik.providers._private._kubernetes.aws_eks.config import \
Expand All @@ -1293,7 +1294,7 @@ def get_default_kubernetes_cloud_database(provider_config):
def with_kubernetes_environment_variables(provider_config, node_type_config: Dict[str, Any], node_id: str):
config_dict = {}

storage_config = provider_config.get("storage", {})
storage_config = provider_config.get(PROVIDER_STORAGE_CONFIG_KEY, {})

if "aws_s3_storage" in storage_config:
from cloudtik.providers._private._kubernetes.aws_eks.config import with_aws_environment_variables
Expand Down
10 changes: 6 additions & 4 deletions python/cloudtik/providers/_private/aliyun/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from typing import Any, Dict, List

from cloudtik.core._private.constants import CLOUDTIK_DEFAULT_CLOUD_STORAGE_URI
from cloudtik.core._private.utils import get_storage_config_for_update, get_config_for_update
from cloudtik.core._private.utils import get_storage_config_for_update, get_config_for_update, \
PROVIDER_STORAGE_CONFIG_KEY

from cloudtik.core._private.cli_logger import cli_logger

Expand All @@ -31,10 +32,11 @@


def get_aliyun_oss_storage_config(provider_config: Dict[str, Any]):
if "storage" in provider_config and "aliyun_oss_storage" in provider_config["storage"]:
return provider_config["storage"]["aliyun_oss_storage"]
storage_config = provider_config.get(PROVIDER_STORAGE_CONFIG_KEY)
if not storage_config:
return None

return None
return storage_config.get("aliyun_oss_storage")


def get_aliyun_oss_storage_config_for_update(provider_config: Dict[str, Any]):
Expand Down
16 changes: 9 additions & 7 deletions python/cloudtik/providers/_private/aws/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

# Max number of retries to AWS (default is 5, time increases exponentially)
from cloudtik.core._private.utils import get_storage_config_for_update, get_database_config_for_update, \
get_config_for_update
get_config_for_update, PROVIDER_DATABASE_CONFIG_KEY, PROVIDER_STORAGE_CONFIG_KEY

BOTO_MAX_RETRIES = env_integer("BOTO_MAX_RETRIES", 12)

Expand Down Expand Up @@ -155,10 +155,11 @@ def __exit__(self, type, value, tb):


def get_aws_s3_storage_config(provider_config: Dict[str, Any]):
if "storage" in provider_config and "aws_s3_storage" in provider_config["storage"]:
return provider_config["storage"]["aws_s3_storage"]
storage_config = provider_config.get(PROVIDER_STORAGE_CONFIG_KEY)
if not storage_config:
return None

return None
return storage_config.get("aws_s3_storage")


def get_aws_s3_storage_config_for_update(provider_config: Dict[str, Any]):
Expand Down Expand Up @@ -209,10 +210,11 @@ def get_default_aws_cloud_storage(provider_config):


def get_aws_database_config(provider_config: Dict[str, Any], default=None):
if "database" in provider_config and "aws.database" in provider_config["database"]:
return provider_config["database"]["aws.database"]
database_config = provider_config.get(PROVIDER_DATABASE_CONFIG_KEY)
if not database_config:
return default

return default
return database_config.get("aws.database", default)


def get_aws_database_engine(database_config):
Expand Down
16 changes: 9 additions & 7 deletions python/cloudtik/providers/_private/gcp/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from cloudtik.core._private.util.database_utils import get_database_engine, get_database_port, DATABASE_ENV_ENABLED, \
DATABASE_ENV_ENGINE, DATABASE_ENV_HOST, DATABASE_ENV_PORT, DATABASE_ENV_USERNAME, DATABASE_ENV_PASSWORD
from cloudtik.core._private.utils import get_storage_config_for_update, get_database_config_for_update, \
get_config_for_update
get_config_for_update, PROVIDER_DATABASE_CONFIG_KEY, PROVIDER_STORAGE_CONFIG_KEY
from cloudtik.providers._private.gcp.node import (GCPNodeType, MAX_POLLS,
POLL_INTERVAL)
from cloudtik.providers._private.gcp.node import GCPNode
Expand Down Expand Up @@ -354,10 +354,11 @@ def _is_head_node_a_tpu(config: dict) -> bool:


def get_gcp_cloud_storage_config(provider_config: Dict[str, Any]):
if "storage" in provider_config and "gcp_cloud_storage" in provider_config["storage"]:
return provider_config["storage"]["gcp_cloud_storage"]
storage_config = provider_config.get(PROVIDER_STORAGE_CONFIG_KEY)
if not storage_config:
return None

return None
return storage_config.get("gcp_cloud_storage")


def get_gcp_cloud_storage_config_for_update(provider_config: Dict[str, Any]):
Expand Down Expand Up @@ -419,10 +420,11 @@ def get_default_gcp_cloud_storage(provider_config):


def get_gcp_database_config(provider_config: Dict[str, Any], default=None):
if "database" in provider_config and "gcp.database" in provider_config["database"]:
return provider_config["database"]["gcp.database"]
database_config = provider_config.get(PROVIDER_DATABASE_CONFIG_KEY)
if not database_config:
return default

return default
return database_config.get("gcp.database", default)


def get_gcp_database_engine(database_config):
Expand Down
11 changes: 6 additions & 5 deletions python/cloudtik/providers/_private/huaweicloud/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@

from cloudtik.core._private.constants import \
CLOUDTIK_DEFAULT_CLOUD_STORAGE_URI, env_bool
from cloudtik.core._private.utils import get_storage_config_for_update, get_config_for_update
from cloudtik.core._private.utils import get_storage_config_for_update, get_config_for_update, \
PROVIDER_STORAGE_CONFIG_KEY

OBS_SERVICES_URL = 'https://obs.{location}.myhuaweicloud.com'
OBS_SERVICES_DEFAULT_URL = 'https://obs.myhuaweicloud.com'
Expand Down Expand Up @@ -197,11 +198,11 @@ def _make_obs_client(config_provider: Dict[str, Any], region=None) -> Any:


def get_huaweicloud_obs_storage_config(provider_config: Dict[str, Any]):
if "storage" in provider_config and "huaweicloud_obs_storage" in \
provider_config["storage"]:
return provider_config["storage"]["huaweicloud_obs_storage"]
storage_config = provider_config.get(PROVIDER_STORAGE_CONFIG_KEY)
if not storage_config:
return None

return None
return storage_config.get("huaweicloud_obs_storage")


def get_huaweicloud_obs_storage_endpoint(region: str = None) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
from kubernetes.client.rest import ApiException

from cloudtik.core._private import constants
from cloudtik.core._private.core_utils import get_config_for_update
from cloudtik.providers._private._kubernetes import custom_objects_api
from cloudtik.providers._private._kubernetes.config import _get_cluster_selector
from cloudtik.providers._private._kubernetes.node_provider import head_service_selector
from cloudtik.core._private.utils import _get_default_config
from cloudtik.core._private.utils import _get_default_config, PROVIDER_STORAGE_CONFIG_KEY

CLOUDTIK_API_GROUP = "cloudtik.io"
CLOUDTIK_API_VERSION = "v1"
Expand Down Expand Up @@ -230,9 +231,8 @@ def configure_cloud_storage(
if "cloudStorage" not in cloud_config:
return

if "storage" not in provider_config:
provider_config["storage"] = {}
storage_config = provider_config["storage"]
storage_config = get_config_for_update(
provider_config, PROVIDER_STORAGE_CONFIG_KEY)

cloud_storage = cloud_config["cloudStorage"]
for field in cloud_storage:
Expand Down
1 change: 0 additions & 1 deletion python/cloudtik/runtime/ai/scripts/schema-init.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ function create_database_schema() {
}

function init_schema() {
DATABASE_NAME=hive_metastore
if [ "${SQL_DATABASE}" == "true" ] \
&& [ "$AI_WITH_SQL_DATABASE" != "false" ]; then
create_database_schema
Expand Down
6 changes: 5 additions & 1 deletion python/cloudtik/runtime/apisix/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from cloudtik.runtime.common.runtime_base import RuntimeBase
from cloudtik.runtime.apisix.utils import _get_runtime_processes, \
_get_runtime_services, _with_runtime_environment_variables, _config_depended_services, _prepare_config_on_head, \
_validate_config, _get_runtime_endpoints, _get_head_service_ports
_validate_config, _get_runtime_endpoints, _get_head_service_ports, _get_runtime_logs

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -53,6 +53,10 @@ def get_head_service_ports(self) -> Dict[str, Any]:
def get_runtime_services(self, cluster_name: str):
return _get_runtime_services(self.runtime_config, cluster_name)

@staticmethod
def get_logs() -> Dict[str, str]:
return _get_runtime_logs()

@staticmethod
def get_processes():
return _get_runtime_processes()
Expand Down
Loading