Skip to content

Commit 22dae9e

Browse files
committed
Add a new worker that checks if an account is opened
1 parent 3f0870c commit 22dae9e

File tree

6 files changed

+139
-32
lines changed

6 files changed

+139
-32
lines changed

pkg/debrid/account/account.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package account
22

33
import (
4+
"fmt"
5+
"net/http"
46
"sync/atomic"
57

68
"github.com/puzpuzpuz/xsync/v4"
@@ -17,6 +19,9 @@ type Account struct {
1719
TrafficUsed atomic.Int64 `json:"traffic_used"` // Traffic used in bytes
1820
Username string `json:"username"` // Username for the account
1921
httpClient *request.Client
22+
23+
// Account reactivation tracking
24+
DisableCount atomic.Int32 `json:"disable_count"`
2025
}
2126

2227
func (a *Account) Equals(other *Account) bool {
@@ -69,3 +74,46 @@ func (a *Account) StoreDownloadLinks(dls map[string]*types.DownloadLink) {
6974
a.StoreDownloadLink(*dl)
7075
}
7176
}
77+
78+
// MarkDisabled marks the account as disabled and increments the disable count
79+
func (a *Account) MarkDisabled() {
80+
a.Disabled.Store(true)
81+
a.DisableCount.Add(1)
82+
}
83+
84+
func (a *Account) Reset() {
85+
a.DisableCount.Store(0)
86+
a.Disabled.Store(false)
87+
}
88+
89+
func (a *Account) CheckBandwidth() error {
90+
// Get a one of the download links to check if the account is still valid
91+
downloadLink := ""
92+
a.links.Range(func(key string, dl types.DownloadLink) bool {
93+
if dl.DownloadLink != "" {
94+
downloadLink = dl.DownloadLink
95+
return false
96+
}
97+
return true
98+
})
99+
if downloadLink == "" {
100+
return fmt.Errorf("no download link found")
101+
}
102+
103+
// Let's check the download link status
104+
req, err := http.NewRequest(http.MethodGet, downloadLink, nil)
105+
if err != nil {
106+
return err
107+
}
108+
// Use a simple client
109+
client := http.DefaultClient
110+
resp, err := client.Do(req)
111+
if err != nil {
112+
return err
113+
}
114+
defer resp.Body.Close()
115+
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
116+
return fmt.Errorf("account check failed with status code %d", resp.StatusCode)
117+
}
118+
return nil
119+
}

pkg/debrid/account/manager.go

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,22 @@ import (
1414
"go.uber.org/ratelimit"
1515
)
1616

17+
const (
18+
MaxDisableCount = 3
19+
)
20+
1721
type Manager struct {
1822
debrid string
1923
current atomic.Pointer[Account]
2024
accounts *xsync.Map[string, *Account]
25+
logger zerolog.Logger
2126
}
2227

