import PyPDF2
import re
from collections import Counter
import nltk
from nltk.tokenize import sent_tokenize, word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
import string
from datetime import datetime, timedelta
import json
import torch
from transformers import T5ForConditionalGeneration, T5Tokenizer, pipeline, BlipProcessor, BlipForConditionalGeneration
import warnings
import fitz  # PyMuPDF
from PIL import Image, ImageEnhance, ImageFilter
import io
import base64
import os
import pytesseract
import hashlib
import logging
import getpass
import tempfile
import shutil
from fastapi import FastAPI, HTTPException
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from typing import List, Dict, Any, Optional
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
try:
    import cv2
    import numpy as np
    OPENCV_AVAILABLE = True
except ImportError:
    print("OpenCV not available. Using PIL for image preprocessing.")
    OPENCV_AVAILABLE = False
    import numpy as np

warnings.filterwarnings('ignore')
app = FastAPI(title='AI (PDF→Summary+QnA+Scores)', version='0.2.1')
app.mount("/static", StaticFiles(directory="static"), name="static")
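# NOTE (assumption): a ./static directory must exist next to this script;
# StaticFiles checks the directory at construction time and raises a
# RuntimeError at startup if it is missing.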

class HIPAALogger:
    """HIPAA-compliant audit logging system"""

    def __init__(self, log_file="hipaa_audit.log"):
        self.log_file = log_file
        self.setup_logging()

    def setup_logging(self):
        """Set up secure audit logging"""
        logging.basicConfig(
            filename=self.log_file,
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        self.logger = logging.getLogger('HIPAA_AUDIT')

    def log_access(self, user_id, action, resource, success=True):
        """Log access attempts and actions"""
        status = "SUCCESS" if success else "FAILURE"
        message = f"USER:{user_id} ACTION:{action} RESOURCE:{resource} STATUS:{status}"
        self.logger.info(message)

    def log_phi_processing(self, user_id, document_hash, action):
        """Log PHI (protected health information) processing events"""
        message = f"PHI_PROCESSING USER:{user_id} DOC_HASH:{document_hash} ACTION:{action}"
        self.logger.info(message)
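
# Usage sketch (illustrative values): a successful log_access call emits an
# audit line of the form
#   2025-01-01 12:00:00 - INFO - USER:jdoe ACTION:SESSION_START RESOURCE:THESIS_ANALYZER STATUS:SUCCESS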

class SecureFileHandler:
    """Secure file handling with encryption and secure deletion"""

    def __init__(self, password=None):
        self.password = password
        self.key = self._derive_key(password) if password else None
        self.fernet = Fernet(self.key) if self.key else None

    def _derive_key(self, password):
        """Derive a Fernet encryption key from a password via PBKDF2-HMAC-SHA256"""
        password_bytes = password.encode()
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=b'hipaa_thesis_analyzer_salt',
            iterations=100000,
        )
        key = base64.urlsafe_b64encode(kdf.derive(password_bytes))
        return key
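    # NOTE: the salt above is a fixed application-wide constant, so the same
    # password always derives the same key. That keeps files decryptable
    # across sessions, but a random per-file salt stored alongside the
    # ciphertext would be the stronger choice in practice.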
    def encrypt_data(self, data):
        """Encrypt sensitive data"""
        if not self.fernet:
            return data
        if isinstance(data, str):
            data = data.encode()
        return self.fernet.encrypt(data)

    def decrypt_data(self, encrypted_data):
        """Decrypt sensitive data"""
        if not self.fernet:
            return encrypted_data
        decrypted = self.fernet.decrypt(encrypted_data)
        return decrypted.decode()

    def secure_save(self, data, filepath):
        """Save data, encrypting it when a key is configured"""
        if self.fernet:
            encrypted_data = self.encrypt_data(json.dumps(data))
            with open(filepath + '.enc', 'wb') as f:
                f.write(encrypted_data)
        else:
            with open(filepath, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2)

    def secure_load(self, filepath):
        """Load data, decrypting the '.enc' variant when present"""
        if self.fernet and os.path.exists(filepath + '.enc'):
            with open(filepath + '.enc', 'rb') as f:
                encrypted_data = f.read()
            decrypted_data = self.decrypt_data(encrypted_data)
            return json.loads(decrypted_data)
        elif os.path.exists(filepath):
            with open(filepath, 'r', encoding='utf-8') as f:
                return json.load(f)
        return None

    def secure_delete(self, filepath):
        """Securely delete files by overwriting them before removal"""
        # Handle both the plaintext file and its encrypted counterpart
        for path in (filepath, filepath + '.enc'):
            if os.path.exists(path):
                file_size = os.path.getsize(path)
                with open(path, 'rb+') as f:
                    for _ in range(3):  # three overwrite passes (DoD 5220.22-M style)
                        f.seek(0)
                        f.write(os.urandom(file_size))
                        f.flush()
                os.remove(path)
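
# Caveat: overwrite-in-place is only best-effort on SSDs and journaling or
# copy-on-write filesystems, where stale blocks can survive remapping.
# Round-trip sketch (illustrative values):
#   handler = SecureFileHandler(password="s3cret")
#   handler.secure_save({"summary": "..."}, "report")  # writes report.enc
#   data = handler.secure_load("report")               # returns the dict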

class HIPAACompliantThesisAnalyzer:
    """HIPAA-compliant version of the thesis analyzer"""

    def __init__(self, user_id=None, password=None, session_timeout=30):
        self.user_id = user_id or getpass.getuser()
        self.session_timeout = session_timeout  # minutes
        self.session_start = datetime.now()
        self.last_activity = datetime.now()
        # Initialize HIPAA compliance components
        self.hipaa_logger = HIPAALogger()
        self.secure_handler = SecureFileHandler(password)
        # Log session start
        self.hipaa_logger.log_access(self.user_id, "SESSION_START", "THESIS_ANALYZER")
        # Initialize base analyzer components
        self._initialize_analyzer()
        print(f"HIPAA-Compliant Thesis Analyzer initialized for user: {self.user_id}")
        print(f"Session timeout: {session_timeout} minutes")
        print(f"Encryption enabled: {'Yes' if password else 'No'}")
    def _initialize_analyzer(self):
        """Initialize the core analyzer components"""
        try:
            self.lemmatizer = WordNetLemmatizer()
            self.stop_words = set(stopwords.words('english'))
        except LookupError as e:
            print(f"NLTK resource error: {e}")
            self._download_nltk_resources()
            self.lemmatizer = WordNetLemmatizer()
            self.stop_words = set(stopwords.words('english'))
        self.thesis_text = ""
        self.sentences = []
        self.key_terms = []
        self.extracted_images = []
        self.image_descriptions = []
        self.ocr_results = []
        self.use_ocr = True
        self.use_blip = True
        # Initialize T5 model
        print("Loading T5-small model (HIPAA-compliant local processing)...")
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.model_name = "t5-small"
        self.tokenizer = T5Tokenizer.from_pretrained(self.model_name)
        self.model = T5ForConditionalGeneration.from_pretrained(self.model_name)
        self.model.to(self.device)
        # Initialize pipelines
        self.summarizer = pipeline(
            "summarization",
            model=self.model_name,
            tokenizer=self.model_name,
            device=0 if torch.cuda.is_available() else -1,
            max_length=200,
            min_length=150,
            do_sample=True,
            temperature=0.7
        )
        self.qa_pipeline = pipeline(
            "text2text-generation",
            model=self.model_name,
            tokenizer=self.model_name,
            device=0 if torch.cuda.is_available() else -1,
            max_length=512,
            do_sample=True,
            temperature=0.7
        )
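        # NOTE: with a T5 checkpoint the summarization pipeline automatically
        # prepends the model's configured task prefix ("summarize: "), while
        # the text2text pipeline receives raw "question: ... context: ..."
        # prompts from _answer_questions_secure, matching T5's multitask
        # question-answering format.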
        # Initialize BLIP if enabled
        if self.use_blip:
            try:
                self.blip_processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
                self.blip_model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base")
                self.blip_model.to(self.device)
                print("BLIP model loaded for local image analysis")
            except Exception as e:
                print(f"BLIP model loading failed: {e}")
                self.use_blip = False
        # Check OCR availability
        if self.use_ocr:
            try:
                pytesseract.get_tesseract_version()
                print("Tesseract OCR available for local processing")
            except Exception as e:
                print(f"Tesseract OCR not available: {e}")
                self.use_ocr = False
    def _download_nltk_resources(self):
        """Download required NLTK resources"""
        resources = [
            ('tokenizers/punkt', 'punkt'),
            ('tokenizers/punkt_tab', 'punkt_tab'),
            ('corpora/stopwords', 'stopwords'),
            ('corpora/wordnet', 'wordnet'),
            ('corpora/omw-1.4', 'omw-1.4')
        ]
        for resource_path, resource_name in resources:
            try:
                nltk.data.find(resource_path)
            except LookupError:
                try:
                    nltk.download(resource_name, quiet=True)
                except Exception as e:
                    print(f"Warning: Failed to download {resource_name}: {e}")
    def check_session_timeout(self):
        """Raise if the session has been inactive past the timeout"""
        time_since_activity = datetime.now() - self.last_activity
        if time_since_activity.total_seconds() > (self.session_timeout * 60):
            self.hipaa_logger.log_access(self.user_id, "SESSION_TIMEOUT", "THESIS_ANALYZER")
            raise Exception("Session timed out due to inactivity. Please restart for security.")
        self.last_activity = datetime.now()
    def calculate_document_hash(self, content):
        """Calculate a SHA-256 hash of document content"""
        return hashlib.sha256(content.encode()).hexdigest()
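    # This helper returns the full 64-character hex digest for string content;
    # process_document_securely below hashes the raw file bytes instead and
    # truncates to 16 hex characters for the audit trail.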
    def process_document_securely(self, pdf_path, questions, output_file=None):
        """Process a document with full HIPAA compliance"""
        self.check_session_timeout()
        # Calculate document hash for the audit trail
        with open(pdf_path, 'rb') as f:
            doc_content = f.read()
        doc_hash = hashlib.sha256(doc_content).hexdigest()[:16]
        self.hipaa_logger.log_phi_processing(self.user_id, doc_hash, "DOCUMENT_LOAD")
        try:
            # Extract text and images
            text, images = self._extract_text_and_images(pdf_path)
            self.hipaa_logger.log_phi_processing(self.user_id, doc_hash, "TEXT_EXTRACTION")
            # Perform OCR if enabled
            ocr_results = []
            if self.use_ocr and images:
                ocr_results = self._perform_secure_ocr(images)
                self.hipaa_logger.log_phi_processing(self.user_id, doc_hash, "OCR_PROCESSING")
            # Analyze images if BLIP is enabled (captions are audit-logged but
            # not yet included in the report below)
            image_descriptions = []
            if self.use_blip and images:
                image_descriptions = self._analyze_images_securely(images)
                self.hipaa_logger.log_phi_processing(self.user_id, doc_hash, "IMAGE_ANALYSIS")
            # Combine all text
            ocr_text = " ".join([result['ocr_text'] for result in ocr_results if result.get('ocr_text')])
            combined_text = text + " " + ocr_text
            # Generate the analysis
            sections = self._extract_key_sections(combined_text)
            key_terms = self._extract_key_terms(combined_text)
            summary = self._generate_summary_secure(combined_text)
            question_answers = self._answer_questions_secure(questions, combined_text)
            self.hipaa_logger.log_phi_processing(self.user_id, doc_hash, "ANALYSIS_COMPLETE")
            # Compile the HIPAA-compliant report
            report = {
                "hipaa_compliance": {
                    "processed_locally": True,
                    "encrypted_storage": bool(self.secure_handler.fernet),
                    "audit_logged": True,
                    "user_id": self.user_id,
                    "session_id": hashlib.md5(f"{self.user_id}{self.session_start}".encode()).hexdigest()[:8],
                    "document_hash": doc_hash,
                    "processing_timestamp": datetime.now().isoformat(),
                    "no_external_apis": True,
                    "local_processing_only": True
                },
                "document_info": {
                    "file_path": os.path.basename(pdf_path),  # filename only, for privacy
                    "analysis_timestamp": datetime.now().isoformat(),
                    "total_characters": len(text),
                    "total_images": len(images),
                    "device_used": str(self.device)
                },
                "text_analysis": {
                    "summary": summary,
                    "key_terms": key_terms[:15],
                    "sections_found": list(sections.keys())
                },
                "image_analysis": {
                    "total_images_extracted": len(images),
                    "images_with_text": len([r for r in ocr_results if r.get('has_text', False)]),
                    "ocr_available": self.use_ocr,
                    "blip_available": self.use_blip
                },
                "question_responses": question_answers,
                "statistics": {
                    "total_text_characters": len(text),
                    "ocr_text_characters": len(ocr_text),
                    "questions_processed": len(questions),
                    "sections_identified": len(sections),
                    "key_terms_extracted": len(key_terms)
                }
            }
            # Save securely if an output file was specified
            if output_file:
                self.secure_handler.secure_save(report, output_file)
                self.hipaa_logger.log_access(self.user_id, "REPORT_SAVE", output_file)
            return report
        except Exception:
            self.hipaa_logger.log_access(self.user_id, "PROCESSING_ERROR", pdf_path, success=False)
            raise
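    # End-to-end sketch (illustrative path and question):
    #   analyzer = HIPAACompliantThesisAnalyzer(user_id="jdoe", password="s3cret")
    #   report = analyzer.process_document_securely("thesis.pdf",
    #                                               ["What is the main objective?"])
    #   print(report["text_analysis"]["summary"])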
    def _extract_text_and_images(self, pdf_path):
        """Securely extract text and images from a PDF"""
        text = ""
        images = []
        try:
            # Use PyMuPDF for comprehensive extraction
            doc = fitz.open(pdf_path)
            for page_num in range(len(doc)):
                page = doc.load_page(page_num)
                # Extract text
                page_text = page.get_text()
                if page_text.strip():
                    text += page_text + "\n"
                # Extract images
                image_list = page.get_images()
                for img_index, img in enumerate(image_list):
                    try:
                        xref = img[0]
                        pix = fitz.Pixmap(doc, xref)
                        # Skip CMYK and other 4+ channel colorspaces that PPM cannot represent
                        if pix.n - pix.alpha < 4:
                            img_data = pix.tobytes("ppm")
                            img_pil = Image.open(io.BytesIO(img_data))
                            image_info = {
                                'page': page_num + 1,
                                'index': img_index,
                                'image': img_pil,
                                'size': img_pil.size,
                                'format': img_pil.format or 'Unknown'
                            }
                            images.append(image_info)
                        pix = None  # release the pixmap
                    except Exception as e:
                        print(f"Error extracting image {img_index} from page {page_num + 1}: {e}")
                        continue
            doc.close()
        except Exception as e:
            print(f"Error in secure extraction: {e}")
        return text, images
    def _perform_secure_ocr(self, images):
        """Perform OCR locally with audit logging"""
        ocr_results = []
        for img_info in images:
            try:
                img = img_info['image']
                if img.mode != 'RGB':
                    img = img.convert('RGB')
                # Preprocess for OCR
                if OPENCV_AVAILABLE:
                    img_array = np.array(img)
                    gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
                    denoised = cv2.medianBlur(gray, 3)
                    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
                    enhanced = clahe.apply(denoised)
                    _, thresh = cv2.threshold(enhanced, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
                    processed_img = Image.fromarray(thresh)
                else:
                    gray = img.convert('L')
                    enhancer = ImageEnhance.Contrast(gray)
                    enhanced = enhancer.enhance(2.0)
                    processed_img = enhanced.filter(ImageFilter.SHARPEN)
                # Perform OCR locally; --psm 6 assumes a single uniform block of text
                ocr_text = pytesseract.image_to_string(processed_img, config='--psm 6')
                ocr_result = {
                    'page': img_info['page'],
                    'image_index': img_info['index'],
                    'ocr_text': ocr_text.strip(),
                    'has_text': bool(ocr_text.strip()),
                    'processing_method': 'Local_OCR'
                }
                ocr_results.append(ocr_result)
            except Exception as e:
                ocr_results.append({
                    'page': img_info['page'],
                    'image_index': img_info['index'],
                    'ocr_text': '',
                    'has_text': False,
                    'error': str(e)
                })
        return ocr_results
    def _analyze_images_securely(self, images):
        """Analyze images locally with BLIP"""
        if not self.use_blip:
            return []
        descriptions = []
        for img_info in images:
            try:
                image = img_info['image']
                if image.mode != 'RGB':
                    image = image.convert('RGB')
                inputs = self.blip_processor(image, return_tensors="pt").to(self.device)
                with torch.no_grad():
                    out = self.blip_model.generate(**inputs, max_length=100, num_beams=5)
                caption = self.blip_processor.decode(out[0], skip_special_tokens=True)
                description = {
                    'page': img_info['page'],
                    'image_index': img_info['index'],
                    'caption': caption,
                    'processing_method': 'Local_BLIP'
                }
                descriptions.append(description)
            except Exception as e:
                descriptions.append({
                    'page': img_info['page'],
                    'image_index': img_info['index'],
                    'caption': 'Analysis failed',
                    'error': str(e)
                })
        return descriptions
    def _extract_key_sections(self, text):
        """Extract key sections from text using heading-to-heading regexes"""
        sections = {}
        section_patterns = {
            'abstract': r'abstract\s*:?\s*(.*?)(?=\n\s*(?:introduction|chapter|acknowledgment|table of contents))',
            'introduction': r'introduction\s*:?\s*(.*?)(?=\n\s*(?:literature review|methodology|chapter|background))',
            'methodology': r'(?:methodology|methods)\s*:?\s*(.*?)(?=\n\s*(?:results|findings|analysis|chapter))',
            'results': r'(?:results|findings)\s*:?\s*(.*?)(?=\n\s*(?:discussion|conclusion|chapter))',
            'conclusion': r'conclusion\s*:?\s*(.*?)(?=\n\s*(?:references|bibliography|appendix))'
        }
        for section_name, pattern in section_patterns.items():
            match = re.search(pattern, text.lower(), re.DOTALL | re.IGNORECASE)
            if match:
                sections[section_name] = match.group(1).strip()[:1000]  # truncate for privacy
        return sections
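    # For example, on "Abstract: We study X.\nIntroduction ..." the 'abstract'
    # pattern captures "we study x." (the text is lowercased before matching,
    # and each pattern matches non-greedily up to the next expected heading).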
    def _extract_key_terms(self, text):
        """Extract key terms securely"""
        try:
            words = re.findall(r'\b[a-zA-Z]+\b', text.lower())
            words = [
                self.lemmatizer.lemmatize(word)
                for word in words
                if word not in self.stop_words
                and len(word) > 3
                and word.isalpha()
            ]
            word_freq = Counter(words)
            return [term for term, freq in word_freq.most_common(20)]
        except Exception as e:
            print(f"Error in key term extraction: {e}")
            return []
    def _generate_summary_secure(self, text):
        """Generate a summary using the local T5 model"""
        try:
            clean_text = re.sub(r'\s+', ' ', text).strip()
            # Truncate long text to fit the model's input window
            max_length = 1000
            if len(clean_text) > max_length:
                clean_text = clean_text[:max_length]
            summary = self.summarizer(
                clean_text,
                max_length=200,
                min_length=150,
                do_sample=True,
                temperature=0.7
            )
            return summary[0]['summary_text']
        except Exception as e:
            print(f"Error in T5 summarization: {e}")
            # Fall back to a simple extractive summary
            sentences = re.split(r'[.!?]+', text)
            return " ".join(sentences[:3]) + "..."
    def _answer_questions_secure(self, questions, text):
        """Answer questions using the local T5 model"""
        answers = {}
        for question in questions:
            try:
                prompt = f"question: {question} context: {text[:1000]}"
                answer_result = self.qa_pipeline(
                    prompt,
                    max_length=200,
                    min_length=30,
                    do_sample=True,
                    temperature=0.7,
                    num_return_sequences=1
                )
                answer = answer_result[0]['generated_text']
                answer = re.sub(r'^(answer:|Answer:)', '', answer).strip()
                answers[question] = {
                    'answer': answer,
                    'method': 'Local_T5',
                    'processed_securely': True
                }
            except Exception as e:
                answers[question] = {
                    'answer': 'Unable to process question securely',
                    'error': str(e),
                    'method': 'Error'
                }
        return answers
    def cleanup_session(self):
        """Clean up session data securely"""
        self.hipaa_logger.log_access(self.user_id, "SESSION_END", "THESIS_ANALYZER")
        # Clear sensitive data from memory
        self.thesis_text = ""
        self.extracted_images = []
        self.ocr_results = []
        self.image_descriptions = []
        # Free cached GPU memory when CUDA was used
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        print("Session cleaned up securely")

class AnalyzeReq(BaseModel):
    storageKey: str  # path to the PDF on disk (or adjust to your storage scheme)
    projectId: Optional[str] = None
    documentId: Optional[str] = None
    ocr: bool = False
    blip: bool = False
    userId: str
    password: str
    useEncryption: bool = False
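
# Example request body (illustrative values):
#   {"storageKey": "thesis.pdf", "userId": "jdoe", "password": "s3cret",
#    "useEncryption": true, "ocr": true}
# Note that ocr, blip, and useEncryption are accepted but not yet wired into
# the analyzer below, which always enables OCR/BLIP and derives encryption
# from the presence of a password.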

@app.post("/analyze")  # route path is an assumption; the original defined this handler without registering a route
def analyze(req: AnalyzeReq):
    """Analysis endpoint with HIPAA compliance demonstration"""
    print("HIPAA-COMPLIANT THESIS ANALYZER")
    print("=" * 50)
    try:
        # Initialize the HIPAA-compliant analyzer
        analyzer = HIPAACompliantThesisAnalyzer(
            user_id=req.userId,
            password=req.password,
            session_timeout=30
        )
        pdf_path = req.storageKey
        # Sample questions
        questions = [
            "What is the main objective of the research?",
            "What methodology was used in the study?",
            "What are the key findings or results?",
            "What conclusions did the authors draw?",
            "What are the limitations of the study?",
            "What motivated the researchers to conduct this study?",
            "How does this research relate to existing literature?",
            "What are the practical implications of the findings?",
            "What assumptions underlie the research?",
            "What statistical methods were used to analyze the data?",
            "How robust are the study’s findings?",
            "Are there any potential biases in the study design or data collection?",
            "How do the results compare with previous studies on the same topic?",
            "What are the potential future applications of this research?",
            "How could this research be expanded or built upon in future studies?",
            "What new questions have emerged as a result of this study?"
        ]
        # Process the document securely
        print("\nProcessing document with HIPAA compliance...")
        report = analyzer.process_document_securely(
            pdf_path=pdf_path,
            questions=questions,
            output_file="hipaa_compliant_analysis"
        )
        print("\n" + "=" * 60)
        print("HIPAA-COMPLIANT ANALYSIS COMPLETE")
        print("=" * 60)
        print(f"✓ Processed locally: {report['hipaa_compliance']['processed_locally']}")
        print(f"✓ Encrypted storage: {report['hipaa_compliance']['encrypted_storage']}")
        print(f"✓ Audit logged: {report['hipaa_compliance']['audit_logged']}")
        print(f"✓ No external APIs: {report['hipaa_compliance']['no_external_apis']}")
        print(f"✓ Session ID: {report['hipaa_compliance']['session_id']}")
        # Cleanup
        analyzer.cleanup_session()
        return report
    except Exception as e:
        print(f"Error: {e}")
        print("Ensure all requirements are installed and Tesseract is available.")
        # Surface the failure to the client instead of silently returning None
        raise HTTPException(status_code=500, detail=str(e))

# if __name__ == "__main__":
print("""
HIPAA-COMPLIANT THESIS ANALYZER
===============================
HIPAA COMPLIANCE FEATURES:
✓ Local processing only - no external API calls
✓ Encryption at rest with password protection
✓ Comprehensive audit logging
✓ Session timeout and access controls
✓ Secure file deletion
✓ PHI processing audit trail
✓ User authentication
✓ Data integrity verification
INSTALLATION:
pip install torch transformers PyPDF2 nltk PyMuPDF pillow pytesseract cryptography fastapi uvicorn
SECURITY FEATURES:
- All processing happens locally
- Optional file encryption
- Secure memory cleanup
- Audit trail for all operations
- Session management with timeouts
- Secure file overwriting for deletion
COMPLIANCE NOTES:
- This tool provides technical safeguards only
- Administrative and physical safeguards must be implemented separately
- Ensure your workstation meets HIPAA requirements
- Regular security assessments are recommended
""")
# main()
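
# Serving sketch (assumes this file is saved as main.py):
#   uvicorn main:app --host 127.0.0.1 --port 8000
#   curl -X POST http://127.0.0.1:8000/analyze \
#        -H "Content-Type: application/json" \
#        -d '{"storageKey": "thesis.pdf", "userId": "jdoe", "password": "s3cret"}'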