diff --git a/cmd/e2e/log_queries_test.go b/cmd/e2e/log_queries_test.go index e6aaf266..62824fdd 100644 --- a/cmd/e2e/log_queries_test.go +++ b/cmd/e2e/log_queries_test.go @@ -80,7 +80,7 @@ func (s *basicSuite) TestLogQueries_Attempts() { s.Equal(setup.destinationID, first["destination_id"]) s.NotEmpty(first["status"]) s.NotEmpty(first["time"]) - s.Equal(float64(0), first["attempt_number"]) + s.Equal(float64(1), first["attempt_number"]) }) s.Run("filter by destination_id", func() { diff --git a/cmd/e2e/retry_test.go b/cmd/e2e/retry_test.go index 98a85280..6b9b7ccb 100644 --- a/cmd/e2e/retry_test.go +++ b/cmd/e2e/retry_test.go @@ -34,8 +34,8 @@ func (s *basicSuite) TestRetry_FailedDeliveryAutoRetries() { s.Require().GreaterOrEqual(len(resp.Models), 2) for i, atm := range resp.Models { - s.Equal(float64(i), atm["attempt_number"], - "attempt %d should have attempt_number=%d (automated retry increments)", i, i) + s.Equal(float64(i+1), atm["attempt_number"], + "attempt %d should have attempt_number=%d (automated retry increments)", i, i+1) } } @@ -51,14 +51,14 @@ func (s *basicSuite) TestRetry_ManualRetryCreatesNewAttempt() { // Wait for initial delivery s.waitForNewAttempts(tenant.ID, 1) - // Verify first attempt has attempt_number=0 + // Verify first attempt has attempt_number=1 var attResp struct { Models []map[string]any `json:"models"` } status := s.doJSON(http.MethodGet, s.apiURL("/attempts?tenant_id="+tenant.ID+"&event_id="+eventID), nil, &attResp) s.Require().Equal(http.StatusOK, status) s.Require().NotEmpty(attResp.Models) - s.Equal(float64(0), attResp.Models[0]["attempt_number"]) + s.Equal(float64(1), attResp.Models[0]["attempt_number"]) // Manual retry retryStatus := s.retryEvent(eventID, dest.ID) @@ -75,9 +75,9 @@ func (s *basicSuite) TestRetry_ManualRetryCreatesNewAttempt() { s.Require().Equal(http.StatusOK, status) s.Require().Len(verifyResp.Models, 2) - // Both should have attempt_number=0 (manual retry resets) + // Both should have attempt_number=1 (manual retry resets) for _, atm := range verifyResp.Models { - s.Equal(float64(0), atm["attempt_number"]) + s.Equal(float64(1), atm["attempt_number"]) } // Verify one manual=true diff --git a/internal/deliverymq/messagehandler.go b/internal/deliverymq/messagehandler.go index f077c99d..2d94ec21 100644 --- a/internal/deliverymq/messagehandler.go +++ b/internal/deliverymq/messagehandler.go @@ -360,8 +360,8 @@ func (h *messageHandler) shouldScheduleRetry(task models.DeliveryTask, err error if !errors.As(err, &pubErr) { return false } - // Attempt starts at 0 for initial attempt, so we can compare directly - return task.Attempt < h.retryMaxLimit + // Attempt starts at 1 for initial attempt, so use <= to allow retryMaxLimit total attempts + return task.Attempt <= h.retryMaxLimit } func (h *messageHandler) shouldNackError(err error) bool { @@ -410,7 +410,11 @@ func (h *messageHandler) shouldNackDeliveryError(err error) bool { } func (h *messageHandler) scheduleRetry(ctx context.Context, task models.DeliveryTask) error { - backoffDuration := h.retryBackoff.Duration(task.Attempt) + // Backoff expects a 0-based index (0 for first retry, 1 for second, etc.). + // attempt_number changed from 0-based to 1-based without migrating in-flight + // tasks, so clamp to 0 to safely handle any leftover Attempt=0 tasks. + backoffIndex := max(task.Attempt-1, 0) + backoffDuration := h.retryBackoff.Duration(backoffIndex) retryTask := RetryTaskFromDeliveryTask(task) retryTaskStr, err := retryTask.ToString() diff --git a/internal/models/tasks.go b/internal/models/tasks.go index ed43187a..dd54d1cd 100644 --- a/internal/models/tasks.go +++ b/internal/models/tasks.go @@ -82,7 +82,7 @@ func NewDeliveryTask(event Event, destinationID string) DeliveryTask { return DeliveryTask{ Event: event, DestinationID: destinationID, - Attempt: 0, + Attempt: 1, } }