From 86f8565e1811cf2ccc1139cfe6932619785d6216 Mon Sep 17 00:00:00 2001 From: "Ralph J. Steinhagen" Date: Fri, 15 Sep 2023 08:26:54 +0200 Subject: [PATCH] refactored port interface - WIP ... as outlined by GR Architecture WG and https://github.com/fair-acc/graph-prototype/issues/148 tackled items: * refactored port structure (mandatory enum NTTPs vs. optional type-wrapped arguments) * added optional domain argument * added default init value (needed for cyclic graphs) * add isOptional() annotation Signed-off-by: Ralph J. Steinhagen --- bench/bm_case1.cpp | 12 +- bench/bm_scheduler.cpp | 2 +- bench/bm_test_helper.hpp | 2 +- include/data_sink.hpp | 46 ++--- include/graph.hpp | 5 +- include/node.hpp | 19 +- include/port.hpp | 290 +++++++++++++++++++++++------- include/port_traits.hpp | 28 +-- include/typelist.hpp | 3 + src/main.cpp | 38 ++-- test/plugins/good_base_plugin.cpp | 4 +- test/plugins/good_math_plugin.cpp | 4 +- test/qa_dynamic_node.cpp | 4 +- test/qa_dynamic_port.cpp | 40 +++-- test/qa_hier_node.cpp | 19 +- test/qa_scheduler.cpp | 9 +- 16 files changed, 359 insertions(+), 166 deletions(-) diff --git a/bench/bm_case1.cpp b/bench/bm_case1.cpp index 33ce8f550..2f2038f86 100644 --- a/bench/bm_case1.cpp +++ b/bench/bm_case1.cpp @@ -19,7 +19,7 @@ inline constexpr std::size_t N_ITER = 10; inline constexpr std::size_t N_SAMPLES = gr::util::round_up(10'000, 1024); template -struct math_op : public fg::node, fg::IN, fg::OUT> { +struct math_op : public fg::node, fg::InNamed, fg::OutNamed> { T factor = static_cast(1.0f); // public: @@ -58,7 +58,7 @@ static_assert(fg::traits::node::can_process_one_simd>); #endif template -class math_bulk_op : public fg::node, fg::IN, fg::OUT> { +class math_bulk_op : public fg::node, fg::InNamed>, fg::OutNamed>> { T _factor = static_cast(1.0f); public: @@ -183,7 +183,7 @@ static_assert(fg::traits::node::can_process_one_simd>); // It doesn't need to be enabled for reflection. // template -class gen_operation_SIMD : public fg::node, fg::IN, fg::OUT> { +class gen_operation_SIMD : public fg::node, fg::InNamed>, fg::OutNamed>> { T _value = static_cast(1.0f); public: @@ -273,8 +273,8 @@ using add_SIMD = gen_operation_SIMD; template class copy : public fg::node> { public: - fg::IN in; - fg::OUT out; + fg::IN> in; + fg::OUT> out; template V> [[nodiscard]] constexpr V @@ -337,7 +337,7 @@ simd_size() noexcept { namespace stdx = vir::stdx; template -class convert : public fg::node, fg::IN, fg::OUT> { +class convert : public fg::node, fg::InNamed>, fg::OutNamed>> { static_assert(stdx::is_simd_v != stdx::is_simd_v, "either input xor output must be SIMD capable"); constexpr static std::size_t from_simd_size = detail::simd_size(); constexpr static std::size_t to_simd_size = detail::simd_size(); diff --git a/bench/bm_scheduler.cpp b/bench/bm_scheduler.cpp index 400966d3c..6b22f7d65 100644 --- a/bench/bm_scheduler.cpp +++ b/bench/bm_scheduler.cpp @@ -14,7 +14,7 @@ inline constexpr std::size_t N_SAMPLES = gr::util::round_up(10'000'000, 1024); inline constexpr std::size_t N_NODES = 5; template -class math_op : public fg::node, fg::IN, fg::OUT> { +class math_op : public fg::node, fg::InNamed, fg::OutNamed> { T _factor = static_cast(1.0f); public: diff --git a/bench/bm_test_helper.hpp b/bench/bm_test_helper.hpp index c725228ff..846998aec 100644 --- a/bench/bm_test_helper.hpp +++ b/bench/bm_test_helper.hpp @@ -89,7 +89,7 @@ inline static std::size_t n_samples_consumed = 0_UZ; template struct sink : public fg::node> { - fg::IN in; + fg::IN> in; std::size_t should_receive_n_samples = 0; int64_t _last_tag_position = -1; diff --git a/include/data_sink.hpp b/include/data_sink.hpp index 34512dde2..c060aa91d 100644 --- a/include/data_sink.hpp +++ b/include/data_sink.hpp @@ -144,7 +144,7 @@ class data_sink_registry { template std::shared_ptr::dataset_poller> - get_trigger_poller(const data_sink_query &query, M&& matcher, std::size_t pre_samples, std::size_t post_samples, blocking_mode block = blocking_mode::Blocking) { + get_trigger_poller(const data_sink_query &query, M &&matcher, std::size_t pre_samples, std::size_t post_samples, blocking_mode block = blocking_mode::Blocking) { std::lock_guard lg{ _mutex }; auto sink = find_sink(query); return sink ? sink->get_trigger_poller(std::forward(matcher), pre_samples, post_samples, block) : nullptr; @@ -152,7 +152,7 @@ class data_sink_registry { template std::shared_ptr::dataset_poller> - get_multiplexed_poller(const data_sink_query &query, M&& matcher, std::size_t maximum_window_size, blocking_mode block = blocking_mode::Blocking) { + get_multiplexed_poller(const data_sink_query &query, M &&matcher, std::size_t maximum_window_size, blocking_mode block = blocking_mode::Blocking) { std::lock_guard lg{ _mutex }; auto sink = find_sink(query); return sink ? sink->get_multiplexed_poller(std::forward(matcher), maximum_window_size, block) : nullptr; @@ -160,7 +160,7 @@ class data_sink_registry { template std::shared_ptr::dataset_poller> - get_snapshot_poller(const data_sink_query &query, M&& matcher, std::chrono::nanoseconds delay, blocking_mode block = blocking_mode::Blocking) { + get_snapshot_poller(const data_sink_query &query, M &&matcher, std::chrono::nanoseconds delay, blocking_mode block = blocking_mode::Blocking) { std::lock_guard lg{ _mutex }; auto sink = find_sink(query); return sink ? sink->get_snapshot_poller(std::forward(matcher), delay, block) : nullptr; @@ -168,7 +168,7 @@ class data_sink_registry { template Callback> bool - register_streaming_callback(const data_sink_query &query, std::size_t max_chunk_size, Callback&& callback) { + register_streaming_callback(const data_sink_query &query, std::size_t max_chunk_size, Callback &&callback) { std::lock_guard lg{ _mutex }; auto sink = find_sink(query); if (!sink) { @@ -181,7 +181,7 @@ class data_sink_registry { template Callback, TriggerMatcher M> bool - register_trigger_callback(const data_sink_query &query, M&& matcher, std::size_t pre_samples, std::size_t post_samples, Callback&& callback) { + register_trigger_callback(const data_sink_query &query, M &&matcher, std::size_t pre_samples, std::size_t post_samples, Callback &&callback) { std::lock_guard lg{ _mutex }; auto sink = find_sink(query); if (!sink) { @@ -194,7 +194,7 @@ class data_sink_registry { template Callback, TriggerMatcher M> bool - register_multiplexed_callback(const data_sink_query &query, M&& matcher, std::size_t maximum_window_size, Callback&& callback) { + register_multiplexed_callback(const data_sink_query &query, M &&matcher, std::size_t maximum_window_size, Callback &&callback) { std::lock_guard lg{ _mutex }; auto sink = find_sink(query); if (!sink) { @@ -207,7 +207,7 @@ class data_sink_registry { template Callback, TriggerMatcher M> bool - register_snapshot_callback(const data_sink_query &query, M&& matcher, std::chrono::nanoseconds delay, Callback&& callback) { + register_snapshot_callback(const data_sink_query &query, M &&matcher, std::chrono::nanoseconds delay, Callback &&callback) { std::lock_guard lg{ _mutex }; auto sink = find_sink(query); if (!sink) { @@ -319,7 +319,7 @@ class data_sink : public node> { Annotated> signal_min = std::numeric_limits::lowest(); Annotated> signal_max = std::numeric_limits::max(); - IN in; + IN> in; struct poller { // TODO consider whether reusing port here makes sense @@ -409,7 +409,7 @@ class data_sink : public node> { template std::shared_ptr - get_trigger_poller(M&& matcher, std::size_t pre_samples, std::size_t post_samples, blocking_mode block_mode = blocking_mode::Blocking) { + get_trigger_poller(M &&matcher, std::size_t pre_samples, std::size_t post_samples, blocking_mode block_mode = blocking_mode::Blocking) { const auto block = block_mode == blocking_mode::Blocking; auto handler = std::make_shared(); std::lock_guard lg(_listener_mutex); @@ -420,7 +420,7 @@ class data_sink : public node> { template std::shared_ptr - get_multiplexed_poller(M&& matcher, std::size_t maximum_window_size, blocking_mode block_mode = blocking_mode::Blocking) { + get_multiplexed_poller(M &&matcher, std::size_t maximum_window_size, blocking_mode block_mode = blocking_mode::Blocking) { std::lock_guard lg(_listener_mutex); const auto block = block_mode == blocking_mode::Blocking; auto handler = std::make_shared(); @@ -430,7 +430,7 @@ class data_sink : public node> { template std::shared_ptr - get_snapshot_poller(M&& matcher, std::chrono::nanoseconds delay, blocking_mode block_mode = blocking_mode::Blocking) { + get_snapshot_poller(M &&matcher, std::chrono::nanoseconds delay, blocking_mode block_mode = blocking_mode::Blocking) { const auto block = block_mode == blocking_mode::Blocking; auto handler = std::make_shared(); std::lock_guard lg(_listener_mutex); @@ -440,27 +440,27 @@ class data_sink : public node> { template Callback> void - register_streaming_callback(std::size_t max_chunk_size, Callback&& callback) { + register_streaming_callback(std::size_t max_chunk_size, Callback &&callback) { add_listener(std::make_unique>(max_chunk_size, std::forward(callback), *this), false); } template Callback> void - register_trigger_callback(M&& matcher, std::size_t pre_samples, std::size_t post_samples, Callback&& callback) { + register_trigger_callback(M &&matcher, std::size_t pre_samples, std::size_t post_samples, Callback &&callback) { add_listener(std::make_unique>(std::forward(matcher), pre_samples, post_samples, std::forward(callback)), false); ensure_history_size(pre_samples); } template Callback> void - register_multiplexed_callback(M&& matcher, std::size_t maximum_window_size, Callback&& callback) { + register_multiplexed_callback(M &&matcher, std::size_t maximum_window_size, Callback &&callback) { std::lock_guard lg(_listener_mutex); add_listener(std::make_unique>(std::forward(matcher), maximum_window_size, std::forward(callback)), false); } template Callback> void - register_snapshot_callback(M&& matcher, std::chrono::nanoseconds delay, Callback&& callback) { + register_snapshot_callback(M &&matcher, std::chrono::nanoseconds delay, Callback &&callback) { std::lock_guard lg(_listener_mutex); add_listener(std::make_unique>(std::forward(matcher), delay, std::forward(callback)), false); } @@ -634,7 +634,7 @@ class data_sink : public node> { Callback callback; template - explicit continuous_listener(std::size_t max_chunk_size, CallbackFW&& c, const data_sink &parent) : parent_sink(parent), buffer(max_chunk_size), callback{ std::forward(c) } {} + explicit continuous_listener(std::size_t max_chunk_size, CallbackFW &&c, const data_sink &parent) : parent_sink(parent), buffer(max_chunk_size), callback{ std::forward(c) } {} explicit continuous_listener(std::shared_ptr poller, bool do_block, const data_sink &parent) : parent_sink(parent), block(do_block), polling_handler{ std::move(poller) } {} @@ -760,11 +760,12 @@ class data_sink : public node> { Callback callback; template - explicit trigger_listener(Matcher&& matcher, std::shared_ptr handler, std::size_t pre, std::size_t post, bool do_block) + explicit trigger_listener(Matcher &&matcher, std::shared_ptr handler, std::size_t pre, std::size_t post, bool do_block) : block(do_block), pre_samples(pre), post_samples(post), trigger_matcher(std::forward(matcher)), polling_handler{ std::move(handler) } {} template - explicit trigger_listener(Matcher&& matcher, std::size_t pre, std::size_t post, CallbackFW&& cb) : pre_samples(pre), post_samples(post), trigger_matcher(std::forward(matcher)), callback{ std::forward(cb) } {} + explicit trigger_listener(Matcher &&matcher, std::size_t pre, std::size_t post, CallbackFW &&cb) + : pre_samples(pre), post_samples(post), trigger_matcher(std::forward(matcher)), callback{ std::forward(cb) } {} void set_dataset_template(DataSet dst) override { @@ -850,11 +851,11 @@ class data_sink : public node> { Callback callback; template - explicit multiplexed_listener(Matcher&& matcher_, std::size_t max_window_size, CallbackFW&& cb) + explicit multiplexed_listener(Matcher &&matcher_, std::size_t max_window_size, CallbackFW &&cb) : matcher(std::forward(matcher_)), maximum_window_size(max_window_size), callback(std::forward(cb)) {} template - explicit multiplexed_listener(Matcher&& matcher_, std::size_t max_window_size, std::shared_ptr handler, bool do_block) + explicit multiplexed_listener(Matcher &&matcher_, std::size_t max_window_size, std::shared_ptr handler, bool do_block) : block(do_block), matcher(std::forward(matcher_)), maximum_window_size(max_window_size), polling_handler{ std::move(handler) } {} void @@ -949,11 +950,12 @@ class data_sink : public node> { Callback callback; template - explicit snapshot_listener(Matcher&& matcher, std::chrono::nanoseconds delay, std::shared_ptr poller, bool do_block) + explicit snapshot_listener(Matcher &&matcher, std::chrono::nanoseconds delay, std::shared_ptr poller, bool do_block) : block(do_block), time_delay(delay), trigger_matcher(std::forward(matcher)), polling_handler{ std::move(poller) } {} template - explicit snapshot_listener(Matcher&& matcher, std::chrono::nanoseconds delay, CallbackFW&& cb) : time_delay(delay), trigger_matcher(std::forward(matcher)), callback(std::forward(cb)) {} + explicit snapshot_listener(Matcher &&matcher, std::chrono::nanoseconds delay, CallbackFW &&cb) + : time_delay(delay), trigger_matcher(std::forward(matcher)), callback(std::forward(cb)) {} void set_dataset_template(DataSet dst) override { diff --git a/include/graph.hpp b/include/graph.hpp index b7f8b259f..08f4fbb84 100644 --- a/include/graph.hpp +++ b/include/graph.hpp @@ -738,9 +738,10 @@ operator<<(std::ostream &os, const port_direction_t &value) { return os << static_cast(value); } +template inline std::ostream & -operator<<(std::ostream &os, const port_domain_t &value) { - return os << static_cast(value); +operator<<(std::ostream &os, const T &value) { + return os << value.Name; } #if HAVE_SOURCE_LOCATION diff --git a/include/node.hpp b/include/node.hpp index 1bbb1eca3..1b4b1ad6d 100644 --- a/include/node.hpp +++ b/include/node.hpp @@ -1320,8 +1320,12 @@ merge(A &&a, B &&b) { } #if !DISABLE_SIMD -namespace test { -struct copy : public node::max(), "in">, OUT::max(), "out">> { +namespace test { // TODO: move to dedicated tests + +struct copy : public node { + IN in; + OUT out; + public: template V> [[nodiscard]] constexpr V @@ -1329,7 +1333,18 @@ struct copy : public node::m return a; } }; +} // namespace test +#endif +} // namespace fair::graph +#if !DISABLE_SIMD +ENABLE_REFLECTION(fair::graph::test::copy, in, out); +#endif + +namespace fair::graph { + +#if !DISABLE_SIMD +namespace test { static_assert(traits::node::input_port_types::size() == 1); static_assert(std::same_as, float>); static_assert(traits::node::can_process_one_scalar); diff --git a/include/port.hpp b/include/port.hpp index a5d48bee0..92672a876 100644 --- a/include/port.hpp +++ b/include/port.hpp @@ -20,25 +20,48 @@ using namespace fair::literals; using supported_type = std::variant, std::complex, DataSet, DataSet /*, ...*/>; enum class port_direction_t { INPUT, OUTPUT, ANY }; // 'ANY' only for query and not to be used for port declarations + enum class connection_result_t { SUCCESS, FAILED }; + enum class port_type_t { STREAM, /*!< used for single-producer-only ond usually synchronous one-to-one or one-to-many communications */ MESSAGE /*!< used for multiple-producer one-to-one, one-to-many, many-to-one, or many-to-many communications */ }; -enum class port_domain_t { CPU, GPU, NET, FPGA, DSP, MLU }; + +template +struct port_domain { + inline static constexpr fixed_string Name = PortDomainName; +}; + +template +concept PortDomain = requires { T::Name; } && std::is_base_of_v, T>; + +template +using is_port_domain = std::bool_constant>; + +struct CPU : public port_domain<"CPU"> {}; + +struct GPU : public port_domain<"GPU"> {}; + +static_assert(is_port_domain::value); +static_assert(is_port_domain::value); +static_assert(!is_port_domain::value); template -concept Port = requires(T t, const std::size_t n_items) { // dynamic definitions +concept Port = requires(T t, const std::size_t n_items, const supported_type &newDefault) { // dynamic definitions typename T::value_type; - { t.pmt_type() } -> std::same_as; + { t.defaultValue() } -> std::same_as; + { t.setDefaultValue(newDefault) } -> std::same_as; { t.name } -> std::convertible_to; { t.priority } -> std::convertible_to; { t.min_samples } -> std::convertible_to; { t.max_samples } -> std::convertible_to; { t.type() } -> std::same_as; { t.direction() } -> std::same_as; + { t.domain() } -> std::same_as; { t.resize_buffer(n_items) } -> std::same_as; { t.disconnect() } -> std::same_as; + { t.isOptional() } -> std::same_as; }; /** @@ -51,6 +74,61 @@ struct internal_port_buffers { void *tagHandler; }; +template +struct RequiredSamples { + static constexpr std::size_t MinSamples = MIN_SAMPLES; + static constexpr std::size_t MaxSamples = MAX_SAMPLES; +}; + +template +concept IsRequiredSamples = requires { + T::MinSamples; + T::MaxSamples; +} && std::is_base_of_v, T>; + +template +using is_required_samples = std::bool_constant>; + +static_assert(is_required_samples>::value); +static_assert(!is_required_samples::value); + +template +struct StreamBuffer { + using type = T; +}; + +template +concept IsStreamBuffer = requires { typename T::type; } && gr::Buffer; + +template +using is_stream_buffer = std::bool_constant>; + +template +struct TagBuffer { + using type = T; +}; + +template +concept IsTagBuffer = requires { typename T::type; } && gr::Buffer; + +template +using is_tag_buffer = std::bool_constant>; + +template +struct DefaultStreamBuffer : StreamBuffer> {}; + +struct DefaultTagBuffer : TagBuffer> {}; + +static_assert(is_stream_buffer>::value); +// static_assert(!is_stream_buffer::value); +// static_assert(!is_tag_buffer>::value); +static_assert(is_tag_buffer::value); + +/** + * @brief optional Annotation informing the graph/scheduler that a given port does not require to be connected + */ +struct Optional {}; + /** * @brief 'ports' are interfaces that allows data to flow between blocks in a graph, similar to RF connectors. * Each block can have zero or more input/output ports. When connecting ports, either a single-step or a two-step @@ -73,38 +151,38 @@ struct internal_port_buffers { * @tparam PortName a string to identify the port, notably to be used in an UI- and hand-written explicit code context. * @tparam PortType STREAM or MESSAGE * @tparam PortDirection either input or output - * @tparam MIN_SAMPLES specifies the minimum number of samples the port/block requires for processing in one scheduler iteration - * @tparam MAX_SAMPLES specifies the maximum number of samples the port/block can process in one scheduler iteration - * @tparam BufferType user-extendable buffer implementation for the streaming data - * @tparam TagBufferType user-extendable buffer implementation for the tag data + * @tparam Arguments optional: default to 'DefaultStreamBuffer' and DefaultTagBuffer' based on 'gr::circular_buffer', and CPU domain */ -template, - gr::Buffer TagBufferType = gr::circular_buffer> -class port { -public: - static_assert(PortDirection != port_direction_t::ANY, "ANY reserved for queries and not port direction declarations"); - - using value_type = T; - - static constexpr bool IS_INPUT = PortDirection == port_direction_t::INPUT; - static constexpr bool IS_OUTPUT = PortDirection == port_direction_t::OUTPUT; - +template +struct port { template - using with_name = port; + using with_name = port; - using ReaderType = decltype(std::declval().new_reader()); - using WriterType = decltype(std::declval().new_writer()); - using IoType = std::conditional_t; - using TagReaderType = decltype(std::declval().new_reader()); - using TagWriterType = decltype(std::declval().new_writer()); - using TagIoType = std::conditional_t; + static_assert(PortDirection != port_direction_t::ANY, "ANY reserved for queries and not port direction declarations"); + + using value_type = T; + using Domain = typename fair::meta::typelist::template find_or_default; + using Required = typename fair::meta::typelist::template find_or_default>; + using BufferType = typename fair::meta::typelist::template find_or_default>::type; + using TagBufferType = typename fair::meta::typelist::template find_or_default::type; + static constexpr port_direction_t Direction = PortDirection; + static constexpr bool IS_INPUT = PortDirection == port_direction_t::INPUT; + static constexpr bool IS_OUTPUT = PortDirection == port_direction_t::OUTPUT; + + using ReaderType = decltype(std::declval().new_reader()); + using WriterType = decltype(std::declval().new_writer()); + using IoType = std::conditional_t; + using TagReaderType = decltype(std::declval().new_reader()); + using TagWriterType = decltype(std::declval().new_writer()); + using TagIoType = std::conditional_t; // public properties - const std::string name = static_cast(PortName); - std::int16_t priority = 0; // → dependents of a higher-prio port should be scheduled first (Q: make this by order of ports?) - std::size_t min_samples = (MIN_SAMPLES == std::dynamic_extent ? 1 : MIN_SAMPLES); - std::size_t max_samples = MAX_SAMPLES; + constexpr static bool optional = std::disjunction_v...>; + const std::string name = static_cast(PortName); + std::int16_t priority = 0; // → dependents of a higher-prio port should be scheduled first (Q: make this by order of ports?) + std::size_t min_samples = (Required::MinSamples == std::dynamic_extent ? 1 : Required::MinSamples); + std::size_t max_samples = Required::MaxSamples; + T default_value = T{}; private: bool _connected = false; @@ -112,21 +190,30 @@ class port { TagIoType _tagIoHandler = new_tag_io_handler(); public: + bool + initBuffer(std::size_t nSamples = 0) noexcept { + if constexpr (IS_OUTPUT) { + // write one default value into output -- needed for cyclic graph initialisation + return _ioHandler.try_publish([val = default_value](std::span &out) { out[0] = val; }, 1_UZ); + } + return true; + } + [[nodiscard]] constexpr auto - new_io_handler() const noexcept { + new_io_handler(std::size_t buffer_size = 65536) const noexcept { if constexpr (IS_INPUT) { - return BufferType(65536).new_reader(); + return BufferType(buffer_size).new_reader(); } else { - return BufferType(65536).new_writer(); + return BufferType(buffer_size).new_writer(); } } [[nodiscard]] constexpr auto - new_tag_io_handler() const noexcept { + new_tag_io_handler(std::size_t buffer_size = 65536) const noexcept { if constexpr (IS_INPUT) { - return TagBufferType(65536).new_reader(); + return TagBufferType(buffer_size).new_reader(); } else { - return TagBufferType(65536).new_writer(); + return TagBufferType(buffer_size).new_writer(); } } @@ -158,7 +245,7 @@ class port { } public: - port() = default; + constexpr port() = default; port(const port &) = delete; auto operator=(const port &) @@ -195,6 +282,16 @@ class port { return PortDirection; } + [[nodiscard]] constexpr static std::string_view + domain() noexcept { + return std::string_view(Domain::Name); + } + + [[nodiscard]] constexpr static bool + isOptional() noexcept { + return optional; + } + [[nodiscard]] constexpr static decltype(PortName) static_name() noexcept requires(!PortName.empty()) @@ -208,8 +305,17 @@ class port { #else [[nodiscard]] constexpr supported_type #endif - pmt_type() const noexcept { - return T(); + defaultValue() const noexcept { + return default_value; + } + + bool + setDefaultValue(const supported_type &newDefault) noexcept { + if (std::holds_alternative(newDefault)) { + default_value = std::get(newDefault); + return true; + } + return false; } [[nodiscard]] constexpr static std::size_t @@ -219,19 +325,19 @@ class port { [[nodiscard]] constexpr std::size_t min_buffer_size() const noexcept { - if constexpr (MIN_SAMPLES == std::dynamic_extent) { + if constexpr (Required::MinSamples == std::dynamic_extent) { return min_samples; } else { - return MIN_SAMPLES; + return Required::MinSamples; } } [[nodiscard]] constexpr std::size_t max_buffer_size() const noexcept { - if constexpr (MAX_SAMPLES == std::dynamic_extent) { + if constexpr (Required::MaxSamples == std::dynamic_extent) { return max_samples; } else { - return MAX_SAMPLES; + return Required::MaxSamples; } } @@ -354,18 +460,26 @@ repeated_ports_impl(std::index_sequence) { } // namespace detail // TODO: Add port index to BaseName -template -using repeated_ports = decltype(detail::repeated_ports_impl>(std::make_index_sequence())); - -template -using IN = port; -template -using OUT = port; -template -using IN_MSG = port; -template -using OUT_MSG = port; +template +using repeated_ports = decltype(detail::repeated_ports_impl>(std::make_index_sequence())); + +template +using IN = port; +template +using OUT = port; +template +using IN_MSG = port; +template +using OUT_MSG = port; + +template +using InNamed = port; +template +using OutNamed = port; +template +using MsgInNamed = port; +template +using MsgOutNamed = port; static_assert(Port>); static_assert(Port())>); @@ -373,12 +487,14 @@ static_assert(Port>); static_assert(Port>); static_assert(Port>); -static_assert(IN::static_name() == fixed_string("in")); -static_assert(requires { IN("in").name; }); +static_assert(IN>::Required::MinSamples == 1); +static_assert(IN>::Required::MaxSamples == 2); +static_assert(std::same_as>::Domain, CPU>); +static_assert(std::same_as, GPU>::Domain, GPU>); -static_assert(OUT_MSG::static_name() == fixed_string("out_msg")); -static_assert(!(OUT_MSG::with_name<"out_message">::static_name() == fixed_string("out_msg"))); -static_assert(OUT_MSG::with_name<"out_message">::static_name() == fixed_string("out_message")); +static_assert(MsgOutNamed::static_name() == fixed_string("out_msg")); +static_assert(!(MsgOutNamed::with_name<"out_message">::static_name() == fixed_string("out_msg"))); +static_assert(MsgOutNamed::with_name<"out_message">::static_name() == fixed_string("out_message")); /** * Runtime capable wrapper to be used within a block. It's primary purpose is to allow the runtime @@ -404,7 +520,11 @@ class dynamic_port { virtual ~model() = default; [[nodiscard]] virtual supported_type - pmt_type() const noexcept + defaultValue() const noexcept + = 0; + + [[nodiscard]] virtual bool + setDefaultValue(const supported_type &val) noexcept = 0; [[nodiscard]] virtual port_type_t @@ -415,6 +535,14 @@ class dynamic_port { direction() const noexcept = 0; + [[nodiscard]] virtual std::string_view + domain() const noexcept + = 0; + + [[nodiscard]] virtual bool + isOptional() noexcept + = 0; + [[nodiscard]] virtual connection_result_t resize_buffer(std::size_t min_size) noexcept = 0; @@ -489,15 +617,20 @@ class dynamic_port { } ~wrapper() override = default; - + // TODO revisit: constexpr was removed because emscripten does not support constexpr function for non literal type, like DataSet #if defined(__EMSCRIPTEN__) [[nodiscard]] supported_type #else [[nodiscard]] constexpr supported_type #endif - pmt_type() const noexcept override { - return _value.pmt_type(); + defaultValue() const noexcept override { + return _value.defaultValue(); + } + + [[nodiscard]] bool + setDefaultValue(const supported_type &val) noexcept override { + return _value.setDefaultValue(val); } [[nodiscard]] constexpr port_type_t @@ -510,6 +643,16 @@ class dynamic_port { return _value.direction(); } + [[nodiscard]] constexpr std::string_view + domain() const noexcept override { + return _value.domain(); + } + + [[nodiscard]] bool + isOptional() noexcept override { + return _value.isOptional(); + } + [[nodiscard]] connection_result_t resize_buffer(std::size_t min_size) noexcept override { return _value.resize_buffer(min_size); @@ -562,8 +705,13 @@ class dynamic_port { : name(arg.name), priority(arg.priority), min_samples(arg.min_samples), max_samples(arg.max_samples), _accessor{ std::make_unique>(std::forward(arg)) } {} [[nodiscard]] supported_type - pmt_type() const noexcept { - return _accessor->pmt_type(); + defaultValue() const noexcept { + return _accessor->defaultValue(); + } + + [[nodiscard]] bool + setDefaultValue(const supported_type &val) noexcept { + return _accessor->setDefaultValue(val); } [[nodiscard]] port_type_t @@ -576,6 +724,16 @@ class dynamic_port { return _accessor->direction(); } + [[nodiscard]] std::string_view + domain() const noexcept { + return _accessor->domain(); + } + + [[nodiscard]] bool + isOptional() noexcept { + return _accessor->isOptional(); + } + [[nodiscard]] connection_result_t resize_buffer(std::size_t min_size) { if (direction() == port_direction_t::OUTPUT) { diff --git a/include/port_traits.hpp b/include/port_traits.hpp index 9ae32d5c8..11b9b7d1e 100644 --- a/include/port_traits.hpp +++ b/include/port_traits.hpp @@ -8,11 +8,11 @@ namespace fair::graph::traits::port { template concept has_fixed_info_v = requires { - typename T::value_type; - { T::static_name() }; - { T::direction() } -> std::same_as; - { T::type() } -> std::same_as; - }; + typename T::value_type; + { T::static_name() }; + { T::direction() } -> std::same_as; + { T::type() } -> std::same_as; +}; template using has_fixed_info = std::integral_constant>; @@ -43,25 +43,15 @@ using is_output = std::integral_constant concept is_output_v = is_output::value; -template +template concept is_port_v = is_output_v || is_input_v; template -struct min_samples : std::integral_constant::value... })> {}; - -template -struct min_samples> - : std::integral_constant {}; +struct min_samples : std::integral_constant {}; template -struct max_samples : std::integral_constant::value... })> {}; - -template -struct max_samples> - : std::integral_constant {}; +struct max_samples : std::integral_constant {}; -} // namespace port +} // namespace fair::graph::traits::port #endif // include guard diff --git a/include/typelist.hpp b/include/typelist.hpp index 749fb18b6..bf789677a 100644 --- a/include/typelist.hpp +++ b/include/typelist.hpp @@ -428,6 +428,9 @@ struct typelist { }())>; }; +template +constexpr bool is_any_of_v = std::disjunction_v...>; + namespace detail { template typename OtherTypelist, typename... Args> meta::typelist diff --git a/src/main.cpp b/src/main.cpp index 5b1dbe313..dd691b3bd 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -8,46 +8,58 @@ namespace fg = fair::graph; template -class count_source : public fg::node, fg::OUT::max(), "random">> { -public: +struct count_source : public fg::node> { + fg::OUT random; + constexpr T process_one() { return 42; } }; +ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (count_source), random); template -class expect_sink : public fg::node, fg::IN::max(), "sink">> { -public: +struct expect_sink : public fg::node> { + fg::IN sink; + void process_one(T value) { std::cout << value << std::endl; } }; +ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (expect_sink), sink); template() * std::declval())> -class scale : public fg::node, fg::IN::max(), "original">, fg::OUT::max(), "scaled">> { -public: +struct scale : public fg::node> { + fg::IN original; + fg::OUT scaled; + template V> [[nodiscard]] constexpr auto process_one(V a) const noexcept { return a * Scale; } }; +ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T, T Scale, typename R), (scale), original, scaled); template() + std::declval())> -class adder : public fg::node, fg::IN::max(), "addend0">, fg::IN::max(), "addend1">, fg::OUT::max(), "sum">> { -public: +struct adder : public fg::node> { + fg::IN addend0; + fg::IN addend1; + fg::OUT sum; + template V> [[nodiscard]] constexpr auto process_one(V a, V b) const noexcept { return a + b; } }; +ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T, typename R), (adder), addend0, addend1, sum); +using fg::port_type_t::STREAM, fg::port_direction_t::INPUT, fg::port_direction_t::OUTPUT; template -class duplicate : public fg::node, fair::meta::typelist::max(), "in">>, fg::repeated_ports> { - using base = fg::node, fair::meta::typelist::max(), "in">>, fg::repeated_ports>; +class duplicate : public fg::node, fair::meta::typelist>, fg::repeated_ports> { + using base = fg::node, fair::meta::typelist>, fg::repeated_ports>; public: using return_type = typename fg::traits::node::return_type; @@ -61,11 +73,12 @@ class duplicate : public fg::node, fair::meta::typelist requires(Depth > 0) -class delay : public fg::node, fg::IN::max(), "in">, fg::OUT::max(), "out">> { +struct delay : public fg::node> { + fg::IN in; + fg::OUT out; std::array buffer = {}; int pos = 0; -public: [[nodiscard]] constexpr T process_one(T in) noexcept { T ret = buffer[pos]; @@ -78,6 +91,7 @@ class delay : public fg::node, fg::IN), in, out); int main() { diff --git a/test/plugins/good_base_plugin.cpp b/test/plugins/good_base_plugin.cpp index f0931776a..934f3120f 100644 --- a/test/plugins/good_base_plugin.cpp +++ b/test/plugins/good_base_plugin.cpp @@ -27,7 +27,7 @@ read_total_count(const fair::graph::property_map ¶ms) { } template -class cout_sink : public fg::node, fg::IN> { +class cout_sink : public fg::node, fg::InNamed> { public: std::size_t total_count = -1_UZ; @@ -45,7 +45,7 @@ class cout_sink : public fg::node, fg::IN> { }; template -class fixed_source : public fg::node, fg::OUT> { +class fixed_source : public fg::node, fg::OutNamed> { public: std::size_t event_count = -1_UZ; // infinite count by default diff --git a/test/plugins/good_math_plugin.cpp b/test/plugins/good_math_plugin.cpp index ad96b0dba..3b7f9d021 100644 --- a/test/plugins/good_math_plugin.cpp +++ b/test/plugins/good_math_plugin.cpp @@ -28,8 +28,8 @@ class math_base { T _factor = static_cast(1.0f); public: - fg::IN in; - fg::OUT out; + fg::IN> in; + fg::OUT> out; math_base() = delete; diff --git a/test/qa_dynamic_node.cpp b/test/qa_dynamic_node.cpp index 3a96bcbe3..3a229b986 100644 --- a/test/qa_dynamic_node.cpp +++ b/test/qa_dynamic_node.cpp @@ -10,7 +10,7 @@ template std::atomic_size_t multi_adder::_unique_id_counter = 0; template -struct fixed_source : public fg::node, fg::OUT> { +struct fixed_source : public fg::node, fg::OutNamed> { T value = 1; fg::work_return_t @@ -30,7 +30,7 @@ struct fixed_source : public fg::node, fg::OUT>); template -struct cout_sink : public fg::node, fg::IN> { +struct cout_sink : public fg::node, fg::InNamed> { std::size_t remaining = 0; void diff --git a/test/qa_dynamic_port.cpp b/test/qa_dynamic_port.cpp index 108ba81cf..2ce8651c7 100644 --- a/test/qa_dynamic_port.cpp +++ b/test/qa_dynamic_port.cpp @@ -103,24 +103,33 @@ const boost::ut::suite PortApiTests = [] { using namespace fair::graph; "PortApi"_test = [] { - static_assert(Port>); - static_assert(Port("in"))>); - static_assert(Port>); - static_assert(Port>); - static_assert(Port>); - - static_assert(IN::static_name() == fixed_string("in")); - static_assert(requires { IN("in").name; }); + static_assert(Port>); + static_assert(Port())>); + static_assert(Port>); + static_assert(Port>); + static_assert(Port>); + + static_assert(Port>); + static_assert(Port("in"))>); + static_assert(Port>); + static_assert(Port>); + static_assert(Port>); + + static_assert(IN>::Required::MinSamples == 1); + static_assert(IN>::Required::MaxSamples == 2); + static_assert(IN::direction() == port_direction_t::INPUT); + static_assert(OUT::direction() == port_direction_t::OUTPUT); }; "PortBufferApi"_test = [] { - OUT::max(), "out0"> output_port; - BufferWriter auto &writer = output_port.streamWriter(); + OUT output_port; + BufferWriter auto &writer = output_port.streamWriter(); // BufferWriter auto &tagWriter = output_port.tagWriter(); expect(ge(writer.available(), 32_UZ)); - IN::max(), "int0"> input_port; - const BufferReader auto &reader = input_port.streamReader(); + using ExplicitUnlimitedSize = RequiredSamples<0, std::numeric_limits::max()>; + IN input_port; + const BufferReader auto &reader = input_port.streamReader(); expect(eq(reader.available(), 0_UZ)); auto buffers = output_port.buffer(); input_port.setBuffer(buffers.streamBuffer, buffers.tagBufferType); @@ -139,9 +148,10 @@ const boost::ut::suite PortApiTests = [] { "RuntimePortApi"_test = [] { // declare in block - OUT::max(), "out"> out; - IN::max(), "in"> in; - std::vector port_list; + using ExplicitUnlimitedSize = RequiredSamples<0, std::numeric_limits::max()>; + OUT out; + IN in; + std::vector port_list; port_list.emplace_back(out); port_list.emplace_back(in); diff --git a/test/qa_hier_node.cpp b/test/qa_hier_node.cpp index efe942a98..366e853ba 100644 --- a/test/qa_hier_node.cpp +++ b/test/qa_hier_node.cpp @@ -6,7 +6,7 @@ namespace fg = fair::graph; template() * std::declval())> -struct scale : public fg::node, fg::IN::max(), "original">, fg::OUT::max(), "scaled">> { +struct scale : public fg::node, fg::InNamed, fg::OutNamed> { template V> [[nodiscard]] constexpr auto process_one(V a) const noexcept { @@ -15,8 +15,8 @@ struct scale : public fg::node, fg::IN() + std::declval())> -struct adder : public fg::node, fg::IN::max(), "addend0">, fg::IN::max(), "addend1">, - fg::OUT::max(), "sum">> { +struct adder : public fg::node, fg::InNamed, fg::InNamed, + fg::OutNamed> { template V> [[nodiscard]] constexpr auto process_one(V a, V b) const noexcept { @@ -136,7 +136,8 @@ template std::atomic_size_t hier_node::_unique_id_counter = 0; template -struct fixed_source : public fg::node, fg::OUT> { +struct fixed_source : public fg::node> { + fg::OUT> out; std::size_t remaining_events_count; T value = 1; @@ -145,8 +146,7 @@ struct fixed_source : public fg::node, fg::OUT(this); - auto &writer = port.streamWriter(); + auto &writer = out.streamWriter(); auto data = writer.reserve_output_range(1_UZ); data[0] = value; data.publish(1_UZ); @@ -165,10 +165,11 @@ struct fixed_source : public fg::node, fg::OUT), remaining_events_count); +ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (fixed_source), out, remaining_events_count); template -struct cout_sink : public fg::node, fg::IN> { +struct cout_sink : public fg::node> { + fg::IN> in; std::size_t remaining = 0; void @@ -180,7 +181,7 @@ struct cout_sink : public fg::node, fg::IN> { } }; -ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (cout_sink), remaining); +ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (cout_sink), in, remaining); fg::graph make_graph(std::size_t events_count) { diff --git a/test/qa_scheduler.cpp b/test/qa_scheduler.cpp index 65a64a285..5be802828 100644 --- a/test/qa_scheduler.cpp +++ b/test/qa_scheduler.cpp @@ -34,7 +34,7 @@ class tracer { // define some example graph nodes template -class count_source : public fg::node, fg::OUT::max(), "out">> { +class count_source : public fg::node, fg::OutNamed> { tracer &_tracer; std::size_t _count = 0; @@ -57,7 +57,7 @@ class count_source : public fg::node, fg::OUT>); template -class expect_sink : public fg::node, fg::IN::max(), "in">> { +class expect_sink : public fg::node, fg::InNamed> { tracer &_tracer; std::int64_t _count = 0; std::function _checker; @@ -84,7 +84,7 @@ class expect_sink : public fg::node, fg::IN() * std::declval())> -class scale : public fg::node, fg::IN::max(), "original">, fg::OUT::max(), "scaled">> { +class scale : public fg::node, fg::InNamed, fg::OutNamed> { tracer &_tracer; public: @@ -99,8 +99,7 @@ class scale : public fg::node, fg::IN() + std::declval())> -class adder : public fg::node, fg::IN::max(), "addend0">, fg::IN::max(), "addend1">, - fg::OUT::max(), "sum">> { +class adder : public fg::node, fg::InNamed, fg::InNamed, fg::OutNamed> { tracer &_tracer; public: