Skip to content

support azure blob storage #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: kesque_2.4.2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,21 @@ gcsManagedLedgerOffloadReadBufferSizeInBytes=1048576
# For more details, see the "Service Accounts" section of https://support.google.com/googleapi/answer/6158849
gcsManagedLedgerOffloadServiceAccountKeyFile=

# For Azure Blob Storage, the account name
azureStorageAccountName=

# For Azure Blob Storage, the account key
azureStorageAccountKey=

# For Azure Blob Storage, the container name
azureStorageContainer=

# For Azure Blob Storage, Max block size in bytes. (64MB by default, 5MB minimum)
azureManagedLedgerOffloadMaxBlockSizeInBytes=67108864

# For Azure Blob Storage ledger offload, Read buffer size in bytes (1MB by default)
azureManagedLedgerOffloadReadBufferSizeInBytes=1048576

### --- Deprecated config variables --- ###

# Deprecated. Use configurationStoreServers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ public class TieredStorageConfigurationData implements Serializable, Cloneable {
// For more details, see the "Service Accounts" section of https://support.google.com/googleapi/answer/6158849
private String gcsManagedLedgerOffloadServiceAccountKeyFile = null;

private String azureStorageAccountName = null;
private String azureStorageAccountKey = null;
private String azureStorageContainer = null;
private int azureManagedLedgerOffloadMaxBlockSizeInBytes = 64 * 1024 * 1024; // 64MB
private int azureManagedLedgerOffloadReadBufferSizeInBytes = 1024 * 1024; // 1MB

/**
* Builds an AWS credential provider based on the offload options
* @return aws credential provider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.jclouds.s3.reference.S3Constants;
import org.jclouds.osgi.ApiRegistry;
import org.jclouds.s3.S3ApiMetadata;
import org.jclouds.azureblob.AzureBlobProviderMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -82,7 +83,7 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
private static final String METADATA_FIELD_REGION = "region";
private static final String METADATA_FIELD_ENDPOINT = "endpoint";

public static final String[] DRIVER_NAMES = {"S3", "aws-s3", "google-cloud-storage"};
public static final String[] DRIVER_NAMES = {"S3", "aws-s3", "google-cloud-storage", "azureblob"};

// use these keys for both s3 and gcs.
static final String METADATA_FORMAT_VERSION_KEY = "S3ManagedLedgerOffloaderFormatVersion";
Expand All @@ -100,6 +101,10 @@ public static boolean isGcsDriver(String driver) {
return driver.equalsIgnoreCase(DRIVER_NAMES[2]);
}

public static boolean isAzureBlobDriver(String driver) {
return driver.equalsIgnoreCase(DRIVER_NAMES[3]);
}

private static void addVersionInfo(BlobBuilder blobBuilder, Map<String, String> userMetadata) {
ImmutableMap.Builder<String, String> metadataBuilder = ImmutableMap.builder();
metadataBuilder.putAll(userMetadata);
Expand Down Expand Up @@ -128,6 +133,7 @@ private static Pair<BlobStoreLocation, BlobStore> createBlobStore(String driver,
ApiRegistry.registerApi(new S3ApiMetadata());
ProviderRegistry.registerProvider(new AWSS3ProviderMetadata());
ProviderRegistry.registerProvider(new GoogleCloudStorageProviderMetadata());
ProviderRegistry.registerProvider(new AzureBlobProviderMetadata());

ContextBuilder contextBuilder = ContextBuilder.newBuilder(driver);
contextBuilder.credentialsSupplier(credentials);
Expand Down Expand Up @@ -217,15 +223,23 @@ public static BlobStoreManagedLedgerOffloader create(TieredStorageConfigurationD
+ " if s3 offload enabled");
}

bucket = isAzureBlobDriver(driver) ?
conf.getAzureStorageContainer() : bucket;
if (Strings.isNullOrEmpty(bucket)) {
throw new IOException(
"ManagedLedgerOffloadBucket cannot be empty for s3 and gcs offload");
"ManagedLedgerOffloadBucket cannot be empty for s3, azure blob or gcs offload");
}

maxBlockSize = isAzureBlobDriver(driver) ?
conf.getAzureManagedLedgerOffloadMaxBlockSizeInBytes() : maxBlockSize;
if (maxBlockSize < 5*1024*1024) {
throw new IOException(
"ManagedLedgerOffloadMaxBlockSizeInBytes cannot be less than 5MB for s3 and gcs offload");
}

readBufferSize = isAzureBlobDriver(driver) ?
conf.getAzureManagedLedgerOffloadReadBufferSizeInBytes() : readBufferSize;

Supplier<Credentials> credentials = getCredentials(driver, conf);

return new BlobStoreManagedLedgerOffloader(driver, bucket, scheduler,
Expand Down Expand Up @@ -282,6 +296,16 @@ public static Supplier<Credentials> getCredentials(String driver, TieredStorageC
return new Credentials(creds.getAWSAccessKeyId(), creds.getAWSSecretKey());
}
};
} else if (isAzureBlobDriver(driver)) {
String accountName = conf.getAzureStorageAccountName();
String accountKey = conf.getAzureStorageAccountKey();
if (Strings.isNullOrEmpty(accountName) || Strings.isNullOrEmpty(accountKey)) {
throw new RuntimeException("Unable to fetch Azure credentials after start, unexpected!");
}
return () -> {
return new Credentials(accountName, accountKey);
};

} else {
throw new IOException(
"Not support this kind of driver: " + driver);
Expand Down Expand Up @@ -316,6 +340,7 @@ public static Supplier<Credentials> getCredentials(String driver, TieredStorageC
.description(region)
.build();
} else {
// Azure has only one local `azureblob` so it does not require location or writeLocation
this.writeLocation = null;
}

Expand Down