From 715882cfb94646754f73ca1e58ef9b1a43ddf68f Mon Sep 17 00:00:00 2001 From: Christian Briones Date: Tue, 22 Dec 2020 17:18:14 -0800 Subject: [PATCH 1/4] make DailyPartitionCreator use the blocking IO dispatcher --- .../alerts/impl/DailyPartitionCreator.java | 244 ++++++++++++++---- .../DatabaseAlertExecutionRepository.java | 3 +- .../impl/DailyPartitionCreatorTest.java | 32 ++- 3 files changed, 219 insertions(+), 60 deletions(-) diff --git a/app/com/arpnetworking/metrics/portal/alerts/impl/DailyPartitionCreator.java b/app/com/arpnetworking/metrics/portal/alerts/impl/DailyPartitionCreator.java index 76e45d08a..4170416dd 100644 --- a/app/com/arpnetworking/metrics/portal/alerts/impl/DailyPartitionCreator.java +++ b/app/com/arpnetworking/metrics/portal/alerts/impl/DailyPartitionCreator.java @@ -32,6 +32,7 @@ import edu.umd.cs.findbugs.annotations.Nullable; import io.ebean.EbeanServer; import io.ebean.Transaction; +import net.sf.oval.constraint.NotNull; import java.sql.Connection; import java.sql.PreparedStatement; @@ -45,7 +46,9 @@ import java.time.temporal.ChronoUnit; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import javax.persistence.PersistenceException; @@ -70,6 +73,7 @@ public class DailyPartitionCreator extends AbstractActorWithTimers { private final Clock _clock; private final Schedule _schedule; private Optional _lastRun; + private final Executor _executor; private DailyPartitionCreator( final EbeanServer ebeanServer, @@ -77,9 +81,10 @@ private DailyPartitionCreator( final String schema, final String table, final Duration scheduleOffset, - final int lookahead - ) { - this(ebeanServer, periodicMetrics, schema, table, scheduleOffset, lookahead, Clock.systemUTC()); + final int lookahead, + final Executor executor + ) { + this(ebeanServer, periodicMetrics, schema, table, scheduleOffset, lookahead, Clock.systemUTC(), executor); } /* package private */ DailyPartitionCreator( @@ -89,7 +94,8 @@ private DailyPartitionCreator( final String table, final Duration scheduleOffset, final int lookaheadDays, - final Clock clock + final Clock clock, + final Executor executor ) { _ebeanServer = ebeanServer; _periodicMetrics = periodicMetrics; @@ -105,6 +111,7 @@ private DailyPartitionCreator( _table = table; _clock = clock; _partitionCache = Sets.newHashSet(); + _executor = executor; } /** @@ -124,7 +131,8 @@ public static Props props( final String schema, final String table, final Duration scheduleOffset, - final int lookahead + final int lookahead, + final Executor executor ) { return Props.create( DailyPartitionCreator.class, @@ -134,7 +142,8 @@ public static Props props( schema, table, scheduleOffset, - lookahead + lookahead, + executor ) ); } @@ -178,12 +187,13 @@ public void preStart() { @Override - public void postStop() throws Exception { - super.postStop(); - LOGGER.info().setMessage("Actor was stopped") + public void preRestart(final Throwable error, final Optional msg) { + LOGGER.info().setMessage("Actor is crashing") .addData("schema", _schema) .addData("table", _table) .addData("lookahead", _lookaheadDays) + .addData("error", error) + .addData("msg", msg) .log(); } @@ -191,9 +201,20 @@ public void postStop() throws Exception { public Receive createReceive() { return new ReceiveBuilder() .matchEquals(TICK, msg -> tick()) + .match(CreateForRangeComplete.class, msg -> { + _lastRun = Optional.of(msg.getExecutedAt()); + updateCache(msg.getStart(), msg.getEnd()); + + msg.getReplyTo().ifPresent(replyTo -> { + final Status.Status resp = msg.getError() + .map(err -> (Status.Status) new Status.Failure(err)) + .orElseGet(() -> new Status.Success(null)); + + replyTo.tell(resp, self()); + }); + }) .match(CreateForRange.class, msg -> { - final Status.Status resp = execute(msg.getStart(), msg.getEnd()); - msg.getReplyTo().ifPresent(replyTo -> getSender().tell(resp, replyTo)); + execute(msg.getStart(), msg.getEnd(), msg.getReplyTo()); }) .build(); } @@ -211,6 +232,7 @@ private void recordTimer(final String metricName, final Duration duration) { // Message handlers private void tick() { + LOGGER.info("tick"); recordCounter("tick", 1); final Instant now = _clock.instant(); @@ -224,10 +246,16 @@ private void tick() { .setEnd(endDate) .build(); getSelf().tell(createPartitions, getSelf()); + return; } + LOGGER.info() + .setMessage("tick received too soon, skipping.") + .addData("nextRun", nextRun) + .addData("lastRun", _lastRun) + .log(); } - private Status.Status execute(final LocalDate startDate, final LocalDate endDate) { + private void execute(final LocalDate startDate, final LocalDate endDate, final Optional replyTo) { // Much like other portions of the codebase dealing with time, the dates // used in this class are all fixed to UTC. So while the code in this @@ -252,7 +280,9 @@ private Status.Status execute(final LocalDate startDate, final LocalDate endDate .addData("startDate", startDate) .addData("endDate", endDate) .log(); - return new Status.Success(null); + // Just reply directly instead of going through CreateForRangeComplete + replyTo.ifPresent(ref -> ref.tell(new Status.Success(null), self())); + return; } LOGGER.info() @@ -263,27 +293,34 @@ private Status.Status execute(final LocalDate startDate, final LocalDate endDate .addData("endDate", endDate) .log(); - Status.Status status = new Status.Success(null); - final Instant start = Instant.now(); - try { - execute(_schema, _table, startDate, endDate); - _lastRun = Optional.of(_clock.instant()); - updateCache(startDate, endDate); - } catch (final PersistenceException e) { - status = new Status.Failure(e); - LOGGER.error() - .setMessage("Failed to create daily partitions for table") - .addData("schema", _schema) - .addData("table", _table) - .addData("startDate", startDate) - .addData("endDate", endDate) - .setThrowable(e) - .log(); - } finally { - recordTimer("create_latency", Duration.between(start, Instant.now())); - recordCounter("create", status instanceof Status.Success ? 0 : 1); - } - return status; + final Instant start = _clock.instant(); + final CompletionStage messageFut = + execute(_schema, _table, startDate, endDate) + .handle((ignore, error) -> { + // The system clock is thread-safe, although the safety of + // any other implementations is unclear. + final Instant completedAt = _clock.instant(); + recordTimer("create_latency", Duration.between(start, completedAt)); + recordCounter("create", error == null ? 1 : 0); + if (error != null) { + LOGGER.error() + .setMessage("Failed to create daily partitions for table") + .addData("schema", _schema) + .addData("table", _table) + .addData("startDate", startDate) + .addData("endDate", endDate) + .setThrowable(error) + .log(); + } + final CreateForRangeComplete.Builder msgBuilder = new CreateForRangeComplete.Builder() + .setStart(startDate) + .setEnd(endDate) + .setError(error) + .setExecutedAt(completedAt); + replyTo.ifPresent(msgBuilder::setReplyTo); + return msgBuilder.build(); + }); + Patterns.pipe(messageFut, getContext().getDispatcher()).to(self()); } private void updateCache(final LocalDate start, final LocalDate end) { @@ -301,8 +338,10 @@ private void updateCache(final LocalDate start, final LocalDate end) { * @param table the parent table * @param startDate the start date, inclusive. * @param endDate the end date, exclusive. + * + * @return A future representing completion of this operation. */ - protected void execute( + protected CompletionStage execute( final String schema, final String table, final LocalDate startDate, @@ -318,20 +357,22 @@ protected void execute( // https://ebean.io/docs/intro/queries/jdbc-query // // TODO(cbriones): Move DB operation off the dispatcher thread pool. - final String sql = "SELECT portal.create_daily_partition(?, ?, ?, ?);"; - try (Transaction tx = _ebeanServer.beginTransaction()) { - final Connection conn = tx.getConnection(); - try (PreparedStatement stmt = conn.prepareStatement(sql)) { - stmt.setString(1, schema); - stmt.setString(2, table); - stmt.setDate(3, java.sql.Date.valueOf(startDate)); - stmt.setDate(4, java.sql.Date.valueOf(endDate)); - stmt.execute(); + return CompletableFuture.runAsync(() -> { + final String sql = "SELECT portal.create_daily_partition(?, ?, ?, ?);"; + try (Transaction tx = _ebeanServer.beginTransaction()) { + final Connection conn = tx.getConnection(); + try (PreparedStatement stmt = conn.prepareStatement(sql)) { + stmt.setString(1, schema); + stmt.setString(2, table); + stmt.setDate(3, java.sql.Date.valueOf(startDate)); + stmt.setDate(4, java.sql.Date.valueOf(endDate)); + stmt.execute(); + } + tx.commit(); + } catch (final SQLException e) { + throw new PersistenceException("Could not create daily partitions", e); } - tx.commit(); - } catch (final SQLException e) { - throw new PersistenceException("Could not create daily partitions", e); - } + }, _executor); } private static final class CreateForRange { @@ -401,4 +442,111 @@ public Builder setReplyTo(final ActorRef replyTo) { } } } + + private static final class CreateForRangeComplete { + private final LocalDate _start; + private final LocalDate _end; + private final Optional _replyTo; + private final Instant _executedAt; + private final Optional _error; + + private CreateForRangeComplete(final CreateForRangeComplete.Builder builder) { + _start = builder._start; + _end = builder._end; + _replyTo = Optional.ofNullable(builder._replyTo); + _executedAt = builder._executedAt; + _error = Optional.ofNullable(builder._error); + } + + public LocalDate getStart() { + return _start; + } + + public LocalDate getEnd() { + return _end; + } + + public Optional getReplyTo() { + return _replyTo; + } + + public Instant getExecutedAt() { + return _executedAt; + } + + public Optional getError() { + return _error; + } + + static final class Builder extends OvalBuilder { + @Nullable + private Throwable _error; + private LocalDate _start; + private LocalDate _end; + @Nullable + private ActorRef _replyTo; + @NotNull + private Instant _executedAt; + + Builder() { + super(CreateForRangeComplete::new); + } + + /** + * Sets the start. + * + * @param start the start. + * @return This instance of {@code Builder} for chaining. + */ + public Builder setStart(final LocalDate start) { + _start = start; + return this; + } + + /** + * Sets the end. + * + * @param end the end. + * @return This instance of {@code Builder} for chaining. + */ + public Builder setEnd(final LocalDate end) { + _end = end; + return this; + } + + /** + * Sets the reply to. + * + * @param replyTo the reply to. + * @return This instance of {@code Builder} for chaining. + */ + public Builder setReplyTo(final ActorRef replyTo) { + _replyTo = replyTo; + return this; + } + + /** + * Sets the executedAt. + * + * @param value the executed at. + * @return This instance of {@code Builder} for chaining. + */ + public Builder setExecutedAt(final Instant value) { + _executedAt = value; + return this; + } + + /** + * Sets the error. Default is null. + * + * @param error the error. + * @return This instance of {@code Builder} for chaining. + */ + public Builder setError(@Nullable final Throwable error) { + _error = error; + return this; + } + } + + } } diff --git a/app/com/arpnetworking/metrics/portal/alerts/impl/DatabaseAlertExecutionRepository.java b/app/com/arpnetworking/metrics/portal/alerts/impl/DatabaseAlertExecutionRepository.java index db8a8cdaf..76100709c 100644 --- a/app/com/arpnetworking/metrics/portal/alerts/impl/DatabaseAlertExecutionRepository.java +++ b/app/com/arpnetworking/metrics/portal/alerts/impl/DatabaseAlertExecutionRepository.java @@ -105,7 +105,8 @@ public DatabaseAlertExecutionRepository( "portal", "alert_executions", partitionCreationOffset, - partitionCreationLookahead + partitionCreationLookahead, + _actorSystem.getDispatcher() ); } diff --git a/test/java/com/arpnetworking/metrics/portal/alerts/impl/DailyPartitionCreatorTest.java b/test/java/com/arpnetworking/metrics/portal/alerts/impl/DailyPartitionCreatorTest.java index d17d86ec9..57ca07c8c 100644 --- a/test/java/com/arpnetworking/metrics/portal/alerts/impl/DailyPartitionCreatorTest.java +++ b/test/java/com/arpnetworking/metrics/portal/alerts/impl/DailyPartitionCreatorTest.java @@ -20,6 +20,7 @@ import akka.actor.ActorSystem; import akka.actor.Props; import akka.pattern.Patterns; +import akka.pattern.PatternsCS; import akka.testkit.javadsl.TestKit; import com.arpnetworking.commons.java.time.ManualClock; import com.arpnetworking.metrics.incubator.PeriodicMetrics; @@ -37,8 +38,13 @@ import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.time.temporal.ChronoUnit; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import javax.persistence.PersistenceException; import static org.hamcrest.Matchers.equalTo; @@ -69,6 +75,7 @@ public class DailyPartitionCreatorTest { // ActorSystem fields private ActorSystem _actorSystem; private TestKit _probe; + private ExecutorService _executor; @Before public void setUp() { @@ -78,14 +85,15 @@ public void setUp() { _actorSystem = ActorSystem.create(); _probe = new TestKit(_actorSystem); + _executor = Executors.newFixedThreadPool(4); } private ActorRef createActor() { - return createActor(() -> { }); + return createActor(() -> CompletableFuture.completedFuture(null)); } @SuppressFBWarnings("SIC_INNER_SHOULD_BE_STATIC_ANON") - private ActorRef createActor(final Runnable onExecute) { + private ActorRef createActor(final Supplier> onExecute) { // Create an actor with the db execution behavior mocked out. final Props props = Props.create( DailyPartitionCreator.class, @@ -96,20 +104,20 @@ private ActorRef createActor(final Runnable onExecute) { TEST_TABLE, Duration.ZERO, (int) TEST_LOOKAHEAD, - _clock + _clock, + _executor ) { @Override - protected void execute( + protected CompletionStage execute( final String schema, final String table, final LocalDate startDate, final LocalDate endDate ) { - onExecute.run(); - _probe.getRef().tell( - new ExecuteCall(schema, table, startDate, endDate), - _probe.getRef() - ); + return onExecute.get() + .whenComplete((ignore, error) -> { + _probe.getRef().tell(new ExecuteCall(schema, table, startDate, endDate), ActorRef.noSender()); + }); } } ); @@ -195,12 +203,14 @@ public void testCreatePartitionsOnDemand() throws Exception { public void testExecutionError() throws Exception { final ActorRef ref = createActor( () -> { - throw new PersistenceException("Something went wrong"); + final CompletableFuture fut = new CompletableFuture<>(); + fut.completeExceptionally(new PersistenceException("Something went wrong")); + return fut; } ); DailyPartitionCreator.ensurePartitionExistsForInstant(ref, CLOCK_START, MSG_TIMEOUT) .toCompletableFuture() - .get(1, TimeUnit.SECONDS); + .get(5, TimeUnit.SECONDS); } /** From 818642e1421830dfa4ee59a9b694f769c92e9c78 Mon Sep 17 00:00:00 2001 From: Christian Briones Date: Wed, 23 Dec 2020 13:24:31 -0800 Subject: [PATCH 2/4] compose --- .../alerts/impl/DailyPartitionCreator.java | 81 +++++-------------- 1 file changed, 22 insertions(+), 59 deletions(-) diff --git a/app/com/arpnetworking/metrics/portal/alerts/impl/DailyPartitionCreator.java b/app/com/arpnetworking/metrics/portal/alerts/impl/DailyPartitionCreator.java index 4170416dd..437b0a3db 100644 --- a/app/com/arpnetworking/metrics/portal/alerts/impl/DailyPartitionCreator.java +++ b/app/com/arpnetworking/metrics/portal/alerts/impl/DailyPartitionCreator.java @@ -203,9 +203,9 @@ public Receive createReceive() { .matchEquals(TICK, msg -> tick()) .match(CreateForRangeComplete.class, msg -> { _lastRun = Optional.of(msg.getExecutedAt()); - updateCache(msg.getStart(), msg.getEnd()); + updateCache(msg.getRequest().getStart(), msg.getRequest().getEnd()); - msg.getReplyTo().ifPresent(replyTo -> { + msg.getRequest().getReplyTo().ifPresent(replyTo -> { final Status.Status resp = msg.getError() .map(err -> (Status.Status) new Status.Failure(err)) .orElseGet(() -> new Status.Success(null)); @@ -213,9 +213,7 @@ public Receive createReceive() { replyTo.tell(resp, self()); }); }) - .match(CreateForRange.class, msg -> { - execute(msg.getStart(), msg.getEnd(), msg.getReplyTo()); - }) + .match(CreateForRange.class, this::execute) .build(); } @@ -255,7 +253,7 @@ private void tick() { .log(); } - private void execute(final LocalDate startDate, final LocalDate endDate, final Optional replyTo) { + private void execute(final CreateForRange msg) { // Much like other portions of the codebase dealing with time, the dates // used in this class are all fixed to UTC. So while the code in this @@ -263,6 +261,9 @@ private void execute(final LocalDate startDate, final LocalDate endDate, final O // dates are UTC and these conversions happen at the interaction // boundary (tick, ensurePartitionExists). + final LocalDate startDate = msg.getStart(); + final LocalDate endDate = msg.getEnd(); + LocalDate d = startDate; boolean allPartitionsExist = true; while (d.compareTo(endDate) <= 0) { @@ -281,7 +282,7 @@ private void execute(final LocalDate startDate, final LocalDate endDate, final O .addData("endDate", endDate) .log(); // Just reply directly instead of going through CreateForRangeComplete - replyTo.ifPresent(ref -> ref.tell(new Status.Success(null), self())); + msg.getReplyTo().ifPresent(ref -> ref.tell(new Status.Success(null), self())); return; } @@ -312,13 +313,11 @@ private void execute(final LocalDate startDate, final LocalDate endDate, final O .setThrowable(error) .log(); } - final CreateForRangeComplete.Builder msgBuilder = new CreateForRangeComplete.Builder() - .setStart(startDate) - .setEnd(endDate) + return new CreateForRangeComplete.Builder() + .setRequest(msg) .setError(error) - .setExecutedAt(completedAt); - replyTo.ifPresent(msgBuilder::setReplyTo); - return msgBuilder.build(); + .setExecutedAt(completedAt) + .build(); }); Patterns.pipe(messageFut, getContext().getDispatcher()).to(self()); } @@ -444,30 +443,18 @@ public Builder setReplyTo(final ActorRef replyTo) { } private static final class CreateForRangeComplete { - private final LocalDate _start; - private final LocalDate _end; - private final Optional _replyTo; + private final CreateForRange _request; private final Instant _executedAt; private final Optional _error; private CreateForRangeComplete(final CreateForRangeComplete.Builder builder) { - _start = builder._start; - _end = builder._end; - _replyTo = Optional.ofNullable(builder._replyTo); + _request = builder._request; _executedAt = builder._executedAt; _error = Optional.ofNullable(builder._error); } - public LocalDate getStart() { - return _start; - } - - public LocalDate getEnd() { - return _end; - } - - public Optional getReplyTo() { - return _replyTo; + public CreateForRange getRequest() { + return _request; } public Instant getExecutedAt() { @@ -481,10 +468,8 @@ public Optional getError() { static final class Builder extends OvalBuilder { @Nullable private Throwable _error; - private LocalDate _start; - private LocalDate _end; - @Nullable - private ActorRef _replyTo; + @NotNull + private CreateForRange _request; @NotNull private Instant _executedAt; @@ -493,35 +478,13 @@ static final class Builder extends OvalBuilder { } /** - * Sets the start. + * Sets the original request. * - * @param start the start. + * @param request the original request. * @return This instance of {@code Builder} for chaining. */ - public Builder setStart(final LocalDate start) { - _start = start; - return this; - } - - /** - * Sets the end. - * - * @param end the end. - * @return This instance of {@code Builder} for chaining. - */ - public Builder setEnd(final LocalDate end) { - _end = end; - return this; - } - - /** - * Sets the reply to. - * - * @param replyTo the reply to. - * @return This instance of {@code Builder} for chaining. - */ - public Builder setReplyTo(final ActorRef replyTo) { - _replyTo = replyTo; + public Builder setRequest(final CreateForRange request) { + _request = request; return this; } From ea447fb2414dceaa86a28886f2ad01f1b0c69fcc Mon Sep 17 00:00:00 2001 From: Christian Briones Date: Wed, 23 Dec 2020 13:41:21 -0800 Subject: [PATCH 3/4] more cleanup and checkstyle --- .../alerts/impl/DailyPartitionCreator.java | 22 +++++++++---------- .../impl/DailyPartitionCreatorTest.java | 6 ++--- 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/app/com/arpnetworking/metrics/portal/alerts/impl/DailyPartitionCreator.java b/app/com/arpnetworking/metrics/portal/alerts/impl/DailyPartitionCreator.java index 437b0a3db..716356606 100644 --- a/app/com/arpnetworking/metrics/portal/alerts/impl/DailyPartitionCreator.java +++ b/app/com/arpnetworking/metrics/portal/alerts/impl/DailyPartitionCreator.java @@ -87,7 +87,9 @@ private DailyPartitionCreator( this(ebeanServer, periodicMetrics, schema, table, scheduleOffset, lookahead, Clock.systemUTC(), executor); } - /* package private */ DailyPartitionCreator( + /* CHECKSTYLE.OFF: ParameterNumber */ + /* package private */ + DailyPartitionCreator( final EbeanServer ebeanServer, final PeriodicMetrics periodicMetrics, final String schema, @@ -123,6 +125,7 @@ private DailyPartitionCreator( * @param table The parent table name * @param scheduleOffset Execution offset from midnight * @param lookahead maximum number of partitions to create in advance + * @param executor executor for performing the actual partition creation * @return A new Props. */ public static Props props( @@ -147,6 +150,7 @@ public static Props props( ) ); } + /* CHECKSTYLE.ON: ParameterNumber */ /** * Ask the actor referenced by {@code ref} to create the partition(s) needed @@ -201,6 +205,7 @@ public void preRestart(final Throwable error, final Optional msg) { public Receive createReceive() { return new ReceiveBuilder() .matchEquals(TICK, msg -> tick()) + .match(CreateForRange.class, this::execute) .match(CreateForRangeComplete.class, msg -> { _lastRun = Optional.of(msg.getExecutedAt()); updateCache(msg.getRequest().getStart(), msg.getRequest().getEnd()); @@ -213,7 +218,6 @@ public Receive createReceive() { replyTo.tell(resp, self()); }); }) - .match(CreateForRange.class, this::execute) .build(); } @@ -230,7 +234,6 @@ private void recordTimer(final String metricName, final Duration duration) { // Message handlers private void tick() { - LOGGER.info("tick"); recordCounter("tick", 1); final Instant now = _clock.instant(); @@ -246,11 +249,6 @@ private void tick() { getSelf().tell(createPartitions, getSelf()); return; } - LOGGER.info() - .setMessage("tick received too soon, skipping.") - .addData("nextRun", nextRun) - .addData("lastRun", _lastRun) - .log(); } private void execute(final CreateForRange msg) { @@ -298,8 +296,10 @@ private void execute(final CreateForRange msg) { final CompletionStage messageFut = execute(_schema, _table, startDate, endDate) .handle((ignore, error) -> { - // The system clock is thread-safe, although the safety of - // any other implementations is unclear. + // The system clock is thread-safe, although we should + // take care with using any other implementation in testing. + // + // While unobservable in testing, this read could potentially race. final Instant completedAt = _clock.instant(); recordTimer("create_latency", Duration.between(start, completedAt)); recordCounter("create", error == null ? 1 : 0); @@ -354,8 +354,6 @@ protected CompletionStage execute( // while SQLUpdate does not allow for SELECT statements. // // https://ebean.io/docs/intro/queries/jdbc-query - // - // TODO(cbriones): Move DB operation off the dispatcher thread pool. return CompletableFuture.runAsync(() -> { final String sql = "SELECT portal.create_daily_partition(?, ?, ?, ?);"; try (Transaction tx = _ebeanServer.beginTransaction()) { diff --git a/test/java/com/arpnetworking/metrics/portal/alerts/impl/DailyPartitionCreatorTest.java b/test/java/com/arpnetworking/metrics/portal/alerts/impl/DailyPartitionCreatorTest.java index 57ca07c8c..b4e7ef843 100644 --- a/test/java/com/arpnetworking/metrics/portal/alerts/impl/DailyPartitionCreatorTest.java +++ b/test/java/com/arpnetworking/metrics/portal/alerts/impl/DailyPartitionCreatorTest.java @@ -20,7 +20,6 @@ import akka.actor.ActorSystem; import akka.actor.Props; import akka.pattern.Patterns; -import akka.pattern.PatternsCS; import akka.testkit.javadsl.TestKit; import com.arpnetworking.commons.java.time.ManualClock; import com.arpnetworking.metrics.incubator.PeriodicMetrics; @@ -85,7 +84,7 @@ public void setUp() { _actorSystem = ActorSystem.create(); _probe = new TestKit(_actorSystem); - _executor = Executors.newFixedThreadPool(4); + _executor = Executors.newSingleThreadExecutor(); } private ActorRef createActor() { @@ -129,6 +128,7 @@ protected CompletionStage execute( @After public void tearDown() { TestKit.shutdownActorSystem(_actorSystem); + _executor.shutdown(); } @Test @@ -210,7 +210,7 @@ public void testExecutionError() throws Exception { ); DailyPartitionCreator.ensurePartitionExistsForInstant(ref, CLOCK_START, MSG_TIMEOUT) .toCompletableFuture() - .get(5, TimeUnit.SECONDS); + .get(1, TimeUnit.SECONDS); } /** From f887916f199e3675e920457225e7aaa0dc7a2788 Mon Sep 17 00:00:00 2001 From: Christian Briones Date: Wed, 23 Dec 2020 13:43:42 -0800 Subject: [PATCH 4/4] it should pass in the executor it has --- .../portal/alerts/impl/DatabaseAlertExecutionRepository.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/com/arpnetworking/metrics/portal/alerts/impl/DatabaseAlertExecutionRepository.java b/app/com/arpnetworking/metrics/portal/alerts/impl/DatabaseAlertExecutionRepository.java index 76100709c..4c503732e 100644 --- a/app/com/arpnetworking/metrics/portal/alerts/impl/DatabaseAlertExecutionRepository.java +++ b/app/com/arpnetworking/metrics/portal/alerts/impl/DatabaseAlertExecutionRepository.java @@ -106,7 +106,7 @@ public DatabaseAlertExecutionRepository( "alert_executions", partitionCreationOffset, partitionCreationLookahead, - _actorSystem.getDispatcher() + _executor ); }