Skip to content

Commit c5d146b

Browse files
committed
Instrumentation of asynchronous commands
1 parent 552df1f commit c5d146b

File tree

2 files changed

+159
-17
lines changed

2 files changed

+159
-17
lines changed

src/Instrumentation/PostgreSql/src/PgSqlTracker.php

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
use OpenTelemetry\API\Trace\SpanContextInterface;
88
use OpenTelemetry\SemConv\TraceAttributes;
9+
use SplQueue;
910
use WeakMap;
1011
use WeakReference;
1112
use PgSql\Connection;
@@ -17,14 +18,45 @@ final class PgSqlTracker
1718
{
1819

1920
private WeakMap $connectionAttributes;
21+
2022
private WeakMap $connectionStatements;
23+
24+
/**
25+
* @var WeakMap<Connection, SplQueue<WeakReference<?SpanContextInterface>>
26+
*/
27+
private WeakMap $connectionAsyncLink;
28+
2129
public function __construct()
2230
{
2331
// /** @psalm-suppress PropertyTypeCoercion */
2432
$this->connectionAttributes = new WeakMap();
2533
$this->connectionStatements = new WeakMap();
34+
$this->connectionAsyncLink = new WeakMap(); // maps connection to SplQueue with links
35+
}
36+
37+
public function addAsyncLinkForConnection(Connection $connection, SpanContextInterface $spanContext) {
38+
39+
if (!$this->connectionAsyncLink->offsetExists($connection)) {
40+
$this->connectionAsyncLink[$connection] = new SplQueue();
41+
}
42+
$this->connectionAsyncLink[$connection]->push(WeakReference::create($spanContext));
43+
}
44+
45+
public function getAsyncLinkForConnection(Connection $connection) : ?SpanContextInterface
46+
{
47+
if (!$this->connectionAsyncLink->offsetExists($connection)) {
48+
return null;
49+
}
50+
51+
if ($this->connectionAsyncLink[$connection]->isEmpty()) {
52+
return null;
53+
}
54+
55+
56+
return $this->connectionAsyncLink[$connection]->pop()->get();
2657
}
2758

59+
2860
public function addConnectionStatement(Connection $connection, string $statementName, string $query)
2961
{
3062
if (!$this->connectionStatements->offsetExists($connection)) {
@@ -48,7 +80,7 @@ public function getConnectionAttributes(Connection $connection) : array
4880
{
4981
return $this->connectionAttributes[$connection] ?? [];
5082
}
51-
private function splitQueries(string $sql)
83+
public static function splitQueries(string $sql)
5284
{
5385
// Normalize line endings to \n
5486
$sql = preg_replace("/\r\n|\n\r|\r/", "\n", $sql);

src/Instrumentation/PostgreSql/src/PostgreSqlInstrumentation.php

Lines changed: 126 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,6 @@ public static function register(): void
3030

3131
//TODO Large objet - track by PgLob instance
3232
//pg_query_params, pg_select
33-
//pg_send_execute
34-
//pg_send_prepare
35-
//pg_send_query
36-
//pg_send_query_params
3733

3834
// https://opentelemetry.io/docs/specs/semconv/database/postgresql/
3935
$instrumentation = new CachedInstrumentation(
@@ -117,7 +113,7 @@ public static function register(): void
117113
self::basicPreHook('pg_prepare', $instrumentation, $tracker, ...$args);
118114
},
119115
post: static function (...$args) use ($instrumentation, $tracker) {
120-
self::preparePostHook($instrumentation, $tracker, ...$args);
116+
self::preparePostHook($instrumentation, $tracker, false, ...$args);
121117
}
122118
);
123119

@@ -128,11 +124,10 @@ public static function register(): void
128124
self::basicPreHook('pg_execute', $instrumentation, $tracker, ...$args);
129125
},
130126
post: static function (...$args) use ($instrumentation, $tracker) {
131-
self::executePostHook($instrumentation, $tracker, ...$args);
127+
self::executePostHook($instrumentation, false, $tracker, ...$args);
132128
}
133129
);
134130

135-
136131
hook(
137132
null,
138133
'pg_query',
@@ -144,6 +139,59 @@ public static function register(): void
144139
}
145140
);
146141

