Skip to content

Commit 071ba4e

Browse files
Merge pull request #998 from geo-engine/quota_log_wip
Quota and Data usage Logging
2 parents 70a47a4 + 77b8009 commit 071ba4e

File tree

73 files changed

+1809
-268
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

73 files changed

+1809
-268
lines changed

datatypes/src/primitives/datetime.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,12 @@ impl From<chrono::DateTime<chrono::Utc>> for DateTime {
322322
}
323323
}
324324

325+
impl From<DateTime> for chrono::DateTime<chrono::Utc> {
326+
fn from(datetime: DateTime) -> Self {
327+
datetime.datetime
328+
}
329+
}
330+
325331
impl From<DateTime> for chrono::DateTime<chrono::FixedOffset> {
326332
fn from(datetime: DateTime) -> Self {
327333
Self::from(&datetime)
@@ -334,12 +340,6 @@ impl From<&DateTime> for chrono::DateTime<chrono::FixedOffset> {
334340
}
335341
}
336342

337-
impl From<DateTime> for chrono::DateTime<chrono::Utc> {
338-
fn from(datetime: DateTime) -> Self {
339-
Self::from(&datetime)
340-
}
341-
}
342-
343343
impl From<&DateTime> for chrono::DateTime<chrono::Utc> {
344344
fn from(datetime: &DateTime) -> Self {
345345
datetime.datetime

operators/benches/query_chunks.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use geoengine_datatypes::{
2525
SpatialPartition2D, SpatialResolution, TimeInterval, VectorQueryRectangle,
2626
},
2727
raster::Pixel,
28-
util::{test::TestDefault, Identifier},
28+
util::test::TestDefault,
2929
};
3030
use geoengine_operators::{
3131
engine::{
@@ -34,7 +34,7 @@ use geoengine_operators::{
3434
SingleVectorMultipleRasterSources, StatisticsWrappingMockExecutionContext,
3535
TypedRasterQueryProcessor, VectorOperator, VectorQueryProcessor, WorkflowOperatorPath,
3636
},
37-
meta::quota::{ComputationContext, ComputationUnit, QuotaCheck, QuotaChecker, QuotaTracking},
37+
meta::quota::{QuotaCheck, QuotaChecker, QuotaTracking},
3838
processing::{
3939
AggregateFunctionParams, ColumnNames, FeatureAggregationMethod, NeighborhoodAggregate,
4040
NeighborhoodAggregateParams, NeighborhoodParams, RasterVectorJoin, RasterVectorJoinParams,
@@ -70,17 +70,18 @@ type BenchmarkElementCounts = HashMap<String, u64>;
7070
fn setup_contexts() -> (StatisticsWrappingMockExecutionContext, MockQueryContext) {
7171
let exe_ctx = StatisticsWrappingMockExecutionContext::test_default();
7272

73-
let computation_unit = ComputationUnit {
74-
issuer: uuid::Uuid::new_v4(),
75-
context: ComputationContext::new(),
76-
};
73+
let user = uuid::Uuid::new_v4();
74+
let workflow = uuid::Uuid::new_v4();
75+
let computation = uuid::Uuid::new_v4();
7776

7877
let query_ctx = MockQueryContext::new_with_query_extensions(
7978
ChunkByteSize::test_default(),
8079
None,
8180
Some(QuotaTracking::new(
8281
tokio::sync::mpsc::unbounded_channel().0,
83-
computation_unit,
82+
user,
83+
workflow,
84+
computation,
8485
)),
8586
Some(Box::new(MockQuotaChecker) as QuotaChecker),
8687
);

operators/src/adapters/stream_statistics_adapter.rs

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ pub struct StreamStatisticsAdapter<S> {
1616
span: Span,
1717
quota: QuotaTracking,
1818
path: WorkflowOperatorPath,
19+
operator_name: &'static str,
20+
/// If the wrapped stream is a source, pass the value of the `data` parameter to the quota tracking
21+
data: Option<String>,
1922
}
2023

2124
impl<S> StreamStatisticsAdapter<S> {
@@ -24,6 +27,8 @@ impl<S> StreamStatisticsAdapter<S> {
2427
span: Span,
2528
quota: QuotaTracking,
2629
path: WorkflowOperatorPath,
30+
operator_name: &'static str,
31+
data: Option<String>,
2732
) -> StreamStatisticsAdapter<S> {
2833
StreamStatisticsAdapter {
2934
stream,
@@ -32,6 +37,8 @@ impl<S> StreamStatisticsAdapter<S> {
3237
span,
3338
quota,
3439
path,
40+
operator_name,
41+
data,
3542
}
3643
}
3744

@@ -76,7 +83,11 @@ where
7683
empty = false,
7784
);
7885

79-
(*this.quota).work_unit_done();
86+
(*this.quota).work_unit_done(
87+
this.operator_name,
88+
this.path.clone(),
89+
this.data.clone(),
90+
);
8091
}
8192
None => {
8293
tracing::debug!(
@@ -100,9 +111,8 @@ mod tests {
100111

101112
use super::*;
102113

103-
use crate::meta::quota::{ComputationContext, ComputationUnit, QuotaMessage};
114+
use crate::meta::quota::{ComputationUnit, QuotaMessage};
104115
use futures::StreamExt;
105-
use geoengine_datatypes::util::Identifier;
106116
use tokio::sync::mpsc::unbounded_channel;
107117
use tracing::{span, Level};
108118
use uuid::Uuid;
@@ -112,14 +122,17 @@ mod tests {
112122
let v = vec![1, 2, 3];
113123
let v_stream = futures::stream::iter(v);
114124
let (tx, mut rx) = unbounded_channel::<QuotaMessage>();
115-
let issuer = Uuid::new_v4();
116-
let context = ComputationContext::new();
117-
let quota = QuotaTracking::new(tx, ComputationUnit { issuer, context });
125+
let user = Uuid::new_v4();
126+
let workflow = Uuid::new_v4();
127+
let computation = Uuid::new_v4();
128+
let quota = QuotaTracking::new(tx, user, workflow, computation);
118129
let mut v_stat_stream = StreamStatisticsAdapter::new(
119130
v_stream,
120131
span!(Level::TRACE, "test"),
121132
quota,
122133
WorkflowOperatorPath::initialize_root(),
134+
"test",
135+
None,
123136
);
124137

125138
let one = v_stat_stream.next().await;
@@ -142,7 +155,15 @@ mod tests {
142155

143156
assert_eq!(
144157
rx.recv().await.unwrap(),
145-
ComputationUnit { issuer, context }.into()
158+
ComputationUnit {
159+
user,
160+
workflow,
161+
computation,
162+
operator_name: "test",
163+
operator_path: WorkflowOperatorPath::initialize_root(),
164+
data: None,
165+
}
166+
.into()
146167
);
147168
}
148169
}

operators/src/cache/cache_operator.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::cache::shared_cache::{AsyncCache, SharedCache};
66
use crate::engine::{
77
CanonicOperatorName, ChunkByteSize, InitializedRasterOperator, InitializedVectorOperator,
88
QueryContext, QueryProcessor, RasterResultDescriptor, ResultDescriptor,
9-
TypedRasterQueryProcessor,
9+
TypedRasterQueryProcessor, WorkflowOperatorPath,
1010
};
1111
use crate::error::Error;
1212
use crate::util::Result;
@@ -90,6 +90,14 @@ impl InitializedRasterOperator for InitializedCacheOperator<Box<dyn InitializedR
9090
fn canonic_name(&self) -> CanonicOperatorName {
9191
self.source.canonic_name()
9292
}
93+
94+
fn name(&self) -> &'static str {
95+
self.source.name()
96+
}
97+
98+
fn path(&self) -> WorkflowOperatorPath {
99+
self.source.path()
100+
}
93101
}
94102

95103
impl InitializedVectorOperator for InitializedCacheOperator<Box<dyn InitializedVectorOperator>> {
@@ -137,6 +145,14 @@ impl InitializedVectorOperator for InitializedCacheOperator<Box<dyn InitializedV
137145
fn canonic_name(&self) -> CanonicOperatorName {
138146
self.source.canonic_name()
139147
}
148+
149+
fn name(&self) -> &'static str {
150+
self.source.name()
151+
}
152+
153+
fn path(&self) -> WorkflowOperatorPath {
154+
self.source.path()
155+
}
140156
}
141157

142158
/// A cache operator that caches the results of its source operator

operators/src/engine/execution_context.rs

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use super::query::QueryAbortRegistration;
22
use super::{
33
CreateSpan, InitializedPlotOperator, InitializedRasterOperator, InitializedVectorOperator,
4-
MockQueryContext, WorkflowOperatorPath,
4+
MockQueryContext,
55
};
66
use crate::engine::{
77
ChunkByteSize, RasterResultDescriptor, ResultDescriptor, VectorResultDescriptor,
@@ -40,21 +40,18 @@ pub trait ExecutionContext: Send
4040
&self,
4141
op: Box<dyn InitializedRasterOperator>,
4242
span: CreateSpan,
43-
path: WorkflowOperatorPath, // TODO: remove and allow operators to tell its path
4443
) -> Box<dyn InitializedRasterOperator>;
4544

4645
fn wrap_initialized_vector_operator(
4746
&self,
4847
op: Box<dyn InitializedVectorOperator>,
4948
span: CreateSpan,
50-
path: WorkflowOperatorPath,
5149
) -> Box<dyn InitializedVectorOperator>;
5250

5351
fn wrap_initialized_plot_operator(
5452
&self,
5553
op: Box<dyn InitializedPlotOperator>,
5654
span: CreateSpan,
57-
path: WorkflowOperatorPath,
5855
) -> Box<dyn InitializedPlotOperator>;
5956

6057
async fn resolve_named_data(&self, data: &NamedData) -> Result<DataId>;
@@ -187,7 +184,6 @@ impl ExecutionContext for MockExecutionContext {
187184
&self,
188185
op: Box<dyn InitializedRasterOperator>,
189186
_span: CreateSpan,
190-
_path: WorkflowOperatorPath,
191187
) -> Box<dyn InitializedRasterOperator> {
192188
op
193189
}
@@ -196,7 +192,6 @@ impl ExecutionContext for MockExecutionContext {
196192
&self,
197193
op: Box<dyn InitializedVectorOperator>,
198194
_span: CreateSpan,
199-
_path: WorkflowOperatorPath,
200195
) -> Box<dyn InitializedVectorOperator> {
201196
op
202197
}
@@ -205,7 +200,6 @@ impl ExecutionContext for MockExecutionContext {
205200
&self,
206201
op: Box<dyn InitializedPlotOperator>,
207202
_span: CreateSpan,
208-
_path: WorkflowOperatorPath,
209203
) -> Box<dyn InitializedPlotOperator> {
210204
op
211205
}
@@ -382,25 +376,22 @@ impl ExecutionContext for StatisticsWrappingMockExecutionContext {
382376
&self,
383377
op: Box<dyn InitializedRasterOperator>,
384378
span: CreateSpan,
385-
path: WorkflowOperatorPath,
386379
) -> Box<dyn InitializedRasterOperator> {
387-
InitializedOperatorWrapper::new(op, span, path).boxed()
380+
InitializedOperatorWrapper::new(op, span).boxed()
388381
}
389382

390383
fn wrap_initialized_vector_operator(
391384
&self,
392385
op: Box<dyn InitializedVectorOperator>,
393386
span: CreateSpan,
394-
path: WorkflowOperatorPath,
395387
) -> Box<dyn InitializedVectorOperator> {
396-
InitializedOperatorWrapper::new(op, span, path).boxed()
388+
InitializedOperatorWrapper::new(op, span).boxed()
397389
}
398390

399391
fn wrap_initialized_plot_operator(
400392
&self,
401393
op: Box<dyn InitializedPlotOperator>,
402394
_span: CreateSpan,
403-
_path: WorkflowOperatorPath,
404395
) -> Box<dyn InitializedPlotOperator> {
405396
op
406397
}

operators/src/engine/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ macro_rules! span_fn {
195195
tracing::Level::TRACE,
196196
<$op>::TYPE_NAME,
197197
path = %path,
198-
query_counter = %query_counter
198+
query_counter = %query_counter,
199199
)
200200
}
201201
};

operators/src/engine/operator.rs

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ pub trait RasterOperator:
5151
#[allow(clippy::used_underscore_items)] // TODO: maybe rename?
5252
let op = self._initialize(path.clone(), context).await?;
5353

54-
Ok(context.wrap_initialized_raster_operator(op, span, path))
54+
Ok(context.wrap_initialized_raster_operator(op, span))
5555
}
5656

5757
/// Wrap a box around a `RasterOperator`
@@ -86,7 +86,7 @@ pub trait VectorOperator:
8686
let span = self.span();
8787
debug!("Initialize {}, path: {}", self.typetag_name(), &path);
8888
let op = self._initialize(path.clone(), context).await?;
89-
Ok(context.wrap_initialized_vector_operator(op, span, path))
89+
Ok(context.wrap_initialized_vector_operator(op, span))
9090
}
9191

9292
/// Wrap a box around a `VectorOperator`
@@ -121,7 +121,7 @@ pub trait PlotOperator:
121121
debug!("Initialize {}, path: {}", self.typetag_name(), &path);
122122
#[allow(clippy::used_underscore_items)] // TODO: maybe rename?
123123
let op = self._initialize(path.clone(), context).await?;
124-
Ok(context.wrap_initialized_plot_operator(op, span, path))
124+
Ok(context.wrap_initialized_plot_operator(op, span))
125125
}
126126

127127
/// Wrap a box around a `PlotOperator`
@@ -135,6 +135,7 @@ pub trait PlotOperator:
135135
fn span(&self) -> CreateSpan;
136136
}
137137

