Skip to content

Commit 1551ff6

Browse files
committed
Simplify response, confirm and returns handling
1 parent 6cd9f2e commit 1551ff6

File tree

3 files changed

+165
-250
lines changed

3 files changed

+165
-250
lines changed

client.go

Lines changed: 26 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -469,17 +469,10 @@ func (c *Client) runPublisher(ouputChan *amqp.Channel) {
469469
return
470470
}
471471

472-
if !c.confirmMode {
473-
// We're not in confirm mode so we confirm that we have sent
474-
// the request here.
475-
c.confirmRequest(request)
476-
477-
if !request.Reply {
478-
// Since we won't get a confirmation of this request and
479-
// we don't want to have a reply, just return nil to the
480-
// caller.
481-
c.respondToRequest(request, nil)
482-
}
472+
if !c.confirmMode && !request.Reply {
473+
// Since we won't get a confirmation of this request and we
474+
// don't want to have a reply, just return nil to the caller.
475+
c.respondToRequest(request, nil, nil)
483476
}
484477
}
485478
}
@@ -497,12 +490,8 @@ func (c *Client) retryRequest(request *Request, err error) {
497490
slogGroupFor("request", slogAttrsForRequest(request)),
498491
)
499492

500-
// We shouldn't wait for confirmations any more because they will never
501-
// arrive.
502-
c.confirmRequest(request)
503-
504493
// Return whatever error .Publish returned to the caller.
505-
c.respondErrorToRequest(request, err)
494+
c.respondToRequest(request, nil, err)
506495

