1111
1212import org .apache .logging .log4j .LogManager ;
1313import org .apache .logging .log4j .Logger ;
14+ import org .apache .lucene .document .NumericDocValuesField ;
1415import org .elasticsearch .action .ActionListener ;
1516import org .elasticsearch .action .ActionRequest ;
1617import org .elasticsearch .action .ActionRequestValidationException ;
1718import org .elasticsearch .action .ActionResponse ;
1819import org .elasticsearch .action .ActionType ;
1920import org .elasticsearch .action .support .ActionFilters ;
2021import org .elasticsearch .action .support .HandledTransportAction ;
22+ import org .elasticsearch .cluster .ClusterState ;
23+ import org .elasticsearch .cluster .metadata .ProjectMetadata ;
2124import org .elasticsearch .cluster .project .ProjectResolver ;
25+ import org .elasticsearch .cluster .service .ClusterService ;
26+ import org .elasticsearch .common .UUIDs ;
2227import org .elasticsearch .common .bytes .BytesReference ;
2328import org .elasticsearch .common .io .stream .StreamInput ;
2429import org .elasticsearch .common .io .stream .StreamOutput ;
30+ import org .elasticsearch .common .lucene .uid .Versions ;
2531import org .elasticsearch .common .util .concurrent .EsExecutors ;
32+ import org .elasticsearch .index .VersionType ;
33+ import org .elasticsearch .index .engine .Engine ;
34+ import org .elasticsearch .index .mapper .LuceneDocument ;
35+ import org .elasticsearch .index .mapper .ParsedDocument ;
36+ import org .elasticsearch .index .mapper .SeqNoFieldMapper ;
37+ import org .elasticsearch .index .mapper .Uid ;
38+ import org .elasticsearch .index .seqno .SequenceNumbers ;
2639import org .elasticsearch .indices .IndicesService ;
2740import org .elasticsearch .injection .guice .Inject ;
2841import org .elasticsearch .tasks .Task ;
2942import org .elasticsearch .transport .TransportService ;
43+ import org .elasticsearch .xcontent .XContentType ;
3044
3145import java .io .IOException ;
46+ import java .util .List ;
3247
3348public class MetricsTransportAction extends HandledTransportAction <
3449 MetricsTransportAction .MetricsRequest ,
@@ -39,17 +54,20 @@ public class MetricsTransportAction extends HandledTransportAction<
3954
4055 private static final Logger logger = LogManager .getLogger (MetricsTransportAction .class );
4156
57+ private final ClusterService clusterService ;
4258 private final ProjectResolver projectResolver ;
4359 private final IndicesService indicesService ;
4460
4561 @ Inject
4662 public MetricsTransportAction (
4763 TransportService transportService ,
4864 ActionFilters actionFilters ,
65+ ClusterService clusterService ,
4966 ProjectResolver projectResolver ,
5067 IndicesService indicesService
5168 ) {
5269 super (NAME , transportService , actionFilters , MetricsRequest ::new , EsExecutors .DIRECT_EXECUTOR_SERVICE );
70+ this .clusterService = clusterService ;
5371 this .projectResolver = projectResolver ;
5472 this .indicesService = indicesService ;
5573 }
@@ -61,15 +79,70 @@ protected void doExecute(Task task, MetricsRequest request, ActionListener<Metri
6179
6280 logger .info ("Received " + metricsServiceRequest .getResourceMetricsCount () + " metrics" );
6381
64- // resolve index somehow
65- logger .info ("Indices service " + indicesService );
82+ final ClusterState clusterState = clusterService .state ();
83+ final ProjectMetadata project = projectResolver .getProjectMetadata (clusterState );
84+ var indexAbstraction = project .getIndicesLookup ().get (request .index );
85+ if (indexAbstraction == null ) {
86+ throw new IllegalStateException ("Index [" + request .index + "] does not exist" );
87+ }
88+ var writeIndex = indexAbstraction .getWriteIndex ();
89+ var indexService = indicesService .indexServiceSafe (writeIndex );
90+
91+ // TODO proper routing ???
92+ var shard = indexService .getShard (0 );
93+ var engine = shard .getEngineOrNull ();
94+
95+ // We receive a batch so there will be multiple documents as a result of processing it.
96+ var documents = createLuceneDocuments (metricsServiceRequest );
97+
98+ // TODO thread pool for writing
99+ for (var luceneDocument : documents ) {
100+ var id = UUIDs .randomBase64UUID ();
101+
102+ var parsedDocument = new ParsedDocument (
103+ // Even though this version field is here, it is not added to the LuceneDocument and won't be stored.
104+ // This is just the contract that the code expects.
105+ new NumericDocValuesField (NAME , -1L ),
106+ SeqNoFieldMapper .SequenceIDFields .emptySeqID (),
107+ id ,
108+ null ,
109+ List .of (luceneDocument ),
110+ null ,
111+ XContentType .JSON ,
112+ null ,
113+ 0
114+ );
115+ var indexRequest = new Engine .Index (
116+ Uid .encodeId (id ),
117+ parsedDocument ,
118+ SequenceNumbers .UNASSIGNED_SEQ_NO ,
119+ 1 ,
120+ Versions .MATCH_ANY ,
121+ VersionType .INTERNAL ,
122+ Engine .Operation .Origin .PRIMARY ,
123+ System .nanoTime (),
124+ System .currentTimeMillis (),
125+ false ,
126+ SequenceNumbers .UNASSIGNED_SEQ_NO ,
127+ 1
128+ );
129+
130+ engine .index (indexRequest );
131+ }
132+
133+ // TODO make sure to enable periodic flush on the index (?)
66134
67135 listener .onResponse (new MetricsResponse ());
68136 } catch (Exception e ) {
137+ logger .error (e );
69138 listener .onFailure (e );
70139 }
71140 }
72141
142+ private List <LuceneDocument > createLuceneDocuments (ExportMetricsServiceRequest exportMetricsServiceRequest ) {
143+ return List .of (new LuceneDocument ());
144+ }
145+
73146 public static class MetricsRequest extends ActionRequest {
74147 private String index ;
75148 private BytesReference exportMetricsServiceRequest ;
0 commit comments