Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions concore_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ def send_json_with_retry(self, message):
except zmq.Again:
logger.warning(f"Send timeout (attempt {attempt + 1}/5)")
time.sleep(0.5)
logger.error("Failed to send after retries.")
return
raise TimeoutError(f"ZMQ send failed after 5 retries on {self.address}")

def recv_json_with_retry(self):
"""Receive JSON message with retries if timeout occurs."""
Expand All @@ -70,8 +69,7 @@ def recv_json_with_retry(self):
except zmq.Again:
logger.warning(f"Receive timeout (attempt {attempt + 1}/5)")
time.sleep(0.5)
logger.error("Failed to receive after retries.")
return None
raise TimeoutError(f"ZMQ recv failed after 5 retries on {self.address}")


def init_zmq_port(mod, port_name, port_type, address, socket_type_str):
Expand Down Expand Up @@ -270,9 +268,6 @@ def read(mod, port_identifier, name, initstr_val):
zmq_p = mod.zmq_ports[port_identifier]
try:
message = zmq_p.recv_json_with_retry()
if message is None:
last_read_status = "TIMEOUT"
return default_return_val, False
# Strip simtime prefix if present (mirroring file-based read behavior)
if isinstance(message, list) and len(message) > 0:
first_element = message[0]
Expand All @@ -282,6 +277,10 @@ def read(mod, port_identifier, name, initstr_val):
return message[1:], True
last_read_status = "SUCCESS"
return message, True
except TimeoutError as e:
logger.error(f"ZMQ read timeout on port {port_identifier} (name: {name}): {e}. Returning default.")
last_read_status = "TIMEOUT"
return default_return_val, False
except zmq.error.ZMQError as e:
logger.error(f"ZMQ read error on port {port_identifier} (name: {name}): {e}. Returning default.")
last_read_status = "TIMEOUT"
Expand Down Expand Up @@ -384,6 +383,8 @@ def write(mod, port_identifier, name, val, delta=0):
# Mutation breaks cross-language determinism (see issue #385).
else:
zmq_p.send_json_with_retry(zmq_val)
except TimeoutError as e:
logger.error(f"ZMQ write timeout on port {port_identifier} (name: {name}): {e}")
except zmq.error.ZMQError as e:
logger.error(f"ZMQ write error on port {port_identifier} (name: {name}): {e}")
except Exception as e:
Expand Down
129 changes: 129 additions & 0 deletions tests/test_concore.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pytest
import os
import numpy as np
from unittest.mock import patch


class TestSafeLiteralEval:
Expand Down Expand Up @@ -450,3 +451,131 @@ def test_write_timestamp_matches_cpp_semantics(self, temp_dir):
"After 3 writes with delta=1 simtime must remain 0 "
"(matching C++/MATLAB/Verilog); got %s" % concore.simtime
)


class TestZMQRetryExhaustion:
"""Issue #393 – recv_json_with_retry / send_json_with_retry must raise
TimeoutError instead of silently returning None when retries are
exhausted, and callers (read / write) must handle it gracefully."""

@patch("concore_base.time.sleep")
def test_recv_raises_timeout_error(self, _mock_sleep):
"""recv_json_with_retry must raise TimeoutError after 5 failed attempts."""
import concore_base

port = concore_base.ZeroMQPort.__new__(concore_base.ZeroMQPort)
port.address = "tcp://127.0.0.1:9999"

class FakeSocket:
def recv_json(self, flags=0):
import zmq

raise zmq.Again("Resource temporarily unavailable")

port.socket = FakeSocket()
with pytest.raises(TimeoutError):
port.recv_json_with_retry()

@patch("concore_base.time.sleep")
def test_send_raises_timeout_error(self, _mock_sleep):
"""send_json_with_retry must raise TimeoutError after 5 failed attempts."""
import concore_base

port = concore_base.ZeroMQPort.__new__(concore_base.ZeroMQPort)
port.address = "tcp://127.0.0.1:9999"

class FakeSocket:
def send_json(self, data, flags=0):
import zmq

raise zmq.Again("Resource temporarily unavailable")

port.socket = FakeSocket()
with pytest.raises(TimeoutError):
port.send_json_with_retry([42])

def test_read_returns_default_on_timeout(self, temp_dir):
"""read() must return (default, False) when ZMQ recv times out."""
import concore
import concore_base

# Save original global state to avoid leaking into other tests.
had_inpath = hasattr(concore, "inpath")
orig_inpath = concore.inpath if had_inpath else None
orig_zmq_ports = dict(concore.zmq_ports)
had_simtime = hasattr(concore, "simtime")
orig_simtime = concore.simtime if had_simtime else None

try:
concore.inpath = os.path.join(temp_dir, "in")

class TimeoutPort:
address = "tcp://127.0.0.1:0"
socket = None

def recv_json_with_retry(self):
raise TimeoutError("ZMQ recv failed after 5 retries")

concore.zmq_ports["t_in"] = TimeoutPort()
concore.simtime = 0

result, ok = concore.read("t_in", "x", "[0.0]")

assert result == [0.0]
assert ok is False
assert concore_base.last_read_status == "TIMEOUT"
finally:
# Restore zmq_ports and other globals to their original state.
concore.zmq_ports.clear()
concore.zmq_ports.update(orig_zmq_ports)

if had_inpath:
concore.inpath = orig_inpath
elif hasattr(concore, "inpath"):
delattr(concore, "inpath")

