Spaces:
Running
Running
| import json | |
| import re | |
| from PIL import Image | |
| from PIL.PngImagePlugin import PngInfo | |
| import logging | |
| logger = logging.getLogger(__name__) | |
def extract_png_metadata(image_path: str) -> dict:
    """
    Extract generation metadata from a PNG written by A1111 or ComfyUI.

    The A1111 'parameters' chunk is read first, then the ComfyUI
    'workflow'/'prompt' chunks (ComfyUI values win on key conflicts and
    set the final 'source'). Generic EXIF / text fields are merged last
    and only fill keys that are still missing.

    Args:
        image_path: Path to the PNG image file

    Returns:
        Dictionary containing extracted metadata, or None when no
        metadata was found or the image could not be read (the error
        is logged).
    """
    try:
        with Image.open(image_path) as img:
            metadata = {}

            # Check for A1111 metadata
            a1111_data = extract_a1111_metadata(img)
            if a1111_data:
                metadata.update(a1111_data)
                metadata['source'] = 'automatic1111'

            # Check for ComfyUI metadata
            comfyui_data = extract_comfyui_metadata(img)
            if comfyui_data:
                metadata.update(comfyui_data)
                metadata['source'] = 'comfyui'

            # Generic fields are a fallback: they must not replace a prompt
            # that came from a generator-specific chunk.
            other_data = extract_other_metadata(img)
            if other_data:
                for key, value in other_data.items():
                    metadata.setdefault(key, value)

            return metadata if metadata else None
    except Exception as e:
        logger.error(f"Error extracting metadata from {image_path}: {str(e)}")
        return None
def extract_a1111_metadata(img: Image.Image) -> dict:
    """Read the A1111 'parameters' text chunk of an image and parse it.

    Returns the parsed fields, or an empty dict when the image exposes no
    text chunks, has no 'parameters' chunk, or an error occurs (logged).
    """
    try:
        # Images without PNG text chunks carry no A1111 data
        if not hasattr(img, 'text'):
            return {}

        text_chunks = img.text
        if 'parameters' not in text_chunks:
            return {}

        parsed = {}
        parsed.update(parse_a1111_parameters(text_chunks['parameters']))
        return parsed
    except Exception as e:
        logger.error(f"Error extracting A1111 metadata: {str(e)}")
        return {}
def parse_a1111_parameters(parameters_text: str) -> dict:
    """Parse A1111 parameters text into structured data.

    The text is expected to be laid out as: prompt (one or more lines),
    an optional "Negative prompt: ..." section (one or more lines), and a
    trailing settings line starting with "Steps: N".

    Args:
        parameters_text: Raw content of the 'parameters' text chunk.

    Returns:
        Dictionary that always contains 'prompt' (possibly empty) and,
        when present in the text, 'negative_prompt', 'steps', 'sampler',
        'cfg_scale', 'seed', 'size', 'width', 'height', 'model_hash',
        'model', 'denoising_strength', 'clip_skip' and 'ensd'.
        An empty dict is returned if parsing fails (the error is logged).
    """
    try:
        metadata = {}
        lines = parameters_text.strip().splitlines()

        # Locate the settings section: the last line that starts with "Steps: N".
        settings_start = len(lines)
        for index in range(len(lines) - 1, -1, -1):
            if re.match(r'\s*Steps:\s*\d+', lines[index]):
                settings_start = index
                break
        body = lines[:settings_start]
        # Without a recognisable settings line, fall back to scanning the
        # whole text so unusual layouts still yield their settings.
        settings_text = '\n'.join(lines[settings_start:]) or parameters_text

        # Everything before "Negative prompt:" is the (possibly multi-line)
        # prompt; everything from it up to the settings is the negative prompt.
        marker = 'Negative prompt:'
        negative_start = next(
            (i for i, line in enumerate(body) if line.lstrip().startswith(marker)),
            len(body),
        )
        metadata['prompt'] = '\n'.join(body[:negative_start]).strip()
        if negative_start < len(body):
            negative_text = '\n'.join(body[negative_start:]).lstrip()
            negative_text = negative_text[len(marker):].strip()
            if negative_text:
                metadata['negative_prompt'] = negative_text

        # Settings are searched in the settings section only, so words such
        # as "Seed: 1" inside the prompt cannot shadow the real values.
        param_patterns = {
            'steps': r'Steps:\s*(\d+)',
            'sampler': r'Sampler:\s*([^,\n]+)',
            'cfg_scale': r'CFG scale:\s*(\d*\.?\d+)',
            'seed': r'Seed:\s*(\d+)',
            'size': r'Size:\s*(\d+x\d+)',
            'model_hash': r'Model hash:\s*([a-fA-F0-9]+)',
            'model': r'Model:\s*([^,\n]+)',
            'denoising_strength': r'Denoising strength:\s*(\d*\.?\d+)',
            'clip_skip': r'Clip skip:\s*(\d+)',
            'ensd': r'ENSD:\s*(\d+)',
        }
        int_params = ('steps', 'seed', 'clip_skip', 'ensd')
        float_params = ('cfg_scale', 'denoising_strength')

        for param_name, pattern in param_patterns.items():
            match = re.search(pattern, settings_text)
            if not match:
                continue
            value = match.group(1).strip()
            if param_name in int_params:
                metadata[param_name] = int(value)
            elif param_name in float_params:
                metadata[param_name] = float(value)
            else:
                metadata[param_name] = value

        # Parse size into width and height
        if 'size' in metadata:
            size_match = re.match(r'(\d+)x(\d+)', metadata['size'])
            if size_match:
                metadata['width'] = int(size_match.group(1))
                metadata['height'] = int(size_match.group(2))

        return metadata
    except Exception as e:
        logger.error(f"Error parsing A1111 parameters: {str(e)}")
        return {}
