Skip to content

Commit e8e5c68

Browse files
authored
Merge pull request #15 from rtmelsov/fix/file-stream
Fix/file stream
2 parents 719821f + 2d0c3fb commit e8e5c68

File tree

12 files changed

+540
-119
lines changed

12 files changed

+540
-119
lines changed

internal/akclient/download_file.go

Lines changed: 45 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
filev1 "github.com/rtmelsov/adv-keeper/gen/go/proto/file/v1"
1818
"github.com/rtmelsov/adv-keeper/internal/helpers"
1919
"github.com/rtmelsov/adv-keeper/internal/middleware"
20+
"github.com/rtmelsov/adv-keeper/internal/models"
2021
"google.golang.org/grpc/codes"
2122
"google.golang.org/grpc/status"
2223
)
@@ -43,15 +44,17 @@ func safeBase(name string) string {
4344
}
4445
return string(runes)
4546
}
46-
47-
func DownloadFile(fileID string) (*filev1.GetFilesResponse, error) {
47+
func DownloadFile(fileID string, prog chan<- models.Prog) {
48+
defer close(prog)
4849
ctx, err := middleware.AddAuthData()
4950
if err != nil {
50-
return nil, err
51+
prog <- models.Prog{Err: err}
52+
return
5153
}
5254
outDir := helpers.DownloadFilesDir
5355
conn, err := grpc.NewClient(helpers.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
5456
if err != nil {
57+
prog <- models.Prog{Err: err}
5558
log.Fatalf("dial %s: %v", helpers.Addr, err)
5659
}
5760
defer conn.Close()
@@ -61,17 +64,20 @@ func DownloadFile(fileID string) (*filev1.GetFilesResponse, error) {
6164

6265
stream, err := c.DownloadFile(ctx, &filev1.DownloadFileRequest{Fileid: fileID})
6366
if err != nil {
64-
return nil, err
67+
prog <- models.Prog{Err: err}
68+
return
6569
}
6670

6771
// 1) ждём FileInfo
6872
first, err := stream.Recv()
6973
if err != nil {
70-
return nil, status.Errorf(codes.Internal, "read: %v", err)
74+
prog <- models.Prog{Err: status.Errorf(codes.Internal, "read: %v", err)}
75+
return
7176
}
7277
info := first.GetInfo()
7378
if info == nil {
74-
return nil, errors.New("first message must be FileInfo")
79+
prog <- models.Prog{Err: errors.New("first message must be FileInfo")}
80+
return
7581
}
7682

7783
filename := safeBase(info.GetFilename())
@@ -81,14 +87,16 @@ func DownloadFile(fileID string) (*filev1.GetFilesResponse, error) {
8187

8288
// 2) готовим пути/директории
8389
if err := os.MkdirAll(outDir, 0o755); err != nil {
84-
return nil, fmt.Errorf("mkdir: %w", err)
90+
prog <- models.Prog{Err: fmt.Errorf("mkdir: %w", err)}
91+
return
8592
}
8693
tmpPath := filepath.Join(outDir, filename+".part")
8794
finalPath := filepath.Join(outDir, filename)
8895

8996
out, err := os.OpenFile(tmpPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644)
9097
if err != nil {
91-
return nil, fmt.Errorf("create: %w", err)
98+
prog <- models.Prog{Err: fmt.Errorf("create: %w", err)}
99+
return
92100
}
93101
defer func() {
94102
out.Close()
@@ -97,12 +105,20 @@ func DownloadFile(fileID string) (*filev1.GetFilesResponse, error) {
97105
}
98106
}()
99107

108+
stat, err := out.Stat()
109+
if err != nil {
110+
prog <- models.Prog{Err: err}
111+
return
112+
}
113+
total := stat.Size()
114+
100115
h := sha256.New()
101116
var written int64
102117

103118
for {
104119
if ctx.Err() != nil {
105-
return nil, ctx.Err()
120+
prog <- models.Prog{Err: ctx.Err()}
121+
return
106122
}
107123
msg, rerr := stream.Recv()
108124
if rerr == io.EOF {
@@ -111,39 +127,49 @@ func DownloadFile(fileID string) (*filev1.GetFilesResponse, error) {
111127
if rerr != nil {
112128
// красиво покажем gRPC статус
113129
if st, ok := status.FromError(rerr); ok {
114-
return nil, fmt.Errorf("recv: %s", st.Message())
130+
prog <- models.Prog{Err: fmt.Errorf("recv: %s", st.Message())}
131+
return
115132
}
116-
return nil, fmt.Errorf("recv: %w", rerr)
133+
prog <- models.Prog{Err: fmt.Errorf("recv: %w", rerr)}
134+
return
117135
}
118136

119137
ch := msg.GetChunk()
120138
if ch == nil {
121-
return nil, errors.New("unexpected message: want FileChunk")
139+
prog <- models.Prog{Err: errors.New("unexpected message: want FileChunk")}
140+
return
122141
}
123142

124143
n, werr := out.Write(ch.Content)
125144
if werr != nil {
126-
return nil, fmt.Errorf("write: %w", werr)
145+
prog <- models.Prog{Err: fmt.Errorf("write: %w", werr)}
146+
return
127147
}
128148
written += int64(n)
149+
select {
150+
case prog <- models.Prog{Done: written, Total: total}:
151+
default: // не блокируем UI, если буфер заполнен
152+
}
129153
_, _ = h.Write(ch.Content)
130154
}
131155

132156
// 4) fsync → close → rename
133157
if err := out.Sync(); err != nil {
134-
return nil, fmt.Errorf("sync: %w", err)
158+
prog <- models.Prog{Err: fmt.Errorf("sync: %w", err)}
159+
return
135160
}
136161
if err := out.Close(); err != nil {
137-
return nil, fmt.Errorf("close: %w", err)
162+
prog <- models.Prog{Err: fmt.Errorf("close: %w", err)}
163+
return
138164
}
139165
if err := os.Rename(tmpPath, finalPath); err != nil {
140-
return nil, fmt.Errorf("rename: %w", err)
166+
prog <- models.Prog{Err: fmt.Errorf("rename: %w", err)}
167+
return
141168
}
142169

143170
// 5) сверим размер (если сервер прислал size)
144171
if info.Size > 0 && written != info.Size {
145-
return nil, fmt.Errorf("size mismatch: got %d, want %d", written, info.Size)
172+
prog <- models.Prog{Err: fmt.Errorf("size mismatch: got %d, want %d", written, info.Size)}
173+
return
146174
}
147-
148-
return nil, nil
149175
}

internal/akclient/upload_file.go

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,29 +14,35 @@ import (
1414

1515
"github.com/rtmelsov/adv-keeper/internal/helpers"
1616
"github.com/rtmelsov/adv-keeper/internal/middleware"
17+
"github.com/rtmelsov/adv-keeper/internal/models"
1718

1819
filev1 "github.com/rtmelsov/adv-keeper/gen/go/proto/file/v1"
1920
)
2021

2122
const chunkSize = 1 << 20 // 1 MiB — безопасно ниже 4 MiB лимита на сообщение
2223

23-
func UploadFile(path string) (*filev1.UploadResponse, error) {
24+
func UploadFile(path string, prog chan<- models.Prog) {
25+
defer close(prog)
2426
ctx, err := middleware.AddAuthData()
2527
if err != nil {
26-
return nil, err
28+
prog <- models.Prog{Err: err}
29+
return
2730
}
2831

2932
f, err := os.Open(path)
3033
if err != nil {
31-
return nil, err
34+
prog <- models.Prog{Err: err}
35+
return
3236
}
3337
defer f.Close()
3438

3539
stat, err := f.Stat()
3640
if err != nil {
37-
return nil, err
41+
prog <- models.Prog{Err: err}
42+
return
3843
}
3944

45+
total := stat.Size()
4046
conn, err := grpc.NewClient(
4147
helpers.Addr,
4248
grpc.WithTransportCredentials(insecure.NewCredentials()),
@@ -52,15 +58,17 @@ func UploadFile(path string) (*filev1.UploadResponse, error) {
5258
)
5359

5460
if err != nil {
55-
return nil, err
61+
prog <- models.Prog{Err: err}
62+
return
5663
}
5764
defer conn.Close()
5865

5966
client := filev1.NewFileServiceClient(conn)
6067

6168
stream, err := client.Upload(ctx)
6269
if err != nil {
63-
return nil, err
70+
prog <- models.Prog{Err: err}
71+
return
6472
}
6573

6674
// 1) отправляем мета-инфу (первое сообщение)
@@ -73,7 +81,8 @@ func UploadFile(path string) (*filev1.UploadResponse, error) {
7381
},
7482
})
7583
if err != nil {
76-
return nil, err
84+
prog <- models.Prog{Err: err}
85+
return
7786
}
7887

7988
// 2) шлём файл кусками
@@ -89,25 +98,32 @@ func UploadFile(path string) (*filev1.UploadResponse, error) {
8998
if err := stream.Send(&filev1.UploadRequest{
9099
Payload: &filev1.UploadRequest_Chunk{Chunk: chunk},
91100
}); err != nil {
92-
return nil, fmt.Errorf("send chunk: %w", err)
101+
prog <- models.Prog{Err: err}
102+
return
93103
}
94104
offset += int64(n)
105+
select {
106+
case prog <- models.Prog{Done: offset, Total: total}:
107+
default: // не блокируем UI, если буфер заполнен
108+
}
95109
}
96110
if readErr == io.EOF {
97111
break // <— обязательно выходим!
98112
}
99113
if readErr != nil {
100-
return nil, fmt.Errorf("read file: %w", readErr)
114+
prog <- models.Prog{Err: err}
115+
return
101116
}
102117
}
103118

104119
// 3) закрываем отправку и получаем ответ
105-
resp, err := stream.CloseAndRecv()
120+
_, err = stream.CloseAndRecv()
106121
if err != nil {
107122
if st, ok := status.FromError(err); ok {
108-
return nil, fmt.Errorf("upload failed: %s: %s", st.Code(), st.Message())
123+
prog <- models.Prog{Err: fmt.Errorf("upload failed: %s: %s", st.Code(), st.Message())}
124+
return
109125
}
110-
return nil, fmt.Errorf("upload failed: %w", err)
126+
prog <- models.Prog{Err: fmt.Errorf("upload failed: %w", err)}
127+
return
111128
}
112-
return resp, nil
113129
}

internal/middleware/client_interceptor.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,15 @@ import (
88
)
99

1010
func AddAuthData() (context.Context, error) {
11+
token := ""
1112
session, err := helpers.LoadSession()
1213
if err != nil {
1314
return nil, err
1415
}
15-
md := metadata.New(map[string]string{"authorization": "Bearer " + session.AccessToken})
16+
if session != nil {
17+
token = session.AccessToken
18+
}
19+
md := metadata.New(map[string]string{"authorization": "Bearer " + token})
1620

1721
ctx := metadata.NewOutgoingContext(context.Background(), md)
1822
return ctx, nil

internal/models/loader.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,9 @@ type LoaderType struct {
44
FileSize int64
55
ChankSize int64
66
}
7+
8+
type Prog struct {
9+
Done int64
10+
Total int64
11+
Err error
12+
}

internal/tui/app.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
filev1 "github.com/rtmelsov/adv-keeper/gen/go/proto/file/v1"
1010
"github.com/rtmelsov/adv-keeper/internal/akclient"
1111
"github.com/rtmelsov/adv-keeper/internal/models"
12+
"time"
1213
)
1314

1415
type ProfileModel struct {
@@ -17,6 +18,19 @@ type ProfileModel struct {
1718
}
1819

1920
type TuiModel struct {
21+
uploadCh <-chan models.Prog
22+
Uploaded int64
23+
UploadTotal int64
24+
UploadStart time.Time
25+
Uploading bool
26+
27+
// download
28+
downloadCh <-chan models.Prog
29+
Downloaded int64
30+
DownloadTotal int64
31+
DownloadStart time.Time
32+
Downloading bool
33+
2034
LoaderCount models.LoaderType
2135
W int
2236
H int

internal/tui/file_screen.go

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ import (
66
"github.com/charmbracelet/lipgloss"
77

88
// commonv1 "github.com/rtmelsov/adv-keeper/gen/go/proto/common/v1"
9-
// "github.com/rtmelsov/adv-keeper/internal/helpers"
109
"github.com/rtmelsov/adv-keeper/internal/akclient"
10+
"github.com/rtmelsov/adv-keeper/internal/models"
1111
"github.com/rtmelsov/adv-keeper/internal/ui"
1212
)
1313

@@ -50,16 +50,25 @@ func (m TuiModel) FileDetailsAction(msg string) (tea.Model, tea.Cmd) {
5050
return m, tea.ClearScreen
5151
case "enter":
5252
if m.RightCursor == 0 {
53-
_, err := akclient.DownloadFile(m.SelectedFileInfo.Fileid)
54-
if err != nil {
55-
m.Error = err.Error()
56-
}
53+
m.Loading = true
54+
return m, tea.Batch(
55+
m.Spin.Tick,
56+
func() tea.Msg {
57+
ch := make(chan models.Prog)
58+
go akclient.DownloadFile(m.SelectedFileInfo.Fileid, ch)
59+
return progressChanReadyMsg{ch: ch, Kind: OpDownload}
60+
},
61+
)
5762
}
5863
if m.RightCursor == 1 {
59-
err := akclient.DeleteFile(m.SelectedFileInfo.Fileid)
60-
if err != nil {
61-
m.Error = err.Error()
62-
}
64+
m.Loading = true
65+
return m, tea.Batch(
66+
m.Spin.Tick,
67+
func() tea.Msg {
68+
err := akclient.DeleteFile(m.SelectedFileInfo.Fileid)
69+
return deleteFileFinishedMsg{err: err}
70+
},
71+
)
6372
}
6473
return m, tea.ClearScreen
6574

0 commit comments

Comments
 (0)