Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions crates/database/db/src/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,23 @@ impl<T: ReadConnectionProvider + Sync + ?Sized> DatabaseReadOperations for T {
start: Option<L1MessageKey>,
n: usize,
) -> Result<Vec<L1MessageEnvelope>, DatabaseError> {
// function that returns the queue index of the L1 message after the last included L1
// message.
let next_l1_message_queue_index = async || {
Ok::<i64, DatabaseError>(
models::l1_message::Entity::find()
.select_only()
.filter(models::l1_message::Column::L2BlockNumber.is_not_null())
.column_as(models::l1_message::Column::QueueIndex.max(), "max_queue_index")
.into_tuple::<Option<i64>>()
.one(self.get_connection())
.await?
.flatten()
.map(|idx| idx + 1)
.unwrap_or(0),
)
};

match start {
// Provides n L1 messages with increasing queue index starting from the provided queue
// index.
Expand Down Expand Up @@ -865,12 +882,16 @@ impl<T: ReadConnectionProvider + Sync + ?Sized> DatabaseReadOperations for T {
return Ok(vec![]);
};

// Get the next L1 message queue index after the last included L1 message.
let next_l1_message_queue_index = next_l1_message_queue_index().await?;

// Create a filter condition for messages that have an L1 block number less than or
// equal to the finalized block number and have not been included in an L2 block
// (i.e. L2BlockNumber is null).
let condition = Condition::all()
.add(models::l1_message::Column::L1BlockNumber.lte(target_block_number as i64))
.add(models::l1_message::Column::L2BlockNumber.is_null());
.add(models::l1_message::Column::L2BlockNumber.is_null())
.add(models::l1_message::Column::QueueIndex.gte(next_l1_message_queue_index));
// Yield n messages matching the condition ordered by increasing queue index.
Ok(models::l1_message::Entity::find()
.filter(condition)
Expand All @@ -897,12 +918,17 @@ impl<T: ReadConnectionProvider + Sync + ?Sized> DatabaseReadOperations for T {
} else {
return Ok(vec![]);
};

// Get the next L1 message queue index after the last included L1 message.
let next_l1_message_queue_index = next_l1_message_queue_index().await?;

// Create a filter condition for messages that have an L1 block number less than
// or equal to the target block number and have not been included in an L2 block
// (i.e. L2BlockNumber is null).
let condition = Condition::all()
.add(models::l1_message::Column::L1BlockNumber.lte(target_block_number as i64))
.add(models::l1_message::Column::L2BlockNumber.is_null());
.add(models::l1_message::Column::L2BlockNumber.is_null())
.add(models::l1_message::Column::QueueIndex.gte(next_l1_message_queue_index));
// Yield n messages matching the condition ordered by increasing queue index.
Ok(models::l1_message::Entity::find()
.filter(condition)
Expand Down
4 changes: 3 additions & 1 deletion sequencer-migration/send-l1-messages.sh
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -78,5 +78,7 @@ main() {
log_info "Next queue index: $next_queue_index "
done

echo "Done"
log_info "Done"
}

main "$@"