-
-
Notifications
You must be signed in to change notification settings - Fork 356
feat: support remote pmtiles directory auto-discovery #2704
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||
|---|---|---|---|---|
|
|
@@ -80,6 +80,20 @@ pub trait TileSourceConfiguration: ConfigurationLivecycleHooks { | |||
| url: Url, | ||||
| cache: CachePolicy, | ||||
| ) -> impl Future<Output = MartinResult<BoxedSource>> + Send; | ||||
|
|
||||
| /// Expand a URL that may refer to a remote "directory" (prefix) into one URL per object. | ||||
| /// | ||||
| /// The default implementation treats the URL as a single object and returns it unchanged. | ||||
| /// Source types that support remote listing (e.g. `PMTiles` via `object_store`) may override | ||||
| /// this to enumerate objects matching `allowed_extension` under a prefix. | ||||
| #[allow(unused_variables)] | ||||
| fn expand_url( | ||||
| &self, | ||||
| url: Url, | ||||
| allowed_extension: &[&str], | ||||
| ) -> impl Future<Output = MartinResult<Vec<Url>>> + Send { | ||||
| async move { Ok(vec![url]) } | ||||
| } | ||||
| } | ||||
|
|
||||
| #[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] | ||||
|
|
@@ -419,29 +433,54 @@ async fn resolve_one_path_int<T: TileSourceConfiguration>( | |||
| let mut results = Vec::new(); | ||||
|
|
||||
| if let Some(url) = parse_url(T::parse_urls(), &path)? { | ||||
| let target_ext = extension.iter().find(|&e| url.to_string().ends_with(e)); | ||||
| let id = if let Some(ext) = target_ext { | ||||
| url.path_segments() | ||||
| .and_then(Iterator::last) | ||||
| .and_then(|s| { | ||||
| // Strip extension and trailing dot, or keep the original string | ||||
| s.strip_suffix(ext) | ||||
| .and_then(|s| s.strip_suffix('.')) | ||||
| .or(Some(s)) | ||||
| }) | ||||
| .unwrap_or("web_source") | ||||
| } else { | ||||
| "web_source" | ||||
| }; | ||||
|
|
||||
| let id = idr.resolve(id, url.to_string()); | ||||
| configs.insert(id.clone(), FileConfigSrc::Path(path)); | ||||
| results.push( | ||||
| custom | ||||
| .new_sources_url(id.clone(), url.clone(), default_cache) | ||||
| .await?, | ||||
| ); | ||||
| info!("Configured source {id} from URL {}", sanitize_url(&url)); | ||||
| let expanded = custom.expand_url(url.clone(), extension).await?; | ||||
| // If expansion returned anything other than the original single URL, the user gave us | ||||
| // a prefix/directory — remember it as a "directory" so the round-tripped config still | ||||
| // scans it on reload, mirroring the local-directory branch below. | ||||
| let was_prefix = expanded.len() != 1 || expanded[0] != url; | ||||
| if was_prefix { | ||||
| directories.push(path.clone()); | ||||
| } | ||||
| if expanded.is_empty() { | ||||
| warn!( | ||||
| "No files matching {extension:?} found under {}", | ||||
| sanitize_url(&url) | ||||
| ); | ||||
|
Comment on lines
+445
to
+448
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This should likely trigger our `on-invalid` setting; see martin/martin/src/config/file/main.rs, line 477 in 7071f12.
|
||||
| } | ||||
| for child_url in expanded { | ||||
| let target_ext = extension | ||||
| .iter() | ||||
| .find(|&e| child_url.to_string().ends_with(e)); | ||||
| let id = if let Some(ext) = target_ext { | ||||
| child_url | ||||
| .path_segments() | ||||
| .and_then(Iterator::last) | ||||
| .and_then(|s| { | ||||
| // Strip extension and trailing dot, or keep the original string | ||||
| s.strip_suffix(ext) | ||||
| .and_then(|s| s.strip_suffix('.')) | ||||
| .or(Some(s)) | ||||
| }) | ||||
| .unwrap_or("web_source") | ||||
| } else { | ||||
| "web_source" | ||||
| }; | ||||
|
|
||||
| let id = idr.resolve(id, child_url.to_string()); | ||||
| configs.insert( | ||||
| id.clone(), | ||||
| FileConfigSrc::Path(PathBuf::from(child_url.as_str())), | ||||
| ); | ||||
| results.push( | ||||
| custom | ||||
| .new_sources_url(id.clone(), child_url.clone(), default_cache) | ||||
| .await?, | ||||
| ); | ||||
| info!( | ||||
| "Configured source {id} from URL {}", | ||||
| sanitize_url(&child_url) | ||||
| ); | ||||
| } | ||||
| } else { | ||||
| let is_dir = path.is_dir(); | ||||
| let dir_files = if is_dir { | ||||
|
|
||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,6 +3,7 @@ use std::env; | |
| use std::path::PathBuf; | ||
| use std::str::FromStr as _; | ||
|
|
||
| use futures::StreamExt as _; | ||
| use martin_core::tiles::BoxedSource; | ||
| use martin_core::tiles::pmtiles::{PmtCache, PmtCacheInstance, PmtilesSource}; | ||
| use serde::{Deserialize, Serialize}; | ||
|
|
@@ -256,4 +257,99 @@ impl TileSourceConfiguration for PmtConfig { | |
| let source = PmtilesSource::new(dir_cache, id, store, path, cache.zoom()).await?; | ||
| Ok(Box::new(source)) | ||
| } | ||
|
|
||
| async fn expand_url(&self, url: Url, allowed_extension: &[&str]) -> MartinResult<Vec<Url>> { | ||
| // If the URL path already ends in a known extension, treat it as a single object | ||
| // and skip the list call — this preserves the previous behavior for direct file URLs. | ||
| if allowed_extension.iter().any(|e| url.path().ends_with(e)) { | ||
| return Ok(vec![url]); | ||
| } | ||
|
|
||
| // The URL points at a prefix/"directory". Use object_store to enumerate children. | ||
| // `parse_url_opts` returns (store, path) where `path` is the key prefix relative to | ||
| // the store's root (e.g. bucket-relative for s3/gs/az, fs-root-relative for file://). | ||
| // Listed `ObjectMeta.location` values are in the same coordinate system. | ||
| let (store, prefix) = object_store::parse_url_opts(&url, &self.options) | ||
| .map_err(|e| ConfigFileError::ObjectStoreUrlParsing(e, url.to_string()))?; | ||
|
|
||
| let mut stream = store.list(Some(&prefix)); | ||
| let mut results = Vec::new(); | ||
| while let Some(meta) = stream.next().await { | ||
| let meta = meta.map_err(|e| ConfigFileError::ObjectStoreListing(e, url.to_string()))?; | ||
| let loc = meta.location.as_ref(); | ||
| if !allowed_extension.iter().any(|ext| loc.ends_with(ext)) { | ||
| continue; | ||
| } | ||
| // Reconstruct a child URL by keeping the original scheme/host and replacing the | ||
| // path with the listed object's full key. This round-trips through | ||
| // `parse_url_opts` for every backend object_store exposes via URL (s3, gs, az, | ||
| // file), because in all of those the URL path component *is* the store key. | ||
| let mut child = url.clone(); | ||
| child.set_path(&format!("/{loc}")); | ||
| results.push(child); | ||
| } | ||
| Ok(results) | ||
| } | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I would like to see a test using a real remote object store (I think this should work via either MinIO, LocalStack, or some other emulator). |
||
| use std::fs; | ||
|
|
||
| use super::*; | ||
|
|
||
| // Exercise expand_url against the file:// object_store backend by populating a tempdir | ||
| // with a mix of pmtiles and unrelated files. We use file:// because object_store only | ||
| // dispatches URL parsing for its built-in schemes (s3/gs/az/file/http), and file:// is | ||
| // the only one that works without a network or external mock. | ||
| #[tokio::test] | ||
| async fn expand_url_lists_pmtiles_in_prefix() { | ||
| let dir = tempfile::tempdir().unwrap(); | ||
| fs::write(dir.path().join("a.pmtiles"), b"").unwrap(); | ||
| fs::write(dir.path().join("b.pmtiles"), b"").unwrap(); | ||
| fs::write(dir.path().join("ignore.txt"), b"").unwrap(); | ||
|
|
||
| let url = Url::from_directory_path(dir.path()).unwrap(); | ||
| let cfg = PmtConfig::default(); | ||
| let mut expanded: Vec<String> = cfg | ||
| .expand_url(url, &["pmtiles"]) | ||
| .await | ||
| .unwrap() | ||
| .into_iter() | ||
| .map(|u| u.to_string()) | ||
| .collect(); | ||
| expanded.sort(); | ||
|
|
||
| assert_eq!(expanded.len(), 2); | ||
| assert!(expanded[0].ends_with("/a.pmtiles"), "{expanded:?}"); | ||
| assert!(expanded[1].ends_with("/b.pmtiles"), "{expanded:?}"); | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn expand_url_returns_single_file_url_unchanged() { | ||
| let dir = tempfile::tempdir().unwrap(); | ||
| let file = dir.path().join("one.pmtiles"); | ||
| fs::write(&file, b"").unwrap(); | ||
|
|
||
| let url = Url::from_file_path(&file).unwrap(); | ||
| let cfg = PmtConfig::default(); | ||
| let expanded = cfg.expand_url(url.clone(), &["pmtiles"]).await.unwrap(); | ||
|
|
||
| // The URL already looks like a single pmtiles file; no listing should happen and the | ||
| // URL should round-trip unmodified (important because the caller derives the source | ||
| // ID from the URL's filename). | ||
| assert_eq!(expanded, vec![url]); | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn expand_url_empty_prefix_returns_empty() { | ||
| let dir = tempfile::tempdir().unwrap(); | ||
| fs::write(dir.path().join("other.txt"), b"").unwrap(); | ||
|
|
||
| let url = Url::from_directory_path(dir.path()).unwrap(); | ||
| let cfg = PmtConfig::default(); | ||
| let expanded = cfg.expand_url(url, &["pmtiles"]).await.unwrap(); | ||
|
|
||
| assert!(expanded.is_empty()); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not entirely a fan of this architecture, since it means we have to maintain it.
We will need to refactor this sometime in the future.
If this PR comes with good tests, we can still go forward with it, but the tests need to show that this behaviour works as intended.
martin/martin/src/reload.rs
Line 29 in 7071f12