Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sd_prompt_reader/format/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@
from .drawthings import DrawThings
from .swarmui import SwarmUI
from .fooocus import Fooocus
from .civitai import CivitaiComfyUIFormat
160 changes: 160 additions & 0 deletions sd_prompt_reader/format/civitai.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
# sd_prompt_reader/format/civitai.py
# (For your Ktiseos-Nyx/stable-diffusion-prompt-reader fork, mojibake branch)
# Refined version with fixes for double parsing and width/height handling.

import json
import re
from .base_format import BaseFormat
from ..logger import Logger

class CivitaiComfyUIFormat(BaseFormat):
"""
Parser for UserComment metadata found in JPEGs generated by
Civitai's ComfyUI-based service. This typically involves a
"charset=Unicode" prefix and potentially a mojibake UTF-16LE JSON string
(if piexif doesn't fully clean it) representing the ComfyUI workflow.
The workflow contains an "extraMetadata" key with A1111-style parameters.
"""
def __init__(self, info: dict = None, raw: str = "", width: int = 0, height: int = 0): # Added width, height
super().__init__(info, raw, width, height) # Pass width, height to BaseFormat
self._logger = Logger(f"SDPR.{self.__class__.__name__}")
self.workflow_data: dict | None = None
self.tool = "Civitai ComfyUI" # Set the tool name this parser identifies

def _decode_user_comment_for_civitai(self, uc_string: str) -> str | None:
"""
Decodes/cleans the UserComment string, expecting piexif's output.
Handles "charset=Unicode" prefix and potential mojibake reversal.
Returns a clean JSON string if successful, else None.
"""
self._logger.debug(f"Attempting to decode UserComment (first 70): '{uc_string[:70]}'")
if not uc_string or not isinstance(uc_string, str):
self._logger.warn("UserComment string is empty or not a string.")
return None

data_to_process = uc_string
prefix_pattern = r'^charset\s*=\s*["\']?(UNICODE|UTF-16|UTF-16LE)["\']?\s*'
match = re.match(prefix_pattern, uc_string, re.IGNORECASE)
if match:
data_to_process = uc_string[len(match.group(0)):].strip()
self._logger.debug(f"Stripped 'charset=Unicode' prefix. Remaining: '{data_to_process[:50]}'")

# Check for mojibake ONLY if piexif's output might still contain it.
# Based on logs, piexif seems to provide clean JSON, so this block might not be hit often
# when used within SDPR's ImageDataReader (which uses piexif).
# It's kept for robustness or if piexif's behavior changes or varies.
if ('笀' in data_to_process or '∀' in data_to_process or 'izarea' in data_to_process) and \
not (data_to_process.startswith('{') and data_to_process.endswith('}')):
self._logger.debug("Mojibake characters detected. Attempting reversal.")
try:
byte_list = []
for char_from_mojibake in data_to_process:
codepoint_val = ord(char_from_mojibake)
byte_list.append((codepoint_val >> 8) & 0xFF)
byte_list.append(codepoint_val & 0xFF)

recovered_bytes = bytes(byte_list)
json_string = recovered_bytes.decode('utf-16le', errors='strict')
json.loads(json_string) # Validate
self._logger.debug("Mojibake reversal successful.")
return json_string
except Exception as e:
self._logger.warn(f"Mojibake reversal failed: {e}")
return None

# If no mojibake (or reversal failed), check if it's already plain JSON
if data_to_process.startswith('{') and data_to_process.endswith('}'):
self._logger.debug("Data (post-prefix/post-mojibake-attempt) looks like plain JSON. Validating.")
try:
json.loads(data_to_process)
self._logger.debug("Plain JSON validation successful.")
return data_to_process
except json.JSONDecodeError as e:
self._logger.warn(f"Plain JSON validation failed: {e}")
return None

self._logger.debug("UserComment string not recognized as Civitai JSON (mojibake or plain) after processing.")
return None

def parse(self):
# Prevent re-parsing if already successful
if self._status == BaseFormat.Status.READ_SUCCESS:
self._logger.debug(f"{self.__class__.__name__} already parsed successfully. Skipping re-parse.")
return self._status

