|
2 | 2 | package main |
3 | 3 |
|
4 | 4 | import ( |
5 | | - "errors" |
6 | 5 | "flag" |
7 | 6 | "fmt" |
8 | | - "io" |
9 | 7 | "os" |
10 | | - "path/filepath" |
11 | 8 | "runtime/pprof" |
12 | | - "strings" |
13 | 9 | "time" |
14 | 10 |
|
15 | | - "github.com/maxmind/mmdbconvert/internal/config" |
16 | | - "github.com/maxmind/mmdbconvert/internal/merger" |
17 | | - "github.com/maxmind/mmdbconvert/internal/mmdb" |
18 | | - "github.com/maxmind/mmdbconvert/internal/writer" |
| 11 | + "github.com/maxmind/mmdbconvert" |
19 | 12 | ) |
20 | 13 |
|
21 | 14 | const version = "0.1.0" |
@@ -126,361 +119,29 @@ func run(configPath string, quiet, disableCache bool) error { |
126 | 119 | if !quiet { |
127 | 120 | fmt.Printf("mmdbconvert v%s\n", version) |
128 | 121 | fmt.Printf("Loading configuration from %s...\n", configPath) |
129 | | - } |
130 | | - |
131 | | - // Load configuration |
132 | | - cfg, err := config.LoadConfig(configPath) |
133 | | - if err != nil { |
134 | | - return fmt.Errorf("loading config: %w", err) |
135 | | - } |
136 | | - |
137 | | - // Override DisableCache from command-line flag if provided |
138 | | - // Command-line flag takes precedence over config file |
139 | | - if disableCache { |
140 | | - cfg.DisableCache = true |
141 | | - } |
142 | | - |
143 | | - if !quiet { |
144 | | - fmt.Printf("Output format: %s\n", cfg.Output.Format) |
145 | | - if cfg.Output.File != "" { |
146 | | - fmt.Printf("Output file: %s\n", cfg.Output.File) |
147 | | - } else { |
148 | | - fmt.Printf("Output files: IPv4=%s, IPv6=%s\n", cfg.Output.IPv4File, cfg.Output.IPv6File) |
149 | | - } |
150 | | - fmt.Printf("Databases: %d\n", len(cfg.Databases)) |
151 | | - fmt.Printf("Data columns: %d\n", len(cfg.Columns)) |
152 | | - fmt.Printf("Network columns: %d\n", len(cfg.Network.Columns)) |
153 | | - fmt.Println() |
154 | | - } |
155 | | - |
156 | | - // Open MMDB databases |
157 | | - if !quiet { |
158 | | - fmt.Println("Opening MMDB databases...") |
159 | | - } |
160 | | - |
161 | | - databases := map[string]string{} |
162 | | - for _, db := range cfg.Databases { |
163 | | - databases[db.Name] = db.Path |
164 | | - if !quiet { |
165 | | - fmt.Printf(" - %s: %s\n", db.Name, db.Path) |
166 | | - } |
167 | | - } |
168 | | - |
169 | | - readers, err := mmdb.OpenDatabases(databases) |
170 | | - if err != nil { |
171 | | - return fmt.Errorf("opening databases: %w", err) |
172 | | - } |
173 | | - defer readers.Close() |
174 | | - |
175 | | - if err := validateParquetNetworkColumns(cfg, readers); err != nil { |
176 | | - return fmt.Errorf("validating network columns: %w", err) |
177 | | - } |
178 | | - |
179 | | - rowWriter, closers, outputPaths, err := prepareRowWriter(cfg, readers, quiet) |
180 | | - if err != nil { |
181 | | - return err |
182 | | - } |
183 | | - defer func() { |
184 | | - for _, closer := range closers { |
185 | | - closer.Close() |
186 | | - } |
187 | | - }() |
188 | | - |
189 | | - if !quiet { |
190 | 122 | fmt.Println("Merging databases and writing output...") |
191 | | - if cfg.DisableCache { |
| 123 | + if disableCache { |
192 | 124 | fmt.Println(" (unmarshaler caching disabled)") |
193 | 125 | } |
194 | 126 | } |
195 | 127 |
|
196 | | - // Create merger and run |
197 | | - m, err := merger.NewMerger(readers, cfg, rowWriter) |
| 128 | + err := mmdbconvert.Run(mmdbconvert.Options{ |
| 129 | + ConfigPath: configPath, |
| 130 | + DisableCache: disableCache, |
| 131 | + }) |
198 | 132 | if err != nil { |
199 | | - return fmt.Errorf("creating merger: %w", err) |
200 | | - } |
201 | | - if err := m.Merge(); err != nil { |
202 | | - return fmt.Errorf("merging databases: %w", err) |
203 | | - } |
204 | | - |
205 | | - // Flush writer |
206 | | - if flusher, ok := rowWriter.(interface{ Flush() error }); ok { |
207 | | - if err := flusher.Flush(); err != nil { |
208 | | - return fmt.Errorf("flushing output: %w", err) |
209 | | - } |
| 133 | + return err |
210 | 134 | } |
211 | 135 |
|
212 | 136 | if !quiet { |
213 | 137 | elapsed := time.Since(startTime) |
214 | 138 | fmt.Println() |
215 | 139 | fmt.Printf("✓ Successfully completed in %v\n", elapsed.Round(time.Millisecond)) |
216 | | - if len(outputPaths) == 1 { |
217 | | - fmt.Printf("Output written to: %s\n", outputPaths[0]) |
218 | | - } else { |
219 | | - fmt.Println("Output written to:") |
220 | | - for _, path := range outputPaths { |
221 | | - fmt.Printf(" - %s\n", path) |
222 | | - } |
223 | | - } |
224 | | - } |
225 | | - |
226 | | - return nil |
227 | | -} |
228 | | - |
229 | | -func prepareRowWriter( |
230 | | - cfg *config.Config, |
231 | | - readers *mmdb.Readers, |
232 | | - quiet bool, |
233 | | -) (merger.RowWriter, []io.Closer, []string, error) { |
234 | | - var ( |
235 | | - closers []io.Closer |
236 | | - outputPaths []string |
237 | | - ) |
238 | | - |
239 | | - closeAll := func() { |
240 | | - for _, closer := range closers { |
241 | | - closer.Close() |
242 | | - } |
243 | | - } |
244 | | - |
245 | | - switch cfg.Output.Format { |
246 | | - case "csv": |
247 | | - if cfg.Output.IPv4File != "" && cfg.Output.IPv6File != "" { |
248 | | - if !quiet { |
249 | | - fmt.Println() |
250 | | - fmt.Println("Creating output files...") |
251 | | - } |
252 | | - ipv4Path, ipv6Path := splitConfiguredPaths( |
253 | | - cfg.Output.File, |
254 | | - cfg.Output.IPv4File, |
255 | | - cfg.Output.IPv6File, |
256 | | - ) |
257 | | - ipv4File, err := createOutputFile(ipv4Path) |
258 | | - if err != nil { |
259 | | - closeAll() |
260 | | - return nil, nil, nil, fmt.Errorf("creating IPv4 output file: %w", err) |
261 | | - } |
262 | | - closers = append(closers, ipv4File) |
263 | | - outputPaths = append(outputPaths, ipv4Path) |
264 | | - |
265 | | - ipv6File, err := createOutputFile(ipv6Path) |
266 | | - if err != nil { |
267 | | - closeAll() |
268 | | - return nil, nil, nil, fmt.Errorf("creating IPv6 output file: %w", err) |
269 | | - } |
270 | | - closers = append(closers, ipv6File) |
271 | | - outputPaths = append(outputPaths, ipv6Path) |
272 | | - |
273 | | - return writer.NewSplitRowWriter( |
274 | | - writer.NewCSVWriter(ipv4File, cfg), |
275 | | - writer.NewCSVWriter(ipv6File, cfg), |
276 | | - ), closers, outputPaths, nil |
277 | | - } |
278 | | - |
279 | | - if !quiet { |
280 | | - fmt.Println() |
281 | | - fmt.Println("Creating output file...") |
282 | | - } |
283 | | - outputFile, err := createOutputFile(cfg.Output.File) |
284 | | - if err != nil { |
285 | | - closeAll() |
286 | | - return nil, nil, nil, fmt.Errorf("creating output file: %w", err) |
287 | | - } |
288 | | - closers = append(closers, outputFile) |
289 | | - outputPaths = append(outputPaths, cfg.Output.File) |
290 | | - return writer.NewCSVWriter(outputFile, cfg), closers, outputPaths, nil |
291 | | - |
292 | | - case "parquet": |
293 | | - if cfg.Output.IPv4File != "" && cfg.Output.IPv6File != "" { |
294 | | - if !quiet { |
295 | | - fmt.Println() |
296 | | - fmt.Println("Creating output files...") |
297 | | - } |
298 | | - ipv4Path, ipv6Path := splitConfiguredPaths( |
299 | | - cfg.Output.File, |
300 | | - cfg.Output.IPv4File, |
301 | | - cfg.Output.IPv6File, |
302 | | - ) |
303 | | - |
304 | | - ipv4File, err := createOutputFile(ipv4Path) |
305 | | - if err != nil { |
306 | | - closeAll() |
307 | | - return nil, nil, nil, fmt.Errorf("creating IPv4 output file: %w", err) |
308 | | - } |
309 | | - closers = append(closers, ipv4File) |
310 | | - outputPaths = append(outputPaths, ipv4Path) |
311 | | - |
312 | | - ipv6File, err := createOutputFile(ipv6Path) |
313 | | - if err != nil { |
314 | | - closeAll() |
315 | | - return nil, nil, nil, fmt.Errorf("creating IPv6 output file: %w", err) |
316 | | - } |
317 | | - closers = append(closers, ipv6File) |
318 | | - outputPaths = append(outputPaths, ipv6Path) |
319 | | - |
320 | | - ipv4Writer, err := writer.NewParquetWriterWithIPVersion( |
321 | | - ipv4File, |
322 | | - cfg, |
323 | | - writer.IPVersion4, |
324 | | - ) |
325 | | - if err != nil { |
326 | | - closeAll() |
327 | | - return nil, nil, nil, fmt.Errorf("creating IPv4 Parquet writer: %w", err) |
328 | | - } |
329 | | - ipv6Writer, err := writer.NewParquetWriterWithIPVersion( |
330 | | - ipv6File, |
331 | | - cfg, |
332 | | - writer.IPVersion6, |
333 | | - ) |
334 | | - if err != nil { |
335 | | - closeAll() |
336 | | - return nil, nil, nil, fmt.Errorf("creating IPv6 Parquet writer: %w", err) |
337 | | - } |
338 | | - return writer.NewSplitRowWriter(ipv4Writer, ipv6Writer), closers, outputPaths, nil |
339 | | - } |
340 | | - |
341 | | - if !quiet { |
342 | | - fmt.Println() |
343 | | - fmt.Println("Creating output file...") |
344 | | - } |
345 | | - outputFile, err := createOutputFile(cfg.Output.File) |
346 | | - if err != nil { |
347 | | - closeAll() |
348 | | - return nil, nil, nil, fmt.Errorf("creating output file: %w", err) |
349 | | - } |
350 | | - closers = append(closers, outputFile) |
351 | | - outputPaths = append(outputPaths, cfg.Output.File) |
352 | | - |
353 | | - parquetWriter, err := writer.NewParquetWriter(outputFile, cfg) |
354 | | - if err != nil { |
355 | | - closeAll() |
356 | | - return nil, nil, nil, fmt.Errorf("creating Parquet writer: %w", err) |
357 | | - } |
358 | | - return parquetWriter, closers, outputPaths, nil |
359 | | - |
360 | | - case "mmdb": |
361 | | - if !quiet { |
362 | | - fmt.Println() |
363 | | - fmt.Println("Creating output file...") |
364 | | - } |
365 | | - |
366 | | - // Detect IP version from databases |
367 | | - ipVersion, err := detectIPVersionFromDatabases(cfg, readers) |
368 | | - if err != nil { |
369 | | - closeAll() |
370 | | - return nil, nil, nil, fmt.Errorf("detecting IP version: %w", err) |
371 | | - } |
372 | | - |
373 | | - mmdbWriter, err := writer.NewMMDBWriter(cfg.Output.File, cfg, ipVersion) |
374 | | - if err != nil { |
375 | | - closeAll() |
376 | | - return nil, nil, nil, fmt.Errorf("creating MMDB writer: %w", err) |
377 | | - } |
378 | | - |
379 | | - outputPaths = append(outputPaths, cfg.Output.File) |
380 | | - return mmdbWriter, closers, outputPaths, nil |
381 | | - } |
382 | | - |
383 | | - closeAll() |
384 | | - return nil, nil, nil, fmt.Errorf("unsupported output format: %s", cfg.Output.Format) |
385 | | -} |
386 | | - |
387 | | -func createOutputFile(path string) (*os.File, error) { |
388 | | - // #nosec G304 -- paths come from trusted configuration |
389 | | - file, err := os.Create(path) |
390 | | - if err != nil { |
391 | | - return nil, fmt.Errorf("creating %s: %w", path, err) |
392 | | - } |
393 | | - return file, nil |
394 | | -} |
395 | | - |
396 | | -func detectIPVersionFromDatabases(cfg *config.Config, readers *mmdb.Readers) (int, error) { |
397 | | - // Get the first database from config to detect IP version |
398 | | - // In practice, all databases in the merge should have the same IP version |
399 | | - // due to validation in merger |
400 | | - if len(cfg.Databases) == 0 { |
401 | | - return 0, errors.New("no databases configured") |
402 | | - } |
403 | | - |
404 | | - firstDB := cfg.Databases[0].Name |
405 | | - reader, ok := readers.Get(firstDB) |
406 | | - if !ok { |
407 | | - return 0, fmt.Errorf("database '%s' not found", firstDB) |
408 | | - } |
409 | | - |
410 | | - metadata := reader.Metadata() |
411 | | - //nolint:gosec // IPVersion is always 4 or 6, no overflow risk |
412 | | - ipVersion := int(metadata.IPVersion) |
413 | | - |
414 | | - if ipVersion != 4 && ipVersion != 6 { |
415 | | - return 0, fmt.Errorf("invalid IP version %d in database '%s'", ipVersion, firstDB) |
416 | | - } |
417 | | - |
418 | | - return ipVersion, nil |
419 | | -} |
420 | | - |
421 | | -func splitConfiguredPaths(base, ipv4Override, ipv6Override string) (ipv4, ipv6 string) { |
422 | | - ext := filepath.Ext(base) |
423 | | - name := strings.TrimSuffix(base, ext) |
424 | | - if name == "" { |
425 | | - name = base |
426 | | - } |
427 | | - if ext == "" { |
428 | | - ext = ".parquet" |
429 | | - } |
430 | | - |
431 | | - defaultIPv4 := fmt.Sprintf("%s_ipv4%s", name, ext) |
432 | | - defaultIPv6 := fmt.Sprintf("%s_ipv6%s", name, ext) |
433 | | - |
434 | | - ipv4 = ipv4Override |
435 | | - if ipv4 == "" { |
436 | | - ipv4 = defaultIPv4 |
437 | | - } |
438 | | - ipv6 = ipv6Override |
439 | | - if ipv6 == "" { |
440 | | - ipv6 = defaultIPv6 |
441 | | - } |
442 | | - |
443 | | - return ipv4, ipv6 |
444 | | -} |
445 | | - |
446 | | -func validateParquetNetworkColumns(cfg *config.Config, readers *mmdb.Readers) error { |
447 | | - if cfg.Output.Format != "parquet" { |
448 | | - return nil |
449 | | - } |
450 | | - |
451 | | - if !hasIntegerNetworkColumns(cfg.Network.Columns) { |
452 | | - return nil |
453 | | - } |
454 | | - |
455 | | - // Already split output, so integer columns are safe (each writer enforces a single IP family). |
456 | | - if cfg.Output.IPv4File != "" && cfg.Output.IPv6File != "" { |
457 | | - return nil |
458 | | - } |
459 | | - |
460 | | - ipVersion, err := detectIPVersionFromDatabases(cfg, readers) |
461 | | - if err != nil { |
462 | | - return err |
463 | | - } |
464 | | - |
465 | | - if ipVersion == 6 { |
466 | | - return errors.New( |
467 | | - "network column types 'start_int' and 'end_int' require split IPv4/IPv6 outputs when processing IPv6 databases; set output.ipv4_file and output.ipv6_file or switch to start_ip/end_ip", |
468 | | - ) |
469 | 140 | } |
470 | 141 |
|
471 | 142 | return nil |
472 | 143 | } |
473 | 144 |
|
474 | | -func hasIntegerNetworkColumns(cols []config.NetworkColumn) bool { |
475 | | - for _, col := range cols { |
476 | | - switch col.Type { |
477 | | - case writer.NetworkColumnStartInt, writer.NetworkColumnEndInt: |
478 | | - return true |
479 | | - } |
480 | | - } |
481 | | - return false |
482 | | -} |
483 | | - |
484 | 145 | func usage() { |
485 | 146 | fmt.Fprint( |
486 | 147 | os.Stderr, |
|
0 commit comments