|
| 1 | +/* |
| 2 | + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one |
| 3 | + * or more contributor license agreements. Licensed under the Elastic License |
| 4 | + * 2.0 and the Server Side Public License, v 1; you may not use this file except |
| 5 | + * in compliance with, at your election, the Elastic License 2.0 or the Server |
| 6 | + * Side Public License, v 1. |
| 7 | + */ |
| 8 | + |
| 9 | +package org.elasticsearch.repositories.azure; |
| 10 | + |
| 11 | +import com.azure.storage.common.policy.RequestRetryOptions; |
| 12 | +import com.azure.storage.common.policy.RetryPolicyType; |
| 13 | +import com.sun.net.httpserver.HttpExchange; |
| 14 | +import com.sun.net.httpserver.HttpServer; |
| 15 | + |
| 16 | +import org.elasticsearch.cluster.metadata.RepositoryMetadata; |
| 17 | +import org.elasticsearch.common.blobstore.BlobContainer; |
| 18 | +import org.elasticsearch.common.blobstore.BlobPath; |
| 19 | +import org.elasticsearch.common.network.InetAddresses; |
| 20 | +import org.elasticsearch.common.settings.MockSecureSettings; |
| 21 | +import org.elasticsearch.common.settings.SecureSettings; |
| 22 | +import org.elasticsearch.common.settings.Settings; |
| 23 | +import org.elasticsearch.common.unit.ByteSizeUnit; |
| 24 | +import org.elasticsearch.common.unit.ByteSizeValue; |
| 25 | +import org.elasticsearch.common.util.BigArrays; |
| 26 | +import org.elasticsearch.core.SuppressForbidden; |
| 27 | +import org.elasticsearch.core.TimeValue; |
| 28 | +import org.elasticsearch.core.Tuple; |
| 29 | +import org.elasticsearch.mocksocket.MockHttpServer; |
| 30 | +import org.elasticsearch.test.ESTestCase; |
| 31 | +import org.elasticsearch.threadpool.TestThreadPool; |
| 32 | +import org.elasticsearch.threadpool.ThreadPool; |
| 33 | +import org.junit.After; |
| 34 | +import org.junit.Before; |
| 35 | + |
| 36 | +import java.io.IOException; |
| 37 | +import java.io.InputStream; |
| 38 | +import java.net.InetAddress; |
| 39 | +import java.net.InetSocketAddress; |
| 40 | +import java.util.Base64; |
| 41 | +import java.util.Locale; |
| 42 | +import java.util.Optional; |
| 43 | +import java.util.concurrent.TimeUnit; |
| 44 | +import java.util.regex.Matcher; |
| 45 | +import java.util.regex.Pattern; |
| 46 | + |
| 47 | +import static java.nio.charset.StandardCharsets.UTF_8; |
| 48 | +import static org.elasticsearch.repositories.azure.AzureRepository.Repository.CONTAINER_SETTING; |
| 49 | +import static org.elasticsearch.repositories.azure.AzureRepository.Repository.LOCATION_MODE_SETTING; |
| 50 | +import static org.elasticsearch.repositories.azure.AzureRepository.Repository.MAX_SINGLE_PART_UPLOAD_SIZE_SETTING; |
| 51 | +import static org.elasticsearch.repositories.azure.AzureStorageSettings.ACCOUNT_SETTING; |
| 52 | +import static org.elasticsearch.repositories.azure.AzureStorageSettings.ENDPOINT_SUFFIX_SETTING; |
| 53 | +import static org.elasticsearch.repositories.azure.AzureStorageSettings.KEY_SETTING; |
| 54 | +import static org.elasticsearch.repositories.azure.AzureStorageSettings.MAX_RETRIES_SETTING; |
| 55 | +import static org.elasticsearch.repositories.azure.AzureStorageSettings.TIMEOUT_SETTING; |
| 56 | +import static org.hamcrest.Matchers.equalTo; |
| 57 | +import static org.hamcrest.Matchers.lessThanOrEqualTo; |
| 58 | + |
| 59 | +@SuppressForbidden(reason = "use a http server") |
| 60 | +public abstract class AbstractAzureServerTestCase extends ESTestCase { |
| 61 | + protected static final long MAX_RANGE_VAL = Long.MAX_VALUE - 1L; |
| 62 | + |
| 63 | + protected HttpServer httpServer; |
| 64 | + protected HttpServer secondaryHttpServer; |
| 65 | + private ThreadPool threadPool; |
| 66 | + private AzureClientProvider clientProvider; |
| 67 | + |
| 68 | + @Before |
| 69 | + public void setUp() throws Exception { |
| 70 | + threadPool = new TestThreadPool( |
| 71 | + getTestClass().getName(), |
| 72 | + AzureRepositoryPlugin.executorBuilder(), |
| 73 | + AzureRepositoryPlugin.nettyEventLoopExecutorBuilder(Settings.EMPTY) |
| 74 | + ); |
| 75 | + httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0); |
| 76 | + httpServer.start(); |
| 77 | + secondaryHttpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0); |
| 78 | + secondaryHttpServer.start(); |
| 79 | + clientProvider = AzureClientProvider.create(threadPool, Settings.EMPTY); |
| 80 | + clientProvider.start(); |
| 81 | + super.setUp(); |
| 82 | + } |
| 83 | + |
| 84 | + @After |
| 85 | + public void tearDown() throws Exception { |
| 86 | + clientProvider.close(); |
| 87 | + httpServer.stop(0); |
| 88 | + secondaryHttpServer.stop(0); |
| 89 | + super.tearDown(); |
| 90 | + ThreadPool.terminate(threadPool, 10L, TimeUnit.SECONDS); |
| 91 | + } |
| 92 | + |
| 93 | + protected BlobContainer createBlobContainer(final int maxRetries) { |
| 94 | + return createBlobContainer(maxRetries, null, LocationMode.PRIMARY_ONLY); |
| 95 | + } |
| 96 | + |
| 97 | + protected BlobContainer createBlobContainer(final int maxRetries, String secondaryHost, final LocationMode locationMode) { |
| 98 | + final String clientName = randomAlphaOfLength(5).toLowerCase(Locale.ROOT); |
| 99 | + final MockSecureSettings secureSettings = new MockSecureSettings(); |
| 100 | + secureSettings.setString(ACCOUNT_SETTING.getConcreteSettingForNamespace(clientName).getKey(), "account"); |
| 101 | + final String key = Base64.getEncoder().encodeToString(randomAlphaOfLength(14).getBytes(UTF_8)); |
| 102 | + secureSettings.setString(KEY_SETTING.getConcreteSettingForNamespace(clientName).getKey(), key); |
| 103 | + |
| 104 | + return createBlobContainer(maxRetries, secondaryHost, locationMode, clientName, secureSettings); |
| 105 | + } |
| 106 | + |
| 107 | + protected BlobContainer createBlobContainer( |
| 108 | + final int maxRetries, |
| 109 | + String secondaryHost, |
| 110 | + final LocationMode locationMode, |
| 111 | + String clientName, |
| 112 | + SecureSettings secureSettings |
| 113 | + ) { |
| 114 | + final Settings.Builder clientSettings = Settings.builder(); |
| 115 | + |
| 116 | + String endpoint = "ignored;DefaultEndpointsProtocol=http;BlobEndpoint=" + getEndpointForServer(httpServer, "account"); |
| 117 | + if (secondaryHost != null) { |
| 118 | + endpoint += ";BlobSecondaryEndpoint=" + getEndpointForServer(secondaryHttpServer, "account"); |
| 119 | + } |
| 120 | + clientSettings.put(ENDPOINT_SUFFIX_SETTING.getConcreteSettingForNamespace(clientName).getKey(), endpoint); |
| 121 | + clientSettings.put(MAX_RETRIES_SETTING.getConcreteSettingForNamespace(clientName).getKey(), maxRetries); |
| 122 | + clientSettings.put(TIMEOUT_SETTING.getConcreteSettingForNamespace(clientName).getKey(), TimeValue.timeValueMillis(500)); |
| 123 | + |
| 124 | + clientSettings.setSecureSettings(secureSettings); |
| 125 | + |
| 126 | + final AzureStorageService service = new AzureStorageService(clientSettings.build(), clientProvider) { |
| 127 | + @Override |
| 128 | + RequestRetryOptions getRetryOptions(LocationMode locationMode, AzureStorageSettings azureStorageSettings) { |
| 129 | + return new RequestRetryOptions( |
| 130 | + RetryPolicyType.EXPONENTIAL, |
| 131 | + maxRetries + 1, |
| 132 | + 60, |
| 133 | + 50L, |
| 134 | + 100L, |
| 135 | + // The SDK doesn't work well with ip endponts. Secondary host endpoints that contain |
| 136 | + // a path causes the sdk to rewrite the endpoint with an invalid path, that's the reason why we provide just the host + |
| 137 | + // port. |
| 138 | + secondaryHost != null ? secondaryHost.replaceFirst("/account", "") : null |
| 139 | + ); |
| 140 | + } |
| 141 | + |
| 142 | + @Override |
| 143 | + long getUploadBlockSize() { |
| 144 | + return ByteSizeUnit.MB.toBytes(1); |
| 145 | + } |
| 146 | + |
| 147 | + @Override |
| 148 | + int getMaxReadRetries(String clientName) { |
| 149 | + return maxRetries; |
| 150 | + } |
| 151 | + }; |
| 152 | + |
| 153 | + final RepositoryMetadata repositoryMetadata = new RepositoryMetadata( |
| 154 | + "repository", |
| 155 | + AzureRepository.TYPE, |
| 156 | + Settings.builder() |
| 157 | + .put(CONTAINER_SETTING.getKey(), "container") |
| 158 | + .put(ACCOUNT_SETTING.getKey(), clientName) |
| 159 | + .put(LOCATION_MODE_SETTING.getKey(), locationMode) |
| 160 | + .put(MAX_SINGLE_PART_UPLOAD_SIZE_SETTING.getKey(), new ByteSizeValue(1, ByteSizeUnit.MB)) |
| 161 | + .build() |
| 162 | + ); |
| 163 | + |
| 164 | + return new AzureBlobContainer(BlobPath.EMPTY, new AzureBlobStore(repositoryMetadata, service, BigArrays.NON_RECYCLING_INSTANCE)); |
| 165 | + } |
| 166 | + |
| 167 | + protected static byte[] randomBlobContent() { |
| 168 | + return randomByteArrayOfLength(randomIntBetween(1, 1 << 20)); // rarely up to 1mb |
| 169 | + } |
| 170 | + |
| 171 | + private static final Pattern RANGE_PATTERN = Pattern.compile("^bytes=([0-9]+)-([0-9]+)$"); |
| 172 | + |
| 173 | + protected static Tuple<Long, Long> getRanges(HttpExchange exchange) { |
| 174 | + final String rangeHeader = exchange.getRequestHeaders().getFirst("X-ms-range"); |
| 175 | + if (rangeHeader == null) { |
| 176 | + return Tuple.tuple(0L, MAX_RANGE_VAL); |
| 177 | + } |
| 178 | + |
| 179 | + final Matcher matcher = RANGE_PATTERN.matcher(rangeHeader); |
| 180 | + assertTrue(rangeHeader + " matches expected pattern", matcher.matches()); |
| 181 | + final long rangeStart = Long.parseLong(matcher.group(1)); |
| 182 | + final long rangeEnd = Long.parseLong(matcher.group(2)); |
| 183 | + assertThat(rangeStart, lessThanOrEqualTo(rangeEnd)); |
| 184 | + return Tuple.tuple(rangeStart, rangeEnd); |
| 185 | + } |
| 186 | + |
| 187 | + protected static int getRangeStart(HttpExchange exchange) { |
| 188 | + return Math.toIntExact(getRanges(exchange).v1()); |
| 189 | + } |
| 190 | + |
| 191 | + protected static Optional<Integer> getRangeEnd(HttpExchange exchange) { |
| 192 | + final long rangeEnd = getRanges(exchange).v2(); |
| 193 | + if (rangeEnd == MAX_RANGE_VAL) { |
| 194 | + return Optional.empty(); |
| 195 | + } |
| 196 | + return Optional.of(Math.toIntExact(rangeEnd)); |
| 197 | + } |
| 198 | + |
| 199 | + protected String getEndpointForServer(HttpServer server, String accountName) { |
| 200 | + InetSocketAddress address = server.getAddress(); |
| 201 | + return "http://" + InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort() + "/" + accountName; |
| 202 | + } |
| 203 | + |
| 204 | + protected void readFromInputStream(InputStream inputStream, long bytesToRead) { |
| 205 | + try { |
| 206 | + long totalBytesRead = 0; |
| 207 | + while (inputStream.read() != -1 && totalBytesRead < bytesToRead) { |
| 208 | + totalBytesRead += 1; |
| 209 | + } |
| 210 | + assertThat(totalBytesRead, equalTo(bytesToRead)); |
| 211 | + } catch (IOException e) { |
| 212 | + throw new RuntimeException(e); |
| 213 | + } |
| 214 | + } |
| 215 | +} |
0 commit comments