import sqlite3 import os import re from bs4 import BeautifulSoup from pathlib import Path import unicodedata def create_paragraphs_table(conn): """Create the paragraphs table with necessary columns and constraints.""" conn.execute( """ create table if not exists paragraphs ( id integer primary key autoincrement, book_id text not null, chapter_id text not null, text_en text, text_zh text, char_count integer, foreign key (book_id, chapter_id) references chapters(book_id, chapter_id) ) """ ) def normalize_quotes(text): # normalize unicode characters to their composed form text = unicodedata.normalize("NFKC", text) quote_map = { "\u201c": '"', # LEFT DOUBLE QUOTATION MARK "\u201d": '"', # RIGHT DOUBLE QUOTATION MARK "\u2018": "'", # LEFT SINGLE QUOTATION MARK "\u2019": "'", # RIGHT SINGLE QUOTATION MARK "\u00ab": '"', # LEFT-POINTING DOUBLE ANGLE QUOTATION MARK "\u00bb": '"', # RIGHT-POINTING DOUBLE ANGLE QUOTATION MARK "\u2039": "'", # SINGLE LEFT-POINTING ANGLE QUOTATION MARK "\u203a": "'", # SINGLE RIGHT-POINTING ANGLE QUOTATION MARK "\u2032": "'", # PRIME "\u2033": '"', # DOUBLE PRIME } for old, new in quote_map.items(): text = text.replace(old, new) return text def strip_paragraph_markers(text): """remove p markers like #<# and #>#""" return re.sub(r"#<#|#>#", "", text).strip() def normalize_text(text): """text normalziations""" text = normalize_quotes(text) text = strip_paragraph_markers(text) return text def get_paragraphs_from_xhtml(xhtml_path): """extract p from html""" with open(xhtml_path, "r", encoding="utf-8") as f: content = f.read() soup = BeautifulSoup(content, "html.parser") paragraphs = [] p_elements = soup.find_all("p") for p in p_elements: text = normalize_text(p.get_text()) if text: # only add non-empty paragraphs paragraphs.append(text) # try br split if not paragraphs: content = re.sub(r"", "
", content, flags=re.IGNORECASE) parts = re.split(r"
\s*
", content, flags=re.IGNORECASE) for part in parts: clean_text = BeautifulSoup(part, "html.parser").get_text() text = normalize_text(clean_text) if text: # only add non-empty paragraphs paragraphs.append(text) return paragraphs def get_zh_text_for_lines(zh_lines, start_idx, end_idx): """Get corresponding Chinese text for given line range.""" return " ".join(zh_lines[start_idx : end_idx + 1]) def extract_paragraphs(text_en, text_zh): """ Extract matching paragraphs from English and Chinese texts. Returns list of paragraphs with normalized text. """ paragraphs = [] current_en_lines = [] current_en_indices = [] # split into lines and normalize en_lines = [line.strip() for line in text_en.split("\n")] zh_lines = [line.strip() for line in text_zh.split("\n")] if text_zh else [] i = 0 while i < len(en_lines): line = en_lines[i] normalized_line = normalize_text(line) if not normalized_line: i += 1 continue current_en_lines.append(normalized_line) current_en_indices.append(i) # Look ahead to check if next line is empty or ends the paragraph next_idx = i + 1 while next_idx < len(en_lines): next_line = en_lines[next_idx].strip() if not next_line: # Empty line - continue current paragraph next_idx += 1 continue # If we have Chinese text, check if these lines correspond to a complete thought if zh_lines: zh_text = get_zh_text_for_lines( zh_lines, current_en_indices[0], next_idx - 1 ) if zh_text: # Found corresponding Chinese text - end paragraph break # Add next line to current paragraph normalized_next = normalize_text(next_line) current_en_lines.append(normalized_next) current_en_indices.append(next_idx) next_idx += 1 # Create paragraph if current_en_lines: en_text = " ".join(current_en_lines) zh_text = "" if zh_lines: zh_text = get_zh_text_for_lines( zh_lines, current_en_indices[0], current_en_indices[-1] ) paragraphs.append( {"text_en": en_text, "text_zh": zh_text, "char_count": len(en_text)} ) # Reset for next paragraph current_en_lines = [] current_en_indices = [] i = next_idx return paragraphs def match_paragraphs(xhtml_paragraphs, db_lines, lines_to_try=3): """ Match paragraphs from XHTML with lines from database. Tries first few lines at start before giving up, to handle chapter titles and initial dialog. Args: xhtml_paragraphs: List of XHTML paragraph texts db_lines: List of database text lines lines_to_try: Number of initial lines to try before giving up Returns: List of tuples containing (start_idx, end_idx) for matched paragraphs """ def find_next_content_line(current_idx): """Find next non-empty line and return its index and content.""" while current_idx < len(db_lines): line = normalize_text(db_lines[current_idx].strip()) if line: return current_idx, line current_idx += 1 return current_idx, None matched_indices = [] xhtml_idx = 0 db_idx = 0 tried_lines = 0 while xhtml_idx < len(xhtml_paragraphs) and db_idx < len(db_lines): # find next non-empty line in db db_check_idx, db_line = find_next_content_line(db_idx) if not db_line: break # search for p containing this line while ( xhtml_idx < len(xhtml_paragraphs) and db_line not in xhtml_paragraphs[xhtml_idx] ): xhtml_idx += 1 # try ~3 db_lines at start if xhtml_idx >= len(xhtml_paragraphs): if not matched_indices and tried_lines < lines_to_try: tried_lines += 1 xhtml_idx = 0 db_idx = db_check_idx + 1 continue break # collect all database lines that belong to this p start_idx = db_check_idx current_idx = db_check_idx while current_idx < len(db_lines): current_line = normalize_text(db_lines[current_idx].strip()) if current_line and current_line not in xhtml_paragraphs[xhtml_idx]: break current_idx += 1 matched_indices.append((start_idx, current_idx - 1)) db_idx = current_idx xhtml_idx += 1 return matched_indices def normalize_chapter_id(chapter_id): """ Normalize chapter IDs by removing padding and handling special cases. Examples: - gfyxjdcz!_0001 -> 1 - 00001-1-Swindler -> 1> - wyctUp_0001 -> 1 - ltzz_0002 -> 2 """ # handle IDs with _ if "_" in chapter_id: chapter_id = chapter_id.split("_")[-1] # rm any non-digit prefix and suffix digits = re.search(r"(\d+)", chapter_id) if digits: chapter_id = digits.group(1) # rm leading zeros return str(int(chapter_id)) def find_chapter_file(epub_dir, normalized_id): epub_dir = Path(epub_dir) search_dirs = [ epub_dir / "OEBPS" / "Text", epub_dir / "OEBPS", epub_dir / "EPUB", ] for directory in search_dirs: if not directory.exists(): continue for file_path in directory.glob("*.*html"): numbers = re.findall(r"\d+", file_path.stem) if numbers: file_chapter_num = str(int(numbers[0])) if file_chapter_num == normalized_id: return file_path return None def preserve_lines(text): return [line.strip() if line.strip() else line for line in text.split("\n")] def print_matched_paragraphs(text_en_lines, text_zh_lines, matched_indices): """ Print matched paragraphs from English and Chinese text, with Chinese translation immediately following each English paragraph. Args: text_en_lines (list): List of English text lines text_zh_lines (list): List of Chinese text lines matched_indices (list): List of tuples containing (start_idx, end_idx) """ if not matched_indices: print("No matched paragraphs found.") return for start_idx, end_idx in matched_indices: # Get and join English lines for this range en_para = " ".join(text_en_lines[start_idx : end_idx + 1]) # Get and join Chinese lines for the same range zh_para = " ".join(text_zh_lines[start_idx : end_idx + 1]) # Print English followed by Chinese print(strip_paragraph_markers(en_para)) print(strip_paragraph_markers(zh_para)) print() # Extra newline between pairs def process_book(conn, epub_base_dir, book_id): """Process an entire book and add paragraphs to database.""" epub_dir = Path(epub_base_dir) / book_id if not epub_dir.exists(): # print(f"Warning: EPUB directory not found for book {book_id}: {epub_dir}") return print(f"Processing book {book_id} from: {epub_dir}") # Get all chapters for this book chapters = conn.execute( "select chapter_id, text_en, text_zh from chapters where book_id = ?", (book_id,), ).fetchall() print(f"Chapter count: {len(chapters)}") for chapter_id, text_en, text_zh in chapters: if not text_en or not text_zh: print( f"Warning: Missing content for chapter {chapter_id} in book {book_id}" ) continue # find html file normalized_id = normalize_chapter_id(chapter_id) xhtml_path = find_chapter_file(epub_dir, normalized_id) if not xhtml_path: print( f"Warning: Could not find XHTML file for chapter {chapter_id}. normalized_id: {normalized_id}, xhtml_path: {xhtml_path}" ) continue # extract p from html xhtml_paragraphs = get_paragraphs_from_xhtml(xhtml_path) # split by \n only, strip only non-empty lines text_en_lines = preserve_lines(text_en) text_zh_lines = preserve_lines(text_zh) # match ps between XHTML and db content matched_indices = match_paragraphs(xhtml_paragraphs, text_en_lines) # print_matched_paragraphs(text_en_lines, text_zh_lines, matched_indices) matched_pairs = [] for start_idx, end_idx in matched_indices: en_para = strip_paragraph_markers( " ".join(text_en_lines[start_idx : end_idx + 1]) ) zh_para = strip_paragraph_markers( " ".join(text_zh_lines[start_idx : end_idx + 1]) ) matched_pairs.append((en_para, zh_para)) for en_para, zh_para in matched_pairs: conn.execute( """ INSERT INTO paragraphs (book_id, chapter_id, text_en, text_zh, char_count) VALUES (?, ?, ?, ?, ?) """, (book_id, chapter_id, en_para, zh_para, len(en_para)), ) conn.commit() def process_all_books(db_path, epub_base_dir): """Process all books in the database.""" conn = sqlite3.connect(db_path) create_paragraphs_table(conn) books = conn.execute("select book_id from books").fetchall() for (book_id,) in books: process_book(conn, epub_base_dir, book_id) conn.close() if __name__ == "__main__": db_path = "parallel_texts.db" epub_base_dir = "epubs" # base dir process_all_books(db_path, epub_base_dir)