Skip to content

Commit 8976f81

Browse files
define batch size
1 parent 317975f commit 8976f81

File tree

1 file changed

+80
-28
lines changed

1 file changed

+80
-28
lines changed

health.go

Lines changed: 80 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -759,7 +759,6 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) {
759759
platformHealth.OpensearchOps = <-opensearchHealthChannel
760760
}
761761

762-
763762
datastoreHealthChannel := make(chan DatastoreHealth)
764763
go func() {
765764
datastoreHealth, err := RunOpsDatastore(apiKey, orgId)
@@ -795,22 +794,22 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) {
795794
errorChannel <- err
796795
}()
797796

798-
// if project.Environment == "cloud" {
799-
// // App upload via zip is not supported in self-hosted machine yet
800-
// pythonAppHealthChannel := make(chan AppHealth)
801-
// go func() {
802-
// pythonAppHealth, err := RunOpsAppUpload(apiKey, orgId)
803-
// if err != nil {
804-
// log.Printf("[ERROR] Failed running python app health check: %s", err)
805-
// }
806-
//
807-
// pythonAppHealthChannel <- pythonAppHealth
808-
// errorChannel <- err
809-
// }()
810-
//
811-
// // Use channel for getting RunOpsWorkflow function results
812-
// platformHealth.PythonApps = <-pythonAppHealthChannel
813-
// }
797+
// if project.Environment == "cloud" {
798+
// // App upload via zip is not supported in self-hosted machine yet
799+
// pythonAppHealthChannel := make(chan AppHealth)
800+
// go func() {
801+
// pythonAppHealth, err := RunOpsAppUpload(apiKey, orgId)
802+
// if err != nil {
803+
// log.Printf("[ERROR] Failed running python app health check: %s", err)
804+
// }
805+
//
806+
// pythonAppHealthChannel <- pythonAppHealth
807+
// errorChannel <- err
808+
// }()
809+
//
810+
// // Use channel for getting RunOpsWorkflow function results
811+
// platformHealth.PythonApps = <-pythonAppHealthChannel
812+
// }
814813

815814
platformHealth.Datastore = <-datastoreHealthChannel
816815
platformHealth.FileOps = <-fileHealthChannel
@@ -3615,7 +3614,34 @@ func FixOpensearchIndexPrefix(ctx context.Context) (OpensearchPrefixFixResult, e
36153614
"number_of_replicas": 1,
36163615
"refresh_interval": "30s",
36173616
},
3618-
Mappings: map[string]interface{}{
3617+
}
3618+
3619+
sourceMappings := map[string]interface{}{}
3620+
mappingReq, err := http.NewRequest("GET", fmt.Sprintf("%s/%s/_mapping", opensearchUrl, indexName), nil)
3621+
if err == nil {
3622+
mappingResp, err := foundClient.Client.Transport.Perform(mappingReq)
3623+
if err == nil {
3624+
mappingBody, err := io.ReadAll(mappingResp.Body)
3625+
if err == nil {
3626+
mappingResp.Body.Close()
3627+
mappingInfo := map[string]struct {
3628+
Mappings map[string]interface{} `json:"mappings"`
3629+
}{}
3630+
if err := json.Unmarshal(mappingBody, &mappingInfo); err == nil {
3631+
if info, ok := mappingInfo[indexName]; ok && len(info.Mappings) > 0 {
3632+
sourceMappings = info.Mappings
3633+
}
3634+
}
3635+
} else {
3636+
mappingResp.Body.Close()
3637+
}
3638+
}
3639+
}
3640+
3641+
if len(sourceMappings) > 0 {
3642+
indexConfig.Mappings = sourceMappings
3643+
} else {
3644+
indexConfig.Mappings = map[string]interface{}{
36193645
"dynamic_templates": []map[string]interface{}{
36203646
{
36213647
"strings_as_keywords": map[string]interface{}{
@@ -3626,7 +3652,7 @@ func FixOpensearchIndexPrefix(ctx context.Context) (OpensearchPrefixFixResult, e
36263652
},
36273653
},
36283654
},
3629-
},
3655+
}
36303656
}
36313657
}
36323658

@@ -3660,14 +3686,6 @@ func FixOpensearchIndexPrefix(ctx context.Context) (OpensearchPrefixFixResult, e
36603686
return result, fmt.Errorf("failed checking index %s: %s", newIndex, string(existsBody))
36613687
}
36623688

3663-
reindexBody, err := json.Marshal(OpensearchReindexRequest{
3664-
Source: OpensearchReindexSourceDest{Index: indexName},
3665-
Dest: OpensearchReindexSourceDest{Index: newIndex},
3666-
})
3667-
if err != nil {
3668-
return result, err
3669-
}
3670-
36713689
sourceCountReq, err := http.NewRequest("GET", fmt.Sprintf("%s/%s/_count", opensearchUrl, indexName), nil)
36723690
if err != nil {
36733691
return result, err
@@ -3696,7 +3714,41 @@ func FixOpensearchIndexPrefix(ctx context.Context) (OpensearchPrefixFixResult, e
36963714
return result, err
36973715
}
36983716

3699-
reindexReq, err := http.NewRequest("POST", fmt.Sprintf("%s/_reindex?wait_for_completion=true", opensearchUrl), bytes.NewBuffer(reindexBody))
3717+
reindexPayload := map[string]interface{}{
3718+
"source": map[string]interface{}{
3719+
"index": indexName,
3720+
},
3721+
"dest": map[string]interface{}{
3722+
"index": newIndex,
3723+
},
3724+
}
3725+
3726+
if batchSizeStr := os.Getenv("OPENSEARCH_REINDEX_BATCH_SIZE"); batchSizeStr != "" {
3727+
if batchSize, err := strconv.Atoi(batchSizeStr); err == nil && batchSize > 0 {
3728+
reindexPayload["source"].(map[string]interface{})["size"] = batchSize
3729+
}
3730+
}
3731+
3732+
if slicesStr := os.Getenv("OPENSEARCH_REINDEX_SLICES"); slicesStr != "" {
3733+
if slices, err := strconv.Atoi(slicesStr); err == nil && slices > 0 {
3734+
reindexPayload["slices"] = slices
3735+
}
3736+
}
3737+
3738+
reindexBody, err := json.Marshal(reindexPayload)
3739+
if err != nil {
3740+
return result, err
3741+
}
3742+
3743+
reindexUrl := fmt.Sprintf("%s/_reindex?wait_for_completion=true", opensearchUrl)
3744+
if scroll := strings.TrimSpace(os.Getenv("OPENSEARCH_REINDEX_SCROLL")); scroll != "" {
3745+
reindexUrl = fmt.Sprintf("%s&scroll=%s", reindexUrl, scroll)
3746+
}
3747+
if rps := strings.TrimSpace(os.Getenv("OPENSEARCH_REINDEX_RPS")); rps != "" {
3748+
reindexUrl = fmt.Sprintf("%s&requests_per_second=%s", reindexUrl, rps)
3749+
}
3750+
3751+
reindexReq, err := http.NewRequest("POST", reindexUrl, bytes.NewBuffer(reindexBody))
37003752
if err != nil {
37013753
return result, err
37023754
}

0 commit comments

Comments
 (0)