diff --git a/cmd/openqa-mon/openqa-mon.go b/cmd/openqa-mon/openqa-mon.go index 9667cdc..bc74641 100644 --- a/cmd/openqa-mon/openqa-mon.go +++ b/cmd/openqa-mon/openqa-mon.go @@ -360,50 +360,91 @@ func registerRabbitMQ(tui *TUI, openqaURI string, remote string, topics []string if err != nil { return rmq, fmt.Errorf("RabbitMQ connection error: %s", err) } - for _, topic := range topics { - sub, err := rmq.Subscribe(topic) - if err != nil { - return rmq, fmt.Errorf("RabbitMQ subscribe error: %s", err) - } - // Receive function - go func() { - for { - if status, err := sub.ReceiveJobStatus(); err != nil { - // Receive failed - tui.SetStatus(fmt.Sprintf("rabbitmq recv error: %s", err)) - continue - // Ignore empty updates (status.ID == 0) - } else if status.ID != 0 { - if status.Type == "job.done" { - tui.SetStatus(fmt.Sprintf("Job %d - %s", status.ID, status.Result)) - // Update job, if present - if job, found := updateJobStatus(status, openqaURI); found { - tui.Update() - if config.Notify { - jobs := make([]gopenqa.Job, 0) - jobs = append(jobs, job) - NotifyJobsChanged(jobs) - } - } - } else if status.Type == "job.restarted" { - // Update the job that is being restarted - - if job, found := updateJob(status.ID, openqaURI); found { - tui.Update() - if config.Notify { - jobs := make([]gopenqa.Job, 0) - jobs = append(jobs, job) - NotifyJobsChanged(jobs) + + recvFunction := func(rmq *gopenqa.RabbitMQ) { + connectedFlag := make(chan int) + reconnects := 0 + + // Loop until closed + for !rmq.Closed() { + // Subscribe to all topics in their own goroutine. + // subscriptions notify us via the connectedFlag channel about error events. + for _, topic := range topics { + go func(topic string) { + sub, err := rmq.Subscribe(topic) + if err != nil { + tui.SetStatus(fmt.Sprintf("RabbitMQ subscribe error: %s", err)) + connectedFlag <- 0 // Notify that something's off + return + } + + for { + if status, err := sub.ReceiveJobStatus(); err != nil { + // Receive failed + tui.SetStatus(fmt.Sprintf("rabbitmq recv error: %s", err)) + connectedFlag <- 0 + return + // Ignore empty updates (status.ID == 0) + } else if status.ID != 0 { + if status.Type == "job.done" { + tui.SetStatus(fmt.Sprintf("Job %d - %s", status.ID, status.Result)) + // Update job, if present + if job, found := updateJobStatus(status, openqaURI); found { + tui.Update() + if config.Notify { + jobs := make([]gopenqa.Job, 0) + jobs = append(jobs, job) + NotifyJobsChanged(jobs) + } + } + } else if status.Type == "job.restarted" { + // Update the job that is being restarted + + if job, found := updateJob(status.ID, openqaURI); found { + tui.Update() + if config.Notify { + jobs := make([]gopenqa.Job, 0) + jobs = append(jobs, job) + NotifyJobsChanged(jobs) + } + } + } else { + // Unknown job status + tui.SetStatus(fmt.Sprintf("job %d: %s", status.ID, status.Type)) } } - } else { - // Unknown job status - tui.SetStatus(fmt.Sprintf("job %d: %s", status.ID, status.Type)) } + }(topic) + } + + // Wait for someone to notify us about a broken channel + if reconnects == 0 { + tui.SetStatus("RabbitMQ mode") + } else if reconnects == 1 { + tui.SetStatus("RabbitMQ mode (reconnected)") + } else { + tui.SetStatus(fmt.Sprintf("RabbitMQ mode (%dx reconnected)", reconnects)) + } + <-connectedFlag + rmq.Close() // Close for everyone and wait a bit before reconnecting + reconnects++ + tui.SetStatus(fmt.Sprintf("RabbitMQ reconnecting %d ...", reconnects)) + time.Sleep(time.Duration(2) * time.Second) + // Consume remaining signals + consuming := true + for consuming { + select { + case <-connectedFlag: + consuming = true + default: + consuming = false } } - }() + rmq.Reconnect() + tui.SetStatus(fmt.Sprintf("RabbitMQ reconnecting %d ...", reconnects)) + } } + go recvFunction(&rmq) return rmq, nil }