Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions includes/class-dispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ public static function init() {
\add_filter( 'activitypub_additional_inboxes', array( self::class, 'add_inboxes_by_mentioned_actors' ), 10, 3 );
\add_filter( 'activitypub_additional_inboxes', array( self::class, 'add_inboxes_of_replied_urls' ), 10, 3 );
\add_filter( 'activitypub_additional_inboxes', array( self::class, 'add_inboxes_of_relays' ), 10, 3 );

Scheduler::register_async_batch_callback( 'activitypub_send_activity', array( self::class, 'send_to_followers' ) );
Scheduler::register_async_batch_callback( 'activitypub_retry_activity', array( self::class, 'retry_send_to_followers' ) );
}

/**
Expand Down
92 changes: 14 additions & 78 deletions includes/class-migration.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ class Migration {
* Initialize the class, registering WordPress hooks.
*/
public static function init() {
\add_action( 'activitypub_migrate', array( self::class, 'async_migration' ) );
\add_action( 'activitypub_upgrade', array( self::class, 'async_upgrade' ), 10, 99 );
\add_action( 'activitypub_update_comment_counts', array( self::class, 'update_comment_counts' ), 10, 2 );

self::maybe_migrate();

Scheduler::register_async_batch_callback( 'activitypub_migrate_from_0_17', array( self::class, 'migrate_from_0_17' ) );
Scheduler::register_async_batch_callback( 'activitypub_update_comment_counts', array( self::class, 'update_comment_counts' ) );
Scheduler::register_async_batch_callback( 'activitypub_create_post_outbox_items', array( self::class, 'create_post_outbox_items' ) );
Scheduler::register_async_batch_callback( 'activitypub_create_comment_outbox_items', array( self::class, 'create_comment_outbox_items' ) );
}

/**
Expand Down Expand Up @@ -122,13 +123,12 @@ public static function maybe_migrate() {
$version_from_db = ACTIVITYPUB_PLUGIN_VERSION;
}

// Schedule the async migration.
if ( ! \wp_next_scheduled( 'activitypub_migrate', $version_from_db ) ) {
\wp_schedule_single_event( \time(), 'activitypub_migrate', array( $version_from_db ) );
}
if ( \version_compare( $version_from_db, '0.17.0', '<' ) ) {
self::migrate_from_0_16();
}
if ( \version_compare( $version_from_db, '1.0.0', '<' ) ) {
\wp_schedule_single_event( \time(), 'activitypub_migrate_from_0_17' );
}
if ( \version_compare( $version_from_db, '1.3.0', '<' ) ) {
self::migrate_from_1_2_0();
}
Expand Down Expand Up @@ -160,8 +160,9 @@ public static function maybe_migrate() {
add_action( 'init', 'flush_rewrite_rules', 20 );
}
if ( \version_compare( $version_from_db, '5.0.0', '<' ) ) {
\wp_schedule_single_event( \time(), 'activitypub_upgrade', array( 'create_post_outbox_items' ) );
\wp_schedule_single_event( \time() + 15, 'activitypub_upgrade', array( 'create_comment_outbox_items' ) );
Scheduler::register_schedules();
\wp_schedule_single_event( \time(), 'activitypub_create_post_outbox_items' );
\wp_schedule_single_event( \time() + 15, 'activitypub_create_comment_outbox_items' );
add_action( 'init', 'flush_rewrite_rules', 20 );
}
if ( \version_compare( $version_from_db, '5.4.0', '<' ) ) {
Expand Down Expand Up @@ -231,49 +232,6 @@ public static function maybe_migrate() {
self::unlock();
}

/**
* Asynchronously migrates the database structure.
*
* @param string $version_from_db The version from which to migrate.
*/
public static function async_migration( $version_from_db ) {
if ( \version_compare( $version_from_db, '1.0.0', '<' ) ) {
self::migrate_from_0_17();
}
}

/**
* Asynchronously runs upgrade routines.
*
* @param callable $callback Callable upgrade routine. Must be a method of this class.
* @params mixed ...$args Optional. Parameters that get passed to the callback.
*/
public static function async_upgrade( $callback ) {
$args = \func_get_args();

// Bail if the existing lock is still valid.
if ( self::is_locked() ) {
\wp_schedule_single_event( time() + MINUTE_IN_SECONDS, 'activitypub_upgrade', $args );
return;
}

self::lock();

$callback = array_shift( $args ); // Remove $callback from arguments.
$next = \call_user_func_array( array( self::class, $callback ), $args );

self::unlock();

if ( ! empty( $next ) ) {
// Schedule the next run, adding the result to the arguments.
\wp_schedule_single_event(
\time() + 30,
'activitypub_upgrade',
\array_merge( array( $callback ), \array_values( $next ) )
);
}
}

