Files
local_swarm/main.py
T
sleepy 47f6c8e7d9 Add local network IP binding for federation support
- Add get_local_ip() function to detect local network IP (192.x.x.x or 100.x.x.x)
- Bind server to specific local IP instead of 0.0.0.0 for security
- Only expose to local network, not internet
- Fall back to localhost if not on private network

This enables federation between multiple Macs on the same local network
while keeping the server secure from external access.
2026-02-24 04:07:27 +01:00

347 lines
11 KiB
Python

#!/usr/bin/env python3
"""
Local Swarm - Automatically configure and run a swarm of small coding LLMs
NOTE: On macOS with Apple Silicon, we use multiprocessing with spawn method
to safely handle multiple MLX models. This prevents GPU conflicts.
"""
import sys
import multiprocessing as mp
# CRITICAL: Set spawn method BEFORE any other imports on macOS
# This prevents fork-related issues with Metal GPU
# (the project imports further down must only run after this block).
if sys.platform == "darwin":
    try:
        mp.set_start_method("spawn", force=True)
    except RuntimeError:
        pass  # Already set - keep whatever start method is in effect
import argparse
import asyncio
from pathlib import Path
# Add src to path - resolve for Windows compatibility
# The project packages imported below (hardware, models, swarm, ...) are
# looked up through these sys.path entries, so this must run first.
src_path = Path(__file__).parent.resolve() / "src"
sys.path.insert(0, str(src_path))
# Also add parent dir for Windows import issues
# (inserted only when it is not already on sys.path).
if str(Path(__file__).parent.resolve()) not in sys.path:
    sys.path.insert(0, str(Path(__file__).parent.resolve()))
# These imports must come AFTER setting spawn method on macOS
from hardware.detector import detect_hardware
from models.selector import select_optimal_model
from models.downloader import download_model_for_config
from swarm import SwarmManager
from api import create_server
from mcp_server import create_mcp_server
from interactive import (
interactive_model_selection,
show_startup_summary,
show_runtime_menu,
custom_configuration,
)
async def setup_swarm(model_config, hardware):
    """Fetch the model for ``model_config`` and bring up a swarm for it.

    Progress is reported on stdout; download and initialization errors are
    reported on stderr and never propagate to the caller.

    Returns:
        The initialized ``SwarmManager``, or ``None`` when the download
        fails, initialization raises, or ``initialize()`` reports failure.
    """
    # Step 1: make sure the model is available locally.
    print("\n⬇️ Downloading model...")
    try:
        weights_location = download_model_for_config(model_config)
        print(f"✓ Model ready at: {weights_location}")
    except Exception as download_error:
        print(f"\n❌ Error downloading model: {download_error}", file=sys.stderr)
        return None

    # Step 2: construct the manager and start it against the downloaded model.
    print("\n🚀 Initializing swarm...")
    try:
        manager = SwarmManager(
            model_config=model_config,
            hardware=hardware,
            consensus_strategy="similarity",
        )
        if await manager.initialize(str(weights_location)):
            return manager
        print("❌ Failed to initialize swarm")
    except Exception as init_error:
        print(f"\n❌ Error initializing swarm: {init_error}", file=sys.stderr)
    return None
def get_local_ip():
    """Return this machine's local-network IPv4 address, or loopback.

    The address is discovered by "connecting" a UDP socket towards a public
    address and reading back which local address the OS selected for the
    route; connecting a datagram socket does not send any packet.

    The discovered address is only returned when it lies in a range that is
    not publicly routable:

    * 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16 (RFC 1918 private ranges)
    * 100.64.0.0/10 (RFC 6598 shared address space - the non-public part
      of 100.x.x.x)

    Returns:
        The dotted-quad address as a string, or ``"127.0.0.1"`` when the
        address is outside those ranges or cannot be determined.
    """
    import ipaddress
    import socket

    # Proper network membership instead of string prefixes: "192." and
    # "100." also match publicly routable addresses (e.g. 192.0.2.1 or
    # 100.0.0.1), which must never be used as the bind address.
    local_networks = tuple(
        ipaddress.ip_network(cidr)
        for cidr in (
            "10.0.0.0/8",
            "172.16.0.0/12",
            "192.168.0.0/16",
            "100.64.0.0/10",
        )
    )

    try:
        # The context manager closes the socket even when connect() or
        # getsockname() raises (the previous code leaked it on error).
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.settimeout(2)
            probe.connect(("8.8.8.8", 80))
            ip = probe.getsockname()[0]
        address = ipaddress.ip_address(ip)
    except (OSError, ValueError):
        # No usable route / unparsable address: stay on loopback for safety.
        return "127.0.0.1"

    if any(address in network for network in local_networks):
        return ip
    # If not private, return localhost for safety
    return "127.0.0.1"
