diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f203d8b..ecaa62e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,7 +13,7 @@ env: CARTESI_MACHINE_SHA256_ARM64: 787d823756000cdecd72da8a3494b4c08613087379035959e561bbaef7a220ba jobs: - rust: + checks: runs-on: ubuntu-latest timeout-minutes: 30 @@ -30,7 +30,7 @@ jobs: libslirp-dev - name: Install Rust toolchain - uses: dtolnay/rust-toolchain@stable + uses: dtolnay/rust-toolchain@1.91.1 with: components: rustfmt, clippy @@ -51,13 +51,16 @@ jobs: - name: Clippy run: cargo clippy --workspace --all-targets --all-features --locked -- -D warnings + - name: Watchdog Lua tests + run: lua watchdog/tests/run.lua + - name: Test timeout-minutes: 15 run: RUN_ANVIL_TESTS=1 cargo test --workspace --all-targets --all-features --locked canonical-guest: runs-on: ubuntu-latest - needs: rust + needs: checks timeout-minutes: 45 steps: @@ -82,7 +85,7 @@ jobs: rollups-e2e: runs-on: ubuntu-latest - needs: rust + needs: checks timeout-minutes: 60 steps: diff --git a/.gitignore b/.gitignore index 822d909..99024ce 100644 --- a/.gitignore +++ b/.gitignore @@ -5,5 +5,7 @@ sequencer.db sequencer.db-shm sequencer.db-wal /out/ +examples/canonical-app/out/ /.DS_Store +.vscode/ soljson-latest.js diff --git a/docs/watchdog/README.md b/docs/watchdog/README.md new file mode 100644 index 0000000..843f508 --- /dev/null +++ b/docs/watchdog/README.md @@ -0,0 +1,120 @@ +# Watchdog + +The watchdog is an off-chain safety process that compares sequencer API state +against state produced by the canonical Cartesi Machine at an L1 safe block. + +## V1 Shape + +The implementation lives in `watchdog/` and is intentionally split into small +Lua modules: + +- `http.lua`: HTTP adapter, currently `lua-curl` oriented. +- `jsonrpc.lua`: JSON-RPC request/response validation. +- `l1.lua`: partitioned `eth_getLogs` scanning and strict L1 log ordering. +- `abi.lua`: decoding for the `InputAdded` / `EvmAdvance` envelope. +- `machine.lua`: narrow adapter boundary for Cartesi Machine bindings. +- `machine_cli.lua`: `cartesi-machine` CLI adapter for loading snapshot + directories, writing raw input files, advancing, and saving a new snapshot + directory. +- `compare.lua`: raw byte comparison. +- `checkpoint.lua`: manifest-backed checkpoint persistence. +- `alarm.lua`: webhook alarm delivery. +- `retry.lua`: bounded retry helper used by the runtime. +- `runner.lua`: one-shot orchestration across checkpoint load, sequencer poll, + L1 fetch, CM replay, raw compare, alarm, and checkpoint write. + +The L1 reader follows the Rust partition strategy from +`sequencer/src/partition.rs`: if an RPC provider rejects a large range, the +range is split recursively and retried. Lua decodes and validates input +envelopes, but it does not classify payload tags. Direct input vs batch +submission remains scheduler logic inside the canonical machine. + +`l1.lua` has the `InputAdded(address,uint256,bytes)` event topic baked in and +filters logs by `topic0 = InputAdded` and `topic1 = app address`, matching the +Rust reader's app-filtered InputBox scan. + +## Runtime Contract + +The future sequencer endpoint shape should be generic over the app state bytes, +even though the toy wallet app will likely use JSON: + +```json +{ + "safe_block": 123, + "state": "{\"balances\":{}}" +} +``` + +`state` must be the exact bytes produced by the bare-metal app serializer +for the app state anchored at `safe_block`. The watchdog compares those raw +bytes with the bytes returned by CM inspect. It must not canonicalize both +values before deciding pass/fail. + +The main design gate is safe-state semantics: if the sequencer has already +applied soft-confirmed transactions beyond L1 safety, `get_state` still needs a +safe-only state view through snapshotting, replay, or a separate projection. + +## Checkpoints + +V1 persists only the resulting Cartesi Machine checkpoint, not the fetched L1 +inputs. + +```text +checkpoint_dir/ + current.json + checkpoints/ + 00000000000001234567/ + snapshot/ + manifest.json +``` + +`manifest.json` records `safe_block`, timestamp, and optionally the CM image +hash. A new checkpoint directory is written first, then `current.json` is +atomically replaced to point at it. + +When bootstrapping without an existing checkpoint, the operator provides both: + +- `WATCHDOG_CM_SNAPSHOT_DIR` +- `WATCHDOG_CM_SNAPSHOT_SAFE_BLOCK` + +## Modes + +The default `WATCHDOG_MODE` is `advance`. In this mode the watchdog does not +poll the sequencer. It: + +1. Loads the latest checkpoint, or the bootstrap snapshot directory. +2. Reads the L1 safe block from the RPC (or `WATCHDOG_TARGET_SAFE_BLOCK` when + provided for tests/manual runs). +3. Fetches and decodes `InputAdded` logs for the block range. +4. Feeds the raw InputBox input bytes into the CM adapter. +5. Saves a new snapshot directory and advances `current.json`. + +`WATCHDOG_MODE=compare` is reserved for the future state comparison flow once +CM inspect and the sequencer state endpoint are available. + +Useful runtime knobs: + +- `WATCHDOG_CM_EXECUTABLE`: Cartesi Machine executable, default + `cartesi-machine`. +- `WATCHDOG_CM_WORK_DIR`: temporary directory for staged input files, default + `/tmp`. +- `WATCHDOG_RETRY_ATTEMPTS`: bounded retry attempts per run, default `3`. +- `WATCHDOG_RETRY_DELAY_SEC`: delay between retry attempts, default `5`. +- `WATCHDOG_TARGET_SAFE_BLOCK`: manual/test override for the target safe block. + +## Local Tests + +Run the pure Lua tests with: + +```bash +just test-watchdog +``` + +These cover raw comparison, golden InputAdded ABI decoding, L1 ordering, +recursive range partitioning, JSON-RPC `eth_getLogs` filter construction, +config parsing, checkpoint writes, advance-mode runner behavior, the +fake-backed compare runner, the CLI adapter's input file staging, and retry +exhaustion/success behavior. + +End-to-end comparison tests will be added once CM inspect and the sequencer +`get_state` endpoint are available. diff --git a/examples/canonical-app/justfile b/examples/canonical-app/justfile index 10f08f0..73762fc 100644 --- a/examples/canonical-app/justfile +++ b/examples/canonical-app/justfile @@ -19,13 +19,13 @@ build-dapp: build-dapp-devnet build-dapp-devnet: mkdir -p {{out_dir}} - SOURCE_DATE_EPOCH={{source_date_epoch}} CARGO_PROFILE_RELEASE_STRIP=symbols CROSS_CONFIG=Cross.toml DOCKER_DEFAULT_PLATFORM=linux/amd64 cross build --package canonical-app --bin canonical-app-devnet --target riscv64gc-unknown-linux-musl --release + SOURCE_DATE_EPOCH={{source_date_epoch}} CARGO_PROFILE_RELEASE_STRIP=symbols CARGO_TARGET_DIR=../../target CROSS_CONFIG=Cross.toml DOCKER_DEFAULT_PLATFORM=linux/amd64 cross build --package canonical-app --bin canonical-app-devnet --target riscv64gc-unknown-linux-musl --release cp ../../target/riscv64gc-unknown-linux-musl/release/canonical-app-devnet {{dapp_binary_devnet}} cp {{dapp_binary_devnet}} {{dapp_binary}} build-dapp-sepolia: mkdir -p {{out_dir}} - SOURCE_DATE_EPOCH={{source_date_epoch}} CARGO_PROFILE_RELEASE_STRIP=symbols CROSS_CONFIG=Cross.toml DOCKER_DEFAULT_PLATFORM=linux/amd64 cross build --package canonical-app --bin canonical-app-sepolia --target riscv64gc-unknown-linux-musl --release + SOURCE_DATE_EPOCH={{source_date_epoch}} CARGO_PROFILE_RELEASE_STRIP=symbols CARGO_TARGET_DIR=../../target CROSS_CONFIG=Cross.toml DOCKER_DEFAULT_PLATFORM=linux/amd64 cross build --package canonical-app --bin canonical-app-sepolia --target riscv64gc-unknown-linux-musl --release cp ../../target/riscv64gc-unknown-linux-musl/release/canonical-app-sepolia {{dapp_binary_sepolia}} cp {{dapp_binary_sepolia}} {{dapp_binary}} diff --git a/justfile b/justfile index 61dfda5..bcf0e6d 100644 --- a/justfile +++ b/justfile @@ -12,6 +12,9 @@ check-all-targets: test: cargo test --workspace +test-watchdog: + lua watchdog/tests/run.lua + # Run sequencer tests sequentially so partition static config (init) is not shared across parallel tests. test-sequencer: cargo test -p sequencer --lib -- --test-threads=1 diff --git a/rust-toolchain.toml b/rust-toolchain.toml new file mode 100644 index 0000000..4f22047 --- /dev/null +++ b/rust-toolchain.toml @@ -0,0 +1,3 @@ +[toolchain] +channel = "1.91.1" +components = ["rustfmt", "clippy"] diff --git a/watchdog/abi.lua b/watchdog/abi.lua new file mode 100644 index 0000000..0c423bd --- /dev/null +++ b/watchdog/abi.lua @@ -0,0 +1,138 @@ +-- (c) Cartesi and individual authors (see AUTHORS) +-- SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +local abi = {} + +local WORD_HEX_LEN = 64 + +local function strip_0x(value) + assert(type(value) == "string", "hex value must be a string") + if value:sub(1, 2) == "0x" or value:sub(1, 2) == "0X" then + return value:sub(3) + end + return value +end + +local function assert_hex(value) + if value:match("^[0-9a-fA-F]*$") == nil then + error("invalid hex string") + end +end + +local function word_at(hex, index) + local start = (index * WORD_HEX_LEN) + 1 + local word = hex:sub(start, start + WORD_HEX_LEN - 1) + if #word ~= WORD_HEX_LEN then + error("ABI word out of bounds") + end + return word +end + +local function uint_word_to_number(word) + local value = 0 + for i = 1, #word do + local nibble = tonumber(word:sub(i, i), 16) + value = (value * 16) + nibble + if value > 9007199254740991 then + error("uint value too large for precise Lua number") + end + end + return value +end + +local function uint_word_to_hex(word) + local stripped = word:gsub("^0+", "") + if stripped == "" then + return "0x0" + end + return "0x" .. stripped:lower() +end + +local function address_from_word(word) + if word:sub(1, 24) ~= string.rep("0", 24) then + error("address word has non-zero high bytes") + end + return "0x" .. word:sub(25):lower() +end + +function abi.bytes_from_hex(hex) + hex = strip_0x(hex) + assert_hex(hex) + if (#hex % 2) ~= 0 then + error("hex string must have even length") + end + + return (hex:gsub("..", function(byte) + return string.char(tonumber(byte, 16)) + end)) +end + +function abi.hex_from_bytes(bytes) + return (bytes:gsub(".", function(char) + return string.format("%02x", char:byte()) + end)) +end + +function abi.decode_single_dynamic_bytes(encoded) + local hex = strip_0x(encoded) + assert_hex(hex) + local offset = uint_word_to_number(word_at(hex, 0)) + if (offset % 32) ~= 0 then + error("dynamic bytes offset is not word-aligned") + end + + local offset_words = offset // 32 + local len = uint_word_to_number(word_at(hex, offset_words)) + local data_hex_start = ((offset_words + 1) * WORD_HEX_LEN) + 1 + local data_hex = hex:sub(data_hex_start, data_hex_start + (len * 2) - 1) + if #data_hex ~= len * 2 then + error("dynamic bytes payload out of bounds") + end + return abi.bytes_from_hex(data_hex) +end + +function abi.decode_evm_advance_call(encoded) + local hex = strip_0x(encoded) + assert_hex(hex) + + -- EvmAdvanceCall is calldata, so accept and skip the 4-byte selector. + if (#hex % WORD_HEX_LEN) == 8 then + hex = hex:sub(9) + end + + local payload_offset = uint_word_to_number(word_at(hex, 7)) + if (payload_offset % 32) ~= 0 then + error("payload offset is not word-aligned") + end + + local payload_offset_words = payload_offset // 32 + local payload_len = uint_word_to_number(word_at(hex, payload_offset_words)) + local payload_hex_start = ((payload_offset_words + 1) * WORD_HEX_LEN) + 1 + local payload_hex = hex:sub(payload_hex_start, payload_hex_start + (payload_len * 2) - 1) + if #payload_hex ~= payload_len * 2 then + error("payload out of bounds") + end + + return { + chain_id_hex = uint_word_to_hex(word_at(hex, 0)), + app_contract = address_from_word(word_at(hex, 1)), + msg_sender = address_from_word(word_at(hex, 2)), + block_number = uint_word_to_number(word_at(hex, 3)), + block_timestamp_hex = uint_word_to_hex(word_at(hex, 4)), + prev_randao_hex = uint_word_to_hex(word_at(hex, 5)), + index_hex = uint_word_to_hex(word_at(hex, 6)), + payload = abi.bytes_from_hex(payload_hex), + } +end + +function abi.decode_input_added_log(log) + if type(log) ~= "table" or type(log.data) ~= "string" then + error("log.data is required") + end + local input = abi.decode_single_dynamic_bytes(log.data) + local decoded = abi.decode_evm_advance_call(abi.hex_from_bytes(input)) + decoded.raw_input = input + return decoded +end + +return abi diff --git a/watchdog/alarm.lua b/watchdog/alarm.lua new file mode 100644 index 0000000..085eff6 --- /dev/null +++ b/watchdog/alarm.lua @@ -0,0 +1,43 @@ +-- (c) Cartesi and individual authors (see AUTHORS) +-- SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +local alarm = {} + +local function encode_json_object(fields) + local parts = {} + for key, value in pairs(fields) do + local encoded_value + if type(value) == "number" then + encoded_value = tostring(value) + elseif type(value) == "boolean" then + encoded_value = value and "true" or "false" + else + local string_value = tostring(value) + :gsub("\\", "\\\\") + :gsub('"', '\\"') + :gsub("\n", "\\n") + encoded_value = '"' .. string_value .. '"' + end + table.insert(parts, string.format('"%s":%s', key, encoded_value)) + end + return "{" .. table.concat(parts, ",") .. "}" +end + +function alarm.send_webhook(http, webhook_url, payload) + if not webhook_url or webhook_url == "" then + return nil, "WATCHDOG_WEBHOOK_URL is not configured" + end + local body = encode_json_object(payload) + local response, err = http:post(webhook_url, body, { + ["content-type"] = "application/json", + }) + if not response then + return nil, err + end + if response.status < 200 or response.status >= 300 then + return nil, "alarm webhook HTTP " .. tostring(response.status) + end + return true +end + +return alarm diff --git a/watchdog/checkpoint.lua b/watchdog/checkpoint.lua new file mode 100644 index 0000000..c43574d --- /dev/null +++ b/watchdog/checkpoint.lua @@ -0,0 +1,153 @@ +-- (c) Cartesi and individual authors (see AUTHORS) +-- SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +local checkpoint = {} + +local function join(...) + local parts = { ... } + return table.concat(parts, "/"):gsub("//+", "/") +end + +local function read_all(path) + local file, err = io.open(path, "rb") + if not file then + return nil, err + end + local data = file:read("*a") + file:close() + return data +end + +local function write_all(path, data) + local file, err = io.open(path, "wb") + if not file then + return nil, err + end + file:write(data) + file:close() + return true +end + +local function shell_quote(value) + value = tostring(value) + return "'" .. value:gsub("'", "'\\''") .. "'" +end + +local function json_escape(value) + return value:gsub("\\", "\\\\"):gsub('"', '\\"'):gsub("\n", "\\n") +end + +local function manifest_json(manifest) + local fields = { + string.format('"safe_block":%d', manifest.safe_block), + string.format('"created_at":"%s"', json_escape(manifest.created_at or os.date("!%Y-%m-%dT%H:%M:%SZ"))), + } + if manifest.cm_image_hash then + table.insert(fields, string.format('"cm_image_hash":"%s"', json_escape(manifest.cm_image_hash))) + end + return "{" .. table.concat(fields, ",") .. "}\n" +end + +local function pointer_json(relative_path) + return string.format('{"checkpoint":"%s"}\n', json_escape(relative_path)) +end + +local function parse_pointer(data) + return data:match('"checkpoint"%s*:%s*"([^"]+)"') +end + +function checkpoint.safe_block_from_manifest(manifest_data) + local value = tostring(manifest_data or ""):match('"safe_block"%s*:%s*(%d+)') + if not value then + return nil, "manifest missing safe_block" + end + return tonumber(value) +end + +function checkpoint.load(dir) + local pointer_data, err = read_all(join(dir, "current.json")) + if not pointer_data then + return nil, err + end + local relative_path = parse_pointer(pointer_data) + if not relative_path then + return nil, "invalid checkpoint pointer" + end + local checkpoint_dir = join(dir, relative_path) + local manifest, manifest_err = read_all(join(checkpoint_dir, "manifest.json")) + if not manifest then + return nil, manifest_err + end + local safe_block, safe_block_err = checkpoint.safe_block_from_manifest(manifest) + if not safe_block then + return nil, safe_block_err + end + return { + path = checkpoint_dir, + manifest_json = manifest, + safe_block = safe_block, + snapshot_dir = join(checkpoint_dir, "snapshot"), + } +end + +function checkpoint.prepare(dir, safe_block) + assert(type(safe_block) == "number", "safe_block must be a number") + + local name = string.format("%020d", safe_block) + local relative_path = join("checkpoints", name) + local full_path = join(dir, relative_path) + local snapshot_dir = join(full_path, "snapshot") + + local ok = os.execute("mkdir -p " .. shell_quote(full_path)) + if ok ~= true and ok ~= 0 then + return nil, "mkdir failed: " .. full_path + end + + return { + path = full_path, + snapshot_dir = snapshot_dir, + relative_path = relative_path, + } +end + +function checkpoint.commit_prepared(dir, prepared, safe_block, manifest) + assert(type(prepared) == "table", "prepared checkpoint is required") + assert(type(safe_block) == "number", "safe_block must be a number") + manifest = manifest or {} + manifest.safe_block = safe_block + + local ok, err = write_all(join(prepared.path, "manifest.json"), manifest_json(manifest)) + if not ok then + return nil, err + end + + local tmp_pointer = join(dir, "current.json.tmp") + ok, err = write_all(tmp_pointer, pointer_json(prepared.relative_path)) + if not ok then + return nil, err + end + ok, err = os.rename(tmp_pointer, join(dir, "current.json")) + if not ok then + return nil, err + end + + return { + path = prepared.path, + snapshot_dir = prepared.snapshot_dir, + } +end + +function checkpoint.write(dir, safe_block, snapshot_writer, manifest) + assert(type(snapshot_writer) == "function", "snapshot_writer must be a function") + local prepared, prepare_err = checkpoint.prepare(dir, safe_block) + if not prepared then + return nil, prepare_err + end + local ok, err = snapshot_writer(prepared.snapshot_dir) + if not ok then + return nil, err + end + return checkpoint.commit_prepared(dir, prepared, safe_block, manifest) +end + +return checkpoint diff --git a/watchdog/compare.lua b/watchdog/compare.lua new file mode 100644 index 0000000..f1c0f52 --- /dev/null +++ b/watchdog/compare.lua @@ -0,0 +1,36 @@ +-- (c) Cartesi and individual authors (see AUTHORS) +-- SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +local compare = {} + +function compare.first_mismatch_offset(a, b) + assert(type(a) == "string", "left value must be a string") + assert(type(b) == "string", "right value must be a string") + + local limit = math.min(#a, #b) + for i = 1, limit do + if a:byte(i) ~= b:byte(i) then + return i + end + end + + if #a ~= #b then + return limit + 1 + end + + return nil +end + +function compare.raw_equal(expected, actual) + local mismatch = compare.first_mismatch_offset(expected, actual) + return mismatch == nil, mismatch +end + +function compare.assert_state_response(state) + if type(state) ~= "string" then + return nil, "state must be a string" + end + return true +end + +return compare diff --git a/watchdog/config.lua b/watchdog/config.lua new file mode 100644 index 0000000..1f07dc1 --- /dev/null +++ b/watchdog/config.lua @@ -0,0 +1,90 @@ +-- (c) Cartesi and individual authors (see AUTHORS) +-- SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +local config = {} + +local function required(name, env) + local value = env[name] + if value == nil or value == "" then + error(name .. " is required") + end + return value +end + +local function optional_number(name, default, env) + local value = env[name] + if value == nil or value == "" then + return default + end + local parsed = tonumber(value) + if not parsed then + error(name .. " must be a number") + end + return parsed +end + +local function optional_required_number(value_name, number_name, env) + if env[value_name] == nil or env[value_name] == "" then + return nil + end + local value = env[number_name] + if value == nil or value == "" then + error(number_name .. " is required when " .. value_name .. " is set") + end + return optional_number(number_name, nil, env) +end + +local function split_csv(value) + local out = {} + for part in tostring(value or ""):gmatch("[^,]+") do + table.insert(out, part) + end + return out +end + +function config.load(env) + env = env or os.getenv + if type(env) == "function" then + local getenv = env + env = setmetatable({}, { + __index = function(_, key) + return getenv(key) + end, + }) + end + + local mode = env.WATCHDOG_MODE or "advance" + if mode ~= "advance" and mode ~= "compare" then + error("WATCHDOG_MODE must be 'advance' or 'compare'") + end + + return { + mode = mode, + sequencer_url = mode == "compare" and required("WATCHDOG_SEQUENCER_URL", env) or env.WATCHDOG_SEQUENCER_URL, + l1_rpc_url = required("WATCHDOG_L1_RPC_URL", env), + input_box_address = required("WATCHDOG_INPUTBOX_ADDRESS", env), + app_address = required("WATCHDOG_APP_ADDRESS", env), + input_added_topic = env.WATCHDOG_INPUT_ADDED_TOPIC, + checkpoint_dir = required("WATCHDOG_CHECKPOINT_DIR", env), + cm_snapshot_dir = env.WATCHDOG_CM_SNAPSHOT_DIR, + cm_snapshot_safe_block = optional_required_number( + "WATCHDOG_CM_SNAPSHOT_DIR", + "WATCHDOG_CM_SNAPSHOT_SAFE_BLOCK", + env + ), + cm_work_dir = env.WATCHDOG_CM_WORK_DIR or "/tmp", + cm_executable = env.WATCHDOG_CM_EXECUTABLE or "cartesi-machine", + target_safe_block = optional_number("WATCHDOG_TARGET_SAFE_BLOCK", nil, env), + poll_interval_sec = optional_number("WATCHDOG_POLL_INTERVAL_SEC", 30, env), + retry_attempts = optional_number("WATCHDOG_RETRY_ATTEMPTS", 3, env), + retry_delay_sec = optional_number("WATCHDOG_RETRY_DELAY_SEC", 5, env), + safe_confirmations = optional_number("WATCHDOG_SAFE_CONFIRMATIONS", 12, env), + webhook_url = env.WATCHDOG_WEBHOOK_URL, + once = env.WATCHDOG_ONCE == "1", + long_block_range_error_codes = split_csv( + env.WATCHDOG_LONG_BLOCK_RANGE_ERROR_CODES or "-32005,-32600,-32602,-32616" + ), + } +end + +return config diff --git a/watchdog/http.lua b/watchdog/http.lua new file mode 100644 index 0000000..ae8f1ae --- /dev/null +++ b/watchdog/http.lua @@ -0,0 +1,90 @@ +-- (c) Cartesi and individual authors (see AUTHORS) +-- SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +local http = {} + +function http.new_curl() + local ok, curl = pcall(require, "cURL") + if not ok then + ok, curl = pcall(require, "lcurl") + end + if not ok then + error("lua-curl binding not found; install lua-curl/lcurl or inject an http adapter") + end + + local client = {} + + function client.post(_self, url, body, headers) + local chunks = {} + local header_list = {} + for key, value in pairs(headers or {}) do + table.insert(header_list, key .. ": " .. value) + end + + local easy = curl.easy({ + url = url, + post = true, + postfields = body, + httpheader = header_list, + timeout = 30, + writefunction = function(chunk) + table.insert(chunks, chunk) + return #chunk + end, + }) + + local ok_perform, err = pcall(function() + easy:perform() + end) + if not ok_perform then + easy:close() + return nil, tostring(err) + end + + local status = easy:getinfo_response_code() + easy:close() + return { + status = status, + body = table.concat(chunks), + headers = {}, + } + end + + function client.get(_self, url, headers) + local chunks = {} + local header_list = {} + for key, value in pairs(headers or {}) do + table.insert(header_list, key .. ": " .. value) + end + + local easy = curl.easy({ + url = url, + httpheader = header_list, + timeout = 30, + writefunction = function(chunk) + table.insert(chunks, chunk) + return #chunk + end, + }) + + local ok_perform, err = pcall(function() + easy:perform() + end) + if not ok_perform then + easy:close() + return nil, tostring(err) + end + + local status = easy:getinfo_response_code() + easy:close() + return { + status = status, + body = table.concat(chunks), + headers = {}, + } + end + + return client +end + +return http diff --git a/watchdog/jsonrpc.lua b/watchdog/jsonrpc.lua new file mode 100644 index 0000000..e2989de --- /dev/null +++ b/watchdog/jsonrpc.lua @@ -0,0 +1,107 @@ +-- (c) Cartesi and individual authors (see AUTHORS) +-- SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +local jsonrpc = {} + +local function quantity(value) + assert(type(value) == "number" and value >= 0, "quantity must be a non-negative number") + return string.format("0x%x", value) +end + +local function strip_0x(value) + return tostring(value):gsub("^0[xX]", "") +end + +local function topic_address(address) + assert(type(address) == "string" and address ~= "", "topic address is required") + local raw = strip_0x(address):lower() + assert(#raw == 40 and raw:match("^[0-9a-f]+$") ~= nil, "topic address must be 20-byte hex") + return "0x" .. string.rep("0", 24) .. raw +end + +function jsonrpc.new(http, json, url) + assert(type(http) == "table" and type(http.post) == "function", "http.post is required") + assert(type(json) == "table" and type(json.encode) == "function", "json.encode is required") + assert(type(json.decode) == "function", "json.decode is required") + assert(type(url) == "string" and url ~= "", "url is required") + + local client = { + http = http, + json = json, + url = url, + next_id = 1, + } + + function client:call(method, params) + local id = self.next_id + self.next_id = self.next_id + 1 + local body = self.json.encode({ + jsonrpc = "2.0", + id = id, + method = method, + params = params or {}, + }) + + local response, http_err = self.http:post(self.url, body, { + ["content-type"] = "application/json", + }) + if not response then + return nil, http_err + end + if response.status < 200 or response.status >= 300 then + return nil, "HTTP " .. tostring(response.status) + end + + local decoded + local ok = pcall(function() + decoded = self.json.decode(response.body) + end) + if not ok then + return nil, "invalid JSON-RPC response JSON" + end + if type(decoded) ~= "table" then + return nil, "JSON-RPC response must be an object" + end + if decoded.id ~= id then + return nil, "JSON-RPC response id mismatch" + end + if decoded.error ~= nil then + local code = decoded.error.code or "unknown" + local message = decoded.error.message or "JSON-RPC error" + return nil, tostring(code) .. ": " .. tostring(message) + end + return decoded.result + end + + function client:get_logs(filter) + assert(type(filter.input_added_topic) == "string", "input_added_topic is required") + local topics = { filter.input_added_topic } + if filter.app_address then + topics[2] = topic_address(filter.app_address) + end + + return self:call("eth_getLogs", { + { + address = filter.address, + fromBlock = quantity(filter.from_block), + toBlock = quantity(filter.to_block), + topics = topics, + }, + }) + end + + function client:get_block_number_by_tag(tag) + local block, err = self:call("eth_getBlockByNumber", { tag, false }) + if not block then + return nil, err + end + if type(block.number) ~= "string" then + return nil, "block response missing number" + end + return tonumber(block.number:gsub("^0[xX]", ""), 16) + end + + return client +end + +return jsonrpc diff --git a/watchdog/l1.lua b/watchdog/l1.lua new file mode 100644 index 0000000..5db728a --- /dev/null +++ b/watchdog/l1.lua @@ -0,0 +1,137 @@ +-- (c) Cartesi and individual authors (see AUTHORS) +-- SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +local abi = require("watchdog.abi") + +local l1 = {} + +l1.INPUT_ADDED_TOPIC = "0xc05d337121a6e8605c6ec0b72aa29c4210ffe6e5b9cefdd6a7058188a8f66f98" + +l1.DEFAULT_LONG_BLOCK_RANGE_ERROR_CODES = { + "-32005", + "-32600", + "-32602", + "-32616", +} + +local function contains_any(message, codes) + message = tostring(message or "") + for _, code in ipairs(codes or {}) do + if message:find(code, 1, true) then + return true + end + end + return false +end + +local function hex_quantity_to_number(value, field) + if type(value) == "number" then + return value + end + if type(value) ~= "string" or value:sub(1, 2) ~= "0x" then + error((field or "quantity") .. " must be an Ethereum hex quantity") + end + return tonumber(value:sub(3), 16) +end + +local function log_order_key(log) + return { + hex_quantity_to_number(log.blockNumber, "blockNumber"), + hex_quantity_to_number(log.transactionIndex or "0x0", "transactionIndex"), + hex_quantity_to_number(log.logIndex or "0x0", "logIndex"), + } +end + +function l1.sort_logs(logs) + table.sort(logs, function(a, b) + local ak = log_order_key(a) + local bk = log_order_key(b) + if ak[1] ~= bk[1] then + return ak[1] < bk[1] + end + if ak[2] ~= bk[2] then + return ak[2] < bk[2] + end + return ak[3] < bk[3] + end) + return logs +end + +function l1.fetch_logs_partitioned(rpc, params) + assert(type(rpc) == "table" and type(rpc.get_logs) == "function", "rpc.get_logs is required") + assert(type(params) == "table", "params are required") + + local start_block = assert(params.start_block, "start_block is required") + local end_block = assert(params.end_block, "end_block is required") + local codes = params.long_block_range_error_codes or l1.DEFAULT_LONG_BLOCK_RANGE_ERROR_CODES + local input_added_topic = params.input_added_topic or l1.INPUT_ADDED_TOPIC + + local function go(from_block, to_block) + local logs, err = rpc:get_logs({ + address = params.input_box_address, + app_address = params.app_address, + from_block = from_block, + to_block = to_block, + input_added_topic = input_added_topic, + }) + if logs then + return logs + end + + if from_block < to_block and contains_any(err, codes) then + local mid = from_block + ((to_block - from_block) // 2) + local left, left_err = go(from_block, mid) + if not left then + return nil, left_err + end + local right, right_err = go(mid + 1, to_block) + if not right then + return nil, right_err + end + for _, log in ipairs(right) do + table.insert(left, log) + end + return left + end + + return nil, err + end + + if end_block < start_block then + return {} + end + + local logs, err = go(start_block, end_block) + if not logs then + return nil, err + end + return l1.sort_logs(logs) +end + +function l1.decode_and_validate_log(log) + local decoded = abi.decode_input_added_log(log) + local block_number = hex_quantity_to_number(log.blockNumber, "blockNumber") + if decoded.block_number ~= block_number then + error(string.format( + "InputAdded block number mismatch: log=%d payload=%d", + block_number, + decoded.block_number + )) + end + return decoded +end + +function l1.fetch_inputs(rpc, params) + local logs, err = l1.fetch_logs_partitioned(rpc, params) + if not logs then + return nil, err + end + + local inputs = {} + for _, log in ipairs(logs) do + table.insert(inputs, l1.decode_and_validate_log(log)) + end + return inputs +end + +return l1 diff --git a/watchdog/machine.lua b/watchdog/machine.lua new file mode 100644 index 0000000..62bdc87 --- /dev/null +++ b/watchdog/machine.lua @@ -0,0 +1,43 @@ +-- (c) Cartesi and individual authors (see AUTHORS) +-- SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +local machine = {} + +function machine.new(binding) + assert(type(binding) == "table", "Cartesi Machine binding is required") + assert(type(binding.load_snapshot) == "function", "binding.load_snapshot is required") + assert(type(binding.feed_input) == "function", "binding.feed_input is required") + assert(type(binding.inspect_state) == "function", "binding.inspect_state is required") + assert(type(binding.save_snapshot) == "function", "binding.save_snapshot is required") + + local driver = { binding = binding } + + function driver:load(path) + return self.binding.load_snapshot(path) + end + + function driver:feed_inputs(instance, inputs) + for _, input in ipairs(inputs) do + -- Do not classify payload tags here. The scheduler inside the CM owns + -- direct-input vs batch-submission semantics. + local ok, err = self.binding.feed_input(instance, input) + if not ok then + return nil, err + end + end + return true + end + + function driver:inspect_state(instance) + return self.binding.inspect_state(instance) + end + + function driver:save(instance, path) + assert(type(self.binding.save_snapshot) == "function", "binding.save_snapshot is required") + return self.binding.save_snapshot(instance, path) + end + + return driver +end + +return machine diff --git a/watchdog/machine_cli.lua b/watchdog/machine_cli.lua new file mode 100644 index 0000000..0ef8bd9 --- /dev/null +++ b/watchdog/machine_cli.lua @@ -0,0 +1,124 @@ +-- (c) Cartesi and individual authors (see AUTHORS) +-- SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +local machine = require("watchdog.machine") + +local machine_cli = {} + +local function shell_quote(value) + value = tostring(value) + return "'" .. value:gsub("'", "'\\''") .. "'" +end + +local function mkdir_p(path) + local ok = os.execute("mkdir -p " .. shell_quote(path)) + if ok ~= true and ok ~= 0 then + return nil, "mkdir failed: " .. path + end + return true +end + +local function write_all(path, data) + local file, err = io.open(path, "wb") + if not file then + return nil, err + end + file:write(data) + file:close() + return true +end + +local function run_command(command) + local ok = os.execute(command) + if ok == true or ok == 0 then + return true + end + return nil, "command failed: " .. command +end + +local function new_work_dir(base_dir) + base_dir = base_dir or "/tmp" + return string.format( + "%s/watchdog-cm-%d-%d", + base_dir:gsub("/+$", ""), + os.time(), + math.random(1000000) + ) +end + +function machine_cli.new(opts) + opts = opts or {} + local executable = opts.executable or "cartesi-machine" + local work_dir = opts.work_dir or "/tmp" + + local binding = {} + + function binding.load_snapshot(snapshot_dir) + assert(type(snapshot_dir) == "string" and snapshot_dir ~= "", "snapshot_dir is required") + local instance = { + source_snapshot_dir = snapshot_dir, + work_dir = new_work_dir(work_dir), + input_dir = nil, + input_count = 0, + } + local ok, err = mkdir_p(instance.work_dir) + if not ok then + return nil, err + end + instance.input_dir = instance.work_dir .. "/inputs" + ok, err = mkdir_p(instance.input_dir) + if not ok then + return nil, err + end + return instance + end + + function binding.feed_input(instance, input) + assert(type(instance) == "table", "machine instance is required") + assert(type(input) == "table", "input is required") + local raw_input = input.raw_input + if type(raw_input) ~= "string" then + return nil, "input.raw_input is required" + end + + local path = string.format("%s/input-%d.bin", instance.input_dir, instance.input_count) + local ok, err = write_all(path, raw_input) + if not ok then + return nil, err + end + instance.input_count = instance.input_count + 1 + return true + end + + function binding.inspect_state(_instance) + -- Inspect requires rollback semantics around the loaded snapshot. The + -- advance-only watchdog mode does not call this; compare mode remains + -- deferred until the canonical app inspect contract exists. + return nil, "CM inspect is not implemented for the CLI adapter yet" + end + + function binding.save_snapshot(instance, snapshot_dir) + assert(type(instance) == "table", "machine instance is required") + assert(type(snapshot_dir) == "string" and snapshot_dir ~= "", "snapshot_dir is required") + + local command = table.concat({ + shell_quote(executable), + "--no-rollback", + "--load=" .. shell_quote(instance.source_snapshot_dir) .. ",sharing:none", + "--cmio-advance-state=input:" .. shell_quote(instance.input_dir .. "/input-%i.bin") + .. ",input_index_begin:0,input_index_end:" .. tostring(instance.input_count), + "--store=" .. shell_quote(snapshot_dir), + "--quiet", + }, " ") + + return run_command(command) + end + + return machine.new(binding) +end + +machine_cli._private = { + shell_quote = shell_quote, +} + +return machine_cli diff --git a/watchdog/main.lua b/watchdog/main.lua new file mode 100644 index 0000000..4096177 --- /dev/null +++ b/watchdog/main.lua @@ -0,0 +1,78 @@ +-- (c) Cartesi and individual authors (see AUTHORS) +-- SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package.path = "./?.lua;./?/init.lua;" .. package.path + +local config = require("watchdog.config") +local http_mod = require("watchdog.http") +local jsonrpc = require("watchdog.jsonrpc") +local machine_cli = require("watchdog.machine_cli") +local retry = require("watchdog.retry") +local runner = require("watchdog.runner") +local sequencer_mod = require("watchdog.sequencer") + +local function load_json() + local ok, cjson = pcall(require, "cjson") + if ok then + return cjson + end + error("lua-cjson is required for watchdog runtime") +end + +local function load_machine(cfg) + return machine_cli.new({ + executable = cfg.cm_executable, + work_dir = cfg.cm_work_dir, + }) +end + +local function run_once(cfg, deps) + if cfg.mode == "compare" then + return runner.run_once(cfg, deps) + end + return runner.advance_checkpoint_once(cfg, deps) +end + +local function main() + local cfg = config.load() + local http = http_mod.new_curl() + local json = load_json() + local deps = { + http = http, + rpc = jsonrpc.new(http, json, cfg.l1_rpc_url), + machine = load_machine(cfg), + } + if cfg.sequencer_url then + deps.sequencer = sequencer_mod.new(http, json, cfg.sequencer_url) + end + + repeat + local result, err = retry.with_retries(function() + local ok, value = pcall(run_once, cfg, deps) + if not ok then + return nil, value + end + return value + end, { + attempts = cfg.retry_attempts, + delay_sec = cfg.retry_delay_sec, + }) + if result == nil then + io.stderr:write("watchdog run failed: " .. tostring(err) .. "\n") + os.exit(1) + end + if cfg.once then + return + end + os.execute("sleep " .. tostring(cfg.poll_interval_sec)) + until false +end + +if ... == nil then + main() +end + +return { + main = main, + run_once = run_once, +} diff --git a/watchdog/retry.lua b/watchdog/retry.lua new file mode 100644 index 0000000..4316312 --- /dev/null +++ b/watchdog/retry.lua @@ -0,0 +1,30 @@ +-- (c) Cartesi and individual authors (see AUTHORS) +-- SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +local retry = {} + +function retry.with_retries(fn, opts) + opts = opts or {} + local attempts = math.max(1, opts.attempts or 1) + local delay_sec = opts.delay_sec or 0 + local sleep = opts.sleep or function(seconds) + if seconds > 0 then + os.execute("sleep " .. tostring(seconds)) + end + end + + local last_err + for attempt = 1, attempts do + local result, err = fn(attempt) + if result then + return result + end + last_err = err + if attempt < attempts then + sleep(delay_sec) + end + end + return nil, last_err +end + +return retry diff --git a/watchdog/runner.lua b/watchdog/runner.lua new file mode 100644 index 0000000..87cd8d0 --- /dev/null +++ b/watchdog/runner.lua @@ -0,0 +1,209 @@ +-- (c) Cartesi and individual authors (see AUTHORS) +-- SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +local alarm = require("watchdog.alarm") +local checkpoint = require("watchdog.checkpoint") +local compare = require("watchdog.compare") +local l1 = require("watchdog.l1") + +local runner = {} + +local function require_dep(deps, name) + local value = deps[name] + assert(value ~= nil, "missing dependency: " .. name) + return value +end + +local function load_checkpoint(cfg, checkpoint_mod) + local loaded = checkpoint_mod.load(cfg.checkpoint_dir) + if loaded then + return loaded + end + + if not cfg.cm_snapshot_dir or cfg.cm_snapshot_dir == "" then + error("no checkpoint found and WATCHDOG_CM_SNAPSHOT_DIR is not configured") + end + if type(cfg.cm_snapshot_safe_block) ~= "number" then + error("no checkpoint found and WATCHDOG_CM_SNAPSHOT_SAFE_BLOCK is not configured") + end + + return { + snapshot_dir = cfg.cm_snapshot_dir, + safe_block = cfg.cm_snapshot_safe_block, + } +end + +local function fetch_inputs(cfg, deps, from_block, to_block) + if from_block > to_block then + return {} + end + + if deps.fetch_inputs then + return deps.fetch_inputs(from_block, to_block) + end + + local rpc = require_dep(deps, "rpc") + return l1.fetch_inputs(rpc, { + start_block = from_block, + end_block = to_block, + input_box_address = cfg.input_box_address, + app_address = cfg.app_address, + input_added_topic = cfg.input_added_topic, + long_block_range_error_codes = cfg.long_block_range_error_codes, + }) +end + +local function send_alarm(cfg, deps, payload) + if deps.alarm then + return deps.alarm(payload) + end + if deps.http and cfg.webhook_url then + return alarm.send_webhook(deps.http, cfg.webhook_url, payload) + end + return true +end + +local function target_safe_block(cfg, deps) + if type(cfg.target_safe_block) == "number" then + return cfg.target_safe_block + end + if deps.safe_block then + return deps.safe_block() + end + if deps.rpc and type(deps.rpc.get_block_number_by_tag) == "function" then + return deps.rpc:get_block_number_by_tag("safe") + end + return nil, "target safe block is not configured" +end + +function runner.run_once(cfg, deps) + deps = deps or {} + local checkpoint_mod = deps.checkpoint or checkpoint + local sequencer = require_dep(deps, "sequencer") + local machine = require_dep(deps, "machine") + + local loaded = load_checkpoint(cfg, checkpoint_mod) + local sequencer_state, state_err = sequencer:get_state() + if not sequencer_state then + return nil, state_err + end + + local safe_block_prev = loaded.safe_block or 0 + local safe_block_next = sequencer_state.safe_block + if safe_block_next < safe_block_prev then + local payload = { + kind = "safe_block_regressed", + previous_safe_block = safe_block_prev, + sequencer_safe_block = safe_block_next, + } + send_alarm(cfg, deps, payload) + return nil, payload + end + + local inputs, input_err = fetch_inputs(cfg, deps, safe_block_prev + 1, safe_block_next) + if not inputs then + return nil, input_err + end + + local instance, load_err = machine:load(loaded.snapshot_dir) + if not instance then + return nil, load_err + end + + local fed, feed_err = machine:feed_inputs(instance, inputs) + if not fed then + return nil, feed_err + end + + local cm_state, inspect_err = machine:inspect_state(instance) + if not cm_state then + return nil, inspect_err + end + + local equal, mismatch_offset = compare.raw_equal(sequencer_state.state, cm_state) + if not equal then + local payload = { + kind = "state_mismatch", + previous_safe_block = safe_block_prev, + sequencer_safe_block = safe_block_next, + mismatch_offset = mismatch_offset, + } + send_alarm(cfg, deps, payload) + return nil, payload + end + + if safe_block_next > safe_block_prev then + local written, write_err = checkpoint_mod.write(cfg.checkpoint_dir, safe_block_next, function(snapshot_dir) + return machine:save(instance, snapshot_dir) + end, { + created_at = os.date("!%Y-%m-%dT%H:%M:%SZ"), + cm_image_hash = cfg.cm_image_hash, + }) + if not written then + return nil, write_err + end + end + + return { + ok = true, + previous_safe_block = safe_block_prev, + safe_block = safe_block_next, + input_count = #inputs, + } +end + +function runner.advance_checkpoint_once(cfg, deps) + deps = deps or {} + local checkpoint_mod = deps.checkpoint or checkpoint + local machine = require_dep(deps, "machine") + + local loaded = load_checkpoint(cfg, checkpoint_mod) + local safe_block_prev = loaded.safe_block or 0 + local safe_block_next, safe_err = target_safe_block(cfg, deps) + if not safe_block_next then + return nil, safe_err + end + if safe_block_next < safe_block_prev then + return nil, { + kind = "safe_block_regressed", + previous_safe_block = safe_block_prev, + safe_block = safe_block_next, + } + end + + local inputs, input_err = fetch_inputs(cfg, deps, safe_block_prev + 1, safe_block_next) + if not inputs then + return nil, input_err + end + + local instance, load_err = machine:load(loaded.snapshot_dir) + if not instance then + return nil, load_err + end + + local fed, feed_err = machine:feed_inputs(instance, inputs) + if not fed then + return nil, feed_err + end + + if safe_block_next > safe_block_prev then + local written, write_err = checkpoint_mod.write(cfg.checkpoint_dir, safe_block_next, function(snapshot_dir) + return machine:save(instance, snapshot_dir) + end, { + created_at = os.date("!%Y-%m-%dT%H:%M:%SZ"), + cm_image_hash = cfg.cm_image_hash, + }) + if not written then + return nil, write_err + end + end + + return { + ok = true, + previous_safe_block = safe_block_prev, + safe_block = safe_block_next, + input_count = #inputs, + } +end + +return runner diff --git a/watchdog/sequencer.lua b/watchdog/sequencer.lua new file mode 100644 index 0000000..93b9ddb --- /dev/null +++ b/watchdog/sequencer.lua @@ -0,0 +1,48 @@ +-- (c) Cartesi and individual authors (see AUTHORS) +-- SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +local compare = require("watchdog.compare") + +local sequencer = {} + +function sequencer.new(http, json, base_url) + assert(type(http) == "table" and type(http.get) == "function", "http.get is required") + assert(type(json) == "table" and type(json.decode) == "function", "json.decode is required") + assert(type(base_url) == "string" and base_url ~= "", "base_url is required") + + local client = { + http = http, + json = json, + base_url = base_url:gsub("/+$", ""), + } + + function client:get_state() + local response, err = self.http:get(self.base_url .. "/get_state") + if not response then + return nil, err + end + if response.status < 200 or response.status >= 300 then + return nil, "HTTP " .. tostring(response.status) + end + + local decoded + local ok_decode = pcall(function() + decoded = self.json.decode(response.body) + end) + if not ok_decode or type(decoded) ~= "table" then + return nil, "invalid get_state response JSON" + end + if type(decoded.safe_block) ~= "number" then + return nil, "safe_block must be a number" + end + local ok, validation_err = compare.assert_state_response(decoded.state) + if not ok then + return nil, validation_err + end + return decoded + end + + return client +end + +return sequencer diff --git a/watchdog/tests/fixtures/input_added_evm_advance.lua b/watchdog/tests/fixtures/input_added_evm_advance.lua new file mode 100644 index 0000000..da542eb --- /dev/null +++ b/watchdog/tests/fixtures/input_added_evm_advance.lua @@ -0,0 +1,30 @@ +-- Static fixture for an InputBox `InputAdded(address,uint256,bytes)` log whose +-- `bytes input` field contains an EvmAdvance calldata envelope. + +return { + log = { + blockNumber = "0x63", + transactionIndex = "0x0", + logIndex = "0x0", + data = "0x" + .. "0000000000000000000000000000000000000000000000000000000000000020" + .. "0000000000000000000000000000000000000000000000000000000000000144" + .. "1234567800000000000000000000000000000000000000000000000000000000" + .. "00007a6900000000000000000000000011111111111111111111111111111111" + .. "1111111100000000000000000000000022222222222222222222222222222222" + .. "2222222200000000000000000000000000000000000000000000000000000000" + .. "0000006300000000000000000000000000000000000000000000000000000000" + .. "000004d200000000000000000000000000000000000000000000000000000000" + .. "0000000000000000000000000000000000000000000000000000000000000000" + .. "0000000300000000000000000000000000000000000000000000000000000000" + .. "0000010000000000000000000000000000000000000000000000000000000000" + .. "0000000400aabbcc000000000000000000000000000000000000000000000000" + .. "0000000000000000000000000000000000000000000000000000000000000000", + }, + expected = { + app_contract = "0x1111111111111111111111111111111111111111", + msg_sender = "0x2222222222222222222222222222222222222222", + block_number = 99, + payload_hex = "00aabbcc", + }, +} diff --git a/watchdog/tests/run.lua b/watchdog/tests/run.lua new file mode 100644 index 0000000..3b392f7 --- /dev/null +++ b/watchdog/tests/run.lua @@ -0,0 +1,532 @@ +-- (c) Cartesi and individual authors (see AUTHORS) +-- SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package.path = "./?.lua;./?/init.lua;" .. package.path + +local abi = require("watchdog.abi") +local alarm = require("watchdog.alarm") +local checkpoint = require("watchdog.checkpoint") +local compare = require("watchdog.compare") +local config = require("watchdog.config") +local jsonrpc = require("watchdog.jsonrpc") +local l1 = require("watchdog.l1") +local machine_cli = require("watchdog.machine_cli") +local retry = require("watchdog.retry") +local runner = require("watchdog.runner") +local sequencer = require("watchdog.sequencer") + +local tests = {} + +local function test(name, fn) + table.insert(tests, { name = name, fn = fn }) +end + +local function assert_eq(actual, expected) + if actual ~= expected then + error(string.format("expected %q, got %q", tostring(expected), tostring(actual)), 2) + end +end + +test("raw compare fails byte-different JSON", function() + local ok, offset = compare.raw_equal('{"a":1}', '{ "a": 1 }') + assert_eq(ok, false) + assert(offset ~= nil, "expected mismatch offset") +end) + +test("decodes InputAdded log EvmAdvance envelope", function() + local fixture = dofile("watchdog/tests/fixtures/input_added_evm_advance.lua") + local decoded = abi.decode_input_added_log(fixture.log) + assert_eq(decoded.app_contract, fixture.expected.app_contract) + assert_eq(decoded.msg_sender, fixture.expected.msg_sender) + assert_eq(decoded.block_number, fixture.expected.block_number) + assert_eq(abi.hex_from_bytes(decoded.payload), fixture.expected.payload_hex) + assert(decoded.raw_input ~= nil and #decoded.raw_input > 0, "fixture keeps raw input bytes") +end) + +test("sorts logs in L1 order", function() + local logs = { + { blockNumber = "0x2", transactionIndex = "0x0", logIndex = "0x5" }, + { blockNumber = "0x1", transactionIndex = "0x9", logIndex = "0x0" }, + { blockNumber = "0x2", transactionIndex = "0x0", logIndex = "0x1" }, + } + l1.sort_logs(logs) + assert_eq(logs[1].blockNumber, "0x1") + assert_eq(logs[2].logIndex, "0x1") + assert_eq(logs[3].logIndex, "0x5") +end) + +test("partitions long block range errors", function() + local calls = {} + local rpc = {} + function rpc.get_logs(_self, filter) + table.insert(calls, { filter.from_block, filter.to_block, filter.input_added_topic }) + if filter.from_block == 1 and filter.to_block == 4 then + return nil, "RPC error -32005: query returned more than allowed" + end + return {} + end + + local logs, err = l1.fetch_logs_partitioned(rpc, { + start_block = 1, + end_block = 4, + input_box_address = "0xinputbox", + app_address = "0x1111111111111111111111111111111111111111", + }) + + assert(logs, err) + assert_eq(#calls, 3) + assert_eq(calls[2][1], 1) + assert_eq(calls[2][2], 2) + assert_eq(calls[3][1], 3) + assert_eq(calls[3][2], 4) + assert_eq(calls[3][3], l1.INPUT_ADDED_TOPIC) +end) + +test("jsonrpc get_logs builds InputAdded app filter", function() + local captured = nil + local json = {} + function json.encode(value) + captured = value + return "encoded" + end + function json.decode(_body) + return { jsonrpc = "2.0", id = 1, result = {} } + end + + local http = {} + function http.post(_self, url, body, headers) + assert_eq(url, "http://rpc") + assert_eq(body, "encoded") + assert_eq(headers["content-type"], "application/json") + return { status = 200, body = "{}" } + end + + local client = jsonrpc.new(http, json, "http://rpc") + local logs, err = client:get_logs({ + address = "0x9999999999999999999999999999999999999999", + app_address = "0x1111111111111111111111111111111111111111", + from_block = 10, + to_block = 12, + input_added_topic = l1.INPUT_ADDED_TOPIC, + }) + + assert(logs, err) + assert(type(captured) == "table", "json request captured") + local request = captured + assert_eq(request.method, "eth_getLogs") + local filter = request.params[1] + assert_eq(filter.fromBlock, "0xa") + assert_eq(filter.toBlock, "0xc") + assert_eq(filter.address, "0x9999999999999999999999999999999999999999") + assert_eq(filter.topics[1], l1.INPUT_ADDED_TOPIC) + assert_eq( + filter.topics[2], + "0x0000000000000000000000001111111111111111111111111111111111111111" + ) +end) + +test("config loads snapshot directory safe block and optional topic", function() + local env = { + WATCHDOG_L1_RPC_URL = "http://rpc", + WATCHDOG_INPUTBOX_ADDRESS = "0x9999999999999999999999999999999999999999", + WATCHDOG_APP_ADDRESS = "0x1111111111111111111111111111111111111111", + WATCHDOG_INPUT_ADDED_TOPIC = "0xtopic", + WATCHDOG_CHECKPOINT_DIR = "/tmp/checkpoints", + WATCHDOG_CM_SNAPSHOT_DIR = "/tmp/snapshot", + WATCHDOG_CM_SNAPSHOT_SAFE_BLOCK = "42", + } + + local cfg = config.load(env) + + assert_eq(cfg.input_added_topic, "0xtopic") + assert_eq(cfg.cm_snapshot_dir, "/tmp/snapshot") + assert_eq(cfg.cm_snapshot_safe_block, 42) + assert_eq(cfg.mode, "advance") +end) + +test("config rejects unknown mode", function() + local ok, err = pcall(function() + config.load({ + WATCHDOG_MODE = "bad", + WATCHDOG_L1_RPC_URL = "http://rpc", + WATCHDOG_INPUTBOX_ADDRESS = "0x9999999999999999999999999999999999999999", + WATCHDOG_APP_ADDRESS = "0x1111111111111111111111111111111111111111", + WATCHDOG_CHECKPOINT_DIR = "/tmp/checkpoints", + }) + end) + assert_eq(ok, false) + assert(tostring(err):find("WATCHDOG_MODE", 1, true) ~= nil, "mode error is explicit") +end) + +test("checkpoint writes manifest-backed current pointer", function() + local dir = os.tmpname() + os.remove(dir) + os.execute(string.format('mkdir -p "%s"', dir)) + + local written, err = checkpoint.write(dir, 12, function(snapshot_dir) + os.execute(string.format('mkdir -p "%s"', snapshot_dir)) + local file = io.open(snapshot_dir .. "/marker", "wb") + assert(file ~= nil, "marker file opened") + file:write("snapshot") + file:close() + return true + end, { + created_at = "2026-04-28T00:00:00Z", + }) + assert(written, err) + + local loaded, load_err = checkpoint.load(dir) + assert(loaded, load_err) + assert_eq(loaded.snapshot_dir, dir .. "/checkpoints/00000000000000000012/snapshot") + assert(loaded.manifest_json:find('"safe_block":12', 1, true) ~= nil, "manifest has safe block") +end) + +test("checkpoint rejects manifest without safe block", function() + local safe_block, err = checkpoint.safe_block_from_manifest("{}") + assert_eq(safe_block, nil) + assert_eq(err, "manifest missing safe_block") +end) + +local function fake_cfg() + return { + checkpoint_dir = "/tmp/watchdog-test", + cm_snapshot_dir = "/tmp/genesis-snapshot", + cm_snapshot_safe_block = 0, + input_box_address = "0xinputbox", + app_address = "0x1111111111111111111111111111111111111111", + input_added_topic = "0xtopic", + long_block_range_error_codes = l1.DEFAULT_LONG_BLOCK_RANGE_ERROR_CODES, + } +end + +local function fake_machine(inspect_state) + local machine = { + loaded_path = nil, + fed_inputs = nil, + } + function machine:load(path) + self.loaded_path = path + return { path = path } + end + function machine:feed_inputs(_instance, inputs) + self.fed_inputs = inputs + return true + end + function machine.inspect_state(_self, _instance) + return inspect_state + end + function machine:save(_instance, snapshot_dir) + self.saved_snapshot_dir = snapshot_dir + return true + end + return machine +end + +test("runner happy path replays inputs and writes checkpoint", function() + local checkpoint_writes = {} + local checkpoint_mod = { + load = function(_dir) + return { + snapshot_dir = "/tmp/checkpoints/0001/snapshot", + safe_block = 10, + } + end, + write = function(dir, safe_block, snapshot_writer, manifest) + local ok, err = snapshot_writer("/tmp/new-snapshot") + assert(ok, err) + table.insert(checkpoint_writes, { + dir = dir, + safe_block = safe_block, + manifest = manifest, + }) + return true + end, + } + local machine = fake_machine('{"ok":true}') + local result, err = runner.run_once(fake_cfg(), { + checkpoint = checkpoint_mod, + sequencer = { + get_state = function() + return { + safe_block = 12, + state = '{"ok":true}', + } + end, + }, + fetch_inputs = function(from_block, to_block) + assert_eq(from_block, 11) + assert_eq(to_block, 12) + return { { payload = "a" }, { payload = "b" } } + end, + machine = machine, + }) + + assert(result, err) + assert_eq(result.safe_block, 12) + assert_eq(result.input_count, 2) + assert_eq(machine.loaded_path, "/tmp/checkpoints/0001/snapshot") + assert_eq(machine.saved_snapshot_dir, "/tmp/new-snapshot") + assert_eq(#machine.fed_inputs, 2) + assert_eq(#checkpoint_writes, 1) + assert_eq(checkpoint_writes[1].safe_block, 12) +end) + +test("runner alarms on raw state mismatch", function() + local alarms = {} + local result, err = runner.run_once(fake_cfg(), { + checkpoint = { + load = function(_dir) + return { snapshot_dir = "/tmp/snapshot", safe_block = 1 } + end, + }, + sequencer = { + get_state = function() + return { + safe_block = 1, + state = '{"a":1}', + } + end, + }, + fetch_inputs = function() + return {} + end, + machine = fake_machine('{ "a": 1 }'), + alarm = function(payload) + table.insert(alarms, payload) + return true + end, + }) + + assert_eq(result, nil) + assert(type(err) == "table", "expected mismatch payload") + assert_eq(err.kind, "state_mismatch") + assert_eq(#alarms, 1) + assert_eq(alarms[1].kind, "state_mismatch") +end) + +test("runner alarms on sequencer safe block regression", function() + local alarms = {} + local result, err = runner.run_once(fake_cfg(), { + checkpoint = { + load = function(_dir) + return { snapshot_dir = "/tmp/snapshot", safe_block = 5 } + end, + }, + sequencer = { + get_state = function() + return { + safe_block = 4, + state = "{}", + } + end, + }, + machine = fake_machine("{}"), + alarm = function(payload) + table.insert(alarms, payload) + return true + end, + }) + + assert_eq(result, nil) + assert(type(err) == "table", "expected regression payload") + assert_eq(err.kind, "safe_block_regressed") + assert_eq(#alarms, 1) +end) + +test("sequencer client validates generic state response", function() + local http = {} + function http.get(_self, url) + assert_eq(url, "http://sequencer/get_state") + return { + status = 200, + body = "body", + } + end + local json = {} + function json.decode(_body) + return { + safe_block = 7, + state = "raw-state", + } + end + + local client = sequencer.new(http, json, "http://sequencer/") + local state, err = client:get_state() + assert(state, err) + assert_eq(state.safe_block, 7) + assert_eq(state.state, "raw-state") +end) + +test("sequencer client rejects invalid JSON", function() + local http = {} + function http.get(_self, _url) + return { + status = 200, + body = "not-json", + } + end + local json = {} + function json.decode(_body) + error("decode failed") + end + + local client = sequencer.new(http, json, "http://sequencer") + local state, err = client:get_state() + assert_eq(state, nil) + assert_eq(err, "invalid get_state response JSON") +end) + +test("advance runner fetches inputs and saves checkpoint without sequencer", function() + local checkpoint_writes = {} + local machine = fake_machine("unused") + local result, err = runner.advance_checkpoint_once(fake_cfg(), { + checkpoint = { + load = function(_dir) + return { snapshot_dir = "/tmp/snapshot", safe_block = 7 } + end, + write = function(dir, safe_block, snapshot_writer, manifest) + local ok, write_err = snapshot_writer("/tmp/advanced-snapshot") + assert(ok, write_err) + table.insert(checkpoint_writes, { dir = dir, safe_block = safe_block, manifest = manifest }) + return true + end, + }, + safe_block = function() + return 9 + end, + fetch_inputs = function(from_block, to_block) + assert_eq(from_block, 8) + assert_eq(to_block, 9) + return { { raw_input = "one" }, { raw_input = "two" } } + end, + machine = machine, + }) + + assert(result, err) + assert_eq(result.safe_block, 9) + assert_eq(result.input_count, 2) + assert_eq(machine.saved_snapshot_dir, "/tmp/advanced-snapshot") + assert_eq(#checkpoint_writes, 1) +end) + +test("machine cli adapter writes raw input files", function() + local base = os.tmpname() + os.remove(base) + os.execute(string.format('mkdir -p "%s"', base)) + local driver = machine_cli.new({ work_dir = base, executable = "cartesi-machine" }) + local instance = assert(driver:load("/tmp/source-snapshot")) + assert(driver:feed_inputs(instance, { + { raw_input = "abc" }, + { raw_input = "def" }, + })) + + local file = io.open(instance.input_dir .. "/input-0.bin", "rb") + assert(file ~= nil, "first input file exists") + assert_eq(file:read("*a"), "abc") + file:close() + file = io.open(instance.input_dir .. "/input-1.bin", "rb") + assert(file ~= nil, "second input file exists") + assert_eq(file:read("*a"), "def") + file:close() +end) + +test("machine cli adapter leaves snapshot directory creation to cartesi-machine", function() + local base = os.tmpname() + os.remove(base) + os.execute(string.format('mkdir -p "%s"', base)) + local driver = machine_cli.new({ work_dir = base, executable = "true" }) + local instance = assert(driver:load("/tmp/source-snapshot")) + local snapshot_dir = base .. "/snapshot" + + assert(driver:save(instance, snapshot_dir)) + local exists = os.rename(snapshot_dir, snapshot_dir) + assert(not exists, "adapter must not pre-create --store target") +end) + +test("retry succeeds after transient failures", function() + local attempts = 0 + local sleeps = 0 + local result, err = retry.with_retries(function() + attempts = attempts + 1 + if attempts < 3 then + return nil, "transient" + end + return "ok" + end, { + attempts = 3, + delay_sec = 1, + sleep = function(seconds) + assert_eq(seconds, 1) + sleeps = sleeps + 1 + end, + }) + + assert_eq(result, "ok") + assert_eq(err, nil) + assert_eq(attempts, 3) + assert_eq(sleeps, 2) +end) + +test("retry returns final error after exhaustion", function() + local attempts = 0 + local result, err = retry.with_retries(function() + attempts = attempts + 1 + return nil, "failed-" .. tostring(attempts) + end, { + attempts = 2, + delay_sec = 0, + sleep = function() end, + }) + + assert_eq(result, nil) + assert_eq(err, "failed-2") + assert_eq(attempts, 2) +end) + +test("alarm webhook posts JSON payload", function() + local sent = {} + local http = {} + function http.post(_self, url, body, headers) + sent.url = url + sent.body = body + sent.headers = headers + return { status = 204, body = "" } + end + + local ok, err = alarm.send_webhook(http, "http://alarm", { + kind = "state_mismatch", + safe_block = 12, + }) + + assert(ok, err) + assert_eq(sent.url, "http://alarm") + assert_eq(sent.headers["content-type"], "application/json") + assert(sent.body:find('"kind":"state_mismatch"', 1, true) ~= nil, "body includes kind") + assert(sent.body:find('"safe_block":12', 1, true) ~= nil, "body includes safe block") +end) + +test("alarm webhook reports non-success status", function() + local http = {} + function http.post(_self, _url, _body, _headers) + return { status = 500, body = "" } + end + + local ok, err = alarm.send_webhook(http, "http://alarm", { kind = "test" }) + assert_eq(ok, nil) + assert_eq(err, "alarm webhook HTTP 500") +end) + +local failures = 0 +for _, t in ipairs(tests) do + local ok, err = pcall(t.fn) + if ok then + io.write("ok - " .. t.name .. "\n") + else + failures = failures + 1 + io.write("not ok - " .. t.name .. ": " .. tostring(err) .. "\n") + end +end + +if failures > 0 then + os.exit(1) +end