Skip to content

Commit 0f3737f

Browse files
committed
This PR fixes UNPREPARED handling in the Python driver execution path for prepared statements.
### Problem When the server returned UNPREPARED (PreparedQueryNotFound) during an EXECUTE, the driver could fail to recover in some valid cases (cache miss, race, coordinator restart, id mismatch edge cases), even when the in-flight request still had enough PreparedStatement context to recover. ### Expected behavior If the driver still has the original PreparedStatement context, it should: 1. issue PREPARE, 2. then retry the original EXECUTE, regardless of why the coordinator no longer recognizes the prepared id. ### What this PR changes In ResponseFuture PreparedQueryNotFound handling: 1. Cache the in-flight self.prepared_statement first (when present). 2. Attempt lookup by the returned UNPREPARED id from cluster._prepared_statements. 3. Reprepare using the resolved statement. 4. If returned-id lookup fails: - fallback to in-flight context if available, - otherwise fail as unknown prepared statement. This preserves correctness while improving recovery in real-world coordinator invalidation/race/restart scenarios. ### Tests Added/updated unit coverage in tests/unit/test_response_future.py: - validates reprepare path for PreparedQueryNotFound, - validates fallback to in-flight context, - validates preference for statement resolved by returned UNPREPARED id when available. Fixes: scylladb/scylladb#27657
1 parent 1ec7f66 commit 0f3737f

2 files changed

Lines changed: 76 additions & 16 deletions

File tree

cassandra/cluster.py

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4714,28 +4714,28 @@ def _set_result(self, host, connection, pool, response):
47144714
self.query, cl, error=response,
47154715
retry_num=self._query_retries)
47164716
elif isinstance(response, PreparedQueryNotFound):
4717+
query_id = response.info
4718+
47174719
if self.prepared_statement:
4718-
query_id = self.prepared_statement.query_id
4719-
assert query_id == response.info, \
4720-
"Got different query ID in server response (%s) than we " \
4721-
"had before (%s)" % (response.info, query_id)
4722-
else:
4723-
query_id = response.info
4720+
# Cache local in-flight context first so lookup by either id can succeed.
4721+
self.session.cluster._prepared_statements[self.prepared_statement.query_id] = self.prepared_statement
47244722

47254723
try:
4726-
prepared_statement = self.session.cluster._prepared_statements[query_id]
4724+
self.prepared_statement = self.session.cluster._prepared_statements[query_id]
47274725
except KeyError:
47284726
if not self.prepared_statement:
4729-
log.error("Tried to execute unknown prepared statement: id=%s",
4730-
query_id.encode('hex'))
4727+
log.error(f"Tried to execute unknown prepared statement: id={hexlify(query_id)}")
47314728
self._set_final_exception(response)
47324729
return
4733-
else:
4734-
prepared_statement = self.prepared_statement
4735-
self.session.cluster._prepared_statements[query_id] = prepared_statement
4730+
log.warning(
4731+
"UNPREPARED for query id %s while executing statement id %s. "
4732+
"Could not resolve returned id in cache, proceeding with in-flight context.",
4733+
hexlify(query_id),
4734+
hexlify(self.prepared_statement.query_id),
4735+
)
47364736

47374737
current_keyspace = self._connection.keyspace
4738-
prepared_keyspace = prepared_statement.keyspace
4738+
prepared_keyspace = self.prepared_statement.keyspace
47394739
if not ProtocolVersion.uses_keyspace_flag(self.session.cluster.protocol_version) \
47404740
and prepared_keyspace and current_keyspace != prepared_keyspace:
47414741
self._set_final_exception(
@@ -4746,10 +4746,10 @@ def _set_result(self, host, connection, pool, response):
47464746
return
47474747

47484748
log.debug("Re-preparing unrecognized prepared statement against host %s: %s",
4749-
host, prepared_statement.query_string)
4750-
prepared_keyspace = prepared_statement.keyspace \
4749+
host, self.prepared_statement.query_string)
4750+
prepared_keyspace = self.prepared_statement.keyspace \
47514751
if ProtocolVersion.uses_keyspace_flag(self.session.cluster.protocol_version) else None
4752-
prepare_message = PrepareMessage(query=prepared_statement.query_string,
4752+
prepare_message = PrepareMessage(query=self.prepared_statement.query_string,
47534753
keyspace=prepared_keyspace)
47544754
# since this might block, run on the executor to avoid hanging
47554755
# the event loop thread

tests/unit/test_response_future.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -621,6 +621,66 @@ def test_prepared_query_not_found_bad_keyspace(self):
621621
with pytest.raises(ValueError):
622622
rf.result()
623623

624+
def test_prepared_query_not_found_uses_local_prepared_context(self):
625+
session = self.make_session()
626+
pool = session._pools.get.return_value
627+
connection = Mock(spec=Connection)
628+
pool.borrow_connection.return_value = (connection, 1)
629+
630+
rf = self.make_response_future(session)
631+
rf.send_request()
632+
633+
session.cluster.protocol_version = ProtocolVersion.V4
634+
session.cluster._prepared_statements = {}
635+
rf._connection.keyspace = "FooKeyspace"
636+
637+
rf.prepared_statement = Mock()
638+
rf.prepared_statement.query_id = b"known-query-id"
639+
rf.prepared_statement.query_string = "SELECT * FROM foobar"
640+
rf.prepared_statement.keyspace = "FooKeyspace"
641+
642+
# Different query id in UNPREPARED response should not prevent reprepare when local context exists.
643+
result = Mock(spec=PreparedQueryNotFound, info=b"other-query-id")
644+
rf._set_result(None, None, None, result)
645+
646+
assert session.submit.call_args
647+
args, _ = session.submit.call_args
648+
assert rf._reprepare == args[-5]
649+
assert isinstance(args[-4], PrepareMessage)
650+
assert args[-4].query == "SELECT * FROM foobar"
651+
652+
def test_prepared_query_not_found_prefers_returned_id_from_cache(self):
653+
session = self.make_session()
654+
pool = session._pools.get.return_value
655+
connection = Mock(spec=Connection)
656+
pool.borrow_connection.return_value = (connection, 1)
657+
658+
rf = self.make_response_future(session)
659+
rf.send_request()
660+
661+
session.cluster.protocol_version = ProtocolVersion.V4
662+
rf._connection.keyspace = "FooKeyspace"
663+
664+
rf.prepared_statement = Mock()
665+
rf.prepared_statement.query_id = b"local-id"
666+
rf.prepared_statement.query_string = "SELECT * FROM local_ctx"
667+
rf.prepared_statement.keyspace = "FooKeyspace"
668+
669+
cached_stmt = Mock()
670+
cached_stmt.query_id = b"returned-id"
671+
cached_stmt.query_string = "SELECT * FROM returned_ctx"
672+
cached_stmt.keyspace = "FooKeyspace"
673+
session.cluster._prepared_statements = {cached_stmt.query_id: cached_stmt}
674+
675+
result = Mock(spec=PreparedQueryNotFound, info=cached_stmt.query_id)
676+
rf._set_result(None, None, None, result)
677+
678+
assert session.submit.call_args
679+
args, _ = session.submit.call_args
680+
assert rf._reprepare == args[-5]
681+
assert isinstance(args[-4], PrepareMessage)
682+
assert args[-4].query == "SELECT * FROM returned_ctx"
683+
624684
def test_repeat_orig_query_after_succesful_reprepare(self):
625685
query_id = b'abc123' # Just a random binary string so we don't hit id mismatch exception
626686
session = self.make_session()

0 commit comments

Comments
 (0)