from pathlib import Path
from typing import List, Dict, Optional, Iterator
from dataclasses import dataclass
import json
import re
import os
from bs4 import BeautifulSoup
import logging
import sqlite3

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class Chapter:
    """One parsed chapter: 1-based number, title line, list of paragraphs."""

    number: int
    title: str
    content: List[str]


class FileReader:
    """Best-effort file readers that log errors and return empty results."""

    @staticmethod
    def read_file(path: Path, encoding: str = "utf-8") -> List[str]:
        """Return the file's lines without trailing newlines; [] on any error."""
        try:
            with open(path, encoding=encoding) as f:
                # Iterate the file object directly; readlines() would build
                # an intermediate list for no benefit.
                return [line.rstrip("\n") for line in f]
        except Exception as e:
            # Deliberate boundary: callers treat [] as "nothing to parse".
            logger.error(f"Error reading file {path}: {e}")
            return []

    @staticmethod
    def read_json(path: Path) -> Dict:
        """Read and parse a UTF-8 JSON file; {} (plus a log entry) on failure."""
        try:
            with open(path, encoding="utf-8") as f:
                return json.load(f)
        except Exception as e:
            logger.error(f"Error reading JSON {path}: {e}")
            return {}


class TextChapterParser:
    """Handles parsing of text-based (plain .txt) chapter files."""

    def __init__(self, base_path: Path):
        self.base_path = base_path

    def is_chapter_header(
        self, line: str, next_line: Optional[str], pattern: Optional[str]
    ) -> bool:
        """Return True if *line* looks like a chapter header.

        Heuristic: the line is non-empty and NOT indented (full-width or
        ASCII space), while the following line IS indented, i.e. body text.
        When *pattern* is given the line must also match it.

        NOTE(review): the indentation literals below include U+3000
        (ideographic space), the usual paragraph indent in Chinese text —
        confirm the exact codepoints against the source data.
        """
        if not line or line.startswith(("　", " ")):
            return False
        if next_line and (next_line.startswith("　") or next_line.startswith(" ")):
            return not pattern or re.search(pattern, line)
        return False

    def parse_chapters(
        self, content_lines: List[str], chapter_pattern: Optional[str]
    ) -> List[Chapter]:
        """Split *content_lines* into chapters.

        Lines before the first detected header are discarded, blank lines
        are dropped entirely, and chapter numbers are assigned sequentially
        starting at 1.
        """
        chapters: List[Chapter] = []
        current_title: Optional[str] = None
        chapter_content: List[str] = []

        def flush() -> None:
            # Close out the chapter currently being accumulated, if any.
            if current_title is not None:
                chapters.append(
                    Chapter(
                        number=len(chapters) + 1,
                        title=current_title,
                        content=chapter_content,
                    )
                )

        for i, line in enumerate(content_lines):
            if not line:
                continue
            next_line = content_lines[i + 1] if i + 1 < len(content_lines) else None
            if self.is_chapter_header(line, next_line, chapter_pattern):
                flush()
                current_title = line
                chapter_content = []
                continue
            if current_title is not None:
                # Strip paragraph indentation (full-width and ASCII spaces).
                chapter_content.append(line.lstrip("　 "))
        flush()
        return chapters