507496
return
508497
}
@@ -534,7 +523,11 @@ func (c *Client) runConfirmsConsumer(confirms chan amqp.Confirmation, returns ch
534523
select {
535524
case ret, ok := <-returns:
536525
if !ok {
537-
return
526+
// Only the closing of confirms will exit this loop. Closing
527+
// the returns channel will do nothing. This ensures that we
528+
// don't exit until we have handled all confirmations.
529+
returns = nil
530+
continue
538531
}
539532

540533
request, ok := c.requestsMap.GetByCorrelationID(ret.CorrelationId)
@@ -584,10 +577,8 @@ func (c *Client) runConfirmsConsumer(confirms chan amqp.Confirmation, returns ch
584577
slog.String("correlation_id", request.Publishing.CorrelationId),
585578
)
586579

587-
c.confirmRequest(request)
588-
589580
if !confirm.Ack {
590-
c.respondErrorToRequest(request, ErrRequestRejected)
581+
c.respondToRequest(request, nil, ErrRequestRejected)
591582

592583
// Doesn't matter if the request wants the nil reply below because
593584
// we gave it an error instead.
@@ -596,65 +587,35 @@ func (c *Client) runConfirmsConsumer(confirms chan amqp.Confirmation, returns ch
596587

597588
// Check if the request was also returned.
598589
if request.returned != nil {
599-
c.respondErrorToRequest(
600-
request,
601-
fmt.Errorf("%w: %d, %s",
602-
ErrRequestReturned,
603-
request.returned.ReplyCode,
604-
request.returned.ReplyText,
605-
),
606-
)
590+
c.respondToRequest(request, nil, fmt.Errorf("%w: %d, %s",
591+
ErrRequestReturned,
592+
request.returned.ReplyCode,
593+
request.returned.ReplyText,
594+
))
607595

608596
continue
609597
}
610598

611599
if !request.Reply {
612600
// The request isn't expecting a reply so we need give a nil
613601
// response instead to signal that we're done.
614-
c.respondToRequest(request, nil)
602+
c.respondToRequest(request, nil, nil)
615603
}
616604
}
617605
}
618606
}
619607

620-
// respondErrorToRequest will return the provided response to the caller.
621-
func (c *Client) respondToRequest(request *Request, response *amqp.Delivery) {
608+
// respondToRequest will return the provided response to the caller.
609+
func (c *Client) respondToRequest(request *Request, d *amqp.Delivery, err error) {
622610
select {
623-
case request.response <- response:
611+
case request.response <- response{delivery: d, err: err}:
624612
return
625613
case <-request.Context.Done():
626614
c.logger.ErrorContext(request.Context,
627615
"nobody is waiting for response",
616+
slog.Any("error", errors.Join(context.Cause(request.Context), err)),
628617
slogGroupFor("request", slogAttrsForRequest(request)),
629-
slogGroupFor("delivery", slogAttrsForDelivery(response)),
630-
)
631-
}
632-
}
633-
634-
// respondErrorToRequest will return the provided error to the caller.
635-
func (c *Client) respondErrorToRequest(request *Request, err error) {
636-
select {
637-
case request.errChan <- err:
638-
return
639-
case <-request.Context.Done():
640-
c.logger.ErrorContext(request.Context,
641-
"nobody is waiting for error",
642-
slog.Any("error", err),
643-
slogGroupFor("request", slogAttrsForRequest(request)),
644-
)
645-
}
646-
}
647-
648-
// confirmRequest will mark the provided request as confirmed by the amqp
649-
// server.
650-
func (c *Client) confirmRequest(request *Request) {
651-
select {
652-
case request.confirmed <- struct{}{}:
653-
return
654-
case <-request.Context.Done():
655-
c.logger.ErrorContext(request.Context,
656-
"nobody is waiting for confirmation",
657-
slogGroupFor("request", slogAttrsForRequest(request)),
618+
slogGroupFor("delivery", slogAttrsForDelivery(d)),
658619
)
659620
}
660621
}
@@ -713,16 +674,7 @@ func (c *Client) runRepliesConsumer(inChan *amqp.Channel) error {
713674
slog.String("correlation_id", response.CorrelationId),
714675
)
715676

716-
responseCopy := response
717-
718-
select {
719-
case request.response <- &responseCopy:
720-
case <-request.Context.Done():
721-
c.logger.ErrorContext(request.Context,
722-
"nobody is waiting on response on request",
723-
slogGroupFor("request", slogAttrsForRequest(request)),
724-
)
725-
}
677+
c.respondToRequest(request, &response, nil)
726678
}
727679

728680
c.logger.Debug("replies consumer is done")
@@ -748,16 +700,7 @@ func (c *Client) send(r *Request) (*amqp.Delivery, error) {
748700
// If this request doesn't want a reply back (by setting Reply to false)
749701
// this channel will get a nil message after publisher has Published the
750702
// message.
751-
r.response = make(chan *amqp.Delivery)
752-
753-
// This channel is sent to when the request is confirmed. This can happen
754-
// both when confirm-mode is set. And if not set, it's automatically
755-
// confirmed once the request is published.
756-
r.confirmed = make(chan struct{})
757-
758-
// This is where we get any (client) errors if they occur before we could
759-
// even send the request.
760-
r.errChan = make(chan error)
703+
r.response = make(chan response)
761704

762705
// Set the correlation id on the publishing if not yet set.
763706
if r.Publishing.CorrelationId == "" {
@@ -791,35 +734,9 @@ func (c *Client) send(r *Request) (*amqp.Delivery, error) {
791734
slog.String("correlation_id", r.Publishing.CorrelationId),
792735
)
793736

794-
// We hang here until the request has been published (or when confirm-mode
795-
// is on; confirmed).
796-
select {
797-
case <-r.confirmed:
798-
// got confirmation.
799-
case <-r.Context.Done():
800-
err := context.Cause(r.Context)
801-
802-
c.logger.DebugContext(r.Context,
803-
"canceled while waiting for request confirmation",
804-
slog.Any("error", err),
805-
slog.String("correlation_id", r.Publishing.CorrelationId),
806-
)
807-
808-
return nil, fmt.Errorf("%w while waiting for confirmation", err)
809-
}
810-
811-
// All responses are published on the requests response channel. Hang here
812-
// until a response is received and close the channel when it's read.
813737
select {
814-
case err := <-r.errChan:
815-
c.logger.DebugContext(r.Context,
816-
"error for request",
817-
slog.Any("error", err),
818-
slog.String("correlation_id", r.Publishing.CorrelationId),
819-
)
820-
821-
return nil, err
822-
738+
case res := <-r.response:
739+
return res.delivery, res.err
823740
case <-r.Context.Done():
824741
err := context.Cause(r.Context)
825742

@@ -830,14 +747,6 @@ func (c *Client) send(r *Request) (*amqp.Delivery, error) {
830747
)
831748

832749
return nil, fmt.Errorf("%w while waiting for response", err)
833-
834-
case delivery := <-r.response:
835-
c.logger.DebugContext(r.Context,
836-
"got response",
837-
slog.String("correlation_id", r.Publishing.CorrelationId),
838-
)
839-
840-
return delivery, nil
841750
}
842751
}
843752

0 commit comments

Comments
 (0)