From 7e0bc955efdf7d6ead35df205fa5311b32626040 Mon Sep 17 00:00:00 2001 From: sukru tikves Date: Wed, 17 Jun 2026 10:58:30 -0700 Subject: [PATCH 1/2] Fix pipeline race condition: rotate all buffers by pipeline depth (#46) With pipeline depth 3, the GPU sampler output and logits buffers were shared across in-flight stages, causing stale reads (repeated tokens) under CPU contention. Introduce a shared pipelineDepth constant and rotate decodeOutputBuffers, decodeLogitsBuffers, and cachePositionBuffers so no two concurrent stages alias the same memory. --- .../CoreAIPipelinedEngine.swift | 114 ++++++++++++------ .../LanguageModel/CoreAILanguageModel.swift | 2 +- 2 files changed, 80 insertions(+), 36 deletions(-) diff --git a/swift/Sources/CoreAILanguageModels/InferenceEngines/CoreAIPipelinedEngine.swift b/swift/Sources/CoreAILanguageModels/InferenceEngines/CoreAIPipelinedEngine.swift index 53addfd..906124a 100644 --- a/swift/Sources/CoreAILanguageModels/InferenceEngines/CoreAIPipelinedEngine.swift +++ b/swift/Sources/CoreAILanguageModels/InferenceEngines/CoreAIPipelinedEngine.swift @@ -21,6 +21,10 @@ private func milliseconds(since start: ContinuousClock.Instant) -> Double { // MARK: - Constants +/// Maximum number of in-flight pipeline stages. Shared by the backpressure gate +/// and all buffer rotation logic to guarantee no two concurrent stages alias +/// the same memory. +private let pipelineDepth = 3 private let averageExpectedPromptSize = 256 private let temperatureTolerance: Double = 0.001 @@ -31,7 +35,7 @@ private let temperatureTolerance: Double = 0.001 /// Key features: /// - Non-blocking GPU encoding via `InferenceFunction.encode` /// - GPU-direct token sampling (argmax/topK) via MPSGraph compute shaders -/// - Double-buffered cache positions for CPU/GPU overlap +/// - Pipeline-depth-matched buffer rotation for CPU/GPU overlap /// - Growing KV cache with pipelined expansion /// - All tensors are owned MTLBuffers — Core AI never allocates/frees them final class CoreAIPipelinedEngine: InferenceEngine, Sendable { @@ -294,7 +298,7 @@ final class CoreAIPipelinedEngine: InferenceEngine, Sendable { /// sampler callback drains them (~70/s); depth grows until /// `MPSCommandBufferImageCache` fails to allocate another private MTLBuffer. /// -/// Capacity 3 covers {logits encode + sampler commit + optional KV-cache grow}; +/// Capacity matches `pipelineDepth` — covers {logits encode + sampler commit + optional KV-cache grow}; /// deeper queues only cost memory. /// /// Class, not actor: `release()` runs synchronously from the Metal callback — @@ -396,7 +400,9 @@ private struct EngineImpl: ~Copyable { // Owned MTLBuffers var inputTokensBuffer: MTLBuffer - var cachePositionBuffers: (MTLBuffer, MTLBuffer) + var cachePositionBuffers: [MTLBuffer] + var decodeOutputBuffers: [MTLBuffer] + var decodeLogitsBuffers: [MTLBuffer] // KV cache — reuses CoreAIKVCache protocol from KVCache+CoreAI.swift var kvCache: any CoreAIKVCache @@ -413,8 +419,8 @@ private struct EngineImpl: ~Copyable { var step: Int = 0 // Backpressure gate — see PipelineGate doc-comment for the failure mode it prevents. - // Capacity 3 covers {encode logits + sampler commit + optional KV-cache grow} in flight. - let inFlightGate = PipelineGate(capacity: 3) + // Capacity matches pipeline depth: {encode logits + sampler commit + optional KV-cache grow} in flight. + let inFlightGate = PipelineGate(capacity: pipelineDepth) // MARK: - Init @@ -486,22 +492,43 @@ private struct EngineImpl: ~Copyable { throw InferenceRuntimeError.bufferAllocationFailed("inputTokens (\(inputTokensByteCount) bytes)") } - // Allocate double-buffered cache positions + // Allocate pipeline-depth-matched cache position buffers let cachePosSize = config.maxContextLength * posIdsDesc.scalarType.byteSize - guard let cachePosBuf0 = device.makeBuffer(length: cachePosSize, options: .storageModeShared), - let cachePosBuf1 = device.makeBuffer(length: cachePosSize, options: .storageModeShared) - else { - throw InferenceRuntimeError.bufferAllocationFailed("cachePositions (\(cachePosSize * 2) bytes)") + var cachePosBuffers: [MTLBuffer] = [] + for _ in 0...size, options: .storageModeShared) else { + throw InferenceRuntimeError.bufferAllocationFailed("decodeOutputBuffer (\(MemoryLayout.size) bytes)") + } + decodeOutBuffers.append(buf) + } + + // Allocate pipeline-depth-matched decode logits buffers (inference writes logits for decode) + let decodeLogitsSize = config.vocabSize * MemoryLayout.size + var decodeLogBufs: [MTLBuffer] = [] + for _ in 0.., @@ -632,7 +661,7 @@ private struct EngineImpl: ~Copyable { // Prefill: write tokens at their natural position so this step's region is disjoint // from any prior chunk's region still in-flight on the GPU (encode holds a live // MTLBuffer reference; no encodeWriteOperands serialization available in Core AI). - // Decode: token is already at offset 0 via GPU-direct argmax write — no CPU write needed. + // Decode: token is in the previous step's decodeOutputBuffer — no CPU write needed. let tokenByteOffset = processedTokenCount * MemoryLayout.size if !tokens.isEmpty { let ptr = inputTokensBuffer.contents().bindMemory( @@ -642,20 +671,33 @@ private struct EngineImpl: ~Copyable { } } - // Select cache position buffer for this step (double-buffered) - let cachePosBuffer = step % 2 == 0 ? cachePositionBuffers.0 : cachePositionBuffers.1 + // Select cache position buffer for this step (pipeline-depth-matched rotation) + let cachePosBuffer = cachePositionBuffers[step % pipelineDepth] let posLength = processedTokenCount + queryLength // Build Inputs as AsyncValue (from MTLBuffers) let tokenShape = [1, queryLength] let tokenStrides = try resolvedStrides(descriptor: inputIdsBaseDesc, shape: tokenShape) - let tokenValue = unsafe InferenceFunction.AsyncValue( - unsafeBuffer: inputTokensBuffer, - byteOffset: tokens.isEmpty ? 0 : tokenByteOffset, - scalarType: .int32, - shape: tokenShape, - strides: tokenStrides - ) + let tokenValue: InferenceFunction.AsyncValue + if tokens.isEmpty { + // Decode: read input token from previous step's decode output buffer + tokenValue = unsafe InferenceFunction.AsyncValue( + unsafeBuffer: decodeOutputBuffers[(step + pipelineDepth - 1) % pipelineDepth], + byteOffset: 0, + scalarType: .int32, + shape: tokenShape, + strides: tokenStrides + ) + } else { + // Prefill: read from inputTokensBuffer at natural position + tokenValue = unsafe InferenceFunction.AsyncValue( + unsafeBuffer: inputTokensBuffer, + byteOffset: tokenByteOffset, + scalarType: .int32, + shape: tokenShape, + strides: tokenStrides + ) + } let posShape = [1, posLength] let posStrides = try resolvedStrides(descriptor: positionIdsBaseDesc, shape: posShape) let posValue = unsafe InferenceFunction.AsyncValue( @@ -698,11 +740,12 @@ private struct EngineImpl: ~Copyable { asyncStates.insert(&valState, for: valueCacheName) // Build Output as AsyncMutableValue (logits) - let logitsBuffer = logits.metalBuffer + // Decode uses per-step rotating buffer; prefill uses the shared growing buffer. + let logitsOutputBuffer = tokens.isEmpty ? decodeLogitsBuffers[step % pipelineDepth] : logits.metalBuffer let logitsShape = [1, queryLength, vocabSize] let logitsStrides = try resolvedStrides(descriptor: logitsBaseDesc, shape: logitsShape) var logitsOutput = unsafe InferenceFunction.AsyncMutableValue( - unsafeBuffer: logitsBuffer, + unsafeBuffer: logitsOutputBuffer, byteOffset: 0, scalarType: .float16, shape: logitsShape, @@ -731,7 +774,8 @@ private struct EngineImpl: ~Copyable { // GPU sampling via Metal queue let localGPUSampler = gpuSampler - let outputBuffer = inputTokensBuffer + let outputBuffer = decodeOutputBuffers[step % pipelineDepth] + let samplerLogitsBuffer = tokens.isEmpty ? decodeLogitsBuffers[step % pipelineDepth] : logits.metalBuffer let logitsOffset = (actualTokenCount - 1) * vocabSize * MemoryLayout.size let samplerStrategy = gpuSampler is MPSGraphArgmaxSampler ? "GPU-argmax" : "GPU-composite" let samplerTemperature = cachedSamplerTemperature ?? 0.0 @@ -757,7 +801,7 @@ private struct EngineImpl: ~Copyable { if queryLength == 1 { localGPUSampler.encode( to: queue, - logitsBuffer: logitsBuffer, + logitsBuffer: samplerLogitsBuffer, logitsOffset: logitsOffset, outputBuffer: outputBuffer, outputOffset: 0, @@ -766,7 +810,7 @@ private struct EngineImpl: ~Copyable { } else { localGPUSampler.encodeWithSlice( to: queue, - logitsBuffer: logitsBuffer, + logitsBuffer: samplerLogitsBuffer, queryLength: actualTokenCount, outputBuffer: outputBuffer, outputOffset: 0, @@ -969,7 +1013,7 @@ private struct EngineImpl: ~Copyable { ptr[processedTokenCount + i] = token } - let cachePosBuffer = step % 2 == 0 ? cachePositionBuffers.0 : cachePositionBuffers.1 + let cachePosBuffer = cachePositionBuffers[step % pipelineDepth] let posLength = processedTokenCount + queryLength // Build async values and encode @@ -1076,7 +1120,7 @@ private struct EngineImpl: ~Copyable { let ptr = inputTokensBuffer.contents().bindMemory(to: Int32.self, capacity: shape) for i in 0...size do { let queue = pipelineQueue warmupSampler.encode( to: queue, - logitsBuffer: logitsBuffer, + logitsBuffer: warmupLogitsBuffer, logitsOffset: logitsOffset, - outputBuffer: outputBuffer, + outputBuffer: warmupOutputBuffer, outputOffset: 0, completion: { _ in } ) diff --git a/swift/Sources/CoreAILanguageModels/LanguageModel/CoreAILanguageModel.swift b/swift/Sources/CoreAILanguageModels/LanguageModel/CoreAILanguageModel.swift index 696da7b..f970446 100644 --- a/swift/Sources/CoreAILanguageModels/LanguageModel/CoreAILanguageModel.swift +++ b/swift/Sources/CoreAILanguageModels/LanguageModel/CoreAILanguageModel.swift @@ -17,7 +17,7 @@ import Tokenizers /// /// ## Engine Selection /// The engine type is determined by `EngineFactory` based on model structure: -/// - **Pipelined**: GPU-accelerated with double buffering (fastest for GPU models) +/// - **Pipelined**: GPU-accelerated with pipeline-depth-matched buffering (fastest for GPU models) /// - **Sequential**: CPU-based synchronous execution (fallback) /// - **Static-shape**: Neural Engine optimized for chunked static models /// From 83ff4371f6670837c474bf4ec3d5efe1acff47c7 Mon Sep 17 00:00:00 2001 From: sukru tikves Date: Thu, 18 Jun 2026 12:51:54 -0700 Subject: [PATCH 2/2] Format changes --- .../InferenceEngines/CoreAIPipelinedEngine.swift | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/swift/Sources/CoreAILanguageModels/InferenceEngines/CoreAIPipelinedEngine.swift b/swift/Sources/CoreAILanguageModels/InferenceEngines/CoreAIPipelinedEngine.swift index 906124a..b57a522 100644 --- a/swift/Sources/CoreAILanguageModels/InferenceEngines/CoreAIPipelinedEngine.swift +++ b/swift/Sources/CoreAILanguageModels/InferenceEngines/CoreAIPipelinedEngine.swift @@ -514,7 +514,8 @@ private struct EngineImpl: ~Copyable { var decodeOutBuffers: [MTLBuffer] = [] for _ in 0...size, options: .storageModeShared) else { - throw InferenceRuntimeError.bufferAllocationFailed("decodeOutputBuffer (\(MemoryLayout.size) bytes)") + throw InferenceRuntimeError.bufferAllocationFailed( + "decodeOutputBuffer (\(MemoryLayout.size) bytes)") } decodeOutBuffers.append(buf) }