Skip to content

Commit 14f1211

Browse files
committed
Merge remote-tracking branch 'remotes/vsementsov/tags/pull-jobs-2021-10-07-v2' into staging
mirror: Handle errors after READY cancel
v2: add small fix by Stefano, Hanna's series fixed

# gpg: Signature made Thu 07 Oct 2021 08:25:07 AM PDT
# gpg: using RSA key 8B9C26CDB2FD147C880E86A1561F24C1F19F79FB
# gpg: Good signature from "Vladimir Sementsov-Ogievskiy <[email protected]>" [unknown]
# gpg: WARNING: This key is not certified with a trusted signature!
# gpg: There is no indication that the signature belongs to the owner.
# Primary key fingerprint: 8B9C 26CD B2FD 147C 880E 86A1 561F 24C1 F19F 79FB

* remotes/vsementsov/tags/pull-jobs-2021-10-07-v2:
  iotests: Add mirror-ready-cancel-error test
  mirror: Do not clear .cancelled
  mirror: Stop active mirroring after force-cancel
  mirror: Check job_is_cancelled() earlier
  mirror: Use job_is_cancelled()
  job: Add job_cancel_requested()
  job: Do not soft-cancel after a job is done
  jobs: Give Job.force_cancel more meaning
  job: @force parameter for job_cancel_sync()
  job: Force-cancel jobs in a failed transaction
  mirror: Drop s->synced
  mirror: Keep s->synced on error
  job: Context changes in job_completed_txn_abort()
  block/aio_task: assert `max_busy_tasks` is greater than 0
  block/backup: avoid integer overflow of `max-workers`

Signed-off-by: Richard Henderson <[email protected]>
2 parents 3c01933 + 2451f72 commit 14f1211

File tree

12 files changed

+316
-92
lines changed

12 files changed

+316
-92
lines changed

block/aio_task.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ AioTaskPool *coroutine_fn aio_task_pool_new(int max_busy_tasks)
9898
{
9999
AioTaskPool *pool = g_new0(AioTaskPool, 1);
100100

101+
assert(max_busy_tasks > 0);
102+
101103
pool->main_co = qemu_coroutine_self();
102104
pool->max_busy_tasks = max_busy_tasks;
103105

block/backup.c

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -327,11 +327,12 @@ static void coroutine_fn backup_set_speed(BlockJob *job, int64_t speed)
327327
}
328328
}
329329

330-
static void backup_cancel(Job *job, bool force)
330+
static bool backup_cancel(Job *job, bool force)
331331
{
332332
BackupBlockJob *s = container_of(job, BackupBlockJob, common.job);
333333

334334
bdrv_cancel_in_flight(s->target_bs);
335+
return true;
335336
}
336337

