Skip to content

Commit b7c075b

Browse files
committed
perf: optimize job update refresh performance
1 parent 973cbb9 commit b7c075b

File tree

17 files changed

+811
-398
lines changed

17 files changed

+811
-398
lines changed

apis/converts.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,12 +69,12 @@ func convertJobs(jobs ...*executor.Job) []*entity.Job {
6969
converted := make([]*entity.Job, 0, len(jobs))
7070
for _, job := range jobs {
7171
converted = append(converted, &entity.Job{
72-
Id: job.ID,
73-
Status: job.Status,
74-
Priority: job.Priority,
75-
CreateTime: job.CreateTime.Unix(),
76-
UpdateTime: job.UpdateTime.Unix(),
77-
State: job.State,
72+
Id: job.ID,
73+
Status: job.Status,
74+
Priority: job.Priority,
75+
CreateTimeNs: job.CreateTime.UnixNano(),
76+
UpdateTimeNs: job.UpdateTime.UnixNano(),
77+
State: job.State,
7878
})
7979
}
8080
return converted

apis/job_get_log.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,5 @@ func (api *API) JobGetLog(ctx context.Context, req *entity.JobGetLogRequest) (*e
2828
return nil, fmt.Errorf("read log fail, %w", err)
2929
}
3030

31-
return &entity.JobGetLogReply{Logs: buf}, nil
31+
return &entity.JobGetLogReply{Logs: buf, Offset: req.GetOffset() + int64(len(buf))}, nil
3232
}

apis/job_list.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,12 @@ func (api *API) JobList(ctx context.Context, req *entity.JobListRequest) (*entit
2121
return nil, err
2222
}
2323
return &entity.JobListReply{Jobs: convertJobs(jobs...)}, nil
24+
case *entity.JobListRequest_RecentlyUpdate:
25+
jobs, err := api.exe.ListRecentlyUpdateJob(ctx, param.RecentlyUpdate)
26+
if err != nil {
27+
return nil, err
28+
}
29+
return &entity.JobListReply{Jobs: convertJobs(jobs...)}, nil
2430
default:
2531
return nil, fmt.Errorf("unexpected param, %T", req.Param)
2632
}

entity/job.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ import (
55
"database/sql/driver"
66
)
77

