From 858cd593e96162abfb9475e37e68f9ec3c868a66 Mon Sep 17 00:00:00 2001 From: Jiri Havranek Date: Sun, 12 Dec 2021 20:48:50 +0100 Subject: [PATCH 01/20] tls: fixed ja3 text plugin output --- process/tls.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/process/tls.hpp b/process/tls.hpp index b0053a4e..b672e420 100644 --- a/process/tls.hpp +++ b/process/tls.hpp @@ -146,7 +146,7 @@ struct RecordExtTLS : public RecordExt { << ",tlsalpn=\"" << alpn << "\"" << ",tlsja3="; for (int i = 0; i < 16; i++) { - out << std::hex << std::setw(2) << ja3_hash_bin[i]; + out << std::hex << std::setw(2) << (unsigned) ja3_hash_bin[i]; } return out.str(); } From cfbddb47887613e8c3d65b0f63e523f76611b163 Mon Sep 17 00:00:00 2001 From: Jiri Havranek Date: Sun, 12 Dec 2021 20:53:38 +0100 Subject: [PATCH 02/20] pcap: improved help text --- input/pcap.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/input/pcap.hpp b/input/pcap.hpp index 9781d85f..d164a48d 100644 --- a/input/pcap.hpp +++ b/input/pcap.hpp @@ -82,7 +82,7 @@ class PcapOptParser : public OptionsParser register_option("f", "file", "PATH", "Path to a pcap file", [this](const char *arg){m_file = arg; return true;}, OptionFlags::RequiredArgument); register_option("i", "ifc", "IFC", "Network interface name", [this](const char *arg){m_ifc = arg; return true;}, OptionFlags::RequiredArgument); register_option("F", "filter", "STR", "Filter string", [this](const char *arg){m_filter = arg; return true;}, OptionFlags::RequiredArgument); - register_option("s", "snaplen", "SIZE", "Snapshot length in bytes", + register_option("s", "snaplen", "SIZE", "Snapshot length in bytes (live capture only)", [this](const char *arg){try {m_snaplen = str2num(arg);} catch(std::invalid_argument &e) {return false;} return true;}, OptionFlags::RequiredArgument); register_option("l", "list", "", "Print list of available interfaces", [this](const char *arg){m_list = true; return true;}, OptionFlags::NoArgument); From 2f55feb9354b0d8dfa43c5912ed207c0b2bc86b6 Mon Sep 17 00:00:00 2001 From: Jiri Havranek Date: Sun, 12 Dec 2021 20:59:56 +0100 Subject: [PATCH 03/20] set fill char for byte fields in text outputs --- process/idpcontent.hpp | 4 ++-- process/tls.hpp | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/process/idpcontent.hpp b/process/idpcontent.hpp index 4ffe80da..b92c5312 100644 --- a/process/idpcontent.hpp +++ b/process/idpcontent.hpp @@ -137,11 +137,11 @@ struct RecordExtIDPCONTENT : public RecordExt { std::ostringstream out; out << "idpsrc="; for (size_t i = 0; i < idps[IDP_CONTENT_INDEX].size; i++) { - out << std::hex << std::setw(2) << idps[IDP_CONTENT_INDEX].data[i]; + out << std::hex << std::setw(2) << std::setfill('0') << idps[IDP_CONTENT_INDEX].data[i]; } out << ",idpdst="; for (size_t i = 0; i < idps[IDP_CONTENT_REV_INDEX].size; i++) { - out << std::hex << std::setw(2) << idps[IDP_CONTENT_REV_INDEX].data[i]; + out << std::hex << std::setw(2) << std::setfill('0') << idps[IDP_CONTENT_REV_INDEX].data[i]; } return out.str(); } diff --git a/process/tls.hpp b/process/tls.hpp index b672e420..e2030c54 100644 --- a/process/tls.hpp +++ b/process/tls.hpp @@ -146,7 +146,7 @@ struct RecordExtTLS : public RecordExt { << ",tlsalpn=\"" << alpn << "\"" << ",tlsja3="; for (int i = 0; i < 16; i++) { - out << std::hex << std::setw(2) << (unsigned) ja3_hash_bin[i]; + out << std::hex << std::setw(2) << std::setfill('0') << (unsigned) ja3_hash_bin[i]; } return out.str(); } From 0b2459567976a1dbbc02ab786b83dd3efdc78164 Mon Sep 17 00:00:00 2001 From: Jiri Havranek Date: Sun, 12 Dec 2021 21:36:53 +0100 Subject: [PATCH 04/20] tls: now parsing fields from shorter malformed packets --- process/tls.cpp | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/process/tls.cpp b/process/tls.cpp index 483e0a9e..183493a5 100644 --- a/process/tls.cpp +++ b/process/tls.cpp @@ -173,7 +173,9 @@ bool parse_tls_nonext_hdr(payload_data &payload, std::stringstream *ja3) } uint32_t hs_len = tls_hs->length1 << 16 | ntohs(tls_hs->length2); - if (payload.data + hs_len > payload.end || tls_hs->version.major != 3 || + // 1 + 3 + 2 + 32 + 1 + 2 + 1 + 2 = 44 + // type + length + version + random + sessionid + ciphers + compression + ext-len + if (payload.data + 44 > payload.end || tls_hs->version.major != 3 || tls_hs->version.minor < 1 || tls_hs->version.minor > 3) { return false; } @@ -217,10 +219,9 @@ bool parse_tls_nonext_hdr(payload_data &payload, std::stringstream *ja3) const char *ext_end = payload.data + ntohs(*(uint16_t *) payload.data) + 2; payload.data += 2; - if (ext_end > payload.end) { - return false; + if (ext_end <= payload.end) { + payload.end = ext_end; } - payload.end = ext_end; return true; } @@ -258,6 +259,10 @@ bool TLSPlugin::parse_tls(const char *data, uint16_t payload_len, RecordExtTLS * uint16_t type = ntohs(ext->type); payload.data += sizeof(tls_ext); + if (payload.data + length > payload.end) { + break; + } + if (hs_type == TLS_HANDSHAKE_CLIENT_HELLO) { if (type == TLS_EXT_SERVER_NAME) { get_tls_server_name(payload, rec->sni, sizeof(rec->sni)); From e562dfd74f66e5c9d69c1d47d9408953756b0439 Mon Sep 17 00:00:00 2001 From: Jiri Havranek Date: Sun, 12 Dec 2021 22:08:27 +0100 Subject: [PATCH 05/20] tls: export client version --- include/ipfixprobe/ipfix-elements.hpp | 2 ++ process/tls.cpp | 4 +++- process/tls.hpp | 13 ++++++++++--- tests/functional/reference/tls | 28 +++++++++++++-------------- 4 files changed, 29 insertions(+), 18 deletions(-) diff --git a/include/ipfixprobe/ipfix-elements.hpp b/include/ipfixprobe/ipfix-elements.hpp index 51c97f85..59f309b9 100644 --- a/include/ipfixprobe/ipfix-elements.hpp +++ b/include/ipfixprobe/ipfix-elements.hpp @@ -194,6 +194,7 @@ namespace ipxp { #define TLS_SNI(F) F(8057, 808, -1, nullptr) #define TLS_ALPN(F) F(8057, 809, -1, nullptr) +#define TLS_VERSION(F) F(8057, 810, 2, nullptr) #define TLS_JA3(F) F(8057, 830, -1, nullptr) #define SMTP_COMMANDS(F) F(8057, 810, 4, nullptr) @@ -337,6 +338,7 @@ namespace ipxp { #define IPFIX_TLS_TEMPLATE(F) \ F(TLS_SNI) \ F(TLS_ALPN) \ + F(TLS_VERSION) \ F(TLS_JA3) #define IPFIX_NTP_TEMPLATE(F) \ diff --git a/process/tls.cpp b/process/tls.cpp index 183493a5..b2e4245b 100644 --- a/process/tls.cpp +++ b/process/tls.cpp @@ -172,7 +172,8 @@ bool parse_tls_nonext_hdr(payload_data &payload, std::stringstream *ja3) return false; } - uint32_t hs_len = tls_hs->length1 << 16 | ntohs(tls_hs->length2); + //uint32_t hs_len = tls_hs->length1 << 16 | ntohs(tls_hs->length2); + // 1 + 3 + 2 + 32 + 1 + 2 + 1 + 2 = 44 // type + length + version + random + sessionid + ciphers + compression + ext-len if (payload.data + 44 > payload.end || tls_hs->version.major != 3 || @@ -252,6 +253,7 @@ bool TLSPlugin::parse_tls(const char *data, uint16_t payload_len, RecordExtTLS * if (!parse_tls_nonext_hdr(payload, &ja3)) { return false; } + rec->version = ((uint16_t) tls_hs->version.major << 8) | tls_hs->version.minor; while (payload.data + sizeof(tls_ext) <= payload.end) { tls_ext *ext = (tls_ext *) payload.data; diff --git a/process/tls.hpp b/process/tls.hpp index e2030c54..eb1d5ee9 100644 --- a/process/tls.hpp +++ b/process/tls.hpp @@ -61,11 +61,12 @@ namespace ipxp { -#define TLS_UNIREC_TEMPLATE "TLS_SNI,TLS_JA3,TLS_ALPN" +#define TLS_UNIREC_TEMPLATE "TLS_SNI,TLS_JA3,TLS_ALPN,TLS_VERSION" UR_FIELDS( string TLS_SNI, string TLS_ALPN, + uint16 TLS_VERSION, bytes TLS_JA3 ) @@ -75,6 +76,7 @@ UR_FIELDS( struct RecordExtTLS : public RecordExt { static int REGISTERED_ID; + uint16_t version; char alpn[255]; char sni[255]; char ja3_hash[33]; @@ -84,7 +86,7 @@ struct RecordExtTLS : public RecordExt { /** * \brief Constructor. */ - RecordExtTLS() : RecordExt(REGISTERED_ID) + RecordExtTLS() : RecordExt(REGISTERED_ID), version(0) { alpn[0] = 0; sni[0] = 0; @@ -93,6 +95,7 @@ struct RecordExtTLS : public RecordExt { #ifdef WITH_NEMEA virtual void fill_unirec(ur_template_t *tmplt, void *record) { + ur_set(tmplt, record, F_TLS_VERSION, version); ur_set_string(tmplt, record, F_TLS_SNI, sni); ur_set_string(tmplt, record, F_TLS_ALPN, alpn); ur_set_var(tmplt, record, F_TLS_JA3, ja3_hash_bin, 16); @@ -110,10 +113,13 @@ struct RecordExtTLS : public RecordExt { int alpn_len = strlen(alpn); int pos = 0; - if (sni_len + alpn_len + 16 + 3 > size) { + if (sni_len + alpn_len + 2 + 16 + 3 > size) { return -1; } + *(uint16_t *) buffer = ntohs(version); + pos += 2; + buffer[pos++] = sni_len; memcpy(buffer + pos, sni, sni_len); pos += sni_len; @@ -144,6 +150,7 @@ struct RecordExtTLS : public RecordExt { std::ostringstream out; out << "tlssni=\"" << sni << "\"" << ",tlsalpn=\"" << alpn << "\"" + << ",tlsversion=0x" << std::hex << std::setw(4) << std::setfill('0') << version << ",tlsja3="; for (int i = 0; i < 16; i++) { out << std::hex << std::setw(2) << std::setfill('0') << (unsigned) ja3_hash_bin[i]; diff --git a/tests/functional/reference/tls b/tests/functional/reference/tls index a5b8b16a..77aa1924 100644 --- a/tests/functional/reference/tls +++ b/tests/functional/reference/tls @@ -1,14 +1,14 @@ -160.85.255.180,192.168.88.244,672,208,0,2020-09-07T06:52:40.858429,2020-09-07T06:52:40.964957,d8:58:d7:00:c9:27,08:f8:bc:64:5e:6a,2,1,443,59673,0,6,24,24,"http/1.1",b32309a26951912be7dba376398abc3b,"ja3er.com" -160.85.255.180,192.168.88.244,672,208,0,2020-09-07T06:52:40.858553,2020-09-07T06:52:40.965848,d8:58:d7:00:c9:27,08:f8:bc:64:5e:6a,2,1,443,59674,0,6,24,24,"http/1.1",b32309a26951912be7dba376398abc3b,"ja3er.com" -160.85.255.180,192.168.88.244,672,208,0,2020-09-07T06:52:40.870218,2020-09-07T06:52:40.972697,d8:58:d7:00:c9:27,08:f8:bc:64:5e:6a,2,1,443,59675,0,6,24,24,"http/1.1",b32309a26951912be7dba376398abc3b,"ja3er.com" -160.85.255.180,192.168.88.244,672,208,0,2020-09-07T06:52:40.870714,2020-09-07T06:52:40.973197,d8:58:d7:00:c9:27,08:f8:bc:64:5e:6a,2,1,443,59676,0,6,24,24,"http/1.1",b32309a26951912be7dba376398abc3b,"ja3er.com" -160.85.255.180,192.168.88.244,672,208,0,2020-09-07T06:52:40.870851,2020-09-07T06:52:40.975124,d8:58:d7:00:c9:27,08:f8:bc:64:5e:6a,2,1,443,59677,0,6,24,24,"http/1.1",b32309a26951912be7dba376398abc3b,"ja3er.com" -160.85.255.180,192.168.88.244,714,3436,0,2020-09-07T06:52:40.477735,2020-09-07T06:52:40.549534,d8:58:d7:00:c9:27,08:f8:bc:64:5e:6a,2,4,443,59672,0,6,24,24,"http/1.1",b32309a26951912be7dba376398abc3b,"ja3er.com" -82.145.216.15,192.168.88.244,714,4308,0,2020-09-07T06:52:41.372241,2020-09-07T06:52:41.418203,d8:58:d7:00:c9:27,08:f8:bc:64:5e:6a,2,4,443,59678,0,6,24,24,"http/1.1",b32309a26951912be7dba376398abc3b,"af.opera.com" -87.106.189.123,172.16.121.155,1085,4455,0,2015-10-20T07:09:33.614159,2015-10-20T07:09:34.161736,00:50:56:e5:80:5b,00:0c:29:9d:b9:d0,7,9,443,3923,0,6,26,26,"",9a7b51089c089491dbc4879218db549c,"asecuritysite.com" -87.106.189.123,172.16.121.155,496,325,0,2015-10-20T07:09:33.611190,2015-10-20T07:09:33.712851,00:50:56:e5:80:5b,00:0c:29:9d:b9:d0,4,4,443,3919,0,6,26,26,"",9a7b51089c089491dbc4879218db549c,"asecuritysite.com" -87.106.189.123,172.16.121.155,496,325,0,2015-10-20T07:09:33.612842,2015-10-20T07:09:33.714751,00:50:56:e5:80:5b,00:0c:29:9d:b9:d0,4,4,443,3920,0,6,26,26,"",9a7b51089c089491dbc4879218db549c,"asecuritysite.com" -87.106.189.123,172.16.121.155,496,325,0,2015-10-20T07:09:33.613610,2015-10-20T07:09:33.716640,00:50:56:e5:80:5b,00:0c:29:9d:b9:d0,4,4,443,3921,0,6,26,26,"",9a7b51089c089491dbc4879218db549c,"asecuritysite.com" -87.106.189.123,172.16.121.155,496,325,0,2015-10-20T07:09:33.613897,2015-10-20T07:09:33.713722,00:50:56:e5:80:5b,00:0c:29:9d:b9:d0,4,4,443,3922,0,6,26,26,"",9a7b51089c089491dbc4879218db549c,"asecuritysite.com" -87.106.189.123,172.16.121.155,496,325,0,2015-10-20T07:09:33.614399,2015-10-20T07:09:33.715479,00:50:56:e5:80:5b,00:0c:29:9d:b9:d0,4,4,443,3924,0,6,26,26,"",9a7b51089c089491dbc4879218db549c,"asecuritysite.com" -ipaddr DST_IP,ipaddr SRC_IP,uint64 BYTES,uint64 BYTES_REV,uint64 LINK_BIT_FIELD,time TIME_FIRST,time TIME_LAST,macaddr DST_MAC,macaddr SRC_MAC,uint32 PACKETS,uint32 PACKETS_REV,uint16 DST_PORT,uint16 SRC_PORT,uint8 DIR_BIT_FIELD,uint8 PROTOCOL,uint8 TCP_FLAGS,uint8 TCP_FLAGS_REV,string TLS_ALPN,bytes TLS_JA3,string TLS_SNI +160.85.255.180,192.168.88.244,672,208,0,2020-09-07T06:52:40.858429,2020-09-07T06:52:40.964957,d8:58:d7:00:c9:27,08:f8:bc:64:5e:6a,2,1,443,59673,771,0,6,24,24,"http/1.1",b32309a26951912be7dba376398abc3b,"ja3er.com" +160.85.255.180,192.168.88.244,672,208,0,2020-09-07T06:52:40.858553,2020-09-07T06:52:40.965848,d8:58:d7:00:c9:27,08:f8:bc:64:5e:6a,2,1,443,59674,771,0,6,24,24,"http/1.1",b32309a26951912be7dba376398abc3b,"ja3er.com" +160.85.255.180,192.168.88.244,672,208,0,2020-09-07T06:52:40.870218,2020-09-07T06:52:40.972697,d8:58:d7:00:c9:27,08:f8:bc:64:5e:6a,2,1,443,59675,771,0,6,24,24,"http/1.1",b32309a26951912be7dba376398abc3b,"ja3er.com" +160.85.255.180,192.168.88.244,672,208,0,2020-09-07T06:52:40.870714,2020-09-07T06:52:40.973197,d8:58:d7:00:c9:27,08:f8:bc:64:5e:6a,2,1,443,59676,771,0,6,24,24,"http/1.1",b32309a26951912be7dba376398abc3b,"ja3er.com" +160.85.255.180,192.168.88.244,672,208,0,2020-09-07T06:52:40.870851,2020-09-07T06:52:40.975124,d8:58:d7:00:c9:27,08:f8:bc:64:5e:6a,2,1,443,59677,771,0,6,24,24,"http/1.1",b32309a26951912be7dba376398abc3b,"ja3er.com" +160.85.255.180,192.168.88.244,714,3436,0,2020-09-07T06:52:40.477735,2020-09-07T06:52:40.549534,d8:58:d7:00:c9:27,08:f8:bc:64:5e:6a,2,4,443,59672,771,0,6,24,24,"http/1.1",b32309a26951912be7dba376398abc3b,"ja3er.com" +82.145.216.15,192.168.88.244,714,4308,0,2020-09-07T06:52:41.372241,2020-09-07T06:52:41.418203,d8:58:d7:00:c9:27,08:f8:bc:64:5e:6a,2,4,443,59678,771,0,6,24,24,"http/1.1",b32309a26951912be7dba376398abc3b,"af.opera.com" +87.106.189.123,172.16.121.155,1085,4455,0,2015-10-20T07:09:33.614159,2015-10-20T07:09:34.161736,00:50:56:e5:80:5b,00:0c:29:9d:b9:d0,7,9,443,3923,771,0,6,26,26,"",9a7b51089c089491dbc4879218db549c,"asecuritysite.com" +87.106.189.123,172.16.121.155,496,325,0,2015-10-20T07:09:33.611190,2015-10-20T07:09:33.712851,00:50:56:e5:80:5b,00:0c:29:9d:b9:d0,4,4,443,3919,771,0,6,26,26,"",9a7b51089c089491dbc4879218db549c,"asecuritysite.com" +87.106.189.123,172.16.121.155,496,325,0,2015-10-20T07:09:33.612842,2015-10-20T07:09:33.714751,00:50:56:e5:80:5b,00:0c:29:9d:b9:d0,4,4,443,3920,771,0,6,26,26,"",9a7b51089c089491dbc4879218db549c,"asecuritysite.com" +87.106.189.123,172.16.121.155,496,325,0,2015-10-20T07:09:33.613610,2015-10-20T07:09:33.716640,00:50:56:e5:80:5b,00:0c:29:9d:b9:d0,4,4,443,3921,771,0,6,26,26,"",9a7b51089c089491dbc4879218db549c,"asecuritysite.com" +87.106.189.123,172.16.121.155,496,325,0,2015-10-20T07:09:33.613897,2015-10-20T07:09:33.713722,00:50:56:e5:80:5b,00:0c:29:9d:b9:d0,4,4,443,3922,771,0,6,26,26,"",9a7b51089c089491dbc4879218db549c,"asecuritysite.com" +87.106.189.123,172.16.121.155,496,325,0,2015-10-20T07:09:33.614399,2015-10-20T07:09:33.715479,00:50:56:e5:80:5b,00:0c:29:9d:b9:d0,4,4,443,3924,771,0,6,26,26,"",9a7b51089c089491dbc4879218db549c,"asecuritysite.com" +ipaddr DST_IP,ipaddr SRC_IP,uint64 BYTES,uint64 BYTES_REV,uint64 LINK_BIT_FIELD,time TIME_FIRST,time TIME_LAST,macaddr DST_MAC,macaddr SRC_MAC,uint32 PACKETS,uint32 PACKETS_REV,uint16 DST_PORT,uint16 SRC_PORT,uint16 TLS_VERSION,uint8 DIR_BIT_FIELD,uint8 PROTOCOL,uint8 TCP_FLAGS,uint8 TCP_FLAGS_REV,string TLS_ALPN,bytes TLS_JA3,string TLS_SNI From 62f08644dfec6ddcf95358b6db0a9fff4a0efa5d Mon Sep 17 00:00:00 2001 From: Jiri Havranek Date: Sun, 12 Dec 2021 22:13:07 +0100 Subject: [PATCH 06/20] updated README.md --- README.md | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index c7d52058..52d3bfd6 100644 --- a/README.md +++ b/README.md @@ -231,10 +231,12 @@ List of unirec fields exported together with basic flow fields on interface by R ### TLS List of unirec fields exported together with basic flow fields on interface by TLS plugin. -| UniRec field | Type | Description | -|:-------------------:|:------:|:----------------------------:| -| TLS_SNI | string | TLS server name indication | -| TLS_JA3 | string | TLS client JA3 fingerprint | +| UniRec field | Type | Description | +|:-------------------:|:------:|:-------------------------------------------------------------:| +| TLS_SNI | string | TLS server name indication field from client | +| TLS_ALPN | string | TLS application protocol layer negotiation field from server | +| TLS_VERSION | uint16 | TLS client protocol version | +| TLS_JA3 | string | TLS client JA3 fingerprint | ### DNS List of unirec fields exported together with basic flow fields on interface by DNS plugin. From e4261f55d03becfc9df3734c5dee254f32f56fc3 Mon Sep 17 00:00:00 2001 From: Karel Hynek Date: Tue, 14 Dec 2021 17:17:39 +0100 Subject: [PATCH 07/20] README fix --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 52d3bfd6..a0dca82f 100644 --- a/README.md +++ b/README.md @@ -413,7 +413,7 @@ Note: the following fields are UniRec arrays. ##### Example: ``` -ipfixprobe -p pstats:includezeros -r sample.pcap -i "f:output.trapcap" +ipfixprobe 'pcap;file=pcaps/http.pcap' -p pstats:includezeros -o 'unirec;i=u:stats:timeout=WAIT;p=stats'" ``` ### SSDP @@ -507,7 +507,7 @@ The exported unirec fields and IPFIX basiclists is shown in following table: ##### Example: ``` -ipfixprobe -p phists:includezeros -r sample.pcap -i "f:output.trapcap" +ipfixprobe 'pcap;file=pcaps/http.pcap' -p phists:includezeros -o 'unirec;i=u:hists:timeout=WAIT;p=phists'" ``` ### BSTATS From 5be394e03464865847b41301c0ccc9e4d28c64e6 Mon Sep 17 00:00:00 2001 From: Jiri Havranek Date: Tue, 14 Dec 2021 18:43:30 +0100 Subject: [PATCH 08/20] added live stats reading tool --- Makefile.am | 15 ++- configure.ac | 8 ++ ipfixprobe.cpp | 141 +++++++++++++++++++++++------ ipfixprobe.hpp | 17 +++- ipfixprobe_stats.cpp | 211 +++++++++++++++++++++++++++++++++++++++++++ stats.cpp | 165 +++++++++++++++++++++++++++++++++ stats.hpp | 87 ++++++++++++++++++ workers.cpp | 48 ++++++---- workers.hpp | 38 +++----- 9 files changed, 652 insertions(+), 78 deletions(-) create mode 100644 ipfixprobe_stats.cpp create mode 100644 stats.cpp create mode 100644 stats.hpp diff --git a/Makefile.am b/Makefile.am index 316b93c0..c31ec850 100644 --- a/Makefile.am +++ b/Makefile.am @@ -5,11 +5,11 @@ SUBDIRS+=input/nfbCInterface endif SUBDIRS+=. tests init -bin_PROGRAMS=ipfixprobe +bin_PROGRAMS=ipfixprobe ipfixprobe_stats DISTCHECK_CONFIGURE_FLAGS="--with-systemdsystemunitdir=$$dc_install_base/$(systemdsystemunitdir)" -ipfixprobe_LDFLAGS=-lpthread -ldl +ipfixprobe_LDFLAGS=-lpthread -ldl -latomic ipfixprobe_CFLAGS=-I$(srcdir)/include/ ipfixprobe_CXXFLAGS=-std=gnu++11 -Wno-write-strings -I$(srcdir)/include/ @@ -176,6 +176,8 @@ ipfixprobe_src=\ ring.c \ workers.cpp \ workers.hpp \ + stats.cpp \ + stats.hpp \ ipfixprobe.hpp \ ipfixprobe.cpp @@ -187,6 +189,15 @@ endif ipfixprobe_SOURCES=$(ipfixprobe_src) main.cpp +ipfixprobe_stats_CXXFLAGS=-std=gnu++11 -Wno-write-strings -I$(srcdir)/include/ +ipfixprobe_stats_SOURCES=ipfixprobe_stats.cpp \ + include/ipfixprobe/options.hpp \ + include/ipfixprobe/utils.hpp \ + stats.cpp \ + stats.hpp \ + options.cpp \ + utils.cpp + pkgdocdir=${docdir}/ipfixprobe pkgdoc_DATA=README.md EXTRA_DIST=README.md \ diff --git a/configure.ac b/configure.ac index a534a9ed..4b8e8c9f 100644 --- a/configure.ac +++ b/configure.ac @@ -73,6 +73,14 @@ AC_TYPE_UINT8_T AX_C_BIGENDIAN_CROSS +AC_ARG_WITH([defaultsocketdir], + [AS_HELP_STRING([--with-defaultsocketdir=DIR], [Directory for UNIX&service IFCs [/tmp], for production set it to e.g. /var/run/ipfixprobe.])], + [], + [with_defaultsocketdir=/tmp]) + +AC_SUBST([defaultsocketdir], [$with_defaultsocketdir]) +AC_DEFINE_DIR([DEFAULTSOCKETDIR], [defaultsocketdir], [Default path to socket directory]) + ### gtest AC_ARG_WITH([gtest], AC_HELP_STRING([--with-gtest],[Compile ipfixprobe with gtest framework]), diff --git a/ipfixprobe.cpp b/ipfixprobe.cpp index 8ffaa683..804ef205 100644 --- a/ipfixprobe.cpp +++ b/ipfixprobe.cpp @@ -52,6 +52,7 @@ #include #include #include +#include #ifdef WITH_DPDK #include @@ -62,6 +63,7 @@ #ifdef WITH_LIBUNWIND #include "stacktrace.hpp" #endif +#include "stats.hpp" namespace ipxp { @@ -279,15 +281,18 @@ bool process_plugin_args(ipxp_conf_t &conf, IpfixprobeOptParser &parser) } { - std::promise *output_stats = new std::promise(); + std::promise *output_res = new std::promise(); + auto output_stats = new std::atomic(); + conf.output_stats.push_back(output_stats); OutputWorker tmp = { output_plugin, - new std::thread(output_worker, output_plugin, output_queue, output_stats, conf.fps, &conf.exit_output), + new std::thread(output_worker, output_plugin, output_queue, output_res, output_stats, conf.fps, &conf.exit_output), + output_res, output_stats, output_queue }; conf.outputs.push_back(tmp); - conf.output_fut.push_back(output_stats->get_future()); + conf.output_fut.push_back(output_res->get_future()); } // Input @@ -350,22 +355,26 @@ bool process_plugin_args(ipxp_conf_t &conf, IpfixprobeOptParser &parser) throw IPXPError("unable to initialize ring buffer"); } - std::promise *input_stats = new std::promise(); - std::promise *storage_stats = new std::promise(); - conf.input_fut.push_back(input_stats->get_future()); - conf.storage_fut.push_back(storage_stats->get_future()); + std::promise *input_res = new std::promise(); + std::promise *storage_res = new std::promise(); + conf.input_fut.push_back(input_res->get_future()); + conf.storage_fut.push_back(storage_res->get_future()); + + auto input_stats = new std::atomic(); + conf.input_stats.push_back(input_stats); WorkPipeline tmp = { { input_plugin, new std::thread(input_worker, input_plugin, &conf.blocks[pipeline_idx * (conf.iqueue_size + 1)], - conf.iqueue_size + 1, conf.max_pkts, input_queue, input_stats, &conf.exit_input), - input_stats, + conf.iqueue_size + 1, conf.max_pkts, input_queue, input_res, input_stats, &conf.exit_input), + input_res, + input_stats }, { storage_plugin, - new std::thread(storage_worker, storage_plugin, input_queue, storage_stats, &conf.exit_storage), - storage_stats, + new std::thread(storage_worker, storage_plugin, input_queue, storage_res, &conf.exit_storage), + storage_res, storage_process_plugins }, input_queue @@ -412,23 +421,26 @@ void finish(ipxp_conf_t &conf) std::setw(10) << "packets" << std::setw(10) << "parsed" << std::setw(16) << "bytes" << + std::setw(10) << "dropped" << std::setw(10) << "qtime" << std::setw(7) << "status" << std::endl; int idx = 0; for (auto &it : conf.input_fut) { - InputStats input = it.get(); + WorkerResult res = it.get(); std::string status = "ok"; - if (input.error) { + if (res.error) { ok = false; - status = input.msg; + status = res.msg; } + InputStats stats = conf.input_stats[idx]->load(); std::cout << std::setw(3) << idx++ << " " << - std::setw(9) << input.packets << " " << - std::setw(9) << input.parsed << " " << - std::setw(15) << input.bytes << " " << - std::setw(9) << input.qtime << " " << + std::setw(9) << stats.packets << " " << + std::setw(9) << stats.parsed << " " << + std::setw(15) << stats.bytes << " " << + std::setw(9) << stats.dropped << " " << + std::setw(9) << stats.qtime << " " << std::setw(6) << status << std::endl; } @@ -440,12 +452,12 @@ void finish(ipxp_conf_t &conf) idx = 0; bool storage_ok = true; for (auto &it : conf.storage_fut) { - StorageStats storage = it.get(); + WorkerResult res = it.get(); std::string status = "ok"; - if (storage.error) { + if (res.error) { ok = false; storage_ok = false; - status = storage.msg; + status = res.msg; } oss << std::setw(3) << idx++ << " " << @@ -465,18 +477,19 @@ void finish(ipxp_conf_t &conf) idx = 0; for (auto &it : conf.output_fut) { - OutputStats output = it.get(); + WorkerResult res = it.get(); std::string status = "ok"; - if (output.error) { + if (res.error) { ok = false; - status = output.msg; + status = res.msg; } + OutputStats stats = conf.output_stats[idx]->load(); std::cout << std::setw(3) << idx++ << " " << - std::setw(9) << output.biflows << " " << - std::setw(9) << output.packets << " " << - std::setw(15) << output.bytes << " " << - std::setw(9) << output.dropped << " " << + std::setw(9) << stats.biflows << " " << + std::setw(9) << stats.packets << " " << + std::setw(15) << stats.bytes << " " << + std::setw(9) << stats.dropped << " " << std::setw(6) << status << std::endl; } @@ -485,17 +498,82 @@ void finish(ipxp_conf_t &conf) } } +void serve_stat_clients(ipxp_conf_t &conf, struct pollfd pfds[2]) +{ + uint8_t buffer[100000]; + size_t written = 0; + msg_header_t *hdr = (msg_header_t *) buffer; + int ret = ppoll(pfds, 2, 0, NULL); + if (ret <= 0) { + return; + } + if (pfds[1].fd > 0 && pfds[1].revents & POLL_IN) { + ret = recv_data(pfds[1].fd, sizeof(uint32_t), buffer); + if (ret < 0) { + // Client disconnected + close(pfds[1].fd); + pfds[1].fd = -1; + } else { + if (*((uint32_t *) buffer) != MSG_MAGIC) { + return; + } + // Received stats request from client + written += sizeof(msg_header_t); + for (auto &it : conf.input_stats) { + InputStats stats = it->load(); + *(InputStats *)(buffer + written) = stats; + written += sizeof(InputStats); + } + for (auto &it : conf.output_stats) { + OutputStats stats = it->load(); + *(OutputStats *)(buffer + written) = stats; + written += sizeof(OutputStats); + } + + hdr->magic = MSG_MAGIC; + hdr->size = written - sizeof(msg_header_t); + hdr->inputs = conf.input_stats.size(); + hdr->outputs = conf.output_stats.size(); + + send_data(pfds[1].fd, written, buffer); + } + } + + if (pfds[0].revents & POLL_IN) { + int fd = accept(pfds[0].fd, NULL, NULL); + if (pfds[1].fd == -1) { + pfds[1].fd = fd; + } else { + close(fd); + } + } +} + void main_loop(ipxp_conf_t &conf) { - std::vector*> futs; + std::vector*> futs; for (auto &it : conf.input_fut) { futs.push_back(&it); } + + struct pollfd pfds[2] = { + {.fd = -1, .events = POLL_IN}, // Server + {.fd = -1, .events = POLL_IN} // Client + }; + + std::string sock_path = create_sockpath(std::to_string(getpid()).c_str()); + pfds[0].fd = create_stats_sock(sock_path.c_str()); + if (pfds[0].fd < 0) { + error("Unable to create stats socket " + sock_path); + } + while (!stop && futs.size()) { + serve_stat_clients(conf, pfds); + for (auto it = futs.begin(); it != futs.end(); it++) { std::future_status status = (*it)->wait_for(std::chrono::seconds(0)); if (status == std::future_status::ready) { - InputStats res = (*it)->get(); + WorkerResult res = (*it)->get(); if (!res.error) { it = futs.erase(it); break; @@ -522,6 +600,9 @@ void main_loop(ipxp_conf_t &conf) usleep(1000); } + close(pfds[0].fd); + close(pfds[1].fd); + unlink(sock_path.c_str()); finish(conf); } diff --git a/ipfixprobe.hpp b/ipfixprobe.hpp index c04fcce6..ab72c656 100644 --- a/ipfixprobe.hpp +++ b/ipfixprobe.hpp @@ -48,6 +48,7 @@ #include #include #include +#include #include #include @@ -201,9 +202,12 @@ struct ipxp_conf_t { std::vector pipelines; std::vector outputs; - std::vector> input_fut; - std::vector> storage_fut; - std::vector> output_fut; + std::vector *> input_stats; + std::vector *> output_stats; + + std::vector> input_fut; + std::vector> storage_fut; + std::vector> output_fut; std::promise exit_input_pr; std::promise exit_storage_pr; @@ -277,6 +281,13 @@ struct ipxp_conf_t { ipx_ring_destroy(it.queue); } + for (auto &it : input_stats) { + delete it; + } + for (auto &it : output_stats) { + delete it; + } + delete[] pkts; delete[] blocks; delete[] pkt_data; diff --git a/ipfixprobe_stats.cpp b/ipfixprobe_stats.cpp new file mode 100644 index 00000000..8d6bb6f9 --- /dev/null +++ b/ipfixprobe_stats.cpp @@ -0,0 +1,211 @@ +/** + * \file ipfixprobe_stats.hpp + * \brief Exporter live stats reading utility + * \author Jiri Havranek + * \date 2021 + */ +/* + * Copyright (C) 2021 CESNET + * + * LICENSE TERMS + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of the Company nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * ALTERNATIVELY, provided that this notice is retained in full, this + * product may be distributed under the terms of the GNU General Public + * License (GPL) version 2 or later, in which case the provisions + * of the GPL apply INSTEAD OF those given above. + * + * This software is provided ``as is'', and any express or implied + * warranties, including, but not limited to, the implied warranties of + * merchantability and fitness for a particular purpose are disclaimed. + * In no event shall the company or contributors be liable for any + * direct, indirect, incidental, special, exemplary, or consequential + * damages (including, but not limited to, procurement of substitute + * goods or services; loss of use, data, or profits; or business + * interruption) however caused and on any theory of liability, whether + * in contract, strict liability, or tort (including negligence or + * otherwise) arising in any way out of the use of this software, even + * if advised of the possibility of such damage. + * + */ + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "stats.hpp" + +using namespace ipxp; + +volatile sig_atomic_t stop = 0; + +void signal_handler(int sig) +{ + stop = 1; +} + +class IpfixStatsParser : public OptionsParser { +public: + pid_t m_pid; + bool m_one; + bool m_help; + + IpfixStatsParser() : OptionsParser("ipfixprobe_stats", "Read statistics from running ipfixprobe exporter"), + m_pid(0), m_one(false), m_help(false) + { + m_delim = ' '; + + register_option("-p", "--pid", "NUM", "ipfixprobe exporter PID number", [this](const char *arg) { + try { m_pid = str2num(arg); } catch ( + std::invalid_argument &e) { return false; } + return true; + }, OptionFlags::RequiredArgument); + register_option("-1", "--one", "", "Print stats and exit", [this](const char *arg) { + m_one = true; + return true; + }, OptionFlags::NoArgument); + register_option("-h", "--help", "", "Print help", [this](const char *arg) { + m_help = true; + return true; + }, OptionFlags::NoArgument); + } +}; + +static void error(const std::string &msg) +{ + std::cerr << "Error: " << msg << std::endl; +} + +int main(int argc, char *argv[]) +{ + size_t lines_written = 0; + int fd = -1; + int status = EXIT_SUCCESS; + uint8_t buffer[100000]; + msg_header_t *hdr = (msg_header_t *) buffer; + std::string path; + IpfixStatsParser parser; + + signal(SIGTERM, signal_handler); + signal(SIGINT, signal_handler); + + try { + parser.parse(argc - 1, const_cast(argv) + 1); + } catch (ParserError &e) { + error(e.what()); + status = EXIT_FAILURE; + goto EXIT; + } + + if (parser.m_help) { + parser.usage(std::cout, 0); + goto EXIT; + } + + path = DEFAULTSOCKETDIR "/ipfixprobe_" + std::to_string(parser.m_pid) + ".sock"; + fd = connect_to_exporter(path.c_str()); + if (fd == -1) { + error("connecting to exporter"); + goto EXIT; + } + while (!stop) { + *(uint32_t *) buffer = MSG_MAGIC; + // Send stats data request + if (send_data(fd, sizeof(uint32_t), buffer)) { + status = EXIT_FAILURE; + break; + } + + // Receive message header + if (recv_data(fd, sizeof(msg_header_t), buffer)) { + status = EXIT_FAILURE; + break; + } + + // Check if message header is correct + if (hdr->magic != MSG_MAGIC) { + error("received data are invalid"); + status = EXIT_FAILURE; + break; + } + + // Receive array of various stats from exporter + if (recv_data(fd, hdr->size, (buffer + sizeof(msg_header_t)))) { + status = EXIT_FAILURE; + break; + } + + // Erase previous stats output lines + for (size_t i = 0; i < lines_written; i++) { + std::cout << "\033[A\33[2K\r"; + } + + // Process received stats + std::cout << "Input stats:" << std::endl << + std::setw(3) << "#" << + std::setw(10) << "packets" << + std::setw(10) << "parsed" << + std::setw(16) << "bytes" << + std::setw(10) << "dropped" << + std::setw(10) << "qtime" << std::endl; + + uint8_t *data = buffer + sizeof(msg_header_t); + size_t idx = 0; + for (size_t i = 0; i < hdr->inputs; i++) { + InputStats *stats = (InputStats *) data; + data += sizeof(InputStats); + std::cout << + std::setw(3) << idx++ << " " << + std::setw(9) << stats->packets << " " << + std::setw(9) << stats->parsed << " " << + std::setw(15) << stats->bytes << " " << + std::setw(9) << stats->dropped << " " << + std::setw(9) << stats->qtime << " " << std::endl; + } + + std::cout << "Output stats:" << std::endl << + std::setw(3) << "#" << + std::setw(10) << "biflows" << + std::setw(10) << "packets" << + std::setw(16) << "bytes" << + std::setw(10) << "dropped" << std::endl; + + idx = 0; + for (size_t i = 0; i < hdr->outputs; i++) { + OutputStats *stats = (OutputStats *) data; + data += sizeof(OutputStats); + std::cout << + std::setw(3) << idx++ << " " << + std::setw(9) << stats->biflows << " " << + std::setw(9) << stats->packets << " " << + std::setw(15) << stats->bytes << " " << + std::setw(9) << stats->dropped << " " << std::endl; + } + + lines_written = hdr->inputs + hdr->outputs + 4; + usleep(1000000); + } +EXIT: + if (fd != -1) { + close(fd); + } + return status; +} diff --git a/stats.cpp b/stats.cpp new file mode 100644 index 00000000..56dddc74 --- /dev/null +++ b/stats.cpp @@ -0,0 +1,165 @@ +/** + * \file stats.cpp + * \brief Implementation of service IO functions, modified code from libtrap service ifc and trap_stats + * \author Jiri Havranek + * \date 2021 + */ +/* + * Copyright (C) 2021 CESNET + * + * LICENSE TERMS + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of the Company nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * ALTERNATIVELY, provided that this notice is retained in full, this + * product may be distributed under the terms of the GNU General Public + * License (GPL) version 2 or later, in which case the provisions + * of the GPL apply INSTEAD OF those given above. + * + * This software is provided ``as is'', and any express or implied + * warranties, including, but not limited to, the implied warranties of + * merchantability and fitness for a particular purpose are disclaimed. + * In no event shall the company or contributors be liable for any + * direct, indirect, incidental, special, exemplary, or consequential + * damages (including, but not limited to, procurement of substitute + * goods or services; loss of use, data, or profits; or business + * interruption) however caused and on any theory of liability, whether + * in contract, strict liability, or tort (including negligence or + * otherwise) arising in any way out of the use of this software, even + * if advised of the possibility of such damage. + * + */ + +#include +#include + +#include +#include +#include +#include +#include + +#include "stats.hpp" + +namespace ipxp +{ + +int connect_to_exporter(const char *path) +{ + int sd; + struct sockaddr_un addr; + + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + snprintf(addr.sun_path, sizeof(addr.sun_path) - 1, "%s", path); + + sd = socket(AF_UNIX, SOCK_STREAM, 0); + if (sd != -1) { + if (connect(sd, (struct sockaddr *) &addr, sizeof(addr)) == -1) { + perror("unable to connect"); + close(sd); + return -1; + } + } + return sd; +} + +int create_stats_sock(const char *path) +{ + int sd; + struct sockaddr_un addr; + + addr.sun_family = AF_UNIX; + snprintf(addr.sun_path, sizeof(addr.sun_path) - 1, "%s", path); + + unlink(addr.sun_path); + sd = socket(AF_UNIX, SOCK_STREAM, 0); + if (sd) { + if (bind(sd, (struct sockaddr *) &addr, sizeof(addr)) == -1) { + perror("unable to bind socket"); + close(sd); + return -1; + } + if (listen(sd, 1) == -1) { + perror("unable to listen on socket"); + close(sd); + return -1; + } + if (chmod(addr.sun_path, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH) == -1) { + perror("unable to set access rights"); + close(sd); + return -1; + } + } + return sd; +} + +int recv_data(int sd, uint32_t size, void *data) +{ + size_t num_of_timeouts = 0; + size_t total_received = 0; + ssize_t last_received = 0; + + while (total_received < size) { + last_received = recv(sd, (uint8_t *) data + total_received, size - total_received, MSG_DONTWAIT); + if (last_received == 0) { + return -1; + } else if (last_received == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + num_of_timeouts++; + if (num_of_timeouts > SERVICE_WAIT_MAX_TRY) { + return -1; + } else { + usleep(SERVICE_WAIT_BEFORE_TIMEOUT); + continue; + } + } + return -1; + } + total_received += last_received; + } + return 0; +} + +int send_data(int sd, uint32_t size, void *data) +{ + size_t num_of_timeouts = 0; + size_t total_sent = 0; + ssize_t last_sent = 0; + + while (total_sent < size) { + last_sent = send(sd, (uint8_t *) data + total_sent, size - total_sent, MSG_DONTWAIT); + if (last_sent == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + num_of_timeouts++; + if (num_of_timeouts > SERVICE_WAIT_MAX_TRY) { + return -1; + } else { + usleep(SERVICE_WAIT_BEFORE_TIMEOUT); + continue; + } + } + return -1; + } + total_sent += last_sent; + } + return 0; +} + +std::string create_sockpath(const char *id) +{ + return DEFAULTSOCKETDIR "/ipfixprobe_" + std::string(id) + ".sock"; +} + +} diff --git a/stats.hpp b/stats.hpp new file mode 100644 index 00000000..74206b96 --- /dev/null +++ b/stats.hpp @@ -0,0 +1,87 @@ +/** + * \file stats.hpp + * \brief Exporter stats definition and service IO functions + * \author Jiri Havranek + * \date 2021 + */ +/* + * Copyright (C) 2021 CESNET + * + * LICENSE TERMS + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of the Company nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * ALTERNATIVELY, provided that this notice is retained in full, this + * product may be distributed under the terms of the GNU General Public + * License (GPL) version 2 or later, in which case the provisions + * of the GPL apply INSTEAD OF those given above. + * + * This software is provided ``as is'', and any express or implied + * warranties, including, but not limited to, the implied warranties of + * merchantability and fitness for a particular purpose are disclaimed. + * In no event shall the company or contributors be liable for any + * direct, indirect, incidental, special, exemplary, or consequential + * damages (including, but not limited to, procurement of substitute + * goods or services; loss of use, data, or profits; or business + * interruption) however caused and on any theory of liability, whether + * in contract, strict liability, or tort (including negligence or + * otherwise) arising in any way out of the use of this software, even + * if advised of the possibility of such damage. + * + */ + +#ifndef IPXP_STATS_HPP +#define IPXP_STATS_HPP + +#define SERVICE_WAIT_BEFORE_TIMEOUT 250000 ///< Timeout after EAGAIN or EWOULDBLOCK errno returned from service send() and recv(). +#define SERVICE_WAIT_MAX_TRY 8 ///< A maximal count of repeated timeouts per each service recv() and send() function call. + +#define MSG_MAGIC 0xBEEFFEEB + +namespace ipxp +{ + +struct InputStats { + uint64_t packets; + uint64_t parsed; + uint64_t bytes; + uint64_t qtime; + uint64_t dropped; +}; + +struct OutputStats { + uint64_t biflows; + uint64_t bytes; + uint64_t packets; + uint64_t dropped; +}; + +typedef struct msg_header_s +{ + uint32_t magic; + uint16_t size; + uint16_t inputs; + uint16_t outputs; + + // followed by arrays of plugin stats +} msg_header_t; + +int connect_to_exporter(const char *path); +int create_stats_sock(const char *path); +int recv_data(int sd, uint32_t size, void *data); +int send_data(int sd, uint32_t size, void *data); +std::string create_sockpath(const char *id); + +} +#endif /* IPXP_STATS_HPP */ diff --git a/workers.cpp b/workers.cpp index 3abecc45..e172c6e8 100644 --- a/workers.cpp +++ b/workers.cpp @@ -52,13 +52,14 @@ namespace ipxp { #define MICRO_SEC 1000000L void input_worker(InputPlugin *plugin, PacketBlock *pkts, size_t block_cnt, uint64_t pkt_limit, ipx_ring_t *queue, - std::promise *output, std::shared_future *terminate) + std::promise *out, std::atomic *out_stats, std::shared_future *terminate) { struct timespec start; struct timespec end; size_t i = 0; InputPlugin::Result ret; - InputStats stats = {0, 0, 0, 0, false, ""}; + InputStats stats = {0, 0, 0, 0, 0}; + WorkerResult res = {false, ""}; while (terminate->wait_for(std::chrono::seconds(0)) != std::future_status::ready) { PacketBlock *block = &pkts[i]; block->cnt = 0; @@ -73,14 +74,16 @@ void input_worker(InputPlugin *plugin, PacketBlock *pkts, size_t block_cnt, uint try { ret = plugin->get(*block); } catch (PluginError &e) { - stats.error = true; - stats.msg = e.what(); + res.error = true; + res.msg = e.what(); break; } if (ret == InputPlugin::Result::TIMEOUT) { usleep(1); continue; } else if (ret == InputPlugin::Result::PARSED) { + stats.packets = plugin->m_processed; + stats.parsed = plugin->m_parsed; stats.bytes += block->bytes; #ifdef __linux__ const clockid_t clk_id = CLOCK_MONOTONIC_COARSE; @@ -97,22 +100,26 @@ void input_worker(InputPlugin *plugin, PacketBlock *pkts, size_t block_cnt, uint } stats.qtime += time; i = (i + 1) % block_cnt; + + out_stats->store(stats); } else if (ret == InputPlugin::Result::ERROR) { - stats.error = true; - stats.msg = "error occured during reading"; + res.error = true; + res.msg = "error occured during reading"; break; } else if (ret == InputPlugin::Result::END_OF_FILE) { break; } } - stats.parsed = plugin->m_parsed; + stats.packets = plugin->m_processed; - output->set_value(stats); + stats.parsed = plugin->m_parsed; + out_stats->store(stats); + out->set_value(res); } -void storage_worker(StoragePlugin *cache, ipx_ring_t *queue, std::promise *output, std::shared_future *terminate) +void storage_worker(StoragePlugin *cache, ipx_ring_t *queue, std::promise *out, std::shared_future *terminate) { - StorageStats stats = {false, ""}; + WorkerResult res = {false, ""}; bool timeout = false; struct timeval ts = {0, 0}; struct timespec begin = {0, 0}; @@ -131,8 +138,8 @@ void storage_worker(StoragePlugin *cache, ipx_ring_t *queue, std::promisepkts[block->cnt - 1].ts; } catch (PluginError &e) { - stats.error = true; - stats.msg = e.what(); + res.error = true; + res.msg = e.what(); break; } timeout = false; @@ -159,7 +166,7 @@ void storage_worker(StoragePlugin *cache, ipx_ring_t *queue, std::promiseset_value(stats); + out->set_value(res); } static long timeval_diff(const struct timeval *start, const struct timeval *end) @@ -168,9 +175,11 @@ static long timeval_diff(const struct timeval *start, const struct timeval *end) + (end->tv_usec - start->tv_usec); } -void output_worker(OutputPlugin *exp, ipx_ring_t *queue, std::promise *output, uint32_t fps, std::shared_future *terminate) +void output_worker(OutputPlugin *exp, ipx_ring_t *queue, std::promise *out, std::atomic *out_stats, + uint32_t fps, std::shared_future *terminate) { - OutputStats stats = {0, 0, 0, 0, false, ""}; + WorkerResult res = {false, ""}; + OutputStats stats = {0, 0, 0, 0}; struct timespec sleep_time = {0}; struct timeval begin; struct timeval end; @@ -203,11 +212,13 @@ void output_worker(OutputPlugin *exp, ipx_ring_t *queue, std::promisesrc_bytes + flow->dst_bytes; stats.packets += flow->src_packets + flow->dst_packets; + stats.dropped = exp->m_flows_dropped; + out_stats->store(stats); try { exp->export_flow(*flow); } catch (PluginError &e) { - stats.error = true; - stats.msg = e.what(); + res.error = true; + res.msg = e.what(); break; } @@ -246,7 +257,8 @@ void output_worker(OutputPlugin *exp, ipx_ring_t *queue, std::promiseflush(); stats.dropped = exp->m_flows_dropped; - output->set_value(stats); + out_stats->store(stats); + out->set_value(res); } } diff --git a/workers.hpp b/workers.hpp index 483d536e..068d03c6 100644 --- a/workers.hpp +++ b/workers.hpp @@ -45,6 +45,7 @@ #define IPXP_WORKERS_HPP #include +#include #include #include @@ -53,29 +54,13 @@ #include #include +#include "stats.hpp" + namespace ipxp { #define MICRO_SEC 1000000L -struct InputStats { - uint64_t packets; - uint64_t parsed; - uint64_t bytes; - uint64_t qtime; - bool error; - std::string msg; -}; - -struct StorageStats { - bool error; - std::string msg; -}; - -struct OutputStats { - uint64_t biflows; - uint64_t bytes; - uint64_t packets; - uint64_t dropped; +struct WorkerResult { bool error; std::string msg; }; @@ -84,12 +69,13 @@ struct WorkPipeline { struct { InputPlugin *plugin; std::thread *thread; - std::promise *promise; + std::promise *promise; + std::atomic *stats; } input; struct { StoragePlugin *plugin; std::thread *thread; - std::promise *promise; + std::promise *promise; std::vector plugins; } storage; ipx_ring_t *queue; @@ -98,14 +84,16 @@ struct WorkPipeline { struct OutputWorker { OutputPlugin *plugin; std::thread *thread; - std::promise *promise; + std::promise *promise; + std::atomic *stats; ipx_ring_t *queue; }; void input_worker(InputPlugin *plugin, PacketBlock *pkts, size_t block_cnt, uint64_t pkt_limit, ipx_ring_t *queue, - std::promise *output, std::shared_future *terminate); -void storage_worker(StoragePlugin *cache, ipx_ring_t *queue, std::promise *output, std::shared_future *terminate); -void output_worker(OutputPlugin *exp, ipx_ring_t *queue, std::promise *output, uint32_t fps, std::shared_future *terminate); + std::promise *out, std::atomic *out_stats, std::shared_future *terminate); +void storage_worker(StoragePlugin *cache, ipx_ring_t *queue, std::promise *out, std::shared_future *terminate); +void output_worker(OutputPlugin *exp, ipx_ring_t *queue, std::promise *out, std::atomic *out_stats, + uint32_t fps, std::shared_future *terminate); } From 085d58f84fa95f2de32337562b115131ea111b51 Mon Sep 17 00:00:00 2001 From: Jiri Havranek Date: Tue, 14 Dec 2021 18:46:51 +0100 Subject: [PATCH 09/20] added dropped packets stats to input plugin class --- include/ipfixprobe/input.hpp | 5 +++-- input/benchmark.cpp | 2 +- input/pcap.cpp | 4 ++-- input/raw.cpp | 2 +- workers.cpp | 6 ++++-- 5 files changed, 11 insertions(+), 8 deletions(-) diff --git a/include/ipfixprobe/input.hpp b/include/ipfixprobe/input.hpp index 090f7998..b6ccac35 100644 --- a/include/ipfixprobe/input.hpp +++ b/include/ipfixprobe/input.hpp @@ -66,10 +66,11 @@ class InputPlugin : public Plugin ERROR }; - uint64_t m_processed; + uint64_t m_seen; uint64_t m_parsed; + uint64_t m_dropped; - InputPlugin() : m_processed(0), m_parsed(0) {} + InputPlugin() : m_seen(0), m_parsed(0), m_dropped(0) {} virtual ~InputPlugin() {} virtual Result get(PacketBlock &packets) = 0; diff --git a/input/benchmark.cpp b/input/benchmark.cpp index 89d5fd16..b78a2cf7 100644 --- a/input/benchmark.cpp +++ b/input/benchmark.cpp @@ -131,7 +131,7 @@ InputPlugin::Result Benchmark::get(PacketBlock &packets) break; } } - m_processed += packets.cnt; + m_seen += packets.cnt; m_parsed += packets.cnt; return res; } diff --git a/input/pcap.cpp b/input/pcap.cpp index 62aaeff5..8e229e5b 100644 --- a/input/pcap.cpp +++ b/input/pcap.cpp @@ -263,13 +263,13 @@ InputPlugin::Result PcapReader::get(PacketBlock &packets) return Result::TIMEOUT; } if (ret > 0) { - m_processed += ret; + m_seen += ret; m_parsed += opt.pblock->cnt; return opt.packet_valid ? Result::PARSED : Result::NOT_PARSED; } } else { if (opt.pblock->cnt) { - m_processed += ret ? ret : opt.pblock->cnt; + m_seen += ret ? ret : opt.pblock->cnt; m_parsed += opt.pblock->cnt; return Result::PARSED; } else if (ret == 0) { diff --git a/input/raw.cpp b/input/raw.cpp index c0a89e03..5563dd32 100644 --- a/input/raw.cpp +++ b/input/raw.cpp @@ -369,7 +369,7 @@ InputPlugin::Result RawReader::get(PacketBlock &packets) throw PluginError("error during reading from socket"); } - m_processed += ret; + m_seen += ret; m_parsed += packets.cnt; return packets.cnt ? Result::PARSED : Result::NOT_PARSED; } diff --git a/workers.cpp b/workers.cpp index e172c6e8..c8df9f36 100644 --- a/workers.cpp +++ b/workers.cpp @@ -82,8 +82,9 @@ void input_worker(InputPlugin *plugin, PacketBlock *pkts, size_t block_cnt, uint usleep(1); continue; } else if (ret == InputPlugin::Result::PARSED) { - stats.packets = plugin->m_processed; + stats.packets = plugin->m_seen; stats.parsed = plugin->m_parsed; + stats.dropped = plugin->m_dropped; stats.bytes += block->bytes; #ifdef __linux__ const clockid_t clk_id = CLOCK_MONOTONIC_COARSE; @@ -111,8 +112,9 @@ void input_worker(InputPlugin *plugin, PacketBlock *pkts, size_t block_cnt, uint } } - stats.packets = plugin->m_processed; + stats.packets = plugin->m_seen; stats.parsed = plugin->m_parsed; + stats.dropped = plugin->m_dropped; out_stats->store(stats); out->set_value(res); } From 576540696f5e79ef7d87c225ff5e26a7d7e55048 Mon Sep 17 00:00:00 2001 From: Jiri Havranek Date: Tue, 14 Dec 2021 18:57:43 +0100 Subject: [PATCH 10/20] modified conditional build of parser internals --- input/parser.cpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/input/parser.cpp b/input/parser.cpp index d33c7687..988bca67 100644 --- a/input/parser.cpp +++ b/input/parser.cpp @@ -47,9 +47,9 @@ #include #include -#ifndef WITH_NDP +#ifdef WITH_PCAP #include -#endif /* WITH_NDP */ +#endif /* WITH_PCAP */ #include "parser.hpp" #include "headers.hpp" @@ -141,7 +141,7 @@ inline uint16_t parse_eth_hdr(const u_char *data_ptr, uint16_t data_len, Packet return hdr_len; } -#ifndef WITH_NDP +#ifdef WITH_PCAP /** * \brief Parse specific fields from SLL frame header. * \param [in] data_ptr Pointer to begin of header. @@ -178,7 +178,7 @@ inline uint16_t parse_sll(const u_char *data_ptr, uint16_t data_len, Packet *pkt pkt->ethertype = ntohs(sll->sll_protocol); return sizeof(struct sll_header); } -#endif /* WITH_NDP */ +#endif /* WITH_PCAP */ /** @@ -614,7 +614,7 @@ void parse_packet(parser_opt_t *opt, struct timeval ts, const uint8_t *data, uin uint32_t l3_hdr_offset = 0; uint32_t l4_hdr_offset = 0; try { - #ifndef WITH_NDP + #ifdef WITH_PCAP if (opt->datalink == DLT_EN10MB) { data_offset = parse_eth_hdr(data, caplen, pkt); } else { @@ -622,7 +622,7 @@ void parse_packet(parser_opt_t *opt, struct timeval ts, const uint8_t *data, uin } #else data_offset = parse_eth_hdr(data, caplen, pkt); - #endif /* WITH_NDP */ + #endif /* WITH_PCAP */ if (pkt->ethertype == ETH_P_TRILL) { data_offset += parse_trill(data + data_offset, caplen - data_offset, pkt); From e9211e5a6b43b53504a079a13e49102120415e0c Mon Sep 17 00:00:00 2001 From: Jiri Havranek Date: Tue, 14 Dec 2021 19:04:09 +0100 Subject: [PATCH 11/20] stats: end when --one parameter is used --- ipfixprobe_stats.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ipfixprobe_stats.cpp b/ipfixprobe_stats.cpp index 8d6bb6f9..cc497395 100644 --- a/ipfixprobe_stats.cpp +++ b/ipfixprobe_stats.cpp @@ -200,6 +200,10 @@ int main(int argc, char *argv[]) std::setw(9) << stats->dropped << " " << std::endl; } + if (parser.m_one) { + break; + } + lines_written = hdr->inputs + hdr->outputs + 4; usleep(1000000); } From e33a3a4dffece312466f5b8f64e85bf0e6b6215a Mon Sep 17 00:00:00 2001 From: Jiri Havranek Date: Tue, 14 Dec 2021 19:04:43 +0100 Subject: [PATCH 12/20] updated README.md --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 52d3bfd6..7a9cbd68 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,7 @@ This application creates biflows from packet input and exports them to output interface. ## Requirements +- libatomic - kernel version at least 3.19 when using raw sockets input plugin enabled by default (disable with `--without-raw` parameter for `./configure`) - [libpcap](http://www.tcpdump.org/) when compiling with pcap plugin (`--with-pcap` parameter) - netcope-common [COMBO cards](https://www.liberouter.org/technologies/cards/) when compiling with ndp plugin (`--with-ndp` parameter) From 24b7028f29bf8e46d6c5b2bc702daad65017944b Mon Sep 17 00:00:00 2001 From: Jiri Havranek Date: Tue, 14 Dec 2021 19:15:27 +0100 Subject: [PATCH 13/20] configure: added libatomic check --- configure.ac | 2 ++ 1 file changed, 2 insertions(+) diff --git a/configure.ac b/configure.ac index 4b8e8c9f..e39ef7ee 100644 --- a/configure.ac +++ b/configure.ac @@ -81,6 +81,8 @@ AC_ARG_WITH([defaultsocketdir], AC_SUBST([defaultsocketdir], [$with_defaultsocketdir]) AC_DEFINE_DIR([DEFAULTSOCKETDIR], [defaultsocketdir], [Default path to socket directory]) +AC_CHECK_LIB(atomic, __atomic_store, [libatomic=yes], AC_MSG_ERROR([libatomic not found])) + ### gtest AC_ARG_WITH([gtest], AC_HELP_STRING([--with-gtest],[Compile ipfixprobe with gtest framework]), From d8be5596af66a26fa3b4205fd734b183e569ead8 Mon Sep 17 00:00:00 2001 From: Jiri Havranek Date: Tue, 14 Dec 2021 19:26:43 +0100 Subject: [PATCH 14/20] updated rpm spec file --- ipfixprobe.spec.in | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ipfixprobe.spec.in b/ipfixprobe.spec.in index 5fee096d..65d22b0e 100644 --- a/ipfixprobe.spec.in +++ b/ipfixprobe.spec.in @@ -45,7 +45,8 @@ Vendor: CESNET, z.s.p.o. Packager: @USERNAME@ <@USERMAIL@> BuildRoot: %{_tmppath}/%{name}-%{version}-%{release} Summary: IPFIX flow exporter with various extending IPFIX elements exported by plugins. -BuildRequires: gcc gcc-c++ make doxygen pkgconfig +Requires: libatomic +BuildRequires: gcc gcc-c++ make doxygen pkgconfig libatomic Provides: ipfixprobe %if %{with ndp} @@ -99,6 +100,7 @@ This package contains header file for liburfilter. %files %attr(0755, root, nemead) %{_bindir}/ipfixprobe +%attr(0755, root, nemead) %{_bindir}/ipfixprobe_stats %attr(0755, root, nemead) %{_bindir}/ipfixprobed %{_sysconfdir}/bash_completion.d/ipfixprobe.bash %{_sysconfdir}/ipfixprobe/link0.conf.example From a0277a0a94ac7547b284856abae40167a5704ab2 Mon Sep 17 00:00:00 2001 From: Jiri Havranek Date: Wed, 15 Dec 2021 08:29:57 +0100 Subject: [PATCH 15/20] ppoll changed to poll --- ipfixprobe.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ipfixprobe.cpp b/ipfixprobe.cpp index 804ef205..42da65da 100644 --- a/ipfixprobe.cpp +++ b/ipfixprobe.cpp @@ -503,7 +503,7 @@ void serve_stat_clients(ipxp_conf_t &conf, struct pollfd pfds[2]) uint8_t buffer[100000]; size_t written = 0; msg_header_t *hdr = (msg_header_t *) buffer; - int ret = ppoll(pfds, 2, 0, NULL); + int ret = poll(pfds, 2, 0); if (ret <= 0) { return; } From 706fe257c62c076f8c384e60c402c115e61b42c0 Mon Sep 17 00:00:00 2001 From: Jiri Havranek Date: Fri, 17 Dec 2021 22:38:15 +0100 Subject: [PATCH 16/20] added fd check before close --- ipfixprobe.cpp | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/ipfixprobe.cpp b/ipfixprobe.cpp index 42da65da..e2fb0fa8 100644 --- a/ipfixprobe.cpp +++ b/ipfixprobe.cpp @@ -600,8 +600,12 @@ void main_loop(ipxp_conf_t &conf) usleep(1000); } - close(pfds[0].fd); - close(pfds[1].fd); + if (pfds[0].fd != -1) { + close(pfds[0].fd); + } + if (pfds[1].fd != -1) { + close(pfds[1].fd); + } unlink(sock_path.c_str()); finish(conf); } From 0807dc2ca0f18b41eef7a4bb236af86bced93dec Mon Sep 17 00:00:00 2001 From: Jiri Havranek Date: Fri, 17 Dec 2021 22:45:02 +0100 Subject: [PATCH 17/20] ipfix: added contraints for number of process plugins --- output/ipfix.cpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/output/ipfix.cpp b/output/ipfix.cpp index 033260a8..8702fddb 100644 --- a/output/ipfix.cpp +++ b/output/ipfix.cpp @@ -200,6 +200,9 @@ void IPFIXExporter::init(const char *params, Plugins &plugins) init(params); extension_cnt = get_extension_cnt(); + if (extension_cnt > 64) { + throw PluginError("output plugin operates only with up to 64 running plugins"); + } extensions = new RecordExt*[extension_cnt]; for (int i = 0; i < extension_cnt; i++) { extensions[i] = nullptr; @@ -211,7 +214,7 @@ void IPFIXExporter::init(const char *params, Plugins &plugins) if (ext == nullptr) { continue; } - if (ext->m_ext_id > 64) { + if (ext->m_ext_id >= 64) { throw PluginError("detected plugin ID >64"); } else if (ext->m_ext_id >= extension_cnt) { throw PluginError("detected plugin ID larger than number of extensions"); @@ -274,7 +277,7 @@ template_t *IPFIXExporter::get_template(const Flow &flow) RecordExt *ext = flow.m_exts; while (ext != nullptr) { - if (ext->m_ext_id >= extension_cnt || ext->m_ext_id >= 64) { + if (ext->m_ext_id < 0 || ext->m_ext_id >= extension_cnt) { throw PluginError("encountered invalid extension id"); } extensions[ext->m_ext_id] = ext; From f72349f9b4bf9ed608440655b273f855eddbe58a Mon Sep 17 00:00:00 2001 From: Jiri Havranek Date: Fri, 17 Dec 2021 22:45:26 +0100 Subject: [PATCH 18/20] ssdp: added missing extension id registration --- process/ssdp.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/process/ssdp.cpp b/process/ssdp.cpp index 387fe8bd..a3963bc5 100644 --- a/process/ssdp.cpp +++ b/process/ssdp.cpp @@ -53,6 +53,7 @@ __attribute__((constructor)) static void register_this_plugin() { static PluginRecord rec = PluginRecord("ssdp", [](){return new SSDPPlugin();}); register_plugin(&rec); + RecordExtSSDP::REGISTERED_ID = register_extension(); } // #define DEBUG_SSDP From e29b0f2209090e0fa326f36d2439ad4847769c59 Mon Sep 17 00:00:00 2001 From: Jiri Havranek Date: Tue, 21 Dec 2021 19:46:31 +0100 Subject: [PATCH 19/20] stats: code improvements --- stats.cpp | 38 +++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/stats.cpp b/stats.cpp index 56dddc74..2f03c43b 100644 --- a/stats.cpp +++ b/stats.cpp @@ -57,62 +57,62 @@ namespace ipxp int connect_to_exporter(const char *path) { - int sd; + int fd; struct sockaddr_un addr; memset(&addr, 0, sizeof(addr)); addr.sun_family = AF_UNIX; snprintf(addr.sun_path, sizeof(addr.sun_path) - 1, "%s", path); - sd = socket(AF_UNIX, SOCK_STREAM, 0); - if (sd != -1) { - if (connect(sd, (struct sockaddr *) &addr, sizeof(addr)) == -1) { + fd = socket(AF_UNIX, SOCK_STREAM, 0); + if (fd != -1) { + if (connect(fd, (struct sockaddr *) &addr, sizeof(addr)) == -1) { perror("unable to connect"); - close(sd); + close(fd); return -1; } } - return sd; + return fd; } int create_stats_sock(const char *path) { - int sd; + int fd; struct sockaddr_un addr; addr.sun_family = AF_UNIX; snprintf(addr.sun_path, sizeof(addr.sun_path) - 1, "%s", path); unlink(addr.sun_path); - sd = socket(AF_UNIX, SOCK_STREAM, 0); - if (sd) { - if (bind(sd, (struct sockaddr *) &addr, sizeof(addr)) == -1) { + fd = socket(AF_UNIX, SOCK_STREAM, 0); + if (fd) { + if (bind(fd, (struct sockaddr *) &addr, sizeof(addr)) == -1) { perror("unable to bind socket"); - close(sd); + close(fd); return -1; } - if (listen(sd, 1) == -1) { + if (listen(fd, 1) == -1) { perror("unable to listen on socket"); - close(sd); + close(fd); return -1; } if (chmod(addr.sun_path, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH) == -1) { perror("unable to set access rights"); - close(sd); + close(fd); return -1; } } - return sd; + return fd; } -int recv_data(int sd, uint32_t size, void *data) +int recv_data(int fd, uint32_t size, void *data) { size_t num_of_timeouts = 0; size_t total_received = 0; ssize_t last_received = 0; while (total_received < size) { - last_received = recv(sd, (uint8_t *) data + total_received, size - total_received, MSG_DONTWAIT); + last_received = recv(fd, (uint8_t *) data + total_received, size - total_received, MSG_DONTWAIT); if (last_received == 0) { return -1; } else if (last_received == -1) { @@ -132,14 +132,14 @@ int recv_data(int sd, uint32_t size, void *data) return 0; } -int send_data(int sd, uint32_t size, void *data) +int send_data(int fd, uint32_t size, void *data) { size_t num_of_timeouts = 0; size_t total_sent = 0; ssize_t last_sent = 0; while (total_sent < size) { - last_sent = send(sd, (uint8_t *) data + total_sent, size - total_sent, MSG_DONTWAIT); + last_sent = send(fd, (uint8_t *) data + total_sent, size - total_sent, MSG_DONTWAIT); if (last_sent == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK) { num_of_timeouts++; From 754addb5cc1f0793d74e9107b5108e1adb4e2094 Mon Sep 17 00:00:00 2001 From: Jiri Havranek Date: Tue, 21 Dec 2021 20:56:46 +0100 Subject: [PATCH 20/20] stats: fixed issues reported by coverity --- ipfixprobe.cpp | 3 ++- ipfixprobe_stats.cpp | 27 ++++++++++++++------------- stats.cpp | 2 +- 3 files changed, 17 insertions(+), 15 deletions(-) diff --git a/ipfixprobe.cpp b/ipfixprobe.cpp index e2fb0fa8..baad6d9c 100644 --- a/ipfixprobe.cpp +++ b/ipfixprobe.cpp @@ -543,7 +543,8 @@ void serve_stat_clients(ipxp_conf_t &conf, struct pollfd pfds[2]) int fd = accept(pfds[0].fd, NULL, NULL); if (pfds[1].fd == -1) { pfds[1].fd = fd; - } else { + } else if (fd != -1) { + // Close incoming connection close(fd); } } diff --git a/ipfixprobe_stats.cpp b/ipfixprobe_stats.cpp index cc497395..a9299e42 100644 --- a/ipfixprobe_stats.cpp +++ b/ipfixprobe_stats.cpp @@ -104,28 +104,29 @@ int main(int argc, char *argv[]) std::string path; IpfixStatsParser parser; + signal(SIGTERM, signal_handler); signal(SIGINT, signal_handler); - try { parser.parse(argc - 1, const_cast(argv) + 1); - } catch (ParserError &e) { + + if (parser.m_help) { + parser.usage(std::cout, 0); + goto EXIT; + } + + path = DEFAULTSOCKETDIR "/ipfixprobe_" + std::to_string(parser.m_pid) + ".sock"; + fd = connect_to_exporter(path.c_str()); + if (fd == -1) { + error("connecting to exporter"); + goto EXIT; + } + } catch (std::runtime_error &e) { error(e.what()); status = EXIT_FAILURE; goto EXIT; } - if (parser.m_help) { - parser.usage(std::cout, 0); - goto EXIT; - } - - path = DEFAULTSOCKETDIR "/ipfixprobe_" + std::to_string(parser.m_pid) + ".sock"; - fd = connect_to_exporter(path.c_str()); - if (fd == -1) { - error("connecting to exporter"); - goto EXIT; - } while (!stop) { *(uint32_t *) buffer = MSG_MAGIC; // Send stats data request diff --git a/stats.cpp b/stats.cpp index 2f03c43b..e4b5590f 100644 --- a/stats.cpp +++ b/stats.cpp @@ -85,7 +85,7 @@ int create_stats_sock(const char *path) unlink(addr.sun_path); fd = socket(AF_UNIX, SOCK_STREAM, 0); - if (fd) { + if (fd != -1) { if (bind(fd, (struct sockaddr *) &addr, sizeof(addr)) == -1) { perror("unable to bind socket"); close(fd);