Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
kafka-streams-app/target/
*.class
54 changes: 54 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,57 @@ The MobilityDB project is developed by the Computer & Decision Engineering Depar

More information about MobilityDB, including publications, presentations, etc., can be found in the MobilityDB [website](https://mobilitydb.com).


# BerlinMOD-9 × 3 streaming forms — the parity matrix on Kafka Streams

The streaming-side parity matrix runs all nine BerlinMOD reference queries (Q1..Q9) in three streaming forms each on this runtime: **continuous** (always-on, per-event emission), **windowed** (tumbling 10-second aggregation), and **snapshot** (5-second tick — the parity-oracle form whose output at watermark T equals the batch BerlinMOD-Q result on data up to T).

| Q | Topic | Continuous | Windowed | Snapshot |
|---|---|---|---|---|
| Q1 | "which vehicles have appeared in the stream?" | ✓ | ✓ | ✓ |
| Q2 | "where is vehicle X at time T?" | ✓ | ✓ | ✓ |
| Q3 | "vehicles within d of P at time T?" | ✓ | ✓ | ✓ |
| Q4 | "vehicles entered region R, and when?" | ✓ | ✓ | ✓ |
| Q5 | "pairs of vehicles meeting near P" | ✓ | ✓ | ✓ |
| Q6 | "cumulative distance per vehicle" | ✓ | ✓ | ✓ |
| Q7 | "first passage of vehicles through POIs" | ✓ | ✓ | ✓ |
| Q8 | "vehicles close to a road segment" | ✓ | ✓ | ✓ |
| Q9 | "distance between vehicles X and Y at time T" | ✓ | ✓ | ✓ |

**27 / 27 cells** = the full MobilityKafka parity-matrix row. Each cell has a dedicated `Q<N>{Continuous,Windowed,Snapshot}Processor` class in [`kafka-streams-app/src/main/java/berlinmod/`](kafka-streams-app/src/main/java/berlinmod/) and is locally verified via [`BerlinMODQ1LocalTest`](kafka-streams-app/src/main/java/berlinmod/BerlinMODQ1LocalTest.java) running on the Kafka-Streams `TopologyTestDriver` (no real broker required).

## Module structure

`kafka-streams-app/` is a Maven project (Java 21, Kafka Streams 3.6.0) holding:

- 27 per-cell `Q<N>{Continuous,Windowed,Snapshot}Processor` classes
- `BerlinMODTopology` — unified topology fanning input topic `berlinmod` to per-Q-form output topics
- `BerlinMODTrip` + `BerlinMODTripSerde` — shared data class + JSON Serde (byte-shape equivalent to MobilityFlink's `BerlinMODTrip`)
- `Haversine` + `SegmentDistance` + `PointOfInterest` — pure-Java geometry utilities used by the spatial-predicate cells
- `BerlinMODQ1LocalTest` — TopologyTestDriver-based local end-to-end driver

The streaming snapshot form converges to the batch BerlinMOD result on the same scale-factor corpus, anchored against the cross-platform outputs in [MobilityDB-BerlinMOD](https://github.com/MobilityDB/MobilityDB-BerlinMOD).

Spatial predicates route through [`MEOSBridge`](kafka-streams-app/src/main/java/berlinmod/MEOSBridge.java), which calls MEOS' `geog_dwithin` over WGS84 geographies via [JMEOS#18](https://github.com/MobilityDB/JMEOS/pull/18) (the geodesic-wrapper PR, branched off the MEOS 1.4 regen at JMEOS#15) when libmeos is loadable on the runtime path. The distance entry points use [JMEOS#18](https://github.com/MobilityDB/JMEOS/pull/18)'s `utils.spatial.Haversine.distance` (MEOS `geog_distance` over POINT/POINT) and `utils.spatial.PointToSegment.distance` (MEOS `geog_distance` over POINT/LINESTRING). When libmeos is not present (e.g. the TopologyTestDriver local-test run where `-Dmobilitykafka.meos.enabled=false` is set), the bridge falls back to pure-Java great-circle (`Haversine`) and planar segment-distance (`SegmentDistance`) — same semantics, identical predicate truth values to within float-precision at BerlinMOD scale.

## Build and run

```
cd kafka-streams-app
mvn -q clean package -DskipTests
java --add-opens java.base/java.lang=ALL-UNNAMED \
--add-opens java.base/java.util=ALL-UNNAMED \
--add-opens java.base/java.lang.reflect=ALL-UNNAMED \
-cp target/mobility-kafka-streams-1.0-SNAPSHOT.jar \
berlinmod.BerlinMODQ1LocalTest
```

The driver pipes a 21-event sorted-event-time corpus plus two sentinel records at `t = T0+15001` and `t = T0+20001` (to step the STREAM_TIME punctuator through the desired tick boundaries) and reads every per-Q-form output topic with the appropriate deserializer. Expected per-Q-form counts are in the PR body for the open scaffold PR.

## Sibling parity work in the ecosystem

- [MobilityFlink#3](https://github.com/MobilityDB/MobilityFlink/pull/3) — the same 27-cell row on Flink
- [MobilityNebula#15](https://github.com/MobilityDB/MobilityNebula/pull/15) — 27 / 27 cells on NebulaStream scaffold (with [#16](https://github.com/MobilityDB/MobilityNebula/pull/16) adding `TEMPORAL_LENGTH` for Q6 and [#17](https://github.com/MobilityDB/MobilityNebula/pull/17) adding `PAIR_MEETING` + `CROSS_DISTANCE` for Q5/Q9, all calling MEOS C ABI directly)
- [MobilityDB-BerlinMOD#29](https://github.com/MobilityDB/MobilityDB-BerlinMOD/pull/29) — the batch BerlinMOD-9 cross-platform timings (the snapshot form's gold-answer source)
- [MobilityDB/.github#10](https://github.com/MobilityDB/.github/pull/10) — the ecosystem-profile description of the stream-layers tier

Binary file added kafka-streams-app/jar/JMEOS.jar
Binary file not shown.
119 changes: 119 additions & 0 deletions kafka-streams-app/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.mobilitydb.kafka</groupId>
<artifactId>mobility-kafka-streams</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<kafka.version>3.6.0</kafka.version>
<jackson.version>2.14.3</jackson.version>
<slf4j.version>1.7.30</slf4j.version>
<junit.version>5.8.2</junit.version>
</properties>

<dependencies>
<!-- JMEOS dependency — supplied as a system-path jar matching
https://github.com/MobilityDB/JMEOS/pull/15 (MEOS 1.4 regen). -->
<dependency>
<groupId>com.mobilitydb</groupId>
<artifactId>jmeos</artifactId>
<version>1.4.0</version>
<scope>system</scope>
<systemPath>${project.basedir}/jar/JMEOS.jar</systemPath>
</dependency>

<!-- JNR-FFI — JMEOS uses jnr.ffi.Pointer in its public API. -->
<dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-ffi</artifactId>
<version>2.1.10</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-test-utils</artifactId>
<version>${kafka.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j.version}</version>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.12.1</version>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<version>3.5.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>berlinmod.BerlinMODQ1LocalTest</mainClass>
</transformer>
</transformers>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Loading