Skip to content

Commit 820a3b2

Browse files
committed
fix: clean episodes cache
1 parent 5473349 commit 820a3b2

File tree

1 file changed

+38
-11
lines changed

1 file changed

+38
-11
lines changed

main.go

Lines changed: 38 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/json"
66
"fmt"
77
"os"
8+
"strings"
89
"time"
910

1011
"github.com/bradfitz/gomemcache/memcache"
@@ -18,9 +19,12 @@ func main() {
1819
zerolog.MessageFieldName = "msg"
1920

2021
k := kafka.NewReader(kafka.ReaderConfig{
21-
Brokers: []string{os.Getenv("KAFKA_BROKERS")},
22-
GroupID: "clear-old-cache",
23-
GroupTopics: []string{"debezium.chii.bangumi.chii_subject_interests"},
22+
Brokers: []string{os.Getenv("KAFKA_BROKERS")},
23+
GroupID: "clear-old-cache",
24+
GroupTopics: []string{
25+
"debezium.chii.bangumi.chii_subject_interests",
26+
"debezium.chii.bangumi.chii_episodes",
27+
},
2428
})
2529
mc := memcache.New(os.Getenv("MEMCACHED"))
2630

@@ -39,19 +43,38 @@ func main() {
3943
continue
4044
}
4145

42-
var m KafkaMessageValue[ChiiInterest]
43-
err = json.Unmarshal(msg.Value, &m)
44-
if err != nil {
45-
log.Err(err).Bytes("value", msg.Value).Msg("failed to decode kafka message as json")
46+
if strings.HasSuffix(msg.Topic, ".chii_episodes") {
47+
var m KafkaMessageValue[ChiiEpisode]
48+
err = json.Unmarshal(msg.Value, &m)
49+
if err != nil {
50+
log.Err(err).Bytes("value", msg.Value).Msg("failed to decode kafka message as json")
51+
continue
52+
}
53+
54+
if (m.Op != OpUpdate) && (m.Op != OpCreate) {
55+
continue
56+
}
57+
58+
_ = mc.Delete(fmt.Sprintf("subject_eps_%d", m.After.SubjectID))
4659
continue
4760
}
4861

49-
if m.Op != OpUpdate {
62+
if strings.HasSuffix(msg.Topic, ".chii_subject_interests") {
63+
var m KafkaMessageValue[ChiiInterest]
64+
err = json.Unmarshal(msg.Value, &m)
65+
if err != nil {
66+
log.Err(err).Bytes("value", msg.Value).Msg("failed to decode kafka message as json")
67+
continue
68+
}
69+
70+
if m.Op != OpUpdate {
71+
continue
72+
}
73+
74+
_ = mc.Delete(fmt.Sprintf("prg_ep_status_%d", m.After.InterestUID))
75+
_ = mc.Delete(fmt.Sprintf("prg_watching_v3_%d", m.After.InterestUID))
5076
continue
5177
}
52-
53-
_ = mc.Delete(fmt.Sprintf("prg_ep_status_%d", m.After.InterestUID))
54-
_ = mc.Delete(fmt.Sprintf("prg_watching_v3_%d", m.After.InterestUID))
5578
}
5679
}
5780

@@ -76,6 +99,10 @@ type KafkaMessageValue[T any] struct {
7699
Source Source `json:"source"`
77100
}
78101

102+
type ChiiEpisode struct {
103+
SubjectID uint64 `json:"ep_subject_id"`
104+
}
105+
79106
type ChiiInterest struct {
80107
InterestID int `json:"interest_id"`
81108
InterestUID int `json:"interest_uid"`

0 commit comments

Comments
 (0)