Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Runtime: clean up runtime config key using runtime name #1776

Merged
merged 1 commit into from
Aug 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions python/cloudtik/runtime/ai/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@
["mlflow.server:app", False, "MLflow", "head"],
]

AI_RUNTIME_CONFIG_KEY = "ai"

MLFLOW_SERVICE_NAME = "mlflow"
MLFLOW_SERVICE_PORT = 5001


def _get_config(runtime_config: Dict[str, Any]):
return runtime_config.get(AI_RUNTIME_CONFIG_KEY, {})
return runtime_config.get(BUILT_IN_RUNTIME_AI, {})


def _get_runtime_processes():
Expand All @@ -36,7 +34,7 @@ def _with_runtime_environment_variables(runtime_config, config, provider, node_i

ai_config = _get_config(runtime_config)
export_runtime_flags(
ai_config, AI_RUNTIME_CONFIG_KEY, runtime_envs)
ai_config, BUILT_IN_RUNTIME_AI, runtime_envs)

return runtime_envs

Expand All @@ -52,9 +50,9 @@ def register_service(cluster_config: Dict[str, Any], head_node_id: str) -> None:


def _get_runtime_logs():
mlflow_logs_dir = os.path.join(os.getenv("HOME"), "runtime", "mlflow", "logs")
all_logs = {"mlflow": mlflow_logs_dir
}
mlflow_logs_dir = os.path.join(
os.getenv("HOME"), "runtime", "mlflow", "logs")
all_logs = {"mlflow": mlflow_logs_dir}
return all_logs


Expand Down
5 changes: 2 additions & 3 deletions python/cloudtik/runtime/consul/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
["consul", True, "Consul", "node"],
]

CONSUL_RUNTIME_CONFIG_KEY = "consul"
CONFIG_KEY_JOIN_LIST = "join_list"
CONFIG_KEY_RPC_PORT = "rpc_port"
CONFIG_KEY_SERVICES = "services"
Expand All @@ -54,7 +53,7 @@


def _get_config(runtime_config: Dict[str, Any]):
return runtime_config.get(CONSUL_RUNTIME_CONFIG_KEY, {})
return runtime_config.get(BUILT_IN_RUNTIME_CONSUL, {})


def _get_runtime_processes():
Expand All @@ -69,7 +68,7 @@ def _is_agent_server_mode(runtime_config):

def _get_consul_config_for_update(cluster_config):
runtime_config = get_config_for_update(cluster_config, RUNTIME_CONFIG_KEY)
return get_config_for_update(runtime_config, CONSUL_RUNTIME_CONFIG_KEY)
return get_config_for_update(runtime_config, BUILT_IN_RUNTIME_CONSUL)


def _get_cluster_name_tag(cluster_name):
Expand Down
6 changes: 2 additions & 4 deletions python/cloudtik/runtime/etcd/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@
["etcd", True, "etcd", "worker"],
]

ETCD_RUNTIME_CONFIG_KEY = "etcd"

ETCD_SERVICE_NAME = "etcd"
ETCD_SERVICE_NAME = BUILT_IN_RUNTIME_ETCD
ETCD_SERVICE_PORT = 2379
ETCD_PEER_PORT = 2380


def _get_config(runtime_config: Dict[str, Any]):
return runtime_config.get(ETCD_RUNTIME_CONFIG_KEY, {})
return runtime_config.get(BUILT_IN_RUNTIME_ETCD, {})


def _get_home_dir():
Expand Down
11 changes: 5 additions & 6 deletions python/cloudtik/runtime/flink/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
["proc_nodemanager", False, "NodeManager", "worker"],
]

FLINK_RUNTIME_CONFIG_KEY = "flink"
FLINK_HDFS_NAMENODE_URI_KEY = "hdfs_namenode_uri"
FLINK_HIVE_METASTORE_URI_KEY = "hive_metastore_uri"
FLINK_HDFS_SERVICE_SELECTOR_KEY = "hdfs_service_selector"
Expand All @@ -53,12 +52,12 @@


def _get_config(runtime_config: Dict[str, Any]):
return runtime_config.get(FLINK_RUNTIME_CONFIG_KEY, {})
return runtime_config.get(BUILT_IN_RUNTIME_FLINK, {})


