Skip to content

Commit

Permalink
refactored port<T> interface - WIP
Browse files Browse the repository at this point in the history
... as outlined by GR Architecture WG and #148

tackled items:
 * refactored port structure (mandatory enum NTTPs vs. optional type-wrapped arguments)
 * added optional domain argument
 * added default init value (needed for cyclic graphs)
 * add isOptional() annotation

Signed-off-by: Ralph J. Steinhagen <[email protected]>
  • Loading branch information
RalphSteinhagen committed Sep 19, 2023
1 parent 27aac92 commit 86f8565
Show file tree
Hide file tree
Showing 16 changed files with 359 additions and 166 deletions.
12 changes: 6 additions & 6 deletions bench/bm_case1.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ inline constexpr std::size_t N_ITER = 10;
inline constexpr std::size_t N_SAMPLES = gr::util::round_up(10'000, 1024);

template<typename T, char op>
struct math_op : public fg::node<math_op<T, op>, fg::IN<T, 0, N_MAX, "in">, fg::OUT<T, 0, N_MAX, "out">> {
struct math_op : public fg::node<math_op<T, op>, fg::InNamed<T, "in">, fg::OutNamed<T, "out">> {
T factor = static_cast<T>(1.0f);

// public:
Expand Down Expand Up @@ -58,7 +58,7 @@ static_assert(fg::traits::node::can_process_one_simd<multiply<float>>);
#endif

template<typename T, char op>
class math_bulk_op : public fg::node<math_bulk_op<T, op>, fg::IN<T, 0, N_MAX, "in">, fg::OUT<T, 0, N_MAX, "out">> {
class math_bulk_op : public fg::node<math_bulk_op<T, op>, fg::InNamed<T, "in", fg::RequiredSamples<0, N_MAX>>, fg::OutNamed<T, "out", fg::RequiredSamples<0, N_MAX>>> {
T _factor = static_cast<T>(1.0f);

public:
Expand Down Expand Up @@ -183,7 +183,7 @@ static_assert(fg::traits::node::can_process_one_simd<add<float, 1>>);
// It doesn't need to be enabled for reflection.
//
template<typename T, char op>
class gen_operation_SIMD : public fg::node<gen_operation_SIMD<T, op>, fg::IN<T, 0, N_MAX, "in">, fg::OUT<T, 0, N_MAX, "out">> {
class gen_operation_SIMD : public fg::node<gen_operation_SIMD<T, op>, fg::InNamed<T, "in", fg::RequiredSamples<0, N_MAX>>, fg::OutNamed<T, "out", fg::RequiredSamples<0, N_MAX>>> {
T _value = static_cast<T>(1.0f);

public:
Expand Down Expand Up @@ -273,8 +273,8 @@ using add_SIMD = gen_operation_SIMD<T, '+'>;
template<typename T, std::size_t N_MIN = 0, std::size_t N_MAX = N_MAX, bool use_bulk_operation = false, bool use_memcopy = true>
class copy : public fg::node<copy<T, N_MIN, N_MAX, use_bulk_operation, use_memcopy>> {
public:
fg::IN<T, N_MIN, N_MAX> in;
fg::OUT<T, N_MIN, N_MAX> out;
fg::IN<T, fg::RequiredSamples<N_MIN, N_MAX>> in;
fg::OUT<T, fg::RequiredSamples<N_MIN, N_MAX>> out;

template<fair::meta::t_or_simd<T> V>
[[nodiscard]] constexpr V
Expand Down Expand Up @@ -337,7 +337,7 @@ simd_size() noexcept {
namespace stdx = vir::stdx;

template<typename From, typename To, std::size_t N_MIN = 0 /* SIMD size */, std::size_t N_MAX = N_MAX>
class convert : public fg::node<convert<From, To, N_MIN, N_MAX>, fg::IN<From, N_MIN, N_MAX, "in">, fg::OUT<To, N_MIN, N_MAX, "out">> {
class convert : public fg::node<convert<From, To, N_MIN, N_MAX>, fg::InNamed<From, "in", fg::RequiredSamples<N_MIN, N_MAX>>, fg::OutNamed<To, "out", fg::RequiredSamples<N_MIN, N_MAX>>> {
static_assert(stdx::is_simd_v<From> != stdx::is_simd_v<To>, "either input xor output must be SIMD capable");
constexpr static std::size_t from_simd_size = detail::simd_size<From>();
constexpr static std::size_t to_simd_size = detail::simd_size<To>();
Expand Down
2 changes: 1 addition & 1 deletion bench/bm_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ inline constexpr std::size_t N_SAMPLES = gr::util::round_up(10'000'000, 1024);
inline constexpr std::size_t N_NODES = 5;

template<typename T, char op>
class math_op : public fg::node<math_op<T, op>, fg::IN<T, 0, N_MAX, "in">, fg::OUT<T, 0, N_MAX, "out">> {
class math_op : public fg::node<math_op<T, op>, fg::InNamed<T, "in">, fg::OutNamed<T, "out">> {
T _factor = static_cast<T>(1.0f);

public:
Expand Down
2 changes: 1 addition & 1 deletion bench/bm_test_helper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ inline static std::size_t n_samples_consumed = 0_UZ;

template<typename T, std::size_t N_MIN = 0_UZ, std::size_t N_MAX = N_MAX>
struct sink : public fg::node<sink<T, N_MIN, N_MAX>> {
fg::IN<T, N_MIN, N_MAX> in;
fg::IN<T, fg::RequiredSamples<N_MIN, N_MAX>> in;
std::size_t should_receive_n_samples = 0;
int64_t _last_tag_position = -1;

Expand Down
46 changes: 24 additions & 22 deletions include/data_sink.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,31 +144,31 @@ class data_sink_registry {

template<typename T, TriggerMatcher M>
std::shared_ptr<typename data_sink<T>::dataset_poller>
get_trigger_poller(const data_sink_query &query, M&& matcher, std::size_t pre_samples, std::size_t post_samples, blocking_mode block = blocking_mode::Blocking) {
get_trigger_poller(const data_sink_query &query, M &&matcher, std::size_t pre_samples, std::size_t post_samples, blocking_mode block = blocking_mode::Blocking) {
std::lock_guard lg{ _mutex };
auto sink = find_sink<T>(query);
return sink ? sink->get_trigger_poller(std::forward<M>(matcher), pre_samples, post_samples, block) : nullptr;
}

template<typename T, TriggerMatcher M>
std::shared_ptr<typename data_sink<T>::dataset_poller>
get_multiplexed_poller(const data_sink_query &query, M&& matcher, std::size_t maximum_window_size, blocking_mode block = blocking_mode::Blocking) {
get_multiplexed_poller(const data_sink_query &query, M &&matcher, std::size_t maximum_window_size, blocking_mode block = blocking_mode::Blocking) {
std::lock_guard lg{ _mutex };
auto sink = find_sink<T>(query);
return sink ? sink->get_multiplexed_poller(std::forward<M>(matcher), maximum_window_size, block) : nullptr;
}

template<typename T, TriggerMatcher M>
std::shared_ptr<typename data_sink<T>::dataset_poller>
get_snapshot_poller(const data_sink_query &query, M&& matcher, std::chrono::nanoseconds delay, blocking_mode block = blocking_mode::Blocking) {
get_snapshot_poller(const data_sink_query &query, M &&matcher, std::chrono::nanoseconds delay, blocking_mode block = blocking_mode::Blocking) {
std::lock_guard lg{ _mutex };
auto sink = find_sink<T>(query);
return sink ? sink->get_snapshot_poller(std::forward<M>(matcher), delay, block) : nullptr;
}

template<typename T, StreamCallback<T> Callback>
bool
register_streaming_callback(const data_sink_query &query, std::size_t max_chunk_size, Callback&& callback) {
register_streaming_callback(const data_sink_query &query, std::size_t max_chunk_size, Callback &&callback) {
std::lock_guard lg{ _mutex };
auto sink = find_sink<T>(query);
if (!sink) {
Expand All @@ -181,7 +181,7 @@ class data_sink_registry {

template<typename T, DataSetCallback<T> Callback, TriggerMatcher M>
bool
register_trigger_callback(const data_sink_query &query, M&& matcher, std::size_t pre_samples, std::size_t post_samples, Callback&& callback) {
register_trigger_callback(const data_sink_query &query, M &&matcher, std::size_t pre_samples, std::size_t post_samples, Callback &&callback) {
std::lock_guard lg{ _mutex };
auto sink = find_sink<T>(query);
if (!sink) {
Expand All @@ -194,7 +194,7 @@ class data_sink_registry {

template<typename T, DataSetCallback<T> Callback, TriggerMatcher M>
bool
register_multiplexed_callback(const data_sink_query &query, M&& matcher, std::size_t maximum_window_size, Callback&& callback) {
register_multiplexed_callback(const data_sink_query &query, M &&matcher, std::size_t maximum_window_size, Callback &&callback) {
std::lock_guard lg{ _mutex };
auto sink = find_sink<T>(query);
if (!sink) {
Expand All @@ -207,7 +207,7 @@ class data_sink_registry {

template<typename T, DataSetCallback<T> Callback, TriggerMatcher M>
bool
register_snapshot_callback(const data_sink_query &query, M&& matcher, std::chrono::nanoseconds delay, Callback&& callback) {
register_snapshot_callback(const data_sink_query &query, M &&matcher, std::chrono::nanoseconds delay, Callback &&callback) {
std::lock_guard lg{ _mutex };
auto sink = find_sink<T>(query);
if (!sink) {
Expand Down Expand Up @@ -319,7 +319,7 @@ class data_sink : public node<data_sink<T>> {
Annotated<T, "signal min", Doc<"signal physical min. (e.g. DAQ) limit">> signal_min = std::numeric_limits<T>::lowest();
Annotated<T, "signal max", Doc<"signal physical max. (e.g. DAQ) limit">> signal_max = std::numeric_limits<T>::max();

IN<T, std::dynamic_extent, _listener_buffer_size> in;
IN<T, RequiredSamples<std::dynamic_extent, _listener_buffer_size>> in;

struct poller {
// TODO consider whether reusing port<T> here makes sense
Expand Down Expand Up @@ -409,7 +409,7 @@ class data_sink : public node<data_sink<T>> {

template<TriggerMatcher M>
std::shared_ptr<dataset_poller>
get_trigger_poller(M&& matcher, std::size_t pre_samples, std::size_t post_samples, blocking_mode block_mode = blocking_mode::Blocking) {
get_trigger_poller(M &&matcher, std::size_t pre_samples, std::size_t post_samples, blocking_mode block_mode = blocking_mode::Blocking) {
const auto block = block_mode == blocking_mode::Blocking;
auto handler = std::make_shared<dataset_poller>();
std::lock_guard lg(_listener_mutex);
Expand All @@ -420,7 +420,7 @@ class data_sink : public node<data_sink<T>> {

template<TriggerMatcher M>
std::shared_ptr<dataset_poller>
get_multiplexed_poller(M&& matcher, std::size_t maximum_window_size, blocking_mode block_mode = blocking_mode::Blocking) {
get_multiplexed_poller(M &&matcher, std::size_t maximum_window_size, blocking_mode block_mode = blocking_mode::Blocking) {
std::lock_guard lg(_listener_mutex);
const auto block = block_mode == blocking_mode::Blocking;
auto handler = std::make_shared<dataset_poller>();
Expand All @@ -430,7 +430,7 @@ class data_sink : public node<data_sink<T>> {

template<TriggerMatcher M>
std::shared_ptr<dataset_poller>
get_snapshot_poller(M&& matcher, std::chrono::nanoseconds delay, blocking_mode block_mode = blocking_mode::Blocking) {
get_snapshot_poller(M &&matcher, std::chrono::nanoseconds delay, blocking_mode block_mode = blocking_mode::Blocking) {
const auto block = block_mode == blocking_mode::Blocking;
auto handler = std::make_shared<dataset_poller>();
std::lock_guard lg(_listener_mutex);
Expand All @@ -440,27 +440,27 @@ class data_sink : public node<data_sink<T>> {

template<StreamCallback<T> Callback>
void
register_streaming_callback(std::size_t max_chunk_size, Callback&& callback) {
register_streaming_callback(std::size_t max_chunk_size, Callback &&callback) {
add_listener(std::make_unique<continuous_listener<Callback>>(max_chunk_size, std::forward<Callback>(callback), *this), false);
}

template<TriggerMatcher M, DataSetCallback<T> Callback>
void
register_trigger_callback(M&& matcher, std::size_t pre_samples, std::size_t post_samples, Callback&& callback) {
register_trigger_callback(M &&matcher, std::size_t pre_samples, std::size_t post_samples, Callback &&callback) {
add_listener(std::make_unique<trigger_listener<Callback, M>>(std::forward<M>(matcher), pre_samples, post_samples, std::forward<Callback>(callback)), false);
ensure_history_size(pre_samples);
}

template<TriggerMatcher M, DataSetCallback<T> Callback>
void
register_multiplexed_callback(M&& matcher, std::size_t maximum_window_size, Callback&& callback) {
register_multiplexed_callback(M &&matcher, std::size_t maximum_window_size, Callback &&callback) {
std::lock_guard lg(_listener_mutex);
add_listener(std::make_unique<multiplexed_listener<Callback, M>>(std::forward<M>(matcher), maximum_window_size, std::forward<Callback>(callback)), false);
}

template<TriggerMatcher M, DataSetCallback<T> Callback>
void
register_snapshot_callback(M&& matcher, std::chrono::nanoseconds delay, Callback&& callback) {
register_snapshot_callback(M &&matcher, std::chrono::nanoseconds delay, Callback &&callback) {
std::lock_guard lg(_listener_mutex);
add_listener(std::make_unique<snapshot_listener<Callback, M>>(std::forward<M>(matcher), delay, std::forward<Callback>(callback)), false);
}
Expand Down Expand Up @@ -634,7 +634,7 @@ class data_sink : public node<data_sink<T>> {
Callback callback;

template<typename CallbackFW>
explicit continuous_listener(std::size_t max_chunk_size, CallbackFW&& c, const data_sink<T> &parent) : parent_sink(parent), buffer(max_chunk_size), callback{ std::forward<CallbackFW>(c) } {}
explicit continuous_listener(std::size_t max_chunk_size, CallbackFW &&c, const data_sink<T> &parent) : parent_sink(parent), buffer(max_chunk_size), callback{ std::forward<CallbackFW>(c) } {}

explicit continuous_listener(std::shared_ptr<poller> poller, bool do_block, const data_sink<T> &parent) : parent_sink(parent), block(do_block), polling_handler{ std::move(poller) } {}

Expand Down Expand Up @@ -760,11 +760,12 @@ class data_sink : public node<data_sink<T>> {
Callback callback;

template<TriggerMatcher Matcher>
explicit trigger_listener(Matcher&& matcher, std::shared_ptr<dataset_poller> handler, std::size_t pre, std::size_t post, bool do_block)
explicit trigger_listener(Matcher &&matcher, std::shared_ptr<dataset_poller> handler, std::size_t pre, std::size_t post, bool do_block)
: block(do_block), pre_samples(pre), post_samples(post), trigger_matcher(std::forward<Matcher>(matcher)), polling_handler{ std::move(handler) } {}

template<typename CallbackFW, TriggerMatcher Matcher>
explicit trigger_listener(Matcher&& matcher, std::size_t pre, std::size_t post, CallbackFW&& cb) : pre_samples(pre), post_samples(post), trigger_matcher(std::forward<Matcher>(matcher)), callback{ std::forward<CallbackFW>(cb) } {}
explicit trigger_listener(Matcher &&matcher, std::size_t pre, std::size_t post, CallbackFW &&cb)
: pre_samples(pre), post_samples(post), trigger_matcher(std::forward<Matcher>(matcher)), callback{ std::forward<CallbackFW>(cb) } {}

void
set_dataset_template(DataSet<T> dst) override {
Expand Down Expand Up @@ -850,11 +851,11 @@ class data_sink : public node<data_sink<T>> {
Callback callback;

template<typename CallbackFW, TriggerMatcher Matcher>
explicit multiplexed_listener(Matcher&& matcher_, std::size_t max_window_size, CallbackFW&& cb)
explicit multiplexed_listener(Matcher &&matcher_, std::size_t max_window_size, CallbackFW &&cb)
: matcher(std::forward<Matcher>(matcher_)), maximum_window_size(max_window_size), callback(std::forward<CallbackFW>(cb)) {}

template<TriggerMatcher Matcher>
explicit multiplexed_listener(Matcher&& matcher_, std::size_t max_window_size, std::shared_ptr<dataset_poller> handler, bool do_block)
explicit multiplexed_listener(Matcher &&matcher_, std::size_t max_window_size, std::shared_ptr<dataset_poller> handler, bool do_block)
: block(do_block), matcher(std::forward<Matcher>(matcher_)), maximum_window_size(max_window_size), polling_handler{ std::move(handler) } {}

void
Expand Down Expand Up @@ -949,11 +950,12 @@ class data_sink : public node<data_sink<T>> {
Callback callback;

template<TriggerMatcher Matcher>
explicit snapshot_listener(Matcher&& matcher, std::chrono::nanoseconds delay, std::shared_ptr<dataset_poller> poller, bool do_block)
explicit snapshot_listener(Matcher &&matcher, std::chrono::nanoseconds delay, std::shared_ptr<dataset_poller> poller, bool do_block)
: block(do_block), time_delay(delay), trigger_matcher(std::forward<Matcher>(matcher)), polling_handler{ std::move(poller) } {}

template<typename CallbackFW, TriggerMatcher Matcher>
explicit snapshot_listener(Matcher&& matcher, std::chrono::nanoseconds delay, CallbackFW&& cb) : time_delay(delay), trigger_matcher(std::forward<Matcher>(matcher)), callback(std::forward<CallbackFW>(cb)) {}
explicit snapshot_listener(Matcher &&matcher, std::chrono::nanoseconds delay, CallbackFW &&cb)
: time_delay(delay), trigger_matcher(std::forward<Matcher>(matcher)), callback(std::forward<CallbackFW>(cb)) {}

void
set_dataset_template(DataSet<T> dst) override {
Expand Down
5 changes: 3 additions & 2 deletions include/graph.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -738,9 +738,10 @@ operator<<(std::ostream &os, const port_direction_t &value) {
return os << static_cast<int>(value);
}

template<PortDomain T>
inline std::ostream &
operator<<(std::ostream &os, const port_domain_t &value) {
return os << static_cast<int>(value);
operator<<(std::ostream &os, const T &value) {
return os << value.Name;
}

#if HAVE_SOURCE_LOCATION
Expand Down
19 changes: 17 additions & 2 deletions include/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1320,16 +1320,31 @@ merge(A &&a, B &&b) {
}

#if !DISABLE_SIMD
namespace test {
struct copy : public node<copy, IN<float, 0, std::numeric_limits<std::size_t>::max(), "in">, OUT<float, 0, std::numeric_limits<std::size_t>::max(), "out">> {
namespace test { // TODO: move to dedicated tests

struct copy : public node<copy> {
IN<float> in;
OUT<float> out;

public:
template<meta::t_or_simd<float> V>
[[nodiscard]] constexpr V
process_one(const V &a) const noexcept {
return a;
}
};
} // namespace test
#endif
} // namespace fair::graph

#if !DISABLE_SIMD
ENABLE_REFLECTION(fair::graph::test::copy, in, out);
#endif

namespace fair::graph {

#if !DISABLE_SIMD
namespace test {
static_assert(traits::node::input_port_types<copy>::size() == 1);
static_assert(std::same_as<traits::node::return_type<copy>, float>);
static_assert(traits::node::can_process_one_scalar<copy>);
Expand Down
Loading

0 comments on commit 86f8565

Please sign in to comment.