Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Commit

Permalink
Runtime: clean up runtime config key using runtime name (#1776)
Browse files Browse the repository at this point in the history
  • Loading branch information
jerrychenhf authored Aug 16, 2023
1 parent 169cf7b commit 0e1cccb
Show file tree
Hide file tree
Showing 20 changed files with 71 additions and 95 deletions.
12 changes: 5 additions & 7 deletions python/cloudtik/runtime/ai/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@
["mlflow.server:app", False, "MLflow", "head"],
]

AI_RUNTIME_CONFIG_KEY = "ai"

MLFLOW_SERVICE_NAME = "mlflow"
MLFLOW_SERVICE_PORT = 5001


def _get_config(runtime_config: Dict[str, Any]):
return runtime_config.get(AI_RUNTIME_CONFIG_KEY, {})
return runtime_config.get(BUILT_IN_RUNTIME_AI, {})


def _get_runtime_processes():
Expand All @@ -36,7 +34,7 @@ def _with_runtime_environment_variables(runtime_config, config, provider, node_i

ai_config = _get_config(runtime_config)
export_runtime_flags(
ai_config, AI_RUNTIME_CONFIG_KEY, runtime_envs)
ai_config, BUILT_IN_RUNTIME_AI, runtime_envs)

return runtime_envs

Expand All @@ -52,9 +50,9 @@ def register_service(cluster_config: Dict[str, Any], head_node_id: str) -> None:


def _get_runtime_logs():
mlflow_logs_dir = os.path.join(os.getenv("HOME"), "runtime", "mlflow", "logs")
all_logs = {"mlflow": mlflow_logs_dir
}
mlflow_logs_dir = os.path.join(
os.getenv("HOME"), "runtime", "mlflow", "logs")
all_logs = {"mlflow": mlflow_logs_dir}
return all_logs


Expand Down
5 changes: 2 additions & 3 deletions python/cloudtik/runtime/consul/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
["consul", True, "Consul", "node"],
]

CONSUL_RUNTIME_CONFIG_KEY = "consul"
CONFIG_KEY_JOIN_LIST = "join_list"
CONFIG_KEY_RPC_PORT = "rpc_port"
CONFIG_KEY_SERVICES = "services"
Expand All @@ -54,7 +53,7 @@


def _get_config(runtime_config: Dict[str, Any]):
return runtime_config.get(CONSUL_RUNTIME_CONFIG_KEY, {})
return runtime_config.get(BUILT_IN_RUNTIME_CONSUL, {})


def _get_runtime_processes():
Expand All @@ -69,7 +68,7 @@ def _is_agent_server_mode(runtime_config):

def _get_consul_config_for_update(cluster_config):
runtime_config = get_config_for_update(cluster_config, RUNTIME_CONFIG_KEY)
return get_config_for_update(runtime_config, CONSUL_RUNTIME_CONFIG_KEY)
return get_config_for_update(runtime_config, BUILT_IN_RUNTIME_CONSUL)


def _get_cluster_name_tag(cluster_name):
Expand Down
6 changes: 2 additions & 4 deletions python/cloudtik/runtime/etcd/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@
["etcd", True, "etcd", "worker"],
]

ETCD_RUNTIME_CONFIG_KEY = "etcd"

ETCD_SERVICE_NAME = "etcd"
ETCD_SERVICE_NAME = BUILT_IN_RUNTIME_ETCD
ETCD_SERVICE_PORT = 2379
ETCD_PEER_PORT = 2380


def _get_config(runtime_config: Dict[str, Any]):
return runtime_config.get(ETCD_RUNTIME_CONFIG_KEY, {})
return runtime_config.get(BUILT_IN_RUNTIME_ETCD, {})


def _get_home_dir():
Expand Down
11 changes: 5 additions & 6 deletions python/cloudtik/runtime/flink/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
["proc_nodemanager", False, "NodeManager", "worker"],
]

FLINK_RUNTIME_CONFIG_KEY = "flink"
FLINK_HDFS_NAMENODE_URI_KEY = "hdfs_namenode_uri"
FLINK_HIVE_METASTORE_URI_KEY = "hive_metastore_uri"
FLINK_HDFS_SERVICE_SELECTOR_KEY = "hdfs_service_selector"
Expand All @@ -53,12 +52,12 @@


def _get_config(runtime_config: Dict[str, Any]):
return runtime_config.get(FLINK_RUNTIME_CONFIG_KEY, {})
return runtime_config.get(BUILT_IN_RUNTIME_FLINK, {})


def get_yarn_resource_memory_ratio(cluster_config: Dict[str, Any]):
yarn_resource_memory_ratio = YARN_RESOURCE_MEMORY_RATIO
flink_config = cluster_config.get(RUNTIME_CONFIG_KEY, {}).get(FLINK_RUNTIME_CONFIG_KEY, {})
flink_config = cluster_config.get(RUNTIME_CONFIG_KEY, {}).get(BUILT_IN_RUNTIME_FLINK, {})
memory_ratio = flink_config.get("yarn_resource_memory_ratio")
if memory_ratio:
yarn_resource_memory_ratio = memory_ratio
Expand Down Expand Up @@ -120,7 +119,7 @@ def _get_cluster_resources(

def _config_depended_services(cluster_config: Dict[str, Any]) -> Dict[str, Any]:
runtime_config = get_config_for_update(cluster_config, RUNTIME_CONFIG_KEY)
flink_config = get_config_for_update(runtime_config, FLINK_RUNTIME_CONFIG_KEY)
flink_config = get_config_for_update(runtime_config, BUILT_IN_RUNTIME_FLINK)

# We now support co-existence of local HDFS and remote HDFS
# 1) Try to use local hdfs first;
Expand Down Expand Up @@ -186,7 +185,7 @@ def _config_runtime_resources(cluster_config: Dict[str, Any]) -> Dict[str, Any]:
runtime_resource["flink_jobmanager_memory"] = get_flink_jobmanager_memory(worker_memory_for_flink)

runtime_config = get_config_for_update(cluster_config, RUNTIME_CONFIG_KEY)
flink_config = get_config_for_update(runtime_config, FLINK_RUNTIME_CONFIG_KEY)
flink_config = get_config_for_update(runtime_config, BUILT_IN_RUNTIME_FLINK)

flink_config["yarn_container_resource"] = container_resource
flink_config["flink_resource"] = runtime_resource
Expand Down Expand Up @@ -214,7 +213,7 @@ def _get_flink_config(config: Dict[str, Any]):
if not runtime:
return None

flink = runtime.get(FLINK_RUNTIME_CONFIG_KEY)
flink = runtime.get(BUILT_IN_RUNTIME_FLINK)
if not flink:
return None

Expand Down
6 changes: 2 additions & 4 deletions python/cloudtik/runtime/grafana/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from cloudtik.core._private.constants import CLOUDTIK_RUNTIME_ENV_CLUSTER
from cloudtik.core._private.core_utils import exec_with_output
from cloudtik.core._private.runtime_factory import BUILT_IN_RUNTIME_PROMETHEUS
from cloudtik.core._private.runtime_factory import BUILT_IN_RUNTIME_PROMETHEUS, BUILT_IN_RUNTIME_GRAFANA
from cloudtik.core._private.runtime_utils import get_runtime_config_from_node, get_runtime_value, get_runtime_head_ip, \
save_yaml, get_runtime_node_ip
from cloudtik.core._private.service_discovery.runtime_services import get_service_discovery_runtime, \
Expand All @@ -22,8 +22,6 @@
["grafana", True, "Grafana", "node"],
]


GRAFANA_RUNTIME_CONFIG_KEY = "grafana"
GRAFANA_SERVICE_PORT_CONFIG_KEY = "port"
GRAFANA_HIGH_AVAILABILITY_CONFIG_KEY = "high_availability"
GRAFANA_DATA_SOURCES_SCOPE_CONFIG_KEY = "data_sources_scope"
Expand Down Expand Up @@ -51,7 +49,7 @@ def get_data_source_name(service_name, cluster_name):


def _get_config(runtime_config: Dict[str, Any]):
return runtime_config.get(GRAFANA_RUNTIME_CONFIG_KEY, {})
return runtime_config.get(BUILT_IN_RUNTIME_GRAFANA, {})


def _get_service_port(grafana_config: Dict[str, Any]):
Expand Down
7 changes: 3 additions & 4 deletions python/cloudtik/runtime/haproxy/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
["haproxy", True, "HAProxy", "node"],
]

HAPROXY_RUNTIME_CONFIG_KEY = "haproxy"
HAPROXY_SERVICE_PORT_CONFIG_KEY = "port"
HAPROXY_SERVICE_PROTOCOL_CONFIG_KEY = "protocol"
HAPROXY_HIGH_AVAILABILITY_CONFIG_KEY = "high_availability"
Expand All @@ -41,7 +40,7 @@
HAPROXY_BACKEND_SELECTOR_CONFIG_KEY = "selector"
HAPROXY_BACKEND_SESSION_PERSISTENCE_CONFIG_KEY = "session_persistence"

