Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ RUN apt-get update && apt-get install -y \

ARG PACKAGE

COPY artifacts /opt/${PACKAGE}

COPY --from=builder /app/target/release/${PACKAGE} /opt/${PACKAGE}/bin/${PACKAGE}

ENV PACKAGE=${PACKAGE}
Expand Down
1 change: 1 addition & 0 deletions artifacts/geofence/mexico.txt

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions artifacts/geofence/us-territories.txt

Large diffs are not rendered by default.

2,736 changes: 2,736 additions & 0 deletions artifacts/ssl/global-bundle.pem

Large diffs are not rendered by default.

9 changes: 8 additions & 1 deletion db_store/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ impl Settings {
}

/// Builds the connection-pool options for these settings.
///
/// The pool is capped at `self.max_connections`. How long a caller may wait
/// to acquire a connection is read, in whole seconds, from the
/// `SQLX_ACQUIRE_TIMEOUT` environment variable; when that variable is unset
/// or does not parse as an unsigned integer, 30 seconds is used instead.
pub fn pool_options(&self) -> PgPoolOptions {
    // Fallback used when `SQLX_ACQUIRE_TIMEOUT` is missing or malformed.
    const DEFAULT_ACQUIRE_TIMEOUT_SECS: u64 = 30;

    let acquire_timeout_secs = std::env::var("SQLX_ACQUIRE_TIMEOUT")
        .ok()
        .and_then(|raw| raw.parse::<u64>().ok())
        .unwrap_or(DEFAULT_ACQUIRE_TIMEOUT_SECS);

    PgPoolOptions::new()
        .max_connections(self.max_connections)
        .acquire_timeout(std::time::Duration::from_secs(acquire_timeout_secs))
}
}
92 changes: 87 additions & 5 deletions file_store/src/file_info_poller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,20 @@ pub struct FileInfoPollerConfig<Message, State, Store, Parser> {
p: PhantomData<Message>,
}

impl<Message, State, Store, Parser> FileInfoPollerConfig<Message, State, Store, Parser> {
    /// Returns the configured prefix with any trailing `.` characters removed.
    ///
    /// This is the counterpart of the builder's `prefix_without_dot`.
    ///
    /// Files are retrieved from s3 using the prefix exactly as given. When
    /// those files are marked as stored, however, the prefix comes from the
    /// FileInfo, which may not match the prefix held in this config.
    ///
    /// Queries against sources that originate from FileInfo should therefore
    /// use this dotless form.
    fn prefix_without_dot(&self) -> &str {
        let configured = &self.prefix;
        configured.trim_end_matches('.')
    }
}

