88import static java .util .concurrent .TimeUnit .SECONDS ;
99import static org .opensearch .ml .common .CommonValue .MASTER_KEY ;
1010import static org .opensearch .ml .common .CommonValue .ML_CONFIG_INDEX ;
11+ import static org .opensearch .ml .common .MLConfig .CREATE_TIME_FIELD ;
1112
1213import java .nio .charset .StandardCharsets ;
1314import java .security .SecureRandom ;
15+ import java .time .Instant ;
1416import java .util .Base64 ;
1517import java .util .concurrent .CountDownLatch ;
1618import java .util .concurrent .atomic .AtomicReference ;
1719
1820import javax .crypto .spec .SecretKeySpec ;
1921
22+ import org .apache .commons .lang3 .exception .ExceptionUtils ;
2023import org .opensearch .ResourceNotFoundException ;
21- import org .opensearch .action .LatchedActionListener ;
24+ import org .opensearch .action .DocWriteRequest ;
2225import org .opensearch .action .get .GetRequest ;
23- import org .opensearch .action .get .GetResponse ;
26+ import org .opensearch .action .index .IndexRequest ;
27+ import org .opensearch .action .support .WriteRequest ;
2428import org .opensearch .client .Client ;
2529import org .opensearch .cluster .service .ClusterService ;
2630import org .opensearch .common .util .concurrent .ThreadContext ;
2731import org .opensearch .core .action .ActionListener ;
32+ import org .opensearch .index .engine .VersionConflictEngineException ;
2833import org .opensearch .ml .common .exception .MLException ;
34+ import org .opensearch .ml .engine .indices .MLIndicesHandler ;
2935
3036import com .amazonaws .encryptionsdk .AwsCrypto ;
3137import com .amazonaws .encryptionsdk .CommitmentPolicy ;
3238import com .amazonaws .encryptionsdk .CryptoResult ;
3339import com .amazonaws .encryptionsdk .jce .JceMasterKey ;
40+ import com .google .common .collect .ImmutableMap ;
3441
3542import lombok .extern .log4j .Log4j2 ;
3643
@@ -42,11 +49,13 @@ public class EncryptorImpl implements Encryptor {
4249 private ClusterService clusterService ;
4350 private Client client ;
4451 private volatile String masterKey ;
52+ private MLIndicesHandler mlIndicesHandler ;
4553
46- public EncryptorImpl (ClusterService clusterService , Client client ) {
54+ public EncryptorImpl (ClusterService clusterService , Client client , MLIndicesHandler mlIndicesHandler ) {
4755 this .masterKey = null ;
4856 this .clusterService = clusterService ;
4957 this .client = client ;
58+ this .mlIndicesHandler = mlIndicesHandler ;
5059 }
5160
5261 public EncryptorImpl (String masterKey ) {
@@ -104,28 +113,68 @@ private void initMasterKey() {
104113 AtomicReference <Exception > exceptionRef = new AtomicReference <>();
105114
106115 CountDownLatch latch = new CountDownLatch (1 );
107- if (clusterService .state ().metadata ().hasIndex (ML_CONFIG_INDEX )) {
116+ mlIndicesHandler .initMLConfigIndex (ActionListener .wrap (r -> {
117+ GetRequest getRequest = new GetRequest (ML_CONFIG_INDEX ).id (MASTER_KEY );
108118 try (ThreadContext .StoredContext context = client .threadPool ().getThreadContext ().stashContext ()) {
109- GetRequest getRequest = new GetRequest (ML_CONFIG_INDEX ).id (MASTER_KEY );
110- client .get (getRequest , ActionListener .runBefore (new LatchedActionListener (ActionListener .<GetResponse >wrap (r -> {
111- if (r .isExists ()) {
112- String masterKey = (String ) r .getSourceAsMap ().get (MASTER_KEY );
113- this .masterKey = masterKey ;
119+ client .get (getRequest , ActionListener .wrap (getResponse -> {
120+ if (getResponse == null || !getResponse .isExists ()) {
121+ IndexRequest indexRequest = new IndexRequest (ML_CONFIG_INDEX ).id (MASTER_KEY );
122+ final String generatedMasterKey = generateMasterKey ();
123+ indexRequest
124+ .source (ImmutableMap .of (MASTER_KEY , generatedMasterKey , CREATE_TIME_FIELD , Instant .now ().toEpochMilli ()));
125+ indexRequest .setRefreshPolicy (WriteRequest .RefreshPolicy .IMMEDIATE );
126+ indexRequest .opType (DocWriteRequest .OpType .CREATE );
127+ client .index (indexRequest , ActionListener .wrap (indexResponse -> {
128+ this .masterKey = generatedMasterKey ;
129+ log .info ("ML encryption master key initialized successfully" );
130+ latch .countDown ();
131+ }, e -> {
132+
133+ if (ExceptionUtils .getRootCause (e ) instanceof VersionConflictEngineException ) {
134+ GetRequest getMasterKeyRequest = new GetRequest (ML_CONFIG_INDEX ).id (MASTER_KEY );
135+ try (ThreadContext .StoredContext threadContext = client .threadPool ().getThreadContext ().stashContext ()) {
136+ client .get (getMasterKeyRequest , ActionListener .wrap (getMasterKeyResponse -> {
137+ if (getMasterKeyResponse != null && getMasterKeyResponse .isExists ()) {
138+ final String masterKey = (String ) getMasterKeyResponse .getSourceAsMap ().get (MASTER_KEY );
139+ this .masterKey = masterKey ;
140+ log .info ("ML encryption master key already initialized, no action needed" );
141+ latch .countDown ();
142+ } else {
143+ exceptionRef .set (new ResourceNotFoundException (MASTER_KEY_NOT_READY_ERROR ));
144+ latch .countDown ();
145+ }
146+ }, error -> {
147+ log .debug ("Failed to get ML encryption master key" , e );
148+ exceptionRef .set (error );
149+ latch .countDown ();
150+ }));
151+ }
152+ } else {
153+ log .debug ("Failed to index ML encryption master key" , e );
154+ exceptionRef .set (e );
155+ latch .countDown ();
156+ }
157+ }));
114158 } else {
115- exceptionRef .set (new ResourceNotFoundException (MASTER_KEY_NOT_READY_ERROR ));
159+ final String masterKey = (String ) getResponse .getSourceAsMap ().get (MASTER_KEY );
160+ this .masterKey = masterKey ;
161+ log .info ("ML encryption master key already initialized, no action needed" );
162+ latch .countDown ();
116163 }
117164 }, e -> {
118- log .error ("Failed to get ML encryption master key" , e );
165+ log .debug ("Failed to get ML encryption master key from config index " , e );
119166 exceptionRef .set (e );
120- }), latch ), () -> context .restore ()));
167+ latch .countDown ();
168+ }));
121169 }
122- } else {
123- exceptionRef .set (new ResourceNotFoundException (MASTER_KEY_NOT_READY_ERROR ));
170+ }, e -> {
171+ log .debug ("Failed to init ML config index" , e );
172+ exceptionRef .set (e );
124173 latch .countDown ();
125- }
174+ }));
126175
127176 try {
128- latch .await (5 , SECONDS );
177+ latch .await (1 , SECONDS );
129178 } catch (InterruptedException e ) {
130179 throw new IllegalStateException (e );
131180 }
@@ -142,4 +191,5 @@ private void initMasterKey() {
142191 throw new ResourceNotFoundException (MASTER_KEY_NOT_READY_ERROR );
143192 }
144193 }
194+
145195}
0 commit comments