138+
// TODO: implement a derive macro for common fields of operators: name, path, data, result_descriptor and automatically implement common trait functions
138139
pub trait InitializedRasterOperator: Send + Sync {
139140
/// Get the result descriptor of the `Operator`
140141
fn result_descriptor(&self) -> &RasterResultDescriptor;
@@ -152,6 +153,17 @@ pub trait InitializedRasterOperator: Send + Sync {
152153

153154
/// Get a canonic representation of the operator and its sources
154155
fn canonic_name(&self) -> CanonicOperatorName;
156+
157+
/// Get the unique name of the operator
158+
fn name(&self) -> &'static str;
159+
160+
// Get the path of the operator in the workflow
161+
fn path(&self) -> WorkflowOperatorPath;
162+
163+
/// Return the name of the data loaded by the operator (if any)
164+
fn data(&self) -> Option<String> {
165+
None
166+
}
155167
}
156168

157169
pub trait InitializedVectorOperator: Send + Sync {
@@ -172,6 +184,17 @@ pub trait InitializedVectorOperator: Send + Sync {
172184
/// Get a canonic representation of the operator and its sources.
173185
/// This only includes *logical* operators, not wrappers
174186
fn canonic_name(&self) -> CanonicOperatorName;
187+
188+
/// Get the unique name of the operator
189+
fn name(&self) -> &'static str;
190+
191+
// Get the path of the operator in the workflow
192+
fn path(&self) -> WorkflowOperatorPath;
193+
194+
/// Return the name of the data loaded by the operator (if any)
195+
fn data(&self) -> Option<String> {
196+
None
197+
}
175198
}
176199

177200
/// A canonic name for an operator and its sources
@@ -247,6 +270,18 @@ impl InitializedRasterOperator for Box<dyn InitializedRasterOperator> {
247270
fn canonic_name(&self) -> CanonicOperatorName {
248271
self.as_ref().canonic_name()
249272
}
273+
274+
fn name(&self) -> &'static str {
275+
self.as_ref().name()
276+
}
277+
278+
fn path(&self) -> WorkflowOperatorPath {
279+
self.as_ref().path()
280+
}
281+
282+
fn data(&self) -> Option<String> {
283+
self.as_ref().data()
284+
}
250285
}
251286

252287
impl InitializedVectorOperator for Box<dyn InitializedVectorOperator> {
@@ -261,6 +296,18 @@ impl InitializedVectorOperator for Box<dyn InitializedVectorOperator> {
261296
fn canonic_name(&self) -> CanonicOperatorName {
262297
self.as_ref().canonic_name()
263298
}
299+
300+
fn name(&self) -> &'static str {
301+
self.as_ref().name()
302+
}
303+
304+
fn path(&self) -> WorkflowOperatorPath {
305+
self.as_ref().path()
306+
}
307+
308+
fn data(&self) -> Option<String> {
309+
self.as_ref().data()
310+
}
264311
}
265312

266313
impl InitializedPlotOperator for Box<dyn InitializedPlotOperator> {

0 commit comments

Comments
 (0)