
Workers Not Leasing All Shards #14

@calebstewart

Description

Describe the bug

The current worker implementation only acquires a lease for a single shard per shard sync interval. This seems like a bug. Based on the commit message (e2a945d), the intent was to "prevent on host tak[ing] more shard[s] than it's configuration allowed"; however, the result is that only one shard is leased per interval, which causes start-up times to balloon as more shards are introduced.
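
For context, the net effect of the current loop is roughly the following (a paraphrased sketch of the observed behavior, not the library's actual code; the w.shardStatus / w.checkpointer names are borrowed from the proposed fix further down):

for _, shard := range w.shardStatus {
  // skip shards this worker already owns
  if shard.GetLeaseOwner() == w.workerID {
    continue
  }

  if err := w.checkpointer.GetLease(shard, w.workerID); err != nil {
    continue
  }

  // One lease was acquired and the sync ends here; every other
  // unleased shard has to wait for the next shard sync interval.
  break
}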

Reproduction steps

A basic worker on a stream with multiple shards exhibits this behavior:

package main

import (
  "os"
  "fmt"
  "os/signal"

  "github.com/vmware/vmware-go-kcl-v2/clientlibrary/config"
  "github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces"
  "github.com/vmware/vmware-go-kcl-v2/clientlibrary/worker"
)

type RecordProcessor struct {}
type RecordProcessorFactory struct {}

func (f *RecordProcessorFactory) CreateProcessor() interfaces.IRecordProcessor {
  return &RecordProcessor{}
}

func (p *RecordProcessor) ProcessRecords(input *interfaces.ProcessRecordsInput) {}
func (p *RecordProcessor) Initialize(input *interfaces.InitializationInput) {}
func (p *RecordProcessor) Shutdown(input *interfaces.ShutdownInput) {}

func main() {

  // Separately, I have no idea why, but the library seems incapable of figuring out the
  // Kinesis service endpoint on its own. Not specifying it manually results in errors
  // where it apparently tries to use an empty string as a service endpoint, but that's
  // probably a problem for a separate issue.
  cfg := config.NewKinesisClientLibConfig("test", "caleb-testing", "us-east-2", "worker")
  cfg.KinesisEndpoint = "https://kinesis.us-east-2.amazonaws.com"
  kcl := worker.NewWorker(&RecordProcessorFactory{}, cfg)

  if err := kcl.Start(); err != nil {
    fmt.Printf("[!] failed to start kcl worker: %v\n", err)
    return
  }
  defer kcl.Shutdown()

  signals := make(chan os.Signal, 1)
  signal.Notify(signals, os.Interrupt)

  // Block until interrupted (Ctrl-C), then let the deferred Shutdown run.
  <-signals
}

Expected behavior

A worker should lease as many shards as it can, up to MaxLeasesForWorker, on every shard sync.
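
For reference, that cap comes from the client configuration used in the reproduction above. A minimal sketch of setting it explicitly (the value 10 is arbitrary, and I am assuming the MaxLeasesForWorker field, the same one the lease loop reads, can be assigned directly):

cfg := config.NewKinesisClientLibConfig("test", "caleb-testing", "us-east-2", "worker")
cfg.KinesisEndpoint = "https://kinesis.us-east-2.amazonaws.com"

// Cap on concurrent shard leases for this worker. With the expected
// behavior, a single shard sync should keep leasing shards until either
// this cap is reached or no unowned shards remain.
cfg.MaxLeasesForWorker = 10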

Additional context

I believe the solution is to refactor the lease loop (ref) to look something like this:

// max number of leases has not been reached yet
for _, shard := range w.shardStatus {
  // Don't take out more leases than allowed
  if counter >= w.kclConfig.MaxLeasesForWorker {
    break
  }

  // already owner of the shard
  if shard.GetLeaseOwner() == w.workerID {
    continue
  }

  err := w.checkpointer.FetchCheckpoint(shard)
  if err != nil {
    // A checkpoint that does not exist yet is not an error condition.
    if err != chk.ErrSequenceIDNotFound {
      log.Warnf("Couldn't fetch checkpoint: %+v", err)
      // move on to next shard
      continue
    }
  }

  // The shard is closed and we have processed all records
  if shard.GetCheckpoint() == chk.ShardEnd {
    continue
  }

  var stealShard bool
  if w.kclConfig.EnableLeaseStealing && shard.ClaimRequest != "" {
    upcomingStealingInterval := time.Now().UTC().Add(time.Duration(w.kclConfig.LeaseStealingIntervalMillis) * time.Millisecond)
    if shard.GetLeaseTimeout().Before(upcomingStealingInterval) && !shard.IsClaimRequestExpired(w.kclConfig) {
      if shard.ClaimRequest == w.workerID {
        stealShard = true
        log.Debugf("Stealing shard: %s", shard.ID)
      } else {
        log.Debugf("Shard being stolen: %s", shard.ID)
        continue
      }
    }
  }

  err = w.checkpointer.GetLease(shard, w.workerID)
  if err != nil {
    // cannot get lease on the shard
    if !errors.As(err, &chk.ErrLeaseNotAcquired{}) {
      log.Errorf("Cannot get lease: %+v", err)
    }
    continue
  }

  if stealShard {
    log.Debugf("Successfully stole shard: %+v", shard.ID)
    w.shardStealInProgress = false
  }

  // log metrics on got lease
  w.mService.LeaseGained(shard.ID)
  w.waitGroup.Add(1)
  go func(shard *par.ShardStatus) {
    defer w.waitGroup.Done()
    if err := w.newShardConsumer(shard).getRecords(); err != nil {
      log.Errorf("Error in getRecords: %+v", err)
    }
  }(shard)

  // Increase the number of leases we have
  counter++
}
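
The key difference from the current implementation is that a successful GetLease no longer ends the sync: the loop only exits early once counter reaches MaxLeasesForWorker, so a freshly started worker can pick up every unowned shard (up to its configured cap) in a single shard sync instead of one shard per interval.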
