diff --git a/Cargo.lock b/Cargo.lock
index 5cc606f2a..83a281d20 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3240,6 +3240,7 @@ dependencies = [
"rand",
"reqwest 0.12.12",
"ring",
+ "rustls-pemfile 2.2.0",
"serde",
"serde_json",
"snafu",
diff --git a/Cargo.toml b/Cargo.toml
index aa2d2de11..382c22142 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,14 +10,19 @@ build = "build.rs"
[dependencies]
# Arrow and DataFusion ecosystem
arrow = "54.0.0"
-arrow-array = "54.0.0"
+arrow-array = "54.0.0"
arrow-flight = { version = "54.0.0", features = ["tls"] }
arrow-ipc = { version = "54.0.0", features = ["zstd"] }
arrow-json = "54.0.0"
arrow-schema = { version = "54.0.0", features = ["serde"] }
arrow-select = "54.0.0"
datafusion = "45.0.0"
-object_store = { version = "0.11.2", features = ["cloud", "aws", "azure"] }
+object_store = { version = "0.11.2", features = [
+ "cloud",
+ "aws",
+ "azure",
+ "gcp",
+] }
parquet = "54.0.0"
# Web server and HTTP-related
@@ -34,7 +39,11 @@ tower-http = { version = "0.6.1", features = ["cors"] }
url = "2.4.0"
# Connectors dependencies
-rdkafka = { version = "0.37", optional = true, features = ["cmake-build", "tracing", "libz-static"] }
+rdkafka = { version = "0.37", optional = true, features = [
+ "cmake-build",
+ "tracing",
+ "libz-static",
+] }
sasl2-sys = { version = "0.1.22", optional = true, features = ["vendored"] }
# Authentication and Security
@@ -144,7 +153,14 @@ assets-sha1 = "3e703ef8bedf8ae55fd31713f6267ad14ad3d29d"
[features]
debug = []
-kafka = ["rdkafka", "rdkafka/ssl-vendored", "rdkafka/ssl", "rdkafka/sasl", "sasl2-sys", "sasl2-sys/vendored"]
+kafka = [
+ "rdkafka",
+ "rdkafka/ssl-vendored",
+ "rdkafka/ssl",
+ "rdkafka/sasl",
+ "sasl2-sys",
+ "sasl2-sys/vendored",
+]
[profile.release-lto]
inherits = "release"
diff --git a/Dockerfile.debug b/Dockerfile.debug
index 35ba5fbae..de7880003 100644
--- a/Dockerfile.debug
+++ b/Dockerfile.debug
@@ -14,7 +14,7 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# build stage
-FROM rust:1.84.0-bookworm AS builder
+FROM docker.io/rust:1.84.0-bookworm AS builder
LABEL org.opencontainers.image.title="Parseable"
LABEL maintainer="Parseable Team "
diff --git a/docker-compose-gcs-distributed-test.yaml b/docker-compose-gcs-distributed-test.yaml
new file mode 100644
index 000000000..a4a99b946
--- /dev/null
+++ b/docker-compose-gcs-distributed-test.yaml
@@ -0,0 +1,110 @@
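+# Distributed smoke test: one query server, one ingestor and a quest load generator, all
+# backed by a GCS bucket. The service-account key is expected at
+# ${GCS_CREDENTIALS_PATH:-./credentials}/${GCS_CREDENTIALS_FILE:-key.json}; it is mounted
+# read-only into /parseable/svc/ and handed to the servers via GOOGLE_APPLICATION_CREDENTIALS.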
+networks:
+ parseable-internal:
+
+services:
+ # query server
+ parseable-query:
+ container_name: parseable-query
+ build:
+ context: .
+ dockerfile: Dockerfile.debug
+ platform: linux/amd64
+ command: ["parseable", "gcs-store"]
+ ports:
+ - "8000:8000"
+ environment:
+ - P_GCS_BUCKET=parseable-test-gcs-local
+ - P_STAGING_DIR=/tmp/data
+ - P_USERNAME=parseableadmin
+ - P_PASSWORD=parseableadmin
+ - P_CHECK_UPDATE=false
+ - P_PARQUET_COMPRESSION_ALGO=snappy
+ - P_MODE=query
+ - RUST_LOG=warn
+ - GOOGLE_APPLICATION_CREDENTIALS=/parseable/svc/${GCS_CREDENTIALS_FILE:-key.json}
+ networks:
+ - parseable-internal
+ healthcheck:
+ test: ["CMD", "curl", "-f", "http://localhost:8000/api/v1/liveness"]
+ interval: 15s
+ timeout: 20s
+ retries: 5
+ deploy:
+ restart_policy:
+ condition: on-failure
+ delay: 20s
+        max_attempts: 3
+ volumes:
+ - "${GCS_CREDENTIALS_PATH:-./credentials}:/parseable/svc/:ro,z"
+ # ingest server one
+ parseable-ingest-one:
+ container_name: parseable-ingest-one
+ build:
+ context: .
+ dockerfile: Dockerfile.debug
+ platform: linux/amd64
+ command: ["parseable", "gcs-store"]
+ ports:
+ - "8000"
+ environment:
+ - P_GCS_BUCKET=parseable-test-gcs-local
+ - P_STAGING_DIR=/tmp/data
+ - P_USERNAME=parseableadmin
+ - P_PASSWORD=parseableadmin
+ - P_CHECK_UPDATE=false
+ - P_PARQUET_COMPRESSION_ALGO=snappy
+ - P_MODE=ingest
+ - P_INGESTOR_ENDPOINT=parseable-ingest-one:8000
+ - RUST_LOG=warn
+ - GOOGLE_APPLICATION_CREDENTIALS=/parseable/svc/${GCS_CREDENTIALS_FILE:-key.json}
+ networks:
+ - parseable-internal
+ healthcheck:
+ test: ["CMD", "curl", "-f", "http://localhost:8000/api/v1/liveness"]
+ interval: 15s
+ timeout: 20s
+ retries: 5
+ depends_on:
+ parseable-query:
+ condition: service_healthy
+ deploy:
+ restart_policy:
+ condition: on-failure
+ delay: 20s
+ max_attempts: 3
+ volumes:
+ - "${GCS_CREDENTIALS_PATH:-./credentials}:/parseable/svc/:ro,z"
+
+ quest:
+ platform: linux/amd64
+ image: ghcr.io/parseablehq/quest:main
+ pull_policy: always
+ command:
+ [
+ "load",
+ "http://parseable-query:8000",
+ "parseableadmin",
+ "parseableadmin",
+ "20",
+ "10",
+ "5m",
+ "storage.googleapis.com",
+ "",
+ "",
+ "parseable-test-gcs-local",
+ "http://parseable-ingest-one:8000",
+ "parseableadmin",
+ "parseableadmin",
+ ]
+ networks:
+ - parseable-internal
+ depends_on:
+ parseable-query:
+ condition: service_healthy
+ parseable-ingest-one:
+ condition: service_healthy
+ deploy:
+ restart_policy:
+ condition: on-failure
+ delay: 20s
+ max_attempts: 3
diff --git a/helm/templates/ingestor-statefulset.yaml b/helm/templates/ingestor-statefulset.yaml
index 5143e78f3..73185cd2b 100644
--- a/helm/templates/ingestor-statefulset.yaml
+++ b/helm/templates/ingestor-statefulset.yaml
@@ -66,6 +66,12 @@ spec:
tolerations:
{{- toYaml . | nindent 8 }}
{{- end }}
+ {{- if and .Values.parseable.gcsModeSecret .Values.parseable.gcsModeSecret.enabled }}
+ volumes:
+ - name: {{ .Values.parseable.gcsModeSecret.auth.secret_name }}
+ secret:
+ secretName: {{ .Values.parseable.gcsModeSecret.auth.secret_name }}
+ {{- end }}
containers:
- name: {{ .Chart.Name }}
securityContext:
@@ -74,19 +80,19 @@ spec:
imagePullPolicy: {{ .Values.parseable.image.pullPolicy }}
args:
- /usr/bin/parseable
- - {{ if eq .Values.parseable.store "gcs-store" }}"s3-store"{{ else }}{{ .Values.parseable.store | quote }}{{ end }}
+ - {{ .Values.parseable.store | quote }}
- --ingestor-endpoint=$(HOSTNAME).{{ include "parseable.fullname" . }}-ingestor-headless.{{ .Release.Namespace }}.svc.cluster.local:{{ .Values.parseable.highAvailability.ingestor.port }}
env:
- {{- range $key, $value := .Values.parseable.highAvailability.ingestor.env }}
- - name: {{ $key }}
- value: {{ tpl $value $ | quote }}
- name: HOSTNAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.name
+ {{- range $key, $value := .Values.parseable.highAvailability.ingestor.env }}
+ - name: {{ $key }}
+ value: {{ tpl $value $ | quote }}
{{- end }}
-
+
{{- if .Values.parseable.auditLogging.enabled }}
- name: P_AUDIT_LOGGER
value: {{ .Values.parseable.auditLogging.p_server | quote }}
@@ -111,11 +117,13 @@ spec:
{{- end }}
{{- if and .Values.parseable.gcsModeSecret .Values.parseable.gcsModeSecret.enabled }}
+ - name: GOOGLE_APPLICATION_CREDENTIALS
+ value: {{ .Values.parseable.gcsModeSecret.auth.mount_path }}/{{ .Values.parseable.gcsModeSecret.auth.secret_key }}
{{- range $secret := .Values.parseable.gcsModeSecret.secrets }}
{{- range $key := $secret.keys }}
{{- $envPrefix := $secret.prefix | default "" | upper }}
{{- $envKey := $key | upper | replace "." "_" | replace "-" "_" }}
- - name: {{ $envPrefix }}{{ $envKey | replace "GCS" "S3"}}
+ - name: {{ $envPrefix }}{{ $envKey }}
valueFrom:
secretKeyRef:
name: {{ $secret.name }}
@@ -137,30 +145,39 @@ spec:
{{- end }}
{{- end }}
{{- end }}
+
- name: P_MODE
value: "ingest"
+
{{- if .Values.parseable.kafkaConnector.enabled }}
- {{- range $key, $value := .Values.parseable.kafkaConnector.env }}
+ {{- range $key, $value := .Values.parseable.kafkaConnector.env }}
- name: {{ $key }}
value: {{ tpl $value $ | quote }}
- {{- end }}
+ {{- end }}
{{- end }}
ports:
- containerPort: {{ .Values.parseable.highAvailability.ingestor.port }}
- {{- with .Values.readinessProbe }}
+ {{- with .Values.readinessProbe }}
readinessProbe:
- {{ toYaml . | nindent 12 }}
- {{- end }}
+ {{- toYaml . | nindent 10 }}
+ {{- end }}
resources:
- {{- toYaml .Values.parseable.highAvailability.ingestor.resources | nindent 12 }}
- {{- if .Values.parseable.persistence.ingestor.enabled }}
+ {{- toYaml .Values.parseable.highAvailability.ingestor.resources | nindent 10 }}
+ {{- if or .Values.parseable.persistence.ingestor.enabled (and .Values.parseable.gcsModeSecret .Values.parseable.gcsModeSecret.enabled) }}
volumeMounts:
+ {{- if .Values.parseable.persistence.ingestor.enabled }}
- mountPath: "/parseable/staging"
name: stage-volume
{{- end }}
- volumeClaimTemplates:
+ {{- if and .Values.parseable.gcsModeSecret .Values.parseable.gcsModeSecret.enabled }}
+ - mountPath: {{ .Values.parseable.gcsModeSecret.auth.mount_path }}
+ name: {{ .Values.parseable.gcsModeSecret.auth.secret_name }}
+ readOnly: true
+ {{- end }}
+ {{- end }}
{{- if .Values.parseable.persistence.ingestor.enabled }}
+ volumeClaimTemplates:
- metadata:
name: stage-volume
spec:
@@ -171,4 +188,4 @@ spec:
requests:
storage: {{ .Values.parseable.persistence.ingestor.size | quote }}
{{- end }}
-{{- end }}
\ No newline at end of file
+{{- end }}
diff --git a/helm/templates/querier-statefulset.yaml b/helm/templates/querier-statefulset.yaml
index 31333a189..de1f28298 100644
--- a/helm/templates/querier-statefulset.yaml
+++ b/helm/templates/querier-statefulset.yaml
@@ -29,8 +29,10 @@ spec:
minReadySeconds: 2
template:
metadata:
+ {{- with .Values.parseable.podAnnotations }}
annotations:
- {{- .Values.parseable.podAnnotations | toYaml | nindent 8 }}
+ {{- toYaml . | nindent 8 }}
+ {{- end }}
labels:
{{- .Values.parseable.podLabels | toYaml | nindent 8 }}
{{- include "parseable.querierLabelsSelector" . | nindent 8 }}
@@ -39,19 +41,33 @@ spec:
serviceAccountName: {{ include "parseable.serviceAccountName" . }}
{{- with .Values.parseable.toleration }}
tolerations:
- {{ toYaml . | nindent 8 }}
+ {{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.parseable.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}
{{- end }}
+ {{- if and .Values.parseable.gcsModeSecret .Values.parseable.gcsModeSecret.enabled }}
+ volumes:
+ - name: {{ .Values.parseable.gcsModeSecret.auth.secret_name }}
+ secret:
+ secretName: {{ .Values.parseable.gcsModeSecret.auth.secret_name }}
+ - name: stage-volume
+ emptyDir: {}
+ {{- else }}
+ volumes:
+ - name: stage-volume
+ emptyDir: {}
+ {{- end }}
containers:
- name: {{ .Chart.Name }}
securityContext:
{{- toYaml .Values.parseable.securityContext | nindent 10 }}
image: {{ .Values.parseable.image.repository }}:{{ .Values.parseable.image.tag | default .Chart.AppVersion }}
imagePullPolicy: {{ .Values.parseable.image.pullPolicy }}
- args: ["/usr/bin/parseable", {{ if eq .Values.parseable.store "gcs-store" }}"s3-store"{{ else }}{{ .Values.parseable.store | quote }}{{ end }}]
+ args:
+ - "/usr/bin/parseable"
+ - {{ .Values.parseable.store | quote }}
env:
- name: HOSTNAME
valueFrom:
@@ -66,7 +82,7 @@ spec:
- name: P_MAX_DISK_USAGE_PERCENT
value: "95.0"
{{- end }}
- {{- range $key, $value := .Values.parseable.env }}
+ {{- range $key, $value := .Values.parseable.env }}
- name: {{ $key }}
value: {{ tpl $value $ | quote }}
{{- end }}
@@ -95,11 +111,13 @@ spec:
{{- end }}
{{- if and .Values.parseable.gcsModeSecret .Values.parseable.gcsModeSecret.enabled }}
+ - name: GOOGLE_APPLICATION_CREDENTIALS
+ value: {{ .Values.parseable.gcsModeSecret.auth.mount_path }}/{{ .Values.parseable.gcsModeSecret.auth.secret_key }}
{{- range $secret := .Values.parseable.gcsModeSecret.secrets }}
{{- range $key := $secret.keys }}
{{- $envPrefix := $secret.prefix | default "" | upper }}
{{- $envKey := $key | upper | replace "." "_" | replace "-" "_" }}
- - name: {{ $envPrefix }}{{ $envKey | replace "GCS" "S3"}}
+ - name: {{ $envPrefix }}{{ $envKey }}
valueFrom:
secretKeyRef:
name: {{ $secret.name }}
@@ -107,7 +125,7 @@ spec:
{{- end }}
{{- end }}
{{- end }}
-
+
{{- if and .Values.parseable.blobModeSecret .Values.parseable.blobModeSecret.enabled }}
{{- range $secret := .Values.parseable.blobModeSecret.secrets }}
{{- range $key := $secret.keys }}
@@ -121,14 +139,15 @@ spec:
{{- end }}
{{- end }}
{{- end }}
+
ports:
- containerPort: 8000
{{- with .Values.parseable.readinessProbe }}
readinessProbe:
- {{ toYaml . | nindent 12 }}
+ {{- toYaml . | nindent 10 }}
{{- end }}
resources:
- {{- toYaml .Values.parseable.resources | nindent 12 }}
+ {{- toYaml .Values.parseable.resources | nindent 10 }}
volumeMounts:
- mountPath: "/parseable/staging"
name: stage-volume
@@ -136,19 +155,21 @@ spec:
- mountPath: "/parseable/hot-tier"
name: hot-tier-volume
{{- end }}
- volumes:
- - emptyDir: {}
- name: stage-volume
- {{- if .Values.parseable.sidecar.enabled}}
+ {{- if and .Values.parseable.gcsModeSecret .Values.parseable.gcsModeSecret.enabled }}
+ - mountPath: {{ .Values.parseable.gcsModeSecret.auth.mount_path }}
+ name: {{ .Values.parseable.gcsModeSecret.auth.secret_name }}
+ readOnly: true
+ {{- end }}
+ {{- if .Values.parseable.sidecar.enabled }}
- name: {{ .Chart.Name }}-sidecar
securityContext:
- {{- toYaml .Values.parseable.securityContext | nindent 8 }}
+ {{- toYaml .Values.parseable.securityContext | nindent 10 }}
image: {{ .Values.parseable.sidecar.image.repository }}:{{ .Values.parseable.sidecar.image.tag }}
imagePullPolicy: {{ .Values.parseable.sidecar.image.pullPolicy }}
- command: {{ .Values.parseable.sidecar.command }}
- args: {{ .Values.parseable.sidecar.args }}
+ command: {{ .Values.parseable.sidecar.command }}
+ args: {{ .Values.parseable.sidecar.args }}
env:
- {{- range $key, $value := .Values.parseable.sidecar.env }}
+ {{- range $key, $value := .Values.parseable.sidecar.env }}
- name: {{ $key }}
value: {{ tpl $value $ | quote }}
{{- end }}
@@ -156,10 +177,11 @@ spec:
- containerPort: {{ .Values.parseable.sidecar.ports }}
resources:
{{- toYaml .Values.parseable.sidecar.resources | nindent 10 }}
- volumeMounts: {{ .Values.parseable.sidecar.volumeMounts | toYaml | nindent 10 }}
+ volumeMounts:
+ {{- .Values.parseable.sidecar.volumeMounts | toYaml | nindent 10 }}
{{- end }}
- volumeClaimTemplates:
{{- if .Values.parseable.persistence.querier.enabled }}
+ volumeClaimTemplates:
- metadata:
name: hot-tier-volume
spec:
@@ -178,8 +200,13 @@ spec:
resources:
requests:
storage: 5Gi
+ {{- if .Values.parseable.sidecar.enabled }}
+ {{- .Values.parseable.sidecar.volumeClaimTemplates | toYaml | nindent 2 }}
+ {{- end }}
+ {{- else }}
+ {{- if .Values.parseable.sidecar.enabled }}
+ volumeClaimTemplates:
+ {{- .Values.parseable.sidecar.volumeClaimTemplates | toYaml | nindent 2 }}
{{- end }}
- {{- if .Values.parseable.sidecar.enabled}}
- {{- .Values.parseable.sidecar.volumeClaimTemplates | toYaml | nindent 4 }}
{{- end }}
{{- end }}
diff --git a/helm/templates/standalone-deployment.yaml b/helm/templates/standalone-deployment.yaml
index 968d237b8..20627bfe3 100644
--- a/helm/templates/standalone-deployment.yaml
+++ b/helm/templates/standalone-deployment.yaml
@@ -36,14 +36,14 @@ spec:
imagePullPolicy: {{ .Values.parseable.image.pullPolicy }}
# Uncomment to debug
# command: [ "/bin/sh", "-c", "sleep 1000000" ]
- args: [ "/usr/bin/parseable", {{ if eq .Values.parseable.store "gcs-store" }}"s3-store"{{ else }}{{ .Values.parseable.store | quote }}{{ end }}]
+ args: [ "/usr/bin/parseable", {{ .Values.parseable.store | quote }}]
env:
- name: HOSTNAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.name
- {{- range $key, $value := .Values.parseable.env }}
+ {{- range $key, $value := .Values.parseable.env }}
- name: {{ $key }}
value: {{ tpl $value $ | quote }}
{{- end }}
@@ -92,11 +92,13 @@ spec:
{{- end }}
{{- if and .Values.parseable.gcsModeSecret .Values.parseable.gcsModeSecret.enabled }}
+ - name: GOOGLE_APPLICATION_CREDENTIALS
+              value: {{ .Values.parseable.gcsModeSecret.auth.mount_path }}/{{ .Values.parseable.gcsModeSecret.auth.secret_key }}
{{- range $secret := .Values.parseable.gcsModeSecret.secrets }}
{{- range $key := $secret.keys }}
{{- $envPrefix := $secret.prefix | default "" | upper }}
{{- $envKey := $key | upper | replace "." "_" | replace "-" "_" }}
- - name: {{ $envPrefix }}{{ $envKey | replace "GCS" "S3"}}
+ - name: {{ $envPrefix }}{{ $envKey }}
valueFrom:
secretKeyRef:
name: {{ $secret.name }}
@@ -119,21 +121,30 @@ spec:
{{- end }}
{{- end }}
-
ports:
- containerPort: 8000
{{- with .Values.readinessProbe }}
readinessProbe:
- {{ toYaml . | nindent 12 }}
+ {{- toYaml . | nindent 12 }}
{{- end }}
resources:
{{- toYaml .Values.parseable.resources | nindent 12 }}
volumeMounts:
+ {{- if and .Values.parseable.gcsModeSecret .Values.parseable.gcsModeSecret.enabled }}
+ - mountPath: {{ .Values.parseable.gcsModeSecret.auth.mount_path }}
+ name: {{ .Values.parseable.gcsModeSecret.auth.secret_name }}
+ readOnly: true
+ {{- end }}
- mountPath: "/parseable/data"
name: data-volume
- mountPath: "/parseable/staging"
name: stage-volume
volumes:
+ {{- if and .Values.parseable.gcsModeSecret .Values.parseable.gcsModeSecret.enabled }}
+ - name: {{ .Values.parseable.gcsModeSecret.auth.secret_name }}
+ secret:
+ secretName: {{ .Values.parseable.gcsModeSecret.auth.secret_name }}
+ {{- end }}
{{- if .Values.parseable.persistence.staging.enabled }}
- name: stage-volume
persistentVolumeClaim:
@@ -158,4 +169,4 @@ spec:
tolerations:
{{- toYaml . | nindent 8 }}
{{- end }}
-{{- end }} # Closing for "if eq .Values.parseable.highAvailability.enabled false"
+{{- end }}
diff --git a/helm/values.yaml b/helm/values.yaml
index 16279d08c..591f8d8ac 100644
--- a/helm/values.yaml
+++ b/helm/values.yaml
@@ -8,9 +8,9 @@ parseable:
## Set to true if you want to deploy Parseable in a HA mode (multiple ingestors + hot tier)
## Please note that highAvailability is not supported in local mode
highAvailability:
- enabled: false
+ enabled: true
ingestor:
- affinity: { }
+ affinity: {}
# podAntiAffinity:
# requiredDuringSchedulingIgnoredDuringExecution:
# - labelSelector:
@@ -21,9 +21,9 @@ parseable:
port: 8000
extraLabels:
app: parseable
- podAnnotations: { }
- nodeSelector: { }
- tolerations: [ ]
+ podAnnotations: {}
+ nodeSelector: {}
+ tolerations: []
labels:
app: parseable
component: ingestor
@@ -137,7 +137,11 @@ parseable:
- s3.bucket
- s3.region
gcsModeSecret:
- enabled: false
+ enabled: true
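+    ## auth describes the Kubernetes Secret that carries the GCS service-account JSON:
+    ## secret_name/secret_key name the Secret and the file inside it; the templates mount it
+    ## at mount_path and point GOOGLE_APPLICATION_CREDENTIALS at that file.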
+ auth:
+ secret_name: parseable-env-secret
+ secret_key: key.json
+ mount_path: /var/secrets/google
secrets:
- name: parseable-env-secret
prefix: P_
@@ -148,15 +152,12 @@ parseable:
- staging.dir
- fs.dir
- gcs.url
- - gcs.access.key
- - gcs.secret.key
- gcs.bucket
- - gcs.region
serviceAccount:
create: true
name: "parseable"
- annotations: { }
- nodeSelector: { }
+ annotations: {}
+ nodeSelector: {}
service:
type: ClusterIP
port: 80
@@ -164,7 +165,7 @@ parseable:
httpGet:
path: /api/v1/readiness
port: 8000
- toleration: [ ]
+ toleration: []
resources:
limits:
cpu: 500m
@@ -173,7 +174,7 @@ parseable:
cpu: 250m
memory: 1Gi
## works only when highAvailability is enabled
- ## Set it to true if you want to deploy Parseable
+ ## Set it to true if you want to deploy Parseable
## Query node with a sidecar
sidecar:
enabled: false
@@ -181,8 +182,8 @@ parseable:
repository: busybox
tag: latest
pullPolicy: IfNotPresent
- command: [ ]
- args: [ ]
+ command: []
+ args: []
env:
RUST_LOG: warn
ports: 8000
@@ -193,7 +194,7 @@ parseable:
- metadata:
name: test-volume
spec:
- accessModes: [ "ReadWriteOnce" ]
+ accessModes: ["ReadWriteOnce"]
resources:
requests:
storage: 1Gi
@@ -217,36 +218,36 @@ parseable:
fsGroupChangePolicy: "Always"
nameOverride: ""
fullnameOverride: ""
- affinity: { }
+ affinity: {}
podLabels:
app: parseable
component: query
- tolerations: [ ]
+ tolerations: []
## Use this section to create ServiceMonitor object for
## this Parseable deployment. Read more on ServiceMonitor
## here: https://prometheus-operator.dev/docs/api-reference/api/#monitoring.coreos.com/v1.ServiceMonitor
metrics:
serviceMonitor:
enabled: false
- labels: { }
+ labels: {}
namespace: ""
spec:
jobLabel: ""
- targetLabels: [ ]
- podTargetLabels: [ ]
- endpoints: [ ]
- selector: { }
- namespaceSelector: { }
+ targetLabels: []
+ podTargetLabels: []
+ endpoints: []
+ selector: {}
+ namespaceSelector: {}
sampleLimit: 0
- scrapeProtocols: [ ]
+ scrapeProtocols: []
targetLimit: 0
labelLimit: 0
labelNameLengthLimit: 0
labelValueLengthLimit: 0
keepDroppedTargets: 0
- attachMetadata: { }
+ attachMetadata: {}
scrapeClass: ""
- bodySizeLimit: { }
+ bodySizeLimit: {}
kafkaConnector:
enabled: false
env:
@@ -319,7 +320,7 @@ vector:
image:
repository: timberio/vector
pullPolicy: IfNotPresent
- pullSecrets: [ ]
+ pullSecrets: []
tag: ""
sha: ""
replicas: 1
@@ -332,7 +333,7 @@ vector:
create: true
serviceAccount:
create: true
- annotations: { }
+ annotations: {}
name:
automountToken: true
podLabels:
@@ -344,13 +345,13 @@ vector:
service:
enabled: true
type: "ClusterIP"
- annotations: { }
- topologyKeys: [ ]
- ports: [ ]
+ annotations: {}
+ topologyKeys: []
+ ports: []
externalTrafficPolicy: ""
loadBalancerIP: ""
ipFamilyPolicy: ""
- ipFamilies: [ ]
+ ipFamilies: []
serviceHeadless:
enabled: true
dnsPolicy: ClusterFirst
@@ -376,7 +377,7 @@ vector:
- kubernetes_logs
encoding:
codec: json
- uri: 'http://parseable.parseable.svc.cluster.local/api/v1/ingest'
+ uri: "http://parseable.parseable.svc.cluster.local/api/v1/ingest"
auth:
strategy: basic
user: admin
@@ -386,7 +387,7 @@ vector:
X-P-Stream: vectordemo
healthcheck:
enabled: true
- path: 'http://parseable.parseable.svc.cluster.local/api/v1/liveness'
+ path: "http://parseable.parseable.svc.cluster.local/api/v1/liveness"
port: 80
# Default values for fluent-bit.
@@ -413,7 +414,7 @@ fluent-bit:
tag: latest
serviceAccount:
create: true
- annotations: { }
+ annotations: {}
name:
rbac:
create: true
@@ -424,8 +425,8 @@ fluent-bit:
type: ClusterIP
port: 2020
loadBalancerClass:
- loadBalancerSourceRanges: [ ]
- labels: { }
+ loadBalancerSourceRanges: []
+ labels: {}
livenessProbe:
httpGet:
path: /
@@ -515,9 +516,9 @@ fluent-bit:
Server_Port 80
Username {{ .Values.serverUsername }}
Password {{ .Values.serverPassword }}
- Stream k8s-events
+ Stream k8s-events
- upstream: { }
+ upstream: {}
customParsers: |
[PARSER]
@@ -533,7 +534,7 @@ fluent-bit:
# Regex ^(?\S+Z) stderr F (?\S+ \S+) \[(?\S+)\] (?\d+#\d+): \*(?\d+) (?.*?) client: (?\S+), server: (?\S+), request: "(?\S+) (?\S+) HTTP/\S+", upstream: "(?[^"]+)", host: "(?\S+)"$
# Time_Key timestamp
# Time_Format %Y/%m/%d %H:%M:%S
-
+
# [PARSER]
# Name nginx_access
# Format regex
diff --git a/src/cli.rs b/src/cli.rs
index cfc74d81d..a4bce12b0 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -27,7 +27,7 @@ use crate::connectors::kafka::config::KafkaConfig;
use crate::{
oidc::{self, OpenidConfig},
option::{validation, Compression, Mode},
- storage::{AzureBlobConfig, FSConfig, S3Config},
+ storage::{AzureBlobConfig, FSConfig, GcsConfig, S3Config},
};
/// Default username and password for Parseable server, used by default for local mode.
@@ -80,6 +80,9 @@ pub enum StorageOptions {
#[command(name = "blob-store")]
Blob(BlobStoreArgs),
+
+ #[command(name = "gcs-store")]
+ Gcs(GcsStoreArgs),
}
#[derive(Parser)]
@@ -115,6 +118,17 @@ pub struct BlobStoreArgs {
pub kafka: KafkaConfig,
}
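+/// Arguments for the `gcs-store` subcommand: server [`Options`] plus the GCS storage
+/// configuration (and Kafka connector settings when the `kafka` feature is enabled).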
+#[derive(Parser)]
+pub struct GcsStoreArgs {
+ #[command(flatten)]
+ pub options: Options,
+ #[command(flatten)]
+ pub storage: GcsConfig,
+ #[cfg(feature = "kafka")]
+ #[command(flatten)]
+ pub kafka: KafkaConfig,
+}
+
#[derive(Parser, Debug, Default)]
pub struct Options {
// Authentication
@@ -338,7 +352,7 @@ pub struct Options {
#[arg(
long,
- env = "P_MEMORY_THRESHOLD",
+ env = "P_MEMORY_THRESHOLD",
default_value = "80.0",
value_parser = validation::validate_percentage,
help = "Memory utilization threshold percentage (0.0-100.0) for resource monitoring"
diff --git a/src/metrics/storage.rs b/src/metrics/storage.rs
index a91c431cb..f96a317d9 100644
--- a/src/metrics/storage.rs
+++ b/src/metrics/storage.rs
@@ -125,3 +125,42 @@ pub mod azureblob {
}
}
}
+
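+/// Request-latency histograms for the GCS backend, mirroring the `azureblob` module above.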
+pub mod gcs {
+ use crate::{metrics::METRICS_NAMESPACE, storage::GcsConfig};
+ use once_cell::sync::Lazy;
+ use prometheus::{HistogramOpts, HistogramVec};
+
+ use super::StorageMetrics;
+
+    pub static REQUEST_RESPONSE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
+ HistogramVec::new(
+ HistogramOpts::new("gcs_response_time", "GCS Request Latency")
+ .namespace(METRICS_NAMESPACE),
+ &["method", "status"],
+ )
+ .expect("metric can be created")
+ });
+
+    pub static QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
+ HistogramVec::new(
+ HistogramOpts::new("query_gcs_response_time", "GCS Request Latency")
+ .namespace(METRICS_NAMESPACE),
+ &["method", "status"],
+ )
+ .expect("metric can be created")
+ });
+
+ impl StorageMetrics for GcsConfig {
+ fn register_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) {
+ handler
+ .registry
+ .register(Box::new(REQUEST_RESPONSE_TIME.clone()))
+ .expect("metric can be registered");
+ handler
+ .registry
+ .register(Box::new(QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME.clone()))
+ .expect("metric can be registered");
+ }
+ }
+}
diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs
index 439f71ee0..87496cfc1 100644
--- a/src/parseable/mod.rs
+++ b/src/parseable/mod.rs
@@ -117,6 +117,12 @@ pub static PARSEABLE: Lazy<Parseable> = Lazy::new(|| match Cli::parse().storage
args.kafka,
Arc::new(args.storage),
),
+ StorageOptions::Gcs(args) => Parseable::new(
+ args.options,
+ #[cfg(feature = "kafka")]
+ args.kafka,
+ Arc::new(args.storage),
+ ),
});
/// All state related to parseable, in one place.
@@ -243,6 +249,8 @@ impl Parseable {
return "S3 bucket";
} else if self.storage.name() == "blob_store" {
return "Azure Blob Storage";
+ } else if self.storage.name() == "gcs" {
+ return "Google Object Store";
}
"Unknown"
}
diff --git a/src/storage/gcs.rs b/src/storage/gcs.rs
new file mode 100644
index 000000000..292bf3f02
--- /dev/null
+++ b/src/storage/gcs.rs
@@ -0,0 +1,671 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+use std::{
+ collections::{BTreeMap, HashSet},
+ path::Path,
+ sync::Arc,
+ time::{Duration, Instant},
+};
+
+use crate::{
+ handlers::http::users::USERS_ROOT_DIR,
+ metrics::storage::{gcs::REQUEST_RESPONSE_TIME, StorageMetrics},
+ parseable::LogStream,
+};
+use async_trait::async_trait;
+use bytes::Bytes;
+use datafusion::{
+ datasource::listing::ListingTableUrl,
+ execution::{
+ object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry, ObjectStoreUrl},
+ runtime_env::RuntimeEnvBuilder,
+ },
+};
+use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt};
+use object_store::{
+ buffered::BufReader,
+ gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder},
+ limit::LimitStore,
+ path::Path as StorePath,
+ BackoffConfig, ClientOptions, ObjectMeta, ObjectStore, PutPayload, RetryConfig,
+};
+use relative_path::{RelativePath, RelativePathBuf};
+use tokio::{fs::OpenOptions, io::AsyncReadExt};
+use tracing::{error, info};
+
+use super::{
+ metrics_layer::MetricLayer, object_storage::parseable_json_path, to_object_store_path,
+ ObjectStorage, ObjectStorageError, ObjectStorageProvider, CONNECT_TIMEOUT_SECS,
+ MIN_MULTIPART_UPLOAD_SIZE, PARSEABLE_ROOT_DIRECTORY, REQUEST_TIMEOUT_SECS, SCHEMA_FILE_NAME,
+ STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
+};
+
+#[derive(Debug, Clone, clap::Args)]
+#[command(
+ name = "GCS config",
+ about = "Start Parseable with GCS or compatible as storage",
+ help_template = "\
+{about-section}
+{all-args}
+"
+)]
+pub struct GcsConfig {
+ /// The endpoint to GCS or compatible object storage platform
+ #[arg(
+ long,
+ env = "P_GCS_URL",
+ value_name = "url",
+ default_value = "https://storage.googleapis.com",
+ required = false
+ )]
+ pub endpoint_url: String,
+
+ /// The GCS or compatible object storage bucket to be used for storage
+ #[arg(
+ long,
+ env = "P_GCS_BUCKET",
+ value_name = "bucket-name",
+ required = true
+ )]
+ pub bucket_name: String,
+
+ /// Set client to skip tls verification
+ #[arg(
+ long,
+ env = "P_GCS_TLS_SKIP_VERIFY",
+ value_name = "bool",
+ default_value = "false"
+ )]
+ pub skip_tls: bool,
+}
+
+impl GcsConfig {
+ fn get_default_builder(&self) -> GoogleCloudStorageBuilder {
+ let mut client_options = ClientOptions::default()
+ .with_allow_http(true)
+ .with_connect_timeout(Duration::from_secs(CONNECT_TIMEOUT_SECS))
+ .with_timeout(Duration::from_secs(REQUEST_TIMEOUT_SECS));
+
+ if self.skip_tls {
+ client_options = client_options.with_allow_invalid_certificates(true)
+ }
+ let retry_config = RetryConfig {
+ max_retries: 5,
+ retry_timeout: Duration::from_secs(30),
+ backoff: BackoffConfig::default(),
+ };
+
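+        // Credentials are not passed on the CLI: `from_env` reads GOOGLE_-prefixed settings
+        // (e.g. GOOGLE_APPLICATION_CREDENTIALS or GOOGLE_SERVICE_ACCOUNT), which is how the
+        // helm chart and docker compose files supply the service-account key.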
+ let builder = GoogleCloudStorageBuilder::from_env()
+ .with_bucket_name(&self.bucket_name)
+ .with_retry(retry_config);
+
+ builder.with_client_options(client_options)
+ }
+}
+
+impl ObjectStorageProvider for GcsConfig {
+ fn name(&self) -> &'static str {
+ "gcs"
+ }
+
+ fn get_datafusion_runtime(&self) -> RuntimeEnvBuilder {
+ let gcs = self.get_default_builder().build().unwrap();
+
+ // limit objectstore to a concurrent request limit
+ let gcs = LimitStore::new(gcs, super::MAX_OBJECT_STORE_REQUESTS);
+ let gcs = MetricLayer::new(gcs);
+
+ let object_store_registry = DefaultObjectStoreRegistry::new();
+ // Register GCS client under the "gs://" scheme so DataFusion can route
+ // object store calls to our GoogleCloudStorage implementation
+ let url = ObjectStoreUrl::parse(format!("gs://{}", &self.bucket_name)).unwrap();
+ object_store_registry.register_store(url.as_ref(), Arc::new(gcs));
+
+ RuntimeEnvBuilder::new().with_object_store_registry(Arc::new(object_store_registry))
+ }
+
+    fn construct_client(&self) -> Arc<dyn ObjectStorage> {
+ let gcs = self.get_default_builder().build().unwrap();
+
+ Arc::new(Gcs {
+ client: Arc::new(gcs),
+ bucket: self.bucket_name.clone(),
+ root: StorePath::from(""),
+ })
+ }
+
+ fn get_endpoint(&self) -> String {
+ format!("{}/{}", self.endpoint_url, self.bucket_name)
+ }
+
+ fn register_store_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) {
+ self.register_metrics(handler);
+ }
+
+    fn get_object_store(&self) -> Arc<dyn ObjectStorage> {
+        static STORE: once_cell::sync::OnceCell<Arc<dyn ObjectStorage>> =
+ once_cell::sync::OnceCell::new();
+
+ STORE.get_or_init(|| self.construct_client()).clone()
+ }
+}
+
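+/// GCS-backed [`ObjectStorage`] handle: a shared `object_store` client plus the bucket name
+/// and root prefix against which relative paths are resolved.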
+#[derive(Debug)]
+pub struct Gcs {
+ client: Arc,
+ bucket: String,
+ root: StorePath,
+}
+
+impl Gcs {
+    async fn _get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError> {
+ let instant = Instant::now();
+
+ let resp = self.client.get(&to_object_store_path(path)).await;
+
+ match resp {
+ Ok(resp) => {
+ let time = instant.elapsed().as_secs_f64();
+ REQUEST_RESPONSE_TIME
+ .with_label_values(&["GET", "200"])
+ .observe(time);
+ let body = resp.bytes().await.unwrap();
+ Ok(body)
+ }
+ Err(err) => {
+ let time = instant.elapsed().as_secs_f64();
+ REQUEST_RESPONSE_TIME
+ .with_label_values(&["GET", "400"])
+ .observe(time);
+ Err(err.into())
+ }
+ }
+ }
+
+ async fn _put_object(
+ &self,
+ path: &RelativePath,
+ resource: PutPayload,
+ ) -> Result<(), ObjectStorageError> {
+ let time = Instant::now();
+ let resp = self.client.put(&to_object_store_path(path), resource).await;
+ let status = if resp.is_ok() { "200" } else { "400" };
+ let time = time.elapsed().as_secs_f64();
+ REQUEST_RESPONSE_TIME
+ .with_label_values(&["PUT", status])
+ .observe(time);
+
+ if let Err(object_store::Error::NotFound { source, .. }) = &resp {
+ let source_str = source.to_string();
+            if source_str.contains("NoSuchBucket") {
+ return Err(ObjectStorageError::Custom(
+ format!("Bucket '{}' does not exist in GCS.", self.bucket).to_string(),
+ ));
+ }
+ }
+
+ resp.map(|_| ()).map_err(|err| err.into())
+ }
+
+ async fn _delete_prefix(&self, key: &str) -> Result<(), ObjectStorageError> {
+ let object_stream = self.client.list(Some(&(key.into())));
+
+ object_stream
+ .for_each_concurrent(None, |x| async {
+ match x {
+ Ok(obj) => {
+ if (self.client.delete(&obj.location).await).is_err() {
+ error!("Failed to fetch object during delete stream");
+ }
+ }
+ Err(_) => {
+ error!("Failed to fetch object during delete stream");
+ }
+ };
+ })
+ .await;
+
+ Ok(())
+ }
+
+    async fn _list_streams(&self) -> Result<HashSet<LogStream>, ObjectStorageError> {
+ let mut result_file_list = HashSet::new();
+ let resp = self.client.list_with_delimiter(None).await?;
+
+ let streams = resp
+ .common_prefixes
+ .iter()
+ .flat_map(|path| path.parts())
+ .map(|name| name.as_ref().to_string())
+ .filter(|name| name != PARSEABLE_ROOT_DIRECTORY && name != USERS_ROOT_DIR)
+            .collect::<Vec<_>>();
+
+ for stream in streams {
+ let stream_path =
+ object_store::path::Path::from(format!("{}/{}", &stream, STREAM_ROOT_DIRECTORY));
+ let resp = self.client.list_with_delimiter(Some(&stream_path)).await?;
+ if resp
+ .objects
+ .iter()
+ .any(|name| name.location.filename().unwrap().ends_with("stream.json"))
+ {
+ result_file_list.insert(stream);
+ }
+ }
+
+ Ok(result_file_list)
+ }
+
+    async fn _list_dates(&self, stream: &str) -> Result<Vec<String>, ObjectStorageError> {
+ let resp = self
+ .client
+ .list_with_delimiter(Some(&(stream.into())))
+ .await?;
+
+ let common_prefixes = resp.common_prefixes;
+
+ // return prefixes at the root level
+ let dates: Vec<_> = common_prefixes
+ .iter()
+ .filter_map(|path| path.as_ref().strip_prefix(&format!("{stream}/")))
+ .map(String::from)
+ .collect();
+
+ Ok(dates)
+ }
+
+ async fn _list_manifest_files(
+ &self,
+ stream: &str,
+    ) -> Result<BTreeMap<String, Vec<String>>, ObjectStorageError> {
+        let mut result_file_list: BTreeMap<String, Vec<String>> = BTreeMap::new();
+ let resp = self
+ .client
+ .list_with_delimiter(Some(&(stream.into())))
+ .await?;
+
+ let dates = resp
+ .common_prefixes
+ .iter()
+ .flat_map(|path| path.parts())
+ .filter(|name| name.as_ref() != stream && name.as_ref() != STREAM_ROOT_DIRECTORY)
+ .map(|name| name.as_ref().to_string())
+            .collect::<Vec<_>>();
+ for date in dates {
+ let date_path = object_store::path::Path::from(format!("{}/{}", stream, &date));
+ let resp = self.client.list_with_delimiter(Some(&date_path)).await?;
+            let manifests: Vec<String> = resp
+ .objects
+ .iter()
+ .filter(|name| name.location.filename().unwrap().ends_with("manifest.json"))
+ .map(|name| name.location.to_string())
+ .collect();
+ result_file_list.entry(date).or_default().extend(manifests);
+ }
+ Ok(result_file_list)
+ }
+ async fn _upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> {
+ let instant = Instant::now();
+
+ let bytes = tokio::fs::read(path).await?;
+ let result = self.client.put(&key.into(), bytes.into()).await?;
+ info!("Uploaded file to GCS: {:?}", result);
+
+ let time = instant.elapsed().as_secs_f64();
+ REQUEST_RESPONSE_TIME
+ .with_label_values(&["UPLOAD_PARQUET", "200"])
+ .observe(time);
+
+ Ok(())
+ }
+
+ async fn _upload_multipart(
+ &self,
+ key: &RelativePath,
+ path: &Path,
+ ) -> Result<(), ObjectStorageError> {
+ let mut file = OpenOptions::new().read(true).open(path).await?;
+ let location = &to_object_store_path(key);
+
+ let mut async_writer = self.client.put_multipart(location).await?;
+
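+        // Files below the multipart threshold are written with a single PUT; anything larger
+        // is split into MIN_MULTIPART_UPLOAD_SIZE chunks and uploaded part by part below.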
+ let meta = file.metadata().await?;
+ let total_size = meta.len() as usize;
+ if total_size < MIN_MULTIPART_UPLOAD_SIZE {
+ let mut data = Vec::new();
+ file.read_to_end(&mut data).await?;
+ self.client.put(location, data.into()).await?;
+ return Ok(());
+ } else {
+ let mut data = Vec::new();
+ file.read_to_end(&mut data).await?;
+
+ let has_final_partial_part = total_size % MIN_MULTIPART_UPLOAD_SIZE > 0;
+ let num_full_parts = total_size / MIN_MULTIPART_UPLOAD_SIZE;
+ let total_parts = num_full_parts + if has_final_partial_part { 1 } else { 0 };
+
+ // Upload each part
+ for part_number in 0..(total_parts) {
+ let start_pos = part_number * MIN_MULTIPART_UPLOAD_SIZE;
+ let end_pos = if part_number == num_full_parts && has_final_partial_part {
+ // Last part might be smaller than 5MB (which is allowed)
+ total_size
+ } else {
+ // All other parts must be at least 5MB
+ start_pos + MIN_MULTIPART_UPLOAD_SIZE
+ };
+
+ // Extract this part's data
+ let part_data = data[start_pos..end_pos].to_vec();
+
+ // Upload the part
+ async_writer.put_part(part_data.into()).await?;
+ }
+ if let Err(err) = async_writer.complete().await {
+ if let Err(abort_err) = async_writer.abort().await {
+ error!(
+ "Failed to abort multipart upload after completion failure: {:?}",
+ abort_err
+ );
+ }
+ return Err(err.into());
+ };
+ }
+ Ok(())
+ }
+}
+
+#[async_trait]
+impl ObjectStorage for Gcs {
+ async fn get_buffered_reader(
+ &self,
+ path: &RelativePath,
+    ) -> Result<BufReader, ObjectStorageError> {
+ let path = &to_object_store_path(path);
+ let meta = self.client.head(path).await?;
+
+        let store: Arc<dyn ObjectStore> = self.client.clone();
+ let buf = object_store::buffered::BufReader::new(store, &meta);
+ Ok(buf)
+ }
+ async fn upload_multipart(
+ &self,
+ key: &RelativePath,
+ path: &Path,
+ ) -> Result<(), ObjectStorageError> {
+ self._upload_multipart(key, path).await
+ }
+    async fn head(&self, path: &RelativePath) -> Result<ObjectMeta, ObjectStorageError> {
+ Ok(self.client.head(&to_object_store_path(path)).await?)
+ }
+
+    async fn get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError> {
+ Ok(self._get_object(path).await?)
+ }
+
+ async fn get_objects(
+ &self,
+ base_path: Option<&RelativePath>,
+        filter_func: Box<dyn Fn(String) -> bool + Send>,
+    ) -> Result<Vec<Bytes>, ObjectStorageError> {
+ let instant = Instant::now();
+
+ let prefix = if let Some(base_path) = base_path {
+ to_object_store_path(base_path)
+ } else {
+ self.root.clone()
+ };
+
+ let mut list_stream = self.client.list(Some(&prefix));
+
+ let mut res = vec![];
+
+ while let Some(meta) = list_stream.next().await.transpose()? {
+ let ingestor_file = filter_func(meta.location.filename().unwrap().to_string());
+
+ if !ingestor_file {
+ continue;
+ }
+
+ let byts = self
+ .get_object(
+ RelativePath::from_path(meta.location.as_ref())
+ .map_err(ObjectStorageError::PathError)?,
+ )
+ .await?;
+
+ res.push(byts);
+ }
+
+ let instant = instant.elapsed().as_secs_f64();
+ REQUEST_RESPONSE_TIME
+ .with_label_values(&["GET", "200"])
+ .observe(instant);
+
+ Ok(res)
+ }
+
+ async fn get_ingestor_meta_file_paths(
+ &self,
+    ) -> Result<Vec<RelativePathBuf>, ObjectStorageError> {
+ let time = Instant::now();
+ let mut path_arr = vec![];
+ let mut object_stream = self.client.list(Some(&self.root));
+
+ while let Some(meta) = object_stream.next().await.transpose()? {
+ let flag = meta.location.filename().unwrap().starts_with("ingestor");
+
+ if flag {
+ path_arr.push(RelativePathBuf::from(meta.location.as_ref()));
+ }
+ }
+
+ let time = time.elapsed().as_secs_f64();
+ REQUEST_RESPONSE_TIME
+ .with_label_values(&["GET", "200"])
+ .observe(time);
+
+ Ok(path_arr)
+ }
+
+ async fn get_stream_file_paths(
+ &self,
+ stream_name: &str,
+    ) -> Result<Vec<RelativePathBuf>, ObjectStorageError> {
+ let time = Instant::now();
+ let mut path_arr = vec![];
+ let path = to_object_store_path(&RelativePathBuf::from(stream_name));
+ let mut object_stream = self.client.list(Some(&path));
+
+ while let Some(meta) = object_stream.next().await.transpose()? {
+ let flag = meta.location.filename().unwrap().starts_with(".ingestor");
+
+ if flag {
+ path_arr.push(RelativePathBuf::from(meta.location.as_ref()));
+ }
+ }
+
+ path_arr.push(RelativePathBuf::from_iter([
+ stream_name,
+ STREAM_METADATA_FILE_NAME,
+ ]));
+ path_arr.push(RelativePathBuf::from_iter([stream_name, SCHEMA_FILE_NAME]));
+
+ let time = time.elapsed().as_secs_f64();
+ REQUEST_RESPONSE_TIME
+ .with_label_values(&["GET", "200"])
+ .observe(time);
+
+ Ok(path_arr)
+ }
+
+ async fn put_object(
+ &self,
+ path: &RelativePath,
+ resource: Bytes,
+ ) -> Result<(), ObjectStorageError> {
+ self._put_object(path, resource.into())
+ .await
+ .map_err(|err| ObjectStorageError::ConnectionError(Box::new(err)))?;
+
+ Ok(())
+ }
+
+ async fn delete_prefix(&self, path: &RelativePath) -> Result<(), ObjectStorageError> {
+ self._delete_prefix(path.as_ref()).await?;
+
+ Ok(())
+ }
+
+ async fn delete_object(&self, path: &RelativePath) -> Result<(), ObjectStorageError> {
+ Ok(self.client.delete(&to_object_store_path(path)).await?)
+ }
+
+ async fn check(&self) -> Result<(), ObjectStorageError> {
+ Ok(self
+ .client
+ .head(&to_object_store_path(&parseable_json_path()))
+ .await
+ .map(|_| ())?)
+ }
+
+ async fn delete_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError> {
+ self._delete_prefix(stream_name).await?;
+
+ Ok(())
+ }
+
+ async fn try_delete_node_meta(&self, node_filename: String) -> Result<(), ObjectStorageError> {
+ let file = RelativePathBuf::from(&node_filename);
+ match self.client.delete(&to_object_store_path(&file)).await {
+ Ok(_) => Ok(()),
+ Err(err) => {
+ // if the object is not found, it is not an error
+ // the given url path was incorrect
+ if matches!(err, object_store::Error::NotFound { .. }) {
+ error!("Node does not exist");
+ Err(err.into())
+ } else {
+ error!("Error deleting node meta file: {:?}", err);
+ Err(err.into())
+ }
+ }
+ }
+ }
+
+    async fn list_streams(&self) -> Result<HashSet<LogStream>, ObjectStorageError> {
+ self._list_streams().await
+ }
+
+    async fn list_old_streams(&self) -> Result<HashSet<LogStream>, ObjectStorageError> {
+ let resp = self.client.list_with_delimiter(None).await?;
+
+ let common_prefixes = resp.common_prefixes; // get all dirs
+
+ // return prefixes at the root level
+ let dirs: HashSet<_> = common_prefixes
+ .iter()
+ .filter_map(|path| path.parts().next())
+ .map(|name| name.as_ref().to_string())
+ .filter(|x| x != PARSEABLE_ROOT_DIRECTORY)
+ .collect();
+
+ let stream_json_check = FuturesUnordered::new();
+
+ for dir in &dirs {
+ let key = format!("{dir}/{STREAM_METADATA_FILE_NAME}");
+ let task = async move { self.client.head(&StorePath::from(key)).await.map(|_| ()) };
+ stream_json_check.push(task);
+ }
+
+ stream_json_check.try_collect::<()>().await?;
+
+ Ok(dirs)
+ }
+
+    async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError> {
+ let streams = self._list_dates(stream_name).await?;
+
+ Ok(streams)
+ }
+
+ async fn list_manifest_files(
+ &self,
+ stream_name: &str,
+    ) -> Result<BTreeMap<String, Vec<String>>, ObjectStorageError> {
+ let files = self._list_manifest_files(stream_name).await?;
+
+ Ok(files)
+ }
+
+ async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> {
+ self._upload_file(key, path).await?;
+
+ Ok(())
+ }
+
+ fn absolute_url(&self, prefix: &RelativePath) -> object_store::path::Path {
+ object_store::path::Path::parse(prefix).unwrap()
+ }
+
+    fn query_prefixes(&self, prefixes: Vec<String>) -> Vec<ListingTableUrl> {
+ prefixes
+ .into_iter()
+ .map(|prefix| {
+ let path = format!("gs://{}/{}", &self.bucket, prefix);
+ ListingTableUrl::parse(path).unwrap()
+ })
+ .collect()
+ }
+
+ fn store_url(&self) -> url::Url {
+ url::Url::parse(&format!("gs://{}", self.bucket)).unwrap()
+ }
+
+    async fn list_dirs(&self) -> Result<Vec<String>, ObjectStorageError> {
+ let pre = object_store::path::Path::from("/");
+ let resp = self.client.list_with_delimiter(Some(&pre)).await?;
+
+ Ok(resp
+ .common_prefixes
+ .iter()
+ .flat_map(|path| path.parts())
+ .map(|name| name.as_ref().to_string())
+            .collect::<Vec<_>>())
+ }
+
+ async fn list_dirs_relative(
+ &self,
+ relative_path: &RelativePath,
+    ) -> Result<Vec<String>, ObjectStorageError> {
+ let prefix = object_store::path::Path::from(relative_path.as_str());
+ let resp = self.client.list_with_delimiter(Some(&prefix)).await?;
+
+ Ok(resp
+ .common_prefixes
+ .iter()
+ .flat_map(|path| path.parts())
+ .map(|name| name.as_ref().to_string())
+            .collect::<Vec<_>>())
+ }
+
+ fn get_bucket_name(&self) -> String {
+ self.bucket.clone()
+ }
+}
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 29dc2ea13..d160760d5 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -37,6 +37,7 @@ use std::fmt::Debug;
mod azure_blob;
pub mod field_stats;
+mod gcs;
mod localfs;
mod metrics_layer;
pub mod object_storage;
@@ -46,6 +47,7 @@ pub mod store_metadata;
use self::retention::Retention;
pub use azure_blob::AzureBlobConfig;
+pub use gcs::GcsConfig;
pub use localfs::FSConfig;
pub use object_storage::{ObjectStorage, ObjectStorageProvider};
pub use s3::S3Config;