Skip to content

Commit d32beb8

Browse files
authored
feat: Pass tracing info with websocket (#9627)
Signed-off-by: Andrey Sobolev <[email protected]>
1 parent fd5354a commit d32beb8

File tree

24 files changed

+694
-440
lines changed

24 files changed

+694
-440
lines changed

.vscode/launch.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -612,7 +612,8 @@
612612
"PLATFORM_OPERATION_LOGGING": "true",
613613
"FRONT_URL": "http://localhost:8080",
614614
"PORT": "3500",
615-
"STATS_URL": "http://huly.local:4900"
615+
"STATS_URL": "http://huly.local:4900",
616+
"OTEL_EXPORTER_OTLP_ENDPOINT": "http://huly.local:4318/v1/traces"
616617
},
617618
"runtimeArgs": ["--nolazy", "-r", "ts-node/register"],
618619
"sourceMaps": true,
@@ -665,7 +666,7 @@
665666
"SECRET": "secret",
666667
"ACCOUNTS_URL": "http://localhost:3000",
667668
"QUEUE_CONFIG": "localhost:19092",
668-
"QUEUE_REGION": "cockroach",
669+
"QUEUE_REGION": "cockroach"
669670
},
670671
"runtimeArgs": ["--nolazy", "-r", "ts-node/register"],
671672
"runtimeVersion": "20",

packages/measurements-otlp/src/telemetry.ts

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import {
1818
import {
1919
context,
2020
metrics as otelMetrics,
21+
propagation,
2122
Span,
2223
SpanStatusCode,
2324
trace,
@@ -131,31 +132,37 @@ export class OpenTelemetryMetricsContext implements MeasureContext {
131132
fullParams?: FullParamsType
132133
logger?: MeasureLogger
133134
span?: WithOptions['span'] // By default true
135+
meta?: Record<string, string | number | boolean>
134136
}
135137
): MeasureContext {
136-
let span: Span | undefined
137-
let childContext: Context = context.active()
138-
if (opt?.span === true) {
139-
childContext =
140-
this.span !== undefined
141-
? trace.setSpan(this.context ?? context.active(), this.span)
142-
: this.context ?? context.active()
143-
span = this.tracer.startSpan(name, undefined, childContext)
138+
let _span: Span | undefined
139+
let childContext: Context | undefined
140+
if (opt?.span === true || opt?.span === 'inherit') {
141+
childContext = opt?.span === 'inherit' ? context.active() : this.context ?? context.active()
142+
143+
if (opt.meta !== undefined && Object.keys(opt.meta).length > 0) {
144+
// We need to set meta params
145+
childContext = propagation.extract(childContext ?? context.active(), opt.meta)
146+
}
147+
_span = this.tracer.startSpan(name, undefined, childContext)
144148

145149
const spanParams = [...Object.entries(params)]
146150
for (const [k, v] of spanParams) {
147-
span?.setAttribute(k, v as any)
151+
_span?.setAttribute(k, v as any)
148152
}
149153
}
150154
if (opt?.span === 'disable') {
151-
childContext = suppressTracing(childContext)
155+
childContext = suppressTracing(childContext ?? context.active())
156+
}
157+
if (childContext !== undefined && _span !== undefined) {
158+
childContext = trace.setSpan(childContext, _span)
152159
}
153160

154161
const result = new OpenTelemetryMetricsContext(
155162
name,
156163
this.tracer,
157164
childContext,
158-
span,
165+
_span,
159166
params,
160167
opt?.fullParams ?? {},
161168
childMetrics(this.metrics, [name]),
@@ -170,6 +177,14 @@ export class OpenTelemetryMetricsContext implements MeasureContext {
170177
return result
171178
}
172179

180+
extractMeta (): Record<string, string | number | boolean> {
181+
const headers: Record<string, string> = {}
182+
if (this.context !== undefined) {
183+
propagation.inject(this.context, headers)
184+
}
185+
return headers
186+
}
187+
173188
with<T>(
174189
name: string,
175190
params: ParamsType,
@@ -180,7 +195,8 @@ export class OpenTelemetryMetricsContext implements MeasureContext {
180195
const c = this.newChild(name, opt?.inheritParams === true ? { ...this.params, ...params } : params, {
181196
fullParams,
182197
logger: this.logger,
183-
span: opt?.span ?? true
198+
span: opt?.span ?? true,
199+
meta: opt?.meta
184200
})
185201
let needFinally = true
186202
try {

packages/measurements/src/context.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,10 @@ export class MeasureMetricsContext implements MeasureContext {
157157
}
158158
}
159159

160+
extractMeta (): Record<string, string | number | boolean> {
161+
return {}
162+
}
163+
160164
withSync<T>(
161165
name: string,
162166
params: ParamsType,
@@ -226,6 +230,10 @@ export class NoMetricsContext implements MeasureContext {
226230
return r instanceof Promise ? r : Promise.resolve(r)
227231
}
228232

233+
extractMeta (): Record<string, string | number | boolean> {
234+
return {}
235+
}
236+
229237
withSync<T>(
230238
name: string,
231239
params: ParamsType,

packages/measurements/src/types.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,12 @@ export interface MeasureLogger {
6666
}
6767

6868
export interface WithOptions {
69-
span?: true | false | 'disable' | 'skip' // 'none' means no span will be created, 'disable' means context will be tracing disabled
69+
span?: true | false | 'disable' | 'skip' | 'inherit' // 'none' means no span will be created, 'disable' means context will be tracing disabled
7070
log?: boolean
7171
inheritParams?: boolean
72+
73+
// Passed context metadata
74+
meta?: Record<string, string | number | boolean>
7275
}
7376

7477
/**
@@ -87,6 +90,7 @@ export interface MeasureContext<Q = any> {
8790
fullParams?: FullParamsType
8891
logger?: MeasureLogger
8992
span?: WithOptions['span'] // By default true
93+
meta?: Record<string, string | number | boolean>
9094
}
9195
) => MeasureContext
9296

@@ -108,6 +112,8 @@ export interface MeasureContext<Q = any> {
108112
opt?: WithOptions
109113
) => T
110114

115+
extractMeta: () => Record<string, string | number | boolean>
116+
111117
logger: MeasureLogger
112118

113119
parent?: MeasureContext

plugins/client-resources/src/connection.ts

Lines changed: 57 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -55,14 +55,7 @@ import core, {
5555
TxResult,
5656
type WorkspaceUuid
5757
} from '@hcengineering/core'
58-
import platform, {
59-
broadcastEvent,
60-
getMetadata,
61-
PlatformError,
62-
Severity,
63-
Status,
64-
UNAUTHORIZED
65-
} from '@hcengineering/platform'
58+
import platform, { getMetadata, PlatformError, Severity, Status, UNAUTHORIZED } from '@hcengineering/platform'
6659
import { HelloRequest, HelloResponse, type RateLimitInfo, ReqId, type Response, RPCHandler } from '@hcengineering/rpc'
6760
import { uncompress } from 'snappyjs'
6861

@@ -109,9 +102,6 @@ class Connection implements ClientConnection {
109102
private dialTimer: any | undefined
110103

111104
private sockets = 0
112-
113-
private incomingTimer: any
114-
115105
private openAction: any
116106

117107
private readonly sessionId: string | undefined
@@ -492,9 +482,6 @@ class Connection implements ClientConnection {
492482
promise.resolve(resp.result)
493483
}
494484
}
495-
void broadcastEvent(client.event.NetworkRequests, this.requests.size).catch((err) => {
496-
this.ctx.error('failed to broadcast', { err })
497-
})
498485
} else {
499486
const txArr = Array.isArray(resp.result) ? (resp.result as Tx[]) : [resp.result as Tx]
500487

@@ -508,17 +495,6 @@ class Connection implements ClientConnection {
508495
this.handlers.forEach((handler) => {
509496
handler(...txArr)
510497
})
511-
512-
clearTimeout(this.incomingTimer)
513-
void broadcastEvent(client.event.NetworkRequests, this.requests.size + 1).catch((err) => {
514-
this.ctx.error('failed to broadcast', { err })
515-
})
516-
517-
this.incomingTimer = setTimeout(() => {
518-
void broadcastEvent(client.event.NetworkRequests, this.requests.size).catch((err) => {
519-
this.ctx.error('failed to broadcast', { err })
520-
})
521-
}, 500)
522498
}
523499
}
524500

@@ -657,10 +633,6 @@ class Connection implements ClientConnection {
657633
wsocket.close()
658634
return
659635
}
660-
// console.log('client websocket closed', socketId, ev?.reason)
661-
void broadcastEvent(client.event.NetworkRequests, -1).catch((err) => {
662-
this.ctx.error('failed broadcast', { err })
663-
})
664636
this.scheduleOpen(this.ctx, true)
665637
}
666638
wsocket.onopen = () => {
@@ -690,9 +662,6 @@ class Connection implements ClientConnection {
690662
if (opened) {
691663
console.error('client websocket error:', socketId, this.url, this.workspace, this.user)
692664
}
693-
void broadcastEvent(client.event.NetworkRequests, -1).catch((err) => {
694-
this.ctx.error('failed to broadcast', { err })
695-
})
696665
}
697666
}
698667

@@ -707,83 +676,83 @@ class Connection implements ClientConnection {
707676
allowReconnect?: boolean
708677
overrideId?: number
709678
}): Promise<any> {
710-
return this.ctx.newChild('send-request', {}).with(data.method, {}, async (ctx) => {
711-
if (this.closed) {
712-
throw new PlatformError(new Status(Severity.ERROR, platform.status.ConnectionClosed, {}))
713-
}
679+
return this.ctx.with(
680+
'connection-' + data.method,
681+
{},
682+
async (ctx) => {
683+
if (this.closed) {
684+
throw new PlatformError(new Status(Severity.ERROR, platform.status.ConnectionClosed, {}))
685+
}
714686

715-
if (this.slowDownTimer > 0) {
716-
// We need to wait a bit to avoid ban.
717-
await new Promise((resolve) => setTimeout(resolve, this.slowDownTimer))
718-
}
687+
if (this.slowDownTimer > 0) {
688+
// We need to wait a bit to avoid ban.
689+
await new Promise((resolve) => setTimeout(resolve, this.slowDownTimer))
690+
}
719691

720-
if (data.once === true) {
721-
// Check if has same request already then skip
722-
const dparams = JSON.stringify(data.params)
723-
for (const [, v] of this.requests) {
724-
if (v.method === data.method && JSON.stringify(v.params) === dparams) {
725-
// We have same unanswered, do not add one more.
726-
return
692+
if (data.once === true) {
693+
// Check if has same request already then skip
694+
const dparams = JSON.stringify(data.params)
695+
for (const [, v] of this.requests) {
696+
if (v.method === data.method && JSON.stringify(v.params) === dparams) {
697+
// We have same unanswered, do not add one more.
698+
return
699+
}
727700
}
728701
}
729-
}
730702

731-
const id = data.overrideId ?? this.lastId++
732-
const promise = new RequestPromise(data.method, data.params, data.handleResult)
733-
promise.handleTime = data.measure
703+
const id = data.overrideId ?? this.lastId++
704+
const promise = new RequestPromise(data.method, data.params, data.handleResult)
705+
promise.handleTime = data.measure
734706

735-
const w = this.waitOpenConnection(ctx)
736-
if (w instanceof Promise) {
737-
await w
738-
}
739-
if (data.method !== pingConst) {
740-
this.requests.set(id, promise)
741-
}
742-
promise.sendData = (): void => {
743-
if (this.websocket?.readyState === ClientSocketReadyState.OPEN) {
744-
promise.startTime = Date.now()
707+
const w = this.waitOpenConnection(ctx)
708+
if (w instanceof Promise) {
709+
await w
710+
}
711+
if (data.method !== pingConst) {
712+
this.requests.set(id, promise)
713+
}
714+
promise.sendData = (): void => {
715+
if (this.websocket?.readyState === ClientSocketReadyState.OPEN) {
716+
promise.startTime = Date.now()
745717

746-
if (data.method !== pingConst) {
747-
const dta = ctx.withSync('serialize', {}, () =>
748-
this.rpcHandler.serialize(
718+
if (data.method !== pingConst) {
719+
const dta = this.rpcHandler.serialize(
749720
{
750721
method: data.method,
751722
params: data.params,
723+
meta: ctx.extractMeta(),
752724
id,
753725
time: Date.now()
754726
},
755727
this.binaryMode
756728
)
757-
)
758729

759-
ctx.withSync('send-data', {}, () => this.websocket?.send(dta))
760-
} else {
761-
this.websocket?.send(pingConst)
730+
this.websocket?.send(dta)
731+
} else {
732+
this.websocket?.send(pingConst)
733+
}
762734
}
763735
}
764-
}
765-
if (data.allowReconnect ?? true) {
766-
promise.reconnect = () => {
767-
setTimeout(async () => {
768-
// In case we don't have response yet.
769-
if (this.requests.has(id) && ((await data.retry?.()) ?? true)) {
770-
promise.sendData()
771-
}
772-
}, 50)
736+
if (data.allowReconnect ?? true) {
737+
promise.reconnect = () => {
738+
setTimeout(async () => {
739+
// In case we don't have response yet.
740+
if (this.requests.has(id) && ((await data.retry?.()) ?? true)) {
741+
promise.sendData()
742+
}
743+
}, 50)
744+
}
773745
}
774-
}
775-
ctx.withSync('send-data', {}, () => {
776746
promise.sendData()
777-
})
778-
void ctx
779-
.with('broadcast-event', {}, () => broadcastEvent(client.event.NetworkRequests, this.requests.size))
780-
.catch((err) => {
781-
this.ctx.error('failed to broadcast', { err })
782-
})
783-
if (data.method !== pingConst) {
784-
return await promise.promise
747+
if (data.method !== pingConst) {
748+
return await promise.promise
749+
}
750+
},
751+
undefined,
752+
{
753+
span: 'inherit'
785754
}
786-
})
755+
)
787756
}
788757

789758
loadModel (last: Timestamp, hash?: string): Promise<Tx[] | LoadModelResponse> {

plugins/client/src/index.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,5 @@ export default plugin(clientId, {
9696
},
9797
function: {
9898
GetClient: '' as Resource<ClientFactory>
99-
},
100-
event: {
101-
NetworkRequests: '' as Metadata<string>
10299
}
103100
})

server/rpc/src/rpc.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ export interface Request<P extends any[]> {
3030
method: string
3131
params: P
3232

33+
meta?: Record<string, string | number | boolean>
34+
3335
time?: number // Server time to perform operation
3436
}
3537

server/server/src/sessionManager.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1309,7 +1309,10 @@ export class TSessionManager implements SessionManager {
13091309
this.createOpContext(callTx, requestCtx, pipeline, request.id, service, ws, rateLimit),
13101310
...params
13111311
]),
1312-
{ ...request, user: service.getUser, socialId: service.getRawAccount().primarySocialId }
1312+
{ ...request, user: service.getUser, socialId: service.getRawAccount().primarySocialId },
1313+
{
1314+
meta: request.meta
1315+
}
13131316
)
13141317
})
13151318
} catch (err: any) {

0 commit comments

Comments
 (0)