Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import edu.umd.cs.findbugs.annotations.Nullable;
import io.ebean.EbeanServer;
import io.ebean.Transaction;
import net.sf.oval.constraint.NotNull;

import java.sql.Connection;
import java.sql.PreparedStatement;
Expand All @@ -45,7 +46,9 @@
import java.time.temporal.ChronoUnit;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import javax.persistence.PersistenceException;

Expand All @@ -70,26 +73,31 @@ public class DailyPartitionCreator extends AbstractActorWithTimers {
private final Clock _clock;
private final Schedule _schedule;
private Optional<Instant> _lastRun;
private final Executor _executor;

private DailyPartitionCreator(
final EbeanServer ebeanServer,
final PeriodicMetrics periodicMetrics,
final String schema,
final String table,
final Duration scheduleOffset,
final int lookahead
) {
this(ebeanServer, periodicMetrics, schema, table, scheduleOffset, lookahead, Clock.systemUTC());
final int lookahead,
final Executor executor
) {
this(ebeanServer, periodicMetrics, schema, table, scheduleOffset, lookahead, Clock.systemUTC(), executor);
}

/* package private */ DailyPartitionCreator(
/* CHECKSTYLE.OFF: ParameterNumber */
/* package private */
DailyPartitionCreator(
final EbeanServer ebeanServer,
final PeriodicMetrics periodicMetrics,
final String schema,
final String table,
final Duration scheduleOffset,
final int lookaheadDays,
final Clock clock
final Clock clock,
final Executor executor
) {
_ebeanServer = ebeanServer;
_periodicMetrics = periodicMetrics;
Expand All @@ -105,6 +113,7 @@ private DailyPartitionCreator(
_table = table;
_clock = clock;
_partitionCache = Sets.newHashSet();
_executor = executor;
}

/**
Expand All @@ -116,6 +125,7 @@ private DailyPartitionCreator(
* @param table The parent table name
* @param scheduleOffset Execution offset from midnight
* @param lookahead maximum number of partitions to create in advance
* @param executor executor for performing the actual partition creation
* @return A new Props.
*/
public static Props props(
Expand All @@ -124,7 +134,8 @@ public static Props props(
final String schema,
final String table,
final Duration scheduleOffset,
final int lookahead
final int lookahead,
final Executor executor
) {
return Props.create(
DailyPartitionCreator.class,
Expand All @@ -134,10 +145,12 @@ public static Props props(
schema,
table,
scheduleOffset,
lookahead
lookahead,
executor
)
);
}
/* CHECKSTYLE.ON: ParameterNumber */

/**
* Ask the actor referenced by {@code ref} to create the partition(s) needed
Expand Down Expand Up @@ -178,22 +191,32 @@ public void preStart() {


@Override
public void postStop() throws Exception {
super.postStop();
LOGGER.info().setMessage("Actor was stopped")
public void preRestart(final Throwable error, final Optional<Object> msg) {
LOGGER.info().setMessage("Actor is crashing")
.addData("schema", _schema)
.addData("table", _table)
.addData("lookahead", _lookaheadDays)
.addData("error", error)
.addData("msg", msg)
.log();
}

@Override
public Receive createReceive() {
return new ReceiveBuilder()
.matchEquals(TICK, msg -> tick())
.match(CreateForRange.class, msg -> {
final Status.Status resp = execute(msg.getStart(), msg.getEnd());
msg.getReplyTo().ifPresent(replyTo -> getSender().tell(resp, replyTo));
.match(CreateForRange.class, this::execute)
.match(CreateForRangeComplete.class, msg -> {
_lastRun = Optional.of(msg.getExecutedAt());
updateCache(msg.getRequest().getStart(), msg.getRequest().getEnd());

msg.getRequest().getReplyTo().ifPresent(replyTo -> {
final Status.Status resp = msg.getError()
.map(err -> (Status.Status) new Status.Failure(err))
.orElseGet(() -> new Status.Success(null));

replyTo.tell(resp, self());
});
})
.build();
}
Expand Down Expand Up @@ -224,17 +247,21 @@ private void tick() {
.setEnd(endDate)
.build();
getSelf().tell(createPartitions, getSelf());
return;
}
}

private Status.Status execute(final LocalDate startDate, final LocalDate endDate) {
private void execute(final CreateForRange msg) {

// Much like other portions of the codebase dealing with time, the dates
// used in this class are all fixed to UTC. So while the code in this
// method uses a LocalDate, there's an implicit assumption that all
// dates are UTC and these conversions happen at the interaction
// boundary (tick, ensurePartitionExists).

final LocalDate startDate = msg.getStart();
final LocalDate endDate = msg.getEnd();

LocalDate d = startDate;
boolean allPartitionsExist = true;
while (d.compareTo(endDate) <= 0) {
Expand All @@ -252,7 +279,9 @@ private Status.Status execute(final LocalDate startDate, final LocalDate endDate
.addData("startDate", startDate)
.addData("endDate", endDate)
.log();
return new Status.Success(null);
// Just reply directly instead of going through CreateForRangeComplete
msg.getReplyTo().ifPresent(ref -> ref.tell(new Status.Success(null), self()));
return;
}

LOGGER.info()
Expand All @@ -263,27 +292,34 @@ private Status.Status execute(final LocalDate startDate, final LocalDate endDate
.addData("endDate", endDate)
.log();

Status.Status status = new Status.Success(null);
final Instant start = Instant.now();
try {
execute(_schema, _table, startDate, endDate);
_lastRun = Optional.of(_clock.instant());
updateCache(startDate, endDate);
} catch (final PersistenceException e) {
status = new Status.Failure(e);
LOGGER.error()
.setMessage("Failed to create daily partitions for table")
.addData("schema", _schema)
.addData("table", _table)
.addData("startDate", startDate)
.addData("endDate", endDate)
.setThrowable(e)
.log();
} finally {
recordTimer("create_latency", Duration.between(start, Instant.now()));
recordCounter("create", status instanceof Status.Success ? 0 : 1);
}
return status;
final Instant start = _clock.instant();
final CompletionStage<CreateForRangeComplete> messageFut =
execute(_schema, _table, startDate, endDate)
.handle((ignore, error) -> {
// The system clock is thread-safe, although we should
// take care with using any other implementation in testing.
//
// While unobservable in testing, this read could potentially race.
final Instant completedAt = _clock.instant();
recordTimer("create_latency", Duration.between(start, completedAt));
recordCounter("create", error == null ? 1 : 0);
if (error != null) {
LOGGER.error()
.setMessage("Failed to create daily partitions for table")
.addData("schema", _schema)
.addData("table", _table)
.addData("startDate", startDate)
.addData("endDate", endDate)
.setThrowable(error)
.log();
}
return new CreateForRangeComplete.Builder()
.setRequest(msg)
.setError(error)
.setExecutedAt(completedAt)
.build();
});
Patterns.pipe(messageFut, getContext().getDispatcher()).to(self());
}

private void updateCache(final LocalDate start, final LocalDate end) {
Expand All @@ -301,8 +337,10 @@ private void updateCache(final LocalDate start, final LocalDate end) {
* @param table the parent table
* @param startDate the start date, inclusive.
* @param endDate the end date, exclusive.
*
* @return A future representing completion of this operation.
*/
protected void execute(
protected CompletionStage<Void> execute(
final String schema,
final String table,
final LocalDate startDate,
Expand All @@ -316,22 +354,22 @@ protected void execute(
// while SQLUpdate does not allow for SELECT statements.
//
// https://ebean.io/docs/intro/queries/jdbc-query
//
// TODO(cbriones): Move DB operation off the dispatcher thread pool.
final String sql = "SELECT portal.create_daily_partition(?, ?, ?, ?);";
try (Transaction tx = _ebeanServer.beginTransaction()) {
final Connection conn = tx.getConnection();
try (PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setString(1, schema);
stmt.setString(2, table);
stmt.setDate(3, java.sql.Date.valueOf(startDate));
stmt.setDate(4, java.sql.Date.valueOf(endDate));
stmt.execute();
return CompletableFuture.runAsync(() -> {
final String sql = "SELECT portal.create_daily_partition(?, ?, ?, ?);";
try (Transaction tx = _ebeanServer.beginTransaction()) {
final Connection conn = tx.getConnection();
try (PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setString(1, schema);
stmt.setString(2, table);
stmt.setDate(3, java.sql.Date.valueOf(startDate));
stmt.setDate(4, java.sql.Date.valueOf(endDate));
stmt.execute();
}
tx.commit();
} catch (final SQLException e) {
throw new PersistenceException("Could not create daily partitions", e);
}
tx.commit();
} catch (final SQLException e) {
throw new PersistenceException("Could not create daily partitions", e);
}
}, _executor);
}

private static final class CreateForRange {
Expand Down Expand Up @@ -401,4 +439,75 @@ public Builder setReplyTo(final ActorRef replyTo) {
}
}
}

private static final class CreateForRangeComplete {
private final CreateForRange _request;
private final Instant _executedAt;
private final Optional<Throwable> _error;

private CreateForRangeComplete(final CreateForRangeComplete.Builder builder) {
_request = builder._request;
_executedAt = builder._executedAt;
_error = Optional.ofNullable(builder._error);
}

public CreateForRange getRequest() {
return _request;
}

public Instant getExecutedAt() {
return _executedAt;
}

public Optional<Throwable> getError() {
return _error;
}

static final class Builder extends OvalBuilder<CreateForRangeComplete> {
@Nullable
private Throwable _error;
@NotNull
private CreateForRange _request;
@NotNull
private Instant _executedAt;

Builder() {
super(CreateForRangeComplete::new);
}

/**
* Sets the original request.
*
* @param request the original request.
* @return This instance of {@code Builder} for chaining.
*/
public Builder setRequest(final CreateForRange request) {
_request = request;
return this;
}

/**
* Sets the executedAt.
*
* @param value the executed at.
* @return This instance of {@code Builder} for chaining.
*/
public Builder setExecutedAt(final Instant value) {
_executedAt = value;
return this;
}

/**
* Sets the error. Default is null.
*
* @param error the error.
* @return This instance of {@code Builder} for chaining.
*/
public Builder setError(@Nullable final Throwable error) {
_error = error;
return this;
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ public DatabaseAlertExecutionRepository(
"portal",
"alert_executions",
partitionCreationOffset,
partitionCreationLookahead
partitionCreationLookahead,
_executor
);
}

Expand Down
Loading