HAPROXY_SERVICE_NAME = "haproxy"
HAPROXY_SERVICE_NAME = BUILT_IN_RUNTIME_HAPROXY
HAPROXY_SERVICE_PORT_DEFAULT = 80
HAPROXY_SERVICE_PROTOCOL_TCP = "tcp"
HAPROXY_SERVICE_PROTOCOL_HTTP = "http"
Expand Down Expand Up @@ -103,7 +102,7 @@ def get_default_server_name(server_id):


def _get_config(runtime_config: Dict[str, Any]):
return runtime_config.get(HAPROXY_RUNTIME_CONFIG_KEY, {})
return runtime_config.get(BUILT_IN_RUNTIME_HAPROXY, {})


def _get_service_port(haproxy_config: Dict[str, Any]):
Expand Down Expand Up @@ -138,7 +137,7 @@ def _is_high_availability(haproxy_config: Dict[str, Any]):

def _get_home_dir():
return os.path.join(
os.getenv("HOME"), "runtime", HAPROXY_RUNTIME_CONFIG_KEY)
os.getenv("HOME"), "runtime", BUILT_IN_RUNTIME_HAPROXY)


def _get_runtime_processes():
Expand Down
4 changes: 1 addition & 3 deletions python/cloudtik/runtime/hdfs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,14 @@
["proc_datanode", False, "DataNode", "worker"],
]

HDFS_RUNTIME_CONFIG_KEY = "hdfs"

HDFS_WEB_PORT = 9870

HDFS_SERVICE_NAME = "hdfs"
HDFS_SERVICE_PORT = 9000


def _get_config(runtime_config: Dict[str, Any]):
return runtime_config.get(HDFS_RUNTIME_CONFIG_KEY, {})
return runtime_config.get(BUILT_IN_RUNTIME_HDFS, {})


def register_service(
Expand Down
15 changes: 7 additions & 8 deletions python/cloudtik/runtime/kafka/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
from typing import Any, Dict

from cloudtik.core._private.runtime_factory import BUILT_IN_RUNTIME_ZOOKEEPER
from cloudtik.core._private.runtime_factory import BUILT_IN_RUNTIME_ZOOKEEPER, BUILT_IN_RUNTIME_KAFKA
from cloudtik.core._private.runtime_utils import subscribe_runtime_config
from cloudtik.core._private.service_discovery.utils import get_canonical_service_name, define_runtime_service_on_worker, \
get_service_discovery_config
Expand All @@ -20,21 +20,20 @@
["kafka.Kafka", False, "KafkaBroker", "node"],
]

KAFKA_RUNTIME_CONFIG_KEY = "kafka"
KAFKA_ZOOKEEPER_CONNECT_KEY = "zookeeper_connect"
KAFKA_ZOOKEEPER_SERVICE_SELECTOR_KEY = "zookeeper_service_selector"

KAFKA_SERVICE_NAME = "kafka"
KAFKA_SERVICE_NAME = BUILT_IN_RUNTIME_KAFKA
KAFKA_SERVICE_PORT = 9092


def _get_config(runtime_config: Dict[str, Any]):
return runtime_config.get(KAFKA_RUNTIME_CONFIG_KEY, {})
return runtime_config.get(BUILT_IN_RUNTIME_KAFKA, {})


def _config_depended_services(cluster_config: Dict[str, Any]) -> Dict[str, Any]:
runtime_config = get_config_for_update(cluster_config, RUNTIME_CONFIG_KEY)
kafka_config = get_config_for_update(runtime_config, KAFKA_RUNTIME_CONFIG_KEY)
kafka_config = get_config_for_update(runtime_config, BUILT_IN_RUNTIME_KAFKA)

# Check zookeeper
if not is_runtime_enabled(runtime_config, BUILT_IN_RUNTIME_ZOOKEEPER):
Expand Down Expand Up @@ -71,8 +70,8 @@ def _validate_config(config: Dict[str, Any]):
# Check zookeeper connect configured
runtime_config = config.get(RUNTIME_CONFIG_KEY)
if (runtime_config is None) or (
KAFKA_RUNTIME_CONFIG_KEY not in runtime_config) or (
KAFKA_ZOOKEEPER_CONNECT_KEY not in runtime_config[KAFKA_RUNTIME_CONFIG_KEY]):
BUILT_IN_RUNTIME_KAFKA not in runtime_config) or (
KAFKA_ZOOKEEPER_CONNECT_KEY not in runtime_config[BUILT_IN_RUNTIME_KAFKA]):
raise ValueError("Zookeeper connect must be configured!")
# TODO: dynamic discover zookeeper through service discovery

Expand All @@ -86,7 +85,7 @@ def _get_zookeeper_connect(runtime_config):
if runtime_config is None:
return None

kafka_config = runtime_config.get(KAFKA_RUNTIME_CONFIG_KEY)
kafka_config = runtime_config.get(BUILT_IN_RUNTIME_KAFKA)
if kafka_config is None:
return None

Expand Down
8 changes: 3 additions & 5 deletions python/cloudtik/runtime/metastore/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@
["mysql", False, "MySQL", "head"],
]

METASTORE_RUNTIME_CONFIG_KEY = "metastore"

METASTORE_SERVICE_NAME = "metastore"
METASTORE_SERVICE_NAME = BUILT_IN_RUNTIME_METASTORE
METASTORE_SERVICE_PORT = 9083


def _get_config(runtime_config: Dict[str, Any]):
return runtime_config.get(METASTORE_RUNTIME_CONFIG_KEY, {})
return runtime_config.get(BUILT_IN_RUNTIME_METASTORE, {})


def _get_runtime_processes():
Expand All @@ -36,7 +34,7 @@ def _with_runtime_environment_variables(runtime_config, config, provider, node_i

metastore_config = _get_config(runtime_config)
export_runtime_flags(
metastore_config, METASTORE_RUNTIME_CONFIG_KEY, runtime_envs)
metastore_config, BUILT_IN_RUNTIME_METASTORE, runtime_envs)
return runtime_envs


Expand Down
9 changes: 4 additions & 5 deletions python/cloudtik/runtime/mysql/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
from typing import Any, Dict

from cloudtik.core._private.runtime_factory import BUILT_IN_RUNTIME_MYSQL
from cloudtik.core._private.service_discovery.utils import \
get_canonical_service_name, define_runtime_service, \
get_service_discovery_config
Expand All @@ -14,8 +15,6 @@
["mysqld", True, "MySQL", "node"],
]


MYSQL_RUNTIME_CONFIG_KEY = "mysql"
MYSQL_SERVICE_PORT_CONFIG_KEY = "port"

MYSQL_HIGH_AVAILABILITY_CONFIG_KEY = "high_availability"
Expand All @@ -26,14 +25,14 @@
MYSQL_DATABASE_USER_CONFIG_KEY = "user"
MYSQL_DATABASE_PASSWORD_CONFIG_KEY = "password"

MYSQL_SERVICE_NAME = "mysql"
MYSQL_SERVICE_NAME = BUILT_IN_RUNTIME_MYSQL
MYSQL_SERVICE_PORT_DEFAULT = 3306

MYSQL_ROOT_PASSWORD_DEFAULT = "cloudtik"


def _get_config(runtime_config: Dict[str, Any]):
return runtime_config.get(MYSQL_RUNTIME_CONFIG_KEY, {})
return runtime_config.get(BUILT_IN_RUNTIME_MYSQL, {})


def _get_service_port(mysql_config: Dict[str, Any]):
Expand All @@ -43,7 +42,7 @@ def _get_service_port(mysql_config: Dict[str, Any]):

def _get_home_dir():
return os.path.join(
os.getenv("HOME"), "runtime", MYSQL_RUNTIME_CONFIG_KEY)
os.getenv("HOME"), "runtime", BUILT_IN_RUNTIME_MYSQL)


def _get_runtime_processes():
Expand Down
7 changes: 3 additions & 4 deletions python/cloudtik/runtime/nginx/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
["nginx", True, "NGINX", "node"],
]

NGINX_RUNTIME_CONFIG_KEY = "nginx"
NGINX_SERVICE_PORT_CONFIG_KEY = "port"

NGINX_HIGH_AVAILABILITY_CONFIG_KEY = "high_availability"
Expand All @@ -37,7 +36,7 @@
NGINX_BACKEND_SERVERS_CONFIG_KEY = "servers"
NGINX_BACKEND_SELECTOR_CONFIG_KEY = "selector"

NGINX_SERVICE_NAME = "nginx"
NGINX_SERVICE_NAME = BUILT_IN_RUNTIME_NGINX
NGINX_SERVICE_PORT_DEFAULT = 80

NGINX_APP_MODE_WEB = "web"
Expand All @@ -60,7 +59,7 @@


def _get_config(runtime_config: Dict[str, Any]):
return runtime_config.get(NGINX_RUNTIME_CONFIG_KEY, {})
return runtime_config.get(BUILT_IN_RUNTIME_NGINX, {})


def _get_service_port(runtime_config: Dict[str, Any]):
Expand All @@ -86,7 +85,7 @@ def _is_high_availability(nginx_config: Dict[str, Any]):

def _get_home_dir():
return os.path.join(
os.getenv("HOME"), "runtime", NGINX_RUNTIME_CONFIG_KEY)
os.getenv("HOME"), "runtime", BUILT_IN_RUNTIME_NGINX)


def _get_runtime_processes():
Expand Down
Loading

0 comments on commit 0e1cccb

Please sign in to comment.