def extract_comfyui_metadata(img: Image.Image) -> dict:
    """Collect ComfyUI metadata from the 'workflow' and 'prompt' text chunks.

    Each chunk is decoded as JSON and handed to its parser; 'workflow' is
    processed before 'prompt', so values from 'prompt' win on conflicts.
    A chunk that is not valid JSON is skipped with a warning. Returns an
    empty dict when nothing is found or an unexpected error occurs.
    """
    try:
        collected = {}
        if not hasattr(img, 'text'):
            return collected

        # (chunk name, parser, warning logged when the chunk is not valid JSON)
        chunk_handlers = (
            ('workflow', parse_comfyui_workflow, "Could not parse ComfyUI workflow JSON"),
            ('prompt', parse_comfyui_prompt, "Could not parse ComfyUI prompt JSON"),
        )
        for chunk_name, parser, warning in chunk_handlers:
            if chunk_name not in img.text:
                continue
            try:
                decoded = json.loads(img.text[chunk_name])
                collected.update(parser(decoded))
            except json.JSONDecodeError:
                logger.warning(warning)

        return collected
    except Exception as e:
        logger.error(f"Error extracting ComfyUI metadata: {str(e)}")
        return {}
def parse_comfyui_workflow(workflow_data: dict) -> dict:
    """Parse ComfyUI workflow (graph editor) data.

    Args:
        workflow_data: Decoded 'workflow' JSON; nodes are read from its
            'nodes' list.

    Returns:
        Dictionary with any of 'prompt', 'seed', 'steps', 'cfg_scale' and
        'sampler' that could be read. Only values of the expected type are
        stored; fields that cannot be read are omitted rather than set to
        None. An empty dict is returned on failure (the error is logged).
    """
    try:
        metadata = {}
        if not isinstance(workflow_data, dict):
            return metadata

        # Positions inside 'widgets_values' for the stock sampler nodes.
        # The editor inserts a "control after generate" widget right after
        # the seed, which is why steps/cfg are not at positions 0 and 1.
        known_layouts = {
            'KSampler': {'seed': 0, 'steps': 2, 'cfg_scale': 3, 'sampler': 4},
            'KSamplerAdvanced': {'seed': 1, 'steps': 3, 'cfg_scale': 4, 'sampler': 5},
        }
        # Positional guess kept for sampler nodes with an unknown layout.
        # NOTE(review): this is a heuristic; the type checks below drop
        # values that clearly do not fit.
        fallback_layout = {'steps': 0, 'cfg_scale': 1, 'sampler': 2}

        def is_int(value):
            return isinstance(value, int) and not isinstance(value, bool)

        def is_number(value):
            return isinstance(value, (int, float)) and not isinstance(value, bool)

        def is_text(value):
            return isinstance(value, str)

        validators = {
            'seed': is_int,
            'steps': is_int,
            'cfg_scale': is_number,
            'sampler': is_text,
        }

        for node in workflow_data.get('nodes') or ():
            if not isinstance(node, dict):
                continue
            node_type = node.get('type')
            values = node.get('widgets_values')
            # Some nodes have no type or store widget values as a mapping;
            # skip them instead of aborting the whole parse.
            if not isinstance(node_type, str) or not isinstance(values, list) or not values:
                continue
            kind = node_type.lower()

            # Extract prompt from text nodes (first sufficiently long text wins)
            if 'text' in kind or 'prompt' in kind:
                text_value = values[0]
                if (
                    isinstance(text_value, str)
                    and len(text_value) > 10
                    and 'prompt' not in metadata
                ):
                    metadata['prompt'] = text_value

            # Extract sampler settings
            elif 'sampler' in kind:
                layout = known_layouts.get(node_type, fallback_layout)
                if len(values) <= max(layout.values()):
                    continue
                for field, position in layout.items():
                    candidate = values[position]
                    if validators[field](candidate):
                        metadata[field] = candidate

        return metadata
    except Exception as e:
        logger.error(f"Error parsing ComfyUI workflow: {str(e)}")
        return {}
