Skip to content

Commit

Permalink
Merge pull request #96 from CESNET/packet-zerocopy
Browse files Browse the repository at this point in the history
Packet zerocopy
  • Loading branch information
SiskaPavel authored Jul 26, 2022
2 parents 1fd3bb2 + 85d707d commit 6d9b63e
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 81 deletions.
15 changes: 11 additions & 4 deletions include/ipfixprobe/packet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,18 @@ struct Packet : public Record {
uint32_t tcp_seq;
uint32_t tcp_ack;

uint8_t *packet; /**< Pointer to begin of packet, if available */
const uint8_t *packet; /**< Pointer to begin of packet, if available */
uint16_t packet_len; /**< Length of data in packet buffer, packet_len <= packet_len_wire */
uint16_t packet_len_wire; /**< Original packet length on wire */

uint8_t *payload; /**< Pointer to begin of payload, if available */
const uint8_t *payload; /**< Pointer to begin of payload, if available */
uint16_t payload_len; /**< Length of data in payload buffer, payload_len <= payload_len_wire */
uint16_t payload_len_wire; /**< Original payload length computed from headers */

uint8_t *custom; /**< Pointer to begin of custom data, if available */
uint16_t custom_len; /**< Length of data in custom buffer */

// TODO REMOVE
uint8_t *buffer; /**< Buffer for packet, payload and custom data */
uint16_t buffer_size; /**< Size of buffer */

Expand Down Expand Up @@ -125,9 +126,15 @@ struct PacketBlock {
size_t bytes;
size_t size;

PacketBlock() :
pkts(nullptr), cnt(0), bytes(0), size(0)
PacketBlock(size_t pkts_size) :
cnt(0), bytes(0), size(pkts_size)
{
pkts = new Packet[pkts_size];
}

~PacketBlock()
{
delete[] pkts;
}
};

Expand Down
10 changes: 2 additions & 8 deletions input/parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -697,14 +697,8 @@ void parse_packet(parser_opt_t *opt, struct timeval ts, const uint8_t *data, uin
}

uint16_t pkt_len = caplen;
if ((int) pkt_len > pkt->buffer_size - 1) {
pkt_len = pkt->buffer_size - 1;
DEBUG_MSG("Packet size too long, truncating to %u\n", pkt_len);
}
pkt->packet = pkt->buffer;
memcpy(pkt->packet, data, pkt_len);
pkt->packet[pkt_len] = 0;
pkt->packet_len = pkt_len;
pkt->packet = data;
pkt->packet_len = caplen;

if (l4_hdr_offset != l3_hdr_offset) {
if (l4_hdr_offset + pkt->ip_payload_len < 64) {
Expand Down
5 changes: 4 additions & 1 deletion input/pcap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@

namespace ipxp {

// Read only 1 packet into packet block
constexpr size_t PCAP_PACKET_BLOCK_SIZE = 1;

__attribute__((constructor)) static void register_this_plugin()
{
static PluginRecord rec = PluginRecord("pcap", [](){return new PcapReader();});
Expand Down Expand Up @@ -266,7 +269,7 @@ InputPlugin::Result PcapReader::get(PacketBlock &packets)
}

packets.cnt = 0;
ret = pcap_dispatch(m_handle, packets.size, packet_handler, (u_char *) (&opt));
ret = pcap_dispatch(m_handle, PCAP_PACKET_BLOCK_SIZE, packet_handler, (u_char *) (&opt));
if (m_live) {
if (ret == 0) {
return Result::TIMEOUT;
Expand Down
5 changes: 4 additions & 1 deletion input/raw.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ namespace ipxp {
#error "raw plugin is supported with TPACKET3 only"
#endif

// Read only 1 packet into packet block
constexpr size_t RAW_PACKET_BLOCK_SIZE = 1;

__attribute__((constructor)) static void register_this_plugin()
{
static PluginRecord rec = PluginRecord("raw", [](){return new RawReader();});
Expand Down Expand Up @@ -301,7 +304,7 @@ int RawReader::process_packets(struct tpacket_block_desc *pbd, PacketBlock &pack
{
parser_opt_t opt = {&packets, false, false, DLT_EN10MB};
uint32_t num_pkts = pbd->hdr.bh1.num_pkts;
uint32_t capacity = packets.size - packets.cnt;
uint32_t capacity = RAW_PACKET_BLOCK_SIZE - packets.cnt;
uint32_t to_read = 0;
struct tpacket3_hdr *ppd;

Expand Down
4 changes: 3 additions & 1 deletion input/stem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@

namespace ipxp {

// Read only 1 packet into packet block
constexpr size_t STEM_PACKET_BLOCK_SIZE = 1;

__attribute__((constructor)) static void register_this_plugin()
{
Expand Down Expand Up @@ -168,7 +170,7 @@ InputPlugin::Result StemPacketReader::get(PacketBlock &packets)
{
packets.cnt = 0;
packets.bytes = 0;
while (packets.cnt < packets.size) {
while (packets.cnt < STEM_PACKET_BLOCK_SIZE) {
try {
auto pkt = m_reader->next_packet();
if (!pkt.has_value()) {
Expand Down
48 changes: 11 additions & 37 deletions ipfixprobe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ volatile sig_atomic_t terminate_export = 0;
volatile sig_atomic_t terminate_input = 0;

const uint32_t DEFAULT_IQUEUE_SIZE = 64;
const uint32_t DEFAULT_IQUEUE_BLOCK = 32;
const uint32_t DEFAULT_OQUEUE_SIZE = 16536;
const uint32_t DEFAULT_FPS = 0; // unlimited

Expand Down Expand Up @@ -159,29 +158,6 @@ void print_help(ipxp_conf_t &conf, const std::string &arg)
}
}

void init_packets(ipxp_conf_t &conf)
{
// Reserve +1 more block as a "working block"
conf.blocks_cnt = static_cast<size_t>(conf.iqueue_size + 1U) * conf.worker_cnt;
conf.pkts_cnt = conf.blocks_cnt * conf.iqueue_block;
conf.pkt_data_cnt = conf.pkts_cnt * conf.pkt_bufsize;
conf.blocks = new PacketBlock[conf.blocks_cnt];
conf.pkts = new Packet[conf.pkts_cnt];
conf.pkt_data = new uint8_t[conf.pkt_data_cnt];

for (unsigned i = 0; i < conf.blocks_cnt; i++) {
size_t pkts_offset = static_cast<size_t>(i) * conf.iqueue_block; // offset in number of packets

conf.blocks[i].pkts = conf.pkts + pkts_offset;
conf.blocks[i].cnt = 0;
conf.blocks[i].size = conf.iqueue_block;
for (unsigned j = 0; j < conf.iqueue_block; j++) {
conf.blocks[i].pkts[j].buffer = static_cast<uint8_t *>(conf.pkt_data + conf.pkt_bufsize * (j + pkts_offset));
conf.blocks[i].pkts[j].buffer_size = conf.pkt_bufsize;
}
}
}

void process_plugin_argline(const std::string &args, std::string &plugin, std::string &params)
{
size_t delim;
Expand Down Expand Up @@ -356,17 +332,17 @@ bool process_plugin_args(ipxp_conf_t &conf, IpfixprobeOptParser &parser)
conf.input_stats.push_back(input_stats);

WorkPipeline tmp = {
{
input_plugin,
new std::thread(input_storage_worker, input_plugin, storage_plugin, &conf.blocks[pipeline_idx * (conf.iqueue_size + 1)],
conf.iqueue_size + 1, conf.max_pkts, input_res, input_stats),
input_res,
input_stats
},
{
storage_plugin,
storage_process_plugins
}
{
input_plugin,
new std::thread(input_storage_worker, input_plugin, storage_plugin, conf.iqueue_size,
conf.max_pkts, input_res, input_stats),
input_res,
input_stats
},
{
storage_plugin,
storage_process_plugins
}
};
conf.pipelines.push_back(tmp);
pipeline_idx++;
Expand Down Expand Up @@ -646,15 +622,13 @@ int run(int argc, char *argv[])
}

conf.worker_cnt = parser.m_input.size();
conf.iqueue_block = parser.m_iqueue_block;
conf.iqueue_size = parser.m_iqueue;
conf.oqueue_size = parser.m_oqueue;
conf.fps = parser.m_fps;
conf.pkt_bufsize = parser.m_pkt_bufsize;
conf.max_pkts = parser.m_max_pkts;

try {
init_packets(conf);
if (process_plugin_args(conf, parser)) {
goto EXIT;
}
Expand Down
17 changes: 2 additions & 15 deletions ipfixprobe.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@
namespace ipxp {

extern const uint32_t DEFAULT_IQUEUE_SIZE;
extern const uint32_t DEFAULT_IQUEUE_BLOCK;
extern const uint32_t DEFAULT_OQUEUE_SIZE;
extern const uint32_t DEFAULT_FPS;

Expand Down Expand Up @@ -95,7 +94,6 @@ class IpfixprobeOptParser : public OptionsParser {
std::string m_pid;
bool m_daemon;
uint32_t m_iqueue;
uint32_t m_iqueue_block;
uint32_t m_oqueue;
uint32_t m_fps;
uint32_t m_pkt_bufsize;
Expand All @@ -106,7 +104,7 @@ class IpfixprobeOptParser : public OptionsParser {

IpfixprobeOptParser() : OptionsParser("ipfixprobe", "flow exporter supporting various custom IPFIX elements"),
m_pid(""), m_daemon(false),
m_iqueue(DEFAULT_IQUEUE_SIZE), m_iqueue_block(DEFAULT_IQUEUE_BLOCK), m_oqueue(DEFAULT_OQUEUE_SIZE), m_fps(DEFAULT_FPS),
m_iqueue(DEFAULT_IQUEUE_SIZE), m_oqueue(DEFAULT_OQUEUE_SIZE), m_fps(DEFAULT_FPS),
m_pkt_bufsize(1600), m_max_pkts(0), m_help(false), m_help_str(""), m_version(false)
{
m_delim = ' ';
Expand Down Expand Up @@ -137,12 +135,6 @@ class IpfixprobeOptParser : public OptionsParser {
std::invalid_argument &e) { return false; }
return true;
}, OptionFlags::RequiredArgument);
register_option("-b", "--iqueueb", "SIZE", "Size of input queue packet block",
[this](const char *arg) {
try { m_iqueue_block = str2num<decltype(m_iqueue_block)>(arg); } catch (
std::invalid_argument &e) { return false; }
return true;
}, OptionFlags::RequiredArgument);
register_option("-Q", "--oqueue", "SIZE", "Size of queue between storage and output plugins",
[this](const char *arg) {
try { m_oqueue = str2num<decltype(m_oqueue)>(arg); } catch (
Expand Down Expand Up @@ -189,7 +181,6 @@ class IpfixprobeOptParser : public OptionsParser {

struct ipxp_conf_t {
uint32_t iqueue_size;
uint32_t iqueue_block;
uint32_t oqueue_size;
uint32_t worker_cnt;
uint32_t fps;
Expand Down Expand Up @@ -222,7 +213,7 @@ struct ipxp_conf_t {
Packet *pkts;
uint8_t *pkt_data;

ipxp_conf_t() : iqueue_size(DEFAULT_IQUEUE_SIZE), iqueue_block(DEFAULT_IQUEUE_BLOCK),
ipxp_conf_t() : iqueue_size(DEFAULT_IQUEUE_SIZE),
oqueue_size(DEFAULT_OQUEUE_SIZE),
worker_cnt(0), fps(0), max_pkts(0),
pkt_bufsize(1600), blocks_cnt(0), pkts_cnt(0), pkt_data_cnt(0), blocks(nullptr), pkts(nullptr), pkt_data(nullptr)
Expand Down Expand Up @@ -268,10 +259,6 @@ struct ipxp_conf_t {
for (auto &it : output_stats) {
delete it;
}

delete[] pkts;
delete[] blocks;
delete[] pkt_data;
}
};

Expand Down
25 changes: 12 additions & 13 deletions workers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ namespace ipxp {

#define MICRO_SEC 1000000L

void input_storage_worker(InputPlugin *plugin, StoragePlugin *cache, PacketBlock *pkts, size_t block_cnt, uint64_t pkt_limit,
void input_storage_worker(InputPlugin *plugin, StoragePlugin *cache, size_t queue_size, uint64_t pkt_limit,
std::promise<WorkerResult> *out, std::atomic<InputStats> *out_stats)
{
struct timespec start_cache;
Expand All @@ -60,30 +60,30 @@ void input_storage_worker(InputPlugin *plugin, StoragePlugin *cache, PacketBlock
struct timespec end = {0, 0};
struct timeval ts = {0, 0};
bool timeout = false;
size_t i = 0;
InputPlugin::Result ret;
InputStats stats = {0, 0, 0, 0, 0};
WorkerResult res = {false, ""};

PacketBlock block(queue_size);

#ifdef __linux__
const clockid_t clk_id = CLOCK_MONOTONIC_COARSE;
#else
const clockid_t clk_id = CLOCK_MONOTONIC;
#endif

while (!terminate_input) {
PacketBlock *block = &pkts[i];
block->cnt = 0;
block->bytes = 0;
block.cnt = 0;
block.bytes = 0;

if (pkt_limit && plugin->m_parsed + block->size >= pkt_limit) {
if (pkt_limit && plugin->m_parsed + block.size >= pkt_limit) {
if (plugin->m_parsed >= pkt_limit) {
break;
}
block->size = pkt_limit - plugin->m_parsed;
block.size = pkt_limit - plugin->m_parsed;
}
try {
ret = plugin->get(*block);
ret = plugin->get(block);
} catch (PluginError &e) {
res.error = true;
res.msg = e.what();
Expand All @@ -107,13 +107,13 @@ void input_storage_worker(InputPlugin *plugin, StoragePlugin *cache, PacketBlock
stats.packets = plugin->m_seen;
stats.parsed = plugin->m_parsed;
stats.dropped = plugin->m_dropped;
stats.bytes += block->bytes;
stats.bytes += block.bytes;
clock_gettime(clk_id, &start_cache);
try {
for (unsigned i = 0; i < block->cnt; i++) {
cache->put_pkt(block->pkts[i]);
for (unsigned i = 0; i < block.cnt; i++) {
cache->put_pkt(block.pkts[i]);
}
ts = block->pkts[block->cnt - 1].ts;
ts = block.pkts[block.cnt - 1].ts;
} catch (PluginError &e) {
res.error = true;
res.msg = e.what();
Expand All @@ -127,7 +127,6 @@ void input_storage_worker(InputPlugin *plugin, StoragePlugin *cache, PacketBlock
time += 1000000000;
}
stats.qtime += time;
i = (i + 1) % block_cnt;

out_stats->store(stats);
} else if (ret == InputPlugin::Result::ERROR) {
Expand Down
2 changes: 1 addition & 1 deletion workers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ struct OutputWorker {
ipx_ring_t *queue;
};

void input_storage_worker(InputPlugin *plugin, StoragePlugin *cache, PacketBlock *pkts, size_t block_cnt, uint64_t pkt_limit,
void input_storage_worker(InputPlugin *plugin, StoragePlugin *cache, size_t queue_size, uint64_t pkt_limit,
std::promise<WorkerResult> *out, std::atomic<InputStats> *out_stats);
void output_worker(OutputPlugin *exp, ipx_ring_t *queue, std::promise<WorkerResult> *out, std::atomic<OutputStats> *out_stats,
uint32_t fps);
Expand Down

0 comments on commit 6d9b63e

Please sign in to comment.