STAMP-based safety framework for data pipeline reliability. Interlock prevents pipelines from executing when preconditions aren't safe — like a physical interlock mechanism.
The framework applies Leveson's STAMP (Systems-Theoretic Accident Model and Processes) to data engineering: pipelines become control structures with traits (feedback), readiness predicates (process models), and conditional execution (safe control actions).
```sh
go get github.com/dwsmith1983/interlock
make build
```
```sh
# Initialize a project (starts a local Valkey container)
./interlock init my-project
cd my-project

# Evaluate a pipeline's readiness
interlock evaluate example

# Run a pipeline (evaluate + trigger)
interlock run example

# Check status
interlock status

# Start the HTTP API server
interlock serve
```

Interlock uses a three-level check system:
- Archetypes define what to check: reusable templates of safety traits (e.g., `batch-ingestion` requires `source-freshness`, `upstream-dependency`, and `resource-availability`).
- Pipeline configs specialize how: override thresholds, point to specific evaluators, set TTLs.
- Evaluators perform the actual checks: subprocess (any language, JSON stdin/stdout) or HTTP.
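Archetype files themselves aren't shown in this section; under that model, a `batch-ingestion` archetype might look roughly like this (the file layout and field names are assumptions for illustration, not taken from the repository):

```yaml
# archetypes/batch-ingestion.yaml (hypothetical layout)
name: batch-ingestion
traits:
  source-freshness:
    required: true
    config:
      maxLagSeconds: 300   # default; pipeline configs can override
  upstream-dependency:
    required: true
  resource-availability:
    required: true
```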
```
interlock evaluate my-pipeline
  -> load pipeline config
  -> resolve archetype (merge trait definitions)
  -> for each required trait, IN PARALLEL:
       spawn evaluator subprocess -> pipe config JSON to stdin -> read result from stdout
  -> apply readiness rule (all-required-pass)
  -> READY or NOT_READY (with blocking trait list)
```
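The final step, the all-required-pass rule, is a pure predicate over trait results; a minimal sketch in Python (names are illustrative, not Interlock's actual API):

```python
from dataclasses import dataclass

@dataclass
class TraitResult:
    name: str
    required: bool
    status: str  # "PASS" or "FAIL"

def readiness(results: list[TraitResult]) -> tuple[str, list[str]]:
    """READY iff every required trait passed; otherwise NOT_READY
    plus the list of blocking traits."""
    blocking = [r.name for r in results if r.required and r.status != "PASS"]
    return ("READY" if not blocking else "NOT_READY", blocking)
```

A failing optional trait does not block: only required traits appear in the blocking list.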
Evaluators are executable files. Interlock pipes config as JSON to stdin and reads the result from stdout.
Input (stdin):

```json
{"maxLagSeconds": 300, "source": "sales_events"}
```

Output (stdout):

```json
{"status": "PASS", "value": {"lagSeconds": 45, "threshold": 300}}
```

`status` must be `"PASS"` or `"FAIL"`. A non-zero exit code or a timeout is an automatic FAIL.
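From the caller's side, this contract can be exercised with a small harness; the sketch below is hypothetical (not Interlock's internal runner) and maps non-zero exit codes, timeouts, and malformed output to FAIL, as the contract specifies:

```python
import json
import subprocess

def run_evaluator(path: str, config: dict, timeout_s: float = 15.0) -> dict:
    """Invoke an evaluator executable per the stdin/stdout JSON contract."""
    try:
        proc = subprocess.run(
            [path],
            input=json.dumps(config).encode(),
            capture_output=True,
            timeout=timeout_s,
        )
    except subprocess.TimeoutExpired:
        return {"status": "FAIL", "value": {"error": "timeout"}}
    if proc.returncode != 0:
        return {"status": "FAIL", "value": {"error": f"exit {proc.returncode}"}}
    try:
        result = json.loads(proc.stdout)
    except json.JSONDecodeError:
        return {"status": "FAIL", "value": {"error": "invalid JSON"}}
    if result.get("status") not in ("PASS", "FAIL"):
        return {"status": "FAIL", "value": {"error": "bad status"}}
    return result
```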
Write evaluators in any language:

```bash
#!/bin/bash
# evaluators/check-disk-space
echo '{"status":"PASS","value":{"disk_pct":42}}'
```

```python
#!/usr/bin/env python3
# evaluators/check-source-freshness
import json, sys

config = json.load(sys.stdin)
lag = 45  # query your source system here
result = {"status": "PASS" if lag <= config["maxLagSeconds"] else "FAIL",
          "value": {"lagSeconds": lag}}
json.dump(result, sys.stdout)
```

Interlock runs in two modes: local (Redis + subprocess evaluators) and AWS (DynamoDB + Lambda + Step Functions).
```
┌──────────────────────────────────────────────────┐
│                 interlock serve                  │
│        (HTTP API + watcher loop + status)        │
├──────────────────────────────────────────────────┤
│                 InterlockEngine                  │
│  (pure STAMP logic — readiness, lifecycle, UCA)  │
├──────────────┬───────────────────────────────────┤
│ Provider     │ Redis/Valkey     [implemented]    │
│ Interface    │ DynamoDB         [implemented]    │
│              │ Postgres         [archival only]  │
├──────────────┴───────────────────────────────────┤
│                 Evaluator Runner                 │
│      (subprocess: JSON stdin → JSON stdout)      │
│   (any language: Python, Bash, Go, JS, etc.)     │
└──────────────────────────────────────────────────┘
```
In AWS mode, DynamoDB Streams drive the Step Functions lifecycle:

```
┌──────────────────┐   DynamoDB Stream   ┌──────────────────┐
│     DynamoDB     │ ──────────────────► │  stream-router   │
│  (single table)  │                     │  (MARKER# → SFN) │
└────────┬─────────┘                     └────────┬─────────┘
         │                                        │
         │                           ┌────────────▼─────────────┐
         │                           │      Step Function       │
         │                           │   (47-state lifecycle)   │
         │                           └──┬────┬────┬─────┬───────┘
         │                              │    │    │     │
┌────▼─────┐ ┌──────────┐ ┌───────────┴┐ ┌┴────▼──┐ ┌┴───────────┐
│orchestr- │ │evaluator │ │  trigger   │ │  run-  │ │    SNS     │
│   ator   │ │(per-trait│ │(job launch)│ │checker │ │  (alerts)  │
│(14 acts) │ │  eval)   │ │            │ │(poll)  │ │            │
└──────────┘ └──────────┘ └────────────┘ └────────┘ └────────────┘
```
| Cloud | Status |
|---|---|
| AWS (DynamoDB + Lambda + Step Functions) | Implemented |
| GCP (Firestore + Cloud Run + Workflows) | Planned |
| Azure (Cosmos DB + Functions + Durable Functions) | Planned |
| Type | SDK/Protocol | Use Case |
|---|---|---|
| `command` | Subprocess | Local scripts, CLI tools |
| `http` | HTTP POST | Generic REST APIs, webhooks |
| `airflow` | HTTP (Airflow API) | Apache Airflow DAG runs |
| `glue` | AWS SDK | AWS Glue ETL jobs |
| `emr` | AWS SDK | Amazon EMR step execution |
| `emr-serverless` | AWS SDK | EMR Serverless job runs |
| `step-function` | AWS SDK | AWS Step Functions executions |
| `databricks` | HTTP (REST 2.1) | Databricks job runs |
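For comparison with the `command` trigger shown in the pipeline example, an `http` trigger might be configured roughly like this (the field names below are assumptions for illustration, not confirmed by this document):

```yaml
trigger:
  type: http
  url: https://jobs.example.com/run   # assumed field name
  method: POST                        # assumed field name
```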
Failed triggers are automatically retried with configurable exponential backoff:
```yaml
retry:
  maxAttempts: 3
  backoffSeconds: 30
  backoffMultiplier: 2.0
  retryableFailures: [TRANSIENT, TIMEOUT]
```

When a pipeline completes, Interlock notifies downstream pipelines that depend on it via cascade markers, triggering their evaluation cycles.
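Assuming the conventional reading of the retry fields above (the wait before retry n is `backoffSeconds * backoffMultiplier**(n-1)`; this semantics is an assumption, not confirmed here), the delays can be computed as:

```python
def backoff_delays(max_attempts: int, backoff_seconds: float,
                   multiplier: float) -> list[float]:
    """Waits before each retry, i.e. before attempts 2..max_attempts."""
    return [backoff_seconds * multiplier ** i for i in range(max_attempts - 1)]
```

With `maxAttempts: 3`, `backoffSeconds: 30`, and multiplier `2.0`, this yields waits of 30s and 60s before the second and third attempts.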
After completion, Interlock can monitor traits for drift — detecting when conditions that were true at trigger time have degraded. Drift triggers alerts and rerun records.
```yaml
watch:
  interval: 30s
monitoring:
  enabled: true
  duration: 2h
```

Pipelines define evaluation and completion deadlines. Breaches fire alerts:
```yaml
sla:
  evaluationDeadline: "09:00"
  completionDeadline: "12:00"
  timezone: America/New_York
```

Pipelines can define multiple evaluation schedules, each with independent timing, deadlines, and SLA tracking:
```yaml
name: multi-window-pipeline
archetype: batch-ingestion
schedules:
  - name: morning
    after: "06:00"
    deadline: "09:00"
    timezone: America/New_York
  - name: evening
    after: "18:00"
    deadline: "21:00"
    timezone: America/New_York
```

Pipelines without explicit schedules default to a single daily schedule.
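Window selection can be sketched as a pure function over the schedule list (illustrative only; it assumes a window is active from its `after` time through its `deadline`, evaluated in that schedule's own timezone):

```python
from datetime import datetime, time
from typing import Optional
from zoneinfo import ZoneInfo

def active_window(schedules: list[dict], now: datetime) -> Optional[str]:
    """Return the name of the first schedule whose [after, deadline]
    window contains `now`, or None if no window is active."""
    for s in schedules:
        local = now.astimezone(ZoneInfo(s["timezone"]))
        after = time.fromisoformat(s["after"])
        deadline = time.fromisoformat(s["deadline"])
        if after <= local.time() <= deadline:
            return s["name"]
    return None
```

With the morning/evening schedules above, 07:30 New York time falls in the `morning` window and midday falls in none.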
Skip pipeline evaluation on specific days or dates:
```yaml
exclusions:
  days: [Saturday, Sunday]
  dates: ["2025-12-25", "2026-01-01"]
  calendar: us-holidays  # reference a named calendar file
```

Example server configuration:

```yaml
provider: redis
redis:
  addr: localhost:6379
  keyPrefix: "interlock:"
server:
  addr: ":3000"
archetypeDirs:
  - ./archetypes
evaluatorDirs:
  - ./evaluators
pipelineDirs:
  - ./pipelines
alerts:
  - type: console
  - type: webhook
    url: http://localhost:8080/alerts
```

Example pipeline configuration:

```yaml
name: daily-sales-rollup
archetype: batch-ingestion
tier: 2
traits:
  source-freshness:
    evaluator: ./evaluators/check-sales-freshness
    config:
      source: sales_events
      maxLagSeconds: 60
    ttl: 120
    timeout: 15
trigger:
  type: command
  command: "python scripts/run_pipeline.py"
```

Project layout:

```
interlock/
├── cmd/
│   ├── interlock/            # CLI binary
│   └── lambda/               # AWS Lambda handlers
│       ├── stream-router/    # DynamoDB Stream -> SFN
│       ├── orchestrator/     # Multi-action workflow logic
│       ├── evaluator/        # Single trait evaluation
│       ├── trigger/          # Job execution + state machine
│       └── run-checker/      # External job status polling
├── pkg/types/                # Public domain types
├── internal/
│   ├── engine/               # Readiness evaluation engine
│   ├── provider/
│   │   ├── redis/            # Redis/Valkey provider
│   │   ├── dynamodb/         # DynamoDB provider (single-table)
│   │   └── postgres/         # Postgres archival store
│   ├── watcher/              # Reactive evaluation loop
│   ├── schedule/             # Schedule, SLA, retry utilities
│   ├── evaluator/            # Subprocess + HTTP evaluator runners
│   ├── trigger/              # Trigger execution (8 types)
│   ├── alert/                # Alert dispatching (console, webhook, file, SNS)
│   ├── archetype/            # Archetype loading + resolution
│   ├── calendar/             # Calendar exclusion registry
│   └── config/               # YAML config loading
├── deploy/
│   ├── terraform/            # Terraform deployment (AWS infrastructure)
│   ├── build.sh              # Lambda build script
│   └── statemachine.asl.json # Step Function definition
└── demo/
    ├── local/                # Local demo (Redis + Docker Compose)
    └── aws/                  # AWS E2E test suite
```
```sh
make build            # Build binary
make test             # Run all tests
make test-unit        # Unit tests (no Redis needed)
make test-integration # Integration tests (requires Redis)
make lint             # gofmt + go vet + golangci-lint
make build-lambda     # Build Lambda handlers
make local-e2e-test   # Run local E2E test suite (Docker)
```

Prerequisites:

- Go 1.24+
- Docker (for `interlock init`, which starts the Valkey container)
- Redis/Valkey on `localhost:6379` (for integration tests)
- AWS CLI v2 + Terraform >= 1.5 (for AWS deployment)