diff --git a/python/cloudtik/core/_private/core_utils.py b/python/cloudtik/core/_private/core_utils.py
index d953e1955..3e147be2e 100644
--- a/python/cloudtik/core/_private/core_utils.py
+++ b/python/cloudtik/core/_private/core_utils.py
@@ -906,7 +906,11 @@ def remove_files(path):
         os.remove(os.path.join(path, file_name))
 
 
-def get_string_value_for_env(val):
+def get_env_string_value(val):
     if isinstance(val, str):
         return val
     return json.dumps(val, separators=(",", ":"))
+
+
+def get_address_string(host, port):
+    return "{}:{}".format(host, port)
diff --git a/python/cloudtik/core/_private/service_discovery/utils.py b/python/cloudtik/core/_private/service_discovery/utils.py
index ca95706e8..8d6b52775 100644
--- a/python/cloudtik/core/_private/service_discovery/utils.py
+++ b/python/cloudtik/core/_private/service_discovery/utils.py
@@ -1,6 +1,6 @@
 import copy
 from enum import Enum, auto
-from typing import Optional, Dict, Any, List
+from typing import Optional, Dict, Any, List, Union
 
 from cloudtik.core._private.core_utils import deserialize_config, serialize_config, \
     get_list_for_update
@@ -244,9 +244,28 @@ def get_service_selector_for_update(config, config_key):
     return service_selector
 
 
-def include_runtime_for_selector(service_selector, runtime):
+def include_runtime_for_selector(
+        service_selector, runtime: Union[str, List[str]], override=False):
     runtimes = get_list_for_update(
         service_selector, SERVICE_SELECTOR_RUNTIMES)
-    if runtime not in runtimes:
+    if runtimes:
+        if not override:
+            return service_selector
+        runtimes.clear()
+
+    if isinstance(runtime, str):
         runtimes.append(runtime)
+    else:
+        # list of runtime types
+        for runtime_type in runtime:
+            runtimes.append(runtime_type)
+    return service_selector
+
+
+def include_feature_for_selector(service_selector, feature):
+    tags = get_list_for_update(
+        service_selector, SERVICE_SELECTOR_TAGS)
+    feature_tag = SERVICE_DISCOVERY_TAG_FEATURE_PREFIX + feature
+    if feature_tag not in tags:
+        tags.append(feature_tag)
     return service_selector
diff --git a/python/cloudtik/core/_private/util/database_utils.py b/python/cloudtik/core/_private/util/database_utils.py
new file mode 100644
index 000000000..dfff46204
--- /dev/null
+++ b/python/cloudtik/core/_private/util/database_utils.py
@@ -0,0 +1,89 @@
+import os
+
+from cloudtik.core._private.core_utils import get_env_string_value
+
+DATABASE_CONFIG_ENGINE = "engine"
+DATABASE_CONFIG_ADDRESS = "address"
+DATABASE_CONFIG_PORT = "port"
+DATABASE_CONFIG_USERNAME = "username"
+DATABASE_CONFIG_PASSWORD = "password"
+
+DATABASE_ENGINE_MYSQL = "mysql"
+DATABASE_ENGINE_POSTGRES = "postgres"
+
+DATABASE_ENV_ENABLED = "CLOUD_DATABASE"
+DATABASE_ENV_ENGINE = "CLOUD_DATABASE_ENGINE"
+DATABASE_ENV_ADDRESS = "CLOUD_DATABASE_HOSTNAME"
+DATABASE_ENV_PORT = "CLOUD_DATABASE_PORT"
+DATABASE_ENV_USERNAME = "CLOUD_DATABASE_USERNAME"
+DATABASE_ENV_PASSWORD = "CLOUD_DATABASE_PASSWORD"
+
+DATABASE_USERNAME_MYSQL_DEFAULT = "root"
+DATABASE_USERNAME_POSTGRES_DEFAULT = "cloudtik"
+DATABASE_PASSWORD_DEFAULT = "cloudtik"
+
+
+def get_database_engine(database_config):
+    engine = database_config.get(DATABASE_CONFIG_ENGINE)
+    return get_validated_engine(engine)
+
+
+def get_validated_engine(engine):
+    if engine and engine != DATABASE_ENGINE_MYSQL and engine != DATABASE_ENGINE_POSTGRES:
+        raise ValueError(
+            "The database engine type {} is not supported.".format(engine))
+    return engine or DATABASE_ENGINE_MYSQL
+
+
+def get_database_port(database_config):
+    port = database_config.get(DATABASE_CONFIG_PORT)
+    if not port:
+        engine = get_database_engine(database_config)
+        port = 3306 if engine == DATABASE_ENGINE_MYSQL else 5432
+    return port
+
+
+def get_database_username(database_config):
+    username = database_config.get(
+        DATABASE_CONFIG_USERNAME)
+    if not username:
+        engine = get_database_engine(database_config)
+        username = (DATABASE_USERNAME_MYSQL_DEFAULT
+                    if engine == DATABASE_ENGINE_MYSQL
+                    else DATABASE_USERNAME_POSTGRES_DEFAULT)
+    return username
+
+
+def get_database_password(database_config):
+    return database_config.get(
+        DATABASE_CONFIG_PASSWORD, DATABASE_PASSWORD_DEFAULT)
+
+
+def is_database_configured(database_config):
+    if not database_config:
+        return False
+    return True if database_config.get(
+        DATABASE_CONFIG_ADDRESS) else False
+
+
+def set_database_config(database_config, database_service):
+    engine, service_addresses = database_service
+    # take one address
+    service_address = service_addresses[0]
+    database_config[DATABASE_CONFIG_ENGINE] = get_validated_engine(engine)
+    database_config[DATABASE_CONFIG_ADDRESS] = service_address[0]
+    database_config[DATABASE_CONFIG_PORT] = service_address[1]
+
+
+def export_database_environment_variables(database_config):
+    if not is_database_configured(database_config):
+        return
+
+    os.environ[DATABASE_ENV_ENABLED] = get_env_string_value(True)
+    os.environ[DATABASE_ENV_ENGINE] = get_database_engine(database_config)
+    os.environ[DATABASE_ENV_ADDRESS] = database_config[DATABASE_CONFIG_ADDRESS]
+    os.environ[DATABASE_ENV_PORT] = str(get_database_port(database_config))
+
+    # The defaults apply to built-in Database runtime.
+    os.environ[DATABASE_ENV_USERNAME] = get_database_username(database_config)
+    os.environ[DATABASE_ENV_PASSWORD] = get_database_password(database_config)
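Reviewer note: a quick sketch (not part of the patch) of how the helpers above resolve engine-specific defaults; config values are hypothetical.

# Illustration only -- default resolution for a minimal database config:
database_config = {"engine": "postgres", "address": "10.0.0.5"}
get_database_engine(database_config)     # -> "postgres"
get_database_port(database_config)       # -> 5432 (engine-specific default)
get_database_username(database_config)   # -> "cloudtik" (postgres default)
get_database_password(database_config)   # -> "cloudtik" (global default)
is_database_configured(database_config)  # -> True (address is set)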
diff --git a/python/cloudtik/core/_private/utils.py b/python/cloudtik/core/_private/utils.py
index 1921b3061..44d3213a2 100644
--- a/python/cloudtik/core/_private/utils.py
+++ b/python/cloudtik/core/_private/utils.py
@@ -3242,22 +3242,6 @@ def run_script(script, script_args, with_output=False):
         raise err
 
 
-def get_database_engine(database_config):
-    engine = database_config.get("engine")
-    if engine and engine != "mysql" and engine != "postgres":
-        raise ValueError(
-            "The database engine type {} is not supported.".format(engine))
-    return database_config.get("engine") or "mysql"
-
-
-def get_database_port(database_config):
-    port = database_config.get("port")
-    if not port:
-        engine = get_database_engine(database_config)
-        port = 3306 if engine == "mysql" else 5432
-    return port
-
-
 def get_runtime_config(config):
     # There key cannot be empty for current implementation
     return config.get(RUNTIME_CONFIG_KEY, {})
diff --git a/python/cloudtik/core/config-schema.json b/python/cloudtik/core/config-schema.json
index a9e211a7f..823e9d34d 100644
--- a/python/cloudtik/core/config-schema.json
+++ b/python/cloudtik/core/config-schema.json
@@ -76,6 +76,33 @@
                 }
             }
         }
+    },
+    "database_connect": {
+        "type": "object",
+        "description": "Database parameters for connecting to an existing database.",
+        "additionalProperties": true,
+        "properties": {
+            "engine": {
+                "type": "string",
+                "description": "Database engine: mysql or postgres."
+            },
+            "address": {
+                "type": "string",
+                "description": "Database server address"
+            },
+            "port": {
+                "type": "number",
+                "description": "Database server port. default: mysql=3306, postgres=5432"
+            },
+            "username": {
+                "type": "string",
+                "description": "Database administrator login name. default: cloudtik"
+            },
+            "password": {
+                "type": "string",
+                "description": "Database administrator login password."
+            }
+        }
     }
 },
 "required": [
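For reference, a config fragment that should validate against the database_connect definition above, shown as a Python dict with hypothetical values:

# Illustration only -- a "database" section matching database_connect:
database = {
    "engine": "mysql",         # mysql or postgres
    "address": "db.internal",  # server address (hypothetical)
    "port": 3306,              # optional; engine default applies
    "username": "root",
    "password": "cloudtik",
}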
@@ -507,14 +534,14 @@
     "description": "AWS RDS for MySQL options",
     "additionalProperties": true,
     "properties": {
-        "server_address": {
-            "type": "string",
-            "description": "AWS RDS server address"
-        },
         "engine": {
             "type": "string",
             "description": "AWS RDS engine: mysql or postgres. default: mysql"
         },
+        "address": {
+            "type": "string",
+            "description": "AWS RDS server address"
+        },
         "port": {
             "type": "number",
             "description": "AWS RDS server port. default: mysql=3306, postgres=5432"
@@ -534,14 +561,14 @@
     "description": "Azure Database for MySQL options",
     "additionalProperties": true,
     "properties": {
-        "server_address": {
-            "type": "string",
-            "description": "Azure Database server address"
-        },
         "engine": {
             "type": "string",
             "description": "Azure Database engine: mysql or postgres. default: mysql"
         },
+        "address": {
+            "type": "string",
+            "description": "Azure Database server address"
+        },
         "port": {
             "type": "number",
             "description": "Azure Database server port. default: mysql=3306, postgres=5432"
@@ -561,14 +588,14 @@
     "description": "GCP Cloud SQL for MySQL options",
     "additionalProperties": true,
     "properties": {
-        "server_address": {
-            "type": "string",
-            "description": "GCP Cloud SQL server address"
-        },
         "engine": {
             "type": "string",
             "description": "GCP Cloud SQL engine: mysql or postgres. default: mysql"
         },
+        "address": {
+            "type": "string",
+            "description": "GCP Cloud SQL server address"
+        },
         "port": {
             "type": "number",
             "description": "GCP Cloud SQL server port. default: mysql=3306, postgres=5432"
@@ -1136,6 +1163,19 @@
             "with_gpu": {
                 "type": "boolean",
                 "description": "Whether to use GPU frameworks and libraries for AI"
+            },
+            "database": {
+                "$ref": "#/definitions/database_connect",
+                "description": "The database parameters. Engine, address, port are optional if using service discovery."
+            },
+            "database_service_discovery": {
+                "type": "boolean",
+                "description": "Whether to discover and use database service in the same workspace.",
+                "default": true
+            },
+            "database_service_selector": {
+                "$ref": "#/definitions/service_selector",
+                "description": "The selector for database service if service discovery is enabled."
             }
         }
     },
@@ -1241,6 +1281,26 @@
             }
         }
     },
+    "metastore": {
+        "type": "object",
+        "description": "Metastore runtime configurations",
+        "additionalProperties": true,
+        "properties": {
+            "database": {
+                "$ref": "#/definitions/database_connect",
+                "description": "The database parameters. Engine, address, port are optional if using service discovery."
+            },
+            "database_service_discovery": {
+                "type": "boolean",
+                "description": "Whether to discover and use database service in the same workspace.",
+                "default": true
+            },
+            "database_service_selector": {
+                "$ref": "#/definitions/service_selector",
+                "description": "The selector for database service if service discovery is enabled."
+            }
+        }
+    },
     "kafka": {
         "type": "object",
         "description": "Kafka runtime configurations",
diff --git a/python/cloudtik/providers/_private/_azure/utils.py b/python/cloudtik/providers/_private/_azure/utils.py
index 1148a99fd..3fa429e55 100644
--- a/python/cloudtik/providers/_private/_azure/utils.py
+++ b/python/cloudtik/providers/_private/_azure/utils.py
@@ -14,11 +14,12 @@
 from azure.mgmt.privatedns import PrivateDnsManagementClient
 
 from cloudtik.core._private.constants import CLOUDTIK_DEFAULT_CLOUD_STORAGE_URI
+from cloudtik.core._private.util.database_utils import get_database_engine, get_database_port
 from cloudtik.core._private.utils import get_storage_config_for_update, get_database_config_for_update, \
-    get_database_engine, get_database_port, get_config_for_update
+    get_config_for_update
 from cloudtik.providers._private._azure.azure_identity_credential_adapter import AzureIdentityCredentialAdapter
 
-AZURE_DATABASE_ENDPOINT = "server_address"
+AZURE_DATABASE_ENDPOINT = "address"
 
 
 def get_azure_sdk_function(client: Any, function_name: str) -> Callable:
diff --git a/python/cloudtik/providers/_private/aws/utils.py b/python/cloudtik/providers/_private/aws/utils.py
index 804024af2..33c34c2c7 100644
--- a/python/cloudtik/providers/_private/aws/utils.py
+++ b/python/cloudtik/providers/_private/aws/utils.py
@@ -8,10 +8,11 @@
 
 from cloudtik.core._private.cli_logger import cli_logger, cf
 from cloudtik.core._private.constants import env_integer, CLOUDTIK_DEFAULT_CLOUD_STORAGE_URI
+from cloudtik.core._private.util.database_utils import get_database_engine, get_database_port
 
 # Max number of retries to AWS (default is 5, time increases exponentially)
 from cloudtik.core._private.utils import get_storage_config_for_update, get_database_config_for_update, \
-    get_database_engine, get_database_port, get_config_for_update
+    get_config_for_update
 
 BOTO_MAX_RETRIES = env_integer("BOTO_MAX_RETRIES", 12)
 
@@ -19,7 +20,7 @@
 BOTO_CREATE_MAX_RETRIES = env_integer("BOTO_CREATE_MAX_RETRIES", 5)
 
 AWS_S3_BUCKET = "s3.bucket"
-AWS_DATABASE_ENDPOINT = "server_address"
+AWS_DATABASE_ENDPOINT = "address"
 
 
 class LazyDefaultDict(defaultdict):
diff --git a/python/cloudtik/providers/_private/gcp/utils.py b/python/cloudtik/providers/_private/gcp/utils.py
index f386cdf29..7a89dc45c 100644
--- a/python/cloudtik/providers/_private/gcp/utils.py
+++ b/python/cloudtik/providers/_private/gcp/utils.py
@@ -10,8 +10,9 @@
 
 from cloudtik.core._private.cli_logger import cli_logger
 from cloudtik.core._private.constants import CLOUDTIK_DEFAULT_CLOUD_STORAGE_URI
+from cloudtik.core._private.util.database_utils import get_database_engine, get_database_port
 from cloudtik.core._private.utils import get_storage_config_for_update, get_database_config_for_update, \
-    get_database_engine, get_database_port, get_config_for_update
+    get_config_for_update
 from cloudtik.providers._private.gcp.node import (GCPNodeType, MAX_POLLS,
                                                   POLL_INTERVAL)
 from cloudtik.providers._private.gcp.node import GCPNode
@@ -28,7 +29,7 @@
     "{account_id}@{project_id}.iam.gserviceaccount.com")
 
 GCP_GCS_BUCKET = "gcs.bucket"
-GCP_DATABASE_ENDPOINT = "server_address"
+GCP_DATABASE_ENDPOINT = "address"
 
 
 def _create_crm(gcp_credentials=None):
diff --git a/python/cloudtik/runtime/ai/utils.py b/python/cloudtik/runtime/ai/utils.py
index aeb8402df..eb19dba15 100644
--- a/python/cloudtik/runtime/ai/utils.py
+++ b/python/cloudtik/runtime/ai/utils.py
@@ -1,14 +1,14 @@
 import os
 from typing import Any, Dict
 
-from cloudtik.core._private.core_utils import get_config_for_update, get_string_value_for_env
+from cloudtik.core._private.core_utils import get_env_string_value
 from cloudtik.core._private.providers import _get_node_provider
 from cloudtik.core._private.runtime_factory import BUILT_IN_RUNTIME_AI
 from cloudtik.core._private.service_discovery.utils import get_canonical_service_name, define_runtime_service_on_head, \
     get_service_discovery_config, SERVICE_DISCOVERY_PROTOCOL_HTTP
-from cloudtik.core._private.utils import export_runtime_flags, RUNTIME_CONFIG_KEY, get_runtime_config
-from cloudtik.runtime.common.service_discovery.discovery import DiscoveryType
-from cloudtik.runtime.common.service_discovery.runtime_discovery import discover_hdfs
+from cloudtik.core._private.utils import export_runtime_flags
+from cloudtik.runtime.common.service_discovery.runtime_discovery import discover_hdfs_on_head, \
+    discover_hdfs_from_workspace, HDFS_URI_KEY
 from cloudtik.runtime.common.service_discovery.workspace import register_service_to_workspace
 from cloudtik.runtime.common.utils import get_runtime_endpoints_of
 
@@ -20,9 +20,6 @@
     ["mlflow.server:app", False, "MLflow", "head"],
 ]
 
-AI_HDFS_NAMENODE_URI_KEY = "hdfs_namenode_uri"
-AI_HDFS_SERVICE_SELECTOR_KEY = "hdfs_service_selector"
-
 MLFLOW_SERVICE_NAME = "mlflow"
 MLFLOW_SERVICE_PORT = 5001
 
@@ -31,55 +28,19 @@ def _get_config(runtime_config: Dict[str, Any]):
     return runtime_config.get(BUILT_IN_RUNTIME_AI, {})
 
 
-def _is_hdfs_service_discovery(ai_config):
-    return ai_config.get("hdfs_service_discovery", True)
-
-
 def _get_runtime_processes():
     return RUNTIME_PROCESSES
 
 
 def _config_depended_services(cluster_config: Dict[str, Any]) -> Dict[str, Any]:
-    runtime_config = get_config_for_update(cluster_config, RUNTIME_CONFIG_KEY)
-    ai_config = get_config_for_update(runtime_config, BUILT_IN_RUNTIME_AI)
-
-    if ai_config.get(AI_HDFS_NAMENODE_URI_KEY) is None:
-        if _is_hdfs_service_discovery(ai_config):
-            hdfs_namenode_uri = discover_hdfs(
-                ai_config, AI_HDFS_SERVICE_SELECTOR_KEY,
-                cluster_config=cluster_config,
-                discovery_type=DiscoveryType.WORKSPACE)
-            if hdfs_namenode_uri is not None:
-                ai_config[AI_HDFS_NAMENODE_URI_KEY] = hdfs_namenode_uri
-
+    cluster_config = discover_hdfs_from_workspace(
+        cluster_config, BUILT_IN_RUNTIME_AI)
     return cluster_config
 
 
 def _prepare_config_on_head(cluster_config: Dict[str, Any]):
-    cluster_config = _discover_hdfs_on_head(cluster_config)
-    return cluster_config
-
-
-def _discover_hdfs_on_head(cluster_config: Dict[str, Any]):
-    runtime_config = get_runtime_config(cluster_config)
-    ai_config = _get_config(runtime_config)
-    if not _is_hdfs_service_discovery(ai_config):
-        return cluster_config
-
-    hdfs_namenode_uri = ai_config.get(AI_HDFS_NAMENODE_URI_KEY)
-    if hdfs_namenode_uri:
-        # HDFS already configured
-        return cluster_config
-
-    # There is service discovery to come here
-    hdfs_namenode_uri = discover_hdfs(
-        ai_config, AI_HDFS_SERVICE_SELECTOR_KEY,
-        cluster_config=cluster_config,
-        discovery_type=DiscoveryType.CLUSTER)
-    if hdfs_namenode_uri:
-        ai_config = get_config_for_update(
-            runtime_config, BUILT_IN_RUNTIME_AI)
-        ai_config[AI_HDFS_NAMENODE_URI_KEY] = hdfs_namenode_uri
+    cluster_config = discover_hdfs_on_head(
+        cluster_config, BUILT_IN_RUNTIME_AI)
     return cluster_config
 
 
@@ -100,12 +61,12 @@ def _configure(runtime_config, head: bool):
     hadoop_default_cluster = ai_config.get(
         "hadoop_default_cluster", False)
     if hadoop_default_cluster:
-        os.environ["HADOOP_DEFAULT_CLUSTER"] = get_string_value_for_env(
+        os.environ["HADOOP_DEFAULT_CLUSTER"] = get_env_string_value(
             hadoop_default_cluster)
 
-    hdfs_namenode_uri = ai_config.get(AI_HDFS_NAMENODE_URI_KEY)
-    if hdfs_namenode_uri:
-        os.environ["HDFS_NAMENODE_URI"] = hdfs_namenode_uri
+    hdfs_uri = ai_config.get(HDFS_URI_KEY)
+    if hdfs_uri:
+        os.environ["HDFS_NAMENODE_URI"] = hdfs_uri
 
 
 def register_service(cluster_config: Dict[str, Any], head_node_id: str) -> None:
diff --git a/python/cloudtik/runtime/common/service_discovery/consul.py b/python/cloudtik/runtime/common/service_discovery/consul.py
index 859587d3e..185c2dbad 100644
--- a/python/cloudtik/runtime/common/service_discovery/consul.py
+++ b/python/cloudtik/runtime/common/service_discovery/consul.py
@@ -4,9 +4,10 @@
 from cloudtik.core._private.service_discovery.utils import SERVICE_SELECTOR_SERVICES, SERVICE_SELECTOR_TAGS, \
     SERVICE_SELECTOR_LABELS, SERVICE_SELECTOR_EXCLUDE_LABELS, SERVICE_DISCOVERY_LABEL_CLUSTER, \
     SERVICE_SELECTOR_RUNTIMES, SERVICE_SELECTOR_CLUSTERS, SERVICE_SELECTOR_EXCLUDE_JOINED_LABELS, \
-    SERVICE_DISCOVERY_TAG_CLUSTER_PREFIX, SERVICE_DISCOVERY_TAG_SYSTEM_PREFIX, ServiceAddressType
+    SERVICE_DISCOVERY_TAG_CLUSTER_PREFIX, SERVICE_DISCOVERY_TAG_SYSTEM_PREFIX, ServiceAddressType, \
+    SERVICE_DISCOVERY_LABEL_RUNTIME
 from cloudtik.core._private.util.rest_api import rest_api_get_json
-
+from cloudtik.runtime.common.service_discovery.utils import ServiceInstance
 
 REST_ENDPOINT_URL_FORMAT = "http://{}:{}{}"
 REST_ENDPOINT_CATALOG = "/v1/catalog"
@@ -173,6 +174,14 @@ def get_service_cluster_of_node(service_node):
     return service_meta.get(SERVICE_DISCOVERY_LABEL_CLUSTER)
 
 
+def get_service_runtime_of_node(service_node):
+    # This is specific to our service implementation:
+    # each service will be labeled with its runtime name
+    service_meta = service_node.get(
+        "ServiceMeta", {})
+    return service_meta.get(SERVICE_DISCOVERY_LABEL_RUNTIME)
+
+
 def get_service_dns_name(
         service_name, service_tag=None, service_cluster=None):
     if service_tag and service_cluster:
@@ -229,39 +238,64 @@ def get_service_fqdn_address(service_name, service_tags):
     return get_service_dns_name(service_name, service_tag)
 
 
-def query_one_service_nodes_from_consul(
-        service_selector, address: Optional[Tuple[str, int]] = None):
+def _get_cluster_of_service_nodes(service_nodes):
+    cluster_names = set()
+    for service_node in service_nodes:
+        cluster_name = get_service_cluster_of_node(service_node)
+        if cluster_name:
+            cluster_names.add(cluster_name)
+    if len(cluster_names) > 1:
+        return None
+    return next(iter(cluster_names), None)
+
+
+def query_one_service_from_consul(
+        service_selector,
+        address_type: ServiceAddressType = ServiceAddressType.NODE_IP,
+        address: Optional[Tuple[str, int]] = None):
     services = query_services(service_selector, address=address)
     if not services:
         return None
-
-    for service_name in services:
-        return service_name, query_service_nodes(
-            service_name, service_selector, address=address)
-
-    return None
+    service_name, service_tags = next(iter(services.items()))
+    service_instance = query_service_from_consul(
+        service_name, service_tags, service_selector,
+        address_type=address_type, address=address
+    )
+    return service_instance
 
 
-def query_one_service_from_consul(
-        service_selector, address_type: ServiceAddressType = ServiceAddressType.NODE_IP,
+def query_service_from_consul(
+        service_name, service_tags, service_selector,
+        address_type: ServiceAddressType = ServiceAddressType.NODE_IP,
         address: Optional[Tuple[str, int]] = None):
+    service_nodes = query_service_nodes(
+        service_name, service_selector, address=address)
+    if not service_nodes:
+        return None
+
+    # the runtime of the service nodes should be the same
+    service_node = service_nodes[0]
+    runtime_type = get_service_runtime_of_node(service_node)
+
+    # WARNING: the cluster of the service nodes may not be the same. None if not the same
+    cluster_name = _get_cluster_of_service_nodes(service_nodes)
+
     if address_type == ServiceAddressType.SERVICE_FQDN:
         # return service FQDN
-        services = query_services(service_selector, address=address)
-        if not services:
-            return None
-        service_name, service_tags = next(iter(services.items()))
-        return get_service_fqdn_address(service_name, service_tags)
+        # TODO: the service tags include tags from all nodes?
+        service_addresses = [get_service_fqdn_address(
+            service_name, service_tags)]
     else:
         # return service nodes IP or nodes FQDN
-        service_and_nodes = query_one_service_nodes_from_consul(
-            service_selector, address=address)
-        if not service_and_nodes:
-            return None
-        service_name, service_nodes = service_and_nodes
-        return service_name, [get_service_address_of_node(
+        service_addresses = [get_service_address_of_node(
             service_node, address_type=address_type
         ) for service_node in service_nodes]
 
+    return ServiceInstance(
+        service_name, service_addresses,
+        runtime_type=runtime_type,
+        cluster_name=cluster_name)
+
 
 def query_services_from_consul(
         service_selector, address_type: ServiceAddressType = ServiceAddressType.NODE_IP,
@@ -271,16 +305,12 @@
         return None
 
     services_to_return = {}
-    if address_type == ServiceAddressType.SERVICE_FQDN:
-        # return service FQDN
-        for service_name, service_tags in services.items():
-            services_to_return[service_name] = get_service_fqdn_address(
-                service_name, service_tags)
-    else:
-        # return service nodes IP or nodes FQDN
-        for service_name, service_tags in services.items():
-            service_nodes = query_service_nodes(
-                service_name, service_selector, address=address)
-            services_to_return[service_name] = [get_service_address_of_node(
-                service_node, address_type=address_type) for service_node in service_nodes]
+    for service_name, service_tags in services.items():
+        service_instance = query_service_from_consul(
+            service_name, service_tags, service_selector,
+            address_type=address_type, address=address
+        )
+        if service_instance:
+            services_to_return[service_name] = service_instance
+
     return services_to_return
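A sketch of how a caller consumes the reworked query path; selector contents are hypothetical and a reachable Consul agent is assumed:

# Illustration only -- consuming the ServiceInstance returned above:
selector = {"runtimes": ["mysql", "postgres"]}
instance = query_one_service_from_consul(selector)
if instance:
    print(instance.service_name,       # e.g. "mysql"
          instance.runtime_type,       # runtime label of the nodes
          instance.cluster_name,       # None if nodes span clusters
          instance.service_addresses)  # e.g. [("10.0.0.8", 3306)]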
diff --git a/python/cloudtik/runtime/common/service_discovery/runtime_discovery.py b/python/cloudtik/runtime/common/service_discovery/runtime_discovery.py
index 3e96d58c8..58a21bd36 100644
--- a/python/cloudtik/runtime/common/service_discovery/runtime_discovery.py
+++ b/python/cloudtik/runtime/common/service_discovery/runtime_discovery.py
@@ -1,30 +1,91 @@
-from typing import Dict, Any
+from typing import Dict, Any, Union, List
 
+from cloudtik.core._private.core_utils import get_config_for_update
 from cloudtik.core._private.runtime_factory import BUILT_IN_RUNTIME_HDFS, BUILT_IN_RUNTIME_METASTORE, \
-    BUILT_IN_RUNTIME_CONSUL, BUILT_IN_RUNTIME_ZOOKEEPER
-from cloudtik.core._private.service_discovery.utils import get_service_selector_for_update, include_runtime_for_selector
-from cloudtik.runtime.common.service_discovery.discovery import query_one_service
+    BUILT_IN_RUNTIME_CONSUL, BUILT_IN_RUNTIME_ZOOKEEPER, BUILT_IN_RUNTIME_MYSQL, BUILT_IN_RUNTIME_POSTGRES
+from cloudtik.core._private.service_discovery.utils import get_service_selector_for_update, \
+    include_runtime_for_selector, include_feature_for_selector
+from cloudtik.core._private.util.database_utils import is_database_configured, set_database_config
+from cloudtik.core._private.utils import get_runtime_config
+from cloudtik.runtime.common.service_discovery.cluster import has_runtime_in_cluster
+from cloudtik.runtime.common.service_discovery.discovery import query_one_service, DiscoveryType
 from cloudtik.runtime.common.service_discovery.utils import get_service_addresses_string
 
+BUILT_IN_DATABASE_RUNTIMES = [BUILT_IN_RUNTIME_MYSQL, BUILT_IN_RUNTIME_POSTGRES]
 
-def discover_runtime_service_addresses(
+HDFS_URI_KEY = "hdfs_namenode_uri"
+HDFS_SERVICE_DISCOVERY_KEY = "hdfs_service_discovery"
+HDFS_SERVICE_SELECTOR_KEY = "hdfs_service_selector"
+
+METASTORE_URI_KEY = "hive_metastore_uri"
+METASTORE_SERVICE_DISCOVERY_KEY = "metastore_service_discovery"
+METASTORE_SERVICE_SELECTOR_KEY = "metastore_service_selector"
+
+ZOOKEEPER_CONNECT_KEY = "zookeeper_connect"
+ZOOKEEPER_SERVICE_DISCOVERY_KEY = "zookeeper_service_discovery"
+ZOOKEEPER_SERVICE_SELECTOR_KEY = "zookeeper_service_selector"
+
+DATABASE_CONNECT_KEY = "database"
+DATABASE_SERVICE_DISCOVERY_KEY = "database_service_discovery"
+DATABASE_SERVICE_SELECTOR_KEY = "database_service_selector"
+
+
+def has_database_runtime_in_cluster(runtime_config):
+    for runtime_type in BUILT_IN_DATABASE_RUNTIMES:
+        if has_runtime_in_cluster(runtime_config, runtime_type):
+            return True
+    return False
+
+
+def discover_runtime_service(
         config: Dict[str, Any],
         service_selector_key: str,
-        runtime_type: str,
+        runtime_type: Union[str, List[str]],
         cluster_config: Dict[str, Any],
-        discovery_type,):
+        discovery_type: DiscoveryType,):
     service_selector = get_service_selector_for_update(
         config, service_selector_key)
+    # if the user provides runtimes in the selector, we don't override them
+    # because any runtimes in the list will be selected
     service_selector = include_runtime_for_selector(
        service_selector, runtime_type)
-    service = query_one_service(
+    service_instance = query_one_service(
+        cluster_config, service_selector,
+        discovery_type=discovery_type)
+    return service_instance
+
+
+def discover_runtime_service_addresses(
+        config: Dict[str, Any],
+        service_selector_key: str,
+        runtime_type: Union[str, List[str]],
+        cluster_config: Dict[str, Any],
+        discovery_type: DiscoveryType,):
+    service_instance = discover_runtime_service(
+        config, service_selector_key,
+        runtime_type, cluster_config, discovery_type)
+    if not service_instance:
+        return None
+    return service_instance.service_addresses
+
+
+def discover_runtime_service_addresses_by_feature(
+        config: Dict[str, Any],
+        service_selector_key: str,
+        feature: str,
+        cluster_config: Dict[str, Any],
+        discovery_type: DiscoveryType,):
+    service_selector = get_service_selector_for_update(
+        config, service_selector_key)
+    # WARNING: feature selecting doesn't work for workspace service registry
+    service_selector = include_feature_for_selector(
+        service_selector, feature)
+    service_instance = query_one_service(
         cluster_config, service_selector,
         discovery_type=discovery_type)
-    if not service:
+    if not service_instance:
         return None
-    # service include service name and service addresses
-    _, service_addresses = service
-    return service_addresses
+    return service_instance.service_addresses
 
 
 def discover_consul(
@@ -90,6 +151,281 @@ def discover_metastore(
         return None
     # take one of them
     service_address = service_addresses[0]
-    hive_metastore_uri = "thrift://{}:{}".format(
+    metastore_uri = "thrift://{}:{}".format(
         service_address[0], service_address[1])
-    return hive_metastore_uri
+    return metastore_uri
+
+
+def discover_database(
+        config: Dict[str, Any],
+        service_selector_key: str,
+        cluster_config: Dict[str, Any],
+        discovery_type):
+    # TODO: because feature tag is not supported for workspace-based discovery,
+    #  use a list of database runtimes here.
+    service_instance = discover_runtime_service(
+        config, service_selector_key,
+        runtime_type=BUILT_IN_DATABASE_RUNTIMES,
+        cluster_config=cluster_config,
+        discovery_type=discovery_type,
+    )
+    if not service_instance:
+        return None
+    engine = service_instance.runtime_type
+    return engine, service_instance.service_addresses
+
+
+"""
+Common helper functions for runtime service discovery used by runtimes.
+To use these common functions, some conventions are used.
+The conventions to follow for each function are explained per function.
+"""
+
+
+"""
+HDFS service discovery conventions:
+    1. The hdfs service discovery flag is stored at HDFS_SERVICE_DISCOVERY_KEY defined above
+    2. The hdfs service selector is stored at HDFS_SERVICE_SELECTOR_KEY defined above
+    3. The hdfs uri is stored at HDFS_URI_KEY defined above
+"""
+
+
+def is_hdfs_service_discovery(runtime_type_config):
+    return runtime_type_config.get(HDFS_SERVICE_DISCOVERY_KEY, True)
+
+
+def discover_hdfs_from_workspace(
+        cluster_config: Dict[str, Any], runtime_type):
+    runtime_config = get_runtime_config(cluster_config)
+    runtime_type_config = runtime_config.get(runtime_type, {})
+    if (runtime_type_config.get(HDFS_URI_KEY) or
+            not is_hdfs_service_discovery(runtime_type_config)):
+        return cluster_config
+
+    hdfs_uri = discover_hdfs(
+        runtime_type_config, HDFS_SERVICE_SELECTOR_KEY,
+        cluster_config=cluster_config,
+        discovery_type=DiscoveryType.WORKSPACE)
+    if hdfs_uri:
+        runtime_type_config = get_config_for_update(
+            runtime_config, runtime_type)
+        runtime_type_config[HDFS_URI_KEY] = hdfs_uri
+    return cluster_config
+
+
+def discover_hdfs_on_head(
+        cluster_config: Dict[str, Any], runtime_type):
+    runtime_config = get_runtime_config(cluster_config)
+    runtime_type_config = runtime_config.get(runtime_type, {})
+    if not is_hdfs_service_discovery(runtime_type_config):
+        return cluster_config
+
+    hdfs_uri = runtime_type_config.get(HDFS_URI_KEY)
+    if hdfs_uri:
+        # HDFS already configured
+        return cluster_config
+
+    # Reaching here means we can try service discovery
+    hdfs_uri = discover_hdfs(
+        runtime_type_config, HDFS_SERVICE_SELECTOR_KEY,
+        cluster_config=cluster_config,
+        discovery_type=DiscoveryType.CLUSTER)
+    if hdfs_uri:
+        runtime_type_config = get_config_for_update(
+            runtime_config, runtime_type)
+        runtime_type_config[HDFS_URI_KEY] = hdfs_uri
+    return cluster_config
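Reviewer note: the intended wiring for these HDFS helpers, mirroring what the ai and flink runtimes do later in this patch (flink used as the example consumer):

# Illustration only -- how a runtime hooks the HDFS discovery helpers:
def _config_depended_services(cluster_config):
    # workspace-level discovery before launch
    return discover_hdfs_from_workspace(
        cluster_config, BUILT_IN_RUNTIME_FLINK)


def _prepare_config_on_head(cluster_config):
    # cluster-level discovery once the head is up
    return discover_hdfs_on_head(
        cluster_config, BUILT_IN_RUNTIME_FLINK)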
+
+
+"""
+Metastore service discovery conventions:
+    1. The metastore service discovery flag is stored at METASTORE_SERVICE_DISCOVERY_KEY defined above
+    2. The metastore service selector is stored at METASTORE_SERVICE_SELECTOR_KEY defined above
+    3. The metastore uri is stored at METASTORE_URI_KEY defined above
+"""
+
+
+def is_metastore_service_discovery(runtime_type_config):
+    return runtime_type_config.get(METASTORE_SERVICE_DISCOVERY_KEY, True)
+
+
+def discover_metastore_from_workspace(
+        cluster_config: Dict[str, Any], runtime_type):
+    runtime_config = get_runtime_config(cluster_config)
+    runtime_type_config = runtime_config.get(runtime_type, {})
+    if (runtime_type_config.get(METASTORE_URI_KEY) or
+            has_runtime_in_cluster(
+                runtime_config, BUILT_IN_RUNTIME_METASTORE) or
+            not is_metastore_service_discovery(runtime_type_config)):
+        return cluster_config
+
+    metastore_uri = discover_metastore(
+        runtime_type_config, METASTORE_SERVICE_SELECTOR_KEY,
+        cluster_config=cluster_config,
+        discovery_type=DiscoveryType.WORKSPACE)
+    if metastore_uri:
+        runtime_type_config = get_config_for_update(
+            runtime_config, runtime_type)
+        runtime_type_config[METASTORE_URI_KEY] = metastore_uri
+
+    return cluster_config
+
+
+def discover_metastore_on_head(
+        cluster_config: Dict[str, Any], runtime_type):
+    runtime_config = get_runtime_config(cluster_config)
+    runtime_type_config = runtime_config.get(runtime_type, {})
+    if not is_metastore_service_discovery(runtime_type_config):
+        return cluster_config
+
+    metastore_uri = runtime_type_config.get(METASTORE_URI_KEY)
+    if metastore_uri:
+        # Metastore already configured
+        return cluster_config
+
+    if has_runtime_in_cluster(
+            runtime_config, BUILT_IN_RUNTIME_METASTORE):
+        # There is a metastore
+        return cluster_config
+
+    # Reaching here means we can try service discovery
+    metastore_uri = discover_metastore(
+        runtime_type_config, METASTORE_SERVICE_SELECTOR_KEY,
+        cluster_config=cluster_config,
+        discovery_type=DiscoveryType.CLUSTER)
+    if metastore_uri:
+        runtime_type_config = get_config_for_update(
+            runtime_config, runtime_type)
+        runtime_type_config[METASTORE_URI_KEY] = metastore_uri
+    return cluster_config
+
+
+"""
+Zookeeper service discovery conventions:
+    1. The Zookeeper service discovery flag is stored at ZOOKEEPER_SERVICE_DISCOVERY_KEY defined above
+    2. The Zookeeper service selector is stored at ZOOKEEPER_SERVICE_SELECTOR_KEY defined above
+    3. The Zookeeper connect is stored at ZOOKEEPER_CONNECT_KEY defined above
+"""
+
+
+def is_zookeeper_service_discovery(runtime_type_config):
+    return runtime_type_config.get(ZOOKEEPER_SERVICE_DISCOVERY_KEY, True)
+
+
+def discover_zookeeper_from_workspace(
+        cluster_config: Dict[str, Any], runtime_type):
+    runtime_config = get_runtime_config(cluster_config)
+    runtime_type_config = runtime_config.get(runtime_type, {})
+
+    # Discover zookeeper through workspace
+    if (runtime_type_config.get(ZOOKEEPER_CONNECT_KEY) or
+            has_runtime_in_cluster(runtime_config, BUILT_IN_RUNTIME_ZOOKEEPER) or
+            not is_zookeeper_service_discovery(runtime_type_config)):
+        return cluster_config
+
+    zookeeper_uri = discover_zookeeper(
+        runtime_type_config, ZOOKEEPER_SERVICE_SELECTOR_KEY,
+        cluster_config=cluster_config,
+        discovery_type=DiscoveryType.WORKSPACE)
+    if zookeeper_uri is not None:
+        runtime_type_config = get_config_for_update(
+            runtime_config, runtime_type)
+        runtime_type_config[ZOOKEEPER_CONNECT_KEY] = zookeeper_uri
+
+    return cluster_config
+
+
+def discover_zookeeper_on_head(
+        cluster_config: Dict[str, Any], runtime_type):
+    runtime_config = get_runtime_config(cluster_config)
+    runtime_type_config = runtime_config.get(runtime_type, {})
+
+    zookeeper_uri = runtime_type_config.get(ZOOKEEPER_CONNECT_KEY)
+    if zookeeper_uri:
+        # Zookeeper already configured
+        return cluster_config
+
+    if has_runtime_in_cluster(
+            runtime_config, BUILT_IN_RUNTIME_ZOOKEEPER):
+        # There is a local Zookeeper
+        return cluster_config
+
+    # Reaching here means we can try service discovery
+    zookeeper_uri = discover_zookeeper(
+        runtime_type_config, ZOOKEEPER_SERVICE_SELECTOR_KEY,
+        cluster_config=cluster_config,
+        discovery_type=DiscoveryType.CLUSTER)
+    if zookeeper_uri:
+        runtime_type_config = get_config_for_update(
+            runtime_config, runtime_type)
+        runtime_type_config[ZOOKEEPER_CONNECT_KEY] = zookeeper_uri
+    return cluster_config
+
+
+"""
+Database service discovery conventions:
+    1. The Database service discovery flag is stored at DATABASE_SERVICE_DISCOVERY_KEY defined above
+    2. The Database service selector is stored at DATABASE_SERVICE_SELECTOR_KEY defined above
+    3. The Database connect is stored at DATABASE_CONNECT_KEY defined above
+"""
+
+
+def is_database_service_discovery(runtime_type_config):
+    return runtime_type_config.get(DATABASE_SERVICE_DISCOVERY_KEY, True)
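For context, the connect string stored under ZOOKEEPER_CONNECT_KEY is assembled by get_service_addresses_string; a sketch with hypothetical addresses:

# Illustration only -- address list to connect-string conversion:
service_addresses = [("10.0.0.3", 2181), ("10.0.0.4", 2181)]
get_service_addresses_string(service_addresses)
# -> "10.0.0.3:2181,10.0.0.4:2181"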
+
+
+def discover_database_from_workspace(
+        cluster_config: Dict[str, Any], runtime_type):
+    runtime_config = get_runtime_config(cluster_config)
+    runtime_type_config = runtime_config.get(runtime_type, {})
+    database_config = runtime_type_config.get(DATABASE_CONNECT_KEY, {})
+
+    # Database check order:
+    # 1. if there is a configured database
+    # 2. if there is a database runtime in the same cluster
+    # 3. if there is a cloud database configured (unless disabled by use_managed_database)
+    # 4. if there is a database that can be discovered
+    if (is_database_configured(database_config) or
+            has_database_runtime_in_cluster(runtime_config) or
+            not is_database_service_discovery(runtime_type_config)):
+        return cluster_config
+
+    database_service = discover_database(
+        runtime_type_config, DATABASE_SERVICE_SELECTOR_KEY,
+        cluster_config=cluster_config,
+        discovery_type=DiscoveryType.WORKSPACE)
+    if database_service:
+        runtime_type_config = get_config_for_update(
+            runtime_config, runtime_type)
+        database_config = get_config_for_update(
+            runtime_type_config, DATABASE_CONNECT_KEY)
+        set_database_config(database_config, database_service)
+
+    return cluster_config
+
+
+def discover_database_on_head(
+        cluster_config: Dict[str, Any], runtime_type):
+    runtime_config = get_runtime_config(cluster_config)
+    runtime_type_config = runtime_config.get(runtime_type, {})
+    if not is_database_service_discovery(runtime_type_config):
+        return cluster_config
+
+    database_config = runtime_type_config.get(DATABASE_CONNECT_KEY, {})
+    if is_database_configured(database_config):
+        # Database already configured
+        return cluster_config
+
+    # Reaching here means we can try service discovery
+    database_service = discover_database(
+        runtime_type_config, DATABASE_SERVICE_SELECTOR_KEY,
+        cluster_config=cluster_config,
+        discovery_type=DiscoveryType.CLUSTER)
+    if database_service:
+        runtime_type_config = get_config_for_update(
+            runtime_config, runtime_type)
+        database_config = get_config_for_update(
+            runtime_type_config, DATABASE_CONNECT_KEY)
+        set_database_config(database_config, database_service)
+    return cluster_config
diff --git a/python/cloudtik/runtime/common/service_discovery/utils.py b/python/cloudtik/runtime/common/service_discovery/utils.py
index 3e6b04059..bed0c5397 100644
--- a/python/cloudtik/runtime/common/service_discovery/utils.py
+++ b/python/cloudtik/runtime/common/service_discovery/utils.py
@@ -1,8 +1,21 @@
+from cloudtik.core._private.core_utils import get_address_string
+
+
+class ServiceInstance:
+    """A service instance returned by the discovery process"""
+    def __init__(
+            self, service_name, service_addresses,
+            runtime_type, cluster_name=None, tags=None):
+        self.service_name = service_name
+        self.service_addresses = service_addresses
+        self.runtime_type = runtime_type
+        self.cluster_name = cluster_name
+        self.tags = tags
 
 
 def get_service_addresses_string(service_addresses):
     # allow two format: host,host,host or host:port,host:port
-    return ",".join(["{}:{}".format(
+    return ",".join([get_address_string(
         service_address[0], service_address[1]
     ) if service_address[1] else service_address[0]
         for service_address in service_addresses])
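A sketch of the new ServiceInstance in use; field values are hypothetical:

# Illustration only -- constructing and reading a ServiceInstance:
instance = ServiceInstance(
    "metastore", [("10.0.0.6", 9083)],
    runtime_type="metastore", cluster_name="analytics")
get_service_addresses_string(instance.service_addresses)  # -> "10.0.0.6:9083"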
diff --git a/python/cloudtik/runtime/common/service_discovery/workspace.py b/python/cloudtik/runtime/common/service_discovery/workspace.py
index 7bc5f9f35..81567f33e 100644
--- a/python/cloudtik/runtime/common/service_discovery/workspace.py
+++ b/python/cloudtik/runtime/common/service_discovery/workspace.py
@@ -11,7 +11,7 @@
 from cloudtik.core._private.utils import RUNTIME_CONFIG_KEY
 from cloudtik.core._private.workspace.workspace_operator import _get_workspace_provider
 from cloudtik.runtime.common.service_discovery.utils import get_service_addresses_string, \
-    get_service_addresses_from_string
+    get_service_addresses_from_string, ServiceInstance
 
 
 def get_service_registry_name(
@@ -101,18 +101,19 @@ def query_one_service_from_workspace(
         return None
 
     # match through the clusters, runtimes, and services if they are provided
-    services = _query_one_service_registry(
+    service = _query_one_service_registry(
         global_variables, service_selector)
-    if not services:
-        return None
-    # return one of them
-    return next(iter(services.items()))
+    return service
 
 
 def _query_one_service_registry(
         service_registries, service_selector):
-    return _query_service_registry(
+    matched_services = _query_service_registry(
         service_registries, service_selector, first_match=True)
+    if not matched_services:
+        return None
+
+    return next(iter(matched_services.values()))
 
 
 def _query_service_registry(
@@ -131,8 +132,13 @@
         if _match_service_registry(
                 registry_name, clusters, runtimes, services,
                 tags=tags, labels=labels, exclude_labels=exclude_labels):
-            matched_services[registry_name] = get_service_addresses_from_string(
+            cluster_name, runtime_type, service_name = parse_service_registry_name(
+                registry_name)
+            service_addresses = get_service_addresses_from_string(
                 registry_addresses)
+            matched_services[registry_name] = ServiceInstance(
+                service_name, service_addresses,
+                runtime_type=runtime_type, cluster_name=cluster_name)
             if first_match:
                 return matched_services
     return matched_services
diff --git a/python/cloudtik/runtime/flink/utils.py b/python/cloudtik/runtime/flink/utils.py
index c46f6c0e2..a927aba4b 100644
--- a/python/cloudtik/runtime/flink/utils.py
+++ b/python/cloudtik/runtime/flink/utils.py
@@ -3,7 +3,7 @@
 
 from cloudtik.core._private.cluster.cluster_config import _load_cluster_config
 from cloudtik.core._private.cluster.cluster_tunnel_request import _request_rest_to_head
-from cloudtik.core._private.core_utils import double_quote, get_string_value_for_env
+from cloudtik.core._private.core_utils import double_quote, get_env_string_value
 from cloudtik.core._private.runtime_factory import BUILT_IN_RUNTIME_HDFS, BUILT_IN_RUNTIME_METASTORE, \
     BUILT_IN_RUNTIME_FLINK
 from cloudtik.core._private.service_discovery.runtime_services import get_service_discovery_runtime
@@ -14,8 +14,9 @@
     print_json_formatted, get_config_for_update, get_runtime_config
 from cloudtik.core.scaling_policy import ScalingPolicy
 from cloudtik.runtime.common.service_discovery.cluster import has_runtime_in_cluster
-from cloudtik.runtime.common.service_discovery.discovery import DiscoveryType
-from cloudtik.runtime.common.service_discovery.runtime_discovery import discover_hdfs, discover_metastore
+from cloudtik.runtime.common.service_discovery.runtime_discovery import \
+    discover_hdfs_from_workspace, discover_metastore_from_workspace, discover_hdfs_on_head, discover_metastore_on_head, \
+    is_hdfs_service_discovery, HDFS_URI_KEY, METASTORE_URI_KEY
 from cloudtik.runtime.common.utils import get_runtime_endpoints_of, get_runtime_default_storage_of
 from cloudtik.runtime.flink.scaling_policy import FlinkScalingPolicy
 
@@ -29,11 +30,6 @@
     ["proc_nodemanager", False, "NodeManager", "worker"],
 ]
 
-FLINK_HDFS_NAMENODE_URI_KEY = "hdfs_namenode_uri"
-FLINK_HIVE_METASTORE_URI_KEY = "hive_metastore_uri"
-FLINK_HDFS_SERVICE_SELECTOR_KEY = "hdfs_service_selector"
-FLINK_METASTORE_SERVICE_SELECTOR_KEY = "metastore_service_selector"
-
 YARN_RESOURCE_MEMORY_RATIO = 0.8
 FLINK_TASKMANAGER_MEMORY_RATIO = 1
 
@@ -57,14 +53,6 @@ def _get_config(runtime_config: Dict[str, Any]):
     return runtime_config.get(BUILT_IN_RUNTIME_FLINK, {})
 
 
-def _is_metastore_service_discovery(flink_config):
-    return flink_config.get("metastore_service_discovery", True)
-
-
-def _is_hdfs_service_discovery(spark_config):
-    return spark_config.get("hdfs_service_discovery", True)
-
-
 def get_yarn_resource_memory_ratio(cluster_config: Dict[str, Any]):
     yarn_resource_memory_ratio = YARN_RESOURCE_MEMORY_RATIO
     flink_config = cluster_config.get(RUNTIME_CONFIG_KEY, {}).get(BUILT_IN_RUNTIME_FLINK, {})
@@ -128,41 +116,19 @@ def _get_cluster_resources(
 
 
 def _config_depended_services(cluster_config: Dict[str, Any]) -> Dict[str, Any]:
-    runtime_config = get_config_for_update(cluster_config, RUNTIME_CONFIG_KEY)
-    flink_config = get_config_for_update(runtime_config, BUILT_IN_RUNTIME_FLINK)
-
-    # We now support co-existence of local HDFS and remote HDFS
-    # 1) Try to use local hdfs first;
-    # 2) Try to use defined hdfs_namenode_uri;
-    # 3) Try to discover HDFS service through any service discovery available
-
-    if flink_config.get(FLINK_HDFS_NAMENODE_URI_KEY) is None:
-        if _is_hdfs_service_discovery(flink_config):
-            hdfs_namenode_uri = discover_hdfs(
-                flink_config, FLINK_HDFS_SERVICE_SELECTOR_KEY,
-                cluster_config=cluster_config,
-                discovery_type=DiscoveryType.WORKSPACE)
-            if hdfs_namenode_uri:
-                flink_config[FLINK_HDFS_NAMENODE_URI_KEY] = hdfs_namenode_uri
-
-    # Check metastore
-    if (not flink_config.get(FLINK_HIVE_METASTORE_URI_KEY) and
-            not has_runtime_in_cluster(
-                runtime_config, BUILT_IN_RUNTIME_METASTORE)):
-        if _is_metastore_service_discovery(flink_config):
-            hive_metastore_uri = discover_metastore(
-                flink_config, FLINK_METASTORE_SERVICE_SELECTOR_KEY,
-                cluster_config=cluster_config,
-                discovery_type=DiscoveryType.WORKSPACE)
-            if hive_metastore_uri:
-                flink_config[FLINK_HIVE_METASTORE_URI_KEY] = hive_metastore_uri
+    cluster_config = discover_hdfs_from_workspace(
+        cluster_config, BUILT_IN_RUNTIME_FLINK)
+    cluster_config = discover_metastore_from_workspace(
+        cluster_config, BUILT_IN_RUNTIME_FLINK)
     return cluster_config
 
 
 def _prepare_config_on_head(cluster_config: Dict[str, Any]):
-    cluster_config = _discover_hdfs_on_head(cluster_config)
-    cluster_config = _discover_metastore_on_head(cluster_config)
+    cluster_config = discover_hdfs_on_head(
+        cluster_config, BUILT_IN_RUNTIME_FLINK)
+    cluster_config = discover_metastore_on_head(
+        cluster_config, BUILT_IN_RUNTIME_FLINK)
 
     # call validate config to fail earlier
     _validate_config(cluster_config, final=True)
@@ -170,57 +136,6 @@
     return cluster_config
 
 
-def _discover_hdfs_on_head(cluster_config: Dict[str, Any]):
-    runtime_config = get_runtime_config(cluster_config)
-    flink_config = _get_config(runtime_config)
-    if not _is_hdfs_service_discovery(flink_config):
-        return cluster_config
-
-    hdfs_namenode_uri = flink_config.get(FLINK_HDFS_NAMENODE_URI_KEY)
-    if hdfs_namenode_uri:
-        # HDFS already configured
-        return cluster_config
-
-    # There is service discovery to come here
-    hdfs_namenode_uri = discover_hdfs(
-        flink_config, FLINK_HDFS_SERVICE_SELECTOR_KEY,
-        cluster_config=cluster_config,
-        discovery_type=DiscoveryType.CLUSTER)
-    if hdfs_namenode_uri:
-        flink_config = get_config_for_update(
-            runtime_config, BUILT_IN_RUNTIME_FLINK)
-        flink_config[FLINK_HDFS_NAMENODE_URI_KEY] = hdfs_namenode_uri
-    return cluster_config
-
-
-def _discover_metastore_on_head(cluster_config: Dict[str, Any]):
-    runtime_config = get_runtime_config(cluster_config)
-    flink_config = _get_config(runtime_config)
-    if not _is_metastore_service_discovery(flink_config):
-        return cluster_config
-
-    hive_metastore_uri = flink_config.get(FLINK_HIVE_METASTORE_URI_KEY)
-    if hive_metastore_uri:
-        # Metastore already configured
-        return cluster_config
-
-    if has_runtime_in_cluster(
-            runtime_config, BUILT_IN_RUNTIME_METASTORE):
-        # There is a metastore
-        return cluster_config
-
-    # There is service discovery to come here
-    hive_metastore_uri = discover_metastore(
-        flink_config, FLINK_METASTORE_SERVICE_SELECTOR_KEY,
-        cluster_config=cluster_config,
-        discovery_type=DiscoveryType.CLUSTER)
-    if hive_metastore_uri:
-        flink_config = get_config_for_update(
-            runtime_config, BUILT_IN_RUNTIME_FLINK)
-        flink_config[FLINK_HIVE_METASTORE_URI_KEY] = hive_metastore_uri
-    return cluster_config
-
-
 def _config_runtime_resources(cluster_config: Dict[str, Any]) -> Dict[str, Any]:
     cluster_resource = _get_cluster_resources(cluster_config)
     container_resource = {"yarn_container_maximum_vcores": cluster_resource["worker_cpu"]}
@@ -342,16 +257,16 @@ def _configure(runtime_config, head: bool):
     hadoop_default_cluster = flink_config.get(
         "hadoop_default_cluster", False)
     if hadoop_default_cluster:
-        os.environ["HADOOP_DEFAULT_CLUSTER"] = get_string_value_for_env(
+        os.environ["HADOOP_DEFAULT_CLUSTER"] = get_env_string_value(
            hadoop_default_cluster)
 
-    hdfs_namenode_uri = flink_config.get(FLINK_HDFS_NAMENODE_URI_KEY)
-    if hdfs_namenode_uri:
-        os.environ["HDFS_NAMENODE_URI"] = hdfs_namenode_uri
+    hdfs_uri = flink_config.get(HDFS_URI_KEY)
+    if hdfs_uri:
+        os.environ["HDFS_NAMENODE_URI"] = hdfs_uri
 
-    hive_metastore_uri = flink_config.get(FLINK_HIVE_METASTORE_URI_KEY)
-    if hive_metastore_uri:
-        os.environ["HIVE_METASTORE_URI"] = hive_metastore_uri
+    metastore_uri = flink_config.get(METASTORE_URI_KEY)
+    if metastore_uri:
+        os.environ["HIVE_METASTORE_URI"] = metastore_uri
 
 
 def get_runtime_logs():
@@ -373,7 +288,7 @@ def _is_valid_storage_config(config: Dict[str, Any], final=False):
         return True
     # check if there is remote HDFS configured
     flink_config = _get_config(runtime_config)
-    if flink_config.get(FLINK_HDFS_NAMENODE_URI_KEY) is not None:
+    if flink_config.get(HDFS_URI_KEY) is not None:
         return True
 
     # Check any cloud storage is configured
@@ -382,7 +297,7 @@
         return True
 
     # if there is service discovery mechanism, assume we can get from service discovery
-    if (not final and _is_hdfs_service_discovery(flink_config) and
+    if (not final and is_hdfs_service_discovery(flink_config) and
             get_service_discovery_runtime(runtime_config)):
         return True
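Behavior of get_env_string_value, which the flink and ai _configure functions now use for environment export; a sketch based on the core_utils change above:

# Illustration only -- get_env_string_value semantics:
get_env_string_value("hdfs://head:9000")  # -> "hdfs://head:9000" (strings pass through)
get_env_string_value(True)                # -> "true" (JSON-encoded)
get_env_string_value({"a": 1})            # -> '{"a":1}' (compact separators)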
diff --git a/python/cloudtik/runtime/kafka/utils.py b/python/cloudtik/runtime/kafka/utils.py
index feda50274..51a6fa62f 100644
--- a/python/cloudtik/runtime/kafka/utils.py
+++ b/python/cloudtik/runtime/kafka/utils.py
@@ -7,12 +7,11 @@
 from cloudtik.core._private.service_discovery.utils import get_canonical_service_name, define_runtime_service_on_worker, \
     get_service_discovery_config, SERVICE_DISCOVERY_FEATURE_MESSAGING
 from cloudtik.core._private.utils import \
-    RUNTIME_CONFIG_KEY, load_properties_file, \
-    save_properties_file, get_config_for_update, get_runtime_config
+    load_properties_file, save_properties_file, get_runtime_config
 from cloudtik.runtime.common.service_discovery.cluster import query_service_from_cluster, get_service_addresses_string, \
     has_runtime_in_cluster
-from cloudtik.runtime.common.service_discovery.discovery import DiscoveryType
-from cloudtik.runtime.common.service_discovery.runtime_discovery import discover_zookeeper
+from cloudtik.runtime.common.service_discovery.runtime_discovery import \
+    discover_zookeeper_from_workspace, discover_zookeeper_on_head, is_zookeeper_service_discovery, ZOOKEEPER_CONNECT_KEY
 
 RUNTIME_PROCESSES = [
     # The first element is the substring to filter.
@@ -22,9 +21,6 @@
     ["kafka.Kafka", False, "KafkaBroker", "node"],
 ]
 
-KAFKA_ZOOKEEPER_CONNECT_KEY = "zookeeper_connect"
-KAFKA_ZOOKEEPER_SERVICE_SELECTOR_KEY = "zookeeper_service_selector"
-
 KAFKA_SERVICE_NAME = BUILT_IN_RUNTIME_KAFKA
 KAFKA_SERVICE_PORT = 9092
 
@@ -33,62 +29,21 @@ def _get_config(runtime_config: Dict[str, Any]):
     return runtime_config.get(BUILT_IN_RUNTIME_KAFKA, {})
 
 
-def _is_zookeeper_service_discovery(kafka_config):
-    return kafka_config.get("zookeeper_service_discovery", True)
-
-
 def _config_depended_services(cluster_config: Dict[str, Any]) -> Dict[str, Any]:
-    runtime_config = get_config_for_update(cluster_config, RUNTIME_CONFIG_KEY)
-    kafka_config = get_config_for_update(runtime_config, BUILT_IN_RUNTIME_KAFKA)
-
-    # Discover zookeeper through workspace
-    if (not kafka_config.get(KAFKA_ZOOKEEPER_CONNECT_KEY) and
-            not has_runtime_in_cluster(runtime_config, BUILT_IN_RUNTIME_ZOOKEEPER)):
-        if _is_zookeeper_service_discovery(kafka_config):
-            zookeeper_uri = discover_zookeeper(
-                kafka_config, KAFKA_ZOOKEEPER_SERVICE_SELECTOR_KEY,
-                cluster_config=cluster_config,
-                discovery_type=DiscoveryType.WORKSPACE)
-            if zookeeper_uri is not None:
-                kafka_config[KAFKA_ZOOKEEPER_CONNECT_KEY] = zookeeper_uri
-
+    cluster_config = discover_zookeeper_from_workspace(
+        cluster_config, BUILT_IN_RUNTIME_KAFKA)
     return cluster_config
 
 
 def _prepare_config_on_head(cluster_config: Dict[str, Any]):
-    cluster_config = _discover_zookeeper_on_head(cluster_config)
+    cluster_config = discover_zookeeper_on_head(
+        cluster_config, BUILT_IN_RUNTIME_KAFKA)
 
     # call validate config to fail earlier
     _validate_config(cluster_config, final=True)
     return cluster_config
 
 
-def _discover_zookeeper_on_head(cluster_config: Dict[str, Any]):
-    runtime_config = get_runtime_config(cluster_config)
-    kafka_config = _get_config(runtime_config)
-
-    zookeeper_uri = kafka_config.get(KAFKA_ZOOKEEPER_CONNECT_KEY)
-    if zookeeper_uri:
-        # Zookeeper already configured
-        return cluster_config
-
-    if has_runtime_in_cluster(
-            runtime_config, BUILT_IN_RUNTIME_ZOOKEEPER):
-        # There is a local Zookeeper
-        return cluster_config
-
-    # There is service discovery to come here
-    zookeeper_uri = discover_zookeeper(
-        kafka_config, KAFKA_ZOOKEEPER_SERVICE_SELECTOR_KEY,
-        cluster_config=cluster_config,
-        discovery_type=DiscoveryType.CLUSTER)
-    if zookeeper_uri:
-        kafka_config = get_config_for_update(
-            runtime_config, BUILT_IN_RUNTIME_KAFKA)
-        kafka_config[KAFKA_ZOOKEEPER_CONNECT_KEY] = zookeeper_uri
-    return cluster_config
-
-
 def _get_runtime_processes():
     return RUNTIME_PROCESSES
 
@@ -108,11 +63,11 @@ def _validate_config(config: Dict[str, Any], final=False):
     # Check zookeeper connect configured
     runtime_config = get_runtime_config(config)
     kafka_config = _get_config(runtime_config)
-    zookeeper_uri = kafka_config.get(KAFKA_ZOOKEEPER_CONNECT_KEY)
+    zookeeper_uri = kafka_config.get(ZOOKEEPER_CONNECT_KEY)
     if not zookeeper_uri and not has_runtime_in_cluster(
             runtime_config, BUILT_IN_RUNTIME_ZOOKEEPER):
         # if there is service discovery mechanism, assume we can get from service discovery
-        if (final or not _is_zookeeper_service_discovery(kafka_config) or
+        if (final or not is_zookeeper_service_discovery(kafka_config) or
                 not get_service_discovery_runtime(runtime_config)):
             raise ValueError("Zookeeper must be configured for Kafka.")
 
@@ -128,7 +83,7 @@ def _get_zookeeper_connect(runtime_config):
     kafka_config = _get_config(runtime_config)
 
     # check config
-    zookeeper_connect = kafka_config.get(KAFKA_ZOOKEEPER_CONNECT_KEY)
+    zookeeper_connect = kafka_config.get(ZOOKEEPER_CONNECT_KEY)
     if zookeeper_connect is not None:
         return zookeeper_connect
 
diff --git a/python/cloudtik/runtime/metastore/runtime.py b/python/cloudtik/runtime/metastore/runtime.py
index 0e8ecc2d8..b8b6a7677 100644
--- a/python/cloudtik/runtime/metastore/runtime.py
+++ b/python/cloudtik/runtime/metastore/runtime.py
@@ -7,7 +7,8 @@
 from cloudtik.runtime.common.runtime_base import RuntimeBase
 from cloudtik.runtime.metastore.utils import _with_runtime_environment_variables, \
     _get_runtime_processes, _get_runtime_logs, \
-    _get_runtime_endpoints, register_service, _get_head_service_ports, _get_runtime_services
+    _get_runtime_endpoints, register_service, _get_head_service_ports, _get_runtime_services,\
+    _prepare_config_on_head, _config_depended_services, _configure
 
 logger = logging.getLogger(__name__)
 
@@ -18,6 +19,20 @@ class MetastoreRuntime(RuntimeBase):
     def __init__(self, runtime_config: Dict[str, Any]) -> None:
         super().__init__(runtime_config)
 
+    def prepare_config(self, cluster_config: Dict[str, Any]) -> Dict[str, Any]:
+        """Prepare runtime specific configurations"""
+        cluster_config = _config_depended_services(cluster_config)
+        return cluster_config
+
+    def prepare_config_on_head(
+            self, cluster_config: Dict[str, Any]
+    ) -> Dict[str, Any]:
+        """Configure runtime such as using service discovery to configure
+        internal service addresses the runtime depends on.
+        The head configuration will be updated and saved with the returned configuration.
+        """
+        return _prepare_config_on_head(cluster_config)
+
     def with_environment_variables(
             self, config: Dict[str, Any], provider: NodeProvider,
             node_id: str) -> Dict[str, Any]:
@@ -27,6 +42,12 @@
         return _with_runtime_environment_variables(
             self.runtime_config, config=config,
             provider=provider, node_id=node_id)
 
+    def configure(self, head: bool):
+        """ This method is called on every node as the first step of
+        executing the runtime configure command.
+        """
+        _configure(self.runtime_config, head)
+
     def cluster_booting_completed(
             self, cluster_config: Dict[str, Any], head_node_id: str) -> None:
         register_service(cluster_config, head_node_id)
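A hedged sketch of the order in which these overrides are exercised; the driver sequence below is paraphrased for illustration and is not actual CloudTik scheduler code:

# Illustration only -- rough hook order for a runtime:
config = runtime.prepare_config(cluster_config)  # before launch (workspace discovery)
config = runtime.prepare_config_on_head(config)  # on the head (cluster discovery)
runtime.configure(head=True)                     # every node, first configure step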
diff --git a/python/cloudtik/runtime/metastore/scripts/configure.sh b/python/cloudtik/runtime/metastore/scripts/configure.sh
index fb84b90bc..1c31070d7 100644
--- a/python/cloudtik/runtime/metastore/scripts/configure.sh
+++ b/python/cloudtik/runtime/metastore/scripts/configure.sh
@@ -96,6 +96,8 @@ function configure_hive_metastore() {
 
     cp -r ${config_template_file} ${METASTORE_HOME}/conf/metastore-site.xml
 
+    # Consider moving this step to the service start step so that
+    # the depended-on service is already started when this initialization runs
     if [ "${CLOUD_DATABASE}" != "true" ] || [ "$METASTORE_WITH_CLOUD_DATABASE" == "false" ]; then
         # Start mariadb
         sudo service mysql start
diff --git a/python/cloudtik/runtime/metastore/utils.py b/python/cloudtik/runtime/metastore/utils.py
index 2298ece17..ba5520647 100644
--- a/python/cloudtik/runtime/metastore/utils.py
+++ b/python/cloudtik/runtime/metastore/utils.py
@@ -5,7 +5,11 @@
 from cloudtik.core._private.runtime_factory import BUILT_IN_RUNTIME_METASTORE
 from cloudtik.core._private.service_discovery.utils import get_canonical_service_name, define_runtime_service_on_head, \
     get_service_discovery_config
+from cloudtik.core._private.util.database_utils import is_database_configured, \
+    export_database_environment_variables
 from cloudtik.core._private.utils import export_runtime_flags
+from cloudtik.runtime.common.service_discovery.runtime_discovery import \
+    discover_database_from_workspace, discover_database_on_head, DATABASE_CONNECT_KEY
 from cloudtik.runtime.common.service_discovery.workspace import register_service_to_workspace
 
 RUNTIME_PROCESSES = [
@@ -25,10 +29,26 @@ def _get_config(runtime_config: Dict[str, Any]):
     return runtime_config.get(BUILT_IN_RUNTIME_METASTORE, {})
 
 
+def _get_database_config(metastore_config):
+    return metastore_config.get(DATABASE_CONNECT_KEY, {})
+
+
 def _get_runtime_processes():
     return RUNTIME_PROCESSES
 
 
+def _config_depended_services(cluster_config: Dict[str, Any]) -> Dict[str, Any]:
+    cluster_config = discover_database_from_workspace(
+        cluster_config, BUILT_IN_RUNTIME_METASTORE)
+    return cluster_config
+
+
+def _prepare_config_on_head(cluster_config: Dict[str, Any]):
+    cluster_config = discover_database_on_head(
+        cluster_config, BUILT_IN_RUNTIME_METASTORE)
+    return cluster_config
+
+
 def _with_runtime_environment_variables(
         runtime_config, config, provider, node_id: str):
     runtime_envs = {"METASTORE_ENABLED": True}
@@ -39,6 +59,15 @@
     return runtime_envs
 
 
+def _configure(runtime_config, head: bool):
+    metastore_config = _get_config(runtime_config)
+    database_config = _get_database_config(metastore_config)
+    if is_database_configured(database_config):
+        # TODO: set the database environments from database config
+        #  This may override the environments from provider
+        export_database_environment_variables(database_config)
+
+
 def register_service(cluster_config: Dict[str, Any], head_node_id: str) -> None:
     provider = _get_node_provider(
         cluster_config["provider"], cluster_config["cluster_name"])
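What _configure exports for a configured database, per the database_utils helpers earlier in this patch; values are hypothetical:

# Illustration only -- environment exported by _configure:
database_config = {"engine": "mysql", "address": "10.0.0.9", "port": 3306}
export_database_environment_variables(database_config)
# os.environ now includes:
#   CLOUD_DATABASE=true               CLOUD_DATABASE_ENGINE=mysql
#   CLOUD_DATABASE_HOSTNAME=10.0.0.9  CLOUD_DATABASE_PORT=3306
#   CLOUD_DATABASE_USERNAME=root      CLOUD_DATABASE_PASSWORD=cloudtik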
diff --git a/python/cloudtik/runtime/nginx/discovery.py b/python/cloudtik/runtime/nginx/discovery.py
index 07f0414a1..95448ef42 100644
--- a/python/cloudtik/runtime/nginx/discovery.py
+++ b/python/cloudtik/runtime/nginx/discovery.py
@@ -1,6 +1,6 @@
 import logging
 
-from cloudtik.core._private.core_utils import get_json_object_hash
+from cloudtik.core._private.core_utils import get_json_object_hash, get_address_string
 from cloudtik.core._private.service_discovery.utils import deserialize_service_selector
 from cloudtik.core._private.util.pull.pull_job import PullJob
 from cloudtik.runtime.common.service_discovery.consul \
@@ -49,7 +49,7 @@ def pull(self):
             # it should be filtered by service selector using service name, tags or labels
             for service_node in service_nodes:
                 server_address = get_service_address_of_node(service_node)
-                server_key = "{}:{}".format(server_address[0], server_address[1])
+                server_key = get_address_string(server_address[0], server_address[1])
                 backend_servers[server_key] = server_address
         if not backend_servers:
             logger.warning("No live servers return from the service selector.")
@@ -90,7 +90,7 @@ def pull(self):
         backend_servers = {}
         for service_node in service_nodes:
             server_address = get_service_address_of_node(service_node)
-            server_key = "{}:{}".format(server_address[0], server_address[1])
+            server_key = get_address_string(server_address[0], server_address[1])
             backend_servers[server_key] = server_address
 
         # TODO: currently uses service_name as the backend_name and path prefix for simplicity
diff --git a/python/cloudtik/runtime/nginx/utils.py b/python/cloudtik/runtime/nginx/utils.py
index 21e35d518..f81a18aa8 100644
--- a/python/cloudtik/runtime/nginx/utils.py
+++ b/python/cloudtik/runtime/nginx/utils.py
@@ -3,7 +3,7 @@
 from typing import Any, Dict
 
 from cloudtik.core._private.constants import CLOUDTIK_RUNTIME_ENV_CLUSTER
-from cloudtik.core._private.core_utils import exec_with_call, exec_with_output, remove_files
+from cloudtik.core._private.core_utils import exec_with_call, exec_with_output, remove_files, get_address_string
 from cloudtik.core._private.runtime_factory import BUILT_IN_RUNTIME_NGINX
 from cloudtik.core._private.runtime_utils import get_runtime_value, get_runtime_config_from_node
 from cloudtik.core._private.service_discovery.runtime_services import get_service_discovery_runtime
@@ -416,7 +416,7 @@ def stop_pull_server():
 
 def update_load_balancer_configuration(
         backend_servers, balance_method):
     # write load balancer upstream config file
-    servers = ["{}:{}".format(
+    servers = [get_address_string(
         server_address[0], server_address[1]
     ) for _, server_address in backend_servers.items()]
@@ -451,7 +451,7 @@ def _update_api_gateway_dynamic_upstreams(
     for backend_name, backend_servers in sorted_api_gateway_backends:
         upstream_config_file = os.path.join(
             upstreams_dir, "{}.conf".format(backend_name))
-        servers = ["{}:{}".format(
+        servers = [get_address_string(
             server_address[0], server_address[1]
         ) for _, server_address in backend_servers.items()]
         _save_upstream_config(
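Several runtimes repeated the same `"{}:{}".format(host, port)` pattern, which this diff funnels through the new `get_address_string` helper. A small self-contained sketch of the de-duplication the nginx pull job relies on (all addresses invented):

```python
# Self-contained sketch: keying backend servers by "host:port" so that
# duplicate service nodes collapse into a single upstream entry.
def get_address_string(host, port):
    # The same "{}:{}".format(host, port) pattern this diff centralizes
    return "{}:{}".format(host, port)

service_node_addresses = [
    ("10.0.0.2", 8000), ("10.0.0.3", 8000), ("10.0.0.2", 8000)]
backend_servers = {}
for server_address in service_node_addresses:
    server_key = get_address_string(server_address[0], server_address[1])
    backend_servers[server_key] = server_address

print(sorted(backend_servers))  # ['10.0.0.2:8000', '10.0.0.3:8000']
```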
diff --git a/python/cloudtik/runtime/presto/utils.py b/python/cloudtik/runtime/presto/utils.py
index 9e9d8430a..f31fe044b 100644
--- a/python/cloudtik/runtime/presto/utils.py
+++ b/python/cloudtik/runtime/presto/utils.py
@@ -6,11 +6,10 @@
 from cloudtik.core._private.service_discovery.utils import get_canonical_service_name, define_runtime_service_on_head, \
     get_service_discovery_config, SERVICE_DISCOVERY_FEATURE_ANALYTICS
 from cloudtik.core._private.utils import \
-    get_node_type, get_resource_of_node_type, RUNTIME_CONFIG_KEY, get_node_type_config, get_config_for_update, \
-    get_runtime_config
+    get_node_type, get_resource_of_node_type, RUNTIME_CONFIG_KEY
 from cloudtik.runtime.common.service_discovery.cluster import has_runtime_in_cluster
-from cloudtik.runtime.common.service_discovery.discovery import DiscoveryType
-from cloudtik.runtime.common.service_discovery.runtime_discovery import discover_metastore
+from cloudtik.runtime.common.service_discovery.runtime_discovery import \
+    discover_metastore_from_workspace, discover_metastore_on_head, METASTORE_URI_KEY
 
 RUNTIME_PROCESSES = [
     # The first element is the substring to filter.
@@ -20,9 +19,6 @@
     ["com.facebook.presto.server.PrestoServer", False, "PrestoServer", "node"],
 ]
 
-PRESTO_HIVE_METASTORE_URI_KEY = "hive_metastore_uri"
-PRESTO_METASTORE_SERVICE_SELECTOR_KEY = "metastore_service_selector"
-
 JVM_MAX_MEMORY_RATIO = 0.8
 QUERY_MAX_MEMORY_PER_NODE_RATIO = 0.5
 QUERY_MAX_TOTAL_MEMORY_PER_NODE_RATIO = 0.7
@@ -36,10 +32,6 @@ def _get_config(runtime_config: Dict[str, Any]):
     return runtime_config.get(BUILT_IN_RUNTIME_PRESTO, {})
 
 
-def _is_metastore_service_discovery(presto_config):
-    return presto_config.get("metastore_service_discovery", True)
-
-
 def get_jvm_max_memory(total_memory):
     return int(total_memory * JVM_MAX_MEMORY_RATIO)
 
@@ -57,54 +49,14 @@ def get_memory_heap_headroom_per_node(jvm_max_memory):
 
 
 def _config_depended_services(cluster_config: Dict[str, Any]) -> Dict[str, Any]:
-    runtime_config = get_config_for_update(cluster_config, RUNTIME_CONFIG_KEY)
-    presto_config = get_config_for_update(runtime_config, BUILT_IN_RUNTIME_PRESTO)
-
-    # Check metastore
-    if (not presto_config.get(PRESTO_HIVE_METASTORE_URI_KEY) and
-            not has_runtime_in_cluster(
-                runtime_config, BUILT_IN_RUNTIME_METASTORE)):
-        if _is_metastore_service_discovery(presto_config):
-            hive_metastore_uri = discover_metastore(
-                presto_config, PRESTO_METASTORE_SERVICE_SELECTOR_KEY,
-                cluster_config=cluster_config,
-                discovery_type=DiscoveryType.WORKSPACE)
-            if hive_metastore_uri:
-                presto_config[PRESTO_HIVE_METASTORE_URI_KEY] = hive_metastore_uri
-
+    cluster_config = discover_metastore_from_workspace(
+        cluster_config, BUILT_IN_RUNTIME_PRESTO)
     return cluster_config
 
 
 def _prepare_config_on_head(cluster_config: Dict[str, Any]):
-    cluster_config = _discover_metastore_on_head(cluster_config)
-    return cluster_config
-
-
-def _discover_metastore_on_head(cluster_config: Dict[str, Any]):
-    runtime_config = get_runtime_config(cluster_config)
-    presto_config = _get_config(runtime_config)
-    if not _is_metastore_service_discovery(presto_config):
-        return cluster_config
-
-    hive_metastore_uri = presto_config.get(PRESTO_HIVE_METASTORE_URI_KEY)
-    if hive_metastore_uri:
-        # Metastore already configured
-        return cluster_config
-
-    if has_runtime_in_cluster(
-            runtime_config, BUILT_IN_RUNTIME_METASTORE):
-        # There is a metastore
-        return cluster_config
-
-    # There is service discovery to come here
-    hive_metastore_uri = discover_metastore(
-        presto_config, PRESTO_METASTORE_SERVICE_SELECTOR_KEY,
-        cluster_config=cluster_config,
-        discovery_type=DiscoveryType.CLUSTER)
-    if hive_metastore_uri:
-        presto_config = get_config_for_update(
-            runtime_config, BUILT_IN_RUNTIME_PRESTO)
-        presto_config[PRESTO_HIVE_METASTORE_URI_KEY] = hive_metastore_uri
+    cluster_config = discover_metastore_on_head(
+        cluster_config, BUILT_IN_RUNTIME_PRESTO)
     return cluster_config
 
 
@@ -144,9 +96,9 @@ def _with_runtime_environment_variables(
 
 def _configure(runtime_config, head: bool):
     # TODO: move more runtime specific environment_variables to here
     presto_config = _get_config(runtime_config)
-    hive_metastore_uri = presto_config.get(PRESTO_HIVE_METASTORE_URI_KEY)
-    if hive_metastore_uri:
-        os.environ["PRESTO_HIVE_METASTORE_URI"] = hive_metastore_uri
+    metastore_uri = presto_config.get(METASTORE_URI_KEY)
+    if metastore_uri:
+        os.environ["HIVE_METASTORE_URI"] = metastore_uri
 
 
 def _get_runtime_logs():
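The per-runtime discovery code removed above is replaced by shared helpers with a two-phase scheme: workspace-level discovery when the cluster config is prepared, then cluster-level discovery on the head. A self-contained sketch of the head-side decision order, mirroring the removed code; the callable and flags stand in for service discovery and cluster introspection, and the key name follows the old `"hive_metastore_uri"` key (the shared module uses its own `METASTORE_URI_KEY` constant):

```python
# Self-contained sketch of the head-side decision order that the shared
# discover_metastore_on_head helper now encapsulates: keep an explicit
# URI, skip when the cluster runs its own metastore, otherwise consult
# cluster-level service discovery.
def discover_metastore_on_head_sketch(
        runtime_config, runtime_type, discover, has_local_metastore):
    config = runtime_config.setdefault(runtime_type, {})
    if not config.get("metastore_service_discovery", True):
        return runtime_config  # discovery disabled by the user
    if config.get("hive_metastore_uri"):
        return runtime_config  # URI already configured explicitly
    if has_local_metastore:
        return runtime_config  # the cluster runs its own metastore
    uri = discover()  # cluster-level service discovery
    if uri:
        config["hive_metastore_uri"] = uri
    return runtime_config

runtime_config = {"presto": {}}
discover_metastore_on_head_sketch(
    runtime_config, "presto",
    discover=lambda: "thrift://10.0.0.7:9083",  # invented endpoint
    has_local_metastore=False)
print(runtime_config["presto"]["hive_metastore_uri"])
# thrift://10.0.0.7:9083
```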
os.environ["HIVE_METASTORE_URI"] = metastore_uri def _get_runtime_logs(): diff --git a/python/cloudtik/runtime/prometheus/discovery.py b/python/cloudtik/runtime/prometheus/discovery.py index 03ae99bc9..d0ce1d7b1 100644 --- a/python/cloudtik/runtime/prometheus/discovery.py +++ b/python/cloudtik/runtime/prometheus/discovery.py @@ -5,7 +5,7 @@ from typing import Dict, Any from cloudtik.core._private.constants import CLOUDTIK_HEARTBEAT_TIMEOUT_S -from cloudtik.core._private.core_utils import get_json_object_hash +from cloudtik.core._private.core_utils import get_json_object_hash, get_address_string from cloudtik.core._private.runtime_utils import save_yaml from cloudtik.core._private.state.control_state import ControlState from cloudtik.core._private.state.state_utils import NODE_STATE_NODE_IP, \ @@ -43,7 +43,7 @@ def _get_service_targets( if not nodes_of_service: return None - targets = ["{}:{}".format( + targets = [get_address_string( node, service_port) for node in nodes_of_service] service_targets = { "labels": { diff --git a/python/cloudtik/runtime/prometheus/utils.py b/python/cloudtik/runtime/prometheus/utils.py index 2b24a9e29..bc14e92f9 100644 --- a/python/cloudtik/runtime/prometheus/utils.py +++ b/python/cloudtik/runtime/prometheus/utils.py @@ -2,7 +2,8 @@ from typing import Any, Dict from cloudtik.core._private import constants -from cloudtik.core._private.core_utils import exec_with_output, get_list_for_update, get_config_for_update +from cloudtik.core._private.core_utils import exec_with_output, get_list_for_update, get_config_for_update, \ + get_address_string from cloudtik.core._private.runtime_factory import BUILT_IN_RUNTIME_PROMETHEUS from cloudtik.core._private.runtime_utils import load_and_save_yaml, \ get_runtime_config_from_node, save_yaml, get_runtime_head_ip, get_runtime_value @@ -401,7 +402,7 @@ def start_pull_server(head): pull_identifier = _get_pull_identifier() redis_ip = get_runtime_head_ip(head) - redis_address = "{}:{}".format(redis_ip, constants.CLOUDTIK_DEFAULT_PORT) + redis_address = get_address_string(redis_ip, constants.CLOUDTIK_DEFAULT_PORT) cmd = ["cloudtik", "node", "pull", pull_identifier, "start"] cmd += ["--pull-class=cloudtik.runtime.prometheus.discovery.DiscoverLocalTargets"] diff --git a/python/cloudtik/runtime/ray/scaling_policy.py b/python/cloudtik/runtime/ray/scaling_policy.py index 55c114aaa..8f628dcf8 100644 --- a/python/cloudtik/runtime/ray/scaling_policy.py +++ b/python/cloudtik/runtime/ray/scaling_policy.py @@ -3,6 +3,7 @@ import time from typing import Any, Dict, Optional +from cloudtik.core._private.core_utils import get_address_string from cloudtik.core._private.services import address_to_ip from cloudtik.core._private.state.state_utils import NODE_STATE_NODE_ID, NODE_STATE_NODE_IP, NODE_STATE_TIME from cloudtik.core._private.utils import make_node_id, RUNTIME_CONFIG_KEY @@ -179,7 +180,7 @@ def _init_gcs_client(self): import ray._private.ray_constants as ray_constants from ray.core.generated import gcs_pb2, gcs_service_pb2, gcs_service_pb2_grpc - gcs_address = "{}:{}".format(self.head_ip, self.ray_port) + gcs_address = get_address_string(self.head_ip, self.ray_port) options = ray_constants.GLOBAL_GRPC_OPTIONS gcs_channel = ray._private.utils.init_grpc_channel(gcs_address, options) self.gcs_node_resources_stub = ( diff --git a/python/cloudtik/runtime/ray/utils.py b/python/cloudtik/runtime/ray/utils.py index 9aa3f6bec..d9592af39 100644 --- a/python/cloudtik/runtime/ray/utils.py +++ b/python/cloudtik/runtime/ray/utils.py @@ -4,7 +4,6 @@ from 
diff --git a/python/cloudtik/runtime/ray/utils.py b/python/cloudtik/runtime/ray/utils.py
index 9aa3f6bec..d9592af39 100644
--- a/python/cloudtik/runtime/ray/utils.py
+++ b/python/cloudtik/runtime/ray/utils.py
@@ -4,7 +4,6 @@
 from cloudtik.core._private.runtime_factory import BUILT_IN_RUNTIME_RAY
 from cloudtik.core._private.service_discovery.utils import get_canonical_service_name, define_runtime_service_on_head, \
     get_service_discovery_config, SERVICE_DISCOVERY_FEATURE_SCHEDULER
-from cloudtik.core._private.utils import get_node_type_config
 from cloudtik.core.scaling_policy import ScalingPolicy
 from cloudtik.runtime.ray.scaling_policy import RayScalingPolicy
diff --git a/python/cloudtik/runtime/spark/utils.py b/python/cloudtik/runtime/spark/utils.py
index 3cd3a50e0..c574d743f 100644
--- a/python/cloudtik/runtime/spark/utils.py
+++ b/python/cloudtik/runtime/spark/utils.py
@@ -5,7 +5,7 @@
 from cloudtik.core._private.cli_logger import cli_logger
 from cloudtik.core._private.cluster.cluster_config import _load_cluster_config
 from cloudtik.core._private.cluster.cluster_tunnel_request import _request_rest_to_head
-from cloudtik.core._private.core_utils import double_quote, get_string_value_for_env
+from cloudtik.core._private.core_utils import double_quote, get_env_string_value
 from cloudtik.core._private.runtime_factory import BUILT_IN_RUNTIME_HDFS, BUILT_IN_RUNTIME_METASTORE, \
     BUILT_IN_RUNTIME_SPARK
 from cloudtik.core._private.service_discovery.runtime_services import get_service_discovery_runtime
@@ -13,12 +13,13 @@
     get_service_discovery_config, SERVICE_DISCOVERY_FEATURE_SCHEDULER
 from cloudtik.core._private.utils import \
     round_memory_size_to_gb, load_head_cluster_config, \
-    RUNTIME_CONFIG_KEY, load_properties_file, save_properties_file, is_use_managed_cloud_storage, get_node_type_config, \
+    RUNTIME_CONFIG_KEY, load_properties_file, save_properties_file, is_use_managed_cloud_storage, \
     print_json_formatted, get_config_for_update, get_runtime_config
 from cloudtik.core.scaling_policy import ScalingPolicy
 from cloudtik.runtime.common.service_discovery.cluster import has_runtime_in_cluster
-from cloudtik.runtime.common.service_discovery.discovery import DiscoveryType
-from cloudtik.runtime.common.service_discovery.runtime_discovery import discover_hdfs, discover_metastore
+from cloudtik.runtime.common.service_discovery.runtime_discovery import \
+    discover_metastore_on_head, discover_hdfs_on_head, discover_hdfs_from_workspace, \
+    discover_metastore_from_workspace, is_hdfs_service_discovery, HDFS_URI_KEY, METASTORE_URI_KEY
 from cloudtik.runtime.common.utils import get_runtime_endpoints_of, get_runtime_default_storage_of
 from cloudtik.runtime.spark.scaling_policy import SparkScalingPolicy
 
@@ -32,11 +33,6 @@
     ["proc_nodemanager", False, "NodeManager", "worker"],
 ]
 
-SPARK_HDFS_NAMENODE_URI_KEY = "hdfs_namenode_uri"
-SPARK_HIVE_METASTORE_URI_KEY = "hive_metastore_uri"
-SPARK_HDFS_SERVICE_SELECTOR_KEY = "hdfs_service_selector"
-SPARK_METASTORE_SERVICE_SELECTOR_KEY = "metastore_service_selector"
-
 YARN_RESOURCE_MEMORY_RATIO = 0.8
 SPARK_EXECUTOR_MEMORY_RATIO = 1
 SPARK_DRIVER_MEMORY_RATIO = 0.1
@@ -64,14 +60,6 @@ def _get_config(runtime_config: Dict[str, Any]):
     return runtime_config.get(BUILT_IN_RUNTIME_SPARK, {})
 
 
-def _is_hdfs_service_discovery(spark_config):
-    return spark_config.get("hdfs_service_discovery", True)
-
-
-def _is_metastore_service_discovery(spark_config):
-    return spark_config.get("metastore_service_discovery", True)
-
-
 def get_yarn_resource_memory_ratio(cluster_config: Dict[str, Any]):
     yarn_resource_memory_ratio = YARN_RESOURCE_MEMORY_RATIO
     spark_config = cluster_config.get(RUNTIME_CONFIG_KEY, {}).get(BUILT_IN_RUNTIME_SPARK, {})
@@ -142,97 +130,25 @@ def _get_cluster_resources(
 def _config_depended_services(cluster_config: Dict[str, Any]) -> Dict[str, Any]:
-    runtime_config = get_config_for_update(cluster_config, RUNTIME_CONFIG_KEY)
-    spark_config = get_config_for_update(runtime_config, BUILT_IN_RUNTIME_SPARK)
-
-    # We now support co-existence of local HDFS and remote HDFS
-    # 1) Try to use local hdfs first;
-    # 2) Try to use defined hdfs_namenode_uri;
-    # 3) Try to discover HDFS service through any service discovery available
-    if spark_config.get(SPARK_HDFS_NAMENODE_URI_KEY) is None:
-        if _is_hdfs_service_discovery(spark_config):
-            hdfs_namenode_uri = discover_hdfs(
-                spark_config, SPARK_HDFS_SERVICE_SELECTOR_KEY,
-                cluster_config=cluster_config,
-                discovery_type=DiscoveryType.WORKSPACE)
-            if hdfs_namenode_uri is not None:
-                spark_config[SPARK_HDFS_NAMENODE_URI_KEY] = hdfs_namenode_uri
-
-    # Check metastore
-    if (not spark_config.get(SPARK_HIVE_METASTORE_URI_KEY) and
-            not has_runtime_in_cluster(
-                runtime_config, BUILT_IN_RUNTIME_METASTORE)):
-        if _is_metastore_service_discovery(spark_config):
-            hive_metastore_uri = discover_metastore(
-                spark_config, SPARK_METASTORE_SERVICE_SELECTOR_KEY,
-                cluster_config=cluster_config,
-                discovery_type=DiscoveryType.WORKSPACE)
-            if hive_metastore_uri:
-                spark_config[SPARK_HIVE_METASTORE_URI_KEY] = hive_metastore_uri
+    cluster_config = discover_hdfs_from_workspace(
+        cluster_config, BUILT_IN_RUNTIME_SPARK)
+    cluster_config = discover_metastore_from_workspace(
+        cluster_config, BUILT_IN_RUNTIME_SPARK)
     return cluster_config
 
 
 def _prepare_config_on_head(cluster_config: Dict[str, Any]):
-    cluster_config = _discover_hdfs_on_head(cluster_config)
-    cluster_config = _discover_metastore_on_head(cluster_config)
+    cluster_config = discover_hdfs_on_head(
+        cluster_config, BUILT_IN_RUNTIME_SPARK)
+    cluster_config = discover_metastore_on_head(
+        cluster_config, BUILT_IN_RUNTIME_SPARK)
 
     # call validate config to fail earlier
     _validate_config(cluster_config, final=True)
     return cluster_config
 
 
-def _discover_hdfs_on_head(cluster_config: Dict[str, Any]):
-    runtime_config = get_runtime_config(cluster_config)
-    spark_config = _get_config(runtime_config)
-    if not _is_hdfs_service_discovery(spark_config):
-        return cluster_config
-
-    hdfs_namenode_uri = spark_config.get(SPARK_HDFS_NAMENODE_URI_KEY)
-    if hdfs_namenode_uri:
-        # HDFS already configured
-        return cluster_config
-
-    # There is service discovery to come here
-    hdfs_namenode_uri = discover_hdfs(
-        spark_config, SPARK_HDFS_SERVICE_SELECTOR_KEY,
-        cluster_config=cluster_config,
-        discovery_type=DiscoveryType.CLUSTER)
-    if hdfs_namenode_uri:
-        spark_config = get_config_for_update(
-            runtime_config, BUILT_IN_RUNTIME_SPARK)
-        spark_config[SPARK_HDFS_NAMENODE_URI_KEY] = hdfs_namenode_uri
-    return cluster_config
-
-
-def _discover_metastore_on_head(cluster_config: Dict[str, Any]):
-    runtime_config = get_runtime_config(cluster_config)
-    spark_config = _get_config(runtime_config)
-    if not _is_metastore_service_discovery(spark_config):
-        return cluster_config
-
-    hive_metastore_uri = spark_config.get(SPARK_HIVE_METASTORE_URI_KEY)
-    if hive_metastore_uri:
-        # Metastore already configured
-        return cluster_config
-
-    if has_runtime_in_cluster(
-            runtime_config, BUILT_IN_RUNTIME_METASTORE):
-        # There is a metastore
-        return cluster_config
-
-    # There is service discovery to come here
-    hive_metastore_uri = discover_metastore(
-        spark_config, SPARK_METASTORE_SERVICE_SELECTOR_KEY,
-        cluster_config=cluster_config,
-        discovery_type=DiscoveryType.CLUSTER)
-    if hive_metastore_uri:
-        spark_config = get_config_for_update(
-            runtime_config, BUILT_IN_RUNTIME_SPARK)
-        spark_config[SPARK_HIVE_METASTORE_URI_KEY] = hive_metastore_uri
-    return cluster_config
-
-
 def _config_runtime_resources(cluster_config: Dict[str, Any]) -> Dict[str, Any]:
     cluster_resource = _get_cluster_resources(cluster_config)
     worker_cpu = cluster_resource["worker_cpu"]
@@ -381,16 +297,16 @@ def _configure(runtime_config, head: bool):
     hadoop_default_cluster = spark_config.get(
         "hadoop_default_cluster", False)
     if hadoop_default_cluster:
-        os.environ["HADOOP_DEFAULT_CLUSTER"] = get_string_value_for_env(
+        os.environ["HADOOP_DEFAULT_CLUSTER"] = get_env_string_value(
             hadoop_default_cluster)
 
-    hdfs_namenode_uri = spark_config.get(SPARK_HDFS_NAMENODE_URI_KEY)
-    if hdfs_namenode_uri:
-        os.environ["HDFS_NAMENODE_URI"] = hdfs_namenode_uri
+    hdfs_uri = spark_config.get(HDFS_URI_KEY)
+    if hdfs_uri:
+        os.environ["HDFS_NAMENODE_URI"] = hdfs_uri
 
-    hive_metastore_uri = spark_config.get(SPARK_HIVE_METASTORE_URI_KEY)
-    if hive_metastore_uri:
-        os.environ["HIVE_METASTORE_URI"] = hive_metastore_uri
+    metastore_uri = spark_config.get(METASTORE_URI_KEY)
+    if metastore_uri:
+        os.environ["HIVE_METASTORE_URI"] = metastore_uri
 
 
 def get_runtime_logs():
@@ -411,7 +327,7 @@ def _is_valid_storage_config(config: Dict[str, Any], final=False):
         return True
     # check if there is remote HDFS configured
     spark_config = _get_config(runtime_config)
-    if spark_config.get(SPARK_HDFS_NAMENODE_URI_KEY) is not None:
+    if spark_config.get(HDFS_URI_KEY) is not None:
         return True
 
     # Check any cloud storage is configured
@@ -421,7 +337,7 @@ def _is_valid_storage_config(config: Dict[str, Any], final=False):
         return True
 
     # if there is service discovery mechanism, assume we can get from service discovery
-    if (not final and _is_hdfs_service_discovery(spark_config) and
+    if (not final and is_hdfs_service_discovery(spark_config) and
             get_service_discovery_runtime(runtime_config)):
         return True
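The renamed `is_hdfs_service_discovery` keeps `_is_valid_storage_config` permissive before the head-side (final) validation. A self-contained sketch of that ordering, with booleans standing in for the runtime checks:

```python
# Self-contained sketch of the validation order in _is_valid_storage_config:
# local HDFS runtime, explicit remote HDFS URI, cloud storage, and finally
# (only before the head-side "final" pass) pending service discovery.
def is_valid_storage_config_sketch(
        has_local_hdfs, hdfs_uri, has_cloud_storage,
        hdfs_discovery_enabled, has_discovery_runtime, final=False):
    if has_local_hdfs or hdfs_uri is not None or has_cloud_storage:
        return True
    # Before the final validation, assume a URI may still be discovered
    if not final and hdfs_discovery_enabled and has_discovery_runtime:
        return True
    return False

# Passes at config time because discovery may still provide a URI...
print(is_valid_storage_config_sketch(
    False, None, False, True, True))               # True
# ...but fails early on the head once the final check runs with no URI.
print(is_valid_storage_config_sketch(
    False, None, False, True, True, final=True))   # False
```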
diff --git a/python/cloudtik/runtime/trino/utils.py b/python/cloudtik/runtime/trino/utils.py
index 05d9745ca..1996c4b7d 100644
--- a/python/cloudtik/runtime/trino/utils.py
+++ b/python/cloudtik/runtime/trino/utils.py
@@ -6,11 +6,10 @@
 from cloudtik.core._private.service_discovery.utils import get_canonical_service_name, define_runtime_service_on_head, \
     get_service_discovery_config, SERVICE_DISCOVERY_FEATURE_ANALYTICS
 from cloudtik.core._private.utils import \
-    get_node_type, get_resource_of_node_type, RUNTIME_CONFIG_KEY, get_node_type_config, get_config_for_update, \
-    get_runtime_config
+    get_node_type, get_resource_of_node_type, get_runtime_config
 from cloudtik.runtime.common.service_discovery.cluster import has_runtime_in_cluster
-from cloudtik.runtime.common.service_discovery.discovery import DiscoveryType
-from cloudtik.runtime.common.service_discovery.runtime_discovery import discover_metastore
+from cloudtik.runtime.common.service_discovery.runtime_discovery import \
+    discover_metastore_from_workspace, discover_metastore_on_head, METASTORE_URI_KEY
 
 RUNTIME_PROCESSES = [
     # The first element is the substring to filter.
@@ -20,9 +19,6 @@
     ["io.trino.server.TrinoServer", False, "TrinoServer", "node"],
 ]
 
-TRINO_HIVE_METASTORE_URI_KEY = "hive_metastore_uri"
-TRINO_METASTORE_SERVICE_SELECTOR_KEY = "metastore_service_selector"
-
 JVM_MAX_MEMORY_RATIO = 0.8
 QUERY_MAX_MEMORY_PER_NODE_RATIO = 0.5
 MEMORY_HEAP_HEADROOM_PER_NODE_RATIO = 0.25
@@ -35,10 +31,6 @@ def _get_config(runtime_config: Dict[str, Any]):
     return runtime_config.get(BUILT_IN_RUNTIME_TRINO, {})
 
 
-def _is_metastore_service_discovery(trino_config):
-    return trino_config.get("metastore_service_discovery", True)
-
-
 def get_jvm_max_memory(total_memory):
     return int(total_memory * JVM_MAX_MEMORY_RATIO)
 
@@ -52,54 +44,14 @@ def get_memory_heap_headroom_per_node(jvm_max_memory):
 
 
 def _config_depended_services(cluster_config: Dict[str, Any]) -> Dict[str, Any]:
-    runtime_config = get_config_for_update(cluster_config, RUNTIME_CONFIG_KEY)
-    trino_config = get_config_for_update(runtime_config, BUILT_IN_RUNTIME_TRINO)
-
-    # Check metastore
-    if (not trino_config.get(TRINO_HIVE_METASTORE_URI_KEY) and
-            not has_runtime_in_cluster(
-                runtime_config, BUILT_IN_RUNTIME_METASTORE)):
-        if _is_metastore_service_discovery(trino_config):
-            hive_metastore_uri = discover_metastore(
-                trino_config, TRINO_METASTORE_SERVICE_SELECTOR_KEY,
-                cluster_config=cluster_config,
-                discovery_type=DiscoveryType.WORKSPACE)
-            if hive_metastore_uri:
-                trino_config[TRINO_HIVE_METASTORE_URI_KEY] = hive_metastore_uri
-
+    cluster_config = discover_metastore_from_workspace(
+        cluster_config, BUILT_IN_RUNTIME_TRINO)
    return cluster_config
 
 
 def _prepare_config_on_head(cluster_config: Dict[str, Any]):
-    cluster_config = _discover_metastore_on_head(cluster_config)
-    return cluster_config
-
-
-def _discover_metastore_on_head(cluster_config: Dict[str, Any]):
-    runtime_config = get_runtime_config(cluster_config)
-    trino_config = _get_config(runtime_config)
-    if not _is_metastore_service_discovery(trino_config):
-        return cluster_config
-
-    hive_metastore_uri = trino_config.get(TRINO_HIVE_METASTORE_URI_KEY)
-    if hive_metastore_uri:
-        # Metastore already configured
-        return cluster_config
-
-    if has_runtime_in_cluster(
-            runtime_config, BUILT_IN_RUNTIME_METASTORE):
-        # There is a metastore
-        return cluster_config
-
-    # There is service discovery to come here
-    hive_metastore_uri = discover_metastore(
-        trino_config, TRINO_METASTORE_SERVICE_SELECTOR_KEY,
-        cluster_config=cluster_config,
-        discovery_type=DiscoveryType.CLUSTER)
-    if hive_metastore_uri:
-        trino_config = get_config_for_update(
-            runtime_config, BUILT_IN_RUNTIME_TRINO)
-        trino_config[TRINO_HIVE_METASTORE_URI_KEY] = hive_metastore_uri
+    cluster_config = discover_metastore_on_head(
+        cluster_config, BUILT_IN_RUNTIME_TRINO)
     return cluster_config
 
 
@@ -123,7 +75,7 @@ def _with_runtime_environment_variables(
         runtime_config, config, provider, node_id: str):
     runtime_envs = {"TRINO_ENABLED": True}
     trino_config = _get_config(runtime_config)
-    cluster_runtime_config = config.get(RUNTIME_CONFIG_KEY)
+    cluster_runtime_config = get_runtime_config(config)
 
     if has_runtime_in_cluster(
             cluster_runtime_config, BUILT_IN_RUNTIME_METASTORE):
@@ -140,9 +92,9 @@ def _configure(runtime_config, head: bool):
     # TODO: move more runtime specific environment_variables to here
     # only needed for applying head service discovery settings
     trino_config = _get_config(runtime_config)
-    hive_metastore_uri = trino_config.get(TRINO_HIVE_METASTORE_URI_KEY)
-    if hive_metastore_uri:
-        os.environ["HIVE_METASTORE_URI"] = hive_metastore_uri
+    metastore_uri = trino_config.get(METASTORE_URI_KEY)
+    if metastore_uri:
+        os.environ["HIVE_METASTORE_URI"] = metastore_uri
 
 
 def _get_runtime_logs():
diff --git a/python/cloudtik/runtime/zookeeper/utils.py b/python/cloudtik/runtime/zookeeper/utils.py
index d25d4342d..3f8f0d608 100644
--- a/python/cloudtik/runtime/zookeeper/utils.py
+++ b/python/cloudtik/runtime/zookeeper/utils.py
@@ -5,6 +5,7 @@
 from typing import Any, Dict, List
 
 from cloudtik.core._private.constants import CLOUDTIK_RUNTIME_ENV_NODE_IP, CLOUDTIK_RUNTIME_ENV_NODE_SEQ_ID
+from cloudtik.core._private.core_utils import get_address_string
 from cloudtik.core._private.runtime_factory import BUILT_IN_RUNTIME_ZOOKEEPER
 from cloudtik.core._private.runtime_utils import subscribe_runtime_config, RUNTIME_NODE_SEQ_ID, RUNTIME_NODE_IP, \
     sort_nodes_by_seq_id
@@ -211,7 +212,7 @@ def _request_member_add(endpoints, node_ip, seq_id):
 def _try_member_add(endpoint, zk_cli, server_to_add):
     # zkCli.sh -server existing_server_ip:2181 reconfig -add server.id=node_ip:2888:3888;2181
     cmd = ["bash", zk_cli]
-    endpoints_str = "{}:{}".format(
+    endpoints_str = get_address_string(
         endpoint[RUNTIME_NODE_IP], ZOOKEEPER_SERVICE_PORT)
     cmd += ["-server", endpoints_str]
     cmd += ["reconfig", "-add"]
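Finally, the ZooKeeper member-add path builds its `-server` argument with the same helper. A self-contained sketch of the zkCli invocation assembled by `_try_member_add`, following the command shown in the comment above (the path, IPs, and server entry are invented):

```python
# Self-contained sketch of the reconfig command assembled above:
# zkCli.sh -server <existing_ip>:2181 reconfig -add server.<id>=<ip>:2888:3888;2181
def get_address_string(host, port):
    return "{}:{}".format(host, port)

ZOOKEEPER_SERVICE_PORT = 2181
zk_cli = "/opt/zookeeper/bin/zkCli.sh"               # invented path
endpoint_ip = "10.0.0.4"                             # invented existing member
server_to_add = "server.5=10.0.0.9:2888:3888;2181"   # invented new member

cmd = ["bash", zk_cli]
cmd += ["-server", get_address_string(endpoint_ip, ZOOKEEPER_SERVICE_PORT)]
cmd += ["reconfig", "-add", server_to_add]
print(" ".join(cmd))
```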