Skip to content

Commit 8a4be97

Browse files
authored
Merge pull request #1973 from streamr-dev/stream-latency
Add stream latency calculation
2 parents 16c8e72 + 0f8a31b commit 8a4be97

File tree

9 files changed

+194
-14
lines changed

9 files changed

+194
-14
lines changed

src/components/Stats.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ export function StreamStats({ streamId }: StreamStatsProps) {
104104
<Stat
105105
id="latency"
106106
label="Latency ms"
107-
value={latency == null ? undefined : latency.toFixed(2)}
107+
value={latency == null ? undefined : latency.toFixed(0)}
108108
/>
109109
</ButtonGrid>
110110
</StreamStatsRoot>

src/generated/gql/indexer.ts

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ export type Neighbor = {
3535
__typename?: 'Neighbor';
3636
nodeId1: Scalars['String']['output'];
3737
nodeId2: Scalars['String']['output'];
38+
rtt?: Maybe<Scalars['Int']['output']>;
3839
streamPartId: Scalars['String']['output'];
3940
};
4041

@@ -157,6 +158,17 @@ export type GetGlobalStreamsStatsQueryVariables = Exact<{ [key: string]: never;
157158

158159
export type GetGlobalStreamsStatsQuery = { __typename?: 'Query', summary: { __typename?: 'Summary', bytesPerSecond: number, messagesPerSecond: number, streamCount: number } };
159160

161+
export type GetNeighborsQueryVariables = Exact<{
162+
cursor?: InputMaybe<Scalars['String']['input']>;
163+
pageSize?: InputMaybe<Scalars['Int']['input']>;
164+
streamPart?: InputMaybe<Scalars['String']['input']>;
165+
node?: InputMaybe<Scalars['String']['input']>;
166+
streamId?: InputMaybe<Scalars['String']['input']>;
167+
}>;
168+
169+
170+
export type GetNeighborsQuery = { __typename?: 'Query', neighbors: { __typename?: 'Neighbors', cursor?: string | null, items: Array<{ __typename?: 'Neighbor', streamPartId: string, nodeId1: string, nodeId2: string, rtt?: number | null }> } };
171+
160172

161173
export const GetStreamsDocument = gql`
162174
query getStreams($streamIds: [String!], $first: Int, $orderBy: StreamOrderBy, $orderDirection: OrderDirection, $search: String, $owner: String, $cursor: String) {
@@ -192,4 +204,24 @@ export const GetGlobalStreamsStatsDocument = gql`
192204
}
193205
}
194206
`;
195-
export type GetGlobalStreamsStatsQueryResult = Apollo.QueryResult<GetGlobalStreamsStatsQuery, GetGlobalStreamsStatsQueryVariables>;
207+
export type GetGlobalStreamsStatsQueryResult = Apollo.QueryResult<GetGlobalStreamsStatsQuery, GetGlobalStreamsStatsQueryVariables>;
208+
export const GetNeighborsDocument = gql`
209+
query getNeighbors($cursor: String, $pageSize: Int, $streamPart: String, $node: String, $streamId: String) {
210+
neighbors(
211+
cursor: $cursor
212+
pageSize: $pageSize
213+
streamPart: $streamPart
214+
node: $node
215+
stream: $streamId
216+
) {
217+
items {
218+
streamPartId
219+
nodeId1
220+
nodeId2
221+
rtt
222+
}
223+
cursor
224+
}
225+
}
226+
`;
227+
export type GetNeighborsQueryResult = Apollo.QueryResult<GetNeighborsQuery, GetNeighborsQueryVariables>;

src/generated/gql/network.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4307,7 +4307,7 @@ export type StreamPermission = {
43074307
/** subscribeExpires timestamp tells until what time this address may subscribe to the stream */
43084308
subscribeExpiration?: Maybe<Scalars['BigInt']['output']>;
43094309
/** [DEPRECATED] Ethereum address, owner of this permission; only if permission granting didn't use *forUserId functions */
4310-
userAddress?: Maybe<Scalars['Bytes']['output']>;
4310+
userAddress: Scalars['Bytes']['output'];
43114311
/** Ethereum address or other ID, owner of this permission */
43124312
userId: Scalars['Bytes']['output'];
43134313
};

src/getters/getNeighbors.ts

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
import { getIndexerClient } from '~/getters/getGraphClient'
2+
import {
3+
GetNeighborsDocument,
4+
GetNeighborsQuery,
5+
GetNeighborsQueryVariables,
6+
} from '../generated/gql/indexer'
7+
8+
interface GetNeighborsParams {
9+
node?: string
10+
streamId?: string
11+
streamPartitionId?: string
12+
chainId: number
13+
}
14+
15+
interface Neighbour {
16+
nodeId0: string
17+
nodeId1: string
18+
streamPartitionId: string
19+
rtt?: number
20+
}
21+
22+
export async function getNeighbors(params: GetNeighborsParams): Promise<Neighbour[]> {
23+
const pageSize = 1000
24+
25+
const { node, streamId, streamPartitionId, chainId } = params
26+
27+
const items: Neighbour[] = []
28+
29+
const uniquenessGate: Record<string, true> = {}
30+
31+
let cursor = '0'
32+
33+
for (;;) {
34+
const client = getIndexerClient(chainId)
35+
36+
if (!client) {
37+
console.error('Could not get indexer client for chainId', chainId)
38+
break
39+
}
40+
41+
const {
42+
data: { neighbors },
43+
} = await client.query<GetNeighborsQuery, GetNeighborsQueryVariables>({
44+
fetchPolicy: 'network-only',
45+
query: GetNeighborsDocument,
46+
variables: {
47+
cursor,
48+
node,
49+
pageSize,
50+
streamId,
51+
streamPart: streamPartitionId,
52+
},
53+
})
54+
55+
for (const {
56+
nodeId1: a,
57+
nodeId2: b,
58+
streamPartId: finalStreamPartitionId,
59+
rtt,
60+
} of neighbors.items) {
61+
const pair = [a, b].sort() as [string, string]
62+
63+
const key = pair.join('-')
64+
65+
if (uniquenessGate[key]) {
66+
continue
67+
}
68+
69+
uniquenessGate[key] = true
70+
71+
const [nodeId0, nodeId1] = pair
72+
73+
items.push({
74+
nodeId0,
75+
nodeId1,
76+
streamPartitionId: finalStreamPartitionId,
77+
rtt: rtt ?? undefined,
78+
})
79+
}
80+
81+
if (!neighbors.cursor || neighbors.cursor === cursor) {
82+
break
83+
}
84+
85+
cursor = neighbors.cursor
86+
}
87+
88+
return items
89+
}

src/getters/getStreamStats.ts

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import {
44
GetStreamsQuery,
55
GetStreamsQueryVariables,
66
} from '../generated/gql/indexer'
7+
import { getNeighbors } from './getNeighbors'
78

89
export const defaultStreamStats = {
910
latency: undefined,
@@ -36,9 +37,31 @@ export const getStreamStats = async (streamId: string) => {
3637

3738
const { messagesPerSecond, peerCount } = stream
3839

40+
const latency = await calculateLatencyForStream(streamId, 137)
41+
3942
return {
40-
latency: undefined as undefined | number,
43+
latency,
4144
messagesPerSecond,
4245
peerCount,
4346
}
4447
}
48+
49+
export async function calculateLatencyForStream(streamId: string, chainId: number) {
50+
const neighbors = await getNeighbors({
51+
streamId,
52+
chainId,
53+
})
54+
55+
const validRTTs = neighbors
56+
.map((n) => n.rtt)
57+
.filter((rtt): rtt is number => typeof rtt === 'number' && rtt > 0)
58+
59+
// Calculate average one-way latency from neighbors with valid RTT.
60+
// Latency is the average RTT of neighbors in the stream, divided by 2.
61+
const latency =
62+
validRTTs.length > 0
63+
? validRTTs.reduce((sum, rtt) => sum + rtt, 0) / validRTTs.length / 2
64+
: undefined
65+
66+
return latency
67+
}

src/hooks/streams.tsx

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,10 @@ async function getStreamsFromGraph(
269269

270270
const streams = result.map((s) => {
271271
const { publisherCount, subscriberCount } = getStatsFromPermissions(
272-
s.permissions || [],
272+
s.permissions?.map((p) => ({
273+
...p,
274+
userAddress: p.userId, // We need to add this deprecated field for now
275+
})) || [],
273276
)
274277

275278
return {

src/hooks/useStreamStats.tsx

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,14 @@ export function useMultipleStreamStatsQuery(streamIds: string[]) {
2525
)) as StreamStats[]
2626
return stats.reduce(
2727
(acc: StreamStats, curr: StreamStats) => ({
28-
// For latency, we can take the average of non-undefined values
29-
latency:
30-
acc.latency === undefined && curr.latency === undefined
31-
? undefined
32-
: ((acc.latency || 0) + (curr.latency || 0)) /
33-
(acc.latency !== undefined && curr.latency !== undefined
34-
? 2
35-
: 1),
28+
// Take the maximum latency among all streams
29+
latency: Math.max(
30+
acc.latency ?? -Infinity,
31+
curr.latency ?? -Infinity
32+
) === -Infinity ? undefined : Math.max(
33+
acc.latency ?? -Infinity,
34+
curr.latency ?? -Infinity
35+
),
3636
messagesPerSecond: acc.messagesPerSecond + curr.messagesPerSecond,
3737
peerCount: acc.peerCount + curr.peerCount,
3838
}),

src/queries/indexer.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,28 @@ gql`
3939
streamCount
4040
}
4141
}
42+
43+
query getNeighbors(
44+
$cursor: String
45+
$pageSize: Int
46+
$streamPart: String
47+
$node: String
48+
$streamId: String
49+
) {
50+
neighbors(
51+
cursor: $cursor
52+
pageSize: $pageSize
53+
streamPart: $streamPart
54+
node: $node
55+
stream: $streamId
56+
) {
57+
items {
58+
streamPartId
59+
nodeId1
60+
nodeId2
61+
rtt
62+
}
63+
cursor
64+
}
65+
}
4266
`

src/services/streams.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,16 @@ export const getPagedStreams = async (
170170
})
171171

172172
if (streams && streams.length > 0) {
173-
return prepareStreamResult(streams, first)
173+
return prepareStreamResult(
174+
streams.map((s) => ({
175+
...s,
176+
permissions: s.permissions?.map((p) => ({
177+
...p,
178+
userAddress: p.userId, // We need to add this deprecated field for now
179+
})),
180+
})),
181+
first,
182+
)
174183
}
175184

176185
return {

0 commit comments

Comments
 (0)