
PubSub: another Receive() with hooks #621

Open
DriverX opened this issue Aug 28, 2024 · 5 comments

DriverX commented Aug 28, 2024

We use short-lived pub/sub channels. The typical flow is:

  • subscribe to a channel
  • check a key in Redis or wait for a message on the channel
  • unsubscribe from the channel

Sometimes this cycle is very short.

Problem

  1. client.Receive(): when using the first pattern from the docs via client.Receive(), I can't guarantee that a subsequent UNSUBSCRIBE call is executed strictly after SUBSCRIBE. A possible sequence:

    • call client.Receive() with SUBSCRIBE in another goroutine
    • do some work, then decide to unsubscribe from the channel
    • call client.Do() with the UNSUBSCRIBE command
    • the goroutine with SUBSCRIBE wakes up too late, registers the subscription handler in the client's internal structures forever, and the handler never receives the unsubscribe message
  2. When using the alternative pattern with a dedicated client, I have another problem: too many open connections, one connection per channel. We have hundreds or even thousands of channels concurrently, and no connection pool can handle that or guarantee predictable behavior.

Reproducing the problem

package main

import (
	"context"
	"flag"
	"fmt"
	"time"

	"github.com/redis/rueidis"
)

func main() {
	var redisAddr = flag.String("redis-addr", "", "redis address")
	flag.Parse()

	client, err := rueidis.NewClient(rueidis.ClientOption{
		InitAddress:  []string{*redisAddr},
		DisableCache: true,
	})
	if err != nil {
		panic(err)
	}
	defer client.Close()

	ctx := context.Background()

	chName := "chan:{123}"
	subscribeDoneCh := make(chan struct{})

	go func() {
		// emulate goroutine start lag;
		// in the real case it is only a few nanoseconds, but that is enough to trigger the race
		time.Sleep(1000 * time.Millisecond)
		fmt.Printf("Initiate subscribe to %s\n", chName)
		err := client.Receive(ctx, client.B().Ssubscribe().Channel(chName).Build(), func(msg rueidis.PubSubMessage) {
			fmt.Printf("received message: %+v\n", msg)
		})
		fmt.Println("Subscribe done:", err)
		subscribeDoneCh <- struct{}{}
	}()
	fmt.Printf("Unsubscribe from %s\n", chName)
	err = client.Do(ctx, client.B().Sunsubscribe().Channel(chName).Build()).NonRedisError()
	if err != nil {
		panic(err)
	}
	err = client.Do(ctx, client.B().Spublish().Channel(chName).Message("test").Build()).NonRedisError()
	if err != nil {
		panic(err)
	}
	fmt.Println("Wait subscriber is done")
	<-subscribeDoneCh
}

This code gets stuck on

<-subscribeDoneCh

Solutions

I think client.Receive() could take an option to wait for subscribe success and signal that somewhere around here: pipe.go#L691

	if ch, cancel := sb.Subscribe(args); ch != nil {
		defer cancel()
		if err := p.Do(ctx, subscribe).Error(); err != nil {
			return err
		}

		// HERE
		subscribeDoneChan <- true

Another option: a new variant of client.Receive() with an additional callback that receives a PubSubSubscription when the (s|p)subscribe response arrives from Redis. For example:

	subscribeSuccessCh := make(chan bool)
	go func() {
		err := client.ReceiveWithConfirmation(
			ctx,
			client.B().Ssubscribe().Channel(chName).Build(),
			func(msg rueidis.PubSubMessage) {
				fmt.Printf("received message: %+v\n", msg)
			},
			func(sub rueidis.PubSubSubscription) {
				if sub.Kind == "ssubscribe" {
					subscribeSuccessCh <- true
				}
			},
		)
		fmt.Println("Subscribe done:", err)
		subscribeDoneCh <- struct{}{}
	}()
	<-subscribeSuccessCh

	// do something useful

	fmt.Printf("Unsubscribe from %s\n", chName)
	err = client.Do(ctx, client.B().Sunsubscribe().Channel(chName).Build()).NonRedisError()
	if err != nil {
		panic(err)
	}

I will try to make a PR with a new additional method.

@rueian
Collaborator

rueian commented Aug 29, 2024

Hi @DriverX,

Thank you for raising the issue. I do think the current Receive lacks a confirmation mechanism. However, given that Receive retries the connection automatically, the confirmation mechanism is hard to design. Callbacks are a good idea. We can re-use the PubSubHooks struct for passing callbacks.

Does the possible memory leak in the title refer to the too many connections issue?

@rueian rueian added the feature label Aug 29, 2024
@DriverX
Author

DriverX commented Aug 29, 2024

Does the possible memory leak in the title refer to the too many connections issue?

I wrote about a memory leak based on this point:

  • the goroutine with SUBSCRIBE wakes up too late, registers the subscription handler in internal structures forever, and the handler never receives the unsubscribe message

UNSUBSCRIBE is done before the SUBSCRIBE command is sent. I think in this case client.Receive() keeps the subscriber's internal data and callback function for the whole lifetime of the connection. This is a potential memory leak on both sides, Redis and the application.

And again, for a better understanding of my case =)
The real-world case is important for us and can lead to a race condition. For example:

