Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/e2e/log_queries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (s *basicSuite) TestLogQueries_Attempts() {
s.Equal(setup.destinationID, first["destination_id"])
s.NotEmpty(first["status"])
s.NotEmpty(first["time"])
s.Equal(float64(0), first["attempt_number"])
s.Equal(float64(1), first["attempt_number"])
})

s.Run("filter by destination_id", func() {
Expand Down
12 changes: 6 additions & 6 deletions cmd/e2e/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ func (s *basicSuite) TestRetry_FailedDeliveryAutoRetries() {
s.Require().GreaterOrEqual(len(resp.Models), 2)

for i, atm := range resp.Models {
s.Equal(float64(i), atm["attempt_number"],
"attempt %d should have attempt_number=%d (automated retry increments)", i, i)
s.Equal(float64(i+1), atm["attempt_number"],
"attempt %d should have attempt_number=%d (automated retry increments)", i, i+1)
}
}

Expand All @@ -51,14 +51,14 @@ func (s *basicSuite) TestRetry_ManualRetryCreatesNewAttempt() {
// Wait for initial delivery
s.waitForNewAttempts(tenant.ID, 1)

// Verify first attempt has attempt_number=0
// Verify first attempt has attempt_number=1
var attResp struct {
Models []map[string]any `json:"models"`
}
status := s.doJSON(http.MethodGet, s.apiURL("/attempts?tenant_id="+tenant.ID+"&event_id="+eventID), nil, &attResp)
s.Require().Equal(http.StatusOK, status)
s.Require().NotEmpty(attResp.Models)
s.Equal(float64(0), attResp.Models[0]["attempt_number"])
s.Equal(float64(1), attResp.Models[0]["attempt_number"])

// Manual retry
retryStatus := s.retryEvent(eventID, dest.ID)
Expand All @@ -75,9 +75,9 @@ func (s *basicSuite) TestRetry_ManualRetryCreatesNewAttempt() {
s.Require().Equal(http.StatusOK, status)
s.Require().Len(verifyResp.Models, 2)

// Both should have attempt_number=0 (manual retry resets)
// Both should have attempt_number=1 (manual retry resets)
for _, atm := range verifyResp.Models {
s.Equal(float64(0), atm["attempt_number"])
s.Equal(float64(1), atm["attempt_number"])
}

// Verify one manual=true
Expand Down
10 changes: 7 additions & 3 deletions internal/deliverymq/messagehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,8 +360,8 @@ func (h *messageHandler) shouldScheduleRetry(task models.DeliveryTask, err error
if !errors.As(err, &pubErr) {
return false
}
// Attempt starts at 0 for initial attempt, so we can compare directly
return task.Attempt < h.retryMaxLimit
// Attempt starts at 1 for initial attempt, so use <= to allow retryMaxLimit total attempts
return task.Attempt <= h.retryMaxLimit
}

func (h *messageHandler) shouldNackError(err error) bool {
Expand Down Expand Up @@ -410,7 +410,11 @@ func (h *messageHandler) shouldNackDeliveryError(err error) bool {
}

func (h *messageHandler) scheduleRetry(ctx context.Context, task models.DeliveryTask) error {
backoffDuration := h.retryBackoff.Duration(task.Attempt)
// Backoff expects a 0-based index (0 for first retry, 1 for second, etc.).
// attempt_number changed from 0-based to 1-based without migrating in-flight
// tasks, so clamp to 0 to safely handle any leftover Attempt=0 tasks.
backoffIndex := max(task.Attempt-1, 0)
backoffDuration := h.retryBackoff.Duration(backoffIndex)

retryTask := RetryTaskFromDeliveryTask(task)
retryTaskStr, err := retryTask.ToString()
Expand Down
2 changes: 1 addition & 1 deletion internal/models/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func NewDeliveryTask(event Event, destinationID string) DeliveryTask {
return DeliveryTask{
Event: event,
DestinationID: destinationID,
Attempt: 0,
Attempt: 1,
}
}

Expand Down
Loading