diff --git a/balance_strategy.go b/balance_strategy.go index 8635bdf7db..cc7a0a3697 100644 --- a/balance_strategy.go +++ b/balance_strategy.go @@ -19,6 +19,8 @@ const ( // StickyBalanceStrategyName identifies strategies that use the sticky-partition assignment strategy StickyBalanceStrategyName = "sticky" + CooperativeStickyBalanceStrategyName = "cooperative-sticky" + defaultGeneration = -1 ) @@ -40,6 +42,45 @@ func (p BalanceStrategyPlan) Add(memberID, topic string, partitions ...int32) { // -------------------------------------------------------------------- +type RebalanceProtocol int + +const ( + EAGER RebalanceProtocol = iota + COOPERATIVE +) + +func (p RebalanceProtocol) String() string { + switch p { + case EAGER: + return "EAGER" + case COOPERATIVE: + return "COOPERATIVE" + default: + return "UNKNOWN" + } +} + +type RebalanceProtocolSlice []RebalanceProtocol + +func (p RebalanceProtocolSlice) Len() int { return len(p) } +func (p RebalanceProtocolSlice) Less(i, j int) bool { return p[i] < p[j] } +func (p RebalanceProtocolSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } + +func (p RebalanceProtocolSlice) retainAll(p2 RebalanceProtocolSlice) RebalanceProtocolSlice { + var result RebalanceProtocolSlice + set := make(map[RebalanceProtocol]bool, len(p2)) + for _, v := range p2 { + set[v] = true + } + + for _, v := range p { + if set[v] { + result = append(result, v) + } + } + return result +} + // BalanceStrategy is used to balance topics and partitions // across members of a consumer group type BalanceStrategy interface { @@ -53,6 +94,8 @@ type BalanceStrategy interface { // AssignmentData returns the serialized assignment data for the specified // memberID AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) + + SupportedProtocols() RebalanceProtocolSlice } // -------------------------------------------------------------------- @@ -111,6 +154,12 @@ func NewBalanceStrategySticky() BalanceStrategy { // Deprecated: use NewBalanceStrategySticky to avoid data race issue var BalanceStrategySticky = NewBalanceStrategySticky() +func NewBalanceStrategyCooperativeSticky() BalanceStrategy { + return &cooperativeStickyBalanceStrategy{ + stickyBalanceStrategy: &stickyBalanceStrategy{}, + } +} + // -------------------------------------------------------------------- type balanceStrategy struct { @@ -161,6 +210,10 @@ func (s *balanceStrategy) AssignmentData(memberID string, topics map[string][]in return nil, nil } +func (s *balanceStrategy) SupportedProtocols() RebalanceProtocolSlice { + return []RebalanceProtocol{EAGER} +} + type stickyBalanceStrategy struct { movements partitionMovements } @@ -279,6 +332,10 @@ func (s *stickyBalanceStrategy) AssignmentData(memberID string, topics map[strin }, nil) } +func (s *stickyBalanceStrategy) SupportedProtocols() RebalanceProtocolSlice { + return []RebalanceProtocol{EAGER} +} + func strsContains(s []string, value string) bool { for _, entry := range s { if entry == value { @@ -343,6 +400,73 @@ func (s *stickyBalanceStrategy) balance(currentAssignment map[string][]topicPart } } +type cooperativeStickyBalanceStrategy struct { + stickyBalanceStrategy *stickyBalanceStrategy +} + +func (cs *cooperativeStickyBalanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) { + assignments, err := cs.stickyBalanceStrategy.Plan(members, topics) + if err != nil { + return nil, err + } + partitionsTransferredOwnership := computePartitionsTransferringOwnership(members, assignments) + return adjustAssignment(assignments, partitionsTransferredOwnership), nil +} + +// Following the cooperative rebalancing protocol requires removing partitions that must first be revoked from the assignment +func adjustAssignment(assignments BalanceStrategyPlan, partitionsTransferredOwnership map[topicPartitionAssignment]bool) BalanceStrategyPlan { + newAssignments := make(BalanceStrategyPlan) + for memberID, assignment := range assignments { + newAssignments[memberID] = make(map[string][]int32) + for topic, partitions := range assignment { + for _, partition := range partitions { + tp := topicPartitionAssignment{Topic: topic, Partition: partition} + if !partitionsTransferredOwnership[tp] { + newAssignments[memberID][topic] = append(newAssignments[memberID][topic], partition) + } + } + } + } + return newAssignments +} + +func computePartitionsTransferringOwnership(members map[string]ConsumerGroupMemberMetadata, assignments BalanceStrategyPlan) map[topicPartitionAssignment]bool { + partitionsTransferringOwnership := make(map[topicPartitionAssignment]bool) + previousAssignmentSet := make(map[topicPartitionAssignment]string) + for prevMember, metadata := range members { + for _, ownedTopicPartitions := range metadata.OwnedPartitions { + for _, partition := range ownedTopicPartitions.Partitions { + previousAssignmentSet[topicPartitionAssignment{Topic: ownedTopicPartitions.Topic, Partition: partition}] = prevMember + } + } + } + + for currMember, assignment := range assignments { + for topic, partitions := range assignment { + for _, partition := range partitions { + tp := topicPartitionAssignment{Topic: topic, Partition: partition} + prevMember, exist := previousAssignmentSet[tp] + if exist && prevMember != currMember { + partitionsTransferringOwnership[tp] = true + } + } + } + } + return partitionsTransferringOwnership +} + +func (cs *cooperativeStickyBalanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) { + return cs.stickyBalanceStrategy.AssignmentData(memberID, topics, generationID) +} + +func (cs *cooperativeStickyBalanceStrategy) SupportedProtocols() RebalanceProtocolSlice { + return []RebalanceProtocol{COOPERATIVE, EAGER} +} + +func (cs *cooperativeStickyBalanceStrategy) Name() string { + return CooperativeStickyBalanceStrategyName +} + // NewBalanceStrategyRoundRobin returns a round-robin balance strategy, // which assigns partitions to members in alternating order. // For example, there are two topics (t0, t1) and two consumer (m0, m1), and each topic has three partitions (p0, p1, p2): @@ -416,6 +540,10 @@ func (b *roundRobinBalancer) AssignmentData(memberID string, topics map[string][ return nil, nil // do nothing for now } +func (b *roundRobinBalancer) SupportedProtocols() RebalanceProtocolSlice { + return []RebalanceProtocol{EAGER} +} + type topicAndPartition struct { topic string partition int32 diff --git a/consumer_group.go b/consumer_group.go index a0a1e1a41b..0edebafc39 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -45,6 +45,37 @@ type ConsumerGroup interface { // recreated to get the new claims. Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error + // ConsumeV2 joins a cluster of consumers for a given list of topics and ConsumerGroupHandlerV2. + // It should be wrapped in an infinite loop so that it would join the group again after a rebalance. + // + // Normally, you should always pass the same topic list and ConsumerGroupHandlerV2 instance in ConsumeV2, + // unless you want to change the subscribed topics or the handler implementation during the lifetime of the consumer group. + // + // Unlike the above Consumer interface, ConsumeV2 implements both COOPERATIVE and EAGER rebalance protocol. + // + // COOPERATIVE rebalance protocol works as follows: + // 1. The consumers join the group (as explained in https://kafka.apache.org/documentation/#intro_consumers) + // and is assigned their "fair share" of partitions, aka 'claims'. + // 2. Comparing to the previous assignments, the newly-added partitions and revoked partitions are calculated. + // 3. For revoked partitions, `ConsumeClaim` loops of these partitions should be exited as quickly as possible. + // Then `Cleanup` hook is called to allow the user to perform any final tasks. + // Finally, marked offsets are committed one last time before claims are released. + // 4. For newly-added partitions, `Setup` hook is called to notify the user + // of the claims and allow any necessary preparation or alteration of state. + // Then several `ConsumeClaim` functions are called in separate goroutines, which is required to be thread-safe. + // 5. For intersection of the previous and current assignments, nothing happens. + // 6. If there are revoked partitions, ConsumeV2 will return. + // ConsumeV2 will be called again to trigger a new rebalance so that leader can re-assigned the revoked partitions + // to other consumers. + // + // The difference between COOPERATIVE and EAGER rebalance protocol is that, + // when a rebalance happens, EAGER rebalance protocol will revoke all the partitions in the current generation, + // no matter whether they will be assigned to the same consumer later or not. + // + // Please note, once ctx is done, `ConsumeClaim` loops must exit as quickly as possible. + // Otherwise, it will be kicked out of the next generation and cause offset commit failures. + ConsumeV2(ctx context.Context, topics []string, handler ConsumerGroupHandlerV2) error + // Errors returns a read channel of errors that occurred during the consumer life-cycle. // By default, errors are logged and not returned over this channel. // If you want to implement any custom error handling, set your config's @@ -65,13 +96,13 @@ type ConsumerGroup interface { // New calls to the broker will return records from these partitions if there are any to be fetched. Resume(partitions map[string][]int32) - // Pause suspends fetching from all partitions. Future calls to the broker will not return any + // PauseAll suspends fetching from all partitions. Future calls to the broker will not return any // records from these partitions until they have been resumed using Resume()/ResumeAll(). // Note that this method does not affect partition subscription. // In particular, it does not cause a group rebalance when automatic assignment is used. PauseAll() - // Resume resumes all partitions which have been paused with Pause()/PauseAll(). + // ResumeAll resumes all partitions which have been paused with Pause()/PauseAll(). // New calls to the broker will return records from these partitions if there are any to be fetched. ResumeAll() } @@ -84,6 +115,7 @@ type consumerGroup struct { groupID string groupInstanceId *string memberID string + generationID int32 errors chan error lock sync.Mutex @@ -93,6 +125,19 @@ type consumerGroup struct { userData []byte + isLeader bool + protocol RebalanceProtocol + ownedPartitions map[string][]int32 + offsetManager *offsetManager + claims map[string]map[int32]*partitionClaim + claimsLock sync.RWMutex + handlerV2 ConsumerGroupHandlerV2 + strategy BalanceStrategy + allSubscribedTopicPartitions map[string][]int32 + allSubscribedTopics []string + wg sync.WaitGroup + subscribedTopics []string + metricRegistry metrics.Registry } @@ -107,6 +152,8 @@ func NewConsumerGroup(addrs []string, groupID string, config *Config) (ConsumerG if err != nil { _ = client.Close() } + + // start heartbeat loop return c, err } @@ -132,18 +179,48 @@ func newConsumerGroup(groupID string, client Client) (ConsumerGroup, error) { } cg := &consumerGroup{ - client: client, - consumer: consumer, - config: config, - groupID: groupID, - errors: make(chan error, config.ChannelBufferSize), - closed: make(chan none), - userData: config.Consumer.Group.Member.UserData, - metricRegistry: newCleanupRegistry(config.MetricRegistry), + client: client, + consumer: consumer, + config: config, + groupID: groupID, + errors: make(chan error, config.ChannelBufferSize), + closed: make(chan none), + userData: config.Consumer.Group.Member.UserData, + ownedPartitions: make(map[string][]int32), + metricRegistry: newCleanupRegistry(config.MetricRegistry), + claims: make(map[string]map[int32]*partitionClaim), } if config.Consumer.Group.InstanceId != "" && config.Version.IsAtLeast(V2_3_0_0) { cg.groupInstanceId = &config.Consumer.Group.InstanceId } + + // select the rebalance protocol such that: + // 1. only consider protocols that are supported by all BalanceStrategies. If there is no common protocols supported + // across all the BalanceStrategies, return an error. + // 2. if there are multiple protocols that are commonly supported, select the one with the highest value (i.e. the + // value indicates how advanced the protocol is) + var supportedProtocols RebalanceProtocolSlice + if config.Consumer.Group.Rebalance.Strategy != nil { + supportedProtocols = config.Consumer.Group.Rebalance.Strategy.SupportedProtocols() + } else { + supportedProtocols = config.Consumer.Group.Rebalance.GroupStrategies[0].SupportedProtocols() + for _, strategy := range config.Consumer.Group.Rebalance.GroupStrategies { + supportedProtocols = supportedProtocols.retainAll(strategy.SupportedProtocols()) + } + } + if len(supportedProtocols) == 0 { + return nil, ConfigurationError("no common rebalance protocol found") + } + sort.Sort(supportedProtocols) + cg.protocol = supportedProtocols[len(supportedProtocols)-1] + + Logger.Printf("select %s rebalance protocol", cg.protocol) + + cg.offsetManager, err = newOffsetManagerFromClient(cg.groupID, "", GroupGenerationUndefined, client, nil) + if err != nil { + return nil, err + } + return cg, nil } @@ -155,6 +232,18 @@ func (c *consumerGroup) Close() (err error) { c.closeOnce.Do(func() { close(c.closed) + // In cooperative rebalance protocol, we need to revoke all owned partitions before leaving the group. + c.lock.Lock() + c.revokedOwnedPartitions() + c.lock.Unlock() + + // wait for all ConsumeClaim goroutines to exit + c.wg.Wait() + + if c.offsetManager != nil { + err = c.offsetManager.Close() + } + // leave group if e := c.leave(); e != nil { err = e @@ -189,6 +278,9 @@ func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler Co default: } + if c.protocol == COOPERATIVE { + return fmt.Errorf("use ConsumeV2 instead of Consume for cooperative rebalance protocol") + } c.lock.Lock() defer c.lock.Unlock() @@ -217,6 +309,267 @@ func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler Co return sess.release(true) } +// todo: check pause & resume logic +func (c *consumerGroup) ConsumeV2(ctx context.Context, topics []string, handlerV2 ConsumerGroupHandlerV2) error { + // Quick exit when no topics are provided + if len(topics) == 0 { + return fmt.Errorf("no topics provided") + } + + if handlerV2 == nil { + return fmt.Errorf("nil handler provided") + } + + // Ensure group is not closed + select { + case <-c.closed: + return ErrClosedConsumerGroup + default: + } + + c.lock.Lock() + defer c.lock.Unlock() + + return c.start(ctx, topics, handlerV2) +} + +func (c *consumerGroup) start(ctx context.Context, topics []string, handlerV2 ConsumerGroupHandlerV2) error { + c.modifySubscribedTopicsAndListener(topics, handlerV2) + + ctx, cancelFunc := context.WithCancel(ctx) + defer cancelFunc() + + // todo: heartbeat should end here + syncGroupResponse, err := c.joinGroup(ctx, topics, c.config.Consumer.Group.Rebalance.Retry.Max) + if err != nil { + c.revokedOwnedPartitions() + return err + } + + // start heartbeat once the status is stable + hbDying, hbDone := c.startHeartbeatLoop(cancelFunc) + + // update the offset manager with new generation, memberID + c.offsetManager.Update(c.memberID, c.generationID) + + // Retrieve and sort claims + var claims map[string][]int32 + if len(syncGroupResponse.MemberAssignment) > 0 { + members, err := syncGroupResponse.GetMemberAssignment() + if err != nil { + return err + } + claims = members.Topics + + // in the case of stateful balance strategies, hold on to the returned + // assignment metadata, otherwise, reset the statically defined consumer + // group metadata + if members.UserData != nil { + c.userData = members.UserData + } else { + c.userData = c.config.Consumer.Group.Member.UserData + } + + for _, partitions := range claims { + sort.Sort(int32Slice(partitions)) + } + } + + newAssignedPartitions := diffAssignment(claims, c.ownedPartitions) + if c.protocol == COOPERATIVE { + revokedPartitions := diffAssignment(c.ownedPartitions, claims) + Logger.Printf("updating consumer(group:%s, member:%s, generation:%d)\n"+ + "All Assignments: %v\n"+ + "New Partitions: %v\n"+ + "Revoked Partitions: %v\n", + c.groupID, c.memberID, c.generationID, claims, newAssignedPartitions, revokedPartitions) + + if len(revokedPartitions) > 0 { + err = c.revokedPartitions(revokedPartitions) + if err != nil { + Logger.Printf("error when revoking partitions: %v", err) + } + cancelFunc() + } + } + c.ownedPartitions = claims + + err = c.startNewPartitions(newAssignedPartitions) + if err != nil { + c.revokedOwnedPartitions() + cancelFunc() + } + + // only the leader needs to check whether there are newly-added partitions in order to trigger a rebalance + if c.isLeader { + go c.loopCheckPartitionNumbers(ctx, cancelFunc, c.allSubscribedTopicPartitions, c.allSubscribedTopics) + } + + select { + case <-c.closed: + cancelFunc() + case <-ctx.Done(): + } + Logger.Printf("consumer(group:%s, member:%s, generation:%d) context is done", c.groupID, c.memberID, c.generationID) + + // if using EAGER rebalance protocol, we need to revoke all owned partitions before sending new JoinGroupRequest + if c.protocol == EAGER { + c.revokedOwnedPartitions() + } + + // make sure heartbeat loop is stopped + close(hbDying) + <-hbDone + return nil +} + +func (c *consumerGroup) joinGroup(ctx context.Context, topics []string, retries int) (*SyncGroupResponse, error) { + coordinator, err := c.joinPrepare(topics) + if err != nil { + return c.retryJoinGroup(ctx, topics, retries-1, err) + } + + // todo: add metrics + + // Join consumer group + join, err := c.joinGroupRequest(coordinator, topics) + if err != nil { + _ = coordinator.Close() + return nil, err + } + + switch join.Err { + case ErrNoError: + c.memberID = join.MemberId + case ErrUnknownMemberId, ErrIllegalGeneration: + // reset member ID and retry immediately + c.memberID = "" + return c.joinGroup(ctx, topics, retries) + case ErrNotCoordinatorForConsumer, ErrRebalanceInProgress, ErrOffsetsLoadInProgress: + // retry after backoff + return c.retryJoinGroup(ctx, topics, retries, join.Err) + case ErrMemberIdRequired: + // from JoinGroupRequest v4 onwards (due to KIP-394) if the client starts + // with an empty member id, it needs to get the assigned id from the + // response and send another join request with that id to actually join the + // group + c.memberID = join.MemberId + return c.retryJoinGroup(ctx, topics, retries+1 /*keep retry time*/, join.Err) + case ErrFencedInstancedId: + if c.groupInstanceId != nil { + Logger.Printf("JoinGroup failed: group instance id %s has been fenced\n", *c.groupInstanceId) + } + return nil, join.Err + default: + return nil, join.Err + } + + c.generationID = join.GenerationId + + var strategy BalanceStrategy + var ok bool + if strategy = c.config.Consumer.Group.Rebalance.Strategy; strategy == nil { + strategy, ok = c.findStrategy(join.GroupProtocol, c.config.Consumer.Group.Rebalance.GroupStrategies) + if !ok { + // this case shouldn't happen in practice, since the leader will choose the protocol + // that all the members support + return nil, fmt.Errorf("unable to find selected strategy: %s", join.GroupProtocol) + } + } + c.strategy = strategy + + // Prepare distribution plan if we joined as the leader + var plan BalanceStrategyPlan + var members map[string]ConsumerGroupMemberMetadata + + if join.LeaderId == join.MemberId { + members, err = join.GetMembers() + if err != nil { + return nil, err + } + + c.allSubscribedTopicPartitions, c.allSubscribedTopics, plan, err = c.balance(strategy, members) + if err != nil { + return nil, err + } + } + + // Sync consumer group + syncGroupResponse, err := c.syncGroupRequest(coordinator, members, plan, join.GenerationId, strategy) + if err != nil { + _ = coordinator.Close() + return nil, err + } + + switch syncGroupResponse.Err { + case ErrNoError: + c.memberID = join.MemberId + c.generationID = join.GenerationId + case ErrUnknownMemberId, ErrIllegalGeneration: + // reset member ID and retry immediately + c.memberID = "" + return c.joinGroup(ctx, topics, retries) + case ErrNotCoordinatorForConsumer, ErrRebalanceInProgress, ErrOffsetsLoadInProgress: + return c.retryJoinGroup(ctx, topics, retries, syncGroupResponse.Err) + case ErrFencedInstancedId: + if c.groupInstanceId != nil { + Logger.Printf("JoinGroup failed: group instance id %s has been fenced\n", *c.groupInstanceId) + } + return nil, syncGroupResponse.Err + default: + return nil, syncGroupResponse.Err + } + if join.LeaderId == join.MemberId { + c.isLeader = true + } + + if c.isLeader && c.protocol == COOPERATIVE && c.strategy.Name() != CooperativeStickyBalanceStrategyName { + err = validateCooperativeAssignment(members, plan) + if err != nil { + return nil, err + } + } + + return syncGroupResponse, nil +} + +func (c *consumerGroup) retryJoinGroup(ctx context.Context, topics []string, retries int, previousErr error) (*SyncGroupResponse, error) { + if retries <= 0 { + return nil, previousErr + } + + nextRetryTimer := time.NewTimer(c.config.Consumer.Group.Rebalance.Retry.Backoff) + defer nextRetryTimer.Stop() + + select { + case <-ctx.Done(): + return nil, previousErr + case <-nextRetryTimer.C: + return c.joinGroup(ctx, topics, retries) + case <-c.closed: + return nil, ErrClosedConsumerGroup + } +} + +// Used by COOPERATIVE rebalance protocol only. + +// Validate the assignments returned by the BalanceStrategy such that no owned partitions are going to +// be reassigned to a different consumer directly: if the BalanceStrategy wants to reassign an owned partition, +// it must first remove it from the new assignment of the current owner so that it is not assigned to any +// member, and then in the next rebalance it can finally reassign those partitions not owned by anyone to consumers. +func validateCooperativeAssignment(previousAssignment map[string]ConsumerGroupMemberMetadata, currentAssignment BalanceStrategyPlan) error { + set := computePartitionsTransferringOwnership(previousAssignment, currentAssignment) + if len(set) > 0 { + var topicPartitions []string + for k := range set { + topicPartitions = append(topicPartitions, fmt.Sprintf("%s/%d", k.Topic, k.Partition)) + } + return fmt.Errorf("in the customized cooperative rebalance strategy, "+ + "topic-partitions %v should be revoked before reassigning them to other consumers", topicPartitions) + } + return nil +} + // Pause implements ConsumerGroup. func (c *consumerGroup) Pause(partitions map[string][]int32) { c.consumer.Pause(partitions) @@ -302,6 +655,7 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler switch join.Err { case ErrNoError: c.memberID = join.MemberId + c.generationID = join.GenerationId case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately c.memberID = "" @@ -417,15 +771,14 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler sort.Sort(int32Slice(partitions)) } } - - session, err := newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler) + session, err := newConsumerGroupSession(ctx, c, claims, handler) if err != nil { return nil, err } // only the leader needs to check whether there are newly-added partitions in order to trigger a rebalance if join.LeaderId == join.MemberId { - go c.loopCheckPartitionNumbers(allSubscribedTopicPartitions, allSubscribedTopics, session) + go c.loopCheckPartitionNumbers(session.ctx, session.cancel, allSubscribedTopicPartitions, allSubscribedTopics) } return session, err @@ -464,9 +817,15 @@ func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) ( } meta := &ConsumerGroupMemberMetadata{ + Version: 1, Topics: topics, UserData: c.userData, } + + for topic, partitions := range c.ownedPartitions { + meta.OwnedPartitions = append(meta.OwnedPartitions, &OwnedPartition{Topic: topic, Partitions: partitions}) + } + var strategy BalanceStrategy if strategy = c.config.Consumer.Group.Rebalance.Strategy; strategy != nil { if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil { @@ -680,12 +1039,12 @@ func (c *consumerGroup) handleError(err error, topic string, partition int32) { } } -func (c *consumerGroup) loopCheckPartitionNumbers(allSubscribedTopicPartitions map[string][]int32, topics []string, session *consumerGroupSession) { +func (c *consumerGroup) loopCheckPartitionNumbers(ctx context.Context, cancelFunc context.CancelFunc, allSubscribedTopicPartitions map[string][]int32, topics []string) { if c.config.Metadata.RefreshFrequency == time.Duration(0) { return } - defer session.cancel() + defer cancelFunc() oldTopicToPartitionNum := make(map[string]int, len(allSubscribedTopicPartitions)) for topic, partitions := range allSubscribedTopicPartitions { @@ -703,17 +1062,16 @@ func (c *consumerGroup) loopCheckPartitionNumbers(allSubscribedTopicPartitions m Logger.Printf( "consumergroup/%s loop check partition number goroutine find partitions in topics %s changed from %d to %d\n", c.groupID, topics, num, newTopicToPartitionNum[topic]) - return // trigger the end of the session on exit + return // trigger defer cancelFunc() } } } select { case <-pause.C: - case <-session.ctx.Done(): + case <-ctx.Done(): Logger.Printf( "consumergroup/%s loop check partition number goroutine will exit, topics %s\n", c.groupID, topics) - // if session closed by other, should be exited return case <-c.closed: return @@ -736,6 +1094,122 @@ func (c *consumerGroup) topicToPartitionNumbers(topics []string) (map[string]int return topicToPartitionNum, nil } +func (c *consumerGroup) revokedPartitions(revokedPartitions map[string][]int32) error { + Logger.Printf("revoking partitions: %v", c.groupID, revokedPartitions) + + // close revoked partition consumers + c.removeClaims(revokedPartitions) + + c.handlerV2.Cleanup(c.offsetManager, revokedPartitions) + + // close partition offset managers for revoked partitions + if err := c.offsetManager.RemovePartitions(revokedPartitions); err != nil { + Logger.Printf("error when removing partition offset managers for %v, err: %v", revokedPartitions, err) + return err + } + return nil +} + +func (c *consumerGroup) revokedOwnedPartitions() { + if len(c.ownedPartitions) > 0 { + err := c.revokedPartitions(c.ownedPartitions) + if err != nil { + Logger.Printf("error revoking owned partitions: %v", err) + } + c.ownedPartitions = make(map[string][]int32) + } +} + +func (c *consumerGroup) startNewPartitions(newAssignedPartitions map[string][]int32) error { + // create partition offset managers for each new assigned partitions + for topic, partitions := range newAssignedPartitions { + for _, partition := range partitions { + pom, err := c.offsetManager.ManagePartition(topic, partition) + if err != nil { + Logger.Printf("unable to create partition offset manager for %s/%d, err: %v", topic, partition, err) + // todo maybe close all offset managers here + return err + } + + // handle POM errors + go func(topic string, partition int32) { + for err := range pom.Errors() { + c.handleError(err, topic, partition) + } + }(topic, partition) + } + } + + c.handlerV2.Setup(c.offsetManager, newAssignedPartitions) + + var errs ConsumerErrors + var errsLock sync.Mutex + + var wg sync.WaitGroup + // create partition consumers for each new assigned partitions + for topic, partitions := range newAssignedPartitions { + for _, partition := range partitions { + wg.Add(1) + // get next offset + go func(topic string, partition int32) { + defer wg.Done() + + offset := c.config.Consumer.Offsets.Initial + if pom := c.offsetManager.findPOM(topic, partition); pom != nil { + offset, _ = pom.NextOffset() + } + + claim, err := c.newConsumerGroupClaim(topic, partition, offset) + if err != nil { + Logger.Printf("unable to create consumer group claim for %s/%d, err: %v", topic, partition, err) + errsLock.Lock() + errs = append(errs, &ConsumerError{ + Topic: topic, + Partition: partition, + Err: err, + }) + errsLock.Unlock() + return + } + pc, err := c.addClaim(claim) + if err != nil { + Logger.Printf("unable to add consumer group claim for %s/%d, err: %v", topic, partition, err) + errsLock.Lock() + errs = append(errs, &ConsumerError{ + Topic: topic, + Partition: partition, + Err: err, + }) + errsLock.Unlock() + return + } + + // handle errors + go func(pc *partitionClaim) { + for err := range pc.claim.Errors() { + c.handleError(err, pc.claim.topic, pc.claim.partition) + } + }(pc) + + c.wg.Add(1) + go func(pc *partitionClaim) { + defer c.wg.Done() + + pc.wg.Add(1) + defer pc.wg.Done() + c.handlerV2.ConsumeClaim(pc.ctx, c.offsetManager, claim) + }(pc) + }(topic, partition) + } + } + wg.Wait() + + if len(errs) > 0 { + return errs + } + return nil +} + // -------------------------------------------------------------------- // ConsumerGroupSession represents a consumer group member session. @@ -799,12 +1273,12 @@ type consumerGroupSession struct { hbDying, hbDead chan none } -func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) { +func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims map[string][]int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) { // init context ctx, cancel := context.WithCancel(ctx) // init offset manager - offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client, cancel) + offsets, err := newOffsetManagerFromClient(parent.groupID, parent.memberID, parent.generationID, parent.client, cancel) if err != nil { return nil, err } @@ -812,8 +1286,8 @@ func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims // init session sess := &consumerGroupSession{ parent: parent, - memberID: memberID, - generationID: generationID, + memberID: parent.memberID, + generationID: parent.generationID, handler: handler, offsets: offsets, claims: claims, @@ -915,7 +1389,7 @@ func (s *consumerGroupSession) consume(topic string, partition int32) { } // create new claim - claim, err := newConsumerGroupClaim(s, topic, partition, offset) + claim, err := s.parent.newConsumerGroupClaim(topic, partition, offset) if err != nil { s.parent.handleError(err, topic, partition) return @@ -1075,6 +1549,31 @@ type ConsumerGroupHandler interface { ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error } +// ConsumerGroupHandlerV2 instances are used to handle individual topic/partition claims. +// It also provides hooks triggered when adding new partitions or revoking existing ones. +// +// The difference with ConsumerGroupHandler interface is that ConsumerGroupHandlerV2 supports COOPERATIVE rebalancing protocol. +// You should always pass the same ConsumerGroupHandlerV2 instance in ConsumeV2, +// unless you want to change the handler implementation during the lifetime of the consumer group. +// +// PLEASE NOTE that ConsumeClaim is likely be called from several goroutines concurrently, +// ensure that all state is safely protected against race conditions. +type ConsumerGroupHandlerV2 interface { + // Setup runs at the beginning of setting up new assigned partitions, before ConsumeClaim. + // For EAGER rebalance strategy, this is to set up all assigned partitions. + // For COOPERATIVE rebalance strategy, this is only to set up new assigned partitions. + Setup(offsetManger OffsetManager, newAssignedPartitions map[string][]int32) + + // Cleanup runs after ConsumeClaim, but before the offsets are committed for the claim. + // For EAGER rebalance strategy, this is to clean up all assigned partitions. + // For COOPERATIVE rebalance strategy, this is only to clean up revoked partitions. + Cleanup(offsetManger OffsetManager, revokedPartitions map[string][]int32) + + // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). + // Once ctx is done, ConsumeClaim should return as soon as possible. + ConsumeClaim(ctx context.Context, offsetManger OffsetManager, claim ConsumerGroupClaim) +} + // ConsumerGroupClaim processes Kafka messages from a given topic and partition within a consumer group. type ConsumerGroupClaim interface { // Topic returns the consumed topic name. @@ -1106,12 +1605,12 @@ type consumerGroupClaim struct { PartitionConsumer } -func newConsumerGroupClaim(sess *consumerGroupSession, topic string, partition int32, offset int64) (*consumerGroupClaim, error) { - pcm, err := sess.parent.consumer.ConsumePartition(topic, partition, offset) +func (c *consumerGroup) newConsumerGroupClaim(topic string, partition int32, offset int64) (*consumerGroupClaim, error) { + pcm, err := c.consumer.ConsumePartition(topic, partition, offset) - if errors.Is(err, ErrOffsetOutOfRange) && sess.parent.config.Consumer.Group.ResetInvalidOffsets { - offset = sess.parent.config.Consumer.Offsets.Initial - pcm, err = sess.parent.consumer.ConsumePartition(topic, partition, offset) + if errors.Is(err, ErrOffsetOutOfRange) && c.config.Consumer.Group.ResetInvalidOffsets { + offset = c.config.Consumer.Offsets.Initial + pcm, err = c.consumer.ConsumePartition(topic, partition, offset) } if err != nil { return nil, err @@ -1119,7 +1618,7 @@ func newConsumerGroupClaim(sess *consumerGroupSession, topic string, partition i go func() { for err := range pcm.Errors() { - sess.parent.handleError(err, topic, partition) + c.handleError(err, topic, partition) } }() @@ -1131,6 +1630,229 @@ func newConsumerGroupClaim(sess *consumerGroupSession, topic string, partition i }, nil } +// todo: make it concurrent +func (c *consumerGroup) addClaim(claim *consumerGroupClaim) (*partitionClaim, error) { + ctx, cancelFunc := context.WithCancel(context.Background()) + // todo check duplication + pc := &partitionClaim{ + claim: claim, + ctx: ctx, + cancelFunc: cancelFunc, + } + + err := c.setPartitionClaim(pc) + if err != nil { + return nil, err + } + return pc, nil +} + +func (c *consumerGroup) removeClaims(revokedPartitions map[string][]int32) { + var wg sync.WaitGroup + + for topic, partitions := range revokedPartitions { + for _, partition := range partitions { + wg.Add(1) + go func(topic string, partition int32) { + defer wg.Done() + + pc := c.getPartitionClaim(topic, partition) + if pc == nil { + return + } + + pc.cancelFunc() + pc.claim.AsyncClose() + + // wait until ConsumerClaim goroutine returns + pc.wg.Wait() + // wait until claim is closed + for _, err := range pc.claim.waitClosed() { + c.handleError(err, topic, partition) + } + }(topic, partition) + } + } + wg.Wait() + + for topic, partitions := range revokedPartitions { + for _, partition := range partitions { + c.removePartitionClaim(topic, partition) + } + } +} + +func (c *consumerGroup) getPartitionClaim(topic string, partition int32) *partitionClaim { + c.claimsLock.RLock() + defer c.claimsLock.RUnlock() + + topicClaims, ok := c.claims[topic] + if !ok { + return nil + } + pc, ok := topicClaims[partition] + if !ok { + return nil + } + return pc +} + +func (c *consumerGroup) setPartitionClaim(pc *partitionClaim) error { + c.claimsLock.Lock() + defer c.claimsLock.Unlock() + + if _, ok := c.claims[pc.claim.topic]; !ok { + c.claims[pc.claim.topic] = make(map[int32]*partitionClaim) + } + if _, ok := c.claims[pc.claim.topic][pc.claim.partition]; ok { + // safeguard, should never happen + return fmt.Errorf("partition claim for %s/%d already exists", pc.claim.topic, pc.claim.partition) + } + c.claims[pc.claim.topic][pc.claim.partition] = pc + return nil +} + +func (c *consumerGroup) removePartitionClaim(topic string, partition int32) { + c.claimsLock.Lock() + defer c.claimsLock.Unlock() + + delete(c.claims[topic], partition) + if len(c.claims[topic]) == 0 { + delete(c.claims, topic) + } +} + +func (c *consumerGroup) joinPrepare(topics []string) (*Broker, error) { + // Refresh metadata for requested topics + if err := c.client.RefreshMetadata(topics...); err != nil { + return nil, err + } + + coordinator, err := c.client.Coordinator(c.groupID) + if err != nil { + return nil, err + } + + return coordinator, nil +} + +func (c *consumerGroup) startHeartbeatLoop(cancelFunc context.CancelFunc) (chan none, chan none) { + hbDone := make(chan none) + hbDying := make(chan none) + go func() { + defer close(hbDone) + + pause := time.NewTicker(c.config.Consumer.Group.Heartbeat.Interval) + defer pause.Stop() + + retryBackoff := time.NewTimer(c.config.Metadata.Retry.Backoff) + defer retryBackoff.Stop() + + retries := c.config.Metadata.Retry.Max + for { + coordinator, err := c.client.Coordinator(c.groupID) + if err != nil { + if retries <= 0 { + c.handleError(err, "", -1) + return + } + retryBackoff.Reset(c.config.Metadata.Retry.Backoff) + select { + case <-hbDying: + return + case <-retryBackoff.C: + retries-- + } + continue + } + + resp, err := c.heartbeatRequest(coordinator, c.memberID, c.generationID) + if err != nil { + _ = coordinator.Close() + + if retries <= 0 { + c.handleError(err, "", -1) + return + } + + retries-- + continue + } + + switch resp.Err { + case ErrNoError: + retries = c.config.Metadata.Retry.Max + case ErrRebalanceInProgress: + retries = c.config.Metadata.Retry.Max + cancelFunc() + case ErrUnknownMemberId, ErrIllegalGeneration: + retries = c.config.Metadata.Retry.Max + cancelFunc() + case ErrFencedInstancedId: + if c.groupInstanceId != nil { + Logger.Printf("heartbeat failed: group instance id %s has been fenced\n", *c.groupInstanceId) + } + c.handleError(resp.Err, "", -1) + retries = c.config.Metadata.Retry.Max + cancelFunc() + default: + c.handleError(resp.Err, "", -1) + Logger.Printf("heartbeat failed with unexpected error: %s\n", resp.Err) + retries = c.config.Metadata.Retry.Max + cancelFunc() + } + + select { + case <-pause.C: + case <-hbDying: + return + } + } + }() + return hbDying, hbDone +} + +func (c *consumerGroup) modifySubscribedTopicsAndListener(topics []string, handlerV2 ConsumerGroupHandlerV2) { + // for the first time + if c.handlerV2 == nil { + c.handlerV2 = handlerV2 + c.subscribedTopics = topics + return + } + + // if listener is changed, revoke all owned partitions + if c.handlerV2 != handlerV2 { + c.revokedOwnedPartitions() + c.handlerV2 = handlerV2 + c.subscribedTopics = topics + return + } + + // if topics are changed, only revoke removed topics + removedTopics := diffTopicSlice(c.subscribedTopics, topics) + + if len(removedTopics) > 0 { + removedTopicPartitions := make(map[string][]int32) + for topic, partitions := range c.ownedPartitions { + if _, ok := removedTopics[topic]; ok { + removedTopicPartitions[topic] = partitions + } + } + err := c.revokedPartitions(removedTopicPartitions) + if err != nil { + Logger.Printf("error revoking owned removed topic/partitions: %v", err) + } + } + c.subscribedTopics = topics +} + +type partitionClaim struct { + claim *consumerGroupClaim + ctx context.Context + cancelFunc context.CancelFunc + wg sync.WaitGroup +} + func (c *consumerGroupClaim) Topic() string { return c.topic } func (c *consumerGroupClaim) Partition() int32 { return c.partition } func (c *consumerGroupClaim) InitialOffset() int64 { return c.offset } diff --git a/consumer_group_test.go b/consumer_group_test.go index 912b6aa4f3..ed32f7d5e4 100644 --- a/consumer_group_test.go +++ b/consumer_group_test.go @@ -224,3 +224,134 @@ func TestConsumerGroupSessionDoesNotRetryForever(t *testing.T) { wg.Wait() } + +func Test_validateCooperativeAssignment(t *testing.T) { + type args struct { + previousAssignment map[string]ConsumerGroupMemberMetadata + currentAssignment BalanceStrategyPlan + } + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "no previous assignment", + args: args{ + previousAssignment: nil, + currentAssignment: BalanceStrategyPlan{ + "member1": map[string][]int32{ + "topic1": {0, 1}, + }, + }, + }, + wantErr: false, + }, + { + name: "no current assignment", + args: args{ + previousAssignment: map[string]ConsumerGroupMemberMetadata{ + "member1": { + OwnedPartitions: []*OwnedPartition{ + { + Topic: "topic1", + Partitions: []int32{0, 1}, + }, + }, + }, + }, + currentAssignment: make(BalanceStrategyPlan), + }, + wantErr: false, + }, + { + name: "directly transfer one partition", + args: args{ + previousAssignment: map[string]ConsumerGroupMemberMetadata{ + "member1": { + OwnedPartitions: []*OwnedPartition{ + { + Topic: "topic1", + Partitions: []int32{0, 1}, + }, + }, + }, + "member2": { + OwnedPartitions: []*OwnedPartition{}, + }, + }, + currentAssignment: BalanceStrategyPlan{ + "member1": map[string][]int32{ + "topic1": {0}, + }, + "member2": map[string][]int32{ + "topic1": {1}, + }, + }, + }, + wantErr: true, + }, + { + name: "revoke one partition", + args: args{ + previousAssignment: map[string]ConsumerGroupMemberMetadata{ + "member1": { + OwnedPartitions: []*OwnedPartition{ + { + Topic: "topic1", + Partitions: []int32{0, 1}, + }, + }, + }, + "member2": { + OwnedPartitions: []*OwnedPartition{}, + }, + }, + currentAssignment: BalanceStrategyPlan{ + "member1": map[string][]int32{ + "topic1": {0}, + }, + "member2": map[string][]int32{}, + }, + }, + wantErr: false, + }, + { + name: "add one partition", + args: args{ + previousAssignment: map[string]ConsumerGroupMemberMetadata{ + "member1": { + OwnedPartitions: []*OwnedPartition{ + { + Topic: "topic1", + Partitions: []int32{0}, + }, + }, + }, + "member2": { + OwnedPartitions: []*OwnedPartition{}, + }, + }, + currentAssignment: BalanceStrategyPlan{ + "member1": map[string][]int32{ + "topic1": {0}, + }, + "member2": map[string][]int32{ + "topic1": {1}, + }, + }, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := validateCooperativeAssignment(tt.args.previousAssignment, tt.args.currentAssignment) + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} diff --git a/examples/consumergroup/main.go b/examples/consumergroup/main.go index e3dbabbed7..3fec4994fd 100644 --- a/examples/consumergroup/main.go +++ b/examples/consumergroup/main.go @@ -84,9 +84,6 @@ func main() { config.Consumer.Offsets.Initial = sarama.OffsetOldest } - /** - * Setup a new Sarama consumer group - */ consumer := Consumer{ ready: make(chan bool), } diff --git a/examples/consumergroup_cooperative/README.md b/examples/consumergroup_cooperative/README.md new file mode 100644 index 0000000000..65ccee6eb4 --- /dev/null +++ b/examples/consumergroup_cooperative/README.md @@ -0,0 +1,9 @@ +# Consumergroup example + +This example shows you how to use the Sarama consumer group consumer. The example simply starts consuming the given Kafka topics and logs the consumed messages. + +```bash +$ go run main.go -brokers="127.0.0.1:9092" -topics="sarama" -group="example" +``` + +You can also toggle (pause/resume) the consumption by sending SIGUSR1 diff --git a/examples/consumergroup_cooperative/go.mod b/examples/consumergroup_cooperative/go.mod new file mode 100644 index 0000000000..62862a216f --- /dev/null +++ b/examples/consumergroup_cooperative/go.mod @@ -0,0 +1,7 @@ +module github.com/IBM/sarama/examples/consumergroup_cooperative + +go 1.16 + +require github.com/IBM/sarama v1.34.1 + +replace github.com/IBM/sarama => ../../ diff --git a/examples/consumergroup_cooperative/go.sum b/examples/consumergroup_cooperative/go.sum new file mode 100644 index 0000000000..3ce0c297db --- /dev/null +++ b/examples/consumergroup_cooperative/go.sum @@ -0,0 +1,117 @@ +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eapache/go-resiliency v1.4.0 h1:3OK9bWpPk5q6pbFAaYSEwD9CLUSHG8bnZuqX2yMt3B0= +github.com/eapache/go-resiliency v1.4.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= +github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg= +github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= +github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8= +github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= +github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= +github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= +github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= +golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk= +golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14= +golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= +golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/examples/consumergroup_cooperative/main.go b/examples/consumergroup_cooperative/main.go new file mode 100644 index 0000000000..add8fdd14d --- /dev/null +++ b/examples/consumergroup_cooperative/main.go @@ -0,0 +1,215 @@ +package main + +// SIGUSR1 toggle the pause/resume consumption +import ( + "context" + "errors" + "flag" + "log" + "os" + "os/signal" + "strings" + "sync" + "syscall" + + "github.com/IBM/sarama" +) + +// Sarama configuration options +var ( + brokers = "" + version = "" + group = "" + topics = "" + assignors = "" + oldest = true + verbose = false +) + +func init() { + flag.StringVar(&brokers, "brokers", "", "Kafka bootstrap brokers to connect to, as a comma separated list") + flag.StringVar(&group, "group", "", "Kafka consumer group definition") + flag.StringVar(&version, "version", "2.1.1", "Kafka cluster version") + flag.StringVar(&topics, "topics", "", "Kafka topics to be consumed, as a comma separated list") + flag.StringVar(&assignors, "assignors", "cooperative-sticky", "Consumer group partition assignment strategies (range, roundrobin, sticky, cooperative-sticky)") + flag.BoolVar(&oldest, "oldest", true, "Kafka consumer consume initial offset from oldest") + flag.BoolVar(&verbose, "verbose", false, "Sarama logging") + // flag string slice + // https://stackoverflow.com/questions/1752414/how-to-receive-variable-number-of-arguments-as-a-command-line-flag + + flag.Parse() + + if len(brokers) == 0 { + panic("no Kafka bootstrap brokers defined, please set the -brokers flag") + } + + if len(topics) == 0 { + panic("no topics given to be consumed, please set the -topics flag") + } + + if len(group) == 0 { + panic("no Kafka consumer group defined, please set the -group flag") + } +} + +func main() { + sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags|log.Lshortfile|log.Lmsgprefix) + log.Println("Starting a new Sarama consumer") + + if verbose { + sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags) + } + + version, err := sarama.ParseKafkaVersion(version) + if err != nil { + log.Panicf("Error parsing Kafka version: %v", err) + } + + /** + * Construct a new Sarama configuration. + * The Kafka cluster version has to be defined before the consumer/producer is initialized. + */ + config := sarama.NewConfig() + config.Version = version + + var strategies []sarama.BalanceStrategy + for _, assignor := range strings.Split(assignors, " ") { + switch assignor { + case "sticky": + strategies = append(strategies, sarama.NewBalanceStrategySticky()) + case "roundrobin": + strategies = append(strategies, sarama.NewBalanceStrategyRoundRobin()) + case "range": + strategies = append(strategies, sarama.NewBalanceStrategyRange()) + case "cooperative-sticky": + strategies = append(strategies, sarama.NewBalanceStrategyCooperativeSticky()) + default: + log.Panicf("Unrecognized consumer group partition assignor: %s", assignor) + } + } + config.Consumer.Group.Rebalance.GroupStrategies = strategies + + if oldest { + config.Consumer.Offsets.Initial = sarama.OffsetOldest + } + + /** + * Setup a new Sarama consumer group + */ + consumer := Consumer{} + + ctx, cancel := context.WithCancel(context.Background()) + client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config) + if err != nil { + log.Panicf("Error creating consumer group client: %v", err) + } + + consumptionIsPaused := false + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + + var err error + for { + // `Consume` should be called inside an infinite loop, when a + // server-side rebalance happens, the consumer session will need to be + // recreated to get the new claims + if err = client.ConsumeV2(ctx, strings.Split(topics, ","), &consumer); err != nil { + log.Panicf("Error from consumer: %v", err) + } + // return if consumer is closed + if errors.Is(err, sarama.ErrClosedConsumerGroup) { + return + } + // check if context was cancelled, signaling that the consumer should stop + if ctx.Err() != nil { + return + } + } + }() + + log.Println("Sarama consumer up and running!...") + + sigusr1 := make(chan os.Signal, 1) + signal.Notify(sigusr1, syscall.SIGUSR1) + + sigterm := make(chan os.Signal, 1) + signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) + + select { + case <-ctx.Done(): + log.Println("terminating: context cancelled") + case <-sigterm: + log.Println("terminating: via signal") + case <-sigusr1: + toggleConsumptionFlow(client, &consumptionIsPaused) + } + cancel() + if err = client.Close(); err != nil { + log.Panicf("Error closing client: %v", err) + } + wg.Wait() +} + +func toggleConsumptionFlow(client sarama.ConsumerGroup, isPaused *bool) { + if *isPaused { + client.ResumeAll() + log.Println("Resuming consumption") + } else { + client.PauseAll() + log.Println("Pausing consumption") + } + + *isPaused = !*isPaused +} + +// Consumer represents a Sarama consumer group consumer +type Consumer struct{} + +// Setup runs at the beginning of setting up new assigned partitions, before ConsumeClaim. +// For EAGER rebalance strategy, this is to set up all assigned partitions. +// For COOPERATIVE rebalance strategy, this is only to set up new assigned partitions. +func (consumer *Consumer) Setup(offsetManger sarama.OffsetManager, newAssignedPartitions map[string][]int32) { + log.Printf("newAssignedPartitions: %v", newAssignedPartitions) +} + +// Cleanup runs after ConsumeClaim, but before the offsets are committed for the claim. +// For EAGER rebalance strategy, this is to clean up all assigned partitions. +// For COOPERATIVE rebalance strategy, this is only to clean up revoked partitions. +func (consumer *Consumer) Cleanup(offsetManger sarama.OffsetManager, revokedPartitions map[string][]int32) { + log.Printf("revokedPartitions: %v", revokedPartitions) +} + +// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). +// Once ctx is done, ConsumeClaim should return as soon as possible. +func (consumer *Consumer) ConsumeClaim(ctx context.Context, om sarama.OffsetManager, claim sarama.ConsumerGroupClaim) { + // NOTE: + // Do not move the code below to a goroutine. + // The `ConsumeClaim` itself is called within a goroutine, see: + // https://github.com/IBM/sarama/blob/main/consumer_group.go#L27-L29 + for { + // `<-ctx.Done()` has a higher priority than `<-claim.Messages()` + select { + case <-ctx.Done(): + return + default: + } + + select { + // Should return immediately when `ctx.Context()` is done. + // If not, will raise `ErrRebalanceInProgress` or `read tcp :: i/o timeout` when kafka rebalance. see: + // https://github.com/IBM/sarama/issues/1192 + case <-ctx.Done(): + return + + case message, ok := <-claim.Messages(): + if !ok { + log.Printf("message channel was closed") + return + } + log.Printf("received message topic:%s, partition:%d, offset:%d, value:%s", message.Topic, message.Partition, message.Offset, message.Value) + om.MarkMessage(message, "") + } + } +} diff --git a/metrics.go b/metrics.go index 7b7705f2e3..de8ad95c74 100644 --- a/metrics.go +++ b/metrics.go @@ -32,7 +32,7 @@ func getMetricNameForBroker(name string, broker *Broker) string { func getMetricNameForTopic(name string, topic string) string { // Convert dot to _ since reporters like Graphite typically use dot to represent hierarchy // cf. KAFKA-1902 and KAFKA-2337 - return fmt.Sprintf(name+"-for-topic-%s", strings.Replace(topic, ".", "_", -1)) + return fmt.Sprintf(name+"-for-topic-%s", strings.ReplaceAll(topic, ".", "_")) } func getOrRegisterTopicMeter(name string, topic string, r metrics.Registry) metrics.Meter { diff --git a/metrics_test.go b/metrics_test.go index c9144df38c..cfa25e7a4e 100644 --- a/metrics_test.go +++ b/metrics_test.go @@ -57,6 +57,7 @@ func (m *metricValidators) registerForBroker(broker *Broker, validator *metricVa m.register(&metricValidator{getMetricNameForBroker(validator.name, broker), validator.validator}) } +// nolint:unused func (m *metricValidators) registerForGlobalAndTopic(topic string, validator *metricValidator) { m.register(&metricValidator{validator.name, validator.validator}) m.register(&metricValidator{getMetricNameForTopic(validator.name, topic), validator.validator}) @@ -103,6 +104,7 @@ func countMeterValidator(name string, expectedCount int) *metricValidator { }) } +// nolint:unused func minCountMeterValidator(name string, minCount int) *metricValidator { return meterValidator(name, func(t *testing.T, meter metrics.Meter) { t.Helper() @@ -137,6 +139,7 @@ func countHistogramValidator(name string, expectedCount int) *metricValidator { }) } +// nolint:unused func minCountHistogramValidator(name string, minCount int) *metricValidator { return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) { t.Helper() @@ -161,6 +164,7 @@ func minMaxHistogramValidator(name string, expectedMin int, expectedMax int) *me }) } +// nolint:unused func minValHistogramValidator(name string, minMin int) *metricValidator { return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) { t.Helper() @@ -171,6 +175,7 @@ func minValHistogramValidator(name string, minMin int) *metricValidator { }) } +// nolint:unused func maxValHistogramValidator(name string, maxMax int) *metricValidator { return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) { t.Helper() diff --git a/offset_manager.go b/offset_manager.go index 332679fd77..ec50c83a53 100644 --- a/offset_manager.go +++ b/offset_manager.go @@ -1,6 +1,8 @@ package sarama import ( + "errors" + "fmt" "sync" "time" ) @@ -14,6 +16,35 @@ type OffsetManager interface { // topic/partition. ManagePartition(topic string, partition int32) (PartitionOffsetManager, error) + RemovePartitions(partitions map[string][]int32) error + + Update(memberID string, generation int32) + + // MarkOffset marks the provided offset, alongside a metadata string + // that represents the state of the partition consumer at that point in time. The + // metadata string can be used by another consumer to restore that state, so it + // can resume consumption. + // + // To follow upstream conventions, you are expected to mark the offset of the + // next message to read, not the last message read. Thus, when calling `MarkOffset` + // you should typically add one to the offset of the last consumed message. + // + // Note: calling MarkOffset does not necessarily commit the offset to the backend + // store immediately for efficiency reasons, and it may never be committed if + // your application crashes. This means that you may end up processing the same + // message twice, and your processing should ideally be idempotent. + MarkOffset(topic string, partition int32, offset int64, metadata string) + + // ResetOffset resets to the provided offset, alongside a metadata string that + // represents the state of the partition consumer at that point in time. Reset + // acts as a counterpart to MarkOffset, the difference being that it allows to + // reset an offset to an earlier or smaller value, where MarkOffset only + // allows incrementing the offset. cf MarkOffset for more details. + ResetOffset(topic string, partition int32, offset int64, metadata string) + + // MarkMessage marks a message as consumed. + MarkMessage(msg *ConsumerMessage, metadata string) + // Close stops the OffsetManager from managing offsets. It is required to call // this function before an OffsetManager object passes out of scope, as it // will otherwise leak memory. You must call this after all the @@ -35,6 +66,7 @@ type offsetManager struct { memberID string groupInstanceId *string generation int32 + updateLock sync.RWMutex broker *Broker brokerLock sync.RWMutex @@ -100,13 +132,81 @@ func (om *offsetManager) ManagePartition(topic string, partition int32) (Partiti } if topicManagers[partition] != nil { - return nil, ConfigurationError("That topic/partition is already being managed") + return nil, ConfigurationError(fmt.Sprintf("topic:%s/partition:%d is already being managed", topic, partition)) } topicManagers[partition] = pom return pom, nil } +func (om *offsetManager) RemovePartitions(topicPartitions map[string][]int32) error { + var errs ConsumerErrors + var errsLock sync.Mutex + + var wg sync.WaitGroup + for topic, partitions := range topicPartitions { + for _, partition := range partitions { + wg.Add(1) + go func(topic string, partition int32) { + defer wg.Done() + + om.pomsLock.RLock() + pom := om.poms[topic][partition] + om.pomsLock.RUnlock() + err := pom.Close() + if err != nil { + errsLock.Lock() + var consumerErrs ConsumerErrors + if errors.As(err, &consumerErrs) { + errs = append(errs, consumerErrs...) + } + errsLock.Unlock() + } + }(topic, partition) + } + } + wg.Wait() + + // flush one last time + if om.conf.Consumer.Offsets.AutoCommit.Enable { + for attempt := 0; attempt <= om.conf.Consumer.Offsets.Retry.Max; attempt++ { + om.flushToBroker() + } + } + + om.pomsLock.Lock() + for topic, partitions := range topicPartitions { + for _, partition := range partitions { + delete(om.poms[topic], partition) + if len(om.poms[topic]) == 0 { + delete(om.poms, topic) + } + } + } + + om.pomsLock.Unlock() + if len(errs) > 0 { + return errs + } + return nil +} + +func (om *offsetManager) MarkOffset(topic string, partition int32, offset int64, metadata string) { + if pom := om.findPOM(topic, partition); pom != nil { + pom.MarkOffset(offset, metadata) + } +} + +func (om *offsetManager) ResetOffset(topic string, partition int32, offset int64, metadata string) { + if pom := om.findPOM(topic, partition); pom != nil { + pom.ResetOffset(offset, metadata) + } +} + +func (om *offsetManager) MarkMessage(msg *ConsumerMessage, metadata string) { + om.MarkOffset(msg.Topic, msg.Partition, msg.Offset+1, metadata) +} + func (om *offsetManager) Close() error { om.closeOnce.Do(func() { // exit the mainLoop @@ -248,6 +348,14 @@ func (om *offsetManager) mainLoop() { } } +func (om *offsetManager) Update(memberID string, generation int32) { + om.updateLock.Lock() + defer om.updateLock.Unlock() + + om.memberID = memberID + om.generation = generation +} + func (om *offsetManager) Commit() { om.flushToBroker() om.releasePOMs(false) @@ -277,12 +385,15 @@ func (om *offsetManager) flushToBroker() { } func (om *offsetManager) constructRequest() *OffsetCommitRequest { + om.updateLock.RLock() r := &OffsetCommitRequest{ Version: 1, ConsumerGroup: om.group, ConsumerID: om.memberID, ConsumerGroupGeneration: om.generation, } + om.updateLock.RUnlock() + // Version 1 adds timestamp and group membership information, as well as the commit timestamp. // // Version 2 adds retention time. It removes the commit timestamp added in version 1. @@ -369,6 +480,14 @@ func (om *offsetManager) handleResponse(broker *Broker, req *OffsetCommitRequest case ErrNoError: block := req.blocks[pom.topic][pom.partition] pom.updateCommitted(block.offset, block.metadata) + case ErrRebalanceInProgress: + // do nothing here + case ErrIllegalGeneration: + // - For EAGER protocol: commit request with a stale generation is rejected by the coordinator. + // - For COOPERATIVE protocol: the partition is not assigned to this consumer in the new generation. + // Normally users don't need to do anything(include retry), Kafka has at-lease-once semantics + // and these messages will be redelivered to the consumer in the new generation. + pom.handleError(err) case ErrNotLeaderForPartition, ErrLeaderNotAvailable, ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer: // not a critical error, we just need to redispatch @@ -605,6 +724,7 @@ func (pom *partitionOffsetManager) AsyncClose() { func (pom *partitionOffsetManager) Close() error { pom.AsyncClose() + pom.release() var errors ConsumerErrors for err := range pom.errors { errors = append(errors, err) diff --git a/offset_manager_test.go b/offset_manager_test.go index af95bc9e4c..6e168db8c5 100644 --- a/offset_manager_test.go +++ b/offset_manager_test.go @@ -534,6 +534,9 @@ func TestPartitionOffsetManagerCommitErr(t *testing.T) { pom.MarkOffset(100, "modified_meta") + // flaky test: wait for sending commit requests + time.Sleep(10 * time.Millisecond) + err := pom.Close() if err != nil { t.Error(err) diff --git a/utils.go b/utils.go index 748d664126..82d2f1d3d7 100644 --- a/utils.go +++ b/utils.go @@ -264,6 +264,7 @@ var ( DefaultVersion = V2_1_0_0 // reduced set of protocol versions to matrix test + // nolint:unused fvtRangeVersions = []KafkaVersion{ V0_8_2_2, V0_10_2_2, @@ -313,3 +314,40 @@ func (v KafkaVersion) String() string { return fmt.Sprintf("%d.%d.%d", v.version[0], v.version[1], v.version[2]) } + +func diffAssignment(map1 map[string][]int32, map2 map[string][]int32) map[string][]int32 { + set := make(map[string]map[int32]bool) + for topic, partitions := range map2 { + if _, exist := set[topic]; !exist { + set[topic] = make(map[int32]bool) + } + for _, partition := range partitions { + set[topic][partition] = true + } + } + + diff := make(map[string][]int32) + for topic, partitions := range map1 { + for _, partition := range partitions { + if _, exist := set[topic][partition]; !exist { + diff[topic] = append(diff[topic], partition) + } + } + } + return diff +} + +func diffTopicSlice(s1 []string, s2 []string) map[string]bool { + set := make(map[string]bool) + for _, s := range s2 { + set[s] = true + } + + diff := make(map[string]bool) + for _, s := range s1 { + if _, exist := set[s]; !exist { + diff[s] = true + } + } + return diff +}