Skip to content

added kafka subscriber to observability package#2863

Open
sacrana0 wants to merge 1 commit into
thunder-id:mainfrom
Infosys:feature/2708
Open

added kafka subscriber to observability package#2863
sacrana0 wants to merge 1 commit into
thunder-id:mainfrom
Infosys:feature/2708

Conversation

@sacrana0
Copy link
Copy Markdown
Contributor

@sacrana0 sacrana0 commented May 20, 2026

Purpose

Added support for a subscriber to publish events to Kafka.

Approach

Related Issues

Related PRs

  • N/A

Checklist

  • Followed the contribution guidelines.
  • [ x Manual test round performed and verified.
  • Documentation provided. (Add links if there are any)
    • Ran Vale and fixed all errors and warnings
  • Tests provided. (Add links if there are any)
    • Unit Tests
    • Integration Tests
  • Breaking changes. (Fill if applicable)
    • Breaking changes section filled.
    • breaking change label added.

Security checks

  • Followed secure coding standards in WSO2 Secure Coding Guidelines
  • Confirmed that this PR doesn't commit any keys, passwords, tokens, usernames, or other secrets.

Summary by CodeRabbit

  • New Features
    • Added Kafka observability output and subscriber: enable via config, validate brokers/topic, configure format/retries/timeouts/client ID, route by categories, and generate subscriber IDs.
  • Tests
    • Added unit tests covering Kafka output adapter and subscriber behavior, lifecycle, error cases, and factory registration.
  • Chores
    • Updated Go module dependency versions.

Review Change Stack

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 20, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro Plus

Run ID: fd3536b4-7b05-4413-a4a2-68ba7d60b8f0

📥 Commits

Reviewing files that changed from the base of the PR and between 6a1f48e and 1f1c8a8.

⛔ Files ignored due to path filters (1)
  • backend/go.sum is excluded by !**/*.sum
📒 Files selected for processing (7)
  • backend/go.mod
  • backend/internal/system/config/config.go
  • backend/internal/system/observability/adapter/init.go
  • backend/internal/system/observability/adapter/kafka_adapter.go
  • backend/internal/system/observability/adapter/kafka_adapter_test.go
  • backend/internal/system/observability/subscriber/kafka_subscriber.go
  • backend/internal/system/observability/subscriber/kafka_subscriber_test.go
✅ Files skipped from review due to trivial changes (1)
  • backend/go.mod

📝 Walkthrough

Walkthrough

Adds Kafka observability output: config schema (ObservabilityKafkaConfig), Sarama async kafkaAdapter with error-drain and graceful shutdown, InitializeKafkaAdapter wiring, a KafkaSubscriber to republish events to Kafka, and comprehensive unit tests.

Changes

Kafka Observability Output Sink

Layer / File(s) Summary
Configuration Schema
backend/internal/system/config/config.go
New ObservabilityKafkaConfig struct defines Kafka sink settings and is attached to ObservabilityOutputConfig to enable YAML/JSON configuration.
Dependency updates (go.mod)
backend/go.mod
Updated indirect and direct Go module versions required by the new Kafka adapter and its tests.
Kafka Adapter implementation and tests
backend/internal/system/observability/adapter/kafka_adapter.go, backend/internal/system/observability/adapter/kafka_adapter_test.go
Sarama AsyncProducer-based kafkaAdapter with validation, producer config, error-drain goroutine, mutex-guarded writes after close, no-op Flush, Close with AsyncClose and timeout wait, GetName, and unit tests verifying config validation, publish semantics, close/idempotency, and name.
Adapter Initialization Wiring
backend/internal/system/observability/adapter/init.go
Exported InitializeKafkaAdapter wires configuration to newKafkaAdapter using sarama.NewAsyncProducer.
Kafka Subscriber Implementation and tests
backend/internal/system/observability/subscriber/kafka_subscriber.go, backend/internal/system/observability/subscriber/kafka_subscriber_test.go
KafkaSubscriber registers a "kafka" factory, gates enablement via config, validates brokers/topic at Initialize, sets up formatter and adapter, computes categories (defaults to CategoryAll), generates a UUIDv7 subscriber ID, forwards events via OnEvent to the adapter with "kafka" tag, manages Close/Flush, and includes tests using a stubAdapter to verify behavior and factory registration.

Sequence Diagrams

sequenceDiagram
  participant Adapter as kafkaAdapter
  participant Producer as sarama.AsyncProducer
  participant Errors as Producer.Errors()
  participant Drain as ErrorDrainGoroutine

  Adapter->>Producer: NewAsyncProducer(brokers, config)
  Adapter->>Drain: start error drain goroutine
  Drain->>Errors: range over Errors()

  Note over Adapter,Producer: runtime
  Adapter->>Producer: Input() <- ProducerMessage (topic, bytes)
  Errors->>Drain: send error (if any)
  Drain->>Adapter: log error with topic

  Note over Adapter,Producer: shutdown
  Adapter->>Producer: AsyncClose()
  Drain->>Drain: exit when Errors() closed
  Adapter->>Adapter: wait for done or timeout
Loading
sequenceDiagram
  participant Client as Client Code
  participant Subscriber as KafkaSubscriber
  participant Formatter as Formatter
  participant Adapter as kafkaAdapter
  participant Kafka as Kafka Broker

  Client->>Subscriber: Initialize()
  Subscriber->>Subscriber: validate brokers & topic
  Subscriber->>Adapter: InitializeKafkaAdapter(cfg)
  Subscriber->>Subscriber: generate UUIDv7 ID

  Client->>Subscriber: OnEvent(evt)
  Subscriber->>Formatter: format evt -> bytes
  Subscriber->>Adapter: Write(bytes)  (topic)
  Adapter->>Kafka: async publish

  Client->>Subscriber: Close()
  Subscriber->>Adapter: Flush()
  Subscriber->>Adapter: Close()
  Adapter->>Kafka: AsyncClose producer
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related issues

🚥 Pre-merge checks | ✅ 3 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 16.67% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
Description check ❓ Inconclusive The description addresses the Purpose section but the Approach section is empty, lacking implementation details about design decisions. Documentation links are missing, and the checklist shows inconsistent checkbox formatting. Fill in the Approach section with implementation details and key design decisions. Fix checkbox formatting inconsistencies and provide documentation links if applicable.
✅ Passed checks (3 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately summarizes the main change: adding a Kafka subscriber to the observability package. It is concise, clear, and directly reflects the primary purpose of the changeset.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Comment @coderabbitai help to get the list of available commands and usage tips.

@github-actions
Copy link
Copy Markdown

Dependency Validation Results

Dependency name: golang.org/x/crypto
Version: v0.51.0 (was v0.46.0)
Allowed range: >=v0.31.0
Approved: ✅ Yes

Dependency name: golang.org/x/text
Version: v0.37.0 (was v0.32.0)
Allowed range: >=v0.31.0
Approved: ✅ Yes

⚠️ Please verify the scope of the dependencies usage is necessary

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@backend/internal/system/observability/adapter/kafka_adapter.go`:
- Around line 103-115: The Write method can race with Close and panic by sending
to the producer Input after the adapter is marked closed; to fix it, make the
closed check and the send atomic by keeping ka.mu locked across the check and
the send: in kafkaAdapter.Write, remove the early ka.mu.Unlock() and perform the
ka.producer.Input() <- ... while still holding ka.mu, then unlock; reference the
kafkaAdapter.Write method, ka.mu, ka.closed and ka.producer.Input() so reviewers
can locate and update the code (this guarantees the check and send are
serialized with Close which calls AsyncClose()).

In `@backend/internal/system/observability/subscriber/kafka_subscriber.go`:
- Around line 128-143: KafkaSubscriber.Close uses ks.logger without checking for
nil which can panic if Close is called before Initialize; update Close (method
KafkaSubscriber.Close) to guard all logger calls by checking ks.logger != nil
before calling ks.logger.Info or ks.logger.Error, and ensure the early adapter
flush/close logic still runs (keep the existing ks.adapter != nil checks) but
replace direct logger usage at the start and end of the method (and inside error
branches) with conditional logging so Close is safe when ks.logger is nil (do
not change adapter behavior or return values).
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro Plus

Run ID: 801ec88a-a6e5-48f6-93d9-7192897056a1

📥 Commits

Reviewing files that changed from the base of the PR and between 0ee2b0d and 11c8c8b.

⛔ Files ignored due to path filters (1)
  • backend/go.sum is excluded by !**/*.sum
📒 Files selected for processing (7)
  • backend/go.mod
  • backend/internal/system/config/config.go
  • backend/internal/system/observability/adapter/init.go
  • backend/internal/system/observability/adapter/kafka_adapter.go
  • backend/internal/system/observability/adapter/kafka_adapter_test.go
  • backend/internal/system/observability/subscriber/kafka_subscriber.go
  • backend/internal/system/observability/subscriber/kafka_subscriber_test.go

Comment thread backend/internal/system/observability/adapter/kafka_adapter.go
@github-actions
Copy link
Copy Markdown

Dependency Validation Results

Dependency name: golang.org/x/crypto
Version: v0.51.0 (was v0.46.0)
Allowed range: >=v0.31.0
Approved: ✅ Yes

Dependency name: golang.org/x/text
Version: v0.37.0 (was v0.32.0)
Allowed range: >=v0.31.0
Approved: ✅ Yes

⚠️ Please verify the scope of the dependencies usage is necessary

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@backend/internal/system/config/config.go`:
- Around line 325-326: Add documentation for the new Kafka observability config
and the new agent.store option: document observability.output.kafka and all its
keys (enabled, brokers, topic, client_id, format, retries, timeout, categories)
with example YAML and explain initialization requirements (must have non-empty
brokers and topic), failure/close semantics, and category filtering behavior in
the observability config guide (e.g.,
docs/content/guides/getting-started/configuration.mdx or observability guide);
also document the new top-level agent.store accepted values and fallback
semantics alongside other *.store config docs so users know precedence and
defaults. Reference the new config struct ObservabilityKafkaConfig and the
agent.store option in the docs, and ensure a single consolidated docs update
covers the listed changed sections.

In `@backend/internal/system/observability/adapter/kafka_adapter.go`:
- Around line 103-115: In kafkaAdapter.Write, avoid blocking forever on
ka.producer.Input() by replacing the direct send with a timeout-based select:
check ka.closed under ka.mu as you already do, then use a select that attempts
to send the *sarama.ProducerMessage to ka.producer.Input(), a case for
<-time.After(timeout) (use cfg.Timeout if appropriate, otherwise introduce a
clear constant like producerEnqueueTimeout), and a case to detect if the adapter
was closed (e.g., a non-blocking check of ka.closed under ka.mu or a separate
closed channel). On timeout return a descriptive error (e.g., "kafka producer
input enqueue timed out"); keep the mutex usage minimal (only for checking
closed) and do not hold it while blocking on the select.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro Plus

Run ID: 88c10d7f-8b8f-40b3-b157-87eea22bd528

📥 Commits

Reviewing files that changed from the base of the PR and between 11c8c8b and 69d6357.

⛔ Files ignored due to path filters (1)
  • backend/go.sum is excluded by !**/*.sum
📒 Files selected for processing (7)
  • backend/go.mod
  • backend/internal/system/config/config.go
  • backend/internal/system/observability/adapter/init.go
  • backend/internal/system/observability/adapter/kafka_adapter.go
  • backend/internal/system/observability/adapter/kafka_adapter_test.go
  • backend/internal/system/observability/subscriber/kafka_subscriber.go
  • backend/internal/system/observability/subscriber/kafka_subscriber_test.go

Comment thread backend/internal/system/config/config.go
Comment on lines +103 to +115
func (ka *kafkaAdapter) Write(data []byte) error {
ka.mu.Lock()
if ka.closed {
ka.mu.Unlock()
return fmt.Errorf("kafka adapter is closed")
}
ka.mu.Unlock()

ka.producer.Input() <- &sarama.ProducerMessage{
Topic: ka.topic,
Value: sarama.ByteEncoder(data),
}
return nil
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot May 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🌐 Web query:

For github.com/IBM/sarama v1.49.0 AsyncProducer, does sending to producer.Input() block when the channel is full, and what is the recommended way to avoid indefinite blocking in producer write paths?

💡 Result:

Yes, sending a message to the AsyncProducer.Input channel in Sarama will block if the channel is full [1][2]. The Input channel is essentially an unbuffered channel [2], and the producer's internal processing must be able to keep up with your ingestion rate to prevent blocking [3][2]. To avoid indefinite blocking in your producer write paths, you should implement one of the following strategies: 1. Non-blocking Writes with Select: Use a select statement with a default case when sending to the Input channel. This allows you to handle cases where the producer is overloaded by dropping the message or logging a warning instead of blocking your application [2]. select { case producer.Input <- msg: // Message successfully queued default: // Channel is full; handle as needed (e.g., drop, buffer, or retry) } 2. Timeout-based Writes: Instead of an immediate default, use a time.After or context.WithTimeout within a select block. This allows the application to wait for a short, defined period for space to become available before giving up, providing a compromise between immediate dropping and indefinite blocking [2]. 3. Monitor and Drain Channels: Deadlocks can occur if you do not actively drain the Errors and Successes channels [1]. Even if you do not need to process successes, the Successes channel must be consumed if Return.Successes is enabled in your configuration [1]. Similarly, the Errors channel must be drained to prevent the producer from deadlocking when the internal error buffer fills up [1]. It is also important to note that performance bottlenecks often stem from misconfiguration or improper handling of these return channels [1][3]. Always ensure that you are calling Close or AsyncClose on the producer to prevent memory leaks and ensure that all in-flight messages are flushed correctly [1].

Citations:


🏁 Script executed:

cat -n backend/internal/system/observability/adapter/kafka_adapter.go

Repository: thunder-id/thunderid

Length of output: 5020


🏁 Script executed:

grep -A2 "github.com/IBM/sarama" go.mod

Repository: thunder-id/thunderid

Length of output: 104


🏁 Script executed:

find . -name "go.mod" -type f

Repository: thunder-id/thunderid

Length of output: 138


🏁 Script executed:

grep "github.com/IBM/sarama" backend/go.mod

Repository: thunder-id/thunderid

Length of output: 107


Add timeout to producer input write to prevent indefinite blocking on channel saturation.

The ka.producer.Input() <- ... send blocks indefinitely when the async producer input channel is saturated (e.g., due to broker/network degradation). Per Sarama v1.49.0 documentation, the Input channel blocks when full; you must implement a timeout-based select pattern to avoid stalling caller request paths.

While error draining is already correctly implemented (line 84), the Write path needs bounded enqueue behavior. Use cfg.Timeout (which exists but is currently only applied to network timeouts) or a new constant for the Input channel write timeout.

💡 Proposed change (timeout-based enqueue)
 type kafkaAdapter struct {
 	producer sarama.AsyncProducer
 	topic    string
 	logger   *log.Logger
 	done     chan struct{}
+	writeTimeout time.Duration
 	mu       sync.Mutex
 	closed   bool
 }
@@
 	ka := &kafkaAdapter{
 		producer: producer,
 		topic:    cfg.Topic,
 		logger:   logger,
 		done:     make(chan struct{}),
+		writeTimeout: func() time.Duration {
+			if cfg.Timeout > 0 {
+				return cfg.Timeout
+			}
+			return 5 * time.Second
+		}(),
 	}
@@
-	ka.producer.Input() <- &sarama.ProducerMessage{
-		Topic: ka.topic,
-		Value: sarama.ByteEncoder(data),
-	}
-	return nil
+	msg := &sarama.ProducerMessage{
+		Topic: ka.topic,
+		Value: sarama.ByteEncoder(data),
+	}
+
+	select {
+	case ka.producer.Input() <- msg:
+		return nil
+	case <-time.After(ka.writeTimeout):
+		return fmt.Errorf("timed out enqueueing kafka message")
+	}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/internal/system/observability/adapter/kafka_adapter.go` around lines
103 - 115, In kafkaAdapter.Write, avoid blocking forever on ka.producer.Input()
by replacing the direct send with a timeout-based select: check ka.closed under
ka.mu as you already do, then use a select that attempts to send the
*sarama.ProducerMessage to ka.producer.Input(), a case for <-time.After(timeout)
(use cfg.Timeout if appropriate, otherwise introduce a clear constant like
producerEnqueueTimeout), and a case to detect if the adapter was closed (e.g., a
non-blocking check of ka.closed under ka.mu or a separate closed channel). On
timeout return a descriptive error (e.g., "kafka producer input enqueue timed
out"); keep the mutex usage minimal (only for checking closed) and do not hold
it while blocking on the select.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't want to drop published audits/logs/events so its ok for it to block. Will revisit if we decide to change it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sacrana0, understood — backpressure semantics make sense here since losing audit/observability events would be worse than momentary caller blocking. I'll keep this in mind for future reviews of this adapter.

(˳˘ ɜ˘) 🐇


✏️ Learnings added
Learnt from: sacrana0
Repo: thunder-id/thunderid PR: 2863
File: backend/internal/system/observability/adapter/kafka_adapter.go:103-115
Timestamp: 2026-05-22T09:13:41.634Z
Learning: In `backend/internal/system/observability/adapter/kafka_adapter.go`, the `Write` method intentionally blocks on `ka.producer.Input() <- msg` without a timeout. This is by design: the team prefers backpressure over dropping audit/log/observability events. Do not flag this as a bug or suggest a timeout-based select pattern for this file.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


🧠 Learnings used
Learnt from: KaveeshaPiumini
Repo: thunder-id/thunderid PR: 2835
File: backend/internal/agent/declarative_resource.go:21-39
Timestamp: 2026-05-20T05:39:33.413Z
Learning: Do not treat importing a package named "testing" in production Go files as an anti-pattern in this codebase. For .go files that are not *_test.go, this convention is used to expose *ForTest constructors (e.g., NewAgentExporterForTest, NewApplicationExporterForTest) that are guarded by a call like testing.Testing() and panic when not running under tests. When reviewing, only flag importing "testing" in non-test files if it is not part of this established *ForTest/guard+panic convention.

@github-actions
Copy link
Copy Markdown

Dependency Validation Results

Dependency name: golang.org/x/crypto
Version: v0.51.0 (was v0.46.0)
Allowed range: >=v0.31.0
Approved: ✅ Yes

Dependency name: golang.org/x/text
Version: v0.37.0 (was v0.32.0)
Allowed range: >=v0.31.0
Approved: ✅ Yes

⚠️ Please verify the scope of the dependencies usage is necessary

Signed-off-by: Sachin Rana <sacrana324@gmail.com>
@github-actions
Copy link
Copy Markdown

Dependency Validation Results

Dependency name: golang.org/x/crypto
Version: v0.51.0 (was v0.46.0)
Allowed range: >=v0.31.0
Approved: ✅ Yes

Dependency name: golang.org/x/text
Version: v0.37.0 (was v0.32.0)
Allowed range: >=v0.31.0
Approved: ✅ Yes

⚠️ Please verify the scope of the dependencies usage is necessary

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant