Skip to content

Refactor/modularize resilientws#13

Open
fakhrilainur wants to merge 11 commits into
ipanardian:developfrom
fakhrilainur:refactor/modularize-resilientws
Open

Refactor/modularize resilientws#13
fakhrilainur wants to merge 11 commits into
ipanardian:developfrom
fakhrilainur:refactor/modularize-resilientws

Conversation

@fakhrilainur

Copy link
Copy Markdown

Code Changes Summary: Refactor & Concurrent Safety Fixes

Overview

This change refactors the monolithic resilientws.go (~1000 lines) into 7 focused, single-responsibility modules while simultaneously fixing critical concurrent write race conditions and lock contention issues.

Why This Change?

1. Code Organization & Maintainability

  • The original resilientws.go mixed concerns: connection management, message I/O, backoff logic, heartbeat, logging, and state management
  • Splitting into focused modules makes the code easier to understand, test, and modify

2. Concurrent Safety

  • Critical bug: gorilla/websocket connections are NOT safe for concurrent writes
  • Multiple goroutines (Send, SendJSON, processMessageQueue, heartbeat) could race on writes
  • Lock contention: r.mu.Lock() held during I/O operations blocked all readers/writers for 100+ ms

Key Changes

New Files Created

File Purpose
types.go Struct definitions, enums, sentinel errors
logger.go Logger interface & default implementation
state.go Atomic state read helpers
backoff.go Backoff calculation & reconnect attempt tracking
connection.go Dial, connect, reconnect logic
heartbeat.go Ping/pong keepalive
io.go Read, write, send, queue operations

Critical Bug Fixes

1. Write-side Race Condition (NEW: writeMu in types.go, io.go)

// BEFORE: Multiple goroutines could call conn.WriteMessage concurrently
Send() {
    r.mu.Lock()
    err := conn.WriteMessage(...)
    r.mu.Unlock()
}

// AFTER: All writes serialized through writeMu
lockedWrite(conn *websocket.Conn, fn func() error) error {
    r.mu.RLock()          // read deadline config
    deadline := r.WriteDeadline
    r.mu.RUnlock()
    
    r.writeMu.Lock()      // serialize all writes
    defer r.writeMu.Unlock()
    // write operation
}

Impact: Prevents data corruption from concurrent writes to websocket connection.

2. TOCTOU (Time-of-Check-Time-of-Use) Race in Read/Write Methods

// BEFORE: TOCTOU window between check and use
if !r.IsConnected() { return errNotConnected }  // releases lock here
msgType, msg, err := r.Conn.ReadMessage()       // r.Conn could be nil!

// AFTER: Atomic check
conn, ok := r.checkConnection()  // returns both under single lock
if !ok { return errNotConnected }
msgType, msg, err := conn.ReadMessage()  // safe

Impact: Eliminates nil-pointer dereferences and race windows.

3. Lock Held During I/O in Close/CloseAndReconnect (resilientws.go)

// BEFORE: 100ms sleep while holding r.mu.Lock()
r.mu.Lock()
conn := r.Conn
if conn != nil {
    conn.WriteControl(...)
    time.Sleep(100 * time.Millisecond)  // <-- blocks everything!
    conn.Close()
}
r.mu.Unlock()

// AFTER: Release lock before I/O
r.mu.Lock()
conn := r.Conn
r.Conn = nil
r.mu.Unlock()

if conn != nil {
    conn.WriteControl(...)  // safe, no lock held
    time.Sleep(100 * time.Millisecond)
    conn.Close()
}

Impact: Prevents reader/writer goroutines from blocking on every reconnect.

4. Safe Type Assertion in emitEvent (resilientws.go:286)

// BEFORE: Panics if Data is not time.Duration
r.onReconnectingFn(event.Data.(time.Duration))

// AFTER: Safe assertion
if d, ok := event.Data.(time.Duration); ok {
    r.onReconnectingFn(d)
}

Impact: Prevents runtime panic from type assertion failures.

5. Backoff Integer Overflow (backoff.go:33-42)

// BEFORE: bit shift can overflow even with attempt > 30 cap
backoff := min * time.Duration(1<<attempt)

// AFTER: Iterative doubling with overflow check
func calculateBackoff(min, max time.Duration, attempt int) time.Duration {
    backoff := min
    for i := 0; i < attempt; i++ {
        backoff *= 2
        if backoff > max || backoff < 0 { return max }
    }
    return backoff
}

Impact: Prevents backoff wrap-around from integer overflow.

6. Reader Exit on Close Message (io.go)

// BEFORE: handleMessage had unreachable return
case websocket.CloseMessage:
    return  // returns from handleMessage, loop continues!

// AFTER: Propagates close signal up
if !r.handleMessage(msgType, msg) {
    return  // reader loop exits
}

Impact: Reader goroutine properly exits on server-initiated close.

7. Subscribe Retry Off-by-One (connection.go)

// BEFORE: backoff checked lags by one iteration
backoff = r.backoff(attempt)  // computed after error handling
handleSubscribeError(err, backoff, max, attempt)  // compares stale value

// AFTER: backoff computed before error handling
nextBackoff := r.backoff(attempt)
handleSubscribeError(err, nextBackoff, max, attempt)  // compares current value

Impact: Retry termination logic checks correct backoff value.

Testing

  • Passed all test in test folder

Files Modified

  • resilientws.go reduced from ~904 lines to ~325 lines
  • 7 new focused modules created

Backward Compatibility

Fully backward compatible all public APIs unchanged, internal refactoring only.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants