localGPT/backend/server.py


import json
import http.server
import socketserver
import cgi
import os
import uuid
from urllib.parse import urlparse, parse_qs
import requests # 🆕 Import requests for making HTTP calls
import sys
from datetime import datetime
# Add parent directory to path so we can import rag_system modules
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# Import RAG system modules for complete metadata
try:
    from rag_system.main import PIPELINE_CONFIGS
    RAG_SYSTEM_AVAILABLE = True
    print("✅ RAG system modules accessible from backend")
except ImportError as e:
    PIPELINE_CONFIGS = {}
    RAG_SYSTEM_AVAILABLE = False
    print(f"⚠️ RAG system modules not available: {e}")
from ollama_client import OllamaClient
from database import db, generate_session_title
import simple_pdf_processor as pdf_module
from simple_pdf_processor import initialize_simple_pdf_processor
from typing import List, Dict, Any
import re


# 🆕 Reusable TCPServer with address reuse enabled
class ReusableTCPServer(socketserver.TCPServer):
    allow_reuse_address = True


class ChatHandler(http.server.BaseHTTPRequestHandler):
    def __init__(self, *args, **kwargs):
        self.ollama_client = OllamaClient()
        super().__init__(*args, **kwargs)

    def do_OPTIONS(self):
        """Handle CORS preflight requests"""
        self.send_response(200)
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'GET, POST, DELETE, OPTIONS')
        self.send_header('Access-Control-Allow-Headers', 'Content-Type')
        self.end_headers()
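
    # For orientation, a preflight exchange under the handler above looks
    # roughly like this (a sketch; the exact headers a browser sends vary):
    #
    #   OPTIONS /chat HTTP/1.1
    #   Origin: http://localhost:3000
    #   Access-Control-Request-Method: POST
    #
    #   HTTP/1.1 200 OK
    #   Access-Control-Allow-Origin: *
    #   Access-Control-Allow-Methods: GET, POST, DELETE, OPTIONS
    #   Access-Control-Allow-Headers: Content-Type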

    def do_GET(self):
        """Handle GET requests"""
        parsed_path = urlparse(self.path)
        if parsed_path.path == '/health':
            self.send_json_response({
                "status": "ok",
                "ollama_running": self.ollama_client.is_ollama_running(),
                "available_models": self.ollama_client.list_models(),
                "database_stats": db.get_stats()
            })
        elif parsed_path.path == '/sessions':
            self.handle_get_sessions()
        elif parsed_path.path == '/sessions/cleanup':
            self.handle_cleanup_sessions()
        elif parsed_path.path == '/models':
            self.handle_get_models()
        elif parsed_path.path == '/indexes':
            self.handle_get_indexes()
        elif parsed_path.path.startswith('/indexes/') and parsed_path.path.count('/') == 2:
            index_id = parsed_path.path.split('/')[-1]
            self.handle_get_index(index_id)
        elif parsed_path.path.startswith('/sessions/') and parsed_path.path.endswith('/documents'):
            session_id = parsed_path.path.split('/')[-2]
            self.handle_get_session_documents(session_id)
        elif parsed_path.path.startswith('/sessions/') and parsed_path.path.endswith('/indexes'):
            session_id = parsed_path.path.split('/')[-2]
            self.handle_get_session_indexes(session_id)
        elif parsed_path.path.startswith('/sessions/') and parsed_path.path.count('/') == 2:
            session_id = parsed_path.path.split('/')[-1]
            self.handle_get_session(session_id)
        else:
            self.send_response(404)
            self.end_headers()

    def do_POST(self):
        """Handle POST requests"""
        parsed_path = urlparse(self.path)
        if parsed_path.path == '/chat':
            self.handle_chat()
        elif parsed_path.path == '/sessions':
            self.handle_create_session()
        elif parsed_path.path == '/indexes':
            self.handle_create_index()
        elif parsed_path.path.startswith('/indexes/') and parsed_path.path.endswith('/upload'):
            index_id = parsed_path.path.split('/')[-2]
            self.handle_index_file_upload(index_id)
        elif parsed_path.path.startswith('/indexes/') and parsed_path.path.endswith('/build'):
            index_id = parsed_path.path.split('/')[-2]
            self.handle_build_index(index_id)
        elif parsed_path.path.startswith('/sessions/') and '/indexes/' in parsed_path.path:
            parts = parsed_path.path.split('/')
            session_id = parts[2]
            index_id = parts[4]
            self.handle_link_index_to_session(session_id, index_id)
        elif parsed_path.path.startswith('/sessions/') and parsed_path.path.endswith('/messages'):
            session_id = parsed_path.path.split('/')[-2]
            self.handle_session_chat(session_id)
        elif parsed_path.path.startswith('/sessions/') and parsed_path.path.endswith('/upload'):
            session_id = parsed_path.path.split('/')[-2]
            self.handle_file_upload(session_id)
        elif parsed_path.path.startswith('/sessions/') and parsed_path.path.endswith('/index'):
            session_id = parsed_path.path.split('/')[-2]
            self.handle_index_documents(session_id)
        elif parsed_path.path.startswith('/sessions/') and parsed_path.path.endswith('/rename'):
            session_id = parsed_path.path.split('/')[-2]
            self.handle_rename_session(session_id)
        else:
            self.send_response(404)
            self.end_headers()

    def do_DELETE(self):
        """Handle DELETE requests"""
        parsed_path = urlparse(self.path)
        if parsed_path.path.startswith('/sessions/') and parsed_path.path.count('/') == 2:
            session_id = parsed_path.path.split('/')[-1]
            self.handle_delete_session(session_id)
        elif parsed_path.path.startswith('/indexes/') and parsed_path.path.count('/') == 2:
            index_id = parsed_path.path.split('/')[-1]
            self.handle_delete_index(index_id)
        else:
            self.send_response(404)
            self.end_headers()

    def handle_chat(self):
        """Handle legacy chat requests (without sessions)"""
        try:
            content_length = int(self.headers['Content-Length'])
            post_data = self.rfile.read(content_length)
            data = json.loads(post_data.decode('utf-8'))
            message = data.get('message', '')
            model = data.get('model', 'llama3.2:latest')
            conversation_history = data.get('conversation_history', [])
            if not message:
                self.send_json_response({
                    "error": "Message is required"
                }, status_code=400)
                return
            # Check if Ollama is running
            if not self.ollama_client.is_ollama_running():
                self.send_json_response({
                    "error": "Ollama is not running. Please start Ollama first."
                }, status_code=503)
                return
            # Get response from Ollama
            response = self.ollama_client.chat(message, model, conversation_history)
            self.send_json_response({
                "response": response,
                "model": model,
                "message_count": len(conversation_history) + 1
            })
        except json.JSONDecodeError:
            self.send_json_response({
                "error": "Invalid JSON"
            }, status_code=400)
        except Exception as e:
            self.send_json_response({
                "error": f"Server error: {str(e)}"
            }, status_code=500)
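
    # Illustrative request body for POST /chat (keys match the reads above;
    # the role/content shape of conversation_history is an assumption about
    # what OllamaClient.chat accepts):
    #
    #   {
    #     "message": "What is retrieval-augmented generation?",
    #     "model": "llama3.2:latest",
    #     "conversation_history": [
    #       {"role": "user", "content": "Hi"},
    #       {"role": "assistant", "content": "Hello! How can I help?"}
    #     ]
    #   }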

    def handle_get_sessions(self):
        """Get all chat sessions"""
        try:
            sessions = db.get_sessions()
            self.send_json_response({
                "sessions": sessions,
                "total": len(sessions)
            })
        except Exception as e:
            self.send_json_response({
                "error": f"Failed to get sessions: {str(e)}"
            }, status_code=500)

    def handle_cleanup_sessions(self):
        """Clean up empty sessions"""
        try:
            cleanup_count = db.cleanup_empty_sessions()
            self.send_json_response({
                "message": f"Cleaned up {cleanup_count} empty sessions",
                "cleanup_count": cleanup_count
            })
        except Exception as e:
            self.send_json_response({
                "error": f"Failed to cleanup sessions: {str(e)}"
            }, status_code=500)

    def handle_get_session(self, session_id: str):
        """Get a specific session with its messages"""
        try:
            session = db.get_session(session_id)
            if not session:
                self.send_json_response({
                    "error": "Session not found"
                }, status_code=404)
                return
            messages = db.get_messages(session_id)
            self.send_json_response({
                "session": session,
                "messages": messages
            })
        except Exception as e:
            self.send_json_response({
                "error": f"Failed to get session: {str(e)}"
            }, status_code=500)

    def handle_get_session_documents(self, session_id: str):
        """Return documents and basic info for a session."""
        try:
            session = db.get_session(session_id)
            if not session:
                self.send_json_response({"error": "Session not found"}, status_code=404)
                return
            docs = db.get_documents_for_session(session_id)
            # Extract original filenames from stored paths (uploads are saved as "<uuid>_<original name>")
            filenames = [
                os.path.basename(p).split('_', 1)[-1] if '_' in os.path.basename(p) else os.path.basename(p)
                for p in docs
            ]
            self.send_json_response({
                "session": session,
                "files": filenames,
                "file_count": len(docs)
            })
        except Exception as e:
            self.send_json_response({"error": f"Failed to get documents: {str(e)}"}, status_code=500)

    def handle_create_session(self):
        """Create a new chat session"""
        try:
            content_length = int(self.headers['Content-Length'])
            post_data = self.rfile.read(content_length)
            data = json.loads(post_data.decode('utf-8'))
            title = data.get('title', 'New Chat')
            model = data.get('model', 'llama3.2:latest')
            session_id = db.create_session(title, model)
            session = db.get_session(session_id)
            self.send_json_response({
                "session": session,
                "session_id": session_id
            }, status_code=201)
        except json.JSONDecodeError:
            self.send_json_response({
                "error": "Invalid JSON"
            }, status_code=400)
        except Exception as e:
            self.send_json_response({
                "error": f"Failed to create session: {str(e)}"
            }, status_code=500)
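
    # Illustrative request body for POST /sessions (both keys optional; the
    # values shown are the fallbacks used above):
    #
    #   {"title": "New Chat", "model": "llama3.2:latest"}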

    def handle_session_chat(self, session_id: str):
        """
        Handle chat within a specific session.
        Intelligently routes between direct LLM (fast) and RAG pipeline (document-aware).
        """
        try:
            session = db.get_session(session_id)
            if not session:
                self.send_json_response({"error": "Session not found"}, status_code=404)
                return
            content_length = int(self.headers['Content-Length'])
            post_data = self.rfile.read(content_length)
            data = json.loads(post_data.decode('utf-8'))
            message = data.get('message', '')
            if not message:
                self.send_json_response({"error": "Message is required"}, status_code=400)
                return
            if session['message_count'] == 0:
                title = generate_session_title(message)
                db.update_session_title(session_id, title)
            # Add user message to database first
            user_message_id = db.add_message(session_id, message, "user")
            # 🎯 SMART ROUTING: Decide between direct LLM vs RAG
            idx_ids = db.get_indexes_for_session(session_id)
            force_rag = bool(data.get("force_rag", False))
            use_rag = True if force_rag else self._should_use_rag(message, idx_ids)
            if use_rag:
                # 🔍 --- Use RAG Pipeline for Document-Related Queries ---
                print(f"🔍 Using RAG pipeline for document query: '{message[:50]}...'")
                response_text, source_docs = self._handle_rag_query(session_id, message, data, idx_ids)
            else:
                # ⚡ --- Use Direct LLM for General Queries (FAST) ---
                print(f"⚡ Using direct LLM for general query: '{message[:50]}...'")
                response_text, source_docs = self._handle_direct_llm_query(session_id, message, session)
            # Add AI response to database
            ai_message_id = db.add_message(session_id, response_text, "assistant")
            updated_session = db.get_session(session_id)
            # Send response with proper error handling
            self.send_json_response({
                "response": response_text,
                "session": updated_session,
                "source_documents": source_docs,
                "used_rag": use_rag
            })
        except BrokenPipeError:
            # Client disconnected - this is normal for long queries, just log it
            print(f"⚠️ Client disconnected during RAG processing for query: '{message[:30]}...'")
        except json.JSONDecodeError:
            self.send_json_response({
                "error": "Invalid JSON"
            }, status_code=400)
        except Exception as e:
            print(f"❌ Server error in session chat: {str(e)}")
            try:
                self.send_json_response({
                    "error": f"Server error: {str(e)}"
                }, status_code=500)
            except BrokenPipeError:
                print("⚠️ Client disconnected during error response")

    def _should_use_rag(self, message: str, idx_ids: List[str]) -> bool:
        """
        🧠 ENHANCED: Determine whether a query should use the RAG pipeline, using document overviews.
        Args:
            message: The user's query
            idx_ids: List of index IDs associated with the session
        Returns:
            bool: True if RAG should be used, False for direct LLM
        """
        # No indexes = definitely no RAG needed
        if not idx_ids:
            return False
        # Load document overviews for intelligent routing
        try:
            doc_overviews = self._load_document_overviews(idx_ids)
            if doc_overviews:
                return self._route_using_overviews(message, doc_overviews)
        except Exception as e:
            print(f"⚠️ Overview-based routing failed, falling back to simple routing: {e}")
        # Fall back to simple pattern matching if overviews are unavailable
        return self._simple_pattern_routing(message, idx_ids)

    def _load_document_overviews(self, idx_ids: List[str]) -> List[str]:
        """Load and aggregate overviews for the given index IDs.
        Strategy:
        1. Attempt to load each index's dedicated overview file.
        2. Aggregate all overviews found across available files.
        3. If none of the index files exist, fall back to the legacy global overview file.
        """
        aggregated: list[str] = []
        # 1️⃣ Collect overviews from per-index files
        for idx in idx_ids:
            candidate_paths = [
                f"../index_store/overviews/{idx}.jsonl",
                f"index_store/overviews/{idx}.jsonl",
                f"./index_store/overviews/{idx}.jsonl",
            ]
            for p in candidate_paths:
                if os.path.exists(p):
                    print(f"📖 Loading overviews from: {p}")
                    try:
                        with open(p, "r", encoding="utf-8") as f:
                            for line in f:
                                if not line.strip():
                                    continue
                                try:
                                    record = json.loads(line)
                                    overview = record.get("overview", "").strip()
                                    if overview:
                                        aggregated.append(overview)
                                except json.JSONDecodeError:
                                    continue  # skip malformed lines
                        break  # Stop after the first existing path for this idx
                    except Exception as e:
                        print(f"⚠️ Error reading {p}: {e}")
                        break  # Don't keep trying other paths for this idx if read failed
        # 2️⃣ Fall back to legacy global file if no per-index overviews found
        if not aggregated:
            legacy_paths = [
                "../index_store/overviews/overviews.jsonl",
                "index_store/overviews/overviews.jsonl",
                "./index_store/overviews/overviews.jsonl",
            ]
            for p in legacy_paths:
                if os.path.exists(p):
                    print(f"⚠️ Falling back to legacy overviews file: {p}")
                    try:
                        with open(p, "r", encoding="utf-8") as f:
                            for line in f:
                                if not line.strip():
                                    continue
                                try:
                                    record = json.loads(line)
                                    overview = record.get("overview", "").strip()
                                    if overview:
                                        aggregated.append(overview)
                                except json.JSONDecodeError:
                                    continue
                    except Exception as e:
                        print(f"⚠️ Error reading legacy overviews file {p}: {e}")
                    break
        if aggregated:
            print(f"✅ Loaded {len(aggregated)} document overviews from {len(idx_ids)} index(es)")
        else:
            print(f"⚠️ No overviews found for indices {idx_ids}")
        # Limit for performance
        return aggregated[:40]
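
    # Each overview file is JSONL, and the loader above only reads the
    # "overview" key; a line might look like this (content is illustrative):
    #
    #   {"overview": "Invoice from PromptX AI LLC dated 2024-03-01, total $4,200."}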

    def _route_using_overviews(self, query: str, overviews: List[str]) -> bool:
        """
        🎯 Use document overviews and an LLM to make intelligent routing decisions.
        Returns True if RAG should be used, False for direct LLM.
        """
        if not overviews:
            return False
        # Format overviews for the routing prompt
        overviews_block = "\n".join(f"[{i+1}] {ov}" for i, ov in enumerate(overviews))
        router_prompt = f"""You are an AI router deciding whether a user question should be answered via:
"USE_RAG" → search the user's private documents (described below)
"DIRECT_LLM" → reply from general knowledge (greetings, public facts, unrelated topics)

CRITICAL PRINCIPLE: When documents exist in the KB, strongly prefer USE_RAG unless the query is purely conversational or completely unrelated to any possible document content.

RULES:
1. If ANY overview clearly relates to the question (entities, numbers, addresses, dates, amounts, companies, technical terms) → USE_RAG
2. For document operations (summarize, analyze, explain, extract, find) → USE_RAG
3. For greetings only ("Hi", "Hello", "Thanks") → DIRECT_LLM
4. For pure math/world knowledge clearly unrelated to documents → DIRECT_LLM
5. When in doubt → USE_RAG

DOCUMENT OVERVIEWS:
{overviews_block}

DECISION EXAMPLES:
"What invoice amounts are mentioned?" → USE_RAG (document-specific)
"Who is PromptX AI LLC?" → USE_RAG (entity in documents)
"What is the DeepSeek model?" → USE_RAG (mentioned in documents)
"Summarize the research paper" → USE_RAG (document operation)
"What is 2+2?" → DIRECT_LLM (pure math)
"Hi there" → DIRECT_LLM (greeting only)

USER QUERY: "{query}"

Respond with exactly one word: USE_RAG or DIRECT_LLM"""
        try:
            # Use Ollama to make the routing decision
            response = self.ollama_client.chat(
                message=router_prompt,
                model="qwen3:0.6b",  # Fast model for routing
                enable_thinking=False  # Fast routing
            )
            # The response is directly the text, not a dict
            decision = response.strip().upper()
            # Parse decision
            if "USE_RAG" in decision:
                print(f"🎯 Overview-based routing: USE_RAG for query: '{query[:50]}...'")
                return True
            elif "DIRECT_LLM" in decision:
                print(f"⚡ Overview-based routing: DIRECT_LLM for query: '{query[:50]}...'")
                return False
            else:
                print(f"⚠️ Unclear routing decision '{decision}', defaulting to RAG")
                return True  # Default to RAG when uncertain
        except Exception as e:
            print(f"❌ LLM routing failed: {e}, falling back to pattern matching")
            return self._simple_pattern_routing(query, [])

    def _simple_pattern_routing(self, message: str, idx_ids: List[str]) -> bool:
        """
        📝 FALLBACK: Simple pattern-based routing (original logic).
        """
        message_lower = message.lower()
        # Always use direct LLM for greetings and casual conversation
        greeting_patterns = [
            'hello', 'hi', 'hey', 'greetings', 'good morning', 'good afternoon', 'good evening',
            'how are you', 'how do you do', 'nice to meet', 'pleasure to meet',
            'thanks', 'thank you', 'bye', 'goodbye', 'see you', 'talk to you later',
            'test', 'testing', 'check', 'ping', 'just saying', 'nevermind',
            'ok', 'okay', 'alright', 'got it', 'understood', 'i see'
        ]
        # Check for greeting patterns
        for pattern in greeting_patterns:
            if pattern in message_lower:
                return False  # Use direct LLM for greetings
        # Keywords that strongly suggest document-related queries
        rag_indicators = [
            'document', 'doc', 'file', 'pdf', 'text', 'content', 'page',
            'according to', 'based on', 'mentioned', 'states', 'says',
            'what does', 'summarize', 'summary', 'analyze', 'analysis',
            'quote', 'citation', 'reference', 'source', 'evidence',
            'explain from', 'extract', 'find in', 'search for'
        ]
        # Check for strong RAG indicators
        for indicator in rag_indicators:
            if indicator in message_lower:
                return True
        # Question words + substantial length might benefit from RAG
        question_words = ['what', 'how', 'when', 'where', 'why', 'who', 'which']
        starts_with_question = any(message_lower.startswith(word) for word in question_words)
        if starts_with_question and len(message) > 40:
            return True
        # Very short messages - use direct LLM
        if len(message.strip()) < 20:
            return False
        # Default to direct LLM unless there's a clear indication of a document query
        return False
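
    # A few classifications implied by the rules above (worked by hand):
    #
    #   "hi there"                                    → False (greeting pattern)
    #   "summarize the attached pdf"                  → True  ("summarize"/"pdf" indicators)
    #   "what are the payment terms in the contract?" → True  (question word, > 40 chars)
    #   "2+2?"                                        → False (short, no indicators)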

    def _handle_direct_llm_query(self, session_id: str, message: str, session: dict):
        """
        Handle query using direct Ollama client with thinking disabled for speed.
        Returns:
            tuple: (response_text, empty_source_docs)
        """
        try:
            # Get conversation history for context
            conversation_history = db.get_conversation_history(session_id)
            # Use the session's model or default
            model = session.get('model', 'qwen3:8b')  # Default to fast model
            # Direct Ollama call with thinking disabled for speed
            response_text = self.ollama_client.chat(
                message=message,
                model=model,
                conversation_history=conversation_history,
                enable_thinking=False  # ⚡ DISABLE THINKING FOR SPEED
            )
            return response_text, []  # No source docs for direct LLM
        except Exception as e:
            print(f"❌ Direct LLM error: {e}")
            return f"Error processing query: {str(e)}", []

    def _handle_rag_query(self, session_id: str, message: str, data: dict, idx_ids: List[str]):
        """
        Handle query using the full RAG pipeline (delegates to the advanced RAG API running on port 8001).
        Returns:
            tuple[str, List[dict]]: (response_text, source_documents)
        """
        # Defaults
        response_text = ""
        source_docs: List[dict] = []
        # Build payload for RAG API
        rag_api_url = "http://localhost:8001/chat"
        table_name = f"text_pages_{idx_ids[-1]}" if idx_ids else None
        payload: Dict[str, Any] = {
            "query": message,
            "session_id": session_id,
        }
        if table_name:
            payload["table_name"] = table_name
        # Copy optional parameters from the incoming request
        optional_params: Dict[str, tuple[type, str]] = {
            "compose_sub_answers": (bool, "compose_sub_answers"),
            "query_decompose": (bool, "query_decompose"),
            "ai_rerank": (bool, "ai_rerank"),
            "context_expand": (bool, "context_expand"),
            "verify": (bool, "verify"),
            "retrieval_k": (int, "retrieval_k"),
            "context_window_size": (int, "context_window_size"),
            "reranker_top_k": (int, "reranker_top_k"),
            "search_type": (str, "search_type"),
            "dense_weight": (float, "dense_weight"),
            "provence_prune": (bool, "provence_prune"),
            "provence_threshold": (float, "provence_threshold"),
        }
        for key, (caster, payload_key) in optional_params.items():
            val = data.get(key)
            if val is not None:
                try:
                    payload[payload_key] = caster(val)  # type: ignore[arg-type]
                except Exception:
                    payload[payload_key] = val
        try:
            rag_response = requests.post(rag_api_url, json=payload)
            if rag_response.status_code == 200:
                rag_data = rag_response.json()
                response_text = rag_data.get("answer", "No answer found.")
                source_docs = rag_data.get("source_documents", [])
            else:
                response_text = f"Error from RAG API ({rag_response.status_code}): {rag_response.text}"
                print(f"❌ RAG API error: {response_text}")
        except requests.exceptions.ConnectionError:
            response_text = "Could not connect to the RAG API server. Please ensure it is running."
            print("❌ Connection to RAG API failed (port 8001).")
        except Exception as e:
            response_text = f"Error processing RAG query: {str(e)}"
            print(f"❌ RAG processing error: {e}")
        # Strip any <think>/<thinking> tags that might slip through
        # (the backreference must be \1, not \\1, inside this raw string)
        response_text = re.sub(r'<(think|thinking)>.*?</\1>', '', response_text, flags=re.DOTALL | re.IGNORECASE).strip()
        return response_text, source_docs
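
    # Illustrative payload forwarded to the RAG API; "query" and "session_id"
    # are always present, the rest only when the client supplied them:
    #
    #   {
    #     "query": "What does the contract say about termination?",
    #     "session_id": "3f2a...",
    #     "table_name": "text_pages_<index_id>",
    #     "retrieval_k": 20,
    #     "ai_rerank": true
    #   }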

    def handle_delete_session(self, session_id: str):
        """Delete a session and its messages"""
        try:
            deleted = db.delete_session(session_id)
            if deleted:
                self.send_json_response({'deleted': deleted})
            else:
                self.send_json_response({'error': 'Session not found'}, status_code=404)
        except Exception as e:
            self.send_json_response({'error': str(e)}, status_code=500)

    def handle_file_upload(self, session_id: str):
        """Handle file uploads, save them, and associate them with the session."""
        form = cgi.FieldStorage(
            fp=self.rfile,
            headers=self.headers,
            environ={'REQUEST_METHOD': 'POST', 'CONTENT_TYPE': self.headers['Content-Type']}
        )
        uploaded_files = []
        if 'files' in form:
            files = form['files']
            if not isinstance(files, list):
                files = [files]
            upload_dir = "shared_uploads"
            os.makedirs(upload_dir, exist_ok=True)
            for file_item in files:
                if file_item.filename:
                    # Create a unique filename to avoid overwrites
                    unique_filename = f"{uuid.uuid4()}_{file_item.filename}"
                    file_path = os.path.join(upload_dir, unique_filename)
                    with open(file_path, 'wb') as f:
                        f.write(file_item.file.read())
                    # Store the absolute path for the indexing service
                    absolute_file_path = os.path.abspath(file_path)
                    db.add_document_to_session(session_id, absolute_file_path)
                    uploaded_files.append({"filename": file_item.filename, "stored_path": absolute_file_path})
        if not uploaded_files:
            self.send_json_response({"error": "No files were uploaded"}, status_code=400)
            return
        self.send_json_response({
            "message": f"Successfully uploaded {len(uploaded_files)} files.",
            "uploaded_files": uploaded_files
        })
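
    # The endpoint can be exercised with curl; the multipart field name must
    # be "files" to match the check above (path values are illustrative):
    #
    #   curl -F "files=@report.pdf" http://localhost:8000/sessions/<session_id>/upload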

    def handle_index_documents(self, session_id: str):
        """Triggers indexing for all documents in a session."""
        print(f"🔥 Received request to index documents for session {session_id[:8]}...")
        try:
            file_paths = db.get_documents_for_session(session_id)
            if not file_paths:
                self.send_json_response({"message": "No documents to index for this session."}, status_code=200)
                return
            print(f"Found {len(file_paths)} documents to index. Sending to RAG API...")
            rag_api_url = "http://localhost:8001/index"
            rag_response = requests.post(rag_api_url, json={"file_paths": file_paths, "session_id": session_id})
            if rag_response.status_code == 200:
                print("✅ RAG API successfully indexed documents.")
                # Merge key config values into index metadata
                idx_meta = {
                    "session_linked": True,
                    "retrieval_mode": "hybrid",
                }
                try:
                    db.update_index_metadata(session_id, idx_meta)  # session_id used as index_id in text table naming
                except Exception as e:
                    print(f"⚠️ Failed to update index metadata for session index: {e}")
                self.send_json_response(rag_response.json())
            else:
                error_info = rag_response.text
                print(f"❌ RAG API indexing failed ({rag_response.status_code}): {error_info}")
                self.send_json_response({"error": f"Indexing failed: {error_info}"}, status_code=500)
        except Exception as e:
            print(f"❌ Exception during indexing: {str(e)}")
            self.send_json_response({"error": f"An unexpected error occurred: {str(e)}"}, status_code=500)

    def handle_pdf_upload(self, session_id: str):
        """
        Processes PDF files: extracts text and stores it in the database.
        DEPRECATED: This is the old method. Use handle_file_upload instead.
        """
        # This function is now deprecated in favor of the new indexing workflow
        # but is kept for potential legacy/compatibility reasons.
        # For new functionality, it should not be used.
        self.send_json_response({
            "warning": "This upload method is deprecated. Use the new file upload and indexing flow.",
            "message": "No action taken."
        }, status_code=410)  # 410 Gone

    def handle_get_models(self):
        """Get available models from both Ollama and HuggingFace, grouped by capability"""
        try:
            generation_models = []
            embedding_models = []
            # Get Ollama models if available
            if self.ollama_client.is_ollama_running():
                all_ollama_models = self.ollama_client.list_models()
                # Very naive classification - same logic as the RAG API server
                ollama_embedding_models = [m for m in all_ollama_models if any(k in m for k in ['embed', 'bge', 'embedding', 'text'])]
                ollama_generation_models = [m for m in all_ollama_models if m not in ollama_embedding_models]
                generation_models.extend(ollama_generation_models)
                embedding_models.extend(ollama_embedding_models)
            # Add supported HuggingFace embedding models
            huggingface_embedding_models = [
                "Qwen/Qwen3-Embedding-0.6B",
                "Qwen/Qwen3-Embedding-4B",
                "Qwen/Qwen3-Embedding-8B"
            ]
            embedding_models.extend(huggingface_embedding_models)
            # Sort models for consistent ordering
            generation_models.sort()
            embedding_models.sort()
            self.send_json_response({
                "generation_models": generation_models,
                "embedding_models": embedding_models
            })
        except Exception as e:
            self.send_json_response({
                "error": f"Could not list models: {str(e)}"
            }, status_code=500)

    def handle_get_indexes(self):
        try:
            data = db.list_indexes()
            self.send_json_response({'indexes': data, 'total': len(data)})
        except Exception as e:
            self.send_json_response({'error': str(e)}, status_code=500)

    def handle_get_index(self, index_id: str):
        try:
            data = db.get_index(index_id)
            if not data:
                self.send_json_response({'error': 'Index not found'}, status_code=404)
                return
            self.send_json_response(data)
        except Exception as e:
            self.send_json_response({'error': str(e)}, status_code=500)

    def handle_create_index(self):
        try:
            content_length = int(self.headers['Content-Length'])
            post_data = self.rfile.read(content_length)
            data = json.loads(post_data.decode('utf-8'))
            name = data.get('name')
            description = data.get('description')
            metadata = data.get('metadata', {})
            if not name:
                self.send_json_response({'error': 'Name required'}, status_code=400)
                return
            # Add complete metadata from RAG system configuration if available
            if RAG_SYSTEM_AVAILABLE and PIPELINE_CONFIGS.get('default'):
                default_config = PIPELINE_CONFIGS['default']
                complete_metadata = {
                    'status': 'created',
                    'metadata_source': 'rag_system_config',
                    'created_at': datetime.now().isoformat(),
                    'chunk_size': 512,  # From default config
                    'chunk_overlap': 64,  # From default config
                    'retrieval_mode': 'hybrid',  # From default config
                    'window_size': 5,  # From default config
                    'embedding_model': 'Qwen/Qwen3-Embedding-0.6B',  # From default config
                    'enrich_model': 'qwen3:0.6b',  # From default config
                    'overview_model': 'qwen3:0.6b',  # From default config
                    'enable_enrich': True,  # From default config
                    'latechunk': True,  # From default config
                    'docling_chunk': True,  # From default config
                    'note': 'Default configuration from RAG system'
                }
                # Merge with any provided metadata
                complete_metadata.update(metadata)
                metadata = complete_metadata
            idx_id = db.create_index(name, description, metadata)
            self.send_json_response({'index_id': idx_id}, status_code=201)
        except Exception as e:
            self.send_json_response({'error': str(e)}, status_code=500)
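
    # Illustrative request body for POST /indexes; "name" is required and any
    # "metadata" keys are merged over the defaults assembled above:
    #
    #   {"name": "Q1 invoices", "description": "Billing PDFs", "metadata": {"chunk_size": 256}}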

    def handle_index_file_upload(self, index_id: str):
        """Reuse file upload logic but store docs under the index."""
        form = cgi.FieldStorage(
            fp=self.rfile,
            headers=self.headers,
            environ={'REQUEST_METHOD': 'POST', 'CONTENT_TYPE': self.headers['Content-Type']}
        )
        uploaded_files = []
        if 'files' in form:
            files = form['files']
            if not isinstance(files, list):
                files = [files]
            upload_dir = 'shared_uploads'
            os.makedirs(upload_dir, exist_ok=True)
            for f in files:
                if f.filename:
                    unique = f"{uuid.uuid4()}_{f.filename}"
                    path = os.path.join(upload_dir, unique)
                    with open(path, 'wb') as out:
                        out.write(f.file.read())
                    db.add_document_to_index(index_id, f.filename, os.path.abspath(path))
                    uploaded_files.append({'filename': f.filename, 'stored_path': os.path.abspath(path)})
        if not uploaded_files:
            self.send_json_response({'error': 'No files uploaded'}, status_code=400)
            return
        self.send_json_response({'message': f"Uploaded {len(uploaded_files)} files", 'uploaded_files': uploaded_files})

    def handle_build_index(self, index_id: str):
        try:
            index = db.get_index(index_id)
            if not index:
                self.send_json_response({'error': 'Index not found'}, status_code=404)
                return
            file_paths = [d['stored_path'] for d in index.get('documents', [])]
            if not file_paths:
                self.send_json_response({'error': 'No documents to index'}, status_code=400)
                return
            # Parse request body for optional flags and configuration
            latechunk = False
            docling_chunk = False
            chunk_size = 512
            chunk_overlap = 64
            retrieval_mode = 'hybrid'
            window_size = 2
            enable_enrich = True
            embedding_model = None
            enrich_model = None
            batch_size_embed = 50
            batch_size_enrich = 25
            overview_model = None
            if 'Content-Length' in self.headers and int(self.headers['Content-Length']) > 0:
                try:
                    length = int(self.headers['Content-Length'])
                    body = self.rfile.read(length)
                    opts = json.loads(body.decode('utf-8'))
                    latechunk = bool(opts.get('latechunk', False))
                    docling_chunk = bool(opts.get('doclingChunk', False))
                    chunk_size = int(opts.get('chunkSize', 512))
                    chunk_overlap = int(opts.get('chunkOverlap', 64))
                    retrieval_mode = str(opts.get('retrievalMode', 'hybrid'))
                    window_size = int(opts.get('windowSize', 2))
                    enable_enrich = bool(opts.get('enableEnrich', True))
                    embedding_model = opts.get('embeddingModel')
                    enrich_model = opts.get('enrichModel')
                    batch_size_embed = int(opts.get('batchSizeEmbed', 50))
                    batch_size_enrich = int(opts.get('batchSizeEnrich', 25))
                    overview_model = opts.get('overviewModel')
                except Exception:
                    # Keep defaults on parse error
                    pass
            # Set per-index overview file path
            overview_path = f"index_store/overviews/{index_id}.jsonl"

            # Ensure config_override includes overview_path
            def ensure_overview_path(cfg: dict):
                cfg["overview_path"] = overview_path
                # we'll inject later when we build config_override

            # Delegate to the advanced RAG API, same as session indexing
            rag_api_url = "http://localhost:8001/index"
            # Use the index's dedicated LanceDB table so retrieval matches
            table_name = index.get("vector_table_name")
            payload = {
                "file_paths": file_paths,
                "session_id": index_id,  # reuse index_id for progress tracking
                "table_name": table_name,
                "chunk_size": chunk_size,
                "chunk_overlap": chunk_overlap,
                "retrieval_mode": retrieval_mode,
                "window_size": window_size,
                "enable_enrich": enable_enrich,
                "batch_size_embed": batch_size_embed,
                "batch_size_enrich": batch_size_enrich
            }
            if latechunk:
                payload["enable_latechunk"] = True
            if docling_chunk:
                payload["enable_docling_chunk"] = True
            if embedding_model:
                payload["embedding_model"] = embedding_model
            if enrich_model:
                payload["enrich_model"] = enrich_model
            if overview_model:
                payload["overview_model_name"] = overview_model
            rag_resp = requests.post(rag_api_url, json=payload)
            if rag_resp.status_code == 200:
                meta_updates = {
                    "chunk_size": chunk_size,
                    "chunk_overlap": chunk_overlap,
                    "retrieval_mode": retrieval_mode,
                    "window_size": window_size,
                    "enable_enrich": enable_enrich,
                    "latechunk": latechunk,
                    "docling_chunk": docling_chunk,
                }
                if embedding_model:
                    meta_updates["embedding_model"] = embedding_model
                if enrich_model:
                    meta_updates["enrich_model"] = enrich_model
                if overview_model:
                    meta_updates["overview_model"] = overview_model
                try:
                    db.update_index_metadata(index_id, meta_updates)
                except Exception as e:
                    print(f"⚠️ Failed to update index metadata: {e}")
                self.send_json_response({
                    "response": rag_resp.json(),
                    **meta_updates
                })
            else:
                # Gracefully handle scenario where table already exists (idempotent build)
                try:
                    err_json = rag_resp.json()
                except Exception:
                    err_json = {}
                err_text = err_json.get('error') if isinstance(err_json, dict) else rag_resp.text
                if err_text and 'already exists' in err_text:
                    # Treat as non-fatal; return message indicating index previously built
                    self.send_json_response({
                        "message": "Index already built; skipping rebuild.",
                        "note": err_text
                    })
                else:
                    self.send_json_response({"error": f"RAG indexing failed: {rag_resp.text}"}, status_code=500)
        except Exception as e:
            self.send_json_response({'error': str(e)}, status_code=500)
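
    # Illustrative request body for POST /indexes/<index_id>/build; every key
    # is optional and the values shown are the fallbacks above (the model
    # names are placeholders, not required values):
    #
    #   {
    #     "latechunk": false, "doclingChunk": false,
    #     "chunkSize": 512, "chunkOverlap": 64,
    #     "retrievalMode": "hybrid", "windowSize": 2,
    #     "enableEnrich": true,
    #     "embeddingModel": "Qwen/Qwen3-Embedding-0.6B",
    #     "enrichModel": "qwen3:0.6b", "overviewModel": "qwen3:0.6b",
    #     "batchSizeEmbed": 50, "batchSizeEnrich": 25
    #   }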

    def handle_link_index_to_session(self, session_id: str, index_id: str):
        try:
            db.link_index_to_session(session_id, index_id)
            self.send_json_response({'message': 'Index linked to session'})
        except Exception as e:
            self.send_json_response({'error': str(e)}, status_code=500)

    def handle_get_session_indexes(self, session_id: str):
        try:
            idx_ids = db.get_indexes_for_session(session_id)
            indexes = []
            for idx_id in idx_ids:
                idx = db.get_index(idx_id)
                if idx:
                    # Try to populate metadata for older indexes that have empty metadata
                    if not idx.get('metadata') or len(idx['metadata']) == 0:
                        print(f"🔍 Attempting to infer metadata for index {idx_id[:8]}...")
                        inferred_metadata = db.inspect_and_populate_index_metadata(idx_id)
                        if inferred_metadata:
                            # Refresh the index data with the new metadata
                            idx = db.get_index(idx_id)
                    indexes.append(idx)
            self.send_json_response({'indexes': indexes, 'total': len(indexes)})
        except Exception as e:
            self.send_json_response({'error': str(e)}, status_code=500)

    def handle_delete_index(self, index_id: str):
        """Remove an index, its documents, links, and the underlying LanceDB table."""
        try:
            deleted = db.delete_index(index_id)
            if deleted:
                self.send_json_response({'message': 'Index deleted successfully', 'index_id': index_id})
            else:
                self.send_json_response({'error': 'Index not found'}, status_code=404)
        except Exception as e:
            self.send_json_response({'error': str(e)}, status_code=500)

    def handle_rename_session(self, session_id: str):
        """Rename an existing session title"""
        try:
            session = db.get_session(session_id)
            if not session:
                self.send_json_response({"error": "Session not found"}, status_code=404)
                return
            content_length = int(self.headers.get('Content-Length', 0))
            if content_length == 0:
                self.send_json_response({"error": "Request body required"}, status_code=400)
                return
            post_data = self.rfile.read(content_length)
            data = json.loads(post_data.decode('utf-8'))
            new_title: str = data.get('title', '').strip()
            if not new_title:
                self.send_json_response({"error": "Title cannot be empty"}, status_code=400)
                return
            db.update_session_title(session_id, new_title)
            updated_session = db.get_session(session_id)
            self.send_json_response({
                "message": "Session renamed successfully",
                "session": updated_session
            })
        except json.JSONDecodeError:
            self.send_json_response({"error": "Invalid JSON"}, status_code=400)
        except Exception as e:
            self.send_json_response({"error": f"Failed to rename session: {str(e)}"}, status_code=500)

    def send_json_response(self, data, status_code: int = 200):
        """Send a JSON (UTF-8) response with CORS headers. Safe against client disconnects."""
        try:
            self.send_response(status_code)
            self.send_header('Content-Type', 'application/json')
            self.send_header('Access-Control-Allow-Origin', '*')
            self.send_header('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS')
            self.send_header('Access-Control-Allow-Headers', 'Content-Type, Authorization')
            self.send_header('Access-Control-Allow-Credentials', 'true')
            self.end_headers()
            response_bytes = json.dumps(data, indent=2).encode('utf-8')
            self.wfile.write(response_bytes)
        except BrokenPipeError:
            # Client disconnected before we could finish sending
            print("⚠️ Client disconnected during response; ignoring.")
        except Exception as e:
            print(f"❌ Error sending response: {e}")

    def log_message(self, format, *args):
        """Custom log format"""
        print(f"[{self.date_time_string()}] {format % args}")


def main():
    """Main function to initialize and start the server"""
    PORT = 8000  # 🆕 Define port
    try:
        # Initialize the database
        print("✅ Database initialized successfully")
        # Initialize the PDF processor
        try:
            print("📄 Initializing simple PDF processing...")
            pdf_module.initialize_simple_pdf_processor()
            if pdf_module.simple_pdf_processor:
                print("✅ Simple PDF processor initialized")
            else:
                print("⚠️ PDF processing could not be initialized.")
        except Exception as e:
            print(f"❌ Error initializing PDF processor: {e}")
            print("⚠️ PDF processing disabled - server will run without RAG functionality")
        # Set a global reference to the initialized processor if needed elsewhere
        global pdf_processor
        pdf_processor = pdf_module.simple_pdf_processor
        if pdf_processor:
            print("✅ Global PDF processor initialized")
        else:
            print("⚠️ PDF processing disabled - server will run without RAG functionality")
        # Cleanup empty sessions on startup
        print("🧹 Cleaning up empty sessions...")
        cleanup_count = db.cleanup_empty_sessions()
        if cleanup_count > 0:
            print(f"✨ Cleaned up {cleanup_count} empty sessions")
        else:
            print("✨ No empty sessions to clean up")
        # Start the server
        with ReusableTCPServer(("", PORT), ChatHandler) as httpd:
            print(f"🚀 Starting localGPT backend server on port {PORT}")
            print(f"📍 Chat endpoint: http://localhost:{PORT}/chat")
            print(f"🔍 Health check: http://localhost:{PORT}/health")
            # Test Ollama connection
            client = OllamaClient()
            if client.is_ollama_running():
                models = client.list_models()
                print(f"✅ Ollama is running with {len(models)} models")
                print(f"📋 Available models: {', '.join(models[:3])}{'...' if len(models) > 3 else ''}")
            else:
                print("⚠️ Ollama is not running. Please start Ollama:")
                print("   Install: https://ollama.ai")
                print("   Run: ollama serve")
            print(f"\n🌐 Frontend should connect to: http://localhost:{PORT}")
            print("💬 Ready to chat!\n")
            httpd.serve_forever()
    except KeyboardInterrupt:
        print("\n🛑 Server stopped")


if __name__ == "__main__":
    main()
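
# Quick smoke test (a sketch; assumes Ollama and the RAG API on port 8001 are
# running alongside this server):
#
#   $ python backend/server.py
#   $ curl http://localhost:8000/health
#   {"status": "ok", "ollama_running": true, ...}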