From 30ecc5e40a441063896f98bddba203f0109cd0b8 Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Wed, 4 Mar 2026 17:44:20 +0530 Subject: [PATCH] fix: raise TimeoutError on ZMQ retry exhaustion (#393) send_json_with_retry() and recv_json_with_retry() now raise TimeoutError instead of silently returning None when all 5 retries are exhausted. read() and write() catch the new exception so callers never see an unhandled crash. - send_json_with_retry: raise TimeoutError (was: return) - recv_json_with_retry: raise TimeoutError (was: return None) - read(): except TimeoutError -> (default, False), status TIMEOUT - read(): remove dead 'message is None' guard (now unreachable) - write(): except TimeoutError -> log and continue - tests: save/restore global state in try/finally to prevent leakage - test_read_status: use TimeoutError instead of response=None Tests added in test_concore.py and test_concoredocker.py. All 75 tests pass. Closes #393 --- concore_base.py | 15 +++-- tests/test_concore.py | 129 ++++++++++++++++++++++++++++++++++++ tests/test_concoredocker.py | 91 +++++++++++++++++++++++++ tests/test_read_status.py | 4 +- 4 files changed, 231 insertions(+), 8 deletions(-) diff --git a/concore_base.py b/concore_base.py index 35f2c34..9173289 100644 --- a/concore_base.py +++ b/concore_base.py @@ -59,8 +59,7 @@ def send_json_with_retry(self, message): except zmq.Again: logger.warning(f"Send timeout (attempt {attempt + 1}/5)") time.sleep(0.5) - logger.error("Failed to send after retries.") - return + raise TimeoutError(f"ZMQ send failed after 5 retries on {self.address}") def recv_json_with_retry(self): """Receive JSON message with retries if timeout occurs.""" @@ -70,8 +69,7 @@ def recv_json_with_retry(self): except zmq.Again: logger.warning(f"Receive timeout (attempt {attempt + 1}/5)") time.sleep(0.5) - logger.error("Failed to receive after retries.") - return None + raise TimeoutError(f"ZMQ recv failed after 5 retries on {self.address}") def init_zmq_port(mod, port_name, port_type, address, socket_type_str): @@ -270,9 +268,6 @@ def read(mod, port_identifier, name, initstr_val): zmq_p = mod.zmq_ports[port_identifier] try: message = zmq_p.recv_json_with_retry() - if message is None: - last_read_status = "TIMEOUT" - return default_return_val, False # Strip simtime prefix if present (mirroring file-based read behavior) if isinstance(message, list) and len(message) > 0: first_element = message[0] @@ -282,6 +277,10 @@ def read(mod, port_identifier, name, initstr_val): return message[1:], True last_read_status = "SUCCESS" return message, True + except TimeoutError as e: + logger.error(f"ZMQ read timeout on port {port_identifier} (name: {name}): {e}. Returning default.") + last_read_status = "TIMEOUT" + return default_return_val, False except zmq.error.ZMQError as e: logger.error(f"ZMQ read error on port {port_identifier} (name: {name}): {e}. Returning default.") last_read_status = "TIMEOUT" @@ -384,6 +383,8 @@ def write(mod, port_identifier, name, val, delta=0): # Mutation breaks cross-language determinism (see issue #385). else: zmq_p.send_json_with_retry(zmq_val) + except TimeoutError as e: + logger.error(f"ZMQ write timeout on port {port_identifier} (name: {name}): {e}") except zmq.error.ZMQError as e: logger.error(f"ZMQ write error on port {port_identifier} (name: {name}): {e}") except Exception as e: diff --git a/tests/test_concore.py b/tests/test_concore.py index b1a980e..fb67054 100644 --- a/tests/test_concore.py +++ b/tests/test_concore.py @@ -1,6 +1,7 @@ import pytest import os import numpy as np +from unittest.mock import patch class TestSafeLiteralEval: @@ -450,3 +451,131 @@ def test_write_timestamp_matches_cpp_semantics(self, temp_dir): "After 3 writes with delta=1 simtime must remain 0 " "(matching C++/MATLAB/Verilog); got %s" % concore.simtime ) + + +class TestZMQRetryExhaustion: + """Issue #393 – recv_json_with_retry / send_json_with_retry must raise + TimeoutError instead of silently returning None when retries are + exhausted, and callers (read / write) must handle it gracefully.""" + + @patch("concore_base.time.sleep") + def test_recv_raises_timeout_error(self, _mock_sleep): + """recv_json_with_retry must raise TimeoutError after 5 failed attempts.""" + import concore_base + + port = concore_base.ZeroMQPort.__new__(concore_base.ZeroMQPort) + port.address = "tcp://127.0.0.1:9999" + + class FakeSocket: + def recv_json(self, flags=0): + import zmq + + raise zmq.Again("Resource temporarily unavailable") + + port.socket = FakeSocket() + with pytest.raises(TimeoutError): + port.recv_json_with_retry() + + @patch("concore_base.time.sleep") + def test_send_raises_timeout_error(self, _mock_sleep): + """send_json_with_retry must raise TimeoutError after 5 failed attempts.""" + import concore_base + + port = concore_base.ZeroMQPort.__new__(concore_base.ZeroMQPort) + port.address = "tcp://127.0.0.1:9999" + + class FakeSocket: + def send_json(self, data, flags=0): + import zmq + + raise zmq.Again("Resource temporarily unavailable") + + port.socket = FakeSocket() + with pytest.raises(TimeoutError): + port.send_json_with_retry([42]) + + def test_read_returns_default_on_timeout(self, temp_dir): + """read() must return (default, False) when ZMQ recv times out.""" + import concore + import concore_base + + # Save original global state to avoid leaking into other tests. + had_inpath = hasattr(concore, "inpath") + orig_inpath = concore.inpath if had_inpath else None + orig_zmq_ports = dict(concore.zmq_ports) + had_simtime = hasattr(concore, "simtime") + orig_simtime = concore.simtime if had_simtime else None + + try: + concore.inpath = os.path.join(temp_dir, "in") + + class TimeoutPort: + address = "tcp://127.0.0.1:0" + socket = None + + def recv_json_with_retry(self): + raise TimeoutError("ZMQ recv failed after 5 retries") + + concore.zmq_ports["t_in"] = TimeoutPort() + concore.simtime = 0 + + result, ok = concore.read("t_in", "x", "[0.0]") + + assert result == [0.0] + assert ok is False + assert concore_base.last_read_status == "TIMEOUT" + finally: + # Restore zmq_ports and other globals to their original state. + concore.zmq_ports.clear() + concore.zmq_ports.update(orig_zmq_ports) + + if had_inpath: + concore.inpath = orig_inpath + elif hasattr(concore, "inpath"): + delattr(concore, "inpath") + + if had_simtime: + concore.simtime = orig_simtime + elif hasattr(concore, "simtime"): + delattr(concore, "simtime") + + def test_write_does_not_crash_on_timeout(self, temp_dir): + """write() must not propagate TimeoutError to the caller.""" + import concore + + # Save original global state to avoid leaking into other tests. + had_outpath = hasattr(concore, "outpath") + orig_outpath = concore.outpath if had_outpath else None + orig_zmq_ports = dict(concore.zmq_ports) + had_simtime = hasattr(concore, "simtime") + orig_simtime = concore.simtime if had_simtime else None + + try: + concore.outpath = os.path.join(temp_dir, "out") + os.makedirs(os.path.join(temp_dir, "out_t_out"), exist_ok=True) + + class TimeoutPort: + address = "tcp://127.0.0.1:0" + socket = None + + def send_json_with_retry(self, message): + raise TimeoutError("ZMQ send failed after 5 retries") + + concore.zmq_ports["t_out"] = TimeoutPort() + concore.simtime = 0 + + concore.write("t_out", "y", [1.0], delta=1) + finally: + # Restore zmq_ports and other globals to their original state. + concore.zmq_ports.clear() + concore.zmq_ports.update(orig_zmq_ports) + + if had_outpath: + concore.outpath = orig_outpath + elif hasattr(concore, "outpath"): + delattr(concore, "outpath") + + if had_simtime: + concore.simtime = orig_simtime + elif hasattr(concore, "simtime"): + delattr(concore, "simtime") diff --git a/tests/test_concoredocker.py b/tests/test_concoredocker.py index 40d6808..fe19dd1 100644 --- a/tests/test_concoredocker.py +++ b/tests/test_concoredocker.py @@ -247,3 +247,94 @@ def recv_json_with_retry(self): assert result == original assert ok is True + + +class TestZMQRetryExhaustion: + """Issue #393 – concoredocker read/write must handle TimeoutError + raised by ZMQ retry exhaustion without crashing.""" + + def test_read_returns_default_on_timeout(self, temp_dir): + """read() must return (default, False) when ZMQ recv times out.""" + import concoredocker + import concore_base + + # Save original global state to avoid leaking into other tests. + had_inpath = hasattr(concoredocker, "inpath") + orig_inpath = concoredocker.inpath if had_inpath else None + orig_zmq_ports = dict(concoredocker.zmq_ports) + had_simtime = hasattr(concoredocker, "simtime") + orig_simtime = concoredocker.simtime if had_simtime else None + + try: + concoredocker.inpath = os.path.join(temp_dir, "in") + + class TimeoutPort: + address = "tcp://127.0.0.1:0" + socket = None + + def recv_json_with_retry(self): + raise TimeoutError("ZMQ recv failed after 5 retries") + + concoredocker.zmq_ports["t_in"] = TimeoutPort() + concoredocker.simtime = 0 + + result, ok = concoredocker.read("t_in", "x", "[0.0]") + + assert result == [0.0] + assert ok is False + assert concore_base.last_read_status == "TIMEOUT" + finally: + # Restore zmq_ports and other globals to their original state. + concoredocker.zmq_ports.clear() + concoredocker.zmq_ports.update(orig_zmq_ports) + + if had_inpath: + concoredocker.inpath = orig_inpath + elif hasattr(concoredocker, "inpath"): + delattr(concoredocker, "inpath") + + if had_simtime: + concoredocker.simtime = orig_simtime + elif hasattr(concoredocker, "simtime"): + delattr(concoredocker, "simtime") + + def test_write_does_not_crash_on_timeout(self, temp_dir): + """write() must not propagate TimeoutError to the caller.""" + import concoredocker + + # Save original global state to avoid leaking into other tests. + had_outpath = hasattr(concoredocker, "outpath") + orig_outpath = concoredocker.outpath if had_outpath else None + orig_zmq_ports = dict(concoredocker.zmq_ports) + had_simtime = hasattr(concoredocker, "simtime") + orig_simtime = concoredocker.simtime if had_simtime else None + + try: + concoredocker.outpath = os.path.join(temp_dir, "out") + os.makedirs(os.path.join(temp_dir, "out_t_out"), exist_ok=True) + + class TimeoutPort: + address = "tcp://127.0.0.1:0" + socket = None + + def send_json_with_retry(self, message): + raise TimeoutError("ZMQ send failed after 5 retries") + + concoredocker.zmq_ports["t_out"] = TimeoutPort() + concoredocker.simtime = 0 + + concoredocker.write("t_out", "y", [1.0], delta=1) + finally: + # Restore zmq_ports and other globals to their original state. + concoredocker.zmq_ports.clear() + concoredocker.zmq_ports.update(orig_zmq_ports) + + if had_outpath: + concoredocker.outpath = orig_outpath + elif hasattr(concoredocker, "outpath"): + delattr(concoredocker, "outpath") + + if had_simtime: + concoredocker.simtime = orig_simtime + elif hasattr(concoredocker, "simtime"): + delattr(concoredocker, "simtime") diff --git a/tests/test_read_status.py b/tests/test_read_status.py index 29e6434..b9fe33d 100644 --- a/tests/test_read_status.py +++ b/tests/test_read_status.py @@ -179,7 +179,9 @@ def setup(self, monkeypatch): concore.zmq_ports.update(self.original_ports) def test_zmq_timeout_returns_default_and_false(self): - dummy = DummyZMQPort(response=None) # recv returns None → timeout + dummy = DummyZMQPort( + raise_on_recv=TimeoutError("ZMQ recv failed after 5 retries") + ) self.concore.zmq_ports["test_port"] = dummy data, ok = self.concore.read("test_port", "ym", "[]")