Skip to content

Commit 30ecc5e

Browse files
fix: raise TimeoutError on ZMQ retry exhaustion (#393)
send_json_with_retry() and recv_json_with_retry() now raise TimeoutError instead of silently returning None when all 5 retries are exhausted. read() and write() catch the new exception so callers never see an unhandled crash. - send_json_with_retry: raise TimeoutError (was: return) - recv_json_with_retry: raise TimeoutError (was: return None) - read(): except TimeoutError -> (default, False), status TIMEOUT - read(): remove dead 'message is None' guard (now unreachable) - write(): except TimeoutError -> log and continue - tests: save/restore global state in try/finally to prevent leakage - test_read_status: use TimeoutError instead of response=None Tests added in test_concore.py and test_concoredocker.py. All 75 tests pass. Closes #393
1 parent 01e0ec0 commit 30ecc5e

4 files changed

Lines changed: 231 additions & 8 deletions

File tree

concore_base.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,7 @@ def send_json_with_retry(self, message):
5959
except zmq.Again:
6060
logger.warning(f"Send timeout (attempt {attempt + 1}/5)")
6161
time.sleep(0.5)
62-
logger.error("Failed to send after retries.")
63-
return
62+
raise TimeoutError(f"ZMQ send failed after 5 retries on {self.address}")
6463

6564
def recv_json_with_retry(self):
6665
"""Receive JSON message with retries if timeout occurs."""
@@ -70,8 +69,7 @@ def recv_json_with_retry(self):
7069
except zmq.Again:
7170
logger.warning(f"Receive timeout (attempt {attempt + 1}/5)")
7271
time.sleep(0.5)
73-
logger.error("Failed to receive after retries.")
74-
return None
72+
raise TimeoutError(f"ZMQ recv failed after 5 retries on {self.address}")
7573

7674

7775
def init_zmq_port(mod, port_name, port_type, address, socket_type_str):
@@ -270,9 +268,6 @@ def read(mod, port_identifier, name, initstr_val):
270268
zmq_p = mod.zmq_ports[port_identifier]
271269
try:
272270
message = zmq_p.recv_json_with_retry()
273-
if message is None:
274-
last_read_status = "TIMEOUT"
275-
return default_return_val, False
276271
# Strip simtime prefix if present (mirroring file-based read behavior)
277272
if isinstance(message, list) and len(message) > 0:
278273
first_element = message[0]
@@ -282,6 +277,10 @@ def read(mod, port_identifier, name, initstr_val):
282277
return message[1:], True
283278
last_read_status = "SUCCESS"
284279
return message, True
280+
except TimeoutError as e:
281+
logger.error(f"ZMQ read timeout on port {port_identifier} (name: {name}): {e}. Returning default.")
282+
last_read_status = "TIMEOUT"
283+
return default_return_val, False
285284
except zmq.error.ZMQError as e:
286285
logger.error(f"ZMQ read error on port {port_identifier} (name: {name}): {e}. Returning default.")
287286
last_read_status = "TIMEOUT"
@@ -384,6 +383,8 @@ def write(mod, port_identifier, name, val, delta=0):
384383
# Mutation breaks cross-language determinism (see issue #385).
385384
else:
386385
zmq_p.send_json_with_retry(zmq_val)
386+
except TimeoutError as e:
387+
logger.error(f"ZMQ write timeout on port {port_identifier} (name: {name}): {e}")
387388
except zmq.error.ZMQError as e:
388389
logger.error(f"ZMQ write error on port {port_identifier} (name: {name}): {e}")
389390
except Exception as e:

tests/test_concore.py

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import pytest
22
import os
33
import numpy as np
4+
from unittest.mock import patch
45

56

67
class TestSafeLiteralEval:
@@ -450,3 +451,131 @@ def test_write_timestamp_matches_cpp_semantics(self, temp_dir):
450451
"After 3 writes with delta=1 simtime must remain 0 "
451452
"(matching C++/MATLAB/Verilog); got %s" % concore.simtime
452453
)
454+
455+
456+
class TestZMQRetryExhaustion:
457+
"""Issue #393 – recv_json_with_retry / send_json_with_retry must raise
458+
TimeoutError instead of silently returning None when retries are
459+
exhausted, and callers (read / write) must handle it gracefully."""
460+
461+
@patch("concore_base.time.sleep")
462+
def test_recv_raises_timeout_error(self, _mock_sleep):
463+
"""recv_json_with_retry must raise TimeoutError after 5 failed attempts."""
464+
import concore_base
465+
466+
port = concore_base.ZeroMQPort.__new__(concore_base.ZeroMQPort)
467+
port.address = "tcp://127.0.0.1:9999"
468+
469+
class FakeSocket:
470+
def recv_json(self, flags=0):
471+
import zmq
472+
473+
raise zmq.Again("Resource temporarily unavailable")
474+
475+
port.socket = FakeSocket()
476+
with pytest.raises(TimeoutError):
477+
port.recv_json_with_retry()
478+
479+
@patch("concore_base.time.sleep")
480+
def test_send_raises_timeout_error(self, _mock_sleep):
481+
"""send_json_with_retry must raise TimeoutError after 5 failed attempts."""
482+
import concore_base
483+
484+
port = concore_base.ZeroMQPort.__new__(concore_base.ZeroMQPort)
485+
port.address = "tcp://127.0.0.1:9999"
486+
487+
class FakeSocket:
488+
def send_json(self, data, flags=0):
489+
import zmq
490+
491+
raise zmq.Again("Resource temporarily unavailable")
492+
493+
port.socket = FakeSocket()
494+
with pytest.raises(TimeoutError):
495+
port.send_json_with_retry([42])
496+
497+
def test_read_returns_default_on_timeout(self, temp_dir):
498+
"""read() must return (default, False) when ZMQ recv times out."""
499+
import concore
500+
import concore_base
501+
502+
# Save original global state to avoid leaking into other tests.
503+
had_inpath = hasattr(concore, "inpath")
504+
orig_inpath = concore.inpath if had_inpath else None
505+
orig_zmq_ports = dict(concore.zmq_ports)
506+
had_simtime = hasattr(concore, "simtime")
507+
orig_simtime = concore.simtime if had_simtime else None
508+
509+
try:
510+
concore.inpath = os.path.join(temp_dir, "in")
511+
512+
class TimeoutPort:
513+
address = "tcp://127.0.0.1:0"
514+
socket = None
515+
516+
def recv_json_with_retry(self):
517+
raise TimeoutError("ZMQ recv failed after 5 retries")
518+
519+
concore.zmq_ports["t_in"] = TimeoutPort()
520+
concore.simtime = 0
521+
522+
result, ok = concore.read("t_in", "x", "[0.0]")
523+
524+
assert result == [0.0]
525+
assert ok is False
526+
assert concore_base.last_read_status == "TIMEOUT"
527+
finally:
528+
# Restore zmq_ports and other globals to their original state.
529+
concore.zmq_ports.clear()
530+
concore.zmq_ports.update(orig_zmq_ports)
531+
532+
if had_inpath:
533+
concore.inpath = orig_inpath
534+
elif hasattr(concore, "inpath"):
535+
delattr(concore, "inpath")
536+
537+
if had_simtime:
538+
concore.simtime = orig_simtime
539+
elif hasattr(concore, "simtime"):
540+
delattr(concore, "simtime")
541+
542+
def test_write_does_not_crash_on_timeout(self, temp_dir):
543+
"""write() must not propagate TimeoutError to the caller."""
544+
import concore
545+
546+
# Save original global state to avoid leaking into other tests.
547+
had_outpath = hasattr(concore, "outpath")
548+
orig_outpath = concore.outpath if had_outpath else None
549+
orig_zmq_ports = dict(concore.zmq_ports)
550+
had_simtime = hasattr(concore, "simtime")
551+
orig_simtime = concore.simtime if had_simtime else None
552+
553+
try:
554+
concore.outpath = os.path.join(temp_dir, "out")
555+
os.makedirs(os.path.join(temp_dir, "out_t_out"), exist_ok=True)
556+
557+
class TimeoutPort:
558+
address = "tcp://127.0.0.1:0"
559+
socket = None
560+
561+
def send_json_with_retry(self, message):
562+
raise TimeoutError("ZMQ send failed after 5 retries")
563+
564+
concore.zmq_ports["t_out"] = TimeoutPort()
565+
concore.simtime = 0
566+
567+
concore.write("t_out", "y", [1.0], delta=1)
568+
finally:
569+
# Restore zmq_ports and other globals to their original state.
570+
concore.zmq_ports.clear()
571+
concore.zmq_ports.update(orig_zmq_ports)
572+
573+
if had_outpath:
574+
concore.outpath = orig_outpath
575+
elif hasattr(concore, "outpath"):
576+
delattr(concore, "outpath")
577+
578+
if had_simtime:
579+
concore.simtime = orig_simtime
580+
elif hasattr(concore, "simtime"):
581+
delattr(concore, "simtime")

