88
99import org .apache .logging .log4j .LogManager ;
1010import org .apache .logging .log4j .Logger ;
11- import org .elasticsearch .ElasticsearchParseException ;
1211import org .elasticsearch .action .ActionListener ;
1312import org .elasticsearch .action .DocWriteResponse ;
1413import org .elasticsearch .action .admin .indices .template .put .TransportPutComposableIndexTemplateAction ;
1514import org .elasticsearch .action .bulk .BulkRequest ;
1615import org .elasticsearch .action .index .IndexRequest ;
16+ import org .elasticsearch .action .support .SubscribableListener ;
1717import org .elasticsearch .client .internal .OriginSettingClient ;
18- import org .elasticsearch .cluster .metadata .ComposableIndexTemplate ;
18+ import org .elasticsearch .cluster .metadata .IndexNameExpressionResolver ;
1919import org .elasticsearch .cluster .service .ClusterService ;
20- import org .elasticsearch .core .Strings ;
2120import org .elasticsearch .core .TimeValue ;
2221import org .elasticsearch .xcontent .ToXContent ;
2322import org .elasticsearch .xcontent .XContentBuilder ;
24- import org .elasticsearch .xcontent .XContentParserConfiguration ;
25- import org .elasticsearch .xcontent .json .JsonXContent ;
2623import org .elasticsearch .xpack .core .ml .utils .MlIndexAndAlias ;
27- import org .elasticsearch .xpack .core .template .IndexTemplateConfig ;
2824
2925import java .io .IOException ;
3026import java .util .Date ;
3127import java .util .Objects ;
3228import java .util .Queue ;
3329import java .util .concurrent .ConcurrentLinkedQueue ;
3430import java .util .concurrent .atomic .AtomicBoolean ;
35- import java .util .function .Supplier ;
3631
3732import static org .elasticsearch .xcontent .XContentFactory .jsonBuilder ;
3833
@@ -43,59 +38,36 @@ public abstract class AbstractAuditor<T extends AbstractAuditMessage> {
4338
4439 private static final Logger logger = LogManager .getLogger (AbstractAuditor .class );
4540 static final int MAX_BUFFER_SIZE = 1000 ;
46- static final TimeValue MASTER_TIMEOUT = TimeValue .timeValueMinutes (1 );
41+ protected static final TimeValue MASTER_TIMEOUT = TimeValue .timeValueMinutes (1 );
4742
4843 private final OriginSettingClient client ;
4944 private final String nodeName ;
50- private final String auditIndex ;
51- private final String templateName ;
52- private final Supplier <TransportPutComposableIndexTemplateAction .Request > templateSupplier ;
45+ private final String auditIndexWriteAlias ;
5346 private final AbstractAuditMessageFactory <T > messageFactory ;
54- private final AtomicBoolean hasLatestTemplate ;
55-
56- private Queue <ToXContent > backlog ;
5747 private final ClusterService clusterService ;
58- private final AtomicBoolean putTemplateInProgress ;
59-
60- protected AbstractAuditor (
61- OriginSettingClient client ,
62- String auditIndex ,
63- IndexTemplateConfig templateConfig ,
64- String nodeName ,
65- AbstractAuditMessageFactory <T > messageFactory ,
66- ClusterService clusterService
67- ) {
48+ private final IndexNameExpressionResolver indexNameExpressionResolver ;
49+ private final AtomicBoolean indexAndAliasCreated ;
6850
69- this (client , auditIndex , templateConfig .getTemplateName (), () -> {
70- try (var parser = JsonXContent .jsonXContent .createParser (XContentParserConfiguration .EMPTY , templateConfig .loadBytes ())) {
71- return new TransportPutComposableIndexTemplateAction .Request (templateConfig .getTemplateName ()).indexTemplate (
72- ComposableIndexTemplate .parse (parser )
73- ).masterNodeTimeout (MASTER_TIMEOUT );
74- } catch (IOException e ) {
75- throw new ElasticsearchParseException ("unable to parse composable template " + templateConfig .getTemplateName (), e );
76- }
77- }, nodeName , messageFactory , clusterService );
78- }
51+ private Queue <ToXContent > backlog ;
52+ private final AtomicBoolean indexAndAliasCreationInProgress ;
7953
8054 protected AbstractAuditor (
8155 OriginSettingClient client ,
82- String auditIndex ,
83- String templateName ,
84- Supplier <TransportPutComposableIndexTemplateAction .Request > templateSupplier ,
56+ String auditIndexWriteAlias ,
8557 String nodeName ,
8658 AbstractAuditMessageFactory <T > messageFactory ,
87- ClusterService clusterService
59+ ClusterService clusterService ,
60+ IndexNameExpressionResolver indexNameExpressionResolver
8861 ) {
8962 this .client = Objects .requireNonNull (client );
90- this .auditIndex = Objects .requireNonNull (auditIndex );
91- this .templateName = Objects .requireNonNull (templateName );
92- this .templateSupplier = Objects .requireNonNull (templateSupplier );
63+ this .auditIndexWriteAlias = Objects .requireNonNull (auditIndexWriteAlias );
9364 this .messageFactory = Objects .requireNonNull (messageFactory );
94- this .clusterService = Objects .requireNonNull (clusterService );
9565 this .nodeName = Objects .requireNonNull (nodeName );
66+ this .clusterService = Objects .requireNonNull (clusterService );
67+ this .indexNameExpressionResolver = Objects .requireNonNull (indexNameExpressionResolver );
9668 this .backlog = new ConcurrentLinkedQueue <>();
97- this .hasLatestTemplate = new AtomicBoolean ();
98- this .putTemplateInProgress = new AtomicBoolean ();
69+ this .indexAndAliasCreated = new AtomicBoolean ();
70+ this .indexAndAliasCreationInProgress = new AtomicBoolean ();
9971 }
10072
10173 public void audit (Level level , String resourceId , String message ) {
@@ -114,6 +86,19 @@ public void error(String resourceId, String message) {
11486 audit (Level .ERROR , resourceId , message );
11587 }
11688
89+ /**
90+ * Calling reset will cause the auditor to check the required
91+ * index and alias exist and recreate if necessary
92+ */
93+ public void reset () {
94+ indexAndAliasCreated .set (false );
95+ if (backlog == null ) {
96+ // create a new backlog in case documents need
97+ // to be temporarily stored when the new index/alias is created
98+ backlog = new ConcurrentLinkedQueue <>();
99+ }
100+ }
101+
117102 private static void onIndexResponse (DocWriteResponse response ) {
118103 logger .trace ("Successfully wrote audit message" );
119104 }
@@ -123,35 +108,24 @@ private static void onIndexFailure(Exception exception) {
123108 }
124109
125110 protected void indexDoc (ToXContent toXContent ) {
126- if (hasLatestTemplate .get ()) {
111+ if (indexAndAliasCreated .get ()) {
127112 writeDoc (toXContent );
128113 return ;
129114 }
130115
131- if (MlIndexAndAlias .hasIndexTemplate (clusterService .state (), templateName )) {
116+ // install template & create index with alias
117+ var createListener = ActionListener .<Boolean >wrap (success -> {
118+ indexAndAliasCreationInProgress .set (false );
132119 synchronized (this ) {
133- // synchronized so nothing can be added to backlog while this value changes
134- hasLatestTemplate .set (true );
120+ // synchronized so nothing can be added to backlog while writing it
121+ indexAndAliasCreated .set (true );
122+ writeBacklog ();
135123 }
136- writeDoc (toXContent );
137- return ;
138- }
139124
140- ActionListener <Boolean > putTemplateListener = ActionListener .wrap (r -> {
141- synchronized (this ) {
142- // synchronized so nothing can be added to backlog while this value changes
143- hasLatestTemplate .set (true );
144- }
145- logger .info ("Auditor template [{}] successfully installed" , templateName );
146- putTemplateInProgress .set (false );
147- writeBacklog ();
148- }, e -> {
149- logger .warn (Strings .format ("Error putting latest template [%s]" , templateName ), e );
150- putTemplateInProgress .set (false );
151- });
125+ }, e -> { indexAndAliasCreationInProgress .set (false ); });
152126
153127 synchronized (this ) {
154- if (hasLatestTemplate .get () == false ) {
128+ if (indexAndAliasCreated .get () == false ) {
155129 // synchronized so that hasLatestTemplate does not change value
156130 // between the read and adding to the backlog
157131 assert backlog != null ;
@@ -165,29 +139,22 @@ protected void indexDoc(ToXContent toXContent) {
165139 }
166140
167141 // stop multiple invocations
168- if (putTemplateInProgress .compareAndSet (false , true )) {
169- MlIndexAndAlias .installIndexTemplateIfRequired (
170- clusterService .state (),
171- client ,
172- templateSupplier .get (),
173- putTemplateListener
174- );
142+ if (indexAndAliasCreationInProgress .compareAndSet (false , true )) {
143+ installTemplateAndCreateIndex (createListener );
175144 }
176- return ;
177145 }
178146 }
179-
180- indexDoc (toXContent );
181147 }
182148
183149 private void writeDoc (ToXContent toXContent ) {
184150 client .index (indexRequest (toXContent ), ActionListener .wrap (AbstractAuditor ::onIndexResponse , AbstractAuditor ::onIndexFailure ));
185151 }
186152
187153 private IndexRequest indexRequest (ToXContent toXContent ) {
188- IndexRequest indexRequest = new IndexRequest (auditIndex );
154+ IndexRequest indexRequest = new IndexRequest (auditIndexWriteAlias );
189155 indexRequest .source (toXContentBuilder (toXContent ));
190156 indexRequest .timeout (TimeValue .timeValueSeconds (5 ));
157+ indexRequest .setRequireAlias (true );
191158 return indexRequest ;
192159 }
193160
@@ -206,7 +173,7 @@ protected void clearBacklog() {
206173 protected void writeBacklog () {
207174 assert backlog != null ;
208175 if (backlog == null ) {
209- logger .error ("Message back log has already been written" );
176+ logger .debug ("Message back log has already been written" );
210177 return ;
211178 }
212179
@@ -221,7 +188,7 @@ protected void writeBacklog() {
221188 if (bulkItemResponses .hasFailures ()) {
222189 logger .warn ("Failures bulk indexing the message back log: {}" , bulkItemResponses .buildFailureMessage ());
223190 } else {
224- logger .trace ("Successfully wrote audit message backlog after upgrading template " );
191+ logger .trace ("Successfully wrote audit message backlog" );
225192 }
226193 backlog = null ;
227194 }, AbstractAuditor ::onIndexFailure ));
@@ -231,4 +198,31 @@ protected void writeBacklog() {
231198 int backLogSize () {
232199 return backlog .size ();
233200 }
201+
202+ private void installTemplateAndCreateIndex (ActionListener <Boolean > listener ) {
203+ SubscribableListener .<Boolean >newForked (l -> {
204+ MlIndexAndAlias .installIndexTemplateIfRequired (clusterService .state (), client , templateVersion (), putTemplateRequest (), l );
205+ }).<Boolean >andThen ((l , success ) -> {
206+ var indexDetails = indexDetails ();
207+ MlIndexAndAlias .createIndexAndAliasIfNecessary (
208+ client ,
209+ clusterService .state (),
210+ indexNameExpressionResolver ,
211+ indexDetails .indexPrefix (),
212+ indexDetails .indexVersion (),
213+ auditIndexWriteAlias ,
214+ MASTER_TIMEOUT ,
215+ l
216+ );
217+
218+ }).addListener (listener );
219+ }
220+
221+ protected abstract TransportPutComposableIndexTemplateAction .Request putTemplateRequest ();
222+
223+ protected abstract int templateVersion ();
224+
225+ protected abstract IndexDetails indexDetails ();
226+
227+ public record IndexDetails (String indexPrefix , String indexVersion ) {};
234228}
0 commit comments