Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,11 @@
<artifactId>tools</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.datastax.oss.driver.core.metadata;

import static org.awaitility.Awaitility.await;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
Expand Down Expand Up @@ -340,16 +342,15 @@ public void should_receive_each_tablet_exactly_once() {
}

private static boolean waitSessionLearnedTabletInfo(CqlSession session) {
if (isSessionLearnedTabletInfo(session)) {
return true;
}
// Wait till tablet update, which is async, is completed
try {
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
await()
.atMost(Duration.ofSeconds(5))
.pollInterval(Duration.ofMillis(50))
.until(() -> isSessionLearnedTabletInfo(session));
return true;
} catch (org.awaitility.core.ConditionTimeoutException e) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we import this as well?

return false;
}
return isSessionLearnedTabletInfo(session);
}

private static boolean checkIfRoutedProperly(CqlSession session, Statement stmt) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,12 @@ public void should_evict_down_node_metrics_when_timeout_fires() throws Exception
// trigger node1 UP -> DOWN
eventBus.fire(NodeStateEvent.changed(NodeState.UP, NodeState.DOWN, node1));

Thread.sleep(expireAfter.toMillis());

// then node-level metrics should be evicted from node1, but
// node2 and node3 metrics should not have been evicted
await().untilAsserted(() -> assertNodeMetricsEvicted(session, node1));
await()
.atMost(expireAfter.plusSeconds(5))
.pollInterval(Duration.ofMillis(100))
.untilAsserted(() -> assertNodeMetricsEvicted(session, node1));
assertNodeMetricsNotEvicted(session, node2);
assertNodeMetricsNotEvicted(session, node3);

Expand Down Expand Up @@ -219,19 +220,25 @@ public void should_not_evict_down_node_metrics_when_node_is_back_up_before_timeo
eventBus.fire(NodeStateEvent.changed(NodeState.UP, NodeState.FORCED_DOWN, node2));
eventBus.fire(NodeStateEvent.removed(node3));

Thread.sleep(500);
// Wait for half the expiry window before bringing nodes back up
await().pollDelay(Duration.ofMillis(500)).atMost(Duration.ofSeconds(5)).until(() -> true);

// trigger nodes DOWN -> UP, should cancel the timeouts
eventBus.fire(NodeStateEvent.changed(NodeState.DOWN, NodeState.UP, node1));
eventBus.fire(NodeStateEvent.changed(NodeState.FORCED_DOWN, NodeState.UP, node2));
eventBus.fire(NodeStateEvent.added(node3));

Thread.sleep(expireAfter.toMillis());

