# custom_parser.py — parse parallel Chinese (txt) / English (EPUB XHTML) novel
# chapters and load the aligned texts into a SQLite database.
|
||||
import os
|
||||
import json
|
||||
from bs4 import BeautifulSoup
|
||||
from pathlib import Path
|
||||
from typing import List, Dict, Optional, Iterator
|
||||
from dataclasses import dataclass
|
||||
import json
|
||||
import re
|
||||
import os
|
||||
from bs4 import BeautifulSoup
|
||||
import logging
|
||||
import sqlite3
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ChapterParser:
|
||||
def __init__(self, base_dir):
|
||||
self.base_dir = Path(base_dir)
|
||||
@dataclass
class Chapter:
    """A single parsed chapter of a novel."""

    # 1-based sequential chapter number (assigned in order of appearance)
    number: int
    # the heading line as it appeared in the source file
    title: str
    # body paragraphs/lines with leading indentation stripped
    content: List[str]
|
||||
def load_format_config(self, novel_dir):
|
||||
"""Load format.json configuration for a novel directory."""
|
||||
format_path = self.base_dir / novel_dir / "format.json"
|
||||
with open(format_path, "r", encoding="utf-8") as f:
|
||||
return json.load(f)
|
||||
|
||||
def get_txt_content(self, novel_dir, config):
|
||||
"""Read and parse text content based on indentation patterns."""
|
||||
txt_path = self.base_dir / novel_dir / config["path"]
|
||||
print(f"\nDebug: Reading text file from {txt_path}")
|
||||
class FileReader:
    """Stateless helpers for reading text and JSON files with error logging."""

    @staticmethod
    def read_file(path: Path, encoding: str = "utf-8") -> List[str]:
        """Return the file's lines with trailing newlines stripped.

        Returns an empty list (and logs the error) when the file cannot be
        read, so callers can treat "unreadable" the same as "empty".
        """
        try:
            with open(path, encoding=encoding) as f:
                # Iterate the file directly; no need to materialize readlines().
                return [line.rstrip("\n") for line in f]
        except Exception as e:
            logger.error(f"Error reading file {path}: {e}")
            return []

    @staticmethod
    def read_json(path: Path) -> Dict:
        """Read and parse a JSON file; return {} (and log) on any error."""
        try:
            with open(path, encoding="utf-8") as f:
                return json.load(f)
        except Exception as e:
            logger.error(f"Error reading JSON {path}: {e}")
            return {}
|
||||
|
||||
chapters = []
|
||||
|
||||
class TextChapterParser:
    """Handles parsing of text-based chapter files."""

    def __init__(self, base_path: Path):
        self.base_path = base_path

    def is_chapter_header(
        self, line: str, next_line: Optional[str], pattern: Optional[str]
    ) -> bool:
        """Return True when *line* looks like a chapter heading.

        A heading is a non-empty, unindented line whose following line is
        indented (i.e. chapter body text). When *pattern* is given the line
        must additionally match it.
        """
        # NOTE(review): the indent test covers both an ASCII space and the
        # full-width space U+3000 commonly used in Chinese novel dumps —
        # confirm this matches the actual data files.
        if not line or line.startswith((" ", "\u3000")):
            return False
        if next_line and (next_line.startswith(" ") or next_line.startswith("\u3000")):
            # bool(...) because re.search returns a Match, not a bool,
            # and the declared return type is bool.
            return bool(not pattern or re.search(pattern, line))
        return False

    def parse_chapters(
        self, content_lines: List[str], chapter_pattern: Optional[str]
    ) -> List[Chapter]:
        """Split pre-stripped text lines into Chapter objects.

        Chapter numbers are assigned sequentially in order of appearance.
        Indented body lines are de-indented before being stored; blank
        lines are skipped.
        """
        chapters: List[Chapter] = []
        current: Optional[Dict] = None  # header info of the chapter being built
        body: List[str] = []

        def flush() -> None:
            # Emit the chapter accumulated so far, if any.
            if current:
                chapters.append(
                    Chapter(
                        number=current["number"],
                        title=current["title"],
                        content=body,
                    )
                )

        for i, line in enumerate(content_lines):
            if not line:  # skip empty lines
                continue

            next_line = content_lines[i + 1] if i + 1 < len(content_lines) else None
            if self.is_chapter_header(line, next_line, chapter_pattern):
                flush()
                current = {"number": len(chapters) + 1, "title": line}
                body = []
                continue

            if current:
                # NOTE(review): strips ASCII and full-width leading spaces;
                # confirm against the indentation style of the source texts.
                body.append(line.lstrip(" \u3000"))

        flush()  # don't lose the final chapter
        return chapters
