diff --git a/internal/client/client_integration_test.go b/internal/client/client_integration_test.go index c69be435..38e68b4e 100644 --- a/internal/client/client_integration_test.go +++ b/internal/client/client_integration_test.go @@ -31,7 +31,7 @@ func TestClient_V1Alpha1RoundTrip(t *testing.T) { mux := http.NewServeMux() api := humago.New(mux, huma.DefaultConfig("test", "v1")) - crud.Register(api, "/v0", stores, nil, nil, crud.PerKindHooks{}) + crud.Register(api, "/v0", stores, nil, nil, crud.PerKindHooks{}, nil) resource.RegisterApply(api, resource.ApplyConfig{ BasePrefix: "/v0", Stores: stores, @@ -143,7 +143,7 @@ func TestClient_V1Alpha1_NotFound(t *testing.T) { mux := http.NewServeMux() api := humago.New(mux, huma.DefaultConfig("test", "v1")) - crud.Register(api, "/v0", stores, nil, nil, crud.PerKindHooks{}) + crud.Register(api, "/v0", stores, nil, nil, crud.PerKindHooks{}, nil) ts := httptest.NewServer(mux) defer ts.Close() diff --git a/internal/registry/api/handlers/v0/crud/crud.go b/internal/registry/api/handlers/v0/crud/crud.go index 189d15c7..4d786706 100644 --- a/internal/registry/api/handlers/v0/crud/crud.go +++ b/internal/registry/api/handlers/v0/crud/crud.go @@ -23,6 +23,7 @@ import ( "github.com/agentregistry-dev/agentregistry/pkg/api/v1alpha1" "github.com/agentregistry-dev/agentregistry/pkg/registry/resource" "github.com/agentregistry-dev/agentregistry/pkg/registry/v1alpha1store" + "github.com/agentregistry-dev/agentregistry/pkg/types" ) // PerKindHooks groups optional, per-kind callbacks layered on top of @@ -70,6 +71,7 @@ func Register( resolver v1alpha1.ResolverFunc, registryValidator v1alpha1.RegistryValidatorFunc, perKind PerKindHooks, + deleteAdmission types.DeleteAdmission, ) { cfgFor := func(kind string) (resource.Config, bool) { store, ok := stores[kind] @@ -86,6 +88,7 @@ func Register( ListFilter: perKind.ListFilters[kind], PostUpsert: perKind.PostUpserts[kind], PostDelete: perKind.PostDeletes[kind], + DeleteAdmission: deleteAdmission, InitialFinalizers: perKind.InitialFinalizers[kind], }, true } diff --git a/internal/registry/api/handlers/v0/crud/deployment_hooks_test.go b/internal/registry/api/handlers/v0/crud/deployment_hooks_test.go index a4fb685e..5fd4e916 100644 --- a/internal/registry/api/handlers/v0/crud/deployment_hooks_test.go +++ b/internal/registry/api/handlers/v0/crud/deployment_hooks_test.go @@ -75,6 +75,7 @@ func seedDeploymentFixtures(t *testing.T) (humatest.TestAPI, map[string]*v1alpha }, }, }, + nil, ) deploymentlogs.Register(api, deploymentlogs.Config{ BasePrefix: "/v0", diff --git a/internal/registry/api/router/v0.go b/internal/registry/api/router/v0.go index 89334464..4fcc3970 100644 --- a/internal/registry/api/router/v0.go +++ b/internal/registry/api/router/v0.go @@ -21,6 +21,7 @@ import ( "github.com/agentregistry-dev/agentregistry/pkg/api/v1alpha1/registries" "github.com/agentregistry-dev/agentregistry/pkg/registry/resource" "github.com/agentregistry-dev/agentregistry/pkg/registry/v1alpha1store" + "github.com/agentregistry-dev/agentregistry/pkg/types" ) // Stores is the per-kind Store map used by the v1alpha1 @@ -73,6 +74,28 @@ type RouteOptions struct { // Optional callback for integration-owned route registration. ExtraRoutes func(api huma.API, pathPrefix string) + + // Admission optionally owns the final apply write. Nil preserves OSS + // production writes through resource.ProductionAdmission. + // TODO(krt): temporary synchronous-handler bridge; remove when KRT owns + // admission/staging. + Admission types.Admission + + // DeleteAdmission optionally owns the final delete. Nil preserves OSS + // production deletes through resource.ProductionDeleteAdmission. + // TODO(krt): temporary synchronous-handler bridge; remove when KRT owns + // admission/staging. + DeleteAdmission types.DeleteAdmission + + // ResolverWrapper decorates the shared ResourceRef resolver before + // resource and apply routes are registered. + // TODO(krt): temporary bridge for pending staged refs during HTTP apply. + ResolverWrapper func(v1alpha1.ResolverFunc) v1alpha1.ResolverFunc + + // ExtraResourceRoutes registers adjacent routes with access to the same + // v1alpha1 stores and hooks used by /v0/apply. + // TODO(krt): temporary bridge for downstream synchronous approval routes. + ExtraResourceRoutes func(api huma.API, pathPrefix string, ctx types.ResourceRouteContext) } // RegisterRoutes registers all API routes under /v0. Required @@ -109,6 +132,10 @@ func RegisterRoutes( opts.DeploymentCoordinator, opts.PerKindHooks, opts.RegistryValidator, + opts.Admission, + opts.DeleteAdmission, + opts.ResolverWrapper, + opts.ExtraResourceRoutes, ) if opts.ExtraRoutes != nil { @@ -136,8 +163,15 @@ func registerKindRoutes( coord *deploymentsvc.Coordinator, perKind crud.PerKindHooks, registryValidator v1alpha1.RegistryValidatorFunc, -) { + admission types.Admission, + deleteAdmission types.DeleteAdmission, + resolverWrapper func(v1alpha1.ResolverFunc) v1alpha1.ResolverFunc, + extraResourceRoutes func(api huma.API, pathPrefix string, ctx types.ResourceRouteContext), +) resource.ApplyConfig { resolver := internaldb.NewResolver(stores) + if resolverWrapper != nil { + resolver = resolverWrapper(resolver) + } if registryValidator == nil { registryValidator = registries.Dispatcher } @@ -174,7 +208,7 @@ func registerKindRoutes( // Per-kind CRUD endpoints — one call per built-in kind, hidden // inside crud.Register. - crud.Register(api, basePrefix, stores, resolver, registryValidator, perKind) + crud.Register(api, basePrefix, stores, resolver, registryValidator, perKind, deleteAdmission) // Deployment-specific endpoints: logs stream (cancel is subsumed // by DesiredState=undeployed + DELETE in the v1alpha1 lifecycle). @@ -191,7 +225,7 @@ func registerKindRoutes( // same per-kind hook table populated above, so Deployment reconciliation // and any caller-supplied PostUpsert/PostDelete fire identically on // the batch path. - resource.RegisterApply(api, resource.ApplyConfig{ + applyCfg := resource.ApplyConfig{ BasePrefix: basePrefix, Stores: stores, Resolver: resolver, @@ -200,5 +234,31 @@ func registerKindRoutes( PostUpserts: perKind.PostUpserts, PostDeletes: perKind.PostDeletes, InitialFinalizers: perKind.InitialFinalizers, - }) + Admission: admission, + DeleteAdmission: deleteAdmission, + } + productionApplyCfg := applyCfg + productionApplyCfg.Admission = resource.ProductionAdmission + productionDeleteCfg := applyCfg + productionDeleteCfg.DeleteAdmission = resource.ProductionDeleteAdmission + resource.RegisterApply(api, applyCfg) + + if extraResourceRoutes != nil { + opaqueStores := make(map[string]any, len(stores)) + for kind, store := range stores { + opaqueStores[kind] = store + } + extraResourceRoutes(api, basePrefix, types.ResourceRouteContext{ + Stores: opaqueStores, + Resolver: resolver, + RegistryValidator: registryValidator, + Apply: func(ctx context.Context, obj v1alpha1.Object, dryRun bool) arv0.ApplyResult { + return resource.ApplyObject(ctx, productionApplyCfg, obj, dryRun) + }, + Delete: func(ctx context.Context, obj v1alpha1.Object, dryRun bool) arv0.ApplyResult { + return resource.DeleteObject(ctx, productionDeleteCfg, obj, dryRun) + }, + }) + } + return applyCfg } diff --git a/internal/registry/registry_app.go b/internal/registry/registry_app.go index 4bc64fd6..88f0a714 100644 --- a/internal/registry/registry_app.go +++ b/internal/registry/registry_app.go @@ -213,10 +213,14 @@ func buildRouteOptions( adapters map[string]types.DeploymentAdapter, ) *router.RouteOptions { routeOpts := &router.RouteOptions{ - ExtraRoutes: options.ExtraRoutes, - Stores: stores, - PerKindHooks: crudPerKindHooks(options), - RegistryValidator: options.RegistryValidator, + ExtraRoutes: options.ExtraRoutes, + Stores: stores, + PerKindHooks: crudPerKindHooks(options), + RegistryValidator: options.RegistryValidator, + Admission: options.Admission, + DeleteAdmission: options.DeleteAdmission, + ResolverWrapper: options.ResolverWrapper, + ExtraResourceRoutes: options.ExtraResourceRoutes, } if stores != nil { diff --git a/internal/registry/service/deployment/coordinator.go b/internal/registry/service/deployment/coordinator.go index afc23723..d36dc823 100644 --- a/internal/registry/service/deployment/coordinator.go +++ b/internal/registry/service/deployment/coordinator.go @@ -95,10 +95,16 @@ func (c *Coordinator) Apply(ctx context.Context, deployment *v1alpha1.Deployment target, err := c.resolveTarget(ctx, deployment) if err != nil { + if errors.Is(err, v1alpha1.ErrDanglingRef) { + return c.persistReferencePending(ctx, deployment, err) + } return err } runtime, err := c.resolveRuntime(ctx, deployment) if err != nil { + if errors.Is(err, v1alpha1.ErrDanglingRef) { + return c.persistReferencePending(ctx, deployment, err) + } return err } @@ -124,6 +130,22 @@ func (c *Coordinator) Apply(ctx context.Context, deployment *v1alpha1.Deployment return c.persistApplyResult(ctx, deployment, result) } +func (c *Coordinator) persistReferencePending(ctx context.Context, deployment *v1alpha1.Deployment, cause error) error { + message := "referenced resource is not available yet" + if cause != nil { + message = cause.Error() + } + return c.persistApplyResult(ctx, deployment, &types.ApplyResult{ + Conditions: []v1alpha1.Condition{{ + Type: "Ready", + Status: v1alpha1.ConditionFalse, + Reason: "ReferencePending", + Message: message, + ObservedGeneration: deployment.Metadata.Generation, + }}, + }) +} + // Remove tears down a Deployment's runtime resources via the adapter and // merges the resulting Removed condition into the row's status. Called // after the row's DeletionTimestamp is set (soft-delete path) or when diff --git a/internal/registry/service/deployment/coordinator_test.go b/internal/registry/service/deployment/coordinator_test.go index 5346ec34..dadd3c9c 100644 --- a/internal/registry/service/deployment/coordinator_test.go +++ b/internal/registry/service/deployment/coordinator_test.go @@ -168,8 +168,17 @@ func TestCoordinator_DanglingTargetRef(t *testing.T) { }) err := coord.Apply(ctx, deployment) - require.Error(t, err) - require.ErrorIs(t, err, v1alpha1.ErrDanglingRef) + require.NoError(t, err) + + raw, err := stores[v1alpha1.KindDeployment].Get(ctx, deployment.Metadata.Namespace, deployment.Metadata.Name, "") + require.NoError(t, err) + var status v1alpha1.Status + require.NoError(t, v1alpha1.UnmarshalStatusFromStorage(raw.Status, &status)) + ready := status.GetCondition("Ready") + require.NotNil(t, ready) + require.Equal(t, v1alpha1.ConditionFalse, ready.Status) + require.Equal(t, "ReferencePending", ready.Reason) + require.Contains(t, ready.Message, "does-not-exist") } func TestCoordinator_Discover_ReturnsAdapterResults(t *testing.T) { diff --git a/pkg/api/v0/apply.go b/pkg/api/v0/apply.go index 44cfd0f9..34a73c3d 100644 --- a/pkg/api/v0/apply.go +++ b/pkg/api/v0/apply.go @@ -9,7 +9,7 @@ type ApplyResult struct { Namespace string `json:"namespace,omitempty"` Name string `json:"name"` Tag string `json:"tag,omitempty"` - // Status is one of: created, configured, unchanged, deleted, + // Status is one of: created, configured, unchanged, staged, deleted, // dry-run, failed. Matches kubectl-style apply output. Status string `json:"status"` // Generation is the server-managed generation after the apply. @@ -26,6 +26,7 @@ const ( ApplyStatusCreated = "created" ApplyStatusConfigured = "configured" ApplyStatusUnchanged = "unchanged" + ApplyStatusStaged = "staged" ApplyStatusDeleted = "deleted" ApplyStatusDryRun = "dry-run" ApplyStatusFailed = "failed" diff --git a/pkg/registry/resource/apply.go b/pkg/registry/resource/apply.go index 1519c8dc..512b09d3 100644 --- a/pkg/registry/resource/apply.go +++ b/pkg/registry/resource/apply.go @@ -2,7 +2,6 @@ package resource import ( "context" - "errors" "fmt" "net/http" @@ -10,8 +9,8 @@ import ( arv0 "github.com/agentregistry-dev/agentregistry/pkg/api/v0" "github.com/agentregistry-dev/agentregistry/pkg/api/v1alpha1" - pkgdb "github.com/agentregistry-dev/agentregistry/pkg/registry/database" "github.com/agentregistry-dev/agentregistry/pkg/registry/v1alpha1store" + "github.com/agentregistry-dev/agentregistry/pkg/types" ) // ApplyConfig is the per-server configuration for the multi-doc apply @@ -63,6 +62,24 @@ type ApplyConfig struct { // InitialFinalizers mirrors resource.Config.InitialFinalizers per kind. InitialFinalizers map[string]func(obj v1alpha1.Object) []string + + // Source labels the producer of objects entering this apply pipeline. + // Empty defaults to types.AdmissionSourceApply. + Source string + + // Admission optionally owns the final apply write. Nil uses + // ProductionAdmission, which writes to the configured production Store. + Admission types.Admission + + // DeleteAdmission optionally owns the final delete. Nil uses + // ProductionDeleteAdmission, which deletes from the configured production + // Store and runs the per-kind PostDelete hook. + DeleteAdmission types.DeleteAdmission + + // Prepare optionally mutates an object after validation and before + // admission. Import uses this to merge scanner output while still + // persisting through the shared apply path. + Prepare func(ctx context.Context, obj v1alpha1.Object) error } // applyInput receives a raw multi-doc YAML stream. RawBody keeps bytes @@ -150,6 +167,20 @@ func runApplyBatch(ctx context.Context, cfg ApplyConfig, scheme *v1alpha1.Scheme return out } +// ApplyObject runs one already-decoded object through the same production +// apply path used by POST /v0/apply. Downstream routes can call this to replay +// a previously accepted object without duplicating validation, authz, +// persistence, or post-upsert behavior. +func ApplyObject(ctx context.Context, cfg ApplyConfig, obj v1alpha1.Object, dryRun bool) arv0.ApplyResult { + return applyOne(ctx, cfg, obj, dryRun) +} + +// DeleteObject runs one already-decoded object through the same production +// delete path used by DELETE /v0/apply. +func DeleteObject(ctx context.Context, cfg ApplyConfig, obj v1alpha1.Object, dryRun bool) arv0.ApplyResult { + return deleteOne(ctx, cfg, obj, dryRun) +} + // applyOne runs a single document through the shared apply pipeline. // Never errors; encodes any failure into the returned ApplyResult. func applyOne(ctx context.Context, cfg ApplyConfig, obj v1alpha1.Object, dryRun bool) arv0.ApplyResult { @@ -164,36 +195,26 @@ func applyOne(ctx context.Context, cfg ApplyConfig, obj v1alpha1.Object, dryRun return failResult(res, ae) } - up, ae := applyCore(ctx, store, obj, applyOpts{ + admitted, ae := applyCore(ctx, store, obj, applyOpts{ Authorize: batchAuthorize(cfg, obj.GetKind()), Resolver: cfg.Resolver, RegistryValidator: cfg.RegistryValidator, PostUpsert: cfg.PostUpserts[obj.GetKind()], InitialFinalizers: cfg.InitialFinalizers[obj.GetKind()], + Admission: cfg.Admission, + Source: cfg.Source, + Prepare: cfg.Prepare, }, dryRun) if ae != nil { return failResult(res, ae) } - if dryRun { - res.Status = arv0.ApplyStatusDryRun - return res - } - // Map the Store outcome onto the wire status. Tagged-artifact creates - // surface as created, same-tag replacements as configured, and exact - // re-applies as unchanged. - switch up.Outcome { - case v1alpha1store.UpsertCreated: - res.Status = arv0.ApplyStatusCreated - case v1alpha1store.UpsertReplaced: - res.Status = arv0.ApplyStatusConfigured - case v1alpha1store.UpsertNoOp: + res.Status = admitted.Status + if res.Status == "" { res.Status = arv0.ApplyStatusUnchanged } - if up.Tag != "" { - res.Tag = up.Tag - } - res.Generation = up.Generation + res.Tag = admitted.Tag + res.Generation = admitted.Generation return res } @@ -219,38 +240,21 @@ func deleteOne(ctx context.Context, cfg ApplyConfig, obj v1alpha1.Object, dryRun return failResult(res, ae) } - if dryRun { - res.Status = arv0.ApplyStatusDryRun - return res - } - - authz := batchAuthorize(cfg, obj.GetKind()) - if authz != nil { - if err := authz(ctx, AuthorizeInput{ - Verb: "delete", Kind: obj.GetKind(), - Namespace: meta.Namespace, Name: meta.Name, Tag: meta.Tag, - Object: obj, - }); err != nil { - return failResult(res, &applyError{Stage: stageAuth, Err: err}) - } - } - - err := store.DeleteByRef(ctx, meta.Namespace, meta.Name, meta.Tag) - if err != nil { - return failResult(res, &applyError{ - Stage: stageDelete, - Err: err, - NotFound: errors.Is(err, pkgdb.ErrNotFound), - }) + admitted, ae := deleteCore(ctx, store, obj.GetKind(), meta.Namespace, meta.Name, meta.Tag, deleteOpts{ + Authorize: batchAuthorize(cfg, obj.GetKind()), + PostDelete: cfg.PostDeletes[obj.GetKind()], + PreDeleteObject: obj, + DeleteAdmission: cfg.DeleteAdmission, + Source: cfg.Source, + }, dryRun) + if ae != nil { + return failResult(res, ae) } - - res.Tag = meta.Tag - if hook := cfg.PostDeletes[obj.GetKind()]; hook != nil { - if err := hook(ctx, obj); err != nil { - return failResult(res, &applyError{Stage: stagePostDelete, Err: err}) - } + res.Status = admitted.Status + if res.Status == "" { + res.Status = arv0.ApplyStatusDeleted } - res.Status = arv0.ApplyStatusDeleted + res.Tag = admitted.Tag return res } diff --git a/pkg/registry/resource/apply_test.go b/pkg/registry/resource/apply_test.go index fd91768f..7479dcb1 100644 --- a/pkg/registry/resource/apply_test.go +++ b/pkg/registry/resource/apply_test.go @@ -17,6 +17,7 @@ import ( pkgdb "github.com/agentregistry-dev/agentregistry/pkg/registry/database" "github.com/agentregistry-dev/agentregistry/pkg/registry/resource" "github.com/agentregistry-dev/agentregistry/pkg/registry/v1alpha1store" + "github.com/agentregistry-dev/agentregistry/pkg/types" ) func TestRegisterApply_MultiDocRoundTrip(t *testing.T) { @@ -119,6 +120,149 @@ spec: require.Contains(t, out.Results[1].Error, "unknown or unconfigured kind") } +func TestRegisterApply_AdmissionCanStageInsteadOfProductionUpsert(t *testing.T) { + pool := v1alpha1store.NewTestPool(t) + agents := v1alpha1store.NewStore(pool, "v1alpha1.agents") + + var admitted types.AdmissionInput + postUpsertCalled := false + _, api := humatest.New(t) + resource.RegisterApply(api, resource.ApplyConfig{ + BasePrefix: "/v0", + Stores: map[string]*v1alpha1store.Store{ + v1alpha1.KindAgent: agents, + }, + PostUpserts: map[string]func(context.Context, v1alpha1.Object) error{ + v1alpha1.KindAgent: func(context.Context, v1alpha1.Object) error { + postUpsertCalled = true + return nil + }, + }, + Admission: func(ctx context.Context, in types.AdmissionInput) (types.AdmissionResult, error) { + admitted = in + return types.AdmissionResult{Status: arv0.ApplyStatusStaged, Tag: in.Tag}, nil + }, + }) + + yaml := []byte(`apiVersion: ar.dev/v1alpha1 +kind: Agent +metadata: + namespace: default + name: staged-agent +spec: + title: Staged Agent +`) + resp := api.Post("/v0/apply", "Content-Type: application/yaml", strings.NewReader(string(yaml))) + require.Equal(t, http.StatusOK, resp.Code, resp.Body.String()) + + var out struct { + Results []arv0.ApplyResult `json:"results"` + } + require.NoError(t, json.Unmarshal(resp.Body.Bytes(), &out)) + require.Len(t, out.Results, 1) + require.Equal(t, arv0.ApplyStatusStaged, out.Results[0].Status) + require.Equal(t, v1alpha1store.DefaultTag(), out.Results[0].Tag) + require.False(t, postUpsertCalled, "admitted applies must not fire production side effects") + require.Equal(t, types.AdmissionSourceApply, admitted.Source) + require.Equal(t, "apply", admitted.Verb) + require.Equal(t, v1alpha1.KindAgent, admitted.Kind) + require.Equal(t, "default", admitted.Namespace) + require.Equal(t, "staged-agent", admitted.Name) + require.Equal(t, v1alpha1store.DefaultTag(), admitted.Tag) + require.Same(t, agents, admitted.Store) + + _, err := agents.Get(t.Context(), "default", "staged-agent", v1alpha1store.DefaultTag()) + require.ErrorIs(t, err, pkgdb.ErrNotFound) +} + +func TestRegisterApply_DeleteAdmissionCanStageInsteadOfProductionDelete(t *testing.T) { + pool := v1alpha1store.NewTestPool(t) + agents := v1alpha1store.NewStore(pool, "v1alpha1.agents") + _, err := agents.Upsert(t.Context(), &v1alpha1.Agent{ + Metadata: v1alpha1.ObjectMeta{Namespace: "default", Name: "staged-delete", Tag: "stable"}, + Spec: v1alpha1.AgentSpec{Title: "Staged Delete"}, + }) + require.NoError(t, err) + + var admitted types.DeleteAdmissionInput + postDeleteCalled := false + _, api := humatest.New(t) + resource.RegisterApply(api, resource.ApplyConfig{ + BasePrefix: "/v0", + Stores: map[string]*v1alpha1store.Store{ + v1alpha1.KindAgent: agents, + }, + PostDeletes: map[string]func(context.Context, v1alpha1.Object) error{ + v1alpha1.KindAgent: func(context.Context, v1alpha1.Object) error { + postDeleteCalled = true + return nil + }, + }, + DeleteAdmission: func(ctx context.Context, in types.DeleteAdmissionInput) (types.DeleteAdmissionResult, error) { + admitted = in + return types.DeleteAdmissionResult{Status: arv0.ApplyStatusStaged, Tag: in.Tag}, nil + }, + }) + + yaml := []byte(`apiVersion: ar.dev/v1alpha1 +kind: Agent +metadata: + namespace: default + name: staged-delete + tag: stable +`) + resp := api.Do(http.MethodDelete, "/v0/apply", "Content-Type: application/yaml", strings.NewReader(string(yaml))) + require.Equal(t, http.StatusOK, resp.Code, resp.Body.String()) + + var out struct { + Results []arv0.ApplyResult `json:"results"` + } + require.NoError(t, json.Unmarshal(resp.Body.Bytes(), &out)) + require.Len(t, out.Results, 1) + require.Equal(t, arv0.ApplyStatusStaged, out.Results[0].Status) + require.Equal(t, "stable", out.Results[0].Tag) + require.False(t, postDeleteCalled, "staged deletes must not fire production side effects") + require.Equal(t, types.AdmissionSourceDelete, admitted.Source) + require.Equal(t, "delete", admitted.Verb) + require.Equal(t, v1alpha1.KindAgent, admitted.Kind) + require.Equal(t, "default", admitted.Namespace) + require.Equal(t, "staged-delete", admitted.Name) + require.Equal(t, "stable", admitted.Tag) + require.NotNil(t, admitted.Object) + require.NotNil(t, admitted.PostDelete) + require.Same(t, agents, admitted.Store) + + row, err := agents.Get(t.Context(), "default", "staged-delete", "stable") + require.NoError(t, err) + require.Equal(t, "stable", row.Metadata.Tag) +} + +func TestApplyObject_ReusesProductionApplyPath(t *testing.T) { + pool := v1alpha1store.NewTestPool(t) + agents := v1alpha1store.NewStore(pool, "v1alpha1.agents") + + obj := &v1alpha1.Agent{ + TypeMeta: v1alpha1.TypeMeta{APIVersion: v1alpha1.GroupVersion, Kind: v1alpha1.KindAgent}, + Metadata: v1alpha1.ObjectMeta{ + Namespace: "default", + Name: "replayed-agent", + Tag: "stable", + }, + Spec: v1alpha1.AgentSpec{Title: "Replayed Agent"}, + } + res := resource.ApplyObject(t.Context(), resource.ApplyConfig{ + Stores: map[string]*v1alpha1store.Store{ + v1alpha1.KindAgent: agents, + }, + }, obj, false) + require.Equal(t, arv0.ApplyStatusCreated, res.Status) + require.Equal(t, "stable", res.Tag) + + row, err := agents.Get(t.Context(), "default", "replayed-agent", "stable") + require.NoError(t, err) + require.Equal(t, "stable", row.Metadata.Tag) +} + func TestRegisterApply_MutableObjectResultsDoNotExposeVersion(t *testing.T) { pool := v1alpha1store.NewTestPool(t) runtimes := v1alpha1store.NewMutableObjectStore(pool, "v1alpha1.runtimes") diff --git a/pkg/registry/resource/core.go b/pkg/registry/resource/core.go index 77316143..5787f334 100644 --- a/pkg/registry/resource/core.go +++ b/pkg/registry/resource/core.go @@ -4,9 +4,11 @@ import ( "context" "errors" + arv0 "github.com/agentregistry-dev/agentregistry/pkg/api/v0" "github.com/agentregistry-dev/agentregistry/pkg/api/v1alpha1" pkgdb "github.com/agentregistry-dev/agentregistry/pkg/registry/database" "github.com/agentregistry-dev/agentregistry/pkg/registry/v1alpha1store" + "github.com/agentregistry-dev/agentregistry/pkg/types" ) // applyOpts threads the per-kind dependencies into the apply pipeline. @@ -20,19 +22,9 @@ type applyOpts struct { RegistryValidator v1alpha1.RegistryValidatorFunc PostUpsert func(ctx context.Context, obj v1alpha1.Object) error InitialFinalizers func(obj v1alpha1.Object) []string -} - -// upsertResult is the outcome of a successful applyCore call. -type upsertResult struct { - // Outcome categorises what the underlying Store.Upsert did. Callers - // map this onto their wire status (ApplyStatusCreated, etc.). - Outcome v1alpha1store.UpsertOutcome - // Tag is the content tag after apply for tagged artifact stores. - Tag string - // Generation is the internal row generation after apply. - Generation int64 - // UID is the server-managed row identity after production upsert. - UID string + Admission types.Admission + Source string + Prepare func(ctx context.Context, obj v1alpha1.Object) error } // applyStage tags which step of the pipeline produced an error so @@ -45,6 +37,8 @@ const ( stageValidation applyStage = "validation" stageRefs applyStage = "refs" stageRegistries applyStage = "registries" + stageAdmission applyStage = "admission" + stagePrepare applyStage = "prepare" stageMarshal applyStage = "marshal" stageUpsert applyStage = "upsert" stagePostUpsert applyStage = "post-upsert" @@ -72,18 +66,18 @@ func (e *applyError) Error() string { // already-decoded, metadata-stamped object: // // canonicalize metadata → authorize → validate → resolve refs → -// validate registries → marshal spec → Store.Upsert → PostUpsert +// validate registries → prepare → admission // -// dryRun=true skips Upsert + PostUpsert; everything else still runs so -// clients get the same 400-class error surface they would on a real -// apply. Returns a stage-tagged applyError on failure; nil otherwise. +// The admission implementation owns the final write result. The OSS default +// ProductionAdmission maps dry-runs to ApplyStatusDryRun and real writes to +// Store.Upsert + PostUpsert. Returns a stage-tagged applyError on failure. func applyCore( ctx context.Context, store *v1alpha1store.Store, obj v1alpha1.Object, opts applyOpts, dryRun bool, -) (upsertResult, *applyError) { +) (types.AdmissionResult, *applyError) { meta := obj.GetMetadata() kind := obj.GetKind() @@ -103,52 +97,109 @@ func applyCore( Namespace: meta.Namespace, Name: meta.Name, Tag: meta.Tag, Object: obj, }); err != nil { - return upsertResult{}, &applyError{Stage: stageAuth, Err: err} + return types.AdmissionResult{}, &applyError{Stage: stageAuth, Err: err} } } if err := v1alpha1.ValidateObject(obj); err != nil { - return upsertResult{}, &applyError{Stage: stageValidation, Err: err} + return types.AdmissionResult{}, &applyError{Stage: stageValidation, Err: err} } if err := v1alpha1.ResolveObjectRefs(ctx, obj, opts.Resolver); err != nil { - return upsertResult{}, &applyError{Stage: stageRefs, Err: err} + return types.AdmissionResult{}, &applyError{Stage: stageRefs, Err: err} } if err := v1alpha1.ValidateObjectRegistries(ctx, obj, opts.RegistryValidator); err != nil { - return upsertResult{}, &applyError{Stage: stageRegistries, Err: err} + return types.AdmissionResult{}, &applyError{Stage: stageRegistries, Err: err} } - if dryRun { - return upsertResult{}, nil + if opts.Prepare != nil { + if err := opts.Prepare(ctx, obj); err != nil { + return types.AdmissionResult{}, &applyError{Stage: stagePrepare, Err: err} + } + } + + source := opts.Source + if source == "" { + source = types.AdmissionSourceApply + } + admission := opts.Admission + if admission == nil { + admission = ProductionAdmission + } + result, err := admission(ctx, types.AdmissionInput{ + Source: source, + Verb: "apply", + DryRun: dryRun, + Kind: kind, + Namespace: meta.Namespace, + Name: meta.Name, + Tag: meta.Tag, + Object: obj, + Store: store, + PostUpsert: opts.PostUpsert, + InitialFinalizers: opts.InitialFinalizers, + }) + if err != nil { + if ae, ok := err.(*applyError); ok { + return types.AdmissionResult{}, ae + } + return types.AdmissionResult{}, &applyError{Stage: stageAdmission, Err: err} + } + return result, nil +} + +// ProductionAdmission is the OSS admission implementation: dry-runs stop after +// validation, and real writes upsert the object into the production store and +// run the per-kind post-upsert hook. +func ProductionAdmission(ctx context.Context, in types.AdmissionInput) (types.AdmissionResult, error) { + if in.DryRun { + return types.AdmissionResult{Status: arv0.ApplyStatusDryRun, Tag: in.Tag}, nil + } + store, ok := in.Store.(*v1alpha1store.Store) + if !ok || store == nil { + return types.AdmissionResult{}, errors.New("production store is required") } upsertOpts := v1alpha1store.UpsertOpts{} - if opts.InitialFinalizers != nil { - upsertOpts.InitialFinalizers = opts.InitialFinalizers(obj) + if in.InitialFinalizers != nil { + upsertOpts.InitialFinalizers = in.InitialFinalizers(in.Object) } - up, err := store.Upsert(ctx, obj, upsertOpts) + up, err := store.Upsert(ctx, in.Object, upsertOpts) if err != nil { - return upsertResult{}, &applyError{ + return types.AdmissionResult{}, &applyError{ Stage: stageUpsert, Err: err, Terminating: errors.Is(err, v1alpha1store.ErrTerminating), } } - res := upsertResult{ - Outcome: up.Outcome, - Tag: up.Tag, - Generation: up.Generation, - UID: up.UID, - } - if opts.PostUpsert != nil { + if in.PostUpsert != nil { + meta := in.Object.GetMetadata() meta.Generation = up.Generation meta.UID = up.UID - obj.SetMetadata(*meta) - if err := opts.PostUpsert(ctx, obj); err != nil { - return res, &applyError{Stage: stagePostUpsert, Err: err} + in.Object.SetMetadata(*meta) + if err := in.PostUpsert(ctx, in.Object); err != nil { + return types.AdmissionResult{}, &applyError{Stage: stagePostUpsert, Err: err} } } - return res, nil + + return types.AdmissionResult{ + Status: applyStatusFromUpsert(up.Outcome), + Tag: up.Tag, + Generation: up.Generation, + }, nil +} + +func applyStatusFromUpsert(outcome v1alpha1store.UpsertOutcome) string { + switch outcome { + case v1alpha1store.UpsertCreated: + return arv0.ApplyStatusCreated + case v1alpha1store.UpsertReplaced: + return arv0.ApplyStatusConfigured + case v1alpha1store.UpsertNoOp: + return arv0.ApplyStatusUnchanged + default: + return arv0.ApplyStatusUnchanged + } } // deleteOpts threads the per-kind dependencies into deleteCore. As with @@ -160,11 +211,15 @@ type deleteOpts struct { Authorize func(ctx context.Context, in AuthorizeInput) error PostDelete func(ctx context.Context, obj v1alpha1.Object) error PreDeleteObject v1alpha1.Object + DeleteAdmission types.DeleteAdmission + Source string + Force bool } -// deleteCore runs Authorize → Store.Delete → PostDelete for a single resource. -// Validation is intentionally skipped — deleting a row should not require its -// spec to validate. +// deleteCore runs Authorize → delete admission for a single resource. +// Validation is intentionally skipped — deleting a row should not require +// its spec to validate. The OSS default admission performs Store.DeleteByRef +// + PostDelete; downstream implementations may stage or reject the delete. // // Returns NotFound=true on the missing-row case so callers can map it // to 404 (single PUT) or "not found" Result (batch). @@ -173,29 +228,70 @@ func deleteCore( store *v1alpha1store.Store, kind, namespace, name, tag string, opts deleteOpts, -) *applyError { + dryRun bool, +) (types.DeleteAdmissionResult, *applyError) { if opts.Authorize != nil { if err := opts.Authorize(ctx, AuthorizeInput{ Verb: "delete", Kind: kind, Namespace: namespace, Name: name, Tag: tag, Object: opts.PreDeleteObject, }); err != nil { - return &applyError{Stage: stageAuth, Err: err} + return types.DeleteAdmissionResult{}, &applyError{Stage: stageAuth, Err: err} } } - if err := store.Delete(ctx, namespace, name, tag); err != nil { - return &applyError{ + source := opts.Source + if source == "" { + source = types.AdmissionSourceDelete + } + admission := opts.DeleteAdmission + if admission == nil { + admission = ProductionDeleteAdmission + } + result, err := admission(ctx, types.DeleteAdmissionInput{ + Source: source, + Verb: "delete", + DryRun: dryRun, + Kind: kind, + Namespace: namespace, + Name: name, + Tag: tag, + Object: opts.PreDeleteObject, + Store: store, + PostDelete: opts.PostDelete, + Force: opts.Force, + }) + if err != nil { + if ae, ok := err.(*applyError); ok { + return types.DeleteAdmissionResult{}, ae + } + return types.DeleteAdmissionResult{}, &applyError{Stage: stageAdmission, Err: err} + } + return result, nil +} + +// ProductionDeleteAdmission is the OSS delete admission implementation. It +// removes the selected production row(s) and then runs the per-kind post-delete +// hook when present. +func ProductionDeleteAdmission(ctx context.Context, in types.DeleteAdmissionInput) (types.DeleteAdmissionResult, error) { + if in.DryRun { + return types.DeleteAdmissionResult{Status: arv0.ApplyStatusDryRun, Tag: in.Tag}, nil + } + store, ok := in.Store.(*v1alpha1store.Store) + if !ok || store == nil { + return types.DeleteAdmissionResult{}, errors.New("production store is required") + } + if err := store.DeleteByRef(ctx, in.Namespace, in.Name, in.Tag); err != nil { + return types.DeleteAdmissionResult{}, &applyError{ Stage: stageDelete, Err: err, NotFound: errors.Is(err, pkgdb.ErrNotFound), } } - - if opts.PostDelete != nil && opts.PreDeleteObject != nil { - if err := opts.PostDelete(ctx, opts.PreDeleteObject); err != nil { - return &applyError{Stage: stagePostDelete, Err: err} + if in.PostDelete != nil && in.Object != nil { + if err := in.PostDelete(ctx, in.Object); err != nil { + return types.DeleteAdmissionResult{}, &applyError{Stage: stagePostDelete, Err: err} } } - return nil + return types.DeleteAdmissionResult{Status: arv0.ApplyStatusDeleted, Tag: in.Tag}, nil } diff --git a/pkg/registry/resource/handler.go b/pkg/registry/resource/handler.go index e6e2b95a..eb0f1c30 100644 --- a/pkg/registry/resource/handler.go +++ b/pkg/registry/resource/handler.go @@ -31,6 +31,7 @@ import ( "github.com/agentregistry-dev/agentregistry/pkg/api/v1alpha1" pkgdb "github.com/agentregistry-dev/agentregistry/pkg/registry/database" "github.com/agentregistry-dev/agentregistry/pkg/registry/v1alpha1store" + "github.com/agentregistry-dev/agentregistry/pkg/types" ) // unescapePath URL-decodes a path segment captured by Huma. Resource @@ -112,6 +113,11 @@ type Config struct { // and writes the terminal Removed condition. PostDelete func(ctx context.Context, obj v1alpha1.Object) error + // DeleteAdmission optionally owns the final delete after authz. Nil uses + // ProductionDeleteAdmission, which deletes from the configured Store and + // runs PostDelete. + DeleteAdmission types.DeleteAdmission + // InitialFinalizers, when non-nil, seeds finalizers atomically on create. // Updates preserve existing finalizers. InitialFinalizers func(obj v1alpha1.Object) []string @@ -573,9 +579,13 @@ func runDeleteLatest[T v1alpha1.Object](ctx context.Context, cfg Config, newObj dopts := deleteOpts{Authorize: cfg.Authorize} if cfg.PostDelete != nil && !force { dopts.PostDelete = cfg.PostDelete + } + if cfg.DeleteAdmission != nil || dopts.PostDelete != nil { dopts.PreDeleteObject = obj } - if ae := deleteCore(ctx, cfg.Store, kind, ns, name, "", dopts); ae != nil { + dopts.DeleteAdmission = cfg.DeleteAdmission + dopts.Force = force + if _, ae := deleteCore(ctx, cfg.Store, kind, ns, name, "", dopts, false); ae != nil { return nil, mapApplyErrorToHuma(ae, kind, ns, name, "") } return &deleteOutput{}, nil @@ -614,7 +624,9 @@ func runDelete[T v1alpha1.Object](ctx context.Context, cfg Config, newObj func() if runHook { dopts.PostDelete = cfg.PostDelete } - if ae := deleteCore(ctx, cfg.Store, kind, ns, name, tag, dopts); ae != nil { + dopts.DeleteAdmission = cfg.DeleteAdmission + dopts.Force = force + if _, ae := deleteCore(ctx, cfg.Store, kind, ns, name, tag, dopts, false); ae != nil { return nil, mapApplyErrorToHuma(ae, kind, ns, name, tag) } return &deleteOutput{}, nil @@ -635,6 +647,8 @@ func mapApplyErrorToHuma(ae *applyError, kind, ns, name, tag string) error { return huma.Error400BadRequest("refs: " + ae.Err.Error()) case stageRegistries: return huma.Error400BadRequest("registries: " + ae.Err.Error()) + case stageAdmission: + return ae.Err case stageMarshal: return huma.Error400BadRequest("marshal spec: " + ae.Err.Error()) case stageUpsert: diff --git a/pkg/registry/resource/handler_test.go b/pkg/registry/resource/handler_test.go index b7285b62..c60e0183 100644 --- a/pkg/registry/resource/handler_test.go +++ b/pkg/registry/resource/handler_test.go @@ -20,6 +20,7 @@ import ( "github.com/agentregistry-dev/agentregistry/pkg/api/v1alpha1" "github.com/agentregistry-dev/agentregistry/pkg/registry/resource" "github.com/agentregistry-dev/agentregistry/pkg/registry/v1alpha1store" + "github.com/agentregistry-dev/agentregistry/pkg/types" ) // registerAgent wires the generic resource handler for *v1alpha1.Agent and @@ -224,6 +225,50 @@ func TestResourceRegister_DeleteTaggedPassesTagToAuthorizer(t *testing.T) { require.Equal(t, "stable", seen.Tag) } +func TestResourceRegister_DeleteAdmissionCanStageTaggedDelete(t *testing.T) { + pool := v1alpha1store.NewTestPool(t) + store := v1alpha1store.NewStore(pool, "v1alpha1.agents") + _, err := store.Upsert(t.Context(), &v1alpha1.Agent{ + Metadata: v1alpha1.ObjectMeta{Namespace: "default", Name: "alice", Tag: "stable"}, + Spec: v1alpha1.AgentSpec{Title: "Stable Alice"}, + }) + require.NoError(t, err) + + var admitted types.DeleteAdmissionInput + postDeleteCalled := false + _, api := humatest.New(t) + resource.Register[*v1alpha1.Agent](api, resource.Config{ + Kind: v1alpha1.KindAgent, + BasePrefix: "/v0", + Store: store, + PostDelete: func(context.Context, v1alpha1.Object) error { + postDeleteCalled = true + return nil + }, + DeleteAdmission: func(ctx context.Context, in types.DeleteAdmissionInput) (types.DeleteAdmissionResult, error) { + admitted = in + return types.DeleteAdmissionResult{Status: arv0.ApplyStatusStaged, Tag: in.Tag}, nil + }, + }, func() *v1alpha1.Agent { return &v1alpha1.Agent{} }) + + resp := api.Delete("/v0/agents/alice/stable") + require.Equal(t, http.StatusNoContent, resp.Code, resp.Body.String()) + require.False(t, postDeleteCalled, "staged deletes must not fire production side effects") + require.Equal(t, types.AdmissionSourceDelete, admitted.Source) + require.Equal(t, "delete", admitted.Verb) + require.Equal(t, v1alpha1.KindAgent, admitted.Kind) + require.Equal(t, "default", admitted.Namespace) + require.Equal(t, "alice", admitted.Name) + require.Equal(t, "stable", admitted.Tag) + require.NotNil(t, admitted.Object) + require.NotNil(t, admitted.PostDelete) + require.Same(t, store, admitted.Store) + + row, err := store.Get(t.Context(), "default", "alice", "stable") + require.NoError(t, err) + require.Equal(t, "stable", row.Metadata.Tag) +} + func TestResourceRegister_AgentNamespaceIsolation(t *testing.T) { pool := v1alpha1store.NewTestPool(t) store := v1alpha1store.NewStore(pool, "v1alpha1.agents") diff --git a/pkg/types/types.go b/pkg/types/types.go index 2fa000a2..7199d50d 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -17,6 +17,7 @@ import ( "github.com/danielgtaylor/huma/v2" + v0 "github.com/agentregistry-dev/agentregistry/pkg/api/v0" "github.com/agentregistry-dev/agentregistry/pkg/api/v1alpha1" "github.com/agentregistry-dev/agentregistry/pkg/registry/auth" "github.com/agentregistry-dev/agentregistry/pkg/registry/database" @@ -67,6 +68,84 @@ type PostUpsert func(ctx context.Context, obj v1alpha1.Object) error // batch's per-doc delete hook. type PostDelete func(ctx context.Context, obj v1alpha1.Object) error +const ( + AdmissionSourceApply = "apply" + AdmissionSourceDelete = "delete" + AdmissionSourceImport = "import" +) + +// Admission owns the final write decision for an apply request after authz, +// validation, reference resolution, and registry checks have passed. The OSS +// default writes to production; downstream integrations can wrap that behavior +// to stage, reject, or otherwise route the write. +// +// TODO(krt): this belongs to the synchronous handler architecture. Prefer a +// reconciler-owned admission/staging model when KRT becomes the write path, and +// delete this bridge once no downstream route depends on it. +type Admission func(ctx context.Context, in AdmissionInput) (AdmissionResult, error) + +type AdmissionInput struct { + Source string + Verb string + DryRun bool + Kind string + Namespace string + Name string + Tag string + Object v1alpha1.Object + Store any + PostUpsert PostUpsert + InitialFinalizers func(v1alpha1.Object) []string +} + +type AdmissionResult struct { + Status string + Tag string + Generation int64 +} + +// DeleteAdmission owns the final delete decision after authz has passed. The +// OSS default deletes from production; downstream integrations can stage, +// reject, or otherwise route the delete before production storage is touched. +// +// TODO(krt): temporary synchronous-handler bridge; remove with reconciler +// admission/staging. +type DeleteAdmission func(ctx context.Context, in DeleteAdmissionInput) (DeleteAdmissionResult, error) + +type DeleteAdmissionInput struct { + Source string + Verb string + DryRun bool + Kind string + Namespace string + Name string + Tag string + Object v1alpha1.Object + Store any + PostDelete PostDelete + Force bool +} + +type DeleteAdmissionResult struct { + Status string + Tag string +} + +// ResourceRouteContext exposes the finalized v1alpha1 route wiring to +// downstream integrations that need adjacent routes against the same stores +// and hooks as /v0/apply. +// +// TODO(krt): this is a temporary way for downstream synchronous routes to reuse +// production apply wiring. KRT should make this unnecessary by owning the +// staging-to-production transition outside HTTP route callbacks. +type ResourceRouteContext struct { + Stores map[string]any + Resolver v1alpha1.ResolverFunc + RegistryValidator v1alpha1.RegistryValidatorFunc + Apply func(ctx context.Context, obj v1alpha1.Object, dryRun bool) v0.ApplyResult + Delete func(ctx context.Context, obj v1alpha1.Object, dryRun bool) v0.ApplyResult +} + // Auditor receives audit events for state changes that the OSS layer // considers significant. The default OSS implementation is a no-op; // downstream builds plug in a real audit sink via NewStore options. @@ -152,6 +231,23 @@ type AppOptions struct { // PostDeletes mirror PostUpserts on the delete path. PostDeletes map[string]PostDelete + // Admission optionally accepts a validated write before the row reaches + // production storage. Nil preserves normal direct writes. + // TODO(krt): temporary synchronous-handler bridge; remove with reconciler + // admission/staging. + Admission Admission + + // DeleteAdmission optionally accepts an authorized delete before the row is + // removed from production storage. Nil preserves normal direct deletes. + // TODO(krt): temporary synchronous-handler bridge; remove with reconciler + // admission/staging. + DeleteAdmission DeleteAdmission + + // ResolverWrapper decorates the shared ResourceRef resolver before route + // registration. Nil preserves the default store-backed resolver. + // TODO(krt): temporary bridge for pending staged refs in HTTP apply. + ResolverWrapper func(v1alpha1.ResolverFunc) v1alpha1.ResolverFunc + // V1Alpha1StoreTables registers additional v1alpha1 kinds with their // backing PostgreSQL tables. Downstream builds that add their own // Scheme kinds should populate this so the shared /v0/apply, @@ -198,6 +294,11 @@ type AppOptions struct { // routes. ExtraRoutes func(api huma.API, pathPrefix string) + // ExtraResourceRoutes is like ExtraRoutes, but runs after the v1alpha1 + // resource route context has been finalized. + // TODO(krt): temporary bridge for downstream synchronous approval routes. + ExtraResourceRoutes func(api huma.API, pathPrefix string, ctx ResourceRouteContext) + // HTTPServerFactory is an optional function to create a server that // adds new API routes. HTTPServerFactory HTTPServerFactory