|
1 | 1 | package inputflags |
2 | 2 |
|
3 | 3 | import ( |
4 | | - "context" |
5 | 4 | "errors" |
6 | 5 | "flag" |
7 | | - "fmt" |
8 | | - "os" |
9 | 6 |
|
10 | | - "github.com/brimdata/super" |
11 | 7 | "github.com/brimdata/super/cli/auto" |
12 | | - "github.com/brimdata/super/pkg/storage" |
13 | | - "github.com/brimdata/super/sio" |
14 | 8 | "github.com/brimdata/super/sio/anyio" |
15 | 9 | "github.com/brimdata/super/sio/bsupio" |
16 | 10 | ) |
17 | 11 |
|
18 | 12 | type Flags struct { |
19 | | - anyio.ReaderOpts |
20 | | - Dynamic bool |
21 | | - ReadMax auto.Bytes |
22 | | - ReadSize auto.Bytes |
23 | | - Threads int |
24 | | -} |
25 | | - |
26 | | -func (f *Flags) Options() anyio.ReaderOpts { |
27 | | - return f.ReaderOpts |
| 13 | + Dynamic bool |
| 14 | + ReaderOpts anyio.ReaderOpts |
| 15 | + bsupReadMax auto.Bytes |
| 16 | + bsupReadSize auto.Bytes |
28 | 17 | } |
29 | 18 |
|
30 | 19 | func (f *Flags) SetFlags(fs *flag.FlagSet, validate bool) { |
31 | | - fs.StringVar(&f.Format, "i", "auto", "format of input data [auto,arrows,bsup,csup,csv,json,line,parquet,sup,tsv,zeek,jsup]") |
32 | | - f.CSV.Delim = ',' |
| 20 | + f.bsupReadMax = auto.NewBytes(bsupio.MaxSize) |
| 21 | + fs.Var(&f.bsupReadMax, "bsup.readmax", "maximum Super Binary read buffer size in MiB, MB, etc.") |
| 22 | + f.bsupReadSize = auto.NewBytes(bsupio.ReadSize) |
| 23 | + fs.Var(&f.bsupReadSize, "bsup.readsize", "target Super Binary read buffer size in MiB, MB, etc.") |
| 24 | + opts := &f.ReaderOpts |
| 25 | + fs.IntVar(&opts.BSUP.Threads, "bsup.threads", 0, "number of Super Binary read threads (0=GOMAXPROCS)") |
| 26 | + fs.BoolVar(&opts.BSUP.Validate, "bsup.validate", validate, "validate format when reading Super Binary") |
| 27 | + opts.CSV.Delim = ',' |
33 | 28 | fs.Func("csv.delim", `CSV field delimiter (default ",")`, func(s string) error { |
34 | 29 | if len(s) != 1 { |
35 | 30 | return errors.New("CSV field delimiter must be exactly one character") |
36 | 31 | } |
37 | | - f.CSV.Delim = rune(s[0]) |
| 32 | + opts.CSV.Delim = rune(s[0]) |
38 | 33 | return nil |
39 | 34 |
|
40 | 35 | }) |
41 | | - fs.IntVar(&f.BSUP.Threads, "bsup.threads", 0, "number of Super Binary read threads (0=GOMAXPROCS)") |
42 | | - fs.BoolVar(&f.BSUP.Validate, "bsup.validate", validate, "validate format when reading Super Binary") |
43 | | - f.ReadMax = auto.NewBytes(bsupio.MaxSize) |
44 | | - fs.Var(&f.ReadMax, "bsup.readmax", "maximum Super Binary read buffer size in MiB, MB, etc.") |
45 | | - f.ReadSize = auto.NewBytes(bsupio.ReadSize) |
46 | | - fs.Var(&f.ReadSize, "bsup.readsize", "target Super Binary read buffer size in MiB, MB, etc.") |
47 | 36 | fs.BoolVar(&f.Dynamic, "dynamic", false, "disable static type checking of inputs") |
| 37 | + fs.StringVar(&opts.Format, "i", "auto", "format of input data [auto,arrows,bsup,csup,csv,json,jsup,line,parquet,sup,tsv,zeek]") |
48 | 38 | } |
49 | 39 |
|
50 | 40 | // Init is called after flags have been parsed. |
51 | 41 | func (f *Flags) Init() error { |
52 | | - f.BSUP.Max = int(f.ReadMax.Bytes) |
53 | | - if f.BSUP.Max < 0 { |
| 42 | + bsup := &f.ReaderOpts.BSUP |
| 43 | + bsup.Max = int(f.bsupReadMax.Bytes) |
| 44 | + if bsup.Max < 0 { |
54 | 45 | return errors.New("max read buffer size must be greater than zero") |
55 | 46 | } |
56 | | - f.BSUP.Size = int(f.ReadSize.Bytes) |
57 | | - if f.BSUP.Size < 0 { |
| 47 | + bsup.Size = int(f.bsupReadSize.Bytes) |
| 48 | + if bsup.Size < 0 { |
58 | 49 | return errors.New("target read buffer size must be greater than zero") |
59 | 50 | } |
60 | 51 | return nil |
61 | 52 | } |
62 | | - |
63 | | -func (f *Flags) Open(ctx context.Context, sctx *super.Context, engine storage.Engine, paths []string, stopOnErr bool) ([]sio.Reader, error) { |
64 | | - var readers []sio.Reader |
65 | | - for _, path := range paths { |
66 | | - if path == "-" { |
67 | | - path = "stdio:stdin" |
68 | | - } |
69 | | - file, err := anyio.Open(ctx, sctx, engine, path, f.ReaderOpts) |
70 | | - if err != nil { |
71 | | - err = fmt.Errorf("%s: %w", path, err) |
72 | | - if stopOnErr { |
73 | | - return nil, err |
74 | | - } |
75 | | - fmt.Fprintln(os.Stderr, err) |
76 | | - continue |
77 | | - } |
78 | | - readers = append(readers, file) |
79 | | - } |
80 | | - return readers, nil |
81 | | -} |
0 commit comments