Skip to content

Commit

Permalink
up
Browse files Browse the repository at this point in the history
Signed-off-by: Ubuntu <[email protected]>
  • Loading branch information
Ubuntu committed Sep 18, 2024
1 parent d060ec2 commit 6b66049
Showing 1 changed file with 4 additions and 4 deletions.
8 changes: 4 additions & 4 deletions python/ray/dag/compiled_dag_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ def do_stream_tasks(
if done:
break
for operation in schedule:
done = tasks[operation.exec_task_idx].exec_operation(
done = tasks[operation.exec_task_idx].stream_operation(
self, operation, exec_stream
)
if done:
Expand Down Expand Up @@ -264,7 +264,7 @@ def do_profile_stream_tasks(
if done:
break
for operation in schedule:
done = tasks[operation.exec_task_idx].exec_operation(
done = tasks[operation.exec_task_idx].stream_operation(
self, operation, exec_stream
)
if done:
Expand Down Expand Up @@ -613,7 +613,7 @@ def set_stream_buffer(self, op: _DAGNodeOperation, data: Any):
op: The operation that generates the intermediate result.
data: The intermediate result of a READ or COMPUTE operation.
"""
self._stream_buffer[op.next_operation] = data
self._stream_buffer[op.next_operation()] = data

def reset_stream_buffer(self, op: _DAGNodeOperation) -> Any:
"""
Expand Down Expand Up @@ -708,7 +708,7 @@ def _stream_write(self, op: _DAGNodeOperation) -> bool:
Returns:
True if system error occurs and exit the loop; otherwise, False.
"""
output_val, exec_event = self.reset_intermediate_buffer(op)
output_val, exec_event = self.reset_stream_buffer(op)
exit = False
exec_event.synchronize()
try:
Expand Down

0 comments on commit 6b66049

Please sign in to comment.