Skip to content

Commit 1da67fc

Browse files
authored
Merge pull request #579 from rusq/redl-tool-update
Update redownload tool to work with any type of source directory
2 parents 071df51 + 7c2d1c8 commit 1da67fc

File tree

2 files changed

+100
-36
lines changed

2 files changed

+100
-36
lines changed

cmd/slackdump/internal/diag/redownload.go

Lines changed: 98 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"iter"
78
"log/slog"
89
"os"
910
"path/filepath"
@@ -14,7 +15,6 @@ import (
1415
"github.com/rusq/slackdump/v3/cmd/slackdump/internal/bootstrap"
1516
"github.com/rusq/slackdump/v3/cmd/slackdump/internal/cfg"
1617
"github.com/rusq/slackdump/v3/cmd/slackdump/internal/golang/base"
17-
"github.com/rusq/slackdump/v3/internal/chunk"
1818
"github.com/rusq/slackdump/v3/internal/convert/transform/fileproc"
1919
"github.com/rusq/slackdump/v3/internal/structures"
2020
"github.com/rusq/slackdump/v3/processor"
@@ -25,16 +25,20 @@ var cmdRedownload = &base.Command{
2525
UsageLine: "tools redownload [flags] <archive_dir>",
2626
Short: "attempts to redownload missing files from the archive",
2727
Long: `# File redownload tool
28-
Redownload tool scans the directory with Slackdump Archive, validating the files.
29-
If a file is missing or has zero length, it will be redownloaded from the Slack API.
30-
The tool will not overwrite existing files, so it is safe to run multiple times.
28+
Redownload tool scans the slackdump export, archive or dump directory,
29+
validating the files.
3130
32-
Please note:
31+
If a file is missing or has zero length, it will be redownloaded from the Slack
32+
API. The tool will not overwrite existing files, so it is safe to run it
33+
multiple times.
34+
35+
** Please note: **
3336
3437
1. It requires you to have a valid authentication in the selected workspace.
3538
2. Ensure that you have selected the correct workspace using "slackdump workspace select".
36-
3. It only works with Slackdump Archive directories, Slack exports and dumps
37-
are not supported.`,
39+
3. It only support directories. ZIP files can not be updated. Unpack ZIP file
40+
to a directory before using this tool.
41+
`,
3842
FlagMask: cfg.OmitAll &^ cfg.OmitAuthFlags,
3943
Run: runRedownload,
4044
PrintFlags: true,
@@ -74,29 +78,22 @@ func validate(dir string) error {
7478
base.SetExitStatus(base.SUserError)
7579
return fmt.Errorf("error determining source type: %w", err)
7680
}
77-
if flags&source.FChunk == 0 {
81+
if flags&source.FZip != 0 {
7882
base.SetExitStatus(base.SUserError)
79-
return errors.New("expected a Slackdump Archive directory")
83+
return errors.New("unable to work with ZIP files, unpack it first")
8084
}
8185

82-
if fi, err := os.Stat(filepath.Join(dir, string(chunk.FWorkspace)+".json.gz")); err != nil {
83-
base.SetExitStatus(base.SUserError)
84-
return fmt.Errorf("error accessing the workspace file: %w", err)
85-
} else if fi.IsDir() {
86-
base.SetExitStatus(base.SUserError)
87-
return errors.New("this does not look like an archive directory")
88-
}
8986
return nil
9087
}
9188

9289
func redownload(ctx context.Context, dir string) (int, error) {
93-
cd, err := chunk.OpenDir(dir)
90+
src, err := source.Load(ctx, dir)
9491
if err != nil {
9592
return 0, fmt.Errorf("error opening directory: %w", err)
9693
}
97-
defer cd.Close()
94+
defer src.Close()
9895

99-
channels, err := cd.Channels(ctx)
96+
channels, err := src.Channels(ctx)
10097
if err != nil {
10198
return 0, fmt.Errorf("error reading channels: %w", err)
10299
}
@@ -113,16 +110,21 @@ func redownload(ctx context.Context, dir string) (int, error) {
113110
ctx,
114111
true,
115112
client,
116-
fsadapter.NewDirectory(cd.Name()),
113+
fsadapter.NewDirectory(src.Name()),
117114
cfg.Log,
118115
)
119116
defer dl.Stop()
120-
// we are using the same file subprocessor as the mattermost export.
121-
fproc := fileproc.New(dl)
117+
118+
// determine the file processor for the source.
119+
fproc, err := fileProcessorForSource(src, dl)
120+
if err != nil {
121+
return 0, err
122+
}
123+
defer fproc.Close()
122124

123125
total := 0
124126
for _, ch := range channels {
125-
if n, err := redlChannel(ctx, fproc, cd, &ch); err != nil {
127+
if n, err := redownloadChannel(ctx, fproc, src, &ch); err != nil {
126128
return total, err
127129
} else {
128130
total += n
@@ -132,35 +134,97 @@ func redownload(ctx context.Context, dir string) (int, error) {
132134
return total, nil
133135
}
134136

135-
func redlChannel(ctx context.Context, fp processor.Filer, cd *chunk.Directory, ch *slack.Channel) (int, error) {
137+
// fileProcessorForSource returns the appropriate file processor for the given
138+
// source.
139+
func fileProcessorForSource(src source.Sourcer, dl fileproc.Downloader) (processor.Filer, error) {
140+
var fproc processor.Filer
141+
srcFlags := src.Type()
142+
switch {
143+
case srcFlags&source.FDatabase != 0 || srcFlags&source.FChunk != 0:
144+
fproc = fileproc.New(dl)
145+
case srcFlags&source.FExport != 0:
146+
typ := src.Files().Type()
147+
if typ == source.STnone {
148+
typ = source.STmattermost // default to mattermost
149+
}
150+
fproc = fileproc.NewExport(typ, dl)
151+
case srcFlags&source.FDump != 0:
152+
fproc = fileproc.NewDump(dl)
153+
default:
154+
return nil, fmt.Errorf("unable to determine file storage format for the source with flags %s", srcFlags)
155+
}
156+
return fproc, nil
157+
}
158+
159+
func redownloadChannel(ctx context.Context, fp processor.Filer, src source.Sourcer, ch *slack.Channel) (int, error) {
136160
slog.Info("processing channel", "channel", ch.ID)
137-
f, err := cd.Open(chunk.FileID(ch.ID))
161+
it, err := src.AllMessages(ctx, ch.ID)
138162
if err != nil {
163+
if errors.Is(err, source.ErrNotFound) {
164+
// no data in the channel
165+
return 0, nil
166+
}
139167
return 0, fmt.Errorf("error reading messages: %w", err)
140168
}
141-
defer f.Close()
142-
msgs, err := f.AllMessages(ctx, ch.ID)
169+
// collect messages from the iterator
170+
msgs, err := collect(it)
143171
if err != nil {
144-
return 0, fmt.Errorf("error reading messages: %w", err)
172+
return 0, fmt.Errorf("error fetching messages: %w", err)
145173
}
174+
146175
if len(msgs) == 0 {
147176
return 0, nil
148177
}
149178
slog.Info("scanning messages", "num_messages", len(msgs))
150-
return scanMsgs(ctx, fp, cd, f, ch, msgs)
179+
return scanMsgs(ctx, fp, src, ch, msgs, false)
180+
}
181+
182+
// collect collects all Ks from iterator it, returning any encountered error.
183+
func collect[K any](it iter.Seq2[K, error]) ([]K, error) {
184+
kk := make([]K, 0)
185+
for k, err := range it {
186+
if err != nil {
187+
return kk, fmt.Errorf("error fetching messages: %w", err)
188+
}
189+
kk = append(kk, k)
190+
}
191+
return kk, nil
192+
}
193+
194+
func pathFuncForSource(src source.Sourcer) func(ch *slack.Channel, f *slack.File) string {
195+
if src.Files().Type() != source.STnone {
196+
// easy
197+
return src.Files().FilePath
198+
}
199+
typ := src.Type()
200+
switch {
201+
case typ&source.FDump != 0:
202+
return source.DumpFilepath
203+
default:
204+
// in all other cases we default to mattermost file path.
205+
return source.MattermostFilepath
206+
}
207+
// unreachable
151208
}
152209

153-
func scanMsgs(ctx context.Context, fp processor.Filer, cd *chunk.Directory, f *chunk.File, ch *slack.Channel, msgs []slack.Message) (int, error) {
210+
func scanMsgs(ctx context.Context, fp processor.Filer, src source.Sourcer, ch *slack.Channel, msgs []slack.Message, isThread bool) (int, error) {
154211
lg := slog.With("channel", ch.ID)
212+
// workaround for completely missing storage
213+
pathFn := pathFuncForSource(src)
155214
total := 0
156215
for _, m := range msgs {
157-
if structures.IsThreadStart(&m) {
158-
tm, err := f.AllThreadMessages(ch.ID, m.ThreadTimestamp)
216+
if structures.IsThreadStart(&m) && !isThread {
217+
it, err := src.AllThreadMessages(ctx, ch.ID, m.ThreadTimestamp)
159218
if err != nil {
160219
return 0, fmt.Errorf("error reading thread messages: %w", err)
161220
}
221+
tm, err := collect(it)
222+
if err != nil {
223+
return 0, fmt.Errorf("error collecting thread messages: %w", err)
224+
}
225+
162226
lg.Info("scanning thread messages", "num_messages", len(tm), "thread", m.ThreadTimestamp)
163-
if n, err := scanMsgs(ctx, fp, cd, f, ch, tm); err != nil {
227+
if n, err := scanMsgs(ctx, fp, src, ch, tm, true); err != nil {
164228
return total, err
165229
} else {
166230
total += n
@@ -170,7 +234,7 @@ func scanMsgs(ctx context.Context, fp processor.Filer, cd *chunk.Directory, f *c
170234
// collect all missing files from the message.
171235
var missing []slack.File
172236
for _, ff := range m.Files {
173-
name := filepath.Join(cd.Name(), source.MattermostFilepath(ch, &ff))
237+
name := filepath.Join(src.Name(), pathFn(ch, &ff))
174238
lg := lg.With("file", name)
175239
lg.Debug("checking file")
176240
if fi, err := os.Stat(name); err != nil {

source/source.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,8 @@ type Resumer interface {
6969
Latest(ctx context.Context) (map[structures.SlackLink]time.Time, error)
7070
}
7171

72-
// Resumer is the interface that should be implemented by sources that can be
73-
// resumed.
72+
// SourceResumeCloser is the interface that should be implemented by sources
73+
// that can be resumed.
7474
type SourceResumeCloser interface {
7575
Sourcer
7676
Resumer

0 commit comments

Comments
 (0)