diff --git a/python/cloudtik/core/config-schema.json b/python/cloudtik/core/config-schema.json
index 5e2452bd5..19bd864a9 100644
--- a/python/cloudtik/core/config-schema.json
+++ b/python/cloudtik/core/config-schema.json
@@ -994,6 +994,10 @@
"type": "string",
"description": "HDFS service endpoint if Spark need to access HDFS."
},
+ "hdfs_mount_method": {
+ "type": "string",
+ "description": "HDFS mount method: nfs or fuse. Defaults to fuse if not specified."
+ },
"hive_metastore_uri": {
"type": "string",
"description": "Metastore service endpoint for Spark to use."
diff --git a/python/cloudtik/runtime/common/conf/hadoop/core-site.xml b/python/cloudtik/runtime/common/conf/hadoop/core-site.xml
index ceb205517..787ad4763 100644
--- a/python/cloudtik/runtime/common/conf/hadoop/core-site.xml
+++ b/python/cloudtik/runtime/common/conf/hadoop/core-site.xml
@@ -29,6 +29,7 @@
hadoop.proxyuser.root.hosts
*
+ {%hadoop.proxyuser.properties%}
io.file.buffer.size
131072
diff --git a/python/cloudtik/runtime/common/scripts/cloud-storage-fuse.sh b/python/cloudtik/runtime/common/scripts/cloud-storage-fuse.sh
index aed6499f7..f97913fbf 100644
--- a/python/cloudtik/runtime/common/scripts/cloud-storage-fuse.sh
+++ b/python/cloudtik/runtime/common/scripts/cloud-storage-fuse.sh
@@ -37,7 +37,7 @@ function get_fuse_cache_path() {
fi
if [ -z $fuse_cache_dir ]; then
- fuse_cache_dir="/mnt/cache/"
+ fuse_cache_dir="/tmp/.cache"
fi
echo $fuse_cache_dir
}
@@ -175,17 +175,20 @@ function configure_cloud_fs() {
# Installing functions
function install_hdfs_fuse() {
- if ! type fuse_dfs >/dev/null 2>&1;then
+ if ! type fuse_dfs >/dev/null 2>&1; then
arch=$(uname -m)
sudo wget -q --show-progress https://d30257nes7d4fq.cloudfront.net/downloads/hadoop/fuse_dfs-${HADOOP_VERSION}-${arch} -O /usr/bin/fuse_dfs
sudo wget -q --show-progress https://d30257nes7d4fq.cloudfront.net/downloads/hadoop/fuse_dfs_wrapper-${HADOOP_VERSION}.sh -O /usr/bin/fuse_dfs_wrapper.sh
sudo chmod +x /usr/bin/fuse_dfs
sudo chmod +x /usr/bin/fuse_dfs_wrapper.sh
fi
+
+ # nfs mount may needed
+ which mount.nfs > /dev/null || sudo apt-get -qq update -y > /dev/null; sudo DEBIAN_FRONTEND=noninteractive apt-get -qq install nfs-common -y > /dev/null
}
function install_s3_fuse() {
- if ! type s3fs >/dev/null 2>&1;then
+ if ! type s3fs >/dev/null 2>&1; then
echo "Installing S3 Fuse..."
sudo apt-get -qq update -y > /dev/null
sudo apt-get install -qq s3fs -y > /dev/null
@@ -257,8 +260,18 @@ function mount_local_hdfs_fs() {
fi
# Mount local hdfs fuse here
mkdir -p ${FS_MOUNT_PATH}
- echo "Mounting HDFS ${fs_default_dir} to ${FS_MOUNT_PATH}..."
- fuse_dfs_wrapper.sh -oinitchecks ${fs_default_dir} ${FS_MOUNT_PATH} > /dev/null
+
+    # Mount via the HDFS NFS gateway when requested, otherwise fall back
+    # to the default fuse_dfs mount.
+    if [ "${HDFS_MOUNT_METHOD}" == "nfs" ]; then
+        # TODO: Use the local HDFS dedicated core-site.xml and hdfs-site.xml
+        echo "Starting HDFS NFS Gateway..."
+        $HADOOP_HOME/bin/hdfs --daemon start portmap
+        $HADOOP_HOME/bin/hdfs --daemon start nfs3
+        echo "Mounting HDFS ${fs_default_dir} with NFS Gateway ${CLOUDTIK_NODE_IP} to ${FS_MOUNT_PATH}..."
+        sudo mount -t nfs -o vers=3,proto=tcp,nolock,noacl,sync ${CLOUDTIK_NODE_IP}:/ ${FS_MOUNT_PATH}
+    else
+        echo "Mounting HDFS ${fs_default_dir} with fuse to ${FS_MOUNT_PATH}..."
+        fuse_dfs_wrapper.sh -oinitchecks ${fs_default_dir} ${FS_MOUNT_PATH} > /dev/null
+    fi
}
function mount_hdfs_fs() {
@@ -269,10 +282,20 @@ function mount_hdfs_fs() {
else
FS_MOUNT_PATH=${CLUSTER_FS_MOUNT_PATH}
fi
- # Mount remote hdfs fuse here
+ # Mount remote hdfs here
mkdir -p ${FS_MOUNT_PATH}
- echo "Mounting HDFS ${fs_default_dir} to ${FS_MOUNT_PATH}..."
- fuse_dfs_wrapper.sh -oinitchecks ${fs_default_dir} ${FS_MOUNT_PATH} > /dev/null
+
+    # Mount via the local HDFS NFS gateway (which proxies to the remote
+    # HDFS configured in core-site.xml) when requested, otherwise use fuse.
+    if [ "${HDFS_MOUNT_METHOD}" == "nfs" ]; then
+        # TODO: Use the remote HDFS dedicated core-site.xml and hdfs-site.xml
+        echo "Starting HDFS NFS Gateway..."
+        $HADOOP_HOME/bin/hdfs --daemon start portmap
+        $HADOOP_HOME/bin/hdfs --daemon start nfs3
+        echo "Mounting HDFS ${fs_default_dir} with NFS Gateway ${CLOUDTIK_NODE_IP} to ${FS_MOUNT_PATH}..."
+        sudo mount -t nfs -o vers=3,proto=tcp,nolock,noacl,sync ${CLOUDTIK_NODE_IP}:/ ${FS_MOUNT_PATH}
+    else
+        echo "Mounting HDFS ${fs_default_dir} with fuse to ${FS_MOUNT_PATH}..."
+        fuse_dfs_wrapper.sh -oinitchecks ${fs_default_dir} ${FS_MOUNT_PATH} > /dev/null
+    fi
}
function mount_s3_fs() {
@@ -353,6 +376,7 @@ function mount_aliyun_oss_fs() {
}
function mount_cloud_fs() {
+ MOUNTED_CLOUD_FS=""
# cloud storage from provider
if [ "$AWS_CLOUD_STORAGE" == "true" ]; then
mount_s3_fs
@@ -384,7 +408,30 @@ function mount_cloud_fs() {
fi
}
+
+
+function unmount_fs() {
+    # Unmount a single mount point, dispatching on the actual filesystem
+    # type reported by findmnt: NFS gateway mounts were created with
+    # 'sudo mount -t nfs' and need umount; everything else here is a
+    # fuse mount and is released with fusermount.
+    local fs_mount_path="$1"
+    # findmnt exits non-zero when the path is not a mount point, so this
+    # also guards against unmounting something that was never mounted.
+    if findmnt -o fstype -l -n ${fs_mount_path} >/dev/null 2>&1; then
+        echo "Unmounting cloud fs at ${fs_mount_path}..."
+        local fstype=$(findmnt -o fstype -l -n ${fs_mount_path})
+        if [ "${fstype}" == "nfs" ]; then
+            # -f: force unmount; the gateway daemons may already be down.
+            sudo umount -f ${fs_mount_path}
+        else
+            fusermount -u ${fs_mount_path} > /dev/null
+        fi
+    fi
+}
+
function unmount_cloud_fs() {
- echo "Unmounting cloud fs at ${CLOUD_FS_MOUNT_PATH}..."
- fusermount -u ${CLOUD_FS_MOUNT_PATH} > /dev/null
+ # use findmnt to check the existence and type of the mount
+ # if findmnt doesn't exist, install it
+ which findmnt > /dev/null || sudo apt-get -qq update -y > /dev/null; sudo DEBIAN_FRONTEND=noninteractive apt-get -qq install util-linux -y > /dev/null
+
+ if [ "${CLOUD_FS_MOUNT_PATH}" != "" ]; then
+ unmount_fs "${CLOUD_FS_MOUNT_PATH}"
+ fi
+
+ if [ "${CLUSTER_FS_MOUNT_PATH}" != "" ]; then
+ unmount_fs "${CLUSTER_FS_MOUNT_PATH}"
+ fi
}
diff --git a/python/cloudtik/runtime/common/scripts/hadoop-cloud-credential.sh b/python/cloudtik/runtime/common/scripts/hadoop-cloud-credential.sh
index a2f0d0040..b1c7b11eb 100644
--- a/python/cloudtik/runtime/common/scripts/hadoop-cloud-credential.sh
+++ b/python/cloudtik/runtime/common/scripts/hadoop-cloud-credential.sh
@@ -176,6 +176,27 @@ function update_credential_config_for_huaweicloud() {
fi
}
+function update_system_credential() {
+    # Fill in (or blank out) the hadoop.proxyuser template marker in all
+    # config files so the current non-root system user may act as a
+    # Hadoop proxy user (required by the HDFS NFS gateway).
+    CURRENT_SYSTEM_USER=$(whoami)
+
+    if [ "${CURRENT_SYSTEM_USER}" != "root" ]; then
+        HADOOP_PROXY_USER_PROPERTIES="$(cat <<-EOF
+
+    hadoop.proxyuser.${CURRENT_SYSTEM_USER}.groups
+    *
+
+
+    hadoop.proxyuser.${CURRENT_SYSTEM_USER}.hosts
+    *
+
+EOF
+)"
+        # sed's s command cannot take raw newlines in the replacement
+        # text ("unterminated `s' command"); escape them as \n, which
+        # GNU sed expands back to newlines in the replacement.
+        HADOOP_PROXY_USER_PROPERTIES="${HADOOP_PROXY_USER_PROPERTIES//$'\n'/\\n}"
+        sed -i "s#{%hadoop.proxyuser.properties%}#${HADOOP_PROXY_USER_PROPERTIES}#g" `grep "{%hadoop.proxyuser.properties%}" -rl ./`
+    else
+        # root already has proxyuser entries; just drop the marker.
+        sed -i "s#{%hadoop.proxyuser.properties%}##g" `grep "{%hadoop.proxyuser.properties%}" -rl ./`
+    fi
+}
+
function set_cloud_storage_provider() {
cloud_storage_provider="none"
if [ "$AWS_CLOUD_STORAGE" == "true" ]; then
@@ -205,6 +226,9 @@ function update_credential_config_for_provider() {
elif [ "${cloud_storage_provider}" == "huaweicloud" ]; then
update_credential_config_for_huaweicloud
fi
+
+ update_system_credential
+
if [ -f "$HADOOP_CREDENTIAL_TMP_FILE" ]; then
cp ${HADOOP_CREDENTIAL_TMP_FILE} ${HADOOP_CREDENTIAL_FILE}
fi
diff --git a/python/cloudtik/runtime/common/scripts/util-functions.sh b/python/cloudtik/runtime/common/scripts/util-functions.sh
index 547a4714a..e2982c25d 100644
--- a/python/cloudtik/runtime/common/scripts/util-functions.sh
+++ b/python/cloudtik/runtime/common/scripts/util-functions.sh
@@ -68,3 +68,30 @@ function clean_install_cache() {
&& sudo apt-get clean \
&& which conda > /dev/null && conda clean -itqy)
}
+
+function get_data_disk_dirs() {
+    # Emit a comma-separated list of the data-disk directories mounted
+    # under /mnt/cloudtik (empty string when there are none).
+    local dirs=""
+    local disk
+    if [ -d "/mnt/cloudtik" ]; then
+        for disk in /mnt/cloudtik/*; do
+            if [ ! -d "$disk" ]; then
+                continue
+            fi
+            if [ -n "$dirs" ]; then
+                dirs="$dirs,$disk"
+            else
+                dirs=$disk
+            fi
+        done
+    fi
+    echo "${dirs}"
+}
+
+function get_any_data_disk_dir() {
+    # Emit the first data-disk directory found under /mnt/cloudtik,
+    # or the empty string when no data disk is mounted.
+    local found=""
+    local candidate
+    if [ -d "/mnt/cloudtik" ]; then
+        for candidate in /mnt/cloudtik/*; do
+            if [ -d "$candidate" ]; then
+                found=$candidate
+                break
+            fi
+        done
+    fi
+    echo "${found}"
+}
diff --git a/python/cloudtik/runtime/hdfs/conf/hadoop/hdfs-site.xml b/python/cloudtik/runtime/hdfs/conf/hadoop/hdfs-site.xml
index 5b3f06b18..eca208198 100644
--- a/python/cloudtik/runtime/hdfs/conf/hadoop/hdfs-site.xml
+++ b/python/cloudtik/runtime/hdfs/conf/hadoop/hdfs-site.xml
@@ -65,4 +65,17 @@
dfs.block.size
268435456
+
+ dfs.namenode.accesstime.precision
+ 3600000
+
+ The access time for HDFS file is precise up to this value.
+ The default value is 1 hour. Setting a value of 0 disables
+ access times for HDFS.
+
+
+
+ dfs.nfs3.dump.dir
+ {%dfs.nfs3.dump.dir%}
+
diff --git a/python/cloudtik/runtime/hdfs/scripts/configure.sh b/python/cloudtik/runtime/hdfs/scripts/configure.sh
index c790e86cd..e24d1fcd8 100644
--- a/python/cloudtik/runtime/hdfs/scripts/configure.sh
+++ b/python/cloudtik/runtime/hdfs/scripts/configure.sh
@@ -32,6 +32,16 @@ function check_hadoop_installed() {
fi
}
+function update_nfs_dump_dir() {
+    # Set the NFS gateway dump dir: prefer a data disk when one exists,
+    # otherwise fall back to /tmp.
+    # Note: no spaces around '=' — 'data_disk_dir = $(...)' is not an
+    # assignment in shell; it tries to execute 'data_disk_dir' as a command.
+    data_disk_dir=$(get_any_data_disk_dir)
+    if [ -z "$data_disk_dir" ]; then
+        nfs_dump_dir="/tmp/.hdfs-nfs"
+    else
+        nfs_dump_dir="$data_disk_dir/tmp/.hdfs-nfs"
+    fi
+    sed -i "s!{%dfs.nfs3.dump.dir%}!${nfs_dump_dir}!g" `grep "{%dfs.nfs3.dump.dir%}" -rl ./`
+}
+
function update_hdfs_data_disks_config() {
hdfs_nn_dirs="${HADOOP_HOME}/data/dfs/nn"
hdfs_dn_dirs=""
@@ -55,6 +65,9 @@ function update_hdfs_data_disks_config() {
fi
sed -i "s!{%dfs.namenode.name.dir%}!${hdfs_nn_dirs}!g" `grep "{%dfs.namenode.name.dir%}" -rl ./`
sed -i "s!{%dfs.datanode.data.dir%}!${hdfs_dn_dirs}!g" `grep "{%dfs.datanode.data.dir%}" -rl ./`
+
+ # set nfs gateway dump dir
+ update_nfs_dump_dir
}
function update_cloud_storage_credential_config() {
diff --git a/python/cloudtik/runtime/spark/conf/hadoop/hdfs-site.xml b/python/cloudtik/runtime/spark/conf/hadoop/hdfs-site.xml
new file mode 100644
index 000000000..c493ae9b1
--- /dev/null
+++ b/python/cloudtik/runtime/spark/conf/hadoop/hdfs-site.xml
@@ -0,0 +1,40 @@
+
+
+
+
+
+
+
+
+ dfs.replication
+ 3
+
+
+ dfs.permissions
+ false
+
+
+ dfs.socket.timeout
+ 1200000
+
+
+ dfs.block.size
+ 268435456
+
+
+ dfs.nfs3.dump.dir
+ {%dfs.nfs3.dump.dir%}
+
+
diff --git a/python/cloudtik/runtime/spark/scripts/configure.sh b/python/cloudtik/runtime/spark/scripts/configure.sh
index b185aff5a..797852ce4 100644
--- a/python/cloudtik/runtime/spark/scripts/configure.sh
+++ b/python/cloudtik/runtime/spark/scripts/configure.sh
@@ -241,8 +241,20 @@ function update_config_for_remote_storage() {
fi
}
+function update_nfs_dump_dir() {
+    # set nfs gateway dump dir: prefer a data disk when one exists,
+    # otherwise fall back to /tmp.
+    # Note: no spaces around '=' — 'data_disk_dir = $(...)' is not an
+    # assignment in shell; it tries to execute 'data_disk_dir' as a command.
+    data_disk_dir=$(get_any_data_disk_dir)
+    if [ -z "$data_disk_dir" ]; then
+        nfs_dump_dir="/tmp/.hdfs-nfs"
+    else
+        nfs_dump_dir="$data_disk_dir/tmp/.hdfs-nfs"
+    fi
+    sed -i "s!{%dfs.nfs3.dump.dir%}!${nfs_dump_dir}!g" `grep "{%dfs.nfs3.dump.dir%}" -rl ./`
+}
+
function update_config_for_storage() {
if [ "$HDFS_ENABLED" == "true" ];then
+ # TODO: local HDFS has already set up its own core-site.xml and hdfs-site.xml
update_config_for_local_hdfs
else
check_hdfs_storage
@@ -253,7 +265,9 @@ function update_config_for_storage() {
cp -r ${output_dir}/hadoop/${cloud_storage_provider}/core-site.xml ${HADOOP_HOME}/etc/hadoop/
else
# Possible remote hdfs without cloud storage
+ update_nfs_dump_dir
cp -r ${output_dir}/hadoop/core-site.xml ${HADOOP_HOME}/etc/hadoop/
+ cp -r ${output_dir}/hadoop/hdfs-site.xml ${HADOOP_HOME}/etc/hadoop/
fi
fi
}
@@ -295,7 +309,7 @@ function update_data_disks_config() {
else
local_dirs="$local_dirs,$data_disk"
fi
- done
+ done
fi
# set nodemanager.local-dirs
diff --git a/python/cloudtik/runtime/spark/utils.py b/python/cloudtik/runtime/spark/utils.py
index a219f6f32..fd216bda5 100644
--- a/python/cloudtik/runtime/spark/utils.py
+++ b/python/cloudtik/runtime/spark/utils.py
@@ -268,6 +268,12 @@ def update_spark_configurations():
save_properties_file(spark_conf_file, spark_conf, separator=' ', comments=comments)
+def _with_hdfs_mount_method(spark_config, runtime_envs):
+ mount_method = spark_config.get("hdfs_mount_method")
+ if mount_method:
+ runtime_envs["HDFS_MOUNT_METHOD"] = mount_method
+
+
def _with_runtime_environment_variables(runtime_config, config, provider, node_id: str):
runtime_envs = {}
spark_config = runtime_config.get(SPARK_RUNTIME_CONFIG_KEY, {})
@@ -288,9 +294,11 @@ def _with_runtime_environment_variables(runtime_config, config, provider, node_i
# 3) Try to use provider storage;
if is_runtime_enabled(cluster_runtime_config, BUILT_IN_RUNTIME_HDFS):
runtime_envs["HDFS_ENABLED"] = True
+ _with_hdfs_mount_method(spark_config, runtime_envs)
else:
if spark_config.get("hdfs_namenode_uri") is not None:
runtime_envs["HDFS_NAMENODE_URI"] = spark_config.get("hdfs_namenode_uri")
+ _with_hdfs_mount_method(spark_config, runtime_envs)
# We always export the cloud storage even for remote HDFS case
node_type_config = get_node_type_config(config, provider, node_id)
diff --git a/runtime/hadoop/README.md b/runtime/hadoop/README.md
index 241f22d77..0060dcfd2 100644
--- a/runtime/hadoop/README.md
+++ b/runtime/hadoop/README.md
@@ -19,7 +19,8 @@ in which everything needed for building and testing Hadoop is included.
Execute the following to build the docker image and start it in interactive mode.
```
-bash ./start-build-env.sh
+# the script mounts ~/.m2 and ~/.gnupg into the container and fails if they don't exist
+mkdir -p ~/.m2 ~/.gnupg && bash ./start-build-env.sh
```
The above command will build the docker image for hadoop build environment and
run the image with your current user. It suggests you configure your docker to