diff --git a/src/geo/tgeompoint.cpp b/src/geo/tgeompoint.cpp index a70a7327..dbb2ff25 100644 --- a/src/geo/tgeompoint.cpp +++ b/src/geo/tgeompoint.cpp @@ -2184,28 +2184,32 @@ void TgeoFromTextExec(DataChunk &args, ExpressionState &state, Vector &result) { namespace { +/* spaceSplit / spaceTimeSplit support both literal and LATERAL + * column inputs via DuckDB's in_out_function plumbing. The bind + * callback only declares return types; per-row parameters are read + * at exec time from the input DataChunk. State carries iteration + * progress so STANDARD_VECTOR_SIZE-bounded output is produced + * lazily. Pattern mirrors duckdb/extension/icu/icu-table-range.cpp. + */ + struct SpaceSplitBindData : public TableFunctionData { - string_t blob; - double xsize, ysize, zsize; - string_t sorigin_blob; // empty -> default Point(0 0 0) - bool has_sorigin; - bool bitmatrix; - bool has_duration; - interval_t duration; - bool has_torigin; - timestamp_tz_t torigin; + bool with_time = false; }; -struct SpaceSplitGlobalState : public GlobalTableFunctionState { - idx_t idx = 0; - /* space_bins[i] is the raw EWKB serialisation of the i-th spatial bin - * (GSERIALIZED -> EWKB at Init time, decoded into the result vector's - * geometry format at Exec time using DuckDB-spatial's wkb_reader). - * tpoint blobs are pre-built TGEOMPOINT-aliased BLOB values. - * time_bin is populated only by the spaceTimeSplit overload. */ +struct SpaceSplitLocalState : public LocalTableFunctionState { + idx_t current_input_row = 0; + bool initialized_row = false; + idx_t out_idx = 0; std::vector> space_ewkb; std::vector time_bin; std::vector tpoint; + + void Reset() { + space_ewkb.clear(); + time_bin.clear(); + tpoint.clear(); + out_idx = 0; + } }; GSERIALIZED *DefaultOriginSplit() { @@ -2217,43 +2221,8 @@ unique_ptr SpaceSplitBindCommon(ClientContext &context, vector &return_types, vector &names, bool with_time) { - if (input.inputs.empty() || input.inputs[0].IsNull()) { - throw BinderException("spaceSplit: tgeompoint input must be non-null"); - } - auto bd = make_uniq(); - bd->blob = StringValue::Get(input.inputs[0]); - bd->xsize = input.inputs[1].GetValue(); - bd->ysize = input.inputs[2].GetValue(); - bd->zsize = input.inputs[3].GetValue(); - bd->has_sorigin = false; - bd->bitmatrix = true; - bd->has_duration = with_time; - bd->has_torigin = false; - - idx_t opt_idx = 4; - if (with_time) { - bd->duration = input.inputs[opt_idx++].GetValue(); - } - if (input.inputs.size() > opt_idx) { - Value v = input.inputs[opt_idx++]; - if (!v.IsNull()) { - bd->sorigin_blob = StringValue::Get(v); - bd->has_sorigin = true; - } - } - if (with_time && input.inputs.size() > opt_idx) { - Value v = input.inputs[opt_idx++]; - if (!v.IsNull()) { - bd->torigin = v.GetValue(); - bd->has_torigin = true; - } - } - if (input.inputs.size() > opt_idx) { - Value v = input.inputs[opt_idx++]; - if (!v.IsNull()) bd->bitmatrix = v.GetValue(); - } - + bd->with_time = with_time; if (with_time) { return_types = {GeoTypes::GEOMETRY(), LogicalType::TIMESTAMP_TZ, TgeompointType::TGEOMPOINT()}; names = {"spaceBin", "timeBin", "tpoint"}; @@ -2274,40 +2243,98 @@ unique_ptr SpaceTimeSplitBind(ClientContext &context, TableFunctio return SpaceSplitBindCommon(context, input, return_types, names, /*with_time=*/true); } -unique_ptr SpaceSplitInitCommon(ClientContext &context, - TableFunctionInitInput &input, - bool with_time) { - auto &bind = input.bind_data->Cast(); - auto state = make_uniq(); +unique_ptr SpaceSplitLocalInit(ExecutionContext &context, + TableFunctionInitInput &input, + GlobalTableFunctionState *) { + return make_uniq(); +} + +void EmitSpaceBinAt(ClientContext &context, Vector &col, idx_t row, + const std::vector &ewkb) { + if (ewkb.empty()) { + FlatVector::SetNull(col, row, true); + return; + } + GSERIALIZED *gs = geo_from_ewkb(ewkb.data(), ewkb.size(), 0); + if (!gs) { + FlatVector::SetNull(col, row, true); + return; + } + ArenaAllocator arena(BufferAllocator::Get(context)); + string_t enc = GSerializedToGeometry(gs, arena, col); + auto out_data = FlatVector::GetData(col); + out_data[row] = StringVector::AddStringOrBlob(col, enc); + free(gs); +} - /* Materialise input temporal */ - size_t in_size = bind.blob.GetSize(); - Temporal *temp = (Temporal *)malloc(in_size); - memcpy(temp, bind.blob.GetData(), in_size); +/* Read one input row's parameters and pre-compute all of its split + * bins into the local state. Called once per input row by the + * in_out_function below. + * + * Input column layout: + * spaceSplit: [0]=tgeompoint, [1]=xsize, [2]=ysize, [3]=zsize, + * [4]=origin (geometry, optional), + * [5]=bitmatrix (bool, optional) + * spaceTimeSplit:[0]=tgeompoint, [1]=xsize, [2]=ysize, [3]=zsize, + * [4]=duration (interval), + * [5]=origin (geometry, optional), + * [6]=torigin (timestamp_tz, optional), + * [7]=bitmatrix (bool, optional) + */ +void LoadSpaceSplitRow(ClientContext &context, SpaceSplitLocalState &state, + DataChunk &input, idx_t row_idx, bool with_time) { + state.Reset(); + for (idx_t c = 0; c < input.ColumnCount(); c++) { + input.data[c].Flatten(input.size()); + } + if (FlatVector::IsNull(input.data[0], row_idx)) { + return; + } + string_t blob = FlatVector::GetData(input.data[0])[row_idx]; + double xsize = FlatVector::GetData(input.data[1])[row_idx]; + double ysize = FlatVector::GetData(input.data[2])[row_idx]; + double zsize = FlatVector::GetData(input.data[3])[row_idx]; - /* Build origin geometry */ + idx_t col = 4; + MeosInterval mi{}; + if (with_time) { + if (input.ColumnCount() <= col || FlatVector::IsNull(input.data[col], row_idx)) { + return; + } + mi = IntervaltToInterval(FlatVector::GetData(input.data[col])[row_idx]); + col++; + } GSERIALIZED *origin = nullptr; - if (bind.has_sorigin) { - origin = GeometryToGSerialized(bind.sorigin_blob, 0); + if (input.ColumnCount() > col && !FlatVector::IsNull(input.data[col], row_idx)) { + string_t origin_blob = FlatVector::GetData(input.data[col])[row_idx]; + origin = GeometryToGSerialized(origin_blob, 0); + } + col++; + TimestampTz torigin = 0; + if (with_time && input.ColumnCount() > col && !FlatVector::IsNull(input.data[col], row_idx)) { + timestamp_tz_t t = FlatVector::GetData(input.data[col])[row_idx]; + torigin = (TimestampTz) DuckDBToMeosTimestamp(t).value; + col++; + } + bool bitmatrix = true; + if (input.ColumnCount() > col && !FlatVector::IsNull(input.data[col], row_idx)) { + bitmatrix = FlatVector::GetData(input.data[col])[row_idx]; } if (!origin) origin = DefaultOriginSplit(); + Temporal *temp = (Temporal *) malloc(blob.GetSize()); + memcpy(temp, blob.GetData(), blob.GetSize()); + int count = 0; Temporal **trajs = nullptr; GSERIALIZED **bins = nullptr; TimestampTz *tbins = nullptr; if (with_time) { - MeosInterval mi = IntervaltToInterval(bind.duration); - TimestampTz torigin = 0; - if (bind.has_torigin) { - torigin = (TimestampTz) DuckDBToMeosTimestamp(bind.torigin).value; - } - trajs = tgeo_space_time_split(temp, bind.xsize, bind.ysize, bind.zsize, - &mi, origin, torigin, bind.bitmatrix, true, - &bins, &tbins, &count); + trajs = tgeo_space_time_split(temp, xsize, ysize, zsize, &mi, origin, torigin, + bitmatrix, true, &bins, &tbins, &count); } else { - trajs = tgeo_space_split(temp, bind.xsize, bind.ysize, bind.zsize, - origin, bind.bitmatrix, true, &bins, &count); + trajs = tgeo_space_split(temp, xsize, ysize, zsize, origin, bitmatrix, true, + &bins, &count); } free(temp); free(origin); @@ -2316,99 +2343,97 @@ unique_ptr SpaceSplitInitCommon(ClientContext &context if (trajs) free(trajs); if (bins) free(bins); if (tbins) free(tbins); - return std::move(state); + return; } - - state->space_ewkb.reserve(count); - if (with_time) state->time_bin.reserve(count); - state->tpoint.reserve(count); - + state.space_ewkb.reserve(count); + if (with_time) state.time_bin.reserve(count); + state.tpoint.reserve(count); for (int i = 0; i < count; i++) { - /* Capture the spaceBin as EWKB; defer DuckDB-spatial encoding to Exec - * (where we have an arena allocator scoped to the result vector). */ size_t wkb_sz = 0; uint8_t *wkb = geo_as_ewkb(bins[i], nullptr, &wkb_sz); if (wkb) { - state->space_ewkb.emplace_back(wkb, wkb + wkb_sz); + state.space_ewkb.emplace_back(wkb, wkb + wkb_sz); free(wkb); } else { - state->space_ewkb.emplace_back(); + state.space_ewkb.emplace_back(); } free(bins[i]); - if (with_time) { timestamp_tz_t t = MeosToDuckDBTimestamp(timestamp_tz_t((int64_t) tbins[i])); - state->time_bin.emplace_back(Value::TIMESTAMPTZ(t)); + state.time_bin.emplace_back(Value::TIMESTAMPTZ(t)); } - size_t sz = temporal_mem_size(trajs[i]); Value tblob = Value::BLOB(reinterpret_cast(trajs[i]), sz); tblob.Reinterpret(TgeompointType::TGEOMPOINT()); - state->tpoint.push_back(std::move(tblob)); + state.tpoint.push_back(std::move(tblob)); free(trajs[i]); } free(trajs); free(bins); if (tbins) free(tbins); - return std::move(state); } -unique_ptr SpaceSplitInit(ClientContext &context, - TableFunctionInitInput &input) { - return SpaceSplitInitCommon(context, input, false); -} - -unique_ptr SpaceTimeSplitInit(ClientContext &context, - TableFunctionInitInput &input) { - return SpaceSplitInitCommon(context, input, true); -} - -void EmitSpaceBinAt(ClientContext &context, Vector &col, idx_t row, - const std::vector &ewkb) { - if (ewkb.empty()) { - FlatVector::SetNull(col, row, true); - return; - } - GSERIALIZED *gs = geo_from_ewkb(ewkb.data(), ewkb.size(), 0); - if (!gs) { - FlatVector::SetNull(col, row, true); - return; +OperatorResultType SpaceSplitInOutCommon(ExecutionContext &context, TableFunctionInput &data_p, + DataChunk &input, DataChunk &output, bool with_time) { + auto &state = data_p.local_state->Cast(); + idx_t out_row = 0; + + /* PhysicalTableScan resets `output` (chunk.size()=0) before each call, + * and treats output.size()==0 as FINISHED regardless of the returned + * OperatorResultType. PhysicalTableInOutFunction (LATERAL) honours + * the return value. Mirror duckdb/extension/icu/icu-table-range.cpp: + * leave the chunk untouched (size 0) when the input chunk is fully + * drained and return NEED_MORE_INPUT, so the source-scan path stops + * and the LATERAL path advances to the next input chunk. + */ + + while (out_row < STANDARD_VECTOR_SIZE) { + if (!state.initialized_row) { + if (state.current_input_row >= input.size()) { + /* Chunk drained. If we emitted rows in this call, return + * them as HAVE_MORE_OUTPUT and leave state.current_input_row + * at input.size() so the next invocation falls straight back + * into this branch with out_row == 0 (FINISHED in the source- + * scan path, NEED_MORE_INPUT advances chunks in the LATERAL + * path). */ + if (out_row > 0) { + output.SetCardinality(out_row); + return OperatorResultType::HAVE_MORE_OUTPUT; + } + state.current_input_row = 0; + return OperatorResultType::NEED_MORE_INPUT; + } + LoadSpaceSplitRow(context.client, state, input, state.current_input_row, with_time); + state.initialized_row = true; + } + if (state.out_idx >= state.tpoint.size()) { + state.current_input_row++; + state.initialized_row = false; + continue; + } + if (with_time) { + EmitSpaceBinAt(context.client, output.data[0], out_row, state.space_ewkb[state.out_idx]); + output.data[1].SetValue(out_row, state.time_bin[state.out_idx]); + output.data[2].SetValue(out_row, state.tpoint[state.out_idx]); + } else { + EmitSpaceBinAt(context.client, output.data[0], out_row, state.space_ewkb[state.out_idx]); + output.data[1].SetValue(out_row, state.tpoint[state.out_idx]); + } + state.out_idx++; + out_row++; } - ArenaAllocator arena(BufferAllocator::Get(context)); - string_t enc = GSerializedToGeometry(gs, arena, col); - auto out_data = FlatVector::GetData(col); - out_data[row] = StringVector::AddStringOrBlob(col, enc); - free(gs); + output.SetCardinality(out_row); + return OperatorResultType::HAVE_MORE_OUTPUT; } -void SpaceSplitExec(ClientContext &context, TableFunctionInput &input, DataChunk &output) { - auto &state = input.global_state->Cast(); - idx_t remaining = state.tpoint.size() - state.idx; - idx_t emit = MinValue(STANDARD_VECTOR_SIZE, remaining); - auto &space_col = output.data[0]; - auto &tpoint_col = output.data[1]; - for (idx_t i = 0; i < emit; i++) { - EmitSpaceBinAt(context, space_col, i, state.space_ewkb[state.idx]); - tpoint_col.SetValue(i, state.tpoint[state.idx]); - state.idx++; - } - output.SetCardinality(emit); +OperatorResultType SpaceSplitInOut(ExecutionContext &context, TableFunctionInput &data_p, + DataChunk &input, DataChunk &output) { + return SpaceSplitInOutCommon(context, data_p, input, output, /*with_time=*/false); } -void SpaceTimeSplitExec(ClientContext &context, TableFunctionInput &input, DataChunk &output) { - auto &state = input.global_state->Cast(); - idx_t remaining = state.tpoint.size() - state.idx; - idx_t emit = MinValue(STANDARD_VECTOR_SIZE, remaining); - auto &space_col = output.data[0]; - auto &time_col = output.data[1]; - auto &tpoint_col = output.data[2]; - for (idx_t i = 0; i < emit; i++) { - EmitSpaceBinAt(context, space_col, i, state.space_ewkb[state.idx]); - time_col.SetValue(i, state.time_bin[state.idx]); - tpoint_col.SetValue(i, state.tpoint[state.idx]); - state.idx++; - } - output.SetCardinality(emit); +OperatorResultType SpaceTimeSplitInOut(ExecutionContext &context, TableFunctionInput &data_p, + DataChunk &input, DataChunk &output) { + return SpaceSplitInOutCommon(context, data_p, input, output, /*with_time=*/true); } void TgeoGeoMeasureExec(DataChunk &args, ExpressionState &state, Vector &result) { @@ -2498,7 +2523,14 @@ void TgeompointType::RegisterTpointSplit(ExtensionLoader &loader) { const auto G = GeoTypes::GEOMETRY(); const auto B = LogicalType::BOOLEAN; - /* spaceSplit overloads (tgeompoint, xsize, ysize, zsize[, sorigin geom[, bitmatrix bool]]) */ + /* spaceSplit overloads (tgeompoint, xsize, ysize, zsize[, sorigin geom[, bitmatrix bool]]) + * + * Uses `in_out_function` so the call accepts LATERAL column + * references in addition to literals. The bind callback only + * declares return-types/names; per-row parameters are read at + * exec time from the input DataChunk. Pattern mirrors + * duckdb/extension/icu/icu-table-range.cpp. + */ { std::vector> arg_lists = { {T, D, D, D}, @@ -2506,7 +2538,9 @@ void TgeompointType::RegisterTpointSplit(ExtensionLoader &loader) { {T, D, D, D, G, B}, }; for (auto &args : arg_lists) { - TableFunction fn("spaceSplit", args, SpaceSplitExec, SpaceSplitBind, SpaceSplitInit); + TableFunction fn("spaceSplit", args, /*function=*/nullptr, SpaceSplitBind, + /*init_global=*/nullptr, SpaceSplitLocalInit); + fn.in_out_function = SpaceSplitInOut; loader.RegisterFunction(fn); } } @@ -2520,7 +2554,9 @@ void TgeompointType::RegisterTpointSplit(ExtensionLoader &loader) { {T, D, D, D, I, G, TS, B}, }; for (auto &args : arg_lists) { - TableFunction fn("spaceTimeSplit", args, SpaceTimeSplitExec, SpaceTimeSplitBind, SpaceTimeSplitInit); + TableFunction fn("spaceTimeSplit", args, /*function=*/nullptr, SpaceTimeSplitBind, + /*init_global=*/nullptr, SpaceSplitLocalInit); + fn.in_out_function = SpaceTimeSplitInOut; loader.RegisterFunction(fn); } } diff --git a/test/sql/parity/058c_tpoint_split_lateral.test b/test/sql/parity/058c_tpoint_split_lateral.test new file mode 100644 index 00000000..e8dd851d --- /dev/null +++ b/test/sql/parity/058c_tpoint_split_lateral.test @@ -0,0 +1,43 @@ +# name: test/sql/parity/058c_tpoint_split_lateral.test +# description: spaceSplit / spaceTimeSplit consumed via LATERAL join. +# Driven by `in_out_function` registration so the splitter +# accepts both literal arguments and per-row column inputs. +# group: [sql] + +require mobilityduck + +statement ok +CREATE TABLE trip(id INTEGER, traj TGEOMPOINT); + +statement ok +INSERT INTO trip VALUES + (1, tgeompoint '[POINT(0 0)@2000-01-01 00:00:00+00, POINT(10 10)@2000-01-05 00:00:00+00]'); + +# Same trajectory + parameters as 058b_tpoint_split.test query 1, but the +# tgeompoint comes from a table column rather than a literal — exercises +# the LATERAL column-input path of the in_out_function plumbing. + +query I +SELECT count(*) +FROM trip, LATERAL spaceSplit(traj, 5.0, 5.0, 5.0); +---- +3 + +# Geometry/subtype check via LATERAL — must match the literal-input case. + +query II +SELECT ST_AsText(spaceBin), tempSubtype(tpoint) +FROM trip, LATERAL spaceSplit(traj, 5.0, 5.0, 5.0) +ORDER BY ST_AsText(spaceBin); +---- +POINT (0 0) SequenceSet +POINT (10 10) SequenceSet +POINT (5 5) SequenceSet + +# spaceTimeSplit via LATERAL. + +query I +SELECT count(*) +FROM trip, LATERAL spaceTimeSplit(traj, 5.0, 5.0, 5.0, INTERVAL '2 days'); +---- +3