Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions common/tl/constants/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,16 @@ namespace vk {
namespace tl {
namespace common {

namespace tracing::traceContext {
static constexpr uint32_t return_reserved_status_0 = 1 << 0;
static constexpr uint32_t return_reserved_status_1 = 1 << 1;
static constexpr uint32_t parent_id = 1 << 2;
static constexpr uint32_t source_id = 1 << 3;
static constexpr uint32_t return_reserved_level_0 = 1 << 4;
static constexpr uint32_t return_reserved_level_1 = 1 << 5;
static constexpr uint32_t return_reserved_level_2 = 1 << 6;
static constexpr uint32_t return_debug = 1 << 7;
} // namespace tracing::traceContext
namespace tracing::trace_context_flags {
inline constexpr uint32_t reserved_status_0 = 1U << 0U;
inline constexpr uint32_t reserved_status_1 = 1U << 1U;
inline constexpr uint32_t parent_id = 1U << 2U;
inline constexpr uint32_t source_id = 1U << 3U;
inline constexpr uint32_t reserved_level_0 = 1U << 4U;
inline constexpr uint32_t reserved_level_1 = 1U << 5U;
inline constexpr uint32_t reserved_level_2 = 1U << 6U;
inline constexpr uint32_t debug = 1U << 7U;
} // namespace tracing::trace_context_flags

namespace rpc_invoke_req_extra_flags {
inline constexpr uint32_t return_binlog_pos = 1U << 0U;
Expand Down
28 changes: 14 additions & 14 deletions common/tl/tl-types.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,14 @@ struct traceID final {
};

class traceContext final {
static constexpr uint32_t RETURN_RESERVED_STATUS_0_FLAG = vk::tl::common::tracing::traceContext::return_reserved_status_0;
static constexpr uint32_t RETURN_RESERVED_STATUS_1_FLAG = vk::tl::common::tracing::traceContext::return_reserved_status_1;
static constexpr uint32_t PARENT_ID_FLAG = vk::tl::common::tracing::traceContext::parent_id;
static constexpr uint32_t SOURCE_ID_FLAG = vk::tl::common::tracing::traceContext::source_id;
static constexpr uint32_t RETURN_RESERVED_LEVEL_0_FLAG = vk::tl::common::tracing::traceContext::return_reserved_level_0;
static constexpr uint32_t RETURN_RESERVED_LEVEL_1_FLAG = vk::tl::common::tracing::traceContext::return_reserved_level_1;
static constexpr uint32_t RETURN_RESERVED_LEVEL_2_FLAG = vk::tl::common::tracing::traceContext::return_reserved_level_2;
static constexpr uint32_t RETURN_DEBUG_FLAG = vk::tl::common::tracing::traceContext::return_debug;
static constexpr uint32_t RESERVED_STATUS_0_FLAG = vk::tl::common::tracing::trace_context_flags::reserved_status_0;
static constexpr uint32_t RESERVED_STATUS_1_FLAG = vk::tl::common::tracing::trace_context_flags::reserved_status_1;
static constexpr uint32_t PARENT_ID_FLAG = vk::tl::common::tracing::trace_context_flags::parent_id;
static constexpr uint32_t SOURCE_ID_FLAG = vk::tl::common::tracing::trace_context_flags::source_id;
static constexpr uint32_t RESERVED_LEVEL_0_FLAG = vk::tl::common::tracing::trace_context_flags::reserved_level_0;
static constexpr uint32_t RESERVED_LEVEL_1_FLAG = vk::tl::common::tracing::trace_context_flags::reserved_level_1;
static constexpr uint32_t RESERVED_LEVEL_2_FLAG = vk::tl::common::tracing::trace_context_flags::reserved_level_2;
static constexpr uint32_t DEBUG_FLAG = vk::tl::common::tracing::trace_context_flags::debug;

public:
tracing::traceID trace_id{};
Expand Down Expand Up @@ -169,12 +169,12 @@ class traceContext final {
ok = ok && !tl_fetch_error();
}

reserved_status_0 = static_cast<bool>(fields_mask & RETURN_RESERVED_STATUS_0_FLAG);
reserved_status_1 = static_cast<bool>(fields_mask & RETURN_RESERVED_STATUS_1_FLAG);
reserved_level_0 = static_cast<bool>(fields_mask & RETURN_RESERVED_LEVEL_0_FLAG);
reserved_level_1 = static_cast<bool>(fields_mask & RETURN_RESERVED_LEVEL_1_FLAG);
reserved_level_2 = static_cast<bool>(fields_mask & RETURN_RESERVED_LEVEL_2_FLAG);
debug_flag = static_cast<bool>(fields_mask & RETURN_DEBUG_FLAG);
reserved_status_0 = static_cast<bool>(fields_mask & RESERVED_STATUS_0_FLAG);
reserved_status_1 = static_cast<bool>(fields_mask & RESERVED_STATUS_1_FLAG);
reserved_level_0 = static_cast<bool>(fields_mask & RESERVED_LEVEL_0_FLAG);
reserved_level_1 = static_cast<bool>(fields_mask & RESERVED_LEVEL_1_FLAG);
reserved_level_2 = static_cast<bool>(fields_mask & RESERVED_LEVEL_2_FLAG);
debug_flag = static_cast<bool>(fields_mask & DEBUG_FLAG);

return ok;
}
Expand Down
9 changes: 7 additions & 2 deletions runtime-light/server/rpc/init-functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <string_view>
#include <type_traits>
#include <utility>
Expand All @@ -17,6 +18,7 @@
#include "runtime-common/core/std/containers.h"
#include "runtime-light/core/globals/php-script-globals.h"
#include "runtime-light/server/rpc/rpc-server-state.h"
#include "runtime-light/stdlib/diagnostics/logs.h"
#include "runtime-light/streams/stream.h"
#include "runtime-light/tl/tl-core.h"
#include "runtime-light/tl/tl-functions.h"
Expand Down Expand Up @@ -209,8 +211,11 @@ void init_server(kphp::component::stream&& request_stream, kphp::stl::vector<std
if (invoke_rpc.opt_actor_id) {
superglobals.v$_SERVER.set_value(string{RPC_ACTOR_ID.data(), RPC_ACTOR_ID.size()}, (*invoke_rpc.opt_actor_id).value);
}

const std::optional<tl::mask> opt_extra_fields_mask{invoke_rpc.opt_extra.transform([](const auto& extra) noexcept { return extra.get_flags(); })};
if (invoke_rpc.opt_extra) {
superglobals.v$_SERVER.set_value(string{RPC_EXTRA_FLAGS.data(), RPC_EXTRA_FLAGS.size()}, static_cast<int64_t>((*invoke_rpc.opt_extra).get_flags().value));
kphp::log::assertion(opt_extra_fields_mask.has_value());
superglobals.v$_SERVER.set_value(string{RPC_EXTRA_FLAGS.data(), RPC_EXTRA_FLAGS.size()}, static_cast<int64_t>((*opt_extra_fields_mask).value));
process_rpc_invoke_req_extra(*invoke_rpc.opt_extra, superglobals);
}
kphp::log::info("rpc server initialized with: "
Expand All @@ -222,7 +227,7 @@ void init_server(kphp::component::stream&& request_stream, kphp::stl::vector<std
"request -> {:#x}",
invoke_rpc.net_pid.get_pid(), invoke_rpc.net_pid.get_port(), invoke_rpc.query_id.value,
invoke_rpc.opt_actor_id.has_value() ? (*invoke_rpc.opt_actor_id).value : 0,
invoke_rpc.opt_extra.has_value() ? (*invoke_rpc.opt_extra).get_flags().value : 0, request_magic.value);
opt_extra_fields_mask.has_value() ? (*opt_extra_fields_mask).value : 0, request_magic.value);
}

} // namespace kphp::rpc
3 changes: 2 additions & 1 deletion runtime-light/stdlib/rpc/rpc-api.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,8 @@ inline kphp::coro::task<> f$rpc_server_store_response(class_instance<C$VK$TL$Rpc
// as we are in a coroutine, we must own the data to prevent it from being overwritten by another coroutine,
// so create a TLBuffer owned by this coroutine
auto& rpc_server_instance_st{RpcServerInstanceState::get()};
tl::K2RpcResponse rpc_response{.value = tl::k2RpcResponseHeader{.flags = {}, .extra = {}, .result = rpc_server_instance_st.tl_storer.view()}};
tl::K2RpcResponse rpc_response{.value =
tl::k2RpcResponseHeader{.flags = {}, .extra_flags = {}, .extra = {}, .result = rpc_server_instance_st.tl_storer.view()}};
tl::storer tls{rpc_response.footprint()};
rpc_response.store(tls);

Expand Down
13 changes: 8 additions & 5 deletions runtime-light/tl/tl-functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,6 @@ class K2InvokeRpc final {
static constexpr auto EXTRA_FLAG = static_cast<uint32_t>(1U << 1U);

public:
tl::mask flags{};
tl::i64 query_id{};
tl::netPid net_pid{};
std::optional<tl::i64> opt_actor_id;
Expand All @@ -314,14 +313,18 @@ class K2InvokeRpc final {
bool fetch(tl::fetcher& tlf) noexcept {
tl::magic magic{};
bool ok{magic.fetch(tlf) && magic.expect(K2_INVOKE_RPC_MAGIC)};
ok = ok && flags.fetch(tlf);

tl::mask fields_mask{};
ok = ok && fields_mask.fetch(tlf);
ok = ok && query_id.fetch(tlf);
ok = ok && net_pid.fetch(tlf);
if (static_cast<bool>(flags.value & ACTOR_ID_FLAG)) {
if (static_cast<bool>(fields_mask.value & ACTOR_ID_FLAG)) {
ok = ok && opt_actor_id.emplace().fetch(tlf);
}
if (static_cast<bool>(flags.value & EXTRA_FLAG)) {
ok = ok && opt_extra.emplace().fetch(tlf);
if (static_cast<bool>(fields_mask.value & EXTRA_FLAG)) {
tl::mask extra_fields_mask{};
ok = ok && extra_fields_mask.fetch(tlf);
ok = ok && opt_extra.emplace().fetch(tlf, extra_fields_mask);
}
const auto opt_query{tlf.fetch_bytes(tlf.remaining())};
query = opt_query.value_or(std::span<const std::byte>{});
Expand Down
50 changes: 24 additions & 26 deletions runtime-light/tl/tl-types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,8 @@ bool CertInfoItem::fetch(tl::fetcher& tlf) noexcept {

// ===== RPC =====

bool rpcInvokeReqExtra::fetch(tl::fetcher& tlf) noexcept {
tl::mask flags{};
bool ok{flags.fetch(tlf)};
bool rpcInvokeReqExtra::fetch(tl::fetcher& tlf, const tl::mask& flags) noexcept {
bool ok{true};
if (ok && static_cast<bool>(flags.value & WAIT_BINLOG_POS_FLAG)) {
ok &= opt_wait_binlog_pos.emplace().fetch(tlf);
}
Expand Down Expand Up @@ -197,30 +196,29 @@ bool rpcInvokeReqExtra::fetch(tl::fetcher& tlf) noexcept {
tl::mask rpcInvokeReqExtra::get_flags() const noexcept {
tl::mask flags{.value = static_cast<tl::mask::underlying_type>(return_binlog_pos)};

flags.value |= static_cast<tl::mask::underlying_type>(return_binlog_time) << 1;
flags.value |= static_cast<tl::mask::underlying_type>(return_pid) << 2;
flags.value |= static_cast<tl::mask::underlying_type>(return_request_sizes) << 3;
flags.value |= static_cast<tl::mask::underlying_type>(return_failed_subqueries) << 4;
flags.value |= static_cast<tl::mask::underlying_type>(return_query_stats) << 6;
flags.value |= static_cast<tl::mask::underlying_type>(no_result) << 7;
flags.value |= static_cast<tl::mask::underlying_type>(return_view_number) << 27;

flags.value |= static_cast<tl::mask::underlying_type>(opt_wait_binlog_pos.has_value()) << 16;
flags.value |= static_cast<tl::mask::underlying_type>(opt_string_forward_keys.has_value()) << 18;
flags.value |= static_cast<tl::mask::underlying_type>(opt_int_forward_keys.has_value()) << 19;
flags.value |= static_cast<tl::mask::underlying_type>(opt_string_forward.has_value()) << 20;
flags.value |= static_cast<tl::mask::underlying_type>(opt_int_forward.has_value()) << 21;
flags.value |= static_cast<tl::mask::underlying_type>(opt_custom_timeout_ms.has_value()) << 23;
flags.value |= static_cast<tl::mask::underlying_type>(opt_supported_compression_version.has_value()) << 25;
flags.value |= static_cast<tl::mask::underlying_type>(opt_random_delay.has_value()) << 26;
flags.value |= static_cast<tl::mask::underlying_type>(opt_persistent_query.has_value()) << 28;
flags.value |= static_cast<tl::mask::underlying_type>(opt_trace_context.has_value()) << 29;
flags.value |= static_cast<tl::mask::underlying_type>(opt_execution_context.has_value()) << 30;
flags.value |= static_cast<tl::mask::underlying_type>(return_binlog_time) << 1U;
flags.value |= static_cast<tl::mask::underlying_type>(return_pid) << 2U;
flags.value |= static_cast<tl::mask::underlying_type>(return_request_sizes) << 3U;
flags.value |= static_cast<tl::mask::underlying_type>(return_failed_subqueries) << 4U;
flags.value |= static_cast<tl::mask::underlying_type>(return_query_stats) << 6U;
flags.value |= static_cast<tl::mask::underlying_type>(no_result) << 7U;
flags.value |= static_cast<tl::mask::underlying_type>(return_view_number) << 27U;

flags.value |= static_cast<tl::mask::underlying_type>(opt_wait_binlog_pos.has_value()) << 16U;
flags.value |= static_cast<tl::mask::underlying_type>(opt_string_forward_keys.has_value()) << 18U;
flags.value |= static_cast<tl::mask::underlying_type>(opt_int_forward_keys.has_value()) << 19U;
flags.value |= static_cast<tl::mask::underlying_type>(opt_string_forward.has_value()) << 20U;
flags.value |= static_cast<tl::mask::underlying_type>(opt_int_forward.has_value()) << 21U;
flags.value |= static_cast<tl::mask::underlying_type>(opt_custom_timeout_ms.has_value()) << 23U;
flags.value |= static_cast<tl::mask::underlying_type>(opt_supported_compression_version.has_value()) << 25U;
flags.value |= static_cast<tl::mask::underlying_type>(opt_random_delay.has_value()) << 26U;
flags.value |= static_cast<tl::mask::underlying_type>(opt_persistent_query.has_value()) << 28U;
flags.value |= static_cast<tl::mask::underlying_type>(opt_trace_context.has_value()) << 29U;
flags.value |= static_cast<tl::mask::underlying_type>(opt_execution_context.has_value()) << 30U;
return flags;
}

void rpcReqResultExtra::store(tl::storer& tls) const noexcept {
flags.store(tls);
void rpcReqResultExtra::store(tl::storer& tls, const tl::mask& flags) const noexcept {
if (static_cast<bool>(flags.value & BINLOG_POS_FLAG)) {
binlog_pos.store(tls);
}
Expand Down Expand Up @@ -249,8 +247,8 @@ void rpcReqResultExtra::store(tl::storer& tls) const noexcept {
}
}

size_t rpcReqResultExtra::footprint() const noexcept {
size_t footprint{flags.footprint()};
size_t rpcReqResultExtra::footprint(const tl::mask& flags) const noexcept {
size_t footprint{};
if (static_cast<bool>(flags.value & BINLOG_POS_FLAG)) {
footprint += binlog_pos.footprint();
}
Expand Down
55 changes: 28 additions & 27 deletions runtime-light/tl/tl-types.h
Original file line number Diff line number Diff line change
Expand Up @@ -1008,14 +1008,14 @@ struct traceID final {
};

class traceContext final {
static constexpr uint32_t RETURN_RESERVED_STATUS_0_FLAG = vk::tl::common::tracing::traceContext::return_reserved_status_0;
static constexpr uint32_t RETURN_RESERVED_STATUS_1_FLAG = vk::tl::common::tracing::traceContext::return_reserved_status_1;
static constexpr uint32_t PARENT_ID_FLAG = vk::tl::common::tracing::traceContext::parent_id;
static constexpr uint32_t SOURCE_ID_FLAG = vk::tl::common::tracing::traceContext::source_id;
static constexpr uint32_t RETURN_RESERVED_LEVEL_0_FLAG = vk::tl::common::tracing::traceContext::return_reserved_level_0;
static constexpr uint32_t RETURN_RESERVED_LEVEL_1_FLAG = vk::tl::common::tracing::traceContext::return_reserved_level_1;
static constexpr uint32_t RETURN_RESERVED_LEVEL_2_FLAG = vk::tl::common::tracing::traceContext::return_reserved_level_2;
static constexpr uint32_t RETURN_DEBUG_FLAG = vk::tl::common::tracing::traceContext::return_debug;
static constexpr uint32_t RESERVED_STATUS_0_FLAG = vk::tl::common::tracing::trace_context_flags::reserved_status_0;
static constexpr uint32_t RESERVED_STATUS_1_FLAG = vk::tl::common::tracing::trace_context_flags::reserved_status_1;
static constexpr uint32_t PARENT_ID_FLAG = vk::tl::common::tracing::trace_context_flags::parent_id;
static constexpr uint32_t SOURCE_ID_FLAG = vk::tl::common::tracing::trace_context_flags::source_id;
static constexpr uint32_t RESERVED_LEVEL_0_FLAG = vk::tl::common::tracing::trace_context_flags::reserved_level_0;
static constexpr uint32_t RESERVED_LEVEL_1_FLAG = vk::tl::common::tracing::trace_context_flags::reserved_level_1;
static constexpr uint32_t RESERVED_LEVEL_2_FLAG = vk::tl::common::tracing::trace_context_flags::reserved_level_2;
static constexpr uint32_t DEBUG_FLAG = vk::tl::common::tracing::trace_context_flags::debug;

public:
tl::tracing::traceID trace_id{};
Expand All @@ -1036,23 +1036,23 @@ class traceContext final {
bool debug_flag{};

bool fetch(tl::fetcher& tlf) noexcept {
tl::u32 fields_mask{};
tl::mask fields_mask{};
bool ok{fields_mask.fetch(tlf)};

ok = ok && trace_id.fetch(tlf);
if (ok && static_cast<bool>(fields_mask.value & PARENT_ID_FLAG)) {
ok = ok && opt_parent_id.emplace().fetch(tlf);
ok &= opt_parent_id.emplace().fetch(tlf);
}
if (ok && static_cast<bool>(fields_mask.value & SOURCE_ID_FLAG)) {
ok = ok && opt_source_id.emplace().fetch(tlf);
ok &= opt_source_id.emplace().fetch(tlf);
}

reserved_status_0 = static_cast<bool>(fields_mask.value & RETURN_RESERVED_STATUS_0_FLAG);
reserved_status_1 = static_cast<bool>(fields_mask.value & RETURN_RESERVED_STATUS_1_FLAG);
reserved_level_0 = static_cast<bool>(fields_mask.value & RETURN_RESERVED_LEVEL_0_FLAG);
reserved_level_1 = static_cast<bool>(fields_mask.value & RETURN_RESERVED_LEVEL_1_FLAG);
reserved_level_2 = static_cast<bool>(fields_mask.value & RETURN_RESERVED_LEVEL_2_FLAG);
debug_flag = static_cast<bool>(fields_mask.value & RETURN_DEBUG_FLAG);
reserved_status_0 = static_cast<bool>(fields_mask.value & RESERVED_STATUS_0_FLAG);
reserved_status_1 = static_cast<bool>(fields_mask.value & RESERVED_STATUS_1_FLAG);
reserved_level_0 = static_cast<bool>(fields_mask.value & RESERVED_LEVEL_0_FLAG);
reserved_level_1 = static_cast<bool>(fields_mask.value & RESERVED_LEVEL_1_FLAG);
reserved_level_2 = static_cast<bool>(fields_mask.value & RESERVED_LEVEL_2_FLAG);
debug_flag = static_cast<bool>(fields_mask.value & DEBUG_FLAG);

return ok;
}
Expand All @@ -1070,6 +1070,7 @@ class traceContext final {
return flags;
}
};

} // namespace tracing

class rpcInvokeReqExtra final {
Expand Down Expand Up @@ -1114,17 +1115,17 @@ class rpcInvokeReqExtra final {
std::optional<tl::string> opt_execution_context;
bool return_view_number{};

bool fetch(tl::fetcher& tlf) noexcept;
bool fetch(tl::fetcher& tlf, const tl::mask& flags) noexcept;

tl::mask get_flags() const noexcept;
};

struct RpcInvokeReqExtra final {
tl::rpcInvokeReqExtra inner{};

bool fetch(tl::fetcher& tlf) noexcept {
bool fetch(tl::fetcher& tlf, const tl::mask& flags) noexcept {
tl::magic magic{};
return magic.fetch(tlf) && magic.expect(TL_RPC_INVOKE_REQ_EXTRA) && inner.fetch(tlf);
return magic.fetch(tlf) && magic.expect(TL_RPC_INVOKE_REQ_EXTRA) && inner.fetch(tlf, flags);
}
};

Expand All @@ -1141,7 +1142,6 @@ class rpcReqResultExtra final {
static constexpr uint32_t VIEW_NUMBER_FLAG = vk::tl::common::rpc_req_result_extra_flags::view_number;

public:
tl::mask flags{};
tl::i64 binlog_pos{};
tl::i64 binlog_time{};
tl::netPid engine_pid{};
Expand All @@ -1153,16 +1153,16 @@ class rpcReqResultExtra final {
tl::i64 epoch_number{};
tl::i64 view_number{};

void store(tl::storer& tls) const noexcept;
void store(tl::storer& tls, const tl::mask& flags) const noexcept;

size_t footprint() const noexcept;
size_t footprint(const tl::mask& flags) const noexcept;
};

struct RpcReqResultExtra final {
tl::rpcReqResultExtra inner{};

void store(tl::storer& tls) const noexcept {
tl::magic{.value = TL_RPC_REQ_RESULT_EXTRA}.store(tls), inner.store(tls);
void store(tl::storer& tls, const tl::mask& flags) const noexcept {
tl::magic{.value = TL_RPC_REQ_RESULT_EXTRA}.store(tls), inner.store(tls, flags);
}
};

Expand All @@ -1181,15 +1181,16 @@ struct k2RpcResponseError final {

struct k2RpcResponseHeader final {
tl::mask flags{};
tl::mask extra_flags{};
tl::rpcReqResultExtra extra{};
std::span<const std::byte> result;

void store(tl::storer& tls) const noexcept {
flags.store(tls), extra.store(tls), tls.store_bytes(result);
flags.store(tls), extra_flags.store(tls), extra.store(tls, extra_flags), tls.store_bytes(result);
}

constexpr size_t footprint() const noexcept {
return flags.footprint() + extra.footprint() + result.size();
return flags.footprint() + extra_flags.footprint() + extra.footprint(extra_flags) + result.size();
}
};

Expand Down
Loading