Files
local_swarm/main.py
sleepy 580d1e5d17 feat: comprehensive tool system improvements and webfetch support (#3)
* feat: enhanced tool instructions for multi-step operations

- Add comprehensive examples for ls, find, grep, mkdir, npm init, etc.
- Explain multi-step workflow (explore → read → write)
- Tool system already supports chaining via conversation history
- Bash tool supports: ls, find, grep, cat, mkdir, cd, npm, etc.
- 30 second timeout on commands
- Output limited to 3000 chars for readability
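The timeout and output cap described above can be sketched as a thin wrapper around `subprocess.run`; the helper name `run_bash` and the truncation marker are illustrative, not the project's actual implementation:

```python
import subprocess

MAX_OUTPUT_CHARS = 3000  # readability cap mentioned above


def run_bash(command: str, timeout: int = 30) -> str:
    """Run a shell command with a timeout; truncate long output for readability."""
    try:
        result = subprocess.run(
            command, shell=True, capture_output=True, text=True, timeout=timeout
        )
        output = result.stdout + result.stderr
    except subprocess.TimeoutExpired:
        output = f"Command timed out after {timeout}s"
    if len(output) > MAX_OUTPUT_CHARS:
        output = output[:MAX_OUTPUT_CHARS] + "\n... (truncated)"
    return output
```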

* Cleanup: Consolidate documentation and tidy codebase

Documentation:
- Consolidate 6 markdown files into simplified README.md
- Remove redundant docs: TODO.md, NETWORK.md, REVIEW.md, PLAN.md, CONTEXT.md, GUIDE.md
- Add ARCHITECTURE.md with clean technical overview
- README now focuses on quick start and core concepts

Code verification:
- Verified blocking I/O properly wrapped in asyncio.to_thread()
- Confirmed locks initialized correctly in backends
- AMD VRAM detection uses proper regex (takes max value, not first match)
- All exception handling uses 'except Exception:' (not bare except)

Tool execution improvements (existing changes):
- Better working directory handling with project root detection
- Extended timeouts for package managers (300s)
- Multi-tool call parsing support
- Improved error handling and logging

Note: System prompt concern noted - 30k tokens too large for 16-32k context windows

* docs: add development patterns analysis

Document circular development issues identified in commit history:
- Tool execution went back-and-forth 3+ times (server-side vs client-side)
- Tool instructions went from 40k tokens → 300 tokens → removed → re-enhanced
- 8+ parsing fixes for same issues (no tests)
- 6 debug-only commits (production debugging)

Provides recommendations to prevent future cycles:
1. Pick one architecture and stick with it
2. Add unit tests before fixes
3. Token budget (<2000 for instructions)
4. One format only (remove alternative parsers)
5. Integration test script
6. Separate concerns into smaller modules
7. Design doc before code changes
8. CI/CD with automated testing

* docs: add comprehensive agent guidelines

AGENT_WORKER.md (600+ lines):
- Pre-flight checklist: token budget, test plan, design doc
- Coding rules: TDD, no debug code, architecture consistency
- Git workflow: branching strategy, commit rules, release process
- Testing requirements: unit (≥80%), integration structure
- Code quality: PEP 8, type hints, max 50 lines per function
- Architecture: no feature flags, separation of concerns
- Continuous learning: research requirements, documentation
- Forbidden patterns: bare except, production debugging, etc.

AGENT_REVIEW.md (400+ lines):
- Review philosophy: prevent circular development
- 6-phase review checklist: structure, quality, tokens, architecture, research, logic
- Report format with token impact analysis
- Severity levels: blocking vs warnings vs approved
- Common issues with examples (good vs bad)
- Review workflow: 30-35 min per PR
- Reports stored in reports/ folder (gitignored)

Also added:
- tests/test_tool_parsing.py - example test following guidelines
- Updated DEVELOPMENT_PATTERNS.md with recommendations

Reports folder in .gitignore for local review storage

* chore: gitignore review reports folder

* feat: fix tool execution and enhance instructions with accurate token counting

- Enhanced tool instructions (1041 tokens, within 2000 budget)
- Added tiktoken>=0.5.0 for accurate token counting
- Fixed subprocess hang by adding stdin=subprocess.DEVNULL
- Removed 9 DEBUG print statements from routes.py
- Added tests for instruction content and token budget verification
- All tests pass (11/11)
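Accurate counting with tiktoken might look like the sketch below; the function name, fallback heuristic, and encoding choice are assumptions, since the repo's actual counter isn't shown here:

```python
def count_tokens(text: str, encoding_name: str = "cl100k_base") -> int:
    """Count tokens with tiktoken when installed; otherwise use a rough estimate."""
    try:
        import tiktoken  # optional dependency: tiktoken>=0.5.0
        return len(tiktoken.get_encoding(encoding_name).encode(text))
    except ImportError:
        # Fallback heuristic: roughly 4 characters per token for English text
        return max(1, len(text) // 4)


TOKEN_BUDGET = 2000  # the instruction budget mentioned in these commits


def within_budget(instructions: str) -> bool:
    return count_tokens(instructions) <= TOKEN_BUDGET
```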

Resolves blockers from previous review:
- Token budget verified ✓
- Token documentation added ✓
- Debug code cleaned ✓
- Missing tests added ✓

* feat: implement comprehensive tool system with proper logging

Major improvements to tool instructions and execution:
- Enhanced tool instructions with 7-step task completion workflow
- Added markdown code block fallback parser for tool calls
- Fixed subprocess hang with stdin=subprocess.DEVNULL
- Fixed streaming path to return tool_calls (enabling multi-turn conversations)
- Added complete React project creation example with verification steps
- Token count: 1,743 tokens (within 2,000 limit)
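The `stdin=subprocess.DEVNULL` fix works because a child process that tries to read stdin (e.g. an interactive package-manager prompt) sees immediate EOF instead of blocking forever. A minimal sketch (the helper name is assumed):

```python
import subprocess


def run_tool_command(command: str, timeout: int = 30) -> subprocess.CompletedProcess:
    """Run a command without inheriting the server's stdin.

    stdin=subprocess.DEVNULL is the hang fix described above: children that
    wait for input get EOF right away instead of stalling the tool executor.
    """
    return subprocess.run(
        command,
        shell=True,
        stdin=subprocess.DEVNULL,
        capture_output=True,
        text=True,
        timeout=timeout,
    )
```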

Logging infrastructure:
- Created centralized logging configuration (src/utils/logging_config.py)
- Replaced 80+ print statements with logger.debug()
- Set log level to DEBUG for development
- All modules now use proper logging instead of print

Testing:
- Added 4 new tests for markdown parsing and instruction content
- All 13 tests passing
- Token budget verification test

Documentation:
- Added comprehensive design docs for all major changes
- Added test plans for verification
- Created helper scripts for logging migration

Files changed:
- main.py: Added logging setup
- src/api/routes.py: Tool instructions, streaming fixes, logging
- src/tools/executor.py: subprocess fix, logging
- src/utils/: New logging configuration module
- tests/test_tool_parsing.py: New tests
- docs/: Design decisions and test plans
- scripts/: Helper scripts for development

* refactor: simplify tool instructions to 109 tokens for 7B model

Reduced from 1,743 tokens to 109 tokens (94% reduction) to help
qwen2.5 7B 4bit model follow instructions better.

Changes:
- Removed complex workflow documentation
- Removed multi-turn conversation examples
- Removed lengthy anti-patterns
- Kept only essential format and rules
- Updated tests to match simplified content

Before: 1,743 tokens, 6,004 chars (87% of budget)
After: 109 tokens, 392 chars (5.5% of budget)

This should make it much easier for smaller models to:
1. Understand they must use tools
2. Follow the simple TOOL: format
3. Not get overwhelmed by instructions

* refactor: make tool instructions ultra-direct for 7B models

Further simplify instructions to prevent model from adding explanations.

Before: 109 tokens - model still added explanatory text
After: 86 tokens - ultra-direct commands

Key changes:
- Start with 'You MUST use tools. DO NOT explain.'
- 'OUTPUT THIS EXACT FORMAT - NOTHING ELSE'
- Removed all examples and pleasantries
- Added 'NEVER' rules in all caps
- 'ONLY output TOOL: lines'

The model was outputting:
'1. First, install... TOOL: bash ARGUMENTS: {...}'

Now should output just:
'TOOL: bash
ARGUMENTS: {...}'

This should force the 7B qwen model to stop explaining and just execute.

* refactor: move tool instructions to external config file

Moves hardcoded tool instructions from routes.py to external config file
for better maintainability and easier editing.

Changes:
- Created config/prompts/tool_instructions.txt
- Added _load_tool_instructions() function with caching
- Falls back to default if config file not found
- Updated tests to use the loader function
- Added proper error handling

Benefits:
- Easier to modify instructions without code changes
- Instructions can be edited by non-developers
- Cleaner separation of config vs code
- Supports hot-reloading (cached but easy to invalidate)
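A cached loader with a default fallback could be as small as the sketch below; the default text and cache strategy are assumptions, not the repo's exact `_load_tool_instructions()`:

```python
from functools import lru_cache
from pathlib import Path

DEFAULT_INSTRUCTIONS = "Use tools to execute commands. Output only tool calls."


@lru_cache(maxsize=None)
def load_tool_instructions(path: str = "config/prompts/tool_instructions.txt") -> str:
    """Load instructions from the config file, falling back to a built-in default.

    lru_cache avoids re-reading the file on every request; call
    load_tool_instructions.cache_clear() to pick up edits (hot reload).
    """
    try:
        return Path(path).read_text(encoding="utf-8").strip()
    except OSError:
        return DEFAULT_INSTRUCTIONS
```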

Token count: 86 tokens (loaded from file)
Location: config/prompts/tool_instructions.txt

* refactor: simplify tool instructions further and add debug logging

- Reduced instructions to bare minimum: 50 tokens
- Added debug logging to verify instructions are sent
- Removed all caps and aggressive language
- Made instructions more straightforward

Instructions now:
'Use tools to execute commands. Output only tool calls.
Format: TOOL: bash ARGUMENTS: {...}
No explanations. No numbered lists. No markdown. Only tool calls.'

This should be easier for 7B models to follow while still
conveying the essential requirements.

* feat: improve tool parser to handle 7B model output variations

Enhanced parse_tool_calls() with multiple fallback strategies:

1. Standard TOOL:/ARGUMENTS: format (original)
2. Markdown code blocks (fenced with triple backticks)
3. Numbered list items (1. npm install ...)
4. Standalone bash commands (npm, npx, mkdir, etc.)

Now handles messy output from small models like:
'1. Install: npm install -g create-react-app'
'2. Create: create-react-app hello-world'

Parses these into chained bash commands for execution.
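The fallback chain above can be sketched as a parser that tries the strict format first, then falls back to extracting commands from numbered lists; the regexes here are illustrative (and handle only flat, un-nested JSON), not the project's actual `parse_tool_calls()`:

```python
import json
import re


def parse_tool_calls(text: str) -> list[dict]:
    """Parse model output into tool calls, with a fallback for messy 7B output."""
    calls = []
    # Strategy 1: the standard TOOL: / ARGUMENTS: format (flat JSON only)
    for match in re.finditer(
        r"TOOL:\s*(\w+)\s*\nARGUMENTS:\s*(\{.*?\})", text, re.DOTALL
    ):
        try:
            calls.append({"tool": match.group(1), "arguments": json.loads(match.group(2))})
        except json.JSONDecodeError:
            continue
    if calls:
        return calls
    # Fallback: pull bash commands out of numbered-list lines
    commands = []
    for line in text.splitlines():
        m = re.search(r"\b((?:npm|npx|mkdir|git|pip|node)\b.*)", line)
        if m:
            commands.append(m.group(1).strip())
    if commands:
        # Chain multiple commands with && for sequential execution
        calls.append({"tool": "bash", "arguments": {"command": " && ".join(commands)}})
    return calls
```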

Also simplified instructions to 50 tokens minimum:
'Use tools to execute commands. Output only tool calls.
Format: TOOL: bash ARGUMENTS: {...}
No explanations. No numbered lists. No markdown. Only tool calls.'

This combination should make 7B models much more likely to
have their output successfully parsed and executed.

* fix: improve command extraction for 7B model output

Parser now extracts bash commands from any line containing:
- npm, npx, mkdir, cd, ls, cat, echo, git, python, pip, node, yarn
- create-react-app (added for React projects)

Example: Extracts 'npm install -g create-react-app' from:
'1. Install: npm install -g create-react-app'

Chains multiple commands with && for sequential execution.

This should now successfully parse the numbered list output
from 7B models and execute the commands.

* feat: add bash tool description validation and improve 7B model parsing

Changes:
- Added _ensure_tool_arguments() function to inject 'description' field
- Updated tool_instructions.txt to require description for bash tool
- Improved 7B model command extraction with better regex patterns
- Added 'create-react-app' to command detection list
- Updated delta field type to Dict[str, Any] for streaming
- Added GGUF to MLX quantization mapping for registry.py
- Clarified agent responsibilities in AGENT_REVIEW.md and AGENT_WORKER.md

Fixes:
- Bash tool now validates required 'description' field
- 7B model output parsed more reliably (numbered lists)
- Multiple commands chained with && for sequential execution

Token count: 69 tokens (down from 86, -19.8%)

All tests pass: 13/13

* feat: add webfetch tool support with URL extraction

Changes:
- Added webfetch to tool instructions config
- Added URL extraction pattern to parse_tool_calls()
- Parser now recognizes URLs and creates webfetch tool calls
- Updated token count: 89 tokens (+29% from 69)

The webfetch tool is available through opencode environment.
System prompt adjustment enables model to use it for URL fetching.
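The URL-extraction step could be sketched as below; the regex and helper name are assumptions about the pattern added to the parser:

```python
import re

# Rough URL matcher; stops at whitespace and common punctuation
URL_PATTERN = re.compile(r"https?://[\w./:%#?&=+-]+")


def extract_webfetch_calls(text: str) -> list[dict]:
    """Turn bare URLs in model output into webfetch tool calls."""
    return [
        {"tool": "webfetch", "arguments": {"url": url}}
        for url in URL_PATTERN.findall(text)
    ]
```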

Token budget: 89 tokens (4.45% of 2000 limit)
Tests pass: 13/13
2026-02-24 22:35:05 +01:00


#!/usr/bin/env python3
"""
Local Swarm - Automatically configure and run a swarm of small coding LLMs

NOTE: On macOS with Apple Silicon, we use multiprocessing with spawn method
to safely handle multiple MLX models. This prevents GPU conflicts.
"""
import sys
import multiprocessing as mp

# CRITICAL: Set spawn method BEFORE any other imports on macOS
# This prevents fork-related issues with Metal GPU
if sys.platform == "darwin":
    try:
        mp.set_start_method("spawn", force=True)
    except RuntimeError:
        pass  # Already set

import argparse
import asyncio
from pathlib import Path

# Add src to path - resolve for Windows compatibility
src_path = Path(__file__).parent.resolve() / "src"
sys.path.insert(0, str(src_path))

# Also add parent dir for Windows import issues
if str(Path(__file__).parent.resolve()) not in sys.path:
    sys.path.insert(0, str(Path(__file__).parent.resolve()))

# These imports must come AFTER setting spawn method on macOS
from hardware.detector import detect_hardware
from models.selector import select_optimal_model
from models.downloader import download_model_for_config
from swarm import SwarmManager
from api import create_server
from api.routes import set_federated_swarm
from mcp_server import create_mcp_server
from interactive import (
    interactive_model_selection,
    show_startup_summary,
    show_runtime_menu,
    custom_configuration,
)
from network import create_discovery_service, FederatedSwarm
from tools.executor import ToolExecutor, set_tool_executor
from utils.logging_config import setup_logging

# Set up logging (DEBUG level for development)
setup_logging()


async def setup_swarm(model_config, hardware):
    """Download model and initialize swarm."""
    # Download model
    print("\n⬇️ Downloading model...")
    try:
        model_path = download_model_for_config(model_config)
        print(f"✓ Model ready at: {model_path}")
    except Exception as e:
        print(f"\n❌ Error downloading model: {e}", file=sys.stderr)
        return None

    # Initialize swarm
    print("\n🚀 Initializing swarm...")
    try:
        swarm = SwarmManager(
            model_config=model_config,
            hardware=hardware,
            consensus_strategy="similarity"
        )
        success = await swarm.initialize(str(model_path))
        if not success:
            print("❌ Failed to initialize swarm")
            return None
        return swarm
    except Exception as e:
        print(f"\n❌ Error initializing swarm: {e}", file=sys.stderr)
        return None


def get_local_ip():
    """Get the local network IP address (private networks only)."""
    import socket
    try:
        # Create a socket and connect to a public DNS server
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.settimeout(2)
        # Try to connect to Google's DNS - this doesn't actually send data
        s.connect(("8.8.8.8", 80))
        ip = s.getsockname()[0]
        s.close()
        # Check if it's a private IP (only 192.168.x.x for this network)
        is_private = ip.startswith('192.168.')
        if is_private:
            print(f" 📡 Detected local IP: {ip}")
            return ip
        else:
            # If not private, return localhost for safety
            print(f" ⚠️ IP {ip} is not a private network, binding to localhost")
            return "127.0.0.1"
    except Exception as e:
        print(f" ⚠️ Could not detect local IP: {e}, using localhost")
        return "127.0.0.1"


def main():
    parser = argparse.ArgumentParser(
        description="Local Swarm - AI-powered coding LLM swarm",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python main.py                                    # Interactive setup and start
  python main.py --auto                             # Auto-detect and start without menu
  python main.py --detect                           # Show hardware detection only
  python main.py --model qwen:3b:q4                 # Use specific model (skip menu)
  python main.py --port 17615                       # Use custom port (default: 17615)
  python main.py --host 192.168.1.5                 # Bind to specific IP
  python main.py --instances 4                      # Force number of instances
  python main.py --download-only                    # Download model only
  python main.py --test                             # Test with sample prompt
  python main.py --mcp                              # Enable MCP server
  python main.py --federation                       # Enable federation with other instances
  python main.py --federation --peer 192.168.1.10:17615   # Manual peer
"""
    )
    parser.add_argument(
        "--auto",
        action="store_true",
        help="Auto-detect best configuration without interactive menu"
    )
    parser.add_argument(
        "--detect",
        action="store_true",
        help="Show hardware detection and exit"
    )
    parser.add_argument(
        "--model",
        type=str,
        help="Model to use (format: name:size:quant, e.g., qwen:3b:q4)"
    )
    parser.add_argument(
        "--port",
        type=int,
        default=17615,
        help="Port to run the API server on (default: 17615)"
    )
    parser.add_argument(
        "--instances",
        type=int,
        help="Force number of instances (overrides auto-calculation)"
    )
    parser.add_argument(
        "--download-only",
        action="store_true",
        help="Download models only, don't start server"
    )
    parser.add_argument(
        "--test",
        action="store_true",
        help="Test with a sample prompt"
    )
    parser.add_argument(
        "--mcp",
        action="store_true",
        help="Enable MCP server alongside HTTP API"
    )
    parser.add_argument(
        "--config",
        type=str,
        default="config.yaml",
        help="Path to config file"
    )
    parser.add_argument(
        "--host",
        type=str,
        default=None,
        help="Host IP to bind to (default: auto-detect)"
    )
    parser.add_argument(
        "--federation",
        action="store_true",
        help="Enable federation with other Local Swarm instances on the network"
    )
    parser.add_argument(
        "--peer",
        action="append",
        dest="peers",
        help="Manually add a peer (format: host:port, can be used multiple times)"
    )
    parser.add_argument(
        "--tool-server",
        action="store_true",
        help="Run as dedicated tool execution server (executes read/write/bash tools)"
    )
    parser.add_argument(
        "--tool-port",
        type=int,
        default=17616,
        help="Port for tool execution server (default: 17616)"
    )
    parser.add_argument(
        "--tool-host",
        type=str,
        default=None,
        nargs='?',
        const='',  # When --tool-host is used without a value, use empty string
        help="URL of tool execution server. Use without value for auto-detected local IP (http://<local-ip>:17616), or provide explicit URL."
    )
    parser.add_argument(
        "--version",
        action="version",
        version="%(prog)s 0.1.0"
    )
    args = parser.parse_args()
    # Detect hardware first
    print("\n🔍 Detecting hardware...")
    try:
        hardware = detect_hardware()
    except Exception as e:
        print(f"\n❌ Error detecting hardware: {e}", file=sys.stderr)
        sys.exit(1)

    if args.detect:
        # Just show hardware info
        from interactive import print_hardware_info
        print_hardware_info(hardware)
        print("\n✅ Detection complete")
        return

    # Tool server mode - run minimal tool-only server
    if args.tool_server:
        print("\n🔧 Starting Tool Execution Server...")
        from fastapi import FastAPI
        import uvicorn

        # Initialize local tool executor
        tool_executor = ToolExecutor(tool_host_url=None)
        set_tool_executor(tool_executor)

        app = FastAPI(title="Local Swarm Tool Server")

        @app.post("/v1/tools/execute")
        async def execute_tool(request: dict):
            tool_name = request.get("tool", "")
            tool_args = request.get("arguments", {})
            result = await tool_executor.execute(tool_name, tool_args)
            return {"result": result}

        @app.get("/health")
        async def health():
            return {"status": "healthy", "mode": "tool-server"}

        host = args.host if args.host else get_local_ip()
        tool_port = args.tool_port
        print(f"🔗 Tool server running at http://{host}:{tool_port}")
        print(f" Endpoints:")
        print(f" - POST /v1/tools/execute")
        print(f" - GET /health")
        print(f"\n✅ Tool server ready!")
        uvicorn.run(app, host=host, port=tool_port)
        return
    # Determine model configuration
    config = None
    if args.model or args.instances or args.auto:
        # Use command-line arguments or auto-detect
        print("\n📊 Calculating optimal configuration...")
        try:
            config = select_optimal_model(
                hardware,
                preferred_model=args.model,
                force_instances=args.instances
            )
            if not config:
                print("\n❌ No suitable model found for your hardware")
                print(" Minimum requirement: 2 GB available memory")
                sys.exit(1)
            # Show brief summary
            print(f"\n✓ Selected: {config.display_name}")
            print(f" Instances: {config.instances}")
            print(f" Memory: {config.total_memory_gb:.1f} GB")
        except Exception as e:
            print(f"\n❌ Error selecting model: {e}", file=sys.stderr)
            sys.exit(1)
    else:
        # Interactive mode - show menu
        config = interactive_model_selection(hardware)
        if not config:
            print("\n❌ No configuration selected")
            sys.exit(1)
    if args.download_only:
        # Download model only
        print("\n" + "=" * 70)
        print("⬇️ Download Mode: Downloading model only")
        print("=" * 70)
        try:
            model_path = download_model_for_config(config)
            print(f"✓ Model downloaded to: {model_path}")
            print("\n" + "=" * 70)
            print("✅ Download complete")
            print("=" * 70)
        except Exception as e:
            print(f"\n❌ Download failed: {e}", file=sys.stderr)
            sys.exit(1)
    elif args.test:
        # Test mode with sample prompt
        print("\n" + "=" * 70)
        print("🧪 Test Mode: Running sample inference")
        print("=" * 70)

        async def test_inference():
            show_startup_summary(hardware, config)
            swarm = await setup_swarm(config, hardware)
            if not swarm:
                return False
            try:
                # Test prompt
                prompt = "Write a Python function to calculate factorial:"
                print(f"\nPrompt: {prompt}\n")
                print("Generating responses...\n")
                result = await swarm.generate(prompt, max_tokens=200)
                print("\n" + "=" * 70)
                print("SELECTED RESPONSE:")
                print("=" * 70)
                print(result.selected_response.text)
                print("\n" + "=" * 70)
                print(f"Strategy: {result.strategy}")
                print(f"Confidence: {result.confidence:.2f}")
                print(f"Latency: {result.selected_response.latency_ms:.1f}ms")
                print(f"Tokens/sec: {result.selected_response.tokens_per_second:.1f}")
                # Show all responses
                print("\nAll responses received:")
                for i, resp in enumerate(result.all_responses):
                    preview = resp.text[:60].replace('\n', ' ')
                    print(f" Worker {i}: {preview}... ({resp.latency_ms:.1f}ms)")
                return True
            finally:
                await swarm.shutdown()

        success = asyncio.run(test_inference())
        if success:
            print("\n" + "=" * 70)
            print("✅ Test complete")
            print("=" * 70)
        else:
            print("\n❌ Test failed")
            sys.exit(1)
    else:
        # Full mode (download + start API server + optional MCP)
        show_startup_summary(hardware, config)

        async def run_server():
            swarm = await setup_swarm(config, hardware)
            if not swarm:
                return False

            # Initialize tool executor
            if args.tool_host is not None:
                # --tool-host was provided
                if args.tool_host == "":
                    # --tool-host with no value - use local IP with default port
                    local_ip = get_local_ip()
                    tool_host_url = f"http://{local_ip}:17616"
                    print(f"\n🔧 Using remote tool host: {tool_host_url} (auto-detected local IP)")
                else:
                    # --tool-host with explicit value
                    tool_host_url = args.tool_host
                    print(f"\n🔧 Using remote tool host: {tool_host_url}")
                tool_executor = ToolExecutor(tool_host_url=tool_host_url)
                set_tool_executor(tool_executor)
            else:
                # Local tool execution (default)
                tool_executor = ToolExecutor(tool_host_url=None)
                set_tool_executor(tool_executor)

            # Update summary with runtime info
            show_startup_summary(hardware, config, swarm)

            # Initialize federation if enabled
            discovery = None
            federated_swarm = None
            if args.federation:
                print("\n🌐 Initializing federation...")
                try:
                    # Use specified host for advertising if provided
                    advertise_ip = args.host if args.host else None
                    discovery = await create_discovery_service(args.port, advertise_ip=advertise_ip)
                    # Get swarm info for advertising
                    swarm_info = {
                        "version": "0.1.0",
                        "instances": config.instances,
                        "model_id": config.model_id,
                        "hardware_summary": f"{hardware.cpu_cores} CPU, {hardware.ram_gb:.1f}GB RAM"
                    }
                    await discovery.start_advertising(swarm_info)
                    await discovery.start_listening()
                    # Add manual peers if specified
                    if args.peers:
                        print(f" 📍 Adding {len(args.peers)} manual peer(s)...")
                        from network.discovery import PeerInfo
                        from datetime import datetime
                        for peer_str in args.peers:
                            try:
                                host, port = peer_str.rsplit(':', 1)
                                port = int(port)
                                peer = PeerInfo(
                                    host=host,
                                    port=port,
                                    name=f"manual_{host}_{port}",
                                    version="0.1.0",
                                    instances=0,
                                    model_id="unknown",
                                    hardware_summary="manual",
                                    last_seen=datetime.now()
                                )
                                discovery.peers[peer.name] = peer
                                print(f" ✓ Added peer: {host}:{port}")
                            except Exception as e:
                                print(f" ⚠️ Failed to add peer {peer_str}: {e}")
                    # Create federated swarm wrapper
                    federated_swarm = FederatedSwarm(swarm, discovery)
                    set_federated_swarm(federated_swarm)
                    # Start health check loop in background
                    asyncio.create_task(discovery.start_health_check_loop(interval_seconds=10))
                    print(f" ✓ Federation enabled")
                    print(f" ✓ Discovery active on port {discovery.discovery_port}")
                    print(f" ✓ Peer health checks every 10s")
                except Exception as e:
                    print(f" ⚠️ Failed to initialize federation: {e}")
                    print(" Continuing without federation...")
            mcp_server = None
            try:
                # Create and start API server
                print("\n🌐 Starting HTTP API server...")
                # Use provided host or auto-detect
                if args.host:
                    host = args.host
                    print(f"🔗 Using specified host: {host}:{args.port}")
                else:
                    # Use local network IP instead of 0.0.0.0 for security
                    host = get_local_ip()
                    print(f"🔗 Binding to {host}:{args.port}")
                server = create_server(swarm, host=host, port=args.port)
                print(f"\n✅ Local Swarm is running!")
                print(f" API: http://{host}:{args.port}/v1")
                print(f" Health: http://{host}:{args.port}/health")
                if args.federation and discovery:
                    peers = discovery.get_peers()
                    print(f"\n🌐 Federation: Enabled")
                    print(f" Discovery port: {discovery.discovery_port}")
                    if peers:
                        print(f" Peers discovered: {len(peers)}")
                        for peer in peers:
                            print(f" - {peer.name} ({peer.model_id})")
                    else:
                        print(f" Peers discovered: 0 (waiting for peers...)")
                # Show tool server status
                if args.tool_host is not None:
                    print(f"\n🔧 Tool Server: Remote")
                    if args.tool_host == "":
                        local_ip = get_local_ip()
                        print(f" URL: http://{local_ip}:17616 (auto-detected)")
                    else:
                        print(f" URL: {args.tool_host}")
                    print(f" Mode: Tools executed remotely on tool host")
                else:
                    print(f"\n🔧 Tool Server: Local")
                    print(f" Mode: Tools executed on this machine")
                if args.mcp:
                    # Start MCP server alongside HTTP API
                    print("\n🤖 Starting MCP server...")
                    mcp_server = await create_mcp_server(swarm)
                    print(" MCP server active (stdio)")
                print(f"\n💡 Configure opencode to use:")
                print(f' base_url: http://127.0.0.1:{args.port}/v1')
                print(f' api_key: any (not used)')
                print(f"\nPress Ctrl+C to stop...\n")
                # Start HTTP server (this will block)
                await server.start()
            except KeyboardInterrupt:
                print("\n\nReceived stop signal")
            finally:
                if federated_swarm:
                    await federated_swarm.close()
                if discovery:
                    await discovery.stop()
                await swarm.shutdown()
            return True
        try:
            success = asyncio.run(run_server())
            if success:
                print("\n" + "=" * 70)
                print("✅ Server stopped gracefully")
                print("=" * 70)
        except Exception as e:
            print(f"\n❌ Error running server: {e}", file=sys.stderr)
            sys.exit(1)


if __name__ == "__main__":
    main()