From 123f3b94d8870cb15135e247eaa159dd39f34e15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ivan=20=C4=8Cuki=C4=87?= Date: Wed, 1 Feb 2023 09:13:08 +0100 Subject: [PATCH 1/4] Split into headers and refactor --- bench/bm_case1.cpp | 10 +- bench/bm_test_helper.hpp | 13 +- include/graph.hpp | 907 +-------------------------------------- include/merged_node.hpp | 121 ++++++ include/node.hpp | 391 +++++++++++++++++ include/port.hpp | 374 ++++++++++++++++ include/utils.hpp | 35 ++ src/main.cpp | 1 + test/qa_dynamic_port.cpp | 6 +- 9 files changed, 948 insertions(+), 910 deletions(-) create mode 100644 include/merged_node.hpp create mode 100644 include/node.hpp create mode 100644 include/port.hpp diff --git a/bench/bm_case1.cpp b/bench/bm_case1.cpp index 7f6050e3b..eb0a681b8 100644 --- a/bench/bm_case1.cpp +++ b/bench/bm_case1.cpp @@ -52,7 +52,7 @@ class copy : public fg::node(this); auto &in_port = input_port<"in">(this); @@ -62,9 +62,9 @@ class copy : public fg::node -#endif +#include inline constexpr std::size_t N_MAX = std::numeric_limits::max(); @@ -26,7 +25,7 @@ class source : public fg::node, fg::OUT, fg::lim return T{}; } - fair::graph::work_result + fair::graph::node_ports_state work() { const std::size_t n_to_publish = _n_samples_max - n_samples_produced; if (n_to_publish > 0) { @@ -36,7 +35,7 @@ class source : public fg::node, fg::OUT, fg::lim if constexpr (use_bulk_operation) { std::size_t n_write = std::clamp(n_to_publish, 0UL, std::min(writer.available(), port.max_buffer_size())); if (n_write == 0) { - return fair::graph::work_result::has_unprocessed_data; + return fair::graph::node_ports_state::has_unprocessed_data; } writer.publish( // @@ -49,14 +48,14 @@ class source : public fg::node, fg::OUT, fg::lim } else { auto [data, token] = writer.get(1); if (data.size() == 0) { - return fair::graph::work_result::error; + return fair::graph::node_ports_state::error; } data[0] = process_one(); writer.publish(token, 1); } - return fair::graph::work_result::success; + return fair::graph::node_ports_state::success; } else { - return fair::graph::work_result::inputs_empty; + return fair::graph::node_ports_state::inputs_empty; } } }; diff --git a/include/graph.hpp b/include/graph.hpp index 6fd63c7af..81409f06e 100644 --- a/include/graph.hpp +++ b/include/graph.hpp @@ -5,6 +5,8 @@ #include // localinclude #include // localinclude #include // localinclude +#include // localinclude +#include // localinclude #include // localinclude #include @@ -30,575 +32,6 @@ namespace fair::graph { -namespace stdx = vir::stdx; - -using fair::meta::fixed_string; - -// #### default supported types -- TODO: to be replaced by pmt::pmtv declaration -using supported_type = std::variant, std::complex /*, ...*/>; - -enum class port_direction_t { INPUT, OUTPUT, ANY }; // 'ANY' only for query and not to be used for port declarations -enum class connection_result_t { SUCCESS, FAILED }; -enum class port_type_t { STREAM, MESSAGE }; // TODO: Think of a better name -enum class port_domain_t { CPU, GPU, NET, FPGA, DSP, MLU }; - -template -concept Port = requires(T t, const std::size_t n_items) { // dynamic definitions - typename T::value_type; - { t.pmt_type() } -> std::same_as; - { t.type() } -> std::same_as; - { t.direction() } -> std::same_as; - { t.name() } -> std::same_as; - { t.resize_buffer(n_items) } -> std::same_as; - { t.disconnect() } -> std::same_as; - }; - -class dynamic_port; - -template -concept has_fixed_port_info_v = requires { - typename T::value_type; - { T::static_name() }; - { T::direction() } -> std::same_as; - 
{ T::type() } -> std::same_as; - }; - -template -using has_fixed_port_info = std::integral_constant>; - -template -struct has_fixed_port_info_or_is_typelist : std::false_type {}; - -template - requires has_fixed_port_info_v -struct has_fixed_port_info_or_is_typelist : std::true_type {}; - -template - requires(meta::is_typelist_v and T::template all_of) -struct has_fixed_port_info_or_is_typelist : std::true_type {}; - -template -struct limits { - using limits_tag = std::true_type; - static constexpr std::size_t min = _min; - static constexpr std::size_t max = _max; -}; - -template -concept is_limits_v = requires { typename T::limits_tag; }; - -static_assert(!is_limits_v); -static_assert(!is_limits_v); -static_assert(is_limits_v>); - -template -using is_limits = std::integral_constant>; - -template -using port_type = typename Port::value_type; - -template -using is_in_port = std::integral_constant; - -template -concept InPort = is_in_port::value; - -template -using is_out_port = std::integral_constant; - -template -concept OutPort = is_out_port::value; - -enum class work_result { success, has_unprocessed_data, inputs_empty, writers_not_available, error }; - -namespace work_policies { -struct one_by_one { // TODO: remove -- benchmark indicate this being inefficient - - template - static work_result - work(Self &self) noexcept { - auto inputs_available = [&self](std::index_sequence) { return ((input_port(&self).reader().available() > 0) && ...); } - (std::make_index_sequence()); - - if (!inputs_available) { - return work_result::inputs_empty; - } - - auto results = [&self](std::index_sequence) { - auto result = meta::invoke_void_wrapped([&self](Args... args) { return self.process_one(std::forward(args)...); }, input_port(&self).reader().get(1)[0]...); - - bool success = true; - ((success = success && input_port(&self).reader().consume(1)), ...); - if (!success) throw fmt::format("Buffers reported more available data than was available"); - - return result; - } - (std::make_index_sequence()); - - if constexpr (std::is_same_v) { - // process_one returned void - - } else if constexpr (requires { std::get<0>(results); }) { - static_assert(std::tuple_size_v == Self::output_ports::size); - [&self, &results](std::index_sequence) { - ((output_port(&self).writer().publish([&results](auto &w) { w[0] = std::get(std::move(results)); }, 1)), ...); - } - (std::make_index_sequence()); - - } else { - static_assert(Self::output_ports::size == 1); - output_port<0>(&self).writer().publish([&results](auto &w) { w[0] = std::move(results); }, 1); - } - - return work_result::success; - } -}; - -struct read_many_and_publish_one_by_one { // TODO: remove -- benchmark indicate this being inefficient - - template - static work_result - work(Self &self) noexcept { - auto available_values_count = [&self](std::index_sequence) { - return std::min({ std::numeric_limits::max(), std::clamp((input_port(&self).reader().available()), std::size_t{ 0 }, std::size_t{ 1024 })... }); // TODO min and max - } - (std::make_index_sequence()); - - if (available_values_count == 0) { - return work_result::inputs_empty; - } - - auto input_spans = [&self, available_values_count](std::index_sequence) { return std::make_tuple(input_port(&self).reader().get(available_values_count)...); } - (std::make_index_sequence()); - - for (std::size_t i = 0; i < available_values_count; ++i) { - auto results = [&self, &input_spans, i](std::index_sequence) { - auto result = meta::invoke_void_wrapped([&self, &input_spans, i](Args... 
args) { return self.process_one(std::forward(args)...); }, - std::get(input_spans)[i]...); - - return result; - } - (std::make_index_sequence()); - - if constexpr (std::is_same_v) { - // process_one returned void - - } else if constexpr (requires { std::get<0>(results); }) { - static_assert(std::tuple_size_v == Self::output_ports::size); - [&self, &results](std::index_sequence) { - ((output_port(&self).writer().publish([&results](auto &w) { w[0] = std::get(std::move(results)); }, 1)), ...); - } - (std::make_index_sequence()); - - } else { - static_assert(Self::output_ports::size == 1); - output_port<0>(&self).writer().publish([&results](auto &w) { w[0] = std::move(results); }, 1); - } - } - - [&self, available_values_count](std::index_sequence) { return std::make_tuple(input_port(&self).reader().consume(available_values_count)...); } - (std::make_index_sequence()); - } -}; - -struct read_many_and_publish_many { - template - static work_result - work(Self &self) noexcept { - bool at_least_one_input_has_data = false; - const std::size_t available_values_count = [&self, &at_least_one_input_has_data]() { - if constexpr (Self::input_ports::size > 0) { - const auto availableForPort = [&at_least_one_input_has_data](Port &port) noexcept { - const std::size_t available = port.reader().available(); - if (available > 0LU) at_least_one_input_has_data = true; - if (available < port.min_buffer_size()) { - return 0LU; - } else { - return std::min(available, port.max_buffer_size()); - } - }; - - const auto availableInAll = [&self, &availableForPort](std::index_sequence) { - const auto betterMin = [](Arg arg, Args &&...args) noexcept -> std::size_t { - if constexpr (sizeof...(Args) == 0) { - return arg; - } else { - return std::min(arg, args...); - } - }; - return betterMin(availableForPort(input_port(&self))...); - } - (std::make_index_sequence()); - - if (availableInAll < self.min_samples()) { - return 0LU; - } else { - return std::min(availableInAll, self.max_samples()); - } - } else { - (void) self; - return std::size_t{ 1 }; - } - }(); - - if (available_values_count == 0) { - return at_least_one_input_has_data ? work_result::has_unprocessed_data : work_result::inputs_empty; - } - - bool all_writers_available = [&self, available_values_count](std::index_sequence) { - return ((output_port(&self).writer().available() >= available_values_count) && ... && true); - } - (std::make_index_sequence()); - - if (!all_writers_available) { - return work_result::writers_not_available; - } - - auto input_spans = [&self, available_values_count](std::index_sequence) { return std::make_tuple(input_port(&self).reader().get(available_values_count)...); } - (std::make_index_sequence()); - - auto writers_tuple = [&self, available_values_count](std::index_sequence) { - return std::make_tuple(output_port(&self).writer().get(available_values_count)...); - } - (std::make_index_sequence()); - - // TODO: check here whether a process_one(...) or a bulk access process has been defined, cases: - // case 1a: N-in->N-out -> process_one(...) -> auto-handling of streaming tags - // case 1b: N-in->N-out -> process_bulk(, ) -> auto-handling of streaming tags - // case 2a: N-in->M-out -> process_bulk(, ) N,M fixed -> aka. interpolator (M>N) or decimator (MM-out -> process_bulk(<{ins,tag-IO}...>, <{outs,tag-IO}...>) user-level tag handling - // case 3: N-in->M-out -> work() N,M arbitrary -> used need to handle the full logic (e.g. 
PLL algo) - // case 4: Python -> map to cases 1-3 and/or dedicated callback - // special cases: - // case sources: HW triggered vs. generating data per invocation (generators via Port::MIN) - // case sinks: HW triggered vs. fixed-size consumer (may block/never finish for insufficient input data and fixed Port::MIN>0) - for (std::size_t i = 0; i < available_values_count; ++i) { - auto results = [&self, &input_spans, i](std::index_sequence) noexcept { - return meta::invoke_void_wrapped([&self](Args... args) { return self.process_one(std::forward(args)...); }, std::get(input_spans)[i]...); - } - (std::make_index_sequence()); - - if constexpr (std::is_same_v) { - // process_one returned void - - } else if constexpr (requires { std::get<0>(results); }) { - static_assert(std::tuple_size_v == Self::output_ports::size); - [&self, &results, &writers_tuple, i](std::index_sequence) { ((std::get(writers_tuple).first /*data*/[i] = std::get(std::move(results))), ...); } - (std::make_index_sequence()); - - } else { - static_assert(Self::output_ports::size == 1); - std::get<0>(writers_tuple).first /*data*/[i] = std::move(results); - } - } - - if constexpr (Self::output_ports::size > 0) { - [&self, &writers_tuple, available_values_count](std::index_sequence) { - ((output_port(&self).writer().publish(std::get(writers_tuple).second, available_values_count)), ...); - } - (std::make_index_sequence()); - } - - bool success = true; - if constexpr (Self::input_ports::size > 0) { - [&self, available_values_count, &success](std::index_sequence) { - ((success = success && input_port(&self).reader().consume(available_values_count)), ...); - } - (std::make_index_sequence()); - } - - if (!success) { - fmt::print("Node {} failed to consume {} values from inputs\n", self.name(), available_values_count); - } - - return success ? work_result::success : work_result::error; - } -}; - -using default_policy = read_many_and_publish_many; -} // namespace work_policies - -template> -class port { -public: - static_assert(PortDirection != port_direction_t::ANY, "ANY reserved for queries and not port direction declarations"); - - using value_type = T; - - static constexpr bool IS_INPUT = PortDirection == port_direction_t::INPUT; - static constexpr bool IS_OUTPUT = PortDirection == port_direction_t::OUTPUT; - - using port_tag = std::true_type; - -private: - using ReaderType = decltype(std::declval().new_reader()); - using WriterType = decltype(std::declval().new_writer()); - using IoType = std::conditional_t; - - std::string _name = static_cast(PortName); - std::int16_t _priority = 0; // → dependents of a higher-prio port should be scheduled first (Q: make this by order of ports?) - std::size_t _n_history = N_HISTORY; - std::size_t _min_samples = (MIN_SAMPLES == std::dynamic_extent ? 
1 : MIN_SAMPLES); - std::size_t _max_samples = MAX_SAMPLES; - bool _connected = false; - - IoType _ioHandler = new_io_handler(); - -public: - [[nodiscard]] constexpr auto - new_io_handler() const noexcept { - if constexpr (IS_INPUT) { - return BufferType(65536).new_reader(); - } else { - return BufferType(65536).new_writer(); - } - } - - [[nodiscard]] void * - writer_handler_internal() noexcept { - static_assert(IS_OUTPUT, "only to be used with output ports"); - return static_cast(std::addressof(_ioHandler)); - } - - [[nodiscard]] bool - update_reader_internal(void *buffer_writer_handler_other) noexcept { - static_assert(IS_INPUT, "only to be used with input ports"); - - if (buffer_writer_handler_other == nullptr) { - return false; - } - - // TODO: If we want to allow ports with different buffer types to be mixed - // this will fail. We need to add a check that two ports that - // connect to each other use the same buffer type - // (std::any could be a viable approach) - auto typed_buffer_writer = static_cast(buffer_writer_handler_other); - setBuffer(typed_buffer_writer->buffer()); - return true; - } - -public: - port() = default; - - port(const port &) = delete; - - auto - operator=(const port &) - = delete; - - port(std::string port_name, std::int16_t priority = 0, std::size_t n_history = 0, std::size_t min_samples = 0U, std::size_t max_samples = SIZE_MAX) noexcept - : _name(std::move(port_name)) - , _priority{ priority } - , _n_history(n_history) - , _min_samples(min_samples) - , _max_samples(max_samples) { - static_assert(PortName.empty(), "port name must be exclusively declared via NTTP or constructor parameter"); - } - - constexpr port(port &&other) noexcept - : _name(std::move(other._name)) - , _priority{ other._priority } - , _n_history(other._n_history) - , _min_samples(other._min_samples) - , _max_samples(other._max_samples) {} - - constexpr port & - operator=(port &&other) { - port tmp(std::move(other)); - std::swap(_name, tmp._name); - std::swap(_priority, tmp._priority); - std::swap(_n_history, tmp._n_history); - std::swap(_min_samples, tmp._min_samples); - std::swap(_max_samples, tmp._max_samples); - std::swap(_connected, tmp._connected); - std::swap(_ioHandler, tmp._ioHandler); - return *this; - } - - [[nodiscard]] constexpr static port_type_t - type() noexcept { - return Porttype; - } - - [[nodiscard]] constexpr static port_direction_t - direction() noexcept { - return PortDirection; - } - - [[nodiscard]] constexpr static decltype(PortName) - static_name() noexcept - requires(!PortName.empty()) - { - return PortName; - } - - [[nodiscard]] constexpr supported_type - pmt_type() const noexcept { - return T(); - } - - [[nodiscard]] constexpr std::string_view - name() const noexcept { - if constexpr (!PortName.empty()) { - return static_cast(PortName); - } else { - return _name; - } - } - - [[nodiscard]] constexpr static bool - is_optional() noexcept { - return OPTIONAL; - } - - [[nodiscard]] constexpr std::int16_t - priority() const noexcept { - return _priority; - } - - [[nodiscard]] constexpr static std::size_t - available() noexcept { - return 0; - } // ↔ maps to Buffer::Buffer[Reader, Writer].available() - - [[nodiscard]] constexpr std::size_t - n_history() const noexcept { - if constexpr (N_HISTORY == std::dynamic_extent) { - return _n_history; - } else { - return N_HISTORY; - } - } - - [[nodiscard]] constexpr std::size_t - min_buffer_size() const noexcept { - if constexpr (MIN_SAMPLES == std::dynamic_extent) { - return _min_samples; - } else { - return MIN_SAMPLES; - } - 
} - - [[nodiscard]] constexpr std::size_t - max_buffer_size() const noexcept { - if constexpr (MAX_SAMPLES == std::dynamic_extent) { - return _max_samples; - } else { - return MAX_SAMPLES; - } - } - - [[nodiscard]] constexpr connection_result_t - resize_buffer(std::size_t min_size) noexcept { - if constexpr (IS_INPUT) { - return connection_result_t::SUCCESS; - } else { - try { - _ioHandler = BufferType(min_size).new_writer(); - } catch (...) { - return connection_result_t::FAILED; - } - } - return connection_result_t::SUCCESS; - } - - [[nodiscard]] BufferType - buffer() { - return _ioHandler.buffer(); - } - - void - setBuffer(gr::Buffer auto buffer) noexcept { - if constexpr (IS_INPUT) { - _ioHandler = std::move(buffer.new_reader()); - _connected = true; - } else { - _ioHandler = std::move(buffer.new_writer()); - } - } - - [[nodiscard]] constexpr ReaderType & - reader() const noexcept { - static_assert(!IS_OUTPUT, "reader() not applicable for outputs (yet)"); - return _ioHandler; - } - - [[nodiscard]] constexpr ReaderType & - reader() noexcept { - static_assert(!IS_OUTPUT, "reader() not applicable for outputs (yet)"); - return _ioHandler; - } - - [[nodiscard]] constexpr WriterType & - writer() const noexcept { - static_assert(!IS_INPUT, "writer() not applicable for inputs (yet)"); - return _ioHandler; - } - - [[nodiscard]] constexpr WriterType & - writer() noexcept { - static_assert(!IS_INPUT, "writer() not applicable for inputs (yet)"); - return _ioHandler; - } - - [[nodiscard]] connection_result_t - disconnect() noexcept { - if (_connected == false) { - return connection_result_t::FAILED; - } - _ioHandler = new_io_handler(); - _connected = false; - return connection_result_t::SUCCESS; - } - - template - [[nodiscard]] connection_result_t - connect(Other &&other) { - static_assert(IS_OUTPUT && std::remove_cvref_t::IS_INPUT); - auto src_buffer = writer_handler_internal(); - return std::forward(other).update_reader_internal(src_buffer) ? connection_result_t::SUCCESS - : connection_result_t::FAILED; - } - - friend class dynamic_port; -}; - -namespace detail { -template -using just_t = T; - -template -consteval fair::meta::typelist...> -repeated_ports_impl(std::index_sequence) { - return {}; -} -} // namespace detail - -// TODO: Add port index to BaseName -template -using repeated_ports = decltype(detail::repeated_ports_impl>(std::make_index_sequence())); - -template -using IN = port; -template -using OUT = port; -template -using IN_MSG = port; -template -using OUT_MSG = port; - -static_assert(Port>); -static_assert(Port("in"))>); -static_assert(Port>); -static_assert(Port>); -static_assert(Port>); - -static_assert(IN::static_name() == fixed_string("in")); -static_assert(requires { IN("in").name(); }); - /** * Runtime capable wrapper to be used within a block. It's primary purpose is to allow the runtime * initialisation/connections between blocks that are not in the same compilation unit. 
@@ -808,213 +241,6 @@ class dynamic_port { static_assert(Port); -template -struct fixed_node_ports_data_helper; - -template - requires InputPorts::template -all_of &&OutputPorts::template all_of struct fixed_node_ports_data_helper { - using input_ports = InputPorts; - using output_ports = OutputPorts; - - using input_port_types = typename input_ports ::template transform; - using output_port_types = typename output_ports ::template transform; - - using all_ports = meta::concat; -}; - -template -struct fixed_node_ports_data_helper { - using all_ports = meta::concat, Ports, meta::typelist>...>; - - using input_ports = typename all_ports ::template filter; - using output_ports = typename all_ports ::template filter; - - using input_port_types = typename input_ports ::template transform; - using output_port_types = typename output_ports ::template transform; -}; - -template -using fixed_node_ports_data = typename meta::typelist::template filter::template apply; - -// Ports can either be a list of ports instances, -// or two typelists containing port instances -- one for input -// ports and one for output ports -template -// class node: fixed_node_ports_data::all_ports::tuple_type { -class node : protected std::tuple { -public: - using fixed_ports_data = fixed_node_ports_data; - - using all_ports = typename fixed_ports_data::all_ports; - using input_ports = typename fixed_ports_data::input_ports; - using output_ports = typename fixed_ports_data::output_ports; - using input_port_types = typename fixed_ports_data::input_port_types; - using output_port_types = typename fixed_ports_data::output_port_types; - - using return_type = typename output_port_types::tuple_or_type; - using work_policy = work_policies::default_policy; - friend work_policy; - - using min_max_limits = typename meta::typelist::template filter; - static_assert(min_max_limits::size <= 1); - -private: - using setting_map = std::map>; - std::string _name{ std::string(fair::meta::type_name()) }; - - setting_map _exec_metrics{}; // → std::map → fair scheduling, 'int' stand-in for pmtv - - friend class graph; - -public: - auto & - self() { - return *static_cast(this); - } - - const auto & - self() const { - return *static_cast(this); - } - - [[nodiscard]] std::string_view - name() const noexcept { - return _name; - } - - void - set_name(std::string name) noexcept { - _name = std::move(name); - } - - template - friend constexpr auto & - input_port(Self *self) noexcept; - - template - friend constexpr auto & - output_port(Self *self) noexcept; - - template - friend constexpr auto & - input_port(Self *self) noexcept; - - template - friend constexpr auto & - output_port(Self *self) noexcept; - - template - [[gnu::always_inline]] constexpr bool - process_batch_simd_epilogue(std::size_t n, auto out_ptr, auto... in_ptr) { - if constexpr (N == 0) return true; - else if (N <= n) { - using In0 = meta::first_type; - using V = stdx::resize_simd_t>; - using Vs = meta::transform_types::template rebind, input_port_types>; - const std::tuple input_simds = Vs::template construct(std::tuple{ in_ptr... }); - const stdx::simd result = std::apply([this](auto... args) { return self().process_one(args...); }, input_simds); - result.copy_to(out_ptr, stdx::element_aligned); - return process_batch_simd_epilogue(n, out_ptr + N, (in_ptr + N)...); - } else - return process_batch_simd_epilogue(n, out_ptr, in_ptr...); - } - -#ifdef NOT_YET_PORTED_AS_IT_IS_UNUSED - template - requires(std::ranges::sized_range && ...) 
&& input_port_types::template - are_equal...> constexpr bool process_batch(port_data &out, Ins &&...inputs) { - const auto &in0 = std::get<0>(std::tie(inputs...)); - const std::size_t n = std::ranges::size(in0); - detail::precondition(((n == std::ranges::size(inputs)) && ...)); - auto &&out_range = out.request_write(n); - // if SIMD makes sense (i.e. input and output ranges are contiguous and all types are - // vectorizable) - if constexpr ((std::ranges::contiguous_range && ... && std::ranges::contiguous_range) &&detail::vectorizable && detail::node_can_process_simd - && input_port_types ::template transform::template all_of) { - using V = detail::reduce_to_widest_simd; - using Vs = detail::transform_by_rebind_simd; - std::size_t i = 0; - for (i = 0; i + V::size() <= n; i += V::size()) { - const std::tuple input_simds = Vs::template construct(std::tuple{ (std::ranges::data(inputs) + i)... }); - const stdx::simd result = std::apply([this](auto... args) { return self().process_one(args...); }, input_simds); - result.copy_to(std::ranges::data(out_range) + i, stdx::element_aligned); - } - - return process_batch_simd_epilogue(n - i, std::ranges::data(out_range) + i, (std::ranges::data(inputs) + i)...); - } else { // no explicit SIMD - auto out_it = out_range.begin(); - std::tuple it_tuple = { std::ranges::begin(inputs)... }; - const std::tuple end_tuple = { std::ranges::end(inputs)... }; - while (std::get<0>(it_tuple) != std::get<0>(end_tuple)) { - *out_it = std::apply([this](auto &...its) { return self().process_one((*its++)...); }, it_tuple); - ++out_it; - } - return true; - } - } -#endif - - [[nodiscard]] setting_map & - exec_metrics() noexcept { - return _exec_metrics; - } - - [[nodiscard]] setting_map const & - exec_metrics() const noexcept { - return _exec_metrics; - } - - [[nodiscard]] constexpr std::size_t - min_samples() const noexcept { - if constexpr (min_max_limits::size == 1) { - return min_max_limits::template at<0>::min; - } else { - return 0; - } - } - - [[nodiscard]] constexpr std::size_t - max_samples() const noexcept { - if constexpr (min_max_limits::size == 1) { - return min_max_limits::template at<0>::max; - } else { - return std::numeric_limits::max(); - } - } - - work_result - work() noexcept { - return work_policy::work(self()); - } -}; - -template -[[nodiscard]] constexpr auto & -input_port(Self *self) noexcept { - return std::get>(*self); -} - -template -[[nodiscard]] constexpr auto & -output_port(Self *self) noexcept { - return std::get>(*self); -} - -template -[[nodiscard]] constexpr auto & -input_port(Self *self) noexcept { - constexpr int Index = meta::indexForName(); - return std::get>(*self); -} - -template -[[nodiscard]] constexpr auto & -output_port(Self *self) noexcept { - constexpr int Index = meta::indexForName(); - return std::get>(*self); -} - #define ENABLE_PYTHON_INTEGRATION #ifdef ENABLE_PYTHON_INTEGRATION @@ -1109,115 +335,6 @@ class dynamic_node { #endif -template -class merged_node : public node, meta::concat>, - meta::concat, typename Right::output_ports>> { -private: - // copy-paste from above, keep in sync - using base = node, meta::concat>, - meta::concat, typename Right::output_ports>>; - - Left left; - Right right; - - template - [[gnu::always_inline]] constexpr auto - apply_left(auto &&input_tuple) { - return [&](std::index_sequence) { return left.process_one(std::get(input_tuple)...); } - (std::make_index_sequence()); - } - - template - [[gnu::always_inline]] constexpr auto - apply_right(auto &&input_tuple, auto &&tmp) { - return 
[&](std::index_sequence, std::index_sequence) { - constexpr std::size_t first_offset = Left::input_port_types::size; - constexpr std::size_t second_offset = Left::input_port_types::size + sizeof...(Is); - static_assert(second_offset + sizeof...(Js) == std::tuple_size_v>); - return right.process_one(std::get(input_tuple)..., std::move(tmp), std::get(input_tuple)...); - } - (std::make_index_sequence(), std::make_index_sequence()); - } - -public: - using input_port_types = typename base::input_port_types; - using output_port_types = typename base::output_port_types; - using return_type = typename base::return_type; - - [[gnu::always_inline]] constexpr merged_node(Left l, Right r) : left(std::move(l)), right(std::move(r)) {} - - template - requires meta::vectorizable && input_port_types::template - are_equal::value_type...> &&meta::node_can_process_simd - &&meta::node_can_process_simd constexpr stdx::rebind_simd_t...>>> - process_one(Ts... inputs) { - return apply_right(std::tie(inputs...), apply_left(std::tie(inputs...))); - } - - template - // In order to have nicer error messages, this is checked in the function body - // requires input_port_types::template are_equal...> - constexpr return_type - process_one(Ts &&...inputs) { - if constexpr (!input_port_types::template are_equal...>) { - meta::print_types...> error{}; - } - - if constexpr (Left::output_port_types::size == 1) { // only the result from the right node needs to be returned - return apply_right(std::forward_as_tuple(std::forward(inputs)...), - apply_left(std::forward_as_tuple(std::forward(inputs)...))); - - } else { - // left produces a tuple - auto left_out = apply_left(std::forward_as_tuple(std::forward(inputs)...)); - auto right_out = apply_right(std::forward_as_tuple(std::forward(inputs)...), std::move(std::get(left_out))); - - if constexpr (Left::output_port_types::size == 2 && Right::output_port_types::size == 1) { - return std::make_tuple(std::move(std::get(left_out)), std::move(right_out)); - - } else if constexpr (Left::output_port_types::size == 2) { - return std::tuple_cat(std::make_tuple(std::move(std::get(left_out))), std::move(right_out)); - - } else if constexpr (Right::output_port_types::size == 1) { - return [&](std::index_sequence, std::index_sequence) { - return std::make_tuple(std::move(std::get(left_out))..., std::move(std::get(left_out))..., std::move(right_out)); - } - (std::make_index_sequence(), std::make_index_sequence()); - - } else { - return [&](std::index_sequence, std::index_sequence, std::index_sequence) { - return std::make_tuple(std::move(std::get(left_out))..., std::move(std::get(left_out))..., std::move(std::get(right_out)...)); - } - (std::make_index_sequence(), std::make_index_sequence(), std::make_index_sequence()); - } - } - } -}; - -template -[[gnu::always_inline]] constexpr auto -merge_by_index(A &&a, B &&b) -> merged_node, std::remove_cvref_t, OutId, InId> { - if constexpr (!std::is_same_v::output_port_types::template at, typename std::remove_cvref_t::input_port_types::template at>) { - fair::meta::print_types, typename std::remove_cvref_t::output_port_types, std::integral_constant, - typename std::remove_cvref_t::output_port_types::template at, - - fair::meta::message_type<"INPUT_PORTS_ARE:">, typename std::remove_cvref_t::input_port_types, std::integral_constant, - typename std::remove_cvref_t::input_port_types::template at>{}; - } - return { std::forward(a), std::forward(b) }; -} - -template -[[gnu::always_inline]] constexpr auto -merge(A &&a, B &&b) { - constexpr std::size_t OutId = 
meta::indexForName(); - constexpr std::size_t InId = meta::indexForName(); - static_assert(OutId != -1); - static_assert(InId != -1); - static_assert(std::same_as::output_port_types::template at, typename std::remove_cvref_t::input_port_types::template at>, - "Port types do not match"); - return merged_node, std::remove_cvref_t, OutId, InId>{ std::forward(a), std::forward(b) }; -} class graph { private: @@ -1229,7 +346,7 @@ class graph { name() const = 0; - virtual work_result + virtual node_ports_state work() = 0; virtual void * @@ -1273,7 +390,7 @@ class graph { template reference_node_wrapper(In &&node) : _node(std::forward(node)) {} - work_result + node_ports_state work() override { return data().work(); } @@ -1449,22 +566,22 @@ class graph { _nodes.push_back(std::make_unique>(std::addressof(node))); } - work_result + node_ports_state work() { bool run = true; while (run) { bool something_happened = false; for (auto &node : _nodes) { auto result = node->work(); - if (result == work_result::error) { - return work_result::error; - } else if (result == work_result::has_unprocessed_data) { + if (result == node_ports_state::error) { + return node_ports_state::error; + } else if (result == node_ports_state::has_unprocessed_data) { // nothing - } else if (result == work_result::inputs_empty) { + } else if (result == node_ports_state::inputs_empty) { // nothing - } else if (result == work_result::success) { + } else if (result == node_ports_state::success) { something_happened = true; - } else if (result == work_result::writers_not_available) { + } else if (result == node_ports_state::writers_not_available) { something_happened = true; } } @@ -1472,7 +589,7 @@ class graph { run = something_happened; } - return work_result::inputs_empty; + return node_ports_state::inputs_empty; } }; diff --git a/include/merged_node.hpp b/include/merged_node.hpp new file mode 100644 index 000000000..90549272e --- /dev/null +++ b/include/merged_node.hpp @@ -0,0 +1,121 @@ +#ifndef GNURADIO_MERGED_NODE_HPP +#define GNURADIO_MERGED_NODE_HPP + +#include +#include + +namespace fair::graph { + +template +class merged_node : public node, meta::concat>, + meta::concat, typename Right::output_ports>> { +private: + // copy-paste from above, keep in sync + using base = node, meta::concat>, + meta::concat, typename Right::output_ports>>; + + Left left; + Right right; + + template + [[gnu::always_inline]] constexpr auto + apply_left(auto &&input_tuple) { + return [&](std::index_sequence) { return left.process_one(std::get(input_tuple)...); } + (std::make_index_sequence()); + } + + template + [[gnu::always_inline]] constexpr auto + apply_right(auto &&input_tuple, auto &&tmp) { + return [&](std::index_sequence, std::index_sequence) { + constexpr std::size_t first_offset = Left::input_port_types::size; + constexpr std::size_t second_offset = Left::input_port_types::size + sizeof...(Is); + static_assert(second_offset + sizeof...(Js) == std::tuple_size_v>); + return right.process_one(std::get(input_tuple)..., std::move(tmp), std::get(input_tuple)...); + } + (std::make_index_sequence(), std::make_index_sequence()); + } + +public: + using input_port_types = typename base::input_port_types; + using output_port_types = typename base::output_port_types; + using return_type = typename base::return_type; + + [[gnu::always_inline]] constexpr merged_node(Left l, Right r) : left(std::move(l)), right(std::move(r)) {} + + template + requires meta::vectorizable && input_port_types::template + are_equal::value_type...> 
&&meta::node_can_process_simd + &&meta::node_can_process_simd constexpr stdx::rebind_simd_t...>>> + process_one(Ts... inputs) { + return apply_right(std::tie(inputs...), apply_left(std::tie(inputs...))); + } + + template + // In order to have nicer error messages, this is checked in the function body + // requires input_port_types::template are_equal...> + constexpr return_type + process_one(Ts &&...inputs) { + if constexpr (!input_port_types::template are_equal...>) { + meta::print_types...> error{}; + } + + if constexpr (Left::output_port_types::size == 1) { // only the result from the right node needs to be returned + return apply_right(std::forward_as_tuple(std::forward(inputs)...), + apply_left(std::forward_as_tuple(std::forward(inputs)...))); + + } else { + // left produces a tuple + auto left_out = apply_left(std::forward_as_tuple(std::forward(inputs)...)); + auto right_out = apply_right(std::forward_as_tuple(std::forward(inputs)...), std::move(std::get(left_out))); + + if constexpr (Left::output_port_types::size == 2 && Right::output_port_types::size == 1) { + return std::make_tuple(std::move(std::get(left_out)), std::move(right_out)); + + } else if constexpr (Left::output_port_types::size == 2) { + return std::tuple_cat(std::make_tuple(std::move(std::get(left_out))), std::move(right_out)); + + } else if constexpr (Right::output_port_types::size == 1) { + return [&](std::index_sequence, std::index_sequence) { + return std::make_tuple(std::move(std::get(left_out))..., std::move(std::get(left_out))..., std::move(right_out)); + } + (std::make_index_sequence(), std::make_index_sequence()); + + } else { + return [&](std::index_sequence, std::index_sequence, std::index_sequence) { + return std::make_tuple(std::move(std::get(left_out))..., std::move(std::get(left_out))..., std::move(std::get(right_out)...)); + } + (std::make_index_sequence(), std::make_index_sequence(), std::make_index_sequence()); + } + } + } +}; + +template +[[gnu::always_inline]] constexpr auto +merge_by_index(A &&a, B &&b) -> merged_node, std::remove_cvref_t, OutId, InId> { + if constexpr (!std::is_same_v::output_port_types::template at, typename std::remove_cvref_t::input_port_types::template at>) { + fair::meta::print_types, typename std::remove_cvref_t::output_port_types, std::integral_constant, + typename std::remove_cvref_t::output_port_types::template at, + + fair::meta::message_type<"INPUT_PORTS_ARE:">, typename std::remove_cvref_t::input_port_types, std::integral_constant, + typename std::remove_cvref_t::input_port_types::template at>{}; + } + return { std::forward(a), std::forward(b) }; +} + +template +[[gnu::always_inline]] constexpr auto +merge(A &&a, B &&b) { + constexpr std::size_t OutId = meta::indexForName(); + constexpr std::size_t InId = meta::indexForName(); + static_assert(OutId != -1); + static_assert(InId != -1); + static_assert(std::same_as::output_port_types::template at, typename std::remove_cvref_t::input_port_types::template at>, + "Port types do not match"); + return merged_node, std::remove_cvref_t, OutId, InId>{ std::forward(a), std::forward(b) }; +} + +} // namespace fair::graph + +#endif // include guard diff --git a/include/node.hpp b/include/node.hpp new file mode 100644 index 000000000..598d374bc --- /dev/null +++ b/include/node.hpp @@ -0,0 +1,391 @@ +#ifndef GNURADIO_NODE_HPP +#define GNURADIO_NODE_HPP + +#include + +#include // localinclude +#include // localinclude +#include // localinclude + +#include +#include + +namespace fair::graph { + +namespace stdx = vir::stdx; +using 
fair::meta::fixed_string; + +enum class node_ports_state { success, has_unprocessed_data, inputs_empty, writers_not_available, error }; + +namespace work_strategies { +template +static auto +inputs_status(Self &self) noexcept { + bool at_least_one_input_has_data = false; + const std::size_t available_values_count = [&self, &at_least_one_input_has_data]() { + if constexpr (Self::input_ports::size > 0) { + const auto availableForPort = [&at_least_one_input_has_data](Port &port) noexcept { + const std::size_t available = port.reader().available(); + if (available > 0LU) at_least_one_input_has_data = true; + if (available < port.min_buffer_size()) { + return 0LU; + } else { + return std::min(available, port.max_buffer_size()); + } + }; + + const auto availableInAll = std::apply( + [&availableForPort] (auto&... input_port) { + return meta::safe_min(availableForPort(input_port)...); + }, + input_ports(&self)); + + if (availableInAll < self.min_samples()) { + return 0LU; + } else { + return std::min(availableInAll, self.max_samples()); + } + } else { + (void) self; + return std::size_t{ 1 }; + } + }(); + + struct result { + bool at_least_one_input_has_data; + std::size_t available_values_count; + }; + + return result { + .at_least_one_input_has_data = at_least_one_input_has_data, + .available_values_count = available_values_count + }; +} + +template +static auto +write_to_outputs(Self& self, std::size_t available_values_count, auto& writers_tuple) { + if constexpr (Self::output_ports::size > 0) { + meta::tuple_for_each([available_values_count] (auto& output_port, auto& writer) { + output_port.writer().publish(writer.second, available_values_count); + }, + output_ports(&self), writers_tuple); + } +} + +template +static bool +consume_readers(Self& self, std::size_t available_values_count) { + bool success = true; + if constexpr (Self::input_ports::size > 0) { + std::apply([available_values_count, &success] (auto&... input_port) { + ((success = success && input_port.reader().consume(available_values_count)), ...); + }, input_ports(&self)); + } + return success; +} + +struct read_many_and_publish_many { + + template + static node_ports_state + work(Self &self) noexcept { + // Capturing structured bindings does not work in Clang... + auto inputs_status = work_strategies::inputs_status(self); + + if (inputs_status.available_values_count == 0) { + return inputs_status.at_least_one_input_has_data ? node_ports_state::has_unprocessed_data : node_ports_state::inputs_empty; + } + + bool all_writers_available = std::apply([inputs_status](auto&... output_port) { + return ((output_port.writer().available() >= inputs_status.available_values_count) && ... && true); + }, output_ports(&self)); + + if (!all_writers_available) { + return node_ports_state::writers_not_available; + } + + auto input_spans = meta::tuple_transform([inputs_status](auto& input_port) { + return input_port.reader().get(inputs_status.available_values_count); + }, input_ports(&self)); + + auto writers_tuple = meta::tuple_transform([inputs_status](auto& output_port) { + return output_port.writer().get(inputs_status.available_values_count); + }, output_ports(&self)); + + // TODO: check here whether a process_one(...) or a bulk access process has been defined, cases: + // case 1a: N-in->N-out -> process_one(...) -> auto-handling of streaming tags + // case 1b: N-in->N-out -> process_bulk(, ) -> auto-handling of streaming tags + // case 2a: N-in->M-out -> process_bulk(, ) N,M fixed -> aka. 
interpolator (M>N) or decimator (MM-out -> process_bulk(<{ins,tag-IO}...>, <{outs,tag-IO}...>) user-level tag handling + // case 3: N-in->M-out -> work() N,M arbitrary -> used need to handle the full logic (e.g. PLL algo) + // case 4: Python -> map to cases 1-3 and/or dedicated callback + // special cases: + // case sources: HW triggered vs. generating data per invocation (generators via Port::MIN) + // case sinks: HW triggered vs. fixed-size consumer (may block/never finish for insufficient input data and fixed Port::MIN>0) + for (std::size_t i = 0; i < inputs_status.available_values_count; ++i) { + auto results = std::apply([&self, &input_spans, i](auto&... input_span) { + return meta::invoke_void_wrapped([&self](Args... args) { return self.process_one(std::forward(args)...); }, input_span[i]...); + }, input_spans); + + if constexpr (std::is_same_v) { + // process_one returned void + + } else if constexpr (requires { std::get<0>(results); }) { + static_assert(std::tuple_size_v == Self::output_ports::size); + + meta::tuple_for_each( + [i] (auto& writer, auto& result) { + writer.first/*data*/[i] = std::move(result); }, + writers_tuple, results); + + } else { + static_assert(Self::output_ports::size == 1); + std::get<0>(writers_tuple).first /*data*/[i] = std::move(results); + } + } + + write_to_outputs(self, inputs_status.available_values_count, writers_tuple); + + const bool success = consume_readers(self, inputs_status.available_values_count); + + if (!success) { + fmt::print("Node {} failed to consume {} values from inputs\n", self.name(), inputs_status.available_values_count); + } + + return success ? node_ports_state::success : node_ports_state::error; + } +}; + +using default_strategy = read_many_and_publish_many; +} // namespace work_strategies + +template +struct fixed_node_ports_data_helper; + +template + requires InputPorts::template +all_of &&OutputPorts::template all_of struct fixed_node_ports_data_helper { + using input_ports = InputPorts; + using output_ports = OutputPorts; + + using input_port_types = typename input_ports ::template transform; + using output_port_types = typename output_ports ::template transform; + + using all_ports = meta::concat; +}; + +template +struct fixed_node_ports_data_helper { + using all_ports = meta::concat, Ports, meta::typelist>...>; + + using input_ports = typename all_ports ::template filter; + using output_ports = typename all_ports ::template filter; + + using input_port_types = typename input_ports ::template transform; + using output_port_types = typename output_ports ::template transform; +}; + +template +using fixed_node_ports_data = typename meta::typelist::template filter::template apply; + +// Ports can either be a list of ports instances, +// or two typelists containing port instances -- one for input +// ports and one for output ports +template +// class node: fixed_node_ports_data::all_ports::tuple_type { +class node : protected std::tuple { +public: + using fixed_ports_data = fixed_node_ports_data; + + using all_ports = typename fixed_ports_data::all_ports; + using input_ports = typename fixed_ports_data::input_ports; + using output_ports = typename fixed_ports_data::output_ports; + using input_port_types = typename fixed_ports_data::input_port_types; + using output_port_types = typename fixed_ports_data::output_port_types; + + using return_type = typename output_port_types::tuple_or_type; + using work_strategy = work_strategies::default_strategy; + friend work_strategy; + + using min_max_limits = typename meta::typelist::template 
filter; + static_assert(min_max_limits::size <= 1); + +private: + using setting_map = std::map>; + std::string _name{ std::string(fair::meta::type_name()) }; + + setting_map _exec_metrics{}; // → std::map → fair scheduling, 'int' stand-in for pmtv + + friend class graph; + +public: + auto & + self() { + return *static_cast(this); + } + + const auto & + self() const { + return *static_cast(this); + } + + [[nodiscard]] std::string_view + name() const noexcept { + return _name; + } + + void + set_name(std::string name) noexcept { + _name = std::move(name); + } + + template + friend constexpr auto & + input_port(Self *self) noexcept; + + template + friend constexpr auto & + output_port(Self *self) noexcept; + + template + friend constexpr auto & + input_port(Self *self) noexcept; + + template + friend constexpr auto & + output_port(Self *self) noexcept; + + template + [[gnu::always_inline]] constexpr bool + process_batch_simd_epilogue(std::size_t n, auto out_ptr, auto... in_ptr) { + if constexpr (N == 0) return true; + else if (N <= n) { + using In0 = meta::first_type; + using V = stdx::resize_simd_t>; + using Vs = meta::transform_types::template rebind, input_port_types>; + const std::tuple input_simds = Vs::template construct(std::tuple{ in_ptr... }); + const stdx::simd result = std::apply([this](auto... args) { return self().process_one(args...); }, input_simds); + result.copy_to(out_ptr, stdx::element_aligned); + return process_batch_simd_epilogue(n, out_ptr + N, (in_ptr + N)...); + } else + return process_batch_simd_epilogue(n, out_ptr, in_ptr...); + } + +#ifdef NOT_YET_PORTED_AS_IT_IS_UNUSED + template + requires(std::ranges::sized_range && ...) && input_port_types::template + are_equal...> constexpr bool process_batch(port_data &out, Ins &&...inputs) { + const auto &in0 = std::get<0>(std::tie(inputs...)); + const std::size_t n = std::ranges::size(in0); + detail::precondition(((n == std::ranges::size(inputs)) && ...)); + auto &&out_range = out.request_write(n); + // if SIMD makes sense (i.e. input and output ranges are contiguous and all types are + // vectorizable) + if constexpr ((std::ranges::contiguous_range && ... && std::ranges::contiguous_range) &&detail::vectorizable && detail::node_can_process_simd + && input_port_types ::template transform::template all_of) { + using V = detail::reduce_to_widest_simd; + using Vs = detail::transform_by_rebind_simd; + std::size_t i = 0; + for (i = 0; i + V::size() <= n; i += V::size()) { + const std::tuple input_simds = Vs::template construct(std::tuple{ (std::ranges::data(inputs) + i)... }); + const stdx::simd result = std::apply([this](auto... args) { return self().process_one(args...); }, input_simds); + result.copy_to(std::ranges::data(out_range) + i, stdx::element_aligned); + } + + return process_batch_simd_epilogue(n - i, std::ranges::data(out_range) + i, (std::ranges::data(inputs) + i)...); + } else { // no explicit SIMD + auto out_it = out_range.begin(); + std::tuple it_tuple = { std::ranges::begin(inputs)... }; + const std::tuple end_tuple = { std::ranges::end(inputs)... 
}; + while (std::get<0>(it_tuple) != std::get<0>(end_tuple)) { + *out_it = std::apply([this](auto &...its) { return self().process_one((*its++)...); }, it_tuple); + ++out_it; + } + return true; + } + } +#endif + + [[nodiscard]] setting_map & + exec_metrics() noexcept { + return _exec_metrics; + } + + [[nodiscard]] setting_map const & + exec_metrics() const noexcept { + return _exec_metrics; + } + + [[nodiscard]] constexpr std::size_t + min_samples() const noexcept { + if constexpr (min_max_limits::size == 1) { + return min_max_limits::template at<0>::min; + } else { + return 0; + } + } + + [[nodiscard]] constexpr std::size_t + max_samples() const noexcept { + if constexpr (min_max_limits::size == 1) { + return min_max_limits::template at<0>::max; + } else { + return std::numeric_limits::max(); + } + } + + node_ports_state + work() noexcept { + return work_strategy::work(self()); + } +}; + +template +[[nodiscard]] constexpr auto & +input_port(Self *self) noexcept { + return std::get>(*self); +} + +template +[[nodiscard]] constexpr auto & +output_port(Self *self) noexcept { + return std::get>(*self); +} + +template +[[nodiscard]] constexpr auto & +input_port(Self *self) noexcept { + constexpr int Index = meta::indexForName(); + return std::get>(*self); +} + +template +[[nodiscard]] constexpr auto & +output_port(Self *self) noexcept { + constexpr int Index = meta::indexForName(); + return std::get>(*self); +} + +template +[[nodiscard]] constexpr auto +input_ports(Self *self) noexcept { + return [self](std::index_sequence) { + return std::tie(input_port(self)...); + } + (std::make_index_sequence()); +} + +template +[[nodiscard]] constexpr auto +output_ports(Self *self) noexcept { + return [self](std::index_sequence) { + return std::tie(output_port(self)...); + } + (std::make_index_sequence()); +} + +} // namespace fair::graph + +#endif // include guard + diff --git a/include/port.hpp b/include/port.hpp new file mode 100644 index 000000000..1414516cc --- /dev/null +++ b/include/port.hpp @@ -0,0 +1,374 @@ +#ifndef GNURADIO_PORT_HPP +#define GNURADIO_PORT_HPP + +#include +#include +#include + +#include +#include + +namespace fair::graph { + +using fair::meta::fixed_string; + +// #### default supported types -- TODO: to be replaced by pmt::pmtv declaration +using supported_type = std::variant, std::complex /*, ...*/>; + +enum class port_direction_t { INPUT, OUTPUT, ANY }; // 'ANY' only for query and not to be used for port declarations +enum class connection_result_t { SUCCESS, FAILED }; +enum class port_type_t { STREAM, MESSAGE }; // TODO: Think of a better name +enum class port_domain_t { CPU, GPU, NET, FPGA, DSP, MLU }; + +template +concept Port = requires(T t, const std::size_t n_items) { // dynamic definitions + typename T::value_type; + { t.pmt_type() } -> std::same_as; + { t.type() } -> std::same_as; + { t.direction() } -> std::same_as; + { t.name() } -> std::same_as; + { t.resize_buffer(n_items) } -> std::same_as; + { t.disconnect() } -> std::same_as; + }; + +template +concept has_fixed_port_info_v = requires { + typename T::value_type; + { T::static_name() }; + { T::direction() } -> std::same_as; + { T::type() } -> std::same_as; + }; + +template +using has_fixed_port_info = std::integral_constant>; + +template +struct has_fixed_port_info_or_is_typelist : std::false_type {}; + +template + requires has_fixed_port_info_v +struct has_fixed_port_info_or_is_typelist : std::true_type {}; + +template + requires(meta::is_typelist_v and T::template all_of) +struct has_fixed_port_info_or_is_typelist 
: std::true_type {}; + +template +struct limits { + using limits_tag = std::true_type; + static constexpr std::size_t min = _min; + static constexpr std::size_t max = _max; +}; + +template +concept is_limits_v = requires { typename T::limits_tag; }; + +static_assert(!is_limits_v); +static_assert(!is_limits_v); +static_assert(is_limits_v>); + +template +using is_limits = std::integral_constant>; + +template +using port_type = typename Port::value_type; + +template +using is_in_port = std::integral_constant; + +template +concept InPort = is_in_port::value; + +template +using is_out_port = std::integral_constant; + +template +concept OutPort = is_out_port::value; + + +template> +class port { +public: + static_assert(PortDirection != port_direction_t::ANY, "ANY reserved for queries and not port direction declarations"); + + using value_type = T; + + static constexpr bool IS_INPUT = PortDirection == port_direction_t::INPUT; + static constexpr bool IS_OUTPUT = PortDirection == port_direction_t::OUTPUT; + + using port_tag = std::true_type; + +private: + using ReaderType = decltype(std::declval().new_reader()); + using WriterType = decltype(std::declval().new_writer()); + using IoType = std::conditional_t; + + std::string _name = static_cast(PortName); + std::int16_t _priority = 0; // → dependents of a higher-prio port should be scheduled first (Q: make this by order of ports?) + std::size_t _n_history = N_HISTORY; + std::size_t _min_samples = (MIN_SAMPLES == std::dynamic_extent ? 1 : MIN_SAMPLES); + std::size_t _max_samples = MAX_SAMPLES; + bool _connected = false; + + IoType _ioHandler = new_io_handler(); + +public: + [[nodiscard]] constexpr auto + new_io_handler() const noexcept { + if constexpr (IS_INPUT) { + return BufferType(65536).new_reader(); + } else { + return BufferType(65536).new_writer(); + } + } + + [[nodiscard]] void * + writer_handler_internal() noexcept { + static_assert(IS_OUTPUT, "only to be used with output ports"); + return static_cast(std::addressof(_ioHandler)); + } + + [[nodiscard]] bool + update_reader_internal(void *buffer_writer_handler_other) noexcept { + static_assert(IS_INPUT, "only to be used with input ports"); + + if (buffer_writer_handler_other == nullptr) { + return false; + } + + // TODO: If we want to allow ports with different buffer types to be mixed + // this will fail. 
We need to add a check that two ports that + // connect to each other use the same buffer type + // (std::any could be a viable approach) + auto typed_buffer_writer = static_cast(buffer_writer_handler_other); + setBuffer(typed_buffer_writer->buffer()); + return true; + } + +public: + port() = default; + + port(const port &) = delete; + + auto + operator=(const port &) + = delete; + + port(std::string port_name, std::int16_t priority = 0, std::size_t n_history = 0, std::size_t min_samples = 0U, std::size_t max_samples = SIZE_MAX) noexcept + : _name(std::move(port_name)) + , _priority{ priority } + , _n_history(n_history) + , _min_samples(min_samples) + , _max_samples(max_samples) { + static_assert(PortName.empty(), "port name must be exclusively declared via NTTP or constructor parameter"); + } + + constexpr port(port &&other) noexcept + : _name(std::move(other._name)) + , _priority{ other._priority } + , _n_history(other._n_history) + , _min_samples(other._min_samples) + , _max_samples(other._max_samples) {} + + constexpr port & + operator=(port &&other) { + port tmp(std::move(other)); + std::swap(_name, tmp._name); + std::swap(_priority, tmp._priority); + std::swap(_n_history, tmp._n_history); + std::swap(_min_samples, tmp._min_samples); + std::swap(_max_samples, tmp._max_samples); + std::swap(_connected, tmp._connected); + std::swap(_ioHandler, tmp._ioHandler); + return *this; + } + + [[nodiscard]] constexpr static port_type_t + type() noexcept { + return Porttype; + } + + [[nodiscard]] constexpr static port_direction_t + direction() noexcept { + return PortDirection; + } + + [[nodiscard]] constexpr static decltype(PortName) + static_name() noexcept + requires(!PortName.empty()) + { + return PortName; + } + + [[nodiscard]] constexpr supported_type + pmt_type() const noexcept { + return T(); + } + + [[nodiscard]] constexpr std::string_view + name() const noexcept { + if constexpr (!PortName.empty()) { + return static_cast(PortName); + } else { + return _name; + } + } + + [[nodiscard]] constexpr static bool + is_optional() noexcept { + return OPTIONAL; + } + + [[nodiscard]] constexpr std::int16_t + priority() const noexcept { + return _priority; + } + + [[nodiscard]] constexpr static std::size_t + available() noexcept { + return 0; + } // ↔ maps to Buffer::Buffer[Reader, Writer].available() + + [[nodiscard]] constexpr std::size_t + n_history() const noexcept { + if constexpr (N_HISTORY == std::dynamic_extent) { + return _n_history; + } else { + return N_HISTORY; + } + } + + [[nodiscard]] constexpr std::size_t + min_buffer_size() const noexcept { + if constexpr (MIN_SAMPLES == std::dynamic_extent) { + return _min_samples; + } else { + return MIN_SAMPLES; + } + } + + [[nodiscard]] constexpr std::size_t + max_buffer_size() const noexcept { + if constexpr (MAX_SAMPLES == std::dynamic_extent) { + return _max_samples; + } else { + return MAX_SAMPLES; + } + } + + [[nodiscard]] constexpr connection_result_t + resize_buffer(std::size_t min_size) noexcept { + if constexpr (IS_INPUT) { + return connection_result_t::SUCCESS; + } else { + try { + _ioHandler = BufferType(min_size).new_writer(); + } catch (...) 
{ + return connection_result_t::FAILED; + } + } + return connection_result_t::SUCCESS; + } + + [[nodiscard]] BufferType + buffer() { + return _ioHandler.buffer(); + } + + void + setBuffer(gr::Buffer auto buffer) noexcept { + if constexpr (IS_INPUT) { + _ioHandler = std::move(buffer.new_reader()); + _connected = true; + } else { + _ioHandler = std::move(buffer.new_writer()); + } + } + + [[nodiscard]] constexpr ReaderType & + reader() const noexcept { + static_assert(!IS_OUTPUT, "reader() not applicable for outputs (yet)"); + return _ioHandler; + } + + [[nodiscard]] constexpr ReaderType & + reader() noexcept { + static_assert(!IS_OUTPUT, "reader() not applicable for outputs (yet)"); + return _ioHandler; + } + + [[nodiscard]] constexpr WriterType & + writer() const noexcept { + static_assert(!IS_INPUT, "writer() not applicable for inputs (yet)"); + return _ioHandler; + } + + [[nodiscard]] constexpr WriterType & + writer() noexcept { + static_assert(!IS_INPUT, "writer() not applicable for inputs (yet)"); + return _ioHandler; + } + + [[nodiscard]] connection_result_t + disconnect() noexcept { + if (_connected == false) { + return connection_result_t::FAILED; + } + _ioHandler = new_io_handler(); + _connected = false; + return connection_result_t::SUCCESS; + } + + template + [[nodiscard]] connection_result_t + connect(Other &&other) { + static_assert(IS_OUTPUT && std::remove_cvref_t::IS_INPUT); + auto src_buffer = writer_handler_internal(); + return std::forward(other).update_reader_internal(src_buffer) ? connection_result_t::SUCCESS + : connection_result_t::FAILED; + } + + friend class dynamic_port; +}; + + +namespace detail { +template +using just_t = T; + +template +consteval fair::meta::typelist...> +repeated_ports_impl(std::index_sequence) { + return {}; +} +} // namespace detail + +// TODO: Add port index to BaseName +template +using repeated_ports = decltype(detail::repeated_ports_impl>(std::make_index_sequence())); + +template +using IN = port; +template +using OUT = port; +template +using IN_MSG = port; +template +using OUT_MSG = port; + +static_assert(Port>); +static_assert(Port("in"))>); +static_assert(Port>); +static_assert(Port>); +static_assert(Port>); + +static_assert(IN::static_name() == fixed_string("in")); +static_assert(requires { IN("in").name(); }); + + +} + +#endif // include guard diff --git a/include/utils.hpp b/include/utils.hpp index dbd002016..0d159b9a4 100644 --- a/include/utils.hpp +++ b/include/utils.hpp @@ -255,6 +255,41 @@ type_transform_impl(fair::meta::dummy_t *); template typename Mapper, typename T> using type_transform = std::remove_pointer_t(static_cast(nullptr)))>; +template +auto safe_min(Arg&& arg, Args&&... args) +{ + if constexpr (sizeof...(Args) == 0) { + return arg; + } else { + return std::min(std::forward(arg), std::forward(args)...); + } +} + +template +auto tuple_for_each(Function&& function, Tuple&& tuple, Tuples&&... tuples) +{ + static_assert(((std::tuple_size_v> == std::tuple_size_v>) && ...)); + return [&](std::index_sequence) { + auto callFunction = [&function, &tuple, &tuples...]() { + function(std::get(tuple), std::get(tuples)...); + }; + ((callFunction.template operator()(), ...)); + }(std::make_index_sequence>()); +} + +template +auto tuple_transform(Function&& function, Tuple&& tuple, Tuples&&... 
tuples) +{ + static_assert(((std::tuple_size_v> == std::tuple_size_v>) && ...)); + return [&](std::index_sequence) { + auto callFunction = [&function, &tuple, &tuples...]() { + return function(std::get(tuple), std::get(tuples)...); + }; + return std::make_tuple(callFunction.template operator()()...); + }(std::make_index_sequence>()); +} + + static_assert(std::is_same_v, type_transform>); static_assert(std::is_same_v, std::vector>, type_transform>>); static_assert(std::is_same_v>); diff --git a/src/main.cpp b/src/main.cpp index b4c9919f4..a618abb87 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -4,6 +4,7 @@ #include #include "graph.hpp" +#include "merged_node.hpp" namespace fg = fair::graph; diff --git a/test/qa_dynamic_port.cpp b/test/qa_dynamic_port.cpp index 009eaa0c0..f8a32e5c0 100644 --- a/test/qa_dynamic_port.cpp +++ b/test/qa_dynamic_port.cpp @@ -54,7 +54,7 @@ class repeater_source : public fg::node, fg::OUT, fg::OUT Date: Wed, 1 Feb 2023 09:45:09 +0100 Subject: [PATCH 2/4] Fluent connection API design --- bench/bm_case1.cpp | 38 +++++++++++++++--------------- include/graph.hpp | 50 +++++++++++++++++++++++++++++----------- test/qa_dynamic_port.cpp | 8 +++---- 3 files changed, 59 insertions(+), 37 deletions(-) diff --git a/bench/bm_case1.cpp b/bench/bm_case1.cpp index eb0a681b8..b9bdd2464 100644 --- a/bench/bm_case1.cpp +++ b/bench/bm_case1.cpp @@ -274,7 +274,7 @@ inline const boost::ut::suite _runtime_tests = [] { fg::graph flow_graph; flow_graph.register_node(src); flow_graph.register_node(sink); - expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out", "in">(src, sink))); + expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(src).to<"in">(sink))); "runtime src->sink overhead"_benchmark.repeat(N_SAMPLES) = [&flow_graph]() { test::n_samples_produced = 0LU; @@ -294,8 +294,8 @@ inline const boost::ut::suite _runtime_tests = [] { flow_graph.register_node(src); flow_graph.register_node(cpy); flow_graph.register_node(sink); - expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out", "in">(src, cpy))); - expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out", "in">(cpy, sink))); + expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(src).to<"in">(cpy))); + expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(cpy).to<"in">(sink))); "runtime src->copy->sink"_benchmark.repeat(N_SAMPLES) = [&flow_graph]() { test::n_samples_produced = 0LU; test::n_samples_consumed = 0LU; @@ -319,13 +319,13 @@ inline const boost::ut::suite _runtime_tests = [] { flow_graph.register_node(cpy[i]); if (i == 0) { - expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out", "in">(src, cpy[i]))); + expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(src).to<"in">(cpy[i]))); } else { - expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out", "in">(cpy[i - 1], cpy[i]))); + expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(cpy[i - 1]).to<"in">(cpy[i]))); } } - expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out", "in">(cpy[cpy.size() - 1], sink))); + expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(cpy[cpy.size() - 1]).to <"in">(sink))); "runtime src->copy^10->sink"_benchmark.repeat(N_SAMPLES) = [&flow_graph]() { test::n_samples_produced = 0LU; @@ -393,10 +393,10 @@ inline const boost::ut::suite _runtime_tests = [] { flow_graph.register_node(b2); flow_graph.register_node(b3); flow_graph.register_node(sink); - 
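
[Editor's aside, not part of the patch] The tuple_for_each/tuple_transform helpers added to utils.hpp above lost their explicit template arguments in transit; their trick is a nested, explicitly instantiated lambda so that only the tuple pack (and not the index pack) is expanded inside each call. A rough, self-contained sketch of the transform variant of that idiom, with placeholder names rather than the library's own, could look like this:

#include <cstddef>
#include <iostream>
#include <tuple>
#include <type_traits>
#include <utility>

// Illustrative stand-in for the tuple_transform helper above (names are placeholders).
template <typename Function, typename Tuple, typename... Tuples>
auto tuple_transform_sketch(Function &&function, Tuple &&tuple, Tuples &&...tuples) {
    static_assert(((std::tuple_size_v<std::remove_cvref_t<Tuple>>
                    == std::tuple_size_v<std::remove_cvref_t<Tuples>>) && ...),
                  "all tuples must have the same number of elements");
    return [&]<std::size_t... Idx>(std::index_sequence<Idx...>) {
        // call_at expands only the tuple pack; the index pack is expanded one level further out
        auto call_at = [&]<std::size_t I>() {
            return function(std::get<I>(tuple), std::get<I>(tuples)...);
        };
        return std::make_tuple(call_at.template operator()<Idx>()...);
    }(std::make_index_sequence<std::tuple_size_v<std::remove_cvref_t<Tuple>>>());
}

int main() {
    auto sums = tuple_transform_sketch([](auto a, auto b) { return a + b; },
                                       std::tuple{ 1, 2.5 }, std::tuple{ 10, 0.5 });
    std::cout << std::get<0>(sums) << ' ' << std::get<1>(sums) << '\n'; // prints: 11 3
}
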
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out", "in">(src, b1))); - expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out", "in">(b1, b2))); - expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out", "in">(b2, b3))); - expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out", "in">(b3, sink))); + expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(src).to<"in">(b1))); + expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(b1).to<"in">(b2))); + expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(b2).to<"in">(b3))); + expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(b3).to<"in">(sink))); "runtime src(N=1024)->b1(N≤128)->b2(N=1024)->b3(N=32...128)->sink"_benchmark.repeat(N_SAMPLES) = [&flow_graph]() { test::n_samples_produced = 0LU; test::n_samples_consumed = 0LU; @@ -419,10 +419,10 @@ inline const boost::ut::suite _runtime_tests = [] { flow_graph.register_node(mult2); flow_graph.register_node(add1); flow_graph.register_node(sink); - expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out", "in">(src, mult1))); - expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out", "in">(mult1, mult2))); - expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out", "in">(mult2, add1))); - expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out", "in">(add1, sink))); + expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(src).to<"in">(mult1))); + expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(mult1).to<"in">(mult2))); + expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(mult2).to<"in">(add1))); + expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(add1).to<"in">(sink))); "runtime src->mult(2.0)->mult(0.5)->add(-1)->sink"_benchmark.repeat(N_SAMPLES) = [&flow_graph]() { test::n_samples_produced = 0LU; test::n_samples_consumed = 0LU; @@ -457,14 +457,14 @@ inline const boost::ut::suite _runtime_tests = [] { for (std::size_t i = 0; i < add1.size(); i++) { if (i == 0) { - expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out", "in">(src, mult1[i]))); + expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(src).to<"in">(mult1[i]))); } else { - expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out", "in">(add1[i - 1], mult1[i]))); + expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(add1[i - 1]).to<"in">(mult1[i]))); } - expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out", "in">(mult1[i], mult2[i]))); - expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out", "in">(mult2[i], add1[i]))); + expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(mult1[i]).to<"in">(mult2[i]))); + expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(mult2[i]).to<"in">(add1[i]))); } - expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out", "in">(add1[add1.size() - 1], sink))); + expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(add1[add1.size() - 1]).to<"in">(sink))); "runtime src->(mult(2.0)->mult(0.5)->add(-1))^10->sink"_benchmark.repeat(N_SAMPLES) = [&flow_graph]() { test::n_samples_produced = 0LU; diff --git a/include/graph.hpp b/include/graph.hpp index 81409f06e..35991c969 100644 --- a/include/graph.hpp +++ b/include/graph.hpp @@ -507,10 +507,9 @@ class graph { std::vector _edges; std::vector> _nodes; -public: template 
[[nodiscard]] connection_result_t - connect(Source_ &src_node_raw, Destination_ &dst_node_raw, int32_t weight = 0, + connect_impl(Source_ &src_node_raw, Destination_ &dst_node_raw, int32_t weight = 0, std::string_view name = "unnamed edge") { using Source = std::remove_cvref_t; using Destination = std::remove_cvref_t; @@ -540,18 +539,41 @@ class graph { return result; } - template - [[nodiscard]] connection_result_t - connect(Source_ &&src_node_raw, Destination_ &&dst_node_raw, int32_t weight = 0, - std::string_view name = "unnamed edge") { - using Source = std::remove_cvref_t; - using Destination = std::remove_cvref_t; - return connect < meta::indexForName(), - meta::indexForName() > - (std::forward(src_node_raw), - std::forward( - dst_node_raw), - weight, name); + // Just a dummy class that stores the graph and the source node and port + // to be able to split the connection into two separate calls + // connect(source) and .to(destination) + template + struct source_connector { + graph& self; + Source& source; + + source_connector(graph& _self, Source& _source) : self(_self), source(_source) {} + + template + [[nodiscard]] auto to(Destination& destination) { + return self.connect_impl(source, destination); + } + + template + [[nodiscard]] auto to(Destination& destination) { + return self.connect_impl()>(source, destination); + } + + source_connector(const source_connector&) = delete; + source_connector(source_connector&&) = delete; + source_connector& operator=(const source_connector&) = delete; + source_connector& operator=(source_connector&&) = delete; + }; + +public: + template + [[nodiscard]] auto connect(Source& source) { + return source_connector(*this, source); + } + + template + [[nodiscard]] auto connect(Source& source) { + return connect(), Source>(source); } auto diff --git a/test/qa_dynamic_port.cpp b/test/qa_dynamic_port.cpp index f8a32e5c0..5178a94ef 100644 --- a/test/qa_dynamic_port.cpp +++ b/test/qa_dynamic_port.cpp @@ -142,11 +142,11 @@ const boost::ut::suite PortApiTests = [] { flow.register_node(added); flow.register_node(out); - expect(eq(connection_result_t::SUCCESS, flow.connect<"value", "original">(number, scaled))); - expect(eq(connection_result_t::SUCCESS, flow.connect<"scaled", "addend0">(scaled, added))); - expect(eq(connection_result_t::SUCCESS, flow.connect<"value", "addend1">(answer, added))); + expect(eq(connection_result_t::SUCCESS, flow.connect<"value">(number).to<"original">(scaled))); + expect(eq(connection_result_t::SUCCESS, flow.connect<"scaled">(scaled).to<"addend0">(added))); + expect(eq(connection_result_t::SUCCESS, flow.connect<"value">(answer).to<"addend1">(added))); - expect(eq(connection_result_t::SUCCESS, flow.connect<"sum", "sink">(added, out))); + expect(eq(connection_result_t::SUCCESS, flow.connect<"sum">(added).to<"sink">(out))); flow.work(); }; From 58c79e852314644ffc17cbb2e0ceb38652376b9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ivan=20=C4=8Cuki=C4=87?= Date: Wed, 1 Feb 2023 10:25:14 +0100 Subject: [PATCH 3/4] Make connections when init is called, not immediately --- bench/bm_case1.cpp | 24 ++++++++++++++++++------ include/graph.hpp | 28 +++++++++++++++++++++++++--- test/qa_dynamic_port.cpp | 4 +++- 3 files changed, 46 insertions(+), 10 deletions(-) diff --git a/bench/bm_case1.cpp b/bench/bm_case1.cpp index b9bdd2464..46d1634b8 100644 --- a/bench/bm_case1.cpp +++ b/bench/bm_case1.cpp @@ -279,7 +279,9 @@ inline const boost::ut::suite _runtime_tests = [] { "runtime src->sink overhead"_benchmark.repeat(N_SAMPLES) = [&flow_graph]() { 
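
[Editor's aside, not part of the patch] The fluent connect<"out">(src).to<"in">(dst) call introduced above is split across a small intermediate object that remembers the graph and the source endpoint until .to() names the destination. A minimal self-contained sketch of that two-step builder, using runtime strings instead of the library's compile-time fixed_string port names (toy_graph, edge and source_connector here are purely illustrative), looks roughly like this:

#include <iostream>
#include <string>
#include <utility>
#include <vector>

enum class connection_result_t { SUCCESS, FAILED };

struct toy_graph {
    struct edge { std::string src, src_port, dst, dst_port; };
    std::vector<edge> edges;

    // first half of the fluent call: remembers the graph and the source endpoint
    struct source_connector {
        toy_graph   &self;
        std::string  src, src_port;

        // second half: names the destination endpoint and records the edge
        connection_result_t to(const std::string &dst, const std::string &dst_port) {
            self.edges.push_back({ src, src_port, dst, dst_port });
            return connection_result_t::SUCCESS;
        }
    };

    source_connector connect(std::string src, std::string src_port) {
        return { *this, std::move(src), std::move(src_port) };
    }
};

int main() {
    toy_graph g;
    auto result = g.connect("src", "out").to("sink", "in");
    std::cout << (result == connection_result_t::SUCCESS) << ' ' << g.edges.size() << '\n'; // prints: 1 1
}

Returning the intermediate by value keeps the whole connection a single expression; the patch additionally deletes the connector's copy and move operations, so it is only usable inside the call chain that created it.
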
test::n_samples_produced = 0LU; test::n_samples_consumed = 0LU; - flow_graph.work(); + auto token = flow_graph.init(); + expect(token); + flow_graph.work(token); expect(eq(test::n_samples_produced, N_SAMPLES)) << "did not produce enough output samples"; expect(eq(test::n_samples_consumed, N_SAMPLES)) << "did not consume enough input samples"; }; @@ -299,7 +301,9 @@ inline const boost::ut::suite _runtime_tests = [] { "runtime src->copy->sink"_benchmark.repeat(N_SAMPLES) = [&flow_graph]() { test::n_samples_produced = 0LU; test::n_samples_consumed = 0LU; - flow_graph.work(); + auto token = flow_graph.init(); + expect(token); + flow_graph.work(token); expect(eq(test::n_samples_produced, N_SAMPLES)) << "did not produce enough output samples"; expect(eq(test::n_samples_consumed, N_SAMPLES)) << "did not consume enough input samples"; }; @@ -330,7 +334,9 @@ inline const boost::ut::suite _runtime_tests = [] { "runtime src->copy^10->sink"_benchmark.repeat(N_SAMPLES) = [&flow_graph]() { test::n_samples_produced = 0LU; test::n_samples_consumed = 0LU; - flow_graph.work(); + auto token = flow_graph.init(); + expect(token); + flow_graph.work(token); expect(eq(test::n_samples_produced, N_SAMPLES)) << "did not produce enough output samples"; expect(eq(test::n_samples_consumed, N_SAMPLES)) << "did not consume enough input samples"; }; @@ -400,7 +406,9 @@ inline const boost::ut::suite _runtime_tests = [] { "runtime src(N=1024)->b1(N≤128)->b2(N=1024)->b3(N=32...128)->sink"_benchmark.repeat(N_SAMPLES) = [&flow_graph]() { test::n_samples_produced = 0LU; test::n_samples_consumed = 0LU; - flow_graph.work(); + auto token = flow_graph.init(); + expect(token); + flow_graph.work(token); expect(eq(test::n_samples_produced, N_SAMPLES)) << "did not produce enough output samples"; expect(eq(test::n_samples_consumed, N_SAMPLES)) << "did not consume enough input samples"; }; @@ -426,7 +434,9 @@ inline const boost::ut::suite _runtime_tests = [] { "runtime src->mult(2.0)->mult(0.5)->add(-1)->sink"_benchmark.repeat(N_SAMPLES) = [&flow_graph]() { test::n_samples_produced = 0LU; test::n_samples_consumed = 0LU; - flow_graph.work(); + auto token = flow_graph.init(); + expect(token); + flow_graph.work(token); expect(eq(test::n_samples_produced, N_SAMPLES)) << "did not produce enough output samples"; expect(eq(test::n_samples_consumed, N_SAMPLES)) << "did not consume enough input samples"; }; @@ -469,7 +479,9 @@ inline const boost::ut::suite _runtime_tests = [] { "runtime src->(mult(2.0)->mult(0.5)->add(-1))^10->sink"_benchmark.repeat(N_SAMPLES) = [&flow_graph]() { test::n_samples_produced = 0LU; test::n_samples_consumed = 0LU; - flow_graph.work(); + auto token = flow_graph.init(); + expect(token); + flow_graph.work(token); expect(eq(test::n_samples_produced, N_SAMPLES)) << "did not produce enough output samples"; expect(eq(test::n_samples_consumed, N_SAMPLES)) << "did not consume enough input samples"; }; diff --git a/include/graph.hpp b/include/graph.hpp index 35991c969..31b6755b3 100644 --- a/include/graph.hpp +++ b/include/graph.hpp @@ -539,6 +539,8 @@ class graph { return result; } + std::vector> _connection_definitions; + // Just a dummy class that stores the graph and the source node and port // to be able to split the connection into two separate calls // connect(source) and .to(destination) @@ -551,12 +553,15 @@ class graph { template [[nodiscard]] auto to(Destination& destination) { - return self.connect_impl(source, destination); + self._connection_definitions.push_back([source = &source, &destination] (graph& _self) { + 
return _self.connect_impl(*source, destination); + }); + return connection_result_t::SUCCESS; } template [[nodiscard]] auto to(Destination& destination) { - return self.connect_impl()>(source, destination); + return to()>(destination); } source_connector(const source_connector&) = delete; @@ -565,6 +570,13 @@ class graph { source_connector& operator=(source_connector&&) = delete; }; + struct init_proof { + init_proof(bool _success) : success(_success) {} + bool success = true; + + operator bool() const { return success; } + }; + public: template [[nodiscard]] auto connect(Source& source) { @@ -588,8 +600,18 @@ class graph { _nodes.push_back(std::make_unique>(std::addressof(node))); } + init_proof init() { + return init_proof( + std::all_of(_connection_definitions.begin(), _connection_definitions.end(), [this] (auto& connection_definition) { + return connection_definition(*this) == connection_result_t::SUCCESS; + })); + } + node_ports_state - work() { + work(init_proof& init) { + if (!init) { + return node_ports_state::error; + } bool run = true; while (run) { bool something_happened = false; diff --git a/test/qa_dynamic_port.cpp b/test/qa_dynamic_port.cpp index 5178a94ef..ed9db865b 100644 --- a/test/qa_dynamic_port.cpp +++ b/test/qa_dynamic_port.cpp @@ -148,7 +148,9 @@ const boost::ut::suite PortApiTests = [] { expect(eq(connection_result_t::SUCCESS, flow.connect<"sum">(added).to<"sink">(out))); - flow.work(); + auto token = flow.init(); + expect(token); + flow.work(token); }; #ifdef ENABLE_DYNAMIC_PORTS From f84995e7ba88f06ebc890997fd30b01b1d419ad8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ivan=20=C4=8Cuki=C4=87?= Date: Wed, 1 Feb 2023 14:12:44 +0100 Subject: [PATCH 4/4] Align work statuses definition with GR --- bench/bm_case1.cpp | 10 +++++----- bench/bm_test_helper.hpp | 10 +++++----- include/graph.hpp | 22 +++++++++++----------- include/node.hpp | 24 ++++++++++++++++++------ test/qa_dynamic_port.cpp | 6 +++--- 5 files changed, 42 insertions(+), 30 deletions(-) diff --git a/bench/bm_case1.cpp b/bench/bm_case1.cpp index 46d1634b8..274455fe1 100644 --- a/bench/bm_case1.cpp +++ b/bench/bm_case1.cpp @@ -52,7 +52,7 @@ class copy : public fg::node(this); auto &in_port = input_port<"in">(this); @@ -62,9 +62,9 @@ class copy : public fg::node, fg::OUT, fg::lim return T{}; } - fair::graph::node_ports_state + fair::graph::work_return_t work() { const std::size_t n_to_publish = _n_samples_max - n_samples_produced; if (n_to_publish > 0) { @@ -35,7 +35,7 @@ class source : public fg::node, fg::OUT, fg::lim if constexpr (use_bulk_operation) { std::size_t n_write = std::clamp(n_to_publish, 0UL, std::min(writer.available(), port.max_buffer_size())); if (n_write == 0) { - return fair::graph::node_ports_state::has_unprocessed_data; + return fair::graph::work_return_t::INSUFFICIENT_INPUT_ITEMS; } writer.publish( // @@ -48,14 +48,14 @@ class source : public fg::node, fg::OUT, fg::lim } else { auto [data, token] = writer.get(1); if (data.size() == 0) { - return fair::graph::node_ports_state::error; + return fair::graph::work_return_t::ERROR; } data[0] = process_one(); writer.publish(token, 1); } - return fair::graph::node_ports_state::success; + return fair::graph::work_return_t::OK; } else { - return fair::graph::node_ports_state::inputs_empty; + return fair::graph::work_return_t::DONE; } } }; diff --git a/include/graph.hpp b/include/graph.hpp index 31b6755b3..5ed1abd83 100644 --- a/include/graph.hpp +++ b/include/graph.hpp @@ -346,7 +346,7 @@ class graph { name() const = 0; - virtual node_ports_state + 
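
[Editor's aside, not part of the patch] With this change, .to() no longer connects anything immediately: it appends a callable to _connection_definitions, graph::init() later replays those callables, and the returned init_proof token is what work() checks before running. A compact standalone sketch of that deferral pattern, with toy types in place of the real node/port machinery, could look like:

#include <algorithm>
#include <functional>
#include <iostream>
#include <vector>

enum class connection_result_t { SUCCESS, FAILED };

struct toy_graph {
    std::vector<std::function<connection_result_t(toy_graph &)>> connection_definitions;
    int connected_edges = 0;

    struct init_proof {
        bool success = false;
        explicit operator bool() const { return success; }
    };

    // record a deferred connection instead of making it immediately
    void defer_connection() {
        connection_definitions.push_back([](toy_graph &self) {
            ++self.connected_edges; // stand-in for connect_impl(...)
            return connection_result_t::SUCCESS;
        });
    }

    // replay every stored definition; the returned token proves they all succeeded
    init_proof init() {
        return { std::all_of(connection_definitions.begin(), connection_definitions.end(),
                             [this](auto &definition) { return definition(*this) == connection_result_t::SUCCESS; }) };
    }

    const char *work(const init_proof &token) const {
        return token ? "running" : "not initialised";
    }
};

int main() {
    toy_graph g;
    g.defer_connection();
    auto token = g.init();
    std::cout << g.connected_edges << ' ' << g.work(token) << '\n'; // prints: 1 running
}
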
virtual work_return_t work() = 0; virtual void * @@ -390,7 +390,7 @@ class graph { template reference_node_wrapper(In &&node) : _node(std::forward(node)) {} - node_ports_state + work_return_t work() override { return data().work(); } @@ -607,25 +607,25 @@ class graph { })); } - node_ports_state + work_return_t work(init_proof& init) { if (!init) { - return node_ports_state::error; + return work_return_t::ERROR; } bool run = true; while (run) { bool something_happened = false; for (auto &node : _nodes) { auto result = node->work(); - if (result == node_ports_state::error) { - return node_ports_state::error; - } else if (result == node_ports_state::has_unprocessed_data) { + if (result == work_return_t::ERROR) { + return work_return_t::ERROR; + } else if (result == work_return_t::INSUFFICIENT_INPUT_ITEMS) { // nothing - } else if (result == node_ports_state::inputs_empty) { + } else if (result == work_return_t::DONE) { // nothing - } else if (result == node_ports_state::success) { + } else if (result == work_return_t::OK) { something_happened = true; - } else if (result == node_ports_state::writers_not_available) { + } else if (result == work_return_t::INSUFFICIENT_OUTPUT_ITEMS) { something_happened = true; } } @@ -633,7 +633,7 @@ class graph { run = something_happened; } - return node_ports_state::inputs_empty; + return work_return_t::DONE; } }; diff --git a/include/node.hpp b/include/node.hpp index 598d374bc..dbc084587 100644 --- a/include/node.hpp +++ b/include/node.hpp @@ -15,7 +15,19 @@ namespace fair::graph { namespace stdx = vir::stdx; using fair::meta::fixed_string; -enum class node_ports_state { success, has_unprocessed_data, inputs_empty, writers_not_available, error }; +enum class work_return_t { + ERROR = -100, /// error occurred in the work function + INSUFFICIENT_OUTPUT_ITEMS = + -3, /// work requires a larger output buffer to produce output + INSUFFICIENT_INPUT_ITEMS = + -2, /// work requires a larger input buffer to produce output + DONE = + -1, /// this block has completed its processing and the flowgraph should be done + OK = 0, /// work call was successful and return values in i/o structs are valid + CALLBACK_INITIATED = + 1, /// rather than blocking in the work function, the block will call back to the + /// parent interface when it is ready to be called again +}; namespace work_strategies { template @@ -88,13 +100,13 @@ consume_readers(Self& self, std::size_t available_values_count) { struct read_many_and_publish_many { template - static node_ports_state + static work_return_t work(Self &self) noexcept { // Capturing structured bindings does not work in Clang... auto inputs_status = work_strategies::inputs_status(self); if (inputs_status.available_values_count == 0) { - return inputs_status.at_least_one_input_has_data ? node_ports_state::has_unprocessed_data : node_ports_state::inputs_empty; + return inputs_status.at_least_one_input_has_data ? work_return_t::INSUFFICIENT_INPUT_ITEMS : work_return_t::DONE; } bool all_writers_available = std::apply([inputs_status](auto&... output_port) { @@ -102,7 +114,7 @@ struct read_many_and_publish_many { }, output_ports(&self)); if (!all_writers_available) { - return node_ports_state::writers_not_available; + return work_return_t::INSUFFICIENT_OUTPUT_ITEMS; } auto input_spans = meta::tuple_transform([inputs_status](auto& input_port) { @@ -153,7 +165,7 @@ struct read_many_and_publish_many { fmt::print("Node {} failed to consume {} values from inputs\n", self.name(), inputs_status.available_values_count); } - return success ? 
node_ports_state::success : node_ports_state::error; + return success ? work_return_t::OK : work_return_t::ERROR; } }; @@ -335,7 +347,7 @@ class node : protected std::tuple { } } - node_ports_state + work_return_t work() noexcept { return work_strategy::work(self()); } diff --git a/test/qa_dynamic_port.cpp b/test/qa_dynamic_port.cpp index ed9db865b..fd681bc9d 100644 --- a/test/qa_dynamic_port.cpp +++ b/test/qa_dynamic_port.cpp @@ -54,7 +54,7 @@ class repeater_source : public fg::node, fg::OUT, fg::OUT
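
[Editor's aside, not part of the patch] The last patch swaps node_ports_state for the GR-style work_return_t statuses, and graph::work() keeps looping over the registered nodes for as long as at least one of them reports progress. A rough self-contained sketch of that status-driven loop follows; the node set is reduced to plain callables and all names are placeholders:

#include <functional>
#include <iostream>
#include <vector>

// Abridged status set, mirroring the values introduced in node.hpp above.
enum class work_return_t { ERROR = -100, INSUFFICIENT_OUTPUT_ITEMS = -3, INSUFFICIENT_INPUT_ITEMS = -2, DONE = -1, OK = 0 };

work_return_t
run_flow(std::vector<std::function<work_return_t()>> &nodes) {
    bool run = true;
    while (run) {
        bool something_happened = false;
        for (auto &node_work : nodes) {
            const auto result = node_work();
            if (result == work_return_t::ERROR) {
                return work_return_t::ERROR; // abort the whole run
            } else if (result == work_return_t::OK || result == work_return_t::INSUFFICIENT_OUTPUT_ITEMS) {
                something_happened = true;   // progress was made (or an output buffer is worth retrying)
            }
            // DONE and INSUFFICIENT_INPUT_ITEMS: nothing to do for this node right now
        }
        run = something_happened;            // stop once a full pass made no progress
    }
    return work_return_t::DONE;
}

int main() {
    int budget = 3; // toy source that produces three items and then reports DONE
    std::vector<std::function<work_return_t()>> nodes{
        [&budget]() { return budget-- > 0 ? work_return_t::OK : work_return_t::DONE; }
    };
    std::cout << (run_flow(nodes) == work_return_t::DONE) << '\n'; // prints: 1
}

As in the patch, OK (and a saturated output) counts as progress, ERROR aborts immediately, and a full pass without progress ends the run with DONE.
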