This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Runtime: Spark to support HDFS NFS mount as a new option to Fuse (#1619)
jerrychenhf authored Jul 4, 2023
1 parent 26e9510 commit cc3857f
Showing 11 changed files with 204 additions and 12 deletions.
4 changes: 4 additions & 0 deletions python/cloudtik/core/config-schema.json
@@ -994,6 +994,10 @@
"type": "string",
"description": "HDFS service endpoint if Spark need to access HDFS."
},
"hdfs_mount_method": {
"type": "string",
"description": "HDFS mount method: nfs or fuse. Default fuse if not specified."
},
"hive_metastore_uri": {
"type": "string",
"description": "Metastore service endpoint for Spark to use."
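
With this schema entry in place, a cluster config can opt into the NFS mount method. Below is a hedged sketch of the fragment a user might add; the exact YAML nesting under the Spark runtime section is an assumption, only the `hdfs_mount_method` key and its values come from the schema above.

```
# Hedged sketch: a config fragment selecting the NFS mount method.
# The nesting under the spark runtime section is an assumption; only the
# hdfs_mount_method key and its values (nfs or fuse) come from the schema.
cat <<'EOF' > spark-hdfs-mount-fragment.yaml
runtime:
  spark:
    hdfs_mount_method: nfs   # omit or set to "fuse" for the default fuse_dfs mount
EOF
```
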
1 change: 1 addition & 0 deletions python/cloudtik/runtime/common/conf/hadoop/core-site.xml
@@ -29,6 +29,7 @@
<name>hadoop.proxyuser.root.hosts</name>
<value>*</value>
</property>
{%hadoop.proxyuser.properties%}
<property>
<name>io.file.buffer.size</name>
<value>131072</value>
67 changes: 57 additions & 10 deletions python/cloudtik/runtime/common/scripts/cloud-storage-fuse.sh
@@ -37,7 +37,7 @@ function get_fuse_cache_path() {
fi

if [ -z $fuse_cache_dir ]; then
fuse_cache_dir="/mnt/cache/"
fuse_cache_dir="/tmp/.cache"
fi
echo $fuse_cache_dir
}
@@ -175,17 +175,20 @@ function configure_cloud_fs() {

# Installing functions
function install_hdfs_fuse() {
if ! type fuse_dfs >/dev/null 2>&1;then
if ! type fuse_dfs >/dev/null 2>&1; then
arch=$(uname -m)
sudo wget -q --show-progress https://d30257nes7d4fq.cloudfront.net/downloads/hadoop/fuse_dfs-${HADOOP_VERSION}-${arch} -O /usr/bin/fuse_dfs
sudo wget -q --show-progress https://d30257nes7d4fq.cloudfront.net/downloads/hadoop/fuse_dfs_wrapper-${HADOOP_VERSION}.sh -O /usr/bin/fuse_dfs_wrapper.sh
sudo chmod +x /usr/bin/fuse_dfs
sudo chmod +x /usr/bin/fuse_dfs_wrapper.sh
fi

# nfs mount may be needed for the nfs mount method
which mount.nfs > /dev/null || (sudo apt-get -qq update -y > /dev/null && sudo DEBIAN_FRONTEND=noninteractive apt-get -qq install nfs-common -y > /dev/null)
}

function install_s3_fuse() {
if ! type s3fs >/dev/null 2>&1;then
if ! type s3fs >/dev/null 2>&1; then
echo "Installing S3 Fuse..."
sudo apt-get -qq update -y > /dev/null
sudo apt-get install -qq s3fs -y > /dev/null
Expand Down Expand Up @@ -257,8 +260,18 @@ function mount_local_hdfs_fs() {
fi
# Mount local hdfs here
mkdir -p ${FS_MOUNT_PATH}
echo "Mounting HDFS ${fs_default_dir} to ${FS_MOUNT_PATH}..."
fuse_dfs_wrapper.sh -oinitchecks ${fs_default_dir} ${FS_MOUNT_PATH} > /dev/null

if [ "${HDFS_MOUNT_METHOD}" == "nfs" ]; then
# TODO: Use the local HDFS dedicated core-site.xml and hdfs-site.xml
echo "Staring HDFS NFS Gateway..."
$HADOOP_HOME/bin/hdfs --daemon start portmap
$HADOOP_HOME/bin/hdfs --daemon start nfs3
echo "Mounting HDFS ${fs_default_dir} with NFS Gateway ${CLOUDTIK_NODE_IP} to ${FS_MOUNT_PATH}..."
sudo mount -t nfs -o vers=3,proto=tcp,nolock,noacl,sync ${CLOUDTIK_NODE_IP}:/ ${FS_MOUNT_PATH}
else
echo "Mounting HDFS ${fs_default_dir} with fuse to ${FS_MOUNT_PATH}..."
fuse_dfs_wrapper.sh -oinitchecks ${fs_default_dir} ${FS_MOUNT_PATH} > /dev/null
fi
}

function mount_hdfs_fs() {
@@ -269,10 +282,20 @@ function mount_hdfs_fs() {
else
FS_MOUNT_PATH=${CLUSTER_FS_MOUNT_PATH}
fi
# Mount remote hdfs fuse here
# Mount remote hdfs here
mkdir -p ${FS_MOUNT_PATH}
echo "Mounting HDFS ${fs_default_dir} to ${FS_MOUNT_PATH}..."
fuse_dfs_wrapper.sh -oinitchecks ${fs_default_dir} ${FS_MOUNT_PATH} > /dev/null

if [ "${HDFS_MOUNT_METHOD}" == "nfs" ]; then
# TODO: Use the remote HDFS dedicated core-site.xml and hdfs-site.xml
echo "Staring HDFS NFS Gateway..."
$HADOOP_HOME/bin/hdfs --daemon start portmap
$HADOOP_HOME/bin/hdfs --daemon start nfs3
echo "Mounting HDFS ${fs_default_dir} with NFS Gateway ${CLOUDTIK_NODE_IP} to ${FS_MOUNT_PATH}..."
sudo mount -t nfs -o vers=3,proto=tcp,nolock,noacl,sync ${CLOUDTIK_NODE_IP}:/ ${FS_MOUNT_PATH}
else
echo "Mounting HDFS ${fs_default_dir} with fuse to ${FS_MOUNT_PATH}..."
fuse_dfs_wrapper.sh -oinitchecks ${fs_default_dir} ${FS_MOUNT_PATH} > /dev/null
fi
}
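
Both mount functions follow the same pattern for the NFS path: start the gateway's portmap and nfs3 daemons, then mount the gateway's export of the HDFS root over NFSv3. A hedged sketch for checking the gateway and reproducing the mount by hand; rpcinfo ships with rpcbind and showmount with nfs-common, and the gateway IP and mount point below are placeholders.

```
# Hedged manual check of the HDFS NFS gateway (placeholder host and mount point).
NFS_GATEWAY_IP=${NFS_GATEWAY_IP:-127.0.0.1}
MOUNT_POINT=${MOUNT_POINT:-/tmp/hdfs-nfs-test}

# The gateway should have registered portmapper, mountd and nfs services.
rpcinfo -p ${NFS_GATEWAY_IP}
# The gateway exports the HDFS root as "/".
showmount -e ${NFS_GATEWAY_IP}

# Mount with the same options the script uses; the gateway supports NFSv3 over TCP.
sudo mkdir -p ${MOUNT_POINT}
sudo mount -t nfs -o vers=3,proto=tcp,nolock,noacl,sync ${NFS_GATEWAY_IP}:/ ${MOUNT_POINT}
df -h ${MOUNT_POINT}
sudo umount ${MOUNT_POINT}
```
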

function mount_s3_fs() {
@@ -353,6 +376,7 @@ function mount_aliyun_oss_fs() {
}

function mount_cloud_fs() {
MOUNTED_CLOUD_FS=""
# cloud storage from provider
if [ "$AWS_CLOUD_STORAGE" == "true" ]; then
mount_s3_fs
@@ -384,7 +408,30 @@ function mount_cloud_fs() {
fi
}


function unmount_fs() {
local fs_mount_path="$1"
if findmnt -o fstype -l -n ${fs_mount_path} >/dev/null 2>&1; then
echo "Unmounting cloud fs at ${fs_mount_path}..."
local fstype=$(findmnt -o fstype -l -n ${fs_mount_path})
if [ "${fstype}" == "nfs" ]; then
sudo umount -f ${fs_mount_path}
else
fusermount -u ${fs_mount_path} > /dev/null
fi
fi
}

function unmount_cloud_fs() {
echo "Unmounting cloud fs at ${CLOUD_FS_MOUNT_PATH}..."
fusermount -u ${CLOUD_FS_MOUNT_PATH} > /dev/null
# use findmnt to check the existence and type of the mount
# if findmnt doesn't exist, install it
which findmnt > /dev/null || (sudo apt-get -qq update -y > /dev/null && sudo DEBIAN_FRONTEND=noninteractive apt-get -qq install util-linux -y > /dev/null)

if [ "${CLOUD_FS_MOUNT_PATH}" != "" ]; then
unmount_fs "${CLOUD_FS_MOUNT_PATH}"
fi

if [ "${CLUSTER_FS_MOUNT_PATH}" != "" ]; then
unmount_fs "${CLUSTER_FS_MOUNT_PATH}"
fi
}
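
The new `unmount_fs` helper dispatches on the filesystem type reported by findmnt, since NFS gateway mounts need `umount` while fuse mounts need `fusermount -u`. A hedged standalone sketch of the same dispatch; the mount path is a placeholder.

```
# Hedged sketch of the unmount dispatch; MOUNT_PATH is a placeholder.
MOUNT_PATH=${MOUNT_PATH:-/cloudtik/fs}
fstype=$(findmnt -o fstype -l -n "${MOUNT_PATH}" 2>/dev/null)
case "${fstype}" in
    nfs|nfs4) sudo umount -f "${MOUNT_PATH}" ;;   # HDFS NFS gateway mount
    fuse*)    fusermount -u "${MOUNT_PATH}" ;;    # fuse_dfs, s3fs, gcsfuse, blobfuse
    "")       echo "Nothing mounted at ${MOUNT_PATH}" ;;
    *)        sudo umount "${MOUNT_PATH}" ;;
esac
```
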
24 changes: 24 additions & 0 deletions python/cloudtik/runtime/common/scripts/hadoop-cloud-credential.sh
@@ -176,6 +176,27 @@ function update_credential_config_for_huaweicloud() {
fi
}

function update_system_credential() {
CURRENT_SYSTEM_USER=$(whoami)

if [ ${CURRENT_SYSTEM_USER} != "root" ]; then
HADOOP_PROXY_USER_PROPERTIES="$(cat <<-EOF
<property>
<name>hadoop.proxyuser.${CURRENT_SYSTEM_USER}.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.${CURRENT_SYSTEM_USER}.hosts</name>
<value>*</value>
</property>
EOF
)"
sed -i "s#{%hadoop.proxyuser.properties%}#${HADOOP_PROXY_USER_PROPERTIES}#g" `grep "{%hadoop.proxyuser.properties%}" -rl ./`
else
sed -i "s#{%hadoop.proxyuser.properties%}#""#g" `grep "{%hadoop.proxyuser.properties%}" -rl ./`
fi
}

function set_cloud_storage_provider() {
cloud_storage_provider="none"
if [ "$AWS_CLOUD_STORAGE" == "true" ]; then
@@ -205,6 +226,9 @@ function update_credential_config_for_provider() {
elif [ "${cloud_storage_provider}" == "huaweicloud" ]; then
update_credential_config_for_huaweicloud
fi

update_system_credential

if [ -f "$HADOOP_CREDENTIAL_TMP_FILE" ]; then
cp ${HADOOP_CREDENTIAL_TMP_FILE} ${HADOOP_CREDENTIAL_FILE}
fi
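
The proxyuser entries matter for the NFS mount: the gateway runs as the cluster user and impersonates NFS clients, so HDFS must allow that user to proxy. A hedged way to confirm the substitution landed in the effective configuration, assuming HADOOP_HOME is set and the generated core-site.xml is already in place on the node.

```
# Hedged check; both keys should print "*" after the placeholder substitution.
CURRENT_SYSTEM_USER=$(whoami)
${HADOOP_HOME}/bin/hdfs getconf -confKey hadoop.proxyuser.${CURRENT_SYSTEM_USER}.groups
${HADOOP_HOME}/bin/hdfs getconf -confKey hadoop.proxyuser.${CURRENT_SYSTEM_USER}.hosts
```
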
27 changes: 27 additions & 0 deletions python/cloudtik/runtime/common/scripts/util-functions.sh
@@ -68,3 +68,30 @@ function clean_install_cache() {
&& sudo apt-get clean \
&& which conda > /dev/null && conda clean -itqy)
}

function get_data_disk_dirs() {
local data_disk_dirs=""
if [ -d "/mnt/cloudtik" ]; then
for data_disk in /mnt/cloudtik/*; do
[ -d "$data_disk" ] || continue
if [ -z "$data_disk_dirs" ]; then
data_disk_dirs=$data_disk
else
data_disk_dirs="$data_disk_dirs,$data_disk"
fi
done
fi
echo "${data_disk_dirs}"
}

function get_any_data_disk_dir() {
local data_disk_dir=""
if [ -d "/mnt/cloudtik" ]; then
for data_disk in /mnt/cloudtik/*; do
[ -d "$data_disk" ] || continue
data_disk_dir=$data_disk
break
done
fi
echo "${data_disk_dir}"
}
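
A hedged usage sketch for the two new helpers, assuming the script is sourced on a node whose data disks are mounted under /mnt/cloudtik; the paths in the comments are examples.

```
# Hedged usage sketch; paths are examples.
source ./util-functions.sh

# All data disk mount points as a comma-separated list,
# e.g. /mnt/cloudtik/data_disk_1,/mnt/cloudtik/data_disk_2
get_data_disk_dirs

# The first data disk only, or an empty string when the node has none
get_any_data_disk_dir
```
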
13 changes: 13 additions & 0 deletions python/cloudtik/runtime/hdfs/conf/hadoop/hdfs-site.xml
@@ -65,4 +65,17 @@
<name>dfs.block.size</name>
<value>268435456</value>
</property>
<property>
<name>dfs.namenode.accesstime.precision</name>
<value>3600000</value>
<description>
The access time for an HDFS file is precise up to this value.
The default value is 1 hour. Setting a value of 0 disables
access times for HDFS.
</description>
</property>
<property>
<name>dfs.nfs3.dump.dir</name>
<value>{%dfs.nfs3.dump.dir%}</value>
</property>
</configuration>
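
Both new properties back the NFS gateway: the gateway relies on access-time updates being enabled, and the dump directory buffers out-of-order NFS writes before they reach HDFS. A hedged check of the effective values, assuming HADOOP_HOME and the generated hdfs-site.xml are on the node.

```
# Hedged check of the NFS-gateway related settings.
${HADOOP_HOME}/bin/hdfs getconf -confKey dfs.namenode.accesstime.precision
${HADOOP_HOME}/bin/hdfs getconf -confKey dfs.nfs3.dump.dir
```
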
13 changes: 13 additions & 0 deletions python/cloudtik/runtime/hdfs/scripts/configure.sh
@@ -32,6 +32,16 @@ function check_hadoop_installed() {
fi
}

function update_nfs_dump_dir() {
data_disk_dir=$(get_any_data_disk_dir)
if [ -z "$data_disk_dir" ]; then
nfs_dump_dir="/tmp/.hdfs-nfs"
else
nfs_dump_dir="$data_disk_dir/tmp/.hdfs-nfs"
fi
sed -i "s!{%dfs.nfs3.dump.dir%}!${nfs_dump_dir}!g" `grep "{%dfs.nfs3.dump.dir%}" -rl ./`
}

function update_hdfs_data_disks_config() {
hdfs_nn_dirs="${HADOOP_HOME}/data/dfs/nn"
hdfs_dn_dirs=""
@@ -55,6 +65,9 @@ function update_hdfs_data_disks_config() {
fi
sed -i "s!{%dfs.namenode.name.dir%}!${hdfs_nn_dirs}!g" `grep "{%dfs.namenode.name.dir%}" -rl ./`
sed -i "s!{%dfs.datanode.data.dir%}!${hdfs_dn_dirs}!g" `grep "{%dfs.datanode.data.dir%}" -rl ./`

# set nfs gateway dump dir
update_nfs_dump_dir
}

function update_cloud_storage_credential_config() {
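
The dump dir substitution follows the same placeholder pattern as the other config templating in this script. A hedged standalone sketch, run from the directory holding the generated templates, with an example value for the dump dir.

```
# Hedged sketch of the placeholder substitution; the dump dir value is an example.
nfs_dump_dir="/mnt/cloudtik/data_disk_1/tmp/.hdfs-nfs"
grep -rl "{%dfs.nfs3.dump.dir%}" ./ | xargs -r sed -i "s!{%dfs.nfs3.dump.dir%}!${nfs_dump_dir}!g"
```
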
40 changes: 40 additions & 0 deletions python/cloudtik/runtime/spark/conf/hadoop/hdfs-site.xml
@@ -0,0 +1,40 @@
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->

<!-- Put site-specific property overrides in this file. -->

<configuration>
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
<property>
<name>dfs.socket.timeout</name>
<value>1200000</value>
</property>
<property>
<name>dfs.block.size</name>
<value>268435456</value>
</property>
<property>
<name>dfs.nfs3.dump.dir</name>
<value>{%dfs.nfs3.dump.dir%}</value>
</property>
</configuration>
16 changes: 15 additions & 1 deletion python/cloudtik/runtime/spark/scripts/configure.sh
@@ -241,8 +241,20 @@ function update_config_for_remote_storage() {
fi
}

function update_nfs_dump_dir() {
# set nfs gateway dump dir
data_disk_dir=$(get_any_data_disk_dir)
if [ -z "$data_disk_dir" ]; then
nfs_dump_dir="/tmp/.hdfs-nfs"
else
nfs_dump_dir="$data_disk_dir/tmp/.hdfs-nfs"
fi
sed -i "s!{%dfs.nfs3.dump.dir%}!${nfs_dump_dir}!g" `grep "{%dfs.nfs3.dump.dir%}" -rl ./`
}

function update_config_for_storage() {
if [ "$HDFS_ENABLED" == "true" ];then
# TODO: local HDFS has already set up core-site.xml and hdfs-site.xml
update_config_for_local_hdfs
else
check_hdfs_storage
@@ -253,7 +265,9 @@
cp -r ${output_dir}/hadoop/${cloud_storage_provider}/core-site.xml ${HADOOP_HOME}/etc/hadoop/
else
# Possible remote hdfs without cloud storage
update_nfs_dump_dir
cp -r ${output_dir}/hadoop/core-site.xml ${HADOOP_HOME}/etc/hadoop/
cp -r ${output_dir}/hadoop/hdfs-site.xml ${HADOOP_HOME}/etc/hadoop/
fi
fi
}
@@ -295,7 +309,7 @@ function update_data_disks_config() {
else
local_dirs="$local_dirs,$data_disk"
fi
done
done
fi

# set nodemanager.local-dirs
8 changes: 8 additions & 0 deletions python/cloudtik/runtime/spark/utils.py
@@ -268,6 +268,12 @@ def update_spark_configurations():
save_properties_file(spark_conf_file, spark_conf, separator=' ', comments=comments)


def _with_hdfs_mount_method(spark_config, runtime_envs):
mount_method = spark_config.get("hdfs_mount_method")
if mount_method:
runtime_envs["HDFS_MOUNT_METHOD"] = mount_method


def _with_runtime_environment_variables(runtime_config, config, provider, node_id: str):
runtime_envs = {}
spark_config = runtime_config.get(SPARK_RUNTIME_CONFIG_KEY, {})
@@ -288,9 +294,11 @@ def _with_runtime_environment_variables(runtime_config, config, provider, node_i
# 3) Try to use provider storage;
if is_runtime_enabled(cluster_runtime_config, BUILT_IN_RUNTIME_HDFS):
runtime_envs["HDFS_ENABLED"] = True
_with_hdfs_mount_method(spark_config, runtime_envs)
else:
if spark_config.get("hdfs_namenode_uri") is not None:
runtime_envs["HDFS_NAMENODE_URI"] = spark_config.get("hdfs_namenode_uri")
_with_hdfs_mount_method(spark_config, runtime_envs)

# We always export the cloud storage even for remote HDFS case
node_type_config = get_node_type_config(config, provider, node_id)
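
End to end, the new config key flows into the mount scripts as an environment variable. A hedged trace of that wiring; the variable name comes from the diff, and the conditional simply mirrors the branch the mount functions take.

```
# Hedged trace: hdfs_mount_method (config) -> HDFS_MOUNT_METHOD (node env) -> mount branch.
export HDFS_MOUNT_METHOD=nfs
if [ "${HDFS_MOUNT_METHOD}" == "nfs" ]; then
    echo "mount_hdfs_fs will start the NFS gateway and mount with mount -t nfs"
else
    echo "mount_hdfs_fs will fall back to fuse_dfs (the default)"
fi
```
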
3 changes: 2 additions & 1 deletion runtime/hadoop/README.md
@@ -19,7 +19,8 @@ in which everything needed for building and testing Hadoop is included.

Execute the following to build the docker image and start it in interactive mode.
```
bash ./start-build-env.sh
# the script mounts ~/.m2 and ~/.gnupg and fails if they don't exist, so create them first
mkdir -p ~/.m2 ~/.gnupg && bash ./start-build-env.sh
```
The above command will build the docker image for the Hadoop build environment and
run the image with your current user. It suggests you configure your docker to