def get_yarn_resource_memory_ratio(cluster_config: Dict[str, Any]):
yarn_resource_memory_ratio = YARN_RESOURCE_MEMORY_RATIO
flink_config = cluster_config.get(RUNTIME_CONFIG_KEY, {}).get(FLINK_RUNTIME_CONFIG_KEY, {})
flink_config = cluster_config.get(RUNTIME_CONFIG_KEY, {}).get(BUILT_IN_RUNTIME_FLINK, {})
memory_ratio = flink_config.get("yarn_resource_memory_ratio")
if memory_ratio:
yarn_resource_memory_ratio = memory_ratio
Expand Down Expand Up @@ -120,7 +119,7 @@ def _get_cluster_resources(

def _config_depended_services(cluster_config: Dict[str, Any]) -> Dict[str, Any]:
runtime_config = get_config_for_update(cluster_config, RUNTIME_CONFIG_KEY)
flink_config = get_config_for_update(runtime_config, FLINK_RUNTIME_CONFIG_KEY)
flink_config = get_config_for_update(runtime_config, BUILT_IN_RUNTIME_FLINK)

# We now support co-existence of local HDFS and remote HDFS
# 1) Try to use local hdfs first;
Expand Down Expand Up @@ -186,7 +185,7 @@ def _config_runtime_resources(cluster_config: Dict[str, Any]) -> Dict[str, Any]:
runtime_resource["flink_jobmanager_memory"] = get_flink_jobmanager_memory(worker_memory_for_flink)

runtime_config = get_config_for_update(cluster_config, RUNTIME_CONFIG_KEY)
flink_config = get_config_for_update(runtime_config, FLINK_RUNTIME_CONFIG_KEY)
flink_config = get_config_for_update(runtime_config, BUILT_IN_RUNTIME_FLINK)

flink_config["yarn_container_resource"] = container_resource
flink_config["flink_resource"] = runtime_resource
Expand Down Expand Up @@ -214,7 +213,7 @@ def _get_flink_config(config: Dict[str, Any]):
if not runtime:
return None

flink = runtime.get(FLINK_RUNTIME_CONFIG_KEY)
flink = runtime.get(BUILT_IN_RUNTIME_FLINK)
if not flink:
return None

Expand Down
6 changes: 2 additions & 4 deletions python/cloudtik/runtime/grafana/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from cloudtik.core._private.constants import CLOUDTIK_RUNTIME_ENV_CLUSTER
from cloudtik.core._private.core_utils import exec_with_output
from cloudtik.core._private.runtime_factory import BUILT_IN_RUNTIME_PROMETHEUS
from cloudtik.core._private.runtime_factory import BUILT_IN_RUNTIME_PROMETHEUS, BUILT_IN_RUNTIME_GRAFANA
from cloudtik.core._private.runtime_utils import get_runtime_config_from_node, get_runtime_value, get_runtime_head_ip, \
save_yaml, get_runtime_node_ip
from cloudtik.core._private.service_discovery.runtime_services import get_service_discovery_runtime, \
Expand All @@ -22,8 +22,6 @@
["grafana", True, "Grafana", "node"],
]


GRAFANA_RUNTIME_CONFIG_KEY = "grafana"
GRAFANA_SERVICE_PORT_CONFIG_KEY = "port"
GRAFANA_HIGH_AVAILABILITY_CONFIG_KEY = "high_availability"
GRAFANA_DATA_SOURCES_SCOPE_CONFIG_KEY = "data_sources_scope"
Expand Down Expand Up @@ -51,7 +49,7 @@ def get_data_source_name(service_name, cluster_name):


def _get_config(runtime_config: Dict[str, Any]):
return runtime_config.get(GRAFANA_RUNTIME_CONFIG_KEY, {})
return runtime_config.get(BUILT_IN_RUNTIME_GRAFANA, {})


def _get_service_port(grafana_config: Dict[str, Any]):
Expand Down
7 changes: 3 additions & 4 deletions python/cloudtik/runtime/haproxy/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
["haproxy", True, "HAProxy", "node"],
]

HAPROXY_RUNTIME_CONFIG_KEY = "haproxy"
HAPROXY_SERVICE_PORT_CONFIG_KEY = "port"
HAPROXY_SERVICE_PROTOCOL_CONFIG_KEY = "protocol"
HAPROXY_HIGH_AVAILABILITY_CONFIG_KEY = "high_availability"
Expand All @@ -41,7 +40,7 @@
HAPROXY_BACKEND_SELECTOR_CONFIG_KEY = "selector"
HAPROXY_BACKEND_SESSION_PERSISTENCE_CONFIG_KEY = "session_persistence"

HAPROXY_SERVICE_NAME = "haproxy"
HAPROXY_SERVICE_NAME = BUILT_IN_RUNTIME_HAPROXY
HAPROXY_SERVICE_PORT_DEFAULT = 80
HAPROXY_SERVICE_PROTOCOL_TCP = "tcp"
HAPROXY_SERVICE_PROTOCOL_HTTP = "http"
Expand Down Expand Up @@ -103,7 +102,7 @@ def get_default_server_name(server_id):


