@@ -643,42 +643,165 @@ func (bs *BackupScheduler) getCouchDBDatabaseInfo(dbName string) (map[string]int
643643// exportCouchDBDocs 导出数据库所有文档
644644func (bs * BackupScheduler ) exportCouchDBDocs (dbName string ) ([]interface {}, error ) {
645645 url := fmt .Sprintf ("%s/%s/_all_docs?include_docs=true" , bs .couchdbClient .GetURL (), dbName )
646-
646+
647647 req , err := http .NewRequest (http .MethodGet , url , nil )
648648 if err != nil {
649649 return nil , err
650650 }
651-
651+
652652 bs .setBasicAuth (req )
653-
653+
654654 resp , err := http .DefaultClient .Do (req )
655655 if err != nil {
656656 return nil , err
657657 }
658658 defer resp .Body .Close ()
659-
659+
660660 if resp .StatusCode != http .StatusOK {
661661 body , _ := io .ReadAll (resp .Body )
662662 return nil , fmt .Errorf ("unexpected status code: %d, body: %s" , resp .StatusCode , string (body ))
663663 }
664-
664+
665665 var result struct {
666666 Rows []struct {
667667 Doc interface {} `json:"doc"`
668668 } `json:"rows"`
669669 }
670-
670+
671671 if err := json .NewDecoder (resp .Body ).Decode (& result ); err != nil {
672672 return nil , err
673673 }
674-
674+
675675 docs := make ([]interface {}, 0 , len (result .Rows ))
676676 for _ , row := range result .Rows {
677677 if row .Doc != nil {
678678 docs = append (docs , row .Doc )
679679 }
680680 }
681+
682+ return docs , nil
683+ }
681684
685+ // streamExportCouchDBDocs 流式导出数据库文档(分批处理,避免 OOM)
686+ func (bs * BackupScheduler ) streamExportCouchDBDocs (dbName string , dbInfo map [string ]interface {}, outputPath string ) (int64 , error ) {
687+ // 创建输出文件
688+ file , err := os .Create (outputPath )
689+ if err != nil {
690+ return 0 , fmt .Errorf ("failed to create output file: %w" , err )
691+ }
692+ defer file .Close ()
693+
694+ // 写入文件头部(元数据)
695+ docCount := 0
696+ if dc , ok := dbInfo ["doc_count" ].(float64 ); ok {
697+ docCount = int (dc )
698+ }
699+
700+ header := fmt .Sprintf (`{"db_name":"%s","update_seq":"%v","doc_count":%d,"timestamp":"%s","docs":[` ,
701+ dbName ,
702+ dbInfo ["update_seq" ],
703+ docCount ,
704+ time .Now ().Format (time .RFC3339 ),
705+ )
706+
707+ if _ , err := file .WriteString (header ); err != nil {
708+ return 0 , err
709+ }
710+
711+ // 分批导出文档,避免 OOM
712+ // 每批处理 1000 个文档
713+ batchSize := 1000
714+ totalExported := 0
715+
716+ for skip := 0 ; skip < docCount ; skip += batchSize {
717+ // 获取一批文档
718+ limit := batchSize
719+ if skip + limit > docCount {
720+ limit = docCount - skip
721+ }
722+
723+ docs , err := bs .exportCouchDBDocsBatch (dbName , skip , limit )
724+ if err != nil {
725+ return 0 , fmt .Errorf ("failed to export batch at skip=%d: %w" , skip , err )
726+ }
727+
728+ // 写入文档
729+ for i , doc := range docs {
730+ if totalExported > 0 || i > 0 {
731+ file .WriteString ("," )
732+ }
733+
734+ // 将文档序列化为 JSON 并写入
735+ docJSON , err := json .Marshal (doc )
736+ if err != nil {
737+ bs .log .Warnf ("Failed to marshal doc in %s: %v" , dbName , err )
738+ continue
739+ }
740+
741+ file .Write (docJSON )
742+ totalExported ++
743+ }
744+
745+ // 输出进度(每 10 批)
746+ if (skip / batchSize )% 10 == 0 && skip > 0 {
747+ progress := float64 (totalExported ) / float64 (docCount ) * 100
748+ bs .log .Printf (" Progress: %d/%d docs (%.1f%%)" , totalExported , docCount , progress )
749+ }
750+ }
751+
752+ // 写入文件尾部
753+ file .WriteString ("]}" )
754+
755+ // 获取文件大小
756+ stat , _ := os .Stat (outputPath )
757+
758+ if totalExported != docCount {
759+ bs .log .Warnf ("Expected %d docs, exported %d docs for %s" , docCount , totalExported , dbName )
760+ }
761+
762+ return stat .Size (), nil
763+ }
764+
765+ // exportCouchDBDocsBatch 分批导出文档
766+ func (bs * BackupScheduler ) exportCouchDBDocsBatch (dbName string , skip , limit int ) ([]interface {}, error ) {
767+ url := fmt .Sprintf ("%s/%s/_all_docs?include_docs=true&skip=%d&limit=%d" ,
768+ bs .couchdbClient .GetURL (), dbName , skip , limit )
769+
770+ req , err := http .NewRequest (http .MethodGet , url , nil )
771+ if err != nil {
772+ return nil , err
773+ }
774+
775+ bs .setBasicAuth (req )
776+
777+ resp , err := http .DefaultClient .Do (req )
778+ if err != nil {
779+ return nil , err
780+ }
781+ defer resp .Body .Close ()
782+
783+ if resp .StatusCode != http .StatusOK {
784+ body , _ := io .ReadAll (resp .Body )
785+ return nil , fmt .Errorf ("unexpected status code: %d, body: %s" , resp .StatusCode , string (body ))
786+ }
787+
788+ var result struct {
789+ Rows []struct {
790+ Doc interface {} `json:"doc"`
791+ } `json:"rows"`
792+ }
793+
794+ if err := json .NewDecoder (resp .Body ).Decode (& result ); err != nil {
795+ return nil , err
796+ }
797+
798+ docs := make ([]interface {}, 0 , len (result .Rows ))
799+ for _ , row := range result .Rows {
800+ if row .Doc != nil {
801+ docs = append (docs , row .Doc )
802+ }
803+ }
804+
682805 return docs , nil
683806}
684807
0 commit comments