Skip to content

Commit

Permalink
Closing of sockets
Browse files Browse the repository at this point in the history
  • Loading branch information
Danielius1922 committed Aug 15, 2024
1 parent 3b09c6b commit c56a876
Show file tree
Hide file tree
Showing 12 changed files with 101 additions and 57 deletions.
20 changes: 12 additions & 8 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,17 @@ jobs:
include:
# test with check race with coverage and sonarcloud
- name: test
cmd: test
cmd: test-device-provisioning-service
args: TEST_COAP_GATEWAY_UDP_ENABLED=false
checkRace: "true"
coapGateway:
log:
level: "debug"
dumpBody: "true"

- name: test/cqldb
cmd: test
cmd: test-device-provisioning-service
args: TEST_COAP_GATEWAY_UDP_ENABLED=false
checkRace: "true"
database: "cqldb"
coapGateway:
Expand All @@ -46,14 +48,16 @@ jobs:

# test without check race
- name: test/norace
cmd: test
cmd: test-device-provisioning-service
args: TEST_COAP_GATEWAY_UDP_ENABLED=false
coapGateway:
log:
level: "debug"
dumpBody: "true"

- name: test/norace/cqldb
cmd: test
cmd: test-device-provisioning-service
args: TEST_COAP_GATEWAY_UDP_ENABLED=false
database: "cqldb"
coapGateway:
log:
Expand All @@ -65,16 +69,16 @@ jobs:
# - with ECDSA-SHA256 signature and P384 elliptic curve certificates
# - with TEST_LEAD_RESOURCE_TYPE_FILTER, TEST_LEAD_RESOURCE_TYPE_USE_UUID
- name: test/norace-384
cmd: test
args: CERT_TOOL_SIGN_ALG=ECDSA-SHA384 CERT_TOOL_ELLIPTIC_CURVE=P384 TEST_LEAD_RESOURCE_TYPE_FILTER=last TEST_LEAD_RESOURCE_TYPE_USE_UUID=true
cmd: test-device-provisioning-service
args: TEST_COAP_GATEWAY_UDP_ENABLED=false CERT_TOOL_SIGN_ALG=ECDSA-SHA384 CERT_TOOL_ELLIPTIC_CURVE=P384 TEST_LEAD_RESOURCE_TYPE_FILTER=last TEST_LEAD_RESOURCE_TYPE_USE_UUID=true

# test
# - without check race
# - with TEST_LEAD_RESOURCE_TYPE_FILTER, TEST_LEAD_RESOURCE_TYPE_REGEX_FILTER
# - with logs from all services
- name: test/norace/logs
cmd: test
args: TEST_LEAD_RESOURCE_TYPE_REGEX_FILTER='oic\.wk\.d,^/light/\d+$$' TEST_LEAD_RESOURCE_TYPE_FILTER=first
cmd: test-device-provisioning-service
args: TEST_COAP_GATEWAY_UDP_ENABLED=false TEST_LEAD_RESOURCE_TYPE_REGEX_FILTER='oic\.wk\.d,^/light/\d+$$' TEST_LEAD_RESOURCE_TYPE_FILTER=first
coapGateway:
log:
level: "debug"
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.test
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
FROM ubuntu:22.04 AS hub-test
RUN apt-get update \
&& DEBIAN_FRONTEND="noninteractive" apt-get install -y --no-install-recommends \
build-essential ca-certificates curl git make patch sudo \
build-essential ca-certificates curl git make net-tools patch sudo \
&& apt-get clean \
&& curl --proto "=https" -sSL https://get.docker.com/ | sh
WORKDIR /
Expand Down
7 changes: 4 additions & 3 deletions coap-gateway/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,15 @@ func SetUp(t require.TestingT) (tearDown func()) {
return New(t, MakeConfig(t))
}

