44 "context"
55 "fmt"
66 "github.com/swanchain/go-computing-provider/internal/contract"
7+ "os"
8+ "path/filepath"
79 "strconv"
810 "strings"
911 "sync"
@@ -45,6 +47,9 @@ func (task *CronTask) RunTask() {
4547 task .checkJobReward ()
4648 task .cleanImageResource ()
4749 task .CheckCpBalance ()
50+ task .UpdateContainerLog ()
51+ task .DeleteSpaceLog ()
52+
4853}
4954
5055func CheckClusterNetworkPolicy () {
@@ -566,7 +571,7 @@ func addNodeLabel() {
566571
567572func (task * CronTask ) CheckCpBalance () {
568573 c := cron .New (cron .WithSeconds ())
569- c .AddFunc ("* 0/30 * * * ?" , func () {
574+ c .AddFunc ("0 0/30 * * * ?" , func () {
570575 defer func () {
571576 if err := recover (); err != nil {
572577 logs .GetLogger ().Errorf ("check cp balance catch panic error: %+v" , err )
@@ -577,6 +582,60 @@ func (task *CronTask) CheckCpBalance() {
577582 c .Start ()
578583}
579584
585+ func (task * CronTask ) UpdateContainerLog () {
586+ c := cron .New (cron .WithSeconds ())
587+ c .AddFunc ("* 0/10 * * * ?" , func () {
588+ defer func () {
589+ if err := recover (); err != nil {
590+ logs .GetLogger ().Errorf ("update container log catch panic error: %+v" , err )
591+ }
592+ }()
593+
594+ jobList , err := NewJobService ().GetJobList (models .UN_DELETEED_FLAG , - 1 )
595+ if err != nil {
596+ logs .GetLogger ().Errorf ("failed to get job data, error: %+v" , err )
597+ return
598+ }
599+
600+ for _ , job := range jobList {
601+ NewK8sService ().UpdateContainerLogToFile (job .JobUuid )
602+ }
603+ })
604+ c .Start ()
605+ }
606+
607+ func (task * CronTask ) DeleteSpaceLog () {
608+ c := cron .New (cron .WithSeconds ())
609+ c .AddFunc ("0 0/30 * * * ?" , func () {
610+ defer func () {
611+ if err := recover (); err != nil {
612+ logs .GetLogger ().Errorf ("update container log catch panic error: %+v" , err )
613+ }
614+ }()
615+
616+ jobList , err := NewJobService ().GetJobList (models .DELETED_FLAG , - 1 )
617+ if err != nil {
618+ logs .GetLogger ().Errorf ("failed to get job data, error: %+v" , err )
619+ return
620+ }
621+ cpRepoPath , _ := os .LookupEnv ("CP_PATH" )
622+
623+ for _ , job := range jobList {
624+ if job .CreateTime + int64 (24 * 7 * 3600 ) < time .Now ().Unix () {
625+ continue
626+ }
627+ if job .ExpireTime + int64 (conf .GetConfig ().API .ClearLogDuration )* 3600 < time .Now ().Unix () {
628+ err := os .RemoveAll (filepath .Join (cpRepoPath , constants .LOG_PATH_PREFIX , job .JobUuid ))
629+ if err != nil {
630+ logs .GetLogger ().Errorf ("failed to delete logs, job_uuid: %s, error: %v" , job .JobUuid , err )
631+ continue
632+ }
633+ }
634+ }
635+ })
636+ c .Start ()
637+ }
638+
580639func reportJobStatus (jobUuid string , deployStatus int ) bool {
581640 var job = new (models.JobEntity )
582641 job .JobUuid = jobUuid
@@ -659,7 +718,8 @@ func checkFcpJobInfoInChain(job *models.JobEntity) {
659718type TaskGroup struct {
660719 Items []* models.TaskEntity
661720 Ids []int64
662- Type int // 1: contract 2: sequncer
721+ Uuids []string
722+ Type int // 1: contract 2: sequncer 3: mining
663723}
664724
665725func handleTasksToGroup (list []* models.TaskEntity ) []TaskGroup {
@@ -695,3 +755,23 @@ func handleTasksToGroup(list []*models.TaskEntity) []TaskGroup {
695755 }
696756 return groups
697757}
758+
759+ func handleTasksToGroupForMining (list []* models.TaskEntity ) []TaskGroup {
760+ var groups []TaskGroup
761+ var group TaskGroup
762+
763+ const batchSize = 10
764+ for i := 0 ; i < len (list ); i ++ {
765+ if len (group .Items ) > batchSize {
766+ groups = append (groups , group )
767+ group = TaskGroup {}
768+ }
769+ group .Items = append (group .Items , list [i ])
770+ group .Uuids = append (group .Uuids , list [i ].Uuid )
771+ group .Type = 3
772+ }
773+ if len (group .Items ) > 0 {
774+ groups = append (groups , group )
775+ }
776+ return groups
777+ }
0 commit comments