Skip to content

Commit

Permalink
Update codebase to stop using deprecated Publish
Browse files Browse the repository at this point in the history
Use the recommended PublishWithContext version. Made some minor syntax
corrections to some comments.

Signed-off-by: Aitor Pérez Cedres <[email protected]>
  • Loading branch information
Zerpet committed Jun 22, 2023
1 parent 2fea75e commit 596cb12
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 48 deletions.
25 changes: 13 additions & 12 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package amqp091

import (
"bytes"
"context"
"io"
"reflect"
"testing"
Expand Down Expand Up @@ -484,16 +485,16 @@ func TestConfirmMultipleOrdersDeliveryTags(t *testing.T) {

go func() {
var e error
if e = ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 1")}); e != nil {
if e = ch.PublishWithContext(context.TODO(), "", "q", false, false, Publishing{Body: []byte("pub 1")}); e != nil {
t.Errorf("publish error: %v", err)
}
if e = ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 2")}); e != nil {
if e = ch.PublishWithContext(context.TODO(), "", "q", false, false, Publishing{Body: []byte("pub 2")}); e != nil {
t.Errorf("publish error: %v", err)
}
if e = ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 3")}); e != nil {
if e = ch.PublishWithContext(context.TODO(), "", "q", false, false, Publishing{Body: []byte("pub 3")}); e != nil {
t.Errorf("publish error: %v", err)
}
if e = ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 4")}); e != nil {
if e = ch.PublishWithContext(context.TODO(), "", "q", false, false, Publishing{Body: []byte("pub 4")}); e != nil {
t.Errorf("publish error: %v", err)
}
}()
Expand All @@ -507,16 +508,16 @@ func TestConfirmMultipleOrdersDeliveryTags(t *testing.T) {

go func() {
var e error
if e = ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 5")}); e != nil {
if e = ch.PublishWithContext(context.TODO(), "", "q", false, false, Publishing{Body: []byte("pub 5")}); e != nil {
t.Errorf("publish error: %v", err)
}
if e = ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 6")}); e != nil {
if e = ch.PublishWithContext(context.TODO(), "", "q", false, false, Publishing{Body: []byte("pub 6")}); e != nil {
t.Errorf("publish error: %v", err)
}
if e = ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 7")}); e != nil {
if e = ch.PublishWithContext(context.TODO(), "", "q", false, false, Publishing{Body: []byte("pub 7")}); e != nil {
t.Errorf("publish error: %v", err)
}
if e = ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 8")}); e != nil {
if e = ch.PublishWithContext(context.TODO(), "", "q", false, false, Publishing{Body: []byte("pub 8")}); e != nil {
t.Errorf("publish error: %v", err)
}
}()
Expand Down Expand Up @@ -563,7 +564,7 @@ func TestDeferredConfirmations(t *testing.T) {

var results []*DeferredConfirmation
for i := 1; i < 5; i++ {
dc, err := ch.PublishWithDeferredConfirm("", "q", false, false, Publishing{Body: []byte("pub")})
dc, err := ch.PublishWithDeferredConfirmWithContext(context.TODO(), "", "q", false, false, Publishing{Body: []byte("pub")})
if err != nil {
t.Fatalf("failed to PublishWithDeferredConfirm: %v", err)
}
Expand Down Expand Up @@ -732,7 +733,7 @@ func TestPublishBodySliceIssue74(t *testing.T) {

for i := 0; i < publishings; i++ {
go func(ii int) {
if err := ch.Publish("", "q", false, false, Publishing{Body: base[0:ii]}); err != nil {
if err := ch.PublishWithContext(context.TODO(), "", "q", false, false, Publishing{Body: base[0:ii]}); err != nil {
t.Errorf("publish error: %v", err)
}
}(i)
Expand Down Expand Up @@ -780,7 +781,7 @@ func TestPublishZeroFrameSizeIssue161(t *testing.T) {

for i := 0; i < publishings; i++ {
go func() {
if err := ch.Publish("", "q", false, false, Publishing{Body: []byte("anything")}); err != nil {
if err := ch.PublishWithContext(context.TODO(), "", "q", false, false, Publishing{Body: []byte("anything")}); err != nil {
t.Errorf("publish error: %v", err)
}
}()
Expand Down Expand Up @@ -813,7 +814,7 @@ func TestPublishAndShutdownDeadlockIssue84(t *testing.T) {

defer time.AfterFunc(500*time.Millisecond, func() { t.Fatalf("Publish deadlock") }).Stop()
for {
if err := ch.Publish("exchange", "q", false, false, Publishing{Body: []byte("test")}); err != nil {
if err := ch.PublishWithContext(context.TODO(), "exchange", "q", false, false, Publishing{Body: []byte("test")}); err != nil {
t.Log("successfully caught disconnect error", err)
return
}
Expand Down
83 changes: 48 additions & 35 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,14 @@ func TestIntegrationBasicQueueOperations(t *testing.T) {
}
t.Logf("delete exchange (first) OK")

if _, err := channel.QueueInspect(queueName); err != nil {
if _, err := channel.QueueDeclarePassive(
queueName,
true,
false,
false,
false,
nil,
); err != nil {
t.Fatalf("inspect queue state after deleting exchange: %s", err)
}
t.Logf("queue properly remains after exchange is deleted")
Expand Down Expand Up @@ -669,13 +676,13 @@ func TestIntegrationPublishConsume(t *testing.T) {

messages, _ := sub.Consume(queue, "", false, false, false, false, nil)

if e := pub.Publish("", queue, false, false, Publishing{Body: []byte("pub 1")}); e != nil {
if e := pub.PublishWithContext(context.TODO(), "", queue, false, false, Publishing{Body: []byte("pub 1")}); e != nil {
t.Fatalf("publish error: %v", e)
}
if e := pub.Publish("", queue, false, false, Publishing{Body: []byte("pub 2")}); e != nil {
if e := pub.PublishWithContext(context.TODO(), "", queue, false, false, Publishing{Body: []byte("pub 2")}); e != nil {
t.Fatalf("publish error: %v", e)
}
if e := pub.Publish("", queue, false, false, Publishing{Body: []byte("pub 3")}); e != nil {
if e := pub.PublishWithContext(context.TODO(), "", queue, false, false, Publishing{Body: []byte("pub 3")}); e != nil {
t.Fatalf("publish error: %v", e)
}

Expand Down Expand Up @@ -718,10 +725,10 @@ func TestIntegrationConsumeFlow(t *testing.T) {
t.Fatalf("error consuming: %v", err)
}

if e := pub.Publish("", queue, false, false, Publishing{Body: []byte("pub 1")}); e != nil {
if e := pub.PublishWithContext(context.TODO(), "", queue, false, false, Publishing{Body: []byte("pub 1")}); e != nil {
t.Fatalf("error publishing: %v", e)
}
if e := pub.Publish("", queue, false, false, Publishing{Body: []byte("pub 2")}); e != nil {
if e := pub.PublishWithContext(context.TODO(), "", queue, false, false, Publishing{Body: []byte("pub 2")}); e != nil {
t.Fatalf("error publishing: %v", e)
}

Expand Down Expand Up @@ -754,6 +761,7 @@ func TestIntegrationConsumeFlow(t *testing.T) {
}

func TestIntegrationRecoverNotImplemented(t *testing.T) {
// TODO: remove this when Channel.Recover is removed
queue := "test.recover"

if c, ch := integrationQueue(t, queue); c != nil {
Expand Down Expand Up @@ -794,7 +802,7 @@ func TestIntegrationConsumeCancel(t *testing.T) {

messages, _ := ch.Consume(queue, "integration-tag", false, false, false, false, nil)

if e := ch.Publish("", queue, false, false, Publishing{Body: []byte("1")}); e != nil {
if e := ch.PublishWithContext(context.TODO(), "", queue, false, false, Publishing{Body: []byte("1")}); e != nil {
t.Fatalf("error publishing: %v", e)
}

Expand All @@ -805,7 +813,7 @@ func TestIntegrationConsumeCancel(t *testing.T) {
t.Fatalf("error cancelling the consumer: %v", err)
}

if e := ch.Publish("", queue, false, false, Publishing{Body: []byte("2")}); e != nil {
if e := ch.PublishWithContext(context.TODO(), "", queue, false, false, Publishing{Body: []byte("2")}); e != nil {
t.Fatalf("error publishing: %v", e)
}

Expand Down Expand Up @@ -839,7 +847,7 @@ func TestIntegrationConsumeCancelWithContext(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
messages, _ := ch.ConsumeWithContext(ctx, queue, "integration-tag-with-context", false, false, false, false, nil)

if e := ch.Publish("", queue, false, false, Publishing{Body: []byte("1")}); e != nil {
if e := ch.PublishWithContext(context.TODO(), "", queue, false, false, Publishing{Body: []byte("1")}); e != nil {
t.Fatalf("error publishing: %v", e)
}

Expand All @@ -848,7 +856,7 @@ func TestIntegrationConsumeCancelWithContext(t *testing.T) {
cancel()
<-time.After(200 * time.Millisecond) // wait to call cancel asynchronously

if e := ch.Publish("", queue, false, false, Publishing{Body: []byte("2")}); e != nil {
if e := ch.PublishWithContext(context.TODO(), "", queue, false, false, Publishing{Body: []byte("2")}); e != nil {
t.Fatalf("error publishing: %v", e)
}

Expand Down Expand Up @@ -938,7 +946,7 @@ func TestQuickPublishOnly(t *testing.T) {
defer integrationQueueDelete(t, pub, queue)

chk := func(msg Publishing) bool {
return pub.Publish("", queue, false, false, msg) == nil
return pub.PublishWithContext(context.TODO(), "", queue, false, false, msg) == nil
}
if err := quick.Check(chk, nil); err != nil {
t.Fatalf("check error: %v", err)
Expand Down Expand Up @@ -969,7 +977,7 @@ func TestPublishEmptyBody(t *testing.T) {
t.Fatalf("Could not consume")
}

err = ch.Publish("", queue, false, false, Publishing{})
err = ch.PublishWithContext(context.TODO(), "", queue, false, false, Publishing{})
if err != nil {
t.Fatalf("Could not publish")
}
Expand Down Expand Up @@ -1013,7 +1021,7 @@ func TestPublishEmptyBodyWithHeadersIssue67(t *testing.T) {
"ham": "spam",
}

err = ch.Publish("", queue, false, false, Publishing{Headers: headers})
err = ch.PublishWithContext(context.TODO(), "", queue, false, false, Publishing{Headers: headers})
if err != nil {
t.Fatalf("Could not publish")
}
Expand Down Expand Up @@ -1072,7 +1080,7 @@ func TestQuickPublishConsumeOnly(t *testing.T) {
if chkerr := quick.CheckEqual(
func(msg Publishing) []byte {
empty := Publishing{Body: msg.Body}
if pub.Publish("", queue, false, false, empty) != nil {
if pub.PublishWithContext(context.TODO(), "", queue, false, false, empty) != nil {
return []byte{'X'}
}
return msg.Body
Expand Down Expand Up @@ -1126,7 +1134,7 @@ func TestQuickPublishConsumeBigBody(t *testing.T) {
t.Fatalf("Failed to declare: %s", err)
}

err = pub.Publish("", queue, false, false, fixture)
err = pub.PublishWithContext(context.TODO(), "", queue, false, false, fixture)
if err != nil {
t.Fatalf("Could not publish big body")
}
Expand All @@ -1153,7 +1161,7 @@ func TestIntegrationGetOk(t *testing.T) {
t.Fatalf("Failed to declare: %s", err)
}

if err := ch.Publish("", queue, false, false, Publishing{Body: []byte("ok")}); err != nil {
if err := ch.PublishWithContext(context.TODO(), "", queue, false, false, Publishing{Body: []byte("ok")}); err != nil {
t.Fatalf("Failed to publish: %s", err)
}

Expand Down Expand Up @@ -1211,7 +1219,7 @@ func TestIntegrationTxCommit(t *testing.T) {
t.Fatalf("tx.select failed")
}

if err := ch.Publish("", queue, false, false, Publishing{Body: []byte("ok")}); err != nil {
if err := ch.PublishWithContext(context.TODO(), "", queue, false, false, Publishing{Body: []byte("ok")}); err != nil {
t.Fatalf("Failed to publish: %s", err)
}

Expand Down Expand Up @@ -1246,7 +1254,7 @@ func TestIntegrationTxRollback(t *testing.T) {
t.Fatalf("tx.select failed")
}

if err := ch.Publish("", queue, false, false, Publishing{Body: []byte("ok")}); err != nil {
if err := ch.PublishWithContext(context.TODO(), "", queue, false, false, Publishing{Body: []byte("ok")}); err != nil {
t.Fatalf("Failed to publish: %s", err)
}

Expand Down Expand Up @@ -1275,7 +1283,7 @@ func TestIntegrationReturn(t *testing.T) {
ch.NotifyReturn(ret)

// mandatory publish to an exchange without a binding should be returned
if err := ch.Publish("", "return-without-binding", true, false, Publishing{Body: []byte("mandatory")}); err != nil {
if err := ch.PublishWithContext(context.TODO(), "", "return-without-binding", true, false, Publishing{Body: []byte("mandatory")}); err != nil {
t.Fatalf("Failed to publish: %s", err)
}

Expand Down Expand Up @@ -1340,7 +1348,7 @@ func TestIntegrationConfirm(t *testing.T) {
t.Fatalf("could not confirm")
}

if err := ch.Publish("", "confirm", false, false, Publishing{Body: []byte("confirm")}); err != nil {
if err := ch.PublishWithContext(context.TODO(), "", "confirm", false, false, Publishing{Body: []byte("confirm")}); err != nil {
t.Fatalf("Failed to publish: %s", err)
}

Expand Down Expand Up @@ -1407,7 +1415,7 @@ func TestRoundTripAllFieldValueTypes61(t *testing.T) {
t.Fatalf("Could not consume")
}

err = ch.Publish("", queue, false, false, Publishing{Body: []byte("ignored"), Headers: headers})
err = ch.PublishWithContext(context.TODO(), "", queue, false, false, Publishing{Body: []byte("ignored"), Headers: headers})
if err != nil {
t.Fatalf("Could not publish: %v", err)
}
Expand Down Expand Up @@ -1499,11 +1507,11 @@ func TestDeclareArgsRejectToDeadLetterQueue(t *testing.T) {
}()

// Publish the 'poison'
if err := ch.Publish(ex, q, true, false, Publishing{Body: []byte("ignored")}); err != nil {
if err := ch.PublishWithContext(context.TODO(), ex, q, true, false, Publishing{Body: []byte("ignored")}); err != nil {
t.Fatalf("publishing failed")
}

// spin-get until message arrives on the dead-letter queue with a
// spin-get until message arrives at the dead-letter queue with a
// synchronous parse to exercise the array field (x-death) set by the
// server relating to issue-56
for i := 0; i < 10; i++ {
Expand Down Expand Up @@ -1560,13 +1568,13 @@ func TestDeadlockConsumerIssue48(t *testing.T) {

for i := 0; i < cap(confirms); i++ {
// Fill the queue with some new or remaining publishings
if err := ch.Publish("", queue, false, false, Publishing{Body: []byte("")}); err != nil {
if err := ch.PublishWithContext(context.TODO(), "", queue, false, false, Publishing{Body: []byte("")}); err != nil {
t.Fatalf("error publishing: %v", err)
}
}

for i := 0; i < cap(confirms); i++ {
// Wait for them to land on the queue so they'll be delivered on consume
// Wait for them to land on the queue, so they'll be delivered on consume
<-confirms
}

Expand Down Expand Up @@ -1619,7 +1627,7 @@ func TestRepeatedChannelExceptionWithPublishAndMaxProcsIssue46(t *testing.T) {
}
break
}
err := ch.Publish("not-existing-exchange", "some-key", false, false, Publishing{Body: []byte("some-data")})
err := ch.PublishWithContext(context.TODO(), "not-existing-exchange", "some-key", false, false, Publishing{Body: []byte("some-data")})
if err != nil {
if publishError, ok := err.(*Error); !ok || publishError.Code != 504 {
t.Fatalf("expected channel only exception i: %d j: %d error: %+v", i, j, publishError)
Expand Down Expand Up @@ -1666,7 +1674,7 @@ func TestChannelExceptionWithCloseIssue43(t *testing.T) {
// Cause an asynchronous channel exception causing the server
// to send a "channel.close" method either before or after the next
// asynchronous method.
err = c1.Publish("nonexisting-exchange", "", false, false, Publishing{})
err = c1.PublishWithContext(context.TODO(), "nonexisting-exchange", "", false, false, Publishing{})
if err != nil {
t.Fatalf("failed to publish, got: %v", err)
}
Expand Down Expand Up @@ -1721,7 +1729,7 @@ func TestCorruptedMessageIssue7(t *testing.T) {
}

for i := 0; i < messageCount; i++ {
err := pub.Publish("", queue, false, false, Publishing{
err := pub.PublishWithContext(context.TODO(), "", queue, false, false, Publishing{
Body: generateCrc32Random(t, 7*i),
})

Expand Down Expand Up @@ -1851,7 +1859,7 @@ func TestRabbitMQQueueTTLGet(t *testing.T) {
t.Fatalf("queue declare: %s", err)
}

if err := channel.Publish("", queue, false, false, Publishing{Body: []byte("ttl")}); err != nil {
if err := channel.PublishWithContext(context.TODO(), "", queue, false, false, Publishing{Body: []byte("ttl")}); err != nil {
t.Fatalf("error publishing: %v", err)
}

Expand Down Expand Up @@ -1884,10 +1892,10 @@ func TestRabbitMQQueueNackMultipleRequeue(t *testing.T) {
t.Fatalf("queue declare: %s", err)
}

if err := channel.Publish("", queue, false, false, Publishing{Body: []byte("1")}); err != nil {
if err := channel.PublishWithContext(context.TODO(), "", queue, false, false, Publishing{Body: []byte("1")}); err != nil {
t.Fatalf("error publishing: %v", err)
}
if err := channel.Publish("", queue, false, false, Publishing{Body: []byte("2")}); err != nil {
if err := channel.PublishWithContext(context.TODO(), "", queue, false, false, Publishing{Body: []byte("2")}); err != nil {
t.Fatalf("error publishing: %v", err)
}

Expand Down Expand Up @@ -2006,7 +2014,7 @@ func TestIntegrationGetNextPublishSeqNo(t *testing.T) {
t.Fatalf("wrong next publish seqence number before any publish, expected: %d, got: %d", 1, n)
}

if err := ch.Publish("test-get-next-pub-seq", "", false, false, Publishing{}); err != nil {
if err := ch.PublishWithContext(context.TODO(), "test-get-next-pub-seq", "", false, false, Publishing{}); err != nil {
t.Fatalf("publish error: %v", err)
}

Expand All @@ -2033,7 +2041,7 @@ func TestShouldNotWaitAfterConnectionClosedIssue44(t *testing.T) {
<-closed
}()

confirm, err := ch.PublishWithDeferredConfirm("test-issue44", "issue44", false, false, Publishing{Body: []byte("abc")})
confirm, err := ch.PublishWithDeferredConfirmWithContext(context.TODO(), "test-issue44", "issue44", false, false, Publishing{Body: []byte("abc")})
if err != nil {
t.Fatalf("PublishWithDeferredConfirm error: %v", err)
}
Expand All @@ -2055,7 +2063,12 @@ func TestShouldNotWaitAfterConnectionClosedIssue44(t *testing.T) {
// Returns a connection to the AMQP if the AMQP_URL environment
// variable is set and a connection can be established.
func integrationConnection(t *testing.T, name string) *Connection {
conn, err := Dial(amqpURL)
conf := defaultConfig()
if conf.Properties == nil {
conf.Properties = make(Table)
}
conf.Properties.SetClientConnectionName(name)
conn, err := DialConfig(amqpURL, conf)
if err != nil {
t.Fatalf("cannot dial integration server. Is the rabbitmq-server service running? %s", err)
return nil
Expand Down Expand Up @@ -2117,7 +2130,7 @@ func TestShouldNotWaitAfterConnectionClosedNewChannelCreatedIssue11(t *testing.T

conn.NotifyClose(make(chan *Error, 1))

_, err = ch.PublishWithDeferredConfirm("issue11", "issue11", false, false, Publishing{Body: []byte("abc")})
_, err = ch.PublishWithDeferredConfirmWithContext(context.TODO(), "issue11", "issue11", false, false, Publishing{Body: []byte("abc")})
if err != nil {
t.Fatalf("PublishWithDeferredConfirm error: %v", err)
}
Expand Down
Loading

0 comments on commit 596cb12

Please sign in to comment.