@@ -88,31 +88,37 @@ private Class<? extends OpenSearchClient> getOpenSearchClientClass(Version versi
88
88
private Boolean getCompressionEnabled () {
89
89
log .atInfo ().setMessage ("Checking compression on cluster" ).log ();
90
90
return client .getAsync ("_cluster/settings?include_defaults=true" , null )
91
- .flatMap (this ::checkCompressionFromResponse )
92
- .doOnError (e -> log .error (e .getMessage ()))
93
- .retryWhen (OpenSearchClient .CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY )
94
- .flatMap (hasCompressionEnabled -> {
95
- log .atInfo ().setMessage ("Checking Compression, was enabled? {}" ).addArgument (hasCompressionEnabled ).log ();
96
- return Mono .just (hasCompressionEnabled );
97
- })
98
- .block ();
91
+ .flatMap (this ::checkCompressionFromResponse )
92
+ .doOnError (e -> log .atWarn ()
93
+ .setMessage ("Check cluster compression failed" )
94
+ .setCause (e )
95
+ .log ())
96
+ .retryWhen (OpenSearchClient .CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY )
97
+ .onErrorReturn (false )
98
+ .doOnNext (hasCompressionEnabled -> log .atInfo ()
99
+ .setMessage ("After querying target, compression={}" )
100
+ .addArgument (hasCompressionEnabled ).log ())
101
+ .block ();
99
102
}
100
103
101
104
public Version getClusterVersion () {
102
105
var versionFromRootApi = client .getAsync ("" , null )
103
- .flatMap (resp -> {
104
- if (resp .statusCode == 200 ) {
105
- return versionFromResponse (resp );
106
- }
107
- // If the root API doesn't exist, the cluster is OpenSearch Serverless
108
- if (resp .statusCode == 404 ) {
109
- return Mono .just (AMAZON_SERVERLESS_VERSION );
110
- }
111
- return Mono .error (new OpenSearchClient .UnexpectedStatusCode (resp ));
112
- })
113
- .doOnError (e -> log .error (e .getMessage ()))
114
- .retryWhen (OpenSearchClient .CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY )
115
- .block ();
106
+ .flatMap (resp -> {
107
+ if (resp .statusCode == 200 ) {
108
+ return versionFromResponse (resp );
109
+ }
110
+ // If the root API doesn't exist, the cluster is OpenSearch Serverless
111
+ if (resp .statusCode == 404 ) {
112
+ return Mono .just (AMAZON_SERVERLESS_VERSION );
113
+ }
114
+ return Mono .error (new OpenSearchClient .UnexpectedStatusCode (resp ));
115
+ })
116
+ .doOnError (e -> log .atWarn ()
117
+ .setMessage ("Check cluster version failed" )
118
+ .setCause (e )
119
+ .log ())
120
+ .retryWhen (OpenSearchClient .CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY )
121
+ .block ();
116
122
117
123
// Compatibility mode is only enabled on OpenSearch clusters responding with the version of 7.10.2
118
124
if (!VersionMatchers .isES_7_10 .test (versionFromRootApi )) {
@@ -123,8 +129,9 @@ public Version getClusterVersion() {
123
129
.doOnError (e -> log .error (e .getMessage ()))
124
130
.retryWhen (OpenSearchClient .CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY )
125
131
.flatMap (hasCompatibilityModeEnabled -> {
126
- log .atInfo ().setMessage ("Checking CompatibilityMode, was enabled? {}" ).addArgument (hasCompatibilityModeEnabled ).log ();
132
+ log .atInfo ().setMessage ("After querying target, compatibilityMode= {}" ).addArgument (hasCompatibilityModeEnabled ).log ();
127
133
if (Boolean .FALSE .equals (hasCompatibilityModeEnabled )) {
134
+ assert versionFromRootApi != null : "Expected version from root api to be set" ;
128
135
return Mono .just (versionFromRootApi );
129
136
}
130
137
return client .getAsync ("_nodes/_all/nodes,version?format=json" , null )
@@ -137,6 +144,7 @@ public Version getClusterVersion() {
137
144
.setCause (e )
138
145
.setMessage ("Unable to determine CompatibilityMode or version from plugin, falling back to version {}" )
139
146
.addArgument (versionFromRootApi ).log ();
147
+ assert versionFromRootApi != null : "Expected version from root api to be set" ;
140
148
return Mono .just (versionFromRootApi );
141
149
})
142
150
.block ();
0 commit comments