def _get_config(runtime_config: Dict[str, Any]):
return runtime_config.get(HAPROXY_RUNTIME_CONFIG_KEY, {})
return runtime_config.get(BUILT_IN_RUNTIME_HAPROXY, {})


def _get_service_port(haproxy_config: Dict[str, Any]):
Expand Down Expand Up @@ -138,7 +137,7 @@ def _is_high_availability(haproxy_config: Dict[str, Any]):

def _get_home_dir():
return os.path.join(
os.getenv("HOME"), "runtime", HAPROXY_RUNTIME_CONFIG_KEY)
os.getenv("HOME"), "runtime", BUILT_IN_RUNTIME_HAPROXY)


def _get_runtime_processes():
Expand Down
4 changes: 1 addition & 3 deletions python/cloudtik/runtime/hdfs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,14 @@
["proc_datanode", False, "DataNode", "worker"],
]

HDFS_RUNTIME_CONFIG_KEY = "hdfs"

HDFS_WEB_PORT = 9870

HDFS_SERVICE_NAME = "hdfs"
HDFS_SERVICE_PORT = 9000


def _get_config(runtime_config: Dict[str, Any]):
return runtime_config.get(HDFS_RUNTIME_CONFIG_KEY, {})
return runtime_config.get(BUILT_IN_RUNTIME_HDFS, {})


def register_service(
Expand Down
15 changes: 7 additions & 8 deletions python/cloudtik/runtime/kafka/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
from typing import Any, Dict

from cloudtik.core._private.runtime_factory import BUILT_IN_RUNTIME_ZOOKEEPER
from cloudtik.core._private.runtime_factory import BUILT_IN_RUNTIME_ZOOKEEPER, BUILT_IN_RUNTIME_KAFKA
from cloudtik.core._private.runtime_utils import subscribe_runtime_config
from cloudtik.core._private.service_discovery.utils import get_canonical_service_name, define_runtime_service_on_worker, \
get_service_discovery_config
Expand All @@ -20,21 +20,20 @@
["kafka.Kafka", False, "KafkaBroker", "node"],
]

KAFKA_RUNTIME_CONFIG_KEY = "kafka"
KAFKA_ZOOKEEPER_CONNECT_KEY = "zookeeper_connect"
KAFKA_ZOOKEEPER_SERVICE_SELECTOR_KEY = "zookeeper_service_selector"

KAFKA_SERVICE_NAME = "kafka"
KAFKA_SERVICE_NAME = BUILT_IN_RUNTIME_KAFKA
KAFKA_SERVICE_PORT = 9092


def _get_config(runtime_config: Dict[str, Any]):
return runtime_config.get(KAFKA_RUNTIME_CONFIG_KEY, {})
return runtime_config.get(BUILT_IN_RUNTIME_KAFKA, {})


def _config_depended_services(cluster_config: Dict[str, Any]) -> Dict[str, Any]:
runtime_config = get_config_for_update(cluster_config, RUNTIME_CONFIG_KEY)
kafka_config = get_config_for_update(runtime_config, KAFKA_RUNTIME_CONFIG_KEY)
kafka_config = get_config_for_update(runtime_config, BUILT_IN_RUNTIME_KAFKA)

# Check zookeeper
if not is_runtime_enabled(runtime_config, BUILT_IN_RUNTIME_ZOOKEEPER):
Expand Down Expand Up @@ -71,8 +70,8 @@ def _validate_config(config: Dict[str, Any]):
# Check zookeeper connect configured
runtime_config = config.get(RUNTIME_CONFIG_KEY)
if (runtime_config is None) or (
KAFKA_RUNTIME_CONFIG_KEY not in runtime_config) or (
KAFKA_ZOOKEEPER_CONNECT_KEY not in runtime_config[KAFKA_RUNTIME_CONFIG_KEY]):
BUILT_IN_RUNTIME_KAFKA not in runtime_config) or (
KAFKA_ZOOKEEPER_CONNECT_KEY not in runtime_config[BUILT_IN_RUNTIME_KAFKA]):
raise ValueError("Zookeeper connect must be configured!")
# TODO: dynamic discover zookeeper through service discovery

Expand All @@ -86,7 +85,7 @@ def _get_zookeeper_connect(runtime_config):
if runtime_config is None:
return None

kafka_config = runtime_config.get(KAFKA_RUNTIME_CONFIG_KEY)
kafka_config = runtime_config.get(BUILT_IN_RUNTIME_KAFKA)
if kafka_config is None:
return None