|
||||
|
||||
class EpubChapterParser:
    """Handles parsing of EPUB format chapters."""

    def __init__(self, base_path: Path):
        self.base_path = base_path

    @staticmethod
    def extract_number_from_filename(
        filename: str, pattern: Optional[str] = None
    ) -> Optional[int]:
        """Extract a chapter number from an XHTML filename.

        Tries *pattern* first (when given), then the built-in patterns for
        common EPUB naming schemes: ``Section0000.xhtml``, ``0000_Book_1``/
        ``0001-1``, and ``index_split_001``. Returns None when nothing matches.
        """
        patterns = [r"Section0*(\d+)", r"^0*(\d+)[-_]", r"split_0*(\d+)"]
        if pattern:
            patterns.insert(0, pattern)

        for pat in patterns:
            if match := re.search(pat, filename):
                return int(match.group(1))
        return None

    def get_valid_files(self, epub_dir: Path, config: Dict) -> List[str]:
        """Return the XHTML/HTML filenames whose extracted chapter number is
        at least ``config["idx"]``.

        Files without an extractable number are dropped. When
        ``config["sort"]`` is truthy, filenames are ordered by chapter
        number. Returns [] (and logs) when the directory cannot be listed.
        """
        try:
            xhtml_files = [
                f for f in os.listdir(epub_dir) if f.endswith((".xhtml", ".html"))
            ]

            logger.info(f"Found {len(xhtml_files)} XHTML/HTML files")

            if not xhtml_files:
                return xhtml_files

            pattern = config.get("pattern")
            files_with_nums = [
                (fname, num)
                for fname in xhtml_files
                if (num := self.extract_number_from_filename(fname, pattern))
                is not None
                and num >= config["idx"]
            ]

            if config.get("sort"):
                files_with_nums.sort(key=lambda x: x[1])

            return [fname for fname, _ in files_with_nums]

        except Exception as e:
            logger.error(f"Error listing directory {epub_dir}: {e}")
            return []

    def parse_chapter_file(
        self, file_path: Path, chapter_num: int
    ) -> Optional["Chapter"]:
        """Parse a single XHTML chapter file into a Chapter.

        The title comes from the first <h1> (falling back to
        "Chapter {n}"); <sup> (footnote markers) and <a> elements are
        removed before extracting paragraph text. Returns None when the
        file is unreadable or parsing fails.
        """
        try:
            content = FileReader.read_file(file_path)
            if not content:
                return None

            soup = BeautifulSoup("".join(content), "html.parser")
            h1_tag = soup.find("h1")
            title = h1_tag.get_text().strip() if h1_tag else f"Chapter {chapter_num}"

            paragraphs = []
            for p in soup.find_all("p"):
                # Drop sup (footnotes) and a elements before taking the text.
                for element in p.find_all(["sup", "a"]):
                    element.decompose()

                if text := p.get_text().strip():
                    paragraphs.append(text)

            return Chapter(number=chapter_num, title=title, content=paragraphs)
        except Exception as e:
            logger.error(f"Error processing {file_path}: {e}")
            return None
