import json
import logging
import argparse
import numpy as np
import sys
import os
import re
from collections import Counter
import pickle
from gematria import letter_to_value, HEBREW_GEMATRIA_VALUES, linearize_umlauts, decompose_to_latin
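
# NOTE: `gematria` is a local helper module (not a PyPI package); judging by the
# imported names it provides per-letter gematria values, umlaut linearization,
# and a fallback decomposition of a number into Latin letters.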
# --- Configuration ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logging.getLogger("gensim").setLevel(logging.WARNING)

HOLOGRAPHIC_STATE_SIZE_BITS = 4096
BITS_PER_CHAR = 16
BOOK_RANGE = range(1, 40)
MODELS_DIR = "models_by_book"
INDICES_DIR = "indices_by_book"
CACHE_FILE = "tanakh_data.cache"
SORTED_GEMATRIA = sorted(HEBREW_GEMATRIA_VALUES.items(), key=lambda item: item[1], reverse=True)

def setup_logging(debug_mode):
    level = logging.DEBUG if debug_mode else logging.INFO
    logging.getLogger().setLevel(level)
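
# The holographic state is a 4096-bit string. At 16 bits per character slot it
# decodes into 4096 / 16 = 256 gematria values per read-out pass; longer texts
# are XOR-folded into the state block by block (see _fold_into_state below).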
# --- Core engine as a class ---
class TanakhExplorer:
    def __init__(self, use_cache=True):
        self.all_indices = {}
        self.tanakh_state = None
        cache_valid = use_cache and os.path.exists(CACHE_FILE)
        if cache_valid:
            try:
                logging.info(f"Loading data from cache file: {CACHE_FILE}")
                with open(CACHE_FILE, 'rb') as f:
                    cached_data = pickle.load(f)
                self.all_indices = cached_data.get('indices', {})
                self.tanakh_state = cached_data.get('state')
                logging.info("Data loaded successfully from cache.")
            except Exception as e:
                logging.warning(f"Cache file is corrupt or could not be loaded: {e}. Reloading data.")
                cache_valid = False
        if not cache_valid or not self.all_indices or not self.tanakh_state:
            self._load_all_indices()
            self._create_tanakh_holographic_state()
            if use_cache:
                self._save_to_cache()
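
    # Each per-book index (built by build_indices.py) maps a gematria value to
    # an object with a 'phrases' list and a 'pagerank' score, as consumed in
    # get_best_phrase_from_all_books below.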
    def _load_all_indices(self):
        logging.info("Loading index files for all books...")
        for i in BOOK_RANGE:
            index_path = os.path.join(INDICES_DIR, f"book_{i:02}_index.json")
            if os.path.exists(index_path):
                with open(index_path, 'r', encoding='utf-8') as f:
                    self.all_indices[i] = json.load(f)
        if not self.all_indices:
            sys.exit("No index files found. Please run 'build_indices.py'.")
        logging.info(f"Loaded {len(self.all_indices)} book indices.")
    def _create_tanakh_holographic_state(self):
        logging.info("Building the holographic Tanakh state...")
        full_binary_text = ""
        for i in BOOK_RANGE:
            try:
                with open(f"texts/torah/{i:02}.json", 'r', encoding='utf-8') as file:
                    data = json.load(file)
                full_text = ' '.join([' '.join(block) for block in data.get("text", [])])
                clean_text = re.sub(r"[^\u05D0-\u05EA]+", "", re.sub(r"\[.*?\]", "", full_text, flags=re.DOTALL))
                if clean_text:
                    full_binary_text += self._text_to_gematria_binary(clean_text, for_state=True)
            except Exception:
                continue
        self.tanakh_state = self._fold_into_state(full_binary_text)
        logging.info("Holographic Tanakh state created.")
    def _save_to_cache(self):
        logging.info(f"Saving data to cache file: {CACHE_FILE}")
        data_to_cache = {'indices': self.all_indices, 'state': self.tanakh_state}
        with open(CACHE_FILE, 'wb') as f:
            pickle.dump(data_to_cache, f)
    def _text_to_gematria_binary(self, text, for_state=False):
        text_for_calc = linearize_umlauts(text.lower())
        if for_state:
            clean_text = re.sub(r"[^\u05D0-\u05EA]+", "", text_for_calc)
        else:
            clean_text = re.sub(r"[^a-z\u05D0-\u05EA]+", "", text_for_calc)
        logging.debug(f"text_to_gematria_binary (for_state={for_state}): original='{text[:30]}...', cleaned='{clean_text[:30]}...'")
        binary_string = "".join(format(letter_to_value(c), f'0{BITS_PER_CHAR}b') for c in clean_text)
        logging.debug(f" -> generated binary string (first 64 bits): {binary_string[:64]}")
        return binary_string
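
    # XOR-folds an arbitrarily long bitstring into a fixed-size state.
    # Worked example with a 4-bit state: folding "110101" processes the blocks
    # "1101" and "01" (zero-padded to "0100"); 1101 XOR 0100 = 1001.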
    def _fold_into_state(self, binary_string, initial_state=None):
        state = np.array(list(initial_state), dtype=np.int8) if initial_state else np.zeros(HOLOGRAPHIC_STATE_SIZE_BITS, dtype=np.int8)
        for i in range(0, len(binary_string), HOLOGRAPHIC_STATE_SIZE_BITS):
            block = binary_string[i:i+HOLOGRAPHIC_STATE_SIZE_BITS].ljust(HOLOGRAPHIC_STATE_SIZE_BITS, '0')
            state = np.bitwise_xor(state, np.array(list(block), dtype=np.int8))
        return "".join(state.astype(str))
    def get_best_phrase_from_all_books(self, gematria_val, method):
        best_overall_phrase_obj = None
        best_overall_score = -1.0
        for book_num, book_index in self.all_indices.items():
            candidates = book_index.get(str(gematria_val), {}).get('phrases', [])
            if not candidates:
                continue
            pg_score = book_index.get(str(gematria_val), {}).get('pagerank', 0)
            best_in_book = max(candidates, key=lambda p: pg_score / p.get('count', 1) if p.get('count', 0) > 0 else 0)
            current_score = pg_score / best_in_book.get('count', 1) if best_in_book.get('count', 0) > 0 else 0
            if current_score > best_overall_score:
                best_overall_score = current_score
                best_in_book['source_book'] = book_num
                best_overall_phrase_obj = best_in_book
        if best_overall_phrase_obj:
            return best_overall_phrase_obj, "exact"
        for offset in [1, -1]:
            for book_num, book_index in self.all_indices.items():
                candidates = book_index.get(str(gematria_val + offset), {}).get('phrases', [])
                if candidates:
                    best_in_book = min(candidates, key=lambda p: p.get('position', float('inf')))
                    best_in_book['source_book'] = book_num
                    return best_in_book, f"neighbor(d={offset})"
        decomposed = decompose_to_latin(gematria_val)
        if decomposed:
            return {"text": f"[{decomposed}]", "position": -2, "source_book": "N/A"}, "decomposed"
        return None, None
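
    # Fractal mode: at each depth, combines the seed query with every phrase
    # found at the previous depth, folds the combination into a state, XORs it
    # against the global Tanakh state, and decodes the result as 256 gematria
    # values that are resolved back to phrases.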
    def run_fractal_mode(self, query, depth, method):
        print("\n" + "=" * 15 + f" FRACTAL LOGOS EXPANSION (depth: {depth}, method: {method}) " + "=" * 15)
        initial_logos = query
        # Use source_book 0 for the seed query so that sorting by book below
        # never compares an int against a string (avoids a TypeError).
        all_found_phrases_map = {initial_logos: {"text": initial_logos, "position": -1, "depth": 0, "count": 1, "source_book": 0}}
        phrases_to_process_this_level = {initial_logos}
        for d in range(depth):
            logging.info(f"--- Starting depth {d + 1}/{depth} with {len(phrases_to_process_this_level)} phrases ---")
            phrases_for_next_level = set()
            for p_current in phrases_to_process_this_level:
                combined_query = f"{initial_logos} {p_current}"
                query_binary = self._text_to_gematria_binary(combined_query)
                konzept_state = self._fold_into_state(query_binary)
                final_konzept = "".join(str(int(a) ^ int(b)) for a, b in zip(self.tanakh_state, konzept_state))
                for i in range(0, len(final_konzept), BITS_PER_CHAR):
                    gematria_val = int(final_konzept[i:i+BITS_PER_CHAR], 2)
                    if gematria_val == 0:
                        continue
                    phrase_obj, _ = self.get_best_phrase_from_all_books(gematria_val, method)
                    if phrase_obj:
                        phrase_text = phrase_obj['text']
                        if phrase_text not in all_found_phrases_map:
                            phrase_obj['depth'] = d + 1
                            phrase_obj['count'] = 1
                            all_found_phrases_map[phrase_text] = phrase_obj
                            phrases_for_next_level.add(phrase_text)
                        else:
                            all_found_phrases_map[phrase_text]['count'] += 1
            if not phrases_for_next_level:
                logging.info(f"No new phrases found at depth {d + 1}.")
                break
            phrases_to_process_this_level = phrases_for_next_level
        # Sort by book, then by position, to preserve the narrative order;
        # non-integer sources (e.g. "N/A") are pushed to the end to avoid a TypeError.
        sorted_by_position = sorted(all_found_phrases_map.values(), key=lambda x: (x.get('source_book') if isinstance(x.get('source_book'), int) else 99, x.get('position', -1)))
        print("\n--- Final synthesis (ordered by book and occurrence in the text) ---")
        current_book = -1
        for p in sorted_by_position:
            book = p.get('source_book')
            if book != current_book:
                # Print a header for each new book
                if isinstance(book, int) and book > 0:
                    print(f"\n--- Book {book:02d} ---")
                elif book == 0:
                    print("--- Query ---")
                current_book = book
            print(f"{p['text']}", end=" | ")
        print("\n")
        # Sort by frequency for the top concepts
        sorted_by_count = sorted(all_found_phrases_map.values(), key=lambda x: x['count'], reverse=True)
        print("\n--- Top 25 resonance concepts (ordered by frequency within the fractal) ---")
        for p in sorted_by_count[:25]:
            source = f"B{p.get('source_book', '??'):02d}" if isinstance(p.get('source_book'), int) and p.get('source_book') > 0 else p.get('source_book', 'N/A')
            print(f"[{p['count']:2d}x] {p['text']} (original in {source}, pos: {p.get('position', 'N/A')})")
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Tanakh Holographic Explorer (v13, final).")
    parser.add_argument("query", type=str, help="The initial query phrase (logos).")
    parser.add_argument("--method", type=str, choices=['frequency', 'network', 'default'], default='network', help="Weighting method.")
    parser.add_argument("--depth", type=int, default=1, help="Maximum depth of the fractal search.")
    parser.add_argument("--no-cache", action="store_true", help="Forces a fresh reload of the data.")
    parser.add_argument("--debug", action="store_true", help="Enables detailed debug output.")
    args = parser.parse_args()
    setup_logging(args.debug)
    engine = TanakhExplorer(use_cache=not args.no_cache)
    engine.run_fractal_mode(args.query, args.depth, args.method)