LocalTest` driver running on a Flink mini-cluster.
+
+The streaming snapshot form converges to the batch BerlinMOD result on the same scale-factor corpus, anchored against the cross-platform outputs in [MobilityDB-BerlinMOD](https://github.com/MobilityDB/MobilityDB-BerlinMOD).
+
+Spatial predicates route through [`MEOSBridge`](flink-processor/src/main/java/berlinmod/MEOSBridge.java), which calls MEOS' `geog_dwithin` over WGS84 geographies via [JMEOS#18](https://github.com/MobilityDB/JMEOS/pull/18) (the geodesic-wrapper PR, branched off the MEOS 1.4 regen at JMEOS#15) when libmeos is loadable on the runtime path. The distance entry points use [JMEOS#18](https://github.com/MobilityDB/JMEOS/pull/18)'s `utils.spatial.Haversine.distance` (MEOS `geog_distance` over POINT/POINT) and `utils.spatial.PointToSegment.distance` (MEOS `geog_distance` over POINT/LINESTRING). When libmeos is not present (e.g. on the mini-cluster local-test runs where `-Dmobilityflink.meos.enabled=false` is set), the bridge falls back to pure-Java great-circle (`Haversine`) and planar segment-distance (`SegmentDistance`) — same semantics, identical predicate truth values to within float-precision at BerlinMOD scale.
+
+The Kafka-source entry points for Q2 and Q3 are [`BerlinMODQ2Main`](flink-processor/src/main/java/berlinmod/BerlinMODQ2Main.java) and [`BerlinMODQ3Main`](flink-processor/src/main/java/berlinmod/BerlinMODQ3Main.java); the companion producer is [`python-producer-berlinmod.py`](kafka-producer/python-producer-berlinmod.py). Generate a BerlinMOD CSV with the upstream generator (`meos/examples/data/generate_berlinmod_trips.sql` in MobilityDB) at any scale factor and feed it to the producer. The form-by-form definition with default parameters lives in [`doc/berlinmod-q3-streaming-forms.md`](doc/berlinmod-q3-streaming-forms.md).
+
+### Sibling parity work in the ecosystem
+
+- [MobilityKafka#1](https://github.com/MobilityDB/MobilityKafka/pull/1) — the same 27-cell row on Kafka Streams
+- [MobilityNebula#15](https://github.com/MobilityDB/MobilityNebula/pull/15) — 27 / 27 cells on NebulaStream scaffold (with [#16](https://github.com/MobilityDB/MobilityNebula/pull/16) adding `TEMPORAL_LENGTH` for Q6 and [#17](https://github.com/MobilityDB/MobilityNebula/pull/17) adding `PAIR_MEETING` + `CROSS_DISTANCE` for Q5/Q9, all calling MEOS C ABI directly)
+- [MobilityDB-BerlinMOD#29](https://github.com/MobilityDB/MobilityDB-BerlinMOD/pull/29) — the batch BerlinMOD-9 cross-platform timings (the snapshot form's gold-answer source)
+- [MobilityDB/.github#10](https://github.com/MobilityDB/.github/pull/10) — the ecosystem-profile description of the stream-layers tier
+
diff --git a/doc/berlinmod-q3-streaming-forms.md b/doc/berlinmod-q3-streaming-forms.md
new file mode 100644
index 0000000..fee2eb9
--- /dev/null
+++ b/doc/berlinmod-q3-streaming-forms.md
@@ -0,0 +1,107 @@
+# BerlinMOD-Q3 streaming forms
+
+This document defines what **BerlinMOD-Q3** means in each of the three
+streaming forms the parity contract specifies for the MobilityFlink /
+MobilityKafka / MobilityNebula trio (see the planned-tier section of the
+[ecosystem profile](https://github.com/MobilityDB/.github)).
+
+## The batch query
+
+> *Which vehicles were within distance `d` of point `P` at time `T`?*
+
+Parameters: a point `P = (lon, lat)`, a radius `d` in metres, and a time `T`.
+Returns: the set of `vehicle_id`s whose trajectory passed within `d` of `P` at `T`.
+
+The batch reference implementation lives in
+[MobilityDB-BerlinMOD](https://github.com/MobilityDB/MobilityDB-BerlinMOD) and
+runs against the three SQL surfaces (MobilityDB / MobilityDuck / MobilitySpark)
+with byte-identical results — the batch oracle for the snapshot streaming form
+below.
+
+## The three streaming forms
+
+### 1. Continuous form
+
+> *"At every moment, which vehicles are currently within `d` of `P`?"*
+
+For each incoming GPS event `(vehicle_id, t, lon, lat)`:
+
+- Evaluate the radius predicate `distance((lon, lat), P) ≤ d`.
+- Emit `(vehicle_id, t, near)` per event.
+
+No window; output updates per event. Watermark-independent.
+
+Use case: real-time geofence alerting where each event matters.
+
+Implemented by [`Q3ContinuousFunction`](../flink-processor/src/main/java/berlinmod/Q3ContinuousFunction.java).
+
+### 2. Windowed form
+
+> *"Per N-second tumbling window, how many distinct vehicles were
+> within `d` of `P` at any time during the window?"*
+
+Tumbling event-time window of size `W` (default `W = 10s`). For each window:
+
+- Collect all events whose timestamp falls in the window.
+- Compute the distinct set `{vehicle_id : ∃ event in window with distance ≤ d}`.
+- Emit `(window_start, window_end, distinct_count)`.
+
+Use case: time-bucketed dashboards, near-real-time aggregates.
+
+Implemented by [`Q3WindowedFunction`](../flink-processor/src/main/java/berlinmod/Q3WindowedFunction.java).
+
+### 3. Snapshot form — **the parity oracle**
+
+> *"At time `T`, which vehicles are within `d` of `P`?"*
+
+Watermark-driven. Per vehicle, maintain `lastKnownPosition` state. At each
+snapshot tick (event-time timer at multiples of `snapshotTickMillis`,
+default `5000 ms`):
+
+- For each vehicle's most recent `(lon, lat)`, evaluate the radius predicate.
+- Emit `(T, vehicle_id)` for every vehicle satisfying the predicate at `T`.
+
+As the watermark advances to `T = max(event_times)`, the streaming snapshot
+output **equals the batch BerlinMOD-Q3 result** on the same scale-factor
+corpus. This is the parity property the contract enforces:
+
+```
+streaming-Q3-snapshot(T) ≡ batch-BerlinMOD-Q3 on data up to T
+ (same SF, same P, same d)
+```
+
+Use case: lambda-architecture style verification — streaming pipeline's
+output must converge to the batch reference.
+
+Implemented by [`Q3SnapshotFunction`](../flink-processor/src/main/java/berlinmod/Q3SnapshotFunction.java).
+
+## Default parameters
+
+The `BerlinMODQ3Main` entry point uses:
+
+| Parameter | Value | Source |
+|---|---|---|
+| `P` (lon, lat) | (4.3517, 50.8503) — Brussels city centre | Default centre for the BerlinMOD-Brussels corpus |
+| `d` (radius) | 5 000 m | Within-city-centre scale |
+| `W` (window size) | 10 s | Same as the AIS example for consistency |
+| Snapshot tick | 5 s | Half the window for finer parity-oracle granularity |
+| Topic | `berlinmod` | Single shared topic across the three forms |
+
+## Predicate implementation
+
+The scaffold today uses a pure-Java great-circle (Haversine) distance check in
+[`Haversine`](../flink-processor/src/main/java/berlinmod/Haversine.java). This
+matches the predicate semantics of the MEOS `edwithin_tgeo_geo` operator (the
+same call used by `MobilityNebula/Queries/Query1.yaml`), so swapping the
+predicate body for a JMEOS-bridged `edwithin_tgeo_geo` call is a one-line
+change once the JMEOS surface for that operator is verified — it is marked
+`TODO(meos)` in each form's class.
+
+## Companion producer
+
+The BerlinMOD CSV → Kafka producer lives at
+[`kafka-producer/python-producer-berlinmod.py`](../kafka-producer/python-producer-berlinmod.py).
+Generate a BerlinMOD CSV at scale factor SF with the upstream generator
+(`meos/examples/data/generate_berlinmod_trips.sql` in MobilityDB), name the
+columns `(t, vehicle_id, lon, lat)`, and the producer streams it to the
+`berlinmod` topic.
diff --git a/flink-processor/jar/JMEOS.jar b/flink-processor/jar/JMEOS.jar
index 3c22044..2bc69e5 100644
Binary files a/flink-processor/jar/JMEOS.jar and b/flink-processor/jar/JMEOS.jar differ
diff --git a/flink-processor/src/main/java/aisdata/Main.java b/flink-processor/src/main/java/aisdata/Main.java
index 148caa0..5a62273 100644
--- a/flink-processor/src/main/java/aisdata/Main.java
+++ b/flink-processor/src/main/java/aisdata/Main.java
@@ -56,7 +56,9 @@ public static void main(String[] args) throws Exception {
// Initialize MEOS with proper error handling
try {
logger.info("Initializing MEOS library");
- functions.meos_initialize("UTC", errorHandler);
+ // JMEOS 1.4 split: no-arg meos_initialize() + separate tz + error-handler entry points
+ functions.meos_initialize();
+ functions.meos_initialize_timezone("UTC");
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
diff --git a/flink-processor/src/main/java/aisdata/TrajectoryWindowFunction.java b/flink-processor/src/main/java/aisdata/TrajectoryWindowFunction.java
index e047143..d5e1adb 100644
--- a/flink-processor/src/main/java/aisdata/TrajectoryWindowFunction.java
+++ b/flink-processor/src/main/java/aisdata/TrajectoryWindowFunction.java
@@ -31,7 +31,9 @@ public class TrajectoryWindowFunction extends
public void open(Configuration parameters) throws Exception {
super.open(parameters);
errorHandler = new error_handler(); // Initialize error handler here
- functions.meos_initialize("UTC", errorHandler);
+ // JMEOS 1.4 split: no-arg meos_initialize() + separate tz + error-handler entry points
+ functions.meos_initialize();
+ functions.meos_initialize_timezone("UTC");
logger.info("MEOS initialized in TrajectoryWindowFunction.open()");
}
diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODDeserializationSchema.java b/flink-processor/src/main/java/berlinmod/BerlinMODDeserializationSchema.java
new file mode 100644
index 0000000..a45b6d1
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/BerlinMODDeserializationSchema.java
@@ -0,0 +1,64 @@
+package berlinmod;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+
+/**
+ * JSON → {@link BerlinMODTrip} deserializer for the Kafka {@code berlinmod} topic.
+ *
+ * Expected JSON shape per record:
+ *
+ * { "t": "2007-05-28 06:00:00", "vehicle_id": 42, "lon": 4.36, "lat": 50.84 }
+ *
+ *
+ * The timestamp format is the same {@code yyyy-MM-dd HH:mm:ss} the BerlinMOD
+ * generator emits in {@code generate_berlinmod_trips.sql}; we parse it as UTC
+ * to match the AIS pipeline's event-time convention.
+ */
+public class BerlinMODDeserializationSchema implements DeserializationSchema {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final DateTimeFormatter TS_FORMATTER =
+ DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
+ public BerlinMODDeserializationSchema() {
+ OBJECT_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
+ OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ }
+
+ @Override
+ public BerlinMODTrip deserialize(byte[] message) throws IOException {
+ JsonNode node = OBJECT_MAPPER.readTree(message);
+ BerlinMODTrip trip = new BerlinMODTrip();
+ trip.setTimestamp(parseTimestamp(node.get("t").asText()));
+ trip.setVehicleId(node.get("vehicle_id").asInt());
+ trip.setLon(node.get("lon").asDouble());
+ trip.setLat(node.get("lat").asDouble());
+ return trip;
+ }
+
+ private long parseTimestamp(String s) {
+ LocalDateTime dt = LocalDateTime.parse(s, TS_FORMATTER);
+ return dt.atZone(ZoneId.of("UTC")).toInstant().toEpochMilli();
+ }
+
+ @Override
+ public boolean isEndOfStream(BerlinMODTrip nextElement) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation getProducedType() {
+ return TypeExtractor.getForClass(BerlinMODTrip.class);
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODQ1LocalTest.java b/flink-processor/src/main/java/berlinmod/BerlinMODQ1LocalTest.java
new file mode 100644
index 0000000..7b609b0
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/BerlinMODQ1LocalTest.java
@@ -0,0 +1,96 @@
+package berlinmod;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Local end-to-end test driver for the BerlinMOD-Q1 three streaming forms.
+ *
+ * Same 21-event synthetic corpus as Q2/Q3 local tests. Q1 has no spatial
+ * predicate and no per-event filter parameter — it simply enumerates vehicles
+ * seen.
+ *
+ *
Expected output:
+ *
+ * - Q1-continuous: 3 lines, one per distinct vehicle (firstSeenTime)
+ * - Q1-windowed: 2 windows, each with distinctCount=3
+ * - Q1-snapshot: 9 lines (3 ticks × 3 vehicles all seen by source-close)
+ *
+ */
+public class BerlinMODQ1LocalTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BerlinMODQ1LocalTest.class);
+
+ private static final long WINDOW_SIZE_SECONDS = 10L;
+ private static final long SNAPSHOT_TICK_MILLIS = 5_000L;
+ private static final long T0 = 1_735_711_200_000L;
+
+ public static void main(String[] args) throws Exception {
+ System.setProperty("mobilityflink.meos.enabled", "false");
+ LOG.info("BerlinMODQ1LocalTest starting; window={}s tick={}ms",
+ WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS);
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ List events = buildEvents();
+ DataStreamSource raw = env.fromCollection(events);
+ DataStream trips = raw.assignTimestampsAndWatermarks(
+ WatermarkStrategy
+ .forBoundedOutOfOrderness(Duration.ofSeconds(1))
+ .withTimestampAssigner((e, t) -> e.getTimestamp()));
+
+ DataStream> cont = trips
+ .keyBy(BerlinMODTrip::getVehicleId)
+ .process(new Q1ContinuousFunction());
+ cont.print("Q1-continuous");
+
+ DataStream> win = trips
+ .windowAll(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
+ .process(new Q1WindowedFunction());
+ win.print("Q1-windowed");
+
+ DataStream> snap = trips
+ .keyBy(BerlinMODTrip::getVehicleId)
+ .process(new Q1SnapshotFunction(SNAPSHOT_TICK_MILLIS));
+ snap.print("Q1-snapshot");
+
+ env.execute("BerlinMODQ1LocalTest");
+ LOG.info("BerlinMODQ1LocalTest done");
+ }
+
+ private static List buildEvents() {
+ List events = new ArrayList<>();
+ for (int i = 0; i <= 12; i += 2) {
+ events.add(make(100, T0 + i * 1000L, 4.3517, 50.8503));
+ }
+ for (int i = 1; i <= 13; i += 2) {
+ events.add(make(200, T0 + i * 1000L, 4.3060, 50.8270));
+ }
+ for (int i = 0; i <= 12; i += 2) {
+ events.add(make(300, T0 + i * 1000L, 4.2000, 50.7500));
+ }
+ return events;
+ }
+
+ private static BerlinMODTrip make(int vid, long t, double lon, double lat) {
+ BerlinMODTrip trip = new BerlinMODTrip();
+ trip.setVehicleId(vid);
+ trip.setTimestamp(t);
+ trip.setLon(lon);
+ trip.setLat(lat);
+ return trip;
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODQ2LocalTest.java b/flink-processor/src/main/java/berlinmod/BerlinMODQ2LocalTest.java
new file mode 100644
index 0000000..9446c30
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/BerlinMODQ2LocalTest.java
@@ -0,0 +1,105 @@
+package berlinmod;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Local end-to-end test driver for the BerlinMOD-Q2 three streaming forms.
+ *
+ * Same structural shape as {@link BerlinMODQ3LocalTest} but exercises Q2 ("where is vehicle X
+ * at time T?") with {@code X = 200} (the Anderlecht vehicle). Same 3-vehicle /
+ * 21-event synthetic corpus.
+ *
+ *
Expected output shape (with {@code X = 200}):
+ *
+ * - Q2-continuous: 7 events (the 7 vehicle-200 events; vehicles 100 and 300 filtered out)
+ * - Q2-windowed: 2 windows of size 10 s, each emitting the last vehicle-200 position seen in the window
+ * - Q2-snapshot: 3 ticks × 1 emission each = 3 lines (vehicle 200's last-known position at each 5 s tick)
+ *
+ *
+ * Run after {@code mvn package} with:
+ *
+ * java -cp target/flink-kafka2postgres-1.0-SNAPSHOT.jar berlinmod.BerlinMODQ2LocalTest
+ *
+ */
+public class BerlinMODQ2LocalTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BerlinMODQ2LocalTest.class);
+
+ private static final int TARGET_VEHICLE_ID = 200;
+ private static final long WINDOW_SIZE_SECONDS = 10L;
+ private static final long SNAPSHOT_TICK_MILLIS = 5_000L;
+ private static final long T0 = 1_735_711_200_000L; // 2025-01-01 06:00:00 UTC
+
+ public static void main(String[] args) throws Exception {
+ System.setProperty("mobilityflink.meos.enabled", "false");
+ LOG.info("BerlinMODQ2LocalTest starting; X={} window={}s tick={}ms",
+ TARGET_VEHICLE_ID, WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS);
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1); // deterministic output ordering
+
+ List events = buildEvents();
+ DataStreamSource raw = env.fromCollection(events);
+ DataStream trips = raw.assignTimestampsAndWatermarks(
+ WatermarkStrategy
+ .forBoundedOutOfOrderness(Duration.ofSeconds(1))
+ .withTimestampAssigner((e, t) -> e.getTimestamp()));
+
+ DataStream cont = trips
+ .process(new Q2ContinuousFunction(TARGET_VEHICLE_ID));
+ cont.map(t -> String.format("v=%d t=%d (%.4f,%.4f)",
+ t.getVehicleId(), t.getTimestamp(), t.getLon(), t.getLat()))
+ .print("Q2-continuous");
+
+ DataStream> win = trips
+ .windowAll(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
+ .process(new Q2WindowedFunction(TARGET_VEHICLE_ID));
+ win.print("Q2-windowed");
+
+ DataStream> snap = trips
+ .keyBy(BerlinMODTrip::getVehicleId)
+ .process(new Q2SnapshotFunction(TARGET_VEHICLE_ID, SNAPSHOT_TICK_MILLIS));
+ snap.print("Q2-snapshot");
+
+ env.execute("BerlinMODQ2LocalTest");
+ LOG.info("BerlinMODQ2LocalTest done");
+ }
+
+ private static List buildEvents() {
+ List events = new ArrayList<>();
+ // Same synthetic corpus as Q3LocalTest, so any user can run both and
+ // see them work over identical inputs.
+ for (int i = 0; i <= 12; i += 2) {
+ events.add(make(100, T0 + i * 1000L, 4.3517, 50.8503));
+ }
+ for (int i = 1; i <= 13; i += 2) {
+ events.add(make(200, T0 + i * 1000L, 4.3060, 50.8270));
+ }
+ for (int i = 0; i <= 12; i += 2) {
+ events.add(make(300, T0 + i * 1000L, 4.2000, 50.7500));
+ }
+ return events;
+ }
+
+ private static BerlinMODTrip make(int vid, long t, double lon, double lat) {
+ BerlinMODTrip trip = new BerlinMODTrip();
+ trip.setVehicleId(vid);
+ trip.setTimestamp(t);
+ trip.setLon(lon);
+ trip.setLat(lat);
+ return trip;
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODQ2Main.java b/flink-processor/src/main/java/berlinmod/BerlinMODQ2Main.java
new file mode 100644
index 0000000..c5b9220
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/BerlinMODQ2Main.java
@@ -0,0 +1,108 @@
+package berlinmod;
+
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+
+/**
+ * Entry point for the BerlinMOD-Q2 scaffold on MobilityFlink.
+ *
+ * Runs the three streaming forms of BerlinMOD-Q2 ("where is vehicle X at
+ * time T?") side by side over the same Kafka input topic {@code berlinmod}:
+ *
+ * - {@link Q2ContinuousFunction} — emit every event of vehicle X as it arrives
+ * - {@link Q2WindowedFunction} — last-known (lon, lat) of vehicle X per N-second tumbling window
+ * - {@link Q2SnapshotFunction} — vehicle X's last-known (lon, lat) at each watermark tick;
+ * the parity-oracle form (≡ batch BerlinMOD-Q2 at the same scale factor)
+ *
+ *
+ * The queried vehicle id and other defaults match
+ * {@code doc/berlinmod-q3-streaming-forms.md}. The companion local test driver
+ * is {@link BerlinMODQ2LocalTest}.
+ */
+public class BerlinMODQ2Main {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BerlinMODQ2Main.class);
+
+ // Default Q2 parameters — query vehicle 200 (Anderlecht), 10 s windows,
+ // 5 s snapshot tick. Matches the synthetic-corpus defaults.
+ private static final int TARGET_VEHICLE_ID = 200;
+ private static final long WINDOW_SIZE_SECONDS = 10L;
+ private static final long SNAPSHOT_TICK_MILLIS = 5_000L;
+ private static final String KAFKA_TOPIC = "berlinmod";
+ private static final String KAFKA_BOOTSTRAP = "kafka:29092";
+ private static final String CONSUMER_GROUP = "flink_berlinmod_q2";
+
+ public static void main(String[] args) throws Exception {
+ LOG.info("BerlinMODQ2Main starting; X={} window={}s tick={}ms",
+ TARGET_VEHICLE_ID, WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS);
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ KafkaSource kafkaSource = KafkaSource.builder()
+ .setBootstrapServers(KAFKA_BOOTSTRAP)
+ .setGroupId(CONSUMER_GROUP)
+ .setTopics(KAFKA_TOPIC)
+ .setStartingOffsets(OffsetsInitializer.earliest())
+ .setValueOnlyDeserializer(new SimpleStringSchema())
+ .build();
+
+ DataStream raw = env.fromSource(
+ kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source (berlinmod)");
+
+ DataStream trips = raw
+ .map(new DeserializeBerlinMODMapFunction())
+ .assignTimestampsAndWatermarks(
+ WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10))
+ .withTimestampAssigner(new BerlinMODTimestampAssigner())
+ .withIdleness(Duration.ofMinutes(1))
+ );
+
+ // Continuous form — per-event pass-through for the queried vehicle
+ DataStream continuous = trips
+ .process(new Q2ContinuousFunction(TARGET_VEHICLE_ID));
+ continuous.print("Q2-continuous");
+
+ // Windowed form — last-known (lon, lat) per tumbling window
+ DataStream> windowed = trips
+ .windowAll(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
+ .process(new Q2WindowedFunction(TARGET_VEHICLE_ID));
+ windowed.print("Q2-windowed");
+
+ // Snapshot form — keyed by vehicle, emits queried vehicle's last position at each tick
+ DataStream> snapshot = trips
+ .keyBy(BerlinMODTrip::getVehicleId)
+ .process(new Q2SnapshotFunction(TARGET_VEHICLE_ID, SNAPSHOT_TICK_MILLIS));
+ snapshot.print("Q2-snapshot");
+
+ env.execute("BerlinMOD-Q2 (continuous / windowed / snapshot)");
+ LOG.info("BerlinMODQ2Main done");
+ }
+
+ public static class DeserializeBerlinMODMapFunction implements MapFunction {
+ @Override
+ public BerlinMODTrip map(String value) throws Exception {
+ return new BerlinMODDeserializationSchema().deserialize(value.getBytes());
+ }
+ }
+
+ public static class BerlinMODTimestampAssigner implements SerializableTimestampAssigner {
+ @Override
+ public long extractTimestamp(BerlinMODTrip element, long recordTimestamp) {
+ return element.getTimestamp();
+ }
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODQ3LocalTest.java b/flink-processor/src/main/java/berlinmod/BerlinMODQ3LocalTest.java
new file mode 100644
index 0000000..e1b7128
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/BerlinMODQ3LocalTest.java
@@ -0,0 +1,115 @@
+package berlinmod;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Local end-to-end test driver for the BerlinMOD-Q3 three streaming forms.
+ *
+ * Runs the same three form functions {@link BerlinMODQ3Main} runs (continuous,
+ * windowed, snapshot) but reads from a hardcoded synthetic event list via
+ * {@code env.fromCollection(...)} instead of from Kafka. This lets the scaffold
+ * be verified on any machine with Java + Maven, without Docker, a Kafka broker,
+ * the MEOS native lib, or any JMEOS call.
+ *
+ *
Synthetic corpus: 3 vehicles, 21 events over 14 simulated seconds —
+ *
+ * - Vehicle 100 — sits on Brussels city centre {@code P}, distance 0 m, near
+ * - Vehicle 200 — Anderlecht, ~4.1 km from {@code P}, near (within the 5 km radius)
+ * - Vehicle 300 — Forest, ~15.4 km from {@code P}, not near (outside the 5 km radius)
+ *
+ *
+ * Expected output shape:
+ *
+ * - Q3-continuous: 21 lines, {@code near=true} for vehicles 100 and 200, {@code false} for 300
+ * - Q3-windowed: 2 windows of size 10 s, each with {@code distinctCount=2} (vehicles 100 and 200)
+ * - Q3-snapshot: 3 ticks × 2 near vehicles = 6 lines (vehicles 100 and 200 at each of the three 5 s ticks)
+ *
+ *
+ * Run after {@code mvn package} with:
+ *
+ * java -cp target/flink-kafka2postgres-1.0-SNAPSHOT.jar berlinmod.BerlinMODQ3LocalTest
+ *
+ */
+public class BerlinMODQ3LocalTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BerlinMODQ3LocalTest.class);
+
+ private static final double P_LON = 4.3517;
+ private static final double P_LAT = 50.8503;
+ private static final double RADIUS_METRES = 5_000.0;
+ private static final long WINDOW_SIZE_SECONDS = 10L;
+ private static final long SNAPSHOT_TICK_MILLIS = 5_000L;
+ private static final long T0 = 1_735_711_200_000L; // 2025-01-01 06:00:00 UTC
+
+ public static void main(String[] args) throws Exception {
+ System.setProperty("mobilityflink.meos.enabled", "false");
+ LOG.info("BerlinMODQ3LocalTest starting; P=({}, {}) radius={}m window={}s tick={}ms",
+ P_LON, P_LAT, RADIUS_METRES, WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS);
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1); // deterministic output ordering for the test
+
+ List events = buildEvents();
+ DataStreamSource raw = env.fromCollection(events);
+ DataStream trips = raw.assignTimestampsAndWatermarks(
+ WatermarkStrategy
+ .forBoundedOutOfOrderness(Duration.ofSeconds(1))
+ .withTimestampAssigner((e, t) -> e.getTimestamp()));
+
+ DataStream> cont = trips
+ .process(new Q3ContinuousFunction(P_LON, P_LAT, RADIUS_METRES));
+ cont.print("Q3-continuous");
+
+ DataStream> win = trips
+ .windowAll(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
+ .process(new Q3WindowedFunction(P_LON, P_LAT, RADIUS_METRES));
+ win.print("Q3-windowed");
+
+ DataStream> snap = trips
+ .keyBy(BerlinMODTrip::getVehicleId)
+ .process(new Q3SnapshotFunction(P_LON, P_LAT, RADIUS_METRES, SNAPSHOT_TICK_MILLIS));
+ snap.print("Q3-snapshot");
+
+ env.execute("BerlinMODQ3LocalTest");
+ LOG.info("BerlinMODQ3LocalTest done");
+ }
+
+ private static List buildEvents() {
+ List events = new ArrayList<>();
+ // Vehicle 100 — Brussels city centre (= P), 7 events at t0, t0+2s, …, t0+12s
+ for (int i = 0; i <= 12; i += 2) {
+ events.add(make(100, T0 + i * 1000L, 4.3517, 50.8503));
+ }
+ // Vehicle 200 — Anderlecht ~4.1 km from P, 7 events at t0+1s, t0+3s, …, t0+13s
+ for (int i = 1; i <= 13; i += 2) {
+ events.add(make(200, T0 + i * 1000L, 4.3060, 50.8270));
+ }
+ // Vehicle 300 — Forest ~15.4 km from P, 7 events at t0, t0+2s, …, t0+12s
+ for (int i = 0; i <= 12; i += 2) {
+ events.add(make(300, T0 + i * 1000L, 4.2000, 50.7500));
+ }
+ return events;
+ }
+
+ private static BerlinMODTrip make(int vid, long t, double lon, double lat) {
+ BerlinMODTrip trip = new BerlinMODTrip();
+ trip.setVehicleId(vid);
+ trip.setTimestamp(t);
+ trip.setLon(lon);
+ trip.setLat(lat);
+ return trip;
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODQ3Main.java b/flink-processor/src/main/java/berlinmod/BerlinMODQ3Main.java
new file mode 100644
index 0000000..29bc518
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/BerlinMODQ3Main.java
@@ -0,0 +1,111 @@
+package berlinmod;
+
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+
+/**
+ * Entry point for the BerlinMOD-Q3 scaffold on MobilityFlink.
+ *
+ * Runs the three streaming forms of BerlinMOD-Q3 side by side over the same
+ * Kafka input topic {@code berlinmod}:
+ *
+ * - {@link Q3ContinuousFunction} — per-event near/not-near
+ * - {@link Q3WindowedFunction} — distinct-count per N-second tumbling window
+ * - {@link Q3SnapshotFunction} — set of vehicles near P at each watermark tick
+ * (the parity-oracle form; ≡ batch BerlinMOD-Q3 at the same scale factor)
+ *
+ *
+ * Reference point P, radius {@code d}, window size and snapshot tick are
+ * the hardcoded defaults from the BerlinMOD-Q3 streaming-forms spec (see
+ * {@code doc/berlinmod-q3-streaming-forms.md}). The Kafka producer is the
+ * companion {@code kafka-producer/python-producer-berlinmod.py}.
+ */
+public class BerlinMODQ3Main {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BerlinMODQ3Main.class);
+
+ // Default Q3 parameters — Brussels city centre, 5 km radius, 10 s windows,
+ // 5 s snapshot tick. Matches the defaults in the spec doc.
+ private static final double P_LON = 4.3517;
+ private static final double P_LAT = 50.8503;
+ private static final double RADIUS_METRES = 5_000.0;
+ private static final long WINDOW_SIZE_SECONDS = 10L;
+ private static final long SNAPSHOT_TICK_MILLIS = 5_000L;
+ private static final String KAFKA_TOPIC = "berlinmod";
+ private static final String KAFKA_BOOTSTRAP = "kafka:29092";
+ private static final String CONSUMER_GROUP = "flink_berlinmod_q3";
+
+ public static void main(String[] args) throws Exception {
+ LOG.info("BerlinMODQ3Main starting; P=({}, {}) radius={}m window={}s tick={}ms",
+ P_LON, P_LAT, RADIUS_METRES, WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS);
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ KafkaSource kafkaSource = KafkaSource.builder()
+ .setBootstrapServers(KAFKA_BOOTSTRAP)
+ .setGroupId(CONSUMER_GROUP)
+ .setTopics(KAFKA_TOPIC)
+ .setStartingOffsets(OffsetsInitializer.earliest())
+ .setValueOnlyDeserializer(new SimpleStringSchema())
+ .build();
+
+ DataStream raw = env.fromSource(
+ kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source (berlinmod)");
+
+ DataStream trips = raw
+ .map(new DeserializeBerlinMODMapFunction())
+ .assignTimestampsAndWatermarks(
+ WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10))
+ .withTimestampAssigner(new BerlinMODTimestampAssigner())
+ .withIdleness(Duration.ofMinutes(1))
+ );
+
+ // Continuous form — per-event near/not-near
+ DataStream> continuous = trips
+ .process(new Q3ContinuousFunction(P_LON, P_LAT, RADIUS_METRES));
+ continuous.print("Q3-continuous");
+
+ // Windowed form — distinct count per tumbling window
+ DataStream> windowed = trips
+ .windowAll(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
+ .process(new Q3WindowedFunction(P_LON, P_LAT, RADIUS_METRES));
+ windowed.print("Q3-windowed");
+
+ // Snapshot form — keyed by vehicle, emits set of vehicles near P at each tick
+ DataStream> snapshot = trips
+ .keyBy(BerlinMODTrip::getVehicleId)
+ .process(new Q3SnapshotFunction(P_LON, P_LAT, RADIUS_METRES, SNAPSHOT_TICK_MILLIS));
+ snapshot.print("Q3-snapshot");
+
+ env.execute("BerlinMOD-Q3 (continuous / windowed / snapshot)");
+ LOG.info("BerlinMODQ3Main done");
+ }
+
+ public static class DeserializeBerlinMODMapFunction implements MapFunction {
+ @Override
+ public BerlinMODTrip map(String value) throws Exception {
+ return new BerlinMODDeserializationSchema().deserialize(value.getBytes());
+ }
+ }
+
+ public static class BerlinMODTimestampAssigner implements SerializableTimestampAssigner {
+ @Override
+ public long extractTimestamp(BerlinMODTrip element, long recordTimestamp) {
+ return element.getTimestamp();
+ }
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODQ4LocalTest.java b/flink-processor/src/main/java/berlinmod/BerlinMODQ4LocalTest.java
new file mode 100644
index 0000000..1f302a5
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/BerlinMODQ4LocalTest.java
@@ -0,0 +1,131 @@
+package berlinmod;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Local end-to-end test driver for the BerlinMOD-Q4 three streaming forms.
+ *
+ * Region R = bounding box {@code (4.30, 50.84, 4.36, 50.86)} — a rectangle
+ * around Brussels city centre. The synthetic corpus is designed to produce
+ * multiple outside → inside transitions so the entry-detection logic
+ * is exercised non-trivially:
+ *
+ *
+ * - Vehicle 100 sits inside R for all 7 events (no transitions).
+ * - Vehicle 200 oscillates: outside at t=1, inside at t=3 (entry),
+ * outside at t=5, inside at t=7 (entry), outside at t=9, inside at
+ * t=11 (entry), outside at t=13 → three entries.
+ * - Vehicle 300 stays in Forest (outside R) for all 7 events.
+ *
+ *
+ * Expected output:
+ *
+ * - Q4-continuous: 3 entries (v200's three outside → inside transitions)
+ * - Q4-windowed: per the intra-window scoping convention — window
+ * [0, 10 s) contains v100's first-seen-inside event AND v200's two entries
+ * (t=3, t=7); window [10, 20 s) contains v100's first-event-in-window
+ * AND v200's third entry (t=11). 5 emissions total.
+ * - Q4-snapshot: cumulative entries up to each tick. Tick 5: 1
+ * (v200 t=3). Tick 10: 2 (v200 t=3, t=7). Tick 15: 3 (v200 t=3, t=7,
+ * t=11). v100 contributes 0 (always inside, no transition). v300
+ * contributes 0. 6 emissions total (1+2+3).
+ *
+ */
+public class BerlinMODQ4LocalTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BerlinMODQ4LocalTest.class);
+
+ // Region R — Brussels centre rectangle
+ private static final double XMIN = 4.30;
+ private static final double YMIN = 50.84;
+ private static final double XMAX = 4.36;
+ private static final double YMAX = 50.86;
+
+ private static final long WINDOW_SIZE_SECONDS = 10L;
+ private static final long SNAPSHOT_TICK_MILLIS = 5_000L;
+ private static final long T0 = 1_735_711_200_000L;
+
+ public static void main(String[] args) throws Exception {
+ System.setProperty("mobilityflink.meos.enabled", "false");
+ LOG.info("BerlinMODQ4LocalTest starting; R=({},{},{},{}) window={}s tick={}ms",
+ XMIN, YMIN, XMAX, YMAX, WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS);
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ List events = buildEvents();
+ DataStreamSource raw = env.fromCollection(events);
+ DataStream trips = raw.assignTimestampsAndWatermarks(
+ WatermarkStrategy
+ .forBoundedOutOfOrderness(Duration.ofSeconds(1))
+ .withTimestampAssigner((e, t) -> e.getTimestamp()));
+
+ DataStream> cont = trips
+ .keyBy(BerlinMODTrip::getVehicleId)
+ .process(new Q4ContinuousFunction(XMIN, YMIN, XMAX, YMAX));
+ cont.print("Q4-continuous");
+
+ DataStream> win = trips
+ .windowAll(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
+ .process(new Q4WindowedFunction(XMIN, YMIN, XMAX, YMAX));
+ win.print("Q4-windowed");
+
+ DataStream> snap = trips
+ .keyBy(BerlinMODTrip::getVehicleId)
+ .process(new Q4SnapshotFunction(XMIN, YMIN, XMAX, YMAX, SNAPSHOT_TICK_MILLIS));
+ snap.print("Q4-snapshot");
+
+ env.execute("BerlinMODQ4LocalTest");
+ LOG.info("BerlinMODQ4LocalTest done");
+ }
+
+ private static List buildEvents() {
+ List events = new ArrayList<>();
+ // v100 always inside R
+ for (int i = 0; i <= 12; i += 2) {
+ events.add(make(100, T0 + i * 1000L, 4.3517, 50.8503));
+ }
+ // v200 oscillates in/out: out, IN, out, IN, out, IN, out
+ double[][] v200Path = {
+ {4.3060, 50.8270}, // t=1 out (lat<50.84)
+ {4.3060, 50.8500}, // t=3 IN
+ {4.3060, 50.8300}, // t=5 out
+ {4.3060, 50.8500}, // t=7 IN
+ {4.3060, 50.8100}, // t=9 out
+ {4.3060, 50.8500}, // t=11 IN
+ {4.3060, 50.8300}, // t=13 out
+ };
+ int idx = 0;
+ for (int i = 1; i <= 13; i += 2, idx++) {
+ events.add(make(200, T0 + i * 1000L, v200Path[idx][0], v200Path[idx][1]));
+ }
+ // v300 always outside R
+ for (int i = 0; i <= 12; i += 2) {
+ events.add(make(300, T0 + i * 1000L, 4.2000, 50.7500));
+ }
+ return events;
+ }
+
+ private static BerlinMODTrip make(int vid, long t, double lon, double lat) {
+ BerlinMODTrip trip = new BerlinMODTrip();
+ trip.setVehicleId(vid);
+ trip.setTimestamp(t);
+ trip.setLon(lon);
+ trip.setLat(lat);
+ return trip;
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODQ5LocalTest.java b/flink-processor/src/main/java/berlinmod/BerlinMODQ5LocalTest.java
new file mode 100644
index 0000000..f54e102
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/BerlinMODQ5LocalTest.java
@@ -0,0 +1,110 @@
+package berlinmod;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Local end-to-end test driver for the BerlinMOD-Q5 three streaming forms.
+ *
+ * Same stationary-vehicle corpus as Q1/Q2/Q3/Q9. Reference point P =
+ * Brussels city centre (4.3517, 50.8503); {@code dP = 5 km} (vehicle near P);
+ * {@code dMeet = 5 km} (pair-meeting threshold).
+ *
+ *
Pairs:
+ *
+ * - (100, 200) — both near P; distance 4.1 km ≤ dMeet → MEET
+ * - (100, 300) — v300 not near P → don't qualify
+ * - (200, 300) — v300 not near P → don't qualify
+ *
+ *
+ * Expected output (only the (100, 200) pair qualifies):
+ *
+ * - Q5-continuous: pair (100, 200) emits on every event from t=1
+ * onward (the first t=0 events of v100 and v300 happen before v200 is
+ * known, so no pair exists yet). 21 - 2 = 19 emissions.
+ * - Q5-windowed: each of the two 10-second windows contains
+ * events for v100 and v200 — both qualify, the pair meets. 2 emissions.
+ * - Q5-snapshot: 3 ticks × 1 meeting pair = 3 emissions.
+ *
+ */
+public class BerlinMODQ5LocalTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BerlinMODQ5LocalTest.class);
+
+ private static final double P_LON = 4.3517;
+ private static final double P_LAT = 50.8503;
+ private static final double D_P_METRES = 5_000.0;
+ private static final double D_MEET_METRES = 5_000.0;
+ private static final long WINDOW_SIZE_SECONDS = 10L;
+ private static final long SNAPSHOT_TICK_MILLIS = 5_000L;
+ private static final long T0 = 1_735_711_200_000L;
+
+ public static void main(String[] args) throws Exception {
+ System.setProperty("mobilityflink.meos.enabled", "false");
+ LOG.info("BerlinMODQ5LocalTest starting; P=({}, {}) dP={}m dMeet={}m",
+ P_LON, P_LAT, D_P_METRES, D_MEET_METRES);
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ List events = buildEvents();
+ DataStreamSource raw = env.fromCollection(events);
+ DataStream trips = raw.assignTimestampsAndWatermarks(
+ WatermarkStrategy
+ .forBoundedOutOfOrderness(Duration.ofSeconds(1))
+ .withTimestampAssigner((e, t) -> e.getTimestamp()));
+
+ DataStream> cont = trips
+ .keyBy(t -> 0)
+ .process(new Q5ContinuousFunction(P_LON, P_LAT, D_P_METRES, D_MEET_METRES));
+ cont.print("Q5-continuous");
+
+ DataStream> win = trips
+ .windowAll(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
+ .process(new Q5WindowedFunction(P_LON, P_LAT, D_P_METRES, D_MEET_METRES));
+ win.print("Q5-windowed");
+
+ DataStream> snap = trips
+ .keyBy(t -> 0)
+ .process(new Q5SnapshotFunction(P_LON, P_LAT, D_P_METRES, D_MEET_METRES, SNAPSHOT_TICK_MILLIS));
+ snap.print("Q5-snapshot");
+
+ env.execute("BerlinMODQ5LocalTest");
+ LOG.info("BerlinMODQ5LocalTest done");
+ }
+
+ private static List buildEvents() {
+ List events = new ArrayList<>();
+ for (int i = 0; i <= 12; i += 2) {
+ events.add(make(100, T0 + i * 1000L, 4.3517, 50.8503));
+ }
+ for (int i = 1; i <= 13; i += 2) {
+ events.add(make(200, T0 + i * 1000L, 4.3060, 50.8270));
+ }
+ for (int i = 0; i <= 12; i += 2) {
+ events.add(make(300, T0 + i * 1000L, 4.2000, 50.7500));
+ }
+ return events;
+ }
+
+ private static BerlinMODTrip make(int vid, long t, double lon, double lat) {
+ BerlinMODTrip trip = new BerlinMODTrip();
+ trip.setVehicleId(vid);
+ trip.setTimestamp(t);
+ trip.setLon(lon);
+ trip.setLat(lat);
+ return trip;
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODQ6LocalTest.java b/flink-processor/src/main/java/berlinmod/BerlinMODQ6LocalTest.java
new file mode 100644
index 0000000..4c18eb5
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/BerlinMODQ6LocalTest.java
@@ -0,0 +1,122 @@
+package berlinmod;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Local end-to-end test driver for the BerlinMOD-Q6 three streaming forms.
+ *
+ * Unlike Q1/Q2/Q3 which use a stationary-vehicles corpus, Q6 needs vehicles
+ * that actually move so the cumulative-distance arithmetic produces non-zero
+ * output. The synthetic corpus here drifts each vehicle by a fixed bearing
+ * per event:
+ *
+ *
+ * - Vehicle 100 drifts east ~100 m per 2 s event (0.001423° lon at lat 50.85)
+ * - Vehicle 200 drifts south ~50 m per 2 s event (0.000450° lat)
+ * - Vehicle 300 drifts west ~200 m per 2 s event (0.002846° lon)
+ *
+ *
+ * With 7 events per vehicle (6 inter-event steps), per-vehicle totals are
+ * approximately:
+ *
+ *
+ * - v100: 6 × 100 m = 600 m
+ * - v200: 6 × 50 m = 300 m
+ * - v300: 6 × 200 m = 1200 m
+ *
+ *
+ * Expected output:
+ *
+ *
+ * - Q6-continuous: 21 lines, cumulative metres rising monotonically per vehicle
+ * - Q6-windowed: 6 windowed emissions (2 windows × 3 vehicles)
+ * - Q6-snapshot: 9 emissions (3 ticks × 3 vehicles, all-source-closed)
+ *
+ */
+public class BerlinMODQ6LocalTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BerlinMODQ6LocalTest.class);
+
+ private static final long WINDOW_SIZE_SECONDS = 10L;
+ private static final long SNAPSHOT_TICK_MILLIS = 5_000L;
+ private static final long T0 = 1_735_711_200_000L;
+
+ // Drift per event step
+ private static final double V100_DLON = 100.0 / (111_000.0 * Math.cos(Math.toRadians(50.85)));
+ private static final double V200_DLAT = -50.0 / 111_000.0;
+ private static final double V300_DLON = -200.0 / (111_000.0 * Math.cos(Math.toRadians(50.85)));
+
+ public static void main(String[] args) throws Exception {
+ System.setProperty("mobilityflink.meos.enabled", "false");
+ LOG.info("BerlinMODQ6LocalTest starting; window={}s tick={}ms",
+ WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS);
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ List events = buildEvents();
+ DataStreamSource raw = env.fromCollection(events);
+ DataStream trips = raw.assignTimestampsAndWatermarks(
+ WatermarkStrategy
+ .forBoundedOutOfOrderness(Duration.ofSeconds(1))
+ .withTimestampAssigner((e, t) -> e.getTimestamp()));
+
+ DataStream> cont = trips
+ .keyBy(BerlinMODTrip::getVehicleId)
+ .process(new Q6ContinuousFunction());
+ cont.print("Q6-continuous");
+
+ DataStream> win = trips
+ .keyBy(BerlinMODTrip::getVehicleId)
+ .window(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
+ .process(new Q6WindowedFunction());
+ win.print("Q6-windowed");
+
+ DataStream> snap = trips
+ .keyBy(BerlinMODTrip::getVehicleId)
+ .process(new Q6SnapshotFunction(SNAPSHOT_TICK_MILLIS));
+ snap.print("Q6-snapshot");
+
+ env.execute("BerlinMODQ6LocalTest");
+ LOG.info("BerlinMODQ6LocalTest done");
+ }
+
+ private static List buildEvents() {
+ List events = new ArrayList<>();
+ int step = 0;
+ for (int i = 0; i <= 12; i += 2, step++) {
+ events.add(make(100, T0 + i * 1000L, 4.3517 + step * V100_DLON, 50.8503));
+ }
+ step = 0;
+ for (int i = 1; i <= 13; i += 2, step++) {
+ events.add(make(200, T0 + i * 1000L, 4.3060, 50.8270 + step * V200_DLAT));
+ }
+ step = 0;
+ for (int i = 0; i <= 12; i += 2, step++) {
+ events.add(make(300, T0 + i * 1000L, 4.2000 + step * V300_DLON, 50.7500));
+ }
+ return events;
+ }
+
+ private static BerlinMODTrip make(int vid, long t, double lon, double lat) {
+ BerlinMODTrip trip = new BerlinMODTrip();
+ trip.setVehicleId(vid);
+ trip.setTimestamp(t);
+ trip.setLon(lon);
+ trip.setLat(lat);
+ return trip;
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODQ7LocalTest.java b/flink-processor/src/main/java/berlinmod/BerlinMODQ7LocalTest.java
new file mode 100644
index 0000000..fe9f075
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/BerlinMODQ7LocalTest.java
@@ -0,0 +1,117 @@
+package berlinmod;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Local end-to-end test driver for the BerlinMOD-Q7 three streaming forms.
+ *
+ * Same stationary-vehicle corpus as Q1/Q2/Q3/Q5/Q9. POI list:
+ *
+ * - POI 1 = Brussels city centre (4.3517, 50.8503), radius 2000 m
+ * - POI 2 = Anderlecht (4.3060, 50.8270), radius 1000 m
+ * - POI 3 = south of Brussels (4.2100, 50.7600), radius 2000 m
+ *
+ *
+ * Per (vehicle, POI) match-up:
+ *
+ * - v100 is inside POI 1 (0 m), outside POI 2 (~4.1 km) and POI 3 (~13 km)
+ * - v200 is inside POI 2 (0 m), outside POI 1 and POI 3
+ * - v300 is inside POI 3 (~1.3 km), outside POI 1 and POI 2
+ *
+ *
+ * Expected output:
+ *
+ * - Q7-continuous: 3 emissions — first-passages on each vehicle's
+ * very first event (v100 t=0 → POI 1; v200 t=1 → POI 2; v300 t=0 →
+ * POI 3)
+ * - Q7-windowed: per-window intra-window first-passages —
+ * window [0, 10 s) sees all 3 first-passages; window [10, 20 s) sees
+ * all 3 again (intra-window scoping has no cross-window memory). 6 lines.
+ * - Q7-snapshot: 3 ticks × 3 cumulative (vehicle, POI) first-passages = 9 lines
+ *
+ */
+public class BerlinMODQ7LocalTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BerlinMODQ7LocalTest.class);
+
+ private static final long WINDOW_SIZE_SECONDS = 10L;
+ private static final long SNAPSHOT_TICK_MILLIS = 5_000L;
+ private static final long T0 = 1_735_711_200_000L;
+
+ private static final List POIS = Arrays.asList(
+ new PointOfInterest(1, 4.3517, 50.8503, 2_000.0),
+ new PointOfInterest(2, 4.3060, 50.8270, 1_000.0),
+ new PointOfInterest(3, 4.2100, 50.7600, 2_000.0));
+
+ public static void main(String[] args) throws Exception {
+ System.setProperty("mobilityflink.meos.enabled", "false");
+ LOG.info("BerlinMODQ7LocalTest starting; #POIs={} window={}s tick={}ms",
+ POIS.size(), WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS);
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ List events = buildEvents();
+ DataStreamSource raw = env.fromCollection(events);
+ DataStream trips = raw.assignTimestampsAndWatermarks(
+ WatermarkStrategy
+ .forBoundedOutOfOrderness(Duration.ofSeconds(1))
+ .withTimestampAssigner((e, t) -> e.getTimestamp()));
+
+ DataStream> cont = trips
+ .keyBy(BerlinMODTrip::getVehicleId)
+ .process(new Q7ContinuousFunction(POIS));
+ cont.print("Q7-continuous");
+
+ DataStream> win = trips
+ .windowAll(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
+ .process(new Q7WindowedFunction(POIS));
+ win.print("Q7-windowed");
+
+ DataStream> snap = trips
+ .keyBy(BerlinMODTrip::getVehicleId)
+ .process(new Q7SnapshotFunction(POIS, SNAPSHOT_TICK_MILLIS));
+ snap.print("Q7-snapshot");
+
+ env.execute("BerlinMODQ7LocalTest");
+ LOG.info("BerlinMODQ7LocalTest done");
+ }
+
+ private static List buildEvents() {
+ List events = new ArrayList<>();
+ for (int i = 0; i <= 12; i += 2) {
+ events.add(make(100, T0 + i * 1000L, 4.3517, 50.8503));
+ }
+ for (int i = 1; i <= 13; i += 2) {
+ events.add(make(200, T0 + i * 1000L, 4.3060, 50.8270));
+ }
+ for (int i = 0; i <= 12; i += 2) {
+ events.add(make(300, T0 + i * 1000L, 4.2000, 50.7500));
+ }
+ return events;
+ }
+
+ private static BerlinMODTrip make(int vid, long t, double lon, double lat) {
+ BerlinMODTrip trip = new BerlinMODTrip();
+ trip.setVehicleId(vid);
+ trip.setTimestamp(t);
+ trip.setLon(lon);
+ trip.setLat(lat);
+ return trip;
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODQ8LocalTest.java b/flink-processor/src/main/java/berlinmod/BerlinMODQ8LocalTest.java
new file mode 100644
index 0000000..c644860
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/BerlinMODQ8LocalTest.java
@@ -0,0 +1,108 @@
+package berlinmod;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Local end-to-end test driver for the BerlinMOD-Q8 three streaming forms.
+ *
+ * Same stationary-vehicle corpus as the other Qs. Road segment runs
+ * from (4.30, 50.83) to (4.36, 50.87) — a diagonal across the Brussels-
+ * centre region. With a {@code d = 5 km} proximity threshold:
+ *
+ *
+ * - v100 at (4.3517, 50.8503) — ~1.1 km from segment → near
+ * - v200 at (4.3060, 50.8270) — ~0.5 km from segment → near
+ * - v300 at (4.2000, 50.7500) — ~13 km from segment → not near
+ *
+ *
+ * Expected output shape:
+ *
+ * - Q8-continuous: 21 events (14 near=true for v100/v200, 7 near=false for v300)
+ * - Q8-windowed: 2 windows, each with {@code distinctCount=2} (vehicles 100 and 200)
+ * - Q8-snapshot: 3 ticks × 2 near vehicles = 6 emissions
+ *
+ *
+ * Same shape as Q3 with a segment-distance predicate substituted for the
+ * point-radius one — the algebraic pattern parity intentional.
+ */
+public class BerlinMODQ8LocalTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BerlinMODQ8LocalTest.class);
+
+ // Road segment endpoints
+ private static final double S1_LON = 4.30, S1_LAT = 50.83;
+ private static final double S2_LON = 4.36, S2_LAT = 50.87;
+ private static final double RADIUS_METRES = 5_000.0;
+ private static final long WINDOW_SIZE_SECONDS = 10L;
+ private static final long SNAPSHOT_TICK_MILLIS = 5_000L;
+ private static final long T0 = 1_735_711_200_000L;
+
+ public static void main(String[] args) throws Exception {
+ System.setProperty("mobilityflink.meos.enabled", "false");
+ LOG.info("BerlinMODQ8LocalTest starting; segment=({},{}) → ({},{}) d={}m",
+ S1_LON, S1_LAT, S2_LON, S2_LAT, RADIUS_METRES);
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ List events = buildEvents();
+ DataStreamSource raw = env.fromCollection(events);
+ DataStream trips = raw.assignTimestampsAndWatermarks(
+ WatermarkStrategy
+ .forBoundedOutOfOrderness(Duration.ofSeconds(1))
+ .withTimestampAssigner((e, t) -> e.getTimestamp()));
+
+ DataStream> cont = trips
+ .process(new Q8ContinuousFunction(S1_LON, S1_LAT, S2_LON, S2_LAT, RADIUS_METRES));
+ cont.print("Q8-continuous");
+
+ DataStream> win = trips
+ .windowAll(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
+ .process(new Q8WindowedFunction(S1_LON, S1_LAT, S2_LON, S2_LAT, RADIUS_METRES));
+ win.print("Q8-windowed");
+
+ DataStream> snap = trips
+ .keyBy(BerlinMODTrip::getVehicleId)
+ .process(new Q8SnapshotFunction(S1_LON, S1_LAT, S2_LON, S2_LAT, RADIUS_METRES, SNAPSHOT_TICK_MILLIS));
+ snap.print("Q8-snapshot");
+
+ env.execute("BerlinMODQ8LocalTest");
+ LOG.info("BerlinMODQ8LocalTest done");
+ }
+
+ private static List buildEvents() {
+ List events = new ArrayList<>();
+ for (int i = 0; i <= 12; i += 2) {
+ events.add(make(100, T0 + i * 1000L, 4.3517, 50.8503));
+ }
+ for (int i = 1; i <= 13; i += 2) {
+ events.add(make(200, T0 + i * 1000L, 4.3060, 50.8270));
+ }
+ for (int i = 0; i <= 12; i += 2) {
+ events.add(make(300, T0 + i * 1000L, 4.2000, 50.7500));
+ }
+ return events;
+ }
+
+ private static BerlinMODTrip make(int vid, long t, double lon, double lat) {
+ BerlinMODTrip trip = new BerlinMODTrip();
+ trip.setVehicleId(vid);
+ trip.setTimestamp(t);
+ trip.setLon(lon);
+ trip.setLat(lat);
+ return trip;
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODQ9LocalTest.java b/flink-processor/src/main/java/berlinmod/BerlinMODQ9LocalTest.java
new file mode 100644
index 0000000..6468ba7
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/BerlinMODQ9LocalTest.java
@@ -0,0 +1,108 @@
+package berlinmod;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Local end-to-end test driver for the BerlinMOD-Q9 three streaming forms.
+ *
+ * Same stationary-vehicle synthetic corpus as Q1/Q2/Q3 (3 vehicles, 21
+ * events). Queried pair X = 100 (Brussels city centre), Y = 200 (Anderlecht);
+ * their actual distance is ~4.1 km — the expected output for every emission.
+ *
+ *
Expected output:
+ *
+ * - Q9-continuous: 13 lines — emitted whenever either X or Y has
+ * a new event AND the other has been seen at least once. v100 fires
+ * first at t=0; v200's first event at t=1 produces the first paired
+ * emission; subsequent 12 events (alternating) each produce one
+ * emission.
+ * - Q9-windowed: 2 windows — both contain X and Y events, each
+ * emits the X-Y distance using last seen-in-window positions.
+ * - Q9-snapshot: 3 ticks × 1 emission each = 3 lines.
+ *
+ */
+public class BerlinMODQ9LocalTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BerlinMODQ9LocalTest.class);
+
+ private static final int X_VEHICLE_ID = 100;
+ private static final int Y_VEHICLE_ID = 200;
+ private static final long WINDOW_SIZE_SECONDS = 10L;
+ private static final long SNAPSHOT_TICK_MILLIS = 5_000L;
+ private static final long T0 = 1_735_711_200_000L;
+
+ public static void main(String[] args) throws Exception {
+ System.setProperty("mobilityflink.meos.enabled", "false");
+ LOG.info("BerlinMODQ9LocalTest starting; X={} Y={} window={}s tick={}ms",
+ X_VEHICLE_ID, Y_VEHICLE_ID, WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS);
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ List events = buildEvents();
+ DataStreamSource raw = env.fromCollection(events);
+ DataStream trips = raw.assignTimestampsAndWatermarks(
+ WatermarkStrategy
+ .forBoundedOutOfOrderness(Duration.ofSeconds(1))
+ .withTimestampAssigner((e, t) -> e.getTimestamp()));
+
+ // Pre-filter to {X, Y} and key by a constant so the shared X+Y state
+ // lives in a single subtask.
+ DataStream xy = trips
+ .filter(t -> t.getVehicleId() == X_VEHICLE_ID || t.getVehicleId() == Y_VEHICLE_ID);
+
+ DataStream> cont = xy
+ .keyBy(t -> 0)
+ .process(new Q9ContinuousFunction(X_VEHICLE_ID, Y_VEHICLE_ID));
+ cont.print("Q9-continuous");
+
+ DataStream> win = xy
+ .windowAll(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
+ .process(new Q9WindowedFunction(X_VEHICLE_ID, Y_VEHICLE_ID));
+ win.print("Q9-windowed");
+
+ DataStream> snap = xy
+ .keyBy(t -> 0)
+ .process(new Q9SnapshotFunction(X_VEHICLE_ID, Y_VEHICLE_ID, SNAPSHOT_TICK_MILLIS));
+ snap.print("Q9-snapshot");
+
+ env.execute("BerlinMODQ9LocalTest");
+ LOG.info("BerlinMODQ9LocalTest done");
+ }
+
+ private static List buildEvents() {
+ List events = new ArrayList<>();
+ for (int i = 0; i <= 12; i += 2) {
+ events.add(make(100, T0 + i * 1000L, 4.3517, 50.8503));
+ }
+ for (int i = 1; i <= 13; i += 2) {
+ events.add(make(200, T0 + i * 1000L, 4.3060, 50.8270));
+ }
+ for (int i = 0; i <= 12; i += 2) {
+ events.add(make(300, T0 + i * 1000L, 4.2000, 50.7500));
+ }
+ return events;
+ }
+
+ private static BerlinMODTrip make(int vid, long t, double lon, double lat) {
+ BerlinMODTrip trip = new BerlinMODTrip();
+ trip.setVehicleId(vid);
+ trip.setTimestamp(t);
+ trip.setLon(lon);
+ trip.setLat(lat);
+ return trip;
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODTrip.java b/flink-processor/src/main/java/berlinmod/BerlinMODTrip.java
new file mode 100644
index 0000000..6eb8e80
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/BerlinMODTrip.java
@@ -0,0 +1,48 @@
+package berlinmod;
+
+/**
+ * Plain data class for a single GPS event from a BerlinMOD trip.
+ *
+ * Matches the {@code aisdata.AISData} field set but uses the BerlinMOD vehicle
+ * identifier {@code vehicleId} instead of an AIS {@code mmsi} and drops the
+ * AIS-specific {@code speed}/{@code course} channels (BerlinMOD's generator
+ * does not export those for the streaming form).
+ */
+public class BerlinMODTrip {
+ private long timestamp; // epoch milliseconds (event time)
+ private int vehicleId;
+ private double lon;
+ private double lat;
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public int getVehicleId() {
+ return vehicleId;
+ }
+
+ public void setVehicleId(int vehicleId) {
+ this.vehicleId = vehicleId;
+ }
+
+ public double getLon() {
+ return lon;
+ }
+
+ public void setLon(double lon) {
+ this.lon = lon;
+ }
+
+ public double getLat() {
+ return lat;
+ }
+
+ public void setLat(double lat) {
+ this.lat = lat;
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/Haversine.java b/flink-processor/src/main/java/berlinmod/Haversine.java
new file mode 100644
index 0000000..cb6f888
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/Haversine.java
@@ -0,0 +1,44 @@
+package berlinmod;
+
+/**
+ * Great-circle distance in metres between two WGS84 (lon, lat) points.
+ *
+ *
Pure-Java fallback for {@link MEOSBridge#dwithinMetres} and
+ * {@link MEOSBridge#distanceMetres}, used by the BerlinMOD-9 × 3-form
+ * streaming scaffold when libmeos is not loadable on the runtime path
+ * (e.g. the mini-cluster local tests in {@code BerlinMODQ*LocalTest}). The
+ * primary spatial-predicate surface is {@link MEOSBridge}; this class is a
+ * fallback only.
+ */
+public final class Haversine {
+
+ private static final double EARTH_RADIUS_METRES = 6_371_000.0;
+
+ private Haversine() {
+ // utility
+ }
+
+ /**
+ * @return great-circle distance in metres between (lon1, lat1) and (lon2, lat2)
+ */
+ public static double distanceMetres(double lon1, double lat1, double lon2, double lat2) {
+ double phi1 = Math.toRadians(lat1);
+ double phi2 = Math.toRadians(lat2);
+ double dPhi = Math.toRadians(lat2 - lat1);
+ double dLambda = Math.toRadians(lon2 - lon1);
+
+ double a = Math.sin(dPhi / 2) * Math.sin(dPhi / 2)
+ + Math.cos(phi1) * Math.cos(phi2)
+ * Math.sin(dLambda / 2) * Math.sin(dLambda / 2);
+ double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
+ return EARTH_RADIUS_METRES * c;
+ }
+
+ /**
+ * @return true if the great-circle distance from (lon, lat) to (pLon, pLat)
+ * is ≤ {@code radiusMetres}
+ */
+ public static boolean withinMetres(double lon, double lat, double pLon, double pLat, double radiusMetres) {
+ return distanceMetres(lon, lat, pLon, pLat) <= radiusMetres;
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/MEOSBridge.java b/flink-processor/src/main/java/berlinmod/MEOSBridge.java
new file mode 100644
index 0000000..12570e0
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/MEOSBridge.java
@@ -0,0 +1,158 @@
+package berlinmod;
+
+import functions.functions;
+import jnr.ffi.Pointer;
+import utils.spatial.PointToSegment;
+
+/**
+ * Runtime bridge from MobilityFlink BerlinMOD streaming-form predicates to
+ * MEOS via JMEOS.
+ *
+ *
All spatial predicates exercised by the BerlinMOD-9 × 3-form scaffold
+ * flow through this class. When the JMEOS native libmeos shared object is
+ * present and loadable, each predicate evaluates through MEOS' WGS84
+ * geography surface ({@code geom_to_geog} + {@code geog_dwithin}). When
+ * libmeos is not available, each predicate falls back to the corresponding
+ * pure-Java implementation in {@link Haversine} or {@link SegmentDistance}
+ * so the BerlinMOD mini-cluster local tests stay runnable on systems
+ * without a MEOS install.
+ *
+ *
The fallback is gated by the {@link #MEOS_AVAILABLE} static flag, set
+ * once at class-load time:
+ *
+ * - {@code -Dmobilityflink.meos.enabled=false} forces the pure-Java path
+ * even when libmeos is present (used by {@code BerlinMODQ*LocalTest}).
+ *
- Otherwise {@code MEOS_AVAILABLE} is {@code true} iff
+ * {@code functions.meos_initialize()} returns without throwing.
+ *
+ */
+public final class MEOSBridge {
+
+ /**
+ * {@code true} iff MEOS is available on this runtime and the bridge
+ * routes through it; {@code false} iff the bridge will use the pure-Java
+ * fallbacks.
+ */
+ public static final boolean MEOS_AVAILABLE;
+
+ static {
+ boolean enabled =
+ Boolean.parseBoolean(System.getProperty("mobilityflink.meos.enabled", "true"));
+ boolean ok = false;
+ if (enabled) {
+ try {
+ functions.meos_initialize();
+ ok = true;
+ } catch (Throwable t) {
+ // libmeos shared object not loadable on this runtime — fall back.
+ ok = false;
+ }
+ }
+ MEOS_AVAILABLE = ok;
+ }
+
+ private MEOSBridge() {
+ // utility
+ }
+
+ // ----------------------------------------------------------------------
+ // Public bridge surface — same shape as Haversine + SegmentDistance.
+ // ----------------------------------------------------------------------
+
+ /**
+ * @return {@code true} if the great-circle distance from {@code (lon1, lat1)}
+ * to {@code (lon2, lat2)} on the WGS84 spheroid is at most
+ * {@code radiusMetres}. MEOS-backed via {@code geog_dwithin} when
+ * available, else pure-Java {@link Haversine#withinMetres}.
+ */
+ public static boolean dwithinMetres(double lon1, double lat1,
+ double lon2, double lat2,
+ double radiusMetres) {
+ if (!MEOS_AVAILABLE) {
+ return Haversine.withinMetres(lon1, lat1, lon2, lat2, radiusMetres);
+ }
+ Pointer g1 = pointGeog(lon1, lat1);
+ Pointer g2 = pointGeog(lon2, lat2);
+ if (g1 == null || g2 == null) {
+ return Haversine.withinMetres(lon1, lat1, lon2, lat2, radiusMetres);
+ }
+ return functions.geog_dwithin(g1, g2, radiusMetres, true);
+ }
+
+ /**
+ * @return {@code true} if the spheroidal distance from {@code (pLon, pLat)}
+ * to the LineString {@code (s1, s2)} is at most {@code radiusMetres}.
+ * MEOS-backed via {@code geog_dwithin} on geographies built from
+ * the point and line WKTs, else pure-Java
+ * {@link SegmentDistance#withinMetres}.
+ */
+ public static boolean dwithinSegmentMetres(double pLon, double pLat,
+ double s1Lon, double s1Lat,
+ double s2Lon, double s2Lat,
+ double radiusMetres) {
+ if (!MEOS_AVAILABLE) {
+ return SegmentDistance.withinMetres(pLon, pLat, s1Lon, s1Lat, s2Lon, s2Lat, radiusMetres);
+ }
+ Pointer pg = pointGeog(pLon, pLat);
+ Pointer lg = lineGeog(s1Lon, s1Lat, s2Lon, s2Lat);
+ if (pg == null || lg == null) {
+ return SegmentDistance.withinMetres(pLon, pLat, s1Lon, s1Lat, s2Lon, s2Lat, radiusMetres);
+ }
+ return functions.geog_dwithin(pg, lg, radiusMetres, true);
+ }
+
+ /**
+ * @return the spheroidal distance in metres between two WGS84 points.
+ * MEOS-backed via {@code utils.spatial.Haversine.distance}
+ * (which calls MEOS' {@code geog_distance} over two POINT
+ * geographies) when libmeos is loadable, else pure-Java
+ * {@link Haversine#distanceMetres}.
+ */
+ public static double distanceMetres(double lon1, double lat1,
+ double lon2, double lat2) {
+ if (!MEOS_AVAILABLE) {
+ return Haversine.distanceMetres(lon1, lat1, lon2, lat2);
+ }
+ return utils.spatial.Haversine.distance(lon1, lat1, lon2, lat2);
+ }
+
+ /**
+ * @return the spheroidal distance in metres from {@code (pLon, pLat)} to
+ * the LineString {@code (s1, s2)}. MEOS-backed via
+ * {@code utils.spatial.PointToSegment.distance} when libmeos is
+ * loadable, else pure-Java
+ * {@link SegmentDistance#distanceMetres}.
+ */
+ public static double distanceSegmentMetres(double pLon, double pLat,
+ double s1Lon, double s1Lat,
+ double s2Lon, double s2Lat) {
+ if (!MEOS_AVAILABLE) {
+ return SegmentDistance.distanceMetres(pLon, pLat, s1Lon, s1Lat, s2Lon, s2Lat);
+ }
+ return PointToSegment.distance(pLon, pLat, s1Lon, s1Lat, s2Lon, s2Lat);
+ }
+
+ // ----------------------------------------------------------------------
+ // Internal helpers — WKT → geometry → geography in one MEOS-side step.
+ // ----------------------------------------------------------------------
+
+ private static Pointer pointGeog(double lon, double lat) {
+ String wkt = String.format("SRID=4326;Point(%.7f %.7f)", lon, lat);
+ Pointer g = functions.geom_in(wkt, -1);
+ if (g == null) {
+ return null;
+ }
+ return functions.geom_to_geog(g);
+ }
+
+ private static Pointer lineGeog(double s1Lon, double s1Lat,
+ double s2Lon, double s2Lat) {
+ String wkt = String.format("SRID=4326;LineString(%.7f %.7f, %.7f %.7f)",
+ s1Lon, s1Lat, s2Lon, s2Lat);
+ Pointer g = functions.geom_in(wkt, -1);
+ if (g == null) {
+ return null;
+ }
+ return functions.geom_to_geog(g);
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/PointOfInterest.java b/flink-processor/src/main/java/berlinmod/PointOfInterest.java
new file mode 100644
index 0000000..0dc3ac5
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/PointOfInterest.java
@@ -0,0 +1,24 @@
+package berlinmod;
+
+import java.io.Serializable;
+
+/**
+ * Simple point-of-interest record for BerlinMOD-Q7 — a (lon, lat) plus a
+ * proximity radius in metres and an integer id. Serializable for use in
+ * Flink operator state and configuration.
+ */
+public final class PointOfInterest implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public final int id;
+ public final double lon;
+ public final double lat;
+ public final double radiusMetres;
+
+ public PointOfInterest(int id, double lon, double lat, double radiusMetres) {
+ this.id = id;
+ this.lon = lon;
+ this.lat = lat;
+ this.radiusMetres = radiusMetres;
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/Q1ContinuousFunction.java b/flink-processor/src/main/java/berlinmod/Q1ContinuousFunction.java
new file mode 100644
index 0000000..dcaa383
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/Q1ContinuousFunction.java
@@ -0,0 +1,41 @@
+package berlinmod;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * BerlinMOD-Q1 — continuous form.
+ *
+ * "Which vehicles have appeared in the stream?"
+ *
+ *
Emits {@code (vehicleId, firstSeenTimestamp)} the first time each vehicle
+ * is seen; subsequent events for the same vehicle are deduplicated via keyed
+ * state. Keyed by vehicleId.
+ */
+public class Q1ContinuousFunction
+ extends KeyedProcessFunction> {
+
+ private transient ValueState seen;
+
+ @Override
+ public void open(Configuration parameters) {
+ seen = getRuntimeContext().getState(
+ new ValueStateDescriptor<>("q1SeenVehicle", Boolean.class));
+ }
+
+ @Override
+ public void processElement(
+ BerlinMODTrip trip,
+ Context ctx,
+ Collector> out) throws Exception {
+ Boolean s = seen.value();
+ if (s == null || !s) {
+ out.collect(new Tuple2<>(trip.getVehicleId(), trip.getTimestamp()));
+ seen.update(true);
+ }
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/Q1SnapshotFunction.java b/flink-processor/src/main/java/berlinmod/Q1SnapshotFunction.java
new file mode 100644
index 0000000..03c171a
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/Q1SnapshotFunction.java
@@ -0,0 +1,59 @@
+package berlinmod;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * BerlinMOD-Q1 — snapshot form.
+ *
+ * "At time T, which vehicles have appeared in the stream up to T?"
+ *
+ *
Keyed by vehicleId. On each event, mark the vehicle as seen and register
+ * an event-time timer at the next snapshot tick. When the timer fires at time
+ * T, emit {@code (T, vehicleId)} for each vehicle that has been seen by T.
+ *
+ *
This is the parity-oracle form: at watermark T, the streaming output is
+ * the set of vehicleIds whose first event occurred at or before T, which
+ * equals the batch BerlinMOD-Q1 result on data up to T.
+ */
+public class Q1SnapshotFunction
+ extends KeyedProcessFunction> {
+
+ private final long snapshotTickMillis;
+ private transient ValueState seen;
+
+ public Q1SnapshotFunction(long snapshotTickMillis) {
+ this.snapshotTickMillis = snapshotTickMillis;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ seen = getRuntimeContext().getState(
+ new ValueStateDescriptor<>("q1SnapshotSeen", Boolean.class));
+ }
+
+ @Override
+ public void processElement(
+ BerlinMODTrip trip,
+ Context ctx,
+ Collector> out) throws Exception {
+ seen.update(true);
+ long nextTick = ((trip.getTimestamp() / snapshotTickMillis) + 1) * snapshotTickMillis;
+ ctx.timerService().registerEventTimeTimer(nextTick);
+ }
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ OnTimerContext ctx,
+ Collector> out) throws Exception {
+ Boolean s = seen.value();
+ if (Boolean.TRUE.equals(s)) {
+ out.collect(new Tuple2<>(timestamp, ctx.getCurrentKey()));
+ }
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/Q1WindowedFunction.java b/flink-processor/src/main/java/berlinmod/Q1WindowedFunction.java
new file mode 100644
index 0000000..4581422
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/Q1WindowedFunction.java
@@ -0,0 +1,33 @@
+package berlinmod;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * BerlinMOD-Q1 — windowed form.
+ *
+ * "Per N-second tumbling window, how many distinct vehicles appeared
+ * in the window?"
+ *
+ *
Emits {@code (windowStart, windowEnd, distinctCount)} per window.
+ */
+public class Q1WindowedFunction
+ extends ProcessAllWindowFunction, TimeWindow> {
+
+ @Override
+ public void process(
+ Context ctx,
+ Iterable elements,
+ Collector> out) {
+ Set distinct = new HashSet<>();
+ for (BerlinMODTrip trip : elements) {
+ distinct.add(trip.getVehicleId());
+ }
+ out.collect(new Tuple3<>(ctx.window().getStart(), ctx.window().getEnd(), (long) distinct.size()));
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/Q2ContinuousFunction.java b/flink-processor/src/main/java/berlinmod/Q2ContinuousFunction.java
new file mode 100644
index 0000000..9d87dcb
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/Q2ContinuousFunction.java
@@ -0,0 +1,40 @@
+package berlinmod;
+
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BerlinMOD-Q2 — continuous form.
+ *
+ * "Where is vehicle X right now?"
+ *
+ *
For each incoming GPS event {@link BerlinMODTrip}, emit it unchanged if it
+ * belongs to the queried vehicle, otherwise drop. No windowing, no state —
+ * a per-event filter against {@code targetVehicleId}.
+ */
+public class Q2ContinuousFunction extends ProcessFunction {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Q2ContinuousFunction.class);
+
+ private final int targetVehicleId;
+
+ public Q2ContinuousFunction(int targetVehicleId) {
+ this.targetVehicleId = targetVehicleId;
+ }
+
+ @Override
+ public void processElement(
+ BerlinMODTrip trip,
+ Context ctx,
+ Collector out) {
+ if (trip.getVehicleId() == targetVehicleId) {
+ out.collect(trip);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Q2-continuous: vehicle={} t={} ({}, {})",
+ trip.getVehicleId(), trip.getTimestamp(), trip.getLon(), trip.getLat());
+ }
+ }
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/Q2SnapshotFunction.java b/flink-processor/src/main/java/berlinmod/Q2SnapshotFunction.java
new file mode 100644
index 0000000..4468d82
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/Q2SnapshotFunction.java
@@ -0,0 +1,83 @@
+package berlinmod;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BerlinMOD-Q2 — snapshot form.
+ *
+ * "At time T, where is vehicle X?"
+ *
+ *
This is the parity-oracle form: streaming output at watermark T must
+ * equal the batch BerlinMOD-Q2 result on the same data up to T (the most
+ * recent known position of vehicle X on or before T).
+ *
+ *
Keyed by vehicleId (so the operator scales naturally if the queried
+ * vehicle changes, and so reuse across multiple queried vehicles is a
+ * fan-out keying choice rather than a code change). For events whose key
+ * matches {@code targetVehicleId}, update last-known state and register an
+ * event-time timer for the next snapshot tick. When the timer fires, emit
+ * {@code (T, lon, lat, t_of_last_event)}.
+ */
+public class Q2SnapshotFunction
+ extends KeyedProcessFunction> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Q2SnapshotFunction.class);
+
+ private final int targetVehicleId;
+ private final long snapshotTickMillis;
+
+ private transient ValueState> lastKnown; // (lon, lat, ts)
+
+ public Q2SnapshotFunction(int targetVehicleId, long snapshotTickMillis) {
+ this.targetVehicleId = targetVehicleId;
+ this.snapshotTickMillis = snapshotTickMillis;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ TypeInformation> tInfo =
+ TypeInformation.of(new TypeHint>() {});
+ ValueStateDescriptor> desc =
+ new ValueStateDescriptor<>("q2LastKnownPosition", tInfo);
+ lastKnown = getRuntimeContext().getState(desc);
+ }
+
+ @Override
+ public void processElement(
+ BerlinMODTrip trip,
+ Context ctx,
+ Collector> out) throws Exception {
+ if (trip.getVehicleId() != targetVehicleId) {
+ return;
+ }
+ lastKnown.update(new Tuple3<>(trip.getLon(), trip.getLat(), trip.getTimestamp()));
+ long nextTick = ((trip.getTimestamp() / snapshotTickMillis) + 1) * snapshotTickMillis;
+ ctx.timerService().registerEventTimeTimer(nextTick);
+ }
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ OnTimerContext ctx,
+ Collector> out) throws Exception {
+ Tuple3 p = lastKnown.value();
+ if (p == null) {
+ return;
+ }
+ out.collect(new Tuple4<>(timestamp, p.f0, p.f1, p.f2));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Q2-snapshot: T={} vehicle={} ({}, {}) at t={}",
+ timestamp, ctx.getCurrentKey(), p.f0, p.f1, p.f2);
+ }
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/Q2WindowedFunction.java b/flink-processor/src/main/java/berlinmod/Q2WindowedFunction.java
new file mode 100644
index 0000000..38ee0ff
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/Q2WindowedFunction.java
@@ -0,0 +1,58 @@
+package berlinmod;
+
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BerlinMOD-Q2 — windowed form.
+ *
+ * "Per N-second tumbling window, what is vehicle X's most recent
+ * position seen within the window?"
+ *
+ *
For each window, filter to events matching {@code targetVehicleId}, keep
+ * the event with the largest timestamp, and emit
+ * {@code (windowStart, windowEnd, vehicleId, lon, lat)}. If the vehicle had
+ * no events in the window, emit nothing.
+ */
+public class Q2WindowedFunction
+ extends ProcessAllWindowFunction, TimeWindow> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Q2WindowedFunction.class);
+
+ private final int targetVehicleId;
+
+ public Q2WindowedFunction(int targetVehicleId) {
+ this.targetVehicleId = targetVehicleId;
+ }
+
+ @Override
+ public void process(
+ Context ctx,
+ Iterable elements,
+ Collector> out) {
+ BerlinMODTrip latest = null;
+ for (BerlinMODTrip trip : elements) {
+ if (trip.getVehicleId() != targetVehicleId) {
+ continue;
+ }
+ if (latest == null || trip.getTimestamp() > latest.getTimestamp()) {
+ latest = trip;
+ }
+ }
+ if (latest != null) {
+ out.collect(new Tuple5<>(
+ ctx.window().getStart(),
+ ctx.window().getEnd(),
+ latest.getVehicleId(),
+ latest.getLon(),
+ latest.getLat()));
+ LOG.info("Q2-windowed: [{}, {}) vehicle={} last=({}, {})",
+ ctx.window().getStart(), ctx.window().getEnd(),
+ latest.getVehicleId(), latest.getLon(), latest.getLat());
+ }
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/Q3ContinuousFunction.java b/flink-processor/src/main/java/berlinmod/Q3ContinuousFunction.java
new file mode 100644
index 0000000..6628d05
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/Q3ContinuousFunction.java
@@ -0,0 +1,48 @@
+package berlinmod;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BerlinMOD-Q3 — continuous form.
+ *
+ * "At every moment, which vehicles are currently within {@code d} metres
+ * of point P?"
+ *
+ *
For each incoming GPS event {@link BerlinMODTrip}, evaluate the radius
+ * predicate and emit {@code (vehicleId, eventTimeMillis, isNear)} per event.
+ * No windowing — output updates per event, watermark-independent.
+ *
+ *
Predicate: {@link MEOSBridge#dwithinMetres} — MEOS' {@code geog_dwithin}
+ * over WGS84 geographies when libmeos is loadable, with a pure-Java great-circle
+ * fallback ({@link Haversine}) for runtimes without MEOS.
+ */
+public class Q3ContinuousFunction extends ProcessFunction> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Q3ContinuousFunction.class);
+
+ private final double pLon;
+ private final double pLat;
+ private final double radiusMetres;
+
+ public Q3ContinuousFunction(double pLon, double pLat, double radiusMetres) {
+ this.pLon = pLon;
+ this.pLat = pLat;
+ this.radiusMetres = radiusMetres;
+ }
+
+ @Override
+ public void processElement(
+ BerlinMODTrip trip,
+ Context ctx,
+ Collector> out) {
+ boolean near = MEOSBridge.dwithinMetres(trip.getLon(), trip.getLat(), pLon, pLat, radiusMetres);
+ out.collect(new Tuple3<>(trip.getVehicleId(), trip.getTimestamp(), near));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Q3-continuous: vehicle={} ts={} near={}", trip.getVehicleId(), trip.getTimestamp(), near);
+ }
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/Q3SnapshotFunction.java b/flink-processor/src/main/java/berlinmod/Q3SnapshotFunction.java
new file mode 100644
index 0000000..479854d
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/Q3SnapshotFunction.java
@@ -0,0 +1,91 @@
+package berlinmod;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BerlinMOD-Q3 — snapshot form.
+ *
+ * "At time T, which vehicles are within {@code d} metres of point P?"
+ *
+ *
This is the parity-oracle form: streaming output at watermark T
+ * must equal the batch BerlinMOD-Q3 result on the same data up to T.
+ *
+ *
Keyed by vehicleId. Maintains a per-vehicle {@code lastKnownPosition}
+ * state. On each event, update the state, then register an event-time timer
+ * for the snapshot tick. When the timer fires at time T, evaluate the radius
+ * predicate against the most recent known position and emit
+ * {@code (T, vehicleId)} if the vehicle is within {@code d} of P at that
+ * snapshot.
+ *
+ *
Predicate: {@link MEOSBridge#dwithinMetres} — MEOS {@code geog_dwithin}
+ * when libmeos is loadable, with {@link Haversine} fallback otherwise. The
+ * snapshot-form output at watermark T is equal to the batch BerlinMOD-Q3
+ * result up to T regardless of which path is active.
+ */
+public class Q3SnapshotFunction
+ extends KeyedProcessFunction> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Q3SnapshotFunction.class);
+
+ private final double pLon;
+ private final double pLat;
+ private final double radiusMetres;
+ private final long snapshotTickMillis;
+
+ private transient ValueState> lastKnown; // (lon, lat, ts)
+
+ public Q3SnapshotFunction(
+ double pLon, double pLat, double radiusMetres, long snapshotTickMillis) {
+ this.pLon = pLon;
+ this.pLat = pLat;
+ this.radiusMetres = radiusMetres;
+ this.snapshotTickMillis = snapshotTickMillis;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ TypeInformation> tInfo =
+ TypeInformation.of(new TypeHint>() {});
+ ValueStateDescriptor> desc =
+ new ValueStateDescriptor<>("lastKnownPosition", tInfo);
+ lastKnown = getRuntimeContext().getState(desc);
+ }
+
+ @Override
+ public void processElement(
+ BerlinMODTrip trip,
+ Context ctx,
+ Collector> out) throws Exception {
+ lastKnown.update(new Tuple3<>(trip.getLon(), trip.getLat(), trip.getTimestamp()));
+ long nextTick = ((trip.getTimestamp() / snapshotTickMillis) + 1) * snapshotTickMillis;
+ ctx.timerService().registerEventTimeTimer(nextTick);
+ }
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ OnTimerContext ctx,
+ Collector> out) throws Exception {
+ Tuple3 p = lastKnown.value();
+ if (p == null) {
+ return;
+ }
+ if (MEOSBridge.dwithinMetres(p.f0, p.f1, pLon, pLat, radiusMetres)) {
+ Integer vehicleId = ctx.getCurrentKey();
+ out.collect(new Tuple2<>(timestamp, vehicleId));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Q3-snapshot: T={} vehicle={}", timestamp, vehicleId);
+ }
+ }
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/Q3WindowedFunction.java b/flink-processor/src/main/java/berlinmod/Q3WindowedFunction.java
new file mode 100644
index 0000000..c6dfa98
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/Q3WindowedFunction.java
@@ -0,0 +1,59 @@
+package berlinmod;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * BerlinMOD-Q3 — windowed form.
+ *
+ * "Per N-second window, how many distinct vehicles were within
+ * {@code d} metres of point P at any time during the window?"
+ *
+ *
Tumbling event-time window of configurable size. For each window, scan
+ * all events whose timestamp falls in the window, count distinct vehicleIds
+ * for which at least one event satisfies the radius predicate, and emit
+ * {@code (windowStart, windowEnd, distinctCount)}.
+ *
+ *
Predicate: {@link MEOSBridge#dwithinMetres} — MEOS {@code geog_dwithin}
+ * over WGS84 geographies when libmeos is loadable, with {@link Haversine}
+ * fallback otherwise.
+ */
+public class Q3WindowedFunction
+ extends ProcessAllWindowFunction, TimeWindow> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Q3WindowedFunction.class);
+
+ private final double pLon;
+ private final double pLat;
+ private final double radiusMetres;
+
+ public Q3WindowedFunction(double pLon, double pLat, double radiusMetres) {
+ this.pLon = pLon;
+ this.pLat = pLat;
+ this.radiusMetres = radiusMetres;
+ }
+
+ @Override
+ public void process(
+ Context ctx,
+ Iterable elements,
+ Collector> out) {
+ Set distinctNear = new HashSet<>();
+ for (BerlinMODTrip trip : elements) {
+ if (MEOSBridge.dwithinMetres(trip.getLon(), trip.getLat(), pLon, pLat, radiusMetres)) {
+ distinctNear.add(trip.getVehicleId());
+ }
+ }
+ long count = distinctNear.size();
+ out.collect(new Tuple3<>(ctx.window().getStart(), ctx.window().getEnd(), count));
+ LOG.info("Q3-windowed: [{}, {}) distinct-near={}",
+ ctx.window().getStart(), ctx.window().getEnd(), count);
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/Q4ContinuousFunction.java b/flink-processor/src/main/java/berlinmod/Q4ContinuousFunction.java
new file mode 100644
index 0000000..5a2ece9
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/Q4ContinuousFunction.java
@@ -0,0 +1,62 @@
+package berlinmod;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * BerlinMOD-Q4 — continuous form.
+ *
+ * "Which vehicles entered region R (transition outside → inside),
+ * and when?"
+ *
+ *
Keyed by vehicleId. Per-vehicle state tracks the last seen
+ * inside-or-outside flag for R. On each event, computes the current
+ * inside-or-outside, and if the transition is outside→inside, emits
+ * {@code (vehicleId, entryTime)}.
+ *
+ *
Predicate: pure-Java axis-aligned point-in-box. The rectangular region
+ * is degenerate as a geographic predicate (no projection needed); a generic
+ * polygon-R variant would route through {@link MEOSBridge} for MEOS
+ * {@code eintersects_tgeo_geo}.
+ */
+public class Q4ContinuousFunction
+ extends KeyedProcessFunction> {
+
+ private final double xmin, ymin, xmax, ymax;
+ private transient ValueState wasInside;
+
+ public Q4ContinuousFunction(double xmin, double ymin, double xmax, double ymax) {
+ this.xmin = xmin;
+ this.ymin = ymin;
+ this.xmax = xmax;
+ this.ymax = ymax;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ wasInside = getRuntimeContext().getState(
+ new ValueStateDescriptor<>("q4WasInside", Boolean.class));
+ }
+
+ @Override
+ public void processElement(
+ BerlinMODTrip trip,
+ Context ctx,
+ Collector> out) throws Exception {
+ boolean isInside = inBox(trip.getLon(), trip.getLat());
+ Boolean prev = wasInside.value();
+ boolean prevInside = prev != null && prev;
+ if (isInside && !prevInside) {
+ out.collect(new Tuple2<>(trip.getVehicleId(), trip.getTimestamp()));
+ }
+ wasInside.update(isInside);
+ }
+
+ private boolean inBox(double lon, double lat) {
+ return lon >= xmin && lon <= xmax && lat >= ymin && lat <= ymax;
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/Q4SnapshotFunction.java b/flink-processor/src/main/java/berlinmod/Q4SnapshotFunction.java
new file mode 100644
index 0000000..556e564
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/Q4SnapshotFunction.java
@@ -0,0 +1,82 @@
+package berlinmod;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * BerlinMOD-Q4 — snapshot form.
+ *
+ * "At time T, what is the list of (vehicleId, entryTime) pairs for all
+ * vehicles that entered region R at or before T?"
+ *
+ *
This is the parity-oracle form: streaming output at watermark T must
+ * equal the batch BerlinMOD-Q4 result on the same data up to T.
+ *
+ *
Keyed by vehicleId. Per-vehicle state: a {@code wasInside} flag plus a
+ * {@code ListState} of recorded entry times. On each event, detect
+ * outside → inside transitions and append entry time. Register an event-time
+ * timer at the next snapshot tick. On timer fire at T, emit one
+ * {@code (T, vehicleId, entryTime)} per recorded entry.
+ */
+public class Q4SnapshotFunction
+ extends KeyedProcessFunction> {
+
+ private final double xmin, ymin, xmax, ymax;
+ private final long snapshotTickMillis;
+ private transient ValueState wasInside;
+ private transient ListState entries;
+
+ public Q4SnapshotFunction(double xmin, double ymin, double xmax, double ymax, long snapshotTickMillis) {
+ this.xmin = xmin;
+ this.ymin = ymin;
+ this.xmax = xmax;
+ this.ymax = ymax;
+ this.snapshotTickMillis = snapshotTickMillis;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ wasInside = getRuntimeContext().getState(
+ new ValueStateDescriptor<>("q4SnapWasInside", Boolean.class));
+ entries = getRuntimeContext().getListState(
+ new ListStateDescriptor<>("q4SnapEntries", Long.class));
+ }
+
+ @Override
+ public void processElement(
+ BerlinMODTrip trip,
+ Context ctx,
+ Collector> out) throws Exception {
+ boolean curr = inBox(trip.getLon(), trip.getLat());
+ Boolean prev = wasInside.value();
+ boolean prevInside = prev != null && prev;
+ if (curr && !prevInside) {
+ entries.add(trip.getTimestamp());
+ }
+ wasInside.update(curr);
+ long nextTick = ((trip.getTimestamp() / snapshotTickMillis) + 1) * snapshotTickMillis;
+ ctx.timerService().registerEventTimeTimer(nextTick);
+ }
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ OnTimerContext ctx,
+ Collector> out) throws Exception {
+ for (Long entry : entries.get()) {
+ if (entry <= timestamp) {
+ out.collect(new Tuple3<>(timestamp, ctx.getCurrentKey(), entry));
+ }
+ }
+ }
+
+ private boolean inBox(double lon, double lat) {
+ return lon >= xmin && lon <= xmax && lat >= ymin && lat <= ymax;
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/Q4WindowedFunction.java b/flink-processor/src/main/java/berlinmod/Q4WindowedFunction.java
new file mode 100644
index 0000000..360d589
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/Q4WindowedFunction.java
@@ -0,0 +1,74 @@
+package berlinmod;
+
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * BerlinMOD-Q4 — windowed form.
+ *
+ * "Per N-second tumbling window, which vehicles entered region R during
+ * the window, and at what time?"
+ *
+ *
Scans all events in the window, sorted per-vehicle by time, and detects
+ * outside → inside transitions within the window. Emits one
+ * {@code (windowStart, windowEnd, vehicleId, entryTime)} per detected entry.
+ *
+ *
Note: a vehicle's "outside" state at window start is inferred only from
+ * the window's first event (no cross-window state). This intra-window
+ * scoping matches BerlinMOD-Q4's "first N entries during a period" form.
+ */
+public class Q4WindowedFunction
+ extends ProcessAllWindowFunction, TimeWindow> {
+
+ private final double xmin, ymin, xmax, ymax;
+
+ public Q4WindowedFunction(double xmin, double ymin, double xmax, double ymax) {
+ this.xmin = xmin;
+ this.ymin = ymin;
+ this.xmax = xmax;
+ this.ymax = ymax;
+ }
+
+ @Override
+ public void process(
+ Context ctx,
+ Iterable elements,
+ Collector> out) {
+ Map> perVehicle = new HashMap<>();
+ for (BerlinMODTrip trip : elements) {
+ perVehicle.computeIfAbsent(trip.getVehicleId(), k -> new ArrayList<>()).add(trip);
+ }
+ for (Map.Entry> e : perVehicle.entrySet()) {
+ List sorted = e.getValue();
+ sorted.sort((a, b) -> Long.compare(a.getTimestamp(), b.getTimestamp()));
+ boolean prevInside = false; // intra-window only — treats first event as the prior
+ for (int i = 0; i < sorted.size(); i++) {
+ BerlinMODTrip t = sorted.get(i);
+ boolean curr = inBox(t.getLon(), t.getLat());
+ if (i == 0) {
+ if (curr) {
+ // first event already inside — count as entry per the
+ // intra-window scoping convention (no prior visibility)
+ out.collect(new Tuple4<>(ctx.window().getStart(), ctx.window().getEnd(),
+ e.getKey(), t.getTimestamp()));
+ }
+ } else if (curr && !prevInside) {
+ out.collect(new Tuple4<>(ctx.window().getStart(), ctx.window().getEnd(),
+ e.getKey(), t.getTimestamp()));
+ }
+ prevInside = curr;
+ }
+ }
+ }
+
+ private boolean inBox(double lon, double lat) {
+ return lon >= xmin && lon <= xmax && lat >= ymin && lat <= ymax;
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/Q5ContinuousFunction.java b/flink-processor/src/main/java/berlinmod/Q5ContinuousFunction.java
new file mode 100644
index 0000000..d14f272
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/Q5ContinuousFunction.java
@@ -0,0 +1,93 @@
+package berlinmod;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * BerlinMOD-Q5 — continuous form.
+ *
+ * "Which pairs of vehicles are currently meeting near point P?"
+ *
+ *
A pair {@code (a, b)} meets near P when both vehicles are within
+ * {@code dP} metres of {@code P} and the distance between them is at most
+ * {@code dMeet} metres.
+ *
+ *
Caller is expected to key the input stream by a constant so the shared
+ * cross-vehicle last-known state lives in a single subtask. Per-event:
+ * update the last-known position of the event's vehicle, then enumerate all
+ * known pairs and emit {@code (a, b, eventTime, distanceMetres)} for every
+ * currently-meeting pair (with {@code a < b} for stable identity).
+ *
+ *
Predicate: {@link MEOSBridge#dwithinMetres} for the near-P filter and
+ * for the pairwise meeting predicate — MEOS {@code geog_dwithin} when
+ * libmeos is loadable, with {@link Haversine} fallback otherwise.
+ */
+public class Q5ContinuousFunction
+ extends KeyedProcessFunction> {
+
+ private final double pLon, pLat, dPMetres, dMeetMetres;
+ private transient MapState> lastPos;
+
+ public Q5ContinuousFunction(double pLon, double pLat, double dPMetres, double dMeetMetres) {
+ this.pLon = pLon;
+ this.pLat = pLat;
+ this.dPMetres = dPMetres;
+ this.dMeetMetres = dMeetMetres;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ TypeInformation> vInfo =
+ TypeInformation.of(new TypeHint>() {});
+ lastPos = getRuntimeContext().getMapState(
+ new MapStateDescriptor<>("q5LastPos", TypeInformation.of(Integer.class), vInfo));
+ }
+
+ @Override
+ public void processElement(
+ BerlinMODTrip trip,
+ Context ctx,
+ Collector> out) throws Exception {
+ lastPos.put(trip.getVehicleId(), new Tuple2<>(trip.getLon(), trip.getLat()));
+
+ // Snapshot the map and pick pairs near P
+ Map> snap = new HashMap<>();
+ for (Map.Entry> e : lastPos.entries()) {
+ snap.put(e.getKey(), e.getValue());
+ }
+ List>> nearP = new ArrayList<>();
+ for (Map.Entry> e : snap.entrySet()) {
+ Tuple2 p = e.getValue();
+ if (MEOSBridge.dwithinMetres(p.f0, p.f1, pLon, pLat, dPMetres)) {
+ nearP.add(e);
+ }
+ }
+ nearP.sort(Comparator.comparingInt(Map.Entry::getKey));
+
+ for (int i = 0; i < nearP.size(); i++) {
+ for (int j = i + 1; j < nearP.size(); j++) {
+ Tuple2 a = nearP.get(i).getValue();
+ Tuple2 b = nearP.get(j).getValue();
+ double d = MEOSBridge.distanceMetres(a.f0, a.f1, b.f0, b.f1);
+ if (d <= dMeetMetres) {
+ out.collect(new Tuple4<>(
+ nearP.get(i).getKey(), nearP.get(j).getKey(),
+ trip.getTimestamp(), d));
+ }
+ }
+ }
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/Q5SnapshotFunction.java b/flink-processor/src/main/java/berlinmod/Q5SnapshotFunction.java
new file mode 100644
index 0000000..b2f2758
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/Q5SnapshotFunction.java
@@ -0,0 +1,97 @@
+package berlinmod;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * BerlinMOD-Q5 — snapshot form.
+ *
+ * "At time T, which pairs of vehicles are meeting near P (using each
+ * vehicle's most-recent-known position on or before T)?"
+ *
+ *
This is the parity-oracle form: streaming output at watermark T must
+ * equal the batch BerlinMOD-Q5 result on the same data up to T.
+ *
+ *
Caller is expected to key the stream by a constant; the
+ * cross-vehicle last-known state is a {@link MapState}. On each event,
+ * update last-known and register an event-time timer at the next snapshot
+ * tick. On timer fire at T, snapshot the map and emit all meeting pairs.
+ */
+public class Q5SnapshotFunction
+ extends KeyedProcessFunction> {
+
+ private final double pLon, pLat, dPMetres, dMeetMetres;
+ private final long snapshotTickMillis;
+ private transient MapState> lastPos;
+
+ public Q5SnapshotFunction(double pLon, double pLat, double dPMetres,
+ double dMeetMetres, long snapshotTickMillis) {
+ this.pLon = pLon;
+ this.pLat = pLat;
+ this.dPMetres = dPMetres;
+ this.dMeetMetres = dMeetMetres;
+ this.snapshotTickMillis = snapshotTickMillis;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ TypeInformation> vInfo =
+ TypeInformation.of(new TypeHint>() {});
+ lastPos = getRuntimeContext().getMapState(
+ new MapStateDescriptor<>("q5SnapLastPos", TypeInformation.of(Integer.class), vInfo));
+ }
+
+ @Override
+ public void processElement(
+ BerlinMODTrip trip,
+ Context ctx,
+ Collector> out) throws Exception {
+ lastPos.put(trip.getVehicleId(), new Tuple2<>(trip.getLon(), trip.getLat()));
+ long nextTick = ((trip.getTimestamp() / snapshotTickMillis) + 1) * snapshotTickMillis;
+ ctx.timerService().registerEventTimeTimer(nextTick);
+ }
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ OnTimerContext ctx,
+ Collector> out) throws Exception {
+ Map> snap = new HashMap<>();
+ for (Map.Entry> e : lastPos.entries()) {
+ snap.put(e.getKey(), e.getValue());
+ }
+ List>> nearP = new ArrayList<>();
+ for (Map.Entry> e : snap.entrySet()) {
+ Tuple2 p = e.getValue();
+ if (MEOSBridge.dwithinMetres(p.f0, p.f1, pLon, pLat, dPMetres)) {
+ nearP.add(e);
+ }
+ }
+ nearP.sort(Comparator.comparingInt(Map.Entry::getKey));
+
+ for (int i = 0; i < nearP.size(); i++) {
+ for (int j = i + 1; j < nearP.size(); j++) {
+ Tuple2 a = nearP.get(i).getValue();
+ Tuple2 b = nearP.get(j).getValue();
+ double d = MEOSBridge.distanceMetres(a.f0, a.f1, b.f0, b.f1);
+ if (d <= dMeetMetres) {
+ out.collect(new Tuple4<>(timestamp,
+ nearP.get(i).getKey(), nearP.get(j).getKey(), d));
+ }
+ }
+ }
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/Q5WindowedFunction.java b/flink-processor/src/main/java/berlinmod/Q5WindowedFunction.java
new file mode 100644
index 0000000..8140d0f
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/Q5WindowedFunction.java
@@ -0,0 +1,75 @@
+package berlinmod;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * BerlinMOD-Q5 — windowed form.
+ *
+ * "Per N-second tumbling window, which pairs of vehicles met near P
+ * (using each vehicle's last-seen-in-window position)?"
+ *
+ *
Within each window, take each vehicle's latest position from the
+ * window's events. Run the same near-P-and-within-meet-distance pair check
+ * as the continuous form. Emit {@code (windowStart, windowEnd, a, b,
+ * distanceMetres)} per meeting pair.
+ */
+public class Q5WindowedFunction
+ extends ProcessAllWindowFunction, TimeWindow> {
+
+ private final double pLon, pLat, dPMetres, dMeetMetres;
+
+ public Q5WindowedFunction(double pLon, double pLat, double dPMetres, double dMeetMetres) {
+ this.pLon = pLon;
+ this.pLat = pLat;
+ this.dPMetres = dPMetres;
+ this.dMeetMetres = dMeetMetres;
+ }
+
+ @Override
+ public void process(
+ Context ctx,
+ Iterable elements,
+ Collector> out) {
+ // Last position per vehicle within the window
+ Map latest = new HashMap<>();
+ for (BerlinMODTrip trip : elements) {
+ BerlinMODTrip prev = latest.get(trip.getVehicleId());
+ if (prev == null || trip.getTimestamp() > prev.getTimestamp()) {
+ latest.put(trip.getVehicleId(), trip);
+ }
+ }
+
+ // Filter to vehicles near P
+ List>> nearP = new ArrayList<>();
+ for (Map.Entry e : latest.entrySet()) {
+ BerlinMODTrip t = e.getValue();
+ if (MEOSBridge.dwithinMetres(t.getLon(), t.getLat(), pLon, pLat, dPMetres)) {
+ nearP.add(new HashMap.SimpleEntry<>(e.getKey(), new Tuple2<>(t.getLon(), t.getLat())));
+ }
+ }
+ nearP.sort(Comparator.comparingInt(Map.Entry::getKey));
+
+ for (int i = 0; i < nearP.size(); i++) {
+ for (int j = i + 1; j < nearP.size(); j++) {
+ Tuple2 a = nearP.get(i).getValue();
+ Tuple2 b = nearP.get(j).getValue();
+ double d = MEOSBridge.distanceMetres(a.f0, a.f1, b.f0, b.f1);
+ if (d <= dMeetMetres) {
+ out.collect(new Tuple5<>(
+ ctx.window().getStart(), ctx.window().getEnd(),
+ nearP.get(i).getKey(), nearP.get(j).getKey(), d));
+ }
+ }
+ }
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/Q6ContinuousFunction.java b/flink-processor/src/main/java/berlinmod/Q6ContinuousFunction.java
new file mode 100644
index 0000000..85204f3
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/Q6ContinuousFunction.java
@@ -0,0 +1,60 @@
+package berlinmod;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * BerlinMOD-Q6 — continuous form.
+ *
+ * "What is each vehicle's cumulative distance travelled so far?"
+ *
+ *
Keyed by vehicleId. For each event, computes the great-circle distance
+ * from the previous-known position (or 0 if first event), adds it to the
+ * cumulative total, and emits {@code (vehicleId, t, cumulativeMetres)}.
+ *
+ *
Predicate today: pure-Java great-circle distance (see {@link Haversine}).
+ * Same MEOS-side analogue as Q3 — a future JMEOS bridge would replace the
+ * Java accumulator with a MEOS {@code length} call over the per-vehicle
+ * trajectory.
+ */
+public class Q6ContinuousFunction
+ extends KeyedProcessFunction> {
+
+ private transient ValueState> lastPos; // lon, lat
+ private transient ValueState totalDist;
+
+ @Override
+ public void open(Configuration parameters) {
+ TypeInformation> posType =
+ TypeInformation.of(new TypeHint>() {});
+ lastPos = getRuntimeContext().getState(
+ new ValueStateDescriptor<>("q6LastPos", posType));
+ totalDist = getRuntimeContext().getState(
+ new ValueStateDescriptor<>("q6TotalDist", Double.class));
+ }
+
+ @Override
+ public void processElement(
+ BerlinMODTrip trip,
+ Context ctx,
+ Collector> out) throws Exception {
+ Tuple2 prev = lastPos.value();
+ Double total = totalDist.value();
+ if (total == null) {
+ total = 0.0;
+ }
+ if (prev != null) {
+ total += MEOSBridge.distanceMetres(prev.f0, prev.f1, trip.getLon(), trip.getLat());
+ }
+ lastPos.update(new Tuple2<>(trip.getLon(), trip.getLat()));
+ totalDist.update(total);
+ out.collect(new Tuple3<>(trip.getVehicleId(), trip.getTimestamp(), total));
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/Q6SnapshotFunction.java b/flink-processor/src/main/java/berlinmod/Q6SnapshotFunction.java
new file mode 100644
index 0000000..4270f67
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/Q6SnapshotFunction.java
@@ -0,0 +1,76 @@
+package berlinmod;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * BerlinMOD-Q6 — snapshot form.
+ *
+ * "At time T, what is each vehicle's total distance travelled up to T?"
+ *
+ *
This is the parity-oracle form: streaming output at watermark T must
+ * equal the batch BerlinMOD-Q6 result on the same data up to T.
+ *
+ *
Keyed by vehicleId. Per event, update {@code lastPos}/{@code totalDist}
+ * state (matching {@link Q6ContinuousFunction}) and register an event-time
+ * timer at the next snapshot tick. On timer fire at T, emit
+ * {@code (T, vehicleId, totalMetres)}.
+ */
+public class Q6SnapshotFunction
+ extends KeyedProcessFunction> {
+
+ private final long snapshotTickMillis;
+ private transient ValueState> lastPos;
+ private transient ValueState totalDist;
+
+ public Q6SnapshotFunction(long snapshotTickMillis) {
+ this.snapshotTickMillis = snapshotTickMillis;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ TypeInformation> posType =
+ TypeInformation.of(new TypeHint>() {});
+ lastPos = getRuntimeContext().getState(
+ new ValueStateDescriptor<>("q6SnapLastPos", posType));
+ totalDist = getRuntimeContext().getState(
+ new ValueStateDescriptor<>("q6SnapTotalDist", Double.class));
+ }
+
+ @Override
+ public void processElement(
+ BerlinMODTrip trip,
+ Context ctx,
+ Collector> out) throws Exception {
+ Tuple2 prev = lastPos.value();
+ Double total = totalDist.value();
+ if (total == null) {
+ total = 0.0;
+ }
+ if (prev != null) {
+ total += MEOSBridge.distanceMetres(prev.f0, prev.f1, trip.getLon(), trip.getLat());
+ }
+ lastPos.update(new Tuple2<>(trip.getLon(), trip.getLat()));
+ totalDist.update(total);
+ long nextTick = ((trip.getTimestamp() / snapshotTickMillis) + 1) * snapshotTickMillis;
+ ctx.timerService().registerEventTimeTimer(nextTick);
+ }
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ OnTimerContext ctx,
+ Collector> out) throws Exception {
+ Double total = totalDist.value();
+ if (total != null) {
+ out.collect(new Tuple3<>(timestamp, ctx.getCurrentKey(), total));
+ }
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/Q6WindowedFunction.java b/flink-processor/src/main/java/berlinmod/Q6WindowedFunction.java
new file mode 100644
index 0000000..0fc9501
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/Q6WindowedFunction.java
@@ -0,0 +1,45 @@
+package berlinmod;
+
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * BerlinMOD-Q6 — windowed form.
+ *
+ * "Per N-second tumbling window, per vehicle, how far did the vehicle
+ * travel during the window?"
+ *
+ *
Keyed by vehicleId, tumbling event-time window. Within each window,
+ * sort events by timestamp and accumulate great-circle distances between
+ * consecutive points. Emit {@code (windowStart, windowEnd, vehicleId,
+ * distanceMetres)}.
+ */
+public class Q6WindowedFunction
+ extends ProcessWindowFunction, Integer, TimeWindow> {
+
+ @Override
+ public void process(
+ Integer vehicleId,
+ Context ctx,
+ Iterable elements,
+ Collector> out) {
+ List sorted = new ArrayList<>();
+ for (BerlinMODTrip trip : elements) {
+ sorted.add(trip);
+ }
+ sorted.sort((a, b) -> Long.compare(a.getTimestamp(), b.getTimestamp()));
+ double total = 0.0;
+ for (int i = 1; i < sorted.size(); i++) {
+ BerlinMODTrip prev = sorted.get(i - 1);
+ BerlinMODTrip curr = sorted.get(i);
+ total += MEOSBridge.distanceMetres(prev.getLon(), prev.getLat(),
+ curr.getLon(), curr.getLat());
+ }
+ out.collect(new Tuple4<>(ctx.window().getStart(), ctx.window().getEnd(), vehicleId, total));
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/Q7ContinuousFunction.java b/flink-processor/src/main/java/berlinmod/Q7ContinuousFunction.java
new file mode 100644
index 0000000..bc4dc83
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/Q7ContinuousFunction.java
@@ -0,0 +1,57 @@
+package berlinmod;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.List;
+
+/**
+ * BerlinMOD-Q7 — continuous form.
+ *
+ * "For each (vehicle, POI) pair, when did the vehicle first come within
+ * the POI's radius?"
+ *
+ *
Keyed by vehicleId. State is a per-vehicle {@code MapState}. On each event, walk the POI list; if the vehicle is
+ * within a POI's radius AND no first-passage has been recorded for that
+ * (vehicle, POI), record it and emit {@code (vehicleId, poiId, firstPassageTime)}.
+ */
+public class Q7ContinuousFunction
+ extends KeyedProcessFunction> {
+
+ private final List pois;
+ private transient MapState firstPassed;
+
+ public Q7ContinuousFunction(List pois) {
+ this.pois = pois;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ firstPassed = getRuntimeContext().getMapState(
+ new MapStateDescriptor<>("q7FirstPassed",
+ TypeInformation.of(Integer.class),
+ TypeInformation.of(Long.class)));
+ }
+
+ @Override
+ public void processElement(
+ BerlinMODTrip trip,
+ Context ctx,
+ Collector> out) throws Exception {
+ for (PointOfInterest poi : pois) {
+ if (firstPassed.contains(poi.id)) {
+ continue;
+ }
+ if (MEOSBridge.dwithinMetres(trip.getLon(), trip.getLat(), poi.lon, poi.lat, poi.radiusMetres)) {
+ firstPassed.put(poi.id, trip.getTimestamp());
+ out.collect(new Tuple3<>(trip.getVehicleId(), poi.id, trip.getTimestamp()));
+ }
+ }
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/Q7SnapshotFunction.java b/flink-processor/src/main/java/berlinmod/Q7SnapshotFunction.java
new file mode 100644
index 0000000..49637f6
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/Q7SnapshotFunction.java
@@ -0,0 +1,85 @@
+package berlinmod;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * BerlinMOD-Q7 — snapshot form.
+ *
+ * "At time T, for each (vehicle, POI), the first time the vehicle came
+ * within the POI's radius on or before T."
+ *
+ *
This is the parity-oracle form: streaming output at watermark T must
+ * equal the batch BerlinMOD-Q7 result on the same data up to T.
+ *
+ *
Keyed by vehicleId. Per-vehicle {@code MapState}.
+ * On each event, detect new first-passages (matching {@link Q7ContinuousFunction})
+ * and register an event-time timer at the next snapshot tick. On timer fire
+ * at T, emit one {@code (T, vehicleId, poiId, firstPassageTime)} per recorded
+ * first-passage with {@code firstPassageTime ≤ T}.
+ */
+public class Q7SnapshotFunction
+ extends KeyedProcessFunction> {
+
+ private final List pois;
+ private final long snapshotTickMillis;
+ private transient MapState firstPassed;
+
+ public Q7SnapshotFunction(List pois, long snapshotTickMillis) {
+ this.pois = pois;
+ this.snapshotTickMillis = snapshotTickMillis;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ firstPassed = getRuntimeContext().getMapState(
+ new MapStateDescriptor<>("q7SnapFirstPassed",
+ TypeInformation.of(Integer.class),
+ TypeInformation.of(Long.class)));
+ }
+
+ @Override
+ public void processElement(
+ BerlinMODTrip trip,
+ Context ctx,
+ Collector> out) throws Exception {
+ for (PointOfInterest poi : pois) {
+ if (firstPassed.contains(poi.id)) {
+ continue;
+ }
+ if (MEOSBridge.dwithinMetres(trip.getLon(), trip.getLat(),
+ poi.lon, poi.lat, poi.radiusMetres)) {
+ firstPassed.put(poi.id, trip.getTimestamp());
+ }
+ }
+ long nextTick = ((trip.getTimestamp() / snapshotTickMillis) + 1) * snapshotTickMillis;
+ ctx.timerService().registerEventTimeTimer(nextTick);
+ }
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ OnTimerContext ctx,
+ Collector> out) throws Exception {
+ // Iterate in poiId order for deterministic output
+ Map sorted = new TreeMap<>(Comparator.naturalOrder());
+ for (Map.Entry e : firstPassed.entries()) {
+ sorted.put(e.getKey(), e.getValue());
+ }
+ for (Map.Entry e : sorted.entrySet()) {
+ if (e.getValue() <= timestamp) {
+ out.collect(new Tuple4<>(timestamp, ctx.getCurrentKey(), e.getKey(), e.getValue()));
+ }
+ }
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/Q7WindowedFunction.java b/flink-processor/src/main/java/berlinmod/Q7WindowedFunction.java
new file mode 100644
index 0000000..e256c81
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/Q7WindowedFunction.java
@@ -0,0 +1,69 @@
+package berlinmod;
+
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * BerlinMOD-Q7 — windowed form.
+ *
+ * "Per N-second tumbling window, for each (vehicle, POI), what was the
+ * vehicle's first event in the window that placed it inside the POI's
+ * radius?"
+ *
+ *
Intra-window scoping (no cross-window first-passage state). Per window:
+ * group events by vehicleId, sort by time, walk and for each POI emit the
+ * timestamp of the first event in the window where the vehicle is inside
+ * that POI.
+ */
+public class Q7WindowedFunction
+ extends ProcessAllWindowFunction, TimeWindow> {
+
+ private final List