diff --git a/src/main/scala/com/tesobe/obp/adapter/AdapterMain.scala b/src/main/scala/com/tesobe/obp/adapter/AdapterMain.scala index 333d059..a989e14 100644 --- a/src/main/scala/com/tesobe/obp/adapter/AdapterMain.scala +++ b/src/main/scala/com/tesobe/obp/adapter/AdapterMain.scala @@ -69,7 +69,7 @@ object AdapterMain extends IOApp { healthResult <- localAdapter.checkHealth( com.tesobe.obp.adapter.models.CallContext( correlationId = "startup-health-check", - sessionId = "startup", + sessionId = Some("startup"), userId = None, username = None, consumerId = None, diff --git a/src/main/scala/com/tesobe/obp/adapter/messaging/RabbitMQClient.scala b/src/main/scala/com/tesobe/obp/adapter/messaging/RabbitMQClient.scala index da8852b..a947b07 100644 --- a/src/main/scala/com/tesobe/obp/adapter/messaging/RabbitMQClient.scala +++ b/src/main/scala/com/tesobe/obp/adapter/messaging/RabbitMQClient.scala @@ -19,6 +19,15 @@ import com.rabbitmq.client.{ import com.tesobe.obp.adapter.config.AdapterConfig import java.nio.charset.StandardCharsets +/** Message envelope containing body and RabbitMQ properties + */ +case class MessageEnvelope( + body: String, + messageId: String, + correlationId: Option[String], + replyTo: Option[String] +) + /** Simple RabbitMQ client wrapper using the Java client library * * This provides basic publish/consume functionality without the complexity of @@ -71,25 +80,26 @@ class RabbitMQClient(config: AdapterConfig) { }.void } - /** Publish a message to a queue with process as messageId + /** Publish a message to a queue with optional correlationId for RPC responses */ def publishMessage( channel: Channel, queueName: String, message: String, - process: Option[String] = None + process: Option[String] = None, + correlationId: Option[String] = None ): IO[Unit] = { IO { - val propsBuilder = new com.rabbitmq.client.AMQP.BasicProperties.Builder() + var propsBuilder = new com.rabbitmq.client.AMQP.BasicProperties.Builder() .contentType("application/json") // Add process as messageId property (matching OBP-API behavior) - val props = process match { - case Some(p) => - propsBuilder.messageId(p).build() - case None => - propsBuilder.build() - } + process.foreach(p => propsBuilder = propsBuilder.messageId(p)) + + // Add correlationId for RPC response matching + correlationId.foreach(cid => propsBuilder = propsBuilder.correlationId(cid)) + + val props = propsBuilder.build() channel.basicPublish( "", // exchange (empty = default) @@ -101,12 +111,12 @@ class RabbitMQClient(config: AdapterConfig) { } /** Consume messages from a queue with a callback Returns an IO that will run - * forever, processing messages Handler receives: (message, routingKey) + * forever, processing messages Handler receives: MessageEnvelope with body, messageId, correlationId, and replyTo */ def consumeMessages( channel: Channel, queueName: String, - handler: (String, String) => IO[Unit] + handler: MessageEnvelope => IO[Unit] ): IO[Unit] = { IO { // Set prefetch count @@ -117,13 +127,18 @@ class RabbitMQClient(config: AdapterConfig) { val deliverCallback: DeliverCallback = (consumerTag, delivery) => { val message = new String(delivery.getBody, StandardCharsets.UTF_8) - - // Extract process from messageId property (matching OBP-API behavior) - val process = Option(delivery.getProperties.getMessageId) - .getOrElse("unknown") + val props = delivery.getProperties + + // Extract properties from message + val envelope = MessageEnvelope( + body = message, + messageId = Option(props.getMessageId).getOrElse("unknown"), + correlationId = Option(props.getCorrelationId), + replyTo = Option(props.getReplyTo) + ) val processAndAck = for { - _ <- handler(message, process) + _ <- handler(envelope) _ <- IO(channel.basicAck(delivery.getEnvelope.getDeliveryTag, false)) } yield () diff --git a/src/main/scala/com/tesobe/obp/adapter/messaging/RabbitMQConsumer.scala b/src/main/scala/com/tesobe/obp/adapter/messaging/RabbitMQConsumer.scala index 871c79b..201eaca 100644 --- a/src/main/scala/com/tesobe/obp/adapter/messaging/RabbitMQConsumer.scala +++ b/src/main/scala/com/tesobe/obp/adapter/messaging/RabbitMQConsumer.scala @@ -24,7 +24,7 @@ import scala.concurrent.duration._ /** RabbitMQ consumer for OBP messages * * Consumes messages from the request queue, processes them via local adapter, - * and sends responses to the response queue. + * and sends responses to the replyTo queue (RPC pattern) or fallback response queue. */ object RabbitMQConsumer { @@ -45,6 +45,7 @@ object RabbitMQConsumer { _ <- IO.println( s"[RabbitMQ] Connected to ${config.rabbitmq.host}:${config.rabbitmq.port}" ) + _ <- IO.println(s"[INFO] RabbitMQ connected: ${config.rabbitmq.host}:${config.rabbitmq.port}") _ <- telemetry.recordRabbitMQConnected( config.rabbitmq.host, config.rabbitmq.port @@ -59,21 +60,21 @@ object RabbitMQConsumer { _ <- telemetry.recordQueueConsumptionStarted( config.queue.requestQueue ) + _ <- IO.println(s"[INFO] Queue consumption started: ${config.queue.requestQueue}") _ <- IO.println( s"[OK] Consuming from queue: ${config.queue.requestQueue}" ) _ <- IO.println("") - // Start consuming messages + // Start consuming messages with MessageEnvelope _ <- client.consumeMessages( channel, config.queue.requestQueue, - (message, routingKey) => + envelope => processMessage( client, channel, - message, - routingKey, + envelope, config, localAdapter, telemetry, @@ -97,16 +98,24 @@ object RabbitMQConsumer { private def processMessage( client: RabbitMQClient, channel: com.rabbitmq.client.Channel, - messageJson: String, - process: String, + envelope: MessageEnvelope, config: AdapterConfig, localAdapter: LocalAdapter, telemetry: Telemetry, redis: Option[dev.profunktor.redis4cats.RedisCommands[IO, String, String]] ): IO[Unit] = { val startTime = System.currentTimeMillis() + val messageJson = envelope.body + val process = envelope.messageId + + // Use replyTo from message properties if present, otherwise fall back to configured response queue + val responseQueue = envelope.replyTo.getOrElse(config.queue.responseQueue) + val rabbitCorrelationId = envelope.correlationId (for { + // Log incoming message details + _ <- IO.println(s"[DEBUG] Received message - process: $process, replyTo: ${envelope.replyTo}, correlationId: ${envelope.correlationId}") + // Increment outbound counter _ <- redis match { case Some(r) => RedisCounter.incrementOutbound(r, process) @@ -137,7 +146,7 @@ object RabbitMQConsumer { // Handle adapter-specific messages or delegate to local adapter adapterResponse <- process match { - case "obp.getAdapterInfo" => + case "obp.getAdapterInfo" | "obp_get_adapter_info" => handleGetAdapterInfo(localAdapter, callContext) case _ => localAdapter.handleMessage(process, dataFields, callContext) @@ -146,8 +155,9 @@ object RabbitMQConsumer { // Build inbound message inboundMsg <- buildInboundMessage(outboundMsg, adapterResponse) - // Send response - _ <- sendResponse(client, channel, config.queue.responseQueue, inboundMsg) + // Send response to replyTo queue with correlationId + _ <- IO.println(s"[DEBUG] Sending response to queue: $responseQueue with correlationId: $rabbitCorrelationId") + _ <- sendResponse(client, channel, responseQueue, inboundMsg, rabbitCorrelationId) // Increment inbound counter _ <- redis match { @@ -172,8 +182,8 @@ object RabbitMQConsumer { val duration = (System.currentTimeMillis() - startTime).millis for { _ <- telemetry.recordMessageFailed( - process = "unknown", - correlationId = "unknown", + process = process, + correlationId = envelope.correlationId.getOrElse("unknown"), errorCode = "ADAPTER_ERROR", errorMessage = error.getMessage, duration = duration @@ -206,8 +216,10 @@ object RabbitMQConsumer { IO.pure( com.tesobe.obp.adapter.interfaces.LocalAdapterResult.success( JsonObject( - "name" -> Json.fromString("OBP-Rabbit-Cats-Adapter"), - "version" -> Json.fromString("1.0.0-SNAPSHOT"), + "errorCode" -> Json.fromString(""), + "backendMessages" -> Json.arr(), + "name" -> Json.fromString(s"${localAdapter.name}"), + "version" -> Json.fromString(localAdapter.version), "git_commit" -> Json.fromString(gitCommit), "date" -> Json.fromString(java.time.Instant.now().toString) ), @@ -263,13 +275,14 @@ object RabbitMQConsumer { } } - /** Send response message to response queue + /** Send response message to response queue with correlationId for RPC pattern */ private def sendResponse( client: RabbitMQClient, channel: com.rabbitmq.client.Channel, responseQueue: String, - message: InboundMessage + message: InboundMessage, + correlationId: Option[String] ): IO[Unit] = { for { // Convert to JSON @@ -283,8 +296,9 @@ object RabbitMQConsumer { ) ) - // Publish message - _ <- client.publishMessage(channel, responseQueue, json) + // Publish message with correlationId for RPC response matching + _ <- client.publishMessage(channel, responseQueue, json, correlationId = correlationId) + _ <- IO.println(s"[DEBUG] Response published to $responseQueue") } yield () } diff --git a/src/main/scala/com/tesobe/obp/adapter/models/OBPModels.scala b/src/main/scala/com/tesobe/obp/adapter/models/OBPModels.scala index 21054a3..cec4275 100644 --- a/src/main/scala/com/tesobe/obp/adapter/models/OBPModels.scala +++ b/src/main/scala/com/tesobe/obp/adapter/models/OBPModels.scala @@ -14,9 +14,9 @@ import io.circe.generic.semiauto._ /** * OBP RabbitMQ Message Protocol Models - * + * * Based on OBP Message Docs: /obp/v6.0.0/message-docs/rabbitmq_vOct2024 - * + * * This file contains ONLY the RabbitMQ message envelope format. * The actual data payloads are handled as JsonObject to match message docs exactly. */ @@ -37,10 +37,11 @@ object OutboundMessage { /** * Context information for outbound messages + * Note: sessionId is optional as it may not always be present in OBP messages */ case class OutboundAdapterCallContext( correlationId: String, - sessionId: String, + sessionId: Option[String], consumerId: Option[String], generalContext: Option[List[KeyValue]], outboundAdapterAuthInfo: Option[OutboundAdapterAuthInfo], @@ -185,6 +186,7 @@ object KeyValue { /** * Response message sent back to OBP-API via RabbitMQ + * Matches OBP's InBoundTrait structure */ case class InboundMessage( inboundAdapterCallContext: InboundAdapterCallContext, @@ -195,13 +197,13 @@ case class InboundMessage( object InboundMessage { implicit val decoder: Decoder[InboundMessage] = deriveDecoder implicit val encoder: Encoder[InboundMessage] = deriveEncoder - + /** * Create success response */ def success( correlationId: String, - sessionId: String, + sessionId: Option[String], data: JsonObject, generalContext: List[KeyValue] = Nil, backendMessages: List[BackendMessage] = Nil @@ -217,13 +219,13 @@ object InboundMessage { ), data = Some(data) ) - + /** * Create error response */ def error( correlationId: String, - sessionId: String, + sessionId: Option[String], errorCode: String, errorMessage: String, generalContext: List[KeyValue] = Nil, @@ -251,11 +253,11 @@ object InboundMessage { } /** - * Context for inbound messages + * Context for inbound messages - matches OBP's InboundAdapterCallContext */ case class InboundAdapterCallContext( correlationId: String, - sessionId: String, + sessionId: Option[String], generalContext: Option[List[KeyValue]] ) @@ -301,7 +303,7 @@ object BackendMessage { */ case class CallContext( correlationId: String, - sessionId: String, + sessionId: Option[String], userId: Option[String], username: Option[String], consumerId: Option[String], @@ -309,7 +311,7 @@ case class CallContext( ) object CallContext { - + /** * Extract CallContext from OutboundMessage */ @@ -327,4 +329,4 @@ object CallContext { .toMap ) } -} \ No newline at end of file +}