@@ -3,7 +3,6 @@ package zstd
3
3
4
4
import (
5
5
"io"
6
- "runtime"
7
6
"sync"
8
7
9
8
"github.com/klauspost/compress/zstd"
@@ -29,19 +28,17 @@ func (c *Codec) Name() string { return "zstd" }
29
28
// NewReader implements the compress.Codec interface.
30
29
func (c * Codec ) NewReader (r io.Reader ) io.ReadCloser {
31
30
p := new (reader )
32
- if dec , _ := decoderPool .Get ().(* decoder ); dec == nil {
33
- z , err := zstd .NewReader (r )
31
+ if p .dec , _ = decoderPool .Get ().(* zstd.Decoder ); p .dec != nil {
32
+ p .dec .Reset (r )
33
+ } else {
34
+ z , err := zstd .NewReader (r ,
35
+ zstd .WithDecoderConcurrency (1 ),
36
+ )
34
37
if err != nil {
35
38
p .err = err
36
39
} else {
37
- p .dec = & decoder {z }
38
- // We need a finalizer because the reader spawns goroutines
39
- // that will only be stopped if the Close method is called.
40
- runtime .SetFinalizer (p .dec , (* decoder ).finalize )
40
+ p .dec = z
41
41
}
42
- } else {
43
- p .dec = dec
44
- p .err = dec .Reset (r )
45
42
}
46
43
return p
47
44
}
@@ -57,18 +54,10 @@ func (c *Codec) zstdLevel() zstd.EncoderLevel {
57
54
return zstd .EncoderLevelFromZstd (c .level ())
58
55
}
59
56
60
- var decoderPool sync.Pool // *decoder
61
-
62
- type decoder struct {
63
- * zstd.Decoder
64
- }
65
-
66
- func (d * decoder ) finalize () {
67
- d .Close ()
68
- }
57
+ var decoderPool sync.Pool // *zstd.Decoder
69
58
70
59
type reader struct {
71
- dec * decoder
60
+ dec * zstd. Decoder
72
61
err error
73
62
}
74
63
@@ -88,6 +77,9 @@ func (r *reader) Read(p []byte) (int, error) {
88
77
if r .err != nil {
89
78
return 0 , r .err
90
79
}
80
+ if r .dec == nil {
81
+ return 0 , io .EOF
82
+ }
91
83
return r .dec .Read (p )
92
84
}
93
85
@@ -96,21 +88,25 @@ func (r *reader) WriteTo(w io.Writer) (int64, error) {
96
88
if r .err != nil {
97
89
return 0 , r .err
98
90
}
91
+ if r .dec == nil {
92
+ return 0 , io .ErrClosedPipe
93
+ }
99
94
return r .dec .WriteTo (w )
100
95
}
101
96
102
97
// NewWriter implements the compress.Codec interface.
103
98
func (c * Codec ) NewWriter (w io.Writer ) io.WriteCloser {
104
99
p := new (writer )
105
- if enc , _ := c .encoderPool .Get ().(* encoder ); enc == nil {
106
- z , err := zstd .NewWriter (w , zstd .WithEncoderLevel (c .zstdLevel ()))
100
+ if enc , _ := c .encoderPool .Get ().(* zstd.Encoder ); enc == nil {
101
+ z , err := zstd .NewWriter (w ,
102
+ zstd .WithEncoderLevel (c .zstdLevel ()),
103
+ zstd .WithEncoderConcurrency (1 ),
104
+ zstd .WithZeroFrames (true ),
105
+ )
107
106
if err != nil {
108
107
p .err = err
109
108
} else {
110
- p .enc = & encoder {z }
111
- // We need a finalizer because the writer spawns goroutines
112
- // that will only be stopped if the Close method is called.
113
- runtime .SetFinalizer (p .enc , (* encoder ).finalize )
109
+ p .enc = z
114
110
}
115
111
} else {
116
112
p .enc = enc
@@ -120,17 +116,9 @@ func (c *Codec) NewWriter(w io.Writer) io.WriteCloser {
120
116
return p
121
117
}
122
118
123
- type encoder struct {
124
- * zstd.Encoder
125
- }
126
-
127
- func (e * encoder ) finalize () {
128
- e .Close ()
129
- }
130
-
131
119
type writer struct {
132
120
c * Codec
133
- enc * encoder
121
+ enc * zstd. Encoder
134
122
err error
135
123
}
136
124
@@ -149,7 +137,7 @@ func (w *writer) Close() error {
149
137
w .enc = nil
150
138
return err
151
139
}
152
- return nil
140
+ return w . err
153
141
}
154
142
155
143
// WriteTo implements the io.WriterTo interface.
0 commit comments