Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Commit

Permalink
Runtime: improve the rsync name handling for docker cases (#1784)
Browse files Browse the repository at this point in the history
  • Loading branch information
jerrychenhf authored Aug 18, 2023
1 parent 1952653 commit 71250ce
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 15 deletions.
58 changes: 54 additions & 4 deletions python/cloudtik/core/_private/cluster/cluster_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import time
import urllib
import urllib.parse
import uuid
from types import ModuleType
from typing import Any, Dict, List, Optional, Tuple, Union

Expand All @@ -24,6 +25,7 @@
from cloudtik.core._private import services, constants
from cloudtik.core._private.call_context import CallContext
from cloudtik.core._private.cluster.cluster_config import _load_cluster_config, _bootstrap_config, try_logging_config
from cloudtik.core._private.cluster.cluster_exec import exec_cluster
from cloudtik.core._private.cluster.cluster_tunnel_request import request_tunnel_to_head
from cloudtik.core._private.cluster.cluster_utils import create_node_updater_for_exec
from cloudtik.core._private.cluster.node_availability_tracker import NodeAvailabilitySummary
Expand Down Expand Up @@ -1336,16 +1338,25 @@ def rsync_to_node(node_id, source, target, is_head_node):
else:
updater.sync_file_mounts(rsync)

head_node = _get_running_head_node(config)
head_node = _get_running_head_node(config, _provider=provider)
if not node_ip:
# No node specified, rsync with head or rsync up with all nodes
rsync_to_node(head_node, source, target, is_head_node=True)
if not down and all_nodes:
# rsync up with all workers
source_for_target = target
if os.path.isdir(source):
# source is the contents of the target folder
source_for_target = source_for_target.rstrip("/")
source_for_target += "/."
else:
# if it is a file, and the target is a directory
if target[-1] == "/" or is_directory_on_head(
config=config,
call_context=call_context,
path=target):
source_file = os.path.basename(source)
source_for_target = os.path.join(target, source_file)

rsync_to_node_from_head(config,
call_context=call_context,
Expand All @@ -1357,8 +1368,8 @@ def rsync_to_node(node_id, source, target, is_head_node):
if not source or not target:
cli_logger.abort("Need to specify both source and target when rsync with specific node")

target_base = os.path.basename(target)
target_on_head = tempfile.mktemp(prefix=f"{target_base}_")
# Use the tmp dir and UUID as a temp target
target_on_head = os.path.join("/tmp", str(uuid.uuid4()))
if down:
# rsync down
# first run rsync from head with the specific node
Expand All @@ -1367,8 +1378,24 @@ def rsync_to_node(node_id, source, target, is_head_node):
source=source, target=target_on_head, down=True,
node_ip=node_ip)
# then rsync local node with the head
if source[-1] == "/":

# We need to know whether the source is a file or a directory to handle this right
# 1. if the source is a file, target_on_head is the file; if target is a directory or ends with "/"
# we need to add the source file name to target
# 2. if the source is a directory, target_on_head is the directory with contents
# we need to append "/." here
is_source_dir = True if source[-1] == "/" or is_directory_on_head(
config=config,
call_context=call_context,
path=target_on_head) else False
if is_source_dir:
target_on_head += "/."
else:
if target[-1] == "/" or os.path.isdir(target):
# modify target to include the source name
source_file = os.path.basename(source)
target = os.path.join(target, source_file)

rsync_to_node(head_node, target_on_head, target, is_head_node=True)
else:
# rsync up
Expand All @@ -1378,12 +1405,35 @@ def rsync_to_node(node_id, source, target, is_head_node):
# then rsync from head to the specific node
if os.path.isdir(source):
target_on_head += "/."
else:
# if it is a file, target_on_head is a file
if target[-1] == "/" or is_directory_on_head(
config=config,
call_context=call_context,
path=target):
# if the target is a directory, we add the file name to the final target
source_file = os.path.basename(source)
target = os.path.join(target, source_file)

rsync_to_node_from_head(config,
call_context=call_context,
source=target_on_head, target=target, down=False,
node_ip=node_ip)


def is_directory_on_head(config: Dict[str, Any],
                         call_context: CallContext,
                         path: str):
    """Return True when ``path`` names an existing directory, else False.

    The check is a shell ``[ -d ... ]`` test submitted through
    ``exec_cluster`` (presumably executed on the head node -- confirm against
    ``exec_cluster``). The command prints ``true`` only when the test
    succeeds; the trailing ``|| true`` keeps the command's exit status zero
    when the path is not a directory.

    Args:
        config: The cluster configuration passed through to ``exec_cluster``.
        call_context: The call context passed through to ``exec_cluster``.
        path: The path to test; it is placed inside double quotes in the
            shell command.
    """
    probe = '([ -d "{}" ] && echo true) || true'.format(path)
    result = exec_cluster(
        config, call_context,
        cmd=probe,
        with_output=True)
    # Only the exact marker printed by the probe counts as a directory.
    return result.strip() == "true"


def rsync_to_node_from_head(config: Dict[str, Any],
call_context: CallContext,
source: str,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,35 @@ def run(
silent=silent)

def run_rsync_up(self, source, target, options=None):
"""
We should distinguish the handling for file and directory sources.
For source = file,
1. if the target is a folder (existing folder or ends with "/")
We should not use uuid as intermediate name as the target needs the source name.
2. if the target is not a folder (the file name)
We can use the uuid as intermediate name and the target name will be used.
For source = directory, we can use uuid as intermediate name,
1. if source doesn't end with "/", which means copy the source (including source name)
2. if source ends with "/", which means copy the contents of the source (doesn't include source name)
When copying from the intermediate folder to the target, we should always end the intermediate name with "/".
"""
options = options or {}
do_with_rsync = self._check_container_status() and not options.get(
"docker_mount_if_possible", False)
identical = False if do_with_rsync else True

is_source_dir = os.path.isdir(source)
identifier_path = None
if not is_source_dir and (
target[-1] == "/" or self._is_directory(target)):
# target needs the source name
identical = True
identifier_path = source

host_destination = get_docker_host_mount_location_for_object(
self.host_command_executor.cluster_name, target,
identical=identical)
identical=identical, identifier_path=identifier_path)

host_mount_location = os.path.dirname(host_destination.rstrip("/"))
self.host_command_executor.run(
Expand All @@ -104,7 +126,7 @@ def run_rsync_up(self, source, target, options=None):
self.host_command_executor.run_rsync_up(
source, host_destination, options=options)
if do_with_rsync:
if os.path.isdir(source):
if is_source_dir:
# Adding a "." means that docker copies the *contents*
# Without it, docker copies the source *into* the target
host_destination += "/."
Expand All @@ -127,9 +149,30 @@ def run_rsync_up(self, source, target, options=None):
silent=self.call_context.is_rsync_silent())

def run_rsync_down(self, source, target, options=None):
"""
We should distinguish the handling for file and directory.
For source = file,
1. if the target is a folder (existing folder or ends with "/")
We should not use uuid as intermediate name as the target needs the source name.
2. if the target is not a folder (the file name)
We can use the uuid as intermediate name and the target name will be used.
For source = directory, we can use uuid as intermediate name,
1. if source doesn't end with "/", which means copy the source (including source name)
2. if source ends with "/", which means copy the contents of the source (doesn't include source name)
When copying from the intermediate folder to the target, we should always end the intermediate name with "/".
"""
options = options or {}
do_with_rsync = not options.get("docker_mount_if_possible", False)
identical = False if do_with_rsync else True

# Check the source is a file or a directory
is_source_dir = self._is_directory(source)
if not is_source_dir and (
target[-1] == "/" or os.path.isdir(target)):
# target needs the source name
identical = True

host_source = get_docker_host_mount_location_for_object(
self.host_command_executor.cluster_name, source,
identical=identical)
Expand All @@ -138,21 +181,36 @@ def run_rsync_down(self, source, target, options=None):
f"mkdir -p {host_mount_location} && chown -R "
f"{self.host_command_executor.ssh_user} {host_mount_location}",
silent=self.call_context.is_rsync_silent())
if source[-1] == "/":
source += "."
# Adding a "." means that docker copies the *contents*
# Without it, docker copies the source *into* the target
if do_with_rsync:
docker_source = source
if docker_source[-1] == "/":
docker_source += "."
# Adding a "." means that docker copies the *contents*
# Without it, docker copies the source *into* the target
# NOTE: `--delete` is okay here because the container is the source
# of truth.
self.host_command_executor.run(
"rsync -e '{} exec -i' -avz --delete {}:{} {}".format(
self.get_docker_cmd(), self.container_name,
self._docker_expand_user(source), host_source),
self._docker_expand_user(docker_source), host_source),
silent=self.call_context.is_rsync_silent())

