Skip to content

Commit 45fc8a7

Browse files
committed
ssh: support --zstd compress metadata
1 parent c52088a commit 45fc8a7

File tree

6 files changed

+66
-18
lines changed

6 files changed

+66
-18
lines changed

docs/protocol.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,8 @@ struct metadata {
170170
+ `Accept: application/x-zeta-metadata` 传输流不压缩。
171171
+ `Accept: application/x-zeta-compress-metadata`,传输流使用 ZSTD 压缩。
172172
173+
SSH 协议可以添加参数 `--zstd` 开启元数据压缩。
174+
173175
#### 2.2.2 基本元数据下载
174176
在 HugeSCM 系统中,只需要获得最新的 `revision`及其 tree 就行了,这里 `revision`可以是 `commit`也可以是 `tag`,如果是 `tag`对象需进一步解析到 `commit`为止。
175177

pkg/serve/config.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,6 @@ func (d *Database) MakeConfig() (*mysql.Config, error) {
6565
// http://iokde.com/post/go-mysql-max_allowed_packet.html
6666
// https://wangming1993.github.io/2019/02/25/go-mysql-disable-max-allowed-packet/
6767
cfg.MaxAllowedPacket = maxAllowedPacket
68-
cfg.Params = map[string]string{
69-
"charset": "utf8",
70-
}
7168

7269
return cfg, nil
7370
}

pkg/serve/protocol/pack.go

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -138,17 +138,33 @@ type Packer struct {
138138
}
139139

140140
// NewPipePacker: SSH protocol
141-
func NewPipePacker(o odb.DB, w io.Writer, treeMaxDepth int) (*Packer, error) {
141+
func NewPipePacker(o odb.DB, w io.Writer, treeMaxDepth int, useZSTD bool) (*Packer, error) {
142142
if treeMaxDepth == -1 {
143143
treeMaxDepth = math.MaxInt
144144
}
145-
buffedWriter := streamio.GetBufferWriter(w)
146-
closeFn := func() error {
147-
err := buffedWriter.Flush()
148-
streamio.PutBufferWriter(buffedWriter)
149-
return err
145+
var bodyWriter io.Writer
146+
var closeFn func() error
147+
switch {
148+
case useZSTD:
149+
buffedWriter := streamio.GetBufferWriter(w)
150+
zstdWriter := streamio.GetZstdWriter(buffedWriter)
151+
closeFn = func() error {
152+
streamio.PutZstdWriter(zstdWriter)
153+
err := buffedWriter.Flush()
154+
streamio.PutBufferWriter(buffedWriter)
155+
return err
156+
}
157+
bodyWriter = zstdWriter
158+
default:
159+
buffedWriter := streamio.GetBufferWriter(w)
160+
closeFn = func() error {
161+
err := buffedWriter.Flush()
162+
streamio.PutBufferWriter(buffedWriter)
163+
return err
164+
}
165+
bodyWriter = buffedWriter
150166
}
151-
cw := crc.NewCrc64Writer(buffedWriter)
167+
cw := crc.NewCrc64Writer(bodyWriter)
152168
p := &Packer{DB: o, w: cw, Finisher: cw, treeMaxDepth: treeMaxDepth, closeFn: closeFn, seen: make(map[plumbing.Hash]bool)}
153169
if err := writeMetadataHeader(cw); err != nil {
154170
_ = p.Close()

pkg/serve/sshserver/command_metadata.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ import (
1919

2020
// zeta-serve metadata "group/mono-zeta" --batch --depth=1
2121

22+
const (
23+
UseZSTD rune = 1000
24+
)
25+
2226
type Metadata struct {
2327
Path string
2428
Revision string
@@ -28,6 +32,7 @@ type Metadata struct {
2832
Depth int
2933
Batch bool
3034
Sparse bool
35+
UseZSTD bool
3136
}
3237

3338
func (c *Metadata) ParseArgs(args []string) error {
@@ -40,7 +45,8 @@ func (c *Metadata) ParseArgs(args []string) error {
4045
Add("deepen-from", REQUIRED, 'F').
4146
Add("deepen", REQUIRED, 'D').
4247
Add("sparse", NOARG, 'S').
43-
Add("batch", NOARG, 'B')
48+
Add("batch", NOARG, 'B').
49+
Add("zstd", NOARG, UseZSTD)
4450
if err := p.Parse(args, func(index rune, nextArg, raw string) error {
4551
switch index {
4652
case 'R':
@@ -72,6 +78,8 @@ func (c *Metadata) ParseArgs(args []string) error {
7278
c.Sparse = true
7379
case 'B':
7480
c.Batch = true
81+
case UseZSTD:
82+
c.UseZSTD = true
7583
}
7684
return nil
7785
}); err != nil {
@@ -89,7 +97,7 @@ func (c *Metadata) Exec(ctx *RunCtx) int {
8997
return exitCode
9098
}
9199
if c.Batch {
92-
return ctx.S.BatchMetadata(ctx.Session, c.Depth)
100+
return ctx.S.BatchMetadata(ctx.Session, c.Depth, c.UseZSTD)
93101
}
94102
if c.Sparse {
95103
return ctx.S.GetSparseMetadata(ctx.Session, c)
@@ -110,7 +118,7 @@ func (s *Server) FetchMetadata(e *Session, c *Metadata) int {
110118
if ro.Target == nil {
111119
return e.ExitFormat(400, "revision %s target not commit", c.Revision)
112120
}
113-
p, err := protocol.NewPipePacker(rr.ODB(), e, c.Depth)
121+
p, err := protocol.NewPipePacker(rr.ODB(), e, c.Depth, c.UseZSTD)
114122
if err != nil {
115123
logrus.Errorf("new packer error %v", err)
116124
return e.ExitError(err)
@@ -153,7 +161,7 @@ func (s *Server) GetSparseMetadata(e *Session, c *Metadata) int {
153161
return e.ExitFormat(400, "revision %s target not commit", c.Revision)
154162
}
155163
cc := ro.Target
156-
p, err := protocol.NewPipePacker(rr.ODB(), e, c.Depth)
164+
p, err := protocol.NewPipePacker(rr.ODB(), e, c.Depth, c.UseZSTD)
157165
if err != nil {
158166
logrus.Errorf("new packer error %v", err)
159167
return e.ExitError(err)
@@ -176,7 +184,7 @@ func (s *Server) GetSparseMetadata(e *Session, c *Metadata) int {
176184
return 0
177185
}
178186

179-
func (s *Server) BatchMetadata(e *Session, depth int) int {
187+
func (s *Server) BatchMetadata(e *Session, depth int, useZSTD bool) int {
180188
oids, err := protocol.ReadInputOIDs(e)
181189
if err != nil {
182190
e.WriteError("batch-oids: %v", err)
@@ -196,7 +204,7 @@ func (s *Server) BatchMetadata(e *Session, depth int) int {
196204
}
197205
objects = append(objects, a)
198206
}
199-
p, err := protocol.NewPipePacker(rr.ODB(), e, depth)
207+
p, err := protocol.NewPipePacker(rr.ODB(), e, depth, useZSTD)
200208
if err != nil {
201209
logrus.Errorf("new packer error %v", err)
202210
return e.ExitError(err)

pkg/transport/http/metadata.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func (r decompressReader) Close() error {
4545

4646
func newDecompressReader(rc io.ReadCloser, h http.Header) (io.ReadCloser, error) {
4747
switch conentType := h.Get("Content-Type"); conentType {
48-
case "application/x-zeta-metadata":
48+
case ZETA_MIME_METADATA:
4949
return &decompressReader{
5050
Reader: rc,
5151
closer: []io.Closer{rc},

pkg/transport/ssh/metadata.go

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
"github.com/antgroup/hugescm/modules/plumbing"
1515
"github.com/antgroup/hugescm/pkg/transport"
16+
"github.com/klauspost/compress/zstd"
1617
)
1718

1819
// FetchReference: zeta-serve ls-remote "group/mono-zeta" --reference "${REFNAME}"
@@ -59,6 +60,24 @@ func sparsesGenReader(sparses []string) io.Reader {
5960
return strings.NewReader(b.String())
6061
}
6162

63+
type decompressReader struct {
64+
decoder *zstd.Decoder
65+
cmd *Command
66+
}
67+
68+
func (r decompressReader) Read(p []byte) (n int, err error) {
69+
return r.decoder.Read(p)
70+
}
71+
72+
func (r decompressReader) Close() error {
73+
r.decoder.Close()
74+
return r.cmd.Close()
75+
}
76+
77+
func (r *decompressReader) LastError() error {
78+
return r.cmd.lastError
79+
}
80+
6281
// FetchMetadata: support base metadata and sparses metadata.
6382
//
6483
// zeta-serve metadata "group/mono-zeta" --revision "${REVISION}" --depth=1 --deepen-from=${from}
@@ -80,6 +99,7 @@ func (c *client) FetchMetadata(ctx context.Context, target plumbing.Hash, opts *
8099
if len(opts.Sparses) != 0 {
81100
psArgs = append(psArgs, "--sparse")
82101
}
102+
psArgs = append(psArgs, "--zstd")
83103
commandArgs := strings.Join(psArgs, " ")
84104
cmd, err := c.NewBaseCommand(ctx)
85105
if err != nil {
@@ -94,5 +114,10 @@ func (c *client) FetchMetadata(ctx context.Context, target plumbing.Hash, opts *
94114
_ = cmd.Close()
95115
return nil, err
96116
}
97-
return cmd, nil
117+
zr, err := zstd.NewReader(cmd)
118+
if err != nil {
119+
_ = cmd.Close()
120+
return nil, err
121+
}
122+
return &decompressReader{decoder: zr, cmd: cmd}, nil
98123
}

0 commit comments

Comments
 (0)