func checkForClosedSockets(t require.TestingT, cfg service.Config) {
func checkForClosedSockets(cfg service.Config) error {
sockets := make(test.ListenSockets, 0, len(cfg.APIs.COAP.Protocols))
for _, protocol := range cfg.APIs.COAP.Protocols {
sockets = append(sockets, test.ListenSocket{
Network: string(protocol),
Address: cfg.APIs.COAP.Addr,
})
}
sockets.CheckForClosedSockets(t)
return sockets.CheckForClosedSockets()
}

// New creates test coap-gateway.
Expand All @@ -106,6 +106,7 @@ func New(t require.TestingT, cfg service.Config) func() {
err = fileWatcher.Close()
require.NoError(t, err)

checkForClosedSockets(t, cfg)
err = checkForClosedSockets(cfg)
require.NoError(t, err)
}
}
8 changes: 8 additions & 0 deletions device-provisioning-service/service/http/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net/http"
"os/exec"
"strings"

"github.com/fullstorydev/grpchan/inprocgrpc"
Expand Down Expand Up @@ -33,6 +34,13 @@ func New(ctx context.Context, serviceName string, config Config, fileWatcher *fs
}
listener, err := listener.New(config.Connection, fileWatcher, logger)
if err != nil {
cmd := exec.Command("/usr/bin/netstat", "-tulpn")
stdout, err := cmd.Output()
if err != nil {
fmt.Println(err.Error())
} else {
fmt.Println(string(stdout))
}
validator.Close()
return nil, fmt.Errorf("cannot create grpc server: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion device-provisioning-service/service/provision_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func TestProvisioningWithPSK(t *testing.T) {
hubTestService.SetUpServicesId|hubTestService.SetUpServicesResourceAggregate|hubTestService.SetUpServicesGrpcGateway)
defer hubShutdown()

ctx, cancel := context.WithTimeout(context.Background(), time.Second*3600)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
ctx = pkgGrpc.CtxWithToken(ctx, oauthTest.GetDefaultAccessToken(t))

Expand Down
13 changes: 2 additions & 11 deletions device-provisioning-service/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
otelClient "github.com/plgd-dev/hub/v2/pkg/opentelemetry/collector/client"
"github.com/plgd-dev/hub/v2/pkg/opentelemetry/otelcoap"
"github.com/plgd-dev/hub/v2/pkg/service"
"github.com/plgd-dev/hub/v2/pkg/sync/task/queue"
otelCodes "go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)
Expand All @@ -36,7 +35,6 @@ type Service struct {
config Config
ctx context.Context
cancel context.CancelFunc
taskQueue *queue.Queue
messagePool *pool.Pool
linkedHubCache *LinkedHubCache
store *mongodb.Store
Expand Down Expand Up @@ -127,16 +125,10 @@ func New(ctx context.Context, config Config, fileWatcher *fsnotify.Watcher, logg
return nil, fmt.Errorf("cannot create open telemetry collector client: %w", err)
}
otelClient.AddCloseFunc(cancel)
tracerProvider := otelClient.GetTracerProvider()

var closer fn.FuncList
closer.AddFunc(otelClient.Close)
queue, err := queue.New(config.TaskQueue)
if err != nil {
closer.Execute()
return nil, fmt.Errorf("cannot create job queue %w", err)
}
closer.AddFunc(queue.Release)

tracerProvider := otelClient.GetTracerProvider()
store, closeStore, err := NewStore(ctx, config.Clients.Storage.MongoDB, fileWatcher, logger, tracerProvider)
if err != nil {
closer.Execute()
Expand Down Expand Up @@ -180,7 +172,6 @@ func New(ctx context.Context, config Config, fileWatcher *fsnotify.Watcher, logg
s := Service{
config: config,
linkedHubCache: linkedHubCache,
taskQueue: queue,

ctx: ctx,
cancel: cancel,
Expand Down
2 changes: 1 addition & 1 deletion device-provisioning-service/test/provisionHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (h *RequestHandlerWithDps) IsStarted() bool {

func (h *RequestHandlerWithDps) StartDps(opts ...service.Option) {
if h.dpsShutdown != nil {
return
panic("dps already started")
}
h.Logf("start provisioning")
h.dpsShutdown = New(h.t, h.dpsCfg, opts...)
Expand Down
62 changes: 35 additions & 27 deletions device-provisioning-service/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,31 @@ func init() {
},
},
}

if err := checkForClosedSockets(MakeAPIsConfig()); err != nil {
panic(err)
}
}

func MakeAPIsConfig() service.APIsConfig {
var cfg service.APIsConfig
cfg.COAP.Addr = DPSHost
cfg.COAP.MaxMessageSize = 256 * 1024
cfg.COAP.MessagePoolSize = 1000
cfg.COAP.Protocols = []pkgCoapService.Protocol{pkgCoapService.TCP}
if config.DPS_UDP_ENABLED {
cfg.COAP.Protocols = append(cfg.COAP.Protocols, pkgCoapService.UDP)
}
cfg.COAP.InactivityMonitor = &pkgCoapService.InactivityMonitor{
Timeout: time.Second * 20,
}
cfg.COAP.BlockwiseTransfer.Enabled = config.DPS_UDP_ENABLED
cfg.COAP.BlockwiseTransfer.SZX = "1024"
cfg.HTTP = MakeHTTPConfig()
tlsServerCfg := config.MakeTLSServerConfig()
cfg.COAP.TLS.Embedded.CertFile = tlsServerCfg.CertFile
cfg.COAP.TLS.Embedded.KeyFile = tlsServerCfg.KeyFile
return cfg
}

func MakeConfig(t require.TestingT) service.Config {
Expand All @@ -194,27 +219,11 @@ func MakeConfig(t require.TestingT) service.Config {
cfg.Log = log.MakeDefaultConfig()
cfg.TaskQueue.GoPoolSize = 1600
cfg.TaskQueue.Size = 2 * 1024 * 1024
cfg.APIs.COAP.Addr = DPSHost
cfg.APIs.COAP.MaxMessageSize = 256 * 1024
cfg.APIs.COAP.MessagePoolSize = 1000
cfg.APIs.COAP.Protocols = []pkgCoapService.Protocol{pkgCoapService.TCP}
if config.DPS_UDP_ENABLED {
cfg.APIs.COAP.Protocols = append(cfg.APIs.COAP.Protocols, pkgCoapService.UDP)
}
cfg.APIs.COAP.InactivityMonitor = &pkgCoapService.InactivityMonitor{
Timeout: time.Second * 20,
}
cfg.APIs.COAP.BlockwiseTransfer.Enabled = config.DPS_UDP_ENABLED
cfg.APIs.COAP.BlockwiseTransfer.SZX = "1024"
cfg.APIs.HTTP = MakeHTTPConfig()
tlsServerCfg := config.MakeTLSServerConfig()
cfg.APIs.COAP.TLS.Embedded.CertFile = tlsServerCfg.CertFile
cfg.APIs.COAP.TLS.Embedded.KeyFile = tlsServerCfg.KeyFile
cfg.APIs = MakeAPIsConfig()
cfg.Clients.Storage = MakeStorageConfig()
cfg.Clients.OpenTelemetryCollector = pkgHttp.OpenTelemetryCollectorConfig{
Config: config.MakeOpenTelemetryCollectorClient(),
}

cfg.EnrollmentGroups = append(cfg.EnrollmentGroups, MakeEnrollmentGroup())
err := cfg.Validate()
require.NoError(t, err)
Expand Down Expand Up @@ -262,21 +271,21 @@ func New(t *testing.T, cfg service.Config, opts ...service.Option) func() {
return NewWithContext(context.Background(), t, cfg, opts...)
}

func checkForClosedSockets(t require.TestingT, cfg service.Config) {
sockets := make(hubTest.ListenSockets, 0, len(cfg.APIs.COAP.Protocols)+1)
for _, protocol := range cfg.APIs.COAP.Protocols {
func checkForClosedSockets(cfg service.APIsConfig) error {
sockets := make(hubTest.ListenSockets, 0, len(cfg.COAP.Protocols)+1)
for _, protocol := range cfg.COAP.Protocols {
sockets = append(sockets, hubTest.ListenSocket{
Network: string(protocol),
Address: cfg.APIs.COAP.Addr,
Address: cfg.COAP.Addr,
})
}
if cfg.APIs.HTTP.Enabled {
if cfg.HTTP.Enabled {
sockets = append(sockets, hubTest.ListenSocket{
Network: "tcp",
Address: cfg.APIs.HTTP.Config.Connection.Addr,
Address: cfg.HTTP.Config.Connection.Addr,
})
}
sockets.CheckForClosedSockets(t)
return sockets.CheckForClosedSockets()
}

// New creates test dps-gateway.
Expand All @@ -302,9 +311,8 @@ func NewWithContext(ctx context.Context, t *testing.T, cfg service.Config, opts
err = fileWatcher.Close()
require.NoError(t, err)

checkForClosedSockets(t, cfg)
// wait for all connections to be closed
time.Sleep(time.Millisecond * 500)
err = checkForClosedSockets(cfg.APIs)
require.NoError(t, err)
}
}

Expand Down
11 changes: 11 additions & 0 deletions pkg/net/coap/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"os/exec"
"strings"

coapDtlsServer "github.com/plgd-dev/go-coap/v3/dtls/server"
Expand Down Expand Up @@ -49,6 +50,16 @@ func closeOnError(services []service.APIService, logger log.Logger) {
}
}

func listPorts() {
cmd := exec.Command("/usr/bin/netstat", "-tulpn")
stdout, err := cmd.Output()
if err != nil {
fmt.Println(err.Error())
} else {
fmt.Println(string(stdout))
}
}

func newService(protocol Protocol, config Config, serviceOpts Options, fileWatcher *fsnotify.Watcher, logger log.Logger, opts ...interface {
coapTcpServer.Option
coapDtlsServer.Option
Expand Down
2 changes: 2 additions & 0 deletions pkg/net/coap/service/tcpServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,10 @@ func newTCPServer(config Config, serviceOpts Options, fileWatcher *fsnotify.Watc
) (*tcpServer, error) {
listener, closeListener, err := newTCPListener(config, serviceOpts, fileWatcher, logger)
if err != nil {
listPorts()
return nil, fmt.Errorf("cannot create listener: %w", err)
}
fmt.Printf("tcp listerer(%v) opened\n", config.Addr)
tcpOpts := make([]coapTcpServer.Option, 0, 3)
if serviceOpts.OnNewConnection != nil {
tcpOpts = append(tcpOpts, options.WithOnNewConn(func(cc *coapTcpClient.Conn) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/net/coap/service/udpServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,10 @@ func newDTLSServer(config Config, serviceOpts Options, fileWatcher *fsnotify.Wat
) (*dtlsServer, error) {
listener, closeListener, err := newDTLSListener(config, serviceOpts, fileWatcher, logger)
if err != nil {
listPorts()
return nil, fmt.Errorf("cannot create listener: %w", err)
}
fmt.Printf("dtls listerer(%v) opened\n", config.Addr)
dtlsOpts := make([]coapDtlsServer.Option, 0, 4)
if serviceOpts.OnNewConnection != nil {
dtlsOpts = append(dtlsOpts, options.WithOnNewConn(func(coapConn *coapUdpClient.Conn) {
Expand Down Expand Up @@ -175,8 +177,10 @@ func newUDPServer(config Config, serviceOpts Options, logger log.Logger, opts ..
) (*udpServer, error) {
listener, closeListener, err := newUDPListener(config, logger)
if err != nil {
listPorts()
return nil, fmt.Errorf("cannot create listener: %w", err)
}
fmt.Printf("udp listerer(%v) opened\n", config.Addr)
udpOpts := make([]coapUdpServer.Option, 0, 4)
if serviceOpts.OnNewConnection != nil {
udpOpts = append(udpOpts, options.WithOnNewConn(func(coapConn *coapUdpClient.Conn) {
Expand Down
Loading

0 comments on commit c56a876

Please sign in to comment.