def parse_comfyui_prompt(prompt_data: dict) -> dict:
    """Parse ComfyUI prompt (API-format) data.

    Args:
        prompt_data: Decoded 'prompt' JSON, a mapping of node id to a node
            description that carries an 'inputs' mapping.

    Returns:
        Dictionary with any of 'prompt', 'negative_prompt', 'steps',
        'cfg_scale', 'seed', 'denoising_strength' and 'sampler' that could
        be read. Inputs that are wired to another node (stored as a
        [node_id, slot] list) are not literal values and are skipped.
        An empty dict is returned on failure (the error is logged).
    """
    try:
        metadata = {}
        if not isinstance(prompt_data, dict):
            return metadata

        def is_literal(value):
            # Links to other nodes are lists; only plain scalars are settings.
            return isinstance(value, (int, float, str)) and not isinstance(value, bool)

        def linked_text(link):
            # Return the literal 'text' input of the node a link points at.
            if not isinstance(link, (list, tuple)) or not link:
                return None
            target = prompt_data.get(str(link[0]))
            target_inputs = target.get('inputs') if isinstance(target, dict) else None
            if not isinstance(target_inputs, dict):
                return None
            text = target_inputs.get('text')
            return text if isinstance(text, str) and text.strip() else None

        # (input name, metadata key); later nodes overwrite earlier ones
        setting_inputs = (
            ('steps', 'steps'),
            ('cfg', 'cfg_scale'),
            ('seed', 'seed'),
            ('noise_seed', 'seed'),
            ('denoise', 'denoising_strength'),
            ('sampler_name', 'sampler'),
        )
        prompt_links = (('positive', 'prompt'), ('negative', 'negative_prompt'))

        fallback_prompt = None
        for node_data in prompt_data.values():
            if not isinstance(node_data, dict):
                continue
            inputs = node_data.get('inputs')
            if not isinstance(inputs, dict):
                continue

            # Preferred: follow the positive/negative conditioning links to
            # the text node they are directly wired to.
            for link_name, field in prompt_links:
                if field not in metadata:
                    text = linked_text(inputs.get(link_name))
                    if text is not None:
                        metadata[field] = text

            # Fallback: remember the first long text-like input seen.
            if fallback_prompt is None:
                for key, value in inputs.items():
                    if isinstance(value, str) and len(value) > 10:
                        if 'text' in key.lower() or 'prompt' in key.lower():
                            fallback_prompt = value
                            break

            for input_name, field in setting_inputs:
                value = inputs.get(input_name)
                if is_literal(value):
                    metadata[field] = value

        if 'prompt' not in metadata and fallback_prompt is not None:
            metadata['prompt'] = fallback_prompt

        return metadata
    except Exception as e:
        logger.error(f"Error parsing ComfyUI prompt: {str(e)}")
        return {}