// then no node-level metrics should be evicted
assertNodeMetricsNotEvicted(session, node1);
assertNodeMetricsNotEvicted(session, node2);
assertNodeMetricsNotEvicted(session, node3);
// Wait for the full expiry duration and verify metrics are never evicted
await()
.during(expireAfter)
.atMost(expireAfter.plusSeconds(5))
.pollInterval(Duration.ofMillis(200))
.untilAsserted(
() -> {
assertNodeMetricsNotEvicted(session, node1);
assertNodeMetricsNotEvicted(session, node2);
assertNodeMetricsNotEvicted(session, node3);
});

} finally {
AbstractMetricUpdater.MIN_EXPIRE_AFTER = Duration.ofMinutes(5);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
package com.datastax.oss.driver.core.resolver;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand All @@ -34,8 +35,6 @@
import com.datastax.oss.driver.api.core.config.TypedDriverOption;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.cql.SimpleStatementBuilder;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.testinfra.ccm.CcmBridge;
import com.datastax.oss.driver.categories.IsolatedTests;
Expand Down Expand Up @@ -130,30 +129,16 @@ public void replace_cluster_test() {
ccmBridge.create();
ccmBridge.start();
session = builder.build();
boolean allNodesUp = false;
int nodesUp = 0;
for (int i = 0; i < CLUSTER_WAIT_SECONDS; i++) {
try {
Collection<Node> nodes = session.getMetadata().getNodes().values();
nodesUp = 0;
for (Node node : nodes) {
if (node.getUpSinceMillis() > 0) {
nodesUp++;
}
}
if (nodesUp == numberOfNodes) {
allNodesUp = true;
break;
}
Thread.sleep(1000);
} catch (InterruptedException e) {
break;
}
}
if (!allNodesUp) {
final CqlSession firstSession = session;
try {
await()
.atMost(Duration.ofSeconds(CLUSTER_WAIT_SECONDS))
.pollInterval(Duration.ofSeconds(1))
.until(() -> countUpNodes(firstSession) == numberOfNodes);
} catch (org.awaitility.core.ConditionTimeoutException e) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here

LOG.error(
"Driver sees only {} nodes UP instead of {} after waiting {}s",
nodesUp,
countUpNodes(firstSession),
numberOfNodes,
CLUSTER_WAIT_SECONDS);
}
Expand All @@ -178,30 +163,15 @@ public void replace_cluster_test() {
CcmBridge.builder().withNodes(numberOfNodes).withIpPrefix("127.0.1.").build()) {
ccmBridge.create();
ccmBridge.start();
boolean allNodesUp = false;
int nodesUp = 0;
for (int i = 0; i < CLUSTER_WAIT_SECONDS; i++) {
try {
Collection<Node> nodes = session.getMetadata().getNodes().values();
nodesUp = 0;
for (Node node : nodes) {
if (node.getUpSinceMillis() > 0) {
nodesUp++;
}
}
if (nodesUp == numberOfNodes) {
allNodesUp = true;
break;
}
Thread.sleep(1000);
} catch (InterruptedException e) {
break;
}
}
if (!allNodesUp) {
try {
await()
.atMost(Duration.ofSeconds(CLUSTER_WAIT_SECONDS))
.pollInterval(Duration.ofSeconds(1))
.until(() -> countUpNodes(session) == numberOfNodes);
} catch (org.awaitility.core.ConditionTimeoutException e) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📍

LOG.error(
"Driver sees only {} nodes UP instead of {} after waiting {}s",
nodesUp,
countUpNodes(session),
numberOfNodes,
CLUSTER_WAIT_SECONDS);
}
Expand Down Expand Up @@ -269,30 +239,8 @@ public void cannot_reconnect_with_resolved_socket() {
ccmBridge.create();
ccmBridge.start();
session = builder.build();
long endTime = System.currentTimeMillis() + CLUSTER_WAIT_SECONDS * 1000;
while (System.currentTimeMillis() < endTime) {
try {
nodes = session.getMetadata().getNodes().values();
int upNodes = 0;
for (Node node : nodes) {
if (node.getUpSinceMillis() > 0) {
upNodes++;
}
}
if (upNodes == 3) {
break;
}
// session.refreshSchema();
SimpleStatement statement =
new SimpleStatementBuilder("select * from system.local where key='local'")
.setTimeout(Duration.ofSeconds(3))
.build();
session.executeAsync(statement);
Thread.sleep(3000);
} catch (InterruptedException e) {
break;
}
}
final CqlSession firstUnusedSession = session;
awaitAllNodesUp(firstUnusedSession, 3);
ResultSet rs = session.execute("select * from system.local where key='local'");
assertThat(rs).isNotNull();
Row row = rs.one();
Expand Down Expand Up @@ -329,29 +277,7 @@ public void cannot_reconnect_with_resolved_socket() {
"test.cluster.fake", ccmBridge.getNodeIpAddress(3));
ccmBridge.create();
ccmBridge.start();
long endTime = System.currentTimeMillis() + CLUSTER_WAIT_SECONDS * 1000;
while (System.currentTimeMillis() < endTime) {
try {
nodes = session.getMetadata().getNodes().values();
int upNodes = 0;
for (Node node : nodes) {
if (node.getUpSinceMillis() > 0) {
upNodes++;
}
}
if (upNodes == 3) {
break;
}
SimpleStatement statement =
new SimpleStatementBuilder("select * from system.local where key='local'")
.setTimeout(Duration.ofSeconds(3))
.build();
session.executeAsync(statement);
Thread.sleep(3000);
} catch (InterruptedException e) {
break;
}
}
awaitAllNodesUp(session, 3);
nodes = session.getMetadata().getNodes().values();
assertThat(nodes).hasSize(3);
Iterator<Node> iterator = nodes.iterator();
Expand Down Expand Up @@ -384,32 +310,34 @@ public void cannot_reconnect_with_resolved_socket() {
// Now the driver should fail to reconnect since unresolved hostname is gone.
ccmBridge.create();
ccmBridge.start();
long endTime = System.currentTimeMillis() + CLUSTER_WAIT_SECONDS * 1000;
while (System.currentTimeMillis() < endTime) {
try {
nodes = session.getMetadata().getNodes().values();
int upNodes = 0;
for (Node node : nodes) {
if (node.getUpSinceMillis() > 0) {
upNodes++;
}
}
if (upNodes == 3) {
break;
}
// session.refreshSchema();
SimpleStatement statement =
new SimpleStatementBuilder("select * from system.local where key='local'")
.setTimeout(Duration.ofSeconds(3))
.build();
session.executeAsync(statement);
Thread.sleep(3000);
} catch (InterruptedException e) {
break;
}
}
awaitAllNodesUp(session, 3);
session.execute("select * from system.local where key='local'");
}
session.close();
}

private static int countUpNodes(CqlSession session) {
int count = 0;
for (Node node : session.getMetadata().getNodes().values()) {
if (node.getUpSinceMillis() > 0) {
count++;
}
}
return count;
}

private static void awaitAllNodesUp(CqlSession session, int expectedNodes) {
try {
await()
.atMost(Duration.ofSeconds(CLUSTER_WAIT_SECONDS))
.pollInterval(Duration.ofSeconds(1))
.until(() -> countUpNodes(session) == expectedNodes);
} catch (org.awaitility.core.ConditionTimeoutException e) {
LOG.error(
"Driver sees only {} nodes UP instead of {} after waiting {}s",
countUpNodes(session),
expectedNodes,
CLUSTER_WAIT_SECONDS);
}
}
}
Loading