import sqlite3
import os
import re
from bs4 import BeautifulSoup
from pathlib import Path
import unicodedata
def create_paragraphs_table(conn):
"""Create the paragraphs table with necessary columns and constraints."""
conn.execute(
"""
create table if not exists paragraphs (
id integer primary key autoincrement,
book_id text not null,
chapter_id text not null,
text_en text,
text_zh text,
char_count integer,
foreign key (book_id, chapter_id) references chapters(book_id, chapter_id)
)
"""
)
def normalize_quotes(text):
# normalize unicode characters to their composed form
text = unicodedata.normalize("NFKC", text)
quote_map = {
"\u201c": '"', # LEFT DOUBLE QUOTATION MARK
"\u201d": '"', # RIGHT DOUBLE QUOTATION MARK
"\u2018": "'", # LEFT SINGLE QUOTATION MARK
"\u2019": "'", # RIGHT SINGLE QUOTATION MARK
"\u00ab": '"', # LEFT-POINTING DOUBLE ANGLE QUOTATION MARK
"\u00bb": '"', # RIGHT-POINTING DOUBLE ANGLE QUOTATION MARK
"\u2039": "'", # SINGLE LEFT-POINTING ANGLE QUOTATION MARK
"\u203a": "'", # SINGLE RIGHT-POINTING ANGLE QUOTATION MARK
"\u2032": "'", # PRIME
"\u2033": '"', # DOUBLE PRIME
}
for old, new in quote_map.items():
text = text.replace(old, new)
return text
def strip_paragraph_markers(text):
"""remove p markers like #<# and #>#"""
return re.sub(r"#<#|#>#", "", text).strip()
def normalize_text(text):
"""text normalziations"""
text = normalize_quotes(text)
text = strip_paragraph_markers(text)
return text
def get_paragraphs_from_xhtml(xhtml_path):
"""extract p from html"""
with open(xhtml_path, "r", encoding="utf-8") as f:
content = f.read()
soup = BeautifulSoup(content, "html.parser")
paragraphs = []
p_elements = soup.find_all("p")
for p in p_elements:
text = normalize_text(p.get_text())
if text: # only add non-empty paragraphs
paragraphs.append(text)
# try br split
if not paragraphs:
content = re.sub(r"
", "
", content, flags=re.IGNORECASE)
parts = re.split(r"
\s*
", content, flags=re.IGNORECASE)
for part in parts:
clean_text = BeautifulSoup(part, "html.parser").get_text()
text = normalize_text(clean_text)
if text: # only add non-empty paragraphs
paragraphs.append(text)
return paragraphs
def get_zh_text_for_lines(zh_lines, start_idx, end_idx):
"""Get corresponding Chinese text for given line range."""
return " ".join(zh_lines[start_idx : end_idx + 1])
def extract_paragraphs(text_en, text_zh):
"""
Extract matching paragraphs from English and Chinese texts.
Returns list of paragraphs with normalized text.
"""
paragraphs = []
current_en_lines = []
current_en_indices = []
# split into lines and normalize
en_lines = [line.strip() for line in text_en.split("\n")]
zh_lines = [line.strip() for line in text_zh.split("\n")] if text_zh else []
i = 0
while i < len(en_lines):
line = en_lines[i]
normalized_line = normalize_text(line)
if not normalized_line:
i += 1
continue
current_en_lines.append(normalized_line)
current_en_indices.append(i)
# Look ahead to check if next line is empty or ends the paragraph
next_idx = i + 1
while next_idx < len(en_lines):
next_line = en_lines[next_idx].strip()
if not next_line:
# Empty line - continue current paragraph
next_idx += 1
continue
# If we have Chinese text, check if these lines correspond to a complete thought
if zh_lines:
zh_text = get_zh_text_for_lines(
zh_lines, current_en_indices[0], next_idx - 1
)
if zh_text:
# Found corresponding Chinese text - end paragraph
break
# Add next line to current paragraph
normalized_next = normalize_text(next_line)
current_en_lines.append(normalized_next)
current_en_indices.append(next_idx)
next_idx += 1
# Create paragraph
if current_en_lines:
en_text = " ".join(current_en_lines)
zh_text = ""
if zh_lines:
zh_text = get_zh_text_for_lines(
zh_lines, current_en_indices[0], current_en_indices[-1]
)
paragraphs.append(
{"text_en": en_text, "text_zh": zh_text, "char_count": len(en_text)}
)
# Reset for next paragraph
current_en_lines = []
current_en_indices = []
i = next_idx
return paragraphs
def match_paragraphs(xhtml_paragraphs, db_lines, lines_to_try=3):
"""
Match paragraphs from XHTML with lines from database.
Tries first few lines at start before giving up, to handle chapter titles and initial dialog.
Args:
xhtml_paragraphs: List of XHTML paragraph texts
db_lines: List of database text lines
lines_to_try: Number of initial lines to try before giving up
Returns:
List of tuples containing (start_idx, end_idx) for matched paragraphs
"""
def find_next_content_line(current_idx):
"""Find next non-empty line and return its index and content."""
while current_idx < len(db_lines):
line = normalize_text(db_lines[current_idx].strip())
if line:
return current_idx, line
current_idx += 1
return current_idx, None
matched_indices = []
xhtml_idx = 0
db_idx = 0
tried_lines = 0
while xhtml_idx < len(xhtml_paragraphs) and db_idx < len(db_lines):
# find next non-empty line in db
db_check_idx, db_line = find_next_content_line(db_idx)
if not db_line:
break
# search for p containing this line
while (
xhtml_idx < len(xhtml_paragraphs)
and db_line not in xhtml_paragraphs[xhtml_idx]
):
xhtml_idx += 1
# try ~3 db_lines at start
if xhtml_idx >= len(xhtml_paragraphs):
if not matched_indices and tried_lines < lines_to_try:
tried_lines += 1
xhtml_idx = 0
db_idx = db_check_idx + 1
continue
break
# collect all database lines that belong to this p
start_idx = db_check_idx
current_idx = db_check_idx
while current_idx < len(db_lines):
current_line = normalize_text(db_lines[current_idx].strip())
if current_line and current_line not in xhtml_paragraphs[xhtml_idx]:
break
current_idx += 1
matched_indices.append((start_idx, current_idx - 1))
db_idx = current_idx
xhtml_idx += 1
return matched_indices
def normalize_chapter_id(chapter_id):
"""
Normalize chapter IDs by removing padding and handling special cases.
Examples:
- gfyxjdcz!_0001 -> 1
- 00001-1-Swindler -> 1>
- wyctUp_0001 -> 1
- ltzz_0002 -> 2
"""
# handle IDs with _
if "_" in chapter_id:
chapter_id = chapter_id.split("_")[-1]
# rm any non-digit prefix and suffix
digits = re.search(r"(\d+)", chapter_id)
if digits:
chapter_id = digits.group(1)
# rm leading zeros
return str(int(chapter_id))
def find_chapter_file(epub_dir, normalized_id):
epub_dir = Path(epub_dir)
search_dirs = [
epub_dir / "OEBPS" / "Text",
epub_dir / "OEBPS",
epub_dir / "EPUB",
]
for directory in search_dirs:
if not directory.exists():
continue
for file_path in directory.glob("*.*html"):
numbers = re.findall(r"\d+", file_path.stem)
if numbers:
file_chapter_num = str(int(numbers[0]))
if file_chapter_num == normalized_id:
return file_path
return None
def preserve_lines(text):
return [line.strip() if line.strip() else line for line in text.split("\n")]
def print_matched_paragraphs(text_en_lines, text_zh_lines, matched_indices):
"""
Print matched paragraphs from English and Chinese text, with Chinese translation
immediately following each English paragraph.
Args:
text_en_lines (list): List of English text lines
text_zh_lines (list): List of Chinese text lines
matched_indices (list): List of tuples containing (start_idx, end_idx)
"""
if not matched_indices:
print("No matched paragraphs found.")
return
for start_idx, end_idx in matched_indices:
# Get and join English lines for this range
en_para = " ".join(text_en_lines[start_idx : end_idx + 1])
# Get and join Chinese lines for the same range
zh_para = " ".join(text_zh_lines[start_idx : end_idx + 1])
# Print English followed by Chinese
print(strip_paragraph_markers(en_para))
print(strip_paragraph_markers(zh_para))
print() # Extra newline between pairs
def process_book(conn, epub_base_dir, book_id):
"""Process an entire book and add paragraphs to database."""
epub_dir = Path(epub_base_dir) / book_id
if not epub_dir.exists():
# print(f"Warning: EPUB directory not found for book {book_id}: {epub_dir}")
return
print(f"Processing book {book_id} from: {epub_dir}")
# Get all chapters for this book
chapters = conn.execute(
"select chapter_id, text_en, text_zh from chapters where book_id = ?",
(book_id,),
).fetchall()
print(f"Chapter count: {len(chapters)}")
for chapter_id, text_en, text_zh in chapters:
if not text_en or not text_zh:
print(
f"Warning: Missing content for chapter {chapter_id} in book {book_id}"
)
continue
# find html file
normalized_id = normalize_chapter_id(chapter_id)
xhtml_path = find_chapter_file(epub_dir, normalized_id)
if not xhtml_path:
print(
f"Warning: Could not find XHTML file for chapter {chapter_id}. normalized_id: {normalized_id}, xhtml_path: {xhtml_path}"
)
continue
# extract p from html
xhtml_paragraphs = get_paragraphs_from_xhtml(xhtml_path)
# split by \n only, strip only non-empty lines
text_en_lines = preserve_lines(text_en)
text_zh_lines = preserve_lines(text_zh)
# match ps between XHTML and db content
matched_indices = match_paragraphs(xhtml_paragraphs, text_en_lines)
# print_matched_paragraphs(text_en_lines, text_zh_lines, matched_indices)
matched_pairs = []
for start_idx, end_idx in matched_indices:
en_para = strip_paragraph_markers(
" ".join(text_en_lines[start_idx : end_idx + 1])
)
zh_para = strip_paragraph_markers(
" ".join(text_zh_lines[start_idx : end_idx + 1])
)
matched_pairs.append((en_para, zh_para))
for en_para, zh_para in matched_pairs:
conn.execute(
"""
INSERT INTO paragraphs (book_id, chapter_id, text_en, text_zh, char_count)
VALUES (?, ?, ?, ?, ?)
""",
(book_id, chapter_id, en_para, zh_para, len(en_para)),
)
conn.commit()
def process_all_books(db_path, epub_base_dir):
"""Process all books in the database."""
conn = sqlite3.connect(db_path)
create_paragraphs_table(conn)
books = conn.execute("select book_id from books").fetchall()
for (book_id,) in books:
process_book(conn, epub_base_dir, book_id)
conn.close()
if __name__ == "__main__":
db_path = "parallel_texts.db"
epub_base_dir = "epubs" # base dir
process_all_books(db_path, epub_base_dir)