Expand Down
8 changes: 3 additions & 5 deletions python/cloudtik/runtime/metastore/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@
["mysql", False, "MySQL", "head"],
]

METASTORE_RUNTIME_CONFIG_KEY = "metastore"

METASTORE_SERVICE_NAME = "metastore"
METASTORE_SERVICE_NAME = BUILT_IN_RUNTIME_METASTORE
METASTORE_SERVICE_PORT = 9083


def _get_config(runtime_config: Dict[str, Any]):
return runtime_config.get(METASTORE_RUNTIME_CONFIG_KEY, {})
return runtime_config.get(BUILT_IN_RUNTIME_METASTORE, {})


def _get_runtime_processes():
Expand All @@ -36,7 +34,7 @@ def _with_runtime_environment_variables(runtime_config, config, provider, node_i

metastore_config = _get_config(runtime_config)
export_runtime_flags(
metastore_config, METASTORE_RUNTIME_CONFIG_KEY, runtime_envs)
metastore_config, BUILT_IN_RUNTIME_METASTORE, runtime_envs)
return runtime_envs


Expand Down
9 changes: 4 additions & 5 deletions python/cloudtik/runtime/mysql/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
from typing import Any, Dict

from cloudtik.core._private.runtime_factory import BUILT_IN_RUNTIME_MYSQL
from cloudtik.core._private.service_discovery.utils import \
get_canonical_service_name, define_runtime_service, \
get_service_discovery_config
Expand All @@ -14,8 +15,6 @@
["mysqld", True, "MySQL", "node"],
]


MYSQL_RUNTIME_CONFIG_KEY = "mysql"
MYSQL_SERVICE_PORT_CONFIG_KEY = "port"

MYSQL_HIGH_AVAILABILITY_CONFIG_KEY = "high_availability"
Expand All @@ -26,14 +25,14 @@
MYSQL_DATABASE_USER_CONFIG_KEY = "user"
MYSQL_DATABASE_PASSWORD_CONFIG_KEY = "password"

MYSQL_SERVICE_NAME = "mysql"
MYSQL_SERVICE_NAME = BUILT_IN_RUNTIME_MYSQL
MYSQL_SERVICE_PORT_DEFAULT = 3306

MYSQL_ROOT_PASSWORD_DEFAULT = "cloudtik"


def _get_config(runtime_config: Dict[str, Any]):
return runtime_config.get(MYSQL_RUNTIME_CONFIG_KEY, {})
return runtime_config.get(BUILT_IN_RUNTIME_MYSQL, {})


def _get_service_port(mysql_config: Dict[str, Any]):
Expand All @@ -43,7 +42,7 @@ def _get_service_port(mysql_config: Dict[str, Any]):

def _get_home_dir():
return os.path.join(
os.getenv("HOME"), "runtime", MYSQL_RUNTIME_CONFIG_KEY)
os.getenv("HOME"), "runtime", BUILT_IN_RUNTIME_MYSQL)


def _get_runtime_processes():
Expand Down
7 changes: 3 additions & 4 deletions python/cloudtik/runtime/nginx/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
["nginx", True, "NGINX", "node"],
]

NGINX_RUNTIME_CONFIG_KEY = "nginx"
NGINX_SERVICE_PORT_CONFIG_KEY = "port"

NGINX_HIGH_AVAILABILITY_CONFIG_KEY = "high_availability"
Expand All @@ -37,7 +36,7 @@
NGINX_BACKEND_SERVERS_CONFIG_KEY = "servers"
NGINX_BACKEND_SELECTOR_CONFIG_KEY = "selector"

NGINX_SERVICE_NAME = "nginx"
NGINX_SERVICE_NAME = BUILT_IN_RUNTIME_NGINX
NGINX_SERVICE_PORT_DEFAULT = 80

NGINX_APP_MODE_WEB = "web"
Expand All @@ -60,7 +59,7 @@


def _get_config(runtime_config: Dict[str, Any]):
return runtime_config.get(NGINX_RUNTIME_CONFIG_KEY, {})
return runtime_config.get(BUILT_IN_RUNTIME_NGINX, {})


def _get_service_port(runtime_config: Dict[str, Any]):
Expand All @@ -86,7 +85,7 @@ def _is_high_availability(nginx_config: Dict[str, Any]):

def _get_home_dir():
return os.path.join(
os.getenv("HOME"), "runtime", NGINX_RUNTIME_CONFIG_KEY)
os.getenv("HOME"), "runtime", BUILT_IN_RUNTIME_NGINX)


def _get_runtime_processes():
Expand Down
Loading