337338
static const BlockJobDriver backup_job_driver = {
@@ -407,8 +408,8 @@ BlockJob *backup_job_create(const char *job_id, BlockDriverState *bs,
407408
return NULL;
408409
}
409410

410-
if (perf->max_workers < 1) {
411-
error_setg(errp, "max-workers must be greater than zero");
411+
if (perf->max_workers < 1 || perf->max_workers > INT_MAX) {
412+
error_setg(errp, "max-workers must be between 1 and %d", INT_MAX);
412413
return NULL;
413414
}
414415

block/mirror.c

Lines changed: 32 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ typedef struct MirrorBlockJob {
5656
bool zero_target;
5757
MirrorCopyMode copy_mode;
5858
BlockdevOnError on_source_error, on_target_error;
59-
bool synced;
6059
/* Set when the target is synced (dirty bitmap is clean, nothing
6160
* in flight) and the job is running in active mode */
6261
bool actively_synced;
@@ -121,7 +120,6 @@ typedef enum MirrorMethod {
121120
static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read,
122121
int error)
123122
{
124-
s->synced = false;
125123
s->actively_synced = false;
126124
if (read) {
127125
return block_job_error_action(&s->common, s->on_source_error,
@@ -944,12 +942,10 @@ static int coroutine_fn mirror_run(Job *job, Error **errp)
944942
if (s->bdev_length == 0) {
945943
/* Transition to the READY state and wait for complete. */
946944
job_transition_to_ready(&s->common.job);
947-
s->synced = true;
948945
s->actively_synced = true;
949-
while (!job_is_cancelled(&s->common.job) && !s->should_complete) {
946+
while (!job_cancel_requested(&s->common.job) && !s->should_complete) {
950947
job_yield(&s->common.job);
951948
}
952-
s->common.job.cancelled = false;
953949
goto immediate_exit;
954950
}
955951

@@ -1010,6 +1006,11 @@ static int coroutine_fn mirror_run(Job *job, Error **errp)
10101006

10111007
job_pause_point(&s->common.job);
10121008

1009+
if (job_is_cancelled(&s->common.job)) {
1010+
ret = 0;
1011+
goto immediate_exit;
1012+
}
1013+
10131014
cnt = bdrv_get_dirty_count(s->dirty_bitmap);
10141015
/* cnt is the number of dirty bytes remaining and s->bytes_in_flight is
10151016
* the number of bytes currently being processed; together those are
@@ -1036,7 +1037,7 @@ static int coroutine_fn mirror_run(Job *job, Error **errp)
10361037
should_complete = false;
10371038
if (s->in_flight == 0 && cnt == 0) {
10381039
trace_mirror_before_flush(s);
1039-
if (!s->synced) {
1040+
if (!job_is_ready(&s->common.job)) {
10401041
if (mirror_flush(s) < 0) {
10411042
/* Go check s->ret. */
10421043
continue;
@@ -1047,14 +1048,13 @@ static int coroutine_fn mirror_run(Job *job, Error **errp)
10471048
* the target in a consistent state.
10481049
*/
10491050
job_transition_to_ready(&s->common.job);
1050-
s->synced = true;
10511051
if (s->copy_mode != MIRROR_COPY_MODE_BACKGROUND) {
10521052
s->actively_synced = true;
10531053
}
10541054
}
10551055

10561056
should_complete = s->should_complete ||
1057-
job_is_cancelled(&s->common.job);
1057+
job_cancel_requested(&s->common.job);
10581058
cnt = bdrv_get_dirty_count(s->dirty_bitmap);
10591059
}
10601060

@@ -1084,24 +1084,17 @@ static int coroutine_fn mirror_run(Job *job, Error **errp)
10841084
* completion.
10851085
*/
10861086
assert(QLIST_EMPTY(&bs->tracked_requests));
1087-
s->common.job.cancelled = false;
10881087
need_drain = false;
10891088
break;
10901089
}
10911090

1092-
ret = 0;
1093-
1094-
if (s->synced && !should_complete) {
1091+
if (job_is_ready(&s->common.job) && !should_complete) {
10951092
delay_ns = (s->in_flight == 0 &&
10961093
cnt == 0 ? BLOCK_JOB_SLICE_TIME : 0);
10971094
}
1098-
trace_mirror_before_sleep(s, cnt, s->synced, delay_ns);
1095+
trace_mirror_before_sleep(s, cnt, job_is_ready(&s->common.job),
1096+
delay_ns);
10991097
job_sleep_ns(&s->common.job, delay_ns);
1100-
if (job_is_cancelled(&s->common.job) &&
1101-
(!s->synced || s->common.job.force_cancel))
1102-
{
1103-
break;
1104-
}
11051098
s->last_pause_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
11061099
}
11071100

@@ -1111,8 +1104,7 @@ static int coroutine_fn mirror_run(Job *job, Error **errp)
11111104
* or it was cancelled prematurely so that we do not guarantee that
11121105
* the target is a copy of the source.
11131106
*/
1114-
assert(ret < 0 || ((s->common.job.force_cancel || !s->synced) &&
1115-
job_is_cancelled(&s->common.job)));
1107+
assert(ret < 0 || job_is_cancelled(&s->common.job));
11161108
assert(need_drain);
11171109
mirror_wait_for_all_io(s);
11181110
}
@@ -1135,7 +1127,7 @@ static void mirror_complete(Job *job, Error **errp)
11351127
{
11361128
MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
11371129

1138-
if (!s->synced) {
1130+
if (!job_is_ready(job)) {
11391131
error_setg(errp, "The active block job '%s' cannot be completed",
11401132
job->id);
11411133
return;
@@ -1190,21 +1182,34 @@ static bool mirror_drained_poll(BlockJob *job)
11901182
* from one of our own drain sections, to avoid a deadlock waiting for
11911183
* ourselves.
11921184
*/
1193-
if (!s->common.job.paused && !s->common.job.cancelled && !s->in_drain) {
1185+
if (!s->common.job.paused && !job_is_cancelled(&job->job) && !s->in_drain) {
11941186
return true;
11951187
}
11961188

11971189
return !!s->in_flight;
11981190
}
11991191

1200-
static void mirror_cancel(Job *job, bool force)
1192+
static bool mirror_cancel(Job *job, bool force)
12011193
{
12021194
MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
12031195
BlockDriverState *target = blk_bs(s->target);
12041196

1205-
if (force || !job_is_ready(job)) {
1197+
/*
1198+
* Before the job is READY, we treat any cancellation like a
1199+
* force-cancellation.
1200+
*/
1201+
force = force || !job_is_ready(job);
1202+
1203+
if (force) {
12061204
bdrv_cancel_in_flight(target);
12071205
}
1206+
return force;
1207+
}
1208+
1209+
static bool commit_active_cancel(Job *job, bool force)
1210+
{
1211+
/* Same as above in mirror_cancel() */
1212+
return force || !job_is_ready(job);
12081213
}
12091214

12101215
static const BlockJobDriver mirror_job_driver = {
@@ -1234,6 +1239,7 @@ static const BlockJobDriver commit_active_job_driver = {
12341239
.abort = mirror_abort,
12351240
.pause = mirror_pause,
12361241
.complete = mirror_complete,
1242+
.cancel = commit_active_cancel,
12371243
},
12381244
.drained_poll = mirror_drained_poll,
12391245
};
@@ -1417,6 +1423,7 @@ static int coroutine_fn bdrv_mirror_top_do_write(BlockDriverState *bs,
14171423
bool copy_to_target;
14181424

14191425
copy_to_target = s->job->ret >= 0 &&
1426+
!job_is_cancelled(&s->job->common.job) &&
14201427
s->job->copy_mode == MIRROR_COPY_MODE_WRITE_BLOCKING;
14211428

14221429
if (copy_to_target) {
@@ -1465,6 +1472,7 @@ static int coroutine_fn bdrv_mirror_top_pwritev(BlockDriverState *bs,
14651472
bool copy_to_target;
14661473

14671474
copy_to_target = s->job->ret >= 0 &&
1475+
!job_is_cancelled(&s->job->common.job) &&
14681476
s->job->copy_mode == MIRROR_COPY_MODE_WRITE_BLOCKING;
14691477

14701478
if (copy_to_target) {

block/replication.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ static void replication_close(BlockDriverState *bs)
149149
if (s->stage == BLOCK_REPLICATION_FAILOVER) {
150150
commit_job = &s->commit_job->job;
151151
assert(commit_job->aio_context == qemu_get_current_aio_context());
152-
job_cancel_sync(commit_job);
152+
job_cancel_sync(commit_job, false);
153153
}
154154

155155
if (s->mode == REPLICATION_MODE_SECONDARY) {
@@ -726,7 +726,7 @@ static void replication_stop(ReplicationState *rs, bool failover, Error **errp)
726726
* disk, secondary disk in backup_job_completed().
727727
*/
728728
if (s->backup_job) {
729-
job_cancel_sync(&s->backup_job->job);
729+
job_cancel_sync(&s->backup_job->job, true);
730730
}
731731

732732
if (!failover) {

blockdev.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1847,7 +1847,7 @@ static void drive_backup_abort(BlkActionState *common)
18471847
aio_context = bdrv_get_aio_context(state->bs);
18481848
aio_context_acquire(aio_context);
18491849

1850-
job_cancel_sync(&state->job->job);
1850+
job_cancel_sync(&state->job->job, true);
18511851

18521852
aio_context_release(aio_context);
18531853
}
@@ -1948,7 +1948,7 @@ static void blockdev_backup_abort(BlkActionState *common)
19481948
aio_context = bdrv_get_aio_context(state->bs);
19491949
aio_context_acquire(aio_context);
19501950

1951-
job_cancel_sync(&state->job->job);
1951+
job_cancel_sync(&state->job->job, true);
19521952

19531953
aio_context_release(aio_context);
19541954
}

include/qemu/job.h

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -253,8 +253,17 @@ struct JobDriver {
253253

254254
/**
255255
* If the callback is not NULL, it will be invoked in job_cancel_async
256+
*
257+
* This function must return true if the job will be cancelled
258+
* immediately without any further I/O (mandatory if @force is
259+
* true), and false otherwise. This lets the generic job layer
260+
* know whether a job has been truly (force-)cancelled, or whether
261+
* it is just in a special completion mode (like mirror after
262+
* READY).
263+
* (If the callback is NULL, the job is assumed to terminate
264+
* without I/O.)
256265
*/
257-
void (*cancel)(Job *job, bool force);
266+
bool (*cancel)(Job *job, bool force);
258267

259268

260269
/** Called when the job is freed */
@@ -427,9 +436,15 @@ const char *job_type_str(const Job *job);
427436
/** Returns true if the job should not be visible to the management layer. */
428437
bool job_is_internal(Job *job);
429438

430-
/** Returns whether the job is scheduled for cancellation. */
439+
/** Returns whether the job is being cancelled. */
431440
bool job_is_cancelled(Job *job);
432441

442+
/**
443+
* Returns whether the job is scheduled for cancellation (at an
444+
* indefinite point).
445+
*/
446+
bool job_cancel_requested(Job *job);
447+
433448
/** Returns whether the job is in a completed state. */
434449
bool job_is_completed(Job *job);
435450

@@ -506,18 +521,18 @@ void job_user_cancel(Job *job, bool force, Error **errp);
506521

507522
/**
508523
* Synchronously cancel the @job. The completion callback is called
509-
* before the function returns. The job may actually complete
510-
* instead of canceling itself; the circumstances under which this
511-
* happens depend on the kind of job that is active.
524+
* before the function returns. If @force is false, the job may
525+
* actually complete instead of canceling itself; the circumstances
526+
* under which this happens depend on the kind of job that is active.
512527
*
513528
* Returns the return value from the job if the job actually completed
514529
* during the call, or -ECANCELED if it was canceled.
515530
*
516531
* Callers must hold the AioContext lock of job->aio_context.
517532
*/
518-
int job_cancel_sync(Job *job);
533+
int job_cancel_sync(Job *job, bool force);
519534

520-
/** Synchronously cancels all jobs using job_cancel_sync(). */
535+
/** Synchronously force-cancels all jobs using job_cancel_sync(). */
521536
void job_cancel_sync_all(void);
522537

523538
/**

0 commit comments

Comments
 (0)