142+
hook(
143+
null,
144+
'pg_send_prepare',
145+
pre: static function (...$args) use ($instrumentation, $tracker) {
146+
self::basicPreHook('pg_send_prepare', $instrumentation, $tracker, ...$args);
147+
},
148+
post: static function (...$args) use ($instrumentation, $tracker) {
149+
self::preparePostHook($instrumentation, $tracker, true, ...$args);
150+
}
151+
);
152+
153+
hook(
154+
null,
155+
'pg_send_execute',
156+
pre: static function (...$args) use ($instrumentation, $tracker) {
157+
self::basicPreHook('pg_send_execute', $instrumentation, $tracker, ...$args);
158+
},
159+
post: static function (...$args) use ($instrumentation, $tracker) {
160+
self::executePostHook($instrumentation, $tracker, true, ...$args);
161+
}
162+
);
163+
hook(
164+
null,
165+
'pg_send_query',
166+
pre: static function (...$args) use ($instrumentation, $tracker) {
167+
self::basicPreHook('pg_send_query', $instrumentation, $tracker, ...$args);
168+
},
169+
post: static function (...$args) use ($instrumentation, $tracker) {
170+
self::sendQueryPostHook($instrumentation, $tracker, ...$args);
171+
}
172+
);
173+
hook(
174+
null,
175+
'pg_send_query_params',
176+
pre: static function (...$args) use ($instrumentation, $tracker) {
177+
self::basicPreHook('pg_send_query_params', $instrumentation, $tracker, ...$args);
178+
},
179+
post: static function (...$args) use ($instrumentation, $tracker) {
180+
self::sendQueryParamsPostHook($instrumentation, $tracker, ...$args);
181+
}
182+
);
183+
184+
hook(
185+
null,
186+
'pg_get_result',
187+
pre: static function (...$args) use ($instrumentation, $tracker) {
188+
self::basicPreHook('pg_get_result', $instrumentation, $tracker, ...$args);
189+
},
190+
post: static function (...$args) use ($instrumentation, $tracker) {
191+
self::getResultPostHook($instrumentation, $tracker, ...$args);
192+
}
193+
);
194+
147195
}
148196

149197
/** @param non-empty-string $spanName */
@@ -158,7 +206,7 @@ private static function connectPostHook(CachedInstrumentation $instrumentation,
158206
if ($retVal instanceof Connection) {
159207
$tracker->storeConnectionAttributes($retVal, $params[0]);
160208
}
161-
self::endSpan([], $exception, $retVal === false ? "Connection error" : null);
209+
self::endSpan([], $exception, $retVal == false ? "Connection error" : null);
162210
}
163211

