Wrapping up the prototype for the new GR4 graph #25

Merged 4 commits on Feb 1, 2023
72 changes: 42 additions & 30 deletions bench/bm_case1.cpp
@@ -52,7 +52,7 @@ class copy : public fg::node<copy<T, N_MIN, N_MAX, use_bulk_operation, use_memco
return a;
}

fair::graph::work_result
fair::graph::work_return_t
work() noexcept { // TODO - make this an alternate version to 'process_one'
auto &out_port = output_port<"out">(this);
auto &in_port = input_port<"in">(this);
@@ -62,9 +62,9 @@ class copy : public fg::node<copy<T, N_MIN, N_MAX, use_bulk_operation, use_memco
const auto n_readable = std::min(reader.available(), in_port.max_buffer_size());
const auto n_writable = std::min(writer.available(), out_port.max_buffer_size());
if (n_readable == 0) {
return fair::graph::work_result::inputs_empty;
return fair::graph::work_return_t::DONE;
} else if (n_writable == 0) {
return fair::graph::work_result::writers_not_available;
return fair::graph::work_return_t::INSUFFICIENT_OUTPUT_ITEMS;
}
const std::size_t n_to_publish = std::min(n_readable, n_writable);

@@ -83,9 +83,9 @@ class copy : public fg::node<copy<T, N_MIN, N_MAX, use_bulk_operation, use_memco
n_to_publish);
}
if (!reader.consume(n_to_publish)) {
return fair::graph::work_result::error;
return fair::graph::work_return_t::ERROR;
}
return fair::graph::work_result::success;
return fair::graph::work_return_t::OK;
}
};

@@ -274,12 +274,14 @@ inline const boost::ut::suite _runtime_tests = [] {
fg::graph flow_graph;
flow_graph.register_node(src);
flow_graph.register_node(sink);
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out", "in">(src, sink)));
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(src).to<"in">(sink)));

"runtime src->sink overhead"_benchmark.repeat<N_ITER>(N_SAMPLES) = [&flow_graph]() {
test::n_samples_produced = 0LU;
test::n_samples_consumed = 0LU;
flow_graph.work();
auto token = flow_graph.init();
expect(token);
flow_graph.work(token);
expect(eq(test::n_samples_produced, N_SAMPLES)) << "did not produce enough output samples";
expect(eq(test::n_samples_consumed, N_SAMPLES)) << "did not consume enough input samples";
};
@@ -294,12 +296,14 @@ inline const boost::ut::suite _runtime_tests = [] {
flow_graph.register_node(src);
flow_graph.register_node(cpy);
flow_graph.register_node(sink);
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out", "in">(src, cpy)));
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out", "in">(cpy, sink)));
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(src).to<"in">(cpy)));
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(cpy).to<"in">(sink)));
"runtime src->copy->sink"_benchmark.repeat<N_ITER>(N_SAMPLES) = [&flow_graph]() {
test::n_samples_produced = 0LU;
test::n_samples_consumed = 0LU;
flow_graph.work();
auto token = flow_graph.init();
expect(token);
flow_graph.work(token);
expect(eq(test::n_samples_produced, N_SAMPLES)) << "did not produce enough output samples";
expect(eq(test::n_samples_consumed, N_SAMPLES)) << "did not consume enough input samples";
};
@@ -319,18 +323,20 @@ inline const boost::ut::suite _runtime_tests = [] {
flow_graph.register_node(cpy[i]);

if (i == 0) {
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out", "in">(src, cpy[i])));
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(src).to<"in">(cpy[i])));
} else {
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out", "in">(cpy[i - 1], cpy[i])));
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(cpy[i - 1]).to<"in">(cpy[i])));
}
}

expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out", "in">(cpy[cpy.size() - 1], sink)));
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(cpy[cpy.size() - 1]).to<"in">(sink)));

"runtime src->copy^10->sink"_benchmark.repeat<N_ITER>(N_SAMPLES) = [&flow_graph]() {
test::n_samples_produced = 0LU;
test::n_samples_consumed = 0LU;
flow_graph.work();
auto token = flow_graph.init();
expect(token);
flow_graph.work(token);
expect(eq(test::n_samples_produced, N_SAMPLES)) << "did not produce enough output samples";
expect(eq(test::n_samples_consumed, N_SAMPLES)) << "did not consume enough input samples";
};
@@ -393,14 +399,16 @@ inline const boost::ut::suite _runtime_tests = [] {
flow_graph.register_node(b2);
flow_graph.register_node(b3);
flow_graph.register_node(sink);
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out", "in">(src, b1)));
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out", "in">(b1, b2)));
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out", "in">(b2, b3)));
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out", "in">(b3, sink)));
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(src).to<"in">(b1)));
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(b1).to<"in">(b2)));
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(b2).to<"in">(b3)));
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(b3).to<"in">(sink)));
"runtime src(N=1024)->b1(N≤128)->b2(N=1024)->b3(N=32...128)->sink"_benchmark.repeat<N_ITER>(N_SAMPLES) = [&flow_graph]() {
test::n_samples_produced = 0LU;
test::n_samples_consumed = 0LU;
flow_graph.work();
auto token = flow_graph.init();
expect(token);
flow_graph.work(token);
expect(eq(test::n_samples_produced, N_SAMPLES)) << "did not produce enough output samples";
expect(eq(test::n_samples_consumed, N_SAMPLES)) << "did not consume enough input samples";
};
@@ -419,14 +427,16 @@ inline const boost::ut::suite _runtime_tests = [] {
flow_graph.register_node(mult2);
flow_graph.register_node(add1);
flow_graph.register_node(sink);
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out", "in">(src, mult1)));
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out", "in">(mult1, mult2)));
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out", "in">(mult2, add1)));
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out", "in">(add1, sink)));
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(src).to<"in">(mult1)));
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(mult1).to<"in">(mult2)));
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(mult2).to<"in">(add1)));
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(add1).to<"in">(sink)));
"runtime src->mult(2.0)->mult(0.5)->add(-1)->sink"_benchmark.repeat<N_ITER>(N_SAMPLES) = [&flow_graph]() {
test::n_samples_produced = 0LU;
test::n_samples_consumed = 0LU;
flow_graph.work();
auto token = flow_graph.init();
expect(token);
flow_graph.work(token);
expect(eq(test::n_samples_produced, N_SAMPLES)) << "did not produce enough output samples";
expect(eq(test::n_samples_consumed, N_SAMPLES)) << "did not consume enough input samples";
};
@@ -457,19 +467,21 @@ inline const boost::ut::suite _runtime_tests = [] {

for (std::size_t i = 0; i < add1.size(); i++) {
if (i == 0) {
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out", "in">(src, mult1[i])));
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(src).to<"in">(mult1[i])));
} else {
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out", "in">(add1[i - 1], mult1[i])));
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(add1[i - 1]).to<"in">(mult1[i])));
}
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out", "in">(mult1[i], mult2[i])));
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out", "in">(mult2[i], add1[i])));
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(mult1[i]).to<"in">(mult2[i])));
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(mult2[i]).to<"in">(add1[i])));
}
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out", "in">(add1[add1.size() - 1], sink)));
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(add1[add1.size() - 1]).to<"in">(sink)));

"runtime src->(mult(2.0)->mult(0.5)->add(-1))^10->sink"_benchmark.repeat<N_ITER>(N_SAMPLES) = [&flow_graph]() {
test::n_samples_produced = 0LU;
test::n_samples_consumed = 0LU;
flow_graph.work();
auto token = flow_graph.init();
expect(token);
flow_graph.work(token);
expect(eq(test::n_samples_produced, N_SAMPLES)) << "did not produce enough output samples";
expect(eq(test::n_samples_consumed, N_SAMPLES)) << "did not consume enough input samples";
};
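
Note on the pattern used throughout the updated benchmarks above: register the nodes, wire them with the chained connect<"out">(node).to<"in">(next) call that replaces connect<"out", "in">(a, b), obtain a token from flow_graph.init(), and pass that token to flow_graph.work(token). The sketch below condenses that flow into one place. It is a minimal illustration based only on what is visible in this diff; the fg alias is taken from bm_case1.cpp, the SourceNode/SinkNode placeholders and the run_once() helper are assumptions, and exact signatures in the prototype may differ.

// Minimal wiring sketch distilled from the benchmarks above (illustration, not part of the diff).
// Assumes `namespace fg = fair::graph;` as in bm_case1.cpp; SourceNode/SinkNode are placeholders
// for any node types exposing an "out" and an "in" port.
template<typename SourceNode, typename SinkNode>
void run_once(SourceNode &src, SinkNode &sink) {
    fg::graph flow_graph;
    flow_graph.register_node(src);
    flow_graph.register_node(sink);

    // Chained connect API introduced in this PR: connect<"out">(src).to<"in">(sink).
    if (flow_graph.connect<"out">(src).to<"in">(sink) != fg::connection_result_t::SUCCESS) {
        return; // wiring failed
    }

    // work() now requires the token returned by init() instead of being called bare.
    auto token = flow_graph.init();
    if (!token) {
        return; // initialisation failed
    }
    flow_graph.work(token);
}

The benchmarks use exactly this sequence inside each repeat() lambda, checking the connection result and the token with expect() instead of early returns.
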
13 changes: 6 additions & 7 deletions bench/bm_test_helper.hpp
@@ -1,9 +1,8 @@
#ifndef GRAPH_PROTOTYPE_BM_TEST_HELPER_HPP
#define GRAPH_PROTOTYPE_BM_TEST_HELPER_HPP

#ifndef GNURADIO_GRAPH_HPP
#include <graph.hpp>
#endif
#include <merged_node.hpp>

inline constexpr std::size_t N_MAX = std::numeric_limits<std::size_t>::max();

@@ -26,7 +25,7 @@ class source : public fg::node<source<T, min, count>, fg::OUT<T, "out">, fg::lim
return T{};
}

fair::graph::work_result
fair::graph::work_return_t
work() {
const std::size_t n_to_publish = _n_samples_max - n_samples_produced;
if (n_to_publish > 0) {
@@ -36,7 +35,7 @@ class source : public fg::node<source<T, min, count>, fg::OUT<T, "out">, fg::lim
if constexpr (use_bulk_operation) {
std::size_t n_write = std::clamp(n_to_publish, 0UL, std::min(writer.available(), port.max_buffer_size()));
if (n_write == 0) {
return fair::graph::work_result::has_unprocessed_data;
return fair::graph::work_return_t::INSUFFICIENT_INPUT_ITEMS;
}

writer.publish( //
@@ -49,14 +48,14 @@ class source : public fg::node<source<T, min, count>, fg::OUT<T, "out">, fg::lim
} else {
auto [data, token] = writer.get(1);
if (data.size() == 0) {
return fair::graph::work_result::error;
return fair::graph::work_return_t::ERROR;
}
data[0] = process_one();
writer.publish(token, 1);
}
return fair::graph::work_result::success;
return fair::graph::work_return_t::OK;
} else {
return fair::graph::work_result::inputs_empty;
return fair::graph::work_return_t::DONE;
}
}
};
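
The bm_test_helper.hpp changes mirror the rename in bm_case1.cpp: work() now returns fair::graph::work_return_t, and the old work_result values (success, inputs_empty, error, has_unprocessed_data, writers_not_available) map onto OK, DONE, ERROR, INSUFFICIENT_INPUT_ITEMS and INSUFFICIENT_OUTPUT_ITEMS at the corresponding call sites. The skeleton below, condensed from the copy node in bm_case1.cpp, shows where each of the new status codes is returned; the enclosing node class is omitted, and the reader/writer accessors are assumptions since the lines obtaining them are collapsed in the diff.

// Condensed work() skeleton using the renamed status codes (sketch, not part of the diff).
// Port accessors follow the copy<> node above; reader()/writer() are assumed accessors,
// as the original lines obtaining `reader` and `writer` are collapsed in the diff view.
fair::graph::work_return_t
work() noexcept {
    auto &out_port = output_port<"out">(this);
    auto &in_port  = input_port<"in">(this);
    auto &reader   = in_port.reader();
    auto &writer   = out_port.writer();

    const auto n_readable = std::min(reader.available(), in_port.max_buffer_size());
    const auto n_writable = std::min(writer.available(), out_port.max_buffer_size());
    if (n_readable == 0) {
        return fair::graph::work_return_t::DONE;                      // no input pending
    } else if (n_writable == 0) {
        return fair::graph::work_return_t::INSUFFICIENT_OUTPUT_ITEMS; // downstream buffer full
    }

    const std::size_t n = std::min(n_readable, n_writable);
    // ... copy or process n samples and publish them via writer ...

    if (!reader.consume(n)) {
        return fair::graph::work_return_t::ERROR;                     // could not advance the reader
    }
    return fair::graph::work_return_t::OK;
}
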