if had_simtime:
concore.simtime = orig_simtime
elif hasattr(concore, "simtime"):
delattr(concore, "simtime")

def test_write_does_not_crash_on_timeout(self, temp_dir):
"""write() must not propagate TimeoutError to the caller."""
import concore

# Save original global state to avoid leaking into other tests.
had_outpath = hasattr(concore, "outpath")
orig_outpath = concore.outpath if had_outpath else None
orig_zmq_ports = dict(concore.zmq_ports)
had_simtime = hasattr(concore, "simtime")
orig_simtime = concore.simtime if had_simtime else None

try:
concore.outpath = os.path.join(temp_dir, "out")
os.makedirs(os.path.join(temp_dir, "out_t_out"), exist_ok=True)

class TimeoutPort:
address = "tcp://127.0.0.1:0"
socket = None

def send_json_with_retry(self, message):
raise TimeoutError("ZMQ send failed after 5 retries")

concore.zmq_ports["t_out"] = TimeoutPort()
concore.simtime = 0

concore.write("t_out", "y", [1.0], delta=1)
finally:
# Restore zmq_ports and other globals to their original state.
concore.zmq_ports.clear()
concore.zmq_ports.update(orig_zmq_ports)

if had_outpath:
concore.outpath = orig_outpath
elif hasattr(concore, "outpath"):
delattr(concore, "outpath")

if had_simtime:
concore.simtime = orig_simtime
elif hasattr(concore, "simtime"):
delattr(concore, "simtime")
91 changes: 91 additions & 0 deletions tests/test_concoredocker.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,3 +247,94 @@ def recv_json_with_retry(self):

assert result == original
assert ok is True


class TestZMQRetryExhaustion:
"""Issue #393 – concoredocker read/write must handle TimeoutError
raised by ZMQ retry exhaustion without crashing."""

def test_read_returns_default_on_timeout(self, temp_dir):
"""read() must return (default, False) when ZMQ recv times out."""
import concoredocker
import concore_base

# Save original global state to avoid leaking into other tests.
had_inpath = hasattr(concoredocker, "inpath")
orig_inpath = concoredocker.inpath if had_inpath else None
orig_zmq_ports = dict(concoredocker.zmq_ports)
had_simtime = hasattr(concoredocker, "simtime")
orig_simtime = concoredocker.simtime if had_simtime else None

try:
concoredocker.inpath = os.path.join(temp_dir, "in")

class TimeoutPort:
address = "tcp://127.0.0.1:0"
socket = None

def recv_json_with_retry(self):
raise TimeoutError("ZMQ recv failed after 5 retries")

concoredocker.zmq_ports["t_in"] = TimeoutPort()
concoredocker.simtime = 0

result, ok = concoredocker.read("t_in", "x", "[0.0]")

assert result == [0.0]
assert ok is False
assert concore_base.last_read_status == "TIMEOUT"
finally:
# Restore zmq_ports and other globals to their original state.
concoredocker.zmq_ports.clear()
concoredocker.zmq_ports.update(orig_zmq_ports)

if had_inpath:
concoredocker.inpath = orig_inpath
elif hasattr(concoredocker, "inpath"):
delattr(concoredocker, "inpath")

if had_simtime:
concoredocker.simtime = orig_simtime
elif hasattr(concoredocker, "simtime"):
delattr(concoredocker, "simtime")

def test_write_does_not_crash_on_timeout(self, temp_dir):
"""write() must not propagate TimeoutError to the caller."""
import concoredocker

# Save original global state to avoid leaking into other tests.
had_outpath = hasattr(concoredocker, "outpath")
orig_outpath = concoredocker.outpath if had_outpath else None
orig_zmq_ports = dict(concoredocker.zmq_ports)
had_simtime = hasattr(concoredocker, "simtime")
orig_simtime = concoredocker.simtime if had_simtime else None

try:
concoredocker.outpath = os.path.join(temp_dir, "out")
os.makedirs(os.path.join(temp_dir, "out_t_out"), exist_ok=True)

class TimeoutPort:
address = "tcp://127.0.0.1:0"
socket = None

def send_json_with_retry(self, message):
raise TimeoutError("ZMQ send failed after 5 retries")

concoredocker.zmq_ports["t_out"] = TimeoutPort()
concoredocker.simtime = 0

concoredocker.write("t_out", "y", [1.0], delta=1)
finally:
# Restore zmq_ports and other globals to their original state.
concoredocker.zmq_ports.clear()
concoredocker.zmq_ports.update(orig_zmq_ports)

if had_outpath:
concoredocker.outpath = orig_outpath
elif hasattr(concoredocker, "outpath"):
delattr(concoredocker, "outpath")

if had_simtime:
concoredocker.simtime = orig_simtime
elif hasattr(concoredocker, "simtime"):
delattr(concoredocker, "simtime")
4 changes: 3 additions & 1 deletion tests/test_read_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,9 @@ def setup(self, monkeypatch):
concore.zmq_ports.update(self.original_ports)

def test_zmq_timeout_returns_default_and_false(self):
dummy = DummyZMQPort(response=None) # recv returns None → timeout
dummy = DummyZMQPort(
raise_on_recv=TimeoutError("ZMQ recv failed after 5 retries")
)
self.concore.zmq_ports["test_port"] = dummy

data, ok = self.concore.read("test_port", "ym", "[]")
Expand Down