Skip to content

Commit a8e6c86

Browse files
authored
Merge pull request #14 from typester/feature/subscription-initial-state
feat: subscription sends initial state at first
2 parents 010982e + e00eab8 commit a8e6c86

File tree

1 file changed

+236
-14
lines changed

1 file changed

+236
-14
lines changed

src/gql.rs

Lines changed: 236 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
use async_graphql::futures_util::future::ready;
2-
use async_graphql::futures_util::{Stream, StreamExt};
3-
use async_graphql::{Context, EmptyMutation, Enum, ID, Object, Schema, Subscription, Union};
4-
use std::collections::HashMap;
2+
use async_graphql::futures_util::{Stream, StreamExt, stream};
3+
use async_graphql::parser::types::{FragmentDefinition, Selection, SelectionSet};
4+
use async_graphql::{
5+
Context, EmptyMutation, Enum, ID, Name, Object, Positioned, Schema, Subscription, Union,
6+
};
7+
use std::collections::{HashMap, HashSet};
58
use std::sync::{Arc, RwLock};
69
use tokio::sync::broadcast::Sender;
710
use tokio_stream::wrappers::BroadcastStream;
@@ -236,6 +239,201 @@ impl RiverSnapshot {
236239
.find(|state| state.name.as_deref() == Some(name))
237240
.cloned()
238241
}
242+
243+
fn snapshot_events(
244+
&self,
245+
include_lists: bool,
246+
types: Option<&HashSet<RiverEventType>>,
247+
output_filter: Option<&str>,
248+
) -> Vec<RiverEvent> {
249+
let mut events = Vec::new();
250+
let type_allowed = |ty: RiverEventType| types.map_or(true, |set| set.contains(&ty));
251+
252+
for state in self.outputs.values() {
253+
let matches_output =
254+
output_filter.map_or(true, |target| state.name.as_deref() == Some(target));
255+
if !matches_output {
256+
continue;
257+
}
258+
259+
if type_allowed(RiverEventType::OutputFocusedTags) {
260+
if let Some(tags) = state.focused_tags {
261+
let tags_list = if include_lists {
262+
state.focused_tags_list.clone()
263+
} else {
264+
None
265+
};
266+
events.push(RiverEvent::OutputFocusedTags(GOutputFocusedTags {
267+
output_id: state.output_id.clone(),
268+
name: state.name.clone(),
269+
tags,
270+
tags_list,
271+
}));
272+
}
273+
}
274+
275+
if type_allowed(RiverEventType::OutputViewTags) {
276+
if let Some(tags) = &state.view_tags {
277+
events.push(RiverEvent::OutputViewTags(GOutputViewTags {
278+
output_id: state.output_id.clone(),
279+
name: state.name.clone(),
280+
tags: tags.clone(),
281+
}));
282+
}
283+
}
284+
285+
if type_allowed(RiverEventType::OutputUrgentTags) {
286+
if let Some(tags) = state.urgent_tags {
287+
let tags_list = if include_lists {
288+
state.urgent_tags_list.clone()
289+
} else {
290+
None
291+
};
292+
events.push(RiverEvent::OutputUrgentTags(GOutputUrgentTags {
293+
output_id: state.output_id.clone(),
294+
name: state.name.clone(),
295+
tags,
296+
tags_list,
297+
}));
298+
}
299+
}
300+
301+
match &state.layout_name {
302+
Some(layout) => {
303+
if type_allowed(RiverEventType::OutputLayoutName) {
304+
events.push(RiverEvent::OutputLayoutName(GOutputLayoutName {
305+
output_id: state.output_id.clone(),
306+
output_name: state.name.clone(),
307+
layout: layout.clone(),
308+
}));
309+
}
310+
}
311+
None => {
312+
if type_allowed(RiverEventType::OutputLayoutNameClear) {
313+
events.push(RiverEvent::OutputLayoutName(GOutputLayoutName {
314+
output_id: state.output_id.clone(),
315+
output_name: state.name.clone(),
316+
layout: String::new(),
317+
}));
318+
}
319+
}
320+
}
321+
}
322+
323+
if type_allowed(RiverEventType::SeatFocusedOutput) {
324+
if let Some(named) = &self.seat_focused_output {
325+
let matches_output =
326+
output_filter.map_or(true, |target| named.name.as_deref() == Some(target));
327+
if matches_output {
328+
events.push(RiverEvent::SeatFocusedOutput(GSeatFocusedOutput {
329+
output_id: named.output_id.clone(),
330+
name: named.name.clone(),
331+
}));
332+
}
333+
}
334+
}
335+
336+
if type_allowed(RiverEventType::SeatFocusedView) {
337+
if let Some(title) = &self.seat_focused_view {
338+
events.push(RiverEvent::SeatFocusedView(GSeatFocusedView {
339+
title: title.clone(),
340+
}));
341+
}
342+
}
343+
344+
if type_allowed(RiverEventType::SeatMode) {
345+
if let Some(name) = &self.seat_mode {
346+
events.push(RiverEvent::SeatMode(GSeatMode { name: name.clone() }));
347+
}
348+
}
349+
350+
events
351+
}
352+
}
353+
354+
fn collect_requested_event_types(
355+
fragments: &HashMap<Name, Positioned<FragmentDefinition>>,
356+
selection_set: &SelectionSet,
357+
out: &mut HashSet<RiverEventType>,
358+
visited: &mut HashSet<Name>,
359+
) {
360+
for selection in &selection_set.items {
361+
match &selection.node {
362+
Selection::Field(field) => {
363+
collect_requested_event_types(
364+
fragments,
365+
&field.node.selection_set.node,
366+
out,
367+
visited,
368+
);
369+
}
370+
Selection::InlineFragment(fragment) => {
371+
if let Some(condition) = &fragment.node.type_condition {
372+
for ty in event_types_for_name(condition.node.on.node.as_str()) {
373+
out.insert(ty);
374+
}
375+
}
376+
collect_requested_event_types(
377+
fragments,
378+
&fragment.node.selection_set.node,
379+
out,
380+
visited,
381+
);
382+
}
383+
Selection::FragmentSpread(spread) => {
384+
let name = spread.node.fragment_name.node.clone();
385+
if !visited.insert(name.clone()) {
386+
continue;
387+
}
388+
if let Some(fragment) = fragments.get(&name) {
389+
for ty in
390+
event_types_for_name(fragment.node.type_condition.node.on.node.as_str())
391+
{
392+
out.insert(ty);
393+
}
394+
collect_requested_event_types(
395+
fragments,
396+
&fragment.node.selection_set.node,
397+
out,
398+
visited,
399+
);
400+
}
401+
}
402+
}
403+
}
404+
}
405+
406+
fn requested_event_types(ctx: &Context<'_>) -> Option<HashSet<RiverEventType>> {
407+
let selection_set = &ctx.item.node.selection_set.node;
408+
if selection_set.items.is_empty() {
409+
return None;
410+
}
411+
let mut out = HashSet::new();
412+
let mut visited = HashSet::new();
413+
collect_requested_event_types(
414+
&ctx.query_env.fragments,
415+
selection_set,
416+
&mut out,
417+
&mut visited,
418+
);
419+
if out.is_empty() { None } else { Some(out) }
420+
}
421+
422+
fn event_types_for_name(name: &str) -> Vec<RiverEventType> {
423+
match name {
424+
"OutputFocusedTags" => vec![RiverEventType::OutputFocusedTags],
425+
"OutputViewTags" => vec![RiverEventType::OutputViewTags],
426+
"OutputUrgentTags" => vec![RiverEventType::OutputUrgentTags],
427+
"OutputLayoutName" => vec![
428+
RiverEventType::OutputLayoutName,
429+
RiverEventType::OutputLayoutNameClear,
430+
],
431+
"SeatFocusedOutput" => vec![RiverEventType::SeatFocusedOutput],
432+
"SeatUnfocusedOutput" => vec![RiverEventType::SeatUnfocusedOutput],
433+
"SeatFocusedView" => vec![RiverEventType::SeatFocusedView],
434+
"SeatMode" => vec![RiverEventType::SeatMode],
435+
_ => Vec::new(),
436+
}
239437
}
240438

