Skip to content

Commit b4d1152

Browse files
committed
improve k8s log handling
1 parent 637be27 commit b4d1152

File tree

2 files changed

+56
-58
lines changed

2 files changed

+56
-58
lines changed

pkg/container/kubernetes.go

Lines changed: 9 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -50,31 +50,17 @@ func (w *K8sWrapper) Pull(_ context.Context, _ string, _ types.ImagePullOptions)
5050
}
5151

5252
func (w *K8sWrapper) getCurrentNamespace() (string, error) {
53-
w.logger.Info("Getting current namespace from service account")
5453
b, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
5554
if err != nil {
5655
w.logger.Warnf("Failed to read namespace from service account: %v", err)
5756
return "", err
5857
}
5958
namespace := string(b)
60-
w.logger.Infof("Current namespace: %s", namespace)
6159
return namespace, nil
6260
}
6361

6462
// Helper method to create a job
6563
func (w *K8sWrapper) createJob(ctx context.Context, opts K8sOpts) (*batchv1.Job, error) {
66-
w.logger.Info("Creating job with options")
67-
68-
if opts.Namespace == "" {
69-
ns, err := w.getCurrentNamespace()
70-
if err != nil {
71-
ns = "default"
72-
w.logger.Infof("Using default namespace due to error: %v", err)
73-
}
74-
opts.Namespace = ns
75-
w.logger.Infof("Using namespace: %s", opts.Namespace)
76-
}
77-
7864
if opts.PodName == "" {
7965
opts.PodName = "transferia-runner"
8066
w.logger.Info("Using default job name: transferia-runner")
@@ -203,11 +189,17 @@ func (w *K8sWrapper) ensureSecret(ctx context.Context, namespace string, secret
203189
}
204190

205191
func (w *K8sWrapper) Run(ctx context.Context, opts ContainerOpts) (stdout io.ReadCloser, stderr io.ReadCloser, err error) {
206-
w.logger.Info("Running container in Kubernetes using Job")
207-
208192
// Convert options to K8s options
209193
k8sOpts := opts.ToK8sOpts()
210194

195+
if k8sOpts.Namespace == "" {
196+
ns, err := w.getCurrentNamespace()
197+
if err != nil {
198+
ns = "default"
199+
}
200+
k8sOpts.Namespace = ns
201+
}
202+
211203
// Create secrets if needed
212204
if len(k8sOpts.Secrets) > 0 {
213205
w.logger.Infof("Creating %d secrets for job %s", len(k8sOpts.Secrets), k8sOpts.PodName)
@@ -231,28 +223,25 @@ func (w *K8sWrapper) Run(ctx context.Context, opts ContainerOpts) (stdout io.Rea
231223
}
232224

233225
// Create the job
234-
w.logger.Infof("Creating job %s in namespace %s", k8sOpts.PodName, k8sOpts.Namespace)
235226
job, err := w.createJob(ctx, k8sOpts)
236227
if err != nil {
237228
w.logger.Errorf("Failed to create job %s: %v", k8sOpts.PodName, err)
238229
return nil, nil, xerrors.Errorf("failed to create job: %w", err)
239230
}
240-
w.logger.Infof("Job %s created successfully in namespace %s", job.Name, job.Namespace)
241231

242232
// Find the pod created by the job
243233
pod, err := w.findJobPod(ctx, job)
244234
if err != nil {
245235
w.logger.Errorf("Failed to find pod for job %s: %v", job.Name, err)
246236
return nil, nil, xerrors.Errorf("failed to find pod for job: %w", err)
247237
}
248-
w.logger.Infof("Found pod %s for job %s", pod.Name, job.Name)
249238

250239
// Channel to signal when log streaming is done
251240
logStreamingDone := make(chan struct{})
252241

253242
// Wait for pod to reach Running state before trying to stream logs
254243
w.logger.Infof("Waiting for pod %s to be ready", pod.Name)
255-
if err := w.waitForPodReady(ctx, pod.GetNamespace(), pod.GetName(), 30*time.Second); err != nil {
244+
if err := w.waitForPodReady(ctx, pod.GetNamespace(), pod.GetName(), 30*time.Minute); err != nil {
256245
// If pod can't get to running state, try to get logs anyway
257246
// but log a warning
258247
w.logger.Warnf("Warning: pod %s may not be ready: %v", pod.GetName(), err)
@@ -329,8 +318,6 @@ func (w *K8sWrapper) waitForPodReady(ctx context.Context, namespace, name string
329318

330319
// RunAndWait creates a job and waits for it to complete, collecting logs
331320
func (w *K8sWrapper) RunAndWait(ctx context.Context, opts ContainerOpts) (*bytes.Buffer, *bytes.Buffer, error) {
332-
w.logger.Info("Running container with job and waiting for completion")
333-
334321
stdoutReader, _, err := w.Run(ctx, opts)
335322
if err != nil {
336323
w.logger.Errorf("Failed to run container: %v", err)

pkg/providers/airbyte/storage.go

Lines changed: 47 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,14 @@ type Storage struct {
4343
cw container.ContainerImpl
4444
}
4545

46-
func (a *Storage) Close() {}
46+
func (a *Storage) Close() {
47+
dir, err := a.getFilesDir()
48+
if err == nil {
49+
if err := os.RemoveAll(dir); err != nil {
50+
a.logger.Warnf("unable to remove temp dir: %s", dir)
51+
}
52+
}
53+
}
4754

4855
func (a *Storage) Ping() error {
4956
return a.check()
@@ -313,22 +320,31 @@ func (a *Storage) parse(data []byte) (*Message, []string) {
313320
return res, logs
314321
}
315322

323+
func (a *Storage) getFilesDir() (string, error) {
324+
switch a.cw.Type() {
325+
case container.BackendDocker:
326+
return a.config.DataDir(), nil
327+
case container.BackendKubernetes:
328+
tempDir := filepath.Join(os.TempDir(), "airbyte-secrets", a.transfer.ID)
329+
if err := os.MkdirAll(tempDir, 0o755); err != nil && !os.IsExist(err) {
330+
return "", xerrors.Errorf("unable to create temp directory: %w", err)
331+
}
332+
return tempDir, nil
333+
}
334+
335+
return "", xerrors.New("unknown container backend")
336+
}
337+
316338
func (a *Storage) writeFile(fileName, fileData string) error {
317339
var fullPath string
318340

319-
// Check if running in Kubernetes by checking the container wrapper type
320-
if a.cw.Type() == container.BackendKubernetes {
321-
// Use temporary directory for Kubernetes
322-
tempDir := "/tmp/airbyte-secrets"
323-
if err := os.MkdirAll(tempDir, 0o755); err != nil {
324-
return xerrors.Errorf("unable to create temp directory: %w", err)
325-
}
326-
fullPath = filepath.Join(tempDir, fileName)
327-
} else {
328-
// Use regular data directory for non-Kubernetes environments
329-
fullPath = filepath.Join(a.config.DataDir(), fileName)
341+
dir, err := a.getFilesDir()
342+
if err != nil {
343+
return xerrors.Errorf("unable to get files dir: %w", err)
330344
}
331345

346+
fullPath = filepath.Join(dir, fileName)
347+
332348
a.logger.Debugf("%s -> \n%s", fileName, fileData)
333349
defer a.logger.Infof("file(%s) %s written", format.SizeInt(len(fileData)), fullPath)
334350
return os.WriteFile(
@@ -419,32 +435,41 @@ func (a *Storage) baseOpts() container.ContainerOpts {
419435
AutoRemove: true,
420436
}
421437

422-
// Check if running in Kubernetes
423-
if a.cw.Type() == container.BackendKubernetes {
424-
// Check if temporary files exist
425-
tempDir := "/tmp/airbyte-secrets"
426-
if _, err := os.Stat(tempDir); err == nil {
438+
dir, err := a.getFilesDir()
439+
if err != nil {
440+
a.logger.Errorf("unable to specify data dir: %w", err)
441+
}
442+
443+
switch a.cw.Type() {
444+
case container.BackendDocker:
445+
opts.Volumes = []container.Volume{
446+
{
447+
Name: "data",
448+
HostPath: dir,
449+
ContainerPath: "/data",
450+
VolumeType: "bind",
451+
},
452+
}
453+
case container.BackendKubernetes:
454+
if _, err := os.Stat(dir); err == nil {
427455
// Create a unique secret name based on the transfer ID
428456
secretName := fmt.Sprintf("airbyte-secret-%s", a.transfer.ID)
429457

430458
// Create a map to store file contents for the secret
431459
secretData := make(map[string][]byte)
432460

433461
// Read all files from the temporary directory
434-
files, err := os.ReadDir(tempDir)
462+
files, err := os.ReadDir(dir)
435463
if err == nil && len(files) > 0 {
436464
for _, file := range files {
437465
if !file.IsDir() {
438-
filePath := filepath.Join(tempDir, file.Name())
466+
filePath := filepath.Join(dir, file.Name())
439467
data, err := os.ReadFile(filePath)
440468
if err == nil {
441469
secretData[file.Name()] = data
442470
} else {
443471
a.logger.Warnf("Failed to read file %s: %v", filePath, err)
444472
}
445-
446-
// Clean up the temporary file
447-
_ = os.Remove(filePath)
448473
}
449474
}
450475

@@ -466,25 +491,11 @@ func (a *Storage) baseOpts() container.ContainerOpts {
466491
},
467492
}
468493

469-
// Clean up the temporary directory
470-
_ = os.RemoveAll(tempDir)
471-
472-
a.logger.Infof("Created Kubernetes secret %s with %d files", secretName, len(secretData))
473494
return opts
474495
}
475496
}
476497
}
477498

478-
// Default volume configuration for non-Kubernetes or when no temp files exist
479-
opts.Volumes = []container.Volume{
480-
{
481-
Name: "data",
482-
HostPath: a.config.DataDir(),
483-
ContainerPath: "/data",
484-
VolumeType: "bind",
485-
},
486-
}
487-
488499
return opts
489500
}
490501

0 commit comments

Comments
 (0)