From d23c85d0f0875c56026e45389725e15d7f781817 Mon Sep 17 00:00:00 2001 From: Daniel Adam Date: Sun, 18 Aug 2024 11:15:47 +0200 Subject: [PATCH] Fix data race in opentelemetry wrapper With blockwise transfer enabled a data race in message.BodySize could occur. The issue is fixed by getting and caching of the message size before writing of the response message is started. --- coap-gateway/service/service.go | 11 ++++-- coap-gateway/service/session.go | 4 +- coap-gateway/test/test.go | 26 ++++-------- pkg/opentelemetry/otelcoap/opentelemetry.go | 25 +++++++----- test/helm/mock.plgd.cloud.yaml | 6 +-- test/helm/try.plgd.cloud.yaml | 36 ++++++++--------- test/test.go | 44 ++++++++++++++++++--- 7 files changed, 92 insertions(+), 60 deletions(-) diff --git a/coap-gateway/service/service.go b/coap-gateway/service/service.go index bb0cfc702..4975f0e78 100644 --- a/coap-gateway/service/service.go +++ b/coap-gateway/service/service.go @@ -342,7 +342,12 @@ func New(ctx context.Context, config Config, fileWatcher *fsnotify.Watcher, logg tracerProvider: tracerProvider, } - return s.createServices(fileWatcher, logger) + ss, err := s.createServices(fileWatcher, logger) + if err != nil { + nats.Close() + return nil, fmt.Errorf("cannot create services: %w", err) + } + return ss, nil } func getDeviceID(client *session) string { @@ -410,7 +415,7 @@ func (s *Service) processCommandTask(req *mux.Message, client *session, span tra span.SetStatus(otelCodes.Error, err.Error()) } if resp != nil { - otelcoap.MessageSentEvent(req.Context(), resp) + otelcoap.MessageSentEvent(req.Context(), otelcoap.MakeMessage(resp)) span.SetAttributes(otelcoap.StatusCodeAttr(resp.Code())) } client.logRequestResponse(req, resp, err) @@ -426,7 +431,7 @@ func (s *Service) makeCommandTask(req *mux.Message, client *session, fnc func(re ctx, span := otelcoap.Start(req.Context(), path, req.Code().String(), otelcoap.WithTracerProvider(s.tracerProvider), otelcoap.WithSpanOptions(trace.WithSpanKind(trace.SpanKindServer))) span.SetAttributes(semconv.NetPeerNameKey.String(client.deviceID())) req.SetContext(ctx) - otelcoap.MessageReceivedEvent(ctx, req.Message) + otelcoap.MessageReceivedEvent(ctx, otelcoap.MakeMessage(req.Message)) otelcoap.SetRequest(ctx, req.Message) x := struct { diff --git a/coap-gateway/service/session.go b/coap-gateway/service/session.go index b4f7f7803..b8123bf1d 100644 --- a/coap-gateway/service/session.go +++ b/coap-gateway/service/session.go @@ -219,7 +219,7 @@ func (c *session) do(req *pool.Message) (*pool.Message, error) { defer span.End() span.SetAttributes(semconv.NetPeerNameKey.String(c.deviceID())) - otelcoap.MessageSentEvent(ctx, req) + otelcoap.MessageSentEvent(ctx, otelcoap.MakeMessage(req)) resp, err := c.coapConn.Do(req) if err != nil { @@ -227,7 +227,7 @@ func (c *session) do(req *pool.Message) (*pool.Message, error) { span.SetStatus(otelCodes.Error, err.Error()) return nil, err } - otelcoap.MessageReceivedEvent(ctx, resp) + otelcoap.MessageReceivedEvent(ctx, otelcoap.MakeMessage(resp)) span.SetAttributes(otelcoap.StatusCodeAttr(resp.Code())) return resp, nil diff --git a/coap-gateway/test/test.go b/coap-gateway/test/test.go index ad4884ff8..206d70111 100644 --- a/coap-gateway/test/test.go +++ b/coap-gateway/test/test.go @@ -72,26 +72,14 @@ func SetUp(t require.TestingT) (tearDown func()) { } func checkForClosedSockets(t require.TestingT, cfg service.Config) { - protocolClosed := make([]bool, len(cfg.APIs.COAP.Protocols)) - // wait for all sockets to be closed - max 3 minutes = 900*200 - for j := 0; j < 900; j++ { - allClosed := true - for i, protocol := range cfg.APIs.COAP.Protocols { - if protocolClosed[i] { - continue - } - protocolClosed[i] = test.IsListenSocketClosed(t, string(protocol), cfg.APIs.COAP.Addr) - if protocolClosed[i] { - continue - } - allClosed = false - break - } - if allClosed { - break - } - time.Sleep(time.Millisecond * 200) + sockets := make(test.ListenSockets, 0, len(cfg.APIs.COAP.Protocols)) + for _, protocol := range cfg.APIs.COAP.Protocols { + sockets = append(sockets, test.ListenSocket{ + Network: string(protocol), + Address: cfg.APIs.COAP.Addr, + }) } + sockets.CheckForClosedSockets(t) } // New creates test coap-gateway. diff --git a/pkg/opentelemetry/otelcoap/opentelemetry.go b/pkg/opentelemetry/otelcoap/opentelemetry.go index cbe67c104..be3f879e3 100644 --- a/pkg/opentelemetry/otelcoap/opentelemetry.go +++ b/pkg/opentelemetry/otelcoap/opentelemetry.go @@ -28,10 +28,11 @@ var ( type MessageType attribute.KeyValue -// Event adds an event of the messageType to the span associated with the -// passed context with id and size (if message is a proto message). -func (m MessageType) Event(ctx context.Context, msg *pool.Message) { - span := trace.SpanFromContext(ctx) +type Message struct { + Size int +} + +func MakeMessage(msg *pool.Message) Message { tcpMsg := message.Message{ Code: msg.Code(), Token: msg.Token(), @@ -50,12 +51,18 @@ func (m MessageType) Event(ctx context.Context, msg *pool.Message) { size = 0 } - if bodySize, err := msg.BodySize(); err != nil { - size += int(bodySize) + return Message{ + Size: size, } +} + +// Event adds an event of the messageType to the span associated with the +// passed context with id and size (if message is a proto message). +func (m MessageType) Event(ctx context.Context, msg Message) { + span := trace.SpanFromContext(ctx) span.AddEvent("message", trace.WithAttributes( attribute.KeyValue(m), - semconv.MessageUncompressedSizeKey.Int(size), + semconv.MessageUncompressedSizeKey.Int(msg.Size), )) } @@ -84,11 +91,11 @@ func StatusCodeAttr(c codes.Code) attribute.KeyValue { return COAPStatusCodeKey.Int64(int64(c)) } -func MessageReceivedEvent(ctx context.Context, message *pool.Message) { +func MessageReceivedEvent(ctx context.Context, message Message) { messageReceived.Event(ctx, message) } -func MessageSentEvent(ctx context.Context, message *pool.Message) { +func MessageSentEvent(ctx context.Context, message Message) { messageSent.Event(ctx, message) } diff --git a/test/helm/mock.plgd.cloud.yaml b/test/helm/mock.plgd.cloud.yaml index ebe12df42..f60fd6427 100644 --- a/test/helm/mock.plgd.cloud.yaml +++ b/test/helm/mock.plgd.cloud.yaml @@ -80,12 +80,12 @@ mockoauthserver: clientSecret: "test" grantType: "clientCredentials" redirectURL: "https://primary.mock.plgd.cloud/things" - scopes: ['openid'] + scopes: ["openid"] - name: "plgd.web" clientID: "test" clientSecret: "test" redirectURL: "https://primary.mock.plgd.cloud/things" - scopes: ['openid'] + scopes: ["openid"] useInUi: true identitystore: log: @@ -191,4 +191,4 @@ m2moauthserver: enabled: true authorization: audience: - endpoints: \ No newline at end of file + endpoints: diff --git a/test/helm/try.plgd.cloud.yaml b/test/helm/try.plgd.cloud.yaml index b5bdc13cd..1721c21b7 100644 --- a/test/helm/try.plgd.cloud.yaml +++ b/test/helm/try.plgd.cloud.yaml @@ -12,19 +12,19 @@ global: -----END EC PRIVATE KEY----- oauth: device: - - name: "plgd.dps" - clientID: "..." - clientSecret: "..." - grantType: clientCredentials - redirectURL: "cloud.plgd.mobile://login-callback" - scopes: ['openid'] - useInUi: false - - name: "plgd.web" - clientID: "..." - clientSecret: "..." - scopes: ["offline_access"] - redirectURL: "https://try.plgd.cloud/devices" - useInUi: true + - name: "plgd.dps" + clientID: "..." + clientSecret: "..." + grantType: clientCredentials + redirectURL: "cloud.plgd.mobile://login-callback" + scopes: ["openid"] + useInUi: false + - name: "plgd.web" + clientID: "..." + clientSecret: "..." + scopes: ["offline_access"] + redirectURL: "https://try.plgd.cloud/devices" + useInUi: true web: clientID: "..." scopes: ["openid"] @@ -53,7 +53,7 @@ snippetservice: coapgateway: log: dumpBody: true - level: 'DEBUG' + level: "DEBUG" service: type: NodePort tcp: @@ -64,15 +64,15 @@ coapgateway: coap: requireBatchObserveEnabled: false protocols: - - "udp" - - "tcp" + - "udp" + - "tcp" tls: clientCertificateRequired: false deviceProvisioningService: apiDomain: "dps.try.plgd.cloud" log: dumpBody: true - level: 'DEBUG' + level: "DEBUG" apis: http: enabled: true @@ -122,4 +122,4 @@ deviceProvisioningService: name: "plgd.dps" clientId: "..." clientSecret: "..." - scopes: ["openid"] \ No newline at end of file + scopes: ["openid"] diff --git a/test/test.go b/test/test.go index fad511dee..3e8172c63 100644 --- a/test/test.go +++ b/test/test.go @@ -999,11 +999,16 @@ func GenerateDeviceIDbyIdx(deviceIndex int) string { return GenerateIDbyIdx("d", deviceIndex) } -func IsListenSocketClosed(t require.TestingT, target string, addStr string) bool { - if strings.Contains(target, "udp") { - addr, err := net.ResolveUDPAddr(target, addStr) +type ListenSocket struct { + Network string + Address string +} + +func (ls *ListenSocket) IsClosed(t require.TestingT) bool { + if strings.Contains(ls.Network, "udp") { + addr, err := net.ResolveUDPAddr(ls.Network, ls.Address) require.NoError(t, err) - c, err := net.ListenUDP(target, addr) + c, err := net.ListenUDP(ls.Network, addr) if err != nil { return false } @@ -1011,9 +1016,10 @@ func IsListenSocketClosed(t require.TestingT, target string, addStr string) bool require.NoError(t, err) return true } - addr, err := net.ResolveTCPAddr(target, addStr) + + addr, err := net.ResolveTCPAddr(ls.Network, ls.Address) require.NoError(t, err) - c, err := net.ListenTCP(target, addr) + c, err := net.ListenTCP(ls.Network, addr) if err != nil { return false } @@ -1021,3 +1027,29 @@ func IsListenSocketClosed(t require.TestingT, target string, addStr string) bool require.NoError(t, err) return true } + +type ListenSockets []ListenSocket + +func (ls ListenSockets) CheckForClosedSockets(t require.TestingT) { + // wait for all sockets to be closed - max 3 minutes = 900*200 + socketClosed := make([]bool, len(ls)) + for j := 0; j < 900; j++ { + allClosed := true + for i, socket := range ls { + if socketClosed[i] { + continue + } + closed := socket.IsClosed(t) + socketClosed[i] = closed + if socketClosed[i] { + continue + } + allClosed = false + } + if allClosed { + return + } + time.Sleep(time.Millisecond * 200) + } + require.FailNow(t, "ports not closed") +}