diff --git a/packages/feeds-client/__integration-tests__/deferred-own-capabilities-hydration.test.ts b/packages/feeds-client/__integration-tests__/deferred-own-capabilities-hydration.test.ts deleted file mode 100644 index c65f7c1c..00000000 --- a/packages/feeds-client/__integration-tests__/deferred-own-capabilities-hydration.test.ts +++ /dev/null @@ -1,336 +0,0 @@ -import { - describe, - expect, - it, - vi, - afterEach, - beforeAll, - afterAll, -} from 'vitest'; -import type { UserRequest } from '../src/gen/models'; -import { - createTestClient, - createTestTokenGenerator, - getServerClient, - getTestUser, - waitForEvent, -} from './utils'; -import type { FeedsClient } from '../src/feeds-client'; -import type { Feed } from '../src/feed'; -import type { - ActivityResponse, - StreamClient, - StreamFeed, -} from '@stream-io/node-sdk'; - -describe('Deferred own_capabilities hydration', () => { - const feedGroup = 'timeline'; - const feedId = crypto.randomUUID(); - let clientRef: FeedsClient; - let serverClient: StreamClient; - let ownUser: UserRequest = getTestUser(); - let otherUsers: UserRequest[] = []; - let otherUsersWithExistingActivities: UserRequest[] = []; - let ownFeedRef: Feed; - const otherFeeds: StreamFeed[] = []; - const otherFeedsWithExistingActivities: StreamFeed[] = []; - const initialActivities: ActivityResponse[] = []; - - beforeAll(async () => { - ownUser = getTestUser(); - otherUsers = [getTestUser(), getTestUser(), getTestUser()]; - otherUsersWithExistingActivities = [ - getTestUser(), - getTestUser(), - getTestUser(), - ]; - clientRef = createTestClient(); - serverClient = getServerClient(); - await clientRef.connectUser(ownUser, createTestTokenGenerator(ownUser)); - await serverClient.upsertUsers([ - ...otherUsers, - ...otherUsersWithExistingActivities, - ]); - ownFeedRef = clientRef.feed(feedGroup, feedId); - await ownFeedRef.getOrCreate({ - watch: false, - member_pagination: { limit: 25 }, - limit: 25, - }); - const ownActivityResponse = await serverClient.feeds.addActivity({ - user_id: ownUser.id, - type: 'post', - feeds: [ownFeedRef.feed], - text: `Initial activity from ${ownFeedRef.feed}`, - }); - initialActivities.push(ownActivityResponse.activity); - for (let i = 0; i < otherUsers.length; i++) { - const otherUser = otherUsers[i]; - const otherFeed = serverClient.feeds.feed('user', otherUser.id); - await otherFeed.getOrCreate({ watch: false, user_id: otherUser.id }); - await ownFeedRef.follow(otherFeed.feed); - otherFeeds.push(otherFeed); - } - - for (let i = 0; i < otherUsersWithExistingActivities.length; i++) { - const otherUser = otherUsersWithExistingActivities[i]; - const otherFeed = serverClient.feeds.feed('user', otherUser.id); - await otherFeed.getOrCreate({ watch: false, user_id: otherUser.id }); - await ownFeedRef.follow(otherFeed.feed); - otherFeedsWithExistingActivities.push(otherFeed); - const activityResponse = await serverClient.feeds.addActivity({ - user_id: otherUser.id, - type: 'post', - feeds: [otherFeed.feed], - text: `Initial activity from ${otherFeed.feed}`, - }); - initialActivities.push(activityResponse.activity); - } - }); - - it('should properly populate capabilities on getOrCreate', async () => { - const client = createTestClient(); - await client.connectUser(ownUser, createTestTokenGenerator(ownUser)); - const ownFeed = client.feed(feedGroup, feedId); - - const initialCapabilities = - client.state.getLatestValue().own_capabilities_by_fid; - expect(Object.keys(initialCapabilities).length).toBe(0); - - await ownFeed.getOrCreate({ watch: false }); - - // should populate from activities after getOrCreate - const newCapabilities = - client.state.getLatestValue().own_capabilities_by_fid; - expect(Object.keys(newCapabilities).length).toBe(4); - expect(newCapabilities[ownFeed.feed]).toBeDefined(); - for (let i = 0; i < otherFeedsWithExistingActivities.length; i++) { - const otherFeed = otherFeedsWithExistingActivities[i]; - expect(newCapabilities[otherFeed.feed]).toBeDefined(); - } - }); - - it('should properly populate capabilities on queryFeeds', async () => { - const client = createTestClient(); - await client.connectUser(ownUser, createTestTokenGenerator(ownUser)); - const ownFeed = client.feed(feedGroup, feedId); - - const initialCapabilities = - client.state.getLatestValue().own_capabilities_by_fid; - expect(Object.keys(initialCapabilities).length).toBe(0); - - const feedsToQuery = [ownFeed.feed, otherFeeds[0].feed, otherFeeds[1].feed]; - - await client.queryFeeds({ filter: { feed: { $in: feedsToQuery } } }); - - const newCapabilities = - client.state.getLatestValue().own_capabilities_by_fid; - expect(Object.keys(newCapabilities).length).toBe(3); - for (const feed of feedsToQuery) { - expect(newCapabilities[feed]).toBeDefined(); - } - }); - - it(`should not populate capabilities on queryActivities - backend doesn't return it`, async () => { - const throttleTimeout = 1000; - const client = createTestClient({ - query_batch_own_capabilties_throttling_interval: throttleTimeout, - }); - await client.connectUser(ownUser, createTestTokenGenerator(ownUser)); - const getCapabilitiesSpy = vi.spyOn( - client as any, - 'throttledGetBatchOwnCapabilities', - ); - - const initialCapabilities = - client.state.getLatestValue().own_capabilities_by_fid; - expect(Object.keys(initialCapabilities).length).toBe(0); - - await client.queryActivities({ - filter: { id: { $in: initialActivities.map((a) => a.id) } }, - }); - - await new Promise((resolve) => setTimeout(resolve, throttleTimeout)); - - expect(getCapabilitiesSpy).not.toHaveBeenCalled(); - }); - - it('should not add extra capabilities in the cache if they already exist', async () => { - const client = createTestClient(); - await client.connectUser(ownUser, createTestTokenGenerator(ownUser)); - const getCapabilitiesSpy = vi.spyOn( - client as any, - 'throttledGetBatchOwnCapabilities', - ); - const ownFeed = client.feed(feedGroup, feedId); - - await ownFeed.getOrCreate({ - watch: true, - member_pagination: { limit: 25 }, - limit: 25, - }); - - const initialCapabilities = - client.state.getLatestValue().own_capabilities_by_fid; - - await ownFeed.addActivity({ - type: 'post', - text: `Another activity from ${ownFeed.feed}`, - }); - - await waitForEvent(ownFeed, 'feeds.activity.added', { timeoutMs: 1000 }); - - const newCapabilities = - client.state.getLatestValue().own_capabilities_by_fid; - expect(initialCapabilities).toBe(newCapabilities); - expect(getCapabilitiesSpy).not.toHaveBeenCalled(); - }); - - it('should hydrate with extra capabilities if they do not exist in the cache', async () => { - const client = createTestClient(); - await client.connectUser(ownUser, createTestTokenGenerator(ownUser)); - const getCapabilitiesSpy = vi.spyOn( - client as any, - 'throttledGetBatchOwnCapabilities', - ); - const ownFeed = client.feed(feedGroup, feedId); - - await ownFeed.getOrCreate({ - watch: true, - member_pagination: { limit: 25 }, - limit: 25, - }); - - const initialCapabilities = - client.state.getLatestValue().own_capabilities_by_fid; - - for (const key of Object.keys(initialCapabilities)) { - if ( - ![ - ownFeed.feed, - ...otherFeedsWithExistingActivities.map((f) => f.feed), - ].includes(key) - ) { - delete initialCapabilities[key]; - } - } - - client.state.partialNext({ own_capabilities_by_fid: initialCapabilities }); - - const otherFeed = otherFeeds[0]; - const otherUser = otherUsers[0]; - - await serverClient.feeds.addActivity({ - user_id: otherUser.id, - type: 'post', - feeds: [otherFeed.feed], - text: `Initial activity from ${otherFeed.feed}`, - }); - - await waitForEvent(ownFeed, 'feeds.activity.added', { timeoutMs: 1000 }); - - await vi.waitFor( - () => { - const finalCapabilities = - client.state.getLatestValue().own_capabilities_by_fid; - expect(Object.keys(finalCapabilities).length).toBe( - Object.keys(initialCapabilities).length + 1, - ); - expect(finalCapabilities[otherFeed.feed]).toBeDefined(); - }, - { timeout: 1000, interval: 50 }, - ); - - expect(getCapabilitiesSpy).toHaveBeenCalledExactlyOnceWith( - [otherFeed.feed], - expect.any(Function), - ); - }); - - it('should throttle new capabilities hydration', async () => { - const client = createTestClient(); - const getCapabilitiesSpy = vi.spyOn(client as any, 'ownBatch'); - await client.connectUser(ownUser, createTestTokenGenerator(ownUser)); - const ownFeed = client.feed(feedGroup, feedId); - - await ownFeed.getOrCreate({ - watch: true, - member_pagination: { limit: 25 }, - limit: 25, - }); - - const initialCapabilities = - client.state.getLatestValue().own_capabilities_by_fid; - - for (const key of Object.keys(initialCapabilities)) { - if ( - ![ - ownFeed.feed, - ...otherFeedsWithExistingActivities.map((f) => f.feed), - ].includes(key) - ) { - delete initialCapabilities[key]; - } - } - - client.state.partialNext({ own_capabilities_by_fid: initialCapabilities }); - - for (let i = 0; i < otherUsers.length; i++) { - const otherFeed = otherFeeds[i]; - const otherUser = otherUsers[i]; - await serverClient.feeds.addActivity({ - user_id: otherUser.id, - type: 'post', - feeds: [otherFeed.feed], - text: `Initial activity from ${otherFeed.feed}`, - }); - } - - await vi.waitFor( - () => { - const finalCapabilities = - client.state.getLatestValue().own_capabilities_by_fid; - expect(Object.keys(finalCapabilities).length).toBe( - Object.keys(initialCapabilities).length + 3, - ); - for (const otherFeed of otherFeeds) { - expect(finalCapabilities[otherFeed.feed]).toBeDefined(); - } - }, - // always leave enough of a timeout for the fetch to fire 3 times; - // it should of course fire only 2 and be done at most in 2000 + some - // delta ms, but just in case this behaviour gets broken - { timeout: 6050, interval: 50 }, - ); - - expect(getCapabilitiesSpy).toHaveBeenCalledTimes(2); - expect(getCapabilitiesSpy.mock.calls[0][0]).toStrictEqual({ - feeds: [otherFeeds[0].feed], - }); - expect(getCapabilitiesSpy.mock.calls[1][0]).toStrictEqual({ - feeds: [otherFeeds[1].feed, otherFeeds[2].feed], - }); - }); - - afterAll(async () => { - await ownFeedRef.delete({ hard_delete: true }); - for (let i = 0; i < otherFeeds.length; i++) { - const otherFeed = otherFeeds[i]; - await otherFeed.delete({ hard_delete: true }); - } - for (let i = 0; i < otherFeedsWithExistingActivities.length; i++) { - const otherFeed = otherFeedsWithExistingActivities[i]; - await otherFeed.delete({ hard_delete: true }); - } - await serverClient.deleteUsers({ - user_ids: [...otherUsers, ...otherUsersWithExistingActivities].map( - (u) => u.id, - ), - }); - await clientRef.disconnectUser(); - }); - - afterEach(() => { - vi.resetAllMocks(); - }); -}); diff --git a/packages/feeds-client/__integration-tests__/deferred-own-fields-hydration.test.ts b/packages/feeds-client/__integration-tests__/deferred-own-fields-hydration.test.ts new file mode 100644 index 00000000..3b667ff9 --- /dev/null +++ b/packages/feeds-client/__integration-tests__/deferred-own-fields-hydration.test.ts @@ -0,0 +1,135 @@ +import { + describe, + expect, + it, + vi, + afterEach, + beforeAll, + afterAll, +} from 'vitest'; +import type { UserRequest } from '../src/gen/models'; +import { + createTestClient, + createTestTokenGenerator, + getServerClient, + getTestUser, + waitForEvent, +} from './utils'; +import type { FeedsClient } from '../src/feeds-client'; +import type { Feed } from '../src/feed'; +import type { StreamClient } from '@stream-io/node-sdk'; + +describe('Deferred own_ fields hydration', () => { + let clientRef: FeedsClient; + let serverClient: StreamClient; + let ownUser: UserRequest = getTestUser(); + let otherUsers: UserRequest[] = []; + let ownTimeline: Feed; + let ownFeed: Feed; + + beforeAll(async () => { + ownUser = getTestUser(); + otherUsers = [getTestUser()]; + clientRef = createTestClient(); + serverClient = getServerClient(); + await clientRef.connectUser(ownUser, createTestTokenGenerator(ownUser)); + await serverClient.upsertUsers([...otherUsers]); + ownTimeline = clientRef.feed('timeline', ownUser.id); + ownFeed = clientRef.feed('user', ownUser.id); + await ownFeed.getOrCreate(); + await ownTimeline.getOrCreate({ + watch: false, + limit: 25, + }); + for (let i = 0; i < otherUsers.length; i++) { + const otherUser = otherUsers[i]; + const otherFeed = serverClient.feeds.feed('user', otherUser.id); + await otherFeed.getOrCreate({ user_id: otherUser.id }); + const otherTimeline = serverClient.feeds.feed('timeline', otherUser.id); + await otherTimeline.getOrCreate({ user_id: otherUser.id }); + await ownTimeline.follow(otherFeed.feed); + // Ensures data in own_followings + await serverClient.feeds.follow({ + source: otherTimeline.feed, + target: ownFeed.feed, + }); + // Ensures data in own_membership + await serverClient.feeds.updateFeedMembers({ + feed_id: otherUser.id, + feed_group_id: 'user', + members: [{ user_id: ownUser.id, role: 'feed_member' }], + operation: 'upsert', + }); + } + }); + + it('should hydrate with own_ fields if they do not exist in the cache', async () => { + const client = createTestClient(); + await client.connectUser(ownUser, createTestTokenGenerator(ownUser)); + const ownBatchThrottledSpy = vi.spyOn( + client as any, + 'throttledGetBatchOwnFields', + ); + + const timeline = client.feed('timeline', ownUser.id); + await timeline.getOrCreate({ + watch: true, + limit: 25, + }); + + const otherUser = otherUsers[0]; + + serverClient.feeds.addActivity({ + user_id: otherUser.id, + type: 'post', + feeds: [`user:${otherUser.id}`], + text: `Initial activity from ${otherUser.id}`, + }); + + await waitForEvent(timeline, 'feeds.activity.added', { + timeoutMs: 10000, + shouldReject: true, + }); + + await vi.waitFor( + () => { + const feed = client.feed('user', otherUser.id); + expect(feed.currentState.own_capabilities).toBeDefined(); + expect(feed.currentState.own_follows).toBeDefined(); + expect(feed.currentState.own_followings).toBeDefined(); + expect(feed.currentState.own_membership).toBeDefined(); + }, + { timeout: 1000, interval: 50 }, + ); + + expect(ownBatchThrottledSpy).toHaveBeenCalledExactlyOnceWith( + [`user:${otherUser.id}`], + expect.any(Function), + ); + }); + + afterAll(async () => { + await ownTimeline.delete({ hard_delete: true }); + await ownFeed.delete({ hard_delete: true }); + for (let i = 0; i < otherUsers.length; i++) { + await serverClient.feeds.deleteFeed({ + feed_group_id: 'user', + feed_id: otherUsers[i].id, + hard_delete: true, + }); + await serverClient.feeds.deleteFeed({ + feed_group_id: 'timeline', + feed_id: otherUsers[i].id, + hard_delete: true, + }); + } + await serverClient.deleteUsers({ + user_ids: [...otherUsers].map((u) => u.id), + }); + await clientRef.disconnectUser(); + }); + + afterEach(() => { + vi.resetAllMocks(); + }); +}); diff --git a/packages/feeds-client/__integration-tests__/docs-snippets/feed-capabilities.test.ts b/packages/feeds-client/__integration-tests__/docs-snippets/feed-capabilities.test.ts index 932b753c..56e17f8d 100644 --- a/packages/feeds-client/__integration-tests__/docs-snippets/feed-capabilities.test.ts +++ b/packages/feeds-client/__integration-tests__/docs-snippets/feed-capabilities.test.ts @@ -32,12 +32,16 @@ describe('Feeds capabilities page', () => { const activity = feed.state.getLatestValue().activities?.[0]!; + const activityFeed = client.feed( + activity.current_feed!.group_id, + activity.current_feed!.id, + ); + // Make sure to subscribe to changes, it's not guaranteed that own capabilities are ready by the time an activity is being displayed // Usually you do this in a lifecycle method that's called when an activity component is being created - const unsubscribe = client.state.subscribeWithSelector( + const unsubscribe = activityFeed.state.subscribeWithSelector( (state) => ({ - ownCapabilities: - state.own_capabilities_by_fid[activity.current_feed?.feed ?? ''], + ownCapabilities: state.own_capabilities, }), (state) => { console.log(state.ownCapabilities); diff --git a/packages/feeds-client/__integration-tests__/websocket-connection.test.ts b/packages/feeds-client/__integration-tests__/websocket-connection.test.ts index 95fc62cc..d04cbdb1 100644 --- a/packages/feeds-client/__integration-tests__/websocket-connection.test.ts +++ b/packages/feeds-client/__integration-tests__/websocket-connection.test.ts @@ -24,7 +24,6 @@ describe('WebSocket connection', () => { { connected_user: undefined, is_ws_connection_healthy: false, - own_capabilities_by_fid: {}, }, undefined, ); diff --git a/packages/feeds-client/src/bindings/react/hooks/feed-state-hooks/useOwnCapabilities.ts b/packages/feeds-client/src/bindings/react/hooks/feed-state-hooks/useOwnCapabilities.ts index 6dbc9704..ff6bf1fd 100644 --- a/packages/feeds-client/src/bindings/react/hooks/feed-state-hooks/useOwnCapabilities.ts +++ b/packages/feeds-client/src/bindings/react/hooks/feed-state-hooks/useOwnCapabilities.ts @@ -1,38 +1,29 @@ -import { useCallback } from 'react'; import { useStateStore } from '@stream-io/state-store/react-bindings'; import { useFeedContext } from '../../contexts/StreamFeedContext'; -import { useFeedsClient } from '../../contexts/StreamFeedsContext'; -import type { Feed } from '../../../../feed'; +import type { Feed, FeedState } from '../../../../feed'; import type { FeedOwnCapability } from '../../../../gen/models'; -import type { FeedsClientState } from '../../../../feeds-client'; +import { useFeedsClient } from '../../contexts/StreamFeedsContext'; const stableEmptyArray: readonly FeedOwnCapability[] = []; +const selector = (currentState: FeedState) => { + return { + feedOwnCapabilities: currentState.own_capabilities ?? stableEmptyArray, + }; +}; + export const useOwnCapabilities = (feedFromProps?: Feed | string) => { const client = useFeedsClient(); const feedFromContext = useFeedContext(); - const feed = feedFromProps ?? feedFromContext; - const fid = typeof feed === 'string' ? feed : feed?.feed; - - const selector = useCallback( - (currentState: FeedsClientState) => { - if (!fid) { - return { feedOwnCapabilities: stableEmptyArray }; - } - - return { - feedOwnCapabilities: - currentState.own_capabilities_by_fid[fid] ?? stableEmptyArray, - }; - }, - [fid], - ); + let feed = feedFromProps ?? feedFromContext; + if (typeof feed === 'string') { + const [groupId, id] = feed.split(':'); + feed = groupId && id ? client?.feed(groupId, id) : undefined; + } const { feedOwnCapabilities = stableEmptyArray } = - useStateStore(client?.state, selector) ?? {}; - - // console.log('GETTING CAPA: ', feed?.feed, feedOwnCapabilities); + useStateStore(feed?.state, selector) ?? {}; return feedOwnCapabilities; }; diff --git a/packages/feeds-client/src/common/types.ts b/packages/feeds-client/src/common/types.ts index b3673754..24349ffa 100644 --- a/packages/feeds-client/src/common/types.ts +++ b/packages/feeds-client/src/common/types.ts @@ -6,7 +6,7 @@ export type FeedsClientOptions = { base_url?: string; timeout?: number; configure_loggers_options?: ConfigureLoggersOptions; - query_batch_own_capabilties_throttling_interval?: number; + query_batch_own_fields_throttling_interval?: number; }; export type RateLimit = { diff --git a/packages/feeds-client/src/feed/event-handlers/activity/handle-activity-added.ts b/packages/feeds-client/src/feed/event-handlers/activity/handle-activity-added.ts index b736c74b..231ca291 100644 --- a/packages/feeds-client/src/feed/event-handlers/activity/handle-activity-added.ts +++ b/packages/feeds-client/src/feed/event-handlers/activity/handle-activity-added.ts @@ -62,12 +62,6 @@ export function handleActivityAdded( const activity = event.activity; this.client.hydratePollCache([activity]); - const currentFeed = activity.current_feed; - - if (currentFeed) { - this.client.hydrateCapabilitiesCache([currentFeed]); - } - this.state.partialNext({ activities: result.activities }); } } diff --git a/packages/feeds-client/src/feed/event-handlers/activity/handle-activity-updated.ts b/packages/feeds-client/src/feed/event-handlers/activity/handle-activity-updated.ts index 113a9afc..fe4c53a6 100644 --- a/packages/feeds-client/src/feed/event-handlers/activity/handle-activity-updated.ts +++ b/packages/feeds-client/src/feed/event-handlers/activity/handle-activity-updated.ts @@ -92,10 +92,6 @@ export function handleActivityUpdated( if (result1?.changed || result2.changed) { this.client.hydratePollCache([payload.activity]); - if (payload.activity.current_feed) { - this.client.hydrateCapabilitiesCache([payload.activity.current_feed]); - } - this.state.partialNext({ activities: result1?.changed ? result1.entities : currentActivities, pinned_activities: result2.entities, diff --git a/packages/feeds-client/src/feed/feed.test.ts b/packages/feeds-client/src/feed/feed.test.ts index 6edad79d..ec92ef4b 100644 --- a/packages/feeds-client/src/feed/feed.test.ts +++ b/packages/feeds-client/src/feed/feed.test.ts @@ -1,9 +1,10 @@ -import { beforeEach, describe, expect, it, vi } from 'vitest'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { type Mock } from 'vitest'; import { FeedsClient } from '../feeds-client'; import { Feed } from './feed'; import type { ActivityResponse } from '../gen/models'; import { generateActivityResponse, generateFeedResponse } from '../test-utils'; +import { clearQueuedFeeds } from '../utils/throttling'; describe('Feed derived state updates', () => { let feed: Feed; @@ -105,7 +106,6 @@ describe(`getOrCreate`, () => { limit: 10, feed: generateFeedResponse({}), }), - hydrateCapabilitiesCache: vi.fn(), hydratePollCache: vi.fn(), } as unknown as { [key in keyof FeedsClient]: Mock }; const feedResponse = generateFeedResponse({ id: 'main', group_id: 'user' }); @@ -214,14 +214,27 @@ describe(`getOrCreate`, () => { describe(`newActivitiesAdded`, () => { let feed: Feed; - let client: Record; + let client: Record< + keyof FeedsClient | 'getOrCreateActiveFeed' | 'throttledGetBatchOwnFields', + Mock + >; beforeEach(() => { client = { getOrCreateActiveFeed: vi.fn(), - hydrateCapabilitiesCache: vi.fn(), hydratePollCache: vi.fn(), - } as unknown as Record; + throttledGetBatchOwnFields: vi.fn(), + feed: vi.fn().mockReturnValue({ + currentState: { + own_capabilities: undefined, + }, + }), + } as unknown as Record< + | keyof FeedsClient + | 'getOrCreateActiveFeed' + | 'throttledGetBatchOwnFields', + Mock + >; const feedResponse = generateFeedResponse({ id: 'user-123', group_id: 'user', @@ -284,17 +297,17 @@ describe(`newActivitiesAdded`, () => { group: feed1.group_id, id: feed1.id, data: feed1, - fromWebSocket: false, + fieldsToUpdate: ['own_capabilities', 'own_follows', 'own_membership'], }); expect(client['getOrCreateActiveFeed']).toHaveBeenCalledWith({ group: feed2.group_id, id: feed2.id, data: feed2, - fromWebSocket: false, + fieldsToUpdate: ['own_capabilities', 'own_follows', 'own_membership'], }); }); - it(`should set fromWebSocket flag to true if activities are added from a WebSocket event`, () => { + it(`should pass empty fieldsToUpdate array when fromWebSocket is true`, () => { const currentFeed = generateFeedResponse({ group_id: 'user', id: '123', @@ -309,7 +322,139 @@ describe(`newActivitiesAdded`, () => { group: currentFeed.group_id, id: currentFeed.id, data: currentFeed, - fromWebSocket: true, + fieldsToUpdate: [], + }); + }); + + it(`should fetch own_ fields if own_capabilities is undefined`, () => { + const feed1 = generateFeedResponse({ + group_id: 'user', + id: '123', + feed: 'user:123', + }); + const activity1 = generateActivityResponse({ current_feed: feed1 }); + + feed['newActivitiesAdded']([activity1]); + + // Don't call when not from WebSocket + expect(client['throttledGetBatchOwnFields']).toHaveBeenCalledTimes(0); + + const feed2 = generateFeedResponse({ + group_id: 'user', + id: '789', + feed: 'user:789', + }); + const activity2 = generateActivityResponse({ current_feed: feed2 }); + feed['newActivitiesAdded']([activity2], { fromWebSocket: true }); + + // Call when feed not seen + expect(client['throttledGetBatchOwnFields']).toHaveBeenCalledTimes(1); + const lastCall = client['throttledGetBatchOwnFields'].mock.lastCall; + expect(lastCall?.[0]).toEqual([feed2.feed]); + }); + + it('should include own_followings in fieldsToUpdate when enrich_own_followings is true and not from WebSocket', () => { + feed.state.partialNext({ + last_get_or_create_request_config: { + enrichment_options: { + enrich_own_followings: true, + }, + }, + }); + + const currentFeed = generateFeedResponse({ + group_id: 'user', + id: '123', + feed: 'user:123', + }); + feed['newActivitiesAdded']( + [generateActivityResponse({ current_feed: currentFeed })], + { fromWebSocket: false }, + ); + + expect(client['getOrCreateActiveFeed']).toHaveBeenCalledWith({ + group: currentFeed.group_id, + id: currentFeed.id, + data: currentFeed, + fieldsToUpdate: [ + 'own_capabilities', + 'own_follows', + 'own_membership', + 'own_followings', + ], + }); + }); + + it('should not include own_followings in fieldsToUpdate when enrich_own_followings is false and not from WebSocket', () => { + feed.state.partialNext({ + last_get_or_create_request_config: { + enrichment_options: { + enrich_own_followings: false, + }, + }, + }); + + const currentFeed = generateFeedResponse({ + group_id: 'user', + id: '123', + feed: 'user:123', + }); + feed['newActivitiesAdded']( + [generateActivityResponse({ current_feed: currentFeed })], + { fromWebSocket: false }, + ); + + expect(client['getOrCreateActiveFeed']).toHaveBeenCalledWith({ + group: currentFeed.group_id, + id: currentFeed.id, + data: currentFeed, + fieldsToUpdate: ['own_capabilities', 'own_follows', 'own_membership'], + }); + }); + + it('should not include own_followings in fieldsToUpdate when enrich_own_followings is undefined and not from WebSocket', () => { + feed.state.partialNext({ + last_get_or_create_request_config: { + enrichment_options: {}, + }, }); + + const currentFeed = generateFeedResponse({ + group_id: 'user', + id: '123', + feed: 'user:123', + }); + feed['newActivitiesAdded']( + [generateActivityResponse({ current_feed: currentFeed })], + { fromWebSocket: false }, + ); + + expect(client['getOrCreateActiveFeed']).toHaveBeenCalledWith({ + group: currentFeed.group_id, + id: currentFeed.id, + data: currentFeed, + fieldsToUpdate: ['own_capabilities', 'own_follows', 'own_membership'], + }); + }); + + it('should always include own_capabilities, own_follows, own_membership when not from WebSocket', () => { + const currentFeed = generateFeedResponse({ + group_id: 'user', + id: '123', + feed: 'user:123', + }); + feed['newActivitiesAdded']( + [generateActivityResponse({ current_feed: currentFeed })], + { fromWebSocket: false }, + ); + + const call = client['getOrCreateActiveFeed'].mock.calls[0][0]; + expect(call.fieldsToUpdate).toContain('own_capabilities'); + expect(call.fieldsToUpdate).toContain('own_follows'); + expect(call.fieldsToUpdate).toContain('own_membership'); + }); + + afterEach(() => { + clearQueuedFeeds(); }); }); diff --git a/packages/feeds-client/src/feed/feed.ts b/packages/feeds-client/src/feed/feed.ts index f7b5d2fd..13c7af8c 100644 --- a/packages/feeds-client/src/feed/feed.ts +++ b/packages/feeds-client/src/feed/feed.ts @@ -14,6 +14,7 @@ import type { FollowRequest, QueryCommentsRequest, ActivityAddedEvent, + EnrichmentOptions, } from '../gen/models'; import type { StreamResponse } from '../gen-imports'; import { StateStore } from '@stream-io/state-store'; @@ -63,16 +64,16 @@ import { checkHasAnotherPage, Constants, feedsLoggerSystem, - ownFeedFields, uniqueArrayMerge, } from '../utils'; import { handleActivityFeedback } from './event-handlers/activity/handle-activity-feedback'; import { deepEqual } from '../utils/deep-equal'; import { getOrCreateActiveFeed } from '../feeds-client/get-or-create-active-feed'; +import { queueBatchedOwnFields } from '../utils/throttling'; export type FeedState = Omit< Partial, - 'feed' | 'own_capabilities' | 'duration' + 'feed' | 'duration' > & { /** * True when loading state using `getOrCreate` @@ -333,11 +334,6 @@ export class Feed extends FeedApi { } } - this.client.hydrateCapabilitiesCache([ - response.feed, - ...currentActivityFeeds, - ]); - if (request?.next) { const { activities: currentActivities = [] } = this.currentState; @@ -919,10 +915,7 @@ export class Feed extends FeedApi { ...request, feeds: [this.feed], }); - const currentFeed = response.activity.current_feed; - if (currentFeed) { - this.client.hydrateCapabilitiesCache([currentFeed]); - } + return response; } @@ -934,34 +927,19 @@ export class Feed extends FeedApi { // no need to run noop function if (eventHandler !== Feed.noop) { - if ('activity' in event && this.hasActivity(event.activity.id)) { + // Backfill current_feed if activity is posted to multiple feeds + if ( + 'activity' in event && + event.activity.feeds.length > 1 && + this.hasActivity(event.activity.id) + ) { const currentActivity = this.currentState.activities?.find( (a) => a.id === event.activity.id, ); - // Backfill current_feed if activity is posted to multiple feeds - if ( - event.activity.feeds.length > 1 && - !event.activity.current_feed && - currentActivity?.current_feed - ) { + if (!event.activity.current_feed && currentActivity?.current_feed) { event.activity.current_feed = currentActivity.current_feed; } - - // Backfill own_ fields if activity is posted to a single feed - if ( - event.activity.feeds.length === 1 && - event.activity.current_feed && - currentActivity?.current_feed - ) { - ownFeedFields.forEach((field) => { - if (field in currentActivity.current_feed!) { - // @ts-expect-error TODO: fix this - event.activity.current_feed![field] = - currentActivity.current_feed![field]; - } - }); - } } // @ts-expect-error intersection of handler arguments results to never eventHandler?.(event); @@ -994,10 +972,7 @@ export class Feed extends FeedApi { ) { const enrichmentOptions = this.currentState.last_get_or_create_request_config?.enrichment_options; - if ( - !enrichmentOptions?.skip_activity_current_feed && - !enrichmentOptions?.skip_all - ) { + if (this.shouldAddToActiveFeeds(enrichmentOptions)) { const feedsToGetOrCreate = new Map(); activities.forEach((activity) => { if ( @@ -1010,14 +985,51 @@ export class Feed extends FeedApi { ); } }); - Array.from(feedsToGetOrCreate.values()).forEach((feed) => { + const newFeeds = Array.from(feedsToGetOrCreate.values()); + const fieldsToUpdate: Array< + 'own_capabilities' | 'own_follows' | 'own_followings' | 'own_membership' + > = []; + if (!options.fromWebSocket) { + fieldsToUpdate.push( + 'own_capabilities', + 'own_follows', + 'own_membership', + ); + if (enrichmentOptions?.enrich_own_followings) { + fieldsToUpdate.push('own_followings'); + } + } + newFeeds.forEach((feed) => { getOrCreateActiveFeed.bind(this.client)({ group: feed.group_id, id: feed.id, data: feed, - fromWebSocket: options.fromWebSocket, + fieldsToUpdate, }); }); + if (options.fromWebSocket) { + const uninitializedFeeds = newFeeds.filter((feedResponse) => { + const feed = this.client.feed(feedResponse.group_id, feedResponse.id); + // own_capabilities can only be undefined if we haven't fetched it yet + return feed.currentState.own_capabilities === undefined; + }); + if (uninitializedFeeds.length > 0) { + queueBatchedOwnFields.bind(this.client)({ + feeds: uninitializedFeeds.map((feed) => feed.feed), + }); + } + } } } + + private shouldAddToActiveFeeds(enrichmentOptions?: EnrichmentOptions) { + if (!enrichmentOptions) { + return true; + } + return ( + !enrichmentOptions?.skip_activity && + !enrichmentOptions?.skip_activity_current_feed && + !enrichmentOptions?.skip_all + ); + } } diff --git a/packages/feeds-client/src/feeds-client/feeds-client.test.ts b/packages/feeds-client/src/feeds-client/feeds-client.test.ts index ef4ac792..65a8c8a3 100644 --- a/packages/feeds-client/src/feeds-client/feeds-client.test.ts +++ b/packages/feeds-client/src/feeds-client/feeds-client.test.ts @@ -38,7 +38,12 @@ describe('Feeds client tests', () => { describe('client.feed', () => { it('should initialize feed state with the given data if feed does not exist', async () => { const data = generateFeedResponse({ feed: 'timeline:feed' }); - client['getOrCreateActiveFeed']({ group: 'timeline', id: 'feed', data }); + client['getOrCreateActiveFeed']({ + group: 'timeline', + id: 'feed', + data, + fieldsToUpdate: [], + }); expect( client['activeFeeds']['timeline:feed']?.currentState, @@ -50,7 +55,12 @@ describe('Feeds client tests', () => { feed: 'timeline:feed', follower_count: 5, }); - client['getOrCreateActiveFeed']({ group: 'timeline', id: 'feed', data }); + client['getOrCreateActiveFeed']({ + group: 'timeline', + id: 'feed', + data, + fieldsToUpdate: [], + }); expect( client['activeFeeds']['timeline:feed']?.currentState, @@ -63,6 +73,7 @@ describe('Feeds client tests', () => { group: 'timeline', id: 'feed', data: newData, + fieldsToUpdate: [], }); expect( @@ -78,7 +89,12 @@ describe('Feeds client tests', () => { feed: 'timeline:feed', follower_count: 5, }); - client['getOrCreateActiveFeed']({ group: 'timeline', id: 'feed', data }); + client['getOrCreateActiveFeed']({ + group: 'timeline', + id: 'feed', + data, + fieldsToUpdate: [], + }); expect( client['activeFeeds']['timeline:feed']?.currentState, @@ -91,6 +107,7 @@ describe('Feeds client tests', () => { group: 'timeline', id: 'feed', data: oldData, + fieldsToUpdate: [], }); expect( @@ -101,33 +118,51 @@ describe('Feeds client tests', () => { ).toBe(5); }); - it(`should not update own_ fields if data is from WebSocket`, async () => { + it(`should not update own_ fields if fieldsToUpdate is empty array`, async () => { const ownFollows = [ generateFollowResponse({ source_feed: generateFeedResponse({ feed: 'timeline:feed' }), target_feed: generateFeedResponse({ feed: 'user:123' }), }), ]; + const ownFollowings = [ + generateFollowResponse({ + source_feed: generateFeedResponse({ feed: 'user:123' }), + target_feed: generateFeedResponse({ feed: 'user:456' }), + }), + ]; const ownMembership = generateFeedMemberResponse(); const ownCapabilities = [FeedOwnCapability.ADD_ACTIVITY]; const data = generateFeedResponse({ feed: 'user:123', follower_count: 5, own_follows: ownFollows, + own_followings: ownFollowings, own_membership: ownMembership, own_capabilities: ownCapabilities, }); - client['getOrCreateActiveFeed']({ group: 'user', id: '123', data }); + client['getOrCreateActiveFeed']({ + group: 'user', + id: '123', + data, + fieldsToUpdate: [ + 'own_capabilities', + 'own_follows', + 'own_membership', + 'own_followings', + ], + }); const dataFromWebSocket = { ...data }; delete dataFromWebSocket.own_follows; + delete dataFromWebSocket.own_followings; delete dataFromWebSocket.own_membership; delete dataFromWebSocket.own_capabilities; client['getOrCreateActiveFeed']({ group: 'user', id: '123', data: dataFromWebSocket, - fromWebSocket: true, + fieldsToUpdate: [], }); expect(client['activeFeeds']['user:123']?.currentState).toMatchObject( @@ -136,11 +171,13 @@ describe('Feeds client tests', () => { expect( client['activeFeeds']['user:123']?.currentState.own_follows, ).toEqual(ownFollows); + expect( + client['activeFeeds']['user:123']?.currentState.own_followings, + ).toEqual(ownFollowings); expect( client['activeFeeds']['user:123']?.currentState.own_membership, ).toEqual(ownMembership); expect( - // @ts-expect-error - own_capabilities is currently excluded from type client['activeFeeds']['user:123']?.currentState.own_capabilities, ).toEqual(ownCapabilities); }); @@ -153,16 +190,33 @@ describe('Feeds client tests', () => { target_feed: generateFeedResponse({ feed: 'user:123' }), }), ]; + const ownFollowings = [ + generateFollowResponse({ + source_feed: generateFeedResponse({ feed: 'user:123' }), + target_feed: generateFeedResponse({ feed: 'user:456' }), + }), + ]; const ownMembership = generateFeedMemberResponse(); const ownCapabilities = [FeedOwnCapability.ADD_ACTIVITY]; const data = generateFeedResponse({ feed: 'user:123', follower_count: 5, own_follows: ownFollows, + own_followings: ownFollowings, own_membership: ownMembership, own_capabilities: ownCapabilities, }); - client['getOrCreateActiveFeed']({ group: 'user', id: '123', data }); + client['getOrCreateActiveFeed']({ + group: 'user', + id: '123', + data, + fieldsToUpdate: [ + 'own_capabilities', + 'own_follows', + 'own_membership', + 'own_followings', + ], + }); const spy = vi.fn(); const feed = client['activeFeeds']['user:123']; @@ -175,7 +229,12 @@ describe('Feeds client tests', () => { group: 'user', id: '123', data: newData, - fromWebSocket: false, + fieldsToUpdate: [ + 'own_capabilities', + 'own_follows', + 'own_membership', + 'own_followings', + ], }); expect(spy).toHaveBeenCalledTimes(0); @@ -193,6 +252,12 @@ describe('Feeds client tests', () => { group: 'user', id: '123', data: newData, + fieldsToUpdate: [ + 'own_capabilities', + 'own_follows', + 'own_membership', + 'own_followings', + ], }); expect(spy).toHaveBeenCalledTimes(1); @@ -206,6 +271,12 @@ describe('Feeds client tests', () => { group: 'user', id: '123', data: newData, + fieldsToUpdate: [ + 'own_capabilities', + 'own_follows', + 'own_membership', + 'own_followings', + ], }); expect(spy).toHaveBeenCalledTimes(0); @@ -217,11 +288,350 @@ describe('Feeds client tests', () => { group: 'user', id: '123', data: newData, + fieldsToUpdate: [ + 'own_capabilities', + 'own_follows', + 'own_membership', + 'own_followings', + ], }); expect(spy).toHaveBeenCalledTimes(1); expect(spy.mock.lastCall?.[0]).toMatchObject({ own_membership: newData.own_membership, }); + + spy.mockClear(); + newData.own_capabilities = [...ownCapabilities]; + client['getOrCreateActiveFeed']({ + group: 'user', + id: '123', + data: newData, + fieldsToUpdate: [ + 'own_capabilities', + 'own_follows', + 'own_membership', + 'own_followings', + ], + }); + + expect(spy).toHaveBeenCalledTimes(0); + + newData.own_capabilities = [ + FeedOwnCapability.ADD_ACTIVITY, + FeedOwnCapability.DELETE_OWN_ACTIVITY, + ]; + client['getOrCreateActiveFeed']({ + group: 'user', + id: '123', + data: newData, + fieldsToUpdate: [ + 'own_capabilities', + 'own_follows', + 'own_membership', + 'own_followings', + ], + }); + + expect(spy).toHaveBeenCalledTimes(1); + expect(spy.mock.lastCall?.[0]).toMatchObject({ + own_capabilities: newData.own_capabilities, + }); + + spy.mockClear(); + newData.own_followings = [...ownFollowings]; + client['getOrCreateActiveFeed']({ + group: 'user', + id: '123', + data: newData, + fieldsToUpdate: [ + 'own_capabilities', + 'own_follows', + 'own_membership', + 'own_followings', + ], + }); + + expect(spy).toHaveBeenCalledTimes(0); + + const newOwnFollowings = [ + ...ownFollowings, + generateFollowResponse({ + source_feed: generateFeedResponse({ feed: 'user:123' }), + target_feed: generateFeedResponse({ feed: 'user:789' }), + }), + ]; + newData.own_followings = newOwnFollowings; + client['getOrCreateActiveFeed']({ + group: 'user', + id: '123', + data: newData, + fieldsToUpdate: [ + 'own_capabilities', + 'own_follows', + 'own_membership', + 'own_followings', + ], + }); + + expect(spy).toHaveBeenCalledTimes(1); + expect(spy.mock.lastCall?.[0]).toMatchObject({ + own_followings: newOwnFollowings, + }); + }); + + it(`should throttle calls to ownBatch endpoint`, async () => { + vi.useFakeTimers(); + vi.spyOn(client, 'ownBatch').mockResolvedValue({ data: {} } as any); + const throttleTime = 100; + client['setGetBatchOwnFieldsThrottlingInterval'](throttleTime); + + client['throttledGetBatchOwnFields']( + [`feed:1`, `feed:2`, `feed:3`], + () => {}, + ); + expect(client['ownBatch']).toHaveBeenCalledTimes(1); + + client['throttledGetBatchOwnFields']( + [`feed:4`, `feed:5`, `feed:6`], + () => {}, + ); + expect(client['ownBatch']).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(throttleTime / 2); + expect(client['ownBatch']).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(throttleTime / 2); + expect(client['ownBatch']).toHaveBeenCalledTimes(2); + + vi.useRealTimers(); + }); + + describe('fieldsToUpdate logic', () => { + it('should only update fields in fieldsToUpdate array', async () => { + const ownFollows = [ + generateFollowResponse({ + source_feed: generateFeedResponse({ feed: 'timeline:feed' }), + target_feed: generateFeedResponse({ feed: 'user:123' }), + }), + ]; + const ownCapabilities = [FeedOwnCapability.ADD_ACTIVITY]; + const data = generateFeedResponse({ + feed: 'user:123', + own_follows: ownFollows, + own_capabilities: ownCapabilities, + }); + client['getOrCreateActiveFeed']({ + group: 'user', + id: '123', + data, + fieldsToUpdate: ['own_capabilities', 'own_follows'], + }); + + const spy = vi.fn(); + const feed = client['activeFeeds']['user:123']; + feed.state.subscribe(spy); + spy.mockClear(); + + const newData = { ...data }; + newData.own_capabilities = [ + FeedOwnCapability.ADD_ACTIVITY, + FeedOwnCapability.DELETE_OWN_ACTIVITY, + ]; + newData.own_follows = [ + ...ownFollows, + generateFollowResponse({ + source_feed: generateFeedResponse({ feed: 'timeline:feed' }), + target_feed: generateFeedResponse({ feed: 'user:456' }), + }), + ]; + + client['getOrCreateActiveFeed']({ + group: 'user', + id: '123', + data: newData, + fieldsToUpdate: ['own_capabilities', 'own_follows'], + }); + + expect(spy).toHaveBeenCalledTimes(1); + const updatedState = spy.mock.lastCall?.[0]; + expect(updatedState).toHaveProperty('own_capabilities'); + expect(updatedState).toHaveProperty('own_follows'); + expect(updatedState).not.toHaveProperty('own_membership'); + expect(updatedState).not.toHaveProperty('own_followings'); + }); + + it('should not update own_followings when not in fieldsToUpdate array even if changed', async () => { + const ownFollowings = [ + generateFollowResponse({ + source_feed: generateFeedResponse({ feed: 'user:123' }), + target_feed: generateFeedResponse({ feed: 'user:456' }), + }), + ]; + const data = generateFeedResponse({ + feed: 'user:123', + own_followings: ownFollowings, + }); + client['getOrCreateActiveFeed']({ + group: 'user', + id: '123', + data, + fieldsToUpdate: ['own_capabilities', 'own_follows', 'own_membership'], + }); + + const spy = vi.fn(); + const feed = client['activeFeeds']['user:123']; + feed.state.subscribe(spy); + spy.mockClear(); + + const newData = { ...data }; + newData.own_followings = [ + ...ownFollowings, + generateFollowResponse({ + source_feed: generateFeedResponse({ feed: 'user:123' }), + target_feed: generateFeedResponse({ feed: 'user:789' }), + }), + ]; + + client['getOrCreateActiveFeed']({ + group: 'user', + id: '123', + data: newData, + fieldsToUpdate: ['own_capabilities', 'own_follows', 'own_membership'], + }); + + expect(spy).toHaveBeenCalledTimes(0); + expect(feed.currentState.own_followings).toEqual(ownFollowings); + }); + + it('should update own_followings when in fieldsToUpdate array and changed', async () => { + const ownFollowings = [ + generateFollowResponse({ + source_feed: generateFeedResponse({ feed: 'user:123' }), + target_feed: generateFeedResponse({ feed: 'user:456' }), + }), + ]; + const data = generateFeedResponse({ + feed: 'user:123', + own_followings: ownFollowings, + }); + client['getOrCreateActiveFeed']({ + group: 'user', + id: '123', + data, + fieldsToUpdate: ['own_followings'], + }); + + const spy = vi.fn(); + const feed = client['activeFeeds']['user:123']; + feed.state.subscribe(spy); + spy.mockClear(); + + const newOwnFollowings = [ + ...ownFollowings, + generateFollowResponse({ + source_feed: generateFeedResponse({ feed: 'user:123' }), + target_feed: generateFeedResponse({ feed: 'user:789' }), + }), + ]; + const newData = { ...data }; + newData.own_followings = newOwnFollowings; + + client['getOrCreateActiveFeed']({ + group: 'user', + id: '123', + data: newData, + fieldsToUpdate: ['own_followings'], + }); + + expect(spy).toHaveBeenCalledTimes(1); + expect(spy.mock.lastCall?.[0]).toMatchObject({ + own_followings: newOwnFollowings, + }); + }); + + it('should not update any fields when fieldsToUpdate is empty array', async () => { + const ownFollows = [ + generateFollowResponse({ + source_feed: generateFeedResponse({ feed: 'timeline:feed' }), + target_feed: generateFeedResponse({ feed: 'user:123' }), + }), + ]; + const ownCapabilities = [FeedOwnCapability.ADD_ACTIVITY]; + const data = generateFeedResponse({ + feed: 'user:123', + own_follows: ownFollows, + own_capabilities: ownCapabilities, + }); + client['getOrCreateActiveFeed']({ + group: 'user', + id: '123', + data, + fieldsToUpdate: ['own_capabilities', 'own_follows'], + }); + + const spy = vi.fn(); + const feed = client['activeFeeds']['user:123']; + feed.state.subscribe(spy); + spy.mockClear(); + + const newData = { ...data }; + newData.own_capabilities = [ + FeedOwnCapability.ADD_ACTIVITY, + FeedOwnCapability.DELETE_OWN_ACTIVITY, + ]; + newData.own_follows = [ + ...ownFollows, + generateFollowResponse({ + source_feed: generateFeedResponse({ feed: 'timeline:feed' }), + target_feed: generateFeedResponse({ feed: 'user:456' }), + }), + ]; + + client['getOrCreateActiveFeed']({ + group: 'user', + id: '123', + data: newData, + fieldsToUpdate: [], + }); + + expect(spy).toHaveBeenCalledTimes(0); + }); + + it('should only update fields if they actually changed (equality checks still apply)', async () => { + const ownFollows = [ + generateFollowResponse({ + source_feed: generateFeedResponse({ feed: 'timeline:feed' }), + target_feed: generateFeedResponse({ feed: 'user:123' }), + }), + ]; + const data = generateFeedResponse({ + feed: 'user:123', + own_follows: ownFollows, + }); + client['getOrCreateActiveFeed']({ + group: 'user', + id: '123', + data, + fieldsToUpdate: ['own_follows'], + }); + + const spy = vi.fn(); + const feed = client['activeFeeds']['user:123']; + feed.state.subscribe(spy); + spy.mockClear(); + + const newData = { ...data }; + newData.own_follows = [...ownFollows]; // Same content, different array reference + + client['getOrCreateActiveFeed']({ + group: 'user', + id: '123', + data: newData, + fieldsToUpdate: ['own_follows'], + }); + + expect(spy).toHaveBeenCalledTimes(0); + }); }); }); diff --git a/packages/feeds-client/src/feeds-client/feeds-client.ts b/packages/feeds-client/src/feeds-client/feeds-client.ts index ca160172..921db902 100644 --- a/packages/feeds-client/src/feeds-client/feeds-client.ts +++ b/packages/feeds-client/src/feeds-client/feeds-client.ts @@ -60,6 +60,7 @@ import { ModerationClient } from '../moderation-client'; import { StreamPoll } from '../common/Poll'; import { Feed, + type FeedState, handleActivityReactionAdded, handleActivityReactionDeleted, handleActivityReactionUpdated, @@ -86,15 +87,16 @@ import { feedsLoggerSystem } from '../utils'; import { handleCommentReactionUpdated } from '../feed/event-handlers/comment/handle-comment-reaction-updated'; import { throttle, - DEFAULT_BATCH_OWN_CAPABILITIES_THROTTLING_INTERVAL, - type GetBatchedOwnCapabilitiesThrottledCallback, - queueBatchedOwnCapabilities, - type ThrottledGetBatchedOwnCapabilities, clearQueuedFeeds, + type ThrottledGetBatchedOwnFields, + type GetBatchedOwnFieldsThrottledCallback, + DEFAULT_BATCH_OWN_FIELDS_THROTTLING_INTERVAL, } from '../utils/throttling'; import { ActivityWithStateUpdates } from '../activity-with-state-updates/activity-with-state-updates'; import { getFeed } from '../activity-with-state-updates/get-feed'; import { + isOwnCapabilitiesEqual, + isOwnFollowingsEqual, isOwnFollowsEqual, isOwnMembershipEqual, } from '../utils/check-own-fields-equality'; @@ -102,7 +104,6 @@ import { export type FeedsClientState = { connected_user: ConnectedUser | undefined; is_ws_connection_healthy: boolean; - own_capabilities_by_fid: Record; }; type FID = string; @@ -128,9 +129,9 @@ export class FeedsClient extends FeedsApi { private healthyConnectionChangedEventCount = 0; - protected throttledGetBatchOwnCapabilities!: ThrottledGetBatchedOwnCapabilities; - private cancelGetBatchOwnCapabilitiesTimer!: () => void; - private query_batch_own_capabilties_throttling_interval!: number; + protected throttledGetBatchOwnFields!: ThrottledGetBatchedOwnFields; + private cancelGetBatchOwnFieldsTimer!: () => void; + private query_batch_own_fields_throttling_interval!: number; constructor(apiKey: string, options?: FeedsClientOptions) { const tokenManager = new TokenManager(); @@ -145,16 +146,15 @@ export class FeedsClient extends FeedsApi { this.state = new StateStore({ connected_user: undefined, is_ws_connection_healthy: false, - own_capabilities_by_fid: {}, }); this.moderation = new ModerationClient(apiClient); this.tokenManager = tokenManager; this.connectionIdManager = connectionIdManager; this.polls_by_id = new Map(); - this.query_batch_own_capabilties_throttling_interval = - options?.query_batch_own_capabilties_throttling_interval ?? - DEFAULT_BATCH_OWN_CAPABILITIES_THROTTLING_INTERVAL; + this.query_batch_own_fields_throttling_interval = + options?.query_batch_own_fields_throttling_interval ?? + DEFAULT_BATCH_OWN_FIELDS_THROTTLING_INTERVAL; feedsLoggerSystem.configureLoggers(options?.configure_loggers_options); @@ -184,6 +184,7 @@ export class FeedsClient extends FeedsApi { group: event.feed.group_id, id: event.feed.id, data: event.feed, + fieldsToUpdate: [], }); break; @@ -284,30 +285,27 @@ export class FeedsClient extends FeedsApi { }); } - private setGetBatchOwnCapabilitiesThrottlingInterval = ( - throttlingMs: number, - ) => { - const { - throttledFn: throttledGetBatchOwnCapabilities, - cancelTimer: cancel, - } = throttle( - (feeds, callback) => { - this.ownBatch({ - feeds, - }).catch((error) => { - this.eventDispatcher.dispatch({ - type: 'errors.unhandled', - error_type: UnhandledErrorType.FetchingOwnCapabilitiesOnNewActivity, - error, + private setGetBatchOwnFieldsThrottlingInterval = (throttlingMs: number) => { + const { throttledFn: throttledGetBatchOwnFields, cancelTimer: cancel } = + throttle( + (feeds, callback) => { + this.ownBatch({ + feeds, + }).catch((error) => { + this.eventDispatcher.dispatch({ + type: 'errors.unhandled', + error_type: + UnhandledErrorType.FetchingOwnCapabilitiesOnNewActivity, + error, + }); }); - }); - callback(feeds); - }, - throttlingMs, - { trailing: true }, - ); - this.throttledGetBatchOwnCapabilities = throttledGetBatchOwnCapabilities; - this.cancelGetBatchOwnCapabilitiesTimer = cancel; + callback(feeds); + }, + throttlingMs, + { trailing: true }, + ); + this.throttledGetBatchOwnFields = throttledGetBatchOwnFields; + this.cancelGetBatchOwnFieldsTimer = cancel; }; private recoverOnReconnect = async () => { @@ -369,34 +367,6 @@ export class FeedsClient extends FeedsApi { } } - public hydrateCapabilitiesCache( - feedResponses: Array>, - ) { - let ownCapabilitiesCache = - this.state.getLatestValue().own_capabilities_by_fid; - - const capabilitiesToFetchQueue: string[] = []; - - for (const feedResponse of feedResponses) { - const { feed, own_capabilities } = feedResponse; - - if (!Object.prototype.hasOwnProperty.call(ownCapabilitiesCache, feed)) { - if (own_capabilities) { - ownCapabilitiesCache = { - ...ownCapabilitiesCache, - [feed]: own_capabilities, - }; - } else { - capabilitiesToFetchQueue.push(feed); - } - } - } - - queueBatchedOwnCapabilities.bind(this)({ feeds: capabilitiesToFetchQueue }); - - this.state.partialNext({ own_capabilities_by_fid: ownCapabilitiesCache }); - } - connectUser = async (user: UserRequest, tokenProvider?: TokenOrProvider) => { if ( this.state.getLatestValue().connected_user !== undefined || @@ -407,8 +377,8 @@ export class FeedsClient extends FeedsApi { this.tokenManager.setTokenOrProvider(tokenProvider); - this.setGetBatchOwnCapabilitiesThrottlingInterval( - this.query_batch_own_capabilties_throttling_interval, + this.setGetBatchOwnFieldsThrottlingInterval( + this.query_batch_own_fields_throttling_interval, ); try { @@ -649,10 +619,9 @@ export class FeedsClient extends FeedsApi { this.state.partialNext({ connected_user: undefined, is_ws_connection_healthy: false, - own_capabilities_by_fid: {}, }); - this.cancelGetBatchOwnCapabilitiesTimer(); + this.cancelGetBatchOwnFieldsTimer(); clearQueuedFeeds(); }; @@ -680,6 +649,7 @@ export class FeedsClient extends FeedsApi { group: groupId, id, options, + fieldsToUpdate: [], }); }; @@ -711,11 +681,15 @@ export class FeedsClient extends FeedsApi { id: feedResponse.id, data: feedResponse, watch: request?.watch, + fieldsToUpdate: [ + 'own_capabilities', + 'own_follows', + 'own_membership', + 'own_followings', + ], }), ); - this.hydrateCapabilitiesCache(feedResponses); - return { feeds, next: response.next, @@ -727,13 +701,12 @@ export class FeedsClient extends FeedsApi { async ownBatch(request: OwnBatchRequest) { const response = await super.ownBatch(request); - const feedResponses = Object.entries(response.data).map( - ([feed, ownFields]) => ({ - feed, - own_capabilities: ownFields.own_capabilities, - }), - ); - this.hydrateCapabilitiesCache(feedResponses); + Object.entries(response.data).forEach(([fid, ownFields]) => { + const feed = this.activeFeeds[fid]; + if (feed) { + feed.state.partialNext(ownFields); + } + }); return response; } @@ -826,7 +799,6 @@ export class FeedsClient extends FeedsApi { }, ) { const response = await super.getOrCreateFeed(request); - this.hydrateCapabilitiesCache([response.feed]); if (request.watch) { const feeds = this.findAllActiveFeedsByFid( @@ -848,6 +820,12 @@ export class FeedsClient extends FeedsApi { group: suggestion.group_id, id: suggestion.id, data: suggestion, + fieldsToUpdate: [ + 'own_capabilities', + 'own_follows', + 'own_membership', + 'own_followings', + ], }); }); @@ -861,7 +839,7 @@ export class FeedsClient extends FeedsApi { data, watch, options, - fromWebSocket = false, + fieldsToUpdate, }: { group: string; id: string; @@ -871,7 +849,9 @@ export class FeedsClient extends FeedsApi { addNewActivitiesTo?: 'start' | 'end'; activityAddedEventFilter?: (event: ActivityAddedEvent) => boolean; }; - fromWebSocket?: boolean; + fieldsToUpdate: Array< + 'own_capabilities' | 'own_follows' | 'own_followings' | 'own_membership' + >; }) => { const fid = `${group}:${id}`; let isCreated = false; @@ -909,18 +889,35 @@ export class FeedsClient extends FeedsApi { handleFeedUpdated.call(feed, { feed: data }); } else if ( (feed.currentState.updated_at?.getTime() ?? 0) === - data.updated_at.getTime() && - !fromWebSocket + data.updated_at.getTime() ) { - const fieldsToUpdate: Array = []; - if (!isOwnFollowsEqual(feed.currentState, data)) { - fieldsToUpdate.push('own_follows'); - } - if (!isOwnMembershipEqual(feed.currentState, data)) { - fieldsToUpdate.push('own_membership'); - } - if (fieldsToUpdate.length > 0) { - const fieldsToUpdateData = fieldsToUpdate.reduce( + const fieldsToUpdateData: Array = []; + const fieldChecks: Array< + [ + ( + | 'own_capabilities' + | 'own_follows' + | 'own_membership' + | 'own_followings' + ), + (currentState: FeedState, newState: FeedResponse) => boolean, + ] + > = [ + ['own_capabilities', isOwnCapabilitiesEqual], + ['own_follows', isOwnFollowsEqual], + ['own_membership', isOwnMembershipEqual], + ['own_followings', isOwnFollowingsEqual], + ]; + fieldChecks.forEach(([field, isEqual]) => { + if ( + fieldsToUpdate.includes(field) && + !isEqual(feed.currentState, data) + ) { + fieldsToUpdateData.push(field); + } + }); + if (fieldsToUpdateData.length > 0) { + const fieldsToUpdatePayload = fieldsToUpdateData.reduce( (acc: Partial, field) => { // @ts-expect-error TODO: fix this acc[field] = data[field]; @@ -928,7 +925,7 @@ export class FeedsClient extends FeedsApi { }, {}, ); - feed.state.partialNext(fieldsToUpdateData); + feed.state.partialNext(fieldsToUpdatePayload); } } } diff --git a/packages/feeds-client/src/utils/check-own-fields-equality.ts b/packages/feeds-client/src/utils/check-own-fields-equality.ts index ef30245a..6a2bf336 100644 --- a/packages/feeds-client/src/utils/check-own-fields-equality.ts +++ b/packages/feeds-client/src/utils/check-own-fields-equality.ts @@ -1,25 +1,25 @@ import type { FeedState } from '../feed'; -import type { FeedResponse } from '../gen/models'; +import type { FeedResponse, FollowResponse } from '../gen/models'; -export const isOwnFollowsEqual = ( - currentState: FeedState, - newState: FeedResponse, -) => { +const areFollowArraysEqual = ( + currentFollows: FollowResponse[] | undefined, + newFollows: FollowResponse[] | undefined, +): boolean => { const existingFollows = new Set( - currentState.own_follows?.map( + currentFollows?.map( (f) => `${f.source_feed.feed}:${f.target_feed.feed}:${f.updated_at.getTime()}`, ), ); - const newFollows = new Set( - newState.own_follows?.map( + const newFollowsSet = new Set( + newFollows?.map( (f) => `${f.source_feed.feed}:${f.target_feed.feed}:${f.updated_at.getTime()}`, ), ); - if (existingFollows.size === newFollows.size) { + if (existingFollows.size === newFollowsSet.size) { const areEqual = Array.from(existingFollows).every((f) => - newFollows.has(f), + newFollowsSet.has(f), ); if (areEqual) { return true; @@ -29,6 +29,13 @@ export const isOwnFollowsEqual = ( return false; }; +export const isOwnFollowsEqual = ( + currentState: FeedState, + newState: FeedResponse, +) => { + return areFollowArraysEqual(currentState.own_follows, newState.own_follows); +}; + export const isOwnMembershipEqual = ( currentState: FeedState, newState: FeedResponse, @@ -38,3 +45,23 @@ export const isOwnMembershipEqual = ( (newState.own_membership?.updated_at.getTime() ?? 0) ); }; + +export const isOwnCapabilitiesEqual = ( + currentState: FeedState, + newState: FeedResponse, +) => { + return ( + [...(currentState.own_capabilities ?? [])].sort().join(',') === + [...(newState.own_capabilities ?? [])].sort().join(',') + ); +}; + +export const isOwnFollowingsEqual = ( + currentState: FeedState, + newState: FeedResponse, +) => { + return areFollowArraysEqual( + currentState.own_followings, + newState.own_followings, + ); +}; diff --git a/packages/feeds-client/src/utils/throttling/index.ts b/packages/feeds-client/src/utils/throttling/index.ts index cfebc1f2..dedeb76c 100644 --- a/packages/feeds-client/src/utils/throttling/index.ts +++ b/packages/feeds-client/src/utils/throttling/index.ts @@ -1,2 +1,2 @@ export * from './throttle'; -export * from './throttled-get-batched-own-capabilities' +export * from './throttled-get-batched-own-fields'; diff --git a/packages/feeds-client/src/utils/throttling/throttled-get-batched-own-capabilities.test.ts b/packages/feeds-client/src/utils/throttling/throttled-get-batched-own-capabilities.test.ts deleted file mode 100644 index bfbd4aec..00000000 --- a/packages/feeds-client/src/utils/throttling/throttled-get-batched-own-capabilities.test.ts +++ /dev/null @@ -1,68 +0,0 @@ -import { describe, it, expect, vi, beforeEach } from 'vitest'; -import { - queueBatchedOwnCapabilities, - clearQueuedFeeds, -} from './throttled-get-batched-own-capabilities'; - -describe('queueBatchedOwnCapabilities', () => { - beforeEach(() => { - clearQueuedFeeds(); - }); - - it('calls throttledGetBatchOwnCapabilities with queued feeds and clears them via callback', () => { - const throttledGetBatchOwnCapabilities = vi.fn(); - const client: any = { throttledGetBatchOwnCapabilities }; - - // First enqueue two feeds → should call with both - queueBatchedOwnCapabilities.call(client, { feeds: ['feed:1', 'feed:2'] }); - expect(throttledGetBatchOwnCapabilities).toHaveBeenCalledTimes(1); - - const firstArgs = throttledGetBatchOwnCapabilities.mock.calls[0]; - const firstQueued = firstArgs[0] as string[]; - const firstCallback = firstArgs[1] as (feedsToClear: string[]) => void; - expect(new Set(firstQueued)).toEqual(new Set(['feed:1', 'feed:2'])); - - // Simulate downstream completion that clears only `feed:1` - firstCallback(['feed:1']); - - // Calling again without adding new feeds should pass only the remaining queued feed - queueBatchedOwnCapabilities.call(client, { feeds: [] }); - expect(throttledGetBatchOwnCapabilities).toHaveBeenCalledTimes(2); - - const secondArgs = throttledGetBatchOwnCapabilities.mock.calls[1]; - const secondQueued = secondArgs[0] as string[]; - const secondCallback = secondArgs[1] as (feedsToClear: string[]) => void; - expect(secondQueued).toEqual(['feed:2']); - - // Now clear the last remaining feed - secondCallback(['feed:2']); - - // With nothing queued anymore, a subsequent call should not trigger a throttled call - queueBatchedOwnCapabilities.call(client, { feeds: [] }); - expect(throttledGetBatchOwnCapabilities).toHaveBeenCalledTimes(2); - }); - - it('should provide no more feeds than the API limit (100)', () => { - const throttledGetBatchOwnCapabilities = vi.fn(); - const client: any = { throttledGetBatchOwnCapabilities }; - - queueBatchedOwnCapabilities.call(client, { - feeds: Array.from({ length: 101 }, (_, i) => `feed:${i + 1}`), - }); - expect(throttledGetBatchOwnCapabilities).toHaveBeenCalledTimes(1); - const args = throttledGetBatchOwnCapabilities.mock.calls[0]; - - expect(args[0]).toHaveLength(100); - - const clearCallback = args[1] as (feedsToClear: string[]) => void; - clearCallback(args[0]); - - queueBatchedOwnCapabilities.call(client, { - feeds: [`feed:102`], - }); - - expect(throttledGetBatchOwnCapabilities).toHaveBeenCalledTimes(2); - const args2 = throttledGetBatchOwnCapabilities.mock.calls[1]; - expect(args2[0]).toHaveLength(2); - }); -}); diff --git a/packages/feeds-client/src/utils/throttling/throttled-get-batched-own-fields.test.ts b/packages/feeds-client/src/utils/throttling/throttled-get-batched-own-fields.test.ts new file mode 100644 index 00000000..592aa1cb --- /dev/null +++ b/packages/feeds-client/src/utils/throttling/throttled-get-batched-own-fields.test.ts @@ -0,0 +1,76 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import { + queueBatchedOwnFields, + clearQueuedFeeds, +} from './throttled-get-batched-own-fields'; + +describe('queueBatchedOwnFields', () => { + beforeEach(() => { + clearQueuedFeeds(); + }); + + it('calls throttledGetBatchOwnFields with queued feeds and clears them via callback', () => { + const throttledGetBatchOwnFields = vi.fn(); + const client: any = { throttledGetBatchOwnFields }; + + // First enqueue two feeds → should call with both + queueBatchedOwnFields.call(client, { feeds: ['feed:1', 'feed:2'] }); + expect(throttledGetBatchOwnFields).toHaveBeenCalledTimes(1); + + const firstArgs = throttledGetBatchOwnFields.mock.calls[0]; + const firstQueued = firstArgs[0] as string[]; + const firstCallback = firstArgs[1] as (feedsToClear: string[]) => void; + expect(new Set(firstQueued)).toEqual(new Set(['feed:1', 'feed:2'])); + + // Simulate downstream completion that clears only `feed:1` + firstCallback(['feed:1']); + + // Calling again without adding new feeds should pass only the remaining queued feed + queueBatchedOwnFields.call(client, { feeds: [] }); + expect(throttledGetBatchOwnFields).toHaveBeenCalledTimes(2); + + const secondArgs = throttledGetBatchOwnFields.mock.calls[1]; + const secondQueued = secondArgs[0] as string[]; + const secondCallback = secondArgs[1] as (feedsToClear: string[]) => void; + expect(secondQueued).toEqual(['feed:2']); + + // Now clear the last remaining feed + secondCallback(['feed:2']); + + // With nothing queued anymore, a subsequent call should not trigger a throttled call + queueBatchedOwnFields.call(client, { feeds: [] }); + expect(throttledGetBatchOwnFields).toHaveBeenCalledTimes(2); + }); + + it('should provide no more feeds than the API limit (100)', () => { + const throttledGetBatchOwnFields = vi.fn(); + const client: any = { throttledGetBatchOwnFields }; + + queueBatchedOwnFields.call(client, { + feeds: Array.from({ length: 101 }, (_, i) => `feed:${i + 1}`), + }); + expect(throttledGetBatchOwnFields).toHaveBeenCalledTimes(1); + const args = throttledGetBatchOwnFields.mock.calls[0]; + + expect(args[0]).toHaveLength(100); + + const clearCallback = args[1] as (feedsToClear: string[]) => void; + clearCallback(args[0]); + + queueBatchedOwnFields.call(client, { + feeds: [`feed:102`], + }); + + expect(throttledGetBatchOwnFields).toHaveBeenCalledTimes(2); + const args2 = throttledGetBatchOwnFields.mock.calls[1]; + expect(args2[0]).toHaveLength(2); + }); + + it('should not call throttledGetBatchOwnFields if there are no feeds to queue', () => { + const throttledGetBatchOwnFields = vi.fn(); + const client: any = { throttledGetBatchOwnFields }; + + queueBatchedOwnFields.call(client, { feeds: [] }); + expect(throttledGetBatchOwnFields).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/feeds-client/src/utils/throttling/throttled-get-batched-own-capabilities.ts b/packages/feeds-client/src/utils/throttling/throttled-get-batched-own-fields.ts similarity index 52% rename from packages/feeds-client/src/utils/throttling/throttled-get-batched-own-capabilities.ts rename to packages/feeds-client/src/utils/throttling/throttled-get-batched-own-fields.ts index 0fde2184..aceb2fcf 100644 --- a/packages/feeds-client/src/utils/throttling/throttled-get-batched-own-capabilities.ts +++ b/packages/feeds-client/src/utils/throttling/throttled-get-batched-own-fields.ts @@ -1,35 +1,35 @@ import type { FeedsClient } from '../../feeds-client'; import type { ThrottledFunction } from './throttle'; -const BATCH_OWN_CAPABILITIES_API_LIMIT = 100; +const BATCH_OWN_FIELDS_API_LIMIT = 100; -export type GetBatchedOwnCapabilities = { +export type GetBatchedOwnFields = { feeds: string[]; }; -export type GetBatchedOwnCapabilitiesThrottledCallback = [ +export type GetBatchedOwnFieldsThrottledCallback = [ feeds: string[], callback: (feedsToClear: string[]) => void | Promise, ]; -export type ThrottledGetBatchedOwnCapabilities = - ThrottledFunction; +export type ThrottledGetBatchedOwnFields = + ThrottledFunction; -export const DEFAULT_BATCH_OWN_CAPABILITIES_THROTTLING_INTERVAL = 2000; +export const DEFAULT_BATCH_OWN_FIELDS_THROTTLING_INTERVAL = 2000; const queuedFeeds: Set = new Set(); -export function queueBatchedOwnCapabilities( +export function queueBatchedOwnFields( this: FeedsClient, - { feeds }: GetBatchedOwnCapabilities, + { feeds }: GetBatchedOwnFields, ) { for (const feed of feeds) { queuedFeeds.add(feed); } if (queuedFeeds.size > 0) { - this.throttledGetBatchOwnCapabilities( - [...queuedFeeds].slice(0, BATCH_OWN_CAPABILITIES_API_LIMIT), + this.throttledGetBatchOwnFields( + [...queuedFeeds].slice(0, BATCH_OWN_FIELDS_API_LIMIT), (feedsToClear: string[]) => { for (const feed of feedsToClear) { queuedFeeds.delete(feed);