Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions OneSila/sales_channels/factories/cpt/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Factories for CPT (channel product tooling)."""

from .product_feed import SalesChannelGptProductFeedFactory

__all__ = ["SalesChannelGptProductFeedFactory"]
261 changes: 261 additions & 0 deletions OneSila/sales_channels/factories/cpt/product_feed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
import json
import logging
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, Iterable, List, Sequence, Set, Tuple

from django.core.files.base import ContentFile
from django.db import transaction
from django.utils import timezone

from llm.factories import ProductFeedPayloadFactory
from sales_channels.models import RemoteProduct, SalesChannel, SalesChannelViewAssign

logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class _AssignmentPayload:
    """Feed data contributed by one remote product's view assignments.

    ``identifiers`` is the set of feed-entry ids (SKUs and payload ids) the
    remote product is responsible for; ``payloads`` maps a subset of those
    identifiers to full feed entries.  Identifiers that have no payload mark
    feed entries that should be pruned (see ``_merge_entries``).

    NOTE(review): ``frozen=True`` generates an ``__eq__``/``__hash__`` pair,
    but the generated ``__hash__`` would raise for the unhashable
    ``Set``/``Dict`` fields if an instance were ever hashed — no visible
    caller does; confirm before relying on hashability.
    """

    # ids this remote product contributes to the channel feed
    identifiers: Set[str]
    # feed entries keyed by identifier; may be a strict subset of identifiers
    payloads: Dict[str, Dict[str, object]]


class SalesChannelGptProductFeedFactory:
"""Synchronise GPT product feeds for sales channels."""

    def __init__(self, *, sync_all: bool = False):
        """Initialise the factory.

        :param sync_all: when True, rebuild each channel feed from scratch
            instead of only syncing products flagged ``required_feed_sync``.
        """
        self.sync_all = sync_all

    def work(self) -> None:
        """Run one feed synchronisation pass.

        Collects the remote products needing a sync, groups them per sales
        channel, merges their payloads into each channel's cached feed and,
        when the feed changed, persists and sends it.  Afterwards the
        ``required_feed_sync`` flag is cleared for every collected product —
        including products whose channel feed turned out to be unchanged, so
        they are not retried on the next pass.
        """
        remote_products = self._collect_remote_products()
        if not remote_products:
            return

        assignments_map = self._load_assignments(remote_products=remote_products)
        grouped = self._group_by_sales_channel(remote_products=remote_products)

        processed_remote_ids: Set[int] = set()

        for sales_channel, channel_products in grouped:
            processed_remote_ids.update(product.id for product in channel_products)
            channel_assignments = self._collect_channel_assignments(
                channel_products=channel_products,
                assignments_map=assignments_map,
            )
            existing_entries = self._load_existing_feed(sales_channel=sales_channel)
            updated_entries = self._merge_entries(
                sales_channel=sales_channel,
                existing_entries=existing_entries,
                assignments=channel_assignments,
            )

            # None means there is nothing to persist for this channel
            # (no assignments in incremental mode, or the feed is unchanged).
            if updated_entries is None:
                continue

            # Persist and send inside one transaction so a failure while
            # sending rolls back the stored feed state.
            with transaction.atomic():
                self._persist_feed(
                    sales_channel=sales_channel,
                    entries=updated_entries,
                )
                self._send_feed(
                    sales_channel=sales_channel,
                    entries=updated_entries,
                )

        if processed_remote_ids:
            self._clear_required_flags(remote_product_ids=processed_remote_ids)

def _collect_remote_products(self) -> List[RemoteProduct]:
queryset = RemoteProduct.objects.filter(
sales_channel__gpt_enable=True,
)
if not self.sync_all:
queryset = queryset.filter(required_feed_sync=True)
return list(
queryset.select_related("sales_channel", "local_instance").order_by("sales_channel_id", "id")
)

def _load_assignments(
self,
*,
remote_products: Sequence[RemoteProduct],
) -> Dict[int, List[SalesChannelViewAssign]]:
remote_product_ids = {product.id for product in remote_products}
if not remote_product_ids:
return {}
assignments = (
SalesChannelViewAssign.objects.filter(remote_product_id__in=remote_product_ids)
.select_related(
"product",
"sales_channel",
"sales_channel_view",
"sales_channel_view__sales_channel",
"remote_product",
)
.order_by("remote_product_id", "id")
)
mapping: Dict[int, List[SalesChannelViewAssign]] = defaultdict(list)
for assignment in assignments:
mapping[assignment.remote_product_id].append(assignment)
return mapping

def _group_by_sales_channel(
self,
*,
remote_products: Sequence[RemoteProduct],
) -> List[Tuple[SalesChannel, List[RemoteProduct]]]:
grouped: Dict[int, List[RemoteProduct]] = defaultdict(list)
channels: Dict[int, SalesChannel] = {}
for product in remote_products:
grouped[product.sales_channel_id].append(product)
channels[product.sales_channel_id] = product.sales_channel
return [(channels[channel_id], grouped[channel_id]) for channel_id in grouped]

def _collect_channel_assignments(
self,
*,
channel_products: Sequence[RemoteProduct],
assignments_map: Dict[int, List[SalesChannelViewAssign]],
) -> Dict[int, _AssignmentPayload]:
channel_assignments: Dict[int, _AssignmentPayload] = {}
for remote_product in channel_products:
assignments = assignments_map.get(remote_product.id, [])
payloads: Dict[str, Dict[str, object]] = {}
identifiers: Set[str] = set()

if not assignments:
sku = getattr(getattr(remote_product, "local_instance", None), "sku", None)
if sku:
identifiers.add(str(sku))
channel_assignments[remote_product.id] = _AssignmentPayload(
identifiers=identifiers,
payloads=payloads,
)
continue

for assignment in assignments:
payload_data = self._build_payload_for_assignment(assignment=assignment)
payloads.update(payload_data.payloads)
identifiers.update(payload_data.identifiers)

channel_assignments[remote_product.id] = _AssignmentPayload(
identifiers=identifiers,
payloads=payloads,
)
return channel_assignments

def _build_payload_for_assignment(
self,
*,
assignment: SalesChannelViewAssign,
) -> _AssignmentPayload:
identifiers: Set[str] = set()
product = assignment.product
sku = getattr(product, "sku", None)
if sku:
identifiers.add(str(sku))
try:
if product.is_configurable():
for variation in product.get_configurable_variations(active_only=False):
variation_sku = getattr(variation, "sku", None)
if variation_sku:
identifiers.add(str(variation_sku))
except AttributeError:
pass

payloads: Dict[str, Dict[str, object]] = {}
try:
factory = ProductFeedPayloadFactory(sales_channel_view_assign=assignment)
for payload in factory.build():
identifier = payload.get("id")
if not identifier:
continue
identifier_str = str(identifier)
identifiers.add(identifier_str)
payloads[identifier_str] = payload
except Exception: # pragma: no cover - safety net for factory failures
logger.exception(
"Failed to build GPT feed payload for assignment %s", assignment.pk
)
return _AssignmentPayload(identifiers=identifiers, payloads=payloads)

def _load_existing_feed(
self,
*,
sales_channel,
) -> Dict[str, Dict[str, object]]:
existing = getattr(sales_channel, "gpt_feed_json", None) or []
if isinstance(existing, dict):
items = existing.get("items", [])
else:
items = existing
entries: Dict[str, Dict[str, object]] = {}
for item in items:
identifier = item.get("id")
if not identifier:
continue
entries[str(identifier)] = item
return entries

def _merge_entries(
self,
*,
sales_channel,
existing_entries: Dict[str, Dict[str, object]],
assignments: Dict[int, _AssignmentPayload],
) -> Dict[str, Dict[str, object]] | None:
if not assignments and not self.sync_all:
return None

merged = {} if self.sync_all else dict(existing_entries)

for payload in assignments.values():
for identifier, item in payload.payloads.items():
merged[identifier] = item

if not self.sync_all:
for payload in assignments.values():
for identifier in payload.identifiers:
if identifier not in payload.payloads:
merged.pop(identifier, None)

if not self.sync_all and merged == existing_entries:
return None

return merged
Comment on lines +222 to +225
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (code-quality): We've found these issues:

Suggested change
if not self.sync_all and merged == existing_entries:
return None
return merged
return None if not self.sync_all and merged == existing_entries else merged


def _persist_feed(
self,
*,
sales_channel,
entries: Dict[str, Dict[str, object]],
) -> None:
ordered = [entries[key] for key in sorted(entries.keys())]
sales_channel.gpt_feed_json = ordered
timestamp = timezone.now().strftime("%Y%m%d%H%M%S")
filename = f"gpt-feed-{sales_channel.pk}-{timestamp}.json"
content = json.dumps(ordered, ensure_ascii=False, indent=2, default=str)
sales_channel.gpt_feed_file.save(filename, ContentFile(content), save=False)
sales_channel.save(update_fields=["gpt_feed_json", "gpt_feed_file"])
Comment on lines +235 to +239
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (performance): Saving files in a loop may cause race conditions or file bloat.

Implement a cleanup or overwrite mechanism to prevent excessive file creation during frequent syncs.

Suggested change
timestamp = timezone.now().strftime("%Y%m%d%H%M%S")
filename = f"gpt-feed-{sales_channel.pk}-{timestamp}.json"
content = json.dumps(ordered, ensure_ascii=False, indent=2, default=str)
sales_channel.gpt_feed_file.save(filename, ContentFile(content), save=False)
sales_channel.save(update_fields=["gpt_feed_json", "gpt_feed_file"])
filename = f"gpt-feed-{sales_channel.pk}.json"
content = json.dumps(ordered, ensure_ascii=False, indent=2, default=str)
# Overwrite the existing file if it exists
if sales_channel.gpt_feed_file:
sales_channel.gpt_feed_file.delete(save=False)
sales_channel.gpt_feed_file.save(filename, ContentFile(content), save=False)
sales_channel.save(update_fields=["gpt_feed_json", "gpt_feed_file"])


    def _send_feed(
        self,
        *,
        sales_channel,
        entries: Dict[str, Dict[str, object]],
    ) -> None:
        """Placeholder transport: log the would-be delivery instead of
        calling any external API.

        Only the entry identifiers are logged, not the payload bodies.
        """
        logger.debug(
            "Mock send GPT feed for sales_channel_id=%s entries=%s",
            sales_channel.id,
            list(entries.keys()),
        )

def _clear_required_flags(
self,
*,
remote_product_ids: Iterable[int],
) -> None:
ids = {remote_id for remote_id in remote_product_ids if remote_id}
if not ids:
return
RemoteProduct.objects.filter(id__in=ids).update(required_feed_sync=False)
9 changes: 9 additions & 0 deletions OneSila/sales_channels/models/products.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ class RemoteProduct(PolymorphicModel, RemoteObjectMixin, models.Model):
db_index=True,
help_text="Current sync status derived from progress and sync errors.",
)
required_feed_sync = models.BooleanField(
default=False,
help_text="Indicates if the GPT product feed needs to be refreshed for this remote product.",
)

class Meta:
unique_together = (('sales_channel', 'local_instance', 'remote_parent_product'),)
Expand Down Expand Up @@ -110,6 +114,9 @@ def __str__(self):
return f"Remote product {local_name} (SKU: {remote_sku}) on {sales_channel}"

def save(self, *args, **kwargs):
is_new = self.pk is None
if is_new and getattr(self.sales_channel, "gpt_enable", False):
self.required_feed_sync = True
previous_status = self.status
computed_status = self._determine_status()
status_changed = previous_status != computed_status
Expand All @@ -120,6 +127,8 @@ def save(self, *args, **kwargs):
mutable_fields = list(dict.fromkeys(update_fields))
if status_changed and "status" not in mutable_fields:
mutable_fields.append("status")
if is_new and getattr(self.sales_channel, "gpt_enable", False) and "required_feed_sync" not in mutable_fields:
mutable_fields.append("required_feed_sync")
kwargs["update_fields"] = mutable_fields

super().save(*args, **kwargs)
Expand Down
11 changes: 11 additions & 0 deletions OneSila/sales_channels/models/sales_channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,17 @@ class SalesChannel(Integration, models.Model):
blank=True,
help_text=_("Return window (for example, in days) required when GPT is enabled."),
)
gpt_feed_json = models.JSONField(
default=list,
blank=True,
help_text=_("Cached GPT product feed entries for this sales channel."),
)
gpt_feed_file = models.FileField(
upload_to="gpt_feeds/",
null=True,
blank=True,
help_text=_("Downloadable JSON file containing the GPT product feed."),
)

is_external_install = models.BooleanField(
default=False,
Expand Down
54 changes: 53 additions & 1 deletion OneSila/sales_channels/receivers.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from django.db.models.signals import post_delete, pre_delete, pre_save
from typing import Iterable

from core.decorators import trigger_signal_for_dirty_fields
from core.schema.core.subscriptions import refresh_subscription_receiver
Expand All @@ -16,7 +17,7 @@
from .integrations.amazon.models import AmazonSalesChannel, AmazonSalesChannelImport
from .integrations.ebay.models import EbaySalesChannel, EbaySalesChannelImport
from .integrations.magento2.models import MagentoProduct
from .models import SalesChannelImport
from .models import RemoteProduct, SalesChannelImport
# from .models import ImportProcess
from .models.sales_channels import SalesChannelViewAssign
from .signals import (
Expand All @@ -41,6 +42,57 @@
logger = logging.getLogger(__name__)


def _mark_remote_products_for_products(*, product_ids: Iterable[int]) -> None:
    """Flag the GPT-enabled remote products of the given local products as
    needing a feed re-sync.

    Falsy/None ids are ignored; a single bulk UPDATE is issued.
    """
    wanted = set(filter(None, product_ids))
    if wanted:
        RemoteProduct.objects.filter(
            sales_channel__gpt_enable=True,
            local_instance_id__in=wanted,
        ).update(required_feed_sync=True)


# ------------------------------------------------------------- GPT FEED SYNC FLAGS


@receiver(create_remote_product_property, sender='properties.ProductProperty')
@receiver(update_remote_product_property, sender='properties.ProductProperty')
@receiver(delete_remote_product_property, sender='properties.ProductProperty')
def sales_channels__gpt_feed_flag__product_property(sender, instance, **kwargs):
    # Any property create/update/delete on a product invalidates its GPT feed entry.
    _mark_remote_products_for_products(product_ids=[getattr(instance, "product_id", None)])


@receiver(update_remote_price, sender='products.Product')
@receiver(update_remote_product_content, sender='products.Product')
@receiver(update_remote_product, sender='products.Product')
@receiver(update_remote_product_eancode, sender='products.Product')
@receiver(sales_view_assign_updated, sender='products.Product')
def sales_channels__gpt_feed_flag__product(sender, instance, **kwargs):
    # Price, content, EAN code, or view-assignment changes on the product
    # itself flag its remote products for a feed re-sync.
    _mark_remote_products_for_products(product_ids=[getattr(instance, "id", None)])


@receiver(add_remote_product_variation, sender='products.ConfigurableVariation')
@receiver(remove_remote_product_variation, sender='products.ConfigurableVariation')
def sales_channels__gpt_feed_flag__variation(sender, instance, **kwargs):
    # Adding/removing a variation affects both the configurable parent's feed
    # entry and the variation's own, so flag both products.
    _mark_remote_products_for_products(
        product_ids=[getattr(instance, "parent_id", None), getattr(instance, "variation_id", None)]
    )


@receiver(create_remote_image_association, sender='media.MediaProductThrough')
@receiver(delete_remote_image_association, sender='media.MediaProductThrough')
def sales_channels__gpt_feed_flag__image_association(sender, instance, **kwargs):
    # Attaching or detaching an image changes the product's feed media.
    _mark_remote_products_for_products(product_ids=[getattr(instance, "product_id", None)])


@receiver(delete_remote_image, sender='media.Media')
def sales_channels__gpt_feed_flag__image_delete(sender, instance, **kwargs):
    # Resolve every product that used this media before it disappears, then
    # flag their remote products for a feed re-sync.
    # NOTE(review): MediaProductThrough is not in the visible import block of
    # this module — confirm it is imported elsewhere in the file.
    product_ids = list(
        MediaProductThrough.objects.filter(media=instance).values_list("product_id", flat=True)
    )
    _mark_remote_products_for_products(product_ids=product_ids)


@receiver(post_update, sender=SalesChannelImport)
@receiver(post_update, sender=AmazonSalesChannelImport)
@receiver(post_update, sender=EbaySalesChannelImport)
Expand Down
Loading
Loading