Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion src/base/DataType.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Cake\Utility\Hash;
use Craft;
use craft\base\Batchable;
use craft\base\Component;
use craft\helpers\UrlHelper;

Expand All @@ -12,8 +13,16 @@
* @property-read mixed $name
* @property-read mixed $class
*/
abstract class DataType extends Component
abstract class DataType extends Component implements Batchable
{
// Properties
// =========================================================================

/**
* @var array
*/
protected array $feedData = [];

// Public
// =========================================================================

Expand Down Expand Up @@ -59,4 +68,30 @@ public function setupPaginationUrl($array, $feed): void
// Replace the mapping value with the actual URL
$feed->paginationUrl = $url;
}

/**
* @inheritdoc
*/
public function getSlice(int $offset, int $limit): iterable
{
$feedData = $this->feedData;

if ($offset) {
$feedData = array_slice($feedData, $offset);
}

if ($limit) {
$feedData = array_slice($feedData, 0, $limit);
}

return $feedData;
}

/**
* @inheritdoc
*/
public function count(): int
{
return count($this->feedData);
}
}
3 changes: 2 additions & 1 deletion src/console/controllers/FeedsController.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use craft\feedme\Plugin;
use craft\feedme\queue\jobs\FeedImport;
use craft\helpers\Console;
use craft\helpers\Queue;
use yii\console\Controller;
use yii\console\ExitCode;

