A PostgreSQL-powered message queue and task execution engine. Catbird brings reliability and simplicity to background job processing by using your database as the single source of truth—no extra services to manage, just your database coordinating everything.
- Transactional by default: enqueue messages in the same DB transaction as your app writes; rollback means no message.
- Exactly-once within a visibility window: safe retries after crashes, no duplicate processing.
- Database as coordinator: horizontal workers, PostgreSQL handles distribution and state.
- Workflows as DAGs: dependencies, branching, and data passing between steps.
- Definition separate from implementation: define and start tasks and flows in one place, implement them elsewhere.
- Persistence and auditability: queues, runs, and results live in PostgreSQL.
- Resiliency baked in: retries, backoff, optional circuit breakers.
- Operational UX: web dashboard and tui for runs, queues, and workers.
- Optional real-time layer: opt-in pub/sub with SSE support for pushing events to browsers.
- Durable notifications: a per-identity inbox with a cursor and seen-tracking, so clients reliably catch up on missed notifications at their own pace.
client := catbird.New(conn)
ctx := context.Background()
// Queues
err := client.CreateQueue(ctx, "my-queue")
err = client.Send(ctx, "my-queue", map[string]any{"user_id": 123}, catbird.SendOpts{
ConcurrencyKey: "user-123",
})
messages, err := client.Read(ctx, "my-queue", 10, 30*time.Second)
for _, msg := range messages {
err = client.Delete(ctx, "my-queue", msg.ID)
}
// Continuous reader: loops ReadPoll, ack on nil, nack on error
go client.Reader(ctx, "my-queue", 10, 30*time.Second,
func(ctx context.Context, msg catbird.Message) error {
return nil // ack (deletes message)
},
)
// Delayed send
client.Send(ctx, "my_queue", map[string]any{"job": "cleanup"}, catbird.SendOpts{VisibleAt: time.Now().Add(30 * time.Minute)})
// Tasks and flows
task := catbird.NewTask("send-email").
WithDescription("Send a transactional email to a user").Do(func(ctx context.Context, input string) (string, error) {
return "sent", nil
})
flow := catbird.NewFlow("double-add")
flow.AddStep(catbird.NewStep("double").Do(func(ctx context.Context, input int) (int, error) {
return input * 2, nil
}))
flow.AddStep(catbird.NewStep("add").
DependsOn("double").Do(func(ctx context.Context, input int, doubled int) (int, error) {
return doubled + 1, nil
}))
worker := catbird.NewWorker(pool).
AddTask(task).
AddFlow(flow)
go worker.Start(ctx)
taskHandle, err := client.RunTask(ctx, "send-email", "hello")
var taskOut string
err = taskHandle.WaitForOutput(ctx, &taskOut)
flowHandle, err := client.RunFlow(ctx, "double-add", 10)
var flowOut int
err = flowHandle.WaitForOutput(ctx, &flowOut)
// Delayed execution
client.RunTask(ctx, "process-user", userID, catbird.RunTaskOpts{VisibleAt: time.Now().Add(5 * time.Minute)})
client.RunFlow(ctx, "order_processing", map[string]any{"order_id": 123}, catbird.RunFlowOpts{VisibleAt: time.Now().Add(30 * time.Second)})
// Priority (higher = claimed first, default 0)
client.RunTask(ctx, "send-email", email, catbird.RunTaskOpts{Priority: 10})
// Ensure definitions exist before usage; this is not necessary if you
// just want to run a worker, definitions will be created for you.
err := client.CreateTask(ctx, taskA)
err = client.CreateTask(ctx, taskB)
err = client.CreateFlow(ctx, flowA)
err = client.CreateFlow(ctx, flowB)
// Direct package-level usage (no Client), for example in a transaction:
taskHandle, err := catbird.RunTask(ctx, tx, "send-email", "hello")Catbird uses a ConcurrencyKey to prevent overlapping runs of tasks and flows, and overlapping messages in queues.
// ConcurrencyKey: prevent overlap
_, err := client.RunTask(ctx, "process-user", userID, catbird.RunTaskOpts{
ConcurrencyKey: fmt.Sprintf("user-%d", userID),
})- Deduplicates
queuedandstartedruns: a new run with the same key is rejected while one is active - After completion or failure: the same key can be used again
- Return value on duplicate:
RunTask()/RunFlow()return a handle to the existing run ID - Failure retries: allows retries on failed runs
- No key = no deduplication: if you don't provide a key, duplicates are allowed
- Queue messages: Use
ConcurrencyKeyinSendOptsfor message deduplication
err := client.CreateQueue(ctx, "user-events")
err = client.CreateQueue(ctx, "audit-log")
err = client.Bind(ctx, "user-events", "events.user.created")
err = client.Bind(ctx, "user-events", "events.*.updated")
err = client.Bind(ctx, "audit-log", "events.#")
_, err = client.Publish(ctx, "events.user.created", map[string]any{
"user_id": 123,
"email": "user@example.com",
})
_, err = client.Unbind(ctx, "user-events", "events.*.updated")Wildcard rules:
*matches a single token (e.g.,events.*.createdmatchesevents.user.created)#matches zero or more tokens at the end (e.g.,events.user.#matchesevents.userandevents.user.created.v1)#must appear as.#at the end of the pattern, or as#by itself- Tokens are separated by
.and can containa-z,A-Z,0-9,_,-
Bind tasks or flows to topic patterns so that publishing a message automatically creates a run.
// Bind a task to a topic pattern
err = client.BindTask(ctx, "send_email", "events.email.*")
// Bind a flow to a topic pattern
err = client.BindFlow(ctx, "order_processing", "events.order.#")
// Publishing now also triggers task/flow runs
_, err = client.Publish(ctx, "events.email.welcome", map[string]any{
"user_id": 123,
"email": "user@example.com",
}, catbird.PublishOpts{
ConcurrencyKey: "user-123",
})
// Unbind when done
_, err = client.UnbindTask(ctx, "send_email", "events.email.*")
_, err = client.UnbindFlow(ctx, "order_processing", "events.order.#")// Define task (scheduling is separate)
task := catbird.NewTask("send-email").Do(func(ctx context.Context, input EmailRequest) (EmailResponse, error) {
return EmailResponse{SentAt: time.Now()}, nil
},
catbird.WithConcurrency(5),
catbird.WithMaxRetries(3),
catbird.WithFullJitterBackoff(500*time.Millisecond, 10*time.Second),
catbird.WithCircuitBreaker(5, 30*time.Second),
)
// Define a task with a condition (skipped when condition is false)
conditionalTask := catbird.NewTask("premium-processing").
WithCondition("input.is_premium"). // Skipped if is_premium = false
Do(func(ctx context.Context, input ProcessRequest) (string, error) {
return "processed", nil
})
// Create worker (requires *pgxpool.Pool)
worker := catbird.NewWorker(pool).
WithLogger(slog.Default()).
WithShutdownTimeout(10 * time.Second).
AddTask(task).
AddTask(conditionalTask)
go worker.Start(ctx)
// Run the task
handle, err := client.RunTask(ctx, "send-email", EmailRequest{
To: "user@example.com",
Subject: "Hello",
})
// Get result
var result EmailResponse
err = handle.WaitForOutput(ctx, &result)On-fail handlers run after a task reaches a failed state (after its own retries).
They execute with their own HandlerOpt retry and backoff settings, and receive the
original input plus rich failure context.
OnFail semantics (tasks and flows):
OnFailruns only after the main task/flow run reachesfailed(after normal handler retries are exhausted).OnFailhas independent retry/backoff via its ownHandlerOptvalues.- A successful
OnFailmarks on-fail handling complete, but the original run remainsfailed. - If
OnFailretries are exhausted, on-fail handling remains failed and no further retries are scheduled.
task := catbird.NewTask("charge-payment").Do(func(ctx context.Context, input ChargeRequest) (ChargeResult, error) {
return ChargeResult{}, fmt.Errorf("gateway timeout")
}).
OnFail(func(ctx context.Context, input ChargeRequest, failure catbird.TaskFailure) error {
// Send alert, enqueue compensation, or record audit log.
return nil
},
catbird.WithMaxRetries(3),
catbird.WithFullJitterBackoff(200*time.Millisecond, 5*time.Second),
)A flow is a directed acyclic graph (DAG) of steps that execute when their dependencies are satisfied.
- Steps with no dependencies start immediately; independent branches run in parallel.
- Flow output is selected by output priority (configured with
OutputPriority(...)or inferred from terminal steps). - Conditions can skip steps; downstream handlers must accept
Optional[T]for any conditional dependency. - A step with a signal waits for both its dependencies and the signal input.
WaitForOutput()returns the selected flow output once the flow completes.
flow := catbird.NewFlow("order-processing")
flow.AddStep(catbird.NewStep("validate").Do(func(ctx context.Context, order Order) (ValidationResult, error) {
if order.Amount <= 0 {
return ValidationResult{Valid: false, Reason: "Invalid amount"}, nil
}
return ValidationResult{Valid: true}, nil
}))
flow.AddStep(catbird.NewStep("charge").
DependsOn("validate").Do(func(ctx context.Context, order Order, validated ValidationResult) (ChargeResult, error) {
if !validated.Valid {
return ChargeResult{}, fmt.Errorf("cannot charge invalid order")
}
return ChargeResult{
TransactionID: "txn-" + order.ID,
Amount: order.Amount,
}, nil
}))
flow.AddStep(catbird.NewStep("check-inventory").
DependsOn("validate").Do(func(ctx context.Context, order Order, validated ValidationResult) (InventoryCheck, error) {
return InventoryCheck{
InStock: true,
Qty: order.Amount,
}, nil
}))
flow.AddStep(catbird.NewStep("ship").
DependsOn("charge", "check-inventory").Do(func(ctx context.Context, order Order, chargeResult ChargeResult, inventory InventoryCheck) (ShipmentResult, error) {
if !inventory.InStock {
return ShipmentResult{}, fmt.Errorf("out of stock")
}
return ShipmentResult{
TrackingNumber: "TRK-" + chargeResult.TransactionID,
EstimatedDays: 3,
}, nil
}))
// Create worker
worker := catbird.NewWorker(pool).
AddFlow(flow)
go worker.Start(ctx)Flows can have multiple terminal steps.
flow := catbird.NewFlow("approval-or-escalation")
flow.OutputPriority("approve", "escalate")
flow.AddStep(catbird.NewStep("validate").Do(func(ctx context.Context, req Request) (Validation, error) {
return Validation{Score: req.Score}, nil
}))
flow.AddStep(catbird.NewStep("approve").
DependsOn("validate").
WithCondition("validate.score gte 80").Do(func(ctx context.Context, req Request, v Validation) (Decision, error) {
return Decision{Status: "approved"}, nil
}))
flow.AddStep(catbird.NewStep("escalate").
DependsOn("validate").
WithCondition("validate.score lt 80").Do(func(ctx context.Context, req Request, v Validation) (Decision, error) {
return Decision{Status: "escalated"}, nil
}))If you omit OutputPriority(...), Catbird uses terminal steps in definition order as the default priority.
flow := catbird.NewFlow("default-terminal-priority")
flow.AddStep(catbird.NewStep("a").Do(func(ctx context.Context, in int) (int, error) { return in, nil }))
flow.AddStep(catbird.NewStep("left").DependsOn("a").Do(func(ctx context.Context, in int, a int) (int, error) { return a + 1, nil }))
flow.AddStep(catbird.NewStep("right").DependsOn("a").Do(func(ctx context.Context, in int, a int) (int, error) { return a + 2, nil }))
// Effective priority: left, then right.On-fail handlers run after a flow reaches a failed state (after its own retries).
They execute with their own HandlerOpt retry and backoff settings, and receive the
original input plus rich failure context.
flow := catbird.NewFlow("order-processing")
flow.AddStep(catbird.NewStep("charge").Do(func(ctx context.Context, order Order) (string, error) {
return "", fmt.Errorf("charge failed")
}))
flow.OnFail(func(ctx context.Context, order Order, failure catbird.FlowFailure) error {
var failedInput Order
if err := failure.FailedStepInputAs(&failedInput); err == nil {
// failedInput has the step input that caused the error
}
var chargeResult ChargeResult
if err := failure.OutputAs("charge", &chargeResult); err == nil {
// access completed step output when available
}
return nil
})Signals enable workflows that wait for external input before proceeding, such as approval workflows or webhooks.
flow := catbird.NewFlow("document_approval")
flow.AddStep(catbird.NewStep("submit").Do(func(ctx context.Context, doc Document) (string, error) {
return doc.ID, nil
}))
flow.AddStep(catbird.NewStep("approve").
DependsOn("submit").
WithSignal().Do(func(ctx context.Context, doc Document, approval ApprovalInput, docID string) (ApprovalResult, error) {
if !approval.Approved {
return ApprovalResult{}, fmt.Errorf("approval denied by %s: %s", approval.ApproverID, approval.Notes)
}
return ApprovalResult{
Status: "approved",
ApprovedBy: approval.ApproverID,
Timestamp: time.Now().Format(time.RFC3339),
}, nil
}))
flow.AddStep(catbird.NewStep("publish").
DependsOn("approve").Do(func(ctx context.Context, doc Document, approval ApprovalResult) (PublishResult, error) {
return PublishResult{
PublishedAt: time.Now().Format(time.RFC3339),
URL: "https://example.com/docs/" + approval.ApprovedBy,
}, nil
}))A step with both dependencies and a signal waits for both conditions: all dependencies must complete and the signal must be delivered before the step executes.
Use CompleteEarly(ctx, output, reason) inside a flow step handler when you already have the final business output and want to stop remaining branches.
flow := catbird.NewFlow("fraud-check")
flow.AddStep(catbird.NewStep("quick_guard").Do(func(ctx context.Context, in Order) (string, error) {
if in.IsKnownSafe {
return "", catbird.CompleteEarly(ctx, Decision{Approved: true}, "known-safe fast path")
}
return "continue", nil
}))
flow.AddStep(catbird.NewStep("slow_analysis").Do(func(ctx context.Context, in Order) (string, error) {
time.Sleep(2 * time.Second)
return "done", nil
}))
flow.AddStep(catbird.NewStep("final").
DependsOn("quick_guard", "slow_analysis").Do(func(ctx context.Context, in Order, guard string, analysis string) (Decision, error) {
return Decision{Approved: guard == "continue" && analysis == "done"}, nil
}))When early completion wins the race, the flow run becomes completed with the provided output, and in-flight sibling work is stopped cooperatively.
Map steps fan out array processing into per-item SQL-coordinated work and aggregate results back in item order.
- Define map steps with
AddStep(NewStep("name").MapFlowInput()...)orAddStep(NewStep("name").MapStepOutput("source")...) - Use
MapFlowInput()to map over flow input (flow input must be a JSON array) - Use
MapStepOutput("step_name")to map over a dependency step output array - Each mapped item runs as its own task, so retries happen per item instead of rerunning the whole step.
- To fold mapped item outputs without materializing a full
[]Out, add an explicit reducer step withAddStep(NewStep("name").ReduceStep("mapped_step").Do(fn)).
flow := catbird.NewFlow("double-input")
flow.AddStep(catbird.NewStep("double").
MapFlowInput().
Do(func(ctx context.Context, n int) (int, error) {
return n * 2, nil
}))
handle, _ := client.RunFlow(ctx, "double-input", []int{1, 2, 3})
var out []int
_ = handle.WaitForOutput(ctx, &out)
// out == []int{2, 4, 6}flow := catbird.NewFlow("double-numbers")
flow.AddStep(catbird.NewStep("numbers").Do(func(ctx context.Context, _ string) ([]int, error) {
return []int{1, 2, 3}, nil
}))
flow.AddStep(catbird.NewStep("double").
MapStepOutput("numbers").
Do(func(ctx context.Context, _ string, n int) (int, error) {
return n * 2, nil
}))
// Reduce mapped outputs with an explicit reducer step
flow = catbird.NewFlow("double-numbers-reduced")
flow.AddStep(catbird.NewStep("numbers").Do(func(ctx context.Context, _ string) ([]int, error) {
return []int{1, 2, 3}, nil
}))
flow.AddStep(catbird.NewStep("double").
MapStepOutput("numbers").
Do(func(ctx context.Context, _ string, n int) (int, error) {
return n * 2, nil
}))
flow.AddStep(catbird.NewStep("sum").
ReduceStep("double").
Do(func(ctx context.Context, acc int, out int) (int, error) {
return acc + out, nil
}))Use IgnoreOutput("step") when a step must wait for a dependency but doesn't need its output. The dependency's output is never aggregated or transmitted — important for map steps with large fan-outs. The handler omits the parameter for that dependency.
flow.AddStep(catbird.NewStep("finish").
DependsOn("expensive-map").
IgnoreOutput("expensive-map").
Do(func(ctx context.Context, in Input) (string, error) {
return "done", nil
}))Generator steps act like normal flow steps with an extra trailing yield callback for streaming items; yielded items are processed by a per-item handler.
- Define the step with
flow.AddStep(NewStep("name").Generate(...).Do(...)) - Optionally add
DependsOn(...)and/orWithSignal()like a normal step - Provide a generator with signature
func(context.Context, In[, Signal][, Dep1, Dep2, ...], func(ItemType) error) error - Provide an item handler with signature
func(context.Context, ItemType) (OutType, error) - To fold yielded item outputs, add an explicit reducer step with
AddStep(NewStep(...).ReduceStep("generator_step").Do(fn)) - Generator steps do not support
MapFlowInput()orMapStepOutput()
flow := catbird.NewFlow("generate-double-sum")
flow.AddStep(catbird.NewStep("seed").Do(func(ctx context.Context, in int) (int, error) {
return in, nil
}))
flow.AddStep(catbird.NewStep("generate").
DependsOn("seed").
Generate(func(ctx context.Context, in int, seed int, yield func(int) error) error {
for i := 0; i < seed; i++ {
if err := yield(i); err != nil {
return err
}
}
return nil
}).
Do(func(ctx context.Context, item int) (int, error) {
return item * 2, nil
}))
flow.AddStep(catbird.NewStep("sum").
DependsOn("generate").Do(func(ctx context.Context, in int, generated []int) (int, error) {
total := 0
for _, v := range generated {
total += v
}
return total, nil
}))
handle, _ := client.RunFlow(ctx, "generate-double-sum", 5)
var out int
_ = handle.WaitForOutput(ctx, &out)
// out == 20Use an explicit reducer step when you want bounded generator output instead of storing all item outputs as []Out:
flow := catbird.NewFlow("generate-double-sum-reduced")
flow.AddStep(catbird.NewStep("generate").
Generate(func(ctx context.Context, input int, yield func(int) error) error {
for i := 0; i < input; i++ {
if err := yield(i); err != nil {
return err
}
}
return nil
}).
Do(func(ctx context.Context, item int) (int, error) {
return item * 2, nil
}))
flow.AddStep(catbird.NewStep("sum").
ReduceStep("generate").
Do(func(ctx context.Context, acc int, out int) (int, error) {
return acc + out, nil
}))
handle, _ := client.RunFlow(ctx, "generate-double-sum-reduced", 5)
var out int
_ = handle.WaitForOutput(ctx, &out)
// out == 20| Status | Meaning | Used by |
|---|---|---|
queued |
Runnable and never picked up by a worker | Task runs, flow step runs, map item runs |
waiting_for_dependencies |
Not runnable yet because one or more dependencies are still incomplete | Flow step runs |
waiting_for_signal |
Dependencies are resolved, but required signal input has not been delivered yet | Flow step runs |
waiting_for_map_tasks |
Parent map/reducer step is waiting for spawned map item runs to finish | Flow step runs |
started |
Picked up by a worker at least once (including retries) | Task runs, flow runs, flow step runs, map item runs |
completed |
Finished successfully and output is available | Task runs, flow runs, flow step runs, map item runs |
failed |
Finished with an error | Task runs, flow runs, flow step runs, map item runs |
skipped |
Intentionally skipped (typically due to a condition evaluating false) | Task runs, flow step runs |
canceling |
Cancellation requested; run is transitioning to canceled | Task runs, flow runs |
canceled |
Run was canceled before completing normally | Task runs, flow runs |
flow := catbird.NewFlow("parallel_watch_flow")
flow.AddStep(catbird.NewStep("long_job").Do(func(ctx context.Context, in Order) (string, error) {
time.Sleep(500 * time.Millisecond)
return "job-finished", nil
}))
flow.AddStep(catbird.NewStep("watch_job").Do(func(ctx context.Context, in Order) (string, error) {
step, err := catbird.WaitForStep(ctx, "long_job", catbird.WaitOpts{PollInterval: 25 * time.Millisecond})
if err != nil {
return "", err
}
if !step.IsCompleted() {
return "", fmt.Errorf("long_job ended with status=%s", step.Status)
}
return "watcher-confirmed:" + step.Status, nil
}))This runs long_job and watch_job in parallel. watch_job blocks on WaitForStep(...) until long_job reaches a terminal state, then exits immediately.
flow := catbird.NewFlow("loop_until_peer_done")
flow.AddStep(catbird.NewStep("controller").Do(func(ctx context.Context, in string) (string, error) {
time.Sleep(2 * time.Second)
return "stop-now", nil
}))
flow.AddStep(catbird.NewStep("worker_loop").Do(func(ctx context.Context, in string) (string, error) {
for {
controller, err := catbird.GetStep(ctx, "controller")
if err != nil {
return "", err
}
if controller.IsDone() {
return "loop-stopped:" + controller.Status, nil
}
select {
case <-ctx.Done():
return "", ctx.Err()
case <-time.After(100 * time.Millisecond):
}
}
}))This pattern keeps worker_loop alive until controller reaches any terminal state, then exits cleanly.
Tasks and flows can be scheduled with cron expressions using CreateTaskSchedule and CreateFlowSchedule. Schedules are stored in PostgreSQL and polled by workers — no external cron daemon needed.
// Schedule a task
client.CreateTaskSchedule(ctx, "send-email", "@hourly")
// Schedule with static input
client.CreateTaskSchedule(ctx, "send-report", "*/15 * * * *",
catbird.WithInput(EmailRequest{To: "ops@example.com", Subject: "Report"}),
)
// Schedule a flow
client.CreateFlowSchedule(ctx, "order-processing", "0 2 * * *")
// Skip all missed ticks on recovery (no catch-up runs)
client.CreateTaskSchedule(ctx, "stats", "@hourly", catbird.WithSkipCatchUp())
// Replay every missed tick on recovery
client.CreateTaskSchedule(ctx, "billing", "0 * * * *", catbird.WithCatchUpAll())Standard 5-field cron format: minute hour day-of-month month day-of-week
| Field | Values | Wildcards |
|---|---|---|
| Minute | 0-59 | *, */N, N-M, N-M/S, comma-separated |
| Hour | 0-23 | same |
| Day of month | 1-31 | same |
| Month | 1-12 | same |
| Day of week | 0-6 (0 = Sunday, 7 also accepted as Sunday) | same |
When both day-of-month and day-of-week are restricted (not *), the date matches if either field matches (standard cron OR semantics).
Shorthand descriptors: @yearly / @annually, @monthly, @weekly, @daily / @midnight, @hourly.
All cron evaluation is in UTC.
When a worker restarts after downtime, the catch-up policy controls how missed ticks are handled:
| Policy | Option | On recovery (5 missed ticks) | Enqueues |
|---|---|---|---|
| skip | WithSkipCatchUp() |
Skip all missed ticks, jump to future | 0 runs |
| one (default) | — | Enqueue one catch-up run (oldest), jump to future | 1 run |
| all | WithCatchUpAll() |
Replay every missed tick, one at a time | All runs |
Both tasks and flow steps support conditional execution via WithCondition on the builder methods. If the condition evaluates to false (or a referenced field is missing), the task/step is marked skipped and its handler does not run.
- Prefixes: tasks use
input.*; flow steps useinput.*,step_name.*, orsignal.*. - Operators:
eq,ne,gt,gte,lt,lte,in,exists,contains, plusnot <expr>. - Optional outputs: if a step can be skipped, downstream handlers must accept
Optional[T]for that dependency. - Map steps: define with
AddStep(NewStep("name")...), then useMapFlowInput()orMapStepOutput("step_name"); map source values must be arrays. - No AND/OR: only one expression per task/step; compute a derived field upstream if needed.
Tasks can use conditions to skip execution based on input fields.
type ProcessRequest struct {
UserID int `json:"user_id"`
IsPremium bool `json:"is_premium"`
Amount int `json:"amount"`
Environment string `json:"environment"`
}
// Only process premium users
premiumTask := catbird.NewTask("premium_processing").
WithCondition("input.is_premium"). // Skipped if is_premium = false
Do(func(ctx context.Context, req ProcessRequest) (string, error) {
return fmt.Sprintf("Processed premium user %d", req.UserID), nil
})
// Run task - may be skipped based on input
client.RunTask(ctx, "premium_processing", ProcessRequest{UserID: 123, IsPremium: false})
// This task run will be skipped (is_premium = false)Flow steps can branch based on prior outputs. Use Optional[T] to handle skipped dependencies.
flow := catbird.NewFlow("payment_processing")
flow.OutputPriority("charge", "free_order")
flow.AddStep(catbird.NewStep("validate").Do(func(ctx context.Context, order Order) (ValidationResult, error) {
return ValidationResult{Valid: order.Amount > 0}, nil
}))
flow.AddStep(catbird.NewStep("charge").
DependsOn("validate").
WithCondition("validate.valid").Do(func(ctx context.Context, order Order, validation ValidationResult) (FinalResult, error) {
return FinalResult{Status: "charged", TxnID: "txn-123"}, nil
}))
flow.AddStep(catbird.NewStep("free_order").
DependsOn("validate").
WithCondition("not validate.valid").Do(func(ctx context.Context, order Order, validation ValidationResult) (FinalResult, error) {
return FinalResult{Status: "free_order", TxnID: ""}, nil
}))Catbird includes multiple resiliency layers for runtime failures. Handler-level retries are configured with HandlerOpt values such as WithMaxRetries(...) and WithFullJitterBackoff(...), and external calls can be protected with WithCircuitBreaker(failureThreshold, openTimeout) to avoid cascading outages. In worker database paths, PostgreSQL reads/writes are retried with bounded attempts and full-jitter backoff; retries stop immediately on context cancellation or deadline expiry.
For reducer-step workflows, retries are two-phase: item handlers retry first per item, then reducer-step finalization retries at the reducer step.
If retries are exhausted in either phase, the parent step fails and task/flow OnFail handlers run with the same terminal failure semantics as non-reduced steps.
Catbird deduplication (ConcurrencyKey) controls duplicate run creation, while handler retries can still re-attempt the same run after transient failures. For non-repeatable side effects (payments, email, webhooks), use idempotent write patterns or upstream idempotency keys so retry attempts remain safe.
Cancellation semantics:
- Cancellation is a distinct terminal outcome (
canceled), separate fromfailedandcompleted. - Task cancellation moves
queued/startedruns directly tocanceled; already-terminal task runs are unchanged. - Flow cancellation is two-phase: flow goes to
canceling, queued child work is canceled, then flow becomescanceledafter in-flight started work drains. - Cancellation requests are idempotent: repeated requests are successful no-ops.
- A cancel request does not rewrite an already-terminal run.
External cancellation:
taskHandle, _ := client.RunTask(ctx, "send-email", "hello")
_, _ = client.CancelTaskRun(ctx, "send-email", taskHandle.ID, catbird.CancelOpts{Reason: "operator requested stop"})
flowHandle, _ := client.RunFlow(ctx, "order-processing", map[string]any{"order_id": 123})
_, _ = client.CancelFlowRun(ctx, "order-processing", flowHandle.ID, catbird.CancelOpts{Reason: "customer canceled order"})Internal cancellation from handlers:
task := catbird.NewTask("validate-order").Do(func(ctx context.Context, input Order) (string, error) {
if input.Amount <= 0 {
if err := catbird.Cancel(ctx, catbird.CancelOpts{Reason: "invalid amount"}); err != nil {
return "", err
}
return "", nil
}
return "ok", nil
})
flow := catbird.NewFlow("order-processing")
flow.AddStep(catbird.NewStep("guard").Do(func(ctx context.Context, input Order) (string, error) {
if input.Amount <= 0 {
if err := catbird.Cancel(ctx, catbird.CancelOpts{Reason: "invalid amount"}); err != nil {
return "", err
}
return "", nil
}
return "proceed", nil
}))Wire is an optional real-time pub/sub layer — use it when you need to push events to browsers or react to notifications server-side. If you're only using queues, tasks, and flows, you can ignore Wire entirely.
Wire provides topic-matched event dispatch with SSE support and presence tracking. Notifications are ephemeral (at-most-once, no storage).
wire := catbird.NewWire(pool, secret)
// Listen: server-side callbacks on topic patterns
wire.Listen("order.*", func(ctx context.Context, topic, message string) {
log.Println(topic, message)
})
// Render: project events into a transport-neutral Fragment (acts as allowlist)
wire.Render("task.*.completed", func(r *http.Request, topic, message string) (catbird.Fragment, error) {
return catbird.Fragment{Event: "task-done", Data: "<div>Task done</div>"}, nil
})
go wire.Start(ctx)
// Notify: local dispatch + pg_notify for cross-node delivery
wire.Notify(ctx, "order.created", `{"id": 123}`)
// SSE: app controls token retrieval
http.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) {
token := r.URL.Query().Get("token")
wire.ServeSSE(w, r, token)
})A Render definition is transport-neutral: ServeSSE wraps each Fragment as an SSE frame, and ServePoll concatenates fragments into an HTTP body backed by the durable inbox — define the projection once, pick the transport per surface. (RenderSSE is a deprecated alias for Render.)
| Method | Runs on | Purpose |
|---|---|---|
wire.Listen |
Every node | Server-side side effects (logging, webhooks) |
wire.Render |
Node serving SSE/poll | Project events into a Fragment (e.g., JSON → HTML) |
Topics without a renderer are handled per-transport: ServeSSE passes them through raw, ServePoll skips them. Multiple renderers matching the same topic each produce a fragment. Render handlers receive the client's *http.Request for access to user context (auth, language, etc).
// Package-level: explicit Conn, works inside transactions (fires on commit)
catbird.Notify(ctx, tx, "order.progress", `{"step": 1}`)
// Client method: uses client's Conn
client.Notify(ctx, "order.created", `{"id": 123}`)
// Wire method: local dispatch + pg_notify for cross-node
wire.Notify(ctx, "order.created", `{"id": 123}`)Fragment is the transport-neutral render output: a Data payload (e.g. an HTML fragment) plus SSE-only Event/ID hints. It implements io.Writer, so templates can write directly to it. ServeSSE frames it; ServePoll emits its Data. (SSEEvent is a deprecated alias for Fragment.)
// Typed helper — unmarshals JSON, gives full Fragment control
catbird.Render[TaskEvent](wire, "task.*", func(r *http.Request, topic string, data TaskEvent) (catbird.Fragment, error) {
f := catbird.Fragment{Event: "task-update", ID: topic}
err := views.TaskCompleted(r, data).Render(r.Context(), &f)
return f, err
})SSE connections are authorized via encrypted tokens that specify which topics the client can subscribe to:
token := wire.Token([]string{"order.*", "task.invoice.#"}, catbird.TokenOpts{
Identity: "user-123",
ValidFor: time.Hour,
})Track which identities are connected to a topic (across all nodes):
identities, err := wire.Presence(ctx, "dashboard")Handlers can access the database connection from context for transactional work:
task := catbird.NewTask("process-order").Do(func(ctx context.Context, input Order) (Result, error) {
conn, _ := catbird.GetConn(ctx)
tx, _ := conn.Begin(ctx)
defer tx.Rollback(ctx)
catbird.Send(ctx, tx, "audit", map[string]any{"order_id": input.ID})
catbird.Notify(ctx, tx, "order.progress", `{"step": 1}`) // fires on commit
tx.Commit(ctx)
return Result{}, nil
})A per-identity durable inbox. Where Wire is ephemeral push — miss the moment (navigating, offline, asleep) and the notification is gone — the inbox is a store the client catches up against on its own schedule. It is a separate primitive with no dependency on Wire: delete Wire and the inbox still does its whole job.
The inbox is a mailbox keyed by identity. Each notification has a monotonic id
(the cursor) and a seen marker, so "replay everything since I last looked" is just
read the unseen rows after my cursor, then ack the cursor.
// Append to an identity's inbox; returns the new id (the cursor value)
id, err := catbird.NotifyDurable(ctx, conn, "user-123", "import.done", "Your import finished")
// Read unseen, still-relevant notifications after the client's last cursor
ns, err := catbird.UnseenNotifications(ctx, conn, "user-123", afterID, 50)
for _, n := range ns {
fmt.Println(n.ID, n.Topic, n.Message)
}
// Ack the cursor: mark everything up to an id seen (returns rows marked)
marked, err := catbird.MarkSeenUntil(ctx, conn, "user-123", ns[len(ns)-1].ID)Relevance window (ExpiresAt). A notification is a perishable pointer to a durable
fact — the underlying result lives permanently elsewhere (e.g. a task row); the
notification is only the prompt to look. Set ExpiresAt to bound how long it is worth
delivering: once it passes, the row drops out of UnseenNotifications and cb_gc
deletes it. Leave it zero and the notification waits until seen (cleared by an
explicit MarkSeenUntil/MarkSeen ack, not by a timer) — the right choice for
"action required" items.
// Perishable toast: gone after an hour whether seen or not
catbird.NotifyDurable(ctx, conn, "user-123", "batch.done", "Batch finished",
catbird.NotifyDurableOpts{ExpiresAt: time.Now().Add(time.Hour)})
// Collapse: a newer notification with the same CollapseKey marks prior unseen ones
// seen, so only the latest stays live (FCM collapse-key semantics — keep newest,
// the deliberate opposite of the queue's keep-oldest ConcurrencyKey)
catbird.NotifyDurable(ctx, conn, "user-123", "import.progress", "100%",
catbird.NotifyDurableOpts{CollapseKey: "import-42"})Reads return unseen and still-relevant rows — never "everything since the cursor"
— so an identity offline for a day is not flooded with stale toasts. Retention folds
into the core cb_gc sweep (see Data Retention).
Pairing with Wire (optional). Durability (is it stored?) and transport (push vs poll) are independent. If you also run Wire, push a content-free ping and let the client re-pull from its cursor — the store stays the single source of truth and the cursor dedups, so a client that is both connected and polling never shows a notification twice:
catbird.NotifyDurable(ctx, conn, userID, topic, msg, opts) // the real message, stored
wire.Notify(ctx, "user."+userID, "") // empty ping → client re-pullswire.ServePoll is the read side: it renders an identity's unseen notifications
(scoped to the token's topics) through the same Render definitions as ServeSSE
and returns them as one HTTP body, with the next cursor in the X-Wire-Cursor header.
It is a pure read — the client acks explicitly, so opening the same surface in several
tabs is convergent, not destructive:
http.HandleFunc("/inbox", func(w http.ResponseWriter, r *http.Request) {
wire.ServePoll(w, r, r.URL.Query().Get("token")) // ?after=<cursor> for catch-up
})
// Ack on an explicit gesture, never on read:
catbird.MarkSeenUntil(ctx, conn, userID, cursor) // "mark all read" (bounded watermark)
catbird.MarkSeen(ctx, conn, userID, []int64{id1, id2}) // dismiss specific items (precise)- Queue, task, flow, and step names: Lowercase letters, digits, and underscores only (
a-z,0-9,_). Max 58 characters. Step names must be unique within a flow. Reserved step names:input,signal. - Topics/Patterns: Letters (upper/lower), digits, dots, underscores, and hyphens (
a-z,A-Z,0-9,.,_,-, plus wildcards*,#).
Use query builders when you want SQL + args directly (for pgx.Batch or custom execution):
SendQuery(queue, body, opts)PublishQuery(topic, body, opts)RunTaskQuery(name, input, opts)RunFlowQuery(name, input, opts)
// Queue into a batch
var batch pgx.Batch
q1, args1, err := catbird.SendQuery("my-queue", map[string]any{"user_id": 123})
if err != nil {
return err
}
batch.Queue(q1, args1...)Catbird is built on PostgreSQL functions, so you can use the API directly from any language or tool with PostgreSQL support (psql, Python, Node.js, Ruby, etc.).
For the full SQL function reference and practical SQL examples (queues, tasks, workflows, and run monitoring), see the SQL API reference.
The dashboard provides a web UI for monitoring queues, tasks, flows, and workers. You can run it standalone with the cb CLI or embed it as an http.Handler.
go install github.com/ugent-library/catbird/cmd/cb@latest
export CB_CONN="postgres://user:pass@localhost:5432/mydb?sslmode=disable"
cb dashboardThe dashboard is a standard http.Handler and can be embedded in any Go web application:
import (
"log/slog"
"net/http"
"github.com/ugent-library/catbird"
"github.com/ugent-library/catbird/dashboard"
)
func main() {
client := catbird.New(conn)
dash := dashboard.New(dashboard.Config{
Client: client,
Log: slog.Default(), // Optional: provide custom logger
PathPrefix: "", // Optional: mount at a subpath (e.g., "/admin")
})
http.ListenAndServe(":8080", dash.Handler())
}The terminal UI provides an interactive dashboard-like view in your terminal.
go install github.com/ugent-library/catbird/cmd/cb@latest
export CB_CONN="postgres://user:pass@localhost:5432/mydb?sslmode=disable"
cb uiYou can also start it from the root command using interactive mode:
cb -iSet a retention period on a task or flow definition to have cb_gc() automatically
delete terminal runs older than that duration. GC runs opportunistically from the
worker heartbeat and can also be triggered manually via client.GC(ctx) or the
standalone purge helpers below.
gcInfo, err := client.GC(ctx)
if err != nil {
// handle error
}
_ = gcInfo.ExpiredQueuesDeleted
_ = gcInfo.StaleWorkersDeleted
_ = gcInfo.TaskRunsPurged
_ = gcInfo.FlowRunsPurged
_ = gcInfo.ExpiredNotificationsDeletedDurable notifications past their ExpiresAt are deleted by the same cb_gc() sweep.
Because GC runs from the worker heartbeat, a deployment that runs Wire but no worker
must call client.GC(ctx) on its own schedule, or expired notifications accumulate.
task := catbird.NewTask("send-email").
RetentionPeriod(7 * 24 * time.Hour). // NULL by default = no cleanup
Do(func(ctx context.Context, in EmailInput) (string, error) {
return "sent", nil
})
flow := catbird.NewFlow("order-processing")
flow.RetentionPeriod(90 * 24 * time.Hour)
flow.AddStep(catbird.NewStep("step1").Do(func(ctx context.Context, in OrderInput) (string, error) {
return "done", nil
}))- Task runs cleaned up:
completed,failed,skipped,canceledolder than the retention period - Flow runs cleaned up:
completed,failed,canceledolder than the retention period; associated step runs and map tasks are removed automatically via cascade - Non-terminal rows are never touched:
queued,started,waiting_*,cancelingare left alone
For targeted or ad-hoc cleanup independent of the retention period:
// Delete task runs older than 30 days
taskPurged, err := client.PurgeTaskRuns(ctx, "send-email", 30*24*time.Hour)
// Delete flow runs older than 90 days
flowPurged, err := client.PurgeFlowRuns(ctx, "order-processing", 90*24*time.Hour)
_ = taskPurged
_ = flowPurgedFor SQL-based archiving patterns and example queries, see the
External archiving section in the SQL API reference.
When Catbird owns its own database (or you want to manage its schema outside your application's migration flow), the cb CLI applies the schema directly:
go install github.com/ugent-library/catbird/cmd/cb@latest
export CB_CONN="postgres://user:pass@localhost:5432/mydb?sslmode=disable"
cb migrate up # apply all pending migrations
cb migrate status # list applied and pending migrations
cb migrate down --to 1 # roll back down to a target versionIf your application already uses Goose migrations, you can register Catbird's schema migration as a normal app migration:
Create a migration file such as 00003_add_catbird.go:
package migrations
import (
"context"
"database/sql"
"github.com/pressly/goose/v3"
"github.com/ugent-library/catbird"
)
func init() {
goose.AddMigrationNoTxContext(addCatbirdUp, addCatbirdDown)
}
func addCatbirdUp(ctx context.Context, db *sql.DB) error {
return catbird.MigrateUpTo(ctx, db, 13)
}
func addCatbirdDown(ctx context.Context, db *sql.DB) error {
return catbird.MigrateDownTo(ctx, db, 0)
}Keep an explicit pinned version (for example 13) in this migration file. Do not use catbird.SchemaVersion.
- Go API Documentation
- SQL API Reference: SQL function reference and practical SQL usage examples
- Events Reference
- Testing Guide
- Copilot Instructions
SQL code is taken from or inspired by the excellent pgmq and pgflow projects.
