Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix lint errors #4025

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions control-plane/pkg/contract/contract.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion control-plane/pkg/reconciler/broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3071,7 +3071,7 @@ func makeTLSSecret() *corev1.Secret {
Name: brokerIngressTLSSecretName,
},
Data: map[string][]byte{
"ca.crt": []byte(eventingtlstesting.CA),
"ca.crt": eventingtlstesting.CA,
},
Type: corev1.SecretTypeTLS,
}
Expand Down
9 changes: 5 additions & 4 deletions test/e2e_sink/kafka_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"
"k8s.io/utils/ptr"
testlib "knative.dev/eventing/test/lib"

eventingv1alpha1 "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing/v1alpha1"
Expand All @@ -37,7 +37,8 @@ import (
)

const (
sinkSecretName = "secret-test"
sinkSecretName = "secret-test"
numPartitions int32 = 10
)

func RunTestKafkaSink(t *testing.T, mode string, sp SecretProvider, opts ...func(kss *eventingv1alpha1.KafkaSinkSpec) error) {
Expand All @@ -59,10 +60,10 @@ func RunTestKafkaSink(t *testing.T, mode string, sp SecretProvider, opts ...func

kss := eventingv1alpha1.KafkaSinkSpec{
Topic: "kafka-sink-" + client.Namespace,
NumPartitions: pointer.Int32(10),
NumPartitions: ptr.To(numPartitions),
ReplicationFactor: func(rf int16) *int16 { return &rf }(1),
BootstrapServers: BootstrapServersPlaintextArr,
ContentMode: pointer.String(mode),
ContentMode: ptr.To(mode),
}
for _, opt := range opts {
require.Nil(t, opt(&kss))
Expand Down
4 changes: 2 additions & 2 deletions test/e2e_sink/kafka_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ package e2e_sink
import (
"testing"

"k8s.io/utils/pointer"
"k8s.io/utils/ptr"

eventingv1alpha1 "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing/v1alpha1"
. "knative.dev/eventing-kafka-broker/test/pkg"
Expand All @@ -31,7 +31,7 @@ import (

func TestKafkaSinkV1Alpha1DefaultContentMode(t *testing.T) {
RunTestKafkaSink(t, eventingv1alpha1.ModeStructured, nil, func(kss *eventingv1alpha1.KafkaSinkSpec) error {
kss.ContentMode = pointer.String("")
kss.ContentMode = ptr.To("")
return nil
})
}
Expand Down
1 change: 0 additions & 1 deletion test/e2e_source/helpers/kafka_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ const (

var (
topicGVR = schema.GroupVersionResource{Group: strimziApiGroup, Version: strimziApiVersion, Resource: strimziTopicResource}
userGVR = schema.GroupVersionResource{Group: strimziApiGroup, Version: strimziApiVersion, Resource: strimziUserResource}
ImcGVR = schema.GroupVersionResource{Group: "messaging.knative.dev", Version: "v1", Resource: "inmemorychannels"}
)

Expand Down
4 changes: 2 additions & 2 deletions test/lib/resources/kafkachannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"time"

"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"
"k8s.io/utils/ptr"

v1 "knative.dev/eventing/pkg/apis/duck/v1"

Expand Down Expand Up @@ -130,7 +130,7 @@ func WithKafkaChannelEndpointsReady() KafkaChannelOption {
func WithKafkaChannelAddress(a string) KafkaChannelOption {
return func(nc *v1beta1.KafkaChannel) {
nc.Status.SetAddress(&duckv1.Addressable{
Name: pointer.String("http"),
Name: ptr.To("http"),
URL: apis.HTTP(a),
})
}
Expand Down
2 changes: 1 addition & 1 deletion test/rekt/features/leases.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func KafkaSourceLease() *feature.Feature {

func verifyLeaseAcquired(name string) feature.StepFn {
return func(ctx context.Context, t feature.T) {
err := wait.Poll(time.Second, time.Minute, func() (done bool, err error) {
err := wait.PollUntilContextTimeout(ctx, time.Second, time.Minute, true, func(ctx context.Context) (bool, error) {
lease, err := kubeclient.Get(ctx).
CoordinationV1().
Leases(system.Namespace()).
Expand Down
2 changes: 1 addition & 1 deletion test/rekt/resources/kafkasource/kafkasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func VerifyScale(name string, replicas int32) feature.StepFn {
return func(ctx context.Context, t feature.T) {
interval, timeout := environment.PollTimingsFromContext(ctx)
last := &sources.KafkaSource{}
err := wait.PollImmediate(interval, timeout, func() (done bool, err error) {
err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (bool, error) {
ks, err := kafkaclientset.Get(ctx).
SourcesV1beta1().
KafkaSources(environment.FromContext(ctx).Namespace()).
Expand Down
4 changes: 2 additions & 2 deletions test/rekt/resources/kafkatopic/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func HasReplicationFactor(name string, replicationFactor int, timings ...time.Du
return func(ctx context.Context, t feature.T) {
interval, timeout := k8s.PollTimings(ctx, timings)

err := wait.PollImmediate(interval, timeout, func() (bool, error) {
err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (bool, error) {
ut, err := dynamicclient.Get(ctx).
Resource(GVR()).
Namespace(kafkaNamespace).
Expand Down Expand Up @@ -119,7 +119,7 @@ func HasNumPartitions(name string, numPartitions int, timings ...time.Duration)
return func(ctx context.Context, t feature.T) {
interval, timeout := k8s.PollTimings(ctx, timings)

err := wait.PollImmediate(interval, timeout, func() (bool, error) {
err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (bool, error) {
ut, err := dynamicclient.Get(ctx).
Resource(GVR()).
Namespace(kafkaNamespace).
Expand Down
10 changes: 6 additions & 4 deletions test/test_images/consumer-group-lag-provider-test/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/IBM/sarama"
"k8s.io/apimachinery/pkg/util/wait"
"knative.dev/reconciler-test/pkg/k8s"

"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka"
testingpkg "knative.dev/eventing-kafka-broker/test/pkg"
Expand Down Expand Up @@ -76,6 +77,7 @@ func main() {

log.Println("Sending events to topic", topic)

interval, timeout := k8s.PollTimings(ctx, []time.Duration{})
for i := 0; i < n; i++ {
msg := &sarama.ProducerMessage{
Topic: topic,
Expand All @@ -84,7 +86,7 @@ func main() {
}
// Send message might fail with:
// "kafka server: Request was for a topic or partition that does not exist on this broker."
err := wait.PollImmediateUntil(time.Minute, func() (done bool, err error) {
err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (bool, error) {
partition, offset, err := producer.SendMessage(msg)
if err != nil {
return false, nil
Expand All @@ -95,7 +97,7 @@ func main() {
}
lastOffset = offset
return true, nil
}, ctx.Done())
})
mustBeNil(err)
}
if int64(n) != lastOffset+1 { // Consistency check
Expand Down Expand Up @@ -137,7 +139,7 @@ func main() {
mustBeNil(err)

// Wait for propagation of the committed offset
err = wait.PollImmediateUntil(time.Minute, func() (done bool, err error) {
err = wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (bool, error) {
log.Println("Starting consumer group lag provider")

consumerGroupLagProvider := kafka.NewConsumerGroupLagProvider(client, sarama.NewClusterAdminFromClient, sarama.OffsetOldest)
Expand Down Expand Up @@ -176,7 +178,7 @@ func main() {
return false, nil
}
return true, nil
}, ctx.Done())
})
mustBeNil(err)
}

Expand Down
4 changes: 2 additions & 2 deletions test/upgrade/postupgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ func verifyPostInstall(t *testing.T) {
defer testlib.TearDown(client)

var lastJob *batchv1.Job
err := wait.Poll(5*time.Second, 10*time.Minute, func() (done bool, err error) {
lastJob, err = client.Kube.
err := wait.PollUntilContextTimeout(context.Background(), 5*time.Second, 10*time.Minute, true, func(ctx context.Context) (bool, error) {
lastJob, err := client.Kube.
BatchV1().
Jobs(system.Namespace()).
Get(context.Background(), name, metav1.GetOptions{})
Expand Down
Loading