11package dev .openfeature .contrib .providers .flagd .resolver .process .storage .connector .sync ;
22
33import static org .junit .Assert .assertNotNull ;
4+ import static org .junit .Assert .assertNull ;
45import static org .junit .Assert .assertTrue ;
56import static org .junit .jupiter .api .Assertions .assertEquals ;
67import static org .mockito .ArgumentMatchers .any ;
1213import static org .mockito .Mockito .verify ;
1314import static org .mockito .Mockito .when ;
1415
16+ import com .google .protobuf .Struct ;
1517import dev .openfeature .contrib .providers .flagd .FlagdOptions ;
1618import dev .openfeature .contrib .providers .flagd .resolver .common .ChannelConnector ;
1719import dev .openfeature .contrib .providers .flagd .resolver .common .QueueingStreamObserver ;
@@ -73,6 +75,7 @@ void onNextEnqueuesDataPayload() throws Exception {
7375 BlockingQueue <QueuePayload > streamQueue = connector .getStreamQueue ();
7476 QueuePayload payload = streamQueue .poll (1000 , TimeUnit .MILLISECONDS );
7577 assertNotNull (payload );
78+ assertNotNull (payload .getSyncContext ());
7679 assertEquals (QueuePayloadType .DATA , payload .getType ());
7780 // should NOT have restarted the stream (1 call)
7881 verify (stub , times (1 )).syncFlags (any (), any ());
@@ -94,13 +97,37 @@ void onNextEnqueuesDataPayloadMetadataDisabled() throws Exception {
9497 BlockingQueue <QueuePayload > streamQueue = connector .getStreamQueue ();
9598 QueuePayload payload = streamQueue .poll (1000 , TimeUnit .MILLISECONDS );
9699 assertNotNull (payload );
100+ assertNull (payload .getSyncContext ());
97101 assertEquals (QueuePayloadType .DATA , payload .getType ());
98102 // should NOT have restarted the stream (1 call)
99103 verify (stub , times (1 )).syncFlags (any (), any ());
100104 // should NOT have called getMetadata
101105 verify (blockingStub , times (0 )).getMetadata (any ());
102106 }
103107
108+ @ Test
109+ void onNextEnqueuesDataPayloadWithSyncContext () throws Exception {
110+ // disable GetMetadata call
111+ SyncStreamQueueSource connector = new SyncStreamQueueSource (
112+ FlagdOptions .builder ().build (), mockConnector , stub , blockingStub );
113+ latch = new CountDownLatch (1 );
114+ connector .init ();
115+ latch .await ();
116+
117+ // fire onNext (data) event
118+ Struct syncContext = Struct .newBuilder ().build ();
119+ observer .onNext (SyncFlagsResponse .newBuilder ().setSyncContext (syncContext ).build ());
120+
121+ // should enqueue data payload
122+ BlockingQueue <QueuePayload > streamQueue = connector .getStreamQueue ();
123+ QueuePayload payload = streamQueue .poll (1000 , TimeUnit .MILLISECONDS );
124+ assertNotNull (payload );
125+ assertEquals (syncContext , payload .getSyncContext ());
126+ assertEquals (QueuePayloadType .DATA , payload .getType ());
127+ // should NOT have restarted the stream (1 call)
128+ verify (stub , times (1 )).syncFlags (any (), any ());
129+ }
130+
104131 @ Test
105132 void onErrorEnqueuesDataPayload () throws Exception {
106133 SyncStreamQueueSource connector =
0 commit comments