Add GPT product feed regeneration pipeline #595
base: development
Conversation
Reviewer's Guide

This PR introduces a full GPT product feed regeneration pipeline: models are extended to store feeds and track sync flags, signal receivers mark remote products for updates, a feed factory orchestrates incremental and full regenerations, scheduled tasks trigger syncs, and end-to-end behavior is validated with tests.

Sequence diagram for GPT feed regeneration pipeline (scheduled task triggers)

sequenceDiagram
    participant CronJob as "CronJob (db_periodic_task)"
    participant Factory as "SalesChannelGptProductFeedFactory"
    participant RemoteProduct as "RemoteProduct"
    participant SalesChannel as "SalesChannel"
    CronJob->>Factory: work(sync_all)
    Factory->>RemoteProduct: _collect_remote_products()
    Factory->>SalesChannel: _group_by_sales_channel()
    Factory->>SalesChannel: _persist_feed()
    Factory->>SalesChannel: _send_feed()
    Factory->>RemoteProduct: _clear_required_flags()

Sequence diagram for marking remote products for feed sync on signals

sequenceDiagram
    participant Signal as "Signal Receiver"
    participant RemoteProduct as "RemoteProduct"
    Signal->>RemoteProduct: _mark_remote_products_for_products(product_ids)
    RemoteProduct->>RemoteProduct: set required_feed_sync=True

Entity relationship diagram for GPT feed fields and sync flag

erDiagram
    SALESCHANNEL {
        id int PK
        gpt_feed_json json
        gpt_feed_file file
    }
    REMOTEPRODUCT {
        id int PK
        sales_channel_id int FK
        required_feed_sync bool
    }
    SALESCHANNEL ||--o{ REMOTEPRODUCT : has

Class diagram for updated SalesChannel and RemoteProduct models

classDiagram
    class SalesChannel {
        +gpt_feed_json: JSONField
        +gpt_feed_file: FileField
    }
    class RemoteProduct {
        +required_feed_sync: Boolean
        +save()
    }
    SalesChannel "1" -- "*" RemoteProduct : has
    RemoteProduct --> SalesChannel : sales_channel
File-Level Changes
Hey there - I've reviewed your changes - here's some feedback:
- The multiple signal handlers in receivers.py for marking required_feed_sync are quite repetitive; consider consolidating them via a mapping of signal to field names or a decorator to reduce boilerplate (a sketch of this idea follows after this list).
- SalesChannelGptProductFeedFactory.work currently orchestrates data collection, payload building, merging, persisting, and flag clearing in one method; splitting these responsibilities into smaller services or well-named methods would improve readability and testability.
- The merge logic in _merge_entries for incremental vs full sync is non-trivial; extracting the different behaviors into helper methods or adding inline comments would make the intent clearer and reduce maintenance risk.
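For the first bullet, a rough sketch of the mapping idea is shown below. It is illustrative only: the signal names and import paths are placeholders, not the project's real ones, and the helper mirrors the _mark_remote_products_for_products function quoted in Comment 11.

```python
from typing import Iterable

from sales_channels.models import RemoteProduct  # import path assumed
from products.signals import product_created, product_updated  # placeholder signals


def _mark_remote_products_for_products(*, product_ids: Iterable[int]) -> None:
    # Same helper as in receivers.py (see Comment 11 below).
    ids = {product_id for product_id in product_ids if product_id}
    if not ids:
        return
    RemoteProduct.objects.filter(
        local_instance_id__in=ids,
        sales_channel__gpt_enable=True,
    ).update(required_feed_sync=True)


def _feed_sync_receiver(extract_product_ids):
    """Build a receiver that flags the affected products for a feed sync.

    ``extract_product_ids`` describes where each signal keeps its product ids,
    which is the only thing the repetitive handlers actually differ in.
    """

    def _receiver(sender, **kwargs):
        _mark_remote_products_for_products(product_ids=extract_product_ids(kwargs))

    return _receiver


# One mapping entry per signal instead of one full handler per signal.
FEED_SYNC_SIGNALS = {
    product_created: lambda kwargs: [kwargs["instance"].pk],
    product_updated: lambda kwargs: [kwargs["instance"].pk],
}

for signal, extractor in FEED_SYNC_SIGNALS.items():
    # weak=False keeps the locally created closure alive after registration.
    signal.connect(_feed_sync_receiver(extractor), weak=False)
```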
## Individual Comments
### Comment 1
<location> `OneSila/sales_channels/factories/cpt/product_feed.py:235-239` </location>
<code_context>
+ timestamp = timezone.now().strftime("%Y%m%d%H%M%S")
+ filename = f"gpt-feed-{sales_channel.pk}-{timestamp}.json"
+ content = json.dumps(ordered, ensure_ascii=False, indent=2, default=str)
+ sales_channel.gpt_feed_file.save(filename, ContentFile(content), save=False)
+ sales_channel.save(update_fields=["gpt_feed_json", "gpt_feed_file"])
+
</code_context>
<issue_to_address>
**suggestion (performance):** Saving files in a loop may cause race conditions or file bloat.
Implement a cleanup or overwrite mechanism to prevent excessive file creation during frequent syncs.
```suggestion
filename = f"gpt-feed-{sales_channel.pk}.json"
content = json.dumps(ordered, ensure_ascii=False, indent=2, default=str)
# Overwrite the existing file if it exists
if sales_channel.gpt_feed_file:
sales_channel.gpt_feed_file.delete(save=False)
sales_channel.gpt_feed_file.save(filename, ContentFile(content), save=False)
sales_channel.save(update_fields=["gpt_feed_json", "gpt_feed_file"])
```
</issue_to_address>
### Comment 2
<location> `OneSila/sales_channels/models/products.py:130` </location>
<code_context>
mutable_fields = list(dict.fromkeys(update_fields))
if status_changed and "status" not in mutable_fields:
    mutable_fields.append("status")
+ if is_new and getattr(self.sales_channel, "gpt_enable", False) and "required_feed_sync" not in mutable_fields:
+     mutable_fields.append("required_feed_sync")
kwargs["update_fields"] = mutable_fields
</code_context>
<issue_to_address>
**issue (bug_risk):** Mutable fields logic may be fragile if update_fields is None.
If update_fields is None, mutable_fields will not be initialized, which may prevent required_feed_sync from being updated. Ensure update_fields is always a list before this block.
</issue_to_address>
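One way to make that block robust is to normalise update_fields before appending to it. The sketch below is hypothetical (amend_update_fields is not part of the PR); it assumes a None value should stay None, since a plain save() already writes every column.

```python
from typing import Iterable, List, Optional


def amend_update_fields(
    update_fields: Optional[Iterable[str]],
    *,
    extra_fields: Iterable[str],
) -> Optional[List[str]]:
    """Append extra_fields to update_fields, preserving order and uniqueness.

    None means "save all columns", so it is returned unchanged instead of
    being silently turned into a partial field list.
    """
    if update_fields is None:
        return None
    mutable_fields = list(dict.fromkeys(update_fields))
    for field in extra_fields:
        if field not in mutable_fields:
            mutable_fields.append(field)
    return mutable_fields


# Inside RemoteProduct.save(), the quoted block could then become:
#     kwargs["update_fields"] = amend_update_fields(
#         kwargs.get("update_fields"),
#         extra_fields=["required_feed_sync"] if feed_flag_needed else [],
#     )
```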
### Comment 3
<location> `OneSila/sales_channels/tests/test_gpt_feed_factory.py:92` </location>
<code_context>
+     remote_product.refresh_from_db()
+     return remote_product
+
+ def test_incremental_updates_feed_and_file(self):
+     remote_product = self._build_remote_product(sku="SKU-1")
+     payload = {"id": "SKU-1", "title": "Updated"}
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding a test for error handling in payload factory failures.
Please add a test that triggers a factory exception and verifies the feed update handles it correctly: no entries added, required_feed_sync cleared, and no crash.
</issue_to_address>
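A possible shape for that test, reusing the helper and patch target shown elsewhere in this review (Mock and patch are assumed to come from unittest.mock; the exact assertions depend on how the factory is intended to swallow the error):

```python
    def test_incremental_handles_payload_factory_failure(self):
        # The payload factory raises; the run should not crash, should not add
        # feed entries, and should still clear the sync flag on the product.
        remote_product = self._build_remote_product(sku="SKU-ERR")
        factory_mock = Mock()
        factory_mock.build.side_effect = RuntimeError("payload build failed")
        with patch(
            "sales_channels.factories.cpt.product_feed.ProductFeedPayloadFactory",
            return_value=factory_mock,
        ):
            SalesChannelGptProductFeedFactory(sync_all=False).work()

        self.channel.refresh_from_db()
        remote_product.refresh_from_db()
        self.assertEqual(self.channel.gpt_feed_json or [], [])
        self.assertFalse(remote_product.required_feed_sync)
```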
### Comment 4
<location> `OneSila/sales_channels/tests/test_gpt_feed_factory.py:118` </location>
<code_context>
+     self.channel.gpt_feed_file.close()
+     self.assertEqual(data, [payload])
+
+ def test_incremental_removes_stale_entries(self):
+     remote_product = self._build_remote_product(sku="SKU-2")
+     self.channel.gpt_feed_json = [{"id": "SKU-2", "title": "Old"}]
</code_context>
<issue_to_address>
**suggestion (testing):** Missing test for multiple remote products and sales channels.
Please add a test with multiple sales channels and remote products to verify correct grouping and selective updates.
Suggested implementation:
```python
    def test_incremental_removes_stale_entries(self):
        remote_product = self._build_remote_product(sku="SKU-2")
        self.channel.gpt_feed_json = [{"id": "SKU-2", "title": "Old"}]
        self.channel.save(update_fields=["gpt_feed_json"])
        factory_mock = Mock()
        factory_mock.build.return_value = []
        with patch(
            "sales_channels.factories.cpt.product_feed.ProductFeedPayloadFactory",
            return_value=factory_mock,
        ):
            SalesChannelGptProductFeedFactory(sync_all=False).work()

    def test_multiple_channels_and_products_grouping_and_updates(self):
        # Create two sales channels
        channel1 = self._build_channel(name="Channel 1")
        channel2 = self._build_channel(name="Channel 2")
        # Create remote products for each channel
        product1 = self._build_remote_product(sku="SKU-1", channel=channel1)
        product2 = self._build_remote_product(sku="SKU-2", channel=channel1)
        product3 = self._build_remote_product(sku="SKU-3", channel=channel2)
        product4 = self._build_remote_product(sku="SKU-4", channel=channel2)
        # Set initial stale feed JSON for both channels
        channel1.gpt_feed_json = [{"id": "SKU-1", "title": "Old"}, {"id": "SKU-2", "title": "Old"}]
        channel2.gpt_feed_json = [{"id": "SKU-3", "title": "Old"}, {"id": "SKU-4", "title": "Old"}]
        channel1.save(update_fields=["gpt_feed_json"])
        channel2.save(update_fields=["gpt_feed_json"])
        # Prepare payloads for each channel
        payloads_channel1 = [
            {"id": "SKU-1", "title": "New 1"},
            {"id": "SKU-2", "title": "New 2"},
        ]
        payloads_channel2 = [
            {"id": "SKU-3", "title": "New 3"},
            {"id": "SKU-4", "title": "New 4"},
        ]

        def build_side_effect(channel, *args, **kwargs):
            if channel == channel1:
                return payloads_channel1
            elif channel == channel2:
                return payloads_channel2
            return []

        factory_mock = Mock()
        factory_mock.build.side_effect = build_side_effect
        with patch(
            "sales_channels.factories.cpt.product_feed.ProductFeedPayloadFactory",
            return_value=factory_mock,
        ):
            SalesChannelGptProductFeedFactory(sync_all=True).work()
        # Refresh channels from DB and check feed JSON
        channel1.refresh_from_db()
        channel2.refresh_from_db()
        self.assertEqual(channel1.gpt_feed_json, payloads_channel1)
        self.assertEqual(channel2.gpt_feed_json, payloads_channel2)
```
- Ensure that the helper methods `_build_channel` and `_build_remote_product` support specifying the channel for the product.
- If your test class does not support multiple channels, you may need to adjust the setup or use fixtures to create them.
- If the `SalesChannelGptProductFeedFactory` does not accept a list of channels, you may need to adapt the factory or patch its internal logic to iterate over all channels.
</issue_to_address>
### Comment 5
<location> `OneSila/sales_channels/tests/test_gpt_feed_factory.py:133-134` </location>
<code_context>
+     self.channel.refresh_from_db()
+     self.assertEqual(self.channel.gpt_feed_json, [])
+
+ def test_sync_all_refreshes_even_without_flags(self):
+     remote_product = self._build_remote_product(sku="SKU-3")
+     RemoteProduct.objects.filter(pk=remote_product.pk).update(required_feed_sync=False)
</code_context>
<issue_to_address>
**suggestion (testing):** Consider testing edge cases for empty payloads and missing SKUs.
Please add tests where the payload lacks 'id' or the product has no SKU to confirm these cases are correctly handled or skipped.
```suggestion
        self.channel.refresh_from_db()
        self.assertEqual(self.channel.gpt_feed_json, [])

    def test_sync_all_with_empty_payload(self):
        factory_mock = Mock()
        factory_mock.build.return_value = []
        with patch(
            "sales_channels.factories.cpt.product_feed.ProductFeedPayloadFactory",
            return_value=factory_mock,
        ):
            SalesChannelGptProductFeedFactory(sync_all=True).work()
        self.channel.refresh_from_db()
        self.assertEqual(self.channel.gpt_feed_json, [])

    def test_sync_all_with_missing_id_in_payload(self):
        remote_product = self._build_remote_product(sku="SKU-2")
        payload = {"title": "No ID"}
        factory_mock = Mock()
        factory_mock.build.return_value = [payload]
        with patch(
            "sales_channels.factories.cpt.product_feed.ProductFeedPayloadFactory",
            return_value=factory_mock,
        ):
            SalesChannelGptProductFeedFactory(sync_all=True).work()
        self.channel.refresh_from_db()
        # Should not include payload without 'id'
        self.assertNotIn(payload, self.channel.gpt_feed_json)

    def test_sync_all_with_remote_product_missing_sku(self):
        remote_product = self._build_remote_product(sku=None)
        payload = {"id": None, "title": "Missing SKU"}
        factory_mock = Mock()
        factory_mock.build.return_value = [payload]
        with patch(
            "sales_channels.factories.cpt.product_feed.ProductFeedPayloadFactory",
            return_value=factory_mock,
        ):
            SalesChannelGptProductFeedFactory(sync_all=True).work()
        self.channel.refresh_from_db()
        # Should not include payload with missing SKU
        self.assertNotIn(payload, self.channel.gpt_feed_json)
```
</issue_to_address>
### Comment 6
<location> `OneSila/sales_channels/factories/cpt/product_feed.py:127` </location>
<code_context>
def _collect_channel_assignments(
    self,
    *,
    channel_products: Sequence[RemoteProduct],
    assignments_map: Dict[int, List[SalesChannelViewAssign]],
) -> Dict[int, _AssignmentPayload]:
    channel_assignments: Dict[int, _AssignmentPayload] = {}
    for remote_product in channel_products:
        assignments = assignments_map.get(remote_product.id, [])
        payloads: Dict[str, Dict[str, object]] = {}
        identifiers: Set[str] = set()
        if not assignments:
            sku = getattr(getattr(remote_product, "local_instance", None), "sku", None)
            if sku:
                identifiers.add(str(sku))
            channel_assignments[remote_product.id] = _AssignmentPayload(
                identifiers=identifiers,
                payloads=payloads,
            )
            continue
        for assignment in assignments:
            payload_data = self._build_payload_for_assignment(assignment=assignment)
            payloads.update(payload_data.payloads)
            identifiers.update(payload_data.identifiers)
        channel_assignments[remote_product.id] = _AssignmentPayload(
            identifiers=identifiers,
            payloads=payloads,
        )
    return channel_assignments
</code_context>
<issue_to_address>
**issue (code-quality):** We've found these issues:
- Use named expression to simplify assignment and conditional ([`use-named-expression`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-named-expression/))
- Merge dictionary updates via the union operator ([`dict-assign-update-to-union`](https://docs.sourcery.ai/Reference/Default-Rules/suggestions/dict-assign-update-to-union/))
</issue_to_address>
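Applied to the quoted loop, the two refactorings look roughly like the fragments below (behaviour unchanged; the dict union assignment needs Python 3.9+):

```python
# Named expression: assign and test the SKU in one step.
if sku := getattr(getattr(remote_product, "local_instance", None), "sku", None):
    identifiers.add(str(sku))

# Dict union assignment instead of .update().
payloads |= payload_data.payloads
```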
### Comment 7
<location> `OneSila/sales_channels/factories/cpt/product_feed.py:151` </location>
<code_context>
def _build_payload_for_assignment(
    self,
    *,
    assignment: SalesChannelViewAssign,
) -> _AssignmentPayload:
    identifiers: Set[str] = set()
    product = assignment.product
    sku = getattr(product, "sku", None)
    if sku:
        identifiers.add(str(sku))
    try:
        if product.is_configurable():
            for variation in product.get_configurable_variations(active_only=False):
                variation_sku = getattr(variation, "sku", None)
                if variation_sku:
                    identifiers.add(str(variation_sku))
    except AttributeError:
        pass
    payloads: Dict[str, Dict[str, object]] = {}
    try:
        factory = ProductFeedPayloadFactory(sales_channel_view_assign=assignment)
        for payload in factory.build():
            identifier = payload.get("id")
            if not identifier:
                continue
            identifier_str = str(identifier)
            identifiers.add(identifier_str)
            payloads[identifier_str] = payload
    except Exception:  # pragma: no cover - safety net for factory failures
        logger.exception(
            "Failed to build GPT feed payload for assignment %s", assignment.pk
        )
    return _AssignmentPayload(identifiers=identifiers, payloads=payloads)
</code_context>
<issue_to_address>
**issue (code-quality):** Use named expression to simplify assignment and conditional [×2] ([`use-named-expression`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-named-expression/))
</issue_to_address>
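The two named-expression sites in the quoted method would then read roughly as follows (fragments of the same method, behaviour unchanged):

```python
# Top of the method: bind and test the product SKU at once.
if sku := getattr(product, "sku", None):
    identifiers.add(str(sku))

# Inside the build loop: bind and test the payload identifier at once.
for payload in factory.build():
    if not (identifier := payload.get("id")):
        continue
    identifier_str = str(identifier)
    identifiers.add(identifier_str)
    payloads[identifier_str] = payload
```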
### Comment 8
<location> `OneSila/sales_channels/factories/cpt/product_feed.py:188-196` </location>
<code_context>
def _load_existing_feed(
    self,
    *,
    sales_channel,
) -> Dict[str, Dict[str, object]]:
    existing = getattr(sales_channel, "gpt_feed_json", None) or []
    if isinstance(existing, dict):
        items = existing.get("items", [])
    else:
        items = existing
    entries: Dict[str, Dict[str, object]] = {}
    for item in items:
        identifier = item.get("id")
        if not identifier:
            continue
        entries[str(identifier)] = item
    return entries
</code_context>
<issue_to_address>
**issue (code-quality):** We've found these issues:
- Replace if statement with if expression ([`assign-if-exp`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/assign-if-exp/))
- Use named expression to simplify assignment and conditional ([`use-named-expression`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-named-expression/))
- Lift code into else after jump in control flow ([`reintroduce-else`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/reintroduce-else/))
- Remove redundant continue statement ([`remove-redundant-continue`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/remove-redundant-continue/))
</issue_to_address>
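Folding those suggestions into the quoted method gives a sketch along these lines (same behaviour, same type hints):

```python
def _load_existing_feed(
    self,
    *,
    sales_channel,
) -> Dict[str, Dict[str, object]]:
    existing = getattr(sales_channel, "gpt_feed_json", None) or []
    # if-expression instead of the if/else assignment.
    items = existing.get("items", []) if isinstance(existing, dict) else existing
    entries: Dict[str, Dict[str, object]] = {}
    for item in items:
        # Named expression replaces the assign-then-continue pattern.
        if identifier := item.get("id"):
            entries[str(identifier)] = item
    return entries
```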
### Comment 9
<location> `OneSila/sales_channels/factories/cpt/product_feed.py:222-225` </location>
<code_context>
def _merge_entries(
    self,
    *,
    sales_channel,
    existing_entries: Dict[str, Dict[str, object]],
    assignments: Dict[int, _AssignmentPayload],
) -> Dict[str, Dict[str, object]] | None:
    if not assignments and not self.sync_all:
        return None
    merged = {} if self.sync_all else dict(existing_entries)
    for payload in assignments.values():
        for identifier, item in payload.payloads.items():
            merged[identifier] = item
    if not self.sync_all:
        for payload in assignments.values():
            for identifier in payload.identifiers:
                if identifier not in payload.payloads:
                    merged.pop(identifier, None)
    if not self.sync_all and merged == existing_entries:
        return None
    return merged
</code_context>
<issue_to_address>
**suggestion (code-quality):** We've found these issues:
- Lift code into else after jump in control flow ([`reintroduce-else`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/reintroduce-else/))
- Replace if statement with if expression ([`assign-if-exp`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/assign-if-exp/))
```suggestion
return None if not self.sync_all and merged == existing_entries else merged
```
</issue_to_address>
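This ties in with the earlier overall comment about _merge_entries being non-trivial. One possible split into full and incremental helpers, sketched on the assumption that behaviour must stay identical (type hints elided for brevity):

```python
def _merge_entries(self, *, sales_channel, existing_entries, assignments):
    if not assignments and not self.sync_all:
        return None
    merged = (
        self._merge_full(assignments=assignments)
        if self.sync_all
        else self._merge_incremental(
            existing_entries=existing_entries, assignments=assignments
        )
    )
    return None if not self.sync_all and merged == existing_entries else merged

def _merge_full(self, *, assignments):
    # Full sync: rebuild the feed purely from the freshly built payloads.
    return {
        identifier: item
        for payload in assignments.values()
        for identifier, item in payload.payloads.items()
    }

def _merge_incremental(self, *, existing_entries, assignments):
    # Incremental sync: overlay new payloads on the stored feed, then drop
    # identifiers that were touched but produced no payload (stale entries).
    merged = dict(existing_entries)
    for payload in assignments.values():
        merged.update(payload.payloads)
    for payload in assignments.values():
        for identifier in payload.identifiers:
            if identifier not in payload.payloads:
                merged.pop(identifier, None)
    return merged
```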
### Comment 10
<location> `OneSila/sales_channels/factories/cpt/product_feed.py:258-260` </location>
<code_context>
def _clear_required_flags(
    self,
    *,
    remote_product_ids: Iterable[int],
) -> None:
    ids = {remote_id for remote_id in remote_product_ids if remote_id}
    if not ids:
        return
    RemoteProduct.objects.filter(id__in=ids).update(required_feed_sync=False)
</code_context>
<issue_to_address>
**issue (code-quality):** We've found these issues:
- Use named expression to simplify assignment and conditional ([`use-named-expression`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-named-expression/))
- Lift code into else after jump in control flow ([`reintroduce-else`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/reintroduce-else/))
- Swap if/else branches ([`swap-if-else-branches`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/swap-if-else-branches/))
</issue_to_address>
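With those refactorings applied, the quoted method collapses to a guard-plus-update sketch like the one below; the same shape would also fit the receiver in Comment 11:

```python
def _clear_required_flags(self, *, remote_product_ids: Iterable[int]) -> None:
    # Named expression: only hit the database when there are ids to clear.
    if ids := {remote_id for remote_id in remote_product_ids if remote_id}:
        RemoteProduct.objects.filter(id__in=ids).update(required_feed_sync=False)
```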
### Comment 11
<location> `OneSila/sales_channels/receivers.py:46-48` </location>
<code_context>
def _mark_remote_products_for_products(*, product_ids: Iterable[int]) -> None:
    ids = {product_id for product_id in product_ids if product_id}
    if not ids:
        return
    RemoteProduct.objects.filter(
        local_instance_id__in=ids,
        sales_channel__gpt_enable=True,
    ).update(required_feed_sync=True)
</code_context>
<issue_to_address>
**issue (code-quality):** We've found these issues:
- Use named expression to simplify assignment and conditional ([`use-named-expression`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-named-expression/))
- Lift code into else after jump in control flow ([`reintroduce-else`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/reintroduce-else/))
- Swap if/else branches ([`swap-if-else-branches`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/swap-if-else-branches/))
</issue_to_address>
### Comment 12
<location> `OneSila/sales_channels/tests/test_gpt_feed_factory.py:87` </location>
<code_context>
def _build_remote_product(self, *, sku: str, required_feed_sync: bool = True):
    product = baker.make(
        "products.Product",
        multi_tenant_company=self.multi_tenant_company,
        sku=sku,
        type=SIMPLE,
    )
    remote_product = baker.make(
        "sales_channels.RemoteProduct",
        sales_channel=self.channel,
        multi_tenant_company=self.multi_tenant_company,
        local_instance=product,
        remote_sku=f"REMOTE-{sku}",
    )
    baker.make(
        "sales_channels.SalesChannelViewAssign",
        product=product,
        sales_channel=self.channel,
        sales_channel_view=self.view,
        remote_product=remote_product,
        multi_tenant_company=self.multi_tenant_company,
    )
    if required_feed_sync is False:
        RemoteProduct.objects.filter(pk=remote_product.pk).update(required_feed_sync=False)
    remote_product.refresh_from_db()
    return remote_product
</code_context>
<issue_to_address>
**suggestion (code-quality):** Simplify comparison to boolean ([`simplify-boolean-comparison`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/simplify-boolean-comparison/))
```suggestion
if not required_feed_sync:
```
</issue_to_address>
Summary
Testing
https://chatgpt.com/codex/tasks/task_e_6900b7a9115c832ea103a8b80c30914a
Summary by Sourcery
Implement a GPT product feed regeneration pipeline with persistent storage, sync flags, and scheduled tasks
New Features:
Enhancements:
Tests: