Skip to content

Commit 7308169

Browse files
committed
wip: manual intrument mongo calls
1 parent 580566a commit 7308169

File tree

5 files changed

+187
-31
lines changed

5 files changed

+187
-31
lines changed

meteor/server/api/profiler/apm.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
import { Meteor } from 'meteor/meteor'
22
// const shimmer = require('shimmer')
33
import Agent, { AgentConfigOptions } from 'elastic-apm-node'
4-
import { WebApp } from 'meteor/webapp'
54

65
// const { Session, Subscription, MongoCursor } = require('./meteorx')
76

87
// Only the ones of these we use have been copied across.
98
// The others can be found at https://github.com/Meteor-Community-Packages/meteor-elastic-apm/tree/master/instrumenting
109
// const instrumentMethods = require('./instrumenting/methods')
11-
// const instrumentHttpIn = require('./instrumenting/http-in')
1210
// const instrumentHttpOut = require('./instrumenting/http-out')
1311
// const instrumentSession = require('./instrumenting/session')
1412
// const instrumentSubscription = require('./instrumenting/subscription')
@@ -31,7 +29,7 @@ export function startAgent(config: AgentConfigOptions): void {
3129
if (config.active !== false) {
3230
try {
3331
// Must be called before any other route is registered on WebApp.
34-
instrumentHttpIn(Agent, WebApp)
32+
// http-in has been moved to be part of where the koa router is mounted
3533
// instrumentHttpOut(Agent)
3634

3735
Agent.start(config)
@@ -40,7 +38,7 @@ export function startAgent(config: AgentConfigOptions): void {
4038
// instrumentSession(Agent, Session),
4139
// instrumentSubscription(Agent, Subscription),
4240
// hackDB() // TODO: what is this doing? https://github.com/Meteor-Community-Packages/meteor-elastic-apm/blob/master/hacks.js
43-
instrumentDB(Agent, Meteor, MongoCursor)
41+
// instrumentDB replaced by manual wrapping in WrappedAsyncMongoCollection
4442
// startMetrics(Agent),
4543

4644
Agent.logger.info('meteor-elastic-apm completed instrumenting')

meteor/server/api/rest/koa.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { public_dir } from '../../lib'
99
import staticServe from 'koa-static'
1010
import { logger } from '../../logging'
1111
import { PackageInfo } from '../../coreSystem'
12+
import { profiler } from '../profiler'
1213

1314
declare module 'http' {
1415
interface IncomingMessage {
@@ -46,6 +47,32 @@ Meteor.startup(() => {
4647

4748
// Expose the API at the url
4849
WebApp.rawConnectHandlers.use((req, res) => {
50+
const transaction = profiler.startTransaction(`${req.method}:${req.url}`, 'http.incoming')
51+
if (transaction) {
52+
transaction.setLabel('url', `${req.url}`)
53+
transaction.setLabel('method', `${req.method}`)
54+
55+
res.on('finish', () => {
56+
let route = req.originalUrl
57+
if (req.originalUrl.endsWith(req.url.slice(1)) && req.url.length > 1) {
58+
route = req.originalUrl.slice(0, -1 * (req.url.length - 1))
59+
}
60+
61+
if (route.endsWith('/')) {
62+
route = route.slice(0, -1)
63+
}
64+
65+
if (route && transaction) {
66+
transaction.name = `${req.method}:${route}`
67+
transaction.setLabel('route', `${route}`)
68+
}
69+
70+
if (transaction) {
71+
transaction.end()
72+
}
73+
})
74+
}
75+
4976
const callback = Meteor.bindEnvironment(koaApp.callback())
5077
callback(req, res).catch(() => res.end())
5178
})

meteor/server/collections/implementations/asyncCollection.ts

Lines changed: 101 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { PromisifyCallbacks } from '@sofie-automation/shared-lib/dist/lib/types'
1010
import type { AnyBulkWriteOperation } from 'mongodb'
1111
import { MinimalMongoCursor, WrappedMongoCollectionBase } from './base'
1212
import { AsyncOnlyMongoCollection } from '../collection'
13+
import { profiler } from '../../api/profiler'
1314

1415
export class WrappedAsyncMongoCollection<DBInterface extends { _id: ProtectedString<any> }>
1516
extends WrappedMongoCollectionBase<DBInterface>
@@ -23,38 +24,111 @@ export class WrappedAsyncMongoCollection<DBInterface extends { _id: ProtectedStr
2324
selector: MongoQuery<DBInterface> | DBInterface['_id'],
2425
options?: FindOptions<DBInterface>
2526
): Promise<Array<DBInterface>> {
26-
return this.find(selector as any, options).fetchAsync()
27+
const span = profiler.startSpan(`MongoCollection.${this.name}.findFetch`)
28+
if (span) {
29+
span.addLabels({
30+
collection: this.name,
31+
query: JSON.stringify(selector),
32+
})
33+
}
34+
try {
35+
const res = await this._collection.find((selector ?? {}) as any, options as any).fetchAsync()
36+
if (span) span.end()
37+
return res
38+
} catch (e) {
39+
if (span) span.end()
40+
this.wrapMongoError(e)
41+
}
2742
}
2843

2944
async findOneAsync(
3045
selector: MongoQuery<DBInterface> | DBInterface['_id'],
3146
options?: FindOptions<DBInterface>
3247
): Promise<DBInterface | undefined> {
33-
const arr = await this.findFetchAsync(selector, { ...options, limit: 1 })
34-
return arr[0]
48+
const span = profiler.startSpan(`MongoCollection.${this.name}.findOne`)
49+
if (span) {
50+
span.addLabels({
51+
collection: this.name,
52+
query: JSON.stringify(selector),
53+
})
54+
}
55+
try {
56+
const arr = await this._collection
57+
.find((selector ?? {}) as any, { ...(options as any), limit: 1 })
58+
.fetchAsync()
59+
if (span) span.end()
60+
return arr[0]
61+
} catch (e) {
62+
if (span) span.end()
63+
this.wrapMongoError(e)
64+
}
3565
}
3666

3767
async findWithCursor(
3868
selector?: MongoQuery<DBInterface> | DBInterface['_id'],
3969
options?: FindOptions<DBInterface>
4070
): Promise<MinimalMongoCursor<DBInterface>> {
41-
return this.find(selector as any, options)
71+
const span = profiler.startSpan(`MongoCollection.${this.name}.findCursor`)
72+
if (span) {
73+
span.addLabels({
74+
collection: this.name,
75+
query: JSON.stringify(selector),
76+
})
77+
}
78+
try {
79+
const res = this._collection.find((selector ?? {}) as any, options as any)
80+
if (span) span.end()
81+
return res
82+
} catch (e) {
83+
if (span) span.end()
84+
this.wrapMongoError(e)
85+
}
4286
}
4387

4488
async observeChanges(
4589
selector: MongoQuery<DBInterface> | DBInterface['_id'],
4690
callbacks: PromisifyCallbacks<ObserveChangesCallbacks<DBInterface>>,
4791
options?: FindOptions<DBInterface>
4892
): Promise<Meteor.LiveQueryHandle> {
49-
return this.find(selector as any, options).observeChangesAsync(callbacks)
93+
const span = profiler.startSpan(`MongoCollection.${this.name}.observeChanges`)
94+
if (span) {
95+
span.addLabels({
96+
collection: this.name,
97+
query: JSON.stringify(selector),
98+
})
99+
}
100+
try {
101+
const res = await this._collection
102+
.find((selector ?? {}) as any, options as any)
103+
.observeChangesAsync(callbacks)
104+
if (span) span.end()
105+
return res
106+
} catch (e) {
107+
if (span) span.end()
108+
this.wrapMongoError(e)
109+
}
50110
}
51111

52112
async observe(
53113
selector: MongoQuery<DBInterface> | DBInterface['_id'],
54114
callbacks: PromisifyCallbacks<ObserveCallbacks<DBInterface>>,
55115
options?: FindOptions<DBInterface>
56116
): Promise<Meteor.LiveQueryHandle> {
57-
return this.find(selector as any, options).observeAsync(callbacks)
117+
const span = profiler.startSpan(`MongoCollection.${this.name}.observe`)
118+
if (span) {
119+
span.addLabels({
120+
collection: this.name,
121+
query: JSON.stringify(selector),
122+
})
123+
}
124+
try {
125+
const res = await this._collection.find((selector ?? {}) as any, options as any).observeAsync(callbacks)
126+
if (span) span.end()
127+
return res
128+
} catch (e) {
129+
if (span) span.end()
130+
this.wrapMongoError(e)
131+
}
58132
}
59133

60134
async insertManyAsync(docs: DBInterface[]): Promise<Array<DBInterface['_id']>> {
@@ -81,6 +155,14 @@ export class WrappedAsyncMongoCollection<DBInterface extends { _id: ProtectedStr
81155
}
82156

83157
async bulkWriteAsync(ops: Array<AnyBulkWriteOperation<DBInterface>>): Promise<void> {
158+
const span = profiler.startSpan(`MongoCollection.${this.name}.bulkWrite`)
159+
if (span) {
160+
span.addLabels({
161+
collection: this.name,
162+
opCount: ops.length,
163+
})
164+
}
165+
84166
if (ops.length > 0) {
85167
const rawCollection = this.rawCollection()
86168
const bulkWriteResult = await rawCollection.bulkWrite(ops, {
@@ -92,12 +174,24 @@ export class WrappedAsyncMongoCollection<DBInterface extends { _id: ProtectedStr
92174
throw new Meteor.Error(500, `Errors in rawCollection.bulkWrite: ${writeErrors.join(',')}`)
93175
}
94176
}
177+
178+
if (span) span.end()
95179
}
96180

97181
async countDocuments(selector?: MongoQuery<DBInterface>, options?: FindOptions<DBInterface>): Promise<number> {
182+
const span = profiler.startSpan(`MongoCollection.${this.name}.countDocuments`)
183+
if (span) {
184+
span.addLabels({
185+
collection: this.name,
186+
query: JSON.stringify(selector),
187+
})
188+
}
98189
try {
99-
return this._collection.find((selector ?? {}) as any, options as any).countAsync()
190+
const res = await this._collection.find((selector ?? {}) as any, options as any).countAsync()
191+
if (span) span.end()
192+
return res
100193
} catch (e) {
194+
if (span) span.end()
101195
this.wrapMongoError(e)
102196
}
103197
}

meteor/server/collections/implementations/base.ts

Lines changed: 54 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
11
import { MongoModifier, MongoQuery } from '@sofie-automation/corelib/dist/mongo'
2-
import { ProtectedString, protectString } from '@sofie-automation/corelib/dist/protectedString'
2+
import { ProtectedString, protectString, unprotectString } from '@sofie-automation/corelib/dist/protectedString'
33
import { Meteor } from 'meteor/meteor'
44
import { Mongo } from 'meteor/mongo'
55
import {
66
UpdateOptions,
77
UpsertOptions,
8-
FindOptions,
98
IndexSpecifier,
109
MongoCursor,
1110
} from '@sofie-automation/meteor-lib/dist/collections/lib'
1211
import type { Collection as RawCollection, Db as RawDb } from 'mongodb'
1312
import { stringifyError } from '@sofie-automation/shared-lib/dist/lib/stringifyError'
1413
import { NpmModuleMongodb } from 'meteor/npm-mongo'
14+
import { profiler } from '../../api/profiler'
1515

1616
export type MinimalMongoCursor<T extends { _id: ProtectedString<any> }> = Pick<
1717
MongoCursor<T>,
@@ -54,34 +54,42 @@ export class WrappedMongoCollectionBase<DBInterface extends { _id: ProtectedStri
5454
rawCollection(): RawCollection<DBInterface> {
5555
return this._collection.rawCollection() as any
5656
}
57-
rawDatabase(): RawDb {
57+
protected rawDatabase(): RawDb {
5858
return this._collection.rawDatabase() as any
5959
}
6060

61-
protected find(
62-
selector?: MongoQuery<DBInterface> | DBInterface['_id'],
63-
options?: FindOptions<DBInterface>
64-
): MinimalMongoCursor<DBInterface> {
65-
try {
66-
return this._collection.find((selector ?? {}) as any, options as any) as MongoCursor<DBInterface>
67-
} catch (e) {
68-
this.wrapMongoError(e)
69-
}
70-
}
71-
7261
public async insertAsync(doc: DBInterface): Promise<DBInterface['_id']> {
62+
const span = profiler.startSpan(`MongoCollection.${this.name}.insert`)
63+
if (span) {
64+
span.addLabels({
65+
collection: this.name,
66+
id: unprotectString(doc._id),
67+
})
68+
}
7369
try {
7470
const resultId = await this._collection.insertAsync(doc as unknown as Mongo.OptionalId<DBInterface>)
71+
if (span) span.end()
7572
return protectString(resultId)
7673
} catch (e) {
74+
if (span) span.end()
7775
this.wrapMongoError(e)
7876
}
7977
}
8078

8179
public async removeAsync(selector: MongoQuery<DBInterface> | DBInterface['_id']): Promise<number> {
80+
const span = profiler.startSpan(`MongoCollection.${this.name}.remove`)
81+
if (span) {
82+
span.addLabels({
83+
collection: this.name,
84+
query: JSON.stringify(selector),
85+
})
86+
}
8287
try {
83-
return this._collection.removeAsync(selector as any)
88+
const res = await this._collection.removeAsync(selector as any)
89+
if (span) span.end()
90+
return res
8491
} catch (e) {
92+
if (span) span.end()
8593
this.wrapMongoError(e)
8694
}
8795
}
@@ -90,9 +98,19 @@ export class WrappedMongoCollectionBase<DBInterface extends { _id: ProtectedStri
9098
modifier: MongoModifier<DBInterface>,
9199
options?: UpdateOptions
92100
): Promise<number> {
101+
const span = profiler.startSpan(`MongoCollection.${this.name}.update`)
102+
if (span) {
103+
span.addLabels({
104+
collection: this.name,
105+
query: JSON.stringify(selector),
106+
})
107+
}
93108
try {
94-
return this._collection.updateAsync(selector as any, modifier as any, options)
109+
const res = await this._collection.updateAsync(selector as any, modifier as any, options)
110+
if (span) span.end()
111+
return res
95112
} catch (e) {
113+
if (span) span.end()
96114
this.wrapMongoError(e)
97115
}
98116
}
@@ -104,21 +122,40 @@ export class WrappedMongoCollectionBase<DBInterface extends { _id: ProtectedStri
104122
numberAffected?: number
105123
insertedId?: DBInterface['_id']
106124
}> {
125+
const span = profiler.startSpan(`MongoCollection.${this.name}.upsert`)
126+
if (span) {
127+
span.addLabels({
128+
collection: this.name,
129+
query: JSON.stringify(selector),
130+
})
131+
}
107132
try {
108133
const result = await this._collection.upsertAsync(selector as any, modifier as any, options)
134+
if (span) span.end()
109135
return {
110136
numberAffected: result.numberAffected,
111137
insertedId: protectString(result.insertedId),
112138
}
113139
} catch (e) {
140+
if (span) span.end()
114141
this.wrapMongoError(e)
115142
}
116143
}
117144

118145
createIndex(keys: IndexSpecifier<DBInterface> | string, options?: NpmModuleMongodb.CreateIndexesOptions): void {
146+
const span = profiler.startSpan(`MongoCollection.${this.name}.createIndex`)
147+
if (span) {
148+
span.addLabels({
149+
collection: this.name,
150+
keys: JSON.stringify(keys),
151+
})
152+
}
119153
try {
120-
return this._collection.createIndex(keys as any, options)
154+
const res = this._collection.createIndex(keys as any, options)
155+
if (span) span.end()
156+
return res
121157
} catch (e) {
158+
if (span) span.end()
122159
this.wrapMongoError(e)
123160
}
124161
}

0 commit comments

Comments
 (0)