Skip to content

Commit e56af00

Browse files
committed
Add in thread controlled sync system
1 parent 97ac7d5 commit e56af00

File tree

3 files changed

+495
-301
lines changed

3 files changed

+495
-301
lines changed

php/class-sync.php

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
use Cloudinary\Component\Assets;
1111
use Cloudinary\Component\Setup;
12-
use Cloudinary\Component\Settings;
12+
use Cloudinary\Settings\Setting;
1313
use Cloudinary\Sync\Delete_Sync;
1414
use Cloudinary\Sync\Download_Sync;
1515
use Cloudinary\Sync\Push_Sync;
@@ -68,9 +68,9 @@ class Sync implements Setup, Assets {
6868
/**
6969
* Holds the sync settings object.
7070
*
71-
* @var Settings
71+
* @var Setting
7272
*/
73-
protected $settings;
73+
public $settings;
7474

7575
/**
7676
* Holds the meta keys for sync meta to maintain consistency.
@@ -91,6 +91,7 @@ class Sync implements Setup, Assets {
9191
'downloading' => '_cloudinary_downloading',
9292
'process_log' => '_process_log',
9393
'storage' => '_cloudinary_storage',
94+
'queued' => '_cloudinary_sync_queued',
9495
);
9596

9697
/**
@@ -792,18 +793,8 @@ public function set_signature_item( $attachment_id, $type, $value = null ) {
792793
*/
793794
public function init_background_upload() {
794795
if ( ! empty( $this->to_sync ) ) {
795-
796-
$threads = $this->managers['push']->queue->threads;
797-
$chunk_size = ceil( count( $this->to_sync ) / count( $threads ) ); // Max of 3 threads to prevent server overload.
798-
$chunks = array_chunk( $this->to_sync, $chunk_size );
799-
$token = uniqid();
800-
foreach ( $chunks as $key => $ids ) {
801-
$params = array(
802-
'process_key' => $token . '-' . $threads[ $key ],
803-
);
804-
set_transient( $params['process_key'], $ids, 120 );
805-
$this->plugin->components['api']->background_request( 'process', $params );
806-
}
796+
$this->managers['queue']->add_to_queue( $this->to_sync, 'autosync' );
797+
$this->managers['queue']->start_threads( 'autosync' );
807798
}
808799
}
809800

php/sync/class-push-sync.php

Lines changed: 26 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -122,12 +122,6 @@ public function rest_endpoints( $endpoints ) {
122122
'permission_callback' => array( $this, 'rest_can_manage_options' ),
123123
);
124124

125-
$endpoints['process'] = array(
126-
'method' => \WP_REST_Server::CREATABLE,
127-
'callback' => array( $this, 'process_sync' ),
128-
'args' => array(),
129-
);
130-
131125
$endpoints['queue'] = array(
132126
'method' => \WP_REST_Server::CREATABLE,
133127
'callback' => array( $this, 'process_queue' ),
@@ -158,7 +152,7 @@ public function rest_get_queue_status() {
158152
return rest_ensure_response(
159153
array(
160154
'success' => true,
161-
'data' => $this->queue->get_queue_status(),
155+
'data' => $this->queue->is_running(),
162156
)
163157
);
164158
}
@@ -172,17 +166,18 @@ public function rest_get_queue_status() {
172166
*/
173167
public function rest_start_sync( \WP_REST_Request $request ) {
174168

175-
$stop = $request->get_param( 'stop' );
176-
$status = $this->queue->get_queue_status();
177-
if ( empty( $status['pending'] ) || ! empty( $stop ) ) {
169+
$type = $request->get_param( 'type' );
170+
$start = $this->queue->is_enabled();
171+
$state = array(
172+
'success' => false,
173+
);
174+
if ( empty( $start ) ) {
178175
$this->queue->stop_queue();
179-
180-
return $this->rest_get_queue_status(); // Nothing to sync.
176+
} else {
177+
$state['success'] = $this->queue->start_queue( $type );
181178
}
182179

183-
$this->queue->start_queue();
184-
185-
return $this->rest_get_queue_status();
180+
return rest_ensure_response( $state );
186181
}
187182

188183
/**
@@ -217,61 +212,37 @@ public function process_assets( $attachments = array() ) {
217212

218213
// Create synced post meta as a way to search for synced / unsynced items.
219214
update_post_meta( $attachment_id, Sync::META_KEYS['public_id'], $this->media->get_public_id( $attachment_id ) );
220-
}
221-
222-
return $stat;
223-
}
224-
225-
/**
226-
* Process assets to sync vai WP REST API.
227-
*
228-
* @param \WP_REST_Request $request The request.
229-
*
230-
* @return mixed|\WP_REST_Response
231-
*/
232-
public function process_sync( \WP_REST_Request $request ) {
233-
$process_key = $request->get_param( 'process_key' );
234-
$note = 'no process key';
235-
if ( ! empty( $process_key ) ) {
236-
$attachments = get_transient( $process_key );
237-
if ( ! empty( $attachments ) ) {
238-
delete_transient( $process_key );
239215

240-
return rest_ensure_response(
241-
array(
242-
'success' => true,
243-
'data' => $this->process_assets( $attachments ),
244-
)
245-
);
216+
$sync_thread = get_post_meta( $attachment_id, Sync::META_KEYS['queued'], true );
217+
if ( ! empty( $sync_thread ) ) {
218+
delete_post_meta( $attachment_id, Sync::META_KEYS['queued'] );
219+
delete_post_meta( $attachment_id, $sync_thread );
246220
}
247-
$note = 'no attachments';
248221
}
249222

250-
return rest_ensure_response(
251-
array(
252-
'success' => false,
253-
'note' => $note,
254-
)
255-
);
223+
return $stat;
256224
}
257225

258226
/**
259227
* Resume the bulk sync.
260228
*
261229
* @param \WP_REST_Request $request The request.
262-
*
263-
* @return void
264230
*/
265231
public function process_queue( \WP_REST_Request $request ) {
266-
$thread = $request->get_param( 'thread' );
267-
$queue = $this->queue->get_thread_queue( $thread );
268-
269-
if ( ! empty( $queue ) && $this->queue->is_running() ) {
232+
$thread = $request->get_param( 'thread' );
233+
$thread_type = $this->queue->get_thread_type( $thread );
234+
$queue = $this->queue->get_thread_queue( $thread );
235+
if ( ! empty( $queue['next'] ) && ( $this->queue->is_running( $thread_type ) ) ) {
270236
while ( $attachment_id = $this->queue->get_post( $thread ) ) { // phpcs:ignore WordPress.CodeAnalysis.AssignmentInCondition
237+
// translators: variable is thread name and asset ID.
238+
$action_message = sprintf( __( '%1$s: Syncing asset %2$d', 'cloudinary' ), $thread, $attachment_id );
239+
do_action( '_cloudinary_queue_action', $action_message );
271240
$this->process_assets( $attachment_id );
272-
$this->queue->mark( $attachment_id, 'done' );
273241
}
274-
$this->queue->stop_maybe();
242+
$this->queue->stop_maybe( $thread_type );
275243
}
244+
// translators: variable is thread name.
245+
$action_message = sprintf( __( 'Ending thread %s', 'cloudinary' ), $thread );
246+
do_action( '_cloudinary_queue_action', $action_message );
276247
}
277248
}

0 commit comments

Comments
 (0)