2328
func NewManager(debridConf config.Debrid, downloadRL ratelimit.Limiter, logger zerolog.Logger) *Manager {
2429
m := &Manager{
2530
debrid: debridConf.Name,
2631
accounts: xsync.NewMap[string, *Account](),
32+
logger: logger,
2733
}
2834

2935
var firstAccount *Account
@@ -95,25 +101,29 @@ func (m *Manager) Current() *Account {
95101
// Slow path - find new current account
96102
activeAccounts := m.Active()
97103
if len(activeAccounts) == 0 {
98-
m.current.Store(nil)
99-
return nil
104+
// No active accounts left, try to use disabled ones
105+
m.logger.Warn().Str("debrid", m.debrid).Msg("No active accounts available, all accounts are disabled")
106+
allAccounts := m.All()
107+
if len(allAccounts) == 0 {
108+
m.logger.Error().Str("debrid", m.debrid).Msg("No accounts configured")
109+
m.current.Store(nil)
110+
return nil
111+
}
112+
m.current.Store(allAccounts[0])
113+
return allAccounts[0]
100114
}
101115

102116
newCurrent := activeAccounts[0]
103117
m.current.Store(newCurrent)
104118
return newCurrent
105119
}
106120

107-
func (m *Manager) setCurrent(account *Account) {
108-
m.current.Store(account)
109-
}
110-
111121
func (m *Manager) Disable(account *Account) {
112122
if account == nil {
113123
return
114124
}
115125

116-
account.Disabled.Store(true)
126+
account.MarkDisabled()
117127

118128
// If we're disabling the current account, it will be replaced
119129
// on the next Current() call - no need to proactively update
@@ -131,7 +141,7 @@ func (m *Manager) Disable(account *Account) {
131141

132142
func (m *Manager) Reset() {
133143
m.accounts.Range(func(key string, acc *Account) bool {
134-
acc.Disabled.Store(false)
144+
acc.Reset()
135145
return true
136146
})
137147

@@ -144,12 +154,6 @@ func (m *Manager) Reset() {
144154
}
145155
}
146156

147-
func (m *Manager) Update(account *Account) {
148-
if account != nil {
149-
m.accounts.Store(account.Token, account)
150-
}
151-
}
152-
153157
func (m *Manager) GetAccount(token string) (*Account, error) {
154158
if token == "" {
155159
return nil, fmt.Errorf("token cannot be empty")
@@ -209,3 +213,27 @@ func (m *Manager) Stats() []map[string]any {
209213
}
210214
return stats
211215
}
216+
217+
func (m *Manager) CheckAndResetBandwidth() {
218+
found := false
219+
m.accounts.Range(func(key string, acc *Account) bool {
220+
if acc.Disabled.Load() && acc.DisableCount.Load() < MaxDisableCount {
221+
if err := acc.CheckBandwidth(); err == nil {
222+
acc.Disabled.Store(false)
223+
found = true
224+
m.logger.Info().Str("debrid", m.debrid).Str("token", utils.Mask(acc.Token)).Msg("Re-activated disabled account")
225+
} else {
226+
m.logger.Debug().Err(err).Str("debrid", m.debrid).Str("token", utils.Mask(acc.Token)).Msg("Account still disabled")
227+
}
228+
}
229+
return true
230+
})
231+
if found {
232+
// If we re-activated any account, reset current to first active
233+
activeAccounts := m.Active()
234+
if len(activeAccounts) > 0 {
235+
m.current.Store(activeAccounts[0])
236+
}
237+
238+
}
239+
}

pkg/debrid/debrid.go

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -109,11 +109,54 @@ func (d *Storage) StartWorker(ctx context.Context) error {
109109
ctx = context.Background()
110110
}
111111

112-
// Start all debrid syncAccounts
113-
// Runs every 1m
114-
if err := d.syncAccounts(); err != nil {
115-
return err
112+
// Start syncAccounts worker
113+
go d.syncAccountsWorker(ctx)
114+
115+
// Start bandwidth reset worker
116+
go d.checkBandwidthWorker(ctx)
117+
118+
return nil
119+
}
120+
121+
func (d *Storage) checkBandwidthWorker(ctx context.Context) {
122+
if ctx == nil {
123+
ctx = context.Background()
124+
}
125+
ticker := time.NewTicker(30 * time.Minute)
126+
go func() {
127+
for {
128+
select {
129+
case <-ctx.Done():
130+
return
131+
case <-ticker.C:
132+
d.checkAccountBandwidth()
133+
}
134+
}
135+
}()
136+
}
137+
138+
func (d *Storage) checkAccountBandwidth() {
139+
d.mu.Lock()
140+
defer d.mu.Unlock()
141+
142+
for _, debrid := range d.debrids {
143+
if debrid == nil || debrid.client == nil {
144+
continue
145+
}
146+
accountManager := debrid.client.AccountManager()
147+
if accountManager == nil {
148+
continue
149+
}
150+
accountManager.CheckAndResetBandwidth()
116151
}
152+
}
153+
154+
func (d *Storage) syncAccountsWorker(ctx context.Context) {
155+
if ctx == nil {
156+
ctx = context.Background()
157+
}
158+
159+
_ = d.syncAccounts()
117160
ticker := time.NewTicker(5 * time.Minute)
118161
go func() {
119162
for {
@@ -125,7 +168,7 @@ func (d *Storage) StartWorker(ctx context.Context) error {
125168
}
126169
}
127170
}()
128-
return nil
171+
129172
}
130173

131174
func (d *Storage) syncAccounts() error {

pkg/debrid/providers/alldebrid/alldebrid.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ func (ad *AllDebrid) SubmitMagnet(torrent *types.Torrent) (*types.Torrent, error
104104
}
105105
magnets := data.Data.Magnets
106106
if len(magnets) == 0 {
107-
return nil, fmt.Errorf("error adding torrent")
107+
return nil, fmt.Errorf("error adding torrent. No magnets returned")
108108
}
109109
magnet := magnets[0]
110110
torrentId := strconv.Itoa(magnet.ID)

pkg/webdav/file.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,6 @@ func (f *File) getDownloadByteRange() (*[2]int64, error) {
101101
return byteRange, nil
102102
}
103103

104-
// setVideoStreamingHeaders sets the necessary headers for video streaming
105104
// It returns error and a boolean indicating if the request is a range request
106105
func (f *File) servePreloadedContent(w http.ResponseWriter, r *http.Request) error {
107106
content := f.content
@@ -156,8 +155,6 @@ func (f *File) streamWithRetry(w http.ResponseWriter, r *http.Request, networkRe
156155
return &streamError{Err: err, StatusCode: http.StatusInternalServerError}
157156
}
158157

159-
setVideoStreamingHeaders(upstreamReq)
160-
161158
isRangeRequest := f.handleRangeRequest(upstreamReq, r, w)
162159
if isRangeRequest == -1 {
163160
return &streamError{Err: fmt.Errorf("invalid range"), StatusCode: http.StatusRequestedRangeNotSatisfiable}

pkg/webdav/misc.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -212,12 +212,3 @@ func parseRange(s string, size int64) ([]httpRange, error) {
212212
}
213213
return ranges, nil
214214
}
215-
216-
func setVideoStreamingHeaders(req *http.Request) {
217-
// Request optimizations for faster response
218-
req.Header.Set("Accept", "*/*")
219-
req.Header.Set("Accept-Encoding", "identity")
220-
req.Header.Set("Connection", "keep-alive")
221-
req.Header.Set("User-Agent", "VideoStream/1.0")
222-
req.Header.Set("Priority", "u=1")
223-
}

0 commit comments

Comments
 (0)