|
6 | 6 | package logic
|
7 | 7 |
|
8 | 8 | import (
|
| 9 | + "bufio" |
9 | 10 | "fmt"
|
| 11 | + "io" |
10 | 12 | "math"
|
11 | 13 | "os"
|
12 | 14 | "os/signal"
|
| 15 | + "strconv" |
| 16 | + "strings" |
13 | 17 | "sync/atomic"
|
14 | 18 | "syscall"
|
15 | 19 | "time"
|
@@ -41,6 +45,7 @@ type Migrator struct {
|
41 | 45 | inspector *Inspector
|
42 | 46 | applier *Applier
|
43 | 47 | eventsStreamer *EventsStreamer
|
| 48 | + server *Server |
44 | 49 | migrationContext *base.MigrationContext
|
45 | 50 |
|
46 | 51 | tablesInPlace chan bool
|
@@ -95,6 +100,9 @@ func (this *Migrator) acceptSignals() {
|
95 | 100 |
|
96 | 101 | func (this *Migrator) shouldThrottle() (result bool, reason string) {
|
97 | 102 | // User-based throttle
|
| 103 | + if atomic.LoadInt64(&this.migrationContext.ThrottleCommandedByUser) > 0 { |
| 104 | + return true, "commanded by user" |
| 105 | + } |
98 | 106 | if this.migrationContext.ThrottleFlagFile != "" {
|
99 | 107 | if base.FileExists(this.migrationContext.ThrottleFlagFile) {
|
100 | 108 | // Throttle file defined and exists!
|
@@ -321,6 +329,9 @@ func (this *Migrator) Migrate() (err error) {
|
321 | 329 | }
|
322 | 330 | }
|
323 | 331 |
|
| 332 | + if err := this.initiateServer(); err != nil { |
| 333 | + return err |
| 334 | + } |
324 | 335 | if err := this.addDMLEventsListener(); err != nil {
|
325 | 336 | return err
|
326 | 337 | }
|
@@ -518,6 +529,64 @@ func (this *Migrator) stopWritesAndCompleteMigrationOnReplica() (err error) {
|
518 | 529 | return nil
|
519 | 530 | }
|
520 | 531 |
|
| 532 | +func (this *Migrator) onServerCommand(command string, writer *bufio.Writer) (err error) { |
| 533 | + tokens := strings.Split(command, "=") |
| 534 | + command = strings.TrimSpace(tokens[0]) |
| 535 | + arg := "" |
| 536 | + if len(tokens) > 1 { |
| 537 | + arg = strings.TrimSpace(tokens[1]) |
| 538 | + } |
| 539 | + switch command { |
| 540 | + case "help": |
| 541 | + { |
| 542 | + fmt.Fprintln(writer, `available commands: |
| 543 | + status # Print a status message |
| 544 | + chunk-size=<newsize> # Set a new chunk-size |
| 545 | + throttle # Force throttling |
| 546 | + no-throttle # End forced throttling (other throttling may still apply) |
| 547 | + help # This message |
| 548 | +`) |
| 549 | + } |
| 550 | + case "info", "status": |
| 551 | + this.printMigrationStatusHint(writer) |
| 552 | + this.printStatus(writer) |
| 553 | + case "chunk-size": |
| 554 | + { |
| 555 | + if chunkSize, err := strconv.Atoi(arg); err != nil { |
| 556 | + return log.Errore(err) |
| 557 | + } else { |
| 558 | + this.migrationContext.SetChunkSize(int64(chunkSize)) |
| 559 | + this.printMigrationStatusHint(writer) |
| 560 | + } |
| 561 | + } |
| 562 | + case "throttle", "pause", "suspend": |
| 563 | + { |
| 564 | + atomic.StoreInt64(&this.migrationContext.ThrottleCommandedByUser, 1) |
| 565 | + } |
| 566 | + case "no-throttle", "unthrottle", "resume", "continue": |
| 567 | + { |
| 568 | + atomic.StoreInt64(&this.migrationContext.ThrottleCommandedByUser, 0) |
| 569 | + } |
| 570 | + default: |
| 571 | + return fmt.Errorf("Unknown command: %s", command) |
| 572 | + } |
| 573 | + writer.Flush() |
| 574 | + return nil |
| 575 | +} |
| 576 | + |
| 577 | +func (this *Migrator) initiateServer() (err error) { |
| 578 | + this.server = NewServer(this.onServerCommand) |
| 579 | + if err := this.server.BindSocketFile(); err != nil { |
| 580 | + return err |
| 581 | + } |
| 582 | + if err := this.server.BindTCPPort(); err != nil { |
| 583 | + return err |
| 584 | + } |
| 585 | + |
| 586 | + go this.server.Serve() |
| 587 | + return nil |
| 588 | +} |
| 589 | + |
521 | 590 | func (this *Migrator) initiateInspector() (err error) {
|
522 | 591 | this.inspector = NewInspector()
|
523 | 592 | if err := this.inspector.InitDBConnections(); err != nil {
|
@@ -563,34 +632,42 @@ func (this *Migrator) initiateStatus() error {
|
563 | 632 | return nil
|
564 | 633 | }
|
565 | 634 |
|
566 |
| -func (this *Migrator) printMigrationStatusHint() { |
567 |
| - fmt.Println(fmt.Sprintf("# Migrating %s.%s; Ghost table is %s.%s", |
| 635 | +func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) { |
| 636 | + writers = append(writers, os.Stdout) |
| 637 | + w := io.MultiWriter(writers...) |
| 638 | + fmt.Fprintln(w, fmt.Sprintf("# Migrating %s.%s; Ghost table is %s.%s", |
568 | 639 | sql.EscapeName(this.migrationContext.DatabaseName),
|
569 | 640 | sql.EscapeName(this.migrationContext.OriginalTableName),
|
570 | 641 | sql.EscapeName(this.migrationContext.DatabaseName),
|
571 | 642 | sql.EscapeName(this.migrationContext.GetGhostTableName()),
|
572 | 643 | ))
|
573 |
| - fmt.Println(fmt.Sprintf("# Migration started at %+v", |
| 644 | + fmt.Fprintln(w, fmt.Sprintf("# Migration started at %+v", |
574 | 645 | this.migrationContext.StartTime.Format(time.RubyDate),
|
575 | 646 | ))
|
576 |
| - fmt.Println(fmt.Sprintf("# chunk-size: %+v; max lag: %+vms; max-load: %+v", |
| 647 | + fmt.Fprintln(w, fmt.Sprintf("# chunk-size: %+v; max lag: %+vms; max-load: %+v", |
577 | 648 | atomic.LoadInt64(&this.migrationContext.ChunkSize),
|
578 | 649 | atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold),
|
579 | 650 | this.migrationContext.MaxLoad,
|
580 | 651 | ))
|
581 | 652 | if this.migrationContext.ThrottleFlagFile != "" {
|
582 |
| - fmt.Println(fmt.Sprintf("# Throttle flag file: %+v", |
| 653 | + fmt.Fprintln(w, fmt.Sprintf("# Throttle flag file: %+v", |
583 | 654 | this.migrationContext.ThrottleFlagFile,
|
584 | 655 | ))
|
585 | 656 | }
|
586 | 657 | if this.migrationContext.ThrottleAdditionalFlagFile != "" {
|
587 |
| - fmt.Println(fmt.Sprintf("# Throttle additional flag file: %+v", |
| 658 | + fmt.Fprintln(w, fmt.Sprintf("# Throttle additional flag file: %+v", |
588 | 659 | this.migrationContext.ThrottleAdditionalFlagFile,
|
589 | 660 | ))
|
590 | 661 | }
|
| 662 | + fmt.Fprintln(w, fmt.Sprintf("# Serving on unix socket: %+v", |
| 663 | + this.migrationContext.ServeSocketFile, |
| 664 | + )) |
| 665 | + if this.migrationContext.ServeTCPPort != 0 { |
| 666 | + fmt.Fprintln(w, fmt.Sprintf("# Serving on TCP port: %+v", this.migrationContext.ServeTCPPort)) |
| 667 | + } |
591 | 668 | }
|
592 | 669 |
|
593 |
| -func (this *Migrator) printStatus() { |
| 670 | +func (this *Migrator) printStatus(writers ...io.Writer) { |
594 | 671 | elapsedTime := this.migrationContext.ElapsedTime()
|
595 | 672 | elapsedSeconds := int64(elapsedTime.Seconds())
|
596 | 673 | totalRowsCopied := this.migrationContext.GetTotalRowsCopied()
|
@@ -656,7 +733,9 @@ func (this *Migrator) printStatus() {
|
656 | 733 | fmt.Sprintf("copy iteration %d at %d", this.migrationContext.GetIteration(), time.Now().Unix()),
|
657 | 734 | status,
|
658 | 735 | )
|
659 |
| - fmt.Println(status) |
| 736 | + writers = append(writers, os.Stdout) |
| 737 | + w := io.MultiWriter(writers...) |
| 738 | + fmt.Fprintln(w, status) |
660 | 739 | }
|
661 | 740 |
|
662 | 741 | func (this *Migrator) initiateHeartbeatListener() {
|
|
0 commit comments