241439
pub type RiverStateHandle = Arc<RwLock<RiverSnapshot>>;
@@ -624,22 +822,32 @@ impl SubscriptionRoot {
624822
let sender = ctx.data_unchecked::<Sender<river::Event>>().clone();
625823
let rx = sender.subscribe();
626824
let include_lists = tag_list.unwrap_or(false);
627-
let tset = types.map(|v| v.into_iter().collect::<std::collections::HashSet<_>>());
628-
BroadcastStream::new(rx).filter_map(move |item| {
629-
let include_lists = include_lists;
825+
let tset = types
826+
.map(|v| v.into_iter().collect::<HashSet<_>>())
827+
.or_else(|| requested_event_types(ctx));
828+
let initial_events = {
829+
let handle = ctx.data_unchecked::<RiverStateHandle>();
830+
match handle.read() {
831+
Ok(snapshot) => snapshot.snapshot_events(include_lists, tset.as_ref(), None),
832+
Err(_) => Vec::new(),
833+
}
834+
};
835+
let tset_for_updates = tset.clone();
836+
let updates = BroadcastStream::new(rx).filter_map(move |item| {
630837
let e = match item {
631838
Ok(ev) => ev,
632839
Err(_) => return ready(None),
633840
};
634-
let pass = tset
841+
let pass = tset_for_updates
635842
.as_ref()
636843
.map_or(true, |ts| ts.contains(&RiverEventType::from(&e)));
637844
if pass {
638845
ready(Some(make_river_event(e, include_lists)))
639846
} else {
640847
ready(None)
641848
}
642-
})
849+
});
850+
stream::iter(initial_events.into_iter()).chain(updates)
643851
}
644852

