164 lines
5.6 KiB
Python
164 lines
5.6 KiB
Python
# Imports
|
|
import argparse, json, time, re
|
|
import numpy as np
|
|
from paddleocr import PaddleOCR
|
|
from jiwer import wer, cer
|
|
from dataset_manager import ImageTextDataset
|
|
from itertools import islice
|
|
|
|
def export_config(paddleocr_model):
|
|
yaml_path = "paddleocr_pipeline_dump.yaml"
|
|
paddleocr_model.export_paddlex_config_to_yaml(yaml_path)
|
|
print("Exported:", yaml_path)
|
|
|
|
def evaluate_text(reference, prediction):
|
|
return {'WER': wer(reference, prediction), 'CER': cer(reference, prediction)}
|
|
|
|
def _normalize_box_xyxy(box):
|
|
"""
|
|
Accepts:
|
|
- [[x,y],[x,y],[x,y],[x,y]] (quad)
|
|
- [x0, y0, x1, y1] (flat)
|
|
- [x0, y0, x1, y1, x2, y2, x3, y3] (flat quad)
|
|
Returns (x0, y0, x1, y1)
|
|
"""
|
|
# Quad as list of points?
|
|
if isinstance(box, (list, tuple)) and box and isinstance(box[0], (list, tuple)):
|
|
xs = [p[0] for p in box]
|
|
ys = [p[1] for p in box]
|
|
return min(xs), min(ys), max(xs), max(ys)
|
|
|
|
# Flat list
|
|
if isinstance(box, (list, tuple)):
|
|
if len(box) == 4:
|
|
x0, y0, x1, y1 = box
|
|
# ensure order
|
|
return min(x0, x1), min(y0, y1), max(x0, x1), max(y0, y1)
|
|
if len(box) == 8:
|
|
xs = box[0::2]
|
|
ys = box[1::2]
|
|
return min(xs), min(ys), max(xs), max(ys)
|
|
|
|
# Fallback
|
|
raise ValueError(f"Unrecognized box format: {box!r}")
|
|
|
|
def assemble_from_paddle_result(paddleocr_predict, min_score=0.0, line_tol_factor=0.6):
|
|
"""
|
|
Robust line grouping for PaddleOCR outputs:
|
|
- normalizes boxes to (x0,y0,x1,y1)
|
|
- adaptive line tolerance based on median box height
|
|
- optional confidence filter
|
|
- inserts '\n' between lines and preserves left→right order
|
|
"""
|
|
result = paddleocr_predict
|
|
|
|
boxes_all = [] # (x0, y0, x1, y1, y_mid, text, score)
|
|
for item in result:
|
|
res = item.json.get("res", {})
|
|
boxes = res.get("rec_boxes", []) or [] # be defensive
|
|
texts = res.get("rec_texts", []) or []
|
|
scores = res.get("rec_scores", None)
|
|
|
|
for i, (box, text) in enumerate(zip(boxes, texts)):
|
|
try:
|
|
x0, y0, x1, y1 = _normalize_box_xyxy(box)
|
|
except Exception:
|
|
# Skip weird boxes gracefully
|
|
continue
|
|
|
|
y_mid = 0.5 * (y0 + y1)
|
|
score = float(scores[i]) if (scores is not None and i < len(scores)) else 1.0
|
|
|
|
t = re.sub(r"\s+", " ", str(text)).strip()
|
|
if not t:
|
|
continue
|
|
|
|
boxes_all.append((x0, y0, x1, y1, y_mid, t, score))
|
|
|
|
if min_score > 0:
|
|
boxes_all = [b for b in boxes_all if b[6] >= min_score]
|
|
|
|
if not boxes_all:
|
|
return ""
|
|
|
|
# Adaptive line tolerance
|
|
heights = [b[3] - b[1] for b in boxes_all]
|
|
median_h = float(np.median(heights)) if heights else 20.0
|
|
line_tol = max(8.0, line_tol_factor * median_h)
|
|
|
|
# Sort by vertical mid, then x0
|
|
boxes_all.sort(key=lambda b: (b[4], b[0]))
|
|
|
|
# Group into lines
|
|
lines, cur, last_y = [], [], None
|
|
for x0, y0, x1, y1, y_mid, text, score in boxes_all:
|
|
if last_y is None or abs(y_mid - last_y) <= line_tol:
|
|
cur.append((x0, text))
|
|
else:
|
|
cur.sort(key=lambda t: t[0])
|
|
lines.append(" ".join(t[1] for t in cur))
|
|
cur = [(x0, text)]
|
|
last_y = y_mid
|
|
|
|
if cur:
|
|
cur.sort(key=lambda t: t[0])
|
|
lines.append(" ".join(t[1] for t in cur))
|
|
|
|
res = "\n".join(lines)
|
|
res = re.sub(r"\s+\n", "\n", res).strip()
|
|
return res
|
|
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("--pdf-folder", required=True)
|
|
parser.add_argument("--textline-orientation", type=lambda s: s.lower()=="true", default=True)
|
|
parser.add_argument("--text-det-box-thresh", type=float, default=0.6)
|
|
parser.add_argument("--text-det-unclip-ratio", type=float, default=1.5)
|
|
parser.add_argument("--text-rec-score-thresh", type=float, default=0.0)
|
|
parser.add_argument("--line-tolerance", type=float, default=0.6)
|
|
parser.add_argument("--min-box-score", type=float, default=0.0)
|
|
parser.add_argument("--lang", default="es")
|
|
args = parser.parse_args()
|
|
|
|
|
|
|
|
ocr = PaddleOCR(
|
|
text_detection_model_name="PP-OCRv5_server_det",
|
|
text_recognition_model_name="PP-OCRv5_server_rec",
|
|
lang=args.lang,
|
|
)
|
|
|
|
dataset = ImageTextDataset(args.pdf_folder)
|
|
cer_list, wer_list = [], []
|
|
time_per_page_list = []
|
|
t0 = time.time()
|
|
|
|
for img, ref in islice(dataset, 5, 10):
|
|
arr = np.array(img)
|
|
tp0 = time.time()
|
|
out = ocr.predict(
|
|
arr,
|
|
text_det_box_thresh=args.text_det_box_thresh,
|
|
text_det_unclip_ratio=args.text_det_unclip_ratio,
|
|
text_rec_score_thresh=args.text_rec_score_thresh,
|
|
use_textline_orientation=args.textline_orientation
|
|
)
|
|
pred = assemble_from_paddle_result(out, args.min_box_score, args.line_tolerance)
|
|
time_per_page_list.append(float(time.time() - tp0))
|
|
m = evaluate_text(ref, pred)
|
|
cer_list.append(m["CER"])
|
|
wer_list.append(m["WER"])
|
|
|
|
metrics = {
|
|
"CER": float(np.mean(cer_list) if cer_list else 1.0),
|
|
"WER": float(np.mean(wer_list) if wer_list else 1.0),
|
|
"TIME": float(time.time() - t0),
|
|
"PAGES": int(len(cer_list)),
|
|
"TIME_PER_PAGE": float(np.mean(time_per_page_list) if time_per_page_list else float(time.time() - t0)),
|
|
}
|
|
print(json.dumps(metrics))
|
|
|
|
if __name__ == "__main__":
|
|
main() |