first commit
This commit is contained in:
4
.gitignore
vendored
Normal file
4
.gitignore
vendored
Normal file
@@ -0,0 +1,4 @@
|
||||
epubs/
|
||||
*.db
|
||||
train.en
|
||||
train.zh
|
||||
4
README.md
Normal file
4
README.md
Normal file
@@ -0,0 +1,4 @@
|
||||
# chinese -> english finetuning datasets
|
||||
|
||||
train.en and train.zh are from [here](https://www.dropbox.com/scl/fo/dtrf3pe1vfbo5nse16648/ANLqlv3ascANpkdnYF_w4Jk/V1/TRAIN?dl=0&rlkey=486vbn17qra1ez91btj0n4xu2&subfolder_nav_tracking=1)
|
||||
TODO: mirror
|
||||
389
paragraph_split.py
Normal file
389
paragraph_split.py
Normal file
@@ -0,0 +1,389 @@
|
||||
import sqlite3
|
||||
import os
|
||||
import re
|
||||
from bs4 import BeautifulSoup
|
||||
from pathlib import Path
|
||||
import unicodedata
|
||||
|
||||
|
||||
def create_paragraphs_table(conn):
|
||||
"""Create the paragraphs table with necessary columns and constraints."""
|
||||
conn.execute(
|
||||
"""
|
||||
create table if not exists paragraphs (
|
||||
id integer primary key autoincrement,
|
||||
book_id text not null,
|
||||
chapter_id text not null,
|
||||
text_en text,
|
||||
text_zh text,
|
||||
char_count integer,
|
||||
foreign key (book_id, chapter_id) references chapters(book_id, chapter_id)
|
||||
)
|
||||
"""
|
||||
)
|
||||
|
||||
|
||||
def normalize_quotes(text):
|
||||
# normalize unicode characters to their composed form
|
||||
text = unicodedata.normalize("NFKC", text)
|
||||
|
||||
quote_map = {
|
||||
"\u201c": '"', # LEFT DOUBLE QUOTATION MARK
|
||||
"\u201d": '"', # RIGHT DOUBLE QUOTATION MARK
|
||||
"\u2018": "'", # LEFT SINGLE QUOTATION MARK
|
||||
"\u2019": "'", # RIGHT SINGLE QUOTATION MARK
|
||||
"\u00ab": '"', # LEFT-POINTING DOUBLE ANGLE QUOTATION MARK
|
||||
"\u00bb": '"', # RIGHT-POINTING DOUBLE ANGLE QUOTATION MARK
|
||||
"\u2039": "'", # SINGLE LEFT-POINTING ANGLE QUOTATION MARK
|
||||
"\u203a": "'", # SINGLE RIGHT-POINTING ANGLE QUOTATION MARK
|
||||
"\u2032": "'", # PRIME
|
||||
"\u2033": '"', # DOUBLE PRIME
|
||||
}
|
||||
|
||||
for old, new in quote_map.items():
|
||||
text = text.replace(old, new)
|
||||
|
||||
return text
|
||||
|
||||
|
||||
def strip_paragraph_markers(text):
|
||||
"""remove p markers like #<# and #>#"""
|
||||
return re.sub(r"#<#|#>#", "", text).strip()
|
||||
|
||||
|
||||
def normalize_text(text):
|
||||
"""text normalziations"""
|
||||
text = normalize_quotes(text)
|
||||
text = strip_paragraph_markers(text)
|
||||
return text
|
||||
|
||||
|
||||
def get_paragraphs_from_xhtml(xhtml_path):
|
||||
"""extract p from html"""
|
||||
with open(xhtml_path, "r", encoding="utf-8") as f:
|
||||
content = f.read()
|
||||
|
||||
soup = BeautifulSoup(content, "html.parser")
|
||||
paragraphs = []
|
||||
|
||||
p_elements = soup.find_all("p")
|
||||
|
||||
for p in p_elements:
|
||||
text = normalize_text(p.get_text())
|
||||
if text: # only add non-empty paragraphs
|
||||
paragraphs.append(text)
|
||||
|
||||
# try br split
|
||||
if not paragraphs:
|
||||
content = re.sub(r"<br\s*/>", "<br>", content, flags=re.IGNORECASE)
|
||||
parts = re.split(r"<br>\s*<br>", content, flags=re.IGNORECASE)
|
||||
|
||||
for part in parts:
|
||||
clean_text = BeautifulSoup(part, "html.parser").get_text()
|
||||
text = normalize_text(clean_text)
|
||||
if text: # only add non-empty paragraphs
|
||||
paragraphs.append(text)
|
||||
|
||||
return paragraphs
|
||||
|
||||
|
||||
def get_zh_text_for_lines(zh_lines, start_idx, end_idx):
|
||||
"""Get corresponding Chinese text for given line range."""
|
||||
return " ".join(zh_lines[start_idx : end_idx + 1])
|
||||
|
||||
|
||||
def extract_paragraphs(text_en, text_zh):
|
||||
"""
|
||||
Extract matching paragraphs from English and Chinese texts.
|
||||
Returns list of paragraphs with normalized text.
|
||||
"""
|
||||
paragraphs = []
|
||||
current_en_lines = []
|
||||
current_en_indices = []
|
||||
|
||||
# split into lines and normalize
|
||||
en_lines = [line.strip() for line in text_en.split("\n")]
|
||||
zh_lines = [line.strip() for line in text_zh.split("\n")] if text_zh else []
|
||||
|
||||
i = 0
|
||||
while i < len(en_lines):
|
||||
line = en_lines[i]
|
||||
normalized_line = normalize_text(line)
|
||||
|
||||
if not normalized_line:
|
||||
i += 1
|
||||
continue
|
||||
|
||||
current_en_lines.append(normalized_line)
|
||||
current_en_indices.append(i)
|
||||
|
||||
# Look ahead to check if next line is empty or ends the paragraph
|
||||
next_idx = i + 1
|
||||
while next_idx < len(en_lines):
|
||||
next_line = en_lines[next_idx].strip()
|
||||
if not next_line:
|
||||
# Empty line - continue current paragraph
|
||||
next_idx += 1
|
||||
continue
|
||||
|
||||
# If we have Chinese text, check if these lines correspond to a complete thought
|
||||
if zh_lines:
|
||||
zh_text = get_zh_text_for_lines(
|
||||
zh_lines, current_en_indices[0], next_idx - 1
|
||||
)
|
||||
if zh_text:
|
||||
# Found corresponding Chinese text - end paragraph
|
||||
break
|
||||
|
||||
# Add next line to current paragraph
|
||||
normalized_next = normalize_text(next_line)
|
||||
current_en_lines.append(normalized_next)
|
||||
current_en_indices.append(next_idx)
|
||||
next_idx += 1
|
||||
|
||||
# Create paragraph
|
||||
if current_en_lines:
|
||||
en_text = " ".join(current_en_lines)
|
||||
zh_text = ""
|
||||
if zh_lines:
|
||||
zh_text = get_zh_text_for_lines(
|
||||
zh_lines, current_en_indices[0], current_en_indices[-1]
|
||||
)
|
||||
|
||||
paragraphs.append(
|
||||
{"text_en": en_text, "text_zh": zh_text, "char_count": len(en_text)}
|
||||
)
|
||||
|
||||
# Reset for next paragraph
|
||||
current_en_lines = []
|
||||
current_en_indices = []
|
||||
i = next_idx
|
||||
|
||||
return paragraphs
|
||||
|
||||
|
||||
def match_paragraphs(xhtml_paragraphs, db_lines, lines_to_try=3):
|
||||
"""
|
||||
Match paragraphs from XHTML with lines from database.
|
||||
Tries first few lines at start before giving up, to handle chapter titles and initial dialog.
|
||||
|
||||
Args:
|
||||
xhtml_paragraphs: List of XHTML paragraph texts
|
||||
db_lines: List of database text lines
|
||||
lines_to_try: Number of initial lines to try before giving up
|
||||
|
||||
Returns:
|
||||
List of tuples containing (start_idx, end_idx) for matched paragraphs
|
||||
"""
|
||||
|
||||
def find_next_content_line(current_idx):
|
||||
"""Find next non-empty line and return its index and content."""
|
||||
while current_idx < len(db_lines):
|
||||
line = normalize_text(db_lines[current_idx].strip())
|
||||
if line:
|
||||
return current_idx, line
|
||||
current_idx += 1
|
||||
return current_idx, None
|
||||
|
||||
matched_indices = []
|
||||
xhtml_idx = 0
|
||||
db_idx = 0
|
||||
tried_lines = 0
|
||||
|
||||
while xhtml_idx < len(xhtml_paragraphs) and db_idx < len(db_lines):
|
||||
# find next non-empty line in db
|
||||
db_check_idx, db_line = find_next_content_line(db_idx)
|
||||
if not db_line:
|
||||
break
|
||||
|
||||
# search for p containing this line
|
||||
while (
|
||||
xhtml_idx < len(xhtml_paragraphs)
|
||||
and db_line not in xhtml_paragraphs[xhtml_idx]
|
||||
):
|
||||
xhtml_idx += 1
|
||||
|
||||
# try ~3 db_lines at start
|
||||
if xhtml_idx >= len(xhtml_paragraphs):
|
||||
if not matched_indices and tried_lines < lines_to_try:
|
||||
tried_lines += 1
|
||||
xhtml_idx = 0
|
||||
db_idx = db_check_idx + 1
|
||||
continue
|
||||
break
|
||||
|
||||
# collect all database lines that belong to this p
|
||||
start_idx = db_check_idx
|
||||
current_idx = db_check_idx
|
||||
|
||||
while current_idx < len(db_lines):
|
||||
current_line = normalize_text(db_lines[current_idx].strip())
|
||||
if current_line and current_line not in xhtml_paragraphs[xhtml_idx]:
|
||||
break
|
||||
current_idx += 1
|
||||
|
||||
matched_indices.append((start_idx, current_idx - 1))
|
||||
db_idx = current_idx
|
||||
xhtml_idx += 1
|
||||
|
||||
return matched_indices
|
||||
|
||||
|
||||
def normalize_chapter_id(chapter_id):
|
||||
"""
|
||||
Normalize chapter IDs by removing padding and handling special cases.
|
||||
Examples:
|
||||
- gfyxjdcz!_0001 -> 1
|
||||
- 00001-1-Swindler -> 1>
|
||||
- wyctUp_0001 -> 1
|
||||
- ltzz_0002 -> 2
|
||||
"""
|
||||
# handle IDs with _
|
||||
if "_" in chapter_id:
|
||||
chapter_id = chapter_id.split("_")[-1]
|
||||
|
||||
# rm any non-digit prefix and suffix
|
||||
digits = re.search(r"(\d+)", chapter_id)
|
||||
if digits:
|
||||
chapter_id = digits.group(1)
|
||||
|
||||
# rm leading zeros
|
||||
return str(int(chapter_id))
|
||||
|
||||
|
||||
def find_chapter_file(epub_dir, normalized_id):
|
||||
epub_dir = Path(epub_dir)
|
||||
search_dirs = [
|
||||
epub_dir / "OEBPS" / "Text",
|
||||
epub_dir / "OEBPS",
|
||||
]
|
||||
|
||||
for directory in search_dirs:
|
||||
if not directory.exists():
|
||||
continue
|
||||
|
||||
for file_path in directory.glob("*.*html"):
|
||||
numbers = re.findall(r"\d+", file_path.stem)
|
||||
if numbers:
|
||||
file_chapter_num = str(int(numbers[0]))
|
||||
if file_chapter_num == normalized_id:
|
||||
return file_path
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def preserve_lines(text):
|
||||
return [line.strip() if line.strip() else line for line in text.split("\n")]
|
||||
|
||||
|
||||
def print_matched_paragraphs(text_en_lines, text_zh_lines, matched_indices):
|
||||
"""
|
||||
Print matched paragraphs from English and Chinese text, with Chinese translation
|
||||
immediately following each English paragraph.
|
||||
|
||||
Args:
|
||||
text_en_lines (list): List of English text lines
|
||||
text_zh_lines (list): List of Chinese text lines
|
||||
matched_indices (list): List of tuples containing (start_idx, end_idx)
|
||||
"""
|
||||
if not matched_indices:
|
||||
print("No matched paragraphs found.")
|
||||
return
|
||||
|
||||
for start_idx, end_idx in matched_indices:
|
||||
# Get and join English lines for this range
|
||||
en_para = " ".join(text_en_lines[start_idx : end_idx + 1])
|
||||
# Get and join Chinese lines for the same range
|
||||
zh_para = " ".join(text_zh_lines[start_idx : end_idx + 1])
|
||||
|
||||
# Print English followed by Chinese
|
||||
print(strip_paragraph_markers(en_para))
|
||||
print(strip_paragraph_markers(zh_para))
|
||||
print() # Extra newline between pairs
|
||||
|
||||
|
||||
def process_book(conn, epub_base_dir, book_id):
|
||||
"""Process an entire book and add paragraphs to database."""
|
||||
epub_dir = Path(epub_base_dir) / book_id
|
||||
|
||||
if not epub_dir.exists():
|
||||
# print(f"Warning: EPUB directory not found for book {book_id}: {epub_dir}")
|
||||
return
|
||||
|
||||
print(f"Processing book {book_id} from: {epub_dir}")
|
||||
|
||||
# Get all chapters for this book
|
||||
chapters = conn.execute(
|
||||
"select chapter_id, text_en, text_zh from chapters where book_id = ?",
|
||||
(book_id,),
|
||||
).fetchall()
|
||||
|
||||
print(f"Chapter count: {len(chapters)}")
|
||||
|
||||
for chapter_id, text_en, text_zh in chapters:
|
||||
if not text_en or not text_zh:
|
||||
print(
|
||||
f"Warning: Missing content for chapter {chapter_id} in book {book_id}"
|
||||
)
|
||||
continue
|
||||
|
||||
# find html file
|
||||
normalized_id = normalize_chapter_id(chapter_id)
|
||||
xhtml_path = find_chapter_file(epub_dir, normalized_id)
|
||||
if not xhtml_path:
|
||||
print(
|
||||
f"Warning: Could not find XHTML file for chapter {chapter_id}. normalized_id: {normalized_id}, xhtml_path: {xhtml_path}"
|
||||
)
|
||||
continue
|
||||
|
||||
# extract p from html
|
||||
xhtml_paragraphs = get_paragraphs_from_xhtml(xhtml_path)
|
||||
|
||||
# split by \n only, strip only non-empty lines
|
||||
text_en_lines = preserve_lines(text_en)
|
||||
text_zh_lines = preserve_lines(text_zh)
|
||||
|
||||
# match ps between XHTML and db content
|
||||
matched_indices = match_paragraphs(xhtml_paragraphs, text_en_lines)
|
||||
|
||||
# print_matched_paragraphs(text_en_lines, text_zh_lines, matched_indices)
|
||||
matched_pairs = []
|
||||
for start_idx, end_idx in matched_indices:
|
||||
en_para = strip_paragraph_markers(
|
||||
" ".join(text_en_lines[start_idx : end_idx + 1])
|
||||
)
|
||||
zh_para = strip_paragraph_markers(
|
||||
" ".join(text_zh_lines[start_idx : end_idx + 1])
|
||||
)
|
||||
matched_pairs.append((en_para, zh_para))
|
||||
|
||||
for en_para, zh_para in matched_pairs:
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO paragraphs (book_id, chapter_id, text_en, text_zh, char_count)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
""",
|
||||
(book_id, chapter_id, en_para, zh_para, len(en_para)),
|
||||
)
|
||||
|
||||
conn.commit()
|
||||
|
||||
|
||||
def process_all_books(db_path, epub_base_dir):
|
||||
"""Process all books in the database."""
|
||||
conn = sqlite3.connect(db_path)
|
||||
create_paragraphs_table(conn)
|
||||
|
||||
books = conn.execute("select book_id from books").fetchall()
|
||||
|
||||
for (book_id,) in books:
|
||||
process_book(conn, epub_base_dir, book_id)
|
||||
|
||||
conn.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
db_path = "parallel_texts.db"
|
||||
epub_base_dir = "epubs" # base dir
|
||||
|
||||
process_all_books(db_path, epub_base_dir)
|
||||
198
paragraph_split_custom_zh.py
Normal file
198
paragraph_split_custom_zh.py
Normal file
@@ -0,0 +1,198 @@
|
||||
import os
|
||||
from bs4 import BeautifulSoup
|
||||
import re
|
||||
import sqlite3
|
||||
from pathlib import Path
|
||||
from typing import List, Tuple, Dict
|
||||
|
||||
|
||||
def clean_text(text: str) -> str:
|
||||
"""normalize whitespace and line end"""
|
||||
return re.sub(r"\s+", " ", text).strip()
|
||||
|
||||
|
||||
def extract_zh_paragraphs(soup: BeautifulSoup) -> List[str]:
|
||||
"""extract paragraphs from zh HTML"""
|
||||
if h1_tag := soup.find("h1"):
|
||||
h1_tag.decompose()
|
||||
|
||||
for br in soup.find_all("br"):
|
||||
br.replace_with("\n")
|
||||
|
||||
content = soup.body.get_text()
|
||||
paragraphs = [clean_text(p) for p in content.split("\n\n") if clean_text(p)]
|
||||
|
||||
return paragraphs
|
||||
|
||||
|
||||
def extract_en_paragraphs(soup: BeautifulSoup) -> List[str]:
|
||||
"""etract paragraphs from English HTML"""
|
||||
if h1_tag := soup.find("h1"):
|
||||
h1_tag.decompose()
|
||||
|
||||
for footnote in soup.find_all("span", id=re.compile(r"easy-footnote.*")):
|
||||
footnote.decompose()
|
||||
|
||||
paragraphs = [
|
||||
clean_text(p.get_text()) for p in soup.find_all("p") if clean_text(p.get_text())
|
||||
]
|
||||
|
||||
return paragraphs
|
||||
|
||||
|
||||
def print_debug_info(
|
||||
chapter_num: str,
|
||||
en_file: Path,
|
||||
zh_file: Path,
|
||||
en_paragraphs: List[str],
|
||||
zh_paragraphs: List[str],
|
||||
):
|
||||
"""debug"""
|
||||
print(f"\n=== MISMATCH DETECTED IN CHAPTER {chapter_num} ===")
|
||||
print(f"English file: {en_file}")
|
||||
print(f"Chinese file: {zh_file}")
|
||||
print(f"\nParagraph count:")
|
||||
print(f" English: {len(en_paragraphs)}")
|
||||
print(f" Chinese: {len(zh_paragraphs)}")
|
||||
|
||||
print("\nFirst 3 English p:")
|
||||
for i, p in enumerate(en_paragraphs[:3]):
|
||||
print(f" {i+1}: {p[:100]}...")
|
||||
|
||||
print("\nFirst 3 Chinese p:")
|
||||
for i, p in enumerate(zh_paragraphs[:3]):
|
||||
print(f" {i+1}: {p[:100]}...")
|
||||
|
||||
print("\nRaw Chinese HTML:")
|
||||
with open(zh_file, "r", encoding="utf-8") as f:
|
||||
content = f.read()
|
||||
print(content[:500])
|
||||
|
||||
|
||||
def process_chapter_pair(en_path: Path, zh_path: Path) -> Tuple[List[str], List[str]]:
|
||||
"""Process a pair of corresponding chapter files"""
|
||||
with open(en_path, "r", encoding="utf-8") as f:
|
||||
en_soup = BeautifulSoup(f, "html.parser")
|
||||
en_paragraphs = extract_en_paragraphs(en_soup)
|
||||
|
||||
with open(zh_path, "r", encoding="utf-8") as f:
|
||||
zh_soup = BeautifulSoup(f, "html.parser")
|
||||
zh_paragraphs = extract_zh_paragraphs(zh_soup)
|
||||
|
||||
return en_paragraphs, zh_paragraphs
|
||||
|
||||
|
||||
def insert_book_chapters(
|
||||
db_path: str, book_id: str, matched_chapters: Dict[str, Tuple[List[str], List[str]]]
|
||||
):
|
||||
"""
|
||||
Insert chapters and paragraphs into the database for a given book_id.
|
||||
Only inserts when English and Chinese paragraph counts match.
|
||||
"""
|
||||
conn = sqlite3.connect(db_path)
|
||||
cur = conn.cursor()
|
||||
|
||||
try:
|
||||
cur.execute("insert or ignore into books (book_id) values (?)", (book_id,))
|
||||
|
||||
for chapter_id, (en_paragraphs, zh_paragraphs) in matched_chapters.items():
|
||||
# only process if paragraph counts match
|
||||
if len(en_paragraphs) != len(zh_paragraphs):
|
||||
print(
|
||||
f"Skipping chapter {chapter_id} due to paragraph count mismatch: "
|
||||
f"EN: {len(en_paragraphs)}, ZH: {len(zh_paragraphs)}"
|
||||
)
|
||||
continue
|
||||
|
||||
# join paragraphs for chapter text
|
||||
chapter_text_en = "\n".join(en_paragraphs)
|
||||
chapter_text_zh = "\n".join(zh_paragraphs)
|
||||
|
||||
cur.execute(
|
||||
"""
|
||||
insert into chapters (book_id, chapter_id, text_en, text_zh)
|
||||
values (?, ?, ?, ?)
|
||||
on conflict (book_id, chapter_id) do update set
|
||||
text_en = excluded.text_en,
|
||||
text_zh = excluded.text_zh
|
||||
""",
|
||||
(book_id, chapter_id, chapter_text_en, chapter_text_zh),
|
||||
)
|
||||
|
||||
# insert p
|
||||
for en_text, zh_text in zip(en_paragraphs, zh_paragraphs):
|
||||
char_count = len(en_text)
|
||||
cur.execute(
|
||||
"""
|
||||
insert into paragraphs
|
||||
(book_id, chapter_id, text_en, text_zh, char_count)
|
||||
values (?, ?, ?, ?, ?)
|
||||
""",
|
||||
(book_id, chapter_id, en_text, zh_text, char_count),
|
||||
)
|
||||
|
||||
print(
|
||||
f"Processed chapter {chapter_id} with {len(en_paragraphs)} paragraphs"
|
||||
)
|
||||
|
||||
conn.commit()
|
||||
print(f"Successfully processed all matching chapters for book {book_id}")
|
||||
|
||||
except Exception as e:
|
||||
conn.rollback()
|
||||
print(f"Error processing chapters: {str(e)}")
|
||||
raise
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
def match_chapters(epub_dir: str) -> Dict[str, Tuple[List[str], List[str]]]:
|
||||
"""Match and process chapters between English and Chinese directories."""
|
||||
base_dir = Path(epub_dir)
|
||||
en_dir = base_dir / "en"
|
||||
zh_dir = base_dir / "zh"
|
||||
|
||||
matched_paragraphs = {}
|
||||
|
||||
# Get all English files and sort them
|
||||
en_files = sorted([f for f in en_dir.glob("*.xhtml")])
|
||||
|
||||
for en_file in en_files:
|
||||
# Construct corresponding Chinese filename
|
||||
chapter_num = re.search(r"(\d{4})", en_file.name).group(1)
|
||||
zh_file = zh_dir / f"{chapter_num}_.xhtml"
|
||||
|
||||
if not zh_file.exists():
|
||||
print(f"Warning: No matching Chinese file for {en_file.name}")
|
||||
continue
|
||||
|
||||
try:
|
||||
en_paragraphs, zh_paragraphs = process_chapter_pair(en_file, zh_file)
|
||||
|
||||
# Check for significant mismatch in paragraph counts
|
||||
# if abs(len(en_paragraphs) - len(zh_paragraphs)) > 5:
|
||||
# print_debug_info(
|
||||
# chapter_num, en_file, zh_file, en_paragraphs, zh_paragraphs
|
||||
# )
|
||||
# else:
|
||||
print(f"Chapter {chapter_num}:")
|
||||
print(f" English paragraphs: {len(en_paragraphs)}")
|
||||
print(f" Chinese paragraphs: {len(zh_paragraphs)}")
|
||||
|
||||
# Store results
|
||||
matched_paragraphs[chapter_num] = (en_paragraphs, zh_paragraphs)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error processing chapter {chapter_num}: {str(e)}")
|
||||
|
||||
return matched_paragraphs
|
||||
|
||||
|
||||
def main():
|
||||
epub_dir = "epubs/1v1h"
|
||||
matched_chapters = match_chapters(epub_dir)
|
||||
insert_book_chapters("parallel_texts.db", "1v1h", matched_chapters)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
119
parallel_text_import.py
Normal file
119
parallel_text_import.py
Normal file
@@ -0,0 +1,119 @@
|
||||
import sqlite3
|
||||
import re
|
||||
from typing import List, Tuple, Dict
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass
|
||||
class TextUnit:
|
||||
book_id: str
|
||||
chapter_id: str
|
||||
text: str
|
||||
|
||||
|
||||
def parse_file(filename: str) -> List[TextUnit]:
|
||||
"""Parse the file and return a list of TextUnits."""
|
||||
units = []
|
||||
current_book = ""
|
||||
current_chapter = ""
|
||||
current_text = []
|
||||
book_pattern = re.compile(r'<BOOK id="([^"]+)">')
|
||||
chapter_pattern = re.compile(r'<CHAPTER id="([^"]+)">')
|
||||
end_pattern = re.compile(r"</(?:BOOK|CHAPTER)>")
|
||||
|
||||
with open(filename, "r", encoding="utf-8") as f:
|
||||
for line in f:
|
||||
# parse BOOK opening tag
|
||||
book_match = book_pattern.match(line)
|
||||
if book_match:
|
||||
current_book = book_match.group(1)
|
||||
continue
|
||||
|
||||
# parse CHAPTER opening tag
|
||||
chapter_match = chapter_pattern.match(line)
|
||||
if chapter_match:
|
||||
current_chapter = chapter_match.group(1)
|
||||
current_text = []
|
||||
continue
|
||||
|
||||
# on any end tag, save the current chapter
|
||||
if end_pattern.match(line):
|
||||
if current_text:
|
||||
units.append(
|
||||
TextUnit(
|
||||
book_id=current_book,
|
||||
chapter_id=current_chapter,
|
||||
text="".join(current_text),
|
||||
)
|
||||
)
|
||||
continue
|
||||
|
||||
# if line doesn't match any of our known tags, it's content
|
||||
if not book_pattern.match(line) and not chapter_pattern.match(line):
|
||||
current_text.append(line)
|
||||
|
||||
return units
|
||||
|
||||
|
||||
def create_database(db_name: str = "parallel_texts.db"):
|
||||
"""create schema"""
|
||||
conn = sqlite3.connect(db_name)
|
||||
|
||||
with open("schema.sql", "r") as f:
|
||||
conn.executescript(f.read())
|
||||
|
||||
conn.commit()
|
||||
return conn
|
||||
|
||||
|
||||
def import_texts(
|
||||
en_units: List[TextUnit], zh_units: List[TextUnit], conn: sqlite3.Connection
|
||||
):
|
||||
"""import parsed text"""
|
||||
c = conn.cursor()
|
||||
|
||||
# collect all unique book IDs
|
||||
book_ids = set(unit.book_id for unit in en_units)
|
||||
|
||||
# insert books
|
||||
for book_id in book_ids:
|
||||
c.execute("insert or ignore into books (book_id) values (?)", (book_id,))
|
||||
|
||||
# create a dict for Chinese texts
|
||||
zh_dict = {(unit.book_id, unit.chapter_id): unit.text for unit in zh_units}
|
||||
|
||||
# insert chapters with parallel texts
|
||||
for en_unit in en_units:
|
||||
zh_text = zh_dict.get((en_unit.book_id, en_unit.chapter_id), "")
|
||||
c.execute(
|
||||
"""
|
||||
insert or replace into chapters (book_id, chapter_id, text_en, text_zh) values (?, ?, ?, ?)
|
||||
""",
|
||||
(en_unit.book_id, en_unit.chapter_id, en_unit.text, zh_text),
|
||||
)
|
||||
|
||||
conn.commit()
|
||||
|
||||
|
||||
def main():
|
||||
en_units = parse_file("train.en")
|
||||
zh_units = parse_file("train.zh")
|
||||
|
||||
# create and populate database
|
||||
conn = create_database()
|
||||
import_texts(en_units, zh_units, conn)
|
||||
|
||||
# stats
|
||||
c = conn.cursor()
|
||||
c.execute("select count(*) from books")
|
||||
book_count = c.fetchone()[0]
|
||||
c.execute("select count(*) from chapters")
|
||||
chapter_count = c.fetchone()[0]
|
||||
|
||||
print(f"Imported {book_count} books and {chapter_count} chapters.")
|
||||
|
||||
conn.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
23
schema.sql
Normal file
23
schema.sql
Normal file
@@ -0,0 +1,23 @@
|
||||
create table if not exists books (
|
||||
book_id text primary key
|
||||
);
|
||||
|
||||
create table if not exists chapters (
|
||||
id integer primary key autoincrement,
|
||||
book_id text,
|
||||
chapter_id text,
|
||||
text_en text,
|
||||
text_zh text,
|
||||
foreign key (book_id) references books(book_id),
|
||||
unique(book_id, chapter_id)
|
||||
);
|
||||
|
||||
create table if not exists paragraphs (
|
||||
id integer primary key autoincrement,
|
||||
book_id text not null,
|
||||
chapter_id text not null,
|
||||
text_en text,
|
||||
text_zh text,
|
||||
char_count integer,
|
||||
foreign key (book_id, chapter_id) references chapters(book_id, chapter_id)
|
||||
);
|
||||
Reference in New Issue
Block a user