Skip to content

Commit e8e9913

Browse files
authored
chore: group serve-store and builtin serve-sink (#2582)
Signed-off-by: Vigith Maurice <[email protected]>
1 parent b0dda04 commit e8e9913

File tree

8 files changed

+43
-43
lines changed

8 files changed

+43
-43
lines changed

rust/numaflow-core/src/lib.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,6 @@ mod tracker;
6161
/// [Map]: https://numaflow.numaproj.io/user-guide/user-defined-functions/map/map/
6262
mod mapper;
6363

64-
/// Serving store to store the result of the serving pipeline.
65-
mod serving_store;
66-
6764
/// [Watermark] _is a monotonically increasing timestamp of the oldest work/event not yet completed_
6865
///
6966
///

rust/numaflow-core/src/pipeline.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@ use crate::pipeline::forwarder::source_forwarder;
1818
use crate::pipeline::isb::jetstream::reader::JetStreamReader;
1919
use crate::pipeline::isb::jetstream::writer::JetstreamWriter;
2020
use crate::pipeline::pipeline::isb::BufferReaderConfig;
21-
use crate::serving_store::ServingStore;
22-
use crate::serving_store::nats::NatsServingStore;
23-
use crate::serving_store::user_defined::UserDefinedStore;
2421
use crate::shared::create_components;
2522
use crate::shared::metrics::start_metrics_server;
23+
use crate::sink::serve::ServingStore;
24+
use crate::sink::serve::nats::NatsServingStore;
25+
use crate::sink::serve::user_defined::UserDefinedStore;
2626
use crate::tracker::TrackerHandle;
2727
use crate::watermark::WatermarkHandle;
2828
use crate::watermark::isb::ISBWatermarkHandle;

rust/numaflow-core/src/serving_store.rs

Lines changed: 0 additions & 28 deletions
This file was deleted.

rust/numaflow-core/src/shared/create_components.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@ use crate::config::pipeline::map::{MapMode, MapType, MapVtxConfig};
1414
use crate::config::pipeline::{DEFAULT_BATCH_MAP_SOCKET, DEFAULT_STREAM_MAP_SOCKET};
1515
use crate::error::Error;
1616
use crate::mapper::map::MapHandle;
17-
use crate::serving_store::ServingStore;
1817
use crate::shared::grpc;
1918
use crate::shared::server_info::{ContainerType, sdk_server_info};
19+
use crate::sink::serve::ServingStore;
2020
use crate::sink::{SinkClientType, SinkWriter, SinkWriterBuilder};
2121
use crate::source::Source;
2222
use crate::source::generator::new_generator;

rust/numaflow-core/src/sink.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use crate::metrics::{
3131
monovertex_metrics, mvtx_forward_metric_labels, pipeline_forward_metric_labels,
3232
pipeline_metrics,
3333
};
34-
use crate::serving_store::{ServingStore, StoreEntry};
34+
use crate::sink::serve::{ServingStore, StoreEntry};
3535
use crate::tracker::TrackerHandle;
3636

3737
/// A [Blackhole] sink which reads but never writes to anywhere, semantic equivalent of `/dev/null`.
@@ -44,7 +44,9 @@ mod blackhole;
4444
/// [Log]: https://numaflow.numaproj.io/user-guide/sinks/log/
4545
mod log;
4646

47-
mod serve;
47+
/// Serving [ServingStore] to store the result of the serving pipeline. It also contains the builtin [serve::ServeSink]
48+
/// to write to the serving store.
49+
pub mod serve;
4850

4951
/// [User-Defined Sink] extends Numaflow to add custom sources supported outside the builtins.
5052
///
@@ -958,12 +960,11 @@ mod tests {
958960
use super::*;
959961
use crate::config::pipeline::NatsStoreConfig;
960962
use crate::message::{IntOffset, Message, MessageID, Offset, ReadAck};
961-
use crate::serving_store::nats::NatsServingStore;
962963
use crate::shared::grpc::create_rpc_channel;
964+
use crate::sink::serve::nats::NatsServingStore;
963965
use async_nats::jetstream;
964966
use async_nats::jetstream::kv::Config;
965967
use chrono::{TimeZone, Utc};
966-
use futures::StreamExt;
967968
use numaflow::sink;
968969
use numaflow_pb::clients::sink::{SinkRequest, SinkResponse};
969970
use std::sync::Arc;

rust/numaflow-core/src/sink/serve.rs

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,36 @@
1+
use crate::error;
2+
use crate::message::Message;
13
use crate::sink::{ResponseFromSink, ResponseStatusFromSink, Sink};
2-
use crate::{error, message::Message};
4+
use bytes::Bytes;
35

6+
/// User defined serving store to store the serving responses.
7+
pub(crate) mod user_defined;
8+
9+
/// Nats serving store to store the serving responses.
10+
pub(crate) mod nats;
11+
12+
/// Enum to represent different types of serving stores.
13+
#[derive(Clone)]
14+
pub(crate) enum ServingStore {
15+
UserDefined(user_defined::UserDefinedStore),
16+
Nats(nats::NatsServingStore),
17+
}
18+
19+
/// Entry in the serving store.
20+
#[derive(Clone, Debug)]
21+
pub(crate) struct StoreEntry {
22+
/// Pod Hash is for filtering the stream by each request originating pod while listening for
23+
/// "put" onto the KV store. This is used only on ISB KV store and this enables SSE (converse is
24+
/// that SSE is not supported on user defined store).
25+
pub(crate) pod_hash: String,
26+
/// Unique ID Of the request to which the result belongs. There could be multiple results for
27+
/// the same request.
28+
pub(crate) id: String,
29+
/// The result of the computation.
30+
pub(crate) value: Bytes,
31+
}
32+
33+
/// Builtin Sink to write to the serving store.
434
#[derive(Debug, Default, Clone)]
535
pub(crate) struct ServeSink;
636

@@ -24,9 +54,9 @@ mod tests {
2454

2555
use chrono::Utc;
2656

27-
use super::ServeSink;
2857
use crate::message::IntOffset;
2958
use crate::message::{Message, MessageID, Offset};
59+
use crate::sink::serve::ServeSink;
3060
use crate::sink::{ResponseFromSink, ResponseStatusFromSink, Sink};
3161

3262
#[tokio::test]

rust/numaflow-core/src/serving_store/nats.rs renamed to rust/numaflow-core/src/sink/serve/nats.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::config::pipeline::NatsStoreConfig;
2-
use crate::serving_store::StoreEntry;
2+
use crate::sink::serve::StoreEntry;
33
use async_nats::jetstream::Context;
44
use async_nats::jetstream::kv::Store;
55
use chrono::Utc;

rust/numaflow-core/src/serving_store/user_defined.rs renamed to rust/numaflow-core/src/sink/serve/user_defined.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ use tokio::task::JoinSet;
44
use tonic::transport::Channel;
55

66
use crate::config::pipeline::UserDefinedStoreConfig;
7-
use crate::serving_store::StoreEntry;
87
use crate::shared;
8+
use crate::sink::serve::StoreEntry;
99

1010
/// User defined serving store to store the serving responses.
1111
#[derive(Clone)]

0 commit comments

Comments
 (0)