diff --git a/apps/server-nestjs/src/cpin-module/infrastructure/reconcile/reconcile.constants.ts b/apps/server-nestjs/src/cpin-module/infrastructure/reconcile/reconcile.constants.ts new file mode 100644 index 000000000..84489806a --- /dev/null +++ b/apps/server-nestjs/src/cpin-module/infrastructure/reconcile/reconcile.constants.ts @@ -0,0 +1,3 @@ +export const DEFAULT_RECONCILE_MAX_RETRIES = 5 +export const DEFAULT_RECONCILE_REQUEUE_AFTER_MS = 0 +export const DEFAULT_RECONCILE_ERROR_REQUEUE_AFTER_MS = 0 diff --git a/apps/server-nestjs/src/cpin-module/infrastructure/reconcile/reconcile.decorator.ts b/apps/server-nestjs/src/cpin-module/infrastructure/reconcile/reconcile.decorator.ts new file mode 100644 index 000000000..490c4a0d9 --- /dev/null +++ b/apps/server-nestjs/src/cpin-module/infrastructure/reconcile/reconcile.decorator.ts @@ -0,0 +1,103 @@ +import { setTimeout } from 'node:timers/promises' +import { Logger } from '@nestjs/common' +import { + DEFAULT_RECONCILE_ERROR_REQUEUE_AFTER_MS, + DEFAULT_RECONCILE_MAX_RETRIES, + DEFAULT_RECONCILE_REQUEUE_AFTER_MS, +} from './reconcile.constants' + +export interface RequeueResult { + requeueAfterMs?: number +} + +export type ReconcileResult = undefined | RequeueResult + +export function requeue(options: RequeueResult = {}): RequeueResult { + return options +} + +export interface ReconcileOptions { + maxRetries?: number + defaultRequeueAfterMs?: number + defaultErrorRequeueAfterMs?: number + shouldRetry?: (error: unknown) => boolean + onError?: (error: unknown) => void +} + +async function reconcile(handler: () => Promise | T, options: ReconcileOptions = {}): Promise { + const { + maxRetries = DEFAULT_RECONCILE_MAX_RETRIES, + defaultRequeueAfterMs = DEFAULT_RECONCILE_REQUEUE_AFTER_MS, + defaultErrorRequeueAfterMs = DEFAULT_RECONCILE_ERROR_REQUEUE_AFTER_MS, + shouldRetry, + onError, + } = options + + const run = async (attempt: number): Promise => { + try { + const result = await handler() + const requeueResult = toRequeueResult(result) + + if (requeueResult) { + if (attempt >= maxRetries) return result + const delayMs = Math.max(0, requeueResult.requeueAfterMs ?? defaultRequeueAfterMs) + await setTimeout(delayMs) + return await run(attempt + 1) + } + + return result + } catch (error) { + onError?.(error) + const canRetry = attempt < maxRetries && (shouldRetry?.(error) ?? true) + if (!canRetry) throw error + + await setTimeout(Math.max(0, defaultErrorRequeueAfterMs)) + return await run(attempt + 1) + } + } + + return await run(0) +} + +export type TypedMethodDecorator = any>( + target: object, + propertyKey: string | symbol, + descriptor: TypedPropertyDescriptor, +) => void + +export function Reconcile(options: ReconcileOptions = {}): TypedMethodDecorator { + return any>( + _target: object, + propertyKey: string | symbol, + descriptor: TypedPropertyDescriptor, + ): void => { + const original = descriptor.value + if (!original) return + + descriptor.value = (async function (this: ThisParameterType, ...args: Parameters): Promise>> { + const logger: Logger = this?.logger instanceof Logger + ? this.logger + : new Logger(this?.constructor?.name ?? 'Reconcile') + + try { + return await reconcile( + () => original.apply(this, args), + options, + ) as Awaited> + } catch (error) { + logger.error( + `Handler ${String(propertyKey)} failed permanently`, + error instanceof Error ? error.stack : undefined, + ) + throw error + } + }) as T + } +} + +function toRequeueResult(value: unknown): RequeueResult | undefined { + if (value && typeof value === 'object' && 'requeueAfterMs' in value) { + const ms = (value as RequeueResult).requeueAfterMs + return ms === undefined || typeof ms === 'number' ? { requeueAfterMs: ms } : undefined + } +} diff --git a/apps/server-nestjs/src/modules/nexus/nexus-controller.service.ts b/apps/server-nestjs/src/modules/nexus/nexus-controller.service.ts index 52c9a8965..d93ee4e36 100644 --- a/apps/server-nestjs/src/modules/nexus/nexus-controller.service.ts +++ b/apps/server-nestjs/src/modules/nexus/nexus-controller.service.ts @@ -4,6 +4,7 @@ import { Inject, Injectable, Logger } from '@nestjs/common' import { OnEvent } from '@nestjs/event-emitter' import { Cron, CronExpression } from '@nestjs/schedule' import { trace } from '@opentelemetry/api' + import { ConfigurationService } from '../../cpin-module/infrastructure/configuration/configuration.service' import { StartActiveSpan } from '../../cpin-module/infrastructure/telemetry/telemetry.decorator' import { VaultClientService, VaultError } from '../vault/vault-client.service'