Skip to content

Commit 898af61

Browse files
authored
promtool: support creating tsdb blocks from a pipe (prometheus#16011)
This is very useful when piping the input file to stdin and then using /dev/stdin as the input file. e.g. xzcat dump.xz | promtool tsdb create-blocks-from openmetrics /dev/stdin /tmp/data Signed-off-by: Nicolas Peugnet <nicolas.peugnet@lip6.fr>
1 parent 509b978 commit 898af61

File tree

2 files changed

+86
-3
lines changed

2 files changed

+86
-3
lines changed

cmd/promtool/tsdb.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -826,17 +826,31 @@ func checkErr(err error) int {
826826
}
827827

828828
func backfillOpenMetrics(path, outputDir string, humanReadable, quiet bool, maxBlockDuration time.Duration, customLabels map[string]string) int {
829-
inputFile, err := fileutil.OpenMmapFile(path)
829+
var buf []byte
830+
info, err := os.Stat(path)
830831
if err != nil {
831832
return checkErr(err)
832833
}
833-
defer inputFile.Close()
834+
if info.Mode()&(os.ModeNamedPipe|os.ModeCharDevice) != 0 {
835+
// Read the pipe chunks by chunks as it cannot be mmap-ed
836+
buf, err = os.ReadFile(path)
837+
if err != nil {
838+
return checkErr(err)
839+
}
840+
} else {
841+
inputFile, err := fileutil.OpenMmapFile(path)
842+
if err != nil {
843+
return checkErr(err)
844+
}
845+
defer inputFile.Close()
846+
buf = inputFile.Bytes()
847+
}
834848

835849
if err := os.MkdirAll(outputDir, 0o777); err != nil {
836850
return checkErr(fmt.Errorf("create output dir: %w", err))
837851
}
838852

839-
return checkErr(backfill(5000, inputFile.Bytes(), outputDir, humanReadable, quiet, maxBlockDuration, customLabels))
853+
return checkErr(backfill(5000, buf, outputDir, humanReadable, quiet, maxBlockDuration, customLabels))
840854
}
841855

842856
func displayHistogram(dataType string, datas []int, total int) {

cmd/promtool/tsdb_posix_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Copyright 2017 The Prometheus Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
//go:build !windows
15+
16+
package main
17+
18+
import (
19+
"bytes"
20+
"io"
21+
"math"
22+
"os"
23+
"path"
24+
"syscall"
25+
"testing"
26+
"time"
27+
28+
"github.com/stretchr/testify/require"
29+
30+
"github.com/prometheus/prometheus/tsdb"
31+
)
32+
33+
func TestTSDBDumpOpenMetricsRoundTripPipe(t *testing.T) {
34+
initialMetrics, err := os.ReadFile("testdata/dump-openmetrics-roundtrip-test.prom")
35+
require.NoError(t, err)
36+
initialMetrics = normalizeNewLine(initialMetrics)
37+
38+
pipeDir := t.TempDir()
39+
dbDir := t.TempDir()
40+
41+
// create pipe
42+
pipe := path.Join(pipeDir, "pipe")
43+
err = syscall.Mkfifo(pipe, 0o666)
44+
require.NoError(t, err)
45+
46+
go func() {
47+
// open pipe to write
48+
in, err := os.OpenFile(pipe, os.O_WRONLY, os.ModeNamedPipe)
49+
require.NoError(t, err)
50+
defer func() { require.NoError(t, in.Close()) }()
51+
_, err = io.Copy(in, bytes.NewReader(initialMetrics))
52+
require.NoError(t, err)
53+
}()
54+
55+
// Import samples from OM format
56+
code := backfillOpenMetrics(pipe, dbDir, false, false, 2*time.Hour, map[string]string{})
57+
require.Equal(t, 0, code)
58+
db, err := tsdb.Open(dbDir, nil, nil, tsdb.DefaultOptions(), nil)
59+
require.NoError(t, err)
60+
t.Cleanup(func() {
61+
require.NoError(t, db.Close())
62+
})
63+
64+
// Dump the blocks into OM format
65+
dumpedMetrics := getDumpedSamples(t, dbDir, "", math.MinInt64, math.MaxInt64, []string{"{__name__=~'(?s:.*)'}"}, formatSeriesSetOpenMetrics)
66+
67+
// Should get back the initial metrics.
68+
require.Equal(t, string(initialMetrics), dumpedMetrics)
69+
}

0 commit comments

Comments
 (0)