diff --git a/TODO.md b/TODO.md new file mode 100644 index 0000000..be78b56 --- /dev/null +++ b/TODO.md @@ -0,0 +1,276 @@ +# TODO: CUDA and Android Support in Federation + +## Overview + +This document tracks known issues and recommendations for adding CUDA (NVIDIA) and Android nodes to the local_swarm federation system. + +## Current Status + +- ✅ **Apple Silicon (macOS)**: Fully supported with MLX backend +- ⚠️ **CUDA/Android**: Not currently supported, requires implementation work +- ✅ **Linux**: Should work with llama.cpp + CUDA +- ✅ **Windows**: Should work with llama.cpp + CUDA (not tested) + +## Known Issues + +### 1. No CUDA Backend for macOS + +**Problem:** +- `__init__.py` only chooses MLX or llama.cpp +- No CUDA path for macOS +- Apple Silicon only supports Metal acceleration, not CUDA + +**Impact:** +- CUDA/Android nodes on macOS cannot use GPU acceleration +- These nodes will fall back to CPU-only mode + +**References:** +- `src/backends/__init__.py` (lines 26-32) +- `src/hardware/detector.py` (Apple Silicon detection) + +**Recommendation:** +- Current architecture is correct for macOS - CUDA is not supported on Apple Silicon +- Would need separate CUDA backend implementation (not recommended) + +--- + +### 2. 
Platform Detection in `hardware/detector.py` + +**Current Detection:** +```python +def detect_gpu(): + # macOS: Apple Silicon (Metal only, no CUDA) + # Linux/Windows: NVIDIA/AMD/Intel GPU (potential CUDA) + # Android/Termux: CPU-only (no GPU) +``` + +**Impact:** +- Android/Termux devices are detected as Linux +- Will use CPU-only mode (expected) +- No special handling for Android platform + +**Potential Issue:** +- Termux on Android reports as "linux" +- May have different requirements (file paths, permissions) +- Need to test if file paths work correctly on Android + +**References:** +- `src/hardware/detector.py:170-221` (Android/Termux detection via `is_termux()`) + +**Recommendation:** +- Add explicit Android platform detection beyond `is_termux()` +- Test file path handling on Termux +- Consider Android's unique file system limitations + +--- + +### 3. Llama.cpp Backend Configuration + +**Current GPU Layer Logic:** +```python +# src/backends/__init__.py (line 35) +if hardware.gpu and not hardware.is_apple_silicon: + n_gpu_layers = -1 # Offload all to GPU (Metal/CUDA) +else: + n_gpu_layers = 0 # CPU-only +``` + +**For CUDA Support on Linux:** +- `n_gpu_layers` is the number of model layers to offload (-1 = all), not a GPU count; size it from the available VRAM +- NVIDIA multi-GPU (1-8 devices): split layers across devices (e.g. llama.cpp `tensor_split`) rather than encoding the GPU count in `n_gpu_layers` +- AMD ROCm: Different backend, not tested + +**Impact:** +- Currently hardcoded to -1 for any detected GPU that is not Apple Silicon, and to 0 otherwise +- CUDA nodes on Linux need proper layer configuration +- No validation that requested layers match available GPU + +**References:** +- `src/backends/llamacpp.py` (line 16, n_gpu_layers parameter) +- `src/backends/__init__.py` (line 35) + +**Recommendation:** +- Make `n_gpu_layers` configurable per backend +- Auto-detect GPU capabilities from `pynvml` or system +- Add GPU layer validation + +--- + +### 4.
Seed Variation Mode (Not an Issue, but Important) + +**Current Behavior:** +```python +# src/swarm/manager.py (lines 76-82) +if use_seed_variation is None and hardware.is_apple_silicon: + self.use_seed_variation = True # Auto-enabled on macOS +``` + +**How It Works:** +- Runs 1 model instance with different random seeds +- Simulates multiple "workers" for consensus +- Saves memory by not loading multiple models + +**Impact on Federation:** +- Your Mac: 1 worker → 2 votes (from 2 seeds) +- Peer Mac: 1 worker → 2 votes (from 2 seeds) +- Total: 4 votes, instead of the 8 you would get with 4 actual instances per machine + +**This is CORRECT behavior** for seed variation mode. + +**Recommendation:** +- To get 4 votes per machine (8 total), use `--instances 4` flag +- Seed variation is a design choice, not a bug + +--- + +### 5. Federation Client Timeout + +**Status:** ✅ **FIXED** + +**Previous:** +- Default timeout: 30 seconds +- Peers on slow networks or slow machines would time out + +**Current:** +- Default timeout: 60 seconds (increased in `src/network/federation.py:38`) +- Gives peers more time to respond + +**References:** +- `src/network/federation.py` (line 38) + +**Recommendation:** +- Current 60s is reasonable +- Consider making timeout configurable per peer in discovery +- Add retry logic for failed requests + +--- + +### 6. Network Discovery + +**Current Implementation:** ✅ **PLATFORM AGNOSTIC** + +**Uses:** +- mDNS/Bonjour for peer discovery +- Standard network protocols +- No platform-specific blocking + +**Status:** Should work on all platforms (macOS, Linux, Windows, Android) + +**References:** +- `src/network/discovery.py` (standard mDNS implementation) + +**Recommendation:** +- No changes needed +- Test on Linux/Windows/Android if needed + +--- + +## Implementation Priorities + +### High Priority (Breaking Features) + +1.
**CUDA Backend for Linux** (if needed) + - Add CUDA-specific backend or extend llama.cpp + - Auto-detect NVIDIA GPU and configure layers + - Test on actual CUDA hardware + - **Effort:** 3-5 days + +2. **Android Platform Detection** + - Add explicit Android detection beyond Termux + - Handle Android's file system and package manager differences + - Test on real Android device + - **Effort:** 2-3 days + +### Medium Priority (Improvements) + +1. **GPU Layer Auto-Configuration** + - Auto-detect GPU capabilities from system + - Match requested layers to available hardware + - Add validation and helpful error messages + - **Effort:** 1-2 days + +2. **Federation Metrics** + - Add per-peer timeout in PeerInfo + - Track latency and success rates + - Better error handling for retry logic + - **Effort:** 1 day + +### Low Priority (Nice to Have) + +1. **GPU Backend Selection UI** + - Allow users to manually select MLX vs llama.cpp + - Add warning for CUDA backend on macOS (not supported) + - **Effort:** 2 hours + +2. 
**Seed Variation Toggle** + - Add command-line flag to disable seed variation + - Document the trade-offs clearly + - **Effort:** 30 minutes + +## Testing Checklist + +Before marking any issue as complete, test on: + +### macOS (Apple Silicon) +- [ ] Federation with macOS peers (current environment) +- [ ] Seed variation mode works correctly +- [ ] MLX backend loads and generates +- [ ] No crashes with multiple instances + +### Linux (NVIDIA GPU) +- [ ] llama.cpp backend loads with CUDA support +- [ ] Federation with Linux peers works +- [ ] GPU layers configured correctly +- [ ] No GPU conflicts + +### Windows (NVIDIA GPU) +- [ ] llama.cpp backend loads with CUDA support +- [ ] Federation with Windows peers works +- [ ] No GPU conflicts + +### Android (CPU-only) +- [ ] Federation with Android peers works (mDNS should work) +- [ ] CPU-only generation works +- [ ] File paths work on Termux/Android + +## Notes + +### Architecture Decisions + +**Why not per-platform backends:** +- Simplifies codebase (single MLX path, single llama.cpp path) +- Reduces maintenance burden +- Trade-off: Can't optimize for platform-specific GPUs in backends + +**Why seed variation on macOS:** +- Apple Silicon has unified memory, not discrete VRAM +- Loading multiple models would consume too much RAM +- Seed variation allows consensus quality with 1 model instance + +**CUDA/Android is not a bug:** +- Current system is designed for Apple Silicon + llama.cpp +- Adding CUDA support requires significant architecture work +- Focus on federation quality for current platforms first + +## Related Files + +- `src/backends/__init__.py` - Backend selection logic +- `src/backends/mlx.py` - Apple Silicon MLX backend +- `src/backends/llamacpp.py` - llama.cpp backend (supports CUDA) +- `src/hardware/detector.py` - Platform and GPU detection +- `src/network/federation.py` - Federation communication +- `src/network/discovery.py` - Peer discovery via mDNS +- `src/swarm/manager.py` - Swarm orchestration + 
+## Conclusion + +The current federation implementation is **platform-agnostic** and should work on Linux/Windows with CUDA nodes. The main limitation is that macOS (Apple Silicon) only supports Metal/MLX, not CUDA. + +**For immediate use:** +- Use `--instances 4` flag on each machine to get 4 votes per machine +- Test federation between different platforms (macOS + Linux) +- Android/Termux should work as-is (CPU-only mode) + +**For future work:** +- Implement high-priority items if CUDA/Android support is needed +- Add GPU layer auto-configuration for better hardware utilization diff --git a/config/prompts/tool_instructions.txt b/config/prompts/tool_instructions.txt index ccfade0..98c18bf 100644 --- a/config/prompts/tool_instructions.txt +++ b/config/prompts/tool_instructions.txt @@ -9,4 +9,6 @@ ARGUMENTS: {"url": "https://example.com", "format": "markdown"} Available tools: bash, webfetch +IMPORTANT: Only webfetch URLs that actually exist and are provided by the user. NEVER hallucinate or guess URLs. If a URL returns 404, stop trying to fetch it. + No explanations. No numbered lists. No markdown. Only tool calls. 
diff --git a/src/api/routes.py b/src/api/routes.py index 9803306..f0e55a4 100644 --- a/src/api/routes.py +++ b/src/api/routes.py @@ -1,5 +1,6 @@ """OpenAI-compatible API routes for Local Swarm.""" +import asyncio import json import logging import os @@ -544,10 +545,184 @@ async def chat_completions(request: ChatCompletionRequest, fastapi_request: Requ completion_id = f"chatcmpl-{uuid.uuid4().hex[:12]}" created = int(time.time()) + # Calculate prompt tokens once for usage reporting + prompt_tokens = len(TOKEN_ENCODING.encode(prompt)) + if request.stream: - # For streaming with tools, return tool_calls to client (opencode) for execution - # This enables multi-turn conversations where client executes tools and sends results back - if has_tools: + # Check if federation is enabled with peers + peers_count = 0 + if federated_swarm is not None and federated_swarm.discovery is not None: + peers_count = len(federated_swarm.discovery.get_peers()) + use_federation = peers_count > 0 + + if use_federation: + # Use federation for ALL requests (with or without tools) + logger.debug(f"🌐 Using federation with {peers_count} peer(s) - waiting for consensus...") + + # Run federation and get consensus result + fed_result = await federated_swarm.generate_with_federation( + prompt=prompt, + max_tokens=request.max_tokens or 1024, + temperature=request.temperature or 0.7, + min_peers=0 + ) + + logger.debug(f" ✓ Federation consensus complete (strategy: {fed_result.strategy}, confidence: {fed_result.local_confidence:.2f})") + logger.debug(f" 🏆 Winner: {fed_result.winner} (check logs above for details)") + logger.debug(f" 📝 Using federated response from {len(fed_result.peer_votes) + 1} nodes") + + # Use the federated consensus result + content = fed_result.final_response + + # Check if tools were requested and parse tool calls from federated response + if has_tools: + logger.debug(" 🔧 Parsing tool calls from federated response...") + content_parsed, tool_calls_parsed = 
parse_tool_calls(content) + + if tool_calls_parsed: + logger.debug(f" 🔧 Found {len(tool_calls_parsed)} tool call(s) in federated response") + # Convert to ToolCall objects and return to client (opencode) + from api.models import ToolCall + tool_calls = [ + ToolCall( + id=tc.get("id", f"call_{i}"), + type=tc.get("type", "function"), + function=tc.get("function", {}) + ) + for i, tc in enumerate(tool_calls_parsed) + ] + + # Stream with tool_calls (different generator) + async def tool_calls_fed_stream_generator() -> AsyncIterator[str]: + """Generate SSE stream with tool calls from federation.""" + first_chunk = ChatCompletionStreamResponse( + id=completion_id, + created=created, + model=request.model, + choices=[ + ChatCompletionStreamChoice( + delta={"role": "assistant"} + ) + ] + ) + yield f"data: {first_chunk.model_dump_json()}\n\n" + + if content_parsed: + content_chunk = ChatCompletionStreamResponse( + id=completion_id, + created=created, + model=request.model, + choices=[ + ChatCompletionStreamChoice( + delta={"content": content_parsed} + ) + ] + ) + yield f"data: {content_chunk.model_dump_json()}\n\n" + + tool_calls_delta = [] + for i, tc in enumerate(tool_calls_parsed): + tool_calls_delta.append({ + "index": i, + "id": tc["id"], + "type": "function", + "function": { + "name": tc["function"]["name"], + "arguments": tc["function"]["arguments"] + } + }) + + from api.models import UsageInfo + fed_completion_tokens = len(TOKEN_ENCODING.encode(content)) if content else 0 + fed_total_tokens = prompt_tokens + fed_completion_tokens + final_chunk = ChatCompletionStreamResponse( + id=completion_id, + created=created, + model=request.model, + choices=[ + ChatCompletionStreamChoice( + delta={"tool_calls": tool_calls_delta}, + finish_reason="tool_calls" + ) + ], + usage=UsageInfo( + prompt_tokens=prompt_tokens, + completion_tokens=fed_completion_tokens, + total_tokens=fed_total_tokens + ) + ) + yield f"data: {final_chunk.model_dump_json()}\n\n" + yield "data: [DONE]\n\n" + 
+ return StreamingResponse( + tool_calls_fed_stream_generator(), + media_type="text/event-stream" + ) + + # Calculate token counts + completion_tokens = len(TOKEN_ENCODING.encode(content)) if content else 0 + total_tokens = prompt_tokens + completion_tokens + + # Stream the federated response + async def federation_stream_generator() -> AsyncIterator[str]: + """Generate SSE stream with federated content.""" + # Send role chunk + first_chunk = ChatCompletionStreamResponse( + id=completion_id, + created=created, + model=request.model, + choices=[ + ChatCompletionStreamChoice( + delta={"role": "assistant"} + ) + ] + ) + yield f"data: {first_chunk.model_dump_json()}\n\n" + + # Send content in chunks + chunk_size = 100 + for i in range(0, len(content), chunk_size): + chunk = content[i:i+chunk_size] + stream_chunk = ChatCompletionStreamResponse( + id=completion_id, + created=created, + model=request.model, + choices=[ + ChatCompletionStreamChoice( + delta={"content": chunk} + ) + ] + ) + yield f"data: {stream_chunk.model_dump_json()}\n\n" + + # Send final chunk with usage + from api.models import UsageInfo + final_chunk = ChatCompletionStreamResponse( + id=completion_id, + created=created, + model=request.model, + choices=[ + ChatCompletionStreamChoice( + delta={}, + finish_reason="stop" + ) + ], + usage=UsageInfo( + prompt_tokens=prompt_tokens, + completion_tokens=completion_tokens, + total_tokens=total_tokens + ) + ) + yield f"data: {final_chunk.model_dump_json()}\n\n" + yield "data: [DONE]\n\n" + + return StreamingResponse( + federation_stream_generator(), + media_type="text/event-stream" + ) + elif has_tools: + # For streaming with tools, return tool_calls to client (opencode) for execution + # This enables multi-turn conversations where client executes tools and sends results back logger.debug(" 🔧 Streaming with tools - returning tool_calls to client for execution...") # Collect full response full_response = "" @@ -689,7 +864,9 @@ async def chat_completions(request: 
ChatCompletionRequest, fastapi_request: Requ ) yield f"data: {stream_chunk.model_dump_json()}\n\n" - # Send final chunk with finish_reason + fed_completion_tokens = len(TOKEN_ENCODING.encode(content)) if content else 0 + fed_total_tokens = prompt_tokens + fed_completion_tokens + from api.models import UsageInfo final_chunk = ChatCompletionStreamResponse( id=completion_id, created=created, @@ -699,7 +876,12 @@ async def chat_completions(request: ChatCompletionRequest, fastapi_request: Requ delta={}, finish_reason="stop" ) - ] + ], + usage=UsageInfo( + prompt_tokens=prompt_tokens, + completion_tokens=fed_completion_tokens, + total_tokens=fed_total_tokens + ) ) yield f"data: {final_chunk.model_dump_json()}\n\n" yield "data: [DONE]\n\n" diff --git a/src/backends/__init__.py b/src/backends/__init__.py index 04ccd47..c3738d3 100644 --- a/src/backends/__init__.py +++ b/src/backends/__init__.py @@ -4,7 +4,7 @@ Creates the appropriate backend based on hardware and platform. """ from typing import Optional -from hardware.detector import HardwareProfile, detect_hardware +from hardware.detector import HardwareProfile, detect_hardware, calculate_gpu_layers from backends.base import LLMBackend from backends.llamacpp import LlamaCppBackend from backends.mlx import MLXBackend @@ -31,15 +31,17 @@ def create_backend(hardware: Optional[HardwareProfile] = None) -> LLMBackend: # Otherwise use llama.cpp (supports CUDA, ROCm, SYCL, CPU) print("Using llama.cpp backend") - # Determine GPU layers + # Auto-configure GPU layers based on hardware + n_gpu_layers = calculate_gpu_layers(hardware.gpu) + if hardware.gpu and not hardware.is_apple_silicon: - # Has external GPU, offload all layers - n_gpu_layers = -1 print(f" GPU detected: {hardware.gpu.name}") - print(f" Offloading all layers to GPU") + if hardware.gpu.is_nvidia: + print(f" Compute capability: {hardware.gpu.compute_capability or 'unknown'}") + if hardware.gpu.device_count > 1: + print(f" GPU count: {hardware.gpu.device_count}") + 
print(f" Offloading {n_gpu_layers} layers to GPU") else: - # CPU only - n_gpu_layers = 0 print(f" No GPU detected, using CPU") return LlamaCppBackend(n_gpu_layers=n_gpu_layers) diff --git a/src/hardware/detector.py b/src/hardware/detector.py index 599e22e..b1c79a3 100644 --- a/src/hardware/detector.py +++ b/src/hardware/detector.py @@ -2,6 +2,7 @@ from dataclasses import dataclass from typing import Optional, List +import os import platform import psutil @@ -17,6 +18,8 @@ class GPUInfo: is_nvidia: bool = False is_amd: bool = False is_mobile: bool = False + compute_capability: Optional[str] = None # CUDA compute capability + device_count: int = 1 # Number of GPUs available @dataclass @@ -70,10 +73,55 @@ class HardwareProfile: return self.available_memory_gb +def is_android() -> bool: + """Check if running on Android (beyond just Termux).""" + # Check multiple Android indicators + + # 1. Check for Android-specific environment variables + android_env_vars = [ + "ANDROID_ROOT", + "ANDROID_DATA", + "ANDROID_ART_ROOT", + "ANDROID_I18N_ROOT", + "ANDROID_TZDATA_ROOT", + ] + if any(os.environ.get(var) for var in android_env_vars): + return True + + # 2. Check for Android-specific paths + android_paths = [ + "/system/build.prop", + "/system/bin/app_process", + "/data/data", + ] + if any(os.path.exists(path) for path in android_paths): + return True + + # 3. Check for Termux (which runs on Android) + if _is_android_or_termux(): + return True + + # 4. 
Check /proc/sys/kernel/osrelease for Android + try: + if os.path.exists("/proc/sys/kernel/osrelease"): + with open("/proc/sys/kernel/osrelease", "r") as f: + release = f.read().lower() + if "android" in release: + return True + except Exception: + pass + + return False + + def detect_os() -> str: """Detect the operating system.""" system = platform.system().lower() - if system == "darwin": + + # Check for Android first (reports as Linux) + if system == "linux" and is_android(): + return "android" + elif system == "darwin": return "darwin" elif system == "windows": return "windows" @@ -132,6 +180,14 @@ def detect_nvidia_gpu() -> Optional[GPUInfo]: except Exception: driver = None + # Get compute capability + compute_capability = None + try: + major, minor = pynvml.nvmlDeviceGetCudaComputeCapability(handle) + compute_capability = f"{major}.{minor}" + except Exception: + pass + return GPUInfo( name=name, vram_gb=vram_gb, @@ -139,7 +195,9 @@ def detect_nvidia_gpu() -> Optional[GPUInfo]: device_id=0, is_nvidia=True, is_apple_silicon=False, - is_amd=False + is_amd=False, + compute_capability=compute_capability, + device_count=device_count ) finally: pynvml.nvmlShutdown() @@ -219,6 +277,78 @@ def detect_gpu() -> Optional[GPUInfo]: return None +def calculate_gpu_layers(gpu: Optional[GPUInfo]) -> int: + """Calculate optimal number of GPU layers to offload. 
+ + Args: + gpu: GPU information (None if no GPU) + + Returns: + Number of layers to offload (-1 = all, 0 = CPU only) + """ + if gpu is None: + return 0 + + if gpu.is_apple_silicon: + # Apple Silicon: offload all layers (unified memory) + return -1 + + if gpu.is_nvidia: + # NVIDIA: Check compute capability for compatibility + if gpu.compute_capability: + major, _ = gpu.compute_capability.split('.') + if int(major) < 5: + # Very old GPUs (Kepler and earlier) may have issues + return 0 + + # Multi-GPU support: use device_count to determine layers + # For now, offload all layers if we have any NVIDIA GPU + return -1 + + if gpu.is_amd: + # AMD: ROCm support varies, be conservative + return -1 + + # Unknown GPU type: use CPU + return 0 + + +def validate_gpu_layers(requested_layers: int, gpu: Optional[GPUInfo]) -> int: + """Validate and adjust requested GPU layers. + + Args: + requested_layers: Requested number of layers (-1 = all) + gpu: GPU information + + Returns: + Validated layer count + """ + if requested_layers == 0: + return 0 + + if gpu is None: + if requested_layers != 0: + raise ValueError( + f"Requested {requested_layers} GPU layers but no GPU detected. " + "Use n_gpu_layers=0 for CPU-only mode." + ) + return 0 + + if gpu.is_apple_silicon: + # Apple Silicon always uses all layers + return -1 + + if gpu.is_nvidia and gpu.compute_capability: + major, _ = gpu.compute_capability.split('.') + if int(major) < 5: + raise ValueError( + f"NVIDIA GPU {gpu.name} has compute capability {gpu.compute_capability}. " + f"Minimum required is 5.0. Use n_gpu_layers=0 for CPU mode." 
+ ) + + return requested_layers + + def detect_hardware() -> HardwareProfile: """Detect complete hardware profile.""" os_name = detect_os() diff --git a/src/hardware/qualcomm.py b/src/hardware/qualcomm.py index a60a1c3..da8e5b0 100644 --- a/src/hardware/qualcomm.py +++ b/src/hardware/qualcomm.py @@ -10,6 +10,64 @@ from typing import Optional from hardware.detector import GPUInfo +# Android-specific file paths for common operations +ANDROID_PATHS = { + "termux_home": "/data/data/com.termux/files/home", + "termux_usr": "/data/data/com.termux/files/usr", + "termux_bin": "/data/data/com.termux/files/usr/bin", + "shared_storage": "/sdcard", + "android_data": "/data/data", +} + + +def get_android_path(path_type: str, subpath: str = "") -> str: + """Get Android-specific file path. + + Args: + path_type: Type of path (termux_home, shared_storage, etc.) + subpath: Additional path components + + Returns: + Full path string + """ + base = ANDROID_PATHS.get(path_type, path_type) + if subpath: + return os.path.join(base, subpath) + return base + + +def normalize_path_for_android(path: str) -> str: + """Normalize a path for Android/Termux environment. 
+ + Args: + path: Original path + + Returns: + Normalized path for Android + """ + # Expand user home directory properly on Android + if path.startswith("~/"): + if is_termux(): + home = ANDROID_PATHS["termux_home"] + else: + home = os.environ.get("HOME", "/") + path = os.path.join(home, path[2:]) + + # Handle /sdcard paths + if path.startswith("/sdcard") and not os.path.exists("/sdcard"): + # Try alternative storage paths + alternatives = [ + "/storage/emulated/0", + "/storage/self/primary", + ] + for alt in alternatives: + if os.path.exists(alt): + path = path.replace("/sdcard", alt, 1) + break + + return os.path.normpath(path) + + def is_termux() -> bool: """Check if running in Termux environment.""" return ( diff --git a/src/network/discovery.py b/src/network/discovery.py index 4b5e8ac..6a207ef 100644 --- a/src/network/discovery.py +++ b/src/network/discovery.py @@ -6,10 +6,43 @@ Uses mDNS/Bonjour to discover other Local Swarm instances on the local network. import socket import asyncio from typing import Dict, List, Optional, Any -from dataclasses import dataclass +from dataclasses import dataclass, field from datetime import datetime, timedelta +@dataclass +class PeerMetrics: + """Metrics for tracking peer performance.""" + total_requests: int = 0 + successful_requests: int = 0 + failed_requests: int = 0 + total_latency_ms: float = 0.0 + avg_latency_ms: float = 0.0 + last_error: Optional[str] = None + last_error_time: Optional[datetime] = None + + @property + def success_rate(self) -> float: + """Calculate success rate (0.0 to 1.0).""" + if self.total_requests == 0: + return 1.0 + return self.successful_requests / self.total_requests + + def record_success(self, latency_ms: float): + """Record a successful request.""" + self.total_requests += 1 + self.successful_requests += 1 + self.total_latency_ms += latency_ms + self.avg_latency_ms = self.total_latency_ms / self.successful_requests + + def record_failure(self, error: str): + """Record a failed request.""" 
+ self.total_requests += 1 + self.failed_requests += 1 + self.last_error = error + self.last_error_time = datetime.now() + + @dataclass class PeerInfo: """Information about a peer swarm.""" @@ -21,6 +54,8 @@ class PeerInfo: model_id: str hardware_summary: str last_seen: datetime + timeout_seconds: float = 60.0 # Configurable timeout per peer + metrics: PeerMetrics = field(default_factory=PeerMetrics) @property def api_url(self) -> str: diff --git a/src/network/federation.py b/src/network/federation.py index 759d1f2..62ae9c9 100644 --- a/src/network/federation.py +++ b/src/network/federation.py @@ -5,7 +5,7 @@ Handles communication between peer swarms for distributed consensus. import asyncio import time -from typing import List, Optional, Dict, Any +from typing import List, Optional, Dict, Any, Tuple from dataclasses import dataclass from network.discovery import PeerInfo @@ -29,12 +29,13 @@ class FederationResult: local_confidence: float peer_votes: List[PeerVote] strategy: str + winner: str = "" # Name of the winning node ("local" or peer name) class FederationClient: """Client for communicating with peer swarms.""" - def __init__(self, timeout: float = 30.0): + def __init__(self, timeout: float = 60.0): """ Initialize federation client. 
@@ -79,42 +80,56 @@ class FederationClient: Returns: PeerVote or None if request failed """ + request_start = time.time() + # Use peer-specific timeout if available, otherwise use default + timeout = getattr(peer, 'timeout_seconds', self.timeout) + try: import aiohttp - session = await self._get_session() + # Create session with peer-specific timeout + session_timeout = aiohttp.ClientTimeout(total=timeout) + async with aiohttp.ClientSession(timeout=session_timeout) as session: + url = f"{peer.api_url}/v1/federation/vote" + payload = { + "prompt": prompt, + "max_tokens": max_tokens, + "temperature": temperature, + "request_id": f"fed_{time.time()}" + } - url = f"{peer.api_url}/v1/federation/vote" - payload = { - "prompt": prompt, - "max_tokens": max_tokens, - "temperature": temperature, - "request_id": f"fed_{time.time()}" - } + print(f" → Sending request to {url} (timeout: {timeout}s)") + async with session.post(url, json=payload) as resp: + print(f" ← Got response {resp.status} from {peer.name}") + if resp.status != 200: + print(f" ✗ Peer {peer.name} returned status {resp.status}") + peer.metrics.record_failure(f"HTTP {resp.status}") + return None - print(f" → Sending request to {url}") - async with session.post(url, json=payload) as resp: - print(f" ← Got response {resp.status} from {peer.name}") - if resp.status != 200: - print(f" ✗ Peer {peer.name} returned status {resp.status}") - return None + data = await resp.json() + latency_ms = (time.time() - request_start) * 1000 + print(f" ✓ Peer {peer.name} responded successfully ({latency_ms:.0f}ms)") + + # Record success metrics + peer.metrics.record_success(latency_ms) - data = await resp.json() - print(f" ✓ Peer {peer.name} responded successfully") - - return PeerVote( - peer_name=peer.name, - response_text=data.get("response", ""), - confidence=data.get("confidence", 0.5), - latency_ms=data.get("latency_ms", 0), - worker_count=data.get("worker_count", 0) - ) + return PeerVote( + peer_name=peer.name, + 
response_text=data.get("response", ""), + confidence=data.get("confidence", 0.5), + latency_ms=data.get("latency_ms", latency_ms), + worker_count=data.get("worker_count", 0) + ) except asyncio.TimeoutError: - print(f" ⚠️ Peer {peer.name} timed out (>{self.timeout}s)") + error_msg = f"Timeout ({timeout}s)" + print(f" ⚠️ Peer {peer.name} {error_msg}") + peer.metrics.record_failure(error_msg) return None except Exception as e: - print(f" ⚠️ Error contacting peer {peer.name}: {e}") + error_msg = str(e) + print(f" ⚠️ Error contacting peer {peer.name}: {error_msg}") + peer.metrics.record_failure(error_msg) return None async def health_check(self, peer: PeerInfo) -> bool: @@ -172,6 +187,8 @@ class FederatedSwarm: ) -> FederationResult: """ Generate with federation across peer swarms. + + Optimized: Runs local and peer generation in parallel for maximum speed. Args: prompt: Input prompt @@ -182,53 +199,68 @@ class FederatedSwarm: Returns: FederationResult with final response """ - # Phase 1: Local generation and consensus - print(f" 🏠 Local swarm generating...") - local_result = await self.local_swarm.generate( + peers = self.discovery.get_peers() + + if len(peers) == 0: + if min_peers > 0: + raise RuntimeError(f"Federation requires {min_peers} peers, but none found") + + # Solo mode - just run local generation + print(f" 🏠 Solo mode - local swarm generating...") + local_result = await self.local_swarm.generate( + prompt=prompt, + max_tokens=max_tokens, + temperature=temperature, + use_consensus=True + ) + return FederationResult( + final_response=local_result.selected_response.text, + local_confidence=local_result.confidence, + peer_votes=[], + strategy="solo" + ) + + # Parallel generation: Local swarm AND peers generate simultaneously + print(f" 🏠 Local swarm AND {len(peers)} peer(s) generating in parallel...") + + # Start local generation + local_task = self.local_swarm.generate( prompt=prompt, max_tokens=max_tokens, temperature=temperature, use_consensus=True ) - - 
local_best = local_result.selected_response - local_confidence = local_result.confidence - - print(f" ✓ Local best (confidence: {local_confidence:.2f})") - - # Phase 2: Collect peer votes - peers = self.discovery.get_peers() - - if len(peers) == 0: - if min_peers > 0: - raise RuntimeError(f"Federation requires {min_peers} peers, but none found") - - # Solo mode - just return local result - return FederationResult( - final_response=local_best.text, - local_confidence=local_confidence, - peer_votes=[], - strategy="solo" - ) - - print(f" 🌐 Requesting votes from {len(peers)} peer(s)...") - for peer in peers: - print(f" → Contacting {peer.name} at {peer.api_url}") - - peer_votes = [] + + # Start peer requests vote_tasks = [ self.federation_client.request_vote(peer, prompt, max_tokens, temperature) for peer in peers ] - - results = await asyncio.gather(*vote_tasks, return_exceptions=True) - - for peer, result in zip(peers, results): + + # Run everything in parallel + all_tasks = [local_task] + vote_tasks + results = await asyncio.gather(*all_tasks, return_exceptions=True) + + # Separate local result from peer votes + local_result_raw = results[0] + if isinstance(local_result_raw, Exception): + print(f" ✗ Local swarm failed: {local_result_raw}") + raise RuntimeError(f"Local generation failed: {local_result_raw}") + + from swarm.manager import ConsensusResult + local_result: ConsensusResult = local_result_raw # Now guaranteed not to be an exception + local_best = local_result.selected_response + local_confidence = local_result.confidence + print(f" ✓ Local completed (confidence: {local_confidence:.2f})") + + # Collect peer votes + peer_votes = [] + for peer, result in zip(peers, results[1:]): if isinstance(result, Exception): print(f" ✗ Peer {peer.name} failed: {result}") elif result is not None: peer_votes.append(result) - print(f" ✓ Peer {peer.name} voted (confidence: {result.confidence:.2f})") + print(f" ✓ Peer {peer.name} completed (confidence: 
{result.confidence:.2f})") if len(peer_votes) == 0: # No peers responded, use local result @@ -240,16 +272,16 @@ class FederatedSwarm: strategy="local_fallback" ) - # Phase 3: Global consensus + # Global consensus print(f" 🗳️ Running global consensus ({len(peer_votes) + 1} votes)...") - - final_response = self._weighted_vote(local_best.text, local_confidence, peer_votes) + final_response, winner = self._weighted_vote(local_best.text, local_confidence, peer_votes) return FederationResult( final_response=final_response, local_confidence=local_confidence, peer_votes=peer_votes, - strategy=self.consensus_strategy + strategy=self.consensus_strategy, + winner=winner ) def _weighted_vote( @@ -257,11 +289,14 @@ class FederatedSwarm: local_response: str, local_confidence: float, peer_votes: List[PeerVote] - ) -> str: + ) -> Tuple[str, str]: """ Select best response using weighted voting. Weights by confidence score. Higher confidence = more weight. + + Returns: + Tuple of (selected_response, winner_name) """ # Collect all votes with their weights all_votes = [(local_response, local_confidence, "local")] @@ -292,15 +327,15 @@ class FederatedSwarm: best_idx = max(range(len(scores)), key=lambda i: scores[i]) best = all_votes[best_idx] print(f" ✓ Selected response from {best[2]} (quality score: {scores[best_idx]:.2f})") - return best[0] + return best[0], best[2] # Default: weighted selection - pick highest confidence best = max(all_votes, key=lambda x: x[1]) print(f" ✓ Selected response from {best[2]} (confidence: {best[1]:.2f})") - return best[0] + return best[0], best[2] async def get_federation_status(self) -> Dict[str, Any]: - """Get current federation status.""" + """Get current federation status with peer metrics.""" peers = self.discovery.get_peers() # Check health of all peers @@ -308,7 +343,24 @@ class FederatedSwarm: health_results = await asyncio.gather(*health_checks, return_exceptions=True) healthy_peers = [] + peer_metrics_info = [] + for peer, healthy in 
zip(peers, health_results): + peer_info = { + "name": peer.name, + "healthy": healthy is True, + "timeout": peer.timeout_seconds, + "model": peer.model_id, + "instances": peer.instances, + "metrics": { + "success_rate": peer.metrics.success_rate, + "avg_latency_ms": round(peer.metrics.avg_latency_ms, 2), + "total_requests": peer.metrics.total_requests, + "last_error": peer.metrics.last_error, + } + } + peer_metrics_info.append(peer_info) + if healthy is True: healthy_peers.append(peer.name) @@ -317,6 +369,7 @@ class FederatedSwarm: "total_peers": len(peers), "healthy_peers": len(healthy_peers), "peer_names": [p.name for p in peers], + "peer_details": peer_metrics_info, "strategy": self.consensus_strategy } diff --git a/tests/TEST_PLAN_CUDA_ANDROID.md b/tests/TEST_PLAN_CUDA_ANDROID.md new file mode 100644 index 0000000..9b713cf --- /dev/null +++ b/tests/TEST_PLAN_CUDA_ANDROID.md @@ -0,0 +1,63 @@ +## Test Plan for CUDA and Android Support + +### Unit Tests + +#### Test Case 1: NVIDIA GPU Detection +- **Input:** System with NVIDIA GPU and pynvml installed +- **Expected Output:** GPUInfo with correct name, VRAM, and is_nvidia=True +- **Location:** src/hardware/detector.py:detect_nvidia_gpu() + +#### Test Case 2: GPU Layer Configuration for CUDA +- **Input:** HardwareProfile with NVIDIA GPU (4GB VRAM) +- **Expected Output:** n_gpu_layers=-1 (all layers), proper CUDA configuration +- **Location:** src/backends/__init__.py:create_backend() + +#### Test Case 3: Android Platform Detection +- **Input:** platform.system() returns 'Linux', Termux environment detected +- **Expected Output:** is_android=True, proper Android path handling +- **Location:** src/hardware/detector.py:detect_android() + +#### Test Case 4: PeerInfo with Timeout +- **Input:** PeerInfo with custom timeout +- **Expected Output:** FederationClient respects peer timeout +- **Location:** src/network/discovery.py:PeerInfo + +### Integration Tests + +#### End-to-End Flow 1: CUDA Backend Creation +1. 
Detect hardware with NVIDIA GPU +2. Create backend via factory +3. Verify n_gpu_layers=-1 set +4. Load test model +5. Expected: Successful GPU offload + +#### End-to-End Flow 2: Android Device Join Federation +1. Start discovery on Android (Termux) +2. Advertise Android hardware +3. Join federation from macOS peer +4. Send vote request +5. Expected: Android responds successfully + +#### End-to-End Flow 3: Federation with Per-Peer Timeout +1. Add peer with 30s timeout +2. Add peer with 60s timeout +3. Request votes from both +4. Expected: Each peer uses its own timeout + +### Manual Verification + +#### Command to Run: +```bash +python -m pytest tests/ -v -k "cuda or android or federation" +``` + +#### Expected Output: +- All tests pass +- No ImportError for pynvml +- GPU layer detection works on CUDA machines +- Android detection passes on Termux + +#### Platform Testing: +1. **macOS (Apple Silicon):** MLX backend loads +2. **Linux (NVIDIA):** CUDA backend auto-detects +3. **Android (Termux):** CPU-only mode, proper paths diff --git a/tests/test_federation_metrics.py b/tests/test_federation_metrics.py new file mode 100644 index 0000000..0b01de3 --- /dev/null +++ b/tests/test_federation_metrics.py @@ -0,0 +1,166 @@ +"""Tests for federation metrics and peer timeout.""" + +import sys +import os +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src')) + +import pytest +from datetime import datetime +from network.discovery import PeerInfo, PeerMetrics +from network.federation import FederationClient, PeerVote + + +class TestPeerMetrics: + """Test peer metrics tracking.""" + + def test_peer_metrics_defaults(self): + """Test default metric values.""" + metrics = PeerMetrics() + assert metrics.total_requests == 0 + assert metrics.successful_requests == 0 + assert metrics.failed_requests == 0 + assert metrics.success_rate == 1.0 # No requests = 100% success + + def test_record_success(self): + """Test recording successful requests.""" + metrics = 
PeerMetrics() + metrics.record_success(100.0) + + assert metrics.total_requests == 1 + assert metrics.successful_requests == 1 + assert metrics.failed_requests == 0 + assert metrics.success_rate == 1.0 + assert metrics.avg_latency_ms == 100.0 + + # Record another success + metrics.record_success(200.0) + assert metrics.total_requests == 2 + assert metrics.avg_latency_ms == 150.0 # (100 + 200) / 2 + + def test_record_failure(self): + """Test recording failed requests.""" + metrics = PeerMetrics() + metrics.record_failure("Connection timeout") + + assert metrics.total_requests == 1 + assert metrics.successful_requests == 0 + assert metrics.failed_requests == 1 + assert metrics.success_rate == 0.0 + assert metrics.last_error == "Connection timeout" + assert metrics.last_error_time is not None + + def test_mixed_success_and_failure(self): + """Test mixed success and failure recording.""" + metrics = PeerMetrics() + metrics.record_success(100.0) + metrics.record_failure("Error") + metrics.record_success(150.0) + + assert metrics.total_requests == 3 + assert metrics.successful_requests == 2 + assert metrics.failed_requests == 1 + assert metrics.success_rate == 2/3 + + +class TestPeerInfo: + """Test PeerInfo with metrics and timeout.""" + + def test_peer_info_defaults(self): + """Test PeerInfo default values.""" + peer = PeerInfo( + host="192.168.1.100", + port=17615, + name="test-peer", + version="0.1.0", + instances=2, + model_id="qwen:7b:q4", + hardware_summary="Apple M1 Pro", + last_seen=datetime.now() + ) + + assert peer.timeout_seconds == 60.0 # Default timeout + assert peer.metrics is not None + assert isinstance(peer.metrics, PeerMetrics) + assert peer.api_url == "http://192.168.1.100:17615" + + def test_peer_info_custom_timeout(self): + """Test PeerInfo with custom timeout.""" + peer = PeerInfo( + host="192.168.1.100", + port=17615, + name="slow-peer", + version="0.1.0", + instances=1, + model_id="test-model", + hardware_summary="CPU only", + 
last_seen=datetime.now(), + timeout_seconds=120.0 # Custom timeout + ) + + assert peer.timeout_seconds == 120.0 + + +class TestFederationClient: + """Test FederationClient with peer-specific timeouts.""" + + @pytest.fixture + def client(self): + return FederationClient(timeout=60.0) + + @pytest.fixture + def fast_peer(self): + return PeerInfo( + host="192.168.1.10", + port=17615, + name="fast-peer", + version="0.1.0", + instances=2, + model_id="qwen:7b:q4", + hardware_summary="Apple M1 Max", + last_seen=datetime.now(), + timeout_seconds=30.0 # Fast peer with short timeout + ) + + @pytest.fixture + def slow_peer(self): + return PeerInfo( + host="192.168.1.11", + port=17615, + name="slow-peer", + version="0.1.0", + instances=1, + model_id="qwen:7b:q4", + hardware_summary="CPU only", + last_seen=datetime.now(), + timeout_seconds=90.0 # Slow peer with longer timeout + ) + + def test_peer_timeout_override(self, client, fast_peer, slow_peer): + """Test that peer-specific timeout overrides default.""" + # The client should use the peer's timeout, not the default + assert fast_peer.timeout_seconds == 30.0 + assert slow_peer.timeout_seconds == 90.0 + assert client.timeout == 60.0 # Default unchanged + + def test_metrics_updated_on_success(self, fast_peer): + """Test that metrics are updated on successful request.""" + assert fast_peer.metrics.total_requests == 0 + + # Simulate recording a success (this would happen in request_vote) + fast_peer.metrics.record_success(150.0) + + assert fast_peer.metrics.total_requests == 1 + assert fast_peer.metrics.successful_requests == 1 + assert fast_peer.metrics.success_rate == 1.0 + + def test_metrics_updated_on_failure(self, slow_peer): + """Test that metrics are updated on failed request.""" + assert slow_peer.metrics.total_requests == 0 + + # Simulate recording a failure + slow_peer.metrics.record_failure("Connection refused") + + assert slow_peer.metrics.total_requests == 1 + assert slow_peer.metrics.failed_requests == 1 + assert 
slow_peer.metrics.success_rate == 0.0 + assert slow_peer.metrics.last_error == "Connection refused" diff --git a/tests/test_hardware_detector.py b/tests/test_hardware_detector.py new file mode 100644 index 0000000..09280cd --- /dev/null +++ b/tests/test_hardware_detector.py @@ -0,0 +1,176 @@ +"""Tests for hardware detection and GPU layer configuration.""" + +import sys +import os +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src')) + +import pytest +from unittest.mock import Mock, patch, MagicMock +from hardware.detector import ( + GPUInfo, HardwareProfile, detect_nvidia_gpu, + calculate_gpu_layers, validate_gpu_layers, is_android +) + + +class TestNvidiaGPU: + """Test NVIDIA GPU detection.""" + + def test_detect_nvidia_gpu_success(self): + """Test successful NVIDIA GPU detection.""" + # Mock the entire import system + mock_pynvml = Mock() + mock_pynvml.nvmlInit = Mock() + mock_pynvml.nvmlShutdown = Mock() + mock_pynvml.nvmlDeviceGetCount = Mock(return_value=1) + + # Mock device handle and info + mock_handle = Mock() + mock_pynvml.nvmlDeviceGetHandleByIndex = Mock(return_value=mock_handle) + mock_pynvml.nvmlDeviceGetName = Mock(return_value="NVIDIA GeForce RTX 3080") + + # Mock memory info + mock_mem = Mock() + mock_mem.total = 10737418240 # 10 GB + mock_pynvml.nvmlDeviceGetMemoryInfo = Mock(return_value=mock_mem) + + # Mock driver version + mock_pynvml.nvmlSystemGetDriverVersion = Mock(return_value="535.104.05") + + # Mock compute capability + mock_pynvml.nvmlDeviceGetCudaComputeCapability = Mock(return_value=(8, 6)) + + # Patch __import__ to return our mock + def mock_import(name, *args, **kwargs): + if name == 'pynvml': + return mock_pynvml + return __builtins__.__import__(name, *args, **kwargs) + + with patch('builtins.__import__', side_effect=mock_import): + gpu = detect_nvidia_gpu() + + assert gpu is not None + assert gpu.name == "NVIDIA GeForce RTX 3080" + assert gpu.vram_gb == 10.0 + assert gpu.driver_version == "535.104.05" + assert 
gpu.is_nvidia is True + assert gpu.compute_capability == "8.6" + assert gpu.device_count == 1 + + def test_detect_nvidia_gpu_no_gpu(self): + """Test detection when no NVIDIA GPU present.""" + mock_pynvml = Mock() + mock_pynvml.nvmlInit = Mock() + mock_pynvml.nvmlShutdown = Mock() + mock_pynvml.nvmlDeviceGetCount = Mock(return_value=0) + + def mock_import(name, *args, **kwargs): + if name == 'pynvml': + return mock_pynvml + return __builtins__.__import__(name, *args, **kwargs) + + with patch('builtins.__import__', side_effect=mock_import): + gpu = detect_nvidia_gpu() + + assert gpu is None + + def test_detect_nvidia_gpu_import_error(self): + """Test detection when pynvml is not installed.""" + def mock_import(name, *args, **kwargs): + if name == 'pynvml': + raise ImportError("No module named 'pynvml'") + return __builtins__.__import__(name, *args, **kwargs) + + with patch('builtins.__import__', side_effect=mock_import): + gpu = detect_nvidia_gpu() + + assert gpu is None + + +class TestGPULayerCalculation: + """Test GPU layer auto-configuration.""" + + def test_calculate_gpu_layers_apple_silicon(self): + """Test layer calculation for Apple Silicon.""" + gpu = GPUInfo( + name="Apple Silicon GPU", + vram_gb=32.0, + is_apple_silicon=True + ) + assert calculate_gpu_layers(gpu) == -1 + + def test_calculate_gpu_layers_nvidia(self): + """Test layer calculation for NVIDIA GPU.""" + gpu = GPUInfo( + name="NVIDIA GeForce RTX 3080", + vram_gb=10.0, + is_nvidia=True, + compute_capability="8.6" + ) + assert calculate_gpu_layers(gpu) == -1 + + def test_calculate_gpu_layers_old_nvidia(self): + """Test layer calculation for old NVIDIA GPU.""" + gpu = GPUInfo( + name="NVIDIA GeForce GTX 680", + vram_gb=2.0, + is_nvidia=True, + compute_capability="3.0" + ) + assert calculate_gpu_layers(gpu) == 0 # Too old + + def test_calculate_gpu_layers_no_gpu(self): + """Test layer calculation with no GPU.""" + assert calculate_gpu_layers(None) == 0 + + def test_validate_gpu_layers_success(self): + 
"""Test successful layer validation.""" + gpu = GPUInfo( + name="NVIDIA GeForce RTX 3080", + vram_gb=10.0, + is_nvidia=True, + compute_capability="8.6" + ) + assert validate_gpu_layers(-1, gpu) == -1 + + def test_validate_gpu_layers_no_gpu_error(self): + """Test validation error when GPU requested but none available.""" + with pytest.raises(ValueError, match="no GPU detected"): + validate_gpu_layers(-1, None) + + def test_validate_gpu_layers_old_gpu_error(self): + """Test validation error for unsupported GPU.""" + gpu = GPUInfo( + name="NVIDIA GeForce GTX 680", + vram_gb=2.0, + is_nvidia=True, + compute_capability="3.0" + ) + with pytest.raises(ValueError, match="Minimum required is 5.0"): + validate_gpu_layers(-1, gpu) + + +class TestAndroidDetection: + """Test Android platform detection.""" + + @patch.dict('os.environ', {'ANDROID_ROOT': '/system'}, clear=True) + @patch('os.path.exists') + def test_is_android_env_var(self, mock_exists): + """Test Android detection via environment variables.""" + mock_exists.return_value = False + assert is_android() is True + + @patch.dict('os.environ', {}, clear=True) + @patch('os.path.exists') + def test_is_android_paths(self, mock_exists): + """Test Android detection via filesystem paths.""" + def exists_side_effect(path): + return path == "/system/build.prop" + mock_exists.side_effect = exists_side_effect + assert is_android() is True + + @patch.dict('os.environ', {}, clear=True) + @patch('os.path.exists') + def test_is_not_android(self, mock_exists): + """Test non-Android system.""" + mock_exists.return_value = False + assert is_android() is False