A Go package for PostgreSQL logical replication with both full and incremental synchronization support.
## Features

- Full Synchronization: Initial data sync using consistent PostgreSQL snapshots
- Incremental Synchronization: Real-time change tracking via logical replication
- Automatic Recovery: LSN persistence for crash recovery and resumption
- Event-Driven: Callback-based architecture for data change events
- Thread-Safe: Concurrent-safe operations throughout
- Schema Filtering: Support for specific schemas and tables
- Auto-Management: Automatic publication and replication slot handling
## Installation

```sh
go get github.com/hupeh/pglogrepl
```

## PostgreSQL Setup

Your PostgreSQL server must be configured for logical replication:
1. Check if logical replication is supported:

   ```go
   package main

   import (
       "context"
       "log"

       "github.com/hupeh/pglogrepl"
   )

   func main() {
       dsn := "host=localhost port=5432 user=postgres password=secret dbname=mydb"
       result, err := pglogrepl.CheckLogicalReplication(context.Background(), dsn)
       if err != nil {
           log.Fatal(err)
       }
       log.Println(result.String())
       if !result.Supported {
           log.Fatal("Logical replication is not properly configured")
       }
   }
   ```
2. If not configured, edit `postgresql.conf`:

   ```ini
   wal_level = logical
   max_replication_slots = 4
   max_wal_senders = 4
   ```
3. Restart PostgreSQL to apply the changes.
4. Grant replication privileges to your user:

   ```sql
   ALTER USER your_user WITH REPLICATION;
   ```
## Requirements

- PostgreSQL 10+
- Recommended: PostgreSQL 14+ (for streaming large transactions)
## Quick Start

```go
package main

import (
	"context"
	"log"

	"github.com/hupeh/pglogrepl"
)

func main() {
	// Configure connection
	config := pglogrepl.Config{
		Host:     "localhost",
		Port:     5432,
		Username: "postgres",
		Password: "password",
		Database: "mydb",
		Schema:   "public",
		Tables:   []string{"users", "orders"}, // Empty for all tables
		LSNFile:  "/var/lib/myapp/lsn.dat",
	}

	// Create replication instance
	repl := pglogrepl.New(config)

	// Register event callbacks
	repl.SetCallback(pglogrepl.EventInsert, func(table pglogrepl.Table, data map[string]any) {
		log.Printf("[INSERT] %s: %v", table.Name(), data)
	})
	repl.SetCallback(pglogrepl.EventUpdate, func(table pglogrepl.Table, data map[string]any) {
		log.Printf("[UPDATE] %s: %v", table.Name(), data)
	})
	repl.SetCallback(pglogrepl.EventDelete, func(table pglogrepl.Table, data map[string]any) {
		log.Printf("[DELETE] %s: %v", table.Name(), data)
	})

	// Start replication
	ctx := context.Background()
	if err := repl.Start(ctx); err != nil {
		log.Fatal(err)
	}
	defer repl.Stop()

	// Block forever (or until signal)
	select {}
}
```

## How It Works

### Full Synchronization (first run)

- Creates a consistent database snapshot with `REPEATABLE READ` isolation
- Reads all existing rows from configured tables
- Dispatches `EventInsert` for each existing row
- Creates a replication slot and publication
- Transitions to incremental synchronization
- Begins consuming WAL changes from the snapshot position
### Incremental Synchronization (subsequent runs)

- Loads the saved LSN from file
- Validates the LSN against current configuration
- Connects to the existing replication slot
- Resumes consuming WAL changes from the saved position
- Dispatches events for each data modification
## LSN Persistence

The Log Sequence Number (LSN) tracks the current position in the PostgreSQL Write-Ahead Log:
- Write: LSN is written to file on every change (buffered)
- Sync: Automatically synced to disk every minute
- Format: `checksum/LSN_string` (e.g., `1234567890/0/16B2E58`)
- Recovery: Prevents duplicate processing after crashes
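The on-disk layout above can be illustrated with a small parser. This is a hedged sketch based only on the `checksum/LSN_string` format described here; `parseLSNFile` is a hypothetical helper, not part of the package API:

```go
package main

import (
	"fmt"
	"strings"
)

// parseLSNFile splits a persisted record into its checksum and LSN parts.
// The format assumed here is "checksum/LSN_string"; since the LSN itself
// contains a slash (e.g. "0/16B2E58"), only the first slash delimits.
func parseLSNFile(record string) (checksum, lsn string, err error) {
	i := strings.IndexByte(record, '/')
	if i < 0 {
		return "", "", fmt.Errorf("malformed LSN record: %q", record)
	}
	return record[:i], record[i+1:], nil
}

func main() {
	cs, lsn, err := parseLSNFile("1234567890/0/16B2E58")
	if err != nil {
		panic(err)
	}
	fmt.Println(cs, lsn) // 1234567890 0/16B2E58
}
```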
## Configuration

| Field | Type | Description | Default |
|---|---|---|---|
| `Host` | `string` | PostgreSQL server hostname | `localhost` |
| `Port` | `int` | PostgreSQL server port | `5432` |
| `Username` | `string` | Database user (requires REPLICATION privilege) | - |
| `Password` | `string` | Database password | - |
| `Database` | `string` | Target database name | - |
| `SSLMode` | `string` | SSL mode (disable/require/verify-ca/verify-full) | `disable` |
| `Tables` | `[]string` | Tables to replicate (empty = all tables) | `[]` (all) |
| `Schema` | `string` | PostgreSQL schema name | `public` |
| `PubName` | `string` | Publication name | `pglogrepl_demo` |
| `SlotName` | `string` | Replication slot name | `{PubName}_sync_slot` |
| `LSNFile` | `string` | LSN persistence file path | - |
| `Logger` | `Logger` | Custom logger implementation | stdout logger |
## Logging

Implement the `Logger` interface for custom logging:

```go
type MyLogger struct{}

func (l *MyLogger) LogInfo(format string, args ...any) {
	log.Printf("[INFO] "+format, args...)
}

func (l *MyLogger) LogError(format string, args ...any) {
	log.Printf("[ERROR] "+format, args...)
}

func (l *MyLogger) LogWarn(format string, args ...any) {
	log.Printf("[WARN] "+format, args...)
}
```

```go
config.Logger = &MyLogger{}
```

Or use the function adapter:
```go
config.Logger = pglogrepl.LoggerFunc(func(level, format string, args ...any) {
	msg := fmt.Sprintf(format, args...)
	log.Printf("[%s] %s", level, msg)
})
```

## Events

- `EventInsert`: Triggered when a new row is inserted
- `EventUpdate`: Triggered when a row is modified
- `EventDelete`: Triggered when a row is deleted
- `EventTruncate`: Triggered when a table is truncated (not fully implemented)

The callback receives:

- `table`: Qualified table name (includes schema)
- `data`: Map of column names to values
```go
repl.SetCallback(pglogrepl.EventUpdate, func(table pglogrepl.Table, data map[string]any) {
	schema := table.Schema() // e.g., "public"
	name := table.Name()     // e.g., "users"
	if userID, ok := data["id"].(int64); ok {
		log.Printf("User %d updated in %s.%s", userID, schema, name)
	}
})
```

### Replica Identity

For DELETE events to include old row data, you must set the table's replica identity:
```sql
-- Option 1: Full row (all columns)
ALTER TABLE users REPLICA IDENTITY FULL;

-- Option 2: Using primary key (default, only key columns)
ALTER TABLE users REPLICA IDENTITY DEFAULT;

-- Option 3: Using a unique index
ALTER TABLE users REPLICA IDENTITY USING INDEX users_email_key;
```

## Status

- `StatusStopped`: Replication is not running
- `StatusStarting`: Initializing connections and slots
- `StatusSyncing`: Performing full synchronization
- `StatusListening`: Active incremental replication
- `StatusStopping`: Gracefully shutting down
```go
status := repl.Status()
statusName := pglogrepl.StatusName(status)
log.Printf("Replication status: %s", statusName)

if err := repl.Err(); err != nil {
	log.Printf("Replication error: %v", err)
}
```

## Graceful Shutdown

```go
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/hupeh/pglogrepl"
)

func main() {
	config := pglogrepl.Config{
		// ... configuration
	}
	repl := pglogrepl.New(config)

	// Setup signal handling
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

	// Start replication
	ctx := context.Background()
	if err := repl.Start(ctx); err != nil {
		log.Fatal(err)
	}

	// Wait for signal
	<-sigChan
	log.Println("Shutting down...")

	// Graceful stop
	if err := repl.Stop(); err != nil {
		log.Printf("Error during shutdown: %v", err)
	}
}
```

### Replicating Specific Tables

```go
config := pglogrepl.Config{
	// ... other fields
	Tables: []string{"users", "orders", "products"},
}
```

### Replicating All Tables

```go
config := pglogrepl.Config{
	// ... other fields
	Tables: []string{}, // Empty = all tables in schema
}
```

## LSN Sync Frequency

The package syncs the LSN to disk every minute by default; the sync runs automatically in a background goroutine. For manual control, you can call `lsn.Sync()` directly if you have access to the LSN instance.
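The write-buffered, periodically-synced strategy described above can be sketched generically. This is an illustration of the pattern only, not the package's actual implementation, and the interval is shortened for demonstration:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// flushBuffer keeps the latest value in memory and flushes it on a timer,
// mimicking the "write on every change, sync periodically" LSN strategy.
type flushBuffer struct {
	mu      sync.Mutex
	latest  string
	flushed string // stands in for the durable on-disk copy
}

// Write records a new value; cheap, called on every change.
func (b *flushBuffer) Write(v string) {
	b.mu.Lock()
	b.latest = v
	b.mu.Unlock()
}

// Sync persists the latest value; called on a timer or manually.
func (b *flushBuffer) Sync() {
	b.mu.Lock()
	b.flushed = b.latest
	b.mu.Unlock()
}

// runSyncLoop flushes on every tick until stop is closed,
// with a final flush on shutdown.
func (b *flushBuffer) runSyncLoop(interval time.Duration, stop <-chan struct{}) {
	t := time.NewTicker(interval)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			b.Sync()
		case <-stop:
			b.Sync()
			return
		}
	}
}

func main() {
	b := &flushBuffer{}
	stop := make(chan struct{})
	done := make(chan struct{})
	go func() { b.runSyncLoop(10*time.Millisecond, stop); close(done) }()

	b.Write("0/16B2E58") // buffered immediately, synced on the next tick
	time.Sleep(30 * time.Millisecond)
	close(stop)
	<-done
	fmt.Println(b.flushed) // 0/16B2E58
}
```

The trade-off is the same one noted under performance below: a shorter interval narrows the data-loss window at the cost of more I/O.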
## Streaming Large Transactions

PostgreSQL 14+ supports streaming large transactions. The package automatically handles:

- `StreamStartMessageV2`: Begin streaming a large transaction
- `StreamCommitMessageV2`: Commit the streamed transaction
- `StreamAbortMessageV2`: Abort the streamed transaction
## Troubleshooting

### Existing Replication Slots

If you see errors about existing replication slots:

```sql
-- List all replication slots
SELECT * FROM pg_replication_slots;

-- Drop a specific slot
SELECT pg_drop_replication_slot('slot_name');
```

### Publications

```sql
-- List all publications
SELECT * FROM pg_publication;

-- Drop a specific publication
DROP PUBLICATION publication_name;
```

### LSN Validation Failures

This occurs when the configuration (database, schema, tables, etc.) changes but the LSN file remains. Solutions:
- Delete the LSN file to force a full resync
- Call `lsn.Reset()` to clear the LSN
- Ensure your configuration matches the previous run
### Permission Errors

Ensure your PostgreSQL user has the required privileges:

```sql
-- Grant replication privilege
ALTER USER your_user WITH REPLICATION;

-- Grant necessary table permissions
GRANT SELECT ON ALL TABLES IN SCHEMA public TO your_user;
```

## Performance Considerations

1. LSN Sync Frequency: LSN is synced to disk every minute. More frequent syncs increase I/O but reduce the potential data-loss window.

2. Event Callbacks: Keep callbacks lightweight. Heavy processing should be done asynchronously:

   ```go
   eventChan := make(chan Event, 1000)

   repl.SetCallback(pglogrepl.EventInsert, func(table pglogrepl.Table, data map[string]any) {
       eventChan <- Event{Table: table, Data: data}
   })

   // Process events in a separate goroutine
   go processEvents(eventChan)
   ```

3. Network Latency: Use a dedicated connection or connection pooling for replication to avoid blocking other operations.

4. WAL Retention: Monitor your WAL disk usage. Inactive replication slots prevent WAL cleanup.
## About

This package is part of the go-slim framework.
## Contributing

Contributions are welcome! Please ensure:
- Code follows Go conventions
- All tests pass
- Documentation is updated
- Commit messages are clear
## Support

For issues and questions:
- GitHub Issues: pglogrepl issues
- Documentation: Go package documentation