I'm building a stream processor that ingests and generates a multiplexed stream. (For those familiar with media streams, think MPEG Transport Stream or linear satellite broadcast, but it applies equally to group chat sessions.) I need to demultiplex the stream and process each individual channel separately before muxing them back together.

The problem in my case is that the pipeline structure is dynamic. Embedded streams can come and go more or less asynchronously, and the stream (ingest and egress) should be running before its configuration (or even the number of sub-streams) is known. So far, I've just done this all manually, like so:

class Dispatcher {
    private final Multi<String> incoming;
    // Per-channel views of the incoming stream, keyed by channel id.
    private final Map<Long, Multi<String>> channels = new HashMap<>();
    private final UnicastProcessor<String> outgoing = UnicastProcessor.create();

    Dispatcher(Multi<String> incoming) {
        // Broadcast so that every registered channel can subscribe to the same upstream.
        this.incoming = incoming.broadcast()
                .withCancellationAfterLastSubscriberDeparture()
                .toAtLeast(1);
        // Report messages whose channel has not been registered.
        this.incoming.subscribe().with(m -> {
            long id = getChannelId(m);
            if (!channels.containsKey(id)) {
                System.err.printf("Channel %d not registered in %s\n", id, channels.keySet());
            }
        });
    }

    void registerStream(long id, Function<String, String> handler) {
        channels.put(id, incoming.filter(m -> getChannelId(m) == id));
        // Push each processed payload into the shared outgoing processor.
        channels.get(id).subscribe().with(m -> {
            outgoing.onNext(handler.apply(getPayload(m)));
        });
    }

    Multi<String> merged() {
        return outgoing;
    }

    private static long getChannelId(String message) {...}
    private static String getPayload(String message) {...}
}
static void main() {
    Multi<String> stream = ...
    var d = new Dispatcher(stream);
    d.merged().subscribe().with(System.out::println);
    //...
    d.registerStream(1, m -> m.toUpperCase());
    // ...
    d.registerStream(2, m -> m.toLowerCase());
}

...but I was wondering if there is a more idiomatic way to approach this.
Replies: 2 comments 3 replies
I don't have any good recommendation right now. The original splitting feature was motivated by needs in Reactive Messaging (/cc @ozangunalp), and it was a sound assumption that you'd split into a finite number of streams based on a key set. You might want to check https://javadoc.io/doc/io.smallrye.reactive/mutiny/latest/io.smallrye.mutiny/io/smallrye/mutiny/groups/MultiGroup.html
Wicked! Thanks so much, that is very helpful!

record Message(long streamId, String payload) {
    static Message alterPayload(Message message) {
        return new Message(message.streamId, message.payload.toUpperCase());
    }
}
public void main() throws InterruptedException { // Thread.sleep below can throw
    Map<Long, Multi<Message>> subStreams = new ConcurrentHashMap<>();
    {
        Multi<Message> stream =
                Multi.createFrom().ticks().every(Duration.ofMillis(10)).map(n -> new Message(n % 5, "foo" + n));
        Multi<GroupedMulti<Long, Message>> groupedStreams = stream.group().by(Message::streamId);
        groupedStreams
                .subscribe()
                .with(
                        group -> {
                            long id = group.key();
                            boolean streamOfInterest = id < 2;
                            if (streamOfInterest) {
                                System.out.printf("Saving stream with id=%d for future access\n", id);
                                subStreams.put(id, group);
                            } else {
                                System.out.printf("Ignoring stream with id=%d\n", id);
                            }
                        },
                        failure -> {
                            throw new RuntimeException(failure);
                        });
    }
    // ...And then later
    Thread.sleep(100);
    UnicastProcessor<String> merged = UnicastProcessor.create();
    {
        long audio = 1;
        System.out.printf("Subscribing to audio stream id=%d\n", audio);
        subStreams
                .get(audio)
                .map(Message::alterPayload)
                .subscribe()
                .with(
                        m -> {
                            merged.onNext(String.format("[Group with id=%d] Received audio: %s", audio, m));
                        },
                        f -> {
                            throw new RuntimeException(f);
                        });
    }
    {
        long video = 0;
        System.out.printf("Subscribing to video stream id=%d\n", video);
        subStreams
                .get(video)
                .map(Message::alterPayload)
                .subscribe()
                .with(
                        m -> {
                            merged.onNext(String.format("[Group with id=%d] Received video: %s", video, m));
                        },
                        f -> {
                            throw new RuntimeException(f);
                        });
    }
    merged.subscribe().with(System.out::println);
    Thread.sleep(1000);
}

Which results in something like:
Actually, something you could do to merge back a dynamic number of streams is to use a UnicastProcessor (see https://smallrye.io/smallrye-mutiny/latest/guides/integrate-a-non-reactive-source/).

That's not perfect and possibly dirty, but... it might be it.
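
For what it's worth, the "register channels late, push everything into one merge point" idea can be sketched without any reactive library. The snippet below is only an illustration in plain Java, not Mutiny code: `DynamicMux`, `register`, `unregister` and `dispatch` are hypothetical names, and the `Consumer<String>` sink merely stands in for whatever you push into (for example a processor's `onNext`). It assumes that messages for channels without a handler should simply be dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

/** Hypothetical, library-free sketch: route messages by channel id, merge through one sink. */
public class DynamicMux {
    // Handlers can be added or removed while messages are flowing.
    private final Map<Long, UnaryOperator<String>> handlers = new ConcurrentHashMap<>();
    // Single merge point; stands in for the push-style processor discussed in the thread.
    private final Consumer<String> sink;
    private final Object emitLock = new Object();

    public DynamicMux(Consumer<String> sink) {
        this.sink = sink;
    }

    public void register(long channelId, UnaryOperator<String> handler) {
        handlers.put(channelId, handler);
    }

    public void unregister(long channelId) {
        handlers.remove(channelId);
    }

    /** Returns false when no handler is registered for the channel (the message is dropped). */
    public boolean dispatch(long channelId, String payload) {
        UnaryOperator<String> handler = handlers.get(channelId);
        if (handler == null) {
            return false;
        }
        String result = handler.apply(payload);
        // Serialize emissions so that channels processed on different threads
        // never call the sink concurrently.
        synchronized (emitLock) {
            sink.accept(result);
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        DynamicMux mux = new DynamicMux(out::add);
        mux.register(1L, String::toUpperCase);
        mux.dispatch(1L, "audio");  // handled by channel 1
        mux.dispatch(2L, "VIDEO");  // dropped: channel 2 is not registered yet
        mux.register(2L, String::toLowerCase);
        mux.dispatch(2L, "VIDEO");  // handled now that channel 2 exists
        System.out.println(out);    // prints [AUDIO, video]
    }
}
```

The lock around the sink is the part worth carrying over: whatever ends up being the merge point, emissions coming from several sub-streams probably need to be serialized before they reach it.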