Skip to content

Commit bc67982

Browse files
authored
Add Azure Blob Storage support to ClickPipe object storage (#332)
1 parent 1de0af4 commit bc67982

File tree

9 files changed

+282
-41
lines changed

9 files changed

+282
-41
lines changed
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
## ClickPipe Object Storage with Azure Blob Storage example
2+
3+
This example demonstrates how to deploy a ClickPipe with an Azure Blob Storage container as the input source,
4+
authenticated with a connection string.
5+
6+
## Prerequisites
7+
8+
- Azure Storage Account with a blob container
9+
- Files in the container (e.g., JSON files in `data/` folder)
10+
- Azure Storage Account connection string
11+
12+
## How to run
13+
14+
- Rename `variables.sample.tfvars` to `variables.tfvars` and fill in all needed data.
15+
- Run `terraform init`
16+
- Run `terraform <plan|apply> -var-file=variables.tfvars`
17+
18+
## Azure Blob Storage Configuration
19+
20+
The example uses:
21+
- `type = "azureblobstorage"` for Azure Blob Storage
22+
- `authentication = "CONNECTION_STRING"` with Azure connection string
23+
- `azure_container_name` to specify the container
24+
- `path` to specify file path within the container (supports wildcards)
25+
26+
## Connection String Format
27+
28+
The Azure connection string should be in the format:
29+
```
30+
DefaultEndpointsProtocol=https;AccountName=<account_name>;AccountKey=<account_key>;EndpointSuffix=core.windows.net
31+
```
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
variable "organization_id" {}
2+
variable "token_key" {}
3+
variable "token_secret" {}
4+
5+
variable "service_id" {
6+
description = "ClickHouse service ID"
7+
}
8+
9+
variable "azure_connection_string" {
10+
description = "Azure Blob Storage connection string"
11+
sensitive = true
12+
}
13+
14+
variable "azure_container_name" {
15+
description = "Azure Blob Storage container name"
16+
}
17+
18+
variable "azure_path" {
19+
description = "Path to the file(s) within the Azure container"
20+
default = "data/*.json"
21+
}
22+
23+
resource "clickhouse_clickpipe" "azure_blob" {
24+
name = "Azure Blob Storage 🚀 ClickPipe"
25+
service_id = var.service_id
26+
state = "Running"
27+
source = {
28+
object_storage = {
29+
type = "azureblobstorage"
30+
format = "JSONEachRow"
31+
32+
path = var.azure_path
33+
azure_container_name = var.azure_container_name
34+
connection_string = var.azure_connection_string
35+
36+
authentication = "CONNECTION_STRING"
37+
}
38+
}
39+
40+
destination = {
41+
table = "my_azure_data_table"
42+
managed_table = true
43+
44+
table_definition = {
45+
engine = {
46+
type = "MergeTree"
47+
}
48+
}
49+
50+
columns = [
51+
{
52+
name = "id"
53+
type = "UInt64"
54+
}, {
55+
name = "name"
56+
type = "String"
57+
}, {
58+
name = "timestamp"
59+
type = "DateTime"
60+
}
61+
]
62+
}
63+
64+
field_mappings = [
65+
{
66+
source_field = "user_id"
67+
destination_field = "id"
68+
}, {
69+
source_field = "user_name"
70+
destination_field = "name"
71+
}, {
72+
source_field = "created_at"
73+
destination_field = "timestamp"
74+
}
75+
]
76+
}
77+
78+
output "clickpipe_id" {
79+
value = clickhouse_clickpipe.azure_blob.id
80+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# This file is generated automatically please do not edit
2+
terraform {
3+
required_providers {
4+
clickhouse = {
5+
version = "3.3.3-alpha3"
6+
source = "ClickHouse/clickhouse"
7+
}
8+
}
9+
}
10+
11+
provider "clickhouse" {
12+
organization_id = var.organization_id
13+
token_key = var.token_key
14+
token_secret = var.token_secret
15+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
terraform {
2+
required_providers {
3+
clickhouse = {
4+
version = "${CLICKHOUSE_TERRAFORM_PROVIDER_VERSION}"
5+
source = "ClickHouse/clickhouse"
6+
}
7+
}
8+
}
9+
10+
provider "clickhouse" {
11+
organization_id = var.organization_id
12+
token_key = var.token_key
13+
token_secret = var.token_secret
14+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# these keys are for example only and won't work when pointed to a deployed ClickHouse OpenAPI server
2+
organization_id = "aee076c1-3f83-4637-95b1-ad5a0a825b71"
3+
token_key = "avhj1U5QCdWAE9CA9"
4+
token_secret = "4b1dROiHQEuSXJHlV8zHFd0S7WQj7CGxz5kGJeJnca"
5+
service_id = "aee076c1-3f83-4637-95b1-ad5a0a825b71"
6+
7+
# Azure Blob Storage configuration
8+
azure_connection_string = "DefaultEndpointsProtocol=https;AccountName=myaccount;AccountKey=mykey123;EndpointSuffix=core.windows.net"
9+
azure_container_name = "mycontainer"
10+
azure_path = "data/*.json"

pkg/internal/api/clickpipe.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,9 @@ var (
4646
)
4747

4848
const (
49-
ClickPipeAuthenticationIAMRole = "IAM_ROLE"
50-
ClickPipeAuthenticationIAMUser = "IAM_USER"
49+
ClickPipeAuthenticationIAMRole = "IAM_ROLE"
50+
ClickPipeAuthenticationIAMUser = "IAM_USER"
51+
ClickPipeAuthenticationConnectionString = "CONNECTION_STRING"
5152

5253
ClickPipeKafkaAuthenticationPlain = "PLAIN"
5354
ClickPipeKafkaAuthenticationScramSha256 = "SCRAM-SHA-256"
@@ -83,6 +84,7 @@ var ClickPipeKafkaSourceTypes = []string{
8384
var ClickPipeObjectStorageAuthenticationMethods = []string{
8485
ClickPipeAuthenticationIAMRole,
8586
ClickPipeAuthenticationIAMUser,
87+
ClickPipeAuthenticationConnectionString,
8688
}
8789

8890
var ClickPipeKinesisAuthenticationMethods = []string{
@@ -110,13 +112,15 @@ var ClickPipeObjectStorageFormats = []string{
110112
}
111113

112114
const (
113-
ClickPipeObjectStorageS3Type = "s3"
114-
ClickPipeObjectStorageGCSType = "gcs"
115+
ClickPipeObjectStorageS3Type = "s3"
116+
ClickPipeObjectStorageGCSType = "gcs"
117+
ClickPipeObjectStorageAzureBlobType = "azureblobstorage"
115118
)
116119

117120
var ClickPipeObjectStorageTypes = []string{
118121
ClickPipeObjectStorageS3Type,
119122
ClickPipeObjectStorageGCSType,
123+
ClickPipeObjectStorageAzureBlobType,
120124
}
121125

122126
const (

pkg/internal/api/clickpipe_models.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ type ClickPipeObjectStorageSource struct {
6161
Type string `json:"type"`
6262
Format string `json:"format"`
6363

64-
URL string `json:"url"`
64+
URL string `json:"url,omitempty"`
6565
Delimiter *string `json:"delimiter,omitempty"`
6666
Compression *string `json:"compression,omitempty"`
6767

@@ -70,6 +70,11 @@ type ClickPipeObjectStorageSource struct {
7070
Authentication *string `json:"authentication,omitempty"`
7171
AccessKey *ClickPipeSourceAccessKey `json:"accessKey,omitempty"`
7272
IAMRole *string `json:"iamRole,omitempty"`
73+
74+
// Azure Blob Storage specific fields
75+
ConnectionString *string `json:"connectionString,omitempty"`
76+
Path *string `json:"path,omitempty"`
77+
AzureContainerName *string `json:"azureContainerName,omitempty"`
7378
}
7479

7580
type ClickPipeKinesisSource struct {

pkg/resource/clickpipe.go

Lines changed: 89 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,7 @@ func (c *ClickPipeResource) Schema(_ context.Context, _ resource.SchemaRequest,
322322
},
323323
},
324324
"object_storage": schema.SingleNestedAttribute{
325-
MarkdownDescription: "The Kafka source configuration for the ClickPipe.",
325+
MarkdownDescription: "The compatible object storage source configuration for the ClickPipe.",
326326
Optional: true,
327327
Attributes: map[string]schema.Attribute{
328328
"type": schema.StringAttribute{
@@ -355,8 +355,8 @@ func (c *ClickPipeResource) Schema(_ context.Context, _ resource.SchemaRequest,
355355
},
356356
},
357357
"url": schema.StringAttribute{
358-
MarkdownDescription: "The URL of the S3 bucket. Provide a path to the file(s) you want to ingest. You can specify multiple files using bash-like wildcards. For more information, see the documentation on using wildcards in path: https://clickhouse.com/docs/en/integrations/clickpipes/object-storage#limitations",
359-
Required: true,
358+
MarkdownDescription: "The URL of the S3/GCS bucket. Required for S3 and GCS types. Not used for Azure Blob Storage (use path and azure_container_name instead). You can specify multiple files using bash-like wildcards. For more information, see the documentation on using wildcards in path: https://clickhouse.com/docs/en/integrations/clickpipes/object-storage#limitations",
359+
Optional: true,
360360
PlanModifiers: []planmodifier.String{
361361
stringplanmodifier.RequiresReplace(),
362362
},
@@ -391,11 +391,8 @@ func (c *ClickPipeResource) Schema(_ context.Context, _ resource.SchemaRequest,
391391
},
392392
},
393393
"authentication": schema.StringAttribute{
394-
MarkdownDescription: fmt.Sprintf(
395-
"Authentication method. If not provided, no authentication is used. It can be used to access public buckets.. (%s).",
396-
wrapStringsWithBackticksAndJoinCommaSeparated(api.ClickPipeObjectStorageAuthenticationMethods),
397-
),
398-
Optional: true,
394+
MarkdownDescription: "CONNECTION_STRING is for Azure Blob Storage. IAM_ROLE and IAM_USER are for AWS S3/GCS/DigitalOcean. If not provided, no authentication is used",
395+
Optional: true,
399396
Validators: []validator.String{
400397
stringvalidator.OneOf(api.ClickPipeObjectStorageAuthenticationMethods...),
401398
},
@@ -420,6 +417,25 @@ func (c *ClickPipeResource) Schema(_ context.Context, _ resource.SchemaRequest,
420417
MarkdownDescription: "The IAM role for the S3 source. Use with `IAM_ROLE` authentication. It can be used with AWS ClickHouse service only. Read more in [ClickPipes documentation page](https://clickhouse.com/docs/en/integrations/clickpipes/object-storage#authentication)",
421418
Optional: true,
422419
},
420+
"connection_string": schema.StringAttribute{
421+
MarkdownDescription: "Connection string for Azure Blob Storage authentication. Required when authentication is CONNECTION_STRING. Example: `DefaultEndpointsProtocol=https;AccountName=myaccount;AccountKey=mykey;EndpointSuffix=core.windows.net`",
422+
Optional: true,
423+
Sensitive: true,
424+
},
425+
"path": schema.StringAttribute{
426+
MarkdownDescription: "Path to the file(s) within the Azure container. Used for Azure Blob Storage sources. You can specify multiple files using bash-like wildcards. For more information, see the documentation on using wildcards in path: https://clickhouse.com/docs/en/integrations/clickpipes/object-storage#limitations. Example: `data/logs/*.json`",
427+
Optional: true,
428+
PlanModifiers: []planmodifier.String{
429+
stringplanmodifier.RequiresReplace(),
430+
},
431+
},
432+
"azure_container_name": schema.StringAttribute{
433+
MarkdownDescription: "Container name for Azure Blob Storage. Required when type is azureblobstorage. Example: `mycontainer`",
434+
Optional: true,
435+
PlanModifiers: []planmodifier.String{
436+
stringplanmodifier.RequiresReplace(),
437+
},
438+
},
423439
},
424440
PlanModifiers: []planmodifier.Object{
425441
objectplanmodifier.RequiresReplace(),
@@ -957,17 +973,61 @@ func (c *ClickPipeResource) extractSourceFromPlan(ctx context.Context, diagnosti
957973
}
958974
}
959975

960-
source.ObjectStorage = &api.ClickPipeObjectStorageSource{
976+
storageType := objectStorageModel.Type.ValueString()
977+
if storageType == api.ClickPipeObjectStorageAzureBlobType {
978+
if !objectStorageModel.URL.IsNull() && !objectStorageModel.URL.IsUnknown() && objectStorageModel.URL.ValueString() != "" {
979+
diagnostics.AddError(
980+
"Error Creating ClickPipe",
981+
"URL field should not be used with Azure Blob Storage. Use 'path' and 'azure_container_name' fields instead",
982+
)
983+
return nil
984+
}
985+
986+
if objectStorageModel.AzureContainerName.IsNull() || objectStorageModel.AzureContainerName.IsUnknown() || objectStorageModel.AzureContainerName.ValueString() == "" {
987+
diagnostics.AddError(
988+
"Error Creating ClickPipe",
989+
"azure_container_name is required when using Azure Blob Storage",
990+
)
991+
return nil
992+
}
993+
994+
if objectStorageModel.ConnectionString.IsNull() || objectStorageModel.ConnectionString.IsUnknown() || objectStorageModel.ConnectionString.ValueString() == "" {
995+
diagnostics.AddError(
996+
"Error Creating ClickPipe",
997+
"connection_string is required when using Azure Blob Storage",
998+
)
999+
return nil
1000+
}
1001+
} else {
1002+
if objectStorageModel.URL.IsNull() || objectStorageModel.URL.IsUnknown() || objectStorageModel.URL.ValueString() == "" {
1003+
diagnostics.AddError(
1004+
"Error Creating ClickPipe",
1005+
fmt.Sprintf("URL is required when using %s storage type", storageType),
1006+
)
1007+
return nil
1008+
}
1009+
}
1010+
1011+
objectStorage := &api.ClickPipeObjectStorageSource{
9611012
Type: objectStorageModel.Type.ValueString(),
9621013
Format: objectStorageModel.Format.ValueString(),
963-
URL: objectStorageModel.URL.ValueString(),
9641014
Delimiter: objectStorageModel.Delimiter.ValueStringPointer(),
9651015
Compression: objectStorageModel.Compression.ValueStringPointer(),
9661016
IsContinuous: objectStorageModel.IsContinuous.ValueBool(),
9671017
Authentication: objectStorageModel.Authentication.ValueStringPointer(),
9681018
AccessKey: accessKey,
9691019
IAMRole: objectStorageModel.IAMRole.ValueStringPointer(),
9701020
}
1021+
1022+
if storageType == api.ClickPipeObjectStorageAzureBlobType {
1023+
objectStorage.ConnectionString = objectStorageModel.ConnectionString.ValueStringPointer()
1024+
objectStorage.Path = objectStorageModel.Path.ValueStringPointer()
1025+
objectStorage.AzureContainerName = objectStorageModel.AzureContainerName.ValueStringPointer()
1026+
} else {
1027+
objectStorage.URL = objectStorageModel.URL.ValueString()
1028+
}
1029+
1030+
source.ObjectStorage = objectStorage
9711031
} else if !sourceModel.Kinesis.IsNull() {
9721032
kinesisModel := models.ClickPipeKinesisSourceModel{}
9731033
diagnostics.Append(sourceModel.Kinesis.As(ctx, &kinesisModel, basetypes.ObjectAsOptions{})...)
@@ -1161,14 +1221,25 @@ func (c *ClickPipeResource) syncClickPipeState(ctx context.Context, state *model
11611221
}
11621222

11631223
objectStorageModel := models.ClickPipeObjectStorageSourceModel{
1164-
Type: types.StringValue(clickPipe.Source.ObjectStorage.Type),
1165-
Format: types.StringValue(clickPipe.Source.ObjectStorage.Format),
1166-
URL: types.StringValue(clickPipe.Source.ObjectStorage.URL),
1167-
Delimiter: types.StringPointerValue(clickPipe.Source.ObjectStorage.Delimiter),
1168-
Compression: types.StringPointerValue(clickPipe.Source.ObjectStorage.Compression),
1169-
IsContinuous: types.BoolValue(clickPipe.Source.ObjectStorage.IsContinuous),
1170-
Authentication: types.StringPointerValue(clickPipe.Source.ObjectStorage.Authentication),
1171-
IAMRole: types.StringPointerValue(clickPipe.Source.ObjectStorage.IAMRole),
1224+
Type: types.StringValue(clickPipe.Source.ObjectStorage.Type),
1225+
Format: types.StringValue(clickPipe.Source.ObjectStorage.Format),
1226+
Delimiter: types.StringPointerValue(clickPipe.Source.ObjectStorage.Delimiter),
1227+
Compression: types.StringPointerValue(clickPipe.Source.ObjectStorage.Compression),
1228+
IsContinuous: types.BoolValue(clickPipe.Source.ObjectStorage.IsContinuous),
1229+
IAMRole: types.StringPointerValue(clickPipe.Source.ObjectStorage.IAMRole),
1230+
}
1231+
1232+
// Set storage-type-specific fields
1233+
if clickPipe.Source.ObjectStorage.Type == api.ClickPipeObjectStorageAzureBlobType {
1234+
// For Azure Blob Storage, preserve all fields from state as API doesn't return them
1235+
objectStorageModel.Authentication = stateObjectStorageModel.Authentication
1236+
objectStorageModel.ConnectionString = stateObjectStorageModel.ConnectionString
1237+
objectStorageModel.Path = stateObjectStorageModel.Path
1238+
objectStorageModel.AzureContainerName = stateObjectStorageModel.AzureContainerName
1239+
} else {
1240+
// For S3-compatible storage, use API response values
1241+
objectStorageModel.Authentication = types.StringPointerValue(clickPipe.Source.ObjectStorage.Authentication)
1242+
objectStorageModel.URL = types.StringValue(clickPipe.Source.ObjectStorage.URL)
11721243
}
11731244

11741245
if !stateObjectStorageModel.AccessKey.IsNull() {

0 commit comments

Comments
 (0)