This document captures all improvements identified and partially implemented during the January 2026 review session. These should be implemented systematically in future development cycles.
Standardized error handling with retry support and context.
export type AdapterErrorCode =
| 'CONNECTION_FAILED'
| 'AUTHENTICATION_FAILED'
| 'VALIDATION_FAILED'
| 'DOWNLOAD_FAILED'
| 'UPLOAD_FAILED'
| 'TIMEOUT'
| 'RATE_LIMITED'
| 'NOT_FOUND'
| 'INVALID_CONFIG'
| 'UNKNOWN';
export class AdapterError extends Error {
public readonly code: AdapterErrorCode;
public readonly retryable: boolean;
public readonly context?: Record<string, unknown>;
public readonly cause?: Error;
constructor(
message: string,
code: AdapterErrorCode = 'UNKNOWN',
retryable: boolean = false,
context?: Record<string, unknown>,
cause?: Error
) {
super(message);
this.name = 'AdapterError';
this.code = code;
this.retryable = retryable;
this.context = context;
this.cause = cause;
}
static from(error: unknown, code?: AdapterErrorCode, retryable?: boolean, context?: Record<string, unknown>): AdapterError;
toJSON(): Record<string, unknown>;
}
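For illustration, a hedged sketch of how an adapter method might raise and normalize these errors; the `fetchPage` helper, its URL handling, and the use of a global `fetch` are assumptions, not part of the core API:

```typescript
async function fetchPage(url: string): Promise<unknown[]> {
  try {
    const response = await fetch(url);
    if (response.status === 429) {
      throw new AdapterError('Rate limit exceeded', 'RATE_LIMITED', true, { url });
    }
    if (!response.ok) {
      throw new AdapterError(
        `Request failed with status ${response.status}`,
        'DOWNLOAD_FAILED',
        response.status >= 500, // 5xx responses are worth retrying
        { url }
      );
    }
    return await response.json();
  } catch (error) {
    if (error instanceof AdapterError) throw error;
    // Normalize anything else (network errors, etc.) into an AdapterError.
    throw AdapterError.from(error, 'CONNECTION_FAILED', true, { url });
  }
}
```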
Structured configuration for adapter settings.

export interface ConfigItem {
id: string;
name: string;
required: boolean;
type?: string;
default?: any;
}

Settings for adapter endpoints.
export interface EndpointSettings {
pagination?: AdapterPagination | false;
config?: ConfigItem[];
}

OAuth flow utilities.
export type Helpers = {
getCode: (redirectUrl: string, client_id: string) => string;
getTokens: (redirectUrl: string, client_id: string, secret_id: string, queryParams: string) => object;
}

export interface BaseAdapter {
id: string;
name: string;
type?: "http" | "database" | "file";
category?: string; // e.g., "Databases & Data Warehouses"
image?: string; // URL to adapter logo
action: Array<"download" | "upload" | "sync">;
credential_type: "api_key" | "oauth2" | "basic";
config?: ConfigItem[];
metadata?: {
description?: string;
provider?: string;
version?: string;
[key: string]: any;
};
pagination?: AdapterPagination;
helpers?: Helpers;
}

export interface Endpoint {
id: string;
tool?: string;
description?: string;
supported_actions: Array<"download" | "upload" | "sync">;
settings?: EndpointSettings;
}
export interface HttpEndpoint extends Endpoint {
path: string;
method: "GET" | "POST" | "PUT" | "DELETE";
defaultFields?: string[];
}
export interface DatabaseEndpoint extends Endpoint {
query_type: "table" | "custom";
}

Complex filter conditions with AND/OR logic.
export type FilterGroup = {
op: "AND" | "OR";
filters: Array<Filter | FilterGroup>;
};
// In Connector:
filters?: Array<Filter | FilterGroup>;
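A short sketch of a nested filter; the `{ field, operator, value }` shape of `Filter` is assumed from the existing core types:

```typescript
// Expresses: status = 'active' AND (plan = 'pro' OR mrr >= 100).
const customerFilters: Array<Filter | FilterGroup> = [
  { field: 'status', operator: '=', value: 'active' },
  {
    op: 'OR',
    filters: [
      { field: 'plan', operator: '=', value: 'pro' },
      { field: 'mrr', operator: '>=', value: 100 },
    ],
  },
];
```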
export interface AdapterInstance {
connect?(): Promise<void>;
disconnect?(): Promise<void>;
download(pageOptions: { limit: number; offset: number }): Promise<{
data: any[];
options?: { nextOffset?: string | number; totalCount?: number; [key: string]: any; };
}>;
upload?(data: any[]): Promise<void>;
validate?(): Promise<{ valid: boolean; message?: string; details?: Record<string, unknown> }>;
getSchema?(): Promise<{ fields: string[]; types?: Record<string, string>; }>;
count?(): Promise<number>;
getConfig?(): BaseAdapter | HttpAdapter | DatabaseAdapter;
}
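A minimal sketch of an object satisfying AdapterInstance, paging over an in-memory array; real adapters would query an API or database inside `download`, and the `createArrayAdapter` factory is illustrative:

```typescript
function createArrayAdapter(rows: Record<string, unknown>[]): AdapterInstance {
  return {
    async download({ limit, offset }) {
      const data = rows.slice(offset, offset + limit);
      return {
        data,
        options: { nextOffset: offset + data.length, totalCount: rows.length },
      };
    },
    async count() {
      return rows.length;
    },
  };
}
```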
Validate pipeline configuration before execution:

export function validatePipeline<T>(
pipeline: Pipeline<T>,
adapters: Adapters,
vault: Vault
): { valid: boolean; errors: string[]; warnings: string[] };
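A sketch of guarding execution on the validation result; the `pipeline`, `adapters`, `vault`, and `orchestrator` objects are assumed to already exist in scope:

```typescript
const result = validatePipeline(pipeline, adapters, vault);
result.warnings.forEach(warning => console.warn(`[pipeline] ${warning}`));

if (!result.valid) {
  throw new AdapterError('Pipeline validation failed', 'INVALID_CONFIG', false, {
    errors: result.errors,
  });
}

await orchestrator.runPipeline(pipeline);
```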
export interface Connector {
// ... existing fields
debug?: boolean; // Enable debug logging
}

Retries should back off exponentially with jitter:

function calculateBackoff(
attempt: number,
baseDelay: number = 1000,
maxDelay: number = 30000
): number {
const exponentialDelay = Math.min(baseDelay * Math.pow(2, attempt), maxDelay);
const jitter = Math.random() * 0.3 * exponentialDelay;
return exponentialDelay + jitter;
}
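A sketch of how a retry wrapper could combine calculateBackoff with AdapterError.retryable; the `withRetries` helper and `sleep` utility are illustrative, not existing orchestrator APIs:

```typescript
const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

async function withRetries<T>(operation: () => Promise<T>, maxRetries: number): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await operation();
    } catch (error) {
      const retryable = error instanceof AdapterError && error.retryable;
      if (!retryable || attempt >= maxRetries) throw error;
      // Wait with exponential backoff plus jitter before the next attempt.
      await sleep(calculateBackoff(attempt));
    }
  }
}
```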
Default orchestrator configuration values:

export const DEFAULT_CONFIG = {
ITEMS_PER_PAGE: 100,
TOTAL_ITEMS_LIMIT: 1000000,
TIMEOUT_MS: 30000,
RETRY_INTERVAL_MS: 1000,
MAX_RETRIES: 0,
MAX_BACKOFF_MS: 30000,
} as const;
export interface OrchestratorConfig {
itemsPerPage?: number;
totalItemsLimit?: number;
timeoutMs?: number;
retryIntervalMs?: number;
maxRetries?: number;
maxBackoffMs?: number;
}

SQL injection protection utilities for database adapters:

const ALLOWED_OPERATORS = new Set([
'=', '!=', '<>', '<', '>', '<=', '>=',
'LIKE', 'ILIKE', 'NOT LIKE', 'NOT ILIKE',
'IN', 'NOT IN', 'IS NULL', 'IS NOT NULL',
'BETWEEN', 'NOT BETWEEN'
]);
function escapeIdentifier(identifier: string): string {
if (!identifier || typeof identifier !== 'string') {
throw new Error('Invalid identifier: must be a non-empty string');
}
return `"${identifier.replace(/"/g, '""')}"`;
}
function validateOperator(operator: string): string {
const normalized = operator.trim().toUpperCase();
if (!ALLOWED_OPERATORS.has(normalized)) {
throw new Error(`Invalid SQL operator: '${operator}'`);
}
return normalized;
}

For MySQL adapters, escape identifiers with backticks instead:

function escapeIdentifier(identifier: string): string {
if (!identifier || typeof identifier !== 'string') {
throw new Error('Invalid identifier: must be a non-empty string');
}
return `\`${identifier.replace(/`/g, '``')}\``;
}

Instead of:
// VULNERABLE
`SELECT ${fields} FROM ${table} WHERE status = '${value}'`

Use:
// SECURE
const query = {
text: `SELECT "id", "name" FROM "public"."users" WHERE "status" = $1 LIMIT $2 OFFSET $3`,
values: [value, limit, offset]
};
await pool.query(query.text, query.values);
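A sketch of how the two helpers combine with positional parameters when building a WHERE clause from simple filters; `buildWhereClause` is illustrative and ignores operators such as IS NULL or IN that need dedicated handling:

```typescript
function buildWhereClause(
  filters: Array<{ field: string; operator: string; value: unknown }>,
  startIndex = 1
): { clause: string; values: unknown[] } {
  const parts: string[] = [];
  const values: unknown[] = [];
  for (const filter of filters) {
    // Identifiers are escaped, operators are whitelisted, and values only
    // ever travel through the parameter array, never through the SQL string.
    const column = escapeIdentifier(filter.field);
    const operator = validateOperator(filter.operator);
    parts.push(`${column} ${operator} $${startIndex + values.length}`);
    values.push(filter.value);
  }
  return { clause: parts.length > 0 ? `WHERE ${parts.join(' AND ')}` : '', values };
}
```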
All adapters should include:

const Adapter = {
id: "adapter-name", // Consistent, lowercase, hyphenated
name: "Human Readable Name",
type: "http" | "database",
category: "Category Name", // e.g., "Databases & Data Warehouses"
image: "https://...", // URL to logo SVG
// ...
};"Databases & Data Warehouses"- PostgreSQL, MySQL, MongoDB"SaaS & CRM Applications"- HubSpot, Zoho"E-commerce & Payment Platforms"- Stripe"Accounting & Finance"- Xero
| Adapter | ID | Category |
|---|---|---|
| PostgreSQL | postgresql | Databases & Data Warehouses |
| MySQL | mysql | Databases & Data Warehouses |
| MongoDB | mongodb | Databases & Data Warehouses |
| HubSpot | hubspot | SaaS & CRM Applications |
| Stripe | stripe | E-commerce & Payment Platforms |
| Xero | xero | Accounting & Finance |
| Google Ads | google-ads | Advertising & Marketing |
| GitHub | github | Developer Tools |
| Gmail | gmail | Communication |
| Twitter | twitter | Social Media |
| ChartMogul | chartmogul | Analytics |
| S3 | s3 | Cloud Storage |
| Zoho | zoho | SaaS & CRM Applications |
All adapters should use consistent webpack configuration:
const path = require('path');
module.exports = {
mode: 'production',
entry: './src/index.ts',
output: {
filename: 'index.js',
path: path.resolve(__dirname, 'dist'),
library: {
type: 'commonjs2',
},
},
resolve: {
extensions: ['.ts', '.js'],
},
module: {
rules: [
{
test: /\.ts$/,
use: 'ts-loader',
exclude: /node_modules/,
},
],
},
externals: {
// List adapter-specific externals
},
target: 'node',
};

All adapter package.json files should include:
{
"name": "@openetl/adapter-name",
"version": "x.x.x",
"description": "Descriptive text with key features",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
"build": "npx webpack --config webpack.config.js",
"test": "jest",
"lint": "eslint src --ext .ts"
},
"keywords": ["etl", "adapter-name", "openetl", "data-pipeline"],
"license": "MIT",
"repository": {
"type": "git",
"url": "https://github.com/openetl/openetl.git",
"directory": "adapters/adapter-name"
},
"peerDependencies": {
"openetl": "^1.0.0"
}
}

Tests should cover:

- Adapter metadata validation
- Query building with parameterized queries
- Filter handling (including FilterGroup)
- Error handling
- Authentication validation
expect(mockPool.query).toHaveBeenCalledWith(
`SELECT "id", "name" FROM "public"."users" WHERE "status" = $1 LIMIT $2 OFFSET $3`,
['active', 10, 0]
);
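A hedged example of unit tests for the escaping helpers defined above, assuming they are exported from a hypothetical src/sql module:

```typescript
import { escapeIdentifier, validateOperator } from '../src/sql'; // hypothetical module path

describe('SQL injection protection', () => {
  it('escapes embedded double quotes in identifiers', () => {
    expect(escapeIdentifier('users')).toBe('"users"');
    expect(escapeIdentifier('we"ird')).toBe('"we""ird"');
  });

  it('normalizes and whitelists operators', () => {
    expect(validateOperator(' like ')).toBe('LIKE');
    expect(() => validateOperator('; DROP TABLE users; --')).toThrow(/Invalid SQL operator/);
  });
});
```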
Each adapter should have:

- README.md (300+ lines) covering:
  - Installation
  - Quick start
  - All endpoints
  - Filtering options
  - Security features
  - Configuration
  - Complete examples
  - Error handling
- Website docs at docs/openetl/docs/adapters/[name].md

Further improvements identified during the review:

- Debug mode pattern via connector.debug flag
- Streaming support for large datasets
- Webhook/event hooks system
- Per-endpoint pagination settings
- Schema validation before upload
- Data type coercion
- Flatten nested objects utility
- CLI tool
- Monorepo restructuring
- Connection pooling improvements
- Dry run mode
- Resume/checkpoint support
Suggested implementation phases:

- Phase 1: Core types (AdapterError, ConfigItem, etc.)
- Phase 2: Orchestrator improvements (backoff, validation)
- Phase 3: SQL injection protection for all database adapters
- Phase 4: Standardize webpack/package.json across adapters
- Phase 5: Update tests for parameterized queries
- Phase 6: Documentation updates
Currently, all data is loaded into memory. For large datasets, implement streaming:
export interface StreamingOptions<T> {
chunkSize?: number;
onChunk?: (chunk: T[], chunkIndex: number) => Promise<void>;
backpressure?: boolean;
}
// Usage in pipeline
const pipeline = {
source: connector,
streaming: {
chunkSize: 1000,
onChunk: async (chunk, index) => {
// Process each chunk without loading all into memory
}
}
};

Validate data schema before sending to target:
export interface SchemaValidator {
validate(data: any[], schema: SchemaInfo): ValidationResult;
}
// In Connector:
validation?: {
schema?: SchemaInfo;
strict?: boolean; // Fail on first error
coerce?: boolean; // Attempt type coercion
}

Test pipeline without executing:
export interface DryRunResult {
wouldExtract: number;
wouldTransform: { before: any; after: any }[];
wouldLoad: number;
errors: string[];
warnings: string[];
}
// In pipeline execution:
orchestrator.runPipeline(pipeline, { dryRun: true });

Continue from failure point:
export interface Checkpoint {
pipelineId: string;
lastOffset: number | string;
processedCount: number;
timestamp: string;
state?: Record<string, unknown>;
}
// In Pipeline:
checkpoint?: {
enabled: boolean;
storage: 'memory' | 'file' | 'redis';
path?: string;
}

Process multiple batches concurrently:
export interface ConcurrencyOptions {
maxConcurrent?: number; // Default: 1 (sequential)
batchSize?: number;
preserveOrder?: boolean;
}
// In Pipeline rate_limiting:
rate_limiting?: {
requests_per_second: number;
concurrent_requests: number; // Enable this
max_retries_on_rate_limit: number;
}

Insert or update based on key:
export interface UpsertOptions {
keys: string[]; // Unique key fields
updateFields?: string[]; // Fields to update (default: all)
onConflict?: 'update' | 'ignore' | 'error';
}
// In DatabaseEndpoint:
export interface DatabaseEndpoint extends Endpoint {
query_type: "table" | "custom" | "upsert";
upsert?: UpsertOptions;
}
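A sketch of translating UpsertOptions into PostgreSQL ON CONFLICT syntax; `buildUpsert` is illustrative, reuses the escapeIdentifier helper from the SQL injection section, and omits the 'error' conflict mode (which would simply drop the ON CONFLICT clause) for brevity:

```typescript
function buildUpsert(
  table: string,
  row: Record<string, unknown>,
  options: UpsertOptions
): { text: string; values: unknown[] } {
  const columns = Object.keys(row);
  const placeholders = columns.map((_, i) => `$${i + 1}`);
  const conflictTarget = options.keys.map(escapeIdentifier).join(', ');
  // Update every non-key column unless updateFields narrows the set.
  const updates = (options.updateFields ?? columns)
    .filter(column => !options.keys.includes(column))
    .map(column => `${escapeIdentifier(column)} = EXCLUDED.${escapeIdentifier(column)}`);
  const onConflict =
    options.onConflict === 'ignore'
      ? `ON CONFLICT (${conflictTarget}) DO NOTHING`
      : `ON CONFLICT (${conflictTarget}) DO UPDATE SET ${updates.join(', ')}`;
  return {
    text: `INSERT INTO ${escapeIdentifier(table)} (${columns.map(escapeIdentifier).join(', ')}) VALUES (${placeholders.join(', ')}) ${onConflict}`,
    values: columns.map(column => row[column]),
  };
}
```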
Command-line interface for running pipelines:

# Run a pipeline from JSON config
openetl run --config pipeline.json --vault vault.json
# Validate configuration
openetl validate --config pipeline.json
# List available adapters
openetl adapters list
# Test connection
openetl test-connection --adapter postgresql --vault vault.json

Track and sync only changed records:
export interface CDCOptions {
trackingColumn: string; // e.g., 'updated_at'
lastSync?: string | number; // Last sync timestamp/version
includeDeletes?: boolean;
softDeleteColumn?: string;
}
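A sketch of how CDCOptions might drive an incremental extract for a SQL source; `buildIncrementalQuery` is illustrative and reuses escapeIdentifier with PostgreSQL-style placeholders:

```typescript
function buildIncrementalQuery(
  table: string,
  cdc: CDCOptions
): { text: string; values: unknown[] } {
  const tracking = escapeIdentifier(cdc.trackingColumn);
  const conditions = [`${tracking} > $1`];
  const values: unknown[] = [cdc.lastSync ?? 0];
  // Optionally skip soft-deleted rows unless deletes should be propagated.
  if (!cdc.includeDeletes && cdc.softDeleteColumn) {
    conditions.push(`${escapeIdentifier(cdc.softDeleteColumn)} IS NULL`);
  }
  return {
    text: `SELECT * FROM ${escapeIdentifier(table)} WHERE ${conditions.join(' AND ')} ORDER BY ${tracking}`,
    values,
  };
}
```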
Built-in anonymization:

export type MaskingType =
| 'hash' // SHA-256 hash
| 'redact' // Replace with ***
| 'partial' // Show last 4 chars
| 'fake' // Replace with fake data
| 'encrypt'; // Reversible encryption
export interface MaskingRule {
field: string;
type: MaskingType;
options?: Record<string, unknown>;
}
// In Connector:
masking?: MaskingRule[];
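A sketch of applying masking rules before load, covering only the hash, redact, and partial types with Node's built-in crypto module; the fake and encrypt types would need additional dependencies or key management:

```typescript
import { createHash } from 'crypto';

function applyMasking(
  rows: Record<string, unknown>[],
  rules: MaskingRule[]
): Record<string, unknown>[] {
  return rows.map(row => {
    const masked = { ...row };
    for (const rule of rules) {
      const value = masked[rule.field];
      if (value === undefined || value === null) continue;
      const text = String(value);
      if (rule.type === 'hash') {
        masked[rule.field] = createHash('sha256').update(text).digest('hex');
      } else if (rule.type === 'redact') {
        masked[rule.field] = '***';
      } else if (rule.type === 'partial') {
        // Keep the last four characters, mask the rest.
        masked[rule.field] = text.slice(-4).padStart(text.length, '*');
      }
    }
    return masked;
  });
}
```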
Built-in metrics collection:

export interface PipelineMetrics {
extractedCount: number;
transformedCount: number;
loadedCount: number;
errorCount: number;
durationMs: number;
bytesProcessed: number;
avgRecordsPerSecond: number;
}
export interface MetricsCollector {
onExtract(count: number, durationMs: number): void;
onTransform(count: number, durationMs: number): void;
onLoad(count: number, durationMs: number): void;
onError(error: AdapterError): void;
getMetrics(): PipelineMetrics;
}
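A sketch of a minimal in-memory MetricsCollector; `createMetricsCollector` is illustrative, and bytesProcessed is left untouched because byte accounting depends on the adapter:

```typescript
function createMetricsCollector(): MetricsCollector {
  const metrics: PipelineMetrics = {
    extractedCount: 0,
    transformedCount: 0,
    loadedCount: 0,
    errorCount: 0,
    durationMs: 0,
    bytesProcessed: 0,
    avgRecordsPerSecond: 0,
  };
  // Recompute throughput whenever counts or durations change.
  const recalc = () => {
    metrics.avgRecordsPerSecond =
      metrics.durationMs > 0 ? (metrics.loadedCount / metrics.durationMs) * 1000 : 0;
  };
  return {
    onExtract(count, durationMs) { metrics.extractedCount += count; metrics.durationMs += durationMs; recalc(); },
    onTransform(count, durationMs) { metrics.transformedCount += count; metrics.durationMs += durationMs; recalc(); },
    onLoad(count, durationMs) { metrics.loadedCount += count; metrics.durationMs += durationMs; recalc(); },
    onError() { metrics.errorCount += 1; },
    getMetrics: () => ({ ...metrics }),
  };
}
```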
Actual scheduler implementation (not just configuration):

import { schedule } from 'openetl/scheduler';
schedule(pipeline, {
cron: '0 0 * * *', // Daily at midnight
timezone: 'UTC',
retryOnFailure: true,
});

Admin interface for pipeline management - consider a separate package.
Support for running across multiple workers/nodes.
| Feature | OpenETL | Airbyte | Singer | Apache NiFi |
|---|---|---|---|---|
| TypeScript Native | ✅ | ❌ | ❌ | ❌ |
| Lightweight | ✅ | ❌ | ✅ | ❌ |
| MIT License | ✅ | ELv2 | ✅ | Apache 2.0 |
| Streaming | ❌ TODO | ✅ | ✅ | ✅ |
| Schema Validation | ❌ TODO | ✅ | ✅ | ✅ |
| CDC Support | ❌ TODO | ✅ | Partial | ✅ |
| CLI Tool | ❌ TODO | ✅ | ✅ | ✅ |
| Web UI | ❌ | ✅ | ❌ | ✅ |
| Custom Transforms | ✅ | Limited | ✅ | ✅ |
| SQL Injection Protection | ✅ | ✅ | Varies | ✅ |
| Parameterized Queries | ✅ | ✅ | Varies | ✅ |
- Always use parameterized queries for database adapters
- Never interpolate user values directly into SQL strings
- Escape all identifiers (table names, column names)
- Validate operators against whitelist
- Include category and image in all adapters for UI display
- Use consistent adapter IDs across the codebase