Skip to content

Commit

Permalink
Add reconnect handler for RabbitMQ
Browse files Browse the repository at this point in the history
Refactor the RabbitMQ receive routine to handle reconnects.
  • Loading branch information
grisu48 committed Jul 21, 2023
1 parent 93d1ab7 commit 5fb17ee
Showing 1 changed file with 69 additions and 39 deletions.
108 changes: 69 additions & 39 deletions cmd/openqa-mon/openqa-mon.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/grisu48/gopenqa"
)

const VERSION = "1.0.1"
const VERSION = "1.1.0"

var config Config
var tui TUI
Expand Down Expand Up @@ -360,50 +360,80 @@ func registerRabbitMQ(tui *TUI, openqaURI string, remote string, topics []string
if err != nil {
return rmq, fmt.Errorf("RabbitMQ connection error: %s", err)
}
for _, topic := range topics {
sub, err := rmq.Subscribe(topic)
if err != nil {
return rmq, fmt.Errorf("RabbitMQ subscribe error: %s", err)
}
// Receive function
go func() {
for {
if status, err := sub.ReceiveJobStatus(); err != nil {
// Receive failed
tui.SetStatus(fmt.Sprintf("rabbitmq recv error: %s", err))
continue
// Ignore empty updates (status.ID == 0)
} else if status.ID != 0 {
if status.Type == "job.done" {
tui.SetStatus(fmt.Sprintf("Job %d - %s", status.ID, status.Result))
// Update job, if present
if job, found := updateJobStatus(status, openqaURI); found {
tui.Update()
if config.Notify {
jobs := make([]gopenqa.Job, 0)
jobs = append(jobs, job)
NotifyJobsChanged(jobs)
}
}
} else if status.Type == "job.restarted" {
// Update the job that is being restarted

if job, found := updateJob(status.ID, openqaURI); found {
tui.Update()
if config.Notify {
jobs := make([]gopenqa.Job, 0)
jobs = append(jobs, job)
NotifyJobsChanged(jobs)

recvFunction := func(rmq *gopenqa.RabbitMQ) {
connectedFlag := make(chan int)

// Loop until closed
for !rmq.Closed() {
// Subscribe to all topics in their own goroutine.
// subscriptions notify us via the connectedFlag channel about error events.
for _, topic := range topics {
go func() {
sub, err := rmq.Subscribe(topic)
if err != nil {
tui.SetStatus(fmt.Sprintf("RabbitMQ subscribe error: %s", err))
connectedFlag <- 0 // Notify that something's off
return
}

for {
if status, err := sub.ReceiveJobStatus(); err != nil {
// Receive failed
tui.SetStatus(fmt.Sprintf("rabbitmq recv error: %s", err))
connectedFlag <- 0
return
// Ignore empty updates (status.ID == 0)
} else if status.ID != 0 {
if status.Type == "job.done" {
tui.SetStatus(fmt.Sprintf("Job %d - %s", status.ID, status.Result))
// Update job, if present
if job, found := updateJobStatus(status, openqaURI); found {
tui.Update()
if config.Notify {
jobs := make([]gopenqa.Job, 0)
jobs = append(jobs, job)
NotifyJobsChanged(jobs)
}
}
} else if status.Type == "job.restarted" {
// Update the job that is being restarted

if job, found := updateJob(status.ID, openqaURI); found {
tui.Update()
if config.Notify {
jobs := make([]gopenqa.Job, 0)
jobs = append(jobs, job)
NotifyJobsChanged(jobs)
}
}
} else {
// Unknown job status
tui.SetStatus(fmt.Sprintf("job %d: %s", status.ID, status.Type))
}
}
} else {
// Unknown job status
tui.SetStatus(fmt.Sprintf("job %d: %s", status.ID, status.Type))
}
}()
}

// Wait for someone to notify us about a broken channel
<-connectedFlag
rmq.Close() // Close for everyone and wait a bit before reconnecting
time.Sleep(time.Duration(2) * time.Second)
// Consume remaining signals
consuming := true
for consuming {
select {
case <-connectedFlag:
consuming = true
default:
consuming = false
}
}
}()
rmq.Reconnect()
}
}
go recvFunction(&rmq)
return rmq, nil
}

Expand Down

0 comments on commit 5fb17ee

Please sign in to comment.