diff --git a/src/main/scala/de/otto/anthology/DomainLinkingStage.scala b/src/main/scala/de/otto/anthology/DomainLinkingStage.scala index f8ac355..4a98130 100644 --- a/src/main/scala/de/otto/anthology/DomainLinkingStage.scala +++ b/src/main/scala/de/otto/anthology/DomainLinkingStage.scala @@ -49,7 +49,6 @@ object DomainLinkingStage extends LazyLogging: case (Some(qaid), pass) => try - logger.info(s"Fetching domain aggregate from state store (${StateStoreSection.DOM}/$qaid)...") val aggregateOpt: Option[Aggregate] = stateStore .getJson(s"${StateStoreSection.DOM}/$qaid") @@ -61,8 +60,6 @@ object DomainLinkingStage extends LazyLogging: // when aggregates on both ends are deleted // TODO only when "many"-side was deleted? - logger.info(s"Domain aggregate $qaid was deleted. Try processing deletion now...") - try val linkKey = s"${StateStoreSection.LNK}/$qaid" val linkValues = @@ -104,10 +101,6 @@ object DomainLinkingStage extends LazyLogging: case Some(aggregate) => - logger.info( - s"Domain aggregate $qaid found. Try setting links to other domain aggregates now..." - ) - try val parsedDoc: DocumentContext = jsonPathContext.parse(aggregate.toJson) diff --git a/src/main/scala/de/otto/anthology/DomainSource.scala b/src/main/scala/de/otto/anthology/DomainSource.scala index 5ebe830..96799a1 100644 --- a/src/main/scala/de/otto/anthology/DomainSource.scala +++ b/src/main/scala/de/otto/anthology/DomainSource.scala @@ -36,7 +36,7 @@ object DomainSource extends LazyLogging: aggregateOpt: Option[Aggregate], aggregateConfigs: Seq[AggregateConfig] ): Option[AggregateConfig] = - if aggregateConfigs.size == 1 then aggregateConfigs.headOption + if aggregateConfigs.size == 1 && aggregateConfigs.head.recognitionPath.isEmpty then aggregateConfigs.headOption else aggregateOpt match case None => diff --git a/src/test/scala/de/otto/anthology/DomainSourceTest.scala b/src/test/scala/de/otto/anthology/DomainSourceTest.scala index cd6e2c9..ce7f493 100644 --- a/src/test/scala/de/otto/anthology/DomainSourceTest.scala +++ b/src/test/scala/de/otto/anthology/DomainSourceTest.scala @@ -161,3 +161,58 @@ class DomainSourceTest extends AnyFlatSpec, Matchers, Diagrams, EmbeddedKafka, B _._1 == QualifiedAggregateId(DomainName("domain-x"), AggregateName("Agg-A"), AggregateId("3")) ) ) + + it should "recognise one aggregate config and ignore the rest" in: + // given + val cluster = ClusterName("cluster-x") + val topic = TopicName("topic-domain-x") + val group = "group" + + publishToKafka(topic.toString, "1", """{ "id": "1", "foo": "barA" }""") + publishToKafka(topic.toString, "2", """{ "id": "2", "foo": "barB" }""") + publishToKafka(topic.toString, "3", """{ "id": "3", "foo": "barC" }""") + + // when + val sourceConfig = KafkaSourceConfig(cluster, topic, group) + + val aggregateConfigA = + AggregateConfig(AggregateName("Agg-A"), Some(JsonPath.compile("$[?(@.foo == 'barA')]")), None, None, None) + + val domainConfig = + DomainConfig( + DomainName("domain-x"), + sourceConfig, + Seq(aggregateConfigA) + ) + + supervised: + val consumerSettings: ConsumerSettings[AggregateId, Option[Aggregate]] = + ConsumerSettings + .default(domainConfig.kafka.consumerGroup) + .bootstrapServers(bootstrapServer.split(",").map(_.trim)*) + .keyDeserializer(AggregateIdDeserializer) + .valueDeserializer(AggregateDeserializer) + .autoOffsetReset(AutoOffsetReset.Earliest) + val consumer = consumerSettings.toThreadSafeConsumerWrapper + val sourceSettings = KafkaSourceSettings(sourceConfig, ConsumerName(domainConfig.name.toString), consumer) + val domainSource = DomainSource(domainConfig, sourceSettings) + + val channel = domainSource.runToChannel() + + // then + val result1: (Option[(QualifiedAggregateId, Option[Aggregate])], Passthrough) = channel.receive() + assert( + result1._1.exists( + _._1 == QualifiedAggregateId(DomainName("domain-x"), AggregateName("Agg-A"), AggregateId("1")) + ) + ) + + val result2: (Option[(QualifiedAggregateId, Option[Aggregate])], Passthrough) = channel.receive() + assert( + result2._1.isEmpty + ) + + val result3: (Option[(QualifiedAggregateId, Option[Aggregate])], Passthrough) = channel.receive() + assert( + result3._1.isEmpty + )