diff --git a/Input/input_berlinmod.csv b/Input/input_berlinmod.csv new file mode 100644 index 0000000000..753a68124b --- /dev/null +++ b/Input/input_berlinmod.csv @@ -0,0 +1,21 @@ +1735711200,100,4.3517,50.8503 +1735711200,300,4.2000,50.7500 +1735711201,200,4.3060,50.8270 +1735711202,100,4.3517,50.8503 +1735711202,300,4.2000,50.7500 +1735711203,200,4.3060,50.8270 +1735711204,100,4.3517,50.8503 +1735711204,300,4.2000,50.7500 +1735711205,200,4.3060,50.8270 +1735711206,100,4.3517,50.8503 +1735711206,300,4.2000,50.7500 +1735711207,200,4.3060,50.8270 +1735711208,100,4.3517,50.8503 +1735711208,300,4.2000,50.7500 +1735711209,200,4.3060,50.8270 +1735711210,100,4.3517,50.8503 +1735711210,300,4.2000,50.7500 +1735711211,200,4.3060,50.8270 +1735711212,100,4.3517,50.8503 +1735711212,300,4.2000,50.7500 +1735711213,200,4.3060,50.8270 diff --git a/Queries/berlinmod/q1_continuous.yaml b/Queries/berlinmod/q1_continuous.yaml new file mode 100644 index 0000000000..49786049e8 --- /dev/null +++ b/Queries/berlinmod/q1_continuous.yaml @@ -0,0 +1,47 @@ +# BerlinMOD-Q1 — continuous form +# "Which vehicles have appeared in the stream?" +# Per 1-second sliding bucket: emit (start, end, vehicle_id, event-count-in-bucket). +# Reading N rows over consecutive buckets enumerates the distinct-vehicles-seen set. + +query: | + SELECT start, + end, + vehicle_id, + COUNT(time_utc) AS events + FROM berlinmod_stream + GROUP BY vehicle_id + WINDOW SLIDING(time_utc, SIZE 1 SEC, ADVANCE BY 1 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$EVENTS, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q1_continuous.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q1_snapshot.yaml b/Queries/berlinmod/q1_snapshot.yaml new file mode 100644 index 0000000000..4fa9c05d63 --- /dev/null +++ b/Queries/berlinmod/q1_snapshot.yaml @@ -0,0 +1,46 @@ +# BerlinMOD-Q1 — snapshot form +# "At each 5-second tick, list of distinct vehicles seen in the tick window." +# Streaming approximation of the batch BerlinMOD-Q1 snapshot at time T. + +query: | + SELECT start, + end, + vehicle_id, + COUNT(time_utc) AS events + FROM berlinmod_stream + GROUP BY vehicle_id + WINDOW TUMBLING(time_utc, SIZE 5 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$EVENTS, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q1_snapshot.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q1_windowed.yaml b/Queries/berlinmod/q1_windowed.yaml new file mode 100644 index 0000000000..2d25214d24 --- /dev/null +++ b/Queries/berlinmod/q1_windowed.yaml @@ -0,0 +1,46 @@ +# BerlinMOD-Q1 — windowed form +# "Per 10-second tumbling window, distinct vehicles seen." +# Emits one row per (window, vehicle) seen; reading N rows per window = distinctCount. + +query: | + SELECT start, + end, + vehicle_id, + COUNT(time_utc) AS events + FROM berlinmod_stream + GROUP BY vehicle_id + WINDOW TUMBLING(time_utc, SIZE 10 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$EVENTS, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q1_windowed.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q2_continuous.yaml b/Queries/berlinmod/q2_continuous.yaml new file mode 100644 index 0000000000..1d89420d19 --- /dev/null +++ b/Queries/berlinmod/q2_continuous.yaml @@ -0,0 +1,44 @@ +# BerlinMOD-Q2 — continuous form +# "Where is vehicle X (= 200) right now?" +# Per 1-second sliding bucket, emit a trajectory snippet for vehicle X. + +query: | + SELECT start, + end, + TEMPORAL_SEQUENCE(gps_lon, gps_lat, time_utc) AS trajectory + FROM berlinmod_stream + WHERE vehicle_id = UINT64(200) + WINDOW SLIDING(time_utc, SIZE 1 SEC, ADVANCE BY 1 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$TRAJECTORY, type: VARSIZED } + config: + file_path: "/workspace/Output/output_berlinmod_q2_continuous.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q2_snapshot.yaml b/Queries/berlinmod/q2_snapshot.yaml new file mode 100644 index 0000000000..af0946bb57 --- /dev/null +++ b/Queries/berlinmod/q2_snapshot.yaml @@ -0,0 +1,43 @@ +# BerlinMOD-Q2 — snapshot form +# "At each 5-second tick, snapshot of vehicle X's (= 200) trajectory in the tick." + +query: | + SELECT start, + end, + TEMPORAL_SEQUENCE(gps_lon, gps_lat, time_utc) AS trajectory + FROM berlinmod_stream + WHERE vehicle_id = UINT64(200) + WINDOW TUMBLING(time_utc, SIZE 5 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$TRAJECTORY, type: VARSIZED } + config: + file_path: "/workspace/Output/output_berlinmod_q2_snapshot.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q2_windowed.yaml b/Queries/berlinmod/q2_windowed.yaml new file mode 100644 index 0000000000..d2ae83bc8c --- /dev/null +++ b/Queries/berlinmod/q2_windowed.yaml @@ -0,0 +1,43 @@ +# BerlinMOD-Q2 — windowed form +# "Per 10-second tumbling window, trajectory of vehicle X (= 200)." + +query: | + SELECT start, + end, + TEMPORAL_SEQUENCE(gps_lon, gps_lat, time_utc) AS trajectory + FROM berlinmod_stream + WHERE vehicle_id = UINT64(200) + WINDOW TUMBLING(time_utc, SIZE 10 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$TRAJECTORY, type: VARSIZED } + config: + file_path: "/workspace/Output/output_berlinmod_q2_windowed.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q3_continuous.yaml b/Queries/berlinmod/q3_continuous.yaml new file mode 100644 index 0000000000..bfae2d7c81 --- /dev/null +++ b/Queries/berlinmod/q3_continuous.yaml @@ -0,0 +1,49 @@ +# BerlinMOD-Q3 — continuous form +# "Vehicles within 5 km of Brussels city centre, right now." +# Per 1-second sliding bucket, emit (start, end, vehicle_id) for events near P. + +query: | + SELECT start, + end, + vehicle_id + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;POINT(4.3517 50.8503)', + FLOAT64(5000.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW SLIDING(time_utc, SIZE 1 SEC, ADVANCE BY 1 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q3_continuous.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q3_snapshot.yaml b/Queries/berlinmod/q3_snapshot.yaml new file mode 100644 index 0000000000..673373d1ea --- /dev/null +++ b/Queries/berlinmod/q3_snapshot.yaml @@ -0,0 +1,50 @@ +# BerlinMOD-Q3 — snapshot form +# "At each 5-second tick, distinct vehicles within 5 km of P." + +query: | + SELECT start, + end, + vehicle_id, + COUNT(time_utc) AS events_near_p + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;POINT(4.3517 50.8503)', + FLOAT64(5000.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW TUMBLING(time_utc, SIZE 5 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$EVENTS_NEAR_P, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q3_snapshot.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q3_windowed.yaml b/Queries/berlinmod/q3_windowed.yaml new file mode 100644 index 0000000000..3d54f1aa75 --- /dev/null +++ b/Queries/berlinmod/q3_windowed.yaml @@ -0,0 +1,50 @@ +# BerlinMOD-Q3 — windowed form +# "Per 10-second tumbling window, distinct vehicles within 5 km of P." + +query: | + SELECT start, + end, + vehicle_id, + COUNT(time_utc) AS events_near_p + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;POINT(4.3517 50.8503)', + FLOAT64(5000.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW TUMBLING(time_utc, SIZE 10 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$EVENTS_NEAR_P, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q3_windowed.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q4_continuous.yaml b/Queries/berlinmod/q4_continuous.yaml new file mode 100644 index 0000000000..03b1e852e9 --- /dev/null +++ b/Queries/berlinmod/q4_continuous.yaml @@ -0,0 +1,49 @@ +# BerlinMOD-Q4 — continuous form +# "Vehicles currently inside region R (Brussels centre rectangle)." +# R encoded as polygon; edwithin with d=0 ≡ "inside the polygon". + +query: | + SELECT start, + end, + vehicle_id + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;POLYGON((4.30 50.84, 4.36 50.84, 4.36 50.86, 4.30 50.86, 4.30 50.84))', + FLOAT64(0.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW SLIDING(time_utc, SIZE 1 SEC, ADVANCE BY 1 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q4_continuous.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q4_snapshot.yaml b/Queries/berlinmod/q4_snapshot.yaml new file mode 100644 index 0000000000..f9042070b1 --- /dev/null +++ b/Queries/berlinmod/q4_snapshot.yaml @@ -0,0 +1,50 @@ +# BerlinMOD-Q4 — snapshot form +# "At each 5-second tick, distinct vehicles inside region R." + +query: | + SELECT start, + end, + vehicle_id, + COUNT(time_utc) AS events_in_r + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;POLYGON((4.30 50.84, 4.36 50.84, 4.36 50.86, 4.30 50.86, 4.30 50.84))', + FLOAT64(0.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW TUMBLING(time_utc, SIZE 5 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$EVENTS_IN_R, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q4_snapshot.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q4_windowed.yaml b/Queries/berlinmod/q4_windowed.yaml new file mode 100644 index 0000000000..17162eafbb --- /dev/null +++ b/Queries/berlinmod/q4_windowed.yaml @@ -0,0 +1,51 @@ +# BerlinMOD-Q4 — windowed form +# "Per 10-second tumbling window, distinct vehicles inside region R." +# Intra-window scoping: a vehicle present inside R during the window is reported. + +query: | + SELECT start, + end, + vehicle_id, + COUNT(time_utc) AS events_in_r + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;POLYGON((4.30 50.84, 4.36 50.84, 4.36 50.86, 4.30 50.86, 4.30 50.84))', + FLOAT64(0.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW TUMBLING(time_utc, SIZE 10 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$EVENTS_IN_R, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q4_windowed.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q5_continuous.yaml b/Queries/berlinmod/q5_continuous.yaml new file mode 100644 index 0000000000..3fc13355ae --- /dev/null +++ b/Queries/berlinmod/q5_continuous.yaml @@ -0,0 +1,51 @@ +# BerlinMOD-Q5 — continuous form (FULL) +# "Pairs of vehicles meeting near P." Per-second sliding window over the events +# pre-filtered by upstream edwithin_tgeo_geo to the near-P set; the +# PAIR_MEETING aggregation enumerates pairs of vehicles inside the window and +# emits the BerlinMOD-Q5 answer directly (vid_a, vid_b, ts, "<=dMeet" tag) +# with dMeet = 200 m hardcoded for the scaffold. + +query: | + SELECT start, + end, + PAIR_MEETING(gps_lon, gps_lat, time_utc, vehicle_id) AS meeting_pairs + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;POINT(4.3517 50.8503)', + FLOAT64(2000.0)) = INT32(1) + WINDOW SLIDING(time_utc, SIZE 1 SEC, ADVANCE BY 1 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$MEETING_PAIRS, type: VARSIZED } + config: + file_path: "/workspace/Output/output_berlinmod_q5_continuous.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q5_snapshot.yaml b/Queries/berlinmod/q5_snapshot.yaml new file mode 100644 index 0000000000..0b49b636d8 --- /dev/null +++ b/Queries/berlinmod/q5_snapshot.yaml @@ -0,0 +1,50 @@ +# BerlinMOD-Q5 — snapshot form (FULL) +# "Pairs of vehicles meeting near P." Per-5s tumbling-tick window over the +# events pre-filtered by upstream edwithin_tgeo_geo to the near-P set; +# PAIR_MEETING emits the per-tick meeting pairs as a VARSIZED string. The +# snapshot at time T equals the batch BerlinMOD-Q5 result up to T. + +query: | + SELECT start, + end, + PAIR_MEETING(gps_lon, gps_lat, time_utc, vehicle_id) AS meeting_pairs + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;POINT(4.3517 50.8503)', + FLOAT64(2000.0)) = INT32(1) + WINDOW TUMBLING(time_utc, SIZE 5 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$MEETING_PAIRS, type: VARSIZED } + config: + file_path: "/workspace/Output/output_berlinmod_q5_snapshot.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q5_windowed.yaml b/Queries/berlinmod/q5_windowed.yaml new file mode 100644 index 0000000000..d7aa2581dc --- /dev/null +++ b/Queries/berlinmod/q5_windowed.yaml @@ -0,0 +1,50 @@ +# BerlinMOD-Q5 — windowed form (FULL) +# "Pairs of vehicles meeting near P." Per-10s tumbling window over the events +# pre-filtered by upstream edwithin_tgeo_geo to the near-P set; PAIR_MEETING +# emits the per-window meeting pairs (vid_a, vid_b, ts, "<=dMeet" tag) with +# dMeet = 200 m hardcoded for the scaffold. + +query: | + SELECT start, + end, + PAIR_MEETING(gps_lon, gps_lat, time_utc, vehicle_id) AS meeting_pairs + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;POINT(4.3517 50.8503)', + FLOAT64(2000.0)) = INT32(1) + WINDOW TUMBLING(time_utc, SIZE 10 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$MEETING_PAIRS, type: VARSIZED } + config: + file_path: "/workspace/Output/output_berlinmod_q5_windowed.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q6_continuous.yaml b/Queries/berlinmod/q6_continuous.yaml new file mode 100644 index 0000000000..7b13911408 --- /dev/null +++ b/Queries/berlinmod/q6_continuous.yaml @@ -0,0 +1,48 @@ +# BerlinMOD-Q6 — continuous form (FULL) +# "Cumulative distance travelled per vehicle." Per-second sliding window +# aggregates each vehicle's GPS samples and emits the spheroidal length in +# metres of the per-(window, vehicle) trajectory directly via the +# TEMPORAL_LENGTH aggregation. + +query: | + SELECT start, + end, + vehicle_id, + TEMPORAL_LENGTH(gps_lon, gps_lat, time_utc) AS cumulative_distance + FROM berlinmod_stream + GROUP BY vehicle_id + WINDOW SLIDING(time_utc, SIZE 1 SEC, ADVANCE BY 1 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$CUMULATIVE_DISTANCE, type: FLOAT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q6_continuous.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q6_snapshot.yaml b/Queries/berlinmod/q6_snapshot.yaml new file mode 100644 index 0000000000..b8e20b3ffe --- /dev/null +++ b/Queries/berlinmod/q6_snapshot.yaml @@ -0,0 +1,49 @@ +# BerlinMOD-Q6 — snapshot form (FULL) +# "Cumulative distance travelled per vehicle." Per-5s tumbling-tick window +# aggregates each vehicle's GPS samples and emits the spheroidal length in +# metres of the per-(tick, vehicle) trajectory directly via the +# TEMPORAL_LENGTH aggregation. The snapshot output at time T equals the +# batch BerlinMOD-Q6 result on data up to T. + +query: | + SELECT start, + end, + vehicle_id, + TEMPORAL_LENGTH(gps_lon, gps_lat, time_utc) AS cumulative_distance + FROM berlinmod_stream + GROUP BY vehicle_id + WINDOW TUMBLING(time_utc, SIZE 5 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$CUMULATIVE_DISTANCE, type: FLOAT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q6_snapshot.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q6_windowed.yaml b/Queries/berlinmod/q6_windowed.yaml new file mode 100644 index 0000000000..749c3ba1bb --- /dev/null +++ b/Queries/berlinmod/q6_windowed.yaml @@ -0,0 +1,48 @@ +# BerlinMOD-Q6 — windowed form (FULL) +# "Cumulative distance travelled per vehicle." Per-10s tumbling window +# aggregates each vehicle's GPS samples and emits the spheroidal length in +# metres of the per-(window, vehicle) trajectory directly via the +# TEMPORAL_LENGTH aggregation. + +query: | + SELECT start, + end, + vehicle_id, + TEMPORAL_LENGTH(gps_lon, gps_lat, time_utc) AS cumulative_distance + FROM berlinmod_stream + GROUP BY vehicle_id + WINDOW TUMBLING(time_utc, SIZE 10 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$CUMULATIVE_DISTANCE, type: FLOAT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q6_windowed.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q7_poi1_continuous.yaml b/Queries/berlinmod/q7_poi1_continuous.yaml new file mode 100644 index 0000000000..36a7f2418d --- /dev/null +++ b/Queries/berlinmod/q7_poi1_continuous.yaml @@ -0,0 +1,53 @@ +# BerlinMOD-Q7 — continuous form, POI 1 (4.3517, 50.8503, r=2000.0m) +# "Per (window or tick), the first event in the window where each vehicle is +# within the POI's radius — i.e. the per-window first passage through POI 1." +# One YAML per (POI, form). Consumer reads the 3-POI fan-out to recover the +# full per-(vehicle, POI) first-passage matrix. + +query: | + SELECT start, + end, + vehicle_id, + MIN(time_utc) AS first_passage_time + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;POINT(4.3517 50.8503)', + FLOAT64(2000.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW SLIDING(time_utc, SIZE 1 SEC, ADVANCE BY 1 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$FIRST_PASSAGE_TIME, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q7_poi1_continuous.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q7_poi1_snapshot.yaml b/Queries/berlinmod/q7_poi1_snapshot.yaml new file mode 100644 index 0000000000..2e0f7acb9f --- /dev/null +++ b/Queries/berlinmod/q7_poi1_snapshot.yaml @@ -0,0 +1,53 @@ +# BerlinMOD-Q7 — snapshot form, POI 1 (4.3517, 50.8503, r=2000.0m) +# "Per (window or tick), the first event in the window where each vehicle is +# within the POI's radius — i.e. the per-window first passage through POI 1." +# One YAML per (POI, form). Consumer reads the 3-POI fan-out to recover the +# full per-(vehicle, POI) first-passage matrix. + +query: | + SELECT start, + end, + vehicle_id, + MIN(time_utc) AS first_passage_time + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;POINT(4.3517 50.8503)', + FLOAT64(2000.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW TUMBLING(time_utc, SIZE 5 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$FIRST_PASSAGE_TIME, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q7_poi1_snapshot.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q7_poi1_windowed.yaml b/Queries/berlinmod/q7_poi1_windowed.yaml new file mode 100644 index 0000000000..b81dec6c1e --- /dev/null +++ b/Queries/berlinmod/q7_poi1_windowed.yaml @@ -0,0 +1,53 @@ +# BerlinMOD-Q7 — windowed form, POI 1 (4.3517, 50.8503, r=2000.0m) +# "Per (window or tick), the first event in the window where each vehicle is +# within the POI's radius — i.e. the per-window first passage through POI 1." +# One YAML per (POI, form). Consumer reads the 3-POI fan-out to recover the +# full per-(vehicle, POI) first-passage matrix. + +query: | + SELECT start, + end, + vehicle_id, + MIN(time_utc) AS first_passage_time + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;POINT(4.3517 50.8503)', + FLOAT64(2000.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW TUMBLING(time_utc, SIZE 10 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$FIRST_PASSAGE_TIME, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q7_poi1_windowed.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q7_poi2_continuous.yaml b/Queries/berlinmod/q7_poi2_continuous.yaml new file mode 100644 index 0000000000..043c82680c --- /dev/null +++ b/Queries/berlinmod/q7_poi2_continuous.yaml @@ -0,0 +1,53 @@ +# BerlinMOD-Q7 — continuous form, POI 2 (4.3060, 50.8270, r=1000.0m) +# "Per (window or tick), the first event in the window where each vehicle is +# within the POI's radius — i.e. the per-window first passage through POI 2." +# One YAML per (POI, form). Consumer reads the 3-POI fan-out to recover the +# full per-(vehicle, POI) first-passage matrix. + +query: | + SELECT start, + end, + vehicle_id, + MIN(time_utc) AS first_passage_time + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;POINT(4.3060 50.8270)', + FLOAT64(1000.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW SLIDING(time_utc, SIZE 1 SEC, ADVANCE BY 1 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$FIRST_PASSAGE_TIME, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q7_poi2_continuous.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q7_poi2_snapshot.yaml b/Queries/berlinmod/q7_poi2_snapshot.yaml new file mode 100644 index 0000000000..82ad22bcd3 --- /dev/null +++ b/Queries/berlinmod/q7_poi2_snapshot.yaml @@ -0,0 +1,53 @@ +# BerlinMOD-Q7 — snapshot form, POI 2 (4.3060, 50.8270, r=1000.0m) +# "Per (window or tick), the first event in the window where each vehicle is +# within the POI's radius — i.e. the per-window first passage through POI 2." +# One YAML per (POI, form). Consumer reads the 3-POI fan-out to recover the +# full per-(vehicle, POI) first-passage matrix. + +query: | + SELECT start, + end, + vehicle_id, + MIN(time_utc) AS first_passage_time + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;POINT(4.3060 50.8270)', + FLOAT64(1000.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW TUMBLING(time_utc, SIZE 5 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$FIRST_PASSAGE_TIME, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q7_poi2_snapshot.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q7_poi2_windowed.yaml b/Queries/berlinmod/q7_poi2_windowed.yaml new file mode 100644 index 0000000000..6925ba5480 --- /dev/null +++ b/Queries/berlinmod/q7_poi2_windowed.yaml @@ -0,0 +1,53 @@ +# BerlinMOD-Q7 — windowed form, POI 2 (4.3060, 50.8270, r=1000.0m) +# "Per (window or tick), the first event in the window where each vehicle is +# within the POI's radius — i.e. the per-window first passage through POI 2." +# One YAML per (POI, form). Consumer reads the 3-POI fan-out to recover the +# full per-(vehicle, POI) first-passage matrix. + +query: | + SELECT start, + end, + vehicle_id, + MIN(time_utc) AS first_passage_time + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;POINT(4.3060 50.8270)', + FLOAT64(1000.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW TUMBLING(time_utc, SIZE 10 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$FIRST_PASSAGE_TIME, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q7_poi2_windowed.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q7_poi3_continuous.yaml b/Queries/berlinmod/q7_poi3_continuous.yaml new file mode 100644 index 0000000000..414d8133d8 --- /dev/null +++ b/Queries/berlinmod/q7_poi3_continuous.yaml @@ -0,0 +1,53 @@ +# BerlinMOD-Q7 — continuous form, POI 3 (4.2100, 50.7600, r=2000.0m) +# "Per (window or tick), the first event in the window where each vehicle is +# within the POI's radius — i.e. the per-window first passage through POI 3." +# One YAML per (POI, form). Consumer reads the 3-POI fan-out to recover the +# full per-(vehicle, POI) first-passage matrix. + +query: | + SELECT start, + end, + vehicle_id, + MIN(time_utc) AS first_passage_time + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;POINT(4.2100 50.7600)', + FLOAT64(2000.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW SLIDING(time_utc, SIZE 1 SEC, ADVANCE BY 1 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$FIRST_PASSAGE_TIME, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q7_poi3_continuous.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q7_poi3_snapshot.yaml b/Queries/berlinmod/q7_poi3_snapshot.yaml new file mode 100644 index 0000000000..141a9b853b --- /dev/null +++ b/Queries/berlinmod/q7_poi3_snapshot.yaml @@ -0,0 +1,53 @@ +# BerlinMOD-Q7 — snapshot form, POI 3 (4.2100, 50.7600, r=2000.0m) +# "Per (window or tick), the first event in the window where each vehicle is +# within the POI's radius — i.e. the per-window first passage through POI 3." +# One YAML per (POI, form). Consumer reads the 3-POI fan-out to recover the +# full per-(vehicle, POI) first-passage matrix. + +query: | + SELECT start, + end, + vehicle_id, + MIN(time_utc) AS first_passage_time + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;POINT(4.2100 50.7600)', + FLOAT64(2000.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW TUMBLING(time_utc, SIZE 5 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$FIRST_PASSAGE_TIME, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q7_poi3_snapshot.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q7_poi3_windowed.yaml b/Queries/berlinmod/q7_poi3_windowed.yaml new file mode 100644 index 0000000000..a8ecb36c3f --- /dev/null +++ b/Queries/berlinmod/q7_poi3_windowed.yaml @@ -0,0 +1,53 @@ +# BerlinMOD-Q7 — windowed form, POI 3 (4.2100, 50.7600, r=2000.0m) +# "Per (window or tick), the first event in the window where each vehicle is +# within the POI's radius — i.e. the per-window first passage through POI 3." +# One YAML per (POI, form). Consumer reads the 3-POI fan-out to recover the +# full per-(vehicle, POI) first-passage matrix. + +query: | + SELECT start, + end, + vehicle_id, + MIN(time_utc) AS first_passage_time + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;POINT(4.2100 50.7600)', + FLOAT64(2000.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW TUMBLING(time_utc, SIZE 10 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$FIRST_PASSAGE_TIME, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q7_poi3_windowed.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q8_continuous.yaml b/Queries/berlinmod/q8_continuous.yaml new file mode 100644 index 0000000000..6821fa9639 --- /dev/null +++ b/Queries/berlinmod/q8_continuous.yaml @@ -0,0 +1,53 @@ +# BerlinMOD-Q8 — continuous form (FULL) +# "Vehicles within d of road segment (LINESTRING)." Uses edwithin_tgeo_geo with +# a LINESTRING geometry — MEOS supports the within-radius predicate against any +# geometry (POINT, POLYGON, LINESTRING), so no new MobilityNebula PhysicalFunction +# is required. The segment runs from (4.30, 50.83) to (4.36, 50.87) with d = 5 km. + +query: | + SELECT start, + end, + vehicle_id, + COUNT(time_utc) AS events_near_segment + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;LINESTRING(4.30 50.83, 4.36 50.87)', + FLOAT64(5000.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW SLIDING(time_utc, SIZE 1 SEC, ADVANCE BY 1 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$EVENTS_NEAR_SEGMENT, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q8_continuous.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q8_snapshot.yaml b/Queries/berlinmod/q8_snapshot.yaml new file mode 100644 index 0000000000..41241b53eb --- /dev/null +++ b/Queries/berlinmod/q8_snapshot.yaml @@ -0,0 +1,53 @@ +# BerlinMOD-Q8 — snapshot form (FULL) +# "Vehicles within d of road segment (LINESTRING)." Uses edwithin_tgeo_geo with +# a LINESTRING geometry — MEOS supports the within-radius predicate against any +# geometry (POINT, POLYGON, LINESTRING), so no new MobilityNebula PhysicalFunction +# is required. The segment runs from (4.30, 50.83) to (4.36, 50.87) with d = 5 km. + +query: | + SELECT start, + end, + vehicle_id, + COUNT(time_utc) AS events_near_segment + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;LINESTRING(4.30 50.83, 4.36 50.87)', + FLOAT64(5000.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW TUMBLING(time_utc, SIZE 5 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$EVENTS_NEAR_SEGMENT, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q8_snapshot.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q8_windowed.yaml b/Queries/berlinmod/q8_windowed.yaml new file mode 100644 index 0000000000..f448eb5ae2 --- /dev/null +++ b/Queries/berlinmod/q8_windowed.yaml @@ -0,0 +1,53 @@ +# BerlinMOD-Q8 — windowed form (FULL) +# "Vehicles within d of road segment (LINESTRING)." Uses edwithin_tgeo_geo with +# a LINESTRING geometry — MEOS supports the within-radius predicate against any +# geometry (POINT, POLYGON, LINESTRING), so no new MobilityNebula PhysicalFunction +# is required. The segment runs from (4.30, 50.83) to (4.36, 50.87) with d = 5 km. + +query: | + SELECT start, + end, + vehicle_id, + COUNT(time_utc) AS events_near_segment + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;LINESTRING(4.30 50.83, 4.36 50.87)', + FLOAT64(5000.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW TUMBLING(time_utc, SIZE 10 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$EVENTS_NEAR_SEGMENT, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q8_windowed.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q9_continuous.yaml b/Queries/berlinmod/q9_continuous.yaml new file mode 100644 index 0000000000..0b1c1baa3f --- /dev/null +++ b/Queries/berlinmod/q9_continuous.yaml @@ -0,0 +1,46 @@ +# BerlinMOD-Q9 — continuous form (FULL) +# "Distance between vehicles X (= 100) and Y (= 200) at time T." Per-second +# sliding window. CROSS_DISTANCE picks the latest known position of each +# target vehicle (VID_A = 100, VID_B = 200 hardcoded for the scaffold) inside +# the window and returns the spheroidal distance between them in metres. +# Returns NaN if either vehicle has no observation in the window. + +query: | + SELECT start, + end, + CROSS_DISTANCE(gps_lon, gps_lat, time_utc, vehicle_id) AS distance_metres + FROM berlinmod_stream + WINDOW SLIDING(time_utc, SIZE 1 SEC, ADVANCE BY 1 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$DISTANCE_METRES, type: FLOAT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q9_continuous.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q9_snapshot.yaml b/Queries/berlinmod/q9_snapshot.yaml new file mode 100644 index 0000000000..d1c7f54e07 --- /dev/null +++ b/Queries/berlinmod/q9_snapshot.yaml @@ -0,0 +1,47 @@ +# BerlinMOD-Q9 — snapshot form (FULL) +# "Distance between vehicles X (= 100) and Y (= 200) at time T." Per-5s +# tumbling-tick window. CROSS_DISTANCE returns the spheroidal distance +# between the two vehicles' latest known positions at the tick, or NaN if +# either is unobserved. Hardcoded (VID_A, VID_B) = (100, 200) for the +# scaffold. The snapshot at time T equals the batch BerlinMOD-Q9 result up +# to T. + +query: | + SELECT start, + end, + CROSS_DISTANCE(gps_lon, gps_lat, time_utc, vehicle_id) AS distance_metres + FROM berlinmod_stream + WINDOW TUMBLING(time_utc, SIZE 5 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$DISTANCE_METRES, type: FLOAT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q9_snapshot.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q9_windowed.yaml b/Queries/berlinmod/q9_windowed.yaml new file mode 100644 index 0000000000..0a81fc774c --- /dev/null +++ b/Queries/berlinmod/q9_windowed.yaml @@ -0,0 +1,45 @@ +# BerlinMOD-Q9 — windowed form (FULL) +# "Distance between vehicles X (= 100) and Y (= 200) at time T." Per-10s +# tumbling window. CROSS_DISTANCE returns the spheroidal distance between +# the two vehicles' latest known positions in the window, or NaN if either +# is unobserved. Hardcoded (VID_A, VID_B) = (100, 200) for the scaffold. + +query: | + SELECT start, + end, + CROSS_DISTANCE(gps_lon, gps_lat, time_utc, vehicle_id) AS distance_metres + FROM berlinmod_stream + WINDOW TUMBLING(time_utc, SIZE 10 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$DISTANCE_METRES, type: FLOAT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q9_windowed.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/docs/berlinmod-streaming-forms.md b/docs/berlinmod-streaming-forms.md new file mode 100644 index 0000000000..4ba6276903 --- /dev/null +++ b/docs/berlinmod-streaming-forms.md @@ -0,0 +1,114 @@ +# BerlinMOD streaming forms on MobilityNebula + +Additive scaffold for the **BerlinMOD-9 × 3 streaming forms** parity contract — same shape as the SQL-layer BerlinMOD-9 ([MobilityDB-BerlinMOD](https://github.com/MobilityDB/MobilityDB-BerlinMOD)) and matching the [MobilityFlink PR #3](https://github.com/MobilityDB/MobilityFlink/pull/3) and [MobilityKafka PR #1](https://github.com/MobilityDB/MobilityKafka/pull/1) coverage on the NebulaStream runtime. + +This page lives **alongside** the existing SNCB query series ([Query0..Query5](../Queries/) + [sncb_brake_monitoring](../Queries/sncb_brake_monitoring.yaml)); the SNCB Q-series and BerlinMOD-9 are sibling parity sets, not a replacement. + +## Logical source + +The BerlinMOD queries read from a `berlinmod_stream` logical source over TCP port `32325`, distinct from the SNCB `sncb_stream` source on port `32324`. Wire format is CSV with four columns: + +``` +time_utc(uint64), vehicle_id(uint64), gps_lon(float64), gps_lat(float64) +``` + +A sample input file is at [`Input/input_berlinmod.csv`](../Input/input_berlinmod.csv) (3 vehicles × 21 events over 14 simulated seconds). + +## The three streaming forms + +For each BerlinMOD reference query Q, three NebulaStream YAMLs realize the form contract: + +| Form | NebulaStream pattern | Semantic | +|---|---|---| +| **continuous** | `WINDOW SLIDING(time_utc, SIZE 1 SEC, ADVANCE BY 1 SEC)` | per-event-bucket emission; consumers see a continuous stream of per-second events | +| **windowed** | `WINDOW TUMBLING(time_utc, SIZE 10 SEC)` | per-10s aggregation; one row per (window, group) | +| **snapshot** | `WINDOW TUMBLING(time_utc, SIZE 5 SEC)` | per-5s tick state; one row per (tick, group). Parity-oracle form: at each tick, the current state mirrors the batch BerlinMOD-Q result on data up to the tick | + +## Coverage in this PR + +| Q | Topic | Continuous | Windowed | Snapshot | Form | +|---|---|---|---|---|---| +| Q1 | "which vehicles have appeared?" | ✓ | ✓ | ✓ | full | +| Q2 | "where is vehicle X (= 200) at time T?" | ✓ | ✓ | ✓ | full | +| Q3 | "vehicles within 5 km of Brussels city centre?" | ✓ | ✓ | ✓ | full | +| Q4 | "vehicles inside Brussels-centre rectangle R?" | ✓ | ✓ | ✓ | full | +| Q5 | "pairs of vehicles meeting near P" | ✓ | ✓ | ✓ | full (via PAIR_MEETING aggregation) | +| Q6 | "cumulative distance per vehicle" | ✓ | ✓ | ✓ | full (via TEMPORAL_LENGTH aggregation) | +| Q7 | "first passage of each vehicle through each POI" | ✓ | ✓ | ✓ | full (per-POI fan-out) | +| Q8 | "vehicles close to a road segment (LINESTRING)" | ✓ | ✓ | ✓ | full | +| Q9 | "distance between vehicles X and Y at time T" | ✓ | ✓ | ✓ | full (via CROSS_DISTANCE aggregation) | + +**27 of 27 cells** covered as scaffold YAMLs. **All 27 cells are full** — every BerlinMOD-Q semantic is computed entirely inside NebulaStream. The matrix is closed. + +### Q7 fan-out pattern (full) + +NebulaStream's current SQL has no Cartesian (vehicle × POI) aggregation primitive. Q7 is therefore expressed as **one YAML per (POI, form)** — three POIs × three forms = nine YAML files. Each YAML emits the per-(window, vehicle) first-passage time for its single POI; consumers read the three POI-specific output files per form to recover the full per-(vehicle, POI) matrix. POI ids: `1` = Brussels city centre (4.3517, 50.8503, r=2 km), `2` = Anderlecht (4.3060, 50.8270, r=1 km), `3` = south of Brussels (4.2100, 50.7600, r=2 km). + +### Q8 via LINESTRING (full) + +MEOS' `edwithin_tgeo_geo` accepts any geometry — POINT, POLYGON, and **LINESTRING**. Q8 (vehicles within d of a road segment) is therefore expressible as a single direct predicate against a `LINESTRING(s1, s2)` geometry, no new MobilityNebula PhysicalFunction required. The segment runs from (4.30, 50.83) to (4.36, 50.87) with d = 5 km in the scaffold. + +### Q6 full via TEMPORAL_LENGTH aggregation + +The Q6 × 3 cells are full as of this scaffold: they use the new `TEMPORAL_LENGTH(lon, lat, ts)` aggregation, which lifts the same (lon, lat, ts) tuples as `TEMPORAL_SEQUENCE` and lowers them through a MEOS `tpoint_length(Temporal*)` call to a single `FLOAT64` result — the spheroidal length in metres of the per-(window, group) trajectory. Logical, physical, parser, and lowering wiring all live in this PR. + +### Q5 full via PAIR_MEETING aggregation + +Q5 takes four input fields (lon, lat, timestamp, vehicle_id) and emits a VARSIZED string-encoded list of meeting pairs `"vid_a,vid_b,ts,<=dMeet; …"`. Upstream `edwithin_tgeo_geo` pre-filters events to the near-P set; the aggregation's `lift` step writes per-event (lon, lat, ts, vehicle_id) into a PagedVector, and the `lower` step builds a per-vehicle latest-position map, enumerates pairs in stable order, calls MEOS' `geog_dwithin` with `dMeet = 200 m` hardcoded for the scaffold, and emits pairs that meet. Future PR can parameterize `dMeet` via a constant input. + +### Q9 full via CROSS_DISTANCE aggregation + +Q9 takes the same four input fields and emits a FLOAT64 — the spheroidal distance between the two target vehicles (VID_A = 100, VID_B = 200 hardcoded) at their latest known positions in the window. NaN when either is unobserved. Implemented via the MEOS `nad_tgeo_tgeo` path over single-instant tgeompoints. Future PR can parameterize (VID_A, VID_B). + +## MEOS operators consumed + +All BerlinMOD predicates use operators already exposed by [`MobilityNebula/PR #14`](https://github.com/MobilityDB/MobilityNebula/pull/14) (and follow-up operator-add PRs): + +| Operator | YAMLs using it | +|---|---| +| `edwithin_tgeo_geo(lon, lat, t, geom, d)` | Q3 × 3 forms (radius predicate, `POINT`), Q4 × 3 forms (region containment, `POLYGON` with `d=0.0`), Q5 × 3 forms (upstream near-P filter), Q8 × 3 forms (segment predicate, `LINESTRING`) | +| `TEMPORAL_SEQUENCE(lon, lat, t)` (aggregation) | Q2 × 3 forms (per-window trajectory) | +| `TEMPORAL_LENGTH(lon, lat, t)` (aggregation, MEOS `tpoint_length` under the hood) | Q6 × 3 forms (cumulative distance) | +| `PAIR_MEETING(lon, lat, t, vehicle_id)` (aggregation, MEOS `geog_dwithin` per pair under the hood) | Q5 × 3 forms (meeting pairs) | +| `CROSS_DISTANCE(lon, lat, t, vehicle_id)` (aggregation, MEOS `nad_tgeo_tgeo` under the hood) | Q9 × 3 forms (cross-vehicle distance) | + +`PAIR_MEETING` and `CROSS_DISTANCE` are added by this PR (and `TEMPORAL_LENGTH` is added by the parent #16); the rest are pre-existing. + +## Streaming-semantics tier overlay + +Each BerlinMOD-Q in this scaffold falls into one of the four streaming-execution tiers used by the per-binding wirings work across the ecosystem. The vocabulary is the closed 7-value set proposed for the MEOS-API catalog as `objectModel.streamingSemantics` (see the MEOS-API #10 sibling-facet RFC). + +The mapping makes the cross-binding picture explicit — a Q's tier on NebulaStream is the same tier it would land in on Flink / Kafka. The right-most column points to the equivalent generic wiring on Flink (where adopters consume the v4 baseline through generic DataStream wrappers). + +| Tier | BerlinMOD-Q | NebulaStream realization | Equivalent Flink wiring | +|---|---|---|---| +| `stateless` | Q1 (distinct-vehicle observation) | Simple SQL aggregation; no MEOS handle | `MeosStatelessMap` / `MeosStatelessFilter` | +| `bounded-state` | Q2, Q3, Q4, Q7 (per-vehicle / per-POI predicate state), Q8 | Aggregations that hold per-key latest position (TEMPORAL_SEQUENCE pattern); single MEOS-temporal evaluation | `MeosBoundedStateMap` (per-key `ValueState`) | +| `windowed` | Q6 (per-window trajectory length) | Custom MEOS aggregation closing the window once and emitting a scalar (`TEMPORAL_LENGTH`) | `MeosWindowedAggregate` (window-close-only) | +| `cross-stream` | Q5 (pair meeting), Q9 (cross-vehicle distance) | Four-field aggregations holding per-(vehicle-pair) state inside one operator (`PAIR_MEETING`, `CROSS_DISTANCE`) — same row-set sees all vehicles, so the "stream-self-join" is a single-aggregation enumeration rather than two streams | `MeosCrossStreamJoin` (`KeyedStream.intervalJoin`) | +| `io-meta` / `sequence-only` | — | not exercised by the BerlinMOD-9 set | n/a | + +### Why the cross-stream tier looks different on NebulaStream + +On Flink, the cross-stream tier maps to `KeyedStream.intervalJoin(other)` — two distinct keyed streams paired within a time bound. On NebulaStream, the same semantic is realized inside a single windowed aggregation that holds per-(vehicle-pair) state and enumerates pairs at window close. The two are equivalent: both materialize the Cartesian-product evaluation, just at different points in the operator topology. The tier classification is on the **MEOS semantic**, not on the engine pattern — and Q5 / Q9 are unambiguously `cross-stream` regardless of which engine realizes them. + +### Why Q7 is bounded-state, not windowed + +Q7 ("first passage of each vehicle through each POI") would naturally read as windowed (per-window minimum). It's classified bounded-state here because the NebulaStream scaffold expresses it as a per-POI fan-out (one YAML per POI), each YAML computing the per-vehicle latest-known position predicate and selecting the per-(vehicle, POI) earliest qualifying timestamp inside the window. The state per (vehicle, POI) is bounded; no per-window reduction across the full sequence is needed. + +## Sibling parity references + +- **MobilityFlink** — same nine queries × three forms on Flink. Original scaffold landed; the per-tier wiring infrastructure that mechanically wraps any of the 2,097 generated MEOS facade methods into Flink DataStream operators lives in the [`org.mobilitydb.flink.meos.wirings`](https://github.com/MobilityDB/MobilityFlink/blob/main/flink-processor/src/main/java/org/mobilitydb/flink/meos/wirings) package (5 generic classes covering 100% of the streamable + io-meta surface; a capstone demo composes all four tiers into one pipeline). +- **MobilityKafka** — same nine queries × three forms on Kafka Streams, with a codegen mirror of the MEOS facade in [`org.mobilitydb.kafka.meos`](https://github.com/MobilityDB/MobilityKafka/blob/main/kafka-streams-app/src/main/java/org/mobilitydb/kafka/meos). +- **MobilityDB-BerlinMOD** — batch BerlinMOD-9 cross-platform reports; the snapshot form on the streaming side converges to those outputs as the watermark advances. + +## Running + +Each YAML follows the same pattern as the SNCB queries (TCP CSV source, file sink). The expected execution flow: + +1. Start NebulaStream (or `MobilityNebula` docker-runtime). +2. Stream the sample CSV from `Input/input_berlinmod.csv` over TCP port `32325` (e.g. via `nc -l -p 32325 < Input/input_berlinmod.csv` or the project's existing TCP-source tooling). +3. Submit one of the YAMLs to the NebulaStream coordinator. +4. The output appears in `/workspace/Output/output_berlinmod__
.csv`. + +YAML structure has been validated with `python3 -c "import yaml; yaml.safe_load(open(f))"` for every file. Runtime verification is gated on the NebulaStream test harness; the YAMLs are intentionally additive and the SNCB Q-series remains untouched. diff --git a/nes-logical-operators/include/Operators/Windows/Aggregations/Meos/CrossDistanceAggregationLogicalFunction.hpp b/nes-logical-operators/include/Operators/Windows/Aggregations/Meos/CrossDistanceAggregationLogicalFunction.hpp new file mode 100644 index 0000000000..86b5a70308 --- /dev/null +++ b/nes-logical-operators/include/Operators/Windows/Aggregations/Meos/CrossDistanceAggregationLogicalFunction.hpp @@ -0,0 +1,66 @@ +/* + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#pragma once + +#include + +namespace NES +{ + +/** + * @brief Logical-plan side of the CROSS_DISTANCE aggregation (BerlinMOD-Q9). + * + * Four input fields (lon, lat, timestamp, vehicle_id). Final aggregate stamp = FLOAT64 + * (spheroidal distance in metres between VID_A's and VID_B's latest known positions in + * the window; NaN if either is unobserved). See `CrossDistanceAggregationPhysicalFunction`. + */ +class CrossDistanceAggregationLogicalFunction : public WindowAggregationLogicalFunction +{ +public: + static std::shared_ptr + create(const FieldAccessLogicalFunction& lonField, + const FieldAccessLogicalFunction& latField, + const FieldAccessLogicalFunction& timestampField, + const FieldAccessLogicalFunction& vehicleIdField); + + CrossDistanceAggregationLogicalFunction( + const FieldAccessLogicalFunction& lonField, + const FieldAccessLogicalFunction& latField, + const FieldAccessLogicalFunction& timestampField, + const FieldAccessLogicalFunction& vehicleIdField, + const FieldAccessLogicalFunction& asField); + + void inferStamp(const Schema& schema) override; + ~CrossDistanceAggregationLogicalFunction() override = default; + [[nodiscard]] NES::SerializableAggregationFunction serialize() const override; + [[nodiscard]] std::string_view getName() const noexcept override; + [[nodiscard]] bool requiresSequentialAggregation() const { return true; } + + [[nodiscard]] const FieldAccessLogicalFunction& getLonField() const noexcept { return lonField; } + [[nodiscard]] const FieldAccessLogicalFunction& getLatField() const noexcept { return latField; } + [[nodiscard]] const FieldAccessLogicalFunction& getTimestampField() const noexcept { return timestampField; } + [[nodiscard]] const FieldAccessLogicalFunction& getVehicleIdField() const noexcept { return vehicleIdField; } + +private: + static constexpr std::string_view NAME = "CrossDistance"; + static constexpr DataType::Type partialAggregateStampType = DataType::Type::UNDEFINED; + static constexpr DataType::Type finalAggregateStampType = DataType::Type::FLOAT64; + + FieldAccessLogicalFunction lonField; + FieldAccessLogicalFunction latField; + FieldAccessLogicalFunction timestampField; + FieldAccessLogicalFunction vehicleIdField; +}; +} diff --git a/nes-logical-operators/include/Operators/Windows/Aggregations/Meos/PairMeetingAggregationLogicalFunction.hpp b/nes-logical-operators/include/Operators/Windows/Aggregations/Meos/PairMeetingAggregationLogicalFunction.hpp new file mode 100644 index 0000000000..5c35fbcbf8 --- /dev/null +++ b/nes-logical-operators/include/Operators/Windows/Aggregations/Meos/PairMeetingAggregationLogicalFunction.hpp @@ -0,0 +1,66 @@ +/* + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#pragma once + +#include + +namespace NES +{ + +/** + * @brief Logical-plan side of the PAIR_MEETING aggregation (BerlinMOD-Q5). + * + * Four input fields (lon, lat, timestamp, vehicle_id). Final aggregate stamp = VARSIZED + * (string-encoded list of meeting pairs). See `PairMeetingAggregationPhysicalFunction` + * for the lift / combine / lower path. + */ +class PairMeetingAggregationLogicalFunction : public WindowAggregationLogicalFunction +{ +public: + static std::shared_ptr + create(const FieldAccessLogicalFunction& lonField, + const FieldAccessLogicalFunction& latField, + const FieldAccessLogicalFunction& timestampField, + const FieldAccessLogicalFunction& vehicleIdField); + + PairMeetingAggregationLogicalFunction( + const FieldAccessLogicalFunction& lonField, + const FieldAccessLogicalFunction& latField, + const FieldAccessLogicalFunction& timestampField, + const FieldAccessLogicalFunction& vehicleIdField, + const FieldAccessLogicalFunction& asField); + + void inferStamp(const Schema& schema) override; + ~PairMeetingAggregationLogicalFunction() override = default; + [[nodiscard]] NES::SerializableAggregationFunction serialize() const override; + [[nodiscard]] std::string_view getName() const noexcept override; + [[nodiscard]] bool requiresSequentialAggregation() const { return true; } + + [[nodiscard]] const FieldAccessLogicalFunction& getLonField() const noexcept { return lonField; } + [[nodiscard]] const FieldAccessLogicalFunction& getLatField() const noexcept { return latField; } + [[nodiscard]] const FieldAccessLogicalFunction& getTimestampField() const noexcept { return timestampField; } + [[nodiscard]] const FieldAccessLogicalFunction& getVehicleIdField() const noexcept { return vehicleIdField; } + +private: + static constexpr std::string_view NAME = "PairMeeting"; + static constexpr DataType::Type partialAggregateStampType = DataType::Type::UNDEFINED; + static constexpr DataType::Type finalAggregateStampType = DataType::Type::VARSIZED; + + FieldAccessLogicalFunction lonField; + FieldAccessLogicalFunction latField; + FieldAccessLogicalFunction timestampField; + FieldAccessLogicalFunction vehicleIdField; +}; +} diff --git a/nes-logical-operators/include/Operators/Windows/Aggregations/Meos/TemporalLengthAggregationLogicalFunction.hpp b/nes-logical-operators/include/Operators/Windows/Aggregations/Meos/TemporalLengthAggregationLogicalFunction.hpp new file mode 100644 index 0000000000..b2225f7240 --- /dev/null +++ b/nes-logical-operators/include/Operators/Windows/Aggregations/Meos/TemporalLengthAggregationLogicalFunction.hpp @@ -0,0 +1,64 @@ +/* + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#pragma once + +#include + +namespace NES +{ + +/** + * @brief Logical-plan side of the TEMPORAL_LENGTH aggregation. + * + * Takes three input fields (longitude, latitude, timestamp) and produces a + * single FLOAT64 result: the spheroidal length in metres of the per-(window, + * group) trajectory built from the lifted tuples. + * + * Same shape as TemporalSequenceAggregationLogicalFunctionV2; only the final + * aggregate stamp type differs (FLOAT64 here vs VARSIZED there). Closes the + * MobilityNebula BerlinMOD-Q6 partial→full gap. + */ +class TemporalLengthAggregationLogicalFunction : public WindowAggregationLogicalFunction +{ +public: + static std::shared_ptr + create(const FieldAccessLogicalFunction& lonField, const FieldAccessLogicalFunction& latField, const FieldAccessLogicalFunction& timestampField); + + TemporalLengthAggregationLogicalFunction( + const FieldAccessLogicalFunction& lonField, + const FieldAccessLogicalFunction& latField, + const FieldAccessLogicalFunction& timestampField, + const FieldAccessLogicalFunction& asField); + + void inferStamp(const Schema& schema) override; + ~TemporalLengthAggregationLogicalFunction() override = default; + [[nodiscard]] NES::SerializableAggregationFunction serialize() const override; + [[nodiscard]] std::string_view getName() const noexcept override; + [[nodiscard]] bool requiresSequentialAggregation() const { return true; } + + [[nodiscard]] const FieldAccessLogicalFunction& getLonField() const noexcept { return lonField; } + [[nodiscard]] const FieldAccessLogicalFunction& getLatField() const noexcept { return latField; } + [[nodiscard]] const FieldAccessLogicalFunction& getTimestampField() const noexcept { return timestampField; } + +private: + static constexpr std::string_view NAME = "TemporalLength"; + static constexpr DataType::Type partialAggregateStampType = DataType::Type::UNDEFINED; + static constexpr DataType::Type finalAggregateStampType = DataType::Type::FLOAT64; + + FieldAccessLogicalFunction lonField; + FieldAccessLogicalFunction latField; + FieldAccessLogicalFunction timestampField; +}; +} diff --git a/nes-logical-operators/src/Operators/Windows/Aggregations/Meos/CMakeLists.txt b/nes-logical-operators/src/Operators/Windows/Aggregations/Meos/CMakeLists.txt index 9cdfcdb2dc..c63e969684 100644 --- a/nes-logical-operators/src/Operators/Windows/Aggregations/Meos/CMakeLists.txt +++ b/nes-logical-operators/src/Operators/Windows/Aggregations/Meos/CMakeLists.txt @@ -12,3 +12,6 @@ add_plugin(Var AggregationLogicalFunction nes-logical-operators VarAggregationLogicalFunction.cpp) add_plugin(TemporalSequence AggregationLogicalFunction nes-logical-operators TemporalSequenceAggregationLogicalFunctionV2.cpp) +add_plugin(TemporalLength AggregationLogicalFunction nes-logical-operators TemporalLengthAggregationLogicalFunction.cpp) +add_plugin(PairMeeting AggregationLogicalFunction nes-logical-operators PairMeetingAggregationLogicalFunction.cpp) +add_plugin(CrossDistance AggregationLogicalFunction nes-logical-operators CrossDistanceAggregationLogicalFunction.cpp) diff --git a/nes-logical-operators/src/Operators/Windows/Aggregations/Meos/CrossDistanceAggregationLogicalFunction.cpp b/nes-logical-operators/src/Operators/Windows/Aggregations/Meos/CrossDistanceAggregationLogicalFunction.cpp new file mode 100644 index 0000000000..50e6c86318 --- /dev/null +++ b/nes-logical-operators/src/Operators/Windows/Aggregations/Meos/CrossDistanceAggregationLogicalFunction.cpp @@ -0,0 +1,141 @@ +/* + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace NES +{ + +CrossDistanceAggregationLogicalFunction::CrossDistanceAggregationLogicalFunction( + const FieldAccessLogicalFunction& lonField, + const FieldAccessLogicalFunction& latField, + const FieldAccessLogicalFunction& timestampField, + const FieldAccessLogicalFunction& vehicleIdField, + const FieldAccessLogicalFunction& asField) + : WindowAggregationLogicalFunction( + lonField.getDataType(), + DataTypeProvider::provideDataType(partialAggregateStampType), + DataTypeProvider::provideDataType(finalAggregateStampType), + lonField, + asField) + , lonField(lonField) + , latField(latField) + , timestampField(timestampField) + , vehicleIdField(vehicleIdField) +{ +} + +std::shared_ptr +CrossDistanceAggregationLogicalFunction::create( + const FieldAccessLogicalFunction& lonField, + const FieldAccessLogicalFunction& latField, + const FieldAccessLogicalFunction& timestampField, + const FieldAccessLogicalFunction& vehicleIdField) +{ + return std::make_shared(lonField, latField, timestampField, vehicleIdField, lonField); +} + +std::string_view CrossDistanceAggregationLogicalFunction::getName() const noexcept +{ + return NAME; +} + +void CrossDistanceAggregationLogicalFunction::inferStamp(const Schema& schema) +{ + lonField = lonField.withInferredDataType(schema).get(); + latField = latField.withInferredDataType(schema).get(); + timestampField = timestampField.withInferredDataType(schema).get(); + vehicleIdField = vehicleIdField.withInferredDataType(schema).get(); + + onField = lonField; + + if (!lonField.getDataType().isNumeric() || !latField.getDataType().isNumeric() + || !timestampField.getDataType().isNumeric() || !vehicleIdField.getDataType().isNumeric()) + { + throw CannotInferSchema("CrossDistanceAggregationLogicalFunction: lon, lat, timestamp, and vehicle_id fields must be numeric."); + } + + const auto onFieldName = onField.getFieldName(); + const auto asFieldName = asField.getFieldName(); + const auto attributeNameResolver = onFieldName.substr(0, onFieldName.find(Schema::ATTRIBUTE_NAME_SEPARATOR) + 1); + if (asFieldName.find(Schema::ATTRIBUTE_NAME_SEPARATOR) == std::string::npos) + { + asField = asField.withFieldName(attributeNameResolver + asFieldName).get(); + } + else + { + const auto fieldName = asFieldName.substr(asFieldName.find_last_of(Schema::ATTRIBUTE_NAME_SEPARATOR) + 1); + asField = asField.withFieldName(attributeNameResolver + fieldName).get(); + } + asField = asField.withDataType(getFinalAggregateStamp()).get(); + inputStamp = onField.getDataType(); +} + +NES::SerializableAggregationFunction CrossDistanceAggregationLogicalFunction::serialize() const +{ + SerializableAggregationFunction saf; + saf.set_type(std::string(NAME)); + + SerializableFunction lonProto; + lonProto.CopyFrom(LogicalFunction(lonField).serialize()); + saf.mutable_on_field()->CopyFrom(lonProto); + + SerializableFunction asProto; + asProto.CopyFrom(LogicalFunction(asField).serialize()); + saf.mutable_as_field()->CopyFrom(asProto); + + SerializableFunction latProto; + latProto.CopyFrom(LogicalFunction(latField).serialize()); + saf.add_extra_fields()->CopyFrom(latProto); + + SerializableFunction tsProto; + tsProto.CopyFrom(LogicalFunction(timestampField).serialize()); + saf.add_extra_fields()->CopyFrom(tsProto); + + SerializableFunction vidProto; + vidProto.CopyFrom(LogicalFunction(vehicleIdField).serialize()); + saf.add_extra_fields()->CopyFrom(vidProto); + + return saf; +} + +AggregationLogicalFunctionRegistryReturnType AggregationLogicalFunctionGeneratedRegistrar::RegisterCrossDistanceAggregationLogicalFunction( + AggregationLogicalFunctionRegistryArguments arguments) +{ + if (arguments.fields.size() == 5) + { + auto ptr = std::make_shared( + arguments.fields[0], arguments.fields[1], arguments.fields[2], arguments.fields[3], arguments.fields[4]); + return ptr; + } + throw CannotDeserialize( + "CrossDistanceAggregationLogicalFunction requires lon, lat, timestamp, vehicle_id, and alias fields but got {}", + arguments.fields.size()); +} + +} // namespace NES diff --git a/nes-logical-operators/src/Operators/Windows/Aggregations/Meos/PairMeetingAggregationLogicalFunction.cpp b/nes-logical-operators/src/Operators/Windows/Aggregations/Meos/PairMeetingAggregationLogicalFunction.cpp new file mode 100644 index 0000000000..6c09b59a9b --- /dev/null +++ b/nes-logical-operators/src/Operators/Windows/Aggregations/Meos/PairMeetingAggregationLogicalFunction.cpp @@ -0,0 +1,145 @@ +/* + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace NES +{ + +PairMeetingAggregationLogicalFunction::PairMeetingAggregationLogicalFunction( + const FieldAccessLogicalFunction& lonField, + const FieldAccessLogicalFunction& latField, + const FieldAccessLogicalFunction& timestampField, + const FieldAccessLogicalFunction& vehicleIdField, + const FieldAccessLogicalFunction& asField) + : WindowAggregationLogicalFunction( + lonField.getDataType(), + DataTypeProvider::provideDataType(partialAggregateStampType), + DataTypeProvider::provideDataType(finalAggregateStampType), + lonField, + asField) + , lonField(lonField) + , latField(latField) + , timestampField(timestampField) + , vehicleIdField(vehicleIdField) +{ +} + +std::shared_ptr +PairMeetingAggregationLogicalFunction::create( + const FieldAccessLogicalFunction& lonField, + const FieldAccessLogicalFunction& latField, + const FieldAccessLogicalFunction& timestampField, + const FieldAccessLogicalFunction& vehicleIdField) +{ + return std::make_shared(lonField, latField, timestampField, vehicleIdField, lonField); +} + +std::string_view PairMeetingAggregationLogicalFunction::getName() const noexcept +{ + return NAME; +} + +void PairMeetingAggregationLogicalFunction::inferStamp(const Schema& schema) +{ + lonField = lonField.withInferredDataType(schema).get(); + latField = latField.withInferredDataType(schema).get(); + timestampField = timestampField.withInferredDataType(schema).get(); + vehicleIdField = vehicleIdField.withInferredDataType(schema).get(); + + onField = lonField; + + if (!lonField.getDataType().isNumeric() || !latField.getDataType().isNumeric() + || !timestampField.getDataType().isNumeric() || !vehicleIdField.getDataType().isNumeric()) + { + throw CannotInferSchema("PairMeetingAggregationLogicalFunction: lon, lat, timestamp, and vehicle_id fields must be numeric."); + } + + const auto onFieldName = onField.getFieldName(); + const auto asFieldName = asField.getFieldName(); + const auto attributeNameResolver = onFieldName.substr(0, onFieldName.find(Schema::ATTRIBUTE_NAME_SEPARATOR) + 1); + if (asFieldName.find(Schema::ATTRIBUTE_NAME_SEPARATOR) == std::string::npos) + { + asField = asField.withFieldName(attributeNameResolver + asFieldName).get(); + } + else + { + const auto fieldName = asFieldName.substr(asFieldName.find_last_of(Schema::ATTRIBUTE_NAME_SEPARATOR) + 1); + asField = asField.withFieldName(attributeNameResolver + fieldName).get(); + } + asField = asField.withDataType(getFinalAggregateStamp()).get(); + inputStamp = onField.getDataType(); +} + +NES::SerializableAggregationFunction PairMeetingAggregationLogicalFunction::serialize() const +{ + SerializableAggregationFunction saf; + saf.set_type(std::string(NAME)); + + // on_field = lon + SerializableFunction lonProto; + lonProto.CopyFrom(LogicalFunction(lonField).serialize()); + saf.mutable_on_field()->CopyFrom(lonProto); + + // as_field = alias + SerializableFunction asProto; + asProto.CopyFrom(LogicalFunction(asField).serialize()); + saf.mutable_as_field()->CopyFrom(asProto); + + // extra fields = lat, ts, vehicle_id + SerializableFunction latProto; + latProto.CopyFrom(LogicalFunction(latField).serialize()); + saf.add_extra_fields()->CopyFrom(latProto); + + SerializableFunction tsProto; + tsProto.CopyFrom(LogicalFunction(timestampField).serialize()); + saf.add_extra_fields()->CopyFrom(tsProto); + + SerializableFunction vidProto; + vidProto.CopyFrom(LogicalFunction(vehicleIdField).serialize()); + saf.add_extra_fields()->CopyFrom(vidProto); + + return saf; +} + +AggregationLogicalFunctionRegistryReturnType AggregationLogicalFunctionGeneratedRegistrar::RegisterPairMeetingAggregationLogicalFunction( + AggregationLogicalFunctionRegistryArguments arguments) +{ + if (arguments.fields.size() == 5) + { + auto ptr = std::make_shared( + arguments.fields[0], arguments.fields[1], arguments.fields[2], arguments.fields[3], arguments.fields[4]); + return ptr; + } + throw CannotDeserialize( + "PairMeetingAggregationLogicalFunction requires lon, lat, timestamp, vehicle_id, and alias fields but got {}", + arguments.fields.size()); +} + +} // namespace NES diff --git a/nes-logical-operators/src/Operators/Windows/Aggregations/Meos/TemporalLengthAggregationLogicalFunction.cpp b/nes-logical-operators/src/Operators/Windows/Aggregations/Meos/TemporalLengthAggregationLogicalFunction.cpp new file mode 100644 index 0000000000..ff46bbc173 --- /dev/null +++ b/nes-logical-operators/src/Operators/Windows/Aggregations/Meos/TemporalLengthAggregationLogicalFunction.cpp @@ -0,0 +1,117 @@ +/* + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace NES +{ + +TemporalLengthAggregationLogicalFunction::TemporalLengthAggregationLogicalFunction( + const FieldAccessLogicalFunction& lonField, + const FieldAccessLogicalFunction& latField, + const FieldAccessLogicalFunction& timestampField, + const FieldAccessLogicalFunction& asField) + : WindowAggregationLogicalFunction( + lonField.getDataType(), + DataTypeProvider::provideDataType(partialAggregateStampType), + DataTypeProvider::provideDataType(finalAggregateStampType), + lonField, + asField) + , lonField(lonField) + , latField(latField) + , timestampField(timestampField) +{ +} + +std::shared_ptr +TemporalLengthAggregationLogicalFunction::create( + const FieldAccessLogicalFunction& lonField, + const FieldAccessLogicalFunction& latField, + const FieldAccessLogicalFunction& timestampField) +{ + return std::make_shared(lonField, latField, timestampField, lonField); +} + +std::string_view TemporalLengthAggregationLogicalFunction::getName() const noexcept +{ + return NAME; +} + +void TemporalLengthAggregationLogicalFunction::inferStamp(const Schema& schema) +{ + lonField = lonField.withInferredDataType(schema).get(); + latField = latField.withInferredDataType(schema).get(); + timestampField = timestampField.withInferredDataType(schema).get(); + + onField = lonField; + + if (!lonField.getDataType().isNumeric() || !latField.getDataType().isNumeric() || !timestampField.getDataType().isNumeric()) + { + throw CannotInferSchema("TemporalLengthAggregationLogicalFunction: lon, lat, and timestamp fields must be numeric."); + } + + const auto onFieldName = onField.getFieldName(); + const auto asFieldName = asField.getFieldName(); + const auto attributeNameResolver = onFieldName.substr(0, onFieldName.find(Schema::ATTRIBUTE_NAME_SEPARATOR) + 1); + if (asFieldName.find(Schema::ATTRIBUTE_NAME_SEPARATOR) == std::string::npos) + { + asField = asField.withFieldName(attributeNameResolver + asFieldName).get(); + } + else + { + const auto fieldName = asFieldName.substr(asFieldName.find_last_of(Schema::ATTRIBUTE_NAME_SEPARATOR) + 1); + asField = asField.withFieldName(attributeNameResolver + fieldName).get(); + } + asField = asField.withDataType(getFinalAggregateStamp()).get(); + inputStamp = onField.getDataType(); +} + +NES::SerializableAggregationFunction TemporalLengthAggregationLogicalFunction::serialize() const +{ + // Same wire shape as TemporalSequence (3 fields + alias); only the type tag differs. + auto saf = TemporalAggregationSerde::serializeTemporalSequence(lonField, latField, timestampField, asField); + saf.set_type(std::string(NAME)); + return saf; +} + +AggregationLogicalFunctionRegistryReturnType AggregationLogicalFunctionGeneratedRegistrar::RegisterTemporalLengthAggregationLogicalFunction( + AggregationLogicalFunctionRegistryArguments arguments) +{ + if (arguments.fields.size() == 4) + { + auto ptr = std::make_shared( + arguments.fields[0], arguments.fields[1], arguments.fields[2], arguments.fields[3]); + return ptr; + } + throw CannotDeserialize( + "TemporalLengthAggregationLogicalFunction requires lon, lat, timestamp, and alias fields but got {}", + arguments.fields.size()); +} + +} // namespace NES diff --git a/nes-physical-operators/include/Aggregation/Function/Meos/CrossDistanceAggregationPhysicalFunction.hpp b/nes-physical-operators/include/Aggregation/Function/Meos/CrossDistanceAggregationPhysicalFunction.hpp new file mode 100644 index 0000000000..b42c781306 --- /dev/null +++ b/nes-physical-operators/include/Aggregation/Function/Meos/CrossDistanceAggregationPhysicalFunction.hpp @@ -0,0 +1,80 @@ +/* + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace NES +{ + +/** + * @brief Aggregation that emits the BerlinMOD-Q9 cross-distance between two specific + * vehicles per window. + * + * Takes four input fields (lon, lat, timestamp, vehicle_id). The lift step stores per-event + * tuples; the lower step picks the latest known position of each target vehicle (VID_A and + * VID_B, hardcoded for the BerlinMOD scaffold) within the window and emits the spheroidal + * `geog_distance(POINT, POINT)` between them as a FLOAT64. Returns `NaN` when either target + * vehicle has no observation in the window. + * + * Future PR can parameterize (VID_A, VID_B) via constant inputs to the aggregation. + * + * Closes the MobilityNebula BerlinMOD-Q9 × 3-form partial→full gap. + */ +class CrossDistanceAggregationPhysicalFunction : public AggregationPhysicalFunction +{ +public: + static constexpr uint64_t VID_A = 100; + static constexpr uint64_t VID_B = 200; + + CrossDistanceAggregationPhysicalFunction( + DataType inputType, + DataType resultType, + PhysicalFunction lonFunctionParam, + PhysicalFunction latFunctionParam, + PhysicalFunction timestampFunctionParam, + PhysicalFunction vehicleIdFunctionParam, + Nautilus::Record::RecordFieldIdentifier resultFieldIdentifier, + std::shared_ptr bufferRef); + void lift( + const nautilus::val& aggregationState, + PipelineMemoryProvider& pipelineMemoryProvider, + const Nautilus::Record& record) + override; + void combine( + nautilus::val aggregationState1, + nautilus::val aggregationState2, + PipelineMemoryProvider& pipelineMemoryProvider) override; + Nautilus::Record lower(nautilus::val aggregationState, PipelineMemoryProvider& pipelineMemoryProvider) override; + void reset(nautilus::val aggregationState, PipelineMemoryProvider& pipelineMemoryProvider) override; + [[nodiscard]] size_t getSizeOfStateInBytes() const override; + ~CrossDistanceAggregationPhysicalFunction() override = default; + void cleanup(nautilus::val aggregationState) override; + +private: + std::shared_ptr bufferRef; + PhysicalFunction lonFunction; + PhysicalFunction latFunction; + PhysicalFunction timestampFunction; + PhysicalFunction vehicleIdFunction; +}; + +} diff --git a/nes-physical-operators/include/Aggregation/Function/Meos/PairMeetingAggregationPhysicalFunction.hpp b/nes-physical-operators/include/Aggregation/Function/Meos/PairMeetingAggregationPhysicalFunction.hpp new file mode 100644 index 0000000000..687a9a3986 --- /dev/null +++ b/nes-physical-operators/include/Aggregation/Function/Meos/PairMeetingAggregationPhysicalFunction.hpp @@ -0,0 +1,77 @@ +/* + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace NES +{ + +/** + * @brief Cartesian aggregation that emits the BerlinMOD-Q5 pair-meeting answer per window. + * + * Takes four input fields: lon, lat, timestamp, vehicle_id. The lift step stores per-event + * tuples in a PagedVector. The lower step picks each vehicle's last-known position in the + * window, enumerates vehicle pairs (a < b), and emits pairs whose spheroidal distance is + * at most a hardcoded `DMEET_METRES` (200 m for the BerlinMOD scaffold). Result is a + * VARSIZED string `"vidA,vidB,ts,dist;..."` — same shape pattern as TemporalSequence's + * BINARY(N) result. Future PR can parameterize DMEET via a constant input to the + * aggregation. + * + * Closes the MobilityNebula BerlinMOD-Q5 × 3-form partial→full gap. + */ +class PairMeetingAggregationPhysicalFunction : public AggregationPhysicalFunction +{ +public: + static constexpr double DMEET_METRES = 200.0; + + PairMeetingAggregationPhysicalFunction( + DataType inputType, + DataType resultType, + PhysicalFunction lonFunctionParam, + PhysicalFunction latFunctionParam, + PhysicalFunction timestampFunctionParam, + PhysicalFunction vehicleIdFunctionParam, + Nautilus::Record::RecordFieldIdentifier resultFieldIdentifier, + std::shared_ptr bufferRef); + void lift( + const nautilus::val& aggregationState, + PipelineMemoryProvider& pipelineMemoryProvider, + const Nautilus::Record& record) + override; + void combine( + nautilus::val aggregationState1, + nautilus::val aggregationState2, + PipelineMemoryProvider& pipelineMemoryProvider) override; + Nautilus::Record lower(nautilus::val aggregationState, PipelineMemoryProvider& pipelineMemoryProvider) override; + void reset(nautilus::val aggregationState, PipelineMemoryProvider& pipelineMemoryProvider) override; + [[nodiscard]] size_t getSizeOfStateInBytes() const override; + ~PairMeetingAggregationPhysicalFunction() override = default; + void cleanup(nautilus::val aggregationState) override; + +private: + std::shared_ptr bufferRef; + PhysicalFunction lonFunction; + PhysicalFunction latFunction; + PhysicalFunction timestampFunction; + PhysicalFunction vehicleIdFunction; +}; + +} diff --git a/nes-physical-operators/include/Aggregation/Function/Meos/TemporalLengthAggregationPhysicalFunction.hpp b/nes-physical-operators/include/Aggregation/Function/Meos/TemporalLengthAggregationPhysicalFunction.hpp new file mode 100644 index 0000000000..cf73b9e743 --- /dev/null +++ b/nes-physical-operators/include/Aggregation/Function/Meos/TemporalLengthAggregationPhysicalFunction.hpp @@ -0,0 +1,73 @@ +/* + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace NES +{ + +/** + * @brief Aggregation function that returns the spheroidal length in metres of + * the per-(window, group) trajectory built from the (lon, lat, timestamp) + * tuples lifted into the aggregation state. + * + * Same lift / combine / reset shape as TemporalSequenceAggregationPhysicalFunction; + * the lower step parses the assembled trajectory into a MEOS Temporal object and + * calls MEOS' tpoint_length(Temporal*) to return a single FLOAT64 result. + * + * Used by BerlinMOD-Q6 ("cumulative distance per vehicle") streaming-form + * scaffold: closes the partial→full gap that the prior scaffold documented as + * "PR-B" in docs/berlinmod-streaming-forms.md. + */ +class TemporalLengthAggregationPhysicalFunction : public AggregationPhysicalFunction +{ +public: + TemporalLengthAggregationPhysicalFunction( + DataType inputType, + DataType resultType, + PhysicalFunction lonFunctionParam, + PhysicalFunction latFunctionParam, + PhysicalFunction timestampFunctionParam, + Nautilus::Record::RecordFieldIdentifier resultFieldIdentifier, + std::shared_ptr bufferRef); + void lift( + const nautilus::val& aggregationState, + PipelineMemoryProvider& pipelineMemoryProvider, + const Nautilus::Record& record) + override; + void combine( + nautilus::val aggregationState1, + nautilus::val aggregationState2, + PipelineMemoryProvider& pipelineMemoryProvider) override; + Nautilus::Record lower(nautilus::val aggregationState, PipelineMemoryProvider& pipelineMemoryProvider) override; + void reset(nautilus::val aggregationState, PipelineMemoryProvider& pipelineMemoryProvider) override; + [[nodiscard]] size_t getSizeOfStateInBytes() const override; + ~TemporalLengthAggregationPhysicalFunction() override = default; + void cleanup(nautilus::val aggregationState) override; + +private: + std::shared_ptr bufferRef; + PhysicalFunction lonFunction; + PhysicalFunction latFunction; + PhysicalFunction timestampFunction; +}; + +} diff --git a/nes-physical-operators/src/Aggregation/Function/Meos/CMakeLists.txt b/nes-physical-operators/src/Aggregation/Function/Meos/CMakeLists.txt index c34e12f47e..67daff0e52 100644 --- a/nes-physical-operators/src/Aggregation/Function/Meos/CMakeLists.txt +++ b/nes-physical-operators/src/Aggregation/Function/Meos/CMakeLists.txt @@ -11,4 +11,7 @@ # limitations under the License. add_plugin(TemporalSequence AggregationPhysicalFunction nes-physical-operators TemporalSequenceAggregationPhysicalFunction.cpp) +add_plugin(TemporalLength AggregationPhysicalFunction nes-physical-operators TemporalLengthAggregationPhysicalFunction.cpp) +add_plugin(PairMeeting AggregationPhysicalFunction nes-physical-operators PairMeetingAggregationPhysicalFunction.cpp) +add_plugin(CrossDistance AggregationPhysicalFunction nes-physical-operators CrossDistanceAggregationPhysicalFunction.cpp) add_plugin(Var AggregationPhysicalFunction nes-physical-operators VarAggregationFunction.cpp) diff --git a/nes-physical-operators/src/Aggregation/Function/Meos/CrossDistanceAggregationPhysicalFunction.cpp b/nes-physical-operators/src/Aggregation/Function/Meos/CrossDistanceAggregationPhysicalFunction.cpp new file mode 100644 index 0000000000..4645a7c147 --- /dev/null +++ b/nes-physical-operators/src/Aggregation/Function/Meos/CrossDistanceAggregationPhysicalFunction.cpp @@ -0,0 +1,277 @@ +/* + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +extern "C" { +#include +#include +} + +namespace NES +{ + +constexpr static std::string_view LonFieldName = "lon"; +constexpr static std::string_view LatFieldName = "lat"; +constexpr static std::string_view TimestampFieldName = "timestamp"; +constexpr static std::string_view VehicleIdFieldName = "vehicle_id"; + +static std::mutex cross_distance_mutex; + +CrossDistanceAggregationPhysicalFunction::CrossDistanceAggregationPhysicalFunction( + DataType inputType, + DataType resultType, + PhysicalFunction lonFunctionParam, + PhysicalFunction latFunctionParam, + PhysicalFunction timestampFunctionParam, + PhysicalFunction vehicleIdFunctionParam, + Nautilus::Record::RecordFieldIdentifier resultFieldIdentifier, + std::shared_ptr bufferRef) + : AggregationPhysicalFunction(std::move(inputType), std::move(resultType), lonFunctionParam, std::move(resultFieldIdentifier)) + , bufferRef(std::move(bufferRef)) + , lonFunction(std::move(lonFunctionParam)) + , latFunction(std::move(latFunctionParam)) + , timestampFunction(std::move(timestampFunctionParam)) + , vehicleIdFunction(std::move(vehicleIdFunctionParam)) +{ +} + +void CrossDistanceAggregationPhysicalFunction::lift( + const nautilus::val& aggregationState, PipelineMemoryProvider& pipelineMemoryProvider, const Nautilus::Record& record) +{ + const auto pagedVectorPtr = static_cast>(aggregationState); + + auto lonValue = lonFunction.execute(record, pipelineMemoryProvider.arena); + auto latValue = latFunction.execute(record, pipelineMemoryProvider.arena); + auto timestampValue = timestampFunction.execute(record, pipelineMemoryProvider.arena); + auto vehicleIdValue = vehicleIdFunction.execute(record, pipelineMemoryProvider.arena); + + Record aggregateStateRecord({ + {std::string(LonFieldName), lonValue}, + {std::string(LatFieldName), latValue}, + {std::string(TimestampFieldName), timestampValue}, + {std::string(VehicleIdFieldName), vehicleIdValue} + }); + + const Nautilus::Interface::PagedVectorRef pagedVectorRef(pagedVectorPtr, bufferRef); + pagedVectorRef.writeRecord(aggregateStateRecord, pipelineMemoryProvider.bufferProvider); +} + +void CrossDistanceAggregationPhysicalFunction::combine( + const nautilus::val aggregationState1, + const nautilus::val aggregationState2, + PipelineMemoryProvider&) +{ + const auto memArea1 = static_cast>(aggregationState1); + const auto memArea2 = static_cast>(aggregationState2); + + nautilus::invoke( + +[](Nautilus::Interface::PagedVector* vector1, const Nautilus::Interface::PagedVector* vector2) -> void + { vector1->copyFrom(*vector2); }, + memArea1, + memArea2); +} + +Nautilus::Record CrossDistanceAggregationPhysicalFunction::lower( + const nautilus::val aggregationState, PipelineMemoryProvider& pipelineMemoryProvider) +{ + MEOS::Meos::ensureMeosInitialized(); + + const auto pagedVectorPtr = static_cast>(aggregationState); + const Nautilus::Interface::PagedVectorRef pagedVectorRef(pagedVectorPtr, bufferRef); + const auto allFieldNames = bufferRef->getMemoryLayout()->getSchema().getFieldNames(); + const auto numberOfEntries = invoke( + +[](const Nautilus::Interface::PagedVector* pagedVector) + { + return pagedVector->getTotalNumberOfEntries(); + }, + pagedVectorPtr); + + if (numberOfEntries == nautilus::val(0)) { + Nautilus::Record resultRecord; + resultRecord.write(resultFieldIdentifier, nautilus::val(std::numeric_limits::quiet_NaN())); + return resultRecord; + } + + // Allocate a 6-double scratch buffer on the heap (we cannot put std::optional<…> structures + // through the nautilus invoke ABI). Layout: [lonA, latA, tsA, lonB, latB, tsB]. + // Sentinel ts = -1 means "not yet observed". + auto scratchPtr = nautilus::invoke( + +[]() -> double* + { + double* scratch = (double*)malloc(sizeof(double) * 6); + // Bit-cast tsA, tsB sentinels by writing -1 as the int64 reinterpret of the double. + // We just set them to NaN markers and treat NaN as "not observed". + scratch[0] = std::numeric_limits::quiet_NaN(); + scratch[1] = std::numeric_limits::quiet_NaN(); + scratch[2] = std::numeric_limits::quiet_NaN(); + scratch[3] = std::numeric_limits::quiet_NaN(); + scratch[4] = std::numeric_limits::quiet_NaN(); + scratch[5] = std::numeric_limits::quiet_NaN(); + return scratch; + }); + + const auto endIt = pagedVectorRef.end(allFieldNames); + for (auto candidateIt = pagedVectorRef.begin(allFieldNames); candidateIt != endIt; ++candidateIt) + { + const auto itemRecord = *candidateIt; + + const auto lonValue = itemRecord.read(std::string(LonFieldName)); + const auto latValue = itemRecord.read(std::string(LatFieldName)); + const auto timestampValue = itemRecord.read(std::string(TimestampFieldName)); + const auto vehicleIdValue = itemRecord.read(std::string(VehicleIdFieldName)); + + auto lon = lonValue.cast>(); + auto lat = latValue.cast>(); + auto timestamp = timestampValue.cast>(); + auto vehicleId = vehicleIdValue.cast>(); + + // Overwrite-on-match — final value is the latest event for each target VID in iter order. + nautilus::invoke( + +[](double* scratch, double lonVal, double latVal, int64_t tsVal, uint64_t vid) -> void + { + if (vid == CrossDistanceAggregationPhysicalFunction::VID_A) { + scratch[0] = lonVal; + scratch[1] = latVal; + scratch[2] = static_cast(tsVal); + } else if (vid == CrossDistanceAggregationPhysicalFunction::VID_B) { + scratch[3] = lonVal; + scratch[4] = latVal; + scratch[5] = static_cast(tsVal); + } + }, + scratchPtr, lon, lat, timestamp, vehicleId); + } + + auto distanceMetres = nautilus::invoke( + +[](double* scratch) -> double + { + // If either target vehicle has no observation in the window, return NaN. + if (std::isnan(scratch[2]) || std::isnan(scratch[5])) { + free(scratch); + return std::numeric_limits::quiet_NaN(); + } + + std::lock_guard lock(cross_distance_mutex); + + char wktA[80]; + char wktB[80]; + snprintf(wktA, sizeof(wktA), "SRID=4326;Point(%.7f %.7f)", scratch[0], scratch[1]); + snprintf(wktB, sizeof(wktB), "SRID=4326;Point(%.7f %.7f)", scratch[3], scratch[4]); + free(scratch); + + GSERIALIZED* gA = geom_in(wktA, -1); + GSERIALIZED* gB = geom_in(wktB, -1); + if (gA == nullptr || gB == nullptr) { + if (gA) free(gA); + if (gB) free(gB); + return std::numeric_limits::quiet_NaN(); + } + GSERIALIZED* ggA = geom_to_geog(gA); + GSERIALIZED* ggB = geom_to_geog(gB); + + // For the spheroidal distance, dwithin probes only give boolean output; we want a + // numeric value. The PROJ/MEOS shared object exposes `geog_distance` for this; here + // we instead drive the MEOS NAD over single-instant tgeompoints which goes through + // the same geog_distance path internally. + char tgeoA[120]; + char tgeoB[120]; + snprintf(tgeoA, sizeof(tgeoA), "Point(%.7f %.7f)@2000-01-01 00:00:00", scratch[0], scratch[1]); + snprintf(tgeoB, sizeof(tgeoB), "Point(%.7f %.7f)@2000-01-01 00:00:00", scratch[3], scratch[4]); + Temporal* tA = (Temporal*)MEOS::Meos::parseTemporalPoint(std::string(tgeoA)); + Temporal* tB = (Temporal*)MEOS::Meos::parseTemporalPoint(std::string(tgeoB)); + double distance = std::numeric_limits::quiet_NaN(); + if (tA != nullptr && tB != nullptr) { + distance = nad_tgeo_tgeo(tA, tB); + } + if (tA != nullptr) MEOS::Meos::freeTemporalObject(tA); + if (tB != nullptr) MEOS::Meos::freeTemporalObject(tB); + free(ggA); + free(ggB); + free(gA); + free(gB); + return distance; + }, + scratchPtr); + + Nautilus::Record resultRecord; + resultRecord.write(resultFieldIdentifier, distanceMetres); + return resultRecord; +} + +void CrossDistanceAggregationPhysicalFunction::reset(const nautilus::val aggregationState, PipelineMemoryProvider&) +{ + nautilus::invoke( + +[](AggregationState* pagedVectorMemArea) -> void + { + auto* pagedVector = reinterpret_cast(pagedVectorMemArea); + new (pagedVector) Nautilus::Interface::PagedVector(); + }, + aggregationState); +} + +size_t CrossDistanceAggregationPhysicalFunction::getSizeOfStateInBytes() const +{ + return sizeof(Nautilus::Interface::PagedVector); +} + +void CrossDistanceAggregationPhysicalFunction::cleanup(nautilus::val aggregationState) +{ + nautilus::invoke( + +[](AggregationState* pagedVectorMemArea) -> void + { + auto* pagedVector = reinterpret_cast( + pagedVectorMemArea); + pagedVector->~PagedVector(); + }, + aggregationState); +} + + +AggregationPhysicalFunctionRegistryReturnType AggregationPhysicalFunctionGeneratedRegistrar::RegisterCrossDistanceAggregationPhysicalFunction( + AggregationPhysicalFunctionRegistryArguments) +{ + throw std::runtime_error("CROSS_DISTANCE aggregation cannot be created through the registry. " + "It requires four field functions (longitude, latitude, timestamp, vehicle_id)"); +} + +} diff --git a/nes-physical-operators/src/Aggregation/Function/Meos/PairMeetingAggregationPhysicalFunction.cpp b/nes-physical-operators/src/Aggregation/Function/Meos/PairMeetingAggregationPhysicalFunction.cpp new file mode 100644 index 0000000000..a9cce99347 --- /dev/null +++ b/nes-physical-operators/src/Aggregation/Function/Meos/PairMeetingAggregationPhysicalFunction.cpp @@ -0,0 +1,308 @@ +/* + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +extern "C" { +#include +#include +} + +namespace NES +{ + +constexpr static std::string_view LonFieldName = "lon"; +constexpr static std::string_view LatFieldName = "lat"; +constexpr static std::string_view TimestampFieldName = "timestamp"; +constexpr static std::string_view VehicleIdFieldName = "vehicle_id"; + +static std::mutex pair_meeting_mutex; + +PairMeetingAggregationPhysicalFunction::PairMeetingAggregationPhysicalFunction( + DataType inputType, + DataType resultType, + PhysicalFunction lonFunctionParam, + PhysicalFunction latFunctionParam, + PhysicalFunction timestampFunctionParam, + PhysicalFunction vehicleIdFunctionParam, + Nautilus::Record::RecordFieldIdentifier resultFieldIdentifier, + std::shared_ptr bufferRef) + : AggregationPhysicalFunction(std::move(inputType), std::move(resultType), lonFunctionParam, std::move(resultFieldIdentifier)) + , bufferRef(std::move(bufferRef)) + , lonFunction(std::move(lonFunctionParam)) + , latFunction(std::move(latFunctionParam)) + , timestampFunction(std::move(timestampFunctionParam)) + , vehicleIdFunction(std::move(vehicleIdFunctionParam)) +{ +} + +void PairMeetingAggregationPhysicalFunction::lift( + const nautilus::val& aggregationState, PipelineMemoryProvider& pipelineMemoryProvider, const Nautilus::Record& record) +{ + const auto pagedVectorPtr = static_cast>(aggregationState); + + auto lonValue = lonFunction.execute(record, pipelineMemoryProvider.arena); + auto latValue = latFunction.execute(record, pipelineMemoryProvider.arena); + auto timestampValue = timestampFunction.execute(record, pipelineMemoryProvider.arena); + auto vehicleIdValue = vehicleIdFunction.execute(record, pipelineMemoryProvider.arena); + + Record aggregateStateRecord({ + {std::string(LonFieldName), lonValue}, + {std::string(LatFieldName), latValue}, + {std::string(TimestampFieldName), timestampValue}, + {std::string(VehicleIdFieldName), vehicleIdValue} + }); + + const Nautilus::Interface::PagedVectorRef pagedVectorRef(pagedVectorPtr, bufferRef); + pagedVectorRef.writeRecord(aggregateStateRecord, pipelineMemoryProvider.bufferProvider); +} + +void PairMeetingAggregationPhysicalFunction::combine( + const nautilus::val aggregationState1, + const nautilus::val aggregationState2, + PipelineMemoryProvider&) +{ + const auto memArea1 = static_cast>(aggregationState1); + const auto memArea2 = static_cast>(aggregationState2); + + nautilus::invoke( + +[](Nautilus::Interface::PagedVector* vector1, const Nautilus::Interface::PagedVector* vector2) -> void + { vector1->copyFrom(*vector2); }, + memArea1, + memArea2); +} + +Nautilus::Record PairMeetingAggregationPhysicalFunction::lower( + const nautilus::val aggregationState, PipelineMemoryProvider& pipelineMemoryProvider) +{ + MEOS::Meos::ensureMeosInitialized(); + + const auto pagedVectorPtr = static_cast>(aggregationState); + const Nautilus::Interface::PagedVectorRef pagedVectorRef(pagedVectorPtr, bufferRef); + const auto allFieldNames = bufferRef->getMemoryLayout()->getSchema().getFieldNames(); + const auto numberOfEntries = invoke( + +[](const Nautilus::Interface::PagedVector* pagedVector) + { + return pagedVector->getTotalNumberOfEntries(); + }, + pagedVectorPtr); + + // Allocate an empty result buffer up-front; the lower step will fill it during the + // single pass over the PagedVector entries. + auto pairsBuffer = nautilus::invoke( + +[](const Nautilus::Interface::PagedVector* pagedVector) -> char* + { + // Worst case: every vehicle pair could meet. Pre-allocate ~80 bytes per emitted + // pair (BerlinMOD vehicle counts at the scaffold scale never exceed double digits + // per window, so this is a safe upper bound). + size_t bufferSize = pagedVector->getTotalNumberOfEntries() * 80 + 64; + char* buffer = (char*)malloc(bufferSize); + memset(buffer, 0, bufferSize); + return buffer; + }, + pagedVectorPtr); + + if (numberOfEntries == nautilus::val(0)) { + // Empty window — emit empty string + auto emptyLen = nautilus::val(0); + auto variableSized = pipelineMemoryProvider.arena.allocateVariableSizedData(emptyLen); + nautilus::invoke(+[](char* buffer) -> void { free(buffer); }, pairsBuffer); + Nautilus::Record resultRecord; + resultRecord.write(resultFieldIdentifier, variableSized); + return resultRecord; + } + + // Walk every entry; the lambda maintains a per-vehicle latest-position map. + // (Nautilus invoke ABI requires that all state be passed through pointer args; we + // model the map as a plain std::unordered_map> allocated + // via new and threaded as a void* through the invoke calls.) + auto vehicleMapPtr = nautilus::invoke( + +[]() -> void* + { + return new std::unordered_map>(); + }); + + const auto endIt = pagedVectorRef.end(allFieldNames); + for (auto candidateIt = pagedVectorRef.begin(allFieldNames); candidateIt != endIt; ++candidateIt) + { + const auto itemRecord = *candidateIt; + + const auto lonValue = itemRecord.read(std::string(LonFieldName)); + const auto latValue = itemRecord.read(std::string(LatFieldName)); + const auto timestampValue = itemRecord.read(std::string(TimestampFieldName)); + const auto vehicleIdValue = itemRecord.read(std::string(VehicleIdFieldName)); + + auto lon = lonValue.cast>(); + auto lat = latValue.cast>(); + auto timestamp = timestampValue.cast>(); + auto vehicleId = vehicleIdValue.cast>(); + + nautilus::invoke( + +[](void* mapPtr, double lonVal, double latVal, int64_t tsVal, uint64_t vid) -> void + { + auto* map = static_cast>*>(mapPtr); + // Overwrite-on-insert => map ends up holding the LATEST event per vehicle + // (since the PagedVector iteration preserves insertion order). + (*map)[vid] = std::make_tuple(lonVal, latVal, tsVal); + }, + vehicleMapPtr, lon, lat, timestamp, vehicleId); + } + + // Now enumerate pairs and check geog_dwithin(a, b, DMEET_METRES). + nautilus::invoke( + +[](void* mapPtr, char* outBuffer) -> void + { + std::lock_guard lock(pair_meeting_mutex); + auto* map = static_cast>*>(mapPtr); + + // Stable iteration order + std::vector vids; + vids.reserve(map->size()); + for (const auto& kv : *map) + { + vids.push_back(kv.first); + } + std::sort(vids.begin(), vids.end()); + + bool first = true; + for (size_t i = 0; i + 1 < vids.size(); ++i) + { + for (size_t j = i + 1; j < vids.size(); ++j) + { + const auto& [lonA, latA, tsA] = (*map)[vids[i]]; + const auto& [lonB, latB, tsB] = (*map)[vids[j]]; + + char wktA[80]; + char wktB[80]; + snprintf(wktA, sizeof(wktA), "SRID=4326;Point(%.7f %.7f)", lonA, latA); + snprintf(wktB, sizeof(wktB), "SRID=4326;Point(%.7f %.7f)", lonB, latB); + GSERIALIZED* gA = geom_in(wktA, -1); + GSERIALIZED* gB = geom_in(wktB, -1); + if (gA == nullptr || gB == nullptr) { + if (gA) free(gA); + if (gB) free(gB); + continue; + } + GSERIALIZED* ggA = geom_to_geog(gA); + GSERIALIZED* ggB = geom_to_geog(gB); + bool meets = geog_dwithin(ggA, ggB, PairMeetingAggregationPhysicalFunction::DMEET_METRES, true); + if (meets) { + // Use the later of the two timestamps as the meeting time + int64_t tsMax = (tsA > tsB) ? tsA : tsB; + // Approximate distance via geog distance (not exposed in meos_geo here yet); + // emit (vidA, vidB, ts, "≤dMeet") triple + char buf[128]; + snprintf(buf, sizeof(buf), "%s%lu,%lu,%lld,<=%.1f", + first ? "" : ";", + (unsigned long)vids[i], (unsigned long)vids[j], + (long long)tsMax, + PairMeetingAggregationPhysicalFunction::DMEET_METRES); + strcat(outBuffer, buf); + first = false; + } + free(ggA); + free(ggB); + free(gA); + free(gB); + } + } + delete map; + }, + vehicleMapPtr, pairsBuffer); + + // Allocate VARSIZED output sized to the assembled string + auto strLen = nautilus::invoke( + +[](const char* buffer) -> size_t { return strlen(buffer); }, + pairsBuffer); + + auto variableSized = pipelineMemoryProvider.arena.allocateVariableSizedData(strLen); + nautilus::invoke( + +[](int8_t* dest, const char* src, size_t len) -> void + { + if (len > 0) memcpy(dest, src, len); + free((void*)src); + }, + variableSized.getContent(), pairsBuffer, strLen); + + Nautilus::Record resultRecord; + resultRecord.write(resultFieldIdentifier, variableSized); + return resultRecord; +} + +void PairMeetingAggregationPhysicalFunction::reset(const nautilus::val aggregationState, PipelineMemoryProvider&) +{ + nautilus::invoke( + +[](AggregationState* pagedVectorMemArea) -> void + { + auto* pagedVector = reinterpret_cast(pagedVectorMemArea); + new (pagedVector) Nautilus::Interface::PagedVector(); + }, + aggregationState); +} + +size_t PairMeetingAggregationPhysicalFunction::getSizeOfStateInBytes() const +{ + return sizeof(Nautilus::Interface::PagedVector); +} + +void PairMeetingAggregationPhysicalFunction::cleanup(nautilus::val aggregationState) +{ + nautilus::invoke( + +[](AggregationState* pagedVectorMemArea) -> void + { + auto* pagedVector = reinterpret_cast( + pagedVectorMemArea); + pagedVector->~PagedVector(); + }, + aggregationState); +} + + +AggregationPhysicalFunctionRegistryReturnType AggregationPhysicalFunctionGeneratedRegistrar::RegisterPairMeetingAggregationPhysicalFunction( + AggregationPhysicalFunctionRegistryArguments) +{ + throw std::runtime_error("PAIR_MEETING aggregation cannot be created through the registry. " + "It requires four field functions (longitude, latitude, timestamp, vehicle_id)"); +} + +} diff --git a/nes-physical-operators/src/Aggregation/Function/Meos/TemporalLengthAggregationPhysicalFunction.cpp b/nes-physical-operators/src/Aggregation/Function/Meos/TemporalLengthAggregationPhysicalFunction.cpp new file mode 100644 index 0000000000..789624e3dd --- /dev/null +++ b/nes-physical-operators/src/Aggregation/Function/Meos/TemporalLengthAggregationPhysicalFunction.cpp @@ -0,0 +1,271 @@ +/* + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +// MEOS wrapper header + geo extension symbols for tpoint_length +#include +extern "C" { +#include +#include +} + +namespace NES +{ + +constexpr static std::string_view LonFieldName = "lon"; +constexpr static std::string_view LatFieldName = "lat"; +constexpr static std::string_view TimestampFieldName = "timestamp"; + +// Mutex for thread-safe MEOS operations +static std::mutex meos_length_mutex; + + +TemporalLengthAggregationPhysicalFunction::TemporalLengthAggregationPhysicalFunction( + DataType inputType, + DataType resultType, + PhysicalFunction lonFunctionParam, + PhysicalFunction latFunctionParam, + PhysicalFunction timestampFunctionParam, + Nautilus::Record::RecordFieldIdentifier resultFieldIdentifier, + std::shared_ptr bufferRef) + : AggregationPhysicalFunction(std::move(inputType), std::move(resultType), lonFunctionParam, std::move(resultFieldIdentifier)) + , bufferRef(std::move(bufferRef)) + , lonFunction(std::move(lonFunctionParam)) + , latFunction(std::move(latFunctionParam)) + , timestampFunction(std::move(timestampFunctionParam)) +{ +} + +void TemporalLengthAggregationPhysicalFunction::lift( + const nautilus::val& aggregationState, PipelineMemoryProvider& pipelineMemoryProvider, const Nautilus::Record& record) +{ + const auto pagedVectorPtr = static_cast>(aggregationState); + + auto lonValue = lonFunction.execute(record, pipelineMemoryProvider.arena); + auto latValue = latFunction.execute(record, pipelineMemoryProvider.arena); + auto timestampValue = timestampFunction.execute(record, pipelineMemoryProvider.arena); + + Record aggregateStateRecord({ + {std::string(LonFieldName), lonValue}, + {std::string(LatFieldName), latValue}, + {std::string(TimestampFieldName), timestampValue} + }); + + const Nautilus::Interface::PagedVectorRef pagedVectorRef(pagedVectorPtr, bufferRef); + pagedVectorRef.writeRecord(aggregateStateRecord, pipelineMemoryProvider.bufferProvider); +} + +void TemporalLengthAggregationPhysicalFunction::combine( + const nautilus::val aggregationState1, + const nautilus::val aggregationState2, + PipelineMemoryProvider&) +{ + const auto memArea1 = static_cast>(aggregationState1); + const auto memArea2 = static_cast>(aggregationState2); + + nautilus::invoke( + +[](Nautilus::Interface::PagedVector* vector1, const Nautilus::Interface::PagedVector* vector2) -> void + { vector1->copyFrom(*vector2); }, + memArea1, + memArea2); +} + +Nautilus::Record TemporalLengthAggregationPhysicalFunction::lower( + const nautilus::val aggregationState, PipelineMemoryProvider& pipelineMemoryProvider) +{ + MEOS::Meos::ensureMeosInitialized(); + + const auto pagedVectorPtr = static_cast>(aggregationState); + const Nautilus::Interface::PagedVectorRef pagedVectorRef(pagedVectorPtr, bufferRef); + const auto allFieldNames = bufferRef->getMemoryLayout()->getSchema().getFieldNames(); + const auto numberOfEntries = invoke( + +[](const Nautilus::Interface::PagedVector* pagedVector) + { + return pagedVector->getTotalNumberOfEntries(); + }, + pagedVectorPtr); + + // Handle empty PagedVector case — zero-length trajectory + if (numberOfEntries == nautilus::val(0)) { + Nautilus::Record resultRecord; + resultRecord.write(resultFieldIdentifier, nautilus::val(0.0)); + return resultRecord; + } + + // Build the trajectory string in the same MEOS instant-set format that + // TemporalSequenceAggregationPhysicalFunction uses: {Point(lon lat)@ts, ...} + auto trajectoryStr = nautilus::invoke( + +[](const Nautilus::Interface::PagedVector* pagedVector) -> char* + { + size_t bufferSize = pagedVector->getTotalNumberOfEntries() * 150 + 50; + char* buffer = (char*)malloc(bufferSize); + memset(buffer, 0, bufferSize); + strcpy(buffer, "{"); + return buffer; + }, + pagedVectorPtr); + + auto pointCounter = nautilus::val(0); + + const auto endIt = pagedVectorRef.end(allFieldNames); + for (auto candidateIt = pagedVectorRef.begin(allFieldNames); candidateIt != endIt; ++candidateIt) + { + const auto itemRecord = *candidateIt; + + const auto lonValue = itemRecord.read(std::string(LonFieldName)); + const auto latValue = itemRecord.read(std::string(LatFieldName)); + const auto timestampValue = itemRecord.read(std::string(TimestampFieldName)); + + auto lon = lonValue.cast>(); + auto lat = latValue.cast>(); + auto timestamp = timestampValue.cast>(); + + trajectoryStr = nautilus::invoke( + +[](char* buffer, double lonVal, double latVal, int64_t tsVal, int64_t counter) -> char* + { + if (counter > 0) { + strcat(buffer, ", "); + } + + long long adjustedTime; + if (tsVal > 1000000000000LL) { + adjustedTime = tsVal / 1000; + } else { + adjustedTime = tsVal; + } + + std::string timestampString = MEOS::Meos::convertSecondsToTimestamp(adjustedTime); + const char* timestampStr = timestampString.c_str(); + + char pointStr[120]; + sprintf(pointStr, "Point(%.6f %.6f)@%s", lonVal, latVal, timestampStr); + strcat(buffer, pointStr); + return buffer; + }, + trajectoryStr, + lon, + lat, + timestamp, + pointCounter); + + pointCounter = pointCounter + nautilus::val(1); + } + + trajectoryStr = nautilus::invoke( + +[](char* buffer) -> char* + { + strcat(buffer, "}"); + return buffer; + }, + trajectoryStr); + + // Parse the assembled trajectory into a MEOS Temporal object, call + // tpoint_length on it, and free both the C string and the Temporal. + auto totalLength = nautilus::invoke( + +[](const char* trajStr) -> double + { + if (!trajStr || strlen(trajStr) == 0) { + free((void*)trajStr); + return 0.0; + } + + std::lock_guard lock(meos_length_mutex); + + std::string trajString(trajStr); + void* temp = MEOS::Meos::parseTemporalPoint(trajString); + if (!temp) { + free((void*)trajStr); + return 0.0; + } + + // tpoint_length is the MEOS C symbol from meos_geo.h. It returns the + // spheroidal length in the SRID's distance unit (metres for the + // BerlinMOD WGS84 trajectories that the scaffold streams). + double length = tpoint_length(static_cast(temp)); + + MEOS::Meos::freeTemporalObject(temp); + free((void*)trajStr); + return length; + }, + trajectoryStr); + + Nautilus::Record resultRecord; + resultRecord.write(resultFieldIdentifier, totalLength); + return resultRecord; +} + +void TemporalLengthAggregationPhysicalFunction::reset(const nautilus::val aggregationState, PipelineMemoryProvider&) +{ + nautilus::invoke( + +[](AggregationState* pagedVectorMemArea) -> void + { + auto* pagedVector = reinterpret_cast(pagedVectorMemArea); + new (pagedVector) Nautilus::Interface::PagedVector(); + }, + aggregationState); +} + +size_t TemporalLengthAggregationPhysicalFunction::getSizeOfStateInBytes() const +{ + return sizeof(Nautilus::Interface::PagedVector); +} + +void TemporalLengthAggregationPhysicalFunction::cleanup(nautilus::val aggregationState) +{ + nautilus::invoke( + +[](AggregationState* pagedVectorMemArea) -> void + { + auto* pagedVector = reinterpret_cast( + pagedVectorMemArea); + pagedVector->~PagedVector(); + }, + aggregationState); +} + + +AggregationPhysicalFunctionRegistryReturnType AggregationPhysicalFunctionGeneratedRegistrar::RegisterTemporalLengthAggregationPhysicalFunction( + AggregationPhysicalFunctionRegistryArguments) +{ + throw std::runtime_error("TEMPORAL_LENGTH aggregation cannot be created through the registry. " + "It requires three field functions (longitude, latitude, timestamp)"); +} + +} diff --git a/nes-query-optimizer/src/RewriteRules/LowerToPhysical/LowerToPhysicalWindowedAggregation.cpp b/nes-query-optimizer/src/RewriteRules/LowerToPhysical/LowerToPhysicalWindowedAggregation.cpp index c994b91a9d..f341188689 100644 --- a/nes-query-optimizer/src/RewriteRules/LowerToPhysical/LowerToPhysicalWindowedAggregation.cpp +++ b/nes-query-optimizer/src/RewriteRules/LowerToPhysical/LowerToPhysicalWindowedAggregation.cpp @@ -55,7 +55,13 @@ #include // Special-case lowering for TEMPORAL_SEQUENCE (multi-input) aggregation #include +#include +#include +#include #include +#include +#include +#include namespace NES { @@ -160,6 +166,99 @@ getAggregationPhysicalFunctions(const WindowedAggregationLogicalOperator& logica continue; } + // Custom lowering path for TEMPORAL_LENGTH: same three-input shape as TEMPORAL_SEQUENCE, + // returns a FLOAT64 (the spheroidal length of the per-(window, group) trajectory) instead of a VARSIZED WKB blob. + if (name == std::string_view("TemporalLength")) + { + auto tlDescriptor = std::dynamic_pointer_cast(descriptor); + INVARIANT(tlDescriptor != nullptr, "Expected TemporalLengthAggregationLogicalFunction for TemporalLength"); + + auto lonPF = QueryCompilation::FunctionProvider::lowerFunction(tlDescriptor->getLonField()); + auto latPF = QueryCompilation::FunctionProvider::lowerFunction(tlDescriptor->getLatField()); + auto tsPF = QueryCompilation::FunctionProvider::lowerFunction(tlDescriptor->getTimestampField()); + + Schema stateSchema; + stateSchema.addField("lon", tlDescriptor->getLonField().getDataType()); + stateSchema.addField("lat", tlDescriptor->getLatField().getDataType()); + stateSchema.addField("timestamp", tlDescriptor->getTimestampField().getDataType()); + auto tupleBufferRef = Interface::BufferRef::TupleBufferRef::create(configuration.pageSize.getValue(), stateSchema); + + auto phys = std::make_shared( + std::move(physicalInputType), + std::move(physicalFinalType), + lonPF, + latPF, + tsPF, + resultFieldIdentifier, + tupleBufferRef); + aggregationPhysicalFunctions.push_back(std::move(phys)); + continue; + } + + // Custom lowering path for PAIR_MEETING (Q5): four input fields (lon, lat, ts, vehicle_id); + // returns a VARSIZED string-encoded list of meeting pairs. + if (name == std::string_view("PairMeeting")) + { + auto pmDescriptor = std::dynamic_pointer_cast(descriptor); + INVARIANT(pmDescriptor != nullptr, "Expected PairMeetingAggregationLogicalFunction for PairMeeting"); + + auto lonPF = QueryCompilation::FunctionProvider::lowerFunction(pmDescriptor->getLonField()); + auto latPF = QueryCompilation::FunctionProvider::lowerFunction(pmDescriptor->getLatField()); + auto tsPF = QueryCompilation::FunctionProvider::lowerFunction(pmDescriptor->getTimestampField()); + auto vidPF = QueryCompilation::FunctionProvider::lowerFunction(pmDescriptor->getVehicleIdField()); + + Schema stateSchema; + stateSchema.addField("lon", pmDescriptor->getLonField().getDataType()); + stateSchema.addField("lat", pmDescriptor->getLatField().getDataType()); + stateSchema.addField("timestamp", pmDescriptor->getTimestampField().getDataType()); + stateSchema.addField("vehicle_id", pmDescriptor->getVehicleIdField().getDataType()); + auto tupleBufferRef = Interface::BufferRef::TupleBufferRef::create(configuration.pageSize.getValue(), stateSchema); + + auto phys = std::make_shared( + std::move(physicalInputType), + std::move(physicalFinalType), + lonPF, + latPF, + tsPF, + vidPF, + resultFieldIdentifier, + tupleBufferRef); + aggregationPhysicalFunctions.push_back(std::move(phys)); + continue; + } + + // Custom lowering path for CROSS_DISTANCE (Q9): four input fields (lon, lat, ts, vehicle_id); + // returns a FLOAT64 (distance between VID_A and VID_B latest positions in the window). + if (name == std::string_view("CrossDistance")) + { + auto cdDescriptor = std::dynamic_pointer_cast(descriptor); + INVARIANT(cdDescriptor != nullptr, "Expected CrossDistanceAggregationLogicalFunction for CrossDistance"); + + auto lonPF = QueryCompilation::FunctionProvider::lowerFunction(cdDescriptor->getLonField()); + auto latPF = QueryCompilation::FunctionProvider::lowerFunction(cdDescriptor->getLatField()); + auto tsPF = QueryCompilation::FunctionProvider::lowerFunction(cdDescriptor->getTimestampField()); + auto vidPF = QueryCompilation::FunctionProvider::lowerFunction(cdDescriptor->getVehicleIdField()); + + Schema stateSchema; + stateSchema.addField("lon", cdDescriptor->getLonField().getDataType()); + stateSchema.addField("lat", cdDescriptor->getLatField().getDataType()); + stateSchema.addField("timestamp", cdDescriptor->getTimestampField().getDataType()); + stateSchema.addField("vehicle_id", cdDescriptor->getVehicleIdField().getDataType()); + auto tupleBufferRef = Interface::BufferRef::TupleBufferRef::create(configuration.pageSize.getValue(), stateSchema); + + auto phys = std::make_shared( + std::move(physicalInputType), + std::move(physicalFinalType), + lonPF, + latPF, + tsPF, + vidPF, + resultFieldIdentifier, + tupleBufferRef); + aggregationPhysicalFunctions.push_back(std::move(phys)); + continue; + } + // Default path: use registry for single-input aggregations auto aggregationInputFunction = QueryCompilation::FunctionProvider::lowerFunction(descriptor->onField); auto aggregationArguments = AggregationPhysicalFunctionRegistryArguments( diff --git a/nes-sql-parser/AntlrSQL.g4 b/nes-sql-parser/AntlrSQL.g4 index 256726e087..c5f479f7c4 100644 --- a/nes-sql-parser/AntlrSQL.g4 +++ b/nes-sql-parser/AntlrSQL.g4 @@ -295,7 +295,7 @@ timeUnit: MS timestampParameter: name=identifier; -functionName: IDENTIFIER | AVG | MAX | MIN | SUM | COUNT | MEDIAN | ARRAY_AGG | VAR | TEMPORAL_SEQUENCE | TEMPORAL_EINTERSECTS_GEOMETRY | TEMPORAL_AINTERSECTS_GEOMETRY | TEMPORAL_ECONTAINS_GEOMETRY | EDWITHIN_TGEO_GEO | TGEO_AT_STBOX; +functionName: IDENTIFIER | AVG | MAX | MIN | SUM | COUNT | MEDIAN | ARRAY_AGG | VAR | TEMPORAL_SEQUENCE | TEMPORAL_LENGTH | PAIR_MEETING | CROSS_DISTANCE | TEMPORAL_EINTERSECTS_GEOMETRY | TEMPORAL_AINTERSECTS_GEOMETRY | TEMPORAL_ECONTAINS_GEOMETRY | EDWITHIN_TGEO_GEO | TGEO_AT_STBOX; sinkClause: INTO sink (',' sink)*; @@ -483,6 +483,9 @@ MEDIAN: 'MEDIAN' | 'median'; VAR: 'VAR' | 'var'; ARRAY_AGG: 'ARRAY_AGG' | 'array_agg'; TEMPORAL_SEQUENCE: 'TEMPORAL_SEQUENCE' | 'temporal_sequence'; +TEMPORAL_LENGTH: 'TEMPORAL_LENGTH' | 'temporal_length'; +PAIR_MEETING: 'PAIR_MEETING' | 'pair_meeting'; +CROSS_DISTANCE: 'CROSS_DISTANCE' | 'cross_distance'; TEMPORAL_EINTERSECTS_GEOMETRY: 'TEMPORAL_EINTERSECTS_GEOMETRY' | 'temporal_eintersects_geometry'; TEMPORAL_AINTERSECTS_GEOMETRY: 'TEMPORAL_AINTERSECTS_GEOMETRY' | 'temporal_aintersects_geometry'; TEMPORAL_ECONTAINS_GEOMETRY: 'TEMPORAL_ECONTAINS_GEOMETRY' | 'temporal_econtains_geometry'; diff --git a/nes-sql-parser/src/AntlrSQLQueryPlanCreator.cpp b/nes-sql-parser/src/AntlrSQLQueryPlanCreator.cpp index 4e9f1d7642..8d3dd0abfc 100644 --- a/nes-sql-parser/src/AntlrSQLQueryPlanCreator.cpp +++ b/nes-sql-parser/src/AntlrSQLQueryPlanCreator.cpp @@ -65,6 +65,9 @@ #include #include #include +#include +#include +#include #include #include #include @@ -915,14 +918,14 @@ void AntlrSQLQueryPlanCreator::exitFunctionCall(AntlrSQLParser::FunctionCallCont helpers.top().functionBuilder.pop_back(); const auto longitudeFunction = helpers.top().functionBuilder.back(); helpers.top().functionBuilder.pop_back(); - + // Verify all arguments are field access functions if (!longitudeFunction.tryGet() || !latitudeFunction.tryGet() || !timestampFunction.tryGet()) { throw InvalidQuerySyntax("TEMPORAL_SEQUENCE arguments must be field references"); } - + helpers.top().windowAggs.push_back( TemporalSequenceAggregationLogicalFunctionV2::create(longitudeFunction.get(), latitudeFunction.get(), @@ -932,6 +935,96 @@ void AntlrSQLQueryPlanCreator::exitFunctionCall(AntlrSQLParser::FunctionCallCont helpers.top().functionBuilder.push_back(longitudeFunction); } break; + case AntlrSQLLexer::TEMPORAL_LENGTH: + // Same three-input shape as TEMPORAL_SEQUENCE; differs only in the + // result type (FLOAT64 instead of VARSIZED). Closes BerlinMOD-Q6 to a + // full streaming-form cell. + if (helpers.top().functionBuilder.size() != 3) { + throw InvalidQuerySyntax("TEMPORAL_LENGTH requires exactly three arguments (longitude, latitude, timestamp), but got {}", helpers.top().functionBuilder.size()); + } + { + const auto timestampFunction = helpers.top().functionBuilder.back(); + helpers.top().functionBuilder.pop_back(); + const auto latitudeFunction = helpers.top().functionBuilder.back(); + helpers.top().functionBuilder.pop_back(); + const auto longitudeFunction = helpers.top().functionBuilder.back(); + helpers.top().functionBuilder.pop_back(); + + if (!longitudeFunction.tryGet() || + !latitudeFunction.tryGet() || + !timestampFunction.tryGet()) { + throw InvalidQuerySyntax("TEMPORAL_LENGTH arguments must be field references"); + } + + helpers.top().windowAggs.push_back( + TemporalLengthAggregationLogicalFunction::create(longitudeFunction.get(), + latitudeFunction.get(), + timestampFunction.get())); + helpers.top().functionBuilder.push_back(longitudeFunction); + } + break; + case AntlrSQLLexer::PAIR_MEETING: + // Four-field aggregation: lon, lat, ts, vehicle_id. Hardcoded DMEET inside the + // physical operator (BerlinMOD scaffold). Closes Q5 × 3 cells to full. + if (helpers.top().functionBuilder.size() != 4) { + throw InvalidQuerySyntax("PAIR_MEETING requires exactly four arguments (lon, lat, timestamp, vehicle_id), but got {}", helpers.top().functionBuilder.size()); + } + { + const auto vidFunction = helpers.top().functionBuilder.back(); + helpers.top().functionBuilder.pop_back(); + const auto timestampFunction = helpers.top().functionBuilder.back(); + helpers.top().functionBuilder.pop_back(); + const auto latitudeFunction = helpers.top().functionBuilder.back(); + helpers.top().functionBuilder.pop_back(); + const auto longitudeFunction = helpers.top().functionBuilder.back(); + helpers.top().functionBuilder.pop_back(); + + if (!longitudeFunction.tryGet() || + !latitudeFunction.tryGet() || + !timestampFunction.tryGet() || + !vidFunction.tryGet()) { + throw InvalidQuerySyntax("PAIR_MEETING arguments must be field references"); + } + + helpers.top().windowAggs.push_back( + PairMeetingAggregationLogicalFunction::create(longitudeFunction.get(), + latitudeFunction.get(), + timestampFunction.get(), + vidFunction.get())); + helpers.top().functionBuilder.push_back(longitudeFunction); + } + break; + case AntlrSQLLexer::CROSS_DISTANCE: + // Same four-field shape as PAIR_MEETING; returns FLOAT64 (the distance between + // VID_A and VID_B's latest known positions in the window). Closes Q9 × 3 cells to full. + if (helpers.top().functionBuilder.size() != 4) { + throw InvalidQuerySyntax("CROSS_DISTANCE requires exactly four arguments (lon, lat, timestamp, vehicle_id), but got {}", helpers.top().functionBuilder.size()); + } + { + const auto vidFunction = helpers.top().functionBuilder.back(); + helpers.top().functionBuilder.pop_back(); + const auto timestampFunction = helpers.top().functionBuilder.back(); + helpers.top().functionBuilder.pop_back(); + const auto latitudeFunction = helpers.top().functionBuilder.back(); + helpers.top().functionBuilder.pop_back(); + const auto longitudeFunction = helpers.top().functionBuilder.back(); + helpers.top().functionBuilder.pop_back(); + + if (!longitudeFunction.tryGet() || + !latitudeFunction.tryGet() || + !timestampFunction.tryGet() || + !vidFunction.tryGet()) { + throw InvalidQuerySyntax("CROSS_DISTANCE arguments must be field references"); + } + + helpers.top().windowAggs.push_back( + CrossDistanceAggregationLogicalFunction::create(longitudeFunction.get(), + latitudeFunction.get(), + timestampFunction.get(), + vidFunction.get())); + helpers.top().functionBuilder.push_back(longitudeFunction); + } + break; case AntlrSQLLexer::TEMPORAL_EINTERSECTS_GEOMETRY: { // Convert constants from constantBuilder to ConstantValueLogicalFunction objects @@ -1225,6 +1318,52 @@ void AntlrSQLQueryPlanCreator::exitFunctionCall(AntlrSQLParser::FunctionCallCont helpers.top().functionBuilder.pop_back(); helpers.top().windowAggs.push_back(TemporalSequenceAggregationLogicalFunctionV2::create(lon, lat, ts)); } + else if (funcName == "TEMPORAL_LENGTH") + { + if (helpers.top().functionBuilder.size() < 3) + { + throw InvalidQuerySyntax("TEMPORAL_LENGTH requires three arguments at {}", context->getText()); + } + const auto ts = helpers.top().functionBuilder.back().get(); + helpers.top().functionBuilder.pop_back(); + const auto lat = helpers.top().functionBuilder.back().get(); + helpers.top().functionBuilder.pop_back(); + const auto lon = helpers.top().functionBuilder.back().get(); + helpers.top().functionBuilder.pop_back(); + helpers.top().windowAggs.push_back(TemporalLengthAggregationLogicalFunction::create(lon, lat, ts)); + } + else if (funcName == "PAIR_MEETING") + { + if (helpers.top().functionBuilder.size() < 4) + { + throw InvalidQuerySyntax("PAIR_MEETING requires four arguments at {}", context->getText()); + } + const auto vid = helpers.top().functionBuilder.back().get(); + helpers.top().functionBuilder.pop_back(); + const auto ts = helpers.top().functionBuilder.back().get(); + helpers.top().functionBuilder.pop_back(); + const auto lat = helpers.top().functionBuilder.back().get(); + helpers.top().functionBuilder.pop_back(); + const auto lon = helpers.top().functionBuilder.back().get(); + helpers.top().functionBuilder.pop_back(); + helpers.top().windowAggs.push_back(PairMeetingAggregationLogicalFunction::create(lon, lat, ts, vid)); + } + else if (funcName == "CROSS_DISTANCE") + { + if (helpers.top().functionBuilder.size() < 4) + { + throw InvalidQuerySyntax("CROSS_DISTANCE requires four arguments at {}", context->getText()); + } + const auto vid = helpers.top().functionBuilder.back().get(); + helpers.top().functionBuilder.pop_back(); + const auto ts = helpers.top().functionBuilder.back().get(); + helpers.top().functionBuilder.pop_back(); + const auto lat = helpers.top().functionBuilder.back().get(); + helpers.top().functionBuilder.pop_back(); + const auto lon = helpers.top().functionBuilder.back().get(); + helpers.top().functionBuilder.pop_back(); + helpers.top().windowAggs.push_back(CrossDistanceAggregationLogicalFunction::create(lon, lat, ts, vid)); + } else if (auto logicalFunction = LogicalFunctionProvider::tryProvide(funcName, helpers.top().functionBuilder)) { /// Remove exactly the functions used to create the 'logicalFunction' from the back of the function builder