Skip to content

Commit 5b1bf51

Browse files
offbythreejonas-grgt
authored andcommitted
feat: display consumer lag
- add HighWaterMark and Lag fields to TopicPartitionOffset - add High Watermark and Lag columns to consumer group's offset lister page add refresh for consumer group offset page fix tests - fix existing tests - update tests to include High Watermark and Lag fields - handle case when kafka client's getOffset results in error. - use go routines to fetch newest offset (high watermark) for a topic's partition(s) use a mutex lock to avoid nested function, variable capturing issue, and the unnecessary for loop make height & weight consistent for offsets page - switch between topics table and offsets table using tab - add a single row total table view that displays total number of partitions, sum of offsets, high watermark, and lag across partitions fix tests - remove the total table with total lag window title - adjust table width
1 parent e42ce1b commit 5b1bf51

File tree

4 files changed

+214
-56
lines changed

4 files changed

+214
-56
lines changed

kadmin/offset_lister.go

Lines changed: 50 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,27 @@
11
package kadmin
22

3-
import tea "github.com/charmbracelet/bubbletea"
3+
import (
4+
"math"
5+
"sync"
6+
7+
"github.com/IBM/sarama"
8+
tea "github.com/charmbracelet/bubbletea"
9+
)
10+
11+
const (
12+
ErrorValue int64 = math.MinInt64
13+
)
414

515
type OffsetLister interface {
616
ListOffsets(group string) tea.Msg
717
}
818

919
type TopicPartitionOffset struct {
10-
Topic string
11-
Partition int32
12-
Offset int64
20+
Topic string
21+
Partition int32
22+
Offset int64
23+
HighWaterMark int64
24+
Lag int64
1325
}
1426

1527
type OffsetListingStartedMsg struct {
@@ -51,18 +63,46 @@ func (ka *SaramaKafkaAdmin) doListOffsets(group string, offsetsChan chan []Topic
5163
listResult, err := ka.admin.ListConsumerGroupOffsets(group, nil)
5264
if err != nil {
5365
errChan <- err
66+
return
67+
}
68+
69+
totalPartitions := 0
70+
for _, m := range listResult.Blocks {
71+
totalPartitions += len(m)
5472
}
5573

56-
var topicPartitionOffsets []TopicPartitionOffset
74+
topicPartitionOffsets := make([]TopicPartitionOffset, 0, totalPartitions)
75+
76+
var mu sync.Mutex
77+
var wg sync.WaitGroup
78+
5779
for t, m := range listResult.Blocks {
5880
for p, block := range m {
59-
topicPartitionOffsets = append(topicPartitionOffsets, TopicPartitionOffset{
60-
Topic: t,
61-
Partition: p,
62-
Offset: block.Offset,
63-
})
81+
wg.Go(
82+
func() {
83+
hwm, err := ka.client.GetOffset(t, p, sarama.OffsetNewest)
84+
var lag int64
85+
if err != nil {
86+
hwm = ErrorValue
87+
lag = ErrorValue
88+
} else {
89+
lag = hwm - block.Offset
90+
}
91+
mu.Lock()
92+
topicPartitionOffsets = append(topicPartitionOffsets, TopicPartitionOffset{
93+
Topic: t,
94+
Partition: p,
95+
Offset: block.Offset,
96+
HighWaterMark: hwm,
97+
Lag: lag,
98+
})
99+
mu.Unlock()
100+
},
101+
)
64102
}
65103
}
66104

105+
wg.Wait()
106+
67107
offsetsChan <- topicPartitionOffsets
68108
}

kadmin/offset_lister_test.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@ package kadmin
22

33
import (
44
"context"
5-
"github.com/IBM/sarama"
6-
"github.com/stretchr/testify/assert"
75
"testing"
86
"time"
7+
8+
"github.com/IBM/sarama"
9+
"github.com/stretchr/testify/assert"
910
)
1011

1112
func TestConsumerGroupOffsets(t *testing.T) {
@@ -52,9 +53,11 @@ func TestConsumerGroupOffsets(t *testing.T) {
5253
assert.NotNil(t, offsets)
5354
assert.Len(t, offsets, 1)
5455
assert.Equal(t, offsets[0], TopicPartitionOffset{
55-
Topic: topic,
56-
Partition: 0,
57-
Offset: 9,
56+
Topic: topic,
57+
Partition: 0,
58+
Offset: 9,
59+
HighWaterMark: 10,
60+
Lag: 1,
5861
})
5962
case err := <-offsetListingStartedMsg.Err:
6063
t.Fatal("Error while listing offsets", err)

ui/pages/cgroups_topics_page/cgroups_parts_offsets_page.go

Lines changed: 68 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package cgroups_topics_page
22

33
import (
44
"fmt"
5-
"github.com/charmbracelet/log"
65
"ktea/kadmin"
76
"ktea/kontext"
87
"ktea/styles"
@@ -18,6 +17,8 @@ import (
1817
"strconv"
1918
"strings"
2019

20+
"github.com/charmbracelet/log"
21+
2122
"github.com/charmbracelet/bubbles/table"
2223
tea "github.com/charmbracelet/bubbletea"
2324
lg "github.com/charmbracelet/lipgloss"
@@ -29,6 +30,7 @@ type tableFocus int
2930
type state int
3031

3132
const (
33+
na string = "N/A"
3234
topicFocus tableFocus = 0
3335
offsetFocus tableFocus = 1
3436

@@ -38,13 +40,16 @@ const (
3840
)
3941

4042
type Model struct {
43+
lister kadmin.OffsetLister
4144
tableFocus tableFocus
4245
topicsTable table.Model
4346
offsetsTable table.Model
47+
totalTable table.Model
4448
offsetsBorder *border.Model
4549
topicsBorder *border.Model
4650
topicsRows []table.Row
4751
offsetRows []table.Row
52+
totalLag int64
4853
groupName string
4954
topicByPartOffset map[string][]partOffset
5055
cmdBar *CGroupCmdbar[string]
@@ -63,17 +68,24 @@ func (m *Model) View(ktx *kontext.ProgramKtx, renderer *ui.Renderer) string {
6368
cmdBarView := m.cmdBar.View(ktx, renderer)
6469

6570
halfWidth := int(float64(ktx.WindowWidth / 2))
66-
m.topicsTable.SetHeight(ktx.AvailableHeight - 4)
67-
m.topicsTable.SetWidth(halfWidth - 2)
71+
m.topicsTable.SetHeight(ktx.AvailableTableHeight())
72+
m.topicsTable.SetWidth(int(float64(halfWidth)))
6873
m.topicsTable.SetColumns([]table.Column{
69-
{"Topic Name", int(float64(halfWidth - 4))},
74+
{Title: "Topic Name", Width: int(float64(halfWidth - 2))},
7075
})
7176
m.topicsTable.SetRows(m.topicsRows)
7277

73-
m.offsetsTable.SetHeight(ktx.AvailableHeight - 4)
78+
partitionColumnWidth := int(float64(halfWidth-4) * 0.22)
79+
offsetColumnWidth := int(float64(halfWidth-4) * 0.24)
80+
hwmColumnWidth := int(float64(halfWidth-4) * 0.24)
81+
lagColumnWidth := int(float64(halfWidth-4) * 0.22)
82+
83+
m.offsetsTable.SetHeight(ktx.AvailableTableHeight())
7484
m.offsetsTable.SetColumns([]table.Column{
75-
{"Partition", int(float64(halfWidth-6) * 0.5)},
76-
{"Offset", int(float64(halfWidth-5) * 0.5)},
85+
{Title: "Partition", Width: partitionColumnWidth},
86+
{Title: "Offset", Width: offsetColumnWidth},
87+
{Title: "High Watermark", Width: hwmColumnWidth},
88+
{Title: "Lag", Width: lagColumnWidth},
7789
})
7890
m.offsetsTable.SetRows(m.offsetRows)
7991

@@ -106,6 +118,24 @@ func (m *Model) View(ktx *kontext.ProgramKtx, renderer *ui.Renderer) string {
106118
type partOffset struct {
107119
partition string
108120
offset int64
121+
hwm int64
122+
lag int64
123+
}
124+
125+
func (partOffset *partOffset) getHwmValue() string {
126+
if partOffset.hwm == kadmin.ErrorValue {
127+
return na
128+
} else {
129+
return humanize.Comma(partOffset.hwm)
130+
}
131+
}
132+
133+
func (partOffset *partOffset) getLagValue() string {
134+
if partOffset.lag == kadmin.ErrorValue {
135+
return na
136+
} else {
137+
return humanize.Comma(partOffset.lag)
138+
}
109139
}
110140

111141
func (m *Model) Update(msg tea.Msg) tea.Cmd {
@@ -122,6 +152,20 @@ func (m *Model) Update(msg tea.Msg) tea.Cmd {
122152
if !m.cmdBar.IsFocussed() {
123153
return ui.PublishMsg(nav.LoadCGroupsPageMsg{})
124154
}
155+
case "f5":
156+
m.state = stateOffsetsLoading
157+
return func() tea.Msg {
158+
return m.lister.ListOffsets(m.groupName)
159+
}
160+
case "tab":
161+
// only accept when the table is focussed
162+
if !m.cmdBar.IsFocussed() {
163+
if m.tableFocus == topicFocus {
164+
m.tableFocus = offsetFocus
165+
} else {
166+
m.tableFocus = topicFocus
167+
}
168+
}
125169
}
126170
case kadmin.OffsetListingStartedMsg:
127171
cmds = append(cmds, msg.AwaitCompletion)
@@ -144,8 +188,10 @@ func (m *Model) Update(msg tea.Msg) tea.Cmd {
144188
if !m.cmdBar.IsFocussed() {
145189
if m.tableFocus == topicFocus {
146190
m.topicsTable, cmd = m.topicsTable.Update(msg)
191+
m.offsetsTable.GotoTop()
147192
} else {
148193
m.offsetsTable, cmd = m.offsetsTable.Update(msg)
194+
m.totalTable.Update(msg)
149195
}
150196
if cmd != nil {
151197
cmds = append(cmds, cmd)
@@ -167,13 +213,18 @@ func (m *Model) recreateOffsetRows() {
167213

168214
selectedTopic := m.selectedRow()
169215
if selectedTopic != "" {
216+
totalLag := int64(0)
170217
m.offsetRows = []table.Row{}
171218
for _, partOffset := range m.topicByPartOffset[selectedTopic] {
219+
totalLag += int64(partOffset.lag)
172220
m.offsetRows = append(m.offsetRows, table.Row{
173221
partOffset.partition,
174222
humanize.Comma(partOffset.offset),
223+
partOffset.getHwmValue(),
224+
partOffset.getLagValue(),
175225
})
176226
}
227+
m.totalLag = totalLag
177228
sort.SliceStable(m.offsetRows, func(i, j int) bool {
178229
a, _ := strconv.Atoi(m.offsetRows[i][0])
179230
b, _ := strconv.Atoi(m.offsetRows[j][0])
@@ -201,6 +252,8 @@ func (m *Model) recreateTopicRows() {
201252
partOffset := partOffset{
202253
partition: strconv.FormatInt(int64(offset.Partition), 10),
203254
offset: offset.Offset,
255+
hwm: offset.HighWaterMark,
256+
lag: offset.Lag,
204257
}
205258
m.topicByPartOffset[offset.Topic] = append(m.topicByPartOffset[offset.Topic], partOffset)
206259
}
@@ -223,9 +276,9 @@ func (m *Model) selectedRow() string {
223276

224277
func (m *Model) Shortcuts() []statusbar.Shortcut {
225278
return []statusbar.Shortcut{
226-
{"Go Back", "esc"},
227-
{"Search", "/"},
228-
{"Refresh", "F5"},
279+
{Name: "Go Back", Keybinding: "esc"},
280+
{Name: "Search", Keybinding: "/"},
281+
{Name: "Refresh", Keybinding: "F5"},
229282
}
230283
}
231284

@@ -268,6 +321,7 @@ func New(lister kadmin.OffsetLister, group string) (*Model, tea.Cmd) {
268321
)
269322

270323
model := Model{
324+
lister: lister,
271325
cmdBar: NewCGroupCmdbar[string](
272326
cmdbar.NewSearchCmdBar("Search groups by name"),
273327
notifierCmdBar,
@@ -283,7 +337,10 @@ func New(lister kadmin.OffsetLister, group string) (*Model, tea.Cmd) {
283337
border.WithTitleFn(func() string {
284338
return border.KeyValueTitle("Total Topics", fmt.Sprintf(" %d", len(model.topicsRows)), true)
285339
}))
286-
model.offsetsBorder = border.New(border.WithInnerPaddingTop())
340+
model.offsetsBorder = border.New(border.WithInnerPaddingTop(),
341+
border.WithTitleFn(func() string {
342+
return border.KeyValueTitle("Total Lag", fmt.Sprintf(" %d", model.totalLag), false)
343+
}))
287344
return &model, func() tea.Msg {
288345
return lister.ListOffsets(group)
289346
}

0 commit comments

Comments
 (0)