From 4eee2311d58574d4c317fa3e8e0b42801b908c27 Mon Sep 17 00:00:00 2001 From: Pavel Siska Date: Sun, 24 Jul 2022 22:33:39 +0200 Subject: [PATCH 1/5] Removed memcpy of packet in parser. --- include/ipfixprobe/packet.hpp | 5 +++-- input/parser.cpp | 10 ++-------- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/include/ipfixprobe/packet.hpp b/include/ipfixprobe/packet.hpp index 9c7fd6a1..da77906f 100644 --- a/include/ipfixprobe/packet.hpp +++ b/include/ipfixprobe/packet.hpp @@ -84,17 +84,18 @@ struct Packet : public Record { uint32_t tcp_seq; uint32_t tcp_ack; - uint8_t *packet; /**< Pointer to begin of packet, if available */ + const uint8_t *packet; /**< Pointer to begin of packet, if available */ uint16_t packet_len; /**< Length of data in packet buffer, packet_len <= packet_len_wire */ uint16_t packet_len_wire; /**< Original packet length on wire */ - uint8_t *payload; /**< Pointer to begin of payload, if available */ + const uint8_t *payload; /**< Pointer to begin of payload, if available */ uint16_t payload_len; /**< Length of data in payload buffer, payload_len <= payload_len_wire */ uint16_t payload_len_wire; /**< Original payload length computed from headers */ uint8_t *custom; /**< Pointer to begin of custom data, if available */ uint16_t custom_len; /**< Length of data in custom buffer */ + // TODO REMOVE uint8_t *buffer; /**< Buffer for packet, payload and custom data */ uint16_t buffer_size; /**< Size of buffer */ diff --git a/input/parser.cpp b/input/parser.cpp index e7c5bd31..df5e7088 100644 --- a/input/parser.cpp +++ b/input/parser.cpp @@ -697,14 +697,8 @@ void parse_packet(parser_opt_t *opt, struct timeval ts, const uint8_t *data, uin } uint16_t pkt_len = caplen; - if ((int) pkt_len > pkt->buffer_size - 1) { - pkt_len = pkt->buffer_size - 1; - DEBUG_MSG("Packet size too long, truncating to %u\n", pkt_len); - } - pkt->packet = pkt->buffer; - memcpy(pkt->packet, data, pkt_len); - pkt->packet[pkt_len] = 0; - pkt->packet_len = pkt_len; + pkt->packet = data; + pkt->packet_len = caplen; if (l4_hdr_offset != l3_hdr_offset) { if (l4_hdr_offset + pkt->ip_payload_len < 64) { From 942164d0bebef8fb095902fd41bfe6f4cdcc8c37 Mon Sep 17 00:00:00 2001 From: Pavel Siska Date: Sun, 24 Jul 2022 22:42:08 +0200 Subject: [PATCH 2/5] Changed allocation of PacketBlock structure. --- include/ipfixprobe/packet.hpp | 10 ++++++-- ipfixprobe.cpp | 48 ++++++++--------------------------- ipfixprobe.hpp | 17 ++----------- workers.cpp | 25 +++++++++--------- workers.hpp | 2 +- 5 files changed, 34 insertions(+), 68 deletions(-) diff --git a/include/ipfixprobe/packet.hpp b/include/ipfixprobe/packet.hpp index da77906f..52fe92e2 100644 --- a/include/ipfixprobe/packet.hpp +++ b/include/ipfixprobe/packet.hpp @@ -126,9 +126,15 @@ struct PacketBlock { size_t bytes; size_t size; - PacketBlock() : - pkts(nullptr), cnt(0), bytes(0), size(0) + PacketBlock(size_t pkts_size) : + cnt(0), bytes(0), size(pkts_size) { + pkts = new Packet[pkts_size]; + } + + ~PacketBlock() + { + delete[] pkts; } }; diff --git a/ipfixprobe.cpp b/ipfixprobe.cpp index 3f04e81c..1dc4a1e4 100644 --- a/ipfixprobe.cpp +++ b/ipfixprobe.cpp @@ -73,7 +73,6 @@ volatile sig_atomic_t terminate_export = 0; volatile sig_atomic_t terminate_input = 0; const uint32_t DEFAULT_IQUEUE_SIZE = 64; -const uint32_t DEFAULT_IQUEUE_BLOCK = 32; const uint32_t DEFAULT_OQUEUE_SIZE = 16536; const uint32_t DEFAULT_FPS = 0; // unlimited @@ -159,29 +158,6 @@ void print_help(ipxp_conf_t &conf, const std::string &arg) } } -void init_packets(ipxp_conf_t &conf) -{ - // Reserve +1 more block as a "working block" - conf.blocks_cnt = static_cast(conf.iqueue_size + 1U) * conf.worker_cnt; - conf.pkts_cnt = conf.blocks_cnt * conf.iqueue_block; - conf.pkt_data_cnt = conf.pkts_cnt * conf.pkt_bufsize; - conf.blocks = new PacketBlock[conf.blocks_cnt]; - conf.pkts = new Packet[conf.pkts_cnt]; - conf.pkt_data = new uint8_t[conf.pkt_data_cnt]; - - for (unsigned i = 0; i < conf.blocks_cnt; i++) { - size_t pkts_offset = static_cast(i) * conf.iqueue_block; // offset in number of packets - - conf.blocks[i].pkts = conf.pkts + pkts_offset; - conf.blocks[i].cnt = 0; - conf.blocks[i].size = conf.iqueue_block; - for (unsigned j = 0; j < conf.iqueue_block; j++) { - conf.blocks[i].pkts[j].buffer = static_cast(conf.pkt_data + conf.pkt_bufsize * (j + pkts_offset)); - conf.blocks[i].pkts[j].buffer_size = conf.pkt_bufsize; - } - } -} - void process_plugin_argline(const std::string &args, std::string &plugin, std::string ¶ms) { size_t delim; @@ -356,17 +332,17 @@ bool process_plugin_args(ipxp_conf_t &conf, IpfixprobeOptParser &parser) conf.input_stats.push_back(input_stats); WorkPipeline tmp = { - { - input_plugin, - new std::thread(input_storage_worker, input_plugin, storage_plugin, &conf.blocks[pipeline_idx * (conf.iqueue_size + 1)], - conf.iqueue_size + 1, conf.max_pkts, input_res, input_stats), - input_res, - input_stats - }, - { - storage_plugin, - storage_process_plugins - } + { + input_plugin, + new std::thread(input_storage_worker, input_plugin, storage_plugin, conf.iqueue_size, + conf.max_pkts, input_res, input_stats), + input_res, + input_stats + }, + { + storage_plugin, + storage_process_plugins + } }; conf.pipelines.push_back(tmp); pipeline_idx++; @@ -646,7 +622,6 @@ int run(int argc, char *argv[]) } conf.worker_cnt = parser.m_input.size(); - conf.iqueue_block = parser.m_iqueue_block; conf.iqueue_size = parser.m_iqueue; conf.oqueue_size = parser.m_oqueue; conf.fps = parser.m_fps; @@ -654,7 +629,6 @@ int run(int argc, char *argv[]) conf.max_pkts = parser.m_max_pkts; try { - init_packets(conf); if (process_plugin_args(conf, parser)) { goto EXIT; } diff --git a/ipfixprobe.hpp b/ipfixprobe.hpp index 1516edaa..71a1f7a3 100644 --- a/ipfixprobe.hpp +++ b/ipfixprobe.hpp @@ -66,7 +66,6 @@ namespace ipxp { extern const uint32_t DEFAULT_IQUEUE_SIZE; -extern const uint32_t DEFAULT_IQUEUE_BLOCK; extern const uint32_t DEFAULT_OQUEUE_SIZE; extern const uint32_t DEFAULT_FPS; @@ -95,7 +94,6 @@ class IpfixprobeOptParser : public OptionsParser { std::string m_pid; bool m_daemon; uint32_t m_iqueue; - uint32_t m_iqueue_block; uint32_t m_oqueue; uint32_t m_fps; uint32_t m_pkt_bufsize; @@ -106,7 +104,7 @@ class IpfixprobeOptParser : public OptionsParser { IpfixprobeOptParser() : OptionsParser("ipfixprobe", "flow exporter supporting various custom IPFIX elements"), m_pid(""), m_daemon(false), - m_iqueue(DEFAULT_IQUEUE_SIZE), m_iqueue_block(DEFAULT_IQUEUE_BLOCK), m_oqueue(DEFAULT_OQUEUE_SIZE), m_fps(DEFAULT_FPS), + m_iqueue(DEFAULT_IQUEUE_SIZE), m_oqueue(DEFAULT_OQUEUE_SIZE), m_fps(DEFAULT_FPS), m_pkt_bufsize(1600), m_max_pkts(0), m_help(false), m_help_str(""), m_version(false) { m_delim = ' '; @@ -137,12 +135,6 @@ class IpfixprobeOptParser : public OptionsParser { std::invalid_argument &e) { return false; } return true; }, OptionFlags::RequiredArgument); - register_option("-b", "--iqueueb", "SIZE", "Size of input queue packet block", - [this](const char *arg) { - try { m_iqueue_block = str2num(arg); } catch ( - std::invalid_argument &e) { return false; } - return true; - }, OptionFlags::RequiredArgument); register_option("-Q", "--oqueue", "SIZE", "Size of queue between storage and output plugins", [this](const char *arg) { try { m_oqueue = str2num(arg); } catch ( @@ -189,7 +181,6 @@ class IpfixprobeOptParser : public OptionsParser { struct ipxp_conf_t { uint32_t iqueue_size; - uint32_t iqueue_block; uint32_t oqueue_size; uint32_t worker_cnt; uint32_t fps; @@ -222,7 +213,7 @@ struct ipxp_conf_t { Packet *pkts; uint8_t *pkt_data; - ipxp_conf_t() : iqueue_size(DEFAULT_IQUEUE_SIZE), iqueue_block(DEFAULT_IQUEUE_BLOCK), + ipxp_conf_t() : iqueue_size(DEFAULT_IQUEUE_SIZE), oqueue_size(DEFAULT_OQUEUE_SIZE), worker_cnt(0), fps(0), max_pkts(0), pkt_bufsize(1600), blocks_cnt(0), pkts_cnt(0), pkt_data_cnt(0), blocks(nullptr), pkts(nullptr), pkt_data(nullptr) @@ -268,10 +259,6 @@ struct ipxp_conf_t { for (auto &it : output_stats) { delete it; } - - delete[] pkts; - delete[] blocks; - delete[] pkt_data; } }; diff --git a/workers.cpp b/workers.cpp index c38a5f58..cae9e77f 100644 --- a/workers.cpp +++ b/workers.cpp @@ -51,7 +51,7 @@ namespace ipxp { #define MICRO_SEC 1000000L -void input_storage_worker(InputPlugin *plugin, StoragePlugin *cache, PacketBlock *pkts, size_t block_cnt, uint64_t pkt_limit, +void input_storage_worker(InputPlugin *plugin, StoragePlugin *cache, size_t queue_size, uint64_t pkt_limit, std::promise *out, std::atomic *out_stats) { struct timespec start_cache; @@ -60,11 +60,12 @@ void input_storage_worker(InputPlugin *plugin, StoragePlugin *cache, PacketBlock struct timespec end = {0, 0}; struct timeval ts = {0, 0}; bool timeout = false; - size_t i = 0; InputPlugin::Result ret; InputStats stats = {0, 0, 0, 0, 0}; WorkerResult res = {false, ""}; + PacketBlock block(queue_size); + #ifdef __linux__ const clockid_t clk_id = CLOCK_MONOTONIC_COARSE; #else @@ -72,18 +73,17 @@ void input_storage_worker(InputPlugin *plugin, StoragePlugin *cache, PacketBlock #endif while (!terminate_input) { - PacketBlock *block = &pkts[i]; - block->cnt = 0; - block->bytes = 0; + block.cnt = 0; + block.bytes = 0; - if (pkt_limit && plugin->m_parsed + block->size >= pkt_limit) { + if (pkt_limit && plugin->m_parsed + block.size >= pkt_limit) { if (plugin->m_parsed >= pkt_limit) { break; } - block->size = pkt_limit - plugin->m_parsed; + block.size = pkt_limit - plugin->m_parsed; } try { - ret = plugin->get(*block); + ret = plugin->get(block); } catch (PluginError &e) { res.error = true; res.msg = e.what(); @@ -107,13 +107,13 @@ void input_storage_worker(InputPlugin *plugin, StoragePlugin *cache, PacketBlock stats.packets = plugin->m_seen; stats.parsed = plugin->m_parsed; stats.dropped = plugin->m_dropped; - stats.bytes += block->bytes; + stats.bytes += block.bytes; clock_gettime(clk_id, &start_cache); try { - for (unsigned i = 0; i < block->cnt; i++) { - cache->put_pkt(block->pkts[i]); + for (unsigned i = 0; i < block.cnt; i++) { + cache->put_pkt(block.pkts[i]); } - ts = block->pkts[block->cnt - 1].ts; + ts = block.pkts[block.cnt - 1].ts; } catch (PluginError &e) { res.error = true; res.msg = e.what(); @@ -127,7 +127,6 @@ void input_storage_worker(InputPlugin *plugin, StoragePlugin *cache, PacketBlock time += 1000000000; } stats.qtime += time; - i = (i + 1) % block_cnt; out_stats->store(stats); } else if (ret == InputPlugin::Result::ERROR) { diff --git a/workers.hpp b/workers.hpp index 690784c0..51bafd48 100644 --- a/workers.hpp +++ b/workers.hpp @@ -86,7 +86,7 @@ struct OutputWorker { ipx_ring_t *queue; }; -void input_storage_worker(InputPlugin *plugin, StoragePlugin *cache, PacketBlock *pkts, size_t block_cnt, uint64_t pkt_limit, +void input_storage_worker(InputPlugin *plugin, StoragePlugin *cache, size_t queue_size, uint64_t pkt_limit, std::promise *out, std::atomic *out_stats); void output_worker(OutputPlugin *exp, ipx_ring_t *queue, std::promise *out, std::atomic *out_stats, uint32_t fps); From 51f77154296916faeee8f960c16226dda877905f Mon Sep 17 00:00:00 2001 From: Pavel Siska Date: Sun, 24 Jul 2022 22:56:33 +0200 Subject: [PATCH 3/5] PCAP: changed number of packets to read from interface. --- input/pcap.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/input/pcap.cpp b/input/pcap.cpp index a88880cf..117c74bf 100644 --- a/input/pcap.cpp +++ b/input/pcap.cpp @@ -53,6 +53,9 @@ namespace ipxp { +// Read only 1 packet into packet block +constexpr size_t PCAP_PACKET_BLOCK_SIZE = 1; + __attribute__((constructor)) static void register_this_plugin() { static PluginRecord rec = PluginRecord("pcap", [](){return new PcapReader();}); @@ -266,7 +269,7 @@ InputPlugin::Result PcapReader::get(PacketBlock &packets) } packets.cnt = 0; - ret = pcap_dispatch(m_handle, packets.size, packet_handler, (u_char *) (&opt)); + ret = pcap_dispatch(m_handle, PCAP_PACKET_BLOCK_SIZE, packet_handler, (u_char *) (&opt)); if (m_live) { if (ret == 0) { return Result::TIMEOUT; From 47d00ac152c114db6bcad54b054be4b79b35a47e Mon Sep 17 00:00:00 2001 From: Pavel Siska Date: Sun, 24 Jul 2022 23:00:09 +0200 Subject: [PATCH 4/5] RAW: changed number of packets to read from interface. --- input/raw.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/input/raw.cpp b/input/raw.cpp index 5563dd32..f03c8c3f 100644 --- a/input/raw.cpp +++ b/input/raw.cpp @@ -69,6 +69,9 @@ namespace ipxp { #error "raw plugin is supported with TPACKET3 only" #endif +// Read only 1 packet into packet block +constexpr size_t RAW_PACKET_BLOCK_SIZE = 1; + __attribute__((constructor)) static void register_this_plugin() { static PluginRecord rec = PluginRecord("raw", [](){return new RawReader();}); @@ -301,7 +304,7 @@ int RawReader::process_packets(struct tpacket_block_desc *pbd, PacketBlock &pack { parser_opt_t opt = {&packets, false, false, DLT_EN10MB}; uint32_t num_pkts = pbd->hdr.bh1.num_pkts; - uint32_t capacity = packets.size - packets.cnt; + uint32_t capacity = RAW_PACKET_BLOCK_SIZE - packets.cnt; uint32_t to_read = 0; struct tpacket3_hdr *ppd; From 554d3f9c703860758ba3419db6ed193f36100680 Mon Sep 17 00:00:00 2001 From: Pavel Siska Date: Sun, 24 Jul 2022 23:00:24 +0200 Subject: [PATCH 5/5] STEM: changed number of packets to read from interface. --- input/stem.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/input/stem.cpp b/input/stem.cpp index 8286f44d..81e619f5 100644 --- a/input/stem.cpp +++ b/input/stem.cpp @@ -50,6 +50,8 @@ namespace ipxp { +// Read only 1 packet into packet block +constexpr size_t STEM_PACKET_BLOCK_SIZE = 1; __attribute__((constructor)) static void register_this_plugin() { @@ -168,7 +170,7 @@ InputPlugin::Result StemPacketReader::get(PacketBlock &packets) { packets.cnt = 0; packets.bytes = 0; - while (packets.cnt < packets.size) { + while (packets.cnt < STEM_PACKET_BLOCK_SIZE) { try { auto pkt = m_reader->next_packet(); if (!pkt.has_value()) {