import base64
import io
import json
import os
import re
import zipfile
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import requests

from utils import matches_patterns, is_binary_file, format_file_size
from config import GITHUB_API_BASE, HF_API_BASE

def process_github_repo(
    repo_path: str,
    token: str,
    include_patterns: List[str],
    exclude_patterns: List[str],
    max_file_size: int
) -> Tuple[List[Tuple[str, str, int]], Dict]:
    """Process a GitHub repository and return file contents."""
    headers = {}
    if token:
        headers['Authorization'] = f'token {token}'

    # Get repository info
    repo_url = f"{GITHUB_API_BASE}/repos/{repo_path}"
    repo_response = requests.get(repo_url, headers=headers)

    if repo_response.status_code != 200:
        raise Exception(f"Failed to fetch repository info: {repo_response.json().get('message', 'Unknown error')}")

    repo_info = repo_response.json()

    # Walk the repository tree breadth-first, starting at the root
    files_data = []
    contents_queue = [""]

    while contents_queue:
        current_path = contents_queue.pop(0)

        # Get directory contents
        contents_url = f"{GITHUB_API_BASE}/repos/{repo_path}/contents/{current_path}"
        contents_response = requests.get(contents_url, headers=headers)

        if contents_response.status_code != 200:
            continue

        contents = contents_response.json()
        if isinstance(contents, dict):
            # Single file
            contents = [contents]

        for item in contents:
            item_path = f"{current_path}/{item['name']}" if current_path else item['name']

            if item['type'] == 'dir':
                contents_queue.append(item_path)
            elif item['type'] == 'file':
                # Check if file matches patterns
                if not matches_patterns(item_path, include_patterns, exclude_patterns):
                    continue

                # Check file size
                if item['size'] > max_file_size:
                    continue

                # Get file content
                try:
                    file_url = item['url']
                    file_response = requests.get(file_url, headers=headers)

                    if file_response.status_code == 200:
                        file_data = file_response.json()
                        content = base64.b64decode(file_data['content']).decode('utf-8', errors='ignore')

                        # Skip binary files
                        if is_binary_file(content, item_path):
                            continue

                        files_data.append((item_path, content, item['size']))
                except Exception as e:
                    print(f"Error processing file {item_path}: {e}")
                    continue

    return files_data, repo_info
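
# Minimal usage sketch for process_github_repo (illustration only, not part of
# the original module). The repository path and patterns are hypothetical; the
# token, if any, is assumed to be available in the GITHUB_TOKEN environment variable.
def _example_github_usage() -> None:
    files, info = process_github_repo(
        repo_path="octocat/Hello-World",           # hypothetical example repository
        token=os.environ.get("GITHUB_TOKEN", ""),  # empty string -> unauthenticated requests
        include_patterns=["*.py", "*.md"],
        exclude_patterns=["tests/*"],
        max_file_size=100_000,                     # bytes
    )
    print(f"Fetched {len(files)} text files from {info.get('full_name', 'unknown repo')}")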

def process_huggingface_repo(
    repo_path: str,
    token: str,
    include_patterns: List[str],
    exclude_patterns: List[str],
    max_file_size: int
) -> Tuple[List[Tuple[str, str, int]], Dict]:
    """Process a Hugging Face repository and return file contents."""
    headers = {}
    if token:
        headers['Authorization'] = f'Bearer {token}'

    # Get repository info
    repo_url = f"{HF_API_BASE}/api/models/{repo_path}"
    repo_response = requests.get(repo_url, headers=headers)

    if repo_response.status_code != 200:
        raise Exception(f"Failed to fetch repository info: {repo_response.json().get('error', 'Unknown error')}")

    repo_info = repo_response.json()

    # Get repository tree
    tree_url = f"{HF_API_BASE}/api/models/{repo_path}/tree/main"
    tree_response = requests.get(tree_url, headers=headers)

    if tree_response.status_code != 200:
        raise Exception(f"Failed to fetch repository tree: {tree_response.json().get('error', 'Unknown error')}")

    tree_data = tree_response.json()
    files_data = []

    def process_tree_item(item, current_path=""):
        if isinstance(item, list):
            for subitem in item:
                process_tree_item(subitem, current_path)
        elif isinstance(item, dict):
            # The tree API reports paths relative to the repository root,
            # so the entry's 'path' can be used directly
            item_path = item['path']

            if item['type'] == 'directory':
                # Recurse into the directory contents
                dir_url = f"{HF_API_BASE}/api/models/{repo_path}/tree/main/{item_path}"
                dir_response = requests.get(dir_url, headers=headers)
                if dir_response.status_code == 200:
                    process_tree_item(dir_response.json(), item_path)
            elif item['type'] == 'file':
                # Check if file matches patterns
                if not matches_patterns(item_path, include_patterns, exclude_patterns):
                    return

                # Check file size
                if item.get('size', 0) > max_file_size:
                    return

                # Get raw file content
                try:
                    raw_url = f"https://huggingface.co/{repo_path}/raw/main/{item_path}"
                    file_response = requests.get(raw_url, headers=headers)

                    if file_response.status_code == 200:
                        content = file_response.text

                        # Skip binary files
                        if is_binary_file(content, item_path):
                            return

                        files_data.append((item_path, content, len(content)))
                except Exception as e:
                    print(f"Error processing file {item_path}: {e}")
                    return

    process_tree_item(tree_data)
    return files_data, repo_info
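
# Minimal usage sketch for process_huggingface_repo (illustration only). The
# repository id is a hypothetical example; a token, if needed, is read from HF_TOKEN.
def _example_huggingface_usage() -> None:
    files, info = process_huggingface_repo(
        repo_path="someuser/some-model",        # hypothetical example model repo
        token=os.environ.get("HF_TOKEN", ""),
        include_patterns=["*.json", "*.md"],
        exclude_patterns=[],
        max_file_size=100_000,                  # bytes
    )
    print(f"Fetched {len(files)} text files from {info.get('modelId', 'unknown repo')}")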

def download_repo_as_zip(repo_url: str, token: str) -> str:
    """Download repository as a ZIP file and return the local path."""
    if "github.com" in repo_url:
        # GitHub ZIP URL
        if token:
            headers = {'Authorization': f'token {token}'}
            zip_url = repo_url.replace("github.com", "api.github.com/repos") + "/zipball/main"
        else:
            headers = {}
            zip_url = repo_url.replace("github.com", "codeload.github.com") + "/zip/main"
    elif "huggingface.co" in repo_url:
        # Hugging Face download URL
        headers = {}
        if token:
            headers['Authorization'] = f'Bearer {token}'
        zip_url = repo_url.rstrip("/") + "/resolve/main?download=true"
    else:
        raise ValueError("Unsupported repository URL")

    response = requests.get(zip_url, headers=headers, stream=True)
    if response.status_code != 200:
        raise Exception(f"Failed to download repository: {response.status_code}")

    # Save to a temporary file
    temp_path = f"/tmp/repo_{abs(hash(repo_url))}.zip"
    with open(temp_path, 'wb') as f:
        for chunk in response.iter_content(chunk_size=8192):
            f.write(chunk)

    return temp_path
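
# Illustrative follow-up (not part of the original module): unpack an archive
# produced by download_repo_as_zip. The destination directory is a hypothetical default.
def _example_extract_zip(zip_path: str, dest_dir: str = "/tmp/repo_extracted") -> List[str]:
    os.makedirs(dest_dir, exist_ok=True)
    with zipfile.ZipFile(zip_path) as archive:
        archive.extractall(dest_dir)
        return archive.namelist()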

def extract_repo_info(repo_url: str, repo_type: str) -> Dict:
    """Extract basic repository information from a URL."""
    if repo_type == "github":
        # Extract owner and repo name
        match = re.search(r'github\.com/([^/]+)/([^/]+)', repo_url)
        if match:
            return {
                'owner': match.group(1),
                'repo': match.group(2),
                'full_name': f"{match.group(1)}/{match.group(2)}",
                'url': repo_url
            }
    elif repo_type == "huggingface":
        # Extract owner and repo name
        match = re.search(r'huggingface\.co/([^/]+)/([^/]+)', repo_url)
        if match:
            return {
                'owner': match.group(1),
                'repo': match.group(2),
                'full_name': f"{match.group(1)}/{match.group(2)}",
                'url': repo_url
            }

    return {'url': repo_url}
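
# Minimal sketch of extract_repo_info on both supported URL styles (illustration
# only; the URLs are hypothetical examples).
if __name__ == "__main__":
    for url, kind in [
        ("https://github.com/octocat/Hello-World", "github"),
        ("https://huggingface.co/someuser/some-model", "huggingface"),
    ]:
        print(extract_repo_info(url, kind))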