Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/scala/com/tesobe/obp/adapter/AdapterMain.scala
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ object AdapterMain extends IOApp {
healthResult <- localAdapter.checkHealth(
com.tesobe.obp.adapter.models.CallContext(
correlationId = "startup-health-check",
sessionId = "startup",
sessionId = Some("startup"),
userId = None,
username = None,
consumerId = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,15 @@ import com.rabbitmq.client.{
import com.tesobe.obp.adapter.config.AdapterConfig
import java.nio.charset.StandardCharsets

/** Message envelope containing body and RabbitMQ properties
*/
case class MessageEnvelope(
body: String,
messageId: String,
correlationId: Option[String],
replyTo: Option[String]
)

/** Simple RabbitMQ client wrapper using the Java client library
*
* This provides basic publish/consume functionality without the complexity of
Expand Down Expand Up @@ -71,25 +80,26 @@ class RabbitMQClient(config: AdapterConfig) {
}.void
}

/** Publish a message to a queue with process as messageId
/** Publish a message to a queue with optional correlationId for RPC responses
*/
def publishMessage(
channel: Channel,
queueName: String,
message: String,
process: Option[String] = None
process: Option[String] = None,
correlationId: Option[String] = None
): IO[Unit] = {
IO {
val propsBuilder = new com.rabbitmq.client.AMQP.BasicProperties.Builder()
var propsBuilder = new com.rabbitmq.client.AMQP.BasicProperties.Builder()
.contentType("application/json")

// Add process as messageId property (matching OBP-API behavior)
val props = process match {
case Some(p) =>
propsBuilder.messageId(p).build()
case None =>
propsBuilder.build()
}
process.foreach(p => propsBuilder = propsBuilder.messageId(p))

// Add correlationId for RPC response matching
correlationId.foreach(cid => propsBuilder = propsBuilder.correlationId(cid))

val props = propsBuilder.build()

channel.basicPublish(
"", // exchange (empty = default)
Expand All @@ -101,12 +111,12 @@ class RabbitMQClient(config: AdapterConfig) {
}

/** Consume messages from a queue with a callback Returns an IO that will run
* forever, processing messages Handler receives: (message, routingKey)
* forever, processing messages Handler receives: MessageEnvelope with body, messageId, correlationId, and replyTo
*/
def consumeMessages(
channel: Channel,
queueName: String,
handler: (String, String) => IO[Unit]
handler: MessageEnvelope => IO[Unit]
): IO[Unit] = {
IO {
// Set prefetch count
Expand All @@ -117,13 +127,18 @@ class RabbitMQClient(config: AdapterConfig) {

val deliverCallback: DeliverCallback = (consumerTag, delivery) => {
val message = new String(delivery.getBody, StandardCharsets.UTF_8)

// Extract process from messageId property (matching OBP-API behavior)
val process = Option(delivery.getProperties.getMessageId)
.getOrElse("unknown")
val props = delivery.getProperties

// Extract properties from message
val envelope = MessageEnvelope(
body = message,
messageId = Option(props.getMessageId).getOrElse("unknown"),
correlationId = Option(props.getCorrelationId),
replyTo = Option(props.getReplyTo)
)

val processAndAck = for {
_ <- handler(message, process)
_ <- handler(envelope)
_ <- IO(channel.basicAck(delivery.getEnvelope.getDeliveryTag, false))
} yield ()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import scala.concurrent.duration._
/** RabbitMQ consumer for OBP messages
*
* Consumes messages from the request queue, processes them via local adapter,
* and sends responses to the response queue.
* and sends responses to the replyTo queue (RPC pattern) or fallback response queue.
*/
object RabbitMQConsumer {

Expand All @@ -45,6 +45,7 @@ object RabbitMQConsumer {
_ <- IO.println(
s"[RabbitMQ] Connected to ${config.rabbitmq.host}:${config.rabbitmq.port}"
)
_ <- IO.println(s"[INFO] RabbitMQ connected: ${config.rabbitmq.host}:${config.rabbitmq.port}")
_ <- telemetry.recordRabbitMQConnected(
config.rabbitmq.host,
config.rabbitmq.port
Expand All @@ -59,21 +60,21 @@ object RabbitMQConsumer {
_ <- telemetry.recordQueueConsumptionStarted(
config.queue.requestQueue
)
_ <- IO.println(s"[INFO] Queue consumption started: ${config.queue.requestQueue}")
_ <- IO.println(
s"[OK] Consuming from queue: ${config.queue.requestQueue}"
)
_ <- IO.println("")

// Start consuming messages
// Start consuming messages with MessageEnvelope
_ <- client.consumeMessages(
channel,
config.queue.requestQueue,
(message, routingKey) =>
envelope =>
processMessage(
client,
channel,
message,
routingKey,
envelope,
config,
localAdapter,
telemetry,
Expand All @@ -97,16 +98,24 @@ object RabbitMQConsumer {
private def processMessage(
client: RabbitMQClient,
channel: com.rabbitmq.client.Channel,
messageJson: String,
process: String,
envelope: MessageEnvelope,
config: AdapterConfig,
localAdapter: LocalAdapter,
telemetry: Telemetry,
redis: Option[dev.profunktor.redis4cats.RedisCommands[IO, String, String]]
): IO[Unit] = {
val startTime = System.currentTimeMillis()
val messageJson = envelope.body
val process = envelope.messageId

// Use replyTo from message properties if present, otherwise fall back to configured response queue
val responseQueue = envelope.replyTo.getOrElse(config.queue.responseQueue)
val rabbitCorrelationId = envelope.correlationId

(for {
// Log incoming message details
_ <- IO.println(s"[DEBUG] Received message - process: $process, replyTo: ${envelope.replyTo}, correlationId: ${envelope.correlationId}")

// Increment outbound counter
_ <- redis match {
case Some(r) => RedisCounter.incrementOutbound(r, process)
Expand Down Expand Up @@ -137,7 +146,7 @@ object RabbitMQConsumer {

// Handle adapter-specific messages or delegate to local adapter
adapterResponse <- process match {
case "obp.getAdapterInfo" =>
case "obp.getAdapterInfo" | "obp_get_adapter_info" =>
handleGetAdapterInfo(localAdapter, callContext)
case _ =>
localAdapter.handleMessage(process, dataFields, callContext)
Expand All @@ -146,8 +155,9 @@ object RabbitMQConsumer {
// Build inbound message
inboundMsg <- buildInboundMessage(outboundMsg, adapterResponse)

// Send response
_ <- sendResponse(client, channel, config.queue.responseQueue, inboundMsg)
// Send response to replyTo queue with correlationId
_ <- IO.println(s"[DEBUG] Sending response to queue: $responseQueue with correlationId: $rabbitCorrelationId")
_ <- sendResponse(client, channel, responseQueue, inboundMsg, rabbitCorrelationId)

// Increment inbound counter
_ <- redis match {
Expand All @@ -172,8 +182,8 @@ object RabbitMQConsumer {
val duration = (System.currentTimeMillis() - startTime).millis
for {
_ <- telemetry.recordMessageFailed(
process = "unknown",
correlationId = "unknown",
process = process,
correlationId = envelope.correlationId.getOrElse("unknown"),
errorCode = "ADAPTER_ERROR",
errorMessage = error.getMessage,
duration = duration
Expand Down Expand Up @@ -206,8 +216,10 @@ object RabbitMQConsumer {
IO.pure(
com.tesobe.obp.adapter.interfaces.LocalAdapterResult.success(
JsonObject(
"name" -> Json.fromString("OBP-Rabbit-Cats-Adapter"),
"version" -> Json.fromString("1.0.0-SNAPSHOT"),
"errorCode" -> Json.fromString(""),
"backendMessages" -> Json.arr(),
"name" -> Json.fromString(s"${localAdapter.name}"),
"version" -> Json.fromString(localAdapter.version),
"git_commit" -> Json.fromString(gitCommit),
"date" -> Json.fromString(java.time.Instant.now().toString)
),
Expand Down Expand Up @@ -263,13 +275,14 @@ object RabbitMQConsumer {
}
}

/** Send response message to response queue
/** Send response message to response queue with correlationId for RPC pattern
*/
private def sendResponse(
client: RabbitMQClient,
channel: com.rabbitmq.client.Channel,
responseQueue: String,
message: InboundMessage
message: InboundMessage,
correlationId: Option[String]
): IO[Unit] = {
for {
// Convert to JSON
Expand All @@ -283,8 +296,9 @@ object RabbitMQConsumer {
)
)

// Publish message
_ <- client.publishMessage(channel, responseQueue, json)
// Publish message with correlationId for RPC response matching
_ <- client.publishMessage(channel, responseQueue, json, correlationId = correlationId)
_ <- IO.println(s"[DEBUG] Response published to $responseQueue")

} yield ()
}
Expand Down
26 changes: 14 additions & 12 deletions src/main/scala/com/tesobe/obp/adapter/models/OBPModels.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import io.circe.generic.semiauto._

/**
* OBP RabbitMQ Message Protocol Models
*
*
* Based on OBP Message Docs: /obp/v6.0.0/message-docs/rabbitmq_vOct2024
*
*
* This file contains ONLY the RabbitMQ message envelope format.
* The actual data payloads are handled as JsonObject to match message docs exactly.
*/
Expand All @@ -37,10 +37,11 @@ object OutboundMessage {

/**
* Context information for outbound messages
* Note: sessionId is optional as it may not always be present in OBP messages
*/
case class OutboundAdapterCallContext(
correlationId: String,
sessionId: String,
sessionId: Option[String],
consumerId: Option[String],
generalContext: Option[List[KeyValue]],
outboundAdapterAuthInfo: Option[OutboundAdapterAuthInfo],
Expand Down Expand Up @@ -185,6 +186,7 @@ object KeyValue {

/**
* Response message sent back to OBP-API via RabbitMQ
* Matches OBP's InBoundTrait structure
*/
case class InboundMessage(
inboundAdapterCallContext: InboundAdapterCallContext,
Expand All @@ -195,13 +197,13 @@ case class InboundMessage(
object InboundMessage {
implicit val decoder: Decoder[InboundMessage] = deriveDecoder
implicit val encoder: Encoder[InboundMessage] = deriveEncoder

/**
* Create success response
*/
def success(
correlationId: String,
sessionId: String,
sessionId: Option[String],
data: JsonObject,
generalContext: List[KeyValue] = Nil,
backendMessages: List[BackendMessage] = Nil
Expand All @@ -217,13 +219,13 @@ object InboundMessage {
),
data = Some(data)
)

/**
* Create error response
*/
def error(
correlationId: String,
sessionId: String,
sessionId: Option[String],
errorCode: String,
errorMessage: String,
generalContext: List[KeyValue] = Nil,
Expand Down Expand Up @@ -251,11 +253,11 @@ object InboundMessage {
}

/**
* Context for inbound messages
* Context for inbound messages - matches OBP's InboundAdapterCallContext
*/
case class InboundAdapterCallContext(
correlationId: String,
sessionId: String,
sessionId: Option[String],
generalContext: Option[List[KeyValue]]
)

Expand Down Expand Up @@ -301,15 +303,15 @@ object BackendMessage {
*/
case class CallContext(
correlationId: String,
sessionId: String,
sessionId: Option[String],
userId: Option[String],
username: Option[String],
consumerId: Option[String],
generalContext: Map[String, String]
)

object CallContext {

/**
* Extract CallContext from OutboundMessage
*/
Expand All @@ -327,4 +329,4 @@ object CallContext {
.toMap
)
}
}
}