|
||||
|
||||
|
||||
class ChapterParser:
    """Main parser class that handles both text and EPUB formats."""

    def __init__(self, base_dir: str):
        self.base_dir = Path(base_dir)
        self.text_parser = TextChapterParser(self.base_dir)
        self.epub_parser = EpubChapterParser(self.base_dir)

    def load_format_config(self, novel_dir: str) -> Dict:
        """Load the per-novel format.json configuration."""
        return FileReader.read_json(self.base_dir / novel_dir / "format.json")

    def get_txt_content(self, novel_dir: str, config: Dict) -> List["Chapter"]:
        """Parse txt content into chapters.

        Skips lines before ``config["idx"]`` (1-based), then delegates to
        TextChapterParser. When ``config["include_title"]`` is truthy, each
        chapter's title line is prepended to its content.
        """
        txt_path = self.base_dir / novel_dir / config["path"]
        content = FileReader.read_file(txt_path)
        if not content:
            return []

        content_lines = content[config["idx"] - 1 :]
        chapters = self.text_parser.parse_chapters(content_lines, config.get("pattern"))

        if config.get("include_title"):
            for chapter in chapters:
                chapter.content = [chapter.title] + chapter.content

        return chapters

    def get_epub_content(self, novel_dir: str, config: Dict) -> List["Chapter"]:
        """Parse EPUB content into chapters.

        Chapter numbers are assigned by position in the (possibly sorted)
        valid-file list; files that fail to parse are skipped.
        """
        epub_dir = self.base_dir / novel_dir / config["path"]
        valid_files = self.epub_parser.get_valid_files(epub_dir, config)

        chapters = []
        for i, filename in enumerate(valid_files, start=1):
            if chapter := self.epub_parser.parse_chapter_file(epub_dir / filename, i):
                chapters.append(chapter)

        return chapters
|
||||
|
||||
def print_chapter_titles(
    zh_chapters: List["Chapter"], en_chapters: List["Chapter"]
) -> None:
    """Log chapter-count diagnostics for the two parallel chapter lists."""
    max_chapters = max(len(zh_chapters), len(en_chapters))

    logger.info(f"max_chapters: {max_chapters}")
    logger.info(f"zh: {len(zh_chapters)}")
    logger.info(f"en: {len(en_chapters)}")
|
||||
|
||||
|
||||
def create_db_entries(
    db_path: str, novel_dir: str, zh_chapters: List["Chapter"], en_chapters: List["Chapter"]
) -> None:
    """Insert aligned ZH/EN chapter texts for one novel into SQLite.

    Refuses to insert (logs a warning and returns) when the two lists have
    different lengths, since alignment is positional. Paragraphs are joined
    with newlines; rows are keyed on (book_id, chapter_id) and replaced on
    conflict. Rolls back on database errors and always closes the connection.
    """
    if len(zh_chapters) != len(en_chapters):
        logger.warning(
            f"Chapter count mismatch for {novel_dir}: ZH={len(zh_chapters)}, EN={len(en_chapters)}"
        )
        return

    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    try:
        cursor.execute("insert or ignore into books (book_id) values (?)", (novel_dir,))

        for zh_chap, en_chap in zip(zh_chapters, en_chapters):
            cursor.execute(
                """
                insert or replace into chapters
                (book_id, chapter_id, text_zh, text_en)
                values (?, ?, ?, ?)
                """,
                (
                    novel_dir,
                    str(zh_chap.number),
                    "\n".join(zh_chap.content),
                    "\n".join(en_chap.content),
                ),
            )

        conn.commit()
        logger.info(
            f"Successfully inserted {len(zh_chapters)} chapters for {novel_dir}"
        )
    except sqlite3.Error as e:
        logger.error(f"Database error occurred: {e}")
        conn.rollback()
    finally:
        conn.close()
|
||||
|
||||
|
||||
def main():
    """Parse every novel directory under ./custom and load the aligned
    ZH/EN chapters into the SQLite database."""
    parser = ChapterParser("custom")
    logger.info(f"Starting parser with base directory: {os.path.abspath('custom')}")
    DB_PATH = "parallel_texts.db"

    epub_dirs = [
        d for d in os.listdir("custom") if os.path.isdir(os.path.join("custom", d))
    ]

    for novel_dir in epub_dirs:
        logger.info(f"\n=== Analyzing {novel_dir} ===")

        config = parser.load_format_config(novel_dir)
        zh_chapters = parser.get_txt_content(novel_dir, config["zh"])
        en_chapters = parser.get_epub_content(novel_dir, config["en"])

        create_db_entries(DB_PATH, novel_dir, zh_chapters, en_chapters)
        # print_chapter_titles(zh_chapters, en_chapters)


if __name__ == "__main__":
    main()
||||
Reference in New Issue
Block a user