Skip to content

Commit

Permalink
Align work statuses definition with GR
Browse files Browse the repository at this point in the history
  • Loading branch information
ivan-cukic authored and RalphSteinhagen committed Feb 1, 2023
1 parent 1748f17 commit 4201658
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 30 deletions.
10 changes: 5 additions & 5 deletions bench/bm_case1.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class copy : public fg::node<copy<T, N_MIN, N_MAX, use_bulk_operation, use_memco
return a;
}

fair::graph::node_ports_state
fair::graph::work_return_t
work() noexcept { // TODO - make this an alternate version to 'process_one'
auto &out_port = output_port<"out">(this);
auto &in_port = input_port<"in">(this);
Expand All @@ -62,9 +62,9 @@ class copy : public fg::node<copy<T, N_MIN, N_MAX, use_bulk_operation, use_memco
const auto n_readable = std::min(reader.available(), in_port.max_buffer_size());
const auto n_writable = std::min(writer.available(), out_port.max_buffer_size());
if (n_readable == 0) {
return fair::graph::node_ports_state::inputs_empty;
return fair::graph::work_return_t::DONE;
} else if (n_writable == 0) {
return fair::graph::node_ports_state::writers_not_available;
return fair::graph::work_return_t::INSUFFICIENT_OUTPUT_ITEMS;
}
const std::size_t n_to_publish = std::min(n_readable, n_writable);

Expand All @@ -83,9 +83,9 @@ class copy : public fg::node<copy<T, N_MIN, N_MAX, use_bulk_operation, use_memco
n_to_publish);
}
if (!reader.consume(n_to_publish)) {
return fair::graph::node_ports_state::error;
return fair::graph::work_return_t::ERROR;
}
return fair::graph::node_ports_state::success;
return fair::graph::work_return_t::OK;
}
};

Expand Down
10 changes: 5 additions & 5 deletions bench/bm_test_helper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class source : public fg::node<source<T, min, count>, fg::OUT<T, "out">, fg::lim
return T{};
}

fair::graph::node_ports_state
fair::graph::work_return_t
work() {
const std::size_t n_to_publish = _n_samples_max - n_samples_produced;
if (n_to_publish > 0) {
Expand All @@ -35,7 +35,7 @@ class source : public fg::node<source<T, min, count>, fg::OUT<T, "out">, fg::lim
if constexpr (use_bulk_operation) {
std::size_t n_write = std::clamp(n_to_publish, 0UL, std::min(writer.available(), port.max_buffer_size()));
if (n_write == 0) {
return fair::graph::node_ports_state::has_unprocessed_data;
return fair::graph::work_return_t::INSUFFICIENT_INPUT_ITEMS;
}

writer.publish( //
Expand All @@ -48,14 +48,14 @@ class source : public fg::node<source<T, min, count>, fg::OUT<T, "out">, fg::lim
} else {
auto [data, token] = writer.get(1);
if (data.size() == 0) {
return fair::graph::node_ports_state::error;
return fair::graph::work_return_t::ERROR;
}
data[0] = process_one();
writer.publish(token, 1);
}
return fair::graph::node_ports_state::success;
return fair::graph::work_return_t::OK;
} else {
return fair::graph::node_ports_state::inputs_empty;
return fair::graph::work_return_t::DONE;
}
}
};
Expand Down
22 changes: 11 additions & 11 deletions include/graph.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ class graph {
name() const
= 0;

virtual node_ports_state
virtual work_return_t
work() = 0;

virtual void *
Expand Down Expand Up @@ -390,7 +390,7 @@ class graph {
template<typename In>
reference_node_wrapper(In &&node) : _node(std::forward<In>(node)) {}

node_ports_state
work_return_t
work() override {
return data().work();
}
Expand Down Expand Up @@ -607,33 +607,33 @@ class graph {
}));
}

