diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index fa343078e68..f780af8802f 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -223,6 +223,11 @@
tools
test
+
+ org.awaitility
+ awaitility
+ test
+
diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/DefaultMetadataTabletMapIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/DefaultMetadataTabletMapIT.java
index 9146d379980..32d6c79aede 100644
--- a/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/DefaultMetadataTabletMapIT.java
+++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/DefaultMetadataTabletMapIT.java
@@ -1,5 +1,7 @@
package com.datastax.oss.driver.core.metadata;
+import static org.awaitility.Awaitility.await;
+
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
@@ -340,16 +342,15 @@ public void should_receive_each_tablet_exactly_once() {
}
private static boolean waitSessionLearnedTabletInfo(CqlSession session) {
- if (isSessionLearnedTabletInfo(session)) {
- return true;
- }
- // Wait till tablet update, which is async, is completed
try {
- Thread.sleep(200);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ await()
+ .atMost(Duration.ofSeconds(5))
+ .pollInterval(Duration.ofMillis(50))
+ .until(() -> isSessionLearnedTabletInfo(session));
+ return true;
+ } catch (org.awaitility.core.ConditionTimeoutException e) {
+ return false;
}
- return isSessionLearnedTabletInfo(session);
}
private static boolean checkIfRoutedProperly(CqlSession session, Statement stmt) {
diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java
index e6121217619..ac9c2673875 100644
--- a/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java
+++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java
@@ -174,11 +174,12 @@ public void should_evict_down_node_metrics_when_timeout_fires() throws Exception
// trigger node1 UP -> DOWN
eventBus.fire(NodeStateEvent.changed(NodeState.UP, NodeState.DOWN, node1));
- Thread.sleep(expireAfter.toMillis());
-
// then node-level metrics should be evicted from node1, but
// node2 and node3 metrics should not have been evicted
- await().untilAsserted(() -> assertNodeMetricsEvicted(session, node1));
+ await()
+ .atMost(expireAfter.plusSeconds(5))
+ .pollInterval(Duration.ofMillis(100))
+ .untilAsserted(() -> assertNodeMetricsEvicted(session, node1));
assertNodeMetricsNotEvicted(session, node2);
assertNodeMetricsNotEvicted(session, node3);
@@ -219,19 +220,25 @@ public void should_not_evict_down_node_metrics_when_node_is_back_up_before_timeo
eventBus.fire(NodeStateEvent.changed(NodeState.UP, NodeState.FORCED_DOWN, node2));
eventBus.fire(NodeStateEvent.removed(node3));
- Thread.sleep(500);
+ // Wait for half the expiry window before bringing nodes back up
+ await().pollDelay(Duration.ofMillis(500)).atMost(Duration.ofSeconds(5)).until(() -> true);
// trigger nodes DOWN -> UP, should cancel the timeouts
eventBus.fire(NodeStateEvent.changed(NodeState.DOWN, NodeState.UP, node1));
eventBus.fire(NodeStateEvent.changed(NodeState.FORCED_DOWN, NodeState.UP, node2));
eventBus.fire(NodeStateEvent.added(node3));
- Thread.sleep(expireAfter.toMillis());
-
- // then no node-level metrics should be evicted
- assertNodeMetricsNotEvicted(session, node1);
- assertNodeMetricsNotEvicted(session, node2);
- assertNodeMetricsNotEvicted(session, node3);
+ // Wait for the full expiry duration and verify metrics are never evicted
+ await()
+ .during(expireAfter)
+ .atMost(expireAfter.plusSeconds(5))
+ .pollInterval(Duration.ofMillis(200))
+ .untilAsserted(
+ () -> {
+ assertNodeMetricsNotEvicted(session, node1);
+ assertNodeMetricsNotEvicted(session, node2);
+ assertNodeMetricsNotEvicted(session, node3);
+ });
} finally {
AbstractMetricUpdater.MIN_EXPIRE_AFTER = Duration.ofMinutes(5);
diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java
index ad8614ef281..919ee214897 100644
--- a/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java
+++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java
@@ -24,6 +24,7 @@
package com.datastax.oss.driver.core.resolver;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -34,8 +35,6 @@
import com.datastax.oss.driver.api.core.config.TypedDriverOption;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
-import com.datastax.oss.driver.api.core.cql.SimpleStatement;
-import com.datastax.oss.driver.api.core.cql.SimpleStatementBuilder;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.testinfra.ccm.CcmBridge;
import com.datastax.oss.driver.categories.IsolatedTests;
@@ -130,30 +129,16 @@ public void replace_cluster_test() {
ccmBridge.create();
ccmBridge.start();
session = builder.build();
- boolean allNodesUp = false;
- int nodesUp = 0;
- for (int i = 0; i < CLUSTER_WAIT_SECONDS; i++) {
- try {
- Collection nodes = session.getMetadata().getNodes().values();
- nodesUp = 0;
- for (Node node : nodes) {
- if (node.getUpSinceMillis() > 0) {
- nodesUp++;
- }
- }
- if (nodesUp == numberOfNodes) {
- allNodesUp = true;
- break;
- }
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- break;
- }
- }
- if (!allNodesUp) {
+ final CqlSession firstSession = session;
+ try {
+ await()
+ .atMost(Duration.ofSeconds(CLUSTER_WAIT_SECONDS))
+ .pollInterval(Duration.ofSeconds(1))
+ .until(() -> countUpNodes(firstSession) == numberOfNodes);
+ } catch (org.awaitility.core.ConditionTimeoutException e) {
LOG.error(
"Driver sees only {} nodes UP instead of {} after waiting {}s",
- nodesUp,
+ countUpNodes(firstSession),
numberOfNodes,
CLUSTER_WAIT_SECONDS);
}
@@ -178,30 +163,15 @@ public void replace_cluster_test() {
CcmBridge.builder().withNodes(numberOfNodes).withIpPrefix("127.0.1.").build()) {
ccmBridge.create();
ccmBridge.start();
- boolean allNodesUp = false;
- int nodesUp = 0;
- for (int i = 0; i < CLUSTER_WAIT_SECONDS; i++) {
- try {
- Collection nodes = session.getMetadata().getNodes().values();
- nodesUp = 0;
- for (Node node : nodes) {
- if (node.getUpSinceMillis() > 0) {
- nodesUp++;
- }
- }
- if (nodesUp == numberOfNodes) {
- allNodesUp = true;
- break;
- }
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- break;
- }
- }
- if (!allNodesUp) {
+ try {
+ await()
+ .atMost(Duration.ofSeconds(CLUSTER_WAIT_SECONDS))
+ .pollInterval(Duration.ofSeconds(1))
+ .until(() -> countUpNodes(session) == numberOfNodes);
+ } catch (org.awaitility.core.ConditionTimeoutException e) {
LOG.error(
"Driver sees only {} nodes UP instead of {} after waiting {}s",
- nodesUp,
+ countUpNodes(session),
numberOfNodes,
CLUSTER_WAIT_SECONDS);
}
@@ -269,30 +239,8 @@ public void cannot_reconnect_with_resolved_socket() {
ccmBridge.create();
ccmBridge.start();
session = builder.build();
- long endTime = System.currentTimeMillis() + CLUSTER_WAIT_SECONDS * 1000;
- while (System.currentTimeMillis() < endTime) {
- try {
- nodes = session.getMetadata().getNodes().values();
- int upNodes = 0;
- for (Node node : nodes) {
- if (node.getUpSinceMillis() > 0) {
- upNodes++;
- }
- }
- if (upNodes == 3) {
- break;
- }
- // session.refreshSchema();
- SimpleStatement statement =
- new SimpleStatementBuilder("select * from system.local where key='local'")
- .setTimeout(Duration.ofSeconds(3))
- .build();
- session.executeAsync(statement);
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- break;
- }
- }
+ final CqlSession firstUnusedSession = session;
+ awaitAllNodesUp(firstUnusedSession, 3);
ResultSet rs = session.execute("select * from system.local where key='local'");
assertThat(rs).isNotNull();
Row row = rs.one();
@@ -329,29 +277,7 @@ public void cannot_reconnect_with_resolved_socket() {
"test.cluster.fake", ccmBridge.getNodeIpAddress(3));
ccmBridge.create();
ccmBridge.start();
- long endTime = System.currentTimeMillis() + CLUSTER_WAIT_SECONDS * 1000;
- while (System.currentTimeMillis() < endTime) {
- try {
- nodes = session.getMetadata().getNodes().values();
- int upNodes = 0;
- for (Node node : nodes) {
- if (node.getUpSinceMillis() > 0) {
- upNodes++;
- }
- }
- if (upNodes == 3) {
- break;
- }
- SimpleStatement statement =
- new SimpleStatementBuilder("select * from system.local where key='local'")
- .setTimeout(Duration.ofSeconds(3))
- .build();
- session.executeAsync(statement);
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- break;
- }
- }
+ awaitAllNodesUp(session, 3);
nodes = session.getMetadata().getNodes().values();
assertThat(nodes).hasSize(3);
Iterator iterator = nodes.iterator();
@@ -384,32 +310,34 @@ public void cannot_reconnect_with_resolved_socket() {
// Now the driver should fail to reconnect since unresolved hostname is gone.
ccmBridge.create();
ccmBridge.start();
- long endTime = System.currentTimeMillis() + CLUSTER_WAIT_SECONDS * 1000;
- while (System.currentTimeMillis() < endTime) {
- try {
- nodes = session.getMetadata().getNodes().values();
- int upNodes = 0;
- for (Node node : nodes) {
- if (node.getUpSinceMillis() > 0) {
- upNodes++;
- }
- }
- if (upNodes == 3) {
- break;
- }
- // session.refreshSchema();
- SimpleStatement statement =
- new SimpleStatementBuilder("select * from system.local where key='local'")
- .setTimeout(Duration.ofSeconds(3))
- .build();
- session.executeAsync(statement);
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- break;
- }
- }
+ awaitAllNodesUp(session, 3);
session.execute("select * from system.local where key='local'");
}
session.close();
}
+
+ private static int countUpNodes(CqlSession session) {
+ int count = 0;
+ for (Node node : session.getMetadata().getNodes().values()) {
+ if (node.getUpSinceMillis() > 0) {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ private static void awaitAllNodesUp(CqlSession session, int expectedNodes) {
+ try {
+ await()
+ .atMost(Duration.ofSeconds(CLUSTER_WAIT_SECONDS))
+ .pollInterval(Duration.ofSeconds(1))
+ .until(() -> countUpNodes(session) == expectedNodes);
+ } catch (org.awaitility.core.ConditionTimeoutException e) {
+ LOG.error(
+ "Driver sees only {} nodes UP instead of {} after waiting {}s",
+ countUpNodes(session),
+ expectedNodes,
+ CLUSTER_WAIT_SECONDS);
+ }
+ }
}