Spaces:
Sleeping
Sleeping
| import base64 | |
| import json | |
| import requests | |
| import datetime | |
| import hashlib | |
| import hmac | |
| import logging | |
| import ntplib | |
| import time | |
| import os | |
| import tempfile | |
| import io | |
| from openai import OpenAI | |
| from openpyxl import Workbook | |
| import gradio as gr | |
| import re | |
| import fitz # PyMuPDF | |
| import pandas as pd | |
| from gradio_pdf import PDF # Import the new PDF component | |
# Configure logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')

# Credentials are read from the environment only. Secrets must never carry
# literal fallback values in source; an unset variable yields None.
SECRET_ID = os.getenv("SECRET_ID")
SECRET_KEY = os.getenv("SECRET_KEY")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

# Non-secret Tencent Cloud settings keep their overridable defaults.
REGION = os.getenv("REGION", "ap-guangzhou")
ENDPOINT = os.getenv("ENDPOINT", "lke.tencentcloudapi.com")
SERVICE = "lke"
ACTION = "ReconstructDocument"
VERSION = "2023-11-30"

# Surface a misconfigured deployment at start-up instead of at the first API call.
_missing_settings = [
    _setting_name
    for _setting_name, _setting_value in (
        ("SECRET_ID", SECRET_ID),
        ("SECRET_KEY", SECRET_KEY),
        ("OPENAI_API_KEY", OPENAI_API_KEY),
    )
    if not _setting_value
]
if _missing_settings:
    logging.warning("Missing environment variables: %s", ", ".join(_missing_settings))
def get_ntp_time():
    """Return the current time as a timezone-aware UTC datetime.

    The time is requested from ``pool.ntp.org`` (NTP version 3, 5 second
    timeout). If the request or the conversion of its result fails for any
    reason, a warning is logged and the local clock is used instead.
    """
    ntp_client = ntplib.NTPClient()
    utc = datetime.timezone.utc
    try:
        reply = ntp_client.request('pool.ntp.org', version=3, timeout=5)
        return datetime.datetime.fromtimestamp(reply.tx_time, utc)
    except Exception as e:
        # Best effort: fall back to the local clock rather than failing.
        logging.warning(f"Unable to get NTP time, using local time: {e}")
        return datetime.datetime.now(utc)
def sign(key, msg):
    """Return the raw HMAC-SHA256 digest of ``msg`` under ``key``.

    Args:
        key: Signing key as bytes.
        msg: Message as text; it is encoded as UTF-8 before signing.

    Returns:
        The digest as bytes.
    """
    mac = hmac.new(key, digestmod=hashlib.sha256)
    mac.update(msg.encode("utf-8"))
    return mac.digest()
