Skip to content

Commit 19df13f

Browse files
committed
improve k8s log handling
1 parent b25e95c commit 19df13f

File tree

2 files changed

+15
-18
lines changed

2 files changed

+15
-18
lines changed

pkg/container/kubernetes.go

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,7 @@ func (w *K8sWrapper) createJob(ctx context.Context, opts K8sOpts, suspend bool)
124124
return nil, err
125125
}
126126

127-
w.logger.Infof("Successfully created job %s in namespace %s (suspended: %v)",
128-
createdJob.Name, createdJob.Namespace, suspend)
127+
w.logger.Infof("Successfully created job %s", createdJob.Name)
129128

130129
return createdJob, nil
131130
}
@@ -188,7 +187,6 @@ func (w *K8sWrapper) ensureSecret(ctx context.Context, namespace string, secret
188187
_, err := w.client.CoreV1().Secrets(namespace).Get(ctx, secret.Name, metav1.GetOptions{})
189188
if err != nil {
190189
// Secret doesn't exist, create it
191-
w.logger.Infof("Secret %s not found, creating it", secret.Name)
192190
_, err = w.client.CoreV1().Secrets(namespace).Create(ctx, secret, metav1.CreateOptions{})
193191
if err != nil {
194192
w.logger.Errorf("Failed to create secret %s: %v", secret.Name, err)
@@ -197,7 +195,6 @@ func (w *K8sWrapper) ensureSecret(ctx context.Context, namespace string, secret
197195
w.logger.Infof("Successfully created secret %s", secret.Name)
198196
} else {
199197
// Secret exists, update it
200-
w.logger.Infof("Secret %s exists, updating it", secret.Name)
201198
_, err = w.client.CoreV1().Secrets(namespace).Update(ctx, secret, metav1.UpdateOptions{})
202199
if err != nil {
203200
w.logger.Errorf("Failed to update secret %s: %v", secret.Name, err)
@@ -220,7 +217,6 @@ func (w *K8sWrapper) Run(ctx context.Context, opts ContainerOpts) (stdout io.Rea
220217
}
221218

222219
if len(k8sOpts.Secrets) > 0 {
223-
w.logger.Infof("Preparing %d secrets for job %s", len(k8sOpts.Secrets), k8sOpts.PodName)
224220
for _, secret := range k8sOpts.Secrets {
225221
k8sSecret := &corev1.Secret{
226222
ObjectMeta: metav1.ObjectMeta{
@@ -248,8 +244,6 @@ func (w *K8sWrapper) Run(ctx context.Context, opts ContainerOpts) (stdout io.Rea
248244
}
249245

250246
if len(secretsToCreate) > 0 {
251-
w.logger.Infof("Creating %d secrets with owner job %s", len(secretsToCreate), job.Name)
252-
253247
// Create owner reference
254248
ownerRef := &metav1.OwnerReference{
255249
APIVersion: "batch/v1",
@@ -304,7 +298,7 @@ func (w *K8sWrapper) Run(ctx context.Context, opts ContainerOpts) (stdout io.Rea
304298
logStreamingDone := make(chan struct{})
305299

306300
// Wait for pod to reach Running / Completed / Failed state before trying to stream logs
307-
w.logger.Infof("Waiting for pod %s to be ready", pod.Name)
301+
w.logger.Infof("Waiting for pod %s to be ready...", pod.Name)
308302
if err := w.waitForPodReady(ctx, pod.GetNamespace(), pod.GetName(), 30*time.Minute); err != nil {
309303
// If pod can't get to running state, delete the job and return the error
310304
w.logger.Errorf("Pod %s failed to become ready after 30 minutes: %v", pod.GetName(), err)
@@ -462,7 +456,6 @@ func (w *K8sWrapper) RunAndWait(ctx context.Context, opts ContainerOpts) (*bytes
462456
}
463457
}()
464458

465-
w.logger.Info("Container started, collecting logs")
466459
stdoutBuf := new(bytes.Buffer)
467460

468461
// Copy logs to buffer

pkg/providers/airbyte/storage.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,9 @@ func (a *Storage) LoadTable(ctx context.Context, table abstract.TableDescription
9595
return xerrors.Errorf("%s unable to start: %w", table.ID().String(), err)
9696
}
9797
defer stdoutReader.Close()
98-
defer stderrReader.Close()
98+
if stderrReader != nil {
99+
defer stderrReader.Close()
100+
}
99101

100102
var batch *RecordBatch
101103
cntr := 0
@@ -172,14 +174,16 @@ func (a *Storage) LoadTable(ctx context.Context, table abstract.TableDescription
172174
return xerrors.Errorf("unable to store incremental state: %w", err)
173175
}
174176

175-
// Read stderr to completion to ensure the container process is waited upon
176-
stderrBuf := new(bytes.Buffer)
177-
_, err = io.Copy(stderrBuf, stderrReader)
178-
if err != nil {
179-
return xerrors.Errorf("%s stderr read failed: %w", table.ID().String(), err)
180-
}
181-
if stderrBuf.Len() > 0 {
182-
a.logger.Warnf("stderr: %v\nlast error:%v", stderrBuf.String(), lastAirbyteError)
177+
if stderrReader != nil {
178+
// Read stderr to completion to ensure the container process is waited upon
179+
stderrBuf := new(bytes.Buffer)
180+
_, err = io.Copy(stderrBuf, stderrReader)
181+
if err != nil {
182+
return xerrors.Errorf("%s stderr read failed: %w", table.ID().String(), err)
183+
}
184+
if stderrBuf.Len() > 0 {
185+
a.logger.Warnf("stderr: %v\nlast error:%v", stderrBuf.String(), lastAirbyteError)
186+
}
183187
}
184188

185189
return nil

0 commit comments

Comments
 (0)