645853
async fn events_for_output(
@@ -651,16 +859,29 @@ impl SubscriptionRoot {
651859
) -> impl Stream<Item = RiverEvent> {
652860
let sender = ctx.data_unchecked::<Sender<river::Event>>().clone();
653861
let rx = sender.subscribe();
654-
let target_output = output_name;
655862
let include_lists = tag_list.unwrap_or(false);
656-
let tset = types.map(|v| v.into_iter().collect::<std::collections::HashSet<_>>());
657-
BroadcastStream::new(rx).filter_map(move |item| {
658-
let include_lists = include_lists;
863+
let tset = types
864+
.map(|v| v.into_iter().collect::<HashSet<_>>())
865+
.or_else(|| requested_event_types(ctx));
866+
let target_output = output_name;
867+
let initial_events = {
868+
let handle = ctx.data_unchecked::<RiverStateHandle>();
869+
match handle.read() {
870+
Ok(snapshot) => snapshot.snapshot_events(
871+
include_lists,
872+
tset.as_ref(),
873+
Some(target_output.as_str()),
874+
),
875+
Err(_) => Vec::new(),
876+
}
877+
};
878+
let tset_for_updates = tset.clone();
879+
let updates = BroadcastStream::new(rx).filter_map(move |item| {
659880
let e = match item {
660881
Ok(ev) => ev,
661882
Err(_) => return ready(None),
662883
};
663-
let type_pass = tset
884+
let type_pass = tset_for_updates
664885
.as_ref()
665886
.map_or(true, |ts| ts.contains(&RiverEventType::from(&e)));
666887
let output_pass = event_matches_output_name(&e, &target_output);
@@ -669,7 +890,8 @@ impl SubscriptionRoot {
669890
} else {
670891
ready(None)
671892
}
672-
})
893+
});
894+
stream::iter(initial_events.into_iter()).chain(updates)
673895
}
674896
}
675897

0 commit comments

Comments
 (0)