Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions src/main/scala/de/otto/anthology/DomainLinkingStage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ object DomainLinkingStage extends LazyLogging:

case (Some(qaid), pass) =>
try
logger.info(s"Fetching domain aggregate from state store (${StateStoreSection.DOM}/$qaid)...")
val aggregateOpt: Option[Aggregate] =
stateStore
.getJson(s"${StateStoreSection.DOM}/$qaid")
Expand All @@ -61,8 +60,6 @@ object DomainLinkingStage extends LazyLogging:
// when aggregates on both ends are deleted
// TODO only when "many"-side was deleted?

logger.info(s"Domain aggregate $qaid was deleted. Try processing deletion now...")

try
val linkKey = s"${StateStoreSection.LNK}/$qaid"
val linkValues =
Expand Down Expand Up @@ -104,10 +101,6 @@ object DomainLinkingStage extends LazyLogging:

case Some(aggregate) =>

logger.info(
s"Domain aggregate $qaid found. Try setting links to other domain aggregates now..."
)

try

val parsedDoc: DocumentContext = jsonPathContext.parse(aggregate.toJson)
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/de/otto/anthology/DomainSource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ object DomainSource extends LazyLogging:
aggregateOpt: Option[Aggregate],
aggregateConfigs: Seq[AggregateConfig]
): Option[AggregateConfig] =
if aggregateConfigs.size == 1 then aggregateConfigs.headOption
if aggregateConfigs.size == 1 && aggregateConfigs.head.recognitionPath.isEmpty then aggregateConfigs.headOption
else
aggregateOpt match
case None =>
Expand Down
55 changes: 55 additions & 0 deletions src/test/scala/de/otto/anthology/DomainSourceTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,58 @@ class DomainSourceTest extends AnyFlatSpec, Matchers, Diagrams, EmbeddedKafka, B
_._1 == QualifiedAggregateId(DomainName("domain-x"), AggregateName("Agg-A"), AggregateId("3"))
)
)

it should "recognise one aggregate config and ignore the rest" in:
// given
val cluster = ClusterName("cluster-x")
val topic = TopicName("topic-domain-x")
val group = "group"

publishToKafka(topic.toString, "1", """{ "id": "1", "foo": "barA" }""")
publishToKafka(topic.toString, "2", """{ "id": "2", "foo": "barB" }""")
publishToKafka(topic.toString, "3", """{ "id": "3", "foo": "barC" }""")

// when
val sourceConfig = KafkaSourceConfig(cluster, topic, group)

val aggregateConfigA =
AggregateConfig(AggregateName("Agg-A"), Some(JsonPath.compile("$[?(@.foo == 'barA')]")), None, None, None)

val domainConfig =
DomainConfig(
DomainName("domain-x"),
sourceConfig,
Seq(aggregateConfigA)
)

supervised:
val consumerSettings: ConsumerSettings[AggregateId, Option[Aggregate]] =
ConsumerSettings
.default(domainConfig.kafka.consumerGroup)
.bootstrapServers(bootstrapServer.split(",").map(_.trim)*)
.keyDeserializer(AggregateIdDeserializer)
.valueDeserializer(AggregateDeserializer)
.autoOffsetReset(AutoOffsetReset.Earliest)
val consumer = consumerSettings.toThreadSafeConsumerWrapper
val sourceSettings = KafkaSourceSettings(sourceConfig, ConsumerName(domainConfig.name.toString), consumer)
val domainSource = DomainSource(domainConfig, sourceSettings)

val channel = domainSource.runToChannel()

// then
val result1: (Option[(QualifiedAggregateId, Option[Aggregate])], Passthrough) = channel.receive()
assert(
result1._1.exists(
_._1 == QualifiedAggregateId(DomainName("domain-x"), AggregateName("Agg-A"), AggregateId("1"))
)
)

val result2: (Option[(QualifiedAggregateId, Option[Aggregate])], Passthrough) = channel.receive()
assert(
result2._1.isEmpty
)

val result3: (Option[(QualifiedAggregateId, Option[Aggregate])], Passthrough) = channel.receive()
assert(
result3._1.isEmpty
)