8+
const (
9+
JobStatusVisible = 128
10+
)
11+
812
var (
913
_ = sql.Scanner(&JobParam{})
1014
_ = driver.Valuer(&JobParam{})

entity/job.pb.go

Lines changed: 186 additions & 105 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

entity/job.proto

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,17 @@ enum JobStatus {
1212
PROCESSING = 3;
1313
COMPLETED = 4;
1414

15-
FAILED = 255;
15+
FAILED = 127;
16+
17+
DELETED = 255;
1618
}
1719

1820
message Job {
1921
int64 id = 1;
2022
JobStatus status = 2;
2123
int64 priority = 3;
22-
int64 create_time = 4;
23-
int64 update_time = 5;
24+
int64 create_time_ns = 4;
25+
int64 update_time_ns = 5;
2426

2527
JobState state = 17;
2628
}
@@ -58,6 +60,12 @@ message JobFilter {
5860
optional int64 offset = 34;
5961
}
6062

63+
message JobRecentlyUpdateFilter {
64+
optional int64 update_since_ns = 1;
65+
66+
optional int64 limit = 33;
67+
}
68+
6169
message JobDisplay {
6270
oneof display {
6371
job_archive.JobArchiveDisplay archive = 1;

entity/service.pb.go

Lines changed: 245 additions & 214 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

entity/service.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ message JobListRequest {
113113
oneof param {
114114
JobMGetRequest mget = 1;
115115
job.JobFilter list = 2;
116+
job.JobRecentlyUpdateFilter recently_update = 3;
116117
}
117118
}
118119

@@ -163,6 +164,7 @@ message JobGetLogRequest {
163164

164165
message JobGetLogReply {
165166
bytes logs = 1;
167+
int64 offset = 2;
166168
}
167169

168170
message SourceListRequest {

entity/utils.go

Lines changed: 64 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,34 @@
11
package entity
22

33
import (
4+
"bytes"
45
"database/sql/driver"
56
"fmt"
7+
"io"
68

9+
"github.com/klauspost/compress/zstd"
710
"github.com/modern-go/reflect2"
11+
"github.com/samuelncui/yatm/tools"
812
"google.golang.org/protobuf/proto"
913
)
1014

15+
const (
16+
compressThreshold = 1024
17+
)
18+
19+
var (
20+
magicHeaderV2 = []byte{0xff, 'y', 'm', '\x02'}
21+
22+
zstdEncoderPool = tools.NewPool(func() *zstd.Encoder {
23+
encoder, _ := zstd.NewWriter(nil) // there will be no error without options
24+
return encoder
25+
})
26+
zstdDecoderPool = tools.NewPool(func() *zstd.Decoder {
27+
decoder, _ := zstd.NewReader(nil) // there will be no error without options
28+
return decoder
29+
})
30+
)
31+
1132
// Scan implement database/sql.Scanner
1233
func Scan(dst proto.Message, src interface{}) error {
1334
typ := reflect2.TypeOf(dst).(reflect2.PtrType).Elem()
@@ -24,22 +45,62 @@ func Scan(dst proto.Message, src interface{}) error {
2445
default:
2546
return fmt.Errorf("process define extra scanner, unexpected type for i18n, %T", v)
2647
}
27-
2848
if len(buf) == 0 {
2949
return nil
3050
}
3151

52+
if bytes.HasPrefix(buf, magicHeaderV2) {
53+
decoder := zstdDecoderPool.Get()
54+
55+
err := decoder.Reset(bytes.NewBuffer(buf[len(magicHeaderV2):]))
56+
if err != nil {
57+
return fmt.Errorf("zstd reset decoder fail, %w", err)
58+
}
59+
60+
buf, err = io.ReadAll(decoder)
61+
if err != nil {
62+
return fmt.Errorf("zstd read decoder fail, %w", err)
63+
}
64+
65+
decoder.Reset(nil)
66+
zstdDecoderPool.Put(decoder)
67+
}
68+
3269
if err := proto.Unmarshal(buf, dst); err != nil {
33-
return fmt.Errorf("process define extra scanner, json unmarshal fail, %w", err)
70+
return fmt.Errorf("process define extra scanner, protobuf unmarshal fail, %w", err)
3471
}
72+
3573
return nil
3674
}
3775

3876
// Value implement database/sql/driver.Valuer
3977
func Value(src proto.Message) (driver.Value, error) {
4078
buf, err := proto.Marshal(src)
4179
if err != nil {
42-
return nil, fmt.Errorf("process define extra valuer, json marshal fail, %w", err)
80+
return nil, fmt.Errorf("process define extra valuer, protobuf marshal fail, %w", err)
81+
}
82+
83+
if len(buf) <= compressThreshold {
84+
return buf, nil
4385
}
86+
87+
buffer := bytes.NewBuffer(make([]byte, 0, len(buf)))
88+
buffer.Write(magicHeaderV2)
89+
90+
encoder := zstdEncoderPool.Get()
91+
encoder.Reset(buffer)
92+
_, err = encoder.Write(buf)
93+
if err != nil {
94+
return nil, fmt.Errorf("zstd write to encoder fail, %w", err)
95+
}
96+
err = encoder.Close()
97+
if err != nil {
98+
return nil, fmt.Errorf("zstd close encoder fail, %w", err)
99+
}
100+
101+
buf = buffer.Bytes()
102+
encoder.Reset(nil)
103+
zstdEncoderPool.Put(encoder)
104+
44105
return buf, nil
45106
}

executor/job.go

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ type Job struct {
2222
State *entity.JobState
2323

2424
CreateTime time.Time
25-
UpdateTime time.Time
25+
UpdateTime time.Time `gorm:"index:idx_update_time"`
2626
}
2727

2828
func (j *Job) BeforeUpdate(tx *gorm.DB) error {
@@ -56,9 +56,18 @@ func (e *Executor) CreateJob(ctx context.Context, job *Job, param *entity.JobPar
5656
}
5757

5858
func (e *Executor) DeleteJobs(ctx context.Context, ids ...int64) error {
59-
if r := e.db.WithContext(ctx).Delete(ModelJob, ids); r.Error != nil {
60-
return fmt.Errorf("delete job fail, err= %w", r.Error)
59+
jobs, err := e.MGetJob(ctx, ids...)
60+
if err != nil {
61+
return fmt.Errorf("mget jobs fail")
62+
}
63+
64+
for _, job := range jobs {
65+
job.Status = entity.JobStatus_DELETED
66+
if r := e.db.WithContext(ctx).Save(job); r.Error != nil {
67+
return fmt.Errorf("delete job write db fail, id= %d err= %w", job.ID, r.Error)
68+
}
6169
}
70+
6271
return nil
6372
}
6473

@@ -120,6 +129,8 @@ func (e *Executor) ListJob(ctx context.Context, filter *entity.JobFilter) ([]*Jo
120129
db := e.db.WithContext(ctx)
121130
if filter.Status != nil {
122131
db = db.Where("status = ?", *filter.Status)
132+
} else {
133+
db = db.Where("status < ?", entity.JobStatusVisible)
123134
}
124135

125136
if filter.Limit != nil {
@@ -140,3 +151,25 @@ func (e *Executor) ListJob(ctx context.Context, filter *entity.JobFilter) ([]*Jo
140151

141152
return jobs, nil
142153
}
154+
155+
func (e *Executor) ListRecentlyUpdateJob(ctx context.Context, filter *entity.JobRecentlyUpdateFilter) ([]*Job, error) {
156+
db := e.db.WithContext(ctx)
157+
if filter.UpdateSinceNs != nil {
158+
db = db.Where("update_time > ?", time.Unix(0, *filter.UpdateSinceNs))
159+
}
160+
161+
if filter.Limit != nil {
162+
db = db.Limit(int(*filter.Limit))
163+
} else {
164+
db = db.Limit(20)
165+
}
166+
167+
db = db.Order("update_time ASC")
168+
169+
jobs := make([]*Job, 0, 20)
170+
if r := db.Find(&jobs); r.Error != nil {
171+
return nil, fmt.Errorf("list jobs fail, err= %w", r.Error)
172+
}
173+
174+
return jobs, nil
175+
}

0 commit comments

Comments
 (0)