class EpubChapterParser:
    """Handles parsing of EPUB-extracted XHTML chapter files."""

    def __init__(self, base_path: Path):
        self.base_path = base_path

    @staticmethod
    def extract_number_from_filename(
        filename: str, pattern: Optional[str] = None
    ) -> Optional[int]:
        """Extract a sequence number from *filename*.

        An explicit *pattern* (with one digit capture group) is tried first,
        then the common EPUB naming schemes.  Returns None when nothing
        matches.
        """
        patterns = [r"Section0*(\d+)", r"^0*(\d+)[-_]", r"split_0*(\d+)"]
        if pattern:
            patterns.insert(0, pattern)
        for pat in patterns:
            if match := re.search(pat, filename):
                return int(match.group(1))
        return None

    def get_valid_files(self, epub_dir: Path, config: Dict) -> List[str]:
        """Return XHTML/HTML filenames in *epub_dir* whose extracted sequence
        number is >= config["idx"], optionally sorted by that number when
        config["sort"] is truthy.

        Logs and returns [] on any error (missing directory, missing "idx"
        key, ...).
        """
        try:
            xhtml_files = [
                f for f in os.listdir(epub_dir) if f.endswith((".xhtml", ".html"))
            ]
            logger.info(f"Found {len(xhtml_files)} XHTML/HTML files")
            if not xhtml_files:
                return xhtml_files
            pattern = config.get("pattern")
            files_with_nums = [
                (fname, num)
                for fname in xhtml_files
                if (num := self.extract_number_from_filename(fname, pattern))
                is not None
                and num >= config["idx"]
            ]
            if config.get("sort"):
                files_with_nums.sort(key=lambda x: x[1])
            return [fname for fname, _ in files_with_nums]
        except Exception as e:
            logger.error(f"Error listing directory {epub_dir}: {e}")
            return []

    def parse_chapter_file(
        self, file_path: Path, chapter_num: int
    ) -> Optional[Chapter]:
        """Parse one XHTML file into a Chapter; None on error or empty file.

        The <h1> text becomes the title (falling back to "Chapter N");
        <br> tags become newlines, and <sup>/<a> elements (footnote markers,
        links) are removed from paragraphs before text extraction.
        """
        try:
            content = FileReader.read_file(file_path)
            if not content:
                return None
            soup = BeautifulSoup("".join(content), "html.parser")
            h1_tag = soup.find("h1")
            title = h1_tag.get_text().strip() if h1_tag else f"Chapter {chapter_num}"
            paragraphs = []
            for p in soup.find_all("p"):
                for br in p.find_all("br"):
                    br.replace_with("\n")
                # Drop footnote markers and inline links.
                for element in p.find_all(["sup", "a"]):
                    element.decompose()
                if text := p.get_text().strip():
                    paragraphs.append(text)
            return Chapter(number=chapter_num, title=title, content=paragraphs)
        except Exception as e:
            logger.error(f"Error processing {file_path}: {e}")
            return None


class ChapterParser:
    """Main parser class that handles both text and EPUB formats."""

    def __init__(self, base_dir: str):
        self.base_dir = Path(base_dir)
        self.text_parser = TextChapterParser(self.base_dir)
        self.epub_parser = EpubChapterParser(self.base_dir)

    def load_format_config(self, novel_dir: str) -> Dict:
        """Load <base>/<novel_dir>/format.json; {} if unreadable."""
        return FileReader.read_json(self.base_dir / novel_dir / "format.json")

    def get_txt_content(self, novel_dir: str, config: Dict) -> List[Chapter]:
        """Parse a plain-text novel into chapters.

        config keys: "path" (file under the novel dir), "idx" (1-based first
        line to consider), optional "pattern" (header regex) and
        "include_title" (prepend the title line to each chapter body).
        """
        txt_path = self.base_dir / novel_dir / config["path"]
        content = FileReader.read_file(txt_path)
        if not content:
            return []
        content_lines = content[config["idx"] - 1 :]
        chapters = self.text_parser.parse_chapters(content_lines, config.get("pattern"))
        if config.get("include_title"):
            for chapter in chapters:
                chapter.content = [chapter.title] + chapter.content
        return chapters

    def get_epub_content(self, novel_dir: str, config: Dict) -> List[Chapter]:
        """Parse an extracted-EPUB directory into chapters.

        config keys: "path", "idx", optional "pattern", "sort", "lshiftp"
        (paragraphs to drop from the front of each chapter), "include_title",
        "title_pattern".
        """
        epub_dir = self.base_dir / novel_dir / config["path"]
        valid_files = self.epub_parser.get_valid_files(epub_dir, config)
        chapters = []
        for i, filename in enumerate(valid_files, start=1):
            chapter = self.epub_parser.parse_chapter_file(epub_dir / filename, i)
            if not chapter:
                continue
            if lshift := config.get("lshiftp", 0):
                chapter.content = chapter.content[lshift:]
            if not chapter.content:
                continue
            if config.get("include_title"):
                # NOTE(review): with no "title_pattern" the default "" matches
                # any first paragraph, so the title is never prepended here —
                # confirm that is the intended default behavior.
                if not chapter.content or not (
                    re.match(config.get("title_pattern", ""), chapter.content[0])
                ):
                    chapter.content = [chapter.title] + chapter.content
            chapters.append(chapter)
        return chapters


