This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Commit 23e7175
Spark: validate storage config considering the case of local or remote HDFS (
jerrychenhf authored Jul 5, 2023
1 parent 77046db commit 23e7175
Showing 10 changed files with 82 additions and 84 deletions.
25 changes: 25 additions & 0 deletions python/cloudtik/core/config-schema.json
@@ -990,6 +990,11 @@
}
}
},
"auto_detect_hdfs": {
"type": "boolean",
"description": "Whether to auto detect and use HDFS service in the same workspace.",
"default": false
},
"hdfs_namenode_uri": {
"type": "string",
"description": "HDFS service endpoint if Spark need to access HDFS."
@@ -998,6 +1003,11 @@
"type": "string",
"description": "HDFS mount method: nfs or fuse. Default fuse if not specified."
},
"auto_detect_metastore": {
"type": "boolean",
"description": "Whether to auto detect and use metastore service in the same workspace.",
"default": false
},
"hive_metastore_uri": {
"type": "string",
"description": "Metastore service endpoint for Spark to use."
@@ -1108,10 +1118,20 @@
}
}
},
"auto_detect_hdfs": {
"type": "boolean",
"description": "Whether to auto detect and use HDFS service in the same workspace.",
"default": false
},
"hdfs_namenode_uri": {
"type": "string",
"description": "HDFS service endpoint if Flink need to access HDFS."
},
"auto_detect_metastore": {
"type": "boolean",
"description": "Whether to auto detect and use metastore service in the same workspace.",
"default": false
},
"hive_metastore_uri": {
"type": "string",
"description": "Metastore service endpoint for Flink to use."
@@ -1165,6 +1185,11 @@
"type": "object",
"description": "Presto catalogs to be configured."
},
"auto_detect_metastore": {
"type": "boolean",
"description": "Whether to auto detect and use metastore service in the same workspace.",
"default": false
},
"hive_metastore_uri": {
"type": "string",
"description": "Metastore service endpoint for Presto to use."
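To see how the new keys fit together: below is a minimal, hypothetical cluster config fragment, written as the Python dict the runtime code receives. Only the key names come from the schema above; the nesting under "runtime" and all values are assumptions.

# Hypothetical config fragment exercising the new schema keys.
# "runtime" stands in for RUNTIME_CONFIG_KEY; the URI is made up.
config = {
    "runtime": {
        "spark": {
            "hdfs_namenode_uri": "hdfs://10.0.0.5:9000",  # explicit remote HDFS
            "auto_detect_hdfs": False,       # unused here: URI set explicitly
            "auto_detect_metastore": True,   # pick up a workspace metastore
        }
    }
}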
8 changes: 0 additions & 8 deletions python/cloudtik/runtime/common/conf/hadoop/aliyun/core-site.xml
@@ -35,14 +35,6 @@
<description>Aliyun OSS endpoint to connect to. </description>
</property>
{%hadoop.credential.property%}
<property>
<name>hadoop.proxyuser.root.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.root.hosts</name>
<value>*</value>
</property>
<property>
<name>io.file.buffer.size</name>
<value>131072</value>
8 changes: 0 additions & 8 deletions python/cloudtik/runtime/common/conf/hadoop/aws/core-site.xml
@@ -38,14 +38,6 @@
<value>{%fs.s3a.access.key%}</value>
</property>
{%hadoop.credential.property%}
<property>
<name>hadoop.proxyuser.root.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.root.hosts</name>
<value>*</value>
</property>
<property>
<name>io.file.buffer.size</name>
<value>131072</value>
8 changes: 0 additions & 8 deletions python/cloudtik/runtime/common/conf/hadoop/azure/core-site.xml
@@ -34,14 +34,6 @@
<value>OAuth</value>
</property>
{%hadoop.credential.property%}
<property>
<name>hadoop.proxyuser.root.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.root.hosts</name>
<value>*</value>
</property>
<property>
<name>io.file.buffer.size</name>
<value>131072</value>
8 changes: 0 additions & 8 deletions python/cloudtik/runtime/common/conf/hadoop/core-site.xml
@@ -21,14 +21,6 @@
<name>fs.defaultFS</name>
<value>{%fs.default.name%}</value>
</property>
<property>
<name>hadoop.proxyuser.root.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.root.hosts</name>
<value>*</value>
</property>
<property>
<name>io.file.buffer.size</name>
<value>131072</value>
8 changes: 0 additions & 8 deletions python/cloudtik/runtime/common/conf/hadoop/gcp/core-site.xml
@@ -60,14 +60,6 @@
<name>fs.gs.working.dir</name>
<value>/</value>
</property>
<property>
<name>hadoop.proxyuser.root.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.root.hosts</name>
<value>*</value>
</property>
<property>
<name>io.file.buffer.size</name>
<value>131072</value>
8 changes: 0 additions & 8 deletions python/cloudtik/runtime/common/conf/hadoop/huaweicloud/core-site.xml
@@ -31,14 +31,6 @@
</property>
{%fs.obs.security.provider.property%}
{%hadoop.credential.property%}
<property>
<name>hadoop.proxyuser.root.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.root.hosts</name>
<value>*</value>
</property>
<property>
<name>io.file.buffer.size</name>
<value>131072</value>
42 changes: 26 additions & 16 deletions python/cloudtik/runtime/flink/utils.py
@@ -25,6 +25,8 @@
]

FLINK_RUNTIME_CONFIG_KEY = "flink"
FLINK_HDFS_NAMENODE_URI_KEY = "hdfs_namenode_uri"
FLINK_HIVE_METASTORE_URI_KEY = "hive_metastore_uri"

YARN_RESOURCE_MEMORY_RATIO = 0.8

@@ -120,20 +122,20 @@ def _config_depended_services(cluster_config: Dict[str, Any]) -> Dict[str, Any]:
# 2) Try to use defined hdfs_namenode_uri;
# 3) If auto_detect_hdfs=true, try to subscribe global variables to find remote hdfs_namenode_uri

if not is_runtime_enabled(runtime_config, "hdfs"):
if flink_config.get("hdfs_namenode_uri") is None:
if not is_runtime_enabled(runtime_config, BUILT_IN_RUNTIME_HDFS):
if flink_config.get(FLINK_HDFS_NAMENODE_URI_KEY) is None:
if flink_config.get("auto_detect_hdfs", False):
hdfs_namenode_uri = global_variables.get("hdfs-namenode-uri")
if hdfs_namenode_uri is not None:
flink_config["hdfs_namenode_uri"] = hdfs_namenode_uri
flink_config[FLINK_HDFS_NAMENODE_URI_KEY] = hdfs_namenode_uri

# Check metastore
if not is_runtime_enabled(runtime_config, "metastore"):
if flink_config.get("hive_metastore_uri") is None:
if not is_runtime_enabled(runtime_config, BUILT_IN_RUNTIME_METASTORE):
if flink_config.get(FLINK_HIVE_METASTORE_URI_KEY) is None:
if flink_config.get("auto_detect_metastore", True):
hive_metastore_uri = global_variables.get("hive-metastore-uri")
if hive_metastore_uri is not None:
flink_config["hive_metastore_uri"] = hive_metastore_uri
flink_config[FLINK_HIVE_METASTORE_URI_KEY] = hive_metastore_uri

return cluster_config
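
The precedence implemented above reads as a short standalone sketch. This is not the project's code: is_runtime_enabled and the workspace global-variables lookup are replaced by simple stand-ins, and the "types" list is an assumption.

from typing import Any, Dict, List, Optional

def resolve_hdfs_uri(enabled_runtimes: List[str],
                     flink_config: Dict[str, Any],
                     global_variables: Dict[str, str]) -> Optional[str]:
    # 1) A local HDFS runtime in the same cluster wins; no URI is needed.
    if "hdfs" in enabled_runtimes:
        return None
    # 2) An explicitly defined hdfs_namenode_uri is used as-is.
    uri = flink_config.get("hdfs_namenode_uri")
    if uri is not None:
        return uri
    # 3) Only with auto_detect_hdfs=true (default false), fall back to a URI
    #    published as a workspace global variable by a remote HDFS cluster.
    if flink_config.get("auto_detect_hdfs", False):
        return global_variables.get("hdfs-namenode-uri")
    return None

The metastore resolution has the same shape, except that auto_detect_metastore defaults to true in the code above.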

@@ -254,8 +256,8 @@ def _with_runtime_environment_variables(runtime_config, config, provider, node_id):
if is_runtime_enabled(cluster_runtime_config, BUILT_IN_RUNTIME_HDFS):
runtime_envs["HDFS_ENABLED"] = True
else:
if flink_config.get("hdfs_namenode_uri") is not None:
runtime_envs["HDFS_NAMENODE_URI"] = flink_config.get("hdfs_namenode_uri")
if flink_config.get(FLINK_HDFS_NAMENODE_URI_KEY) is not None:
runtime_envs["HDFS_NAMENODE_URI"] = flink_config.get(FLINK_HDFS_NAMENODE_URI_KEY)

# We always export the cloud storage even for remote HDFS case
node_type_config = get_node_type_config(config, provider, node_id)
@@ -266,8 +268,8 @@
# 2) Try to use defined metastore_uri;
if is_runtime_enabled(cluster_runtime_config, BUILT_IN_RUNTIME_METASTORE):
runtime_envs["METASTORE_ENABLED"] = True
elif flink_config.get("hive_metastore_uri") is not None:
runtime_envs["HIVE_METASTORE_URI"] = flink_config.get("hive_metastore_uri")
elif flink_config.get(FLINK_HIVE_METASTORE_URI_KEY) is not None:
runtime_envs["HIVE_METASTORE_URI"] = flink_config.get(FLINK_HIVE_METASTORE_URI_KEY)
return runtime_envs


@@ -283,13 +285,21 @@ def get_runtime_logs():


def _validate_config(config: Dict[str, Any]):
runtime_config = config.get(RUNTIME_CONFIG_KEY)

# if HDFS enabled, we ignore the cloud storage configurations
if not is_runtime_enabled(config.get(RUNTIME_CONFIG_KEY), "hdfs"):
# Check any cloud storage is configured
provider_config = config["provider"]
if ("storage" not in provider_config) and \
not is_use_managed_cloud_storage(config):
raise ValueError("No storage configuration found for Flink.")
if is_runtime_enabled(runtime_config, BUILT_IN_RUNTIME_HDFS):
return
# check if there is remote HDFS configured
flink_config = runtime_config.get(FLINK_RUNTIME_CONFIG_KEY, {})
if flink_config.get(FLINK_HDFS_NAMENODE_URI_KEY) is not None:
return

# Check any cloud storage is configured
provider_config = config["provider"]
if ("storage" not in provider_config) and \
not is_use_managed_cloud_storage(config):
raise ValueError("No storage configuration found for Flink.")
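
Concretely, the reworked check accepts any of three shapes and rejects the rest. A sketch with hypothetical configs: "runtime" and "types" are assumptions about RUNTIME_CONFIG_KEY and is_runtime_enabled, and the storage contents are made up.

# Accepted: a local HDFS runtime enabled in the cluster.
local_hdfs = {"runtime": {"types": ["flink", "hdfs"]}, "provider": {}}

# Accepted: a remote HDFS configured for the flink runtime.
remote_hdfs = {
    "runtime": {"flink": {"hdfs_namenode_uri": "hdfs://10.0.0.5:9000"}},
    "provider": {},
}

# Accepted: cloud storage in the provider config (or managed cloud storage).
cloud_storage = {"runtime": {}, "provider": {"storage": {"bucket": "a-bucket"}}}

# Rejected with ValueError: no HDFS runtime, no remote HDFS, no storage.
no_storage = {"runtime": {}, "provider": {}}

The Spark version of _validate_config below is identical apart from the runtime key and the error message.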


def _get_runtime_services(cluster_head_ip):
9 changes: 5 additions & 4 deletions python/cloudtik/runtime/presto/utils.py
@@ -16,6 +16,7 @@
]

PRESTO_RUNTIME_CONFIG_KEY = "presto"
PRESTO_HIVE_METASTORE_URI_KEY = "hive_metastore_uri"

JVM_MAX_MEMORY_RATIO = 0.8
QUERY_MAX_MEMORY_PER_NODE_RATIO = 0.5
@@ -54,11 +55,11 @@ def _config_depended_services(cluster_config: Dict[str, Any]) -> Dict[str, Any]:

# Check metastore
if not is_runtime_enabled(runtime_config, "metastore"):
if presto_config.get("hive_metastore_uri") is None:
if presto_config.get(PRESTO_HIVE_METASTORE_URI_KEY) is None:
if presto_config.get("auto_detect_metastore", True):
hive_metastore_uri = global_variables.get("hive-metastore-uri")
if hive_metastore_uri is not None:
presto_config["hive_metastore_uri"] = hive_metastore_uri
presto_config[PRESTO_HIVE_METASTORE_URI_KEY] = hive_metastore_uri

return cluster_config

@@ -88,8 +89,8 @@ def _with_runtime_environment_variables(runtime_config, config, provider, node_id):
# 2) Try to use defined metastore_uri;
if is_runtime_enabled(cluster_runtime_config, BUILT_IN_RUNTIME_METASTORE):
runtime_envs["METASTORE_ENABLED"] = True
elif presto_config.get("hive_metastore_uri") is not None:
runtime_envs["HIVE_METASTORE_URI"] = presto_config.get("hive_metastore_uri")
elif presto_config.get(PRESTO_HIVE_METASTORE_URI_KEY) is not None:
runtime_envs["HIVE_METASTORE_URI"] = presto_config.get(PRESTO_HIVE_METASTORE_URI_KEY)

_with_memory_configurations(
runtime_envs, presto_config=presto_config,
42 changes: 26 additions & 16 deletions python/cloudtik/runtime/spark/utils.py
@@ -28,6 +28,8 @@
]

SPARK_RUNTIME_CONFIG_KEY = "spark"
SPARK_HDFS_NAMENODE_URI_KEY = "hdfs_namenode_uri"
SPARK_HIVE_METASTORE_URI_KEY = "hive_metastore_uri"

YARN_RESOURCE_MEMORY_RATIO = 0.8
SPARK_EXECUTOR_MEMORY_RATIO = 1
@@ -134,20 +136,20 @@ def _config_depended_services(cluster_config: Dict[str, Any]) -> Dict[str, Any]:
# 2) Try to use defined hdfs_namenode_uri;
# 3) If auto_detect_hdfs=true, try to subscribe global variables to find remote hdfs_namenode_uri

if not is_runtime_enabled(runtime_config, "hdfs"):
if spark_config.get("hdfs_namenode_uri") is None:
if not is_runtime_enabled(runtime_config, BUILT_IN_RUNTIME_HDFS):
if spark_config.get(SPARK_HDFS_NAMENODE_URI_KEY) is None:
if spark_config.get("auto_detect_hdfs", False):
hdfs_namenode_uri = global_variables.get("hdfs-namenode-uri")
if hdfs_namenode_uri is not None:
spark_config["hdfs_namenode_uri"] = hdfs_namenode_uri
spark_config[SPARK_HDFS_NAMENODE_URI_KEY] = hdfs_namenode_uri

# Check metastore
if not is_runtime_enabled(runtime_config, "metastore"):
if spark_config.get("hive_metastore_uri") is None:
if not is_runtime_enabled(runtime_config, BUILT_IN_RUNTIME_METASTORE):
if spark_config.get(SPARK_HIVE_METASTORE_URI_KEY) is None:
if spark_config.get("auto_detect_metastore", True):
hive_metastore_uri = global_variables.get("hive-metastore-uri")
if hive_metastore_uri is not None:
spark_config["hive_metastore_uri"] = hive_metastore_uri
spark_config[SPARK_HIVE_METASTORE_URI_KEY] = hive_metastore_uri

return cluster_config

@@ -296,8 +298,8 @@ def _with_runtime_environment_variables(runtime_config, config, provider, node_id):
runtime_envs["HDFS_ENABLED"] = True
_with_hdfs_mount_method(spark_config, runtime_envs)
else:
if spark_config.get("hdfs_namenode_uri") is not None:
runtime_envs["HDFS_NAMENODE_URI"] = spark_config.get("hdfs_namenode_uri")
if spark_config.get(SPARK_HDFS_NAMENODE_URI_KEY) is not None:
runtime_envs["HDFS_NAMENODE_URI"] = spark_config.get(SPARK_HDFS_NAMENODE_URI_KEY)
_with_hdfs_mount_method(spark_config, runtime_envs)

# We always export the cloud storage even for remote HDFS case
@@ -309,8 +311,8 @@
# 2) Try to use defined metastore_uri;
if is_runtime_enabled(cluster_runtime_config, BUILT_IN_RUNTIME_METASTORE):
runtime_envs["METASTORE_ENABLED"] = True
elif spark_config.get("hive_metastore_uri") is not None:
runtime_envs["HIVE_METASTORE_URI"] = spark_config.get("hive_metastore_uri")
elif spark_config.get(SPARK_HIVE_METASTORE_URI_KEY) is not None:
runtime_envs["HIVE_METASTORE_URI"] = spark_config.get(SPARK_HIVE_METASTORE_URI_KEY)
return runtime_envs
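
For the remote-HDFS branch above, the exported environment would look roughly like this hypothetical outcome (values invented; cloud storage variables elided):

# Sketch of runtime_envs for a Spark cluster using a remote HDFS, with a
# metastore runtime enabled locally (hypothetical values).
runtime_envs = {
    "HDFS_NAMENODE_URI": "hdfs://10.0.0.5:9000",  # from spark_config
    "METASTORE_ENABLED": True,
    # ...plus cloud storage variables, which are exported even in the
    # remote HDFS case, per the comment above.
}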


@@ -326,13 +328,21 @@ def get_runtime_logs():


def _validate_config(config: Dict[str, Any]):
runtime_config = config.get(RUNTIME_CONFIG_KEY)

# if HDFS enabled, we ignore the cloud storage configurations
if not is_runtime_enabled(config.get(RUNTIME_CONFIG_KEY), "hdfs"):
# Check any cloud storage is configured
provider_config = config["provider"]
if ("storage" not in provider_config) and \
not is_use_managed_cloud_storage(config):
raise ValueError("No storage configuration found for Spark.")
if is_runtime_enabled(runtime_config, BUILT_IN_RUNTIME_HDFS):
return
# check if there is remote HDFS configured
spark_config = runtime_config.get(SPARK_RUNTIME_CONFIG_KEY, {})
if spark_config.get(SPARK_HDFS_NAMENODE_URI_KEY) is not None:
return

# Check any cloud storage is configured
provider_config = config["provider"]
if ("storage" not in provider_config) and \
not is_use_managed_cloud_storage(config):
raise ValueError("No storage configuration found for Spark.")


def _get_runtime_services(cluster_head_ip):
