|
5 | 5 | "context" |
6 | 6 | "encoding/json" |
7 | 7 | "fmt" |
| 8 | + "io" |
8 | 9 | "os" |
9 | 10 | "path" |
10 | 11 | "strconv" |
@@ -202,33 +203,46 @@ func (k *Keeper) Restore(dumpFile, prefix string) error { |
202 | 203 | if k.root != "" && !strings.HasPrefix(prefix, k.root) { |
203 | 204 | prefix = path.Join(k.root, prefix) |
204 | 205 | } |
205 | | - scanner := bufio.NewScanner(f) |
206 | | - for scanner.Scan() { |
| 206 | + reader := bufio.NewReader(f) |
| 207 | + for { |
| 208 | + line, readErr := reader.ReadString('\n') |
| 209 | + if readErr != nil && readErr != io.EOF { |
| 210 | + return errors.Wrapf(readErr, "can't read %s", dumpFile) |
| 211 | + } |
| 212 | + line = strings.TrimSuffix(line, "\n") |
| 213 | + if line == "" { |
| 214 | + if readErr == io.EOF { |
| 215 | + break |
| 216 | + } |
| 217 | + continue |
| 218 | + } |
207 | 219 | node := DumpNode{} |
208 | | - binaryData := scanner.Bytes() |
209 | | - if err = json.Unmarshal(binaryData, &node); err != nil { |
| 220 | + binaryData := []byte(line) |
| 221 | + if binaryUnmarshalErr := json.Unmarshal(binaryData, &node); binaryUnmarshalErr != nil { |
210 | 222 | //convert from old format |
211 | 223 | nodeString := DumpNodeString{} |
212 | 224 | if stringUnmarshalErr := json.Unmarshal(binaryData, &nodeString); stringUnmarshalErr != nil { |
213 | | - return errors.WithStack(fmt.Errorf("k.Restore can't read data binaryErr=%v, stringErr=%v", err, stringUnmarshalErr)) |
| 225 | + return errors.WithStack(fmt.Errorf("k.Restore can't read data binaryErr=%v, stringErr=%v", binaryUnmarshalErr, stringUnmarshalErr)) |
214 | 226 | } |
215 | 227 | } |
216 | 228 | node.Path = path.Join(prefix, node.Path) |
217 | 229 | version := int32(0) |
218 | | - _, stat, err := k.conn.Get(node.Path) |
219 | | - if err != nil { |
220 | | - _, err = k.conn.Create(node.Path, node.Value, 0, zk.WorldACL(zk.PermAll)) |
221 | | - if err != nil { |
222 | | - return errors.Wrapf(err, "can't create znode %s, error", node.Path) |
| 230 | + _, stat, keeperErr := k.conn.Get(node.Path) |
| 231 | + if keeperErr != nil { |
| 232 | + _, keeperErr = k.conn.Create(node.Path, node.Value, 0, zk.WorldACL(zk.PermAll)) |
| 233 | + if keeperErr != nil { |
| 234 | + return errors.Wrapf(keeperErr, "can't create znode %s, error", node.Path) |
223 | 235 | } |
224 | 236 | } else { |
225 | 237 | version = stat.Version |
226 | | - _, err = k.conn.Set(node.Path, node.Value, version) |
| 238 | + _, keeperErr = k.conn.Set(node.Path, node.Value, version) |
| 239 | + if keeperErr != nil { |
| 240 | + return errors.Wrapf(keeperErr, "can't set znode %s, error", node.Path) |
| 241 | + } |
| 242 | + } |
| 243 | + if readErr == io.EOF { |
| 244 | + break |
227 | 245 | } |
228 | | - } |
229 | | - |
230 | | - if err = scanner.Err(); err != nil { |
231 | | - return errors.WithStack(fmt.Errorf("can't scan %s, error: %s", dumpFile, err)) |
232 | 246 | } |
233 | 247 | return nil |
234 | 248 | } |
|
0 commit comments