Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 101 additions & 75 deletions app/Providers/PrometheusServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@
use Spatie\Prometheus\Facades\Prometheus;

/** Delimiter placed between a job's display name and its queue when both are packed into one string key. */
const PROM_JOB_SCRAPER_SEPARATOR = '-PROM-JOB-SCRAPER-SEPARATOR-';

/** Seconds to cache slow Prometheus metrics: 10 minutes. */
const PROM_CACHE_TTL = 10 * 60;

/** Seconds to cache fast-moving metrics (e.g. en-route counts): 2 minutes. */
const PROM_CACHE_TTL_SHORT = 2 * 60;

class PrometheusServiceProvider extends ServiceProvider
{
/**
Expand Down Expand Up @@ -52,19 +59,18 @@ public function register(): void

/**
 * Count the rows of a queue table grouped by queue and job display name.
 *
 * The display name is read from the `displayName` key of the JSON `payload`
 * column and aggregated by the database, so payloads are never fetched into
 * PHP and decoded one by one.
 *
 * @param string $tableName Table holding `queue` and `payload` columns (callers pass e.g. 'failed_jobs').
 *
 * @return array<int, array{0: int, 1: array{0: string|null, 1: string}}>
 *               One `[count, [displayName, queue]]` entry per group; the order is
 *               whatever the database returns for the GROUP BY.
 */
public static function getJobsByDisplayName(string $tableName): array
{
    // SQLite's json_extract() already yields the bare string; the other branch
    // uses the MySQL-style functions, where the extracted value must be unquoted.
    // NOTE(review): any driver other than sqlite gets the MySQL syntax — confirm
    // no other driver (e.g. pgsql) is used in deployment.
    $jsonExtract = DB::getDriverName() === 'sqlite'
        ? "json_extract(payload, '$.displayName')"
        : "JSON_UNQUOTE(JSON_EXTRACT(payload, '$.displayName'))";

    return DB::table($tableName)
        ->selectRaw("count(*) AS total, queue, {$jsonExtract} AS display_name")
        ->groupBy('queue', 'display_name')
        ->get()
        // Cast the aggregate so the count is an int even if the driver hands it back as a string.
        ->map(fn ($row) => [(int) $row->total, [$row->display_name, $row->queue]])
        ->toArray();
}

private function getHafasByType(array $getFailures): array
Expand Down Expand Up @@ -116,13 +122,13 @@ public function metaDataStats(): void
Prometheus::addGauge('Stations count')
->helpText('How many stations exist in the database?')
->value(function () {
return Station::count();
return Cache::remember('prom_station_count', PROM_CACHE_TTL, fn () => Station::count());
});

Prometheus::addGauge('Station identifiers count')
->helpText('How many station identifiers exist in the database?')
->value(function () {
return StationIdentifier::count();
return Cache::remember('prom_station_identifier_count', PROM_CACHE_TTL, fn () => StationIdentifier::count());
});

Prometheus::addGauge('Users count')
Expand All @@ -149,57 +155,65 @@ public function metaDataStats(): void
->helpText('How many hafas trips are posted grouped by operator and mode of transport?')
->labels(['operator', 'category'])
->value(function () {
return Trip::groupBy('operator_id', 'category')
->selectRaw('count(*) AS total, operator_id, category')
->with('operator')
->get()
->map(fn ($item) => [$item->total, [$item->operator?->name, $item->category]])
->toArray();
return Cache::remember('prom_trips_by_operator_category', PROM_CACHE_TTL, function () {
return Trip::leftJoin('hafas_operators', 'hafas_trips.operator_id', '=', 'hafas_operators.id')
->groupBy('hafas_trips.operator_id', 'hafas_trips.category')
->selectRaw('count(*) AS total, MAX(hafas_operators.name) AS operator_name, hafas_trips.category')
->get()
->map(fn ($item) => [$item->total, [$item->operator_name, $item->category]])
->toArray();
});
});

Prometheus::addGauge('Trip Source count')
->helpText('How many hafas trips are posted grouped by source?')
->label('source')
->value(function () {
return Trip::groupBy('source')
->selectRaw('count(*) AS total, source')
->get()
->map(fn ($item) => [$item->total, [$item->source?->value]])
->toArray();
return Cache::remember('prom_trips_by_source', PROM_CACHE_TTL, function () {
return Trip::groupBy('source')
->selectRaw('count(*) AS total, source')
->get()
->map(fn ($item) => [$item->total, [$item->source?->value]])
->toArray();
});
});

Prometheus::addGauge('Polylines count')
->helpText('How many polylines are saved grouped by source?')
->labels(['source'])
->value(function () {
return PolyLine::groupBy('source')
->selectRaw('count(*) AS total, source')
->get()
->map(fn ($item) => [$item->total, [$item->source]])
->toArray();
return Cache::remember('prom_polylines_by_source', PROM_CACHE_TTL, function () {
return PolyLine::groupBy('source')
->selectRaw('count(*) AS total, source')
->get()
->map(fn ($item) => [$item->total, [$item->source]])
->toArray();
});
});

Prometheus::addGauge('profile_image_count')
->helpText('How many profile images are stored?')
->value(function () {
$iter = new \FilesystemIterator(public_path('uploads/avatars'));
return Cache::remember('prom_profile_image_count', PROM_CACHE_TTL, function () {
$iter = new \FilesystemIterator(public_path('uploads/avatars'));

return iterator_count($iter);
return iterator_count($iter);
});
});

Prometheus::addGauge('active_statuses_count')
->helpText('How many trips are en route?')
->value(function () {
return Trip::where('departure', '<', now())
->where('arrival', '>', now())
->count();
return Cache::remember('prom_active_statuses_count', PROM_CACHE_TTL_SHORT, function () {
return Trip::where('departure', '<', now())
->where('arrival', '>', now())
->count();
});
});

}

public function queueMetrics(): void
{

Prometheus::addGauge('queue_size')
->helpText('How many items are currently in the job queue?')
->labels(['queue'])
Expand All @@ -214,19 +228,23 @@ public function queueMetrics(): void
->helpText('How many jobs have failed?')
->labels(['job_name', 'queue'])
->value(function () {
return $this->getJobsByDisplayName('failed_jobs');
return Cache::remember('prom_failed_jobs_count', PROM_CACHE_TTL_SHORT, function () {
return $this->getJobsByDisplayName('failed_jobs');
});
});

Prometheus::addGauge('completed_jobs_count')
->helpText('How many jobs are done? Old items from queue monitor table are deleted after 7 days.')
->labels(['job_name', 'status', 'queue'])
->value(function () {
return DB::table('queue_monitor')
->groupBy('name', 'status', 'queue')
->selectRaw('count(*) AS total, name, status, queue')
->get()
->map(fn ($item) => [$item->total, [$item->name, MonitorStatus::toNamedArray()[$item->status], $item->queue]])
->toArray();
return Cache::remember('prom_completed_jobs_count', PROM_CACHE_TTL_SHORT, function () {
return DB::table('queue_monitor')
->groupBy('name', 'status', 'queue')
->selectRaw('count(*) AS total, name, status, queue')
->get()
->map(fn ($item) => [$item->total, [$item->name, MonitorStatus::toNamedArray()[$item->status], $item->queue]])
->toArray();
});
});
}

Expand Down Expand Up @@ -286,47 +304,55 @@ public function oAuthMetrics(): void
->helpText('How many total (revoked and accredited) access tokens do the clients have?')
->labels(['app_name'])
->value(function () {
return DB::table('oauth_access_tokens')
->join('oauth_clients', 'oauth_access_tokens.client_id', '=', 'oauth_clients.id')
->groupBy('oauth_clients.name')
->selectRaw('count(*) AS total, oauth_clients.name AS name')
->orderBy('total', 'desc')
->limit(20)
->get()
->map(fn ($item) => [$item->total, [$item->name]])
->toArray();
return Cache::remember('prom_oauth_total_tokens', PROM_CACHE_TTL, function () {
return DB::table('oauth_access_tokens')
->join('oauth_clients', 'oauth_access_tokens.client_id', '=', 'oauth_clients.id')
->groupBy('oauth_clients.name')
->selectRaw('count(*) AS total, oauth_clients.name AS name')
->orderBy('total', 'desc')
->limit(20)
->get()
->map(fn ($item) => [$item->total, [$item->name]])
->toArray();
});
});

Prometheus::addGauge('oauth_users')
->helpText('How many access tokens do the clients have?')
->labels(['app_name'])
->value(function () {
return DB::table('oauth_access_tokens')
->join('oauth_clients', 'oauth_access_tokens.client_id', '=', 'oauth_clients.id')
->groupBy('oauth_clients.name')
->selectRaw('count(distinct oauth_access_tokens.user_id) AS total, oauth_clients.name AS name')
->where('oauth_access_tokens.revoked', '=', 0)
->whereNull('oauth_access_tokens.expires_at')
->orderBy('total', 'desc')
->limit(20)
->get()
->map(fn ($item) => [$item->total, [$item->name]])
->toArray();
return Cache::remember('prom_oauth_users', PROM_CACHE_TTL, function () {
return DB::table('oauth_access_tokens')
->join('oauth_clients', 'oauth_access_tokens.client_id', '=', 'oauth_clients.id')
->groupBy('oauth_clients.name')
->selectRaw('count(distinct oauth_access_tokens.user_id) AS total, oauth_clients.name AS name')
->where('oauth_access_tokens.revoked', '=', 0)
->whereNull('oauth_access_tokens.expires_at')
->orderBy('total', 'desc')
->limit(20)
->get()
->map(fn ($item) => [$item->total, [$item->name]])
->toArray();
});
});

Prometheus::addGauge('oauth_revoked_tokens')
->helpText('How many revoked access tokens do the clients have?')
->labels(['app_name'])
->value(function () {
return DB::table('oauth_access_tokens')
->join('oauth_clients', 'oauth_access_tokens.client_id', '=', 'oauth_clients.id')
->groupBy('oauth_clients.name')
->selectRaw('count(distinct oauth_access_tokens.user_id) AS total, oauth_clients.name AS name')
->where('oauth_access_tokens.revoked', '!=', 0)
->whereNotNull('oauth_access_tokens.expires_at', 'or')
->orderBy('total', 'desc')
->limit(20)
->get()
->map(fn ($item) => [$item->total, [$item->name]])
->toArray();
return Cache::remember('prom_oauth_revoked_tokens', PROM_CACHE_TTL, function () {
return DB::table('oauth_access_tokens')
->join('oauth_clients', 'oauth_access_tokens.client_id', '=', 'oauth_clients.id')
->groupBy('oauth_clients.name')
->selectRaw('count(distinct oauth_access_tokens.user_id) AS total, oauth_clients.name AS name')
->where('oauth_access_tokens.revoked', '!=', 0)
->whereNotNull('oauth_access_tokens.expires_at', 'or')
->orderBy('total', 'desc')
->limit(20)
->get()
->map(fn ($item) => [$item->total, [$item->name]])
->toArray();
});
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?php

declare(strict_types=1);

use Illuminate\Database\Migrations\Migration;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Facades\Schema;

/**
 * Adds two indexes to hafas_trips that back the Prometheus trip gauges,
 * and removes them again on rollback.
 */
return new class() extends Migration
{
    private const TABLE = 'hafas_trips';

    private const SOURCE_INDEX = 'hafas_trips_source_index';

    private const ARRIVAL_DEPARTURE_INDEX = 'hafas_trips_arrival_departure_index';

    /**
     * Create the single-column `source` index and the composite
     * (`arrival`, `departure`) index.
     */
    public function up(): void
    {
        Schema::table(self::TABLE, static function (Blueprint $table): void {
            // Supports Trip::groupBy('source'); per the original author's note this
            // previously ran as a full table scan + tmp table + filesort on 5.5M rows.
            $table->index('source', self::SOURCE_INDEX);

            // Supports active_statuses_count, which previously scanned all 5.5M rows.
            // `arrival` leads because `arrival > NOW()` is the selective predicate
            // (the original note cites only ~36 matching rows).
            $table->index(['arrival', 'departure'], self::ARRIVAL_DEPARTURE_INDEX);
        });
    }

    /**
     * Drop both indexes by name, in the order they were created.
     */
    public function down(): void
    {
        Schema::table(self::TABLE, static function (Blueprint $table): void {
            $table->dropIndex(self::SOURCE_INDEX);
            $table->dropIndex(self::ARRIVAL_DEPARTURE_INDEX);
        });
    }
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

declare(strict_types=1);

use Illuminate\Database\Migrations\Migration;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Facades\Schema;

/**
 * Adds an index on oauth_access_tokens.client_id and removes it on rollback.
 */
return new class() extends Migration
{
    private const TABLE = 'oauth_access_tokens';

    private const CLIENT_ID_INDEX = 'oauth_access_tokens_client_id_index';

    /**
     * Create the `client_id` index.
     */
    public function up(): void
    {
        Schema::table(self::TABLE, static function (Blueprint $table): void {
            // Supports the three Prometheus OAuth queries; per the original author's note
            // they previously ran as a full table scan + tmp table + filesort on 587K rows.
            $table->index('client_id', self::CLIENT_ID_INDEX);
        });
    }

    /**
     * Drop the `client_id` index by name.
     */
    public function down(): void
    {
        Schema::table(self::TABLE, static function (Blueprint $table): void {
            $table->dropIndex(self::CLIENT_ID_INDEX);
        });
    }
};
51 changes: 30 additions & 21 deletions tests/Unit/Providers/PrometheusServiceProviderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,41 +4,50 @@

use App\Providers\PrometheusServiceProvider;
use Illuminate\Foundation\Testing\RefreshDatabase;
use Illuminate\Support\Collection;
use Illuminate\Support\Facades\DB;

use function PHPUnit\Framework\assertEquals;
use function PHPUnit\Framework\assertEqualsCanonicalizing;

use Tests\ApiTestCase;

class PrometheusServiceProviderTest extends ApiTestCase
{
use RefreshDatabase;

const TABLENAME = 'jobs';
private const TABLENAME = 'jobs';

public function test_get_jobs_by_display_name()
/**
 * Insert one row into the jobs table whose JSON payload carries the given display name.
 *
 * Real rows are written (rather than mocking the DB facade) so the code under
 * test can run its SQL-side JSON extraction against them.
 *
 * @param string $queue       Value for the `queue` column.
 * @param string $displayName Value stored under the payload's `displayName` key.
 */
private function insertJob(string $queue, string $displayName): void
{
    // Read the clock once so both timestamp columns hold the same value.
    $timestamp = now()->timestamp;

    DB::table(self::TABLENAME)->insert([
        'queue' => $queue,
        // Throw on an encoding failure instead of silently inserting `false`.
        'payload' => json_encode(['displayName' => $displayName], JSON_THROW_ON_ERROR),
        'attempts' => 0,
        'available_at' => $timestamp,
        'created_at' => $timestamp,
    ]);
}

public function test_get_jobs_by_display_name(): void
{
// GIVEN: insert real rows so SQL JSON extraction can be tested end-to-end
foreach (range(1, 4) as $_) {
$this->insertJob('default', 'JobA');
}
foreach (range(1, 7) as $_) {
$this->insertJob('webhook', 'JobB');
}
foreach (range(1, 2) as $_) {
$this->insertJob('default', 'JobC');
}
foreach (range(1, 5) as $_) {
$this->insertJob('webhook', 'JobC');
}

// WHEN
$actual = PrometheusServiceProvider::getJobsByDisplayName(self::TABLENAME);

assertEquals([
// THEN: order is not guaranteed by GROUP BY, so use canonical comparison
assertEqualsCanonicalizing([
[4, ['JobA', 'default']],
[7, ['JobB', 'webhook']],
[2, ['JobC', 'default']],
Expand Down
Loading