1111import org .apache .logging .log4j .Logger ;
1212import org .elasticsearch .action .admin .cluster .stats .CCSTelemetrySnapshot ;
1313import org .elasticsearch .client .internal .Client ;
14- import org .elasticsearch .common .settings .Setting ;
1514import org .elasticsearch .common .settings .Settings ;
16- import org .elasticsearch .compute .operator .exchange .ExchangeService ;
17- import org .elasticsearch .core .TimeValue ;
1815import org .elasticsearch .plugins .Plugin ;
1916import org .elasticsearch .tasks .Task ;
2017import org .elasticsearch .test .AbstractMultiClustersTestCase ;
18+ import org .elasticsearch .test .SkipUnavailableRule ;
2119import org .elasticsearch .usage .UsageService ;
22- import org .elasticsearch .xpack .esql .plugin .EsqlPlugin ;
2320import org .junit .Assert ;
2421import org .junit .Rule ;
25- import org .junit .rules .TestRule ;
26- import org .junit .runner .Description ;
27- import org .junit .runners .model .Statement ;
28-
29- import java .lang .annotation .ElementType ;
30- import java .lang .annotation .Retention ;
31- import java .lang .annotation .RetentionPolicy ;
32- import java .lang .annotation .Target ;
22+
3323import java .util .ArrayList ;
34- import java .util .Arrays ;
3524import java .util .Collection ;
3625import java .util .HashMap ;
3726import java .util .List ;
3827import java .util .Map ;
3928import java .util .concurrent .ExecutionException ;
40- import java .util .function .Function ;
41- import java .util .stream .Collectors ;
4229
4330import static org .elasticsearch .action .admin .cluster .stats .CCSUsageTelemetry .ASYNC_FEATURE ;
4431import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertAcked ;
@@ -52,19 +39,6 @@ public class CrossClustersUsageTelemetryIT extends AbstractMultiClustersTestCase
5239 private static final String LOCAL_INDEX = "logs-1" ;
5340 private static final String REMOTE_INDEX = "logs-2" ;
5441
55- @ Override
56- protected boolean reuseClusters () {
57- return false ;
58- }
59-
60- @ Override
61- protected List <String > remoteClusterAlias () {
62- return List .of (REMOTE1 , REMOTE2 );
63- }
64-
65- @ Rule
66- public SkipUnavailableRule skipOverride = new SkipUnavailableRule (REMOTE1 , REMOTE2 );
67-
6842 public void testLocalRemote () throws Exception {
6943 setupClusters ();
7044 var telemetry = getTelemetryFromQuery ("from logs-*,c*:logs-* | stats sum (v)" , "kibana" );
@@ -93,11 +67,7 @@ public void testLocalRemote() throws Exception {
9367
9468 }
9569
96- private CCSTelemetrySnapshot getTelemetryFromQuery (String query ) throws ExecutionException , InterruptedException {
97- return getTelemetryFromQuery (query , null );
98- }
99-
100- private CCSTelemetrySnapshot getTelemetryFromQuery (String query , String client ) throws ExecutionException , InterruptedException {
70+ protected CCSTelemetrySnapshot getTelemetryFromQuery (String query , String client ) throws ExecutionException , InterruptedException {
10171 EsqlQueryRequest request = EsqlQueryRequest .syncEsqlQueryRequest ();
10272 request .query (query );
10373 request .pragmas (AbstractEsqlIntegTestCase .randomPragmas ());
@@ -106,7 +76,7 @@ private CCSTelemetrySnapshot getTelemetryFromQuery(String query, String client)
10676 return getTelemetryFromQuery (request , client );
10777 }
10878
109- private CCSTelemetrySnapshot getTelemetryFromQuery (EsqlQueryRequest request , String client ) throws ExecutionException ,
79+ protected CCSTelemetrySnapshot getTelemetryFromQuery (EsqlQueryRequest request , String client ) throws ExecutionException ,
11080 InterruptedException {
11181 // We want to send search to a specific node (we don't care which one) so that we could
11282 // collect the CCS telemetry from it later
@@ -116,7 +86,7 @@ private CCSTelemetrySnapshot getTelemetryFromQuery(EsqlQueryRequest request, Str
11686 if (client != null ) {
11787 assertResponse (
11888 cluster (LOCAL_CLUSTER ).client (nodeName )
119- .filterWithHeader (Map .of (Task .X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER , "kibana" ))
89+ .filterWithHeader (Map .of (Task .X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER , client ))
12090 .execute (EsqlQueryAction .INSTANCE , request ),
12191 Assert ::assertNotNull
12292 );
@@ -127,12 +97,44 @@ private CCSTelemetrySnapshot getTelemetryFromQuery(EsqlQueryRequest request, Str
12797 return getTelemetrySnapshot (nodeName );
12898 }
12999
100+ protected CCSTelemetrySnapshot getTelemetryFromFailedQuery (String query ) throws Exception {
101+ // We want to send search to a specific node (we don't care which one) so that we could
102+ // collect the CCS telemetry from it later
103+ String nodeName = cluster (LOCAL_CLUSTER ).getRandomNodeName ();
104+ EsqlQueryRequest request = EsqlQueryRequest .syncEsqlQueryRequest ();
105+ request .query (query );
106+ request .pragmas (AbstractEsqlIntegTestCase .randomPragmas ());
107+ request .columnar (randomBoolean ());
108+ request .includeCCSMetadata (randomBoolean ());
109+
110+ ExecutionException ee = expectThrows (
111+ ExecutionException .class ,
112+ cluster (LOCAL_CLUSTER ).client (nodeName ).execute (EsqlQueryAction .INSTANCE , request )::get
113+ );
114+ assertNotNull (ee .getCause ());
115+
116+ return getTelemetrySnapshot (nodeName );
117+ }
118+
130119 private CCSTelemetrySnapshot getTelemetrySnapshot (String nodeName ) {
131120 var usage = cluster (LOCAL_CLUSTER ).getInstance (UsageService .class , nodeName );
132121 return usage .getEsqlUsageHolder ().getCCSTelemetrySnapshot ();
133122 }
134123
135- Map <String , Object > setupClusters () {
124+ @ Override
125+ protected boolean reuseClusters () {
126+ return false ;
127+ }
128+
129+ @ Override
130+ protected List <String > remoteClusterAlias () {
131+ return List .of (REMOTE1 , REMOTE2 );
132+ }
133+
134+ @ Rule
135+ public SkipUnavailableRule skipOverride = new SkipUnavailableRule (REMOTE1 , REMOTE2 );
136+
137+ protected Map <String , Object > setupClusters () {
136138 int numShardsLocal = randomIntBetween (1 , 5 );
137139 populateLocalIndices (LOCAL_INDEX , numShardsLocal );
138140
@@ -186,65 +188,15 @@ void populateRemoteIndices(String clusterAlias, String indexName, int numShards)
186188 @ Override
187189 protected Collection <Class <? extends Plugin >> nodePlugins (String clusterAlias ) {
188190 List <Class <? extends Plugin >> plugins = new ArrayList <>(super .nodePlugins (clusterAlias ));
189- plugins .add (EsqlPlugin .class );
191+ plugins .add (EsqlPluginWithEnterpriseOrTrialLicense .class );
190192 plugins .add (CrossClustersQueryIT .InternalExchangePlugin .class );
191193 return plugins ;
192194 }
193195
194- public static class InternalExchangePlugin extends Plugin {
195- @ Override
196- public List <Setting <?>> getSettings () {
197- return List .of (
198- Setting .timeSetting (
199- ExchangeService .INACTIVE_SINKS_INTERVAL_SETTING ,
200- TimeValue .timeValueSeconds (30 ),
201- Setting .Property .NodeScope
202- )
203- );
204- }
205- }
206-
207196 @ Override
208197 protected Map <String , Boolean > skipUnavailableForRemoteClusters () {
209198 var map = skipOverride .getMap ();
210199 LOGGER .info ("Using skip_unavailable map: [{}]" , map );
211200 return map ;
212201 }
213-
214- /**
215- * Annotation to mark specific cluster in a test as not to be skipped when unavailable
216- */
217- @ Retention (RetentionPolicy .RUNTIME )
218- @ Target (ElementType .METHOD )
219- @interface SkipOverride {
220- String [] aliases ();
221- }
222-
223- /**
224- * Test rule to process skip annotations
225- */
226- static class SkipUnavailableRule implements TestRule {
227- private final Map <String , Boolean > skipMap ;
228-
229- SkipUnavailableRule (String ... clusterAliases ) {
230- this .skipMap = Arrays .stream (clusterAliases ).collect (Collectors .toMap (Function .identity (), alias -> true ));
231- }
232-
233- public Map <String , Boolean > getMap () {
234- return skipMap ;
235- }
236-
237- @ Override
238- public Statement apply (Statement base , Description description ) {
239- // Check for annotation named "SkipOverride" and set the overrides accordingly
240- var aliases = description .getAnnotation (SkipOverride .class );
241- if (aliases != null ) {
242- for (String alias : aliases .aliases ()) {
243- skipMap .put (alias , false );
244- }
245- }
246- return base ;
247- }
248-
249- }
250202}
0 commit comments