Skip to content

Commit

Permalink
[jaeger-v2] Align Cassandra Storage Config With OTEL (#5949)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
-  Resolves #5928 

## Description of the changes
- Revamped configuration for Cassandra to be logically grouped 
- Added unit tests for the `Validate` method 
- Changed the configuration to use OTEL's `configtls.ClientConfig` type
- [Migration
guide](https://docs.google.com/document/d/1mCcQJTYC1eEhNVI9HSR-F6A9gxK-GlEtSz0wiEbRxlU/edit)

## How was this change tested?
- New unit tests 
- CI

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `yarn lint` and `yarn test`

---------

Signed-off-by: Mahad Zaryab <[email protected]>
Signed-off-by: Yuri Shkuro <[email protected]>
Co-authored-by: Yuri Shkuro <[email protected]>
  • Loading branch information
mahadzaryab1 and yurishkuro authored Sep 26, 2024
1 parent d7f01d1 commit 3cd1f59
Show file tree
Hide file tree
Showing 10 changed files with 317 additions and 168 deletions.
24 changes: 18 additions & 6 deletions cmd/jaeger/config-cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,26 @@ extensions:
backends:
some_storage:
cassandra:
keyspace: "jaeger_v1_dc1"
username: "cassandra"
password: "cassandra"
schema:
keyspace: "jaeger_v1_dc1"
connection:
auth:
basic:
username: "cassandra"
password: "cassandra"
tls:
insecure: true
another_storage:
cassandra:
keyspace: "jaeger_v1_dc1"
username: "cassandra"
password: "cassandra"
schema:
keyspace: "jaeger_v1_dc1"
connection:
auth:
basic:
username: "cassandra"
password: "cassandra"
tls:
insecure: true
receivers:
otlp:
protocols:
Expand Down
2 changes: 1 addition & 1 deletion cmd/jaeger/internal/extension/jaegerstorage/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ backends:
`)
cfg := createDefaultConfig().(*Config)
require.NoError(t, conf.Unmarshal(cfg))
assert.NotEmpty(t, cfg.Backends["some_storage"].Cassandra.Primary.Servers)
assert.NotEmpty(t, cfg.Backends["some_storage"].Cassandra.Primary.Connection.Servers)
}

func TestConfigDefaultElasticsearch(t *testing.T) {
Expand Down
209 changes: 122 additions & 87 deletions pkg/cassandra/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,99 +5,138 @@
package config

import (
"context"
"fmt"
"time"

"github.com/asaskevich/govalidator"
"github.com/gocql/gocql"
"go.uber.org/zap"
"go.opentelemetry.io/collector/config/configtls"

"github.com/jaegertracing/jaeger/pkg/cassandra"
gocqlw "github.com/jaegertracing/jaeger/pkg/cassandra/gocql"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
)

// Configuration describes the configuration properties needed to connect to a Cassandra cluster
// Configuration describes the configuration properties needed to connect to a Cassandra cluster.
type Configuration struct {
Servers []string `valid:"required,url" mapstructure:"servers"`
Keyspace string `mapstructure:"keyspace"`
LocalDC string `mapstructure:"local_dc"`
ConnectionsPerHost int `mapstructure:"connections_per_host"`
Timeout time.Duration `mapstructure:"-"`
ConnectTimeout time.Duration `mapstructure:"connection_timeout"`
ReconnectInterval time.Duration `mapstructure:"reconnect_interval"`
SocketKeepAlive time.Duration `mapstructure:"socket_keep_alive"`
MaxRetryAttempts int `mapstructure:"max_retry_attempts"`
ProtoVersion int `mapstructure:"proto_version"`
Consistency string `mapstructure:"consistency"`
DisableCompression bool `mapstructure:"disable_compression"`
Port int `mapstructure:"port"`
Authenticator Authenticator `mapstructure:",squash"`
DisableAutoDiscovery bool `mapstructure:"-"`
TLS tlscfg.Options `mapstructure:"tls"`
Schema Schema `mapstructure:"schema"`
Connection Connection `mapstructure:"connection"`
Query Query `mapstructure:"query"`
}

func DefaultConfiguration() Configuration {
return Configuration{
Servers: []string{"127.0.0.1"},
Port: 9042,
MaxRetryAttempts: 3,
Keyspace: "jaeger_v1_test",
ProtoVersion: 4,
ConnectionsPerHost: 2,
ReconnectInterval: 60 * time.Second,
}
type Connection struct {
// Servers contains a list of hosts that are used to connect to the cluster.
Servers []string `mapstructure:"servers" valid:"required,url"`
// LocalDC contains the name of the local Data Center (DC) for DC-aware host selection
LocalDC string `mapstructure:"local_dc"`
// The port used when dialing to a cluster.
Port int `mapstructure:"port"`
// DisableAutoDiscovery, if set to true, will disable the cluster's auto-discovery features.
DisableAutoDiscovery bool `mapstructure:"disable_auto_discovery"`
// ConnectionsPerHost contains the maximum number of open connections for each host on the cluster.
ConnectionsPerHost int `mapstructure:"connections_per_host"`
// ReconnectInterval contains the regular interval after which the driver tries to connect to
// nodes that are down.
ReconnectInterval time.Duration `mapstructure:"reconnect_interval"`
// SocketKeepAlive contains the keep alive period for the default dialer to the cluster.
SocketKeepAlive time.Duration `mapstructure:"socket_keep_alive"`
// TLS contains the TLS configuration for the connection to the cluster.
TLS configtls.ClientConfig `mapstructure:"tls"`
// Timeout contains the maximum time spent to connect to a cluster.
Timeout time.Duration `mapstructure:"timeout"`
// Authenticator contains the details of the authentication mechanism that is used for
// connecting to a cluster.
Authenticator Authenticator `mapstructure:"auth"`
// ProtoVersion contains the version of the native protocol to use when connecting to a cluster.
ProtoVersion int `mapstructure:"proto_version"`
}

type Schema struct {
// Keyspace contains the namespace where Jaeger data will be stored.
Keyspace string `mapstructure:"keyspace"`
// DisableCompression, if set to true, disables the use of the default Snappy Compression
// while connecting to the Cassandra Cluster. This is useful for connecting to clusters, like Azure Cosmos DB,
// that do not support SnappyCompression.
DisableCompression bool `mapstructure:"disable_compression"`
}

// Authenticator holds the authentication properties needed to connect to a Cassandra cluster
type Query struct {
// Timeout contains the maximum time spent executing a query.
Timeout time.Duration `mapstructure:"timeout"`
// MaxRetryAttempts indicates the maximum number of times a query will be retried for execution.
MaxRetryAttempts int `mapstructure:"max_retry_attempts"`
// Consistency specifies the consistency level which needs to be satisified before responding
// to a query.
Consistency string `mapstructure:"consistency"`
}

// Authenticator holds the authentication properties needed to connect to a Cassandra cluster.
type Authenticator struct {
Basic BasicAuthenticator `yaml:"basic" mapstructure:",squash"`
Basic BasicAuthenticator `mapstructure:"basic"`
// TODO: add more auth types
}

// BasicAuthenticator holds the username and password for a password authenticator for a Cassandra cluster
// BasicAuthenticator holds the username and password for a password authenticator for a Cassandra cluster.
type BasicAuthenticator struct {
Username string `yaml:"username" mapstructure:"username"`
Password string `yaml:"password" mapstructure:"password" json:"-"`
AllowedAuthenticators []string `yaml:"allowed_authenticators" mapstructure:"allowed_authenticators"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password" json:"-"`
AllowedAuthenticators []string `mapstructure:"allowed_authenticators"`
}

func DefaultConfiguration() Configuration {
return Configuration{
Schema: Schema{
Keyspace: "jaeger_v1_test",
},
Connection: Connection{
Servers: []string{"127.0.0.1"},
Port: 9042,
ProtoVersion: 4,
ConnectionsPerHost: 2,
ReconnectInterval: 60 * time.Second,
},
Query: Query{
MaxRetryAttempts: 3,
},
}
}

// ApplyDefaults copies settings from source unless its own value is non-zero.
func (c *Configuration) ApplyDefaults(source *Configuration) {
if c.ConnectionsPerHost == 0 {
c.ConnectionsPerHost = source.ConnectionsPerHost
if c.Schema.Keyspace == "" {
c.Schema.Keyspace = source.Schema.Keyspace
}
if c.MaxRetryAttempts == 0 {
c.MaxRetryAttempts = source.MaxRetryAttempts
if c.Connection.ConnectionsPerHost == 0 {
c.Connection.ConnectionsPerHost = source.Connection.ConnectionsPerHost
}
if c.Timeout == 0 {
c.Timeout = source.Timeout
if c.Connection.ReconnectInterval == 0 {
c.Connection.ReconnectInterval = source.Connection.ReconnectInterval
}
if c.ReconnectInterval == 0 {
c.ReconnectInterval = source.ReconnectInterval
if c.Connection.Port == 0 {
c.Connection.Port = source.Connection.Port
}
if c.Port == 0 {
c.Port = source.Port
if c.Connection.ProtoVersion == 0 {
c.Connection.ProtoVersion = source.Connection.ProtoVersion
}
if c.Keyspace == "" {
c.Keyspace = source.Keyspace
if c.Connection.SocketKeepAlive == 0 {
c.Connection.SocketKeepAlive = source.Connection.SocketKeepAlive
}
if c.ProtoVersion == 0 {
c.ProtoVersion = source.ProtoVersion
if c.Query.MaxRetryAttempts == 0 {
c.Query.MaxRetryAttempts = source.Query.MaxRetryAttempts
}
if c.SocketKeepAlive == 0 {
c.SocketKeepAlive = source.SocketKeepAlive
if c.Query.Timeout == 0 {
c.Query.Timeout = source.Query.Timeout
}
}

// SessionBuilder creates new cassandra.Session
type SessionBuilder interface {
NewSession(logger *zap.Logger) (cassandra.Session, error)
NewSession() (cassandra.Session, error)
}

// NewSession creates a new Cassandra session
func (c *Configuration) NewSession(logger *zap.Logger) (cassandra.Session, error) {
cluster, err := c.NewCluster(logger)
func (c *Configuration) NewSession() (cassandra.Session, error) {
cluster, err := c.NewCluster()
if err != nil {
return nil, err
}
Expand All @@ -109,68 +148,64 @@ func (c *Configuration) NewSession(logger *zap.Logger) (cassandra.Session, error
}

// NewCluster creates a new gocql cluster from the configuration
func (c *Configuration) NewCluster(logger *zap.Logger) (*gocql.ClusterConfig, error) {
cluster := gocql.NewCluster(c.Servers...)
cluster.Keyspace = c.Keyspace
cluster.NumConns = c.ConnectionsPerHost
cluster.Timeout = c.Timeout
cluster.ConnectTimeout = c.ConnectTimeout
cluster.ReconnectInterval = c.ReconnectInterval
cluster.SocketKeepalive = c.SocketKeepAlive
if c.ProtoVersion > 0 {
cluster.ProtoVersion = c.ProtoVersion
func (c *Configuration) NewCluster() (*gocql.ClusterConfig, error) {
cluster := gocql.NewCluster(c.Connection.Servers...)
cluster.Keyspace = c.Schema.Keyspace
cluster.NumConns = c.Connection.ConnectionsPerHost
cluster.ConnectTimeout = c.Connection.Timeout
cluster.ReconnectInterval = c.Connection.ReconnectInterval
cluster.SocketKeepalive = c.Connection.SocketKeepAlive
cluster.Timeout = c.Query.Timeout
if c.Connection.ProtoVersion > 0 {
cluster.ProtoVersion = c.Connection.ProtoVersion
}
if c.MaxRetryAttempts > 1 {
cluster.RetryPolicy = &gocql.SimpleRetryPolicy{NumRetries: c.MaxRetryAttempts - 1}
if c.Query.MaxRetryAttempts > 1 {
cluster.RetryPolicy = &gocql.SimpleRetryPolicy{NumRetries: c.Query.MaxRetryAttempts - 1}
}
if c.Port != 0 {
cluster.Port = c.Port
if c.Connection.Port != 0 {
cluster.Port = c.Connection.Port
}

if !c.DisableCompression {
if !c.Schema.DisableCompression {
cluster.Compressor = gocql.SnappyCompressor{}
}

if c.Consistency == "" {
if c.Query.Consistency == "" {
cluster.Consistency = gocql.LocalOne
} else {
cluster.Consistency = gocql.ParseConsistency(c.Consistency)
cluster.Consistency = gocql.ParseConsistency(c.Query.Consistency)
}

fallbackHostSelectionPolicy := gocql.RoundRobinHostPolicy()
if c.LocalDC != "" {
fallbackHostSelectionPolicy = gocql.DCAwareRoundRobinPolicy(c.LocalDC)
if c.Connection.LocalDC != "" {
fallbackHostSelectionPolicy = gocql.DCAwareRoundRobinPolicy(c.Connection.LocalDC)
}
cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(fallbackHostSelectionPolicy, gocql.ShuffleReplicas())

if c.Authenticator.Basic.Username != "" && c.Authenticator.Basic.Password != "" {
if c.Connection.Authenticator.Basic.Username != "" && c.Connection.Authenticator.Basic.Password != "" {
cluster.Authenticator = gocql.PasswordAuthenticator{
Username: c.Authenticator.Basic.Username,
Password: c.Authenticator.Basic.Password,
AllowedAuthenticators: c.Authenticator.Basic.AllowedAuthenticators,
Username: c.Connection.Authenticator.Basic.Username,
Password: c.Connection.Authenticator.Basic.Password,
AllowedAuthenticators: c.Connection.Authenticator.Basic.AllowedAuthenticators,
}
}
tlsCfg, err := c.TLS.Config(logger)
if err != nil {
return nil, err
}
if c.TLS.Enabled {
if !c.Connection.TLS.Insecure {
tlsCfg, err := c.Connection.TLS.LoadTLSConfig(context.Background())
if err != nil {
return nil, err
}
cluster.SslOpts = &gocql.SslOptions{
Config: tlsCfg,
}
}
// If tunneling connection to C*, disable cluster autodiscovery features.
if c.DisableAutoDiscovery {
if c.Connection.DisableAutoDiscovery {
cluster.DisableInitialHostLookup = true
cluster.IgnorePeerAddr = true
}
return cluster, nil
}

func (c *Configuration) Close() error {
return c.TLS.Close()
}

func (c *Configuration) String() string {
return fmt.Sprintf("%+v", *c)
}
Expand Down
Loading

0 comments on commit 3cd1f59

Please sign in to comment.