Skip to content

Commit 93144cc

Browse files
authored
Merge pull request #617 from rusq/i598
Fast channel fetch and edge channel filters
2 parents 5dc4254 + 9631b9e commit 93144cc

35 files changed

+1376
-170
lines changed

channels.go

Lines changed: 92 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,49 @@ package slackdump
1919

2020
import (
2121
"context"
22+
"errors"
23+
"iter"
24+
"log/slog"
2225
"runtime/trace"
23-
"time"
2426

2527
"github.com/rusq/slack"
2628

2729
"github.com/rusq/slackdump/v4/internal/network"
30+
"github.com/rusq/slackdump/v4/stream"
2831
"github.com/rusq/slackdump/v4/types"
2932
)
3033

34+
// GetChannelsParameters holds the parameters for [GetChannelsEx] and
35+
// [StreamChannelsEx] functions.
36+
type GetChannelsParameters struct {
37+
// ChannelTypes specifies the channel types to fetch. If the slice
38+
// is empty, all channel types will be fetched.
39+
ChannelTypes []string
40+
// OnlyMyChannels restricts the channels only to the channels that the user
41+
// is a member of.
42+
OnlyMyChannels bool
43+
}
44+
3145
// GetChannels lists all conversations for a user. `chanTypes` specifies the
32-
// type of messages to fetch. See github.com/rusq/slack docs for possible
46+
// type of channels to fetch. See github.com/rusq/slack docs for possible
3347
// values. If large number of channels is to be returned, consider using
34-
// StreamChannels.
48+
// [StreamChannelsEx]. It is a wrapper for [GetChannelsEx].
49+
//
50+
// Deprecated: Use [GetChannelsEx]. This function will be removed in v5.
3551
func (s *Session) GetChannels(ctx context.Context, chanTypes ...string) (types.Channels, error) {
52+
p := GetChannelsParameters{
53+
ChannelTypes: chanTypes,
54+
OnlyMyChannels: false,
55+
}
56+
return s.GetChannelsEx(ctx, p)
57+
}
58+
59+
// GetChannelsEx lists all conversations for a user. [GetChannelsParameters] should
60+
// contain the fetch criteria. If large number of channels is to be returned,
61+
// consider using [StreamChannelsEx].
62+
func (s *Session) GetChannelsEx(ctx context.Context, p GetChannelsParameters) (types.Channels, error) {
3663
var allChannels types.Channels
37-
if err := s.getChannels(ctx, chanTypes, func(cc types.Channels) error {
64+
if err := s.getChannels(ctx, p, func(ctx context.Context, cc types.Channels) error {
3865
allChannels = append(allChannels, cc...)
3966
return nil
4067
}); err != nil {
@@ -44,74 +71,92 @@ func (s *Session) GetChannels(ctx context.Context, chanTypes ...string) (types.C
4471
}
4572

4673
// StreamChannels requests the channels from the API and calls the callback
47-
// function cb for each.
74+
// function cb for each. It is a wrapper for [StreamChannelsEx].
75+
//
76+
// Deprecated: Use [StreamChannelsEx]. This function will be removed in v5.
4877
func (s *Session) StreamChannels(ctx context.Context, chanTypes []string, cb func(ch slack.Channel) error) error {
49-
return s.getChannels(ctx, chanTypes, func(chans types.Channels) error {
78+
p := GetChannelsParameters{
79+
ChannelTypes: chanTypes,
80+
OnlyMyChannels: false,
81+
}
82+
for chans, err := range s.StreamChannelsEx(ctx, p) {
83+
if err != nil {
84+
return err
85+
}
5086
for _, ch := range chans {
5187
if err := cb(ch); err != nil {
5288
return err
5389
}
5490
}
55-
return nil
56-
})
91+
}
92+
return nil
93+
}
94+
95+
// StreamChannelsEx requests the channels from the API and returns an iterator
96+
// of channel chunks.
97+
func (s *Session) StreamChannelsEx(ctx context.Context, p GetChannelsParameters) iter.Seq2[[]slack.Channel, error] {
98+
return func(yield func(ch []slack.Channel, err error) bool) {
99+
err := s.getChannels(ctx, p, func(ctx context.Context, chans types.Channels) error {
100+
if !yield(chans, nil) {
101+
return ErrStop
102+
}
103+
return nil
104+
})
105+
if err != nil {
106+
if errors.Is(err, ErrStop) {
107+
return
108+
}
109+
_ = yield(nil, err)
110+
}
111+
}
57112
}
58113

114+
type chanProcFunc func(ctx context.Context, ch types.Channels) error
115+
116+
func (f chanProcFunc) Channels(ctx context.Context, ch []slack.Channel) error {
117+
return f(ctx, ch)
118+
}
119+
120+
// ErrStop instructs the streaming function to stop early when returned from a
121+
// callback function.
122+
var ErrStop = errors.New("stop")
123+
59124
// getChannels lists all channels for a user. `gcp.ChannelTypes` specifies
60125
// the type of channels to fetch. See github.com/rusq/slack docs for possible
61-
// values
62-
func (s *Session) getChannels(ctx context.Context, chanTypes []string, cb func(types.Channels) error) error {
126+
// values. If the cb function returns [ErrStop], the iteration will stop.
127+
func (s *Session) getChannels(ctx context.Context, gcp GetChannelsParameters, cb chanProcFunc) error {
63128
ctx, task := trace.NewTask(ctx, "getChannels")
64129
defer task.End()
65130

66-
limiter := s.limiter(network.Tier2)
67-
68-
if len(chanTypes) == 0 {
69-
chanTypes = AllChanTypes
131+
if len(gcp.ChannelTypes) == 0 {
132+
gcp.ChannelTypes = AllChanTypes
70133
}
71134

72-
params := &slack.GetConversationsParameters{Types: chanTypes, Limit: s.cfg.limits.Request.Channels}
73-
fetchStart := time.Now()
74-
var total int
75-
for i := 1; ; i++ {
76-
var (
77-
chans []slack.Channel
78-
nextcur string
79-
)
80-
reqStart := time.Now()
81-
if err := network.WithRetry(ctx, limiter, s.cfg.limits.Tier3.Retries, func(ctx context.Context) error {
82-
var err error
83-
trace.WithRegion(ctx, "GetConversationsContext", func() {
84-
chans, nextcur, err = s.client.GetConversationsContext(ctx, params)
85-
})
86-
return err
87-
}); err != nil {
88-
return err
135+
st := s.Stream()
136+
params := &slack.GetConversationsParameters{Types: gcp.ChannelTypes, Limit: s.cfg.limits.Request.Channels}
137+
if err := st.ListChannelsEx(ctx, cb, params, gcp.OnlyMyChannels); err != nil {
138+
if errors.Is(err, ErrStop) {
139+
// early stop indicated
140+
return nil
89141
}
90142

91-
if err := cb(chans); err != nil {
143+
if !shouldFallbackToListChannels(err) {
92144
return err
93145
}
94-
total += len(chans)
95-
96-
s.log.InfoContext(ctx, "channels", "request", i, "fetched", len(chans), "total", total,
97-
"speed", float64(len(chans))/time.Since(reqStart).Seconds(),
98-
"avg", float64(total)/time.Since(fetchStart).Seconds(),
99-
)
100-
101-
if nextcur == "" {
102-
s.log.InfoContext(ctx, "channels fetch complete", "total", total)
103-
break
104-
}
105-
106-
params.Cursor = nextcur
107-
108-
if err := limiter.Wait(ctx); err != nil {
146+
slog.DebugContext(ctx, "falling back to simple List Channels", "err", err)
147+
if err := st.ListChannels(ctx, cb, params); err != nil {
148+
if errors.Is(err, ErrStop) {
149+
// early stop indicated
150+
return nil
151+
}
109152
return err
110153
}
111154
}
112155
return nil
113156
}
114157

158+
func shouldFallbackToListChannels(err error) bool { return errors.Is(err, stream.ErrOpNotSupported) }
159+
115160
// GetChannelMembers returns a list of all members in a channel.
116161
func (sd *Session) GetChannelMembers(ctx context.Context, channelID string) ([]string, error) {
117162
var ids []string

channels_test.go

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,10 @@ import (
2929

3030
"github.com/rusq/slackdump/v4/internal/client"
3131
"github.com/rusq/slackdump/v4/internal/client/mock_client"
32+
"github.com/rusq/slackdump/v4/internal/edge"
3233
"github.com/rusq/slackdump/v4/internal/network"
3334
"github.com/rusq/slackdump/v4/internal/structures"
35+
"github.com/rusq/slackdump/v4/stream"
3436
"github.com/rusq/slackdump/v4/types"
3537
)
3638

@@ -108,7 +110,7 @@ func TestSession_getChannels(t *testing.T) {
108110
}
109111

110112
var got types.Channels
111-
err := sd.getChannels(tt.args.ctx, tt.args.chanTypes, func(c types.Channels) error {
113+
err := sd.getChannels(tt.args.ctx, GetChannelsParameters{ChannelTypes: tt.args.chanTypes}, func(_ context.Context, c types.Channels) error {
112114
got = append(got, c...)
113115
return nil
114116
})
@@ -157,6 +159,53 @@ func TestSession_GetChannels(t *testing.T) {
157159
}
158160
}
159161

162+
func Test_shouldFallbackToListChannels(t *testing.T) {
163+
tests := []struct {
164+
name string
165+
err error
166+
want bool
167+
}{
168+
{
169+
name: "op not supported",
170+
err: stream.ErrOpNotSupported,
171+
want: true,
172+
},
173+
{
174+
name: "api no_channels_supplied",
175+
err: &edge.APIError{Err: "no_channels_supplied"},
176+
want: false, // Not a fallback condition anymore
177+
},
178+
{
179+
name: "api internal_error",
180+
err: &edge.APIError{Err: "internal_error"},
181+
want: false, // Not a fallback condition anymore
182+
},
183+
{
184+
name: "wrapped callback no_channels_supplied",
185+
err: errors.New("API error: callback error: no_channels_supplied"),
186+
want: false, // Not a fallback condition anymore
187+
},
188+
{
189+
name: "wrapped callback internal_error",
190+
err: errors.New("API error: callback error: internal_error"),
191+
want: false, // Not a fallback condition anymore
192+
},
193+
{
194+
name: "other error",
195+
err: errors.New("network timeout"),
196+
want: false,
197+
},
198+
}
199+
200+
for _, tt := range tests {
201+
t.Run(tt.name, func(t *testing.T) {
202+
if got := shouldFallbackToListChannels(tt.err); got != tt.want {
203+
t.Fatalf("shouldFallbackToListChannels() = %v, want %v", got, tt.want)
204+
}
205+
})
206+
}
207+
}
208+
160209
func TestSession_GetChannelMembers(t *testing.T) {
161210
type fields struct {
162211
wspInfo *slack.AuthTestResponse

cmd/slackdump/internal/cfg/cfg.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ const (
140140
OmitChunkFileMode
141141
OmitYesManFlag
142142
OmitChannelTypesFlag
143+
OmitMemberOnlyFlag
143144

144145
OmitAll = OmitConfigFlag |
145146
OmitWithFilesFlag |
@@ -155,6 +156,7 @@ const (
155156
OmitWithAvatarsFlag |
156157
OmitChunkFileMode |
157158
OmitYesManFlag |
159+
OmitMemberOnlyFlag |
158160
OmitChannelTypesFlag
159161
)
160162

@@ -215,8 +217,10 @@ func SetBaseFlags(fs *flag.FlagSet, mask FlagMask) {
215217
fs.Var(&Oldest, "time-from", "timestamp of the oldest message to fetch (UTC timezone, `YYYY-MM-DDTHH:MM:SS`)")
216218
fs.Var(&Latest, "time-to", "timestamp of the newest message to fetch (UTC timezone, `YYYY-MM-DDTHH:MM:SS`)")
217219
}
218-
if mask&OmitCustomUserFlags == 0 {
220+
if mask&OmitMemberOnlyFlag == 0 {
219221
fs.BoolVar(&MemberOnly, "member-only", false, "export only channels, which the current user belongs to (if no channels are specified)")
222+
}
223+
if mask&OmitCustomUserFlags == 0 {
220224
fs.BoolVar(&OnlyChannelUsers, "channel-users", false, "export only users involved in the channel, and skip fetching of all users")
221225
fs.BoolVar(&IncludeCustomLabels, "custom-labels", false, "request user's custom fields labels, may result in requests being throttled hard")
222226
}

cmd/slackdump/internal/dump/dump.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ var CmdDump = &base.Command{
5656
RequireAuth: true,
5757
PrintFlags: true,
5858
FlagMask: (cfg.OmitCustomUserFlags |
59+
cfg.OmitMemberOnlyFlag |
5960
cfg.OmitRecordFilesFlag |
6061
cfg.OmitWithAvatarsFlag |
6162
cfg.OmitChannelTypesFlag), // we don't need channel types, as dump requires explicit channel ids

cmd/slackdump/internal/list/channels.go

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,15 @@ import (
2828
"github.com/rusq/slackdump/v4/cmd/slackdump/internal/cfg"
2929
"github.com/rusq/slackdump/v4/cmd/slackdump/internal/golang/base"
3030
"github.com/rusq/slackdump/v4/internal/cache"
31+
"github.com/rusq/slackdump/v4/internal/structures"
3132
"github.com/rusq/slackdump/v4/types"
3233
)
3334

3435
var CmdListChannels = &base.Command{
3536
Run: runListChannels,
3637
UsageLine: "slackdump list channels [flags] [filename]",
3738
PrintFlags: true,
38-
FlagMask: flagMask &^ cfg.OmitChannelTypesFlag,
39+
FlagMask: flagMask &^ cfg.OmitChannelTypesFlag &^ cfg.OmitMemberOnlyFlag,
3940
Short: "list workspace channels",
4041
Long: fmt.Sprintf(`
4142
# List Channels Command
@@ -148,9 +149,25 @@ func (l *channels) Retrieve(ctx context.Context, sess *slackdump.Session, m *cac
148149
return nil
149150
}
150151
}
151-
cc, err := sess.GetChannels(ctx, cfg.ChannelTypes...)
152-
if err != nil {
153-
return fmt.Errorf("error getting channels: %w", err)
152+
p := slackdump.GetChannelsParameters{
153+
ChannelTypes: cfg.ChannelTypes,
154+
OnlyMyChannels: cfg.MemberOnly,
155+
}
156+
157+
var cc []slack.Channel
158+
for chans, err := range sess.StreamChannelsEx(ctx, p) {
159+
if err != nil {
160+
return fmt.Errorf("error getting channels: %w", err)
161+
}
162+
if cfg.MemberOnly {
163+
for _, ch := range chans {
164+
if structures.IsMember(&ch) {
165+
cc = append(cc, ch)
166+
}
167+
}
168+
} else {
169+
cc = append(cc, chans...)
170+
}
154171
}
155172
l.channels = cc
156173
l.users = <-usersc

internal/chunk/control/control_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/rusq/slackdump/v4/internal/structures"
3030
"github.com/rusq/slackdump/v4/mocks/mock_processor"
3131
"github.com/rusq/slackdump/v4/processor"
32+
"github.com/rusq/slackdump/v4/stream"
3233
)
3334

3435
func TestController_Close(t *testing.T) {
@@ -144,6 +145,7 @@ func TestController_Run(t *testing.T) {
144145
expectFn: func(s *mock_control.MockStreamer, f *mock_processor.MockFiler, a *mock_processor.MockAvatars, tf *mock_control.MockExportTransformer, erc *mock_control.MockEncodeReferenceCloser) {
145146
testUsers := []slack.User{testUser1, testUser2}
146147
// called by the runner
148+
s.EXPECT().ListChannelsEx(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(stream.ErrOpNotSupported)
147149
s.EXPECT().ListChannels(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
148150
s.EXPECT().Users(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, proc processor.Users, opt ...slack.GetUsersOption) error {
149151
proc.Users(ctx, testUsers)
@@ -172,6 +174,7 @@ func TestController_Run(t *testing.T) {
172174
},
173175
expectFn: func(s *mock_control.MockStreamer, f *mock_processor.MockFiler, a *mock_processor.MockAvatars, tf *mock_control.MockExportTransformer, erc *mock_control.MockEncodeReferenceCloser) {
174176
// called by the runner
177+
s.EXPECT().ListChannelsEx(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(stream.ErrOpNotSupported)
175178
s.EXPECT().ListChannels(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
176179
s.EXPECT().Users(gomock.Any(), gomock.Any()).Return(assert.AnError)
177180
s.EXPECT().Conversations(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
@@ -238,6 +241,7 @@ func TestController_RunNoTransform(t *testing.T) {
238241
expectFn: func(s *mock_control.MockStreamer, f *mock_processor.MockFiler, a *mock_processor.MockAvatars, tf *mock_control.MockExportTransformer, erc *mock_control.MockEncodeReferenceCloser) {
239242
testUsers := []slack.User{testUser1, testUser2}
240243
// called by the runner
244+
s.EXPECT().ListChannelsEx(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(stream.ErrOpNotSupported)
241245
s.EXPECT().ListChannels(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
242246
s.EXPECT().Users(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, proc processor.Users, opt ...slack.GetUsersOption) error {
243247
proc.Users(ctx, testUsers)
@@ -269,6 +273,7 @@ func TestController_RunNoTransform(t *testing.T) {
269273
},
270274
expectFn: func(s *mock_control.MockStreamer, f *mock_processor.MockFiler, a *mock_processor.MockAvatars, tf *mock_control.MockExportTransformer, erc *mock_control.MockEncodeReferenceCloser) {
271275
// called by the runner
276+
s.EXPECT().ListChannelsEx(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(stream.ErrOpNotSupported)
272277
s.EXPECT().ListChannels(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
273278
s.EXPECT().Users(gomock.Any(), gomock.Any()).Return(assert.AnError)
274279
s.EXPECT().Conversations(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)

0 commit comments

Comments
 (0)