impl<Message, State, Store, Parser> FileInfoPollerConfigBuilder<Message, State, Store, Parser> {
/// Set the lookback behavior to start after the given timestamp.
///
Expand Down Expand Up @@ -242,7 +256,7 @@ where
let (sender, receiver) = tokio::sync::mpsc::channel(config.queue_size);
let latest_file_timestamp = config
.state
.latest_timestamp(&config.process_name, &config.prefix)
.latest_timestamp(&config.process_name, config.prefix_without_dot())
.await?;

Ok((
Expand Down Expand Up @@ -447,6 +461,7 @@ where
}
}

/// Stateless parser marker handed to the poller builder's `parser(..)` setter.
///
/// It carries no fields, so `Default` simply yields the unit value.
/// NOTE(review): presumably decodes file contents as prost messages — confirm
/// against its parser trait impl.
#[derive(Default)]
pub struct ProstFileInfoPollerParser;

#[async_trait::async_trait]
Expand Down Expand Up @@ -637,13 +652,13 @@ pub mod sqlx_postgres {

use super::*;

// Return type shared by the tests in this module: `Ok(())` by default, with
// any error boxed so `?` can be used on differing error types.
type TestResult<T = ()> = std::result::Result<T, Box<dyn std::error::Error>>;

// Field-less prost message used as the `Message` type parameter when a test
// builds a poller.
#[derive(Clone, prost::Message)]
struct TestMsg {}

#[sqlx::test]
async fn poller_filters_files_by_exact_prefix(
pool: sqlx::PgPool,
) -> std::result::Result<(), Box<dyn std::error::Error>> {
async fn poller_filters_files_by_exact_prefix(pool: sqlx::PgPool) -> TestResult {
create_files_processed_table(&pool).await?;

let awsl = AwsLocal::new().await;
Expand Down Expand Up @@ -698,7 +713,7 @@ pub mod sqlx_postgres {
#[sqlx::test]
async fn do_not_reprocess_files_when_offset_exceeds_earliest_file(
pool: PgPool,
) -> std::result::Result<(), Box<dyn std::error::Error>> {
) -> TestResult {
// Cleaning the files_processed table should not cause files within the
// `FileInfoPoller.config.offset` window to be reprocessed.

Expand Down Expand Up @@ -765,6 +780,40 @@ pub mod sqlx_postgres {
Ok(())
}

#[sqlx::test]
async fn dotted_prefix_finds_correct_start_after(pool: PgPool) -> TestResult {
    // The start-after lookup uses the prefix handed to the file-info-poller,
    // whereas a processed file is recorded under the prefix taken from its
    // FileInfo. This test checks that a timestamp recorded the second way is
    // found by the first.
    const PREFIX: &str = "test_prefix";

    create_files_processed_table(&pool).await?;

    // Record one processed file; its timestamp is where the poller should
    // pick up from.
    let recorded_at = Utc::now();
    let mut tx = pool.begin().await?;
    tx.record("default", &(PREFIX, recorded_at).into()).await?;
    tx.commit().await?;

    // The store is never consulted here, so a stub that panics is enough.
    let (_receiver, server) = FileInfoPollerConfigBuilder::<TestMsg, _, _, _>::default()
        .state(pool.clone())
        .store(UnimplementedStore)
        .parser(ProstFileInfoPollerParser)
        .prefix(PREFIX)
        .lookback_start_after(DateTime::UNIX_EPOCH)
        .create()
        .await?;

    // The database value is compared at truncated precision.
    let expected = Some(nanos_trunc(recorded_at));
    assert_eq!(
        server.latest_file_timestamp, expected,
        "retreive the last file from the database"
    );

    Ok(())
}

// There is no auto-migration for tests in this lib workspace.
async fn create_files_processed_table(pool: &PgPool) -> sqlx::Result<()> {
pool.execute(
Expand All @@ -783,6 +832,13 @@ pub mod sqlx_postgres {
Ok(())
}

// Drops sub-microsecond precision (truncates to a multiple of 1000 ns) so an
// in-memory timestamp can be compared with one read back from the database.
// The original note attributes the need for this to postgres14.9.
fn nanos_trunc(ts: DateTime<Utc>) -> DateTime<Utc> {
    use chrono::DurationRound;

    let step = chrono::Duration::nanoseconds(1000);
    let truncated = ts.duration_trunc(step);
    truncated.unwrap()
}

async fn consume_files_and_mark_processed<T: Send>(
mut receiver: FileInfoStreamReceiver<T>,
pool: &PgPool,
Expand All @@ -802,6 +858,32 @@ pub mod sqlx_postgres {

Ok(msgs)
}

// Stand-in `FileInfoPollerStore` for tests that need to build a
// FileInfoPoller without using a Store. Both trait methods panic through
// `unimplemented!`, so a test that does reach the store fails immediately.
struct UnimplementedStore;

#[async_trait::async_trait]
impl FileInfoPollerStore for UnimplementedStore {
// Listing is not provided by this stub: every argument is ignored and the
// call panics.
async fn list_all<A, B>(
&self,
_file_type: &str,
_after: A,
_before: B,
) -> Result<Vec<FileInfo>>
where
A: Into<Option<DateTime<Utc>>> + Send + Sync + Copy,
B: Into<Option<DateTime<Utc>>> + Send + Sync + Copy,
{
unimplemented!("never called in test")
}

// Fetching raw bytes is not provided by this stub either: the key is ignored
// and the call panics.
async fn get_raw<K>(&self, _key: K) -> Result<ByteStream>
where
K: Into<String> + Send + Sync,
{
unimplemented!("never called in test")
}
}
}
}

Expand Down
Loading