Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Commit

Permalink
Runtime: implement database service discovery and refactor (#1798)
Browse files Browse the repository at this point in the history
  • Loading branch information
jerrychenhf authored Aug 23, 2023
1 parent c09e19a commit 79eba02
Show file tree
Hide file tree
Showing 28 changed files with 793 additions and 544 deletions.
6 changes: 5 additions & 1 deletion python/cloudtik/core/_private/core_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -906,7 +906,11 @@ def remove_files(path):
os.remove(os.path.join(path, file_name))


def get_string_value_for_env(val):
def get_env_string_value(val):
if isinstance(val, str):
return val
return json.dumps(val, separators=(",", ":"))


def get_address_string(host, port):
return "{}:{}".format(host, port)
25 changes: 22 additions & 3 deletions python/cloudtik/core/_private/service_discovery/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import copy
from enum import Enum, auto
from typing import Optional, Dict, Any, List
from typing import Optional, Dict, Any, List, Union

from cloudtik.core._private.core_utils import deserialize_config, serialize_config, \
get_list_for_update
Expand Down Expand Up @@ -244,9 +244,28 @@ def get_service_selector_for_update(config, config_key):
return service_selector


def include_runtime_for_selector(service_selector, runtime):
def include_runtime_for_selector(
service_selector, runtime: Union[str, List[str]], override=False):
runtimes = get_list_for_update(
service_selector, SERVICE_SELECTOR_RUNTIMES)
if runtime not in runtimes:
if runtimes:
if not override:
return service_selector
runtimes.clear()

if isinstance(runtime, str):
runtimes.append(runtime)
else:
# list of runtime types
for runtime_type in runtime:
runtimes.append(runtime_type)
return service_selector


def include_feature_for_selector(service_selector, feature):
tags = get_list_for_update(
service_selector, SERVICE_SELECTOR_TAGS)
feature_tag = SERVICE_DISCOVERY_TAG_FEATURE_PREFIX + feature
if feature_tag not in tags:
tags.append(feature_tag)
return service_selector
89 changes: 89 additions & 0 deletions python/cloudtik/core/_private/util/database_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import os

from cloudtik.core._private.core_utils import get_env_string_value

DATABASE_CONFIG_ENGINE = "engine"
DATABASE_CONFIG_ADDRESS = "address"
DATABASE_CONFIG_PORT = "port"
DATABASE_CONFIG_USERNAME = "username"
DATABASE_CONFIG_PASSWORD = "password"

DATABASE_ENGINE_MYSQL = "mysql"
DATABASE_ENGINE_POSTGRES = "postgres"

DATABASE_ENV_ENABLED = "CLOUD_DATABASE"
DATABASE_ENV_ENGINE = "CLOUD_DATABASE_ENGINE"
DATABASE_ENV_ADDRESS = "CLOUD_DATABASE_HOSTNAME"
DATABASE_ENV_PORT = "CLOUD_DATABASE_PORT"
DATABASE_ENV_USERNAME = "CLOUD_DATABASE_USERNAME"
DATABASE_ENV_PASSWORD = "CLOUD_DATABASE_PASSWORD"

DATABASE_USERNAME_MYSQL_DEFAULT = "root"
DATABASE_USERNAME_POSTGRES_DEFAULT = "cloudtik"
DATABASE_PASSWORD_DEFAULT = "cloudtik"


def get_database_engine(database_config):
engine = database_config.get(DATABASE_CONFIG_ENGINE)
return get_validated_engine(engine)


def get_validated_engine(engine):
if engine and engine != DATABASE_ENGINE_MYSQL and engine != DATABASE_ENGINE_POSTGRES:
raise ValueError(
"The database engine type {} is not supported.".format(engine))
return engine or DATABASE_ENGINE_MYSQL


def get_database_port(database_config):
port = database_config.get(DATABASE_CONFIG_PORT)
if not port:
engine = get_database_engine(database_config)
port = 3306 if engine == DATABASE_ENGINE_MYSQL else 5432
return port


def get_database_username(database_config):
username = database_config.get(
DATABASE_CONFIG_USERNAME)
if not username:
engine = get_database_engine(database_config)
username = (DATABASE_USERNAME_MYSQL_DEFAULT
if engine == DATABASE_ENGINE_MYSQL
else DATABASE_USERNAME_POSTGRES_DEFAULT)
return username


def get_database_password(database_config):
return database_config.get(
DATABASE_CONFIG_PASSWORD, DATABASE_PASSWORD_DEFAULT)


def is_database_configured(database_config):
if not database_config:
return False
return True if database_config.get(
DATABASE_CONFIG_ADDRESS) else False


def set_database_config(database_config, database_service):
engine, service_addresses = database_service
# take one address
service_address = service_addresses[0]
database_config[DATABASE_CONFIG_ENGINE] = get_validated_engine(engine)
database_config[DATABASE_CONFIG_ADDRESS] = service_address[0]
database_config[DATABASE_CONFIG_PORT] = service_address[1]


def export_database_environment_variables(database_config):
if not is_database_configured(database_config):
return

os.environ[DATABASE_ENV_ENABLED] = get_env_string_value(True)
os.environ[DATABASE_ENV_ENGINE] = get_database_engine(database_config)
os.environ[DATABASE_ENV_ADDRESS] = database_config[DATABASE_CONFIG_ADDRESS]
os.environ[DATABASE_ENV_PORT] = str(get_database_port(database_config))

# The defaults apply to built-in Database runtime.
os.environ[DATABASE_ENV_USERNAME] = get_database_username(database_config)
os.environ[DATABASE_ENV_PASSWORD] = get_database_password(database_config)
16 changes: 0 additions & 16 deletions python/cloudtik/core/_private/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3242,22 +3242,6 @@ def run_script(script, script_args, with_output=False):
raise err


def get_database_engine(database_config):
engine = database_config.get("engine")
if engine and engine != "mysql" and engine != "postgres":
raise ValueError(
"The database engine type {} is not supported.".format(engine))
return database_config.get("engine") or "mysql"


def get_database_port(database_config):
port = database_config.get("port")
if not port:
engine = get_database_engine(database_config)
port = 3306 if engine == "mysql" else 5432
return port


def get_runtime_config(config):
# There key cannot be empty for current implementation
return config.get(RUNTIME_CONFIG_KEY, {})
Expand Down
84 changes: 72 additions & 12 deletions python/cloudtik/core/config-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,33 @@
}
}
}
},
"database_connect": {
"type": "object",
"description": "Database parameters for connect to an existing database.",
"additionalProperties": true,
"properties": {
"engine": {
"type": "string",
"description": "Database engine: mysql or postgres."
},
"address": {
"type": "string",
"description": "Database server address"
},
"port": {
"type": "number",
"description": "Database server port. default: mysql=3306, postgres=5432"
},
"username": {
"type": "string",
"description": "Database administrator login name. default: cloudtik"
},
"password": {
"type": "string",
"description": "Database administrator login password."
}
}
}
},
"required": [
Expand Down Expand Up @@ -507,14 +534,14 @@
"description": "AWS RDS for MySQL options",
"additionalProperties": true,
"properties": {
"server_address": {
"type": "string",
"description": "AWS RDS server address"
},
"engine": {
"type": "string",
"description": "AWS RDS engine: mysql or postgres. default: mysql"
},
"address": {
"type": "string",
"description": "AWS RDS server address"
},
"port": {
"type": "number",
"description": "AWS RDS server port. default: mysql=3306, postgres=5432"
Expand All @@ -534,14 +561,14 @@
"description": "Azure Database for MySQL options",
"additionalProperties": true,
"properties": {
"server_address": {
"type": "string",
"description": "Azure Database server address"
},
"engine": {
"type": "string",
"description": "Azure Database engine: mysql or postgres. default: mysql"
},
"address": {
"type": "string",
"description": "Azure Database server address"
},
"port": {
"type": "number",
"description": "Azure Database server port. default: mysql=3306, postgres=5432"
Expand All @@ -561,14 +588,14 @@
"description": "GCP Cloud SQL for MySQL options",
"additionalProperties": true,
"properties": {
"server_address": {
"type": "string",
"description": "GCP Cloud SQL server address"
},
"engine": {
"type": "string",
"description": "GCP Cloud SQL engine: mysql or postgres. default: mysql"
},
"address": {
"type": "string",
"description": "GCP Cloud SQL server address"
},
"port": {
"type": "number",
"description": "GCP Cloud SQL server port. default: mysql=3306, postgres=5432"
Expand Down Expand Up @@ -1136,6 +1163,19 @@
"with_gpu": {
"type": "boolean",
"description": "Whether to use GPU frameworks and libraries for AI"
},
"database": {
"$ref": "#/definitions/database_connect",
"description": "The database parameters. Engine, address, port are optional if using service discovery."
},
"database_service_discovery": {
"type": "boolean",
"description": "Whether to discover and use database service in the same workspace.",
"default": true
},
"database_service_selector": {
"$ref": "#/definitions/service_selector",
"description": "The selector for database service if service discovery is enabled."
}
}
},
Expand Down Expand Up @@ -1241,6 +1281,26 @@
}
}
},
"metastore": {
"type": "object",
"description": "Metastore runtime configurations",
"additionalProperties": true,
"properties": {
"database": {
"$ref": "#/definitions/database_connect",
"description": "The database parameters. Engine, address, port are optional if using service discovery."
},
"database_service_discovery": {
"type": "boolean",
"description": "Whether to discover and use database service in the same workspace.",
"default": true
},
"database_service_selector": {
"$ref": "#/definitions/service_selector",
"description": "The selector for database service if service discovery is enabled."
}
}
},
"kafka": {
"type": "object",
"description": "Kafka runtime configurations",
Expand Down
5 changes: 3 additions & 2 deletions python/cloudtik/providers/_private/_azure/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@
from azure.mgmt.privatedns import PrivateDnsManagementClient

from cloudtik.core._private.constants import CLOUDTIK_DEFAULT_CLOUD_STORAGE_URI
from cloudtik.core._private.util.database_utils import get_database_engine, get_database_port
from cloudtik.core._private.utils import get_storage_config_for_update, get_database_config_for_update, \
get_database_engine, get_database_port, get_config_for_update
get_config_for_update
from cloudtik.providers._private._azure.azure_identity_credential_adapter import AzureIdentityCredentialAdapter

AZURE_DATABASE_ENDPOINT = "server_address"
AZURE_DATABASE_ENDPOINT = "address"


def get_azure_sdk_function(client: Any, function_name: str) -> Callable:
Expand Down
5 changes: 3 additions & 2 deletions python/cloudtik/providers/_private/aws/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,19 @@

from cloudtik.core._private.cli_logger import cli_logger, cf
from cloudtik.core._private.constants import env_integer, CLOUDTIK_DEFAULT_CLOUD_STORAGE_URI
from cloudtik.core._private.util.database_utils import get_database_engine, get_database_port

# Max number of retries to AWS (default is 5, time increases exponentially)
from cloudtik.core._private.utils import get_storage_config_for_update, get_database_config_for_update, \
get_database_engine, get_database_port, get_config_for_update
get_config_for_update

BOTO_MAX_RETRIES = env_integer("BOTO_MAX_RETRIES", 12)

# Max number of retries to create an EC2 node (retry different subnet)
BOTO_CREATE_MAX_RETRIES = env_integer("BOTO_CREATE_MAX_RETRIES", 5)

AWS_S3_BUCKET = "s3.bucket"
AWS_DATABASE_ENDPOINT = "server_address"
AWS_DATABASE_ENDPOINT = "address"


class LazyDefaultDict(defaultdict):
Expand Down
5 changes: 3 additions & 2 deletions python/cloudtik/providers/_private/gcp/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@

from cloudtik.core._private.cli_logger import cli_logger
from cloudtik.core._private.constants import CLOUDTIK_DEFAULT_CLOUD_STORAGE_URI
from cloudtik.core._private.util.database_utils import get_database_engine, get_database_port
from cloudtik.core._private.utils import get_storage_config_for_update, get_database_config_for_update, \
get_database_engine, get_database_port, get_config_for_update
get_config_for_update
from cloudtik.providers._private.gcp.node import (GCPNodeType, MAX_POLLS,
POLL_INTERVAL)
from cloudtik.providers._private.gcp.node import GCPNode
Expand All @@ -28,7 +29,7 @@
"{account_id}@{project_id}.iam.gserviceaccount.com")

GCP_GCS_BUCKET = "gcs.bucket"
GCP_DATABASE_ENDPOINT = "server_address"
GCP_DATABASE_ENDPOINT = "address"


def _create_crm(gcp_credentials=None):
Expand Down
Loading

0 comments on commit 79eba02

Please sign in to comment.