import os
import time
import requests
import logging
import tempfile
import zipfile
from typing import List, Optional
from langchain_core.documents import Document
from fastapi import HTTPException, status

log = logging.getLogger(__name__)


class MinerULoader:
    """
    MinerU document parser loader supporting both Cloud API and Local API modes.

    Cloud API: Uses MinerU managed service with async task-based processing
    Local API: Uses self-hosted MinerU API with synchronous processing
    """

    def __init__(
        self,
        file_path: str,
        api_mode: str = "local",
        api_url: str = "http://localhost:8000",
        api_key: str = "",
        params: Optional[dict] = None,
    ):
        self.file_path = file_path
        self.api_mode = api_mode.lower()
        self.api_url = api_url.rstrip("/")
        self.api_key = api_key

        # Parse params dict with defaults
        params = params or {}
        self.enable_ocr = params.get("enable_ocr", False)
        self.enable_formula = params.get("enable_formula", True)
        self.enable_table = params.get("enable_table", True)
        self.language = params.get("language", "en")
        self.model_version = params.get("model_version", "pipeline")
        self.page_ranges = params.get("page_ranges", "")
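        # Illustrative example of a params dict (keys mirror the .get() defaults
        # above; the values here are only a sample, not shipped configuration):
        #   {"enable_ocr": True, "enable_formula": True, "enable_table": True,
        #    "language": "en", "model_version": "pipeline", "page_ranges": ""}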

        # Validate API mode
        if self.api_mode not in ["local", "cloud"]:
            raise ValueError(
                f"Invalid API mode: {self.api_mode}. Must be 'local' or 'cloud'"
            )

        # Validate Cloud API requirements
        if self.api_mode == "cloud" and not self.api_key:
            raise ValueError("API key is required for Cloud API mode")

    def load(self) -> List[Document]:
        """
        Main entry point for loading and parsing the document.
        Routes to Cloud or Local API based on api_mode.
        """
        try:
            if self.api_mode == "cloud":
                return self._load_cloud_api()
            else:
                return self._load_local_api()
        except Exception as e:
            log.error(f"Error loading document with MinerU: {e}")
            raise

    def _load_local_api(self) -> List[Document]:
        """
        Load document using the Local API (synchronous).
        Posts the file to the /file_parse endpoint and gets an immediate response.
        """
        log.info(f"Using MinerU Local API at {self.api_url}")

        filename = os.path.basename(self.file_path)

        # Build form data for Local API
        form_data = {
            "return_md": "true",
            "formula_enable": str(self.enable_formula).lower(),
            "table_enable": str(self.enable_table).lower(),
        }

        # Parse method based on OCR setting
        if self.enable_ocr:
            form_data["parse_method"] = "ocr"
        else:
            form_data["parse_method"] = "auto"

        # Language configuration (Local API uses lang_list array)
        if self.language:
            form_data["lang_list"] = self.language

        # Backend/model version (Local API uses "backend" parameter)
        if self.model_version == "vlm":
            form_data["backend"] = "vlm-vllm-engine"
        else:
            form_data["backend"] = "pipeline"

        # Page ranges (Local API uses start_page_id and end_page_id)
        if self.page_ranges:
            # Page range parsing for the Local API is not implemented here;
            # log a warning so the setting is not silently ignored.
            log.warning(
                f"Page ranges '{self.page_ranges}' specified, but the Local API uses a different format. "
                "Consider using start_page_id/end_page_id parameters if needed."
            )
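            # Hedged sketch (not wired in; the Local API's page-index convention
            # is not verified here): a simple "start-end" string such as "2-5"
            # could be mapped onto the start_page_id/end_page_id fields roughly as:
            #     start_page, end_page = self.page_ranges.split("-", 1)
            #     form_data["start_page_id"] = start_page.strip()
            #     form_data["end_page_id"] = end_page.strip()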

        try:
            with open(self.file_path, "rb") as f:
                files = {"files": (filename, f, "application/octet-stream")}

                log.info(f"Sending file to MinerU Local API: {filename}")
                log.debug(f"Local API parameters: {form_data}")

                response = requests.post(
                    f"{self.api_url}/file_parse",
                    data=form_data,
                    files=files,
                    timeout=300,  # 5 minute timeout for large documents
                )
                response.raise_for_status()

        except FileNotFoundError:
            raise HTTPException(
                status.HTTP_404_NOT_FOUND, detail=f"File not found: {self.file_path}"
            )
        except requests.Timeout:
            raise HTTPException(
                status.HTTP_504_GATEWAY_TIMEOUT,
                detail="MinerU Local API request timed out",
            )
        except requests.HTTPError as e:
            error_detail = f"MinerU Local API request failed: {e}"
            if e.response is not None:
                try:
                    error_data = e.response.json()
                    error_detail += f" - {error_data}"
                except Exception:
                    error_detail += f" - {e.response.text}"
            raise HTTPException(status.HTTP_400_BAD_REQUEST, detail=error_detail)
        except Exception as e:
            raise HTTPException(
                status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Error calling MinerU Local API: {str(e)}",
            )

        # Parse response
        try:
            result = response.json()
        except ValueError as e:
            raise HTTPException(
                status.HTTP_502_BAD_GATEWAY,
                detail=f"Invalid JSON response from MinerU Local API: {e}",
            )

        # Extract markdown content from response
        if "results" not in result:
            raise HTTPException(
                status.HTTP_502_BAD_GATEWAY,
                detail="MinerU Local API response missing 'results' field",
            )

        results = result["results"]
        if not results:
            raise HTTPException(
                status.HTTP_400_BAD_REQUEST,
                detail="MinerU returned empty results",
            )

        # Get the first (and typically only) result
        file_result = list(results.values())[0]
        markdown_content = file_result.get("md_content", "")

        if not markdown_content:
            raise HTTPException(
                status.HTTP_400_BAD_REQUEST,
                detail="MinerU returned empty markdown content",
            )

        log.info(f"Successfully parsed document with MinerU Local API: {filename}")

        # Create metadata
        metadata = {
            "source": filename,
            "api_mode": "local",
            "backend": result.get("backend", "unknown"),
            "version": result.get("version", "unknown"),
        }

        return [Document(page_content=markdown_content, metadata=metadata)]

    def _load_cloud_api(self) -> List[Document]:
        """
        Load document using the Cloud API (asynchronous).
        Uses the batch upload endpoint to avoid the need for public file URLs.
        """
        log.info(f"Using MinerU Cloud API at {self.api_url}")

        filename = os.path.basename(self.file_path)

        # Step 1: Request presigned upload URL
        batch_id, upload_url = self._request_upload_url(filename)

        # Step 2: Upload file to presigned URL
        self._upload_to_presigned_url(upload_url)

        # Step 3: Poll for results
        result = self._poll_batch_status(batch_id, filename)

        # Step 4: Download and extract markdown from ZIP
        markdown_content = self._download_and_extract_zip(
            result["full_zip_url"], filename
        )

        log.info(f"Successfully parsed document with MinerU Cloud API: {filename}")

        # Create metadata
        metadata = {
            "source": filename,
            "api_mode": "cloud",
            "batch_id": batch_id,
        }

        return [Document(page_content=markdown_content, metadata=metadata)]

    def _request_upload_url(self, filename: str) -> tuple:
        """
        Request a presigned upload URL from the Cloud API.
        Returns (batch_id, upload_url).
        """
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }

        # Build request body
        request_body = {
            "enable_formula": self.enable_formula,
            "enable_table": self.enable_table,
            "language": self.language,
            "model_version": self.model_version,
            "files": [
                {
                    "name": filename,
                    "is_ocr": self.enable_ocr,
                }
            ],
        }

        # Add page ranges if specified
        if self.page_ranges:
            request_body["files"][0]["page_ranges"] = self.page_ranges

        log.info(f"Requesting upload URL for: {filename}")
        log.debug(f"Cloud API request body: {request_body}")

        try:
            response = requests.post(
                f"{self.api_url}/file-urls/batch",
                headers=headers,
                json=request_body,
                timeout=30,
            )
            response.raise_for_status()
        except requests.HTTPError as e:
            error_detail = f"Failed to request upload URL: {e}"
            if e.response is not None:
                try:
                    error_data = e.response.json()
                    error_detail += f" - {error_data.get('msg', error_data)}"
                except Exception:
                    error_detail += f" - {e.response.text}"
            raise HTTPException(status.HTTP_400_BAD_REQUEST, detail=error_detail)
        except Exception as e:
            raise HTTPException(
                status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Error requesting upload URL: {str(e)}",
            )

        try:
            result = response.json()
        except ValueError as e:
            raise HTTPException(
                status.HTTP_502_BAD_GATEWAY,
                detail=f"Invalid JSON response: {e}",
            )

        # Check for API error response
        if result.get("code") != 0:
            raise HTTPException(
                status.HTTP_400_BAD_REQUEST,
                detail=f"MinerU Cloud API error: {result.get('msg', 'Unknown error')}",
            )

        data = result.get("data", {})
        batch_id = data.get("batch_id")
        file_urls = data.get("file_urls", [])

        if not batch_id or not file_urls:
            raise HTTPException(
                status.HTTP_502_BAD_GATEWAY,
                detail="MinerU Cloud API response missing batch_id or file_urls",
            )

        upload_url = file_urls[0]
        log.info(f"Received upload URL for batch: {batch_id}")

        return batch_id, upload_url

    def _upload_to_presigned_url(self, upload_url: str) -> None:
        """
        Upload file to presigned URL (no authentication needed).
        """
        log.info("Uploading file to presigned URL")

        try:
            with open(self.file_path, "rb") as f:
                response = requests.put(
                    upload_url,
                    data=f,
                    timeout=300,  # 5 minute timeout for large files
                )
                response.raise_for_status()
        except FileNotFoundError:
            raise HTTPException(
                status.HTTP_404_NOT_FOUND, detail=f"File not found: {self.file_path}"
            )
        except requests.Timeout:
            raise HTTPException(
                status.HTTP_504_GATEWAY_TIMEOUT,
                detail="File upload to presigned URL timed out",
            )
        except requests.HTTPError as e:
            raise HTTPException(
                status.HTTP_400_BAD_REQUEST,
                detail=f"Failed to upload file to presigned URL: {e}",
            )
        except Exception as e:
            raise HTTPException(
                status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Error uploading file: {str(e)}",
            )

        log.info("File uploaded successfully")

    def _poll_batch_status(self, batch_id: str, filename: str) -> dict:
        """
        Poll batch status until completion.
        Returns the result dict for the file.
        """
        headers = {
            "Authorization": f"Bearer {self.api_key}",
        }

        max_iterations = 300  # 10 minutes max (2 seconds per iteration)
        poll_interval = 2  # seconds

        log.info(f"Polling batch status: {batch_id}")

        for iteration in range(max_iterations):
            try:
                response = requests.get(
                    f"{self.api_url}/extract-results/batch/{batch_id}",
                    headers=headers,
                    timeout=30,
                )
                response.raise_for_status()
            except requests.HTTPError as e:
                error_detail = f"Failed to poll batch status: {e}"
                if e.response is not None:
                    try:
                        error_data = e.response.json()
                        error_detail += f" - {error_data.get('msg', error_data)}"
                    except Exception:
                        error_detail += f" - {e.response.text}"
                raise HTTPException(status.HTTP_400_BAD_REQUEST, detail=error_detail)
            except Exception as e:
                raise HTTPException(
                    status.HTTP_500_INTERNAL_SERVER_ERROR,
                    detail=f"Error polling batch status: {str(e)}",
                )

            try:
                result = response.json()
            except ValueError as e:
                raise HTTPException(
                    status.HTTP_502_BAD_GATEWAY,
                    detail=f"Invalid JSON response while polling: {e}",
                )

            # Check for API error response
            if result.get("code") != 0:
                raise HTTPException(
                    status.HTTP_400_BAD_REQUEST,
                    detail=f"MinerU Cloud API error: {result.get('msg', 'Unknown error')}",
                )

            data = result.get("data", {})
            extract_result = data.get("extract_result", [])

            # Find our file in the batch results
            file_result = None
            for item in extract_result:
                if item.get("file_name") == filename:
                    file_result = item
                    break

            if not file_result:
                raise HTTPException(
                    status.HTTP_502_BAD_GATEWAY,
                    detail=f"File {filename} not found in batch results",
                )

            state = file_result.get("state")

            if state == "done":
                log.info(f"Processing complete for {filename}")
                return file_result
            elif state == "failed":
                error_msg = file_result.get("err_msg", "Unknown error")
                raise HTTPException(
                    status.HTTP_400_BAD_REQUEST,
                    detail=f"MinerU processing failed: {error_msg}",
                )
            elif state in ["waiting-file", "pending", "running", "converting"]:
                # Still processing
                if iteration % 10 == 0:  # Log every 20 seconds
                    log.info(
                        f"Processing status: {state} (iteration {iteration + 1}/{max_iterations})"
                    )
                time.sleep(poll_interval)
            else:
                log.warning(f"Unknown state: {state}")
                time.sleep(poll_interval)

        # Timeout
        raise HTTPException(
            status.HTTP_504_GATEWAY_TIMEOUT,
            detail="MinerU processing timed out after 10 minutes",
        )

    def _download_and_extract_zip(self, zip_url: str, filename: str) -> str:
        """
        Download ZIP file from CDN and extract markdown content.
        Returns the markdown content as a string.
        """
        log.info(f"Downloading results from: {zip_url}")

        try:
            response = requests.get(zip_url, timeout=60)
            response.raise_for_status()
        except requests.HTTPError as e:
            raise HTTPException(
                status.HTTP_400_BAD_REQUEST,
                detail=f"Failed to download results ZIP: {e}",
            )
        except Exception as e:
            raise HTTPException(
                status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Error downloading results: {str(e)}",
            )

        # Save ZIP to a temporary file and extract it
        tmp_zip_path = None
        try:
            with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as tmp_zip:
                tmp_zip.write(response.content)
                tmp_zip_path = tmp_zip.name

            with tempfile.TemporaryDirectory() as tmp_dir:
                # Extract ZIP
                with zipfile.ZipFile(tmp_zip_path, "r") as zip_ref:
                    zip_ref.extractall(tmp_dir)

                # Find markdown file - search recursively for any .md file
                markdown_content = None
                found_md_path = None

                # First, list all files in the ZIP for debugging
                all_files = []
                for root, dirs, files in os.walk(tmp_dir):
                    for file in files:
                        full_path = os.path.join(root, file)
                        all_files.append(full_path)
                        # Look for any .md file
                        if file.endswith(".md"):
                            found_md_path = full_path
                            log.info(f"Found markdown file at: {full_path}")
                            try:
                                with open(full_path, "r", encoding="utf-8") as f:
                                    markdown_content = f.read()
                                if markdown_content:
                                    # Use the first non-empty markdown file
                                    break
                            except Exception as e:
                                log.warning(f"Failed to read {full_path}: {e}")
                    if markdown_content:
                        break

                if markdown_content is None:
                    log.error(f"Available files in ZIP: {all_files}")
                    # Try to provide a more helpful error message
                    md_files = [f for f in all_files if f.endswith(".md")]
                    if md_files:
                        error_msg = (
                            f"Found .md files but couldn't read them: {md_files}"
                        )
                    else:
                        error_msg = (
                            f"No .md files found in ZIP. Available files: {all_files}"
                        )
                    raise HTTPException(
                        status.HTTP_502_BAD_GATEWAY,
                        detail=error_msg,
                    )

        except zipfile.BadZipFile as e:
            raise HTTPException(
                status.HTTP_502_BAD_GATEWAY,
                detail=f"Invalid ZIP file received: {e}",
            )
        except HTTPException:
            # Re-raise the specific error above instead of masking it as a 500
            raise
        except Exception as e:
            raise HTTPException(
                status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Error extracting ZIP: {str(e)}",
            )
        finally:
            # Clean up the temporary ZIP file even if extraction fails
            if tmp_zip_path and os.path.exists(tmp_zip_path):
                os.unlink(tmp_zip_path)

        if not markdown_content:
            raise HTTPException(
                status.HTTP_400_BAD_REQUEST,
                detail="Extracted markdown content is empty",
            )

        log.info(
            f"Successfully extracted markdown content ({len(markdown_content)} characters)"
        )
        return markdown_content
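

# Minimal manual smoke test for this loader. This is an illustrative sketch only:
# the file path and Local API URL below are assumptions, not values shipped with
# or required by this module.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    loader = MinerULoader(
        file_path="/tmp/example.pdf",  # hypothetical input document
        api_mode="local",
        api_url="http://localhost:8000",  # assumes a self-hosted MinerU API
        params={"enable_ocr": False, "language": "en"},
    )
    documents = loader.load()
    print(f"Parsed {len(documents)} document(s)")
    print(documents[0].page_content[:500])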