def _build_arg_parser():
    """Create the argument parser that describes every launcher flag."""
    parser = argparse.ArgumentParser(
        description="Local Swarm - AI-powered coding LLM swarm",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
python main.py # Interactive setup and start
python main.py --auto # Auto-detect and start without menu
python main.py --detect # Show hardware detection only
python main.py --model qwen:3b:q4 # Use specific model (skip menu)
python main.py --port 17615 # Use custom port (default: 17615)
python main.py --instances 4 # Force number of instances
python main.py --download-only # Download model only
python main.py --test # Test with sample prompt
python main.py --mcp # Enable MCP server
"""
    )
    parser.add_argument(
        "--auto",
        action="store_true",
        help="Auto-detect best configuration without interactive menu"
    )
    parser.add_argument(
        "--detect",
        action="store_true",
        help="Show hardware detection and exit"
    )
    parser.add_argument(
        "--model",
        type=str,
        help="Model to use (format: name:size:quant, e.g., qwen:3b:q4)"
    )
    parser.add_argument(
        "--port",
        type=int,
        default=17615,
        help="Port to run the API server on (default: 17615)"
    )
    parser.add_argument(
        "--instances",
        type=int,
        help="Force number of instances (overrides auto-calculation)"
    )
    parser.add_argument(
        "--download-only",
        action="store_true",
        help="Download models only, don't start server"
    )
    parser.add_argument(
        "--test",
        action="store_true",
        help="Test with a sample prompt"
    )
    parser.add_argument(
        "--mcp",
        action="store_true",
        help="Enable MCP server alongside HTTP API"
    )
    parser.add_argument(
        "--config",
        type=str,
        default="config.yaml",
        help="Path to config file"
    )
    parser.add_argument(
        "--version",
        action="version",
        version="%(prog)s 0.1.0"
    )
    return parser


def _select_config(args, hardware):
    """Choose the model configuration from CLI flags or the interactive menu.

    Exits the process with status 1 when no configuration can be produced.
    """
    if not (args.model or args.instances or args.auto):
        # Interactive mode - show menu
        config = interactive_model_selection(hardware)
        if not config:
            print("\n❌ No configuration selected")
            sys.exit(1)
        return config

    # Use command-line arguments or auto-detect
    print("\n📊 Calculating optimal configuration...")
    try:
        config = select_optimal_model(
            hardware,
            preferred_model=args.model,
            force_instances=args.instances
        )
        if not config:
            print("\n❌ No suitable model found for your hardware")
            print(" Minimum requirement: 2 GB available memory")
            sys.exit(1)
        # Show brief summary
        print(f"\n✓ Selected: {config.display_name}")
        print(f" Instances: {config.instances}")
        print(f" Memory: {config.total_memory_gb:.1f} GB")
    except Exception as e:
        # SystemExit is not an Exception subclass, so the exit above
        # passes straight through this handler.
        print(f"\n❌ Error selecting model: {e}", file=sys.stderr)
        sys.exit(1)
    return config


def _run_download_only(config):
    """Download the model for ``config`` without starting anything."""
    print("\n" + "=" * 70)
    print("⬇️ Download Mode: Downloading model only")
    print("=" * 70)
    try:
        model_path = download_model_for_config(config)
        print(f"✓ Model downloaded to: {model_path}")
        print("\n" + "=" * 70)
        print("✅ Download complete")
        print("=" * 70)
    except Exception as e:
        print(f"\n❌ Download failed: {e}", file=sys.stderr)
        sys.exit(1)


def _run_test_mode(config, hardware):
    """Start a swarm, run one sample prompt through it, and report results."""
    print("\n" + "=" * 70)
    print("🧪 Test Mode: Running sample inference")
    print("=" * 70)

    async def test_inference():
        show_startup_summary(hardware, config)
        swarm = await setup_swarm(config, hardware)
        if not swarm:
            return False
        try:
            # Test prompt
            prompt = "Write a Python function to calculate factorial:"
            print(f"\nPrompt: {prompt}\n")
            print("Generating responses...\n")
            result = await swarm.generate(prompt, max_tokens=200)
            print("\n" + "=" * 70)
            print("SELECTED RESPONSE:")
            print("=" * 70)
            print(result.selected_response.text)
            print("\n" + "=" * 70)
            print(f"Strategy: {result.strategy}")
            print(f"Confidence: {result.confidence:.2f}")
            print(f"Latency: {result.selected_response.latency_ms:.1f}ms")
            print(f"Tokens/sec: {result.selected_response.tokens_per_second:.1f}")
            # Show all responses
            print("\nAll responses received:")
            for i, resp in enumerate(result.all_responses):
                preview = resp.text[:60].replace('\n', ' ')
                print(f" Worker {i}: {preview}... ({resp.latency_ms:.1f}ms)")
            return True
        finally:
            # Always release the swarm, even when generation raises.
            await swarm.shutdown()

    success = asyncio.run(test_inference())
    if success:
        print("\n" + "=" * 70)
        print("✅ Test complete")
        print("=" * 70)
    else:
        print("\n❌ Test failed")
        sys.exit(1)


def _run_server_mode(args, config, hardware):
    """Start the swarm and serve the HTTP API (plus MCP when requested).

    Exits with status 1 when the swarm cannot be set up or the server
    raises.
    """
    show_startup_summary(hardware, config)

    async def run_server():
        swarm = await setup_swarm(config, hardware)
        if not swarm:
            return False
        # Update summary with runtime info
        show_startup_summary(hardware, config, swarm)
        mcp_server = None
        try:
            # Create and start API server
            print("\n🌐 Starting HTTP API server...")
            # Use local network IP instead of 0.0.0.0 for security
            host = get_local_ip()
            print(f"🔗 Binding to {host}:{args.port}")
            server = create_server(swarm, host=host, port=args.port)
            print("\n✅ Local Swarm is running!")
            # Advertise the address the server is actually bound to: when
            # a LAN address was selected, the server does not listen on
            # 127.0.0.1, so loopback URLs would be unreachable.
            print(f" API: http://{host}:{args.port}/v1")
            print(f" Health: http://{host}:{args.port}/health")
            if args.mcp:
                # Start MCP server alongside HTTP API
                print("\n🤖 Starting MCP server...")
                # The result is not used again in this function.
                mcp_server = await create_mcp_server(swarm)
                print(" MCP server active (stdio)")
            print("\n💡 Configure opencode to use:")
            print(f' base_url: http://{host}:{args.port}/v1')
            print(' api_key: any (not used)')
            print("\nPress Ctrl+C to stop...\n")
            # Start HTTP server (this will block)
            await server.start()
        except KeyboardInterrupt:
            # NOTE(review): on Python 3.11+ asyncio.run() turns Ctrl+C into
            # a cancellation of this task, so this handler may never fire
            # there - confirm against the supported Python versions.
            print("\n\nReceived stop signal")
        finally:
            await swarm.shutdown()
        return True

    try:
        success = asyncio.run(run_server())
    except Exception as e:
        print(f"\n❌ Error running server: {e}", file=sys.stderr)
        sys.exit(1)
    if not success:
        # Swarm setup failed (setup_swarm already printed the reason).
        # Report it through the exit status, like test mode does, instead
        # of ending with status 0.
        sys.exit(1)
    print("\n" + "=" * 70)
    print("✅ Server stopped gracefully")
    print("=" * 70)


def main():
    """Command-line entry point for Local Swarm.

    Parses the command line, detects hardware, selects a model configuration
    (from flags or the interactive menu) and then runs exactly one mode:

    * ``--detect``: print hardware information and return.
    * ``--download-only``: download the model and return.
    * ``--test``: run one sample prompt through a freshly started swarm.
    * otherwise: start the HTTP API server (and the MCP server with
      ``--mcp``) and block until it stops.

    Failures are reported on the console and end the process with exit
    status 1 via ``sys.exit``. ``--config`` is accepted but not read by
    this function.
    """
    args = _build_arg_parser().parse_args()

    # Detect hardware first
    print("\n🔍 Detecting hardware...")
    try:
        hardware = detect_hardware()
    except Exception as e:
        print(f"\n❌ Error detecting hardware: {e}", file=sys.stderr)
        sys.exit(1)

    if args.detect:
        # Just show hardware info
        from interactive import print_hardware_info
        print_hardware_info(hardware)
        print("\n✅ Detection complete")
        return

    # Determine model configuration
    config = _select_config(args, hardware)

    if args.download_only:
        # Download model only
        _run_download_only(config)
    elif args.test:
        # Test mode with sample prompt
        _run_test_mode(config, hardware)
    else:
        # Full mode (download + start API server + optional MCP)
        _run_server_mode(args, config, hardware)
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()