# Scraped page header (not code): "Spaces: Sleeping"
import json
import re
import sqlite3
from contextlib import closing
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Optional, Tuple

import chromadb
from chromadb import Settings
from sentence_transformers import SentenceTransformer
class EnhancedRAGUtils:
    """Reference-context retrieval over three ChromaDB collections.

    On construction the class loads one sentence-transformers embedding model
    and tries to open three persistent Chroma stores under
    ``<vector_stores_path>/chroma_db``: ``regulatory_docs``, ``product_specs``
    and ``checklist_examples``.  A store that cannot be opened is reported on
    stdout and its ``*_collection`` attribute is set to ``None``; the matching
    ``retrieve_*`` method then returns an empty list.

    All ``retrieve_*`` methods are best-effort: failures are printed and an
    empty list is returned instead of raising.
    """

    # Relevance assigned to a hit when the query result carries no distance.
    _DEFAULT_RELEVANCE = 0.5
    # Regulatory hits at or below this relevance are dropped.
    _REGULATORY_MIN_RELEVANCE = 0.7
    # Marker appended by format_context_for_prompt when the text is cut.
    _TRUNCATION_NOTICE = "\n\n[Context truncated for length...]"
    # Stored categories considered too vague to be used as-is.
    _GENERIC_CATEGORIES = ("General", "Unknown", "Food")
    # Ordered (category, keywords) rules; the first rule with a keyword hit wins.
    _CATEGORY_KEYWORDS = (
        ("Temperature Controlled", ("frozen", "freeze", "iqf", "-18")),
        ("Fresh/Chilled", ("fresh", "chilled", "refrigerated")),
        ("Processed/Fried", ("fried", "oil", "crispy")),
        ("Bakery/Baked", ("baked", "bakery", "bread")),
    )
    # Tried in order against the first 300 characters of a regulatory document.
    _SECTION_PATTERNS = (
        re.compile(r"(Section\s+\d+\.\d+)"),
        re.compile(r"(Clause\s+\d+\.\d+)"),
        re.compile(r"(\d+\.\d+\s+[A-Z][\w\s]{10,30})"),
    )
    # The parameter-pattern query is assembled from these constant pieces only;
    # user-supplied values are always bound through "?" placeholders.
    _PATTERN_SELECT = """
        SELECT
            cp.parameter_name,
            cp.parameter_type,
            cp.input_method,
            cp.specifications,
            cp.options_list,
            cp.tolerance_limits,
            cp.measurement_units,
            cp.has_remarks,
            COUNT(*) as usage_frequency,
            GROUP_CONCAT(DISTINCT cd.product_name) as used_in_products
        FROM checklist_parameters cp
        JOIN checklist_documents cd ON cp.file_hash = cd.file_hash
    """
    _PATTERN_CATEGORY_FILTER = "        WHERE cd.checklist_category LIKE ?\n"
    _PATTERN_GROUPING = """
        GROUP BY cp.parameter_name, cp.parameter_type, cp.input_method
        ORDER BY usage_frequency DESC, cp.parameter_name
        LIMIT ?
    """

    def __init__(self, vector_stores_path: str = "./vector_stores"):
        """Load the embedding model and connect to the three vector stores.

        Args:
            vector_stores_path: Directory that contains the ``chroma_db`` folder.
        """
        self.vector_stores_path = Path(vector_stores_path)
        # One embedding model is shared by every collection query.
        self.embedder = SentenceTransformer('all-MiniLM-L6-v2')
        self._init_regulatory_vdb()
        self._init_product_spec_vdb()
        self._init_checklist_examples_vdb()
        print("Enhanced RAG Utils initialized with 3 vector databases")

    # ------------------------------------------------------------------
    # Connection setup
    # ------------------------------------------------------------------
    def _connect_vdb(self, label: str, chroma_path: Path, collection_name: str):
        """Open the Chroma store at *chroma_path* and fetch *collection_name*.

        Returns a ``(client, collection)`` pair.  On any failure a warning is
        printed and ``collection`` is ``None`` (``client`` is ``None`` too if
        the store itself could not be opened).
        """
        client = None
        try:
            client = chromadb.PersistentClient(
                path=str(chroma_path),
                settings=Settings(anonymized_telemetry=False),
            )
            collection = client.get_collection(collection_name)
        except Exception as e:
            print(f"⚠ {label} VDB not available: {e}")
            return client, None
        print(f"✓ {label} VDB connected")
        return client, collection

    def _init_regulatory_vdb(self):
        """Initialize regulatory guidelines VDB."""
        self.regulatory_chroma_path = self.vector_stores_path / "chroma_db" / "regulatory_docs"
        self.regulatory_metadata_db = self.regulatory_chroma_path / "metadata" / "regulatory_metadata.db"
        self.regulatory_client, self.regulatory_collection = self._connect_vdb(
            "Regulatory", self.regulatory_chroma_path, "regulatory_guidelines"
        )

    def _init_product_spec_vdb(self):
        """Initialize product specifications VDB."""
        self.product_spec_chroma_path = self.vector_stores_path / "chroma_db" / "product_specs"
        self.product_spec_metadata_db = self.product_spec_chroma_path / "metadata" / "product_metadata.db"
        self.product_spec_client, self.product_spec_collection = self._connect_vdb(
            "Product Specifications", self.product_spec_chroma_path, "product_specifications"
        )

    def _init_checklist_examples_vdb(self):
        """Initialize checklist examples VDB."""
        self.checklist_chroma_path = self.vector_stores_path / "chroma_db" / "checklist_examples"
        self.checklist_metadata_db = self.checklist_chroma_path / "metadata" / "checklist_structures.db"
        self.checklist_client, self.checklist_collection = self._connect_vdb(
            "Checklist Examples", self.checklist_chroma_path, "checklist_examples"
        )

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _as_list(value) -> List:
        """Coerce a metadata value into a list.

        Lists and tuples are copied.  A string is parsed as a JSON array when
        it looks like one, otherwise split on commas; iterating such a string
        directly would yield single characters.  ``None`` and blank strings
        give ``[]``; any other scalar becomes a one-element list.
        """
        if value is None:
            return []
        if isinstance(value, (list, tuple)):
            return list(value)
        if isinstance(value, str):
            text = value.strip()
            if not text:
                return []
            if text.startswith('['):
                try:
                    parsed = json.loads(text)
                except ValueError:
                    parsed = None
                if isinstance(parsed, list):
                    return parsed
            return [part.strip() for part in text.split(',') if part.strip()]
        return [value]

    def _query_collection(self, collection, query_text: str, k: int) -> List[Tuple[str, Dict, float]]:
        """Embed *query_text*, query *collection* and normalise the hits.

        Returns one ``(document, metadata, relevance)`` tuple per hit, in the
        order the collection returned them.  A missing document becomes ``""``,
        missing metadata becomes ``{}`` and a missing distance yields the
        default relevance, so one incomplete hit cannot abort the whole query.
        """
        query_embedding = self.embedder.encode(query_text).tolist()
        results = collection.query(query_embeddings=[query_embedding], n_results=k)

        documents = (results.get('documents') or [[]])[0] or []
        metadatas = (results.get('metadatas') or [[]])[0] or []
        distances = (results.get('distances') or [[]])[0] or []

        hits = []
        for i, doc in enumerate(documents):
            metadata = (metadatas[i] if i < len(metadatas) else None) or {}
            if i < len(distances) and distances[i] is not None:
                # NOTE(review): 1 - distance only behaves like a 0..1 similarity
                # if the collection uses cosine distance; with another metric
                # the score can go negative — confirm the collection settings.
                relevance = 1 - distances[i]
            else:
                relevance = self._DEFAULT_RELEVANCE
            hits.append((doc or "", metadata, relevance))
        return hits

    # ------------------------------------------------------------------
    # Retrieval
    # ------------------------------------------------------------------
    def retrieve_regulatory_requirements(self, product_name: str, domain: str = "Food Manufacturing", k: int = 3) -> List[Dict]:
        """Retrieve regulatory requirements that are highly relevant to the product.

        Args:
            product_name: Product the query is built around.
            domain: Domain phrase added to the query text.
            k: Maximum number of hits requested and returned.

        Returns:
            Up to ``k`` dicts, most relevant first, keeping only hits whose
            relevance exceeds 0.7.  Empty if the collection is unavailable or
            the query fails.
        """
        collection = getattr(self, 'regulatory_collection', None)
        if collection is None:
            return []
        try:
            query_text = f"{product_name} {domain} quality requirements standards"
            guidelines = []
            for doc, metadata, relevance_score in self._query_collection(collection, query_text, k):
                if relevance_score <= self._REGULATORY_MIN_RELEVANCE:
                    continue
                guidelines.append({
                    "text": doc[:600],
                    "regulatory_body": metadata.get('regulatory_body', 'Unknown'),
                    "standard_code": metadata.get('standard_code', ''),
                    "clause_reference": self._extract_clause_reference(metadata, doc),
                    "topics": metadata.get('topics', ''),
                    "jurisdiction": metadata.get('jurisdiction', ''),
                    "relevance_score": relevance_score,
                    "source_type": "regulatory",
                })
            guidelines.sort(key=lambda item: item['relevance_score'], reverse=True)
            return guidelines[:k]
        except Exception as e:
            print(f"Error retrieving regulatory requirements: {str(e)}")
            return []

    def retrieve_product_specifications(self, product_name: str, k: int = 3) -> List[Dict]:
        """Retrieve similar product specifications, for reference only.

        Args:
            product_name: Product the query is built around.
            k: Number of hits requested from the collection.

        Returns:
            Specification dicts sorted by descending relevance.  Empty if the
            collection is unavailable or the query fails.
        """
        collection = getattr(self, 'product_spec_collection', None)
        if collection is None:
            return []
        try:
            query_text = f"{product_name} product characteristics quality attributes"
            specifications = []
            for doc, metadata, relevance_score in self._query_collection(collection, query_text, k):
                product_category = self._determine_product_category(
                    metadata.get('product_name', ''),
                    metadata.get('product_category', ''),
                    doc,
                )
                specifications.append({
                    "text": doc[:400],
                    "product_name": metadata.get('product_name', 'Unknown'),
                    "supplier": metadata.get('supplier', 'Unknown'),
                    "category": product_category,
                    "specification_type": metadata.get('specification_type', 'Unknown'),
                    "parameters_count": metadata.get('total_parameters', 0),
                    "detail_level": metadata.get('detail_level', 'standard'),
                    "relevance_score": relevance_score,
                    "source_type": "product_spec",
                })
            specifications.sort(key=lambda item: item['relevance_score'], reverse=True)
            return specifications
        except Exception as e:
            print(f"Error retrieving product specifications: {str(e)}")
            return []

    def retrieve_checklist_examples(self, product_name: str, k: int = 3) -> List[Dict]:
        """Retrieve similar checklist examples as reference patterns only.

        Args:
            product_name: Product the query is built around.
            k: Number of hits requested from the collection.

        Returns:
            Example dicts in the order the collection returned them.
            ``parameter_types`` and ``input_methods`` are always lists.  Empty
            if the collection is unavailable or the query fails.
        """
        collection = getattr(self, 'checklist_collection', None)
        if collection is None:
            return []
        try:
            query_text = f"{product_name} inspection checklist structure"
            examples = []
            for doc, metadata, relevance_score in self._query_collection(collection, query_text, k):
                examples.append({
                    "text": doc[:300],
                    "document_type": metadata.get('document_type', 'QC Checklist'),
                    "product_name": metadata.get('product_name', 'Unknown'),
                    "checklist_category": metadata.get('checklist_category', 'General'),
                    "total_parameters": metadata.get('total_parameters', 0),
                    "parameter_types": self._as_list(metadata.get('parameter_types')),
                    "input_methods": self._as_list(metadata.get('input_methods')),
                    "parameter_structure": self._extract_parameter_structure(metadata),
                    "relevance_score": relevance_score,
                    "source_type": "checklist_example",
                })
            return examples
        except Exception as e:
            print(f"Error retrieving checklist examples: {str(e)}")
            return []

    def retrieve_parameter_patterns(self, product_category: str = "", k: int = 10) -> List[Dict]:
        """Retrieve the most frequently used checklist parameters from SQLite.

        Args:
            product_category: When non-empty, only checklists whose category
                contains this text (SQL ``LIKE %text%``) are counted.
            k: Maximum number of patterns to return.

        Returns:
            Pattern dicts ordered by usage frequency (descending) then name.
            Empty if the metadata database is missing or the query fails.
        """
        db_path = getattr(self, 'checklist_metadata_db', None)
        if db_path is None or not db_path.exists():
            return []

        if product_category:
            sql = self._PATTERN_SELECT + self._PATTERN_CATEGORY_FILTER + self._PATTERN_GROUPING
            params = (f"%{product_category}%", k)
        else:
            sql = self._PATTERN_SELECT + self._PATTERN_GROUPING
            params = (k,)

        try:
            # closing() guarantees the connection is released on every path.
            with closing(sqlite3.connect(str(db_path))) as conn:
                rows = conn.execute(sql, params).fetchall()
            return [
                {
                    "parameter_name": row[0],
                    "parameter_type": row[1],
                    "input_method": row[2],
                    "specifications": row[3] or "",
                    "options_list": row[4] or "",
                    "tolerance_limits": row[5] or "",
                    "measurement_units": row[6] or "",
                    "has_remarks": bool(row[7]),
                    "usage_frequency": row[8],
                    "used_in_products": row[9].split(',') if row[9] else [],
                }
                for row in rows
            ]
        except Exception as e:
            print(f"Error retrieving parameter patterns: {str(e)}")
            return []

    def get_comprehensive_context(self, product_name: str, domain: str = "Food Manufacturing",
                                  include_patterns: bool = True) -> Dict:
        """Gather reference context for a product from every available source.

        Args:
            product_name: Product to look up.
            domain: Domain phrase used for the regulatory query.
            include_patterns: When true, also load parameter patterns from the
                checklist metadata database.

        Returns:
            Dict with the retrieved lists, a ``context_summary`` and a
            ``generated_at`` ISO timestamp.
        """
        context = {
            "product_name": product_name,
            "domain": domain,
            "regulatory_requirements": [],
            "product_specifications": [],
            "checklist_examples": [],
            "parameter_patterns": [],
            "context_summary": {},
            "generated_at": datetime.now().isoformat(),
        }
        print(f"Retrieving reference context for: {product_name}")

        context["regulatory_requirements"] = self.retrieve_regulatory_requirements(product_name, domain, k=2)
        context["product_specifications"] = self.retrieve_product_specifications(product_name, k=2)

        # The best-matching specification decides the category used to filter patterns.
        product_category = ""
        if context["product_specifications"]:
            product_category = context["product_specifications"][0].get("category", "")

        context["checklist_examples"] = self.retrieve_checklist_examples(product_name, k=3)

        if include_patterns:
            context["parameter_patterns"] = self.retrieve_parameter_patterns(
                product_category=product_category,
                k=10,
            )

        context["context_summary"] = self._generate_context_summary(context)
        return context

    def format_context_for_prompt(self, context: Dict, max_length: int = 4000) -> str:
        """Render a context dict as markdown-style prompt text (suggestions only).

        Args:
            context: Dict shaped like the result of ``get_comprehensive_context``;
                missing sections are skipped.
            max_length: Upper bound on the length of the returned string.  When
                the text is cut, the truncation notice is included within this
                bound.

        Returns:
            The formatted text, never longer than ``max_length``.
        """
        parts = [
            "\n# REFERENCE CONTEXT (Use as suggestions, not requirements):\n",
            "Note: The following is retrieved reference material. Use it to understand the domain better, but prioritize user requirements.\n",
        ]

        regulatory = context.get("regulatory_requirements")
        if regulatory:
            parts.append("\n## 📚 Regulatory References (if applicable):\n")
            for i, req in enumerate(regulatory[:2], 1):
                clause_ref = req.get('clause_reference', req.get('standard_code', ''))
                heading = f"\n### Reference {i}: {req['regulatory_body']}"
                parts.append(heading + (f" - {clause_ref}\n" if clause_ref else "\n"))
                if req.get('text'):
                    parts.append(f"Content: {req['text'][:200]}...\n")

        specifications = context.get("product_specifications")
        if specifications:
            parts.append("\n## 🔍 Similar Product Insights:\n")
            for spec in specifications[:2]:
                parts.append(f"\n### Similar Product: {spec['product_name']}\n")
                parts.append(f"**Category**: {spec['category']} (dynamically determined)\n")
                parts.append(f"**Typical Parameters**: {spec['parameters_count']}\n")
                if spec.get('text'):
                    parts.append(f"**Characteristics**: {spec['text'][:150]}...\n")

        examples = context.get("checklist_examples")
        if examples:
            parts.append("\n## 📋 Checklist Patterns (for reference):\n")
            for example in examples[:2]:
                parts.append(f"\n### Pattern from: {example['product_name']}\n")
                input_methods = self._as_list(example.get('input_methods'))
                if input_methods:
                    # dict.fromkeys de-duplicates while keeping first-seen order,
                    # so the rendered text is the same on every run.
                    unique_methods = dict.fromkeys(str(m) for m in input_methods[:5])
                    methods = ', '.join(unique_methods)
                    parts.append(f"**Common Input Types**: {methods}\n")
                if example.get('parameter_structure'):
                    parts.append("**Example Parameters**:\n")
                    for param in example['parameter_structure'][:3]:
                        parts.append(f" - {param['name']}: {param['input_method']}\n")

        patterns = context.get("parameter_patterns")
        if patterns:
            parts.append("\n## 💡 Parameter Patterns (common patterns, not requirements):\n")
            # Show at most five patterns, one per distinct input method.
            shown_types = set()
            for pattern in patterns:
                if len(shown_types) >= 5:
                    break
                method = pattern['input_method']
                if method in shown_types:
                    continue
                shown_types.add(method)
                parts.append(f"\n**{method} Example**:\n")
                line = f" • {pattern['parameter_name']}"
                if pattern['specifications']:
                    line += f" (e.g., {pattern['specifications'][:30]})"
                parts.append(line + f" - seen {pattern['usage_frequency']} times\n")

        summary = context.get("context_summary")
        if summary:
            parts.append("\n## 💬 Context Insights:\n")
            if summary.get("product_insights"):
                parts.append(f"**Product Type**: {summary['product_insights']}\n")
            if summary.get("common_patterns"):
                parts.append(f"**Common Patterns**: {summary['common_patterns']}\n")

        parts.append("\n**Remember**: These are suggestions based on similar products. ")
        parts.append("The user's specific requirements always take priority.\n")

        formatted_context = "".join(parts)
        if len(formatted_context) > max_length:
            # Reserve room for the notice so the result honours max_length.
            keep = max_length - len(self._TRUNCATION_NOTICE)
            if keep > 0:
                formatted_context = formatted_context[:keep] + self._TRUNCATION_NOTICE
            else:
                formatted_context = formatted_context[:max(max_length, 0)]
        return formatted_context

    # ------------------------------------------------------------------
    # Internal analysis helpers
    # ------------------------------------------------------------------
    def _determine_product_category(self, product_name: str, stored_category: str, doc_text: str) -> str:
        """Pick a product category from stored metadata or keyword hints.

        A stored category is returned unchanged unless it is empty or one of
        the generic values ("General", "Unknown", "Food").  Otherwise the
        lower-cased product name and document text are scanned for the keyword
        groups in ``_CATEGORY_KEYWORDS``; the first group with a hit decides.
        Falls back to "Specialty Product".
        """
        if stored_category and stored_category not in self._GENERIC_CATEGORIES:
            return stored_category

        # Joined with a space so a keyword cannot match across the boundary
        # between the name and the text.
        haystack = f"{(product_name or '').lower()} {(doc_text or '').lower()}"
        # NOTE(review): matching is by substring, so e.g. "oil" also matches
        # "boiled" — confirm whether whole-word matching is wanted.
        for category, keywords in self._CATEGORY_KEYWORDS:
            if any(word in haystack for word in keywords):
                return category
        return "Specialty Product"

    def _extract_clause_reference(self, metadata: Dict, document_text: str) -> str:
        """Extract a clause reference for a regulatory document.

        Returns the metadata ``standard_code`` when it is set and differs from
        ``regulatory_body``; otherwise the first "Section N.N" / "Clause N.N" /
        numbered-heading match within the first 300 characters of the text, or
        ``""`` when nothing is found.
        """
        standard_code = metadata.get('standard_code', '')
        regulatory_body = metadata.get('regulatory_body', '')
        if standard_code and standard_code != regulatory_body:
            return standard_code

        head = (document_text or "")[:300]
        for pattern in self._SECTION_PATTERNS:
            match = pattern.search(head)
            if match:
                return match.group(1).strip()
        return ""

    def _extract_parameter_structure(self, metadata: Dict) -> List[Dict]:
        """Build up to three sample parameter entries from checklist metadata.

        Pairs the first three ``parameter_types`` with the first three
        ``input_methods`` positionally.  Both values are coerced to lists
        first, so string-encoded metadata is handled too.
        """
        param_types = self._as_list(metadata.get('parameter_types'))
        input_methods = self._as_list(metadata.get('input_methods'))
        return [
            {
                "name": f"{ptype} Parameter",
                "type": ptype,
                "input_method": method,
                "spec": "",
                "options": [],
            }
            for ptype, method in zip(param_types[:3], input_methods[:3])
        ]

    def _generate_context_summary(self, context: Dict) -> Dict:
        """Summarise retrieved context without prescribing anything.

        Returns a dict with ``product_insights`` (up to two distinct
        categories), ``common_patterns`` (most frequent input method among the
        first five patterns) and ``regulatory_relevance`` ("minimal",
        "moderate" or "high" from the average relevance score).
        """
        summary = {
            "product_insights": "",
            "common_patterns": "",
            "regulatory_relevance": "minimal",
        }

        specifications = context.get("product_specifications") or []
        # dict.fromkeys drops repeated categories while keeping their order.
        categories = list(dict.fromkeys(
            spec.get('category', '') for spec in specifications
        ))
        categories = [c for c in categories if c and c != "Unknown"]
        if categories:
            summary["product_insights"] = f"Similar to {', '.join(categories[:2])} products"

        patterns = context.get("parameter_patterns") or []
        method_counts = {}
        for pattern in patterns[:5]:
            method = pattern['input_method']
            method_counts[method] = method_counts.get(method, 0) + 1
        if method_counts:
            common_method = max(method_counts, key=method_counts.get)
            summary["common_patterns"] = f"Often uses {common_method} for data collection"

        requirements = context.get("regulatory_requirements") or []
        if requirements:
            avg_relevance = sum(req.get('relevance_score', 0) for req in requirements) / len(requirements)
            if avg_relevance > 0.75:
                summary["regulatory_relevance"] = "high"
            elif avg_relevance > 0.6:
                summary["regulatory_relevance"] = "moderate"
        return summary
