Skip to content

Commit 47950dd

Browse files
zhou-hongyu (Hongyu Zhou)
and authored
[CONFIG-326][stage][prod] Save download metrics and have reflector emitting it (#121)
* pool vectors udpport from download script * address feedbacks * address feedback * address feedback * remove netcat timeout * unblock * see what's blocking * fix * fix * fix * fix * TIL the condition on the while loop is only tested at the beginning of each time through that loop * fix * check if there is gap * don't emit if already present * experiment * write downloading time into a file * fix * address feedback * address feedbacks * emit extra metric after reflector finished downloading the snapshot via s3 client * surface log * fix test * fix * address feedbacks * address feedbacks * explicit flushing stats * fix * address feedbacks * add test for emitMetricFromFile * fix panic * fix panic * fix * refactor * run in parallel --------- Co-authored-by: Hongyu Zhou <[email protected]>
1 parent 975ec0b commit 47950dd

File tree

5 files changed

+139
-10
lines changed

5 files changed

+139
-10
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@
44
/bin
55
.coverprofile
66
.vscode
7+
.idea

pkg/reflector/download.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ func (d *S3Downloader) DownloadTo(w io.Writer) (n int64, err error) {
7171
if compressedSize != nil {
7272
events.Log("LDB inflated %d -> %d bytes", *compressedSize, n)
7373
}
74+
7475
return
7576
}
7677

pkg/reflector/reflector.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package reflector
22

33
import (
4+
"bytes"
45
"context"
56
"database/sql"
67
"encoding/base64"
8+
"encoding/json"
79
"fmt"
810
"io"
911
"net/url"
@@ -25,6 +27,7 @@ import (
2527
"github.com/segmentio/errors-go"
2628
"github.com/segmentio/events/v2"
2729
_ "github.com/segmentio/events/v2/log" // lets events actually log
30+
2831
"github.com/segmentio/stats/v4"
2932
)
3033

@@ -73,6 +76,12 @@ type ReflectorConfig struct {
7376
BusyTimeoutMS int // optional
7477
}
7578

79+
type DownloadMetric struct {
80+
StartTime int `json:"startTime,omitempty"`
81+
Downloaded string `json:"downloaded"`
82+
Compressed string `json:"compressed"`
83+
}
84+
7685
type starter interface {
7786
Start(ctx context.Context)
7887
}
@@ -175,6 +184,13 @@ func ReflectorFromConfig(config ReflectorConfig) (*Reflector, error) {
175184

176185
events.Log("Max known ledger sequence: %{seq}d", maxKnownSeq)
177186

187+
path := "/var/spool/ctlstore/metrics.json"
188+
err = emitMetricFromFile(path)
189+
if err != nil {
190+
events.Log("Failed to emit metric from file", err)
191+
}
192+
events.Log("Successfully emitted metric from file")
193+
178194
// TODO: check Upstream fields
179195

180196
stop := make(chan struct{})
@@ -280,6 +296,45 @@ func ReflectorFromConfig(config ReflectorConfig) (*Reflector, error) {
280296
}, nil
281297
}
282298

299+
func emitMetricFromFile(path string) error {
300+
if _, err := os.Stat(path); err != nil {
301+
switch {
302+
case os.IsNotExist(err):
303+
return nil
304+
default:
305+
return err
306+
}
307+
}
308+
309+
metricsFile, err := os.Open(path)
310+
if err != nil {
311+
return err
312+
}
313+
314+
b, err := io.ReadAll(metricsFile)
315+
if err != nil {
316+
return err
317+
}
318+
319+
var dm DownloadMetric
320+
321+
d := json.NewDecoder(bytes.NewReader(b))
322+
d.DisallowUnknownFields()
323+
err = d.Decode(&dm)
324+
if err != nil {
325+
return err
326+
}
327+
328+
stats.Observe("init_snapshot_download_time", dm.StartTime, stats.T("downloaded", dm.Downloaded), stats.T("compressed", dm.Compressed))
329+
330+
defer func() {
331+
metricsFile.Close()
332+
os.Remove(path)
333+
}()
334+
335+
return nil
336+
}
337+
283338
func (r *Reflector) Start(ctx context.Context) error {
284339
events.Log("Starting Reflector.")
285340
go r.ledgerMonitor.Start(ctx)

pkg/reflector/reflector_test.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"database/sql"
66
"encoding/base64"
7+
"github.com/stretchr/testify/assert"
78
"io/ioutil"
89
"os"
910
"path/filepath"
@@ -16,6 +17,7 @@ import (
1617
"github.com/segmentio/ctlstore/pkg/ldb"
1718
"github.com/segmentio/ctlstore/pkg/ldbwriter"
1819
"github.com/segmentio/ctlstore/pkg/ledger"
20+
"github.com/segmentio/errors-go"
1921
"github.com/segmentio/events/v2"
2022
"github.com/stretchr/testify/require"
2123
)
@@ -220,3 +222,76 @@ func TestReflector(t *testing.T) {
220222
err = reflector.Close()
221223
require.NoError(t, err)
222224
}
225+
226+
func TestEmitMetricFromFile(t *testing.T) {
227+
for _, test := range []struct {
228+
name string
229+
fileName string
230+
extra string
231+
content string
232+
perm int
233+
err error
234+
}{
235+
{
236+
"file does not exist doesn't return error",
237+
"1.jso",
238+
"n",
239+
"{\"startTime\": \"6\", \"downloaded\": \"true\", \"compressed\": \"false\"}",
240+
0664,
241+
nil,
242+
},
243+
{
244+
"file exist but unable to open",
245+
"2.json",
246+
"",
247+
"{\"startTime\": 6, \"downloaded\": \"true\", \"compressed\": \"false\"}",
248+
064,
249+
errors.New("permission denied"),
250+
},
251+
{
252+
"invalid character",
253+
"3.json",
254+
"",
255+
"{\"startTime\": \"6, \"downloaded\": \"true\", \"compressed\": \"false\"}",
256+
0664,
257+
errors.New("invalid character"),
258+
},
259+
{
260+
"invalid key",
261+
"4.json",
262+
"",
263+
"{\"start\": \"6\", \"downloaded\": \"true\", \"compressed\": \"false\"}",
264+
0664,
265+
errors.New("unknown field"),
266+
},
267+
{
268+
"valid content",
269+
"5.json",
270+
"",
271+
"{\"startTime\": 6, \"downloaded\": \"true\", \"compressed\": \"false\"}",
272+
0664,
273+
nil,
274+
},
275+
} {
276+
t.Run(test.name, func(t *testing.T) {
277+
t.Parallel()
278+
tempdir := t.TempDir()
279+
f, err := os.CreateTemp(tempdir, test.fileName)
280+
assert.NoError(t, err)
281+
282+
_, err = f.Write([]byte(test.content))
283+
assert.NoError(t, err)
284+
285+
err = os.Chmod(f.Name(), os.FileMode(test.perm))
286+
assert.NoError(t, err)
287+
288+
err = emitMetricFromFile(f.Name() + test.extra)
289+
290+
if test.err == nil {
291+
require.NoError(t, err)
292+
} else {
293+
require.Contains(t, err.Error(), test.err.Error())
294+
}
295+
})
296+
}
297+
}

scripts/download.sh

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ set -eo pipefail
44

55
CTLSTORE_BOOTSTRAP_URL=$1
66
CONCURRENCY=${2:-20}
7-
STATS_IP=$3
8-
STATS_PORT=${4:-8125}
7+
DOWNLOADED="false"
8+
COMPRESSED="false"
9+
METRICS="/var/spool/ctlstore/metrics.json"
910

10-
TAGS="downloaded:false"
1111
START=$(date +%s)
1212
END=$(date +%s)
1313
if [ ! -f /var/spool/ctlstore/ldb.db ]; then
@@ -18,22 +18,19 @@ if [ ! -f /var/spool/ctlstore/ldb.db ]; then
1818
cd /var/spool/ctlstore
1919
s5cmd -r 0 --log debug cp --concurrency $CONCURRENCY $CTLSTORE_BOOTSTRAP_URL .
2020

21-
TAGS="downloaded:true"
21+
DOWNLOADED="true"
2222
if [[ ${CTLSTORE_BOOTSTRAP_URL: -2} == gz ]]; then
2323
echo "Decompressing"
2424
pigz -d snapshot.db.gz
25-
TAGS="$TAGS,compressed:true"
25+
COMPRESSED="true"
2626
fi
2727

28-
TAGS="$TAGS,concurrency:$CONCURRENCY"
29-
3028
mv snapshot.db ldb.db
3129
END=$(date +%s)
3230
echo "ldb.db ready in $(($END - $START)) seconds"
3331
else
3432
echo "Snapshot already present"
3533
fi
3634

37-
if [ ! -z "$STATS_IP" ]; then
38-
echo -n "ctlstore.reflector.init_snapshot_download_time:$(($END - $START))|h|#$TAGS" | nc -u -w1 $NODE_IP $STATS_PORT
39-
fi
35+
echo "{\"startTime\": $(($END - $START)), \"downloaded\": \"$DOWNLOADED\", \"compressed\": \"$COMPRESSED\"}" > $METRICS
36+
cat $METRICS

0 commit comments

Comments
 (0)