if is_source_dir:
# because the host source directory is uuid which is meaningless for user
if host_source[-1] != "/":
host_source += "/"

self.host_command_executor.run_rsync_down(
host_source, target, options=options)

def _is_directory(self, path):
output = self.run_with_retry(
'([ -d "{}" ] && echo true) || true'.format(path),
with_output=True).decode("utf-8").strip()
if output == "true":
return True
return False

def remote_shell_command_str(self):
inner_str = self.host_command_executor.remote_shell_command_str().replace(
"ssh", "ssh -tt", 1).strip("\n")
Expand Down
11 changes: 7 additions & 4 deletions python/cloudtik/core/_private/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@ def get_docker_host_mount_location(cluster_name: str) -> str:


def get_docker_host_mount_location_for_object(
        cluster_name: str, object_path: str,
        identical=True, identifier_path=None) -> str:
    """Return the docker host mount directory location for a specific target.

    The location is rooted at the cluster's docker host mount location, under
    a sub-directory named by a deterministic UUID (uuid3) of ``object_path``.

    Args:
        cluster_name: Name of the cluster, used to get the mount prefix.
        object_path: The path of the object. It is hashed as given, so
            ``abc`` and ``abc/`` map to different roots.
        identical: If True, the returned path ends with a real path
            (``identifier_path`` or ``object_path``) with leading "/" removed.
            If False, the returned path ends with a fresh random UUID.
        identifier_path: Optional path to append instead of ``object_path``
            when ``identical`` is True; ignored otherwise.

    Returns:
        The host path to use for the object.
    """
    docker_mount_prefix = get_docker_host_mount_location(cluster_name)
    # We need to distinguish the paths "abc" and "abc/", so the path is
    # hashed without stripping the trailing slash.
    object_identifier = str(uuid.uuid3(uuid.NAMESPACE_OID, object_path))
    object_root = os.path.join(docker_mount_prefix, object_identifier)
    if identical:
        if not identifier_path:
            identifier_path = object_path
        # Strip leading "/" so os.path.join does not discard object_root.
        return os.path.join(object_root, identifier_path.lstrip("/"))
    else:
        # using a unique uuid instead
        return os.path.join(object_root, str(uuid.uuid4()))
Expand Down

0 comments on commit 71250ce

Please sign in to comment.