#!/usr/bin/env python3
"""
RAG System Unified Launcher
===========================

A comprehensive launcher that starts all RAG system components:
- Ollama server
- RAG API server (port 8001)
- Backend server (port 8000)
- Frontend server (port 3000)

Features:
- Single command startup
- Real-time log aggregation
- Process health monitoring
- Graceful shutdown
- Production-ready deployment support

Usage:
    python run_system.py [--mode dev|prod] [--logs-only] [--no-frontend]
"""

import argparse
import json
import logging
import os
import shutil
import signal
import socket
import subprocess
import sys
import threading
import time
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, TextIO

# Both third-party packages are optional: the port check has a plain-socket
# fallback and health_check() simply reports False without `requests`.
try:
    import psutil
except ImportError:
    psutil = None

try:
    import requests
except ImportError:
    requests = None


@dataclass
class ServiceConfig:
    """Static description of one service the launcher can start."""

    name: str
    command: List[str]  # argv passed to subprocess.Popen
    port: int  # used for the "already in use" check and status URLs
    cwd: Optional[str] = None
    env: Optional[Dict[str, str]] = None  # merged over os.environ at start
    health_check_path: str = "/health"
    startup_delay: int = 2  # seconds to wait before checking the process
    required: bool = True  # a failed required service fails start_all()


class ColoredFormatter(logging.Formatter):
    """Custom formatter with colors for different log levels and services."""

    COLORS = {
        'DEBUG': '\033[36m',     # Cyan
        'INFO': '\033[32m',      # Green
        'WARNING': '\033[33m',   # Yellow
        'ERROR': '\033[31m',     # Red
        'CRITICAL': '\033[35m',  # Magenta
    }

    SERVICE_COLORS = {
        'ollama': '\033[94m',    # Blue
        'rag-api': '\033[95m',   # Magenta
        'backend': '\033[96m',   # Cyan
        'frontend': '\033[93m',  # Yellow
        'system': '\033[92m',    # Green
    }

    RESET = '\033[0m'

    def format(self, record):
        """Render ``HH:MM:SS [SERVICE] LEVEL: message`` with ANSI colors.

        The service name is read from ``record.service`` and defaults to
        ``'system'``; unknown services fall back to the level color.
        """
        level_color = self.COLORS.get(record.levelname, '')
        service_name = getattr(record, 'service', 'system')
        service_color = self.SERVICE_COLORS.get(service_name, level_color)

        timestamp = datetime.fromtimestamp(record.created).strftime('%H:%M:%S')

        colored_service = f"{service_color}[{service_name.upper()}]{self.RESET}"
        colored_level = f"{level_color}{record.levelname}{self.RESET}"

        return f"{timestamp} {colored_service} {colored_level}: {record.getMessage()}"