I have 2 apps

  • app_executor - receive and execute some task and store result in redis and publish notification in pubsub
  • app_runner - send task to app_executor and wait notification from pubsub

And these apps work with 2 Redis clusters:

  • redis_store - for store data
  • redis_pubsub - for notifications

Now some pseudocode

executor

task = queue.get()
result = executor.execute(task)
redis_store.set("task_result:" + task.id, result)
redis_pubsub.publish("task_result_channel:" + task.id, "1")

runner

queue.put(task)
// channel returns only after successful `SUBSCRIBE` redis response otherwise throw exception
channel = redis_pubsub.subscribe("task_result_channel:" + task.id)
result = redis_store.get("task_result:" + task.id)
if result == null {
    msg = channel.wait()
    result = redis_store.get("task_result:" + task.id)
}
// send `UNSUBSCRIBE` to redis_pubsub
channel.unsubscribe()

This code is race-condition safe:

  • executor: store result -> publish to channel
  • runner: subscribe -> check store -> wait channel -> get stored result

But imagine that redis_pubsub.subscribe returns the channel before the SUBSCRIBE call is actually done, which is what client.Receive() does. The race sequence:

  • runner: channel = redis_pubsub.subscribe("task_result_channel:" + task.id)
  • runner: result = redis_store.get("task_result:" + task.id) - returns null
  • executor: redis_store.set("task_result:" + task.id, result)
  • executor: redis_pubsub.publish("task_result_channel:" + task.id, "1") - the message disappears because there are no subscribers on the channel yet
  • runner: the SUBSCRIBE command is finally sent to Redis and we receive the subscribe response
  • runner: msg = channel.wait() - waits forever :(

@DriverX
Author

DriverX commented Aug 29, 2024

We can re-use the PubSubHooks struct for passing callbacks.

Sounds good!

@rueian
Collaborator

rueian commented Aug 29, 2024

Hi @DriverX,

Thank you for your detailed explanation. If you are using Redis > 6, I think there is a simpler solution: leverage the invalidation notifications of client-side caching, with no pub/sub cluster needed:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/redis/rueidis"
)

func main() {
	var mu sync.Mutex
	channels := make(map[string]chan struct{})

	client, err := rueidis.NewClient(rueidis.ClientOption{
		InitAddress: []string{"127.0.0.1:6379"}, // assumed address; adjust to your deployment
		OnInvalidations: func(messages []rueidis.RedisMessage) {
			mu.Lock()
			defer mu.Unlock()
			for _, message := range messages {
				key, _ := message.ToString()
				if ch, ok := channels[key]; ok {
					delete(channels, key)
					close(ch)
				}
			}
		},
	})
	if err != nil {
		panic(err)
	}
	defer client.Close()

	key := "task_result:ooxx"

RETRY:
	ch := make(chan struct{})
	mu.Lock()
	channels[key] = ch
	mu.Unlock()
	result, err := client.DoCache(context.Background(), client.B().Get().Key(key).Cache(), time.Second).ToString()
	if rueidis.IsRedisNil(err) {
		<-ch
		result, err = client.DoCache(context.Background(), client.B().Get().Key(key).Cache(), time.Second).ToString()
	} else {
		mu.Lock()
		delete(channels, key)
		mu.Unlock()
	}
	if rueidis.IsRedisNil(err) {
		goto RETRY
	}
	if err != nil {
		panic(err)
	}
	fmt.Println(result)
}

@DriverX
Author

DriverX commented Aug 29, 2024

If you are using Redis > 6, I think there is a simpler solution: leverage the invalidation notifications of client-side caching, with no pub/sub cluster needed:

We use Redis 7.0 in production, and this is a very interesting suggestion; I will think about it. But for now I would like to keep the pub/sub scheme, because a big piece of our infrastructure is based on this pattern and unfortunately not all clients support client-side caching.

@rueian rueian changed the title PubSub: subscribe->unsubscribe race condition and possible memory leak PubSub: another Receive() with hooks Aug 30, 2024