self._logger.info(f"Attempting to parse using {self.__class__.__name__}.")
if not self._raw:
self._logger.warn("Raw data (UserComment from piexif) is empty. Cannot parse.")
self._status = BaseFormat.Status.FORMAT_ERROR
self._error = "Raw UserComment data is empty."
return self._status

cleaned_workflow_json_str = self._decode_user_comment_for_civitai(self._raw)

if not cleaned_workflow_json_str:
self._logger.warn("Failed to decode UserComment or not a Civitai JSON format for this parser.")
self._status = BaseFormat.Status.FORMAT_ERROR
self._error = "UserComment decoding failed or not the specific Civitai JSON structure."
return self._status

try:
self.workflow_data = json.loads(cleaned_workflow_json_str)
self._logger.info("Successfully parsed main workflow JSON from UserComment.")
except json.JSONDecodeError as e:
self._logger.error(f"Decoded UserComment is not valid JSON: {e}")
self._status = BaseFormat.Status.FORMAT_ERROR
self._error = f"Invalid JSON in decoded UserComment: {e}"
return self._status

extra_metadata_str = self.workflow_data.get("extraMetadata")
if extra_metadata_str and isinstance(extra_metadata_str, str):
self._logger.info("Found 'extraMetadata' string. Attempting to parse.")
try:
extra_meta_dict = json.loads(extra_metadata_str)
self._logger.debug("'extraMetadata' parsed into dict successfully.")

self._positive = extra_meta_dict.get("prompt", "")
self._negative = extra_meta_dict.get("negativePrompt", "")

self._parameter = {}
if "steps" in extra_meta_dict: self._parameter["steps"] = str(extra_meta_dict["steps"])

if "CFG scale" in extra_meta_dict: self._parameter["cfg_scale"] = str(extra_meta_dict["CFG scale"])
elif "cfgScale" in extra_meta_dict: self._parameter["cfg_scale"] = str(extra_meta_dict["cfgScale"])

if "sampler" in extra_meta_dict: self._parameter["sampler_name"] = str(extra_meta_dict["sampler"])
elif "sampler_name" in extra_meta_dict: self._parameter["sampler_name"] = str(extra_meta_dict["sampler_name"])

if "seed" in extra_meta_dict: self._parameter["seed"] = str(extra_meta_dict["seed"])

# Use width/height from extraMetadata if available, otherwise keep what was passed in __init__
# BaseFormat __init__ already sets self._width and self._height
if "width" in extra_meta_dict: self._width = str(extra_meta_dict.get("width"))
if "height" in extra_meta_dict: self._height = str(extra_meta_dict.get("height"))
# If not found, self._width and self._height retain values from ImageDataReader (via super init)

self._raw_setting = extra_metadata_str # Store the raw extraMetadata JSON string

self._logger.info("Successfully extracted parameters from 'extraMetadata'.")
self._status = BaseFormat.Status.READ_SUCCESS

except json.JSONDecodeError as e_extra:
self._logger.error(f"Failed to parse JSON from 'extraMetadata': {e_extra}")
self._status = BaseFormat.Status.FORMAT_ERROR
self._error = f"Invalid JSON in 'extraMetadata': {e_extra}"
# self.raw is set below even if extraMetadata fails, as main workflow was parsed.
else:
self._logger.warn("'extraMetadata' not found or not a string in UserComment workflow.")
# This parser specifically targets Civitai workflows that HAVE extraMetadata.
# If it's missing, it's a format error FOR THIS PARSER.
# A generic ComfyUI parser might handle workflows without extraMetadata.
self._status = BaseFormat.Status.FORMAT_ERROR
self._error = "'extraMetadata' missing or invalid in Civitai UserComment."

# self._raw should store the primary data chunk this parser worked on before detailed parsing.
# In this case, it's the cleaned_workflow_json_str.
# Their other parsers often set self._raw to the input string if it's A1111 text,
# or to the "parameters" from PNGInfo.
self._raw = cleaned_workflow_json_str

return self._status
Loading