Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -4714,28 +4714,26 @@ def _set_result(self, host, connection, pool, response):
self.query, cl, error=response,
retry_num=self._query_retries)
elif isinstance(response, PreparedQueryNotFound):
query_id = response.info

if self.prepared_statement:
query_id = self.prepared_statement.query_id
assert query_id == response.info, \
"Got different query ID in server response (%s) than we " \
"had before (%s)" % (response.info, query_id)
else:
query_id = response.info
# Cache local in-flight context first so lookup by either id can succeed.
self.session.cluster.add_prepared(self.prepared_statement.query_id, self.prepared_statement)

try:
prepared_statement = self.session.cluster._prepared_statements[query_id]
self.prepared_statement = self.session.cluster._prepared_statements[query_id]
except KeyError:
if not self.prepared_statement:
log.error("Tried to execute unknown prepared statement: id=%s",
query_id.encode('hex'))
log.error("Tried to execute unknown prepared statement: id=%s", hexlify(query_id))
self._set_final_exception(response)
return
else:
prepared_statement = self.prepared_statement
self.session.cluster._prepared_statements[query_id] = prepared_statement
log.warning(
"UNPREPARED for query id %s while executing statement id %s. "
"Could not resolve returned id in cache, proceeding with in-flight context.",
hexlify(query_id), hexlify(self.prepared_statement.query_id))

current_keyspace = self._connection.keyspace
prepared_keyspace = prepared_statement.keyspace
prepared_keyspace = self.prepared_statement.keyspace
if not ProtocolVersion.uses_keyspace_flag(self.session.cluster.protocol_version) \
and prepared_keyspace and current_keyspace != prepared_keyspace:
self._set_final_exception(
Expand All @@ -4745,11 +4743,13 @@ def _set_result(self, host, connection, pool, response):
(current_keyspace, prepared_keyspace)))
return

log.debug("Re-preparing unrecognized prepared statement against host %s: %s",
host, prepared_statement.query_string)
prepared_keyspace = prepared_statement.keyspace \
log.debug(
"Re-preparing unrecognized prepared statement against host %s: %s",
host, self.prepared_statement.query_string
)
prepared_keyspace = self.prepared_statement.keyspace \
if ProtocolVersion.uses_keyspace_flag(self.session.cluster.protocol_version) else None
prepare_message = PrepareMessage(query=prepared_statement.query_string,
prepare_message = PrepareMessage(query=self.prepared_statement.query_string,
Comment on lines 4716 to +4752

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm looking at those changes and I don't see what you are trying to achieve.
In PR description you wrote:

Expected behavior

If the driver still has the original PreparedStatement context, it should:

  1. issue PREPARE,
  2. then retry the original EXECUTE, regardless of why the coordinator no longer recognizes the prepared id.

But this is what the driver already does?

  1. Cache the in-flight self.prepared_statement first (when present).

Why does it need to be done first?

In general the existing logic looks sound to me:

  • In first step it determines id of preprared statement, trying both self.prepared_statement and response.info, and also validating protocol assumptions.
  • Then it resolves prepared statement object, also trying both cluster-scoped cache (self.session.cluster._prepared_statements) and self.prepared_statement. It also adds self.prepared_statement to cache in that case.

So, what problem is this PR trying to solve? In what scenario does existing logic fail?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is one of this scenarios scylladb/scylladb#27657
the driver should retry but it doesn't

Copy link

@Lorak-mmk Lorak-mmk Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Linked issue describes the need for automatic reprepare. Driver already had mechanism for automatic repreparation - your PR doesn't add it, just modifies its logic. Which is why I'm asking what EXACTLY was wrong with the previous logic, how exactly it not working.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally we'd have a test that fails before the fix and passes after it. May not be easy to reproduce perhaps.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally we'd have a test that fails before the fix and passes after it.

Sure, but for now I'd be happy with a textual reasoning, so that I can understand the changes.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Lorak-mmk , here is the part of the mentioned discussion: scylladb/scylladb#27657 (comment)

If I understand correctly there was an experiment when Scylla returned UNPREPARED = 0x2500 sc to driver and it didnt repreapre the statement. So if driver already has mechanism for automatic repreparation it didnt work during the experiment - scylladb/scylladb#27657 (comment)

Copy link

@Lorak-mmk Lorak-mmk Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I got that. What I don't understand from this PR description is why exactly it didn't work, and how those changes address that.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before this PR the logic works roughly like this:

If self.prepared_statement exists, the code forces the query_id to be the local one (self.prepared_statement.query_id).
It then asserts that this id matches the one returned by the server (response.info).
After that it tries to resolve the statement:
It looks it up in the cache using
cluster._prepared_statements[query_id].
If that lookup fails (KeyError), it falls back to the local prepared statement (self.prepared_statement) and stores it in the cache under that query_id.

now this is might be problematic because:

  1. It ignores the server’s returned ID when a local statement exists
    If we already have a local prepared statement, the code overwrites the ID with the local one before doing the lookup.
    That means the ID returned by the server is effectively ignored in this path, even though it might be the correct one to use.

  2. The assertion can break recovery
    The code asserts that the local ID and the server-returned ID must match.
    However, in real situations (for example cache invalidation, server restart or crushes), the server might legitimately return a different ID.
    When that happens, the assertion triggers and kills the flow before the driver has a chance to recover or re-prepare the statement gracefully.

  3. Cache updates may reinforce the wrong mapping
    If the cache lookup fails, the code stores the local prepared statement back into the cache under the current query_id.
    But in this branch the query_id is the local ID, not necessarily the one the server returned.
    Because of this, the logic never explicitly tries the clean path of:
    First resolve using the ID the server returned, and only fall back to the local statement if needed.

In some LWT tests we actually see this behavior, instead of re-sending its just returns does nothing and waits forever

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I got that. What I don't understand from this PR description is why exactly it didn't work, and how those changes address that.

why exactly it didn't work

This was actually a question for the drivers team :)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was actually a question for the drivers team :)

