1515import com .google .cloud .storage .StorageException ;
1616
1717import org .elasticsearch .action .support .master .AcknowledgedResponse ;
18+ import org .elasticsearch .common .bytes .BytesArray ;
1819import org .elasticsearch .common .settings .MockSecureSettings ;
1920import org .elasticsearch .common .settings .SecureSettings ;
2021import org .elasticsearch .common .settings .Settings ;
22+ import org .elasticsearch .common .unit .ByteSizeValue ;
2123import org .elasticsearch .core .Booleans ;
2224import org .elasticsearch .plugins .Plugin ;
2325import org .elasticsearch .repositories .AbstractThirdPartyRepositoryTestCase ;
26+ import org .elasticsearch .repositories .blobstore .BlobStoreRepository ;
2427import org .elasticsearch .rest .RestStatus ;
28+ import org .junit .AfterClass ;
29+ import org .junit .BeforeClass ;
2530import org .junit .ClassRule ;
2631
32+ import java .io .InputStream ;
33+ import java .nio .file .NoSuchFileException ;
2734import java .util .Base64 ;
2835import java .util .Collection ;
2936
37+ import static org .elasticsearch .common .io .Streams .readFully ;
38+ import static org .elasticsearch .repositories .blobstore .BlobStoreTestUtil .randomPurpose ;
39+ import static org .elasticsearch .repositories .gcs .GoogleCloudStorageClientSettings .PROXY_HOST_SETTING ;
40+ import static org .elasticsearch .repositories .gcs .GoogleCloudStorageClientSettings .PROXY_PORT_SETTING ;
41+ import static org .elasticsearch .repositories .gcs .GoogleCloudStorageClientSettings .PROXY_TYPE_SETTING ;
3042import static org .hamcrest .Matchers .blankOrNullString ;
3143import static org .hamcrest .Matchers .equalTo ;
3244import static org .hamcrest .Matchers .not ;
45+ import static org .hamcrest .Matchers .startsWith ;
3346
3447public class GoogleCloudStorageThirdPartyTests extends AbstractThirdPartyRepositoryTestCase {
3548 private static final boolean USE_FIXTURE = Booleans .parseBoolean (System .getProperty ("test.google.fixture" , "true" ));
49+ private static final String PROXIED_TEST_REPO = "proxied-test-repo" ;
50+ private static final String PROXIED_CLIENT = "proxied" ;
3651
3752 @ ClassRule
3853 public static GoogleCloudStorageHttpFixture fixture = new GoogleCloudStorageHttpFixture (USE_FIXTURE , "bucket" , "o/oauth2/token" );
54+ private static WebProxyServer proxyServer ;
55+
56+ @ BeforeClass
57+ public static void beforeClass () {
58+ proxyServer = new WebProxyServer ();
59+ }
60+
61+ @ AfterClass
62+ public static void afterClass () throws Exception {
63+ proxyServer .close ();
64+ }
3965
4066 @ Override
4167 protected Collection <Class <? extends Plugin >> getPlugins () {
@@ -49,8 +75,15 @@ protected Settings nodeSettings() {
4975 if (USE_FIXTURE ) {
5076 builder .put ("gcs.client.default.endpoint" , fixture .getAddress ());
5177 builder .put ("gcs.client.default.token_uri" , fixture .getAddress () + "/o/oauth2/token" );
78+ builder .put ("gcs.client.proxied.endpoint" , fixture .getAddress ());
79+ builder .put ("gcs.client.proxied.token_uri" , fixture .getAddress () + "/o/oauth2/token" );
5280 }
5381
82+ // Add a proxied client so we can test resume on fail
83+ builder .put (PROXY_HOST_SETTING .getConcreteSettingForNamespace (PROXIED_CLIENT ).getKey (), proxyServer .getHost ());
84+ builder .put (PROXY_PORT_SETTING .getConcreteSettingForNamespace (PROXIED_CLIENT ).getKey (), proxyServer .getPort ());
85+ builder .put (PROXY_TYPE_SETTING .getConcreteSettingForNamespace (PROXIED_CLIENT ).getKey (), "http" );
86+
5487 return builder .build ();
5588 }
5689
@@ -64,17 +97,26 @@ protected SecureSettings credentials() {
6497 MockSecureSettings secureSettings = new MockSecureSettings ();
6598 if (USE_FIXTURE ) {
6699 secureSettings .setFile ("gcs.client.default.credentials_file" , TestUtils .createServiceAccount (random ()));
100+ secureSettings .setFile ("gcs.client.proxied.credentials_file" , TestUtils .createServiceAccount (random ()));
67101 } else {
68102 secureSettings .setFile (
69103 "gcs.client.default.credentials_file" ,
70104 Base64 .getDecoder ().decode (System .getProperty ("test.google.account" ))
71105 );
106+ secureSettings .setFile (
107+ "gcs.client.proxied.credentials_file" ,
108+ Base64 .getDecoder ().decode (System .getProperty ("test.google.account" ))
109+ );
72110 }
73111 return secureSettings ;
74112 }
75113
76114 @ Override
77115 protected void createRepository (final String repoName ) {
116+ createRepository (repoName , "default" );
117+ }
118+
119+ private void createRepository (final String repoName , String clientName ) {
78120 AcknowledgedResponse putRepositoryResponse = clusterAdmin ().preparePutRepository (
79121 TEST_REQUEST_TIMEOUT ,
80122 TEST_REQUEST_TIMEOUT ,
@@ -85,6 +127,7 @@ protected void createRepository(final String repoName) {
85127 Settings .builder ()
86128 .put ("bucket" , System .getProperty ("test.google.bucket" ))
87129 .put ("base_path" , System .getProperty ("test.google.base" , "/" ))
130+ .put ("client" , clientName )
88131 )
89132 .get ();
90133 assertThat (putRepositoryResponse .isAcknowledged (), equalTo (true ));
@@ -95,4 +138,46 @@ public void testReadFromPositionLargerThanBlobLength() {
95138 e -> asInstanceOf (StorageException .class , e .getCause ()).getCode () == RestStatus .REQUESTED_RANGE_NOT_SATISFIED .getStatus ()
96139 );
97140 }
141+
142+ public void testResumeAfterUpdate () {
143+ createRepository (PROXIED_TEST_REPO , PROXIED_CLIENT );
144+
145+ // The blob needs to be large enough that it won't be entirely buffered on the first request
146+ final int enoughBytesToNotBeEntirelyBuffered = Math .toIntExact (ByteSizeValue .ofMb (10 ).getBytes ());
147+
148+ final BlobStoreRepository repo = getRepository (PROXIED_TEST_REPO );
149+ final String blobKey = randomIdentifier ();
150+ final byte [] initialValue = randomByteArrayOfLength (enoughBytesToNotBeEntirelyBuffered );
151+ executeOnBlobStore (repo , container -> {
152+ container .writeBlob (randomPurpose (), blobKey , new BytesArray (initialValue ), true );
153+
154+ try (InputStream inputStream = container .readBlob (randomPurpose (), blobKey )) {
155+ // Trigger the first request for the blob, partially read it
156+ int read = inputStream .read ();
157+ assert read != -1 ;
158+
159+ // Restart the server (this triggers a retry)
160+ proxyServer .restart ();
161+
162+ // Update the file
163+ byte [] updatedValue = randomByteArrayOfLength (enoughBytesToNotBeEntirelyBuffered );
164+ container .writeBlob (randomPurpose (), blobKey , new BytesArray (updatedValue ), false );
165+
166+ // Read the rest of the stream, it should throw because the contents changed
167+ String message = assertThrows (NoSuchFileException .class , () -> readFully (inputStream )).getMessage ();
168+ assertThat (
169+ message ,
170+ startsWith (
171+ "Blob object ["
172+ + container .path ().buildAsString ()
173+ + blobKey
174+ + "] generation [1] unavailable on resume (contents changed, or object deleted):"
175+ )
176+ );
177+ } catch (Exception e ) {
178+ fail (e );
179+ }
180+ return null ;
181+ });
182+ }
98183}
0 commit comments