Skip to content

Commit ecf73c9

Browse files
authored
add cancellation api endpoint (#2530)
* cancel api * fix tests * fix builder * fix bug
1 parent 41e14e2 commit ecf73c9

File tree

9 files changed

+191
-13
lines changed

9 files changed

+191
-13
lines changed

airbyte-api/src/main/openapi/config.yaml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1101,6 +1101,29 @@ paths:
11011101
description: Job not found
11021102
"422":
11031103
$ref: "#/components/responses/InvalidInput"
1104+
/v1/jobs/cancel:
1105+
post:
1106+
tags:
1107+
- jobs
1108+
summary: Cancels a job
1109+
operationId: cancelJob
1110+
requestBody:
1111+
content:
1112+
application/json:
1113+
schema:
1114+
$ref: "#/components/schemas/JobIdRequestBody"
1115+
required: true
1116+
responses:
1117+
"200":
1118+
description: Successful operation
1119+
content:
1120+
application/json:
1121+
schema:
1122+
$ref: "#/components/schemas/JobInfoRead"
1123+
"404":
1124+
description: Job not found
1125+
"422":
1126+
$ref: "#/components/responses/InvalidInput"
11041127
/v1/health:
11051128
get:
11061129
tags:

airbyte-server/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ plugins {
33
}
44

55
dependencies {
6+
implementation 'io.temporal:temporal-sdk:1.0.4'
7+
68
implementation 'org.apache.cxf:cxf-core:3.4.2'
79

810
implementation 'org.eclipse.jetty:jetty-server:9.4.31.v20200723'

airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,8 @@ public ConfigurationApi(final ConfigRepository configRepository,
133133
final FileTtlManager archiveTtlManager) {
134134
final SpecFetcher specFetcher = new SpecFetcher(synchronousSchedulerClient);
135135
final JsonSchemaValidator schemaValidator = new JsonSchemaValidator();
136-
schedulerHandler = new SchedulerHandler(configRepository, schedulerJobClient, synchronousSchedulerClient);
136+
schedulerHandler =
137+
new SchedulerHandler(configRepository, schedulerJobClient, synchronousSchedulerClient, jobPersistence, configs.getWorkspaceRoot());
137138
final DockerImageValidator dockerImageValidator = new DockerImageValidator(synchronousSchedulerClient);
138139
sourceDefinitionsHandler = new SourceDefinitionsHandler(configRepository, dockerImageValidator, synchronousSchedulerClient);
139140
connectionsHandler = new ConnectionsHandler(configRepository);
@@ -396,6 +397,11 @@ public SourceDiscoverSchemaRead executeSourceDiscoverSchema(@Valid SourceCoreCon
396397
return execute(() -> schedulerHandler.discoverSchemaForSourceFromSourceCreate(sourceCreate));
397398
}
398399

400+
@Override
401+
public JobInfoRead cancelJob(@Valid JobIdRequestBody jobIdRequestBody) {
402+
return execute(() -> schedulerHandler.cancelJob(jobIdRequestBody));
403+
}
404+
399405
// JOB HISTORY
400406

401407
@Override

airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import io.airbyte.api.model.DestinationDefinitionSpecificationRead;
3434
import io.airbyte.api.model.DestinationIdRequestBody;
3535
import io.airbyte.api.model.DestinationUpdate;
36+
import io.airbyte.api.model.JobIdRequestBody;
3637
import io.airbyte.api.model.JobInfoRead;
3738
import io.airbyte.api.model.SourceCoreConfig;
3839
import io.airbyte.api.model.SourceDefinitionIdRequestBody;
@@ -42,6 +43,7 @@
4243
import io.airbyte.api.model.SourceUpdate;
4344
import io.airbyte.commons.docker.DockerUtils;
4445
import io.airbyte.commons.enums.Enums;
46+
import io.airbyte.commons.io.IOs;
4547
import io.airbyte.config.DestinationConnection;
4648
import io.airbyte.config.SourceConnection;
4749
import io.airbyte.config.StandardCheckConnectionOutput;
@@ -56,13 +58,20 @@
5658
import io.airbyte.scheduler.client.SchedulerJobClient;
5759
import io.airbyte.scheduler.client.SynchronousResponse;
5860
import io.airbyte.scheduler.client.SynchronousSchedulerClient;
61+
import io.airbyte.scheduler.persistence.JobPersistence;
5962
import io.airbyte.server.converters.CatalogConverter;
6063
import io.airbyte.server.converters.ConfigurationUpdate;
6164
import io.airbyte.server.converters.JobConverter;
6265
import io.airbyte.server.converters.SpecFetcher;
6366
import io.airbyte.validation.json.JsonSchemaValidator;
6467
import io.airbyte.validation.json.JsonValidationException;
68+
import io.airbyte.workers.WorkerUtils;
69+
import io.airbyte.workers.temporal.TemporalAttemptExecution;
70+
import io.airbyte.workers.temporal.TemporalUtils;
71+
import io.temporal.api.common.v1.WorkflowExecution;
72+
import io.temporal.api.workflowservice.v1.RequestCancelWorkflowExecutionRequest;
6573
import java.io.IOException;
74+
import java.nio.file.Path;
6675
import java.util.UUID;
6776

6877
public class SchedulerHandler {
@@ -73,17 +82,23 @@ public class SchedulerHandler {
7382
private final SpecFetcher specFetcher;
7483
private final ConfigurationUpdate configurationUpdate;
7584
private final JsonSchemaValidator jsonSchemaValidator;
85+
private final JobPersistence jobPersistence;
86+
private final Path workspaceRoot;
7687

7788
public SchedulerHandler(ConfigRepository configRepository,
7889
SchedulerJobClient schedulerJobClient,
79-
SynchronousSchedulerClient synchronousSchedulerClient) {
90+
SynchronousSchedulerClient synchronousSchedulerClient,
91+
JobPersistence jobPersistence,
92+
Path workspaceRoot) {
8093
this(
8194
configRepository,
8295
schedulerJobClient,
8396
synchronousSchedulerClient,
8497
new ConfigurationUpdate(configRepository, new SpecFetcher(synchronousSchedulerClient)),
8598
new JsonSchemaValidator(),
86-
new SpecFetcher(synchronousSchedulerClient));
99+
new SpecFetcher(synchronousSchedulerClient),
100+
jobPersistence,
101+
workspaceRoot);
87102
}
88103

89104
@VisibleForTesting
@@ -92,13 +107,17 @@ public SchedulerHandler(ConfigRepository configRepository,
92107
SynchronousSchedulerClient synchronousSchedulerClient,
93108
ConfigurationUpdate configurationUpdate,
94109
JsonSchemaValidator jsonSchemaValidator,
95-
SpecFetcher specFetcher) {
110+
SpecFetcher specFetcher,
111+
JobPersistence jobPersistence,
112+
Path workspaceRoot) {
96113
this.configRepository = configRepository;
97114
this.schedulerJobClient = schedulerJobClient;
98115
this.synchronousSchedulerClient = synchronousSchedulerClient;
99116
this.configurationUpdate = configurationUpdate;
100117
this.jsonSchemaValidator = jsonSchemaValidator;
101118
this.specFetcher = specFetcher;
119+
this.jobPersistence = jobPersistence;
120+
this.workspaceRoot = workspaceRoot;
102121
}
103122

104123
public CheckConnectionRead checkSourceConnectionFromSourceId(SourceIdRequestBody sourceIdRequestBody)
@@ -276,6 +295,30 @@ public JobInfoRead resetConnection(final ConnectionIdRequestBody connectionIdReq
276295
return JobConverter.getJobInfoRead(job);
277296
}
278297

298+
public JobInfoRead cancelJob(JobIdRequestBody jobIdRequestBody) throws IOException {
299+
final long jobId = jobIdRequestBody.getId();
300+
301+
// first prevent this job from being scheduled again
302+
jobPersistence.cancelJob(jobId);
303+
304+
// second cancel the temporal execution
305+
// TODO: this is hacky, resolve https://github.com/airbytehq/airbyte/issues/2564 to avoid this
306+
// behavior
307+
final Path attemptParentDir = WorkerUtils.getJobRoot(workspaceRoot, String.valueOf(jobId), 0L).getParent();
308+
final String workflowId = IOs.readFile(attemptParentDir, TemporalAttemptExecution.WORKFLOW_ID_FILENAME);
309+
final WorkflowExecution workflowExecution = WorkflowExecution.newBuilder()
310+
.setWorkflowId(workflowId)
311+
.build();
312+
final RequestCancelWorkflowExecutionRequest cancelRequest = RequestCancelWorkflowExecutionRequest.newBuilder()
313+
.setWorkflowExecution(workflowExecution)
314+
.setNamespace(TemporalUtils.DEFAULT_NAMESPACE)
315+
.build();
316+
317+
TemporalUtils.TEMPORAL_SERVICE.blockingStub().requestCancelWorkflowExecution(cancelRequest);
318+
319+
return JobConverter.getJobInfoRead(jobPersistence.getJob(jobId));
320+
}
321+
279322
private CheckConnectionRead reportConnectionStatus(final SynchronousResponse<StandardCheckConnectionOutput> response) {
280323
final CheckConnectionRead checkConnectionRead = new CheckConnectionRead()
281324
.jobInfo(JobConverter.getSynchronousJobRead(response));

airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import io.airbyte.scheduler.client.SynchronousJobMetadata;
7373
import io.airbyte.scheduler.client.SynchronousResponse;
7474
import io.airbyte.scheduler.client.SynchronousSchedulerClient;
75+
import io.airbyte.scheduler.persistence.JobPersistence;
7576
import io.airbyte.server.converters.ConfigurationUpdate;
7677
import io.airbyte.server.converters.SpecFetcher;
7778
import io.airbyte.server.helpers.ConnectionHelpers;
@@ -81,6 +82,7 @@
8182
import io.airbyte.validation.json.JsonValidationException;
8283
import java.io.IOException;
8384
import java.net.URI;
85+
import java.nio.file.Path;
8486
import java.util.HashMap;
8587
import java.util.Optional;
8688
import java.util.UUID;
@@ -128,6 +130,7 @@ class SchedulerHandlerTest {
128130
private ConfigurationUpdate configurationUpdate;
129131
private JsonSchemaValidator jsonSchemaValidator;
130132
private SpecFetcher specFetcher;
133+
private JobPersistence jobPersistence;
131134

132135
@BeforeEach
133136
void setup() {
@@ -143,9 +146,10 @@ void setup() {
143146
schedulerJobClient = spy(SchedulerJobClient.class);
144147
synchronousSchedulerClient = mock(SynchronousSchedulerClient.class);
145148
configRepository = mock(ConfigRepository.class);
149+
jobPersistence = mock(JobPersistence.class);
146150

147151
schedulerHandler = new SchedulerHandler(configRepository, schedulerJobClient, synchronousSchedulerClient, configurationUpdate,
148-
jsonSchemaValidator, specFetcher);
152+
jsonSchemaValidator, specFetcher, jobPersistence, mock(Path.class));
149153
}
150154

151155
@Test

airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,14 @@
2727
import com.google.common.annotations.VisibleForTesting;
2828
import io.airbyte.commons.functional.CheckedConsumer;
2929
import io.airbyte.commons.functional.CheckedSupplier;
30+
import io.airbyte.commons.io.IOs;
3031
import io.airbyte.config.EnvConfigs;
3132
import io.airbyte.scheduler.models.JobRunConfig;
3233
import io.airbyte.workers.Worker;
3334
import io.airbyte.workers.WorkerConstants;
3435
import io.airbyte.workers.WorkerException;
3536
import io.airbyte.workers.WorkerUtils;
37+
import io.temporal.activity.Activity;
3638
import java.io.IOException;
3739
import java.nio.file.Files;
3840
import java.nio.file.Path;
@@ -56,20 +58,24 @@ public class TemporalAttemptExecution<INPUT, OUTPUT> implements CheckedSupplier<
5658
private static final Logger LOGGER = LoggerFactory.getLogger(TemporalAttemptExecution.class);
5759
private static final Duration HEARTBEAT_INTERVAL = Duration.ofSeconds(10);
5860

61+
public static String WORKFLOW_ID_FILENAME = "WORKFLOW_ID";
62+
5963
private final Path jobRoot;
6064
private final CheckedSupplier<Worker<INPUT, OUTPUT>, Exception> workerSupplier;
6165
private final Supplier<INPUT> inputSupplier;
6266
private final String jobId;
6367
private final BiConsumer<Path, String> mdcSetter;
6468
private final CheckedConsumer<Path, IOException> jobRootDirCreator;
6569
private final CancellationHandler cancellationHandler;
70+
private final Supplier<String> workflowIdProvider;
6671

6772
public TemporalAttemptExecution(Path workspaceRoot,
6873
JobRunConfig jobRunConfig,
6974
CheckedSupplier<Worker<INPUT, OUTPUT>, Exception> workerSupplier,
7075
Supplier<INPUT> inputSupplier,
7176
CancellationHandler cancellationHandler) {
72-
this(workspaceRoot, jobRunConfig, workerSupplier, inputSupplier, WorkerUtils::setJobMdc, Files::createDirectories, cancellationHandler);
77+
this(workspaceRoot, jobRunConfig, workerSupplier, inputSupplier, WorkerUtils::setJobMdc, Files::createDirectories, cancellationHandler,
78+
() -> Activity.getExecutionContext().getInfo().getWorkflowId());
7379
}
7480

7581
@VisibleForTesting
@@ -79,14 +85,16 @@ public TemporalAttemptExecution(Path workspaceRoot,
7985
Supplier<INPUT> inputSupplier,
8086
BiConsumer<Path, String> mdcSetter,
8187
CheckedConsumer<Path, IOException> jobRootDirCreator,
82-
CancellationHandler cancellationHandler) {
88+
CancellationHandler cancellationHandler,
89+
Supplier<String> workflowIdProvider) {
8390
this.jobRoot = WorkerUtils.getJobRoot(workspaceRoot, jobRunConfig.getJobId(), jobRunConfig.getAttemptId());
8491
this.workerSupplier = workerSupplier;
8592
this.inputSupplier = inputSupplier;
8693
this.jobId = jobRunConfig.getJobId();
8794
this.mdcSetter = mdcSetter;
8895
this.jobRootDirCreator = jobRootDirCreator;
8996
this.cancellationHandler = cancellationHandler;
97+
this.workflowIdProvider = workflowIdProvider;
9098
}
9199

92100
@Override
@@ -97,6 +105,10 @@ public OUTPUT get() throws TemporalJobException {
97105
LOGGER.info("Executing worker wrapper. Airbyte version: {}", new EnvConfigs().getAirbyteVersionOrWarning());
98106
jobRootDirCreator.accept(jobRoot);
99107

108+
final String workflowId = workflowIdProvider.get();
109+
final Path workflowIdFile = jobRoot.getParent().resolve(WORKFLOW_ID_FILENAME);
110+
IOs.writeFile(workflowIdFile, workflowId);
111+
100112
final Worker<INPUT, OUTPUT> worker = workerSupplier.get();
101113
final CompletableFuture<OUTPUT> outputFuture = new CompletableFuture<>();
102114
final Thread workerThread = getWorkerThread(worker, outputFuture);

airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ public class TemporalUtils {
4646

4747
public static final RetryOptions NO_RETRY = RetryOptions.newBuilder().setMaximumAttempts(1).build();
4848

49+
public static final String DEFAULT_NAMESPACE = "default";
50+
4951
@FunctionalInterface
5052
public interface TemporalJobCreator<T extends Serializable> {
5153

airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalAttemptExecutionTest.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,10 @@ void setup() throws IOException {
7070

7171
execution = mock(CheckedSupplier.class);
7272
mdcSetter = mock(BiConsumer.class);
73-
jobRootDirCreator = mock(CheckedConsumer.class);
73+
jobRootDirCreator = Files::createDirectories;
7474

7575
attemptExecution = new TemporalAttemptExecution<>(workspaceRoot, JOB_RUN_CONFIG, execution, () -> "", mdcSetter, jobRootDirCreator,
76-
mock(CancellationHandler.class));
76+
mock(CancellationHandler.class), () -> "workflow_id");
7777
}
7878

7979
@Test
@@ -87,9 +87,9 @@ void testSuccessfulSupplierRun() throws Exception {
8787
final String actual = attemptExecution.get();
8888

8989
assertEquals(expected, actual);
90+
9091
verify(execution).get();
9192
verify(mdcSetter, atLeast(2)).accept(jobRoot, JOB_ID);
92-
verify(jobRootDirCreator).accept(jobRoot);
9393
}
9494

9595
@Test
@@ -102,7 +102,6 @@ void testThrowsCheckedException() throws Exception {
102102

103103
verify(execution).get();
104104
verify(mdcSetter).accept(jobRoot, JOB_ID);
105-
verify(jobRootDirCreator).accept(jobRoot);
106105
}
107106

108107
@Test
@@ -115,7 +114,6 @@ void testThrowsUnCheckedException() throws Exception {
115114

116115
verify(execution).get();
117116
verify(mdcSetter).accept(jobRoot, JOB_ID);
118-
verify(jobRootDirCreator).accept(jobRoot);
119117
}
120118

121119
@Test
@@ -129,7 +127,6 @@ void testThrowsTemporalJobExceptionException() throws Exception {
129127

130128
verify(execution).get();
131129
verify(mdcSetter).accept(jobRoot, JOB_ID);
132-
verify(jobRootDirCreator).accept(jobRoot);
133130
}
134131

135132
}

0 commit comments

Comments
 (0)