Skip to content

Data tracks - incoming manager#1819

Open
1egoman wants to merge 46 commits intomainfrom
data-track-incoming-manager
Open

Data tracks - incoming manager#1819
1egoman wants to merge 46 commits intomainfrom
data-track-incoming-manager

Conversation

@1egoman
Copy link
Contributor

@1egoman 1egoman commented Feb 20, 2026

This pull request builds on the outgoing manager here and introduces a new "incoming manager" which is used for ingesting events from remote participants which are publishing data tracks.

@changeset-bot
Copy link

changeset-bot bot commented Feb 20, 2026

⚠️ No Changeset found

Latest commit: c5a8d77

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types

Click here to learn what changesets are, and how to add one.

Click here if you're a maintainer who wants to add a changeset to this PR

@github-actions
Copy link
Contributor

github-actions bot commented Feb 20, 2026

size-limit report 📦

Path Size
dist/livekit-client.esm.mjs 86.79 KB (-0.01% 🔽)
dist/livekit-client.umd.js 97.21 KB (-0.05% 🔽)

Comment on lines +286 to +291
// FIXME: this might be wrong? Shouldn't this only occur if it is the last subscription to
// terminate?
const previousDescriptorSubscription = descriptor.subscription;
descriptor.subscription = { type: 'none' };
this.subscriptionHandles.delete(previousDescriptorSubscription.subcriptionHandle);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This matches the behavior of the rust implementation but by eye I think something might be off here, I would think that this code should only run on the final subscription termination, not on any subscription termination.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ladvoc FYI, could you take a look at this and see if it looks right to you?

@1egoman 1egoman marked this pull request as ready for review February 20, 2026 20:27
@1egoman 1egoman requested review from ladvoc and lukasIO February 20, 2026 20:27
@1egoman 1egoman mentioned this pull request Feb 20, 2026
2 tasks
Leave these internal and export them later once these interfaces get
used more widely across all tracks. More context here:
#1819 (comment)
}
};

abortSignal?.addEventListener('abort', onAbort);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just for safety it would make sense to check for abortSignal.aborted here before adding the event listener

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I also fixed this in the OutgoingDataTrackManager at the same time and added tests exercising both.

Also when digging into this I found another fairly nuanced / gnarly subscription management edge case where cancelling an abortsignal on one subscription would propagate to others. I think I've addressed it though and have a test for this case as well.

Just generally, if you can I'd love a second set of eyes going through the test cases and if you can think of any other ones for subscriptions specifically (that's probably the most nuanced part of this change) feel free to suggest additional ones. I still have a bit of work to add more of these.

const combinedSignal = signal ? AbortSignal.any([signal, timeoutSignal]) : timeoutSignal;

// Wait for the subscription to complete, or time out if it takes too long
const reader = await waitForCompletionFuture(descriptor, combinedSignal);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we have any means to also react to participant's disconnection here? both the local participant disconnecting and the remote participant disconnecting should probably error out early

Copy link
Contributor Author

@1egoman 1egoman Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point - I added an event in 6cbf73e which the room can send in when a remote participant disconnects and throw a new DataTrackSubscribeError.disconnected() error, along with an associated test case for both this when a subscription is pending and active.

also cc @ladvoc because the rust implementation probably also has this problem.

* SFU. */
private createReadableStream(sid: DataTrackSid) {
let streamController: ReadableStreamDefaultController<DataTrackFrame> | null = null;
return new ReadableStream<DataTrackFrame>({
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should set an appropriate queuing strategy on the readable stream and
also potentially start to drop frames if the backpressure gets too high 🤔

Copy link
Contributor Author

@1egoman 1egoman Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea good point, I added something in 6bdd819. But FYI that other readable streams like the ones that power data streams don't have this, so it sounds like maybe there needs to be a pass done through everything to add it.

also cc @ladvoc, because this is similar to a discussion we had in regards to the rust implementation - the default channel buffer length is I think 4 and this should probably be configurable externally somehow.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's true. We should arguably add something for data streams as well. What makes it less significant - in theory - for data streams is that they are supposed to have a defined end.

Thanks for adding that queuing strategy. Should we maybe use a default that's a bit higher? 4 seems very low, but not sure either about the exact use cases.
ideally we'd want something that is a sane default that doesn't require adjustments from 90% of users

Copy link
Contributor Author

@1egoman 1egoman Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went with 4 because that is what the rust implementation had used for its internal mpsc channel which acts in a similar fashion, but it looks like that might have been updated to 16 and I missed that (link), so I made that update!

If you think it should be higher, I'm open to it, but It's a hard thing to pick a non super high default for. The optimal value is heavily dependent on the sample rate of data being received, the size of each sample, and the speed at which samples are being consumed. So either we pick a super high default and users just assume it is unlimited until they hit an edge case and realize it isn't, or we pick a lower threshold and users may run into the barrier faster but it means they have to be more strategic about what it is configured to for their specific use case. Also cc @ladvoc in case you have any thoughts.

I think my preference would be to either keep it unlimited by default, pick some fairly conservative threshold, or (ideally but most complex) to somehow expose some function which can be fed in derived metrics and which could compute an optimal high water mark value.

No matter what we choose, both implementations should be updated in lock step so they have similar behavior.

AbortSignal.any / AbortSignal.timeout have been available widely since
2024, but that isn't far enough back for some user's use cases.
…e edge cases

1. If a subscription is cancelled, only propagate that upwards to the
   sfu subscription if it is the only remaining subscription
2. If the abort signal is already aborted, then terminate immediately
…bleStreams

This lets subscribers pick their optimal tradeoff between dropping
events versus caching them all in memory.
@1egoman
Copy link
Contributor Author

1egoman commented Mar 5, 2026

Ok I think this is completely done now in my eyes, @ladvoc @lukasIO other than the queuing strategy comment / coming to a decision on the default value, is there anything else either of you would like to see done to this before merging?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants