Skip to content

Commit 61b21d4

Browse files
vitalyisaev2 and pstpn authored
Prometheus: basic support (#337)
* Prometheus: basic support (#267) * Actualize branch Co-authored-by: Stepan <[email protected]>
1 parent ab32360 commit 61b21d4

35 files changed

+2174
-1500
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ scripts/bench/ydb
1111
__pycache__
1212
*.test
1313
.vscode
14+
.idea
15+
.DS_Store
1416

1517
.ipynb_checkpoints
1618
*.csv

app/client/connector/read_table.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func doReadTable(
9595
api_common.EGenericDataSourceKind_MYSQL, api_common.EGenericDataSourceKind_GREENPLUM,
9696
api_common.EGenericDataSourceKind_ORACLE, api_common.EGenericDataSourceKind_LOGGING,
9797
api_common.EGenericDataSourceKind_MONGO_DB, api_common.EGenericDataSourceKind_REDIS,
98-
api_common.EGenericDataSourceKind_OPENSEARCH:
98+
api_common.EGenericDataSourceKind_OPENSEARCH, api_common.EGenericDataSourceKind_PROMETHEUS:
9999
typeMappingSettings := &api_service_protos.TTypeMappingSettings{
100100
DateTimeFormat: dateTimeFormat,
101101
}

app/config/server.pb.go

Lines changed: 424 additions & 330 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

app/config/server.proto

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,18 @@ message TOpenSearchConfig {
262262
TExponentialBackoffConfig exponential_backoff = 10;
263263
}
264264

265+
// TPrometheusConfig contains settings specific for Prometheus data source
266+
message TPrometheusConfig {
267+
// Timeout for Prometheus connection opening.
268+
// Valid values should satisfy `time.ParseDuration` (e. g. '5s', '100ms', '3h').
269+
string open_connection_timeout = 1;
270+
271+
// By default, we use `5e+7 bytes` ~ 50MB for the maximum size of frame, that we can read at once from data source
272+
uint64 chunked_read_limit = 2;
273+
274+
TExponentialBackoffConfig exponential_backoff = 10;
275+
}
276+
265277
// TPostgreSQLConfig contains settings specific for PostgreSQL data source
266278
message TPostgreSQLConfig {
267279
// Timeout for PostgreSQL connection opening.
@@ -410,6 +422,7 @@ message TDatasourcesConfig {
410422
TMongoDbConfig mongodb = 9;
411423
TRedisConfig redis = 10;
412424
TOpenSearchConfig opensearch = 11;
425+
TPrometheusConfig prometheus = 12;
413426
}
414427

415428
// TObservationConfig contains configuration for query observation system.

app/server/config/config.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,19 @@ func fillServerConfigDefaults(c *config.TServerConfig) {
189189
c.Datasources.Opensearch.ExponentialBackoff = makeDefaultExponentialBackoffConfig()
190190
}
191191

192+
// Prometheus
193+
194+
if c.Datasources.Prometheus == nil {
195+
c.Datasources.Prometheus = &config.TPrometheusConfig{
196+
OpenConnectionTimeout: "5s",
197+
ChunkedReadLimit: 5e7,
198+
}
199+
}
200+
201+
if c.Datasources.Prometheus.ExponentialBackoff == nil {
202+
c.Datasources.Prometheus.ExponentialBackoff = makeDefaultExponentialBackoffConfig()
203+
}
204+
192205
// PostgreSQL
193206

194207
if c.Datasources.Postgresql == nil {

app/server/data_source_collection.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/ydb-platform/fq-connector-go/app/server/datasource/nosql/mongodb"
1717
"github.com/ydb-platform/fq-connector-go/app/server/datasource/nosql/opensearch"
1818
"github.com/ydb-platform/fq-connector-go/app/server/datasource/nosql/redis"
19+
"github.com/ydb-platform/fq-connector-go/app/server/datasource/prometheus"
1920
"github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms"
2021
"github.com/ydb-platform/fq-connector-go/app/server/observation"
2122
"github.com/ydb-platform/fq-connector-go/app/server/paging"
@@ -91,6 +92,18 @@ func (dsc *DataSourceCollection) DescribeTable(
9192
dsc.queryLoggerFactory.Make(logger),
9293
)
9394

95+
return ds.DescribeTable(ctx, logger, request)
96+
case api_common.EGenericDataSourceKind_PROMETHEUS:
97+
prometheusCfg := dsc.cfg.Datasources.Prometheus
98+
ds := prometheus.NewDataSource(
99+
&retry.RetrierSet{
100+
MakeConnection: retry.NewRetrierFromConfig(prometheusCfg.ExponentialBackoff, retry.ErrorCheckerMakeConnectionCommon),
101+
Query: retry.NewRetrierFromConfig(prometheusCfg.ExponentialBackoff, retry.ErrorCheckerNoop),
102+
},
103+
prometheusCfg,
104+
dsc.converterCollection,
105+
)
106+
94107
return ds.DescribeTable(ctx, logger, request)
95108
default:
96109
return nil, fmt.Errorf("unsupported data source type '%v': %w", kind, common.ErrDataSourceNotSupported)
@@ -172,6 +185,23 @@ func (dsc *DataSourceCollection) ListSplits(
172185
if err := streamer.Run(); err != nil {
173186
return fmt.Errorf("run streamer: %w", err)
174187
}
188+
case api_common.EGenericDataSourceKind_PROMETHEUS:
189+
prometheusCfg := dsc.cfg.Datasources.Prometheus
190+
ds := prometheus.NewDataSource(
191+
&retry.RetrierSet{
192+
MakeConnection: retry.NewRetrierFromConfig(prometheusCfg.ExponentialBackoff, retry.ErrorCheckerMakeConnectionCommon),
193+
Query: retry.NewRetrierFromConfig(prometheusCfg.ExponentialBackoff, retry.ErrorCheckerNoop),
194+
},
195+
prometheusCfg,
196+
dsc.converterCollection,
197+
)
198+
199+
streamer := streaming.NewListSplitsStreamer(logger, stream, ds, request, slct)
200+
201+
if err := streamer.Run(); err != nil {
202+
return fmt.Errorf("run streamer: %w", err)
203+
}
204+
175205
default:
176206
return fmt.Errorf("unsupported data source type '%v': %w", kind, common.ErrDataSourceNotSupported)
177207
}
@@ -242,6 +272,19 @@ func (dsc *DataSourceCollection) ReadSplit(
242272
dsc.queryLoggerFactory.Make(logger),
243273
)
244274

275+
return doReadSplit(
276+
logger, stream, request, split, ds, dsc.memoryAllocator, dsc.readLimiterFactory, dsc.observationStorage, dsc.cfg)
277+
case api_common.EGenericDataSourceKind_PROMETHEUS:
278+
prometheusCfg := dsc.cfg.Datasources.Prometheus
279+
ds := prometheus.NewDataSource(
280+
&retry.RetrierSet{
281+
MakeConnection: retry.NewRetrierFromConfig(prometheusCfg.ExponentialBackoff, retry.ErrorCheckerMakeConnectionCommon),
282+
Query: retry.NewRetrierFromConfig(prometheusCfg.ExponentialBackoff, retry.ErrorCheckerNoop),
283+
},
284+
prometheusCfg,
285+
dsc.converterCollection,
286+
)
287+
245288
return doReadSplit(
246289
logger, stream, request, split, ds, dsc.memoryAllocator, dsc.readLimiterFactory, dsc.observationStorage, dsc.cfg)
247290

app/server/datasource/prometheus/Makefile

Lines changed: 0 additions & 9 deletions
This file was deleted.

app/server/datasource/prometheus/analysis/Makefile

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
bench:
2+
go run bench/client.go
3+
4+
.PHONY: bench

app/server/datasource/prometheus/bench/client.go renamed to app/server/datasource/prometheus/analysis/bench/client.go

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,18 @@ var bodySize = int64(0)
2424

2525
func main() {
2626
runtime.GOMAXPROCS(1)
27+
2728
ctx := context.Background()
2829

2930
readClient, err := remote.NewReadClient("remote-read-test", &remote.ClientConfig{
3031
URL: &config.URL{URL: mustParseURL("http://localhost:9090/api/v1/read")},
3132
Timeout: model.Duration(1000 * time.Second),
3233
ChunkedReadLimit: cfg.DefaultChunkedReadLimit,
3334
})
35+
if err != nil {
36+
log.Fatal(err)
37+
}
38+
3439
readClient.(*remote.Client).Client = &http.Client{
3540
Transport: &Transport{Transport: http.DefaultTransport},
3641
}
@@ -40,9 +45,11 @@ func main() {
4045
log.Fatal(err)
4146
}
4247

48+
now := time.Now()
49+
4350
pbQuery, err := remote.ToQuery(
44-
int64(model.TimeFromUnixNano(time.Now().Add(-48*time.Hour).UnixNano())),
45-
int64(model.TimeFromUnixNano(time.Now().UnixNano())),
51+
int64(model.TimeFromUnixNano(now.Add(-20*time.Minute).UnixNano())),
52+
int64(model.TimeFromUnixNano(now.UnixNano())),
4653
matchers,
4754
nil,
4855
)
@@ -55,37 +62,41 @@ func main() {
5562
measureCount := 1.0
5663
measureCountInt := int64(measureCount)
5764
start := time.Now()
65+
5866
for range measureCountInt {
5967
timeseries, err := readClient.Read(ctx, pbQuery, false)
6068
if err != nil {
6169
log.Fatal(err)
6270
}
6371

6472
var it chunkenc.Iterator
73+
6574
for timeseries.Next() {
6675
s := timeseries.At()
67-
it := s.Iterator(it)
76+
l := s.Labels().String()
77+
iter := s.Iterator(it)
6878

69-
//l := s.Labels().String()
70-
for vt := it.Next(); vt != chunkenc.ValNone; vt = it.Next() {
79+
for vt := iter.Next(); vt != chunkenc.ValNone; vt = iter.Next() {
7180
atomic.AddInt64(&metricsCount, 1)
81+
7282
switch vt {
7383
case chunkenc.ValFloat:
74-
ts, v := it.At()
84+
ts, v := iter.At()
7585
vv += float64(ts) + v
76-
//fmt.Printf("%s %g %d\n", l, v, ts)
86+
fmt.Printf("%s %g %d\n", l, v, ts)
7787
case chunkenc.ValHistogram:
78-
ts, h := it.AtHistogram(nil)
88+
ts, h := iter.AtHistogram(nil)
7989
vv += float64(ts) + h.Sum
80-
//fmt.Printf("%s %s %d\n", l, h.String(), ts)
90+
fmt.Printf("%s %s %d\n", l, h.String(), ts)
8191
case chunkenc.ValFloatHistogram:
82-
ts, h := it.AtFloatHistogram(nil)
92+
ts, h := iter.AtFloatHistogram(nil)
8393
vv += float64(ts) + h.Sum
84-
//fmt.Printf("%s %s %d\n", l, h.String(), ts)
94+
fmt.Printf("%s %s %d\n", l, h.String(), ts)
8595
default:
8696
panic("unreachable")
8797
}
8898
}
99+
89100
if err := timeseries.Err(); err != nil {
90101
log.Fatal(err)
91102
}
@@ -99,14 +110,19 @@ func main() {
99110
fmt.Printf("Metrics count: %d\n", metricsCount/measureCountInt)
100111
fmt.Printf("Size: %.3f KB\n", avgSize)
101112
fmt.Printf("Avg time: %.5f s\n", avgTime)
102-
fmt.Printf("Throughput: %.3f KB/s; %.3f MB/s; %.3f GB/s\n", avgSize/avgTime, (avgSize/1024.0)/avgTime, (avgSize/(1024.0*1024.0))/avgTime)
113+
fmt.Printf("Throughput: %.3f KB/s; %.3f MB/s; %.3f GB/s\n",
114+
avgSize/avgTime,
115+
(avgSize/1024.0)/avgTime,
116+
(avgSize/(1024.0*1024.0))/avgTime,
117+
)
103118
}
104119

105120
func mustParseURL(raw string) *url.URL {
106121
u, err := url.Parse(raw)
107122
if err != nil {
108123
log.Fatal(err)
109124
}
125+
110126
return u
111127
}
112128

@@ -120,18 +136,19 @@ func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) {
120136
return nil, err
121137
}
122138

123-
body, err := t.cloneBody(res.Body)
139+
body, err := cloneBody(res.Body)
124140
if err != nil {
125141
return nil, err
126142
}
143+
127144
atomic.AddInt64(&bodySize, int64(len(body)))
128145

129146
res.Body = io.NopCloser(bytes.NewBuffer(body))
130147

131148
return res, nil
132149
}
133150

134-
func (t *Transport) cloneBody(body io.ReadCloser) ([]byte, error) {
151+
func cloneBody(body io.ReadCloser) ([]byte, error) {
135152
if body == nil {
136153
return nil, nil
137154
}

app/server/datasource/prometheus/comparison.md renamed to app/server/datasource/prometheus/analysis/comparison.md

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -135,26 +135,9 @@ trino:default> select * from up limit 10;
135135
136136
Результатом `SELECT *` запроса всегда будет таблица следующего вида:
137137
138-
| Тип метрики | Название метрики (`__name__`) | ... | Лэйблы (`label`) | ... | Время (`timestamp`) | Значение (`value`) |
139-
|-------------|-------------------------------|----------|------------------|----------|---------------------|-----------------------------------------------------------------------------------------------------|
140-
| `String` | `String` | `String` | `String` | `String` | `Timestamp` | `Double` \| `List<Double>` \| `Dict<Double, Uint64 \| Double \| List<Uint64> \| List<Double>>` |
141-
142-
**Колонка "Значение (`value`)" может содержать разные типы данных, зависимость описана в разделе ниже**
143-
144-
### Колонка `value` результирующей таблицы YDB
145-
146-
В таблице ниже описана зависимость типы данных в результирующей таблице YDB от типа метрики и типа данных Prometheus (от простейшего запроса с простейшими метриками)
147-
148-
| Тип метрики Prometheus | Тип данных Prometheus | Пример запроса Prometheus | YDB | Комментарий |
149-
|------------------------|-----------------------|----------------------------------------|-------------------------------|---------------------------------------------------------------------------------------------------------------------|
150-
| `counter` | `Instant vector` | `echo_requests_total` | `Double` | `echo_requests_total` - `counter` метрика |
151-
| `counter` | `Range vector` | `echo_requests_total[10s]` | `List<Double>` | `List<10 значений метрики из предыдущих 10 секунд>` |
152-
| `gauge` | `Instant vector` | `go_memstats_sys_bytes` | `Double` | `go_memstats_sys_bytes` - `gauge` метрика |
153-
| `gauge` | `Range vector` | `go_memstats_sys_bytes[10s]` | `List<Double>` | `List<10 значений метрики из предыдущих 10 секунд>` |
154-
| `histogram` | `Instant vector` | `echo_response_size_bytes_bucket` | `Dict<Double, Uint64>` | `echo_response_size_bytes_bucket` - `histogram` метрика; <br/><br/> `Dict<значение le (<=), кол-во значений <= le>` |
155-
| `histogram` | `Range vector` | `echo_response_size_bytes_bucket[10s]` | `Dict<Double, List<Uint64>>` | `Dict<значение le (<=), List<10 значений кол-ва предыдущих 10 секунд>>` |
156-
| `summary` | `Instant vector` | `go_gc_duration_seconds` | `Dict<Float, Double>` | `go_gc_duration_seconds` - `summary` метрика; <br/><br/> `Dict<уровень quantile (от 0 до 1), значение quantile>` |
157-
| `summary` | `Range vector` | `go_gc_duration_seconds[10s]` | `Dict<Float, List<Double>>` | `Dict<уровень quantile (от 0 до 1), List<значения 10 квантилей предыдущих 10 секунд>>` |
138+
| Название метрики (`__name__`) | ... | Лэйблы (`label`) | ... | Время (`timestamp`) | Значение (`value`) |
139+
|-------------------------------|----------|------------------|----------|---------------------|--------------------|
140+
| `String` | `String` | `String` | `String` | `Timestamp` | `Double` |
158141
159142
### API Prometheus
160143

0 commit comments

Comments (0)