Skip to content

Commit bbacdd7

Browse files
authored
Migration: Re-use async batch infrastructure (#1343)
1 parent 281ede9 commit bbacdd7

File tree

5 files changed

+112
-171
lines changed

5 files changed

+112
-171
lines changed

includes/class-dispatcher.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ public static function init() {
4545
\add_filter( 'activitypub_additional_inboxes', array( self::class, 'add_inboxes_by_mentioned_actors' ), 10, 3 );
4646
\add_filter( 'activitypub_additional_inboxes', array( self::class, 'add_inboxes_of_replied_urls' ), 10, 3 );
4747
\add_filter( 'activitypub_additional_inboxes', array( self::class, 'add_inboxes_of_relays' ), 10, 3 );
48+
49+
Scheduler::register_async_batch_callback( 'activitypub_send_activity', array( self::class, 'send_to_followers' ) );
50+
Scheduler::register_async_batch_callback( 'activitypub_retry_activity', array( self::class, 'retry_send_to_followers' ) );
4851
}
4952

5053
/**

includes/class-migration.php

Lines changed: 14 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,12 @@ class Migration {
2424
* Initialize the class, registering WordPress hooks.
2525
*/
2626
public static function init() {
27-
\add_action( 'activitypub_migrate', array( self::class, 'async_migration' ) );
28-
\add_action( 'activitypub_upgrade', array( self::class, 'async_upgrade' ), 10, 99 );
29-
\add_action( 'activitypub_update_comment_counts', array( self::class, 'update_comment_counts' ), 10, 2 );
30-
3127
self::maybe_migrate();
28+
29+
Scheduler::register_async_batch_callback( 'activitypub_migrate_from_0_17', array( self::class, 'migrate_from_0_17' ) );
30+
Scheduler::register_async_batch_callback( 'activitypub_update_comment_counts', array( self::class, 'update_comment_counts' ) );
31+
Scheduler::register_async_batch_callback( 'activitypub_create_post_outbox_items', array( self::class, 'create_post_outbox_items' ) );
32+
Scheduler::register_async_batch_callback( 'activitypub_create_comment_outbox_items', array( self::class, 'create_comment_outbox_items' ) );
3233
}
3334

3435
/**
@@ -122,13 +123,12 @@ public static function maybe_migrate() {
122123
$version_from_db = ACTIVITYPUB_PLUGIN_VERSION;
123124
}
124125

125-
// Schedule the async migration.
126-
if ( ! \wp_next_scheduled( 'activitypub_migrate', $version_from_db ) ) {
127-
\wp_schedule_single_event( \time(), 'activitypub_migrate', array( $version_from_db ) );
128-
}
129126
if ( \version_compare( $version_from_db, '0.17.0', '<' ) ) {
130127
self::migrate_from_0_16();
131128
}
129+
if ( \version_compare( $version_from_db, '1.0.0', '<' ) ) {
130+
\wp_schedule_single_event( \time(), 'activitypub_migrate_from_0_17' );
131+
}
132132
if ( \version_compare( $version_from_db, '1.3.0', '<' ) ) {
133133
self::migrate_from_1_2_0();
134134
}
@@ -160,8 +160,9 @@ public static function maybe_migrate() {
160160
add_action( 'init', 'flush_rewrite_rules', 20 );
161161
}
162162
if ( \version_compare( $version_from_db, '5.0.0', '<' ) ) {
163-
\wp_schedule_single_event( \time(), 'activitypub_upgrade', array( 'create_post_outbox_items' ) );
164-
\wp_schedule_single_event( \time() + 15, 'activitypub_upgrade', array( 'create_comment_outbox_items' ) );
163+
Scheduler::register_schedules();
164+
\wp_schedule_single_event( \time(), 'activitypub_create_post_outbox_items' );
165+
\wp_schedule_single_event( \time() + 15, 'activitypub_create_comment_outbox_items' );
165166
add_action( 'init', 'flush_rewrite_rules', 20 );
166167
}
167168
if ( \version_compare( $version_from_db, '5.4.0', '<' ) ) {
@@ -231,49 +232,6 @@ public static function maybe_migrate() {
231232
self::unlock();
232233
}
233234

234-
/**
235-
* Asynchronously migrates the database structure.
236-
*
237-
* @param string $version_from_db The version from which to migrate.
238-
*/
239-
public static function async_migration( $version_from_db ) {
240-
if ( \version_compare( $version_from_db, '1.0.0', '<' ) ) {
241-
self::migrate_from_0_17();
242-
}
243-
}
244-
245-
/**
246-
* Asynchronously runs upgrade routines.
247-
*
248-
* @param callable $callback Callable upgrade routine. Must be a method of this class.
249-
* @params mixed ...$args Optional. Parameters that get passed to the callback.
250-
*/
251-
public static function async_upgrade( $callback ) {
252-
$args = \func_get_args();
253-
254-
// Bail if the existing lock is still valid.
255-
if ( self::is_locked() ) {
256-
\wp_schedule_single_event( time() + MINUTE_IN_SECONDS, 'activitypub_upgrade', $args );
257-
return;
258-
}
259-
260-
self::lock();
261-
262-
$callback = array_shift( $args ); // Remove $callback from arguments.
263-
$next = \call_user_func_array( array( self::class, $callback ), $args );
264-
265-
self::unlock();
266-
267-
if ( ! empty( $next ) ) {
268-
// Schedule the next run, adding the result to the arguments.
269-
\wp_schedule_single_event(
270-
\time() + 30,
271-
'activitypub_upgrade',
272-
\array_merge( array( $callback ), \array_values( $next ) )
273-
);
274-
}
275-
}
276-
277235
/**
278236
* Updates the custom template to use shortcodes instead of the deprecated templates.
279237
*/
@@ -515,25 +473,12 @@ public static function migrate_to_4_7_2() {
515473
* @see Comment::pre_wp_update_comment_count_now()
516474
* @param int $batch_size Optional. Number of posts to process per batch. Default 100.
517475
* @param int $offset Optional. Number of posts to skip. Default 0.
476+
*
477+
* @return int[]|void Array with batch size and offset if there are more posts to process.
518478
*/
519479
public static function update_comment_counts( $batch_size = 100, $offset = 0 ) {
520480
global $wpdb;
521481

522-
// Bail if the existing lock is still valid.
523-
if ( self::is_locked() ) {
524-
\wp_schedule_single_event(
525-
time() + ( 5 * MINUTE_IN_SECONDS ),
526-
'activitypub_update_comment_counts',
527-
array(
528-
'batch_size' => $batch_size,
529-
'offset' => $offset,
530-
)
531-
);
532-
return;
533-
}
534-
535-
self::lock();
536-
537482
Comment::register_comment_types();
538483
$comment_types = Comment::get_comment_type_slugs();
539484
$type_inclusion = "AND comment_type IN ('" . implode( "','", $comment_types ) . "')";
@@ -554,17 +499,8 @@ public static function update_comment_counts( $batch_size = 100, $offset = 0 ) {
554499

555500
if ( count( $post_ids ) === $batch_size ) {
556501
// Schedule next batch.
557-
\wp_schedule_single_event(
558-
time() + MINUTE_IN_SECONDS,
559-
'activitypub_update_comment_counts',
560-
array(
561-
'batch_size' => $batch_size,
562-
'offset' => $offset + $batch_size,
563-
)
564-
);
502+
return array( $batch_size, $offset + $batch_size );
565503
}
566-
567-
self::unlock();
568504
}
569505

570506
/**

includes/class-scheduler.php

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,19 +36,12 @@ class Scheduler {
3636
public static function init() {
3737
self::register_schedulers();
3838

39-
self::$batch_callbacks = array(
40-
'activitypub_send_activity' => array( Dispatcher::class, 'send_to_followers' ),
41-
'activitypub_retry_activity' => array( Dispatcher::class, 'retry_send_to_followers' ),
42-
);
43-
4439
// Follower Cleanups.
4540
\add_action( 'activitypub_update_remote_actors', array( self::class, 'update_remote_actors' ) );
4641
\add_action( 'activitypub_cleanup_remote_actors', array( self::class, 'cleanup_remote_actors' ) );
4742

4843
// Event callbacks.
4944
\add_action( 'activitypub_async_batch', array( self::class, 'async_batch' ), 10, 99 );
50-
\add_action( 'activitypub_send_activity', array( self::class, 'async_batch' ), 10, 3 );
51-
\add_action( 'activitypub_retry_activity', array( self::class, 'async_batch' ), 10, 3 );
5245
\add_action( 'activitypub_reprocess_outbox', array( self::class, 'reprocess_outbox' ) );
5346
\add_action( 'activitypub_outbox_purge', array( self::class, 'purge_outbox' ) );
5447

@@ -74,6 +67,28 @@ public static function register_schedulers() {
7467
do_action( 'activitypub_register_schedulers' );
7568
}
7669

70+
/**
71+
* Register a batch callback for async processing.
72+
*
73+
* @param string $hook The cron event hook name.
74+
* @param callable $callback The callback to execute.
75+
*/
76+
public static function register_async_batch_callback( $hook, $callback ) {
77+
if ( \did_action( 'init' ) && ! \doing_action( 'init' ) ) {
78+
\_doing_it_wrong( __METHOD__, 'Async batch callbacks should be registered before or during the init action.', 'unreleased' );
79+
return;
80+
}
81+
82+
if ( ! \is_callable( $callback ) ) {
83+
return;
84+
}
85+
86+
self::$batch_callbacks[ $hook ] = $callback;
87+
88+
// Register the WordPress action hook to trigger async_batch.
89+
\add_action( $hook, array( self::class, 'async_batch' ), 10, 99 );
90+
}
91+
7792
/**
7893
* Schedule all ActivityPub schedules.
7994
*/
@@ -336,7 +351,7 @@ public static function async_batch() {
336351
return;
337352
}
338353

339-
$key = \md5( \serialize( $args[0] ?? $args ) ); // phpcs:ignore WordPress.PHP.DiscouragedPHPFunctions.serialize_serialize
354+
$key = \md5( \serialize( $callback ) ); // phpcs:ignore WordPress.PHP.DiscouragedPHPFunctions.serialize_serialize
340355

341356
// Bail if the existing lock is still valid.
342357
if ( self::is_locked( $key ) ) {

tests/includes/class-test-migration.php

Lines changed: 7 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use Activitypub\Collection\Remote_Actors;
1616
use Activitypub\Comment;
1717
use Activitypub\Migration;
18+
use Activitypub\Scheduler;
1819

1920
/**
2021
* Test class for Activitypub Migrate.
@@ -168,23 +169,6 @@ public function test_migrate_actor_mode() {
168169
$this->assertEquals( ACTIVITYPUB_ACTOR_MODE, \get_option( 'activitypub_actor_mode', ACTIVITYPUB_ACTOR_MODE ) );
169170
}
170171

171-
/**
172-
* Tests scheduling of migration.
173-
*
174-
* @covers ::maybe_migrate
175-
*/
176-
public function test_migration_scheduling() {
177-
update_option( 'activitypub_db_version', '0.0.1' );
178-
179-
Migration::maybe_migrate();
180-
181-
$schedule = \wp_next_scheduled( 'activitypub_migrate', array( '0.0.1' ) );
182-
$this->assertNotFalse( $schedule );
183-
184-
// Clean up.
185-
delete_option( 'activitypub_db_version' );
186-
}
187-
188172
/**
189173
* Test migrate to 4.1.0.
190174
*
@@ -397,12 +381,12 @@ public function test_update_comment_counts_with_lock() {
397381
Comment::register_comment_types();
398382

399383
// Create test comments.
400-
$post_id = $this->factory->post->create(
384+
$post_id = self::factory()->post->create(
401385
array(
402386
'post_author' => 1,
403387
)
404388
);
405-
$comment_id = $this->factory->comment->create(
389+
$comment_id = self::factory()->comment->create(
406390
array(
407391
'comment_post_ID' => $post_id,
408392
'comment_approved' => '1',
@@ -420,41 +404,6 @@ public function test_update_comment_counts_with_lock() {
420404
wp_delete_post( $post_id, true );
421405
}
422406

423-
/**
424-
* Test update_comment_counts() with existing valid lock.
425-
*
426-
* @covers ::update_comment_counts
427-
*/
428-
public function test_update_comment_counts_with_existing_valid_lock() {
429-
// Register comment types.
430-
Comment::register_comment_types();
431-
432-
// Set a lock.
433-
Migration::lock();
434-
435-
Migration::update_comment_counts( 10, 0 );
436-
437-
// Verify a scheduled event was created.
438-
$next_scheduled = wp_next_scheduled(
439-
'activitypub_update_comment_counts',
440-
array(
441-
'batch_size' => 10,
442-
'offset' => 0,
443-
)
444-
);
445-
$this->assertNotFalse( $next_scheduled );
446-
447-
// Clean up.
448-
delete_option( 'activitypub_migration_lock' );
449-
wp_clear_scheduled_hook(
450-
'activitypub_update_comment_counts',
451-
array(
452-
'batch_size' => 10,
453-
'offset' => 0,
454-
)
455-
);
456-
}
457-
458407
/**
459408
* Test create post outbox items.
460409
*
@@ -526,43 +475,16 @@ public function test_create_outbox_items_batching() {
526475
$this->assertEquals( 5, count( $outbox_items ) );
527476
}
528477

529-
/**
530-
* Test async upgrade functionality.
531-
*
532-
* @covers ::async_upgrade
533-
* @covers ::lock
534-
* @covers ::unlock
535-
* @covers ::create_post_outbox_items
536-
*/
537-
public function test_async_upgrade() {
538-
// Test that lock prevents simultaneous upgrades.
539-
Migration::lock();
540-
Migration::async_upgrade( 'create_post_outbox_items' );
541-
$scheduled = \wp_next_scheduled( 'activitypub_upgrade', array( 'create_post_outbox_items' ) );
542-
$this->assertNotFalse( $scheduled );
543-
Migration::unlock();
544-
545-
// Test scheduling next batch when callback returns more work.
546-
Migration::async_upgrade( 'create_post_outbox_items', 1, 0 ); // Small batch size to force multiple batches.
547-
$scheduled = \wp_next_scheduled( 'activitypub_upgrade', array( 'create_post_outbox_items', 1, 1 ) );
548-
$this->assertNotFalse( $scheduled );
549-
550-
// Test no scheduling when callback returns null (no more work).
551-
Migration::async_upgrade( 'create_post_outbox_items', 100, 1000 ); // Large offset to ensure no posts found.
552-
$this->assertFalse(
553-
\wp_next_scheduled( 'activitypub_upgrade', array( 'create_post_outbox_items', 100, 1100 ) )
554-
);
555-
}
556-
557478
/**
558479
* Test async upgrade with multiple arguments.
559480
*
560-
* @covers ::async_upgrade
481+
* @covers ::update_comment_counts
482+
* @covers \Activitypub\Scheduler::async_batch
561483
*/
562484
public function test_async_upgrade_multiple_args() {
563485
// Test that multiple arguments are passed correctly.
564-
Migration::async_upgrade( 'update_comment_counts', 50, 100 );
565-
$scheduled = \wp_next_scheduled( 'activitypub_upgrade', array( 'update_comment_counts', 50, 150 ) );
486+
Scheduler::async_batch( array( Migration::class, 'update_comment_counts' ), 50, 100 );
487+
$scheduled = \wp_next_scheduled( 'activitypub_async_batch', array( array( Migration::class, 'update_comment_counts' ), 50, 150 ) );
566488
$this->assertFalse( $scheduled, 'Should not schedule next batch when no comments found' );
567489
}
568490

0 commit comments

Comments
 (0)