diff --git a/app/com/arpnetworking/metrics/portal/alerts/impl/DailyPartitionCreator.java b/app/com/arpnetworking/metrics/portal/alerts/impl/DailyPartitionCreator.java index 76e45d08a..716356606 100644 --- a/app/com/arpnetworking/metrics/portal/alerts/impl/DailyPartitionCreator.java +++ b/app/com/arpnetworking/metrics/portal/alerts/impl/DailyPartitionCreator.java @@ -32,6 +32,7 @@ import edu.umd.cs.findbugs.annotations.Nullable; import io.ebean.EbeanServer; import io.ebean.Transaction; +import net.sf.oval.constraint.NotNull; import java.sql.Connection; import java.sql.PreparedStatement; @@ -45,7 +46,9 @@ import java.time.temporal.ChronoUnit; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import javax.persistence.PersistenceException; @@ -70,6 +73,7 @@ public class DailyPartitionCreator extends AbstractActorWithTimers { private final Clock _clock; private final Schedule _schedule; private Optional _lastRun; + private final Executor _executor; private DailyPartitionCreator( final EbeanServer ebeanServer, @@ -77,19 +81,23 @@ private DailyPartitionCreator( final String schema, final String table, final Duration scheduleOffset, - final int lookahead - ) { - this(ebeanServer, periodicMetrics, schema, table, scheduleOffset, lookahead, Clock.systemUTC()); + final int lookahead, + final Executor executor + ) { + this(ebeanServer, periodicMetrics, schema, table, scheduleOffset, lookahead, Clock.systemUTC(), executor); } - /* package private */ DailyPartitionCreator( + /* CHECKSTYLE.OFF: ParameterNumber */ + /* package private */ + DailyPartitionCreator( final EbeanServer ebeanServer, final PeriodicMetrics periodicMetrics, final String schema, final String table, final Duration scheduleOffset, final int lookaheadDays, - final Clock clock + final Clock clock, + final Executor executor ) { _ebeanServer = ebeanServer; _periodicMetrics = periodicMetrics; @@ -105,6 +113,7 @@ private DailyPartitionCreator( _table = table; _clock = clock; _partitionCache = Sets.newHashSet(); + _executor = executor; } /** @@ -116,6 +125,7 @@ private DailyPartitionCreator( * @param table The parent table name * @param scheduleOffset Execution offset from midnight * @param lookahead maximum number of partitions to create in advance + * @param executor executor for performing the actual partition creation * @return A new Props. */ public static Props props( @@ -124,7 +134,8 @@ public static Props props( final String schema, final String table, final Duration scheduleOffset, - final int lookahead + final int lookahead, + final Executor executor ) { return Props.create( DailyPartitionCreator.class, @@ -134,10 +145,12 @@ public static Props props( schema, table, scheduleOffset, - lookahead + lookahead, + executor ) ); } + /* CHECKSTYLE.ON: ParameterNumber */ /** * Ask the actor referenced by {@code ref} to create the partition(s) needed @@ -178,12 +191,13 @@ public void preStart() { @Override - public void postStop() throws Exception { - super.postStop(); - LOGGER.info().setMessage("Actor was stopped") + public void preRestart(final Throwable error, final Optional msg) { + LOGGER.info().setMessage("Actor is crashing") .addData("schema", _schema) .addData("table", _table) .addData("lookahead", _lookaheadDays) + .addData("error", error) + .addData("msg", msg) .log(); } @@ -191,9 +205,18 @@ public void postStop() throws Exception { public Receive createReceive() { return new ReceiveBuilder() .matchEquals(TICK, msg -> tick()) - .match(CreateForRange.class, msg -> { - final Status.Status resp = execute(msg.getStart(), msg.getEnd()); - msg.getReplyTo().ifPresent(replyTo -> getSender().tell(resp, replyTo)); + .match(CreateForRange.class, this::execute) + .match(CreateForRangeComplete.class, msg -> { + _lastRun = Optional.of(msg.getExecutedAt()); + updateCache(msg.getRequest().getStart(), msg.getRequest().getEnd()); + + msg.getRequest().getReplyTo().ifPresent(replyTo -> { + final Status.Status resp = msg.getError() + .map(err -> (Status.Status) new Status.Failure(err)) + .orElseGet(() -> new Status.Success(null)); + + replyTo.tell(resp, self()); + }); }) .build(); } @@ -224,10 +247,11 @@ private void tick() { .setEnd(endDate) .build(); getSelf().tell(createPartitions, getSelf()); + return; } } - private Status.Status execute(final LocalDate startDate, final LocalDate endDate) { + private void execute(final CreateForRange msg) { // Much like other portions of the codebase dealing with time, the dates // used in this class are all fixed to UTC. So while the code in this @@ -235,6 +259,9 @@ private Status.Status execute(final LocalDate startDate, final LocalDate endDate // dates are UTC and these conversions happen at the interaction // boundary (tick, ensurePartitionExists). + final LocalDate startDate = msg.getStart(); + final LocalDate endDate = msg.getEnd(); + LocalDate d = startDate; boolean allPartitionsExist = true; while (d.compareTo(endDate) <= 0) { @@ -252,7 +279,9 @@ private Status.Status execute(final LocalDate startDate, final LocalDate endDate .addData("startDate", startDate) .addData("endDate", endDate) .log(); - return new Status.Success(null); + // Just reply directly instead of going through CreateForRangeComplete + msg.getReplyTo().ifPresent(ref -> ref.tell(new Status.Success(null), self())); + return; } LOGGER.info() @@ -263,27 +292,34 @@ private Status.Status execute(final LocalDate startDate, final LocalDate endDate .addData("endDate", endDate) .log(); - Status.Status status = new Status.Success(null); - final Instant start = Instant.now(); - try { - execute(_schema, _table, startDate, endDate); - _lastRun = Optional.of(_clock.instant()); - updateCache(startDate, endDate); - } catch (final PersistenceException e) { - status = new Status.Failure(e); - LOGGER.error() - .setMessage("Failed to create daily partitions for table") - .addData("schema", _schema) - .addData("table", _table) - .addData("startDate", startDate) - .addData("endDate", endDate) - .setThrowable(e) - .log(); - } finally { - recordTimer("create_latency", Duration.between(start, Instant.now())); - recordCounter("create", status instanceof Status.Success ? 0 : 1); - } - return status; + final Instant start = _clock.instant(); + final CompletionStage messageFut = + execute(_schema, _table, startDate, endDate) + .handle((ignore, error) -> { + // The system clock is thread-safe, although we should + // take care with using any other implementation in testing. + // + // While unobservable in testing, this read could potentially race. + final Instant completedAt = _clock.instant(); + recordTimer("create_latency", Duration.between(start, completedAt)); + recordCounter("create", error == null ? 1 : 0); + if (error != null) { + LOGGER.error() + .setMessage("Failed to create daily partitions for table") + .addData("schema", _schema) + .addData("table", _table) + .addData("startDate", startDate) + .addData("endDate", endDate) + .setThrowable(error) + .log(); + } + return new CreateForRangeComplete.Builder() + .setRequest(msg) + .setError(error) + .setExecutedAt(completedAt) + .build(); + }); + Patterns.pipe(messageFut, getContext().getDispatcher()).to(self()); } private void updateCache(final LocalDate start, final LocalDate end) { @@ -301,8 +337,10 @@ private void updateCache(final LocalDate start, final LocalDate end) { * @param table the parent table * @param startDate the start date, inclusive. * @param endDate the end date, exclusive. + * + * @return A future representing completion of this operation. */ - protected void execute( + protected CompletionStage execute( final String schema, final String table, final LocalDate startDate, @@ -316,22 +354,22 @@ protected void execute( // while SQLUpdate does not allow for SELECT statements. // // https://ebean.io/docs/intro/queries/jdbc-query - // - // TODO(cbriones): Move DB operation off the dispatcher thread pool. - final String sql = "SELECT portal.create_daily_partition(?, ?, ?, ?);"; - try (Transaction tx = _ebeanServer.beginTransaction()) { - final Connection conn = tx.getConnection(); - try (PreparedStatement stmt = conn.prepareStatement(sql)) { - stmt.setString(1, schema); - stmt.setString(2, table); - stmt.setDate(3, java.sql.Date.valueOf(startDate)); - stmt.setDate(4, java.sql.Date.valueOf(endDate)); - stmt.execute(); + return CompletableFuture.runAsync(() -> { + final String sql = "SELECT portal.create_daily_partition(?, ?, ?, ?);"; + try (Transaction tx = _ebeanServer.beginTransaction()) { + final Connection conn = tx.getConnection(); + try (PreparedStatement stmt = conn.prepareStatement(sql)) { + stmt.setString(1, schema); + stmt.setString(2, table); + stmt.setDate(3, java.sql.Date.valueOf(startDate)); + stmt.setDate(4, java.sql.Date.valueOf(endDate)); + stmt.execute(); + } + tx.commit(); + } catch (final SQLException e) { + throw new PersistenceException("Could not create daily partitions", e); } - tx.commit(); - } catch (final SQLException e) { - throw new PersistenceException("Could not create daily partitions", e); - } + }, _executor); } private static final class CreateForRange { @@ -401,4 +439,75 @@ public Builder setReplyTo(final ActorRef replyTo) { } } } + + private static final class CreateForRangeComplete { + private final CreateForRange _request; + private final Instant _executedAt; + private final Optional _error; + + private CreateForRangeComplete(final CreateForRangeComplete.Builder builder) { + _request = builder._request; + _executedAt = builder._executedAt; + _error = Optional.ofNullable(builder._error); + } + + public CreateForRange getRequest() { + return _request; + } + + public Instant getExecutedAt() { + return _executedAt; + } + + public Optional getError() { + return _error; + } + + static final class Builder extends OvalBuilder { + @Nullable + private Throwable _error; + @NotNull + private CreateForRange _request; + @NotNull + private Instant _executedAt; + + Builder() { + super(CreateForRangeComplete::new); + } + + /** + * Sets the original request. + * + * @param request the original request. + * @return This instance of {@code Builder} for chaining. + */ + public Builder setRequest(final CreateForRange request) { + _request = request; + return this; + } + + /** + * Sets the executedAt. + * + * @param value the executed at. + * @return This instance of {@code Builder} for chaining. + */ + public Builder setExecutedAt(final Instant value) { + _executedAt = value; + return this; + } + + /** + * Sets the error. Default is null. + * + * @param error the error. + * @return This instance of {@code Builder} for chaining. + */ + public Builder setError(@Nullable final Throwable error) { + _error = error; + return this; + } + } + + } } diff --git a/app/com/arpnetworking/metrics/portal/alerts/impl/DatabaseAlertExecutionRepository.java b/app/com/arpnetworking/metrics/portal/alerts/impl/DatabaseAlertExecutionRepository.java index db8a8cdaf..4c503732e 100644 --- a/app/com/arpnetworking/metrics/portal/alerts/impl/DatabaseAlertExecutionRepository.java +++ b/app/com/arpnetworking/metrics/portal/alerts/impl/DatabaseAlertExecutionRepository.java @@ -105,7 +105,8 @@ public DatabaseAlertExecutionRepository( "portal", "alert_executions", partitionCreationOffset, - partitionCreationLookahead + partitionCreationLookahead, + _executor ); } diff --git a/test/java/com/arpnetworking/metrics/portal/alerts/impl/DailyPartitionCreatorTest.java b/test/java/com/arpnetworking/metrics/portal/alerts/impl/DailyPartitionCreatorTest.java index d17d86ec9..b4e7ef843 100644 --- a/test/java/com/arpnetworking/metrics/portal/alerts/impl/DailyPartitionCreatorTest.java +++ b/test/java/com/arpnetworking/metrics/portal/alerts/impl/DailyPartitionCreatorTest.java @@ -37,8 +37,13 @@ import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.time.temporal.ChronoUnit; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import javax.persistence.PersistenceException; import static org.hamcrest.Matchers.equalTo; @@ -69,6 +74,7 @@ public class DailyPartitionCreatorTest { // ActorSystem fields private ActorSystem _actorSystem; private TestKit _probe; + private ExecutorService _executor; @Before public void setUp() { @@ -78,14 +84,15 @@ public void setUp() { _actorSystem = ActorSystem.create(); _probe = new TestKit(_actorSystem); + _executor = Executors.newSingleThreadExecutor(); } private ActorRef createActor() { - return createActor(() -> { }); + return createActor(() -> CompletableFuture.completedFuture(null)); } @SuppressFBWarnings("SIC_INNER_SHOULD_BE_STATIC_ANON") - private ActorRef createActor(final Runnable onExecute) { + private ActorRef createActor(final Supplier> onExecute) { // Create an actor with the db execution behavior mocked out. final Props props = Props.create( DailyPartitionCreator.class, @@ -96,20 +103,20 @@ private ActorRef createActor(final Runnable onExecute) { TEST_TABLE, Duration.ZERO, (int) TEST_LOOKAHEAD, - _clock + _clock, + _executor ) { @Override - protected void execute( + protected CompletionStage execute( final String schema, final String table, final LocalDate startDate, final LocalDate endDate ) { - onExecute.run(); - _probe.getRef().tell( - new ExecuteCall(schema, table, startDate, endDate), - _probe.getRef() - ); + return onExecute.get() + .whenComplete((ignore, error) -> { + _probe.getRef().tell(new ExecuteCall(schema, table, startDate, endDate), ActorRef.noSender()); + }); } } ); @@ -121,6 +128,7 @@ protected void execute( @After public void tearDown() { TestKit.shutdownActorSystem(_actorSystem); + _executor.shutdown(); } @Test @@ -195,7 +203,9 @@ public void testCreatePartitionsOnDemand() throws Exception { public void testExecutionError() throws Exception { final ActorRef ref = createActor( () -> { - throw new PersistenceException("Something went wrong"); + final CompletableFuture fut = new CompletableFuture<>(); + fut.completeExceptionally(new PersistenceException("Something went wrong")); + return fut; } ); DailyPartitionCreator.ensurePartitionExistsForInstant(ref, CLOCK_START, MSG_TIMEOUT)