164212
/** @param non-empty-string $spanName */
@@ -169,7 +217,7 @@ private static function basicPreHook(string $spanName, CachedInstrumentation $in
169217

170218
private static function basicPostHook(CachedInstrumentation $instrumentation, PgSqlTracker $tracker, ?array $attributes, bool $dropIfNoError, $obj, array $params, mixed $retVal, ?\Throwable $exception)
171219
{
172-
$errorStatus = $retVal === false ? pg_last_error($params[0]) : null;
220+
$errorStatus = $retVal == false ? pg_last_error($params[0]) : null;
173221
if ($dropIfNoError && $errorStatus === null && $exception === null) {
174222
self::dropSpan();
175223
return;
@@ -186,31 +234,68 @@ private static function tableOperationsPostHook(CachedInstrumentation $instrumen
186234
$attributes[TraceAttributes::DB_NAMESPACE] = mb_convert_encoding($params[1], 'UTF-8');
187235
}
188236

189-
$errorStatus = $retVal === false ? pg_last_error($params[0]) : null;
237+
$errorStatus = $retVal == false ? pg_last_error($params[0]) : null;
190238
if ($dropIfNoError && $errorStatus === null && $exception === null) {
191239
self::dropSpan();
192240
return;
193241
}
194242
self::endSpan($attributes, $exception, $errorStatus);
195243
}
196244

197-
private static function preparePostHook(CachedInstrumentation $instrumentation, PgSqlTracker $tracker, $obj, array $params, mixed $retVal, ?\Throwable $exception)
245+
private static function preparePostHook(CachedInstrumentation $instrumentation, PgSqlTracker $tracker, bool $async, $obj, array $params, mixed $retVal, ?\Throwable $exception)
198246
{
199247
$attributes = $tracker->getConnectionAttributes($params[0]);
200248

201249
$attributes[TraceAttributes::DB_QUERY_TEXT] = mb_convert_encoding($params[2], 'UTF-8');
202250
$attributes[TraceAttributes::DB_OPERATION_NAME] = self::extractQueryCommand($params[2]);
203251

204-
$errorStatus = $retVal === false ? pg_last_error($params[0]) : null;
252+
$errorStatus = $retVal == false ? pg_last_error($params[0]) : null;
205253

206-
if ($retVal !== false) {
254+
if ($retVal != false) {
207255
$tracker->addConnectionStatement($params[0], $params[1], $params[2]);
256+
257+
if ($async) {
258+
$tracker->addAsyncLinkForConnection($params[0], Span::getCurrent()->getContext());
259+
}
260+
261+
}
262+
263+
self::endSpan($attributes, $exception, $errorStatus);
264+
}
265+
266+
267+
private static function sendQueryParamsPostHook(CachedInstrumentation $instrumentation, PgSqlTracker $tracker, $obj, array $params, mixed $retVal, ?\Throwable $exception)
268+
{
269+
$attributes = $tracker->getConnectionAttributes($params[0]);
270+
271+
$attributes[TraceAttributes::DB_QUERY_TEXT] = mb_convert_encoding($params[1], 'UTF-8');
272+
$attributes[TraceAttributes::DB_OPERATION_NAME] = self::extractQueryCommand($params[1]);
273+
274+
$errorStatus = $retVal == false ? pg_last_error($params[0]) : null;
275+
276+
if ($retVal != false) {
277+
$tracker->addAsyncLinkForConnection($params[0], Span::getCurrent()->getContext());
278+
}
279+
280+
self::endSpan($attributes, $exception, $errorStatus);
281+
}
282+
283+
private static function getResultPostHook(CachedInstrumentation $instrumentation, PgSqlTracker $tracker, $obj, array $params, mixed $retVal, ?\Throwable $exception)
284+
{
285+
$attributes = $tracker->getConnectionAttributes($params[0]);
286+
287+
$errorStatus = $retVal == false ? pg_last_error($params[0]) : null;
288+
289+
if ($retVal !== false) {
290+
if ($linkedContext = $tracker->getAsyncLinkForConnection($params[0])) {
291+
Span::getCurrent()->addLink($linkedContext);
292+
}
208293
}
209294

210295
self::endSpan($attributes, $exception, $errorStatus);
211296
}
212297

213-
private static function executePostHook(CachedInstrumentation $instrumentation, PgSqlTracker $tracker, $obj, array $params, mixed $retVal, ?\Throwable $exception)
298+
private static function executePostHook(CachedInstrumentation $instrumentation, PgSqlTracker $tracker, bool $async, $obj, array $params, mixed $retVal, ?\Throwable $exception)
214299
{
215300
$attributes = $tracker->getConnectionAttributes($params[0]);
216301

@@ -220,7 +305,30 @@ private static function executePostHook(CachedInstrumentation $instrumentation,
220305
$attributes[TraceAttributes::DB_OPERATION_NAME] = self::extractQueryCommand($query);
221306
}
222307

223-
$errorStatus = $retVal === false ? pg_last_error($params[0]) : null;
308+
if ($retVal != false) {
309+
if ($async) {
310+
$tracker->addAsyncLinkForConnection($params[0], Span::getCurrent()->getContext());
311+
}
312+
}
313+
314+
$errorStatus = $retVal == false ? pg_last_error($params[0]) : null;
315+
self::endSpan($attributes, $exception, $errorStatus);
316+
}
317+
318+
private static function sendQueryPostHook(CachedInstrumentation $instrumentation, PgSqlTracker $tracker, $obj, array $params, mixed $retVal, ?\Throwable $exception)
319+
{
320+
$attributes = $tracker->getConnectionAttributes($params[0]);
321+
322+
$queries = PgSqlTracker::splitQueries($params[1]);
323+
$queriesCount = count($queries);
324+
for ($i = 0; $i < $queriesCount; $i++) {
325+
$tracker->addAsyncLinkForConnection($params[0], Span::getCurrent()->getContext());
326+
}
327+
328+
$attributes[TraceAttributes::DB_QUERY_TEXT] = mb_convert_encoding($params[1], 'UTF-8');
329+
$attributes[TraceAttributes::DB_OPERATION_NAME] = self::extractQueryCommand($params[1]);
330+
331+
$errorStatus = $retVal == false ? pg_last_error($params[0]) : null;
224332
self::endSpan($attributes, $exception, $errorStatus);
225333
}
226334

@@ -231,7 +339,7 @@ private static function queryPostHook(CachedInstrumentation $instrumentation, Pg
231339
$attributes[TraceAttributes::DB_QUERY_TEXT] = mb_convert_encoding($params[1], 'UTF-8');
232340
$attributes[TraceAttributes::DB_OPERATION_NAME] = self::extractQueryCommand($params[1]);
233341

234-
$errorStatus = $retVal === false ? pg_last_error($params[0]) : null;
342+
$errorStatus = $retVal == false ? pg_last_error($params[0]) : null;
235343
self::endSpan($attributes, $exception, $errorStatus);
236344
}
237345

@@ -247,6 +355,8 @@ private static function startSpan(string $spanName, CachedInstrumentation $instr
247355
->setAttribute(TraceAttributes::CODE_NAMESPACE, $class)
248356
->setAttribute(TraceAttributes::CODE_FILEPATH, $filename)
249357
->setAttribute(TraceAttributes::CODE_LINE_NUMBER, $lineno)
358+
->setAttribute(TraceAttributes::DB_SYSTEM_NAME, 'postgresql')
359+
->setAttribute(TraceAttributes::DB_SYSTEM, 'postgresql')
250360
->setAttributes($attributes);
251361

252362
$span = $builder->startSpan();

0 commit comments

Comments
 (0)