Skip to content

Commit

Permalink
Merge pull request #7606 from aldbr/v8.0_FEAT_robust-interactions-wit…
Browse files Browse the repository at this point in the history
…h-ce

[8.0] feat: Make RemoteRunner more resilient to CE issues
  • Loading branch information
fstagni authored May 10, 2024
2 parents 716726b + 86ce1af commit 89c2b7a
Showing 1 changed file with 32 additions and 7 deletions.
39 changes: 32 additions & 7 deletions src/DIRAC/WorkloadManagementSystem/Utilities/RemoteRunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,22 @@ def execute(self, command, workingDirectory=".", numberOfProcessors=1, cleanRemo
# Request the whole directory as output
outputs = ["/"]

# Submit the command as a job
if not (result := workloadCE.submitJob(self.executable, workloadCE.proxy, inputs=inputs, outputs=outputs))[
"OK"
]:
# Interactions with the CE might be unstable, we need to retry the operations
maxRetries = 10
timeBetweenRetries = 120

# Submit the command as a job with retries
for _ in range(maxRetries):
result = workloadCE.submitJob(self.executable, workloadCE.proxy, inputs=inputs, outputs=outputs)
if result["OK"]:
break
else:
self.log.warn("Failed to submit job, retrying...")
time.sleep(timeBetweenRetries)
else:
result["Errno"] = DErrno.EWMSSUBM
return result

jobID = result["Value"][0]
stamp = result["PilotStampDict"][jobID]
self.log.info("The command has been wrapped in a job and sent. Remote JobID: ", jobID)
Expand All @@ -106,18 +116,33 @@ def execute(self, command, workingDirectory=".", numberOfProcessors=1, cleanRemo
jobStatus = PilotStatus.RUNNING
while jobStatus not in PilotStatus.PILOT_FINAL_STATES:
time.sleep(120)
result = workloadCE.getJobStatus([jobID])
if not result["OK"]:
for _ in range(maxRetries):
result = workloadCE.getJobStatus([jobID])
if result["OK"]:
break
else:
self.log.warn("Failed to get job status, retrying...")
time.sleep(timeBetweenRetries)
else:
result["Errno"] = DErrno.EWMSSTATUS
return result

jobStatus = result["Value"][jobID]
self.log.info("The final status of the application/script is: ", jobStatus)

# Get job outputs
self.log.info("Getting the outputs of the command...")
if not (result := workloadCE.getJobOutput(f"{jobID}:::{stamp}", os.path.abspath(".")))["OK"]:
for _ in range(maxRetries):
result = workloadCE.getJobOutput(f"{jobID}:::{stamp}", os.path.abspath("."))
if result["OK"]:
break
else:
self.log.warn("Failed to get job output, retrying...")
time.sleep(timeBetweenRetries)
else:
result["Errno"] = DErrno.EWMSJMAN
return result

output, error = result["Value"]

# Make sure the output is correct
Expand Down

0 comments on commit 89c2b7a

Please sign in to comment.