def extract_other_metadata(img: Image.Image) -> dict:
    """Extract other common metadata fields.

    Reads the Software / Artist / Copyright EXIF tags when the image
    exposes EXIF data, and uses the first sufficiently long
    description / comment / title / subject text chunk as a prompt.

    Args:
        img: Opened image.

    Returns:
        Dictionary with any of 'software', 'artist', 'copyright' and
        'prompt'. An empty dict is returned on failure (error is logged).
    """
    try:
        metadata = {}

        # Check standard EXIF data (decoded once, not once per check)
        read_exif = getattr(img, '_getexif', None)
        exif_data = read_exif() if callable(read_exif) else None
        if exif_data:
            # Extract relevant EXIF fields
            exif_fields = {
                'software': 0x0131,   # Software tag
                'artist': 0x013B,     # Artist tag
                'copyright': 0x8298,  # Copyright tag
            }
            for field_name, tag_id in exif_fields.items():
                if tag_id in exif_data:
                    metadata[field_name] = exif_data[tag_id]

        # Check for other text fields that might contain prompts
        if hasattr(img, 'text'):
            text_chunks = img.text
            # PNG keywords are case-sensitive and the standard ones are
            # capitalised ("Description", "Comment", "Title"), so fall back
            # to a case-insensitive match when the exact key is absent.
            keys_by_lowercase = {}
            for key in text_chunks:
                if isinstance(key, str):
                    keys_by_lowercase.setdefault(key.lower(), key)

            for field in ('description', 'comment', 'title', 'subject'):
                key = field if field in text_chunks else keys_by_lowercase.get(field)
                if key is None:
                    continue
                value = text_chunks[key].strip()
                if len(value) > 10:
                    metadata['prompt'] = value
                    break  # first matching field wins

        return metadata
    except Exception as e:
        logger.error(f"Error extracting other metadata: {str(e)}")
        return {}
def clean_prompt_text(prompt: str) -> str:
    """Normalize a prompt: collapse whitespace and drop known leading labels.

    Falsy input yields "". Labels are tested once each, in a fixed order,
    against the text as it stands after any earlier removal. If cleaning
    raises, the input is returned unchanged.
    """
    try:
        if not prompt:
            return ""

        # Trim, then squeeze every whitespace run down to a single space
        trimmed = prompt.strip()
        text = re.sub(r'\s+', ' ', trimmed)

        # Leading labels to discard (matched case-insensitively)
        labels = (
            'prompt:',
            'positive prompt:',
            'text prompt:',
            'description:',
            'caption:',
        )
        for label in labels:
            if not text.lower().startswith(label):
                continue
            remainder = text[len(label):]
            text = remainder.strip()

        return text
    except Exception:
        return prompt if prompt else ""
def get_generation_parameters(metadata: dict) -> dict:
    """Extract key generation parameters for display.

    Args:
        metadata: Extracted metadata dictionary. None or an empty dict is
            accepted and yields an empty result without logging an error.

    Returns:
        Dictionary with the cleaned 'prompt' / 'negative_prompt', the
        technical parameters that are present with a non-None value
        ('steps', 'cfg_scale', 'sampler', 'seed', 'model', 'width',
        'height') and 'source'. An empty dict is returned on failure
        (the error is logged).
    """
    try:
        # "No metadata" is an ordinary input here, not an error condition.
        if not metadata:
            return {}

        params = {}

        # Essential parameters
        for key in ('prompt', 'negative_prompt'):
            if key in metadata:
                params[key] = clean_prompt_text(metadata[key])

        # Technical parameters; None placeholders carry nothing to display
        technical_params = ('steps', 'cfg_scale', 'sampler', 'seed', 'model', 'width', 'height')
        for param in technical_params:
            if metadata.get(param) is not None:
                params[param] = metadata[param]

        # Source information
        if 'source' in metadata:
            params['source'] = metadata['source']

        return params
    except Exception as e:
        logger.error(f"Error extracting generation parameters: {str(e)}")
        return {}