Skip to content

Commit

Permalink
[kafka-consumer] Use wait group to ensure goroutine is finished befor…
Browse files Browse the repository at this point in the history
…e returning from Close (#4582)

## Which problem is this PR solving?
Resolves # [4576](#4576)

## Short description of the changes
Once we call
[start](https://github.com/jaegertracing/jaeger/blob/main/cmd/ingester/app/consumer/consumer.go#L76)
and call
[close](https://github.com/jaegertracing/jaeger/blob/main/cmd/ingester/app/consumer/consumer.go#L76)
again with fxtest module, there is a chance that the application will
close the goroutine before the
l[ogline](https://github.com/jaegertracing/jaeger/blob/main/cmd/ingester/app/consumer/consumer.go#L79)
is executed causing **panic Log in routine after .. has completed**.

Fix:
Add a wait group that will wait for the goroutine to finish before
closing the channel.

---------

Signed-off-by: kennyaz <[email protected]>
  • Loading branch information
kennyaz authored Jul 14, 2023
1 parent 84f688f commit 673b69f
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 0 deletions.
2 changes: 2 additions & 0 deletions cmd/ingester/app/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ func New(params Params) (*Consumer, error) {
// Start begins consuming messages in a go routine
func (c *Consumer) Start() {
c.deadlockDetector.start()
c.doneWg.Add(1)
go func() {
defer c.doneWg.Done()
c.logger.Info("Starting main loop")
for pc := range c.internalConsumer.Partitions() {
c.partitionMapLock.Lock()
Expand Down
1 change: 1 addition & 0 deletions cmd/ingester/app/consumer/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func newSaramaClusterConsumer(saramaPartitionConsumer sarama.PartitionConsumer,
saramaClusterConsumer.On("Partitions").Return((<-chan cluster.PartitionConsumer)(pcha))
saramaClusterConsumer.On("Close").Return(nil).Run(func(args mock.Arguments) {
mc.Close()
close(pcha)
})
saramaClusterConsumer.On("MarkPartitionOffset", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
return saramaClusterConsumer
Expand Down

0 comments on commit 673b69f

Please sign in to comment.