Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions api/v1/sync_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ type SyncSinkExternal struct {
SecretKey SyncSinkValue `json:"secretKey,omitempty"`
FilesFrom *SyncFilesFrom `json:"filesFrom,omitempty"`

// KRB5Keytab specifies the path to the Kerberos keytab file for HDFS authentication.
KRB5Keytab SyncSinkValue `json:"krb5Keytab,omitempty"`
// KRB5KeytabBase64 contains the base64-encoded Kerberos keytab content for HDFS authentication.
KRB5KeytabBase64 SyncSinkValue `json:"krb5KeytabBase64,omitempty"`
// KRB5Principal is the Kerberos principal name used for HDFS authentication.
KRB5Principal SyncSinkValue `json:"krb5Principal,omitempty"`

ExtraVolumes []ExtraVolume `json:"extraVolumes,omitempty"`
}

Expand Down
3 changes: 3 additions & 0 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

360 changes: 360 additions & 0 deletions config/crd/bases/juicefs.io_cronsyncs.yaml

Large diffs are not rendered by default.

360 changes: 360 additions & 0 deletions config/crd/bases/juicefs.io_syncs.yaml

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions internal/controller/sync_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,15 @@ func (r *SyncReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
return ctrl.Result{}, r.updateStatus(ctx, sync)
}

// Reject sync when both from and to are HDFS
if strings.HasPrefix(from.Uri, "hdfs://") && strings.HasPrefix(to.Uri, "hdfs://") {
err := fmt.Errorf("HDFS-to-HDFS sync is not supported")
l.Error(err, "")
sync.Status.Phase = juicefsiov1.SyncPhaseFailed
sync.Status.Reason = err.Error()
return ctrl.Result{}, r.updateStatus(ctx, sync)
}

if strings.HasSuffix(from.Uri, "/") != strings.HasSuffix(to.Uri, "/") {
err := fmt.Errorf("FROM and TO should both end with path separator or not")
l.Error(err, "")
Expand Down
78 changes: 78 additions & 0 deletions internal/controller/sync_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,82 @@ var _ = Describe("Sync Controller", func() {
// Example: If you expect a certain status condition after reconciliation, verify it here.
})
})

Context("When rejecting HDFS-to-HDFS sync", func() {
const hdfsToHdfsResourceName = "hdfs-to-hdfs-sync"

ctx := context.Background()

typeNamespacedName := types.NamespacedName{
Name: hdfsToHdfsResourceName,
Namespace: "default",
}

BeforeEach(func() {
By("creating a Sync resource with both from and to as HDFS")
resource := &juicefsiov1.Sync{
ObjectMeta: metav1.ObjectMeta{
Name: hdfsToHdfsResourceName,
Namespace: "default",
},
Spec: juicefsiov1.SyncSpec{
Image: "juicedata/mount:ce-v1.3.0",
From: juicefsiov1.SyncSink{
External: &juicefsiov1.SyncSinkExternal{
Uri: "hdfs://example.com/user/source/",
KRB5Principal: juicefsiov1.SyncSinkValue{
Value: "hdfs/[email protected]",
},
KRB5KeytabBase64: juicefsiov1.SyncSinkValue{
Value: "dGVzdC1rZXl0YWItYmFzZTY0",
},
},
},
To: juicefsiov1.SyncSink{
External: &juicefsiov1.SyncSinkExternal{
Uri: "hdfs://example.com/user/destination/",
KRB5Principal: juicefsiov1.SyncSinkValue{
Value: "hdfs/[email protected]",
},
KRB5KeytabBase64: juicefsiov1.SyncSinkValue{
Value: "dGVzdC1rZXl0YWItYmFzZTY0",
},
},
},
},
}
err := k8sClient.Get(ctx, typeNamespacedName, &juicefsiov1.Sync{})
if err != nil && errors.IsNotFound(err) {
Expect(k8sClient.Create(ctx, resource)).To(Succeed())
}
})

AfterEach(func() {
By("cleaning up the HDFS-to-HDFS sync resource")
resource := &juicefsiov1.Sync{}
err := k8sClient.Get(ctx, typeNamespacedName, resource)
if err == nil {
Expect(k8sClient.Delete(ctx, resource)).To(Succeed())
}
})

It("should reject sync and set status to Failed with appropriate reason", func() {
By("reconciling the HDFS-to-HDFS sync resource")
controllerReconciler := &SyncReconciler{
Client: k8sClient,
Scheme: k8sClient.Scheme(),
}

_, err := controllerReconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: typeNamespacedName,
})
Expect(err).NotTo(HaveOccurred())

By("verifying the sync status is set to Failed")
sync := &juicefsiov1.Sync{}
Expect(k8sClient.Get(ctx, typeNamespacedName, sync)).To(Succeed())
Expect(sync.Status.Phase).To(Equal(juicefsiov1.SyncPhaseFailed))
Expect(sync.Status.Reason).To(ContainSubstring("HDFS-to-HDFS sync is not supported"))
})
})
})
10 changes: 10 additions & 0 deletions pkg/builder/sync_secret.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ func genExternalSecretData(external *juicefsiov1.SyncSinkExternal, suffix string
if external.SecretKey.Value != "" {
data[fmt.Sprintf("%s_SECRET_KEY", suffix)] = external.SecretKey.Value
}
// HDFS Kerberos credentials
if external.KRB5Keytab.Value != "" {
data[fmt.Sprintf("%s_KRB5KEYTAB", suffix)] = external.KRB5Keytab.Value
}
if external.KRB5KeytabBase64.Value != "" {
data[fmt.Sprintf("%s_KRB5KEYTAB_BASE64", suffix)] = external.KRB5KeytabBase64.Value
}
if external.KRB5Principal.Value != "" {
data[fmt.Sprintf("%s_KRB5PRINCIPAL", suffix)] = external.KRB5Principal.Value
}
return data
}