sadly almost 2 months ago :(

keyspace=prepared_keyspace)
# since this might block, run on the executor to avoid hanging
# the event loop thread
Expand Down
60 changes: 60 additions & 0 deletions tests/unit/test_response_future.py
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,66 @@ def test_prepared_query_not_found_bad_keyspace(self):
with pytest.raises(ValueError):
rf.result()

def test_prepared_query_not_found_uses_local_prepared_context(self):
session = self.make_session()
pool = session._pools.get.return_value
connection = Mock(spec=Connection)
pool.borrow_connection.return_value = (connection, 1)

rf = self.make_response_future(session)
rf.send_request()

session.cluster.protocol_version = ProtocolVersion.V4
session.cluster._prepared_statements = {}
rf._connection.keyspace = "FooKeyspace"

rf.prepared_statement = Mock()
rf.prepared_statement.query_id = b"known-query-id"
rf.prepared_statement.query_string = "SELECT * FROM foobar"
rf.prepared_statement.keyspace = "FooKeyspace"

# Different query id in UNPREPARED response should not prevent reprepare when local context exists.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not? Prepared id suddenly changing seems like a serious protocol violation. What are the scenarios when it can happen?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because thats what i understand , whats expected , there can be a crush a race with the server , and then the prepared statement will be evicted and we need to rerun it.
it has lower performance but returning an error is incorrect.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not questioning the need for automatic repreparation. It is a well-known mechanism, and all drivers do this, including Python driver before your changes.
What this review comment is about is about prepared statements id. Those IDs are constant - dependent on query string, and connection keyspace. All existing drivers depend on that.

Statement getting evicted from the cache is a known and handled scenario, but the id doesn't change in this scenario, even across restarts.

result = Mock(spec=PreparedQueryNotFound, info=b"other-query-id")
rf._set_result(None, None, None, result)

assert session.submit.call_args
args, _ = session.submit.call_args
assert rf._reprepare == args[-5]
assert isinstance(args[-4], PrepareMessage)
assert args[-4].query == "SELECT * FROM foobar"

def test_prepared_query_not_found_prefers_returned_id_from_cache(self):
session = self.make_session()
pool = session._pools.get.return_value
connection = Mock(spec=Connection)
pool.borrow_connection.return_value = (connection, 1)

rf = self.make_response_future(session)
rf.send_request()

session.cluster.protocol_version = ProtocolVersion.V4
rf._connection.keyspace = "FooKeyspace"

rf.prepared_statement = Mock()
rf.prepared_statement.query_id = b"local-id"
rf.prepared_statement.query_string = "SELECT * FROM local_ctx"
rf.prepared_statement.keyspace = "FooKeyspace"

cached_stmt = Mock()
cached_stmt.query_id = b"returned-id"
cached_stmt.query_string = "SELECT * FROM returned_ctx"
cached_stmt.keyspace = "FooKeyspace"
session.cluster._prepared_statements = {cached_stmt.query_id: cached_stmt}

result = Mock(spec=PreparedQueryNotFound, info=cached_stmt.query_id)
rf._set_result(None, None, None, result)

assert session.submit.call_args
args, _ = session.submit.call_args
assert rf._reprepare == args[-5]
assert isinstance(args[-4], PrepareMessage)
assert args[-4].query == "SELECT * FROM returned_ctx"

def test_repeat_orig_query_after_succesful_reprepare(self):
query_id = b'abc123' # Just a random binary string so we don't hit id mismatch exception
session = self.make_session()
Expand Down
Loading