/**
* Updates the custom template to use shortcodes instead of the deprecated templates.
*/
Expand Down Expand Up @@ -515,25 +473,12 @@ public static function migrate_to_4_7_2() {
* @see Comment::pre_wp_update_comment_count_now()
* @param int $batch_size Optional. Number of posts to process per batch. Default 100.
* @param int $offset Optional. Number of posts to skip. Default 0.
*
* @return int[]|void Array with batch size and offset if there are more posts to process.
*/
public static function update_comment_counts( $batch_size = 100, $offset = 0 ) {
global $wpdb;

// Bail if the existing lock is still valid.
if ( self::is_locked() ) {
\wp_schedule_single_event(
time() + ( 5 * MINUTE_IN_SECONDS ),
'activitypub_update_comment_counts',
array(
'batch_size' => $batch_size,
'offset' => $offset,
)
);
return;
}

self::lock();

Comment::register_comment_types();
$comment_types = Comment::get_comment_type_slugs();
$type_inclusion = "AND comment_type IN ('" . implode( "','", $comment_types ) . "')";
Expand All @@ -554,17 +499,8 @@ public static function update_comment_counts( $batch_size = 100, $offset = 0 ) {

if ( count( $post_ids ) === $batch_size ) {
// Schedule next batch.
\wp_schedule_single_event(
time() + MINUTE_IN_SECONDS,
'activitypub_update_comment_counts',
array(
'batch_size' => $batch_size,
'offset' => $offset + $batch_size,
)
);
return array( $batch_size, $offset + $batch_size );
}

self::unlock();
}

/**
Expand Down
31 changes: 23 additions & 8 deletions includes/class-scheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,12 @@ class Scheduler {
public static function init() {
self::register_schedulers();

self::$batch_callbacks = array(
'activitypub_send_activity' => array( Dispatcher::class, 'send_to_followers' ),
'activitypub_retry_activity' => array( Dispatcher::class, 'retry_send_to_followers' ),
);

// Follower Cleanups.
\add_action( 'activitypub_update_remote_actors', array( self::class, 'update_remote_actors' ) );
\add_action( 'activitypub_cleanup_remote_actors', array( self::class, 'cleanup_remote_actors' ) );

// Event callbacks.
\add_action( 'activitypub_async_batch', array( self::class, 'async_batch' ), 10, 99 );
\add_action( 'activitypub_send_activity', array( self::class, 'async_batch' ), 10, 3 );
\add_action( 'activitypub_retry_activity', array( self::class, 'async_batch' ), 10, 3 );
\add_action( 'activitypub_reprocess_outbox', array( self::class, 'reprocess_outbox' ) );
\add_action( 'activitypub_outbox_purge', array( self::class, 'purge_outbox' ) );

Expand All @@ -74,6 +67,28 @@ public static function register_schedulers() {
do_action( 'activitypub_register_schedulers' );
}

/**
* Register a batch callback for async processing.
*
* @param string $hook The cron event hook name.
* @param callable $callback The callback to execute.
*/
public static function register_async_batch_callback( $hook, $callback ) {
if ( \did_action( 'init' ) && ! \doing_action( 'init' ) ) {
\_doing_it_wrong( __METHOD__, 'Async batch callbacks should be registered before or during the init action.', 'unreleased' );
return;
}

if ( ! \is_callable( $callback ) ) {
return;
}

self::$batch_callbacks[ $hook ] = $callback;

// Register the WordPress action hook to trigger async_batch.
\add_action( $hook, array( self::class, 'async_batch' ), 10, 99 );
}

/**
* Schedule all ActivityPub schedules.
*/
Expand Down Expand Up @@ -336,7 +351,7 @@ public static function async_batch() {
return;
}

$key = \md5( \serialize( $args[0] ?? $args ) ); // phpcs:ignore WordPress.PHP.DiscouragedPHPFunctions.serialize_serialize
$key = \md5( \serialize( $callback ) ); // phpcs:ignore WordPress.PHP.DiscouragedPHPFunctions.serialize_serialize

// Bail if the existing lock is still valid.
if ( self::is_locked( $key ) ) {
Expand Down
92 changes: 7 additions & 85 deletions tests/includes/class-test-migration.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use Activitypub\Collection\Remote_Actors;
use Activitypub\Comment;
use Activitypub\Migration;
use Activitypub\Scheduler;

/**
* Test class for Activitypub Migrate.
Expand Down Expand Up @@ -168,23 +169,6 @@ public function test_migrate_actor_mode() {
$this->assertEquals( ACTIVITYPUB_ACTOR_MODE, \get_option( 'activitypub_actor_mode', ACTIVITYPUB_ACTOR_MODE ) );
}

/**
* Tests scheduling of migration.
*
* @covers ::maybe_migrate
*/
public function test_migration_scheduling() {
update_option( 'activitypub_db_version', '0.0.1' );

Migration::maybe_migrate();

$schedule = \wp_next_scheduled( 'activitypub_migrate', array( '0.0.1' ) );
$this->assertNotFalse( $schedule );

// Clean up.
delete_option( 'activitypub_db_version' );
}

/**
* Test migrate to 4.1.0.
*
Expand Down Expand Up @@ -397,12 +381,12 @@ public function test_update_comment_counts_with_lock() {
Comment::register_comment_types();

// Create test comments.
$post_id = $this->factory->post->create(
$post_id = self::factory()->post->create(
array(
'post_author' => 1,
)
);
$comment_id = $this->factory->comment->create(
$comment_id = self::factory()->comment->create(
array(
'comment_post_ID' => $post_id,
'comment_approved' => '1',
Expand All @@ -420,41 +404,6 @@ public function test_update_comment_counts_with_lock() {
wp_delete_post( $post_id, true );
}

/**
* Test update_comment_counts() with existing valid lock.
*
* @covers ::update_comment_counts
*/
public function test_update_comment_counts_with_existing_valid_lock() {
// Register comment types.
Comment::register_comment_types();

// Set a lock.
Migration::lock();

Migration::update_comment_counts( 10, 0 );

// Verify a scheduled event was created.
$next_scheduled = wp_next_scheduled(
'activitypub_update_comment_counts',
array(
'batch_size' => 10,
'offset' => 0,
)
);
$this->assertNotFalse( $next_scheduled );

// Clean up.
delete_option( 'activitypub_migration_lock' );
wp_clear_scheduled_hook(
'activitypub_update_comment_counts',
array(
'batch_size' => 10,
'offset' => 0,
)
);
}

/**
* Test create post outbox items.
*
Expand Down Expand Up @@ -526,43 +475,16 @@ public function test_create_outbox_items_batching() {
$this->assertEquals( 5, count( $outbox_items ) );
}

/**
* Test async upgrade functionality.
*
* @covers ::async_upgrade
* @covers ::lock
* @covers ::unlock
* @covers ::create_post_outbox_items
*/
public function test_async_upgrade() {
// Test that lock prevents simultaneous upgrades.
Migration::lock();
Migration::async_upgrade( 'create_post_outbox_items' );
$scheduled = \wp_next_scheduled( 'activitypub_upgrade', array( 'create_post_outbox_items' ) );
$this->assertNotFalse( $scheduled );
Migration::unlock();

// Test scheduling next batch when callback returns more work.
Migration::async_upgrade( 'create_post_outbox_items', 1, 0 ); // Small batch size to force multiple batches.
$scheduled = \wp_next_scheduled( 'activitypub_upgrade', array( 'create_post_outbox_items', 1, 1 ) );
$this->assertNotFalse( $scheduled );

// Test no scheduling when callback returns null (no more work).
Migration::async_upgrade( 'create_post_outbox_items', 100, 1000 ); // Large offset to ensure no posts found.
$this->assertFalse(
\wp_next_scheduled( 'activitypub_upgrade', array( 'create_post_outbox_items', 100, 1100 ) )
);
}

/**
* Test async upgrade with multiple arguments.
*
* @covers ::async_upgrade
* @covers ::update_comment_counts
* @covers \Activitypub\Scheduler::async_batch
*/
public function test_async_upgrade_multiple_args() {
// Test that multiple arguments are passed correctly.
Migration::async_upgrade( 'update_comment_counts', 50, 100 );
$scheduled = \wp_next_scheduled( 'activitypub_upgrade', array( 'update_comment_counts', 50, 150 ) );
Scheduler::async_batch( array( Migration::class, 'update_comment_counts' ), 50, 100 );
$scheduled = \wp_next_scheduled( 'activitypub_async_batch', array( array( Migration::class, 'update_comment_counts' ), 50, 150 ) );
$this->assertFalse( $scheduled, 'Should not schedule next batch when no comments found' );
}

Expand Down
Loading