node_ports_state
work_return_t
work(init_proof& init) {
if (!init) {
return node_ports_state::error;
return work_return_t::ERROR;
}
bool run = true;
while (run) {
bool something_happened = false;
for (auto &node : _nodes) {
auto result = node->work();
if (result == node_ports_state::error) {
return node_ports_state::error;
} else if (result == node_ports_state::has_unprocessed_data) {
if (result == work_return_t::ERROR) {
return work_return_t::ERROR;
} else if (result == work_return_t::INSUFFICIENT_INPUT_ITEMS) {
// nothing
} else if (result == node_ports_state::inputs_empty) {
} else if (result == work_return_t::DONE) {
// nothing
} else if (result == node_ports_state::success) {
} else if (result == work_return_t::OK) {
something_happened = true;
} else if (result == node_ports_state::writers_not_available) {
} else if (result == work_return_t::INSUFFICIENT_OUTPUT_ITEMS) {
something_happened = true;
}
}

run = something_happened;
}

return node_ports_state::inputs_empty;
return work_return_t::DONE;
}
};

Expand Down
24 changes: 18 additions & 6 deletions include/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,19 @@ namespace fair::graph {
namespace stdx = vir::stdx;
using fair::meta::fixed_string;

enum class node_ports_state { success, has_unprocessed_data, inputs_empty, writers_not_available, error };
enum class work_return_t {
ERROR = -100, /// error occurred in the work function
INSUFFICIENT_OUTPUT_ITEMS =
-3, /// work requires a larger output buffer to produce output
INSUFFICIENT_INPUT_ITEMS =
-2, /// work requires a larger input buffer to produce output
DONE =
-1, /// this block has completed its processing and the flowgraph should be done
OK = 0, /// work call was successful and return values in i/o structs are valid
CALLBACK_INITIATED =
1, /// rather than blocking in the work function, the block will call back to the
/// parent interface when it is ready to be called again
};

namespace work_strategies {
template<typename Self>
Expand Down Expand Up @@ -88,21 +100,21 @@ consume_readers(Self& self, std::size_t available_values_count) {
struct read_many_and_publish_many {

template<typename Self>
static node_ports_state
static work_return_t
work(Self &self) noexcept {
// Capturing structured bindings does not work in Clang...
auto inputs_status = work_strategies::inputs_status(self);

if (inputs_status.available_values_count == 0) {
return inputs_status.at_least_one_input_has_data ? node_ports_state::has_unprocessed_data : node_ports_state::inputs_empty;
return inputs_status.at_least_one_input_has_data ? work_return_t::INSUFFICIENT_INPUT_ITEMS : work_return_t::DONE;
}

bool all_writers_available = std::apply([inputs_status](auto&... output_port) {
return ((output_port.writer().available() >= inputs_status.available_values_count) && ... && true);
}, output_ports(&self));

if (!all_writers_available) {
return node_ports_state::writers_not_available;
return work_return_t::INSUFFICIENT_OUTPUT_ITEMS;
}

auto input_spans = meta::tuple_transform([inputs_status](auto& input_port) {
Expand Down Expand Up @@ -153,7 +165,7 @@ struct read_many_and_publish_many {
fmt::print("Node {} failed to consume {} values from inputs\n", self.name(), inputs_status.available_values_count);
}

return success ? node_ports_state::success : node_ports_state::error;
return success ? work_return_t::OK : work_return_t::ERROR;
}
};

Expand Down Expand Up @@ -335,7 +347,7 @@ class node : protected std::tuple<Arguments...> {
}
}

node_ports_state
work_return_t
work() noexcept {
return work_strategy::work(self());
}
Expand Down
6 changes: 3 additions & 3 deletions test/qa_dynamic_port.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class repeater_source : public fg::node<repeater_source<T, value>, fg::OUT<T, "v
std::size_t _counter = 0;

public:
fair::graph::node_ports_state
fair::graph::work_return_t
work() {
if (_counter < count) {
_counter++;
Expand All @@ -63,9 +63,9 @@ class repeater_source : public fg::node<repeater_source<T, value>, fg::OUT<T, "v
data[0] = value;
writer.publish(token, 1);

return fair::graph::node_ports_state::success;
return fair::graph::work_return_t::OK;
} else {
return fair::graph::node_ports_state::inputs_empty;
return fair::graph::work_return_t::DONE;
}
}
};
Expand Down

0 comments on commit 4201658

Please sign in to comment.