
Commit d715cfa

sjudd authored and Convex, Inc. committed
Avoid unnecessary runtime clones in try_join helpers (#26426)
GitOrigin-RevId: d5ad8eebd2e5d63708b34a652226f92dc8d72d59
1 parent f7e13a7 commit d715cfa

File tree

9 files changed: +21 -25 lines

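Before the per-file diffs, a minimal standalone sketch of the pattern this commit applies. All names here (TestRuntime, the simplified try_join / try_join_buffered signatures, the tokio/futures/anyhow dependencies) are illustrative stand-ins and not the actual Convex types; the point is only that the join helpers now borrow the runtime handle (rt: &RT), so call sites pass &runtime instead of runtime.clone().

use std::future::Future;

use futures::{stream, StreamExt, TryStreamExt};

const JOIN_BUFFER_SIZE: usize = 16;

// Stand-in for the real runtime handle (cheaply cloneable, e.g. wrapping an Arc).
#[derive(Clone)]
struct TestRuntime;

// Per-task wrapper that borrows the runtime; the real helper would use `rt` to
// spawn/instrument the future, the sketch just awaits it.
async fn try_join<T>(
    _rt: &TestRuntime,
    _name: &'static str,
    fut: impl Future<Output = anyhow::Result<T>>,
) -> anyhow::Result<T> {
    fut.await
}

// Buffered join over an iterator of fallible futures. Taking `rt: &TestRuntime`
// (instead of by value) means callers no longer clone at every call site.
async fn try_join_buffered<T, C: Default + Extend<T>>(
    rt: &TestRuntime,
    name: &'static str,
    tasks: impl Iterator<Item = impl Future<Output = anyhow::Result<T>>>,
) -> anyhow::Result<C> {
    stream::iter(tasks.map(|task| try_join(rt, name, task)))
        .buffered(JOIN_BUFFER_SIZE)
        .try_collect()
        .await
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let rt = TestRuntime;
    // The call site borrows the runtime; no `rt.clone()` needed, even when the
    // helper is called repeatedly.
    let doubled: Vec<u32> = try_join_buffered(
        &rt,
        "demo",
        (0..4u32).map(|i| async move { Ok::<_, anyhow::Error>(i * 2) }),
    )
    .await?;
    println!("{doubled:?}");
    Ok(())
}

Any clone that is still genuinely needed (for example, to hand an owned handle to a spawned task) can live inside the helper itself rather than being repeated at every call site.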

crates/application/src/export_worker.rs

Lines changed: 3 additions & 4 deletions
@@ -313,7 +313,7 @@ impl<RT: Runtime> ExportWorker<RT> {
     }
 
     async fn upload_tables(
-        runtime: RT,
+        runtime: &RT,
         storage: Arc<dyn Storage>,
         format: ExportFormat,
         tables: BTreeMap<TabletId, (TableNumber, TableName, TableSummary)>,
@@ -398,7 +398,7 @@ impl<RT: Runtime> ExportWorker<RT> {
             },
             ExportFormat::CleanJsonl | ExportFormat::InternalJson => {
                 let mut table_uploads = Self::upload_tables(
-                    self.runtime.clone(),
+                    &self.runtime,
                     self.storage.clone(),
                     format,
                     tables.clone(),
@@ -440,8 +440,7 @@ impl<RT: Runtime> ExportWorker<RT> {
             }
         });
         let table_object_keys: BTreeMap<_, _> =
-            try_join_buffered(self.runtime.clone(), "table_uploads", complete_upload_futs)
-                .await?;
+            try_join_buffered(&self.runtime, "table_uploads", complete_upload_futs).await?;
         tracing::info!(
             "Export succeeded! {} snapshots written to storage. Format: {format:?}",
             tablet_ids.len()

crates/common/src/runtime/mod.rs

Lines changed: 5 additions & 5 deletions
@@ -106,14 +106,14 @@ pub async fn try_join_buffered<
     T: Send + 'static,
     C: Default + Send + 'static + Extend<T>,
 >(
-    rt: RT,
+    rt: &RT,
     name: &'static str,
     tasks: impl Iterator<Item = impl Future<Output = anyhow::Result<T>> + Send + 'static>
         + Send
         + 'static,
 ) -> anyhow::Result<C> {
     assert_send(
-        stream::iter(tasks.map(|task| assert_send(try_join(rt.clone(), name, assert_send(task)))))
+        stream::iter(tasks.map(|task| assert_send(try_join(rt, name, assert_send(task)))))
             .buffered(JOIN_BUFFER_SIZE)
             .try_collect(),
     )
@@ -134,22 +134,22 @@ pub async fn try_join_buffer_unordered<
     T: Send + 'static,
     C: Default + Send + 'static + Extend<T>,
 >(
-    rt: RT,
+    rt: &RT,
     name: &'static str,
     tasks: impl Iterator<Item = impl Future<Output = anyhow::Result<T>> + Send + 'static>
         + Send
         + 'static,
 ) -> anyhow::Result<C> {
     assert_send(
-        stream::iter(tasks.map(|task| try_join(rt.clone(), name, task)))
+        stream::iter(tasks.map(|task| try_join(rt, name, task)))
             .buffer_unordered(JOIN_BUFFER_SIZE)
             .try_collect(),
     )
     .await
 }
 
 pub async fn try_join<RT: Runtime, T: Send + 'static>(
-    rt: RT,
+    rt: &RT,
     name: &'static str,
     fut: impl Future<Output = anyhow::Result<T>> + Send + 'static,
 ) -> anyhow::Result<T> {

crates/database/src/index_workers/index_meta.rs

Lines changed: 2 additions & 2 deletions
@@ -122,13 +122,13 @@ pub trait SearchIndex: Clone + Debug {
         Self: Sized;
 
     async fn download_previous_segments<RT: Runtime>(
-        rt: RT,
+        rt: &RT,
         storage: Arc<dyn Storage>,
         segment: Vec<Self::Segment>,
     ) -> anyhow::Result<Self::PreviousSegments>;
 
     async fn upload_previous_segments<RT: Runtime>(
-        rt: RT,
+        rt: &RT,
         storage: Arc<dyn Storage>,
         segments: Self::PreviousSegments,
     ) -> anyhow::Result<Vec<Self::Segment>>;
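The same change applied to trait methods rather than free functions. A hedged, self-contained sketch under assumed names (SearchIndexSketch, TextIndexSketch, LocalStorage are hypothetical, not the Convex trait or its implementors); it only shows that async trait methods can take rt: &RT so neither callers nor implementors have to clone the runtime.

use std::sync::Arc;

// Stand-ins for the real Runtime and Storage bounds.
trait Runtime: Clone + Send + Sync + 'static {}

#[derive(Clone)]
struct TestRuntime;
impl Runtime for TestRuntime {}

trait Storage: Send + Sync {}
struct LocalStorage;
impl Storage for LocalStorage {}

trait SearchIndexSketch {
    type Segment;
    type PreviousSegments;

    // `rt: &RT` rather than `rt: RT`: an implementation can still clone
    // internally if a spawned task needs an owned handle, but callers only
    // pass a borrow.
    async fn download_previous_segments<RT: Runtime>(
        rt: &RT,
        storage: Arc<dyn Storage>,
        segments: Vec<Self::Segment>,
    ) -> anyhow::Result<Self::PreviousSegments>;
}

struct TextIndexSketch;

impl SearchIndexSketch for TextIndexSketch {
    type Segment = String;
    type PreviousSegments = Vec<String>;

    async fn download_previous_segments<RT: Runtime>(
        _rt: &RT,
        _storage: Arc<dyn Storage>,
        segments: Vec<String>,
    ) -> anyhow::Result<Vec<String>> {
        // A real implementation would fetch each segment from storage (e.g.
        // via a buffered-join helper); the sketch returns them unchanged.
        Ok(segments)
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let rt = TestRuntime;
    let storage: Arc<dyn Storage> = Arc::new(LocalStorage);
    // The caller borrows the runtime; no clone per call.
    let previous =
        TextIndexSketch::download_previous_segments(&rt, storage, vec!["seg-0".into()]).await?;
    println!("{previous:?}");
    Ok(())
}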

crates/database/src/index_workers/search_flusher.rs

Lines changed: 2 additions & 2 deletions
@@ -552,7 +552,7 @@ impl<RT: Runtime, T: SearchIndexConfigParser + 'static> SearchFlusher<RT, T> {
         };
 
         let mut mutable_previous_segments = T::IndexType::download_previous_segments(
-            params.runtime.clone(),
+            &params.runtime,
             params.storage.clone(),
             previous_segments,
         )
@@ -575,7 +575,7 @@ impl<RT: Runtime, T: SearchIndexConfigParser + 'static> SearchFlusher<RT, T> {
         .await?;
 
         let updated_previous_segments = T::IndexType::upload_previous_segments(
-            params.runtime.clone(),
+            &params.runtime,
             params.storage,
             mutable_previous_segments,
         )

crates/database/src/index_workers/writer.rs

Lines changed: 2 additions & 3 deletions
@@ -567,8 +567,7 @@ impl<RT: Runtime, T: SearchIndex> Inner<RT, T> {
             ),
         );
         let mut previous_segments =
-            T::download_previous_segments(runtime.clone(), storage.clone(), segments_to_update)
-                .await?;
+            T::download_previous_segments(&runtime, storage.clone(), segments_to_update).await?;
         let mut documents = database.load_documents_in_table(
             *index_name.table(),
             TimestampRange::new((Bound::Excluded(start_ts), Bound::Included(current_ts)))?,
@@ -582,6 +581,6 @@ impl<RT: Runtime, T: SearchIndex> Inner<RT, T> {
             }
         }
 
-        T::upload_previous_segments(runtime, storage, previous_segments).await
+        T::upload_previous_segments(&runtime, storage, previous_segments).await
     }
 }

crates/database/src/search_and_vector_bootstrap.rs

Lines changed: 1 addition & 2 deletions
@@ -539,8 +539,7 @@ impl<RT: Runtime> SearchAndVectorIndexBootstrapWorker<RT> {
             }
         });
         let indexes_with_fast_forward_ts =
-            try_join_buffer_unordered(self.runtime.clone(), "get_index_futs", get_index_futs)
-                .await?;
+            try_join_buffer_unordered(&self.runtime, "get_index_futs", get_index_futs).await?;
         let indexes_to_bootstrap = IndexesToBootstrap::create(
             self.persistence.upper_bound(),
             indexes_with_fast_forward_ts,

crates/database/src/text_index_worker/text_meta.rs

Lines changed: 2 additions & 2 deletions
@@ -169,7 +169,7 @@ impl SearchIndex for TextSearchIndex {
     }
 
     async fn download_previous_segments<RT: Runtime>(
-        rt: RT,
+        rt: &RT,
         storage: Arc<dyn Storage>,
         segments: Vec<Self::Segment>,
     ) -> anyhow::Result<Self::PreviousSegments> {
@@ -189,7 +189,7 @@ impl SearchIndex for TextSearchIndex {
     }
 
     async fn upload_previous_segments<RT: Runtime>(
-        rt: RT,
+        rt: &RT,
         storage: Arc<dyn Storage>,
         segments: Self::PreviousSegments,
     ) -> anyhow::Result<Vec<Self::Segment>> {

crates/database/src/vector_index_worker/vector_meta.rs

Lines changed: 2 additions & 2 deletions
@@ -181,7 +181,7 @@ impl SearchIndex for VectorSearchIndex {
     }
 
     async fn download_previous_segments<RT: Runtime>(
-        rt: RT,
+        rt: &RT,
         storage: Arc<dyn Storage>,
         segments: Vec<Self::Segment>,
     ) -> anyhow::Result<Self::PreviousSegments> {
@@ -197,7 +197,7 @@ impl SearchIndex for VectorSearchIndex {
     }
 
     async fn upload_previous_segments<RT: Runtime>(
-        rt: RT,
+        rt: &RT,
         storage: Arc<dyn Storage>,
         segments: Self::PreviousSegments,
     ) -> anyhow::Result<Vec<Self::Segment>> {

crates/search/src/incremental_index.rs

Lines changed: 2 additions & 3 deletions
@@ -472,8 +472,7 @@ async fn get_all_segment_term_metadata<RT: Runtime>(
         },
     );
     let segments_term_metadata: Vec<_> =
-        try_join_buffer_unordered(rt.clone(), "text_term_metadata", segment_term_metadata_futs)
-            .await?;
+        try_join_buffer_unordered(rt, "text_term_metadata", segment_term_metadata_futs).await?;
     Ok(segments_term_metadata)
 }
 
@@ -544,7 +543,7 @@ pub async fn fetch_compact_and_upload_text_segment<RT: Runtime>(
 ) -> anyhow::Result<FragmentedTextSegment> {
     let _storage = storage.clone();
     let opened_segments = try_join_buffer_unordered(
-        rt.clone(),
+        rt,
         "text_segment_merge",
         segments.into_iter().map(move |segment| {
             let pool = blocking_thread_pool.clone();

0 commit comments