def get_auth(secret_id, secret_key, host, method, params, headers):
    """Build request headers carrying a TC3-HMAC-SHA256 signature.

    Args:
        secret_id: Credential id placed in the ``Authorization`` header.
        secret_key: Secret used to derive the signing key.
        host: Host name, signed and returned as the ``Host`` header.
        method: HTTP method; upper-cased before signing.
        params: JSON-serialisable request parameters. The signature covers
            ``json.dumps(params)``, so the request body presumably has to be
            that exact serialisation (the sending code is not in this block).
        headers: Optional mapping of caller headers; only the content type is
            read from it. The header name is matched case-insensitively.

    Returns:
        A dict with the ``Authorization``, ``Host``, ``Content-Type`` and
        ``X-TC-*`` headers. Timestamp and date come from ``get_ntp_time()``;
        version, action and region come from the module constants.
    """
    algorithm = "TC3-HMAC-SHA256"
    ntp_time = get_ntp_time()
    timestamp = int(ntp_time.timestamp())
    date = ntp_time.strftime('%Y-%m-%d')

    # HTTP header names are case-insensitive, and this function itself returns
    # the key as "Content-Type", so accept any casing from the caller.
    ct = next(
        (value for name, value in (headers or {}).items()
         if name.lower() == "content-type"),
        "application/x-www-form-urlencoded",
    )

    # Step 1: canonical request. TC3 requires the signed header values to be
    # lower-cased with surrounding whitespace removed.
    http_request_method = method.upper()
    canonical_uri = "/"
    canonical_querystring = ""
    payload = json.dumps(params)
    canonical_headers = (f"content-type:{ct.strip().lower()}\n"
                         f"host:{host.strip().lower()}\n")
    signed_headers = "content-type;host"
    hashed_request_payload = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    canonical_request = (f"{http_request_method}\n{canonical_uri}\n{canonical_querystring}\n"
                         f"{canonical_headers}\n{signed_headers}\n{hashed_request_payload}")

    # Step 2: string to sign.
    credential_scope = f"{date}/{SERVICE}/tc3_request"
    hashed_canonical_request = hashlib.sha256(canonical_request.encode("utf-8")).hexdigest()
    string_to_sign = f"{algorithm}\n{timestamp}\n{credential_scope}\n{hashed_canonical_request}"

    # Step 3: derive the signing key (date -> service -> "tc3_request") and sign.
    secret_date = sign(f"TC3{secret_key}".encode("utf-8"), date)
    secret_service = sign(secret_date, SERVICE)
    secret_signing = sign(secret_service, "tc3_request")
    signature = hmac.new(secret_signing, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()

    # Step 4: assemble the Authorization header.
    authorization = (f"{algorithm} Credential={secret_id}/{credential_scope}, "
                     f"SignedHeaders={signed_headers}, Signature={signature}")
    return {
        "Authorization": authorization,
        "Host": host,
        "Content-Type": ct,
        "X-TC-Timestamp": str(timestamp),
        "X-TC-Version": VERSION,
        "X-TC-Action": ACTION,
        "X-TC-Region": REGION,
    }
def extract_information(content):
    """Ask the OpenAI chat API to extract contract fields from ``content``.

    The request uses model ``gpt-4o`` in JSON-object response mode. Up to
    three attempts are made, with a 5 second pause between attempts.

    Args:
        content: Document text appended to the extraction prompt.

    Returns:
        The parsed reply re-serialised as an indented JSON string, or None
        when no attempt finished with reason ``"stop"`` and parsed cleanly.
    """
    client = OpenAI(api_key=OPENAI_API_KEY)
    prompt = "\n".join((
        "There are some guides, respond in detailed content, respond without content in (), JSON begin with contracts value:",
        "1. Contract awarded date",
        "2. Construction location (This part of the content is in the title, not in the table; the address must be returned and should be detailed.)",
        "3. Tender reference",
        "4. Construction summary (in the 'particular' section)",
        "5. Contractor",
        "6. Contractor address(this is not company name, the address must be returned and should be detailed.)",
        "7. Amount",
        "8. Notice publish date (at the end of the content)",
    ))
    messages = [
        {"role": "system", "content": "You are a helpful assistant designed to output JSON"},
        {"role": "user", "content": f"{prompt}\n\n{content}"},
    ]

    for attempt_no in range(1, 4):  # Try three times
        try:
            logging.info(f"Extracting information (Attempt {attempt_no}/3)")
            response = client.chat.completions.create(
                model="gpt-4o",
                messages=messages,
                response_format={"type": "json_object"},
            )
            finish_reason = response.choices[0].finish_reason
            if finish_reason != "stop":
                logging.warning(f"Warning: Unexpected completion reason - {finish_reason}")
            else:
                parsed = json.loads(response.choices[0].message.content)
                return json.dumps(parsed, ensure_ascii=False, indent=4)
        except Exception as e:
            # Covers API failures as well as a reply that is not valid JSON.
            logging.error(f"Error: API call failed - {e}")
        if attempt_no < 3:  # If not the last attempt, wait before retrying
            time.sleep(5)
    return None  # If all three attempts fail, return None.
def json_to_excel(json_data):
    """Write the contracts in ``json_data`` to a temporary ``.xlsx`` file.

    Args:
        json_data: JSON text whose top-level ``"contracts"`` entry is a list
            of contract objects, or a single contract object.

    Returns:
        Path of the workbook. The file is not deleted automatically.

    Raises:
        KeyError: If the JSON has no ``"contracts"`` entry.
        json.JSONDecodeError: If ``json_data`` is not valid JSON.
    """
    headers = ['contract_awarded_date', 'construction_location', 'tender_reference',
               'construction_summary', 'contractor', 'contractor_address',
               'amount', 'notice_publish_date']

    def normalize(name):
        # Compare keys on lower-cased alphanumerics only, so that
        # "Contract awarded date" matches "contract_awarded_date".
        return ''.join(c.lower() for c in name if c.isalnum())

    def to_cell(value):
        # openpyxl rejects list/dict cell values; store them as JSON text.
        if isinstance(value, (dict, list)):
            return json.dumps(value, ensure_ascii=False)
        return value

    contracts = json.loads(json_data)['contracts']
    if isinstance(contracts, dict):
        # A single contract may arrive as one object rather than a list.
        contracts = [contracts]

    wb = Workbook()
    ws = wb.active
    ws.append(headers)

    wanted_keys = [normalize(header) for header in headers]
    for contract in contracts:
        values_by_key = {}
        for key, value in contract.items():
            # First occurrence wins when two keys normalise to the same name.
            values_by_key.setdefault(normalize(key), value)
        ws.append([to_cell(values_by_key.get(wanted, '')) for wanted in wanted_keys])

    # Reserve the name, then close the handle before saving: writing to a
    # path that is still open fails on Windows.
    with tempfile.NamedTemporaryFile(delete=False, suffix='.xlsx') as tmp:
        excel_path = tmp.name
    wb.save(excel_path)
    return excel_path
def clean_url(input_text):
    """Return ``input_text`` without surrounding whitespace and double quotes.

    Whitespace is trimmed first, then any run of ``"`` characters at either
    end. Single quotes and whitespace inside the quotes are left as they are.
    """
    without_whitespace = input_text.strip()
    return without_whitespace.strip('"')
def process_pdf(file):
    """Return the concatenated text of every page of a PDF.

    Args:
        file: Either a path, or an object whose ``name`` attribute is the
            path of the PDF.

    Returns:
        The text of all pages joined in page order.

    Raises:
        Exception: Any error raised while opening or reading the document is
            logged and re-raised unchanged.
    """
    logging.info(f"Start processing PDF file: {type(file)}")
    try:
        # File-like objects expose their path as ``.name``; anything else is
        # treated as the path itself.
        pdf_path = getattr(file, 'name', file)
        with fitz.open(pdf_path) as doc:
            text_content = "".join(page.get_text() for page in doc)
        logging.info("PDF processing successful")
        return text_content
    except Exception as e:
        logging.error(f"PDF processing error: {str(e)}")
        raise
def preview_excel(excel_path):
    """Build a Gradio dataframe previewing the top-left corner of a workbook.

    At most the first 10 rows and first 8 columns are shown. Any failure is
    logged and an empty ``gr.Dataframe`` is returned instead.
    """
    try:
        first_rows = pd.read_excel(excel_path, nrows=10)
        corner = first_rows.iloc[:10, :8]
        return gr.Dataframe(value=corner)
    except Exception as e:
        logging.error(f"Excel preview error: {e}")
        return gr.Dataframe()
def process_pdf_file(file):
    """Gradio handler: turn an uploaded PDF into an Excel file and a preview.

    Args:
        file: The uploaded PDF (a path or an object with a ``name``), or None.

    Returns:
        A ``(status message, Excel path or None, gr.Dataframe)`` tuple. On
        any failure the path is None and the dataframe is empty.
    """
    def failure(status):
        # Every error path returns the same shape: message, no file, empty preview.
        return status, None, gr.Dataframe()

    if file is None:
        logging.warning("No file uploaded")
        return failure("Please upload a PDF file.")

    # Stage 1: read the text out of the PDF.
    try:
        logging.info(f"Received file: {type(file)}, {getattr(file, 'name', 'No name')}")
        pdf_text = process_pdf(file)
    except Exception as e:
        logging.error(f"Error processing PDF file: {e}", exc_info=True)
        return failure(f"Error processing PDF file: {e}")

    # Stage 2: extract the fields, write the workbook, build the preview.
    try:
        extracted_json = extract_information(pdf_text)
        if extracted_json is None:
            logging.error("Failed to extract information")
            return failure("Error extracting information. Please try again later.")
        workbook_path = json_to_excel(extracted_json)
        preview = preview_excel(workbook_path)
        logging.info("File processing successful")
        return "Processing successful!", workbook_path, preview
    except Exception as e:
        logging.error(f"Error processing file: {e}", exc_info=True)
        return failure(f"Error processing file: {e}")
# Gradio interface: one PDF upload in; status text, Excel download and preview out.
iface = gr.Interface(
    title="PDF Document Processing and Information Extraction",
    description="Upload a PDF file, and the system will process it and generate an Excel result.",
    fn=process_pdf_file,
    inputs=[PDF(label="Upload PDF File")],  # Only the label parameter is set
    outputs=[
        gr.Textbox(label="Processing Status"),
        gr.File(label="Download Excel File"),
        gr.Dataframe(label="Excel Preview (First 10 rows, 8 columns)"),
    ],
)

# Run the Gradio app only when executed as a script.
if __name__ == "__main__":
    iface.launch()