Skip to content

Commit

Permalink
Fix data race in opentelemetry wrapper
Browse files Browse the repository at this point in the history
With blockwise transfer enabled a data race in message.BodySize
could occur. The issue is fixed by getting and caching of the
message size before writing of the response message is started.
  • Loading branch information
Danielius1922 committed Aug 18, 2024
1 parent 097dff5 commit d23c85d
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 60 deletions.
11 changes: 8 additions & 3 deletions coap-gateway/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,12 @@ func New(ctx context.Context, config Config, fileWatcher *fsnotify.Watcher, logg
tracerProvider: tracerProvider,
}

return s.createServices(fileWatcher, logger)
ss, err := s.createServices(fileWatcher, logger)
if err != nil {
nats.Close()
return nil, fmt.Errorf("cannot create services: %w", err)
}
return ss, nil
}

func getDeviceID(client *session) string {
Expand Down Expand Up @@ -410,7 +415,7 @@ func (s *Service) processCommandTask(req *mux.Message, client *session, span tra
span.SetStatus(otelCodes.Error, err.Error())
}
if resp != nil {
otelcoap.MessageSentEvent(req.Context(), resp)
otelcoap.MessageSentEvent(req.Context(), otelcoap.MakeMessage(resp))
span.SetAttributes(otelcoap.StatusCodeAttr(resp.Code()))
}
client.logRequestResponse(req, resp, err)
Expand All @@ -426,7 +431,7 @@ func (s *Service) makeCommandTask(req *mux.Message, client *session, fnc func(re
ctx, span := otelcoap.Start(req.Context(), path, req.Code().String(), otelcoap.WithTracerProvider(s.tracerProvider), otelcoap.WithSpanOptions(trace.WithSpanKind(trace.SpanKindServer)))
span.SetAttributes(semconv.NetPeerNameKey.String(client.deviceID()))
req.SetContext(ctx)
otelcoap.MessageReceivedEvent(ctx, req.Message)
otelcoap.MessageReceivedEvent(ctx, otelcoap.MakeMessage(req.Message))
otelcoap.SetRequest(ctx, req.Message)

x := struct {
Expand Down
4 changes: 2 additions & 2 deletions coap-gateway/service/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,15 +219,15 @@ func (c *session) do(req *pool.Message) (*pool.Message, error) {
defer span.End()
span.SetAttributes(semconv.NetPeerNameKey.String(c.deviceID()))

otelcoap.MessageSentEvent(ctx, req)
otelcoap.MessageSentEvent(ctx, otelcoap.MakeMessage(req))

resp, err := c.coapConn.Do(req)
if err != nil {
span.RecordError(err)
span.SetStatus(otelCodes.Error, err.Error())
return nil, err
}
otelcoap.MessageReceivedEvent(ctx, resp)
otelcoap.MessageReceivedEvent(ctx, otelcoap.MakeMessage(resp))
span.SetAttributes(otelcoap.StatusCodeAttr(resp.Code()))

return resp, nil
Expand Down
26 changes: 7 additions & 19 deletions coap-gateway/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,26 +72,14 @@ func SetUp(t require.TestingT) (tearDown func()) {
}

func checkForClosedSockets(t require.TestingT, cfg service.Config) {
protocolClosed := make([]bool, len(cfg.APIs.COAP.Protocols))
// wait for all sockets to be closed - max 3 minutes = 900*200
for j := 0; j < 900; j++ {
allClosed := true
for i, protocol := range cfg.APIs.COAP.Protocols {
if protocolClosed[i] {
continue
}
protocolClosed[i] = test.IsListenSocketClosed(t, string(protocol), cfg.APIs.COAP.Addr)
if protocolClosed[i] {
continue
}
allClosed = false
break
}
if allClosed {
break
}
time.Sleep(time.Millisecond * 200)
sockets := make(test.ListenSockets, 0, len(cfg.APIs.COAP.Protocols))
for _, protocol := range cfg.APIs.COAP.Protocols {
sockets = append(sockets, test.ListenSocket{
Network: string(protocol),
Address: cfg.APIs.COAP.Addr,
})
}
sockets.CheckForClosedSockets(t)
}

// New creates test coap-gateway.
Expand Down
25 changes: 16 additions & 9 deletions pkg/opentelemetry/otelcoap/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ var (

type MessageType attribute.KeyValue

// Event adds an event of the messageType to the span associated with the
// passed context with id and size (if message is a proto message).
func (m MessageType) Event(ctx context.Context, msg *pool.Message) {
span := trace.SpanFromContext(ctx)
type Message struct {
Size int
}

func MakeMessage(msg *pool.Message) Message {
tcpMsg := message.Message{
Code: msg.Code(),
Token: msg.Token(),
Expand All @@ -50,12 +51,18 @@ func (m MessageType) Event(ctx context.Context, msg *pool.Message) {
size = 0
}

if bodySize, err := msg.BodySize(); err != nil {
size += int(bodySize)
return Message{
Size: size,
}
}

// Event adds an event of the messageType to the span associated with the
// passed context with id and size (if message is a proto message).
func (m MessageType) Event(ctx context.Context, msg Message) {
span := trace.SpanFromContext(ctx)
span.AddEvent("message", trace.WithAttributes(
attribute.KeyValue(m),
semconv.MessageUncompressedSizeKey.Int(size),
semconv.MessageUncompressedSizeKey.Int(msg.Size),
))
}

Expand Down Expand Up @@ -84,11 +91,11 @@ func StatusCodeAttr(c codes.Code) attribute.KeyValue {
return COAPStatusCodeKey.Int64(int64(c))
}

func MessageReceivedEvent(ctx context.Context, message *pool.Message) {
func MessageReceivedEvent(ctx context.Context, message Message) {
messageReceived.Event(ctx, message)
}

func MessageSentEvent(ctx context.Context, message *pool.Message) {
func MessageSentEvent(ctx context.Context, message Message) {
messageSent.Event(ctx, message)
}

Expand Down
6 changes: 3 additions & 3 deletions test/helm/mock.plgd.cloud.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,12 @@ mockoauthserver:
clientSecret: "test"
grantType: "clientCredentials"
redirectURL: "https://primary.mock.plgd.cloud/things"
scopes: ['openid']
scopes: ["openid"]
- name: "plgd.web"
clientID: "test"
clientSecret: "test"
redirectURL: "https://primary.mock.plgd.cloud/things"
scopes: ['openid']
scopes: ["openid"]
useInUi: true
identitystore:
log:
Expand Down Expand Up @@ -191,4 +191,4 @@ m2moauthserver:
enabled: true
authorization:
audience:
endpoints:
endpoints:
36 changes: 18 additions & 18 deletions test/helm/try.plgd.cloud.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,19 @@ global:
-----END EC PRIVATE KEY-----
oauth:
device:
- name: "plgd.dps"
clientID: "..."
clientSecret: "..."
grantType: clientCredentials
redirectURL: "cloud.plgd.mobile://login-callback"
scopes: ['openid']
useInUi: false
- name: "plgd.web"
clientID: "..."
clientSecret: "..."
scopes: ["offline_access"]
redirectURL: "https://try.plgd.cloud/devices"
useInUi: true
- name: "plgd.dps"
clientID: "..."
clientSecret: "..."
grantType: clientCredentials
redirectURL: "cloud.plgd.mobile://login-callback"
scopes: ["openid"]
useInUi: false
- name: "plgd.web"
clientID: "..."
clientSecret: "..."
scopes: ["offline_access"]
redirectURL: "https://try.plgd.cloud/devices"
useInUi: true
web:
clientID: "..."
scopes: ["openid"]
Expand Down Expand Up @@ -53,7 +53,7 @@ snippetservice:
coapgateway:
log:
dumpBody: true
level: 'DEBUG'
level: "DEBUG"
service:
type: NodePort
tcp:
Expand All @@ -64,15 +64,15 @@ coapgateway:
coap:
requireBatchObserveEnabled: false
protocols:
- "udp"
- "tcp"
- "udp"
- "tcp"
tls:
clientCertificateRequired: false
deviceProvisioningService:
apiDomain: "dps.try.plgd.cloud"
log:
dumpBody: true
level: 'DEBUG'
level: "DEBUG"
apis:
http:
enabled: true
Expand Down Expand Up @@ -122,4 +122,4 @@ deviceProvisioningService:
name: "plgd.dps"
clientId: "..."
clientSecret: "..."
scopes: ["openid"]
scopes: ["openid"]
44 changes: 38 additions & 6 deletions test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -999,25 +999,57 @@ func GenerateDeviceIDbyIdx(deviceIndex int) string {
return GenerateIDbyIdx("d", deviceIndex)
}

func IsListenSocketClosed(t require.TestingT, target string, addStr string) bool {
if strings.Contains(target, "udp") {
addr, err := net.ResolveUDPAddr(target, addStr)
type ListenSocket struct {
Network string
Address string
}

func (ls *ListenSocket) IsClosed(t require.TestingT) bool {
if strings.Contains(ls.Network, "udp") {
addr, err := net.ResolveUDPAddr(ls.Network, ls.Address)
require.NoError(t, err)
c, err := net.ListenUDP(target, addr)
c, err := net.ListenUDP(ls.Network, addr)
if err != nil {
return false
}
err = c.Close()
require.NoError(t, err)
return true
}
addr, err := net.ResolveTCPAddr(target, addStr)

addr, err := net.ResolveTCPAddr(ls.Network, ls.Address)
require.NoError(t, err)
c, err := net.ListenTCP(target, addr)
c, err := net.ListenTCP(ls.Network, addr)
if err != nil {
return false
}
err = c.Close()
require.NoError(t, err)
return true
}

type ListenSockets []ListenSocket

func (ls ListenSockets) CheckForClosedSockets(t require.TestingT) {
// wait for all sockets to be closed - max 3 minutes = 900*200
socketClosed := make([]bool, len(ls))
for j := 0; j < 900; j++ {
allClosed := true
for i, socket := range ls {
if socketClosed[i] {
continue
}
closed := socket.IsClosed(t)
socketClosed[i] = closed
if socketClosed[i] {
continue
}
allClosed = false
}
if allClosed {
return
}
time.Sleep(time.Millisecond * 200)
}
require.FailNow(t, "ports not closed")
}

0 comments on commit d23c85d

Please sign in to comment.