# Module-wide shared instance used by the convenience functions below.
# Constructed at import time with the default vector store path.
rag_utils = EnhancedRAGUtils()
# Module-level convenience functions that delegate to the shared rag_utils instance.
def get_comprehensive_context(product_name: str, domain: str = "Food Manufacturing",
                              include_patterns: bool = True) -> Dict:
    """Get comprehensive context from all VDBs as reference material only.

    Args:
        product_name: Product to look up.
        domain: Domain phrase used for the regulatory query.
        include_patterns: Forwarded to the instance method; when false the
            parameter-pattern lookup is skipped.

    Returns:
        The context dict produced by ``rag_utils.get_comprehensive_context``.
    """
    return rag_utils.get_comprehensive_context(product_name, domain, include_patterns)
def format_context_for_prompt(context: Dict, max_length: int = 4000) -> str:
    """Render *context* as prompt text via the shared instance (suggestions only)."""
    formatted = rag_utils.format_context_for_prompt(context, max_length)
    return formatted
def retrieve_regulatory_requirements(product_name: str, domain: str = "Food Manufacturing",
                                     k: int = 3) -> List[Dict]:
    """Get regulatory requirements only when relevant.

    Args:
        product_name: Product the query is built around.
        domain: Domain phrase added to the query text.
        k: Maximum number of results, forwarded to the instance method.

    Returns:
        The list produced by ``rag_utils.retrieve_regulatory_requirements``.
    """
    return rag_utils.retrieve_regulatory_requirements(product_name, domain, k)
def retrieve_checklist_examples(product_name: str, k: int = 3) -> List[Dict]:
    """Get checklist examples as patterns, not templates.

    Args:
        product_name: Product the query is built around.
        k: Number of examples requested, forwarded to the instance method.

    Returns:
        The list produced by ``rag_utils.retrieve_checklist_examples``.
    """
    return rag_utils.retrieve_checklist_examples(product_name, k)
def retrieve_parameter_patterns(product_category: str = "", k: int = 10) -> List[Dict]:
    """Get parameter patterns based on dynamic category.

    Args:
        product_category: Optional category text used to filter checklists.
        k: Maximum number of patterns, forwarded to the instance method.

    Returns:
        The list produced by ``rag_utils.retrieve_parameter_patterns``.
    """
    return rag_utils.retrieve_parameter_patterns(product_category, k)