Expand Down
24 changes: 24 additions & 0 deletions pkg/utils/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,30 @@ func parseExternalSyncSink(external *juicefsiov1.SyncSinkExternal, syncName, ref
if len(pss.Envs) == 2 {
ep.User = url.UserPassword(fmt.Sprintf("$%s", ak), fmt.Sprintf("$%s", sk))
}
if ep.Scheme == "hdfs" {
krb5Keytab := fmt.Sprintf("EXTERNAL_%s_%s", strings.ToUpper(ref), "KRB5KEYTAB")
krb5KeytabBase64 := fmt.Sprintf("EXTERNAL_%s_%s", strings.ToUpper(ref), "KRB5KEYTAB_BASE64")
krb5Principal := fmt.Sprintf("EXTERNAL_%s_%s", strings.ToUpper(ref), "KRB5PRINCIPAL")

hasKeytab := external.KRB5Keytab.Value != "" || external.KRB5Keytab.ValueFrom != nil
hasKeytabBase64 := external.KRB5KeytabBase64.Value != "" || external.KRB5KeytabBase64.ValueFrom != nil
if hasKeytab && hasKeytabBase64 {
return nil, fmt.Errorf("krb5Keytab and krb5KeytabBase64 are mutually exclusive. Please specify only one keytab method for the %s field", strings.ToUpper(ref))
}
if hasKeytab {
pss.Envs = append(pss.Envs, parseSinkValueToEnv(external.KRB5Keytab, syncName, krb5Keytab)...)
// Export to standard env var name for juicefs sync compatibility
pss.PrepareCommand += fmt.Sprintf("export %s=$%s\n", "KRB5KEYTAB", krb5Keytab)
}
if hasKeytabBase64 {
pss.Envs = append(pss.Envs, parseSinkValueToEnv(external.KRB5KeytabBase64, syncName, krb5KeytabBase64)...)
pss.PrepareCommand += fmt.Sprintf("export %s=$%s\n", "KRB5KEYTAB_BASE64", krb5KeytabBase64)
}
if external.KRB5Principal.Value != "" || external.KRB5Principal.ValueFrom != nil {
pss.Envs = append(pss.Envs, parseSinkValueToEnv(external.KRB5Principal, syncName, krb5Principal)...)
pss.PrepareCommand += fmt.Sprintf("export %s=$%s\n", "KRB5PRINCIPAL", krb5Principal)
}
}
pss.Uri = ep.String()
pss.ExtraVolumes = external.ExtraVolumes
return pss, nil
Expand Down
97 changes: 97 additions & 0 deletions pkg/utils/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,103 @@ func TestParseSyncSink(t *testing.T) {
want: nil,
wantErr: true,
},
{
name: "HDFS sink with KRB5Principal and KRB5Keytab",
sink: juicefsiov1.SyncSink{
External: &juicefsiov1.SyncSinkExternal{
Uri: "hdfs://example.com/user/test/",
KRB5Principal: juicefsiov1.SyncSinkValue{
Value: "hdfs/[email protected]",
},
KRB5Keytab: juicefsiov1.SyncSinkValue{
Value: "/root/keytab/keytab",
},
},
},
syncName: "sync4",
ref: "FROM",
want: &juicefsiov1.ParsedSyncSink{
Uri: "hdfs://example.com/user/test/",
Envs: []corev1.EnvVar{
{
Name: "EXTERNAL_FROM_KRB5KEYTAB",
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: common.GenSyncSecretName("sync4"),
},
Key: "EXTERNAL_FROM_KRB5KEYTAB",
},
},
},
{
Name: "EXTERNAL_FROM_KRB5PRINCIPAL",
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: common.GenSyncSecretName("sync4"),
},
Key: "EXTERNAL_FROM_KRB5PRINCIPAL",
},
},
},
},
PrepareCommand: "export KRB5KEYTAB=$EXTERNAL_FROM_KRB5KEYTAB\nexport KRB5PRINCIPAL=$EXTERNAL_FROM_KRB5PRINCIPAL\n",
},
wantErr: false,
},
{
name: "HDFS sink with KRB5Principal and KRB5KeytabBase64",
sink: juicefsiov1.SyncSink{
External: &juicefsiov1.SyncSinkExternal{
Uri: "hdfs://example.com/user/test/",
KRB5Principal: juicefsiov1.SyncSinkValue{
Value: "hdfs/[email protected]",
},
KRB5KeytabBase64: juicefsiov1.SyncSinkValue{
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: "keytabbase64-secret",
},
Key: "KRB5KEYTAB_BASE64",
},
},
},
},
},
syncName: "sync5",
ref: "FROM",
want: &juicefsiov1.ParsedSyncSink{
Uri: "hdfs://example.com/user/test/",
Envs: []corev1.EnvVar{
{
Name: "EXTERNAL_FROM_KRB5KEYTAB_BASE64",
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: "keytabbase64-secret",
},
Key: "KRB5KEYTAB_BASE64",
},
},
},
{
Name: "EXTERNAL_FROM_KRB5PRINCIPAL",
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: common.GenSyncSecretName("sync5"),
},
Key: "EXTERNAL_FROM_KRB5PRINCIPAL",
},
},
},
},
PrepareCommand: "export KRB5KEYTAB_BASE64=$EXTERNAL_FROM_KRB5KEYTAB_BASE64\nexport KRB5PRINCIPAL=$EXTERNAL_FROM_KRB5PRINCIPAL\n",
},
wantErr: false,
},
}

for _, tt := range tests {
Expand Down
Loading