diff --git a/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java b/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java index 8ec1894a8f7..da284beb087 100644 --- a/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java +++ b/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java @@ -21,6 +21,7 @@ import static org.hamcrest.Matchers.lessThan; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -45,6 +46,7 @@ import org.apache.storm.messaging.TaskMessage; import org.apache.storm.messaging.TransportFactory; import org.apache.storm.utils.Utils; +import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,43 +64,24 @@ public class NettyTest { * manually that the server and the client connections are ready before we commence testing. If we don't do this, then we will lose the * first messages being sent between the client and the server, which will fail the tests. */ - private void waitUntilReady(IConnection... connections) throws Exception { + private void waitUntilReady(IConnection... connections) { LOG.info("Waiting until all Netty connections are ready..."); - int intervalMs = 10; - int maxWaitMs = 5000; - int waitedMs = 0; - while (true) { - if (Arrays.stream(connections) - .allMatch(WorkerState::isConnectionReady)) { - LOG.info("All Netty connections are ready"); - break; - } - if (waitedMs > maxWaitMs) { - throw new RuntimeException("Netty connections were not ready within " + maxWaitMs + " ms"); - } - Thread.sleep(intervalMs); - waitedMs += intervalMs; - } + Awaitility.await("all Netty connections to be ready") + .atMost(5, TimeUnit.SECONDS) + .pollInterval(10, TimeUnit.MILLISECONDS) + .until(() -> Arrays.stream(connections).allMatch(WorkerState::isConnectionReady)); + LOG.info("All Netty connections are ready"); } private IConnectionCallback mkConnectionCallback(Consumer myFn) { return (batch) -> batch.forEach(myFn::accept); } - private Runnable sleep() { - return () -> { - try { - Thread.sleep(10); - } catch (InterruptedException e) { - throw Utils.wrapInRuntime(e); - } - }; - } - private void waitForNotNull(AtomicReference response) { - Testing.whileTimeout(Testing.TEST_TIMEOUT_MS, - () -> response.get() == null, - sleep()); + Awaitility.await("response to be non-null") + .atMost(Testing.TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS) + .pollInterval(10, TimeUnit.MILLISECONDS) + .until(() -> response.get() != null); } private void send(IConnection client, int taskId, byte[] messageBytes) { @@ -199,9 +182,10 @@ private void doTestLoad(Map stormConf) throws Exception { List tasks = new ArrayList<>(); tasks.add(1); tasks.add(2); - Testing.whileTimeout(Testing.TEST_TIMEOUT_MS, - () -> client.getLoad(tasks).isEmpty(), - sleep()); + Awaitility.await("client to receive load metrics") + .atMost(Testing.TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS) + .pollInterval(10, TimeUnit.MILLISECONDS) + .until(() -> !client.getLoad(tasks).isEmpty()); Map load = client.getLoad(tasks); assertThat(load.get(1).getBoltLoad(), is(0.0)); assertThat(load.get(2).getBoltLoad(), is(1.0)); @@ -326,11 +310,12 @@ private void doTestBatch(Map stormConf) throws Exception { IntStream.range(1, numMessages) .forEach(i -> send(client, taskId, String.valueOf(i).getBytes(StandardCharsets.UTF_8))); - Testing.whileTimeout(Testing.TEST_TIMEOUT_MS, - () -> responses.size() < numMessages - 1, - () -> { + Awaitility.await("all batch messages to be received") + .atMost(Testing.TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS) + .pollInterval(10, TimeUnit.MILLISECONDS) + .until(() -> { LOG.info("{} of {} received", responses.size(), numMessages - 1); - sleep().run(); + return responses.size() >= numMessages - 1; }); IntStream.range(1, numMessages) .forEach(i -> assertThat(new String(responses.get(i - 1).message(), StandardCharsets.UTF_8), is(String.valueOf(i)))); diff --git a/storm-core/test/jvm/org/apache/storm/metric/MetricsIntegrationTest.java b/storm-core/test/jvm/org/apache/storm/metric/MetricsIntegrationTest.java index f7536eb8cda..7074d23960e 100644 --- a/storm-core/test/jvm/org/apache/storm/metric/MetricsIntegrationTest.java +++ b/storm-core/test/jvm/org/apache/storm/metric/MetricsIntegrationTest.java @@ -87,30 +87,32 @@ private static Map metricsConf() { } private static void waitForAtLeastNBuckets(int n, String compId, String metricName, - LocalCluster cluster) throws Exception { - Testing.whileTimeout(Testing.TEST_TIMEOUT_MS, - () -> { + LocalCluster cluster) { + Awaitility.with() + .pollInterval(10, TimeUnit.MILLISECONDS) + .conditionEvaluationListener(condition -> { + try { + cluster.advanceClusterTime(1); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }) + .atMost(Testing.TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS) + .until(() -> { Map> taskIdToBuckets = FakeMetricConsumer.getTaskIdToBuckets(compId, metricName); if (n != 0 && taskIdToBuckets == null) { - return true; + return false; } if (taskIdToBuckets == null) { - return false; + return true; } for (Collection buckets : taskIdToBuckets.values()) { if (buckets.size() < n) { - return true; + return false; } } - return false; - }, - () -> { - try { - cluster.advanceClusterTime(1); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + return true; }); } diff --git a/storm-server/src/test/java/org/apache/storm/daemon/drpc/DRPCTest.java b/storm-server/src/test/java/org/apache/storm/daemon/drpc/DRPCTest.java index 35f490d874b..210aa7246aa 100644 --- a/storm-server/src/test/java/org/apache/storm/daemon/drpc/DRPCTest.java +++ b/storm-server/src/test/java/org/apache/storm/daemon/drpc/DRPCTest.java @@ -26,6 +26,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; import org.apache.storm.Config; import org.apache.storm.generated.AuthorizationException; @@ -39,6 +40,7 @@ import org.apache.storm.security.auth.authorizer.DRPCSimpleACLAuthorizer.AclFunctionEntry; import org.apache.storm.security.auth.authorizer.DenyAuthorizer; import org.apache.storm.utils.Time; +import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Test; @@ -66,18 +68,20 @@ public static void close() { exec.shutdownNow(); } - public static DRPCRequest getNextAvailableRequest(DRPC server, String func) throws Exception { - DRPCRequest request = null; - long timedout = System.currentTimeMillis() + 5_000; - while (System.currentTimeMillis() < timedout) { - request = server.fetchRequest(func); - if (request != null && request.get_request_id() != null && !request.get_request_id().isEmpty()) { - return request; - } - Thread.sleep(1); - } - fail("Test timed out waiting for a request on " + func); - return request; + public static DRPCRequest getNextAvailableRequest(DRPC server, String func) { + AtomicReference result = new AtomicReference<>(); + Awaitility.await("DRPC request on " + func) + .atMost(5, TimeUnit.SECONDS) + .pollInterval(1, TimeUnit.MILLISECONDS) + .until(() -> { + DRPCRequest req = server.fetchRequest(func); + if (req != null && req.get_request_id() != null && !req.get_request_id().isEmpty()) { + result.set(req); + return true; + } + return false; + }); + return result.get(); } @Test diff --git a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java index 1c55c1c6531..be67e0b34e3 100644 --- a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java +++ b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java @@ -58,6 +58,7 @@ import org.apache.storm.utils.ServerUtils; import org.apache.storm.utils.Time; import org.apache.storm.utils.Utils; +import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; import org.mockito.Mockito; import org.slf4j.Logger; @@ -737,10 +738,10 @@ public void testMultipleKeysOneUser() throws Exception { lrsrcSet = localizer.getUserFiles().get(user1); assertEquals(2, lrsrcSet.size(), "local resource set size wrong"); - long end = System.currentTimeMillis() + 100; - while ((end - System.currentTimeMillis()) >= 0 && keyFile2.exists()) { - Thread.sleep(1); - } + Awaitility.await("keyFile2 to be deleted") + .atMost(5, TimeUnit.SECONDS) + .pollInterval(1, TimeUnit.MILLISECONDS) + .until(() -> !keyFile2.exists()); assertTrue(keyFile.exists(), "blob deleted"); assertFalse(keyFile2.exists(), "blob not deleted"); assertTrue(keyFile3.exists(), "blob deleted"); diff --git a/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/RocksDbStoreTest.java b/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/RocksDbStoreTest.java index 535652efeba..f34c4c2519c 100644 --- a/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/RocksDbStoreTest.java +++ b/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/RocksDbStoreTest.java @@ -26,6 +26,7 @@ import org.apache.storm.metricstore.MetricException; import org.apache.storm.metricstore.MetricStore; import org.apache.storm.metricstore.MetricStoreConfig; +import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -313,15 +314,11 @@ public void testMetricCleanup() throws Exception { assertTrue(list.contains(m2)); } - private void waitForInsertFinish(Metric m) throws Exception { + private void waitForInsertFinish(Metric m) { Metric last = new Metric(m); - int attempts = 0; - do { - Thread.sleep(1); - attempts++; - if (attempts > 5000) { - throw new Exception("Insertion timing out"); - } - } while (!store.populateValue(last)); + Awaitility.await("metric insertion to complete") + .atMost(5, java.util.concurrent.TimeUnit.SECONDS) + .pollInterval(1, java.util.concurrent.TimeUnit.MILLISECONDS) + .until(() -> store.populateValue(last)); } } diff --git a/storm-server/src/test/java/org/apache/storm/security/auth/AuthTest.java b/storm-server/src/test/java/org/apache/storm/security/auth/AuthTest.java index 3d67cd66dd1..0706fe7c4dd 100644 --- a/storm-server/src/test/java/org/apache/storm/security/auth/AuthTest.java +++ b/storm-server/src/test/java/org/apache/storm/security/auth/AuthTest.java @@ -41,6 +41,7 @@ import org.apache.storm.security.auth.workertoken.WorkerTokenManager; import org.apache.storm.testing.InProcessZookeeper; import org.apache.storm.thrift.transport.TTransportException; +import org.awaitility.Awaitility; import org.apache.storm.utils.ConfigUtils; import org.apache.storm.utils.NimbusClient; import org.apache.storm.utils.Time; @@ -143,15 +144,10 @@ public static void withServer(String loginCfg, LOG.info("Starting Serving..."); server.serve(); }).start(); - Testing.whileTimeout( - () -> !server.isServing(), - () -> { - try { - Time.sleep(100); - } catch (InterruptedException e) { - //Ignored - } - }); + Awaitility.await("ThriftServer to start serving") + .atMost(Testing.TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .until(server::isServing); try { LOG.info("Starting to run {}", body); body.accept(server, conf);