# zh-en-wn-dataset/custom_parser.py
import json
import logging
import os
import re
import sqlite3
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional

from bs4 import BeautifulSoup

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class Chapter:
    number: int
    title: str
    content: List[str]


class FileReader:
    @staticmethod
    def read_file(path: Path, encoding: str = "utf-8") -> List[str]:
        """Read file content with proper error handling."""
        try:
            with open(path, encoding=encoding) as f:
                return [line.rstrip("\n") for line in f]
        except Exception as e:
            logger.error(f"Error reading file {path}: {e}")
            return []

    @staticmethod
    def read_json(path: Path) -> Dict:
        """Read and parse a JSON file."""
        try:
            with open(path, encoding="utf-8") as f:
                return json.load(f)
        except Exception as e:
            logger.error(f"Error reading JSON {path}: {e}")
            return {}


class TextChapterParser:
    """Handles parsing of text-based chapter files."""

    def __init__(self, base_path: Path):
        self.base_path = base_path

    def is_chapter_header(
        self, line: str, next_line: Optional[str], pattern: Optional[str]
    ) -> bool:
        """Determine whether a line is a chapter header.

        A header is flush-left and followed by an indented line. Indentation
        may be an ASCII space or the U+3000 ideographic space common in
        Chinese text; the original file contained these as invisible literal
        characters, made explicit below as escape sequences.
        """
        if not line or line.startswith((" ", "\u3000")):
            return False
        if next_line and next_line.startswith((" ", "\u3000")):
            return not pattern or bool(re.search(pattern, line))
        return False

    def parse_chapters(
        self, content_lines: List[str], chapter_pattern: Optional[str]
    ) -> List[Chapter]:
        """Parse text content into chapters."""
        chapters: List[Chapter] = []
        current_chapter = None
        chapter_content: List[str] = []
        for i, line in enumerate(content_lines):
            if not line:
                continue
            next_line = content_lines[i + 1] if i + 1 < len(content_lines) else None
            if self.is_chapter_header(line, next_line, chapter_pattern):
                if current_chapter:
                    chapters.append(
                        Chapter(
                            number=current_chapter["number"],
                            title=current_chapter["title"],
                            content=chapter_content,
                        )
                    )
                current_chapter = {"number": len(chapters) + 1, "title": line}
                chapter_content = []
                continue
            if current_chapter:
                # Strip leading ASCII and U+3000 ideographic spaces.
                chapter_content.append(line.lstrip(" \u3000"))
        if current_chapter:
            chapters.append(
                Chapter(
                    number=current_chapter["number"],
                    title=current_chapter["title"],
                    content=chapter_content,
                )
            )
        return chapters
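
# A minimal usage sketch for TextChapterParser (hypothetical data): headers
# are flush-left and body lines are indented with U+3000 spaces.
#
#     tp = TextChapterParser(Path("custom"))
#     lines = [
#         "第一章 起源",
#         "\u3000\u3000正文第一段……",
#         "第二章 旅程",
#         "\u3000\u3000正文第二段……",
#     ]
#     chapters = tp.parse_chapters(lines, r"^第.+章")
#     # titles: ["第一章 起源", "第二章 旅程"]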

class EpubChapterParser:
    """Handles parsing of EPUB format chapters."""

    def __init__(self, base_path: Path):
        self.base_path = base_path

    @staticmethod
    def extract_number_from_filename(
        filename: str, pattern: Optional[str] = None
    ) -> Optional[int]:
        """Extract a sequence number from an EPUB section filename."""
        patterns = [r"Section0*(\d+)", r"^0*(\d+)[-_]", r"split_0*(\d+)"]
        if pattern:
            # A configured pattern takes precedence over the defaults.
            patterns.insert(0, pattern)
        for pat in patterns:
            if match := re.search(pat, filename):
                return int(match.group(1))
        return None
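
    # Illustrative matches for the default patterns (hypothetical filenames):
    #   "Section0012.xhtml" -> 12   via r"Section0*(\d+)" (Sigil-style names)
    #   "003_chapter.xhtml" -> 3    via r"^0*(\d+)[-_]"
    #   "split_007.html"    -> 7    via r"split_0*(\d+)"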

    def get_valid_files(self, epub_dir: Path, config: Dict) -> List[str]:
        """
        Get the list of valid XHTML files and log any missing sequence
        numbers. Returns the list of valid filenames while logging gaps.
        """
        try:
            xhtml_files = [
                f for f in os.listdir(epub_dir) if f.endswith((".xhtml", ".html"))
            ]
            logger.info(f"Found {len(xhtml_files)} XHTML/HTML files")
            if not xhtml_files:
                return xhtml_files
            pattern = config.get("pattern")
            files_with_nums = [
                (fname, num)
                for fname in xhtml_files
                if (num := self.extract_number_from_filename(fname, pattern))
                is not None
                and num >= config["idx"]
            ]
            if config.get("sort"):
                files_with_nums.sort(key=lambda x: x[1])
            # Check for missing numbers in the sequence, reporting the nearest
            # neighbouring files around each gap.
            if files_with_nums:
                existing_nums = {num for _, num in files_with_nums}
                last_num = max(existing_nums)
                missing_nums = []
                for expected_num in range(config["idx"], last_num + 1):
                    if expected_num not in existing_nums:
                        prev_file = max(
                            (p for p in files_with_nums if p[1] < expected_num),
                            key=lambda x: x[1],
                            default=(None, None),
                        )
                        next_file = min(
                            (p for p in files_with_nums if p[1] > expected_num),
                            key=lambda x: x[1],
                            default=(None, None),
                        )
                        missing_nums.append(
                            {
                                "missing_num": expected_num,
                                "prev_file": prev_file[0],
                                "next_file": next_file[0],
                            }
                        )
                if missing_nums:
                    logger.warning(f"Found {len(missing_nums)} gaps in file sequence:")
                    for gap in missing_nums:
                        logger.warning(
                            f"Missing number {gap['missing_num']} "
                            f"(between files: {gap['prev_file']} and {gap['next_file']})"
                        )
            return [fname for fname, _ in files_with_nums]
        except Exception as e:
            logger.error(f"Error listing directory {epub_dir}: {e}")
            return []
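
    # Illustrative log output (hypothetical listing): with idx=1 and files
    # numbered 1, 2 and 4, get_valid_files would warn roughly:
    #   Found 1 gaps in file sequence:
    #   Missing number 3 (between files: Section0002.xhtml and Section0004.xhtml)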

    def parse_chapter_file(
        self, file_path: Path, chapter_num: int
    ) -> Optional[Chapter]:
        """Parse a single chapter file."""
        try:
            content = FileReader.read_file(file_path)
            if not content:
                return None
            # read_file strips newlines, so rejoin with "\n" to keep word
            # boundaries intact for the HTML parser.
            soup = BeautifulSoup("\n".join(content), "html.parser")
            h1_tag = soup.find("h1")
            title = h1_tag.get_text().strip() if h1_tag else f"Chapter {chapter_num}"
            paragraphs = []
            for p in soup.find_all("p"):
                # Preserve explicit line breaks within a paragraph.
                for br in p.find_all("br"):
                    br.replace_with("\n")
                # Drop footnote markers (<sup>) and links (<a>).
                for element in p.find_all(["sup", "a"]):
                    element.decompose()
                if text := p.get_text().strip():
                    paragraphs.append(text)
            return Chapter(number=chapter_num, title=title, content=paragraphs)
        except Exception as e:
            logger.error(f"Error processing {file_path}: {e}")
            return None
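
# A minimal sketch of what parse_chapter_file extracts (hypothetical markup):
#
#     <h1>Chapter 4</h1>
#     <p>First paragraph.<br/>Continued on a new line.</p>
#     <p>See the note<sup>1</sup> and <a href="#n1">this link</a>.</p>
#
# with chapter_num=4 yields:
#     Chapter(number=4, title="Chapter 4",
#             content=["First paragraph.\nContinued on a new line.",
#                      "See the note and ."])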

class ChapterParser:
    """Main parser class that handles both text and EPUB formats."""

    def __init__(self, base_dir: str):
        self.base_dir = Path(base_dir)
        self.text_parser = TextChapterParser(self.base_dir)
        self.epub_parser = EpubChapterParser(self.base_dir)

    def load_format_config(self, novel_dir: str) -> Dict:
        """Load the format configuration for a novel."""
        return FileReader.read_json(self.base_dir / novel_dir / "format.json")
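
    # Assumed shape of format.json, reconstructed from the keys read below
    # (hypothetical values; only "path" and "idx" are required):
    #
    #   {
    #       "zh": {"path": "zh.txt", "idx": 5, "pattern": "^第.+章",
    #              "include_title": true},
    #       "en": {"path": "en_epub/OEBPS/Text", "idx": 1, "sort": true,
    #              "pattern": "Section0*(\\d+)", "lshiftp": 1,
    #              "include_title": true, "title_pattern": "^Chapter \\d+"}
    #   }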

    def get_txt_content(self, novel_dir: str, config: Dict) -> List[Chapter]:
        """Parse txt content into chapters."""
        txt_path = self.base_dir / novel_dir / config["path"]
        content = FileReader.read_file(txt_path)
        if not content:
            return []
        # "idx" is 1-based: skip any front matter before the first chapter.
        content_lines = content[config["idx"] - 1 :]
        chapters = self.text_parser.parse_chapters(content_lines, config.get("pattern"))
        if config.get("include_title"):
            for chapter in chapters:
                chapter.content = [chapter.title] + chapter.content
        return chapters

    def get_epub_content(self, novel_dir: str, config: Dict) -> List[Chapter]:
        """Parse EPUB content into chapters."""
        epub_dir = self.base_dir / novel_dir / config["path"]
        valid_files = self.epub_parser.get_valid_files(epub_dir, config)
        chapters = []
        for i, filename in enumerate(valid_files, start=1):
            if not (
                chapter := self.epub_parser.parse_chapter_file(epub_dir / filename, i)
            ):
                continue
            # "lshiftp" drops that many leading paragraphs from each chapter.
            if lshift := config.get("lshiftp", 0):
                chapter.content = chapter.content[lshift:]
            if not chapter.content:
                continue
            if config.get("include_title"):
                # Prepend the title only when the first paragraph does not
                # already match the configured title pattern. (An absent
                # "title_pattern" defaults to "", which matches everything,
                # so nothing is prepended in that case.)
                if not re.match(config.get("title_pattern", ""), chapter.content[0]):
                    chapter.content = [chapter.title] + chapter.content
            chapters.append(chapter)
        return chapters

def print_chapter_titles(
    zh_chapters: List[Chapter], en_chapters: List[Chapter]
) -> None:
    """Log chapter titles side by side for manual alignment checks."""
    max_chapters = max(len(zh_chapters), len(en_chapters))
    logger.info(f"max_chapters: {max_chapters}")
    logger.info(f"zh: {len(zh_chapters)}")
    logger.info(f"en: {len(en_chapters)}")
    logger.info("\n=== Chapter Title Comparison ===")
    logger.info(f"{'English':<50} | {'Chinese'}")
    logger.info("-" * 80)
    for i in range(max_chapters):
        en_title = en_chapters[i].title if i < len(en_chapters) else "N/A"
        zh_title = zh_chapters[i].title if i < len(zh_chapters) else "N/A"
        logger.info(f"{en_title:<50} | {zh_title}")

def create_db_entries(
    db_path: str, novel_dir: str, zh_chapters: List[Chapter], en_chapters: List[Chapter]
) -> None:
    """Insert aligned chapter pairs into SQLite, skipping mismatched books."""
    if len(zh_chapters) != len(en_chapters):
        logger.warning(
            f"Chapter count mismatch for {novel_dir}: "
            f"ZH={len(zh_chapters)}, EN={len(en_chapters)}"
        )
        return
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    try:
        cursor.execute(
            "INSERT OR IGNORE INTO books (book_id) VALUES (?)", (novel_dir,)
        )
        for zh_chap, en_chap in zip(zh_chapters, en_chapters):
            cursor.execute(
                """
                INSERT OR REPLACE INTO chapters
                    (book_id, chapter_id, text_zh, text_en)
                VALUES (?, ?, ?, ?)
                """,
                (
                    novel_dir,
                    str(zh_chap.number),
                    "\n\n".join(zh_chap.content),
                    "\n\n".join(en_chap.content),
                ),
            )
        conn.commit()
        logger.info(f"Successfully inserted {len(zh_chapters)} chapters for {novel_dir}")
    except sqlite3.Error as e:
        logger.error(f"Database error occurred: {e}")
        conn.rollback()
    finally:
        conn.close()
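
# create_db_entries assumes the books/chapters tables already exist. A schema
# sketch consistent with the statements above (the real schema may differ):
#
#   CREATE TABLE IF NOT EXISTS books (
#       book_id TEXT PRIMARY KEY
#   );
#   CREATE TABLE IF NOT EXISTS chapters (
#       book_id    TEXT NOT NULL REFERENCES books(book_id),
#       chapter_id TEXT NOT NULL,
#       text_zh    TEXT,
#       text_en    TEXT,
#       PRIMARY KEY (book_id, chapter_id)
#   );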

def main():
    parser = ChapterParser("custom")
    logger.info(f"Starting parser with base directory: {os.path.abspath('custom')}")
    DB_PATH = "parallel_texts.db"
    epub_dirs = [
        d for d in os.listdir("custom") if os.path.isdir(os.path.join("custom", d))
    ]
    # epub_dirs = ["warlock"]  # uncomment to process a single novel
    for novel_dir in epub_dirs:
        logger.info(f"\n=== Analyzing {novel_dir} ===")
        config = parser.load_format_config(novel_dir)
        if not config:
            logger.warning(f"No format.json found for {novel_dir}; skipping")
            continue
        zh_chapters = parser.get_txt_content(novel_dir, config["zh"])
        en_chapters = parser.get_epub_content(novel_dir, config["en"])
        create_db_entries(DB_PATH, novel_dir, zh_chapters, en_chapters)
        # print_chapter_titles(zh_chapters, en_chapters)


if __name__ == "__main__":
    main()