Skip to content

Commit fd7aa2d

Browse files
committed
feat: rejected records link on connection timeline (#16798)
1 parent 84eca49 commit fd7aa2d

File tree

10 files changed

+500
-25
lines changed

10 files changed

+500
-25
lines changed

airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/ConnectionsHandler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1511,6 +1511,7 @@ public void backfillConnectionEvents(final ConnectionEventsBackfillRequestBody c
15111511
job.configType.name(),
15121512
job.status.name(),
15131513
JobConverter.getStreamsAssociatedWithJob(job),
1514+
null,
15141515
job.getLastAttempt()
15151516
.flatMap(Attempt::getFailureSummary)
15161517
.flatMap(summary -> summary.getFailures().stream().findFirst()));
@@ -1529,7 +1530,8 @@ public void backfillConnectionEvents(final ConnectionEventsBackfillRequestBody c
15291530
job.getAttemptsCount(),
15301531
job.configType.name(),
15311532
job.status.name(),
1532-
JobConverter.getStreamsAssociatedWithJob(job));
1533+
JobConverter.getStreamsAssociatedWithJob(job),
1534+
null);
15331535
}
15341536
// Save an event
15351537
connectionTimelineEventService.writeEventWithTimestamp(

airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/helpers/ConnectionTimelineEventHelper.java

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
package io.airbyte.commons.server.handlers.helpers;
66

77
import com.google.common.annotations.VisibleForTesting;
8-
import io.airbyte.api.client.model.generated.JobRead;
98
import io.airbyte.api.model.generated.AirbyteCatalogDiff;
109
import io.airbyte.api.model.generated.CatalogDiff;
1110
import io.airbyte.api.model.generated.ConnectionRead;
@@ -33,14 +32,14 @@
3332
import io.airbyte.data.services.ConnectionTimelineEventService;
3433
import io.airbyte.data.services.shared.ConnectionDisabledEvent;
3534
import io.airbyte.data.services.shared.ConnectionEnabledEvent;
36-
import io.airbyte.data.services.shared.ConnectionEvent;
3735
import io.airbyte.data.services.shared.ConnectionSettingsChangedEvent;
3836
import io.airbyte.data.services.shared.FailedEvent;
3937
import io.airbyte.data.services.shared.FinalStatusEvent;
4038
import io.airbyte.data.services.shared.FinalStatusEvent.FinalStatus;
4139
import io.airbyte.data.services.shared.ManuallyStartedEvent;
4240
import io.airbyte.data.services.shared.SchemaChangeAutoPropagationEvent;
4341
import io.airbyte.data.services.shared.SchemaConfigUpdateEvent;
42+
import io.airbyte.domain.services.storage.ConnectorObjectStorageService;
4443
import io.airbyte.persistence.job.JobPersistence.AttemptStats;
4544
import jakarta.annotation.Nullable;
4645
import jakarta.inject.Inject;
@@ -71,6 +70,7 @@ public class ConnectionTimelineEventHelper {
7170
private final PermissionHandler permissionHandler;
7271
private final UserPersistence userPersistence;
7372
private final ConnectionTimelineEventService connectionTimelineEventService;
73+
private final ConnectorObjectStorageService connectorObjectStorageService;
7474

7575
@Inject
7676
public ConnectionTimelineEventHelper(
@@ -79,12 +79,14 @@ public ConnectionTimelineEventHelper(
7979
final OrganizationPersistence organizationPersistence,
8080
final PermissionHandler permissionHandler,
8181
final UserPersistence userPersistence,
82+
final ConnectorObjectStorageService connectorObjectStorageService,
8283
final ConnectionTimelineEventService connectionTimelineEventService) {
8384
this.airbyteSupportEmailDomains = airbyteSupportEmailDomains;
8485
this.currentUserService = currentUserService;
8586
this.organizationPersistence = organizationPersistence;
8687
this.permissionHandler = permissionHandler;
8788
this.userPersistence = userPersistence;
89+
this.connectorObjectStorageService = connectorObjectStorageService;
8890
this.connectionTimelineEventService = connectionTimelineEventService;
8991
}
9092

@@ -180,7 +182,8 @@ public void logJobSuccessEventInConnectionTimeline(final Job job, final UUID con
180182
job.getAttemptsCount(),
181183
job.configType.name(),
182184
JobStatus.SUCCEEDED.name(),
183-
JobConverter.getStreamsAssociatedWithJob(job));
185+
JobConverter.getStreamsAssociatedWithJob(job),
186+
connectorObjectStorageService.getRejectedRecordsForJob(connectionId, job)); // TODO only include if there are rejected records
184187
connectionTimelineEventService.writeEvent(connectionId, event, null);
185188
} catch (final Exception e) {
186189
LOGGER.error("Failed to persist timeline event for job: {}", job.id, e);
@@ -205,6 +208,7 @@ public void logJobFailureEventInConnectionTimeline(final Job job, final UUID con
205208
job.configType.name(),
206209
jobEventFailureStatus,
207210
JobConverter.getStreamsAssociatedWithJob(job),
211+
connectorObjectStorageService.getRejectedRecordsForJob(connectionId, job), // TODO only include if there are rejected records
208212
firstFailureReasonOfLastAttempt);
209213
connectionTimelineEventService.writeEvent(connectionId, event, null);
210214
} catch (final Exception e) {
@@ -216,7 +220,7 @@ public void logJobCancellationEventInConnectionTimeline(final Job job,
216220
final List<AttemptStats> attemptStats) {
217221
try {
218222
final LoadedStats stats = buildLoadedStats(job, attemptStats);
219-
223+
final UUID connectionId = UUID.fromString(job.scope);
220224
final FinalStatusEvent event = new FinalStatusEvent(
221225
job.id,
222226
job.createdAtInSecond,
@@ -226,8 +230,9 @@ public void logJobCancellationEventInConnectionTimeline(final Job job,
226230
job.getAttemptsCount(),
227231
job.configType.name(),
228232
io.airbyte.config.JobStatus.CANCELLED.name(),
229-
JobConverter.getStreamsAssociatedWithJob(job));
230-
connectionTimelineEventService.writeEvent(UUID.fromString(job.scope), event, getCurrentUserIdIfExist());
233+
JobConverter.getStreamsAssociatedWithJob(job),
234+
connectorObjectStorageService.getRejectedRecordsForJob(connectionId, job)); // TODO only include if there are rejected records
235+
connectionTimelineEventService.writeEvent(connectionId, event, getCurrentUserIdIfExist());
231236
} catch (final Exception e) {
232237
LOGGER.error("Failed to persist job cancelled event for job: {}", job.id, e);
233238
}
@@ -345,15 +350,4 @@ public void logConnectionSettingsChangedEventInConnectionTimeline(final UUID con
345350
}
346351
}
347352

348-
public Optional<User> getUserAssociatedWithJobTimelineEventType(final JobRead job, final ConnectionEvent.Type eventType) {
349-
final Optional<UUID> userId = Optional.ofNullable(connectionTimelineEventService.findAssociatedUserForAJob(job, eventType));
350-
return userId.flatMap(id -> {
351-
try {
352-
return userPersistence.getUser(id);
353-
} catch (IOException e) {
354-
throw new RuntimeException(e);
355-
}
356-
});
357-
}
358-
359353
}

airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/helpers/ConnectionTimelineEventHelperTest.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import io.airbyte.data.services.shared.ConnectionEvent.Type;
4545
import io.airbyte.data.services.shared.ConnectionSettingsChangedEvent;
4646
import io.airbyte.data.services.shared.SchemaChangeAutoPropagationEvent;
47+
import io.airbyte.domain.services.storage.ConnectorObjectStorageService;
4748
import io.airbyte.persistence.job.JobPersistence;
4849
import java.io.IOException;
4950
import java.util.HashMap;
@@ -65,6 +66,7 @@ class ConnectionTimelineEventHelperTest {
6566
private PermissionHandler permissionHandler;
6667
private UserPersistence userPersistence;
6768
private ConnectionTimelineEventService connectionTimelineEventService;
69+
private ConnectorObjectStorageService connectorObjectStorageService;
6870
private static final UUID CONNECTION_ID = UUID.randomUUID();
6971

7072
@BeforeEach
@@ -74,13 +76,15 @@ void setup() {
7476
permissionHandler = mock(PermissionHandler.class);
7577
userPersistence = mock(UserPersistence.class);
7678
connectionTimelineEventService = mock(ConnectionTimelineEventService.class);
79+
connectorObjectStorageService = mock(ConnectorObjectStorageService.class);
7780
}
7881

7982
@Test
8083
void testGetLoadedStats() {
8184

8285
connectionTimelineEventHelper = new ConnectionTimelineEventHelper(Set.of(),
83-
currentUserService, organizationPersistence, permissionHandler, userPersistence, connectionTimelineEventService);
86+
currentUserService, organizationPersistence, permissionHandler, userPersistence, connectorObjectStorageService,
87+
connectionTimelineEventService);
8488

8589
final String userStreamName = "user";
8690
final SyncMode userStreamMode = SyncMode.FULL_REFRESH;
@@ -165,7 +169,8 @@ class TestGetUserReadInConnectionEvent {
165169
void notApplicableInOSS() throws IOException {
166170
// No support email domains. Should show real name as always.
167171
connectionTimelineEventHelper = new ConnectionTimelineEventHelper(ossAirbyteSupportEmailDomain,
168-
currentUserService, organizationPersistence, permissionHandler, userPersistence, connectionTimelineEventService);
172+
currentUserService, organizationPersistence, permissionHandler, userPersistence, connectorObjectStorageService,
173+
connectionTimelineEventService);
169174
when(userPersistence.getUser(any())).thenReturn(Optional.of(externalUser));
170175
when(permissionHandler.isUserInstanceAdmin(any())).thenReturn(false);
171176
when(organizationPersistence.getOrganizationByConnectionId(any())).thenReturn(
@@ -179,7 +184,8 @@ void notApplicableInOSS() throws IOException {
179184
void airbyteSupportInAirbytersInternalWorkspace() throws IOException {
180185
// Should show real name.
181186
connectionTimelineEventHelper = new ConnectionTimelineEventHelper(cloudAirbyteSupportEmailDomain,
182-
currentUserService, organizationPersistence, permissionHandler, userPersistence, connectionTimelineEventService);
187+
currentUserService, organizationPersistence, permissionHandler, userPersistence, connectorObjectStorageService,
188+
connectionTimelineEventService);
183189
when(userPersistence.getUser(any())).thenReturn(Optional.of(airbyteUser));
184190
when(permissionHandler.isUserInstanceAdmin(any())).thenReturn(true);
185191
when(organizationPersistence.getOrganizationByConnectionId(any())).thenReturn(
@@ -192,7 +198,8 @@ void airbyteSupportInAirbytersInternalWorkspace() throws IOException {
192198
void airbyteSupportInCustomersExternalWorkspace() throws IOException {
193199
// Should hide real name.
194200
connectionTimelineEventHelper = new ConnectionTimelineEventHelper(cloudAirbyteSupportEmailDomain,
195-
currentUserService, organizationPersistence, permissionHandler, userPersistence, connectionTimelineEventService);
201+
currentUserService, organizationPersistence, permissionHandler, userPersistence, connectorObjectStorageService,
202+
connectionTimelineEventService);
196203
when(userPersistence.getUser(any())).thenReturn(Optional.of(airbyteUser));
197204
when(permissionHandler.isUserInstanceAdmin(any())).thenReturn(true);
198205
when(organizationPersistence.getOrganizationByConnectionId(any())).thenReturn(
@@ -205,7 +212,8 @@ void airbyteSupportInCustomersExternalWorkspace() throws IOException {
205212
void detectNonAirbyteSupportUserInCloud() throws IOException {
206213
// Should show real name.
207214
connectionTimelineEventHelper = new ConnectionTimelineEventHelper(cloudAirbyteSupportEmailDomain,
208-
currentUserService, organizationPersistence, permissionHandler, userPersistence, connectionTimelineEventService);
215+
currentUserService, organizationPersistence, permissionHandler, userPersistence, connectorObjectStorageService,
216+
connectionTimelineEventService);
209217
when(userPersistence.getUser(any())).thenReturn(Optional.of(externalUser));
210218
when(permissionHandler.isUserInstanceAdmin(any())).thenReturn(true);
211219
when(organizationPersistence.getOrganizationByConnectionId(any())).thenReturn(
@@ -220,7 +228,8 @@ void detectNonAirbyteSupportUserInCloud() throws IOException {
220228
@Test
221229
void testLogConnectionSettingsChangedEvent() {
222230
connectionTimelineEventHelper = new ConnectionTimelineEventHelper(Set.of(),
223-
currentUserService, organizationPersistence, permissionHandler, userPersistence, connectionTimelineEventService);
231+
currentUserService, organizationPersistence, permissionHandler, userPersistence, connectorObjectStorageService,
232+
connectionTimelineEventService);
224233
final UUID connectionId = UUID.randomUUID();
225234
final UUID dataplaneGroupId = UUID.randomUUID();
226235
final ConnectionRead originalConnectionRead = new ConnectionRead()
@@ -255,7 +264,8 @@ void testLogConnectionSettingsChangedEvent() {
255264
@Test
256265
void testLogSchemaChangeAutoPropagationEvent() {
257266
connectionTimelineEventHelper = new ConnectionTimelineEventHelper(Set.of(),
258-
currentUserService, organizationPersistence, permissionHandler, userPersistence, connectionTimelineEventService);
267+
currentUserService, organizationPersistence, permissionHandler, userPersistence, connectorObjectStorageService,
268+
connectionTimelineEventService);
259269
final UUID connectionId = UUID.randomUUID();
260270
final CatalogDiff diff = new CatalogDiff().addTransformsItem(new StreamTransform().transformType(TransformTypeEnum.ADD_STREAM));
261271

airbyte-data/src/main/kotlin/io/airbyte/data/services/shared/FailedEvent.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package io.airbyte.data.services.shared
66

77
import com.fasterxml.jackson.annotation.JsonInclude
88
import io.airbyte.config.FailureReason
9+
import io.airbyte.domain.models.RejectedRecordsMetadata
910
import io.airbyte.protocol.models.v0.StreamDescriptor
1011
import java.util.Optional
1112

@@ -20,6 +21,7 @@ class FailedEvent(
2021
jobType: String,
2122
statusType: String,
2223
streams: List<StreamDescriptor>? = null,
24+
rejectedRecords: RejectedRecordsMetadata? = null,
2325
private val failureReason: Optional<FailureReason>,
2426
) : FinalStatusEvent(
2527
jobId,
@@ -31,6 +33,7 @@ class FailedEvent(
3133
jobType,
3234
statusType,
3335
streams,
36+
rejectedRecords,
3437
) {
3538
fun getFailureReason(): Optional<FailureReason> = failureReason
3639
}

airbyte-data/src/main/kotlin/io/airbyte/data/services/shared/FinalStatusEvent.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package io.airbyte.data.services.shared
66

77
import com.fasterxml.jackson.annotation.JsonInclude
88
import io.airbyte.config.JobConfig
9+
import io.airbyte.domain.models.RejectedRecordsMetadata
910
import io.airbyte.protocol.models.v0.StreamDescriptor
1011
import io.micronaut.data.annotation.TypeDef
1112
import io.micronaut.data.model.DataType
@@ -21,6 +22,7 @@ open class FinalStatusEvent(
2122
private val jobType: String,
2223
private val statusType: String,
2324
private val streams: List<StreamDescriptor>? = null,
25+
private val rejectedRecords: RejectedRecordsMetadata? = null,
2426
) : ConnectionEvent {
2527
fun getJobId(): Long = jobId
2628

@@ -34,6 +36,8 @@ open class FinalStatusEvent(
3436

3537
fun getAttemptsCount(): Int = attemptsCount
3638

39+
fun getRejectedRecords(): RejectedRecordsMetadata? = rejectedRecords
40+
3741
fun getStreams(): List<StreamDescriptor>? = streams
3842

3943
@TypeDef(type = DataType.STRING)
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright (c) 2020-2025 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.domain.models
6+
7+
import com.fasterxml.jackson.annotation.JsonInclude
8+
9+
/**
 * Describes where the rejected records of a job can be found.
 *
 * Both properties are optional and default to `null`; the class-level
 * [JsonInclude] setting leaves `null` properties out of the serialized JSON.
 *
 * @property cloudConsoleUrl Link to the cloud console for the storage bucket containing
 *   rejected records, so users can navigate to the bucket directly via the cloud
 *   provider's console UI.
 * @property storageUri URI to the storage bucket path containing rejected records,
 *   typically in the format `s3://bucket-name/path/to/job-id`.
 */
@JsonInclude(JsonInclude.Include.NON_NULL)
data class RejectedRecordsMetadata(
    val cloudConsoleUrl: String? = null,
    val storageUri: String? = null,
)
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright (c) 2020-2025 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.domain.services.storage
6+
7+
import com.fasterxml.jackson.databind.JsonNode
8+
import io.airbyte.config.ActorDefinitionVersion
9+
import io.airbyte.config.Job
10+
import io.airbyte.config.JobConfig
11+
import io.airbyte.data.services.ActorDefinitionService
12+
import io.airbyte.data.services.ConnectionService
13+
import io.airbyte.data.services.DestinationService
14+
import io.airbyte.domain.models.ConnectionId
15+
import io.airbyte.domain.models.RejectedRecordsMetadata
16+
import io.github.oshai.kotlinlogging.KotlinLogging
17+
import jakarta.inject.Singleton
18+
19+
// File-level logger used by ConnectorObjectStorageService for its warn/error messages.
// NOTE(review): declared without a visibility modifier, so it is visible outside this file — confirm that is intended.
val log = KotlinLogging.logger {}
20+
21+
// Key read from a destination's object storage config to determine which storage backend it uses.
const val OBJECT_STORAGE_TYPE_CONFIG_KEY = "storage_type"
22+
// Storage type value that selects the S3 path resolver in ConnectorObjectStorageService.
const val S3_STORAGE_TYPE = "S3"
23+
24+
/**
 * Resolves where a job's rejected records are stored, based on the object storage
 * configuration of the connection's destination.
 *
 * Resolution is best-effort: every failure is logged and reported as `null` so that
 * callers (e.g. connection timeline event logging) are never broken by it.
 */
@Singleton
class ConnectorObjectStorageService(
    private val actorDefinitionService: ActorDefinitionService,
    private val connectionService: ConnectionService,
    private val destinationService: DestinationService,
) {
    /**
     * Returns the rejected records metadata for [job], or `null` when none can be resolved.
     *
     * `null` is returned when the job is not a sync job, when the destination version does
     * not support data activation, when the destination has no (supported) object storage
     * configuration, or when any lookup fails.
     *
     * @param connectionId the connection the job belongs to; used to look up the destination.
     * @param job the job whose rejected records location should be resolved.
     * @return the resolved metadata, or `null` if unavailable. This method never throws an [Exception].
     */
    @JvmName("getRejectedRecordsForJob")
    fun getRejectedRecordsForJob(
        connectionId: ConnectionId,
        job: Job,
    ): RejectedRecordsMetadata? =
        try {
            // The version lookup hits a service and reads nested job config, so it must live
            // inside the try block to honour the "never fail" contract of this method.
            val destinationVersion = getJobDestinationVersion(job)
            if (destinationVersion == null) {
                log.warn { "Job ${job.id} does not have a valid destination version, cannot retrieve rejected records metadata." }
                null
            } else {
                getObjectStorageConfigProperty(destinationVersion)?.let { objectStorageConfigProperty ->
                    val connection = connectionService.getStandardSync(connectionId.value)
                    val destination = destinationService.getDestinationConnection(connection.destinationId)
                    getRejectedRecordsFromDestination(destination.configuration, objectStorageConfigProperty, job.id)
                }
            }
        } catch (e: Exception) {
            // Never fail due to errors generating rejected records metadata
            log.error(e) { "Failed to get rejected records metadata for job ${job.id} in connection $connectionId" }
            null
        }

    /**
     * Reads the object storage section found at [objectStorageConfigPath] in [destinationConfig]
     * and delegates to the path resolver matching its storage type.
     *
     * @return the resolved metadata, or `null` if the section is missing/JSON-null, has no
     *   storage type, or uses a storage type other than [S3_STORAGE_TYPE].
     */
    private fun getRejectedRecordsFromDestination(
        destinationConfig: JsonNode,
        objectStorageConfigPath: String,
        jobId: Long,
    ): RejectedRecordsMetadata? {
        val objectStorageConfig = destinationConfig.get(objectStorageConfigPath)
        if (objectStorageConfig == null || objectStorageConfig.isNull) {
            return null
        }

        val storageType = objectStorageConfig.get(OBJECT_STORAGE_TYPE_CONFIG_KEY)?.asText() ?: return null

        // Instantiate the appropriate bucket link resolver based on storage type
        val metaResolver: ObjectStoragePathResolver =
            when (storageType) {
                S3_STORAGE_TYPE -> S3ObjectStoragePathResolver(objectStorageConfig)
                else -> return null // Unsupported storage type
            }

        return metaResolver.resolveRejectedRecordsPaths(jobId)
    }

    /**
     * Looks up the destination definition version recorded on a sync job.
     *
     * @return the version, or `null` when [job] is not of config type `SYNC`.
     */
    private fun getJobDestinationVersion(job: Job): ActorDefinitionVersion? {
        if (job.config?.configType != JobConfig.ConfigType.SYNC) return null

        val destinationVersionId = job.config.sync.destinationDefinitionVersionId
        return actorDefinitionService.getActorDefinitionVersion(destinationVersionId)
    }

    /**
     * Returns the name of the destination config property that holds the object storage
     * settings, or `null` when [destinationVersion] does not support data activation.
     */
    private fun getObjectStorageConfigProperty(destinationVersion: ActorDefinitionVersion): String? {
        // In the future, we may want to grab the specific field from the spec, once we do bucket injection
        // For now, we assume all data activation destinations will have the bucket config at `object_storage_config`
        if (destinationVersion.supportsDataActivation) {
            return "object_storage_config"
        }

        return null
    }
}

0 commit comments

Comments
 (0)