class ServiceManager:
    """Manages multiple system services with logging and health monitoring."""

    def __init__(self, mode: str = "dev", logs_dir: str = "logs"):
        """Create the log directory, configure logging and signal handlers.

        Args:
            mode: ``"dev"`` or ``"prod"``; selects service commands/env.
            logs_dir: directory that receives ``system.log`` and one
                ``<service>.log`` per started service.
        """
        self.mode = mode
        self.logs_dir = Path(logs_dir)
        self.logs_dir.mkdir(parents=True, exist_ok=True)

        self.processes: Dict[str, subprocess.Popen] = {}
        self.log_threads: Dict[str, threading.Thread] = {}
        self.running = False

        # Setup logging
        self.setup_logging()

        # Service configurations
        self.services = self._get_service_configs()

        # Register signal handlers for graceful shutdown
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

    def setup_logging(self):
        """Setup centralized logging with colors.

        Attaches a colored stdout handler and a ``system.log`` file handler
        to the process-wide ``'system'`` logger.
        """
        self.logger = logging.getLogger('system')
        self.logger.setLevel(logging.INFO)

        # logging.getLogger returns a shared object: drop handlers left by a
        # previous manager so every message is emitted exactly once.
        for stale in list(self.logger.handlers):
            self.logger.removeHandler(stale)
            stale.close()

        # Console handler with colors
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(ColoredFormatter())
        self.logger.addHandler(console_handler)

        # File handler for system logs (UTF-8: messages contain emoji)
        file_handler = logging.FileHandler(
            self.logs_dir / 'system.log', encoding='utf-8'
        )
        file_handler.setFormatter(logging.Formatter(
            '%(asctime)s [%(levelname)s] %(message)s'
        ))
        self.logger.addHandler(file_handler)

    def _get_service_configs(self) -> Dict[str, ServiceConfig]:
        """Define service configurations based on mode."""
        base_configs = {
            'ollama': ServiceConfig(
                name='ollama',
                command=['ollama', 'serve'],
                port=11434,
                startup_delay=5,
                required=True,
            ),
            'rag-api': ServiceConfig(
                name='rag-api',
                command=[sys.executable, '-m', 'rag_system.api_server'],
                port=8001,
                startup_delay=3,
                required=True,
            ),
            'backend': ServiceConfig(
                name='backend',
                command=[sys.executable, 'backend/server.py'],
                port=8000,
                startup_delay=2,
                required=True,
            ),
            'frontend': ServiceConfig(
                name='frontend',
                command=['npm', 'run', 'dev' if self.mode == 'dev' else 'start'],
                port=3000,
                startup_delay=5,
                required=False,  # Optional in case Node.js not available
            ),
        }

        # Production mode adjustments
        if self.mode == 'prod':
            # Use production build for frontend
            base_configs['frontend'].command = ['npm', 'run', 'start']
            # Add production environment variables
            base_configs['rag-api'].env = {'NODE_ENV': 'production'}
            base_configs['backend'].env = {'NODE_ENV': 'production'}

        return base_configs

    def _signal_handler(self, signum, frame):
        """Handle shutdown signals gracefully: stop services, then exit 0."""
        self.logger.info(f"Received signal {signum}, shutting down...")
        self.shutdown()
        sys.exit(0)

    def is_port_in_use(self, port: int) -> bool:
        """Check if a port is already in use.

        Uses psutil's connection table when available; if psutil is missing
        or access is denied, falls back to attempting a TCP connection to
        ``localhost:port``.
        """
        if psutil is not None:
            try:
                for conn in psutil.net_connections():
                    # laddr can be empty for sockets that are not bound yet.
                    if not conn.laddr:
                        continue
                    if conn.laddr.port == port and conn.status == 'LISTEN':
                        return True
                return False
            except (psutil.AccessDenied, AttributeError):
                pass  # fall through to the socket probe

        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(1)
            return s.connect_ex(('localhost', port)) == 0

    def check_prerequisites(self) -> bool:
        """Check if all required tools are available.

        Returns:
            False when ``ollama`` or a Python interpreter is missing from
            PATH; a missing ``npm`` only produces a warning.
        """
        self.logger.info("🔍 Checking prerequisites...")

        missing_tools = []

        # Check Ollama
        if not self._command_exists('ollama'):
            missing_tools.append('ollama (https://ollama.ai)')

        # Check Python
        if not self._command_exists('python') and not self._command_exists('python3'):
            missing_tools.append('python')

        # Check Node.js (optional)
        if not self._command_exists('npm'):
            self.logger.warning("⚠️ npm not found - frontend will be disabled")
            self.services['frontend'].required = False

        if missing_tools:
            self.logger.error(f"❌ Missing required tools: {', '.join(missing_tools)}")
            return False

        self.logger.info("✅ All prerequisites satisfied")
        return True

    def _command_exists(self, command: str) -> bool:
        """Check if a command exists in PATH.

        Resolves the executable with ``shutil.which`` instead of running
        ``<command> --version``, so nothing is executed and commands that do
        not implement ``--version`` are still detected.
        """
        return shutil.which(command) is not None

    def ensure_models(self):
        """Ensure required Ollama models are available.

        Pulls every required model whose name does not appear in the output
        of ``ollama list``. Failures are logged as warnings, never raised.
        """
        self.logger.info("📥 Checking required models...")

        required_models = ['qwen3:8b', 'qwen3:0.6b']

        try:
            # Get list of installed models
            result = subprocess.run(
                ['ollama', 'list'], capture_output=True, text=True, timeout=10
            )
            installed_models = result.stdout

            for model in required_models:
                if model in installed_models:
                    self.logger.info(f"✅ {model} already available")
                    continue
                self.logger.info(f"📥 Pulling {model}...")
                subprocess.run(
                    ['ollama', 'pull', model], check=True, timeout=300
                )  # 5 min timeout
                self.logger.info(f"✅ {model} ready")

        except subprocess.TimeoutExpired:
            self.logger.warning("⚠️ Model check timed out - continuing anyway")
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError covers the ollama binary being absent or not executable.
            self.logger.warning(f"⚠️ Could not check/pull models: {e}")

    def start_service(self, service_name: str, config: ServiceConfig) -> bool:
        """Start a single service.

        Returns:
            True if the service is already running or is still alive after
            ``config.startup_delay`` seconds. If the port is already taken
            the service is skipped and the result is ``not config.required``.
            False if the process could not be spawned or exited during the
            startup delay.
        """
        existing = self.processes.get(service_name)
        if existing is not None:
            if existing.poll() is None:
                self.logger.warning(f"⚠️ {service_name} already running")
                return True
            # The registered process has exited; forget it and start afresh.
            del self.processes[service_name]

        # Check if port is in use
        if self.is_port_in_use(config.port):
            self.logger.warning(
                f"⚠️ Port {config.port} already in use, skipping {service_name}"
            )
            return not config.required

        self.logger.info(f"🔄 Starting {service_name} on port {config.port}...")

        # Setup environment
        env = os.environ.copy()
        if config.env:
            env.update(config.env)

        try:
            process = subprocess.Popen(
                config.command,
                cwd=config.cwd,
                env=env,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                # An undecodable byte must not kill the log reader thread:
                # once nobody drains the pipe the child can block on write.
                errors='replace',
                bufsize=1,
            )
        except (OSError, ValueError, subprocess.SubprocessError) as e:
            self.logger.error(f"❌ Failed to start {service_name}: {e}")
            return False

        self.processes[service_name] = process

        # Start log monitoring thread
        log_thread = threading.Thread(
            target=self._monitor_service_logs,
            args=(service_name, process),
            daemon=True,
        )
        log_thread.start()
        self.log_threads[service_name] = log_thread

        # Wait for startup
        time.sleep(config.startup_delay)

        # Check if process is still running. A process that died stays
        # registered so monitor() notices it and retries required services.
        if process.poll() is None:
            self.logger.info(
                f"✅ {service_name} started successfully (PID: {process.pid})"
            )
            return True

        self.logger.error(f"❌ {service_name} failed to start")
        return False

    def _monitor_service_logs(self, service_name: str, process: subprocess.Popen):
        """Monitor service logs and forward to main logger.

        Runs in a daemon thread until the child's stdout reaches EOF. Each
        non-blank line goes to ``<logs_dir>/<service_name>.log`` and to the
        handlers of the main logger.
        """
        service_logger = logging.getLogger(service_name)
        service_logger.setLevel(logging.INFO)

        # Add file handler for this service
        file_handler = logging.FileHandler(
            self.logs_dir / f'{service_name}.log', encoding='utf-8'
        )
        file_handler.setFormatter(logging.Formatter('%(asctime)s %(message)s'))
        service_logger.addHandler(file_handler)

        try:
            for line in process.stdout:
                message = line.strip()
                if not message:
                    continue

                # Create log record with service context
                record = logging.LogRecord(
                    name=service_name,
                    level=logging.INFO,
                    pathname='',
                    lineno=0,
                    msg=message,
                    args=(),
                    exc_info=None,
                )
                record.service = service_name

                # Log to both service file and main console
                service_logger.handle(record)
                self.logger.handle(record)

        except Exception as e:
            self.logger.error(f"Error monitoring {service_name} logs: {e}")
        finally:
            # The named logger outlives this thread; detach the handler so a
            # restart of the same service does not write every line twice.
            service_logger.removeHandler(file_handler)
            file_handler.close()
            process.stdout.close()

    def health_check(self, service_name: str, config: ServiceConfig) -> bool:
        """Perform health check on a service.

        Returns:
            True only if ``GET http://localhost:<port><health_check_path>``
            answers with status 200 within 5 seconds. Returns False on any
            request error, or when ``requests`` is not installed.
        """
        if requests is None:
            return False

        url = f"http://localhost:{config.port}{config.health_check_path}"
        try:
            response = requests.get(url, timeout=5)
        except requests.RequestException:
            return False
        return response.status_code == 200

    def start_all(self, skip_frontend: bool = False) -> bool:
        """Start all services in order.

        Every service in the order is attempted even after a failure.

        Returns:
            False if prerequisites are missing or any required service
            failed to start; True otherwise.
        """
        self.logger.info("🚀 Starting RAG System Components...")

        if not self.check_prerequisites():
            return False

        self.running = True
        failed_services = []

        # Start services in dependency order
        service_order = ['ollama', 'rag-api', 'backend']
        if not skip_frontend and 'frontend' in self.services:
            service_order.append('frontend')

        for service_name in service_order:
            config = self.services.get(service_name)
            if config is None:
                continue

            # Special handling for Ollama
            if service_name == 'ollama':
                started = self._start_ollama()
            else:
                started = self.start_service(service_name, config)

            if started:
                continue
            if config.required:
                failed_services.append(service_name)
            else:
                self.logger.warning(f"⚠️ Skipping optional service: {service_name}")

        if failed_services:
            self.logger.error(
                f"❌ Failed to start required services: {', '.join(failed_services)}"
            )
            return False

        # Print status summary
        self._print_status_summary()
        return True

    def _start_ollama(self) -> bool:
        """Special handling for Ollama startup.

        Reuses an Ollama already listening on port 11434, otherwise starts
        it; in both cases the required models are checked afterwards.
        """
        # Check if Ollama is already running
        if self.is_port_in_use(11434):
            self.logger.info("✅ Ollama already running")
            self.ensure_models()
            return True

        # Start Ollama
        if self.start_service('ollama', self.services['ollama']):
            self.ensure_models()
            return True

        return False

    def _print_status_summary(self):
        """Print system status summary.

        A service counts as running if this manager started it or if
        something is listening on its configured port.
        """
        self.logger.info("")
        self.logger.info("🎉 RAG System Started!")
        self.logger.info("📊 Services Status:")

        for service_name, config in self.services.items():
            if service_name in self.processes or self.is_port_in_use(config.port):
                status = "✅ Running"
                url = f"http://localhost:{config.port}"
                self.logger.info(f" • {service_name.capitalize():<10}: {status:<10} {url}")
            else:
                self.logger.info(f" • {service_name.capitalize():<10}: ❌ Stopped")

        self.logger.info("")
        self.logger.info("🌐 Access your RAG system at: http://localhost:3000")
        self.logger.info("")
        self.logger.info("📋 Useful commands:")
        self.logger.info(" • Stop system: Ctrl+C")
        self.logger.info(" • Check logs: tail -f logs/*.log")
        self.logger.info(" • Health check: python run_system.py --health")

    def shutdown(self):
        """Gracefully shutdown all services.

        Does nothing unless ``self.running`` is set, which makes repeated
        calls (signal handler followed by ``main``'s ``finally``) harmless.
        """
        if not self.running:
            return

        self.logger.info("🛑 Shutting down RAG system...")
        self.running = False

        # Stop services in reverse order
        for service_name in reversed(list(self.processes)):
            self._stop_service(service_name)

        self.logger.info("✅ All services stopped")

    def _stop_service(self, service_name: str):
        """Stop a single service.

        Sends terminate, waits up to 10 seconds, then kills. The service is
        removed from ``self.processes`` whatever the outcome.
        """
        process = self.processes.get(service_name)
        if process is None:
            return

        self.logger.info(f"🔄 Stopping {service_name}...")

        try:
            # Try graceful shutdown first
            process.terminate()

            # Wait up to 10 seconds for graceful shutdown
            try:
                process.wait(timeout=10)
            except subprocess.TimeoutExpired:
                # Force kill if graceful shutdown fails
                process.kill()
                process.wait()

            self.logger.info(f"✅ {service_name} stopped")

        except Exception as e:
            self.logger.error(f"❌ Error stopping {service_name}: {e}")
        finally:
            self.processes.pop(service_name, None)

    def monitor(self, check_interval: float = 30):
        """Monitor running services and restart if needed.

        Loops while ``self.running`` is set, sleeping ``check_interval``
        seconds between passes. A service whose process has exited is
        dropped from the registry; required services are restarted.

        Args:
            check_interval: seconds between two passes (default 30).
        """
        self.logger.info("👁️ Monitoring services... (Press Ctrl+C to stop)")

        try:
            while self.running:
                time.sleep(check_interval)

                for service_name, process in list(self.processes.items()):
                    if process.poll() is None:
                        continue

                    self.logger.warning(f"⚠️ {service_name} has stopped unexpectedly")

                    # Always forget the dead process; otherwise an optional
                    # service would trigger this warning on every pass.
                    self.processes.pop(service_name, None)

                    # Restart the service
                    config = self.services[service_name]
                    if config.required:
                        self.logger.info(f"🔄 Restarting {service_name}...")
                        self.start_service(service_name, config)

        except KeyboardInterrupt:
            self.logger.info("Monitoring stopped by user")