tests/test_concoredocker.py

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,3 +247,94 @@ def recv_json_with_retry(self):
247247

248248
assert result == original
249249
assert ok is True
250+
251+
252+
class TestZMQRetryExhaustion:
253+
"""Issue #393 – concoredocker read/write must handle TimeoutError
254+
raised by ZMQ retry exhaustion without crashing."""
255+
256+
def test_read_returns_default_on_timeout(self, temp_dir):
257+
"""read() must return (default, False) when ZMQ recv times out."""
258+
import concoredocker
259+
import concore_base
260+
261+
# Save original global state to avoid leaking into other tests.
262+
had_inpath = hasattr(concoredocker, "inpath")
263+
orig_inpath = concoredocker.inpath if had_inpath else None
264+
orig_zmq_ports = dict(concoredocker.zmq_ports)
265+
had_simtime = hasattr(concoredocker, "simtime")
266+
orig_simtime = concoredocker.simtime if had_simtime else None
267+
268+
try:
269+
concoredocker.inpath = os.path.join(temp_dir, "in")
270+
271+
class TimeoutPort:
272+
address = "tcp://127.0.0.1:0"
273+
socket = None
274+
275+
def recv_json_with_retry(self):
276+
raise TimeoutError("ZMQ recv failed after 5 retries")
277+
278+
concoredocker.zmq_ports["t_in"] = TimeoutPort()
279+
concoredocker.simtime = 0
280+
281+
result, ok = concoredocker.read("t_in", "x", "[0.0]")
282+
283+
assert result == [0.0]
284+
assert ok is False
285+
assert concore_base.last_read_status == "TIMEOUT"
286+
finally:
287+
# Restore zmq_ports and other globals to their original state.
288+
concoredocker.zmq_ports.clear()
289+
concoredocker.zmq_ports.update(orig_zmq_ports)
290+
291+
if had_inpath:
292+
concoredocker.inpath = orig_inpath
293+
elif hasattr(concoredocker, "inpath"):
294+
delattr(concoredocker, "inpath")
295+
296+
if had_simtime:
297+
concoredocker.simtime = orig_simtime
298+
elif hasattr(concoredocker, "simtime"):
299+
delattr(concoredocker, "simtime")
300+
301+
def test_write_does_not_crash_on_timeout(self, temp_dir):
302+
"""write() must not propagate TimeoutError to the caller."""
303+
import concoredocker
304+
305+
# Save original global state to avoid leaking into other tests.
306+
had_outpath = hasattr(concoredocker, "outpath")
307+
orig_outpath = concoredocker.outpath if had_outpath else None
308+
orig_zmq_ports = dict(concoredocker.zmq_ports)
309+
had_simtime = hasattr(concoredocker, "simtime")
310+
orig_simtime = concoredocker.simtime if had_simtime else None
311+
312+
try:
313+
concoredocker.outpath = os.path.join(temp_dir, "out")
314+
os.makedirs(os.path.join(temp_dir, "out_t_out"), exist_ok=True)
315+
316+
class TimeoutPort:
317+
address = "tcp://127.0.0.1:0"
318+
socket = None
319+
320+
def send_json_with_retry(self, message):
321+
raise TimeoutError("ZMQ send failed after 5 retries")
322+
323+
concoredocker.zmq_ports["t_out"] = TimeoutPort()
324+
concoredocker.simtime = 0
325+
326+
concoredocker.write("t_out", "y", [1.0], delta=1)
327+
finally:
328+
# Restore zmq_ports and other globals to their original state.
329+
concoredocker.zmq_ports.clear()
330+
concoredocker.zmq_ports.update(orig_zmq_ports)
331+
332+
if had_outpath:
333+
concoredocker.outpath = orig_outpath
334+
elif hasattr(concoredocker, "outpath"):
335+
delattr(concoredocker, "outpath")
336+
337+
if had_simtime:
338+
concoredocker.simtime = orig_simtime
339+
elif hasattr(concoredocker, "simtime"):
340+
delattr(concoredocker, "simtime")

tests/test_read_status.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,9 @@ def setup(self, monkeypatch):
179179
concore.zmq_ports.update(self.original_ports)
180180

181181
def test_zmq_timeout_returns_default_and_false(self):
182-
dummy = DummyZMQPort(response=None) # recv returns None → timeout
182+
dummy = DummyZMQPort(
183+
raise_on_recv=TimeoutError("ZMQ recv failed after 5 retries")
184+
)
183185
self.concore.zmq_ports["test_port"] = dummy
184186

185187
data, ok = self.concore.read("test_port", "ym", "[]")

0 commit comments

Comments
 (0)