Skip to content

Commit 9b85a8f

Browse files
committed
Large object instrumentation
1 parent c5d146b commit 9b85a8f

File tree

2 files changed

+246
-4
lines changed

2 files changed

+246
-4
lines changed

src/Instrumentation/PostgreSql/src/PgSqlTracker.php

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
use WeakMap;
1111
use WeakReference;
1212
use PgSql\Connection;
13+
use PgSql\Lob;
1314

1415
/**
1516
* @phan-file-suppress PhanNonClassMethodCall
@@ -26,12 +27,18 @@ final class PgSqlTracker
2627
*/
2728
private WeakMap $connectionAsyncLink;
2829

30+
/**
31+
* @var WeakMap<Lob, WeakReference<Connection>>
32+
*/
33+
private WeakMap $connectionLargeObjects;
34+
2935
public function __construct()
3036
{
3137
// /** @psalm-suppress PropertyTypeCoercion */
3238
$this->connectionAttributes = new WeakMap();
3339
$this->connectionStatements = new WeakMap();
3440
$this->connectionAsyncLink = new WeakMap(); // maps connection to SplQueue with links
41+
$this->connectionLargeObjects = new WeakMap(); // maps Lob to Connection
3542
}
3643

3744
public function addAsyncLinkForConnection(Connection $connection, SpanContextInterface $spanContext) {
@@ -80,6 +87,19 @@ public function getConnectionAttributes(Connection $connection) : array
8087
{
8188
return $this->connectionAttributes[$connection] ?? [];
8289
}
90+
91+
public function trackConnectionFromLob(Connection $connection, Lob $lob)
92+
{
93+
$this->connectionLargeObjects[$lob] = WeakReference::create($connection);
94+
}
95+
96+
public function getConnectionFromLob(Lob $lob) : ?Connection {
97+
if ($this->connectionLargeObjects->offsetExists($lob)) {
98+
return $this->connectionLargeObjects[$lob]->get();
99+
}
100+
return null;
101+
}
102+
83103
public static function splitQueries(string $sql)
84104
{
85105
// Normalize line endings to \n

src/Instrumentation/PostgreSql/src/PostgreSqlInstrumentation.php

Lines changed: 226 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
use OpenTelemetry\API\Trace\StatusCode;
1414

1515
use OpenTelemetry\Context\Context;
16+
use PgSql\Lob;
1617
use function OpenTelemetry\Instrumentation\hook;
1718
use OpenTelemetry\SemConv\TraceAttributes;
1819
use OpenTelemetry\SemConv\Version;
@@ -28,8 +29,9 @@ class PostgreSqlInstrumentation
2829
public static function register(): void
2930
{
3031

32+
//TODO DB_OPERATION_BATCH_SIZE
3133
//TODO Large objet - track by PgLob instance
32-
//pg_query_params, pg_select
34+
//db.response.status_code
3335

3436
// https://opentelemetry.io/docs/specs/semconv/database/postgresql/
3537
$instrumentation = new CachedInstrumentation(
@@ -139,6 +141,18 @@ public static function register(): void
139141
}
140142
);
141143

144+
145+
hook(
146+
null,
147+
'pg_select',
148+
pre: static function (...$args) use ($instrumentation, $tracker) {
149+
self::basicPreHook('pg_select', $instrumentation, $tracker, ...$args);
150+
},
151+
post: static function (...$args) use ($instrumentation, $tracker) {
152+
self::selectPostHook($instrumentation, $tracker, ...$args);
153+
}
154+
);
155+
142156
hook(
143157
null,
144158
'pg_send_prepare',
@@ -192,6 +206,82 @@ public static function register(): void
192206
}
193207
);
194208

209+
hook(
210+
null,
211+
'pg_lo_open',
212+
pre: static function (...$args) use ($instrumentation, $tracker) {
213+
self::basicPreHook('pg_lo_open', $instrumentation, $tracker, ...$args);
214+
},
215+
post: static function (...$args) use ($instrumentation, $tracker) {
216+
self::loOpenPostHook($instrumentation, $tracker, ...$args);
217+
}
218+
);
219+
220+
hook(
221+
null,
222+
'pg_lo_write',
223+
pre: static function (...$args) use ($instrumentation, $tracker) {
224+
self::basicPreHook('pg_lo_write', $instrumentation, $tracker, ...$args);
225+
},
226+
post: static function (...$args) use ($instrumentation, $tracker) {
227+
self::loWritePostHook($instrumentation, $tracker, ...$args);
228+
}
229+
);
230+
231+
hook(
232+
null,
233+
'pg_lo_read',
234+
pre: static function (...$args) use ($instrumentation, $tracker) {
235+
self::basicPreHook('pg_lo_read', $instrumentation, $tracker, ...$args);
236+
},
237+
post: static function (...$args) use ($instrumentation, $tracker) {
238+
self::loReadPostHook($instrumentation, $tracker, ...$args);
239+
}
240+
);
241+
242+
hook(
243+
null,
244+
'pg_lo_read_all',
245+
pre: static function (...$args) use ($instrumentation, $tracker) {
246+
self::basicPreHook('pg_lo_read_all', $instrumentation, $tracker, ...$args);
247+
},
248+
post: static function (...$args) use ($instrumentation, $tracker) {
249+
self::loReadAllPostHook($instrumentation, $tracker, ...$args);
250+
}
251+
);
252+
253+
hook(
254+
null,
255+
'pg_lo_unlink',
256+
pre: static function (...$args) use ($instrumentation, $tracker) {
257+
self::basicPreHook('pg_lo_unlink', $instrumentation, $tracker, ...$args);
258+
},
259+
post: static function (...$args) use ($instrumentation, $tracker) {
260+
self::loUnlinkPostHook($instrumentation, $tracker, ...$args);
261+
}
262+
);
263+
264+
hook(
265+
null,
266+
'pg_lo_import',
267+
pre: static function (...$args) use ($instrumentation, $tracker) {
268+
self::basicPreHook('pg_lo_import', $instrumentation, $tracker, ...$args);
269+
},
270+
post: static function (...$args) use ($instrumentation, $tracker) {
271+
self::loImportExportPostHook($instrumentation, $tracker, 'IMPORT', ...$args);
272+
}
273+
);
274+
275+
hook(
276+
null,
277+
'pg_lo_export',
278+
pre: static function (...$args) use ($instrumentation, $tracker) {
279+
self::basicPreHook('pg_lo_export', $instrumentation, $tracker, ...$args);
280+
},
281+
post: static function (...$args) use ($instrumentation, $tracker) {
282+
self::loImportExportPostHook($instrumentation, $tracker, 'EXPORT', ...$args);
283+
}
284+
);
195285
}
196286

197287
/** @param non-empty-string $spanName */
@@ -284,15 +374,19 @@ private static function getResultPostHook(CachedInstrumentation $instrumentation
284374
{
285375
$attributes = $tracker->getConnectionAttributes($params[0]);
286376

287-
$errorStatus = $retVal == false ? pg_last_error($params[0]) : null;
288-
289377
if ($retVal !== false) {
290378
if ($linkedContext = $tracker->getAsyncLinkForConnection($params[0])) {
291379
Span::getCurrent()->addLink($linkedContext);
292380
}
381+
self::endSpan($attributes, $exception, null);
382+
} else {
383+
// pg_get_result() returns false when there are no more pending results.
384+
// This is normal and expected behavior — it is designed for polling.
385+
// A false return value simply means there are no results currently available.
386+
// There’s no point in creating a span that won’t be linked to any operation.
387+
self::dropSpan();
293388
}
294389

295-
self::endSpan($attributes, $exception, $errorStatus);
296390
}
297391

298392
private static function executePostHook(CachedInstrumentation $instrumentation, PgSqlTracker $tracker, bool $async, $obj, array $params, mixed $retVal, ?\Throwable $exception)
@@ -343,6 +437,134 @@ private static function queryPostHook(CachedInstrumentation $instrumentation, Pg
343437
self::endSpan($attributes, $exception, $errorStatus);
344438
}
345439

440+
private static function selectPostHook(CachedInstrumentation $instrumentation, PgSqlTracker $tracker, $obj, array $params, mixed $retVal, ?\Throwable $exception)
441+
{
442+
$attributes = $tracker->getConnectionAttributes($params[0]);
443+
444+
if ($retVal != false) {
445+
$table = $params[1];
446+
$conditions = $params[2];
447+
$query = null;
448+
449+
if (empty($conditions)) {
450+
if (PHP_VERSION_ID >= 80400) {
451+
$query = "SELECT * FROM {$table}";
452+
} else {
453+
$query = null;
454+
}
455+
} else {
456+
$where = implode(' AND ', array_map(
457+
fn($k, $v) => is_null($v) ? "$k IS NULL" : "$k = '$v'",
458+
array_keys($conditions),
459+
$conditions
460+
));
461+
$query = "SELECT * FROM {$table} WHERE {$where}";
462+
}
463+
464+
if ($query) {
465+
$attributes[TraceAttributes::DB_QUERY_TEXT] = mb_convert_encoding($query, 'UTF-8');
466+
}
467+
$attributes[TraceAttributes::DB_OPERATION_NAME] = 'SELECT';
468+
}
469+
470+
$errorStatus = $retVal == false ? pg_last_error($params[0]) : null;
471+
self::endSpan($attributes, $exception, $errorStatus);
472+
}
473+
474+
private static function loOpenPostHook(CachedInstrumentation $instrumentation, PgSqlTracker $tracker, $obj, array $params, mixed $retVal, ?\Throwable $exception)
475+
{
476+
$attributes = $tracker->getConnectionAttributes($params[0]);
477+
$attributes[TraceAttributes::DB_OPERATION_NAME] = 'OPEN';
478+
479+
if ($retVal instanceof Lob) {
480+
$tracker->trackConnectionFromLob($params[0], $retVal);
481+
}
482+
483+
$errorStatus = $retVal == false ? pg_last_error($params[0]) : null;
484+
self::endSpan($attributes, $exception, $errorStatus);
485+
}
486+
487+
private static function loWritePostHook(CachedInstrumentation $instrumentation, PgSqlTracker $tracker, $obj, array $params, mixed $retVal, ?\Throwable $exception)
488+
{
489+
$attributes = [];
490+
$lob = $params[0];
491+
if ($lob instanceof Lob) {
492+
if ($connection = $tracker->getConnectionFromLob($lob)) {
493+
$attributes = $tracker->getConnectionAttributes($connection);
494+
}
495+
if ($retVal !== false) {
496+
$attributes['db.postgres.bytes_written'] = $retVal;
497+
}
498+
}
499+
500+
$attributes[TraceAttributes::DB_OPERATION_NAME] = 'WRITE';
501+
502+
$errorStatus = $retVal == false ? pg_last_error($params[0]) : null;
503+
self::endSpan($attributes, $exception, $errorStatus);
504+
}
505+
506+
private static function loReadPostHook(CachedInstrumentation $instrumentation, PgSqlTracker $tracker, $obj, array $params, mixed $retVal, ?\Throwable $exception)
507+
{
508+
$attributes = [];
509+
$lob = $params[0];
510+
if ($lob instanceof Lob) {
511+
if ($connection = $tracker->getConnectionFromLob($lob)) {
512+
$attributes = $tracker->getConnectionAttributes($connection);
513+
}
514+
if ($retVal !== false) {
515+
$attributes['db.postgres.bytes_read'] = $params[1];
516+
}
517+
}
518+
$attributes[TraceAttributes::DB_OPERATION_NAME] = 'READ';
519+
$errorStatus = $retVal == false ? pg_last_error($params[0]) : null;
520+
self::endSpan($attributes, $exception, $errorStatus);
521+
}
522+
523+
private static function loReadAllPostHook(CachedInstrumentation $instrumentation, PgSqlTracker $tracker, $obj, array $params, mixed $retVal, ?\Throwable $exception)
524+
{
525+
$attributes = [];
526+
527+
$lob = $params[0];
528+
if ($lob instanceof Lob) {
529+
if ($connection = $tracker->getConnectionFromLob($lob)) {
530+
$attributes = $tracker->getConnectionAttributes($connection);
531+
}
532+
if ($retVal !== false) {
533+
$attributes['db.postgres.bytes_read'] = $retVal;
534+
}
535+
}
536+
$attributes[TraceAttributes::DB_OPERATION_NAME] = 'READ';
537+
538+
$errorStatus = $retVal == false ? pg_last_error($params[0]) : null;
539+
self::endSpan($attributes, $exception, $errorStatus);
540+
}
541+
542+
private static function loUnlinkPostHook(CachedInstrumentation $instrumentation, PgSqlTracker $tracker, $obj, array $params, mixed $retVal, ?\Throwable $exception)
543+
{
544+
$attributes = $tracker->getConnectionAttributes($params[0]);
545+
$attributes[TraceAttributes::DB_OPERATION_NAME] = 'DELETE';
546+
547+
$errorStatus = $retVal == false ? pg_last_error($params[0]) : null;
548+
self::endSpan($attributes, $exception, $errorStatus);
549+
}
550+
551+
552+
private static function loImportExportPostHook(CachedInstrumentation $instrumentation, PgSqlTracker $tracker, string $operation, $obj, array $params, mixed $retVal, ?\Throwable $exception)
553+
{
554+
$attributes = [];
555+
556+
$lob = $params[0];
557+
if ($lob instanceof Lob) {
558+
if ($connection = $tracker->getConnectionFromLob($lob)) {
559+
$attributes = $tracker->getConnectionAttributes($connection);
560+
}
561+
}
562+
$attributes[TraceAttributes::DB_OPERATION_NAME] = $operation;
563+
564+
$errorStatus = $retVal == false ? pg_last_error($params[0]) : null;
565+
self::endSpan($attributes, $exception, $errorStatus);
566+
}
567+
346568
/** @param non-empty-string $spanName */
347569
private static function startSpan(string $spanName, CachedInstrumentation $instrumentation, ?string $class, ?string $function, ?string $filename, ?int $lineno, iterable $attributes) : SpanInterface
348570
{

0 commit comments

Comments
 (0)