def main():
    """Main entry point: parse arguments and run the selected mode."""
    parser = argparse.ArgumentParser(description='RAG System Unified Launcher')
    parser.add_argument('--mode', choices=['dev', 'prod'], default='dev',
                        help='Run mode (default: dev)')
    parser.add_argument('--logs-only', action='store_true',
                        help='Only show aggregated logs from running services')
    parser.add_argument('--no-frontend', action='store_true',
                        help='Skip frontend startup')
    parser.add_argument('--health', action='store_true',
                        help='Check health of running services')
    parser.add_argument('--stop', action='store_true',
                        help='Stop all running services')

    args = parser.parse_args()

    # Create service manager
    manager = ServiceManager(mode=args.mode)

    try:
        if args.health:
            # Health check mode
            manager._print_status_summary()
            return

        if args.stop:
            # Stop mode - kill any running processes
            manager.logger.info("🛑 Stopping all RAG system processes...")
            # NOTE(review): not implemented; this mode only logs the message.
            return

        if args.logs_only:
            # Logs only mode - just tail existing logs
            manager.logger.info("📋 Showing aggregated logs... (Press Ctrl+C to stop)")
            # NOTE(review): manager.running is False here, so monitor()
            # returns immediately - confirm the intended behavior.
            manager.monitor()
            return

        # Normal startup mode
        if manager.start_all(skip_frontend=args.no_frontend):
            manager.monitor()
        else:
            manager.logger.error("❌ System startup failed")
            sys.exit(1)

    except KeyboardInterrupt:
        manager.logger.info("Received interrupt signal")
    finally:
        manager.shutdown()


if __name__ == "__main__":
    main()