66 "gpt-load/internal/models"
77 "gpt-load/internal/store"
88 "sync"
9+ "sync/atomic"
910 "time"
1011
1112 "github.com/sirupsen/logrus"
@@ -89,7 +90,7 @@ func (s *CronChecker) runLoop() {
8990 }
9091}
9192
92- // submitValidationJobs finds groups whose keys need validation and validates them.
93+ // submitValidationJobs finds groups whose keys need validation and validates them concurrently .
9394func (s * CronChecker ) submitValidationJobs () {
9495 var groups []models.Group
9596 if err := s .DB .Find (& groups ).Error ; err != nil {
@@ -98,51 +99,93 @@ func (s *CronChecker) submitValidationJobs() {
9899 }
99100
100101 validationStartTime := time .Now ()
102+ var wg sync.WaitGroup
101103
102104 for i := range groups {
103105 group := & groups [i ]
104- effectiveSettings : = s .SettingsManager .GetEffectiveConfig (group .Config )
105- interval := time .Duration (effectiveSettings .KeyValidationIntervalMinutes ) * time .Minute
106+ group . EffectiveConfig = s .SettingsManager .GetEffectiveConfig (group .Config )
107+ interval := time .Duration (group . EffectiveConfig .KeyValidationIntervalMinutes ) * time .Minute
106108
107109 if group .LastValidatedAt == nil || validationStartTime .Sub (* group .LastValidatedAt ) > interval {
108- groupProcessStart := time .Now ()
109- var invalidKeys []models.APIKey
110- err := s .DB .Where ("group_id = ? AND status = ?" , group .ID , models .KeyStatusInvalid ).Find (& invalidKeys ).Error
111- if err != nil {
112- logrus .Errorf ("CronChecker: Failed to get invalid keys for group %s: %v" , group .Name , err )
113- continue
114- }
110+ wg .Add (1 )
111+ g := group
112+ go func () {
113+ defer wg .Done ()
114+ s .validateGroupKeys (g )
115+ }()
116+ }
117+ }
118+
119+ wg .Wait ()
120+ }
121+
122+ // validateGroupKeys validates all invalid keys for a single group concurrently.
123+ func (s * CronChecker ) validateGroupKeys (group * models.Group ) {
124+ groupProcessStart := time .Now ()
115125
116- validatedCount := len (invalidKeys )
117- becameValidCount := 0
118- if validatedCount > 0 {
119- for j := range invalidKeys {
120- select {
121- case <- s .stopChan :
126+ var invalidKeys []models.APIKey
127+ err := s .DB .Where ("group_id = ? AND status = ?" , group .ID , models .KeyStatusInvalid ).Find (& invalidKeys ).Error
128+ if err != nil {
129+ logrus .Errorf ("CronChecker: Failed to get invalid keys for group %s: %v" , group .Name , err )
130+ return
131+ }
132+
133+ if len (invalidKeys ) == 0 {
134+ if err := s .DB .Model (group ).Update ("last_validated_at" , time .Now ()).Error ; err != nil {
135+ logrus .Errorf ("CronChecker: Failed to update last_validated_at for group %s: %v" , group .Name , err )
136+ }
137+ logrus .Infof ("CronChecker: Group '%s' has no invalid keys to check." , group .Name )
138+ return
139+ }
140+
141+ var becameValidCount int32
142+ var keyWg sync.WaitGroup
143+ jobs := make (chan * models.APIKey , len (invalidKeys ))
144+
145+ concurrency := group .EffectiveConfig .KeyValidationConcurrency
146+ for range concurrency {
147+ keyWg .Add (1 )
148+ go func () {
149+ defer keyWg .Done ()
150+ for {
151+ select {
152+ case key , ok := <- jobs :
153+ if ! ok {
122154 return
123- default :
124155 }
125- key := & invalidKeys [j ]
126156 isValid , _ := s .Validator .ValidateSingleKey (key , group )
127-
128157 if isValid {
129- becameValidCount ++
158+ atomic . AddInt32 ( & becameValidCount , 1 )
130159 }
160+ case <- s .stopChan :
161+ return
131162 }
132163 }
164+ }()
165+ }
133166
134- if err := s .DB .Model (group ).Update ("last_validated_at" , time .Now ()).Error ; err != nil {
135- logrus .Errorf ("CronChecker: Failed to update last_validated_at for group %s: %v" , group .Name , err )
136- }
137-
138- duration := time .Since (groupProcessStart )
139- logrus .Infof (
140- "CronChecker: Group '%s' validation finished. Total checked: %d, became valid: %d. Duration: %s." ,
141- group .Name ,
142- validatedCount ,
143- becameValidCount ,
144- duration .String (),
145- )
167+ DistributeLoop:
168+ for i := range invalidKeys {
169+ select {
170+ case jobs <- & invalidKeys [i ]:
171+ case <- s .stopChan :
172+ break DistributeLoop
146173 }
147174 }
175+ close (jobs )
176+
177+ keyWg .Wait ()
178+
179+ if err := s .DB .Model (group ).Update ("last_validated_at" , time .Now ()).Error ; err != nil {
180+ logrus .Errorf ("CronChecker: Failed to update last_validated_at for group %s: %v" , group .Name , err )
181+ }
182+
183+ duration := time .Since (groupProcessStart )
184+ logrus .Infof (
185+ "CronChecker: Group '%s' validation finished. Total checked: %d, became valid: %d. Duration: %s." ,
186+ group .Name ,
187+ len (invalidKeys ),
188+ becameValidCount ,
189+ duration .String (),
190+ )
148191}
0 commit comments