Expand Down Expand Up @@ -108,7 +109,7 @@ protected function queueFeed($feed, $limit = null, $offset = null, bool $continu
$this->stdout($feed->name, Console::FG_CYAN);
$this->stdout(' ... ');

$this->module->queue->push(new FeedImport([
Queue::push(new FeedImport([
'feed' => $feed,
'limit' => $limit,
'offset' => $offset,
Expand Down
5 changes: 3 additions & 2 deletions src/controllers/FeedsController.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use craft\feedme\Plugin;
use craft\feedme\queue\jobs\FeedImport;
use craft\helpers\Json;
use craft\helpers\Queue;
use craft\helpers\StringHelper;
use craft\web\Controller;
use Exception;
Expand Down Expand Up @@ -344,7 +345,7 @@ private function _runImportTask($feed): ?bool
Craft::$app->getSession()->setNotice(Craft::t('feed-me', 'Feed processing started.'));

// Create the import task
$this->module->queue->push(new FeedImport([
Queue::push(new FeedImport([
'feed' => $feed,
'limit' => $limit,
'offset' => $offset,
Expand All @@ -363,7 +364,7 @@ private function _runImportTask($feed): ?bool

// Create the import task only if provided the correct passkey
if ($proceed) {
$this->module->queue->push(new FeedImport([
Queue::push(new FeedImport([
'feed' => $feed,
'limit' => $limit,
'offset' => $offset,
Expand Down
1 change: 1 addition & 0 deletions src/datatypes/Csv.php
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public function getFeed($url, $settings, bool $usePrimaryElement = true): array
$array = Plugin::$plugin->data->findPrimaryElement($primaryElement, $array);
}

$this->feedData = $array;
return ['success' => true, 'data' => $array];
}

Expand Down
40 changes: 40 additions & 0 deletions src/datatypes/DataBatcher.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<?php


namespace craft\feedme\datatypes;

use craft\base\Batchable;

class DataBatcher implements Batchable
{
public function __construct(
private array $data,
) {
}

/**
* @inheritdoc
*/
public function count(): int
{
return count($this->data);
}

/**
* @inheritdoc
*/
public function getSlice(int $offset, int $limit): iterable
{
$slice = $this->data;

if ($offset) {
$slice = array_slice($slice, $offset);
}

if ($limit) {
$slice = array_slice($slice, 0, $limit);
}

return $slice;
}
}
1 change: 1 addition & 0 deletions src/datatypes/GoogleSheet.php
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public function getFeed($url, $settings, bool $usePrimaryElement = true): array
$array = Plugin::$plugin->data->findPrimaryElement($primaryElement, $array);
}

$this->feedData = $array;
return ['success' => true, 'data' => $array];
}
}
1 change: 1 addition & 0 deletions src/datatypes/Json.php
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public function getFeed($url, $settings, bool $usePrimaryElement = true): array
$array = Plugin::$plugin->data->findPrimaryElement($primaryElement, $array);
}

$this->feedData = $array;
return ['success' => true, 'data' => $array];
}
}
1 change: 1 addition & 0 deletions src/datatypes/Xml.php
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public function getFeed($url, $settings, bool $usePrimaryElement = true): array
$array = Plugin::$plugin->data->findPrimaryElement($primaryElement, $array);
}

$this->feedData = $array;
return ['success' => true, 'data' => $array];
}
}
141 changes: 99 additions & 42 deletions src/queue/jobs/FeedImport.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,24 @@

namespace craft\feedme\queue\jobs;

use Cake\Utility\Hash;
use Craft;
use craft\base\Batchable;
use craft\feedme\datatypes\DataBatcher;
use craft\feedme\events\FeedProcessEvent;
use craft\feedme\models\FeedModel;
use craft\feedme\Plugin;
use craft\queue\BaseJob;
use craft\feedme\services\Process;
use craft\helpers\Queue;
use craft\queue\BaseBatchedJob;
use Throwable;
use yii\queue\RetryableJobInterface;

/**
*
* @property-read mixed $ttr
*/
class FeedImport extends BaseJob implements RetryableJobInterface
class FeedImport extends BaseBatchedJob implements RetryableJobInterface
{
// Properties
// =========================================================================
Expand Down Expand Up @@ -44,6 +50,22 @@ class FeedImport extends BaseJob implements RetryableJobInterface
*/
public bool $continueOnError = true;

/**
* @var mixed The Unix timestamp with microseconds of when the feed import started being processed
* @since 5.11.0
*/
public mixed $startTime = null;

/**
* @var array The Feed's settings as prepared by beforeProcessFeed()
*/
private array $_feedSettings = [];

/**
* @var int The index of currently processed item in current batch
*/
private int $_index = 0;

// Public Methods
// =========================================================================

Expand All @@ -65,73 +87,108 @@ public function canRetry($attempt, $error): bool
}

/**
* @inheritDoc
* @inheritdoc
*/
public function execute($queue): void
protected function loadData(): Batchable
{
try {
$feedData = $this->feed->getFeedData();
$feedData = $this->feed->getFeedData();

if ($this->offset) {
$feedData = array_slice($feedData, $this->offset);
}
if ($this->offset) {
$feedData = array_slice($feedData, $this->offset);
}

if ($this->limit) {
$feedData = array_slice($feedData, 0, $this->limit);
}
if ($this->limit) {
$feedData = array_slice($feedData, 0, $this->limit);
}

$data = $feedData;

// Do we even have any data to process?
if (!$feedData) {
Plugin::info('No feed items to process.');
return;
// Our main data-parsing function. Handles the actual data values, defaults and field options
foreach ($feedData as $key => $nodeData) {
if (!is_array($nodeData)) {
$nodeData = [$nodeData];
}

$feedSettings = Plugin::$plugin->process->beforeProcessFeed($this->feed, $feedData);
$data[$key] = Hash::flatten($nodeData, '/');
}

$data = array_values($data);

$feedData = $feedSettings['feedData'];
// Fire an 'onBeforeProcessFeed' event
$event = new FeedProcessEvent([
'feed' => $this->feed,
'feedData' => $data,
]);

$totalSteps = count($feedData);
Plugin::$plugin->process->trigger(Process::EVENT_BEFORE_PROCESS_FEED, $event);

$index = 0;
if (!$event->isValid) {
return new DataBatcher([]);
}

foreach ($feedData as $data) {
try {
Plugin::$plugin->process->processFeed($index, $feedSettings, $this->processedElementIds);
} catch (Throwable $e) {
if (!$this->continueOnError) {
throw $e;
}
// Allow event to modify the feed data
$data = $event->feedData;

// We want to catch any issues in each iteration of the loop (and log them), but this allows the
// rest of the feed to continue processing.
Plugin::error('`{e} - {f}: {l}`.', ['e' => $e->getMessage(), 'f' => basename($e->getFile()), 'l' => $e->getLine()]);
Craft::$app->getErrorHandler()->logException($e);
}
return new DataBatcher($data);
}

$this->setProgress($queue, $index++ / $totalSteps);
/**
* @inheritdoc
*/
protected function processItem(mixed $item): void
{
try {
Plugin::$plugin->process->processFeed($this->_index, $this->_feedSettings, $this->processedElementIds, $item, $this->batchIndex);
} catch (Throwable $e) {
if (!$this->continueOnError) {
throw $e;
}

// Check if we need to paginate the feed to run again
// We want to catch any issues in each iteration of the loop (and log them), but this allows the
// rest of the feed to continue processing.
Plugin::error('`{e} - {f}: {l}`.', ['e' => $e->getMessage(), 'f' => basename($e->getFile()), 'l' => $e->getLine()]);
Craft::$app->getErrorHandler()->logException($e);
}

$this->_index++;
}
/**
* @inheritDoc
*/
public function execute($queue): void
{
$processService = Plugin::$plugin->getProcess();
if ($this->itemOffset == 0) {
$processService->beforeProcessFeed($this->feed, (array)$this->data());
}

if (!$this->startTime) {
$this->startTime = $processService->time_start;
}

if (empty($this->_feedSettings)) {
$this->_feedSettings = $processService->getFeedSettings($this->feed, (array)$this->data());
}

parent::execute($queue);

// Check if we need to paginate the feed to run again
if ($this->itemOffset == $this->totalItems()) {
if ($this->feed->getNextPagination()) {
Plugin::getInstance()->queue->push(new self([
Queue::push(new self([
'feed' => $this->feed,
'limit' => $this->limit,
'offset' => $this->offset,
'processedElementIds' => $this->processedElementIds,
'startTime' => $this->startTime,
]));
} else {
// Only perform the afterProcessFeed function after any/all pagination is done
Plugin::$plugin->process->afterProcessFeed($feedSettings, $this->feed, $this->processedElementIds);
$processService->afterProcessFeed($this->_feedSettings, $this->feed, $this->processedElementIds, $this->startTime);
}
} catch (Throwable $e) {
// Even though we catch errors on each step of the loop, make sure to catch errors that can be anywhere
// else in this function, just to be super-safe and not cause the queue job to die.
Plugin::error('`{e} - {f}: {l}`.', ['e' => $e->getMessage(), 'f' => basename($e->getFile()), 'l' => $e->getLine()]);
Craft::$app->getErrorHandler()->logException($e);
}
}


// Protected Methods
// =========================================================================

Expand Down
Loading
Loading