Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Runtime: database service discovery for MLflow service #1803

Merged
merged 1 commit into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions python/cloudtik/core/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,19 @@ def with_environment_variables(

def configure(
self, head: bool):
""" This method is called on every node as the first step of
executing runtime configure command.
After this configure method complete successfully, configure scripts are executed
within the context of this configure method such that the environment variables are
set in this method will be available in configure scripts
""" This method is called on every node as the first step of executing the runtime
configure command. After this configure method completes successfully, the configure
scripts are executed within the context of this configure method, so that the
environment variables set in this method are available to the configure scripts.
"""
pass

def services(
        self, command: str, head: bool):
    """ This method is called on every node as the first step of executing runtime
    services command. This method can be used either to do some real service start
    or stop work, or be used to prepare some environments for the following services
    scripts.

    This base implementation does nothing.

    Args:
        command: The services command being executed.
        head: NOTE(review): presumably True when running on the head node;
            confirm against the caller.
    """
    pass

Expand Down
13 changes: 10 additions & 3 deletions python/cloudtik/runtime/ai/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from cloudtik.core.node_provider import NodeProvider
from cloudtik.runtime.ai.utils import _with_runtime_environment_variables, \
_get_runtime_processes, _get_runtime_logs, _get_runtime_endpoints, register_service, _get_head_service_ports, \
_get_runtime_services, _prepare_config_on_head, _config_depended_services, _configure
_get_runtime_services, _prepare_config_on_head, _config_depended_services, _configure, _services
from cloudtik.runtime.common.runtime_base import RuntimeBase

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -42,11 +42,18 @@ def with_environment_variables(
self.runtime_config, config=config, provider=provider, node_id=node_id)

def configure(self, head: bool):
""" This method is called on every node as the first step of
executing runtime configure command.
""" This method is called on every node as the first step of executing runtime
configure command.
"""
_configure(self.runtime_config, head)

def services(
        self, command: str, head: bool):
    """ This method is called on every node as the first step of executing runtime
    services command.

    Delegates to _services, passing this runtime's config together with the
    command and the head flag.
    """
    _services(self.runtime_config, command, head)

def cluster_booting_completed(
self, cluster_config: Dict[str, Any], head_node_id: str) -> None:
register_service(cluster_config, head_node_id)
Expand Down
26 changes: 2 additions & 24 deletions python/cloudtik/runtime/ai/scripts/configure.sh
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ function set_artifact_config_for_cloud_storage() {
}

function update_mlflow_server_config() {
if [ "${SQL_DATABASE}" == "true" ] && [ "$AI_WITH_SQL_DATABASE" != "false" ]; then
if [ "${SQL_DATABASE}" == "true" ] \
&& [ "$AI_WITH_SQL_DATABASE" != "false" ]; then
DATABASE_NAME=mlflow
CONNECTION_INFO=${SQL_DATABASE_USERNAME}:${SQL_DATABASE_PASSWORD}@${SQL_DATABASE_HOST}:${SQL_DATABASE_PORT}/${DATABASE_NAME}
if [ "${SQL_DATABASE_ENGINE}" == "mysql" ]; then
Expand Down Expand Up @@ -227,22 +228,6 @@ function patch_libraries() {
fi
}

function prepare_database_schema() {
    # Create the "mlflow" database on the configured SQL server if it does not
    # exist yet. Reads SQL_DATABASE_ENGINE, SQL_DATABASE_HOST, SQL_DATABASE_PORT,
    # SQL_DATABASE_USERNAME, SQL_DATABASE_PASSWORD and MLFLOW_HOME from the
    # environment. The client output is written to ${MLFLOW_HOME}/logs/configure.log.
    DATABASE_NAME=mlflow
    if [ "${SQL_DATABASE_ENGINE}" == "mysql" ]; then
        # Every expansion is quoted so that a value containing spaces or glob
        # characters (typically the password) reaches the client as one argument.
        # NOTE(review): --password on the command line is visible in the process list.
        mysql --host="${SQL_DATABASE_HOST}" --port="${SQL_DATABASE_PORT}" \
            --user="${SQL_DATABASE_USERNAME}" --password="${SQL_DATABASE_PASSWORD}" \
            -e "CREATE DATABASE IF NOT EXISTS ${DATABASE_NAME};" \
            > "${MLFLOW_HOME}/logs/configure.log"
    else
        # Use psql to create the database; \gexec runs the generated
        # CREATE DATABASE statement only when the SELECT returns a row.
        echo "SELECT 'CREATE DATABASE ${DATABASE_NAME}' WHERE NOT EXISTS (SELECT FROM pg_database WHERE datname = '${DATABASE_NAME}')\gexec" | PGPASSWORD="${SQL_DATABASE_PASSWORD}" \
            psql \
            --host="${SQL_DATABASE_HOST}" \
            --port="${SQL_DATABASE_PORT}" \
            --username="${SQL_DATABASE_USERNAME}" > "${MLFLOW_HOME}/logs/configure.log"
    fi
    # Future improvement: mlflow db upgrade [db_uri]
}

function configure_ai() {
# Do necessary configurations for AI runtime
prepare_base_conf
Expand All @@ -256,13 +241,6 @@ function configure_ai() {

cp $output_dir/mlflow ${MLFLOW_CONF_DIR}/mlflow

if [ "$IS_HEAD_NODE" == "true" ]; then
# Preparing database if external database used
if [ "${SQL_DATABASE}" == "true" ] && [ "$AI_WITH_SQL_DATABASE" != "false" ]; then
prepare_database_schema
fi
fi

patch_libraries
}

Expand Down
24 changes: 24 additions & 0 deletions python/cloudtik/runtime/ai/scripts/schema-init.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@

function create_database_schema() {
    # Create the "mlflow" database on the configured SQL server if it does not
    # exist yet. Reads SQL_DATABASE_ENGINE, SQL_DATABASE_HOST, SQL_DATABASE_PORT,
    # SQL_DATABASE_USERNAME, SQL_DATABASE_PASSWORD and MLFLOW_HOME from the
    # environment. The client output is written to ${MLFLOW_HOME}/logs/configure.log.
    DATABASE_NAME=mlflow
    if [ "${SQL_DATABASE_ENGINE}" == "mysql" ]; then
        # Every expansion is quoted so that a value containing spaces or glob
        # characters (typically the password) reaches the client as one argument.
        # NOTE(review): --password on the command line is visible in the process list.
        mysql --host="${SQL_DATABASE_HOST}" --port="${SQL_DATABASE_PORT}" \
            --user="${SQL_DATABASE_USERNAME}" --password="${SQL_DATABASE_PASSWORD}" \
            -e "CREATE DATABASE IF NOT EXISTS ${DATABASE_NAME};" \
            > "${MLFLOW_HOME}/logs/configure.log"
    else
        # Use psql to create the database; \gexec runs the generated
        # CREATE DATABASE statement only when the SELECT returns a row.
        echo "SELECT 'CREATE DATABASE ${DATABASE_NAME}' WHERE NOT EXISTS (SELECT FROM pg_database WHERE datname = '${DATABASE_NAME}')\gexec" | PGPASSWORD="${SQL_DATABASE_PASSWORD}" \
            psql \
            --host="${SQL_DATABASE_HOST}" \
            --port="${SQL_DATABASE_PORT}" \
            --username="${SQL_DATABASE_USERNAME}" > "${MLFLOW_HOME}/logs/configure.log"
    fi
    # Future improvement: mlflow db upgrade [db_uri]
}

function init_schema() {
    # Create the database schema before the service starts, but only when
    # SQL_DATABASE is "true" and AI_WITH_SQL_DATABASE is not "false".
    # The database name is set by create_database_schema itself; the stale
    # "hive_metastore" assignment that used to be here was never used.
    if [ "${SQL_DATABASE}" == "true" ] \
        && [ "$AI_WITH_SQL_DATABASE" != "false" ]; then
        create_database_schema
    fi
}
6 changes: 6 additions & 0 deletions python/cloudtik/runtime/ai/scripts/services.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ eval set -- "${args}"
# import util functions
. "$ROOT_DIR"/common/scripts/util-functions.sh

# schema initialization functions
. "$BIN_DIR"/schema-init.sh

USER_HOME=/home/$(whoami)
RUNTIME_PATH=$USER_HOME/runtime
MLFLOW_HOME=$RUNTIME_PATH/mlflow
Expand All @@ -25,6 +28,9 @@ start)
# Will set BACKEND_STORE_URI and DEFAULT_ARTIFACT_ROOT
. $MLFLOW_HOME/conf/mlflow

# do schema check and init
init_schema

# Start MLflow service
nohup mlflow server \
--backend-store-uri ${BACKEND_STORE_URI} \
Expand Down
29 changes: 23 additions & 6 deletions python/cloudtik/runtime/ai/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from cloudtik.core._private.utils import export_runtime_flags
from cloudtik.runtime.common.service_discovery.runtime_discovery import discover_hdfs_on_head, \
discover_hdfs_from_workspace, HDFS_URI_KEY, discover_database_from_workspace, discover_database_on_head, \
DATABASE_CONNECT_KEY
DATABASE_CONNECT_KEY, get_database_runtime_in_cluster, export_database_runtime_environment_variables
from cloudtik.runtime.common.service_discovery.workspace import register_service_to_workspace
from cloudtik.runtime.common.utils import get_runtime_endpoints_of

Expand Down Expand Up @@ -65,6 +65,21 @@ def _with_runtime_environment_variables(
return runtime_envs


def _export_database_configurations(runtime_config):
    """Export the database settings of the AI runtime to the environment.

    An explicitly configured database takes precedence. Otherwise the result
    of get_database_runtime_in_cluster, if any, is exported instead.
    """
    database_config = _get_database_config(_get_config(runtime_config))
    if is_database_configured(database_config):
        # set the database environments from database config
        # This may override the environments from provider
        export_database_environment_variables(database_config)
        return

    # No explicit database config: fall back to a database runtime in cluster.
    database_runtime = get_database_runtime_in_cluster(runtime_config)
    if not database_runtime:
        return
    export_database_runtime_environment_variables(
        runtime_config, database_runtime)


def _configure(runtime_config, head: bool):
ai_config = _get_config(runtime_config)

Expand All @@ -78,11 +93,13 @@ def _configure(runtime_config, head: bool):
if hdfs_uri:
os.environ["HDFS_NAMENODE_URI"] = hdfs_uri

database_config = _get_database_config(ai_config)
if is_database_configured(database_config):
# set the database environments from database config
# This may override the environments from provider
export_database_environment_variables(database_config)
_export_database_configurations(runtime_config)


def _services(runtime_config, command: str, head: bool):
    """Export the database settings ahead of the "start" services command.

    Any other command is a no-op. The head flag is not used.
    """
    if command != "start":
        return
    # The database settings are exported right before the MLflow service
    # starts, which is where the database schema init is done.
    _export_database_configurations(runtime_config)


def register_service(cluster_config: Dict[str, Any], head_node_id: str) -> None:
Expand Down
4 changes: 2 additions & 2 deletions python/cloudtik/runtime/flink/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ def with_environment_variables(
self.runtime_config, config=config, provider=provider, node_id=node_id)

def configure(self, head: bool):
""" This method is called on every node as the first step of
executing runtime configure command.
""" This method is called on every node as the first step of executing runtime
configure command.
"""
_configure(self.runtime_config, head)

Expand Down
15 changes: 11 additions & 4 deletions python/cloudtik/runtime/metastore/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from cloudtik.runtime.common.runtime_base import RuntimeBase
from cloudtik.runtime.metastore.utils import _with_runtime_environment_variables, \
_get_runtime_processes, _get_runtime_logs, \
_get_runtime_endpoints, register_service, _get_head_service_ports, _get_runtime_services,\
_prepare_config_on_head, _config_depended_services, _configure
_get_runtime_endpoints, register_service, _get_head_service_ports, _get_runtime_services, \
_prepare_config_on_head, _config_depended_services, _configure, _services

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -43,11 +43,18 @@ def with_environment_variables(
self.runtime_config, config=config, provider=provider, node_id=node_id)

def configure(self, head: bool):
""" This method is called on every node as the first step of
executing runtime configure command.
""" This method is called on every node as the first step of executing runtime
configure command.
"""
_configure(self.runtime_config, head)

def services(
        self, command: str, head: bool):
    """ This method is called on every node as the first step of executing runtime
    services command.

    Delegates to _services, passing this runtime's config together with the
    command and the head flag.
    """
    _services(self.runtime_config, command, head)

def cluster_booting_completed(
self, cluster_config: Dict[str, Any], head_node_id: str) -> None:
register_service(cluster_config, head_node_id)
Expand Down
16 changes: 9 additions & 7 deletions python/cloudtik/runtime/metastore/scripts/schema-init.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ function init_schema() {
if [ "${SQL_DATABASE}" == "true" ] \
&& [ "$METASTORE_WITH_SQL_DATABASE" != "false" ]; then
DATABASE_ENGINE=${SQL_DATABASE_ENGINE}
DATABASE_ADDRESS=${SQL_DATABASE_HOST}:${SQL_DATABASE_PORT}
DATABASE_USER=${SQL_DATABASE_USERNAME}
DATABASE_PASSWORD=${SQL_DATABASE_PASSWORD}

Expand All @@ -46,18 +45,21 @@ function init_schema() {
else
# local mariadb
DATABASE_ENGINE="mysql"
DATABASE_ADDRESS=localhost
DATABASE_USER=hive
DATABASE_PASSWORD=hive

# Do we need wait a few seconds for mysql to startup?
# We may not need to create database as hive can create if it not exist
# create user (this is done only once)
sudo mysql -u root -e "
CREATE DATABASE IF NOT EXISTS ${DATABASE_NAME};
CREATE USER '${DATABASE_USER}'@localhost IDENTIFIED BY '${DATABASE_PASSWORD}';
GRANT ALL PRIVILEGES ON *.* TO '${DATABASE_USER}'@'localhost';
FLUSH PRIVILEGES;" > ${METASTORE_HOME}/logs/configure.log
DB_INIT_DIR=${METASTORE_HOME}/conf/db.init
if [ ! -d "${DB_INIT_DIR}" ]; then
sudo mysql -u root -e "
CREATE DATABASE IF NOT EXISTS ${DATABASE_NAME};
CREATE USER '${DATABASE_USER}'@localhost IDENTIFIED BY '${DATABASE_PASSWORD}';
GRANT ALL PRIVILEGES ON *.* TO '${DATABASE_USER}'@'localhost';
FLUSH PRIVILEGES;" > ${METASTORE_HOME}/logs/configure.log
mkdir -p ${DB_INIT_DIR}
fi
fi

# validate and initialize the metastore database schema (can be done multiple times)
Expand Down
1 change: 1 addition & 0 deletions python/cloudtik/runtime/metastore/scripts/services.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ eval set -- "${args}"
# import util functions
. "$ROOT_DIR"/common/scripts/util-functions.sh

# schema initialization functions
. "$BIN_DIR"/schema-init.sh

if [ ! -n "${METASTORE_HOME}" ]; then
Expand Down
12 changes: 11 additions & 1 deletion python/cloudtik/runtime/metastore/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def _with_runtime_environment_variables(
return runtime_envs


def _configure(runtime_config, head: bool):
def _export_database_configurations(runtime_config):
metastore_config = _get_config(runtime_config)
database_config = _get_database_config(metastore_config)
if is_database_configured(database_config):
Expand All @@ -75,6 +75,16 @@ def _configure(runtime_config, head: bool):
runtime_config, database_runtime)


def _configure(runtime_config, head: bool):
    """Configure step: export the database configurations for runtime_config.

    The head flag is not used here.
    """
    _export_database_configurations(runtime_config)


def _services(runtime_config, command: str, head: bool):
    """Export the database settings ahead of the "start" services command.

    Any other command is a no-op. The head flag is not used.
    """
    if command != "start":
        return
    # The database schema init is placed right before the start of the
    # metastore service, so the database settings are exported here.
    _export_database_configurations(runtime_config)


def register_service(cluster_config: Dict[str, Any], head_node_id: str) -> None:
provider = _get_node_provider(
cluster_config["provider"], cluster_config["cluster_name"])
Expand Down
7 changes: 3 additions & 4 deletions python/cloudtik/runtime/presto/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,9 @@ def with_environment_variables(
return _with_runtime_environment_variables(
self.runtime_config, config=config, provider=provider, node_id=node_id)

def configure(
self, head: bool):
""" This method is called on every node as the first step of
executing runtime configure command.
def configure(self, head: bool):
""" This method is called on every node as the first step of executing runtime
configure command.
"""
_configure(self.runtime_config, head)

Expand Down
4 changes: 2 additions & 2 deletions python/cloudtik/runtime/spark/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ def with_environment_variables(
self.runtime_config, config=config, provider=provider, node_id=node_id)

def configure(self, head: bool):
""" This method is called on every node as the first step of
executing runtime configure command.
""" This method is called on every node as the first step of executing runtime
configure command.
"""
_configure(self.runtime_config, head)

Expand Down
4 changes: 2 additions & 2 deletions python/cloudtik/runtime/trino/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ def with_environment_variables(
self.runtime_config, config=config, provider=provider, node_id=node_id)

def configure(self, head: bool):
""" This method is called on every node as the first step of
executing runtime configure command.
""" This method is called on every node as the first step of executing runtime
configure command.
"""
_configure(self.runtime_config, head)

Expand Down
8 changes: 8 additions & 0 deletions python/cloudtik/scripts/runtime_scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ def _run_runtime_configure(runtime_type, head: bool):
_runtime.configure(head)


def _run_runtime_services(runtime_type, command, head: bool):
    """Call the services hook of the runtime identified by runtime_type."""
    # Fetch the runtime config first, then build the runtime object from it.
    node_runtime_config = get_runtime_config_from_node(head)
    runtime_instance = _get_runtime(runtime_type, node_runtime_config)
    runtime_instance.services(command, head)


@click.group(cls=NaturalOrderGroup)
def runtime():
"""
Expand Down Expand Up @@ -297,6 +303,8 @@ def configure(runtime, head, reverse, script_args):
"This flag reverse the order.")
@click.argument("script_args", nargs=-1)
def services(runtime, command, head, reverse, script_args):
_run_runtime_services(
runtime, command, head)
_run_runtime_script(
runtime, command, head, reverse,
script_args, RUNTIME_SERVICES_SCRIPT_NAME)
Expand Down