@@ -6,14 +6,14 @@ use std::sync::Arc;
66use std:: sync:: LazyLock ;
77use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
88
9+ use arc_swap:: ArcSwap ;
910use async_trait:: async_trait;
1011use derive_debug:: Dbg ;
1112use futures:: Future ;
1213use martin_tile_utils:: { Encoding , Format , TileCoord , TileData , TileInfo } ;
1314use object_store:: ObjectStore ;
1415use pmtiles:: { AsyncPmTilesReader , Compression , ObjectStoreBackend , TileType } ;
1516use tilejson:: TileJSON ;
16- use tokio:: sync:: RwLock ;
1717use tokio:: sync:: mpsc:: UnboundedSender ;
1818use tracing:: { trace, warn} ;
1919use url:: Url ;
@@ -52,10 +52,12 @@ pub type ReaderRebuilder = Arc<
5252#[ derive( Clone , Dbg ) ]
5353pub struct PmtilesSource {
5454 id : String ,
55- /// Hot-swappable reader. Wrapping in `Arc<RwLock<Arc<…>>>` lets clones share the same
56- /// rebuildable instance: a successful in-source rebuild on one clone is visible to all.
55+ /// Hot-swappable reader. `ArcSwap` lets `get_tile` load the current reader with a single
56+ /// atomic pointer load (no contention with concurrent fetches) and lets a rebuild
57+ /// install a fresh reader with one atomic store. Cloning the outer `Arc<ArcSwap<…>>`
58+ /// gives every `clone_source` a shared view of the same rebuildable cell.
5759 #[ dbg( skip) ]
58- reader : Arc < RwLock < Arc < AsyncPmTilesReader < ObjectStoreBackend , PmtCacheInstance > > > > ,
60+ reader : Arc < ArcSwap < AsyncPmTilesReader < ObjectStoreBackend , PmtCacheInstance > > > ,
5961 /// Set at construction; not refreshed on rebuild. Tile metadata changes are uncommon for
6062 /// a logical tileset, so the trade-off favours a sync `get_tilejson(&self) -> &TileJSON`.
6163 #[ dbg( skip) ]
@@ -141,7 +143,7 @@ impl PmtilesSource {
141143
142144 Ok ( Self {
143145 id,
144- reader : Arc :: new ( RwLock :: new ( Arc :: new ( reader) ) ) ,
146+ reader : Arc :: new ( ArcSwap :: from_pointee ( reader) ) ,
145147 tilejson,
146148 tile_info : format,
147149 cache_zoom,
@@ -205,23 +207,16 @@ impl PmtilesSource {
205207 self
206208 }
207209
208- /// Acquire a write lock and replace the inner reader with a fresh one. Uses
209- /// double-checked equality on the held `Arc` so concurrent `SourceModified` detections
210- /// don't all rebuild redundantly.
211- async fn rebuild_if_stale (
212- & self ,
213- previous : & Arc < AsyncPmTilesReader < ObjectStoreBackend , PmtCacheInstance > > ,
214- ) -> Result < ( ) , PmtilesError > {
210+ /// Atomically install a freshly-built reader. Concurrent `SourceModified` detections
211+ /// may each rebuild; the redundant rebuilds are rare and harmless (the last winner
212+ /// installs the up-to-date reader and the others quickly observe an unchanged
213+ /// `data_version_string`).
214+ async fn rebuild_inner ( & self ) -> Result < ( ) , PmtilesError > {
215215 let Some ( rebuilder) = & self . rebuilder else {
216216 return Err ( PmtilesError :: PmtError ( pmtiles:: PmtError :: SourceModified ) ) ;
217217 } ;
218- let mut guard = self . reader . write ( ) . await ;
219- if !Arc :: ptr_eq ( & * guard, previous) {
220- // Another concurrent caller already rebuilt; nothing to do.
221- return Ok ( ( ) ) ;
222- }
223218 let fresh = ( rebuilder) ( ) . await ?;
224- * guard = Arc :: new ( fresh) ;
219+ self . reader . store ( Arc :: new ( fresh) ) ;
225220 Ok ( ( ) )
226221 }
227222}
@@ -262,13 +257,7 @@ impl Source for PmtilesSource {
262257 ) -> MartinCoreResult < TileData > {
263258 let coord = pmtiles:: TileCoord :: new ( xyz. z , xyz. x , xyz. y ) . map_err ( PmtilesError :: PmtError ) ?;
264259
265- // Snapshot the current reader Arc out of the lock so the actual fetch happens unlocked.
266- let reader = {
267- let guard = self . reader . read ( ) . await ;
268- guard. clone ( )
269- } ;
270-
271- match reader. get_tile ( coord) . await {
260+ match self . reader . load ( ) . get_tile ( coord) . await {
272261 Ok ( Some ( t) ) => Ok ( t. to_vec ( ) ) ,
273262 Ok ( None ) => {
274263 trace ! (
@@ -282,17 +271,11 @@ impl Source for PmtilesSource {
282271 "PMTiles source {} reports SourceModified; rebuilding in place" ,
283272 self . id
284273 ) ;
285- self . rebuild_if_stale ( & reader) . await ?;
286- // Best-effort: notify any registered reloader so it can invalidate stale
287- // tile-cache entries for this source id.
274+ self . rebuild_inner ( ) . await ?;
288275 if let Some ( tx) = & self . reload_signal {
289276 let _ = tx. send ( self . id . clone ( ) ) ;
290277 }
291- let fresh = {
292- let guard = self . reader . read ( ) . await ;
293- guard. clone ( )
294- } ;
295- match fresh. get_tile ( coord) . await {
278+ match self . reader . load ( ) . get_tile ( coord) . await {
296279 Ok ( Some ( t) ) => Ok ( t. to_vec ( ) ) ,
297280 Ok ( None ) => Ok ( Vec :: new ( ) ) ,
298281 Err ( e) => Err ( PmtilesError :: PmtError ( e) . into ( ) ) ,
0 commit comments