def print_chapter_titles(
    zh_chapters: List[Chapter], en_chapters: List[Chapter]
) -> None:
    """Log EN/ZH chapter titles side by side for manual alignment checks."""
    max_chapters = max(len(zh_chapters), len(en_chapters))
    logger.info(f"max_chapters: {max_chapters}")
    logger.info(f"zh: {len(zh_chapters)}")
    logger.info(f"en: {len(en_chapters)}")
    logger.info("\n=== Chapter Title Comparison ===")
    logger.info(f"{'English':<50} | {'Chinese'}")
    logger.info("-" * 80)
    for i in range(max_chapters):
        en_title = en_chapters[i].title if i < len(en_chapters) else "N/A"
        zh_title = zh_chapters[i].title if i < len(zh_chapters) else "N/A"
        logger.info(f"{en_title:<50} | {zh_title}")


def create_db_entries(
    db_path: str, novel_dir: str, zh_chapters: List[Chapter], en_chapters: List[Chapter]
) -> None:
    """Insert aligned ZH/EN chapters for *novel_dir* into the SQLite db.

    Refuses (with a warning) when the chapter counts differ, since row
    alignment would be wrong.  Uses INSERT OR REPLACE so reruns are
    idempotent.
    """
    if len(zh_chapters) != len(en_chapters):
        logger.warning(
            f"Chapter count mismatch for {novel_dir}: ZH={len(zh_chapters)}, EN={len(en_chapters)}"
        )
        return
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    try:
        cursor.execute("insert or ignore into books (book_id) values (?)", (novel_dir,))
        # One batched executemany instead of a Python-level loop of executes.
        cursor.executemany(
            """
            insert or replace into chapters (book_id, chapter_id, text_zh, text_en)
            values (?, ?, ?, ?)
            """,
            [
                (
                    novel_dir,
                    str(zh_chap.number),
                    "\n\n".join(zh_chap.content),
                    "\n\n".join(en_chap.content),
                )
                for zh_chap, en_chap in zip(zh_chapters, en_chapters)
            ],
        )
        conn.commit()
        logger.info(
            f"Successfully inserted {len(zh_chapters)} chapters for {novel_dir}"
        )
    except sqlite3.Error as e:
        logger.error(f"Database error occurred: {e}")
        conn.rollback()
    finally:
        conn.close()


def main():
    """Parse every novel directory under ./custom and load it into SQLite."""
    parser = ChapterParser("custom")
    logger.info(f"Starting parser with base directory: {os.path.abspath('custom')}")
    DB_PATH = "parallel_texts.db"
    epub_dirs = [
        d for d in os.listdir("custom") if os.path.isdir(os.path.join("custom", d))
    ]
    for novel_dir in epub_dirs:
        logger.info(f"\n=== Analyzing {novel_dir} ===")
        config = parser.load_format_config(novel_dir)
        zh_chapters = parser.get_txt_content(novel_dir, config["zh"])
        en_chapters = parser.get_epub_content(novel_dir, config["en"])
        create_db_entries(DB_PATH, novel_dir, zh_chapters, en_chapters)
        # print_chapter_titles(zh_chapters, en_chapters)


if __name__ == "__main__":
    main()