Merged
Changes from 13 commits
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -11,6 +11,7 @@ All notable changes to this project will be documented in this file.
- Use `--file-log-max-files` (or `FILE_LOG_MAX_FILES`) to limit the number of log files kept.
- Use `--file-log-rotation-period` (or `FILE_LOG_ROTATION_PERIOD`) to configure the frequency of rotation.
- Use `--console-log-format` (or `CONSOLE_LOG_FORMAT`) to set the format to `plain` (default) or `json`.
- Add test for Apache Iceberg integration ([#785]).

### Changed

@@ -39,6 +40,7 @@ All notable changes to this project will be documented in this file.
[#774]: https://github.com/stackabletech/nifi-operator/pull/774
[#776]: https://github.com/stackabletech/nifi-operator/pull/776
[#782]: https://github.com/stackabletech/nifi-operator/pull/782
[#785]: https://github.com/stackabletech/nifi-operator/pull/785
[#787]: https://github.com/stackabletech/nifi-operator/pull/787
[#789]: https://github.com/stackabletech/nifi-operator/pull/789

20 changes: 17 additions & 3 deletions docs/modules/nifi/pages/usage_guide/writing-to-iceberg-tables.adoc
@@ -2,13 +2,27 @@
:description: Write to Apache Iceberg tables in NiFi using the PutIceberg processor. Supports integration with S3 and Hive Metastore for scalable data handling.
:iceberg: https://iceberg.apache.org/

WARNING: In NiFi `2.0.0`, Iceberg support https://issues.apache.org/jira/browse/NIFI-13938[has been removed].

{iceberg}[Apache Iceberg] is a high-performance format for huge analytic tables.
Iceberg brings the reliability and simplicity of SQL tables to big data, while making it possible for engines like Spark, Trino, Flink, Presto, Hive and Impala to safely work with the same tables, at the same time.

NiFi supports a `PutIceberg` processor to add rows to an existing Iceberg table https://issues.apache.org/jira/browse/NIFI-10442[starting from version 1.19.0].
As of NiFi version `1.23.1` only `PutIceberg` is supported, you need to create and compact your tables with other tools such as Trino or Spark (both included in the Stackable Data Platform).
As of NiFi version `2.4.0`, only `PutIceberg` is supported; you need to create and compact your tables with other tools such as Trino or Spark (both included in the Stackable Data Platform).

== NiFi 2.x

In NiFi `2.0.0`, Iceberg support https://issues.apache.org/jira/browse/NIFI-13938[has been removed] from upstream NiFi.

We forked the `nifi-iceberg-bundle` and made it available at https://github.com/stackabletech/nifi-iceberg-bundle.
Starting with SDP 25.7, the bundle is included in Stackable NiFi by default, so you don't need to explicitly add Iceberg support to your cluster.

Please read https://github.com/stackabletech/nifi-iceberg-bundle[its documentation] for details on how to ingest data into Iceberg tables.
You don't need any special configuration on the `NiFiCluster` if you are using S3 without Kerberos.
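To illustrate, a minimal `NifiCluster` for the S3-without-Kerberos case might look like the sketch below (the resource name, secret name, and version number are illustrative assumptions, not required values):

[source,yaml]
----
---
apiVersion: nifi.stackable.tech/v1alpha1
kind: NifiCluster
metadata:
  name: simple-nifi
spec:
  image:
    productVersion: 2.4.0
  clusterConfig:
    # No Iceberg-specific settings are needed here;
    # the forked nifi-iceberg-bundle ships with the image (SDP 25.7+).
    sensitiveProperties:
      keySecret: nifi-sensitive-property-key
  nodes:
    roleGroups:
      default:
        replicas: 1
----

The S3 endpoint and credentials are then configured on the Iceberg processors and controller services inside NiFi, not on the cluster resource itself.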

HDFS and Kerberos are also supported; have a look at the https://github.com/stackabletech/nifi-operator/tree/main/tests/templates/kuttl/iceberg[Iceberg integration test] for an example.

== NiFi 1.x

Starting with `1.19.0`, NiFi supports writing to Iceberg tables.

The following example shows a NiFi setup using the Iceberg integration.

8 changes: 8 additions & 0 deletions tests/release.yaml
@@ -12,7 +12,15 @@ releases:
operatorVersion: 0.0.0-dev
listener:
operatorVersion: 0.0.0-dev
opa:
operatorVersion: 0.0.0-dev
zookeeper:
operatorVersion: 0.0.0-dev
hdfs:
operatorVersion: 0.0.0-dev
hive:
operatorVersion: 0.0.0-dev
trino:
operatorVersion: 0.0.0-dev
nifi:
operatorVersion: 0.0.0-dev
9 changes: 9 additions & 0 deletions tests/templates/kuttl/iceberg/00-patch-ns.yaml.j2
@@ -0,0 +1,9 @@
{% if test_scenario['values']['openshift'] == 'true' %}
# see https://github.com/stackabletech/issues/issues/566
---
apiVersion: kuttl.dev/v1beta1
kind: TestStep
commands:
- script: kubectl patch namespace $NAMESPACE -p '{"metadata":{"labels":{"pod-security.kubernetes.io/enforce":"privileged"}}}'
timeout: 120
{% endif %}
29 changes: 29 additions & 0 deletions tests/templates/kuttl/iceberg/00-rbac.yaml.j2
@@ -0,0 +1,29 @@
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: test-role
rules:
{% if test_scenario['values']['openshift'] == "true" %}
- apiGroups: ["security.openshift.io"]
resources: ["securitycontextconstraints"]
resourceNames: ["privileged"]
verbs: ["use"]
{% endif %}
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: test-sa
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: test-rb
subjects:
- kind: ServiceAccount
name: test-sa
roleRef:
kind: Role
name: test-role
apiGroup: rbac.authorization.k8s.io
5 changes: 5 additions & 0 deletions tests/templates/kuttl/iceberg/01-create-s3-connection.yaml
@@ -0,0 +1,5 @@
---
apiVersion: kuttl.dev/v1beta1
kind: TestStep
commands:
- script: envsubst '$NAMESPACE' < 01_s3-connection.yaml | kubectl apply -n $NAMESPACE -f -
36 changes: 36 additions & 0 deletions tests/templates/kuttl/iceberg/01_s3-connection.yaml
@@ -0,0 +1,36 @@
---
apiVersion: s3.stackable.tech/v1alpha1
kind: S3Connection
metadata:
name: minio
spec:
host: "minio.${NAMESPACE}.svc.cluster.local"
port: 9000
accessStyle: Path
credentials:
secretClass: s3-credentials-class
tls:
verification:
server:
caCert:
secretClass: tls
---
apiVersion: secrets.stackable.tech/v1alpha1
kind: SecretClass
metadata:
name: s3-credentials-class
spec:
backend:
k8sSearch:
searchNamespace:
pod: {}
---
apiVersion: v1
kind: Secret
metadata:
name: minio-credentials
labels:
secrets.stackable.tech/class: s3-credentials-class
stringData:
accessKey: admin
secretKey: adminadmin
14 changes: 14 additions & 0 deletions tests/templates/kuttl/iceberg/02-assert.yaml.j2
@@ -0,0 +1,14 @@
---
apiVersion: kuttl.dev/v1beta1
kind: TestAssert
timeout: 300
{% if test_scenario['values']['iceberg-use-kerberos'] == 'true' %}
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: krb5-kdc
status:
readyReplicas: 1
replicas: 1
{% endif %}
146 changes: 146 additions & 0 deletions tests/templates/kuttl/iceberg/02-install-krb5-kdc.yaml.j2
@@ -0,0 +1,146 @@
{% if test_scenario['values']['iceberg-use-kerberos'] == 'true' %}
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: krb5-kdc
spec:
selector:
matchLabels:
app: krb5-kdc
template:
metadata:
labels:
app: krb5-kdc
spec:
serviceAccountName: test-sa
initContainers:
- name: init
image: oci.stackable.tech/sdp/krb5:{{ test_scenario['values']['krb5'] }}-stackable0.0.0-dev
args:
- sh
- -euo
- pipefail
- -c
- |
test -e /var/kerberos/krb5kdc/principal || kdb5_util create -s -P asdf
kadmin.local get_principal -terse root/admin || kadmin.local add_principal -pw asdf root/admin
# stackable-secret-operator principal must match the keytab specified in the SecretClass
kadmin.local get_principal -terse stackable-secret-operator || kadmin.local add_principal -e aes256-cts-hmac-sha384-192:normal -pw asdf stackable-secret-operator
env:
- name: KRB5_CONFIG
value: /stackable/config/krb5.conf
volumeMounts:
- mountPath: /stackable/config
name: config
- mountPath: /var/kerberos/krb5kdc
name: data
containers:
- name: kdc
image: oci.stackable.tech/sdp/krb5:{{ test_scenario['values']['krb5'] }}-stackable0.0.0-dev
args:
- krb5kdc
- -n
env:
- name: KRB5_CONFIG
value: /stackable/config/krb5.conf
volumeMounts:
- mountPath: /stackable/config
name: config
- mountPath: /var/kerberos/krb5kdc
name: data
# Root permissions required on OpenShift to access internal ports
{% if test_scenario['values']['openshift'] == "true" %}
securityContext:
runAsUser: 0
{% endif %}
- name: kadmind
image: oci.stackable.tech/sdp/krb5:{{ test_scenario['values']['krb5'] }}-stackable0.0.0-dev
args:
- kadmind
- -nofork
env:
- name: KRB5_CONFIG
value: /stackable/config/krb5.conf
volumeMounts:
- mountPath: /stackable/config
name: config
- mountPath: /var/kerberos/krb5kdc
name: data
# Root permissions required on OpenShift to access internal ports
{% if test_scenario['values']['openshift'] == "true" %}
securityContext:
runAsUser: 0
{% endif %}
- name: client
image: oci.stackable.tech/sdp/krb5:{{ test_scenario['values']['krb5'] }}-stackable0.0.0-dev
tty: true
stdin: true
env:
- name: KRB5_CONFIG
value: /stackable/config/krb5.conf
volumeMounts:
- mountPath: /stackable/config
name: config
volumes:
- name: config
configMap:
name: krb5-kdc
volumeClaimTemplates:
- metadata:
name: data
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 1Gi
---
apiVersion: v1
kind: Service
metadata:
name: krb5-kdc
spec:
selector:
app: krb5-kdc
ports:
- name: kadmin
port: 749
- name: kdc
port: 88
- name: kdc-udp
port: 88
protocol: UDP
---
apiVersion: v1
kind: ConfigMap
metadata:
name: krb5-kdc
data:
krb5.conf: |
[logging]
default = STDERR
kdc = STDERR
admin_server = STDERR
# default = FILE:/var/log/krb5libs.log
# kdc = FILE:/var/log/krb5kdc.log
# admin_server = FILE:/var/log/kadmind.log
[libdefaults]
dns_lookup_realm = false
ticket_lifetime = 24h
renew_lifetime = 7d
forwardable = true
rdns = false
default_realm = {{ test_scenario['values']['kerberos-realm'] }}
spake_preauth_groups = edwards25519
[realms]
{{ test_scenario['values']['kerberos-realm'] }} = {
acl_file = /stackable/config/kadm5.acl
disable_encrypted_timestamp = false
}
[domain_realm]
.cluster.local = {{ test_scenario['values']['kerberos-realm'] }}
cluster.local = {{ test_scenario['values']['kerberos-realm'] }}
kadm5.acl: |
root/admin *e
stackable-secret-operator *e
{% endif %}
@@ -0,0 +1,8 @@
---
{% if test_scenario['values']['iceberg-use-kerberos'] == 'true' %}
apiVersion: kuttl.dev/v1beta1
kind: TestStep
commands:
# $NAMESPACE is provided by KUTTL and substituted into the manifest via envsubst
- script: envsubst '$NAMESPACE' < 03_kerberos-secretclass.yaml | kubectl apply -n $NAMESPACE -f -
{% endif %}
33 changes: 33 additions & 0 deletions tests/templates/kuttl/iceberg/03_kerberos-secretclass.yaml.j2
@@ -0,0 +1,33 @@
apiVersion: secrets.stackable.tech/v1alpha1
kind: SecretClass
metadata:
name: kerberos-$NAMESPACE
spec:
backend:
kerberosKeytab:
realmName: {{ test_scenario['values']['kerberos-realm'] }}
kdc: krb5-kdc.$NAMESPACE.svc.cluster.local
admin:
mit:
kadminServer: krb5-kdc.$NAMESPACE.svc.cluster.local
adminKeytabSecret:
namespace: $NAMESPACE
name: secret-operator-keytab
adminPrincipal: stackable-secret-operator
---
apiVersion: v1
kind: Secret
metadata:
name: secret-operator-keytab
data:
# To create the keytab, run the commands below. When prompted, enter the password asdf
# cat | ktutil << 'EOF'
# list
# add_entry -password -p [email protected] -k 1 -e aes256-cts-hmac-sha384-192
# wkt /tmp/keytab
# EOF
{% if test_scenario['values']['kerberos-realm'] == 'CLUSTER.LOCAL' %}
keytab: BQIAAABdAAEADUNMVVNURVIuTE9DQUwAGXN0YWNrYWJsZS1zZWNyZXQtb3BlcmF0b3IAAAABZAYWIgEAFAAgm8MCZ8B//XF1tH92GciD6/usWUNAmBTZnZQxLua2TkgAAAAB
{% elif test_scenario['values']['kerberos-realm'] == 'PROD.MYCORP' %}
keytab: BQIAAABbAAEAC1BST0QuTVlDT1JQABlzdGFja2FibGUtc2VjcmV0LW9wZXJhdG9yAAAAAWQZa0EBABQAIC/EnFNejq/K5lX6tX+B3/tkI13TCzkPB7d2ggCIEzE8AAAAAQ==
{% endif %}
10 changes: 10 additions & 0 deletions tests/templates/kuttl/iceberg/10-assert.yaml.j2
@@ -0,0 +1,10 @@
---
apiVersion: kuttl.dev/v1beta1
kind: TestAssert
{% if lookup('env', 'VECTOR_AGGREGATOR') %}
---
apiVersion: v1
kind: ConfigMap
metadata:
name: vector-aggregator-discovery
{% endif %}
@@ -0,0 +1,9 @@
{% if lookup('env', 'VECTOR_AGGREGATOR') %}
---
apiVersion: v1
kind: ConfigMap
metadata:
name: vector-aggregator-discovery
data:
ADDRESS: {{ lookup('env', 'VECTOR_AGGREGATOR') }}
{% endif %}
19 changes: 19 additions & 0 deletions tests/templates/kuttl/iceberg/20-assert.yaml
@@ -0,0 +1,19 @@
---
apiVersion: kuttl.dev/v1beta1
kind: TestAssert
timeout: 600
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: minio
status:
readyReplicas: 1
replicas: 1
---
apiVersion: batch/v1
kind: Job
metadata:
name: minio-post-job
status:
succeeded: 1
5 changes: 5 additions & 0 deletions tests/templates/kuttl/iceberg/20-install-minio.yaml
@@ -0,0 +1,5 @@
---
apiVersion: kuttl.dev/v1beta1
kind: TestStep
commands:
- script: kubectl -n $NAMESPACE apply -f https://raw.githubusercontent.com/stackabletech/demos/refs/heads/release-25.3/stacks/_templates/minio-tls/rendered-chart.yaml