local_swarm/src/api/routes.py
sleepy 5fa8cd4e0e fix: Correct streaming implementation syntax
- Fixed indentation in routes.py streaming code
- Real-time streaming now properly structured
- All syntax errors resolved
2026-02-25 12:25:19 +01:00


"""OpenAI-compatible API routes for Local Swarm."""
import asyncio
import json
import logging
import os
import time
import uuid
from pathlib import Path
from typing import AsyncIterator, Optional
import tiktoken
from fastapi import APIRouter, HTTPException, Request
from fastapi.responses import StreamingResponse
# Initialize tokenizer for accurate token counting
TOKEN_ENCODING = tiktoken.get_encoding('cl100k_base')
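# NOTE: cl100k_base is a best-effort stand-in for whatever tokenizer the local
# model actually uses, so counts below are approximate. Quick sanity check:
#   len(TOKEN_ENCODING.encode("hello world"))  # -> 2 tokens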
# Set up logger
logger = logging.getLogger(__name__)
# Cache for tool instructions (loaded from config file)
_TOOL_INSTRUCTIONS_CACHE: Optional[str] = None
# Global flag for tool mode (default: local tool server to save tokens)
_USE_OPENCODE_TOOLS: bool = False
def set_use_opencode_tools(value: bool):
"""Set whether to use opencode's tool definitions (default: False = local tool server).
Args:
value: True to use opencode tools (~27k tokens), False to use local tool server (~125 tokens)
"""
global _USE_OPENCODE_TOOLS
_USE_OPENCODE_TOOLS = value
logger.info(f"🔧 Tool mode set to: {'opencode tools (~27k tokens)' if value else 'local tool server (~125 tokens)'}")
def _load_tool_instructions() -> str:
"""Load tool instructions from config file.
Loads from config/prompts/tool_instructions.txt
Falls back to default if file not found.
Returns:
Tool instructions string
"""
global _TOOL_INSTRUCTIONS_CACHE
if _TOOL_INSTRUCTIONS_CACHE is not None:
return _TOOL_INSTRUCTIONS_CACHE
# Try to load from config file
config_path = Path(__file__).parent.parent.parent / "config" / "prompts" / "tool_instructions.txt"
try:
if config_path.exists():
with open(config_path, 'r') as f:
_TOOL_INSTRUCTIONS_CACHE = f.read().strip()
logger.debug(f"Loaded tool instructions from {config_path}")
else:
# Fallback default instructions
_TOOL_INSTRUCTIONS_CACHE = """You MUST use tools. DO NOT explain. DO NOT use markdown.
OUTPUT THIS EXACT FORMAT - NOTHING ELSE:
TOOL: bash
ARGUMENTS: {"command": "your command here"}
Available tools:
- bash: Run shell commands
- write: Create files
- read: Read files
NEVER write explanations.
NEVER use numbered lists.
NEVER use markdown code blocks.
ONLY output TOOL: lines."""
logger.warning(f"Tool instructions config not found at {config_path}, using default")
except Exception as e:
logger.error(f"Error loading tool instructions: {e}")
# Use minimal fallback
        _TOOL_INSTRUCTIONS_CACHE = 'Use TOOL: tool_name\nARGUMENTS: {"param": "value"} format.'
return _TOOL_INSTRUCTIONS_CACHE
from api.models import (
ChatCompletionRequest,
ChatCompletionResponse,
ChatCompletionChoice,
ChatCompletionStreamResponse,
ChatCompletionStreamChoice,
ChatMessage,
UsageInfo,
ModelListResponse,
ModelInfo,
HealthResponse,
)
from swarm.manager import SwarmManager
from tools.executor import get_tool_executor, set_tool_executor, ToolExecutor
router = APIRouter()
# Global swarm manager instance (set during startup)
swarm_manager: Optional[SwarmManager] = None
def set_swarm_manager(manager: SwarmManager):
"""Set the global swarm manager instance."""
global swarm_manager
swarm_manager = manager
def format_tool_description(tool) -> str:
"""Format a tool definition for the prompt."""
func = tool.function
desc = f"### {func.name}\n"
desc += f"Description: {func.description}\n"
if func.parameters and func.parameters.get('properties'):
desc += "Parameters:\n"
for param_name, param_info in func.parameters['properties'].items():
param_desc = param_info.get('description', 'No description')
param_type = param_info.get('type', 'any')
required = param_name in func.parameters.get('required', [])
req_marker = " (required)" if required else ""
desc += f" - {param_name} ({param_type}){req_marker}: {param_desc}\n"
return desc
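# Illustrative input/output for format_tool_description, assuming a tool object
# following the OpenAI function-tool schema (names here are made up):
#   tool.function.name = "read"
#   tool.function.description = "Read a file"
#   tool.function.parameters = {
#       "properties": {"path": {"type": "string", "description": "File path"}},
#       "required": ["path"],
#   }
# renders roughly as:
#   ### read
#   Description: Read a file
#   Parameters:
#     - path (string) (required): File path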
def format_messages_with_tools(messages: list, tools: Optional[list] = None) -> str:
    """Format chat messages into a single prompt using ChatML format.
    Note: Tools are handled server-side; the model responds in the lightweight
    TOOL:/ARGUMENTS format described by the injected instructions. The `tools`
    parameter is accepted for API compatibility but is not expanded into the
    prompt here; _USE_OPENCODE_TOOLS currently only affects logging.
    """
formatted = []
# AGGRESSIVE TOKEN OPTIMIZATION FOR INITIAL OPENCODE REQUESTS
# Detect if this is an initial conversation (no assistant/tool messages yet)
has_assistant_response = any(msg.role == "assistant" for msg in messages)
has_tool_results = any(msg.role == "tool" for msg in messages)
is_initial_request = not has_assistant_response and not has_tool_results
# Calculate current token count
full_text = "\n".join([f"{msg.role}: {msg.content}" for msg in messages])
current_tokens = len(TOKEN_ENCODING.encode(full_text))
logger.debug(f"Original prompt size: {current_tokens} tokens")
logger.debug(f"Is initial request: {is_initial_request}")
# If this is an initial request and it's huge (>4000 tokens), compress aggressively
if is_initial_request and current_tokens > 4000:
logger.info(f"🗜️ COMPRESSING: Initial request is {current_tokens} tokens, compressing to <4000...")
# Keep only user messages (strip all system messages and metadata)
user_messages = [msg for msg in messages if msg.role == "user"]
# Get the last user message (the actual query)
if user_messages:
last_user_msg = user_messages[-1]
# Truncate if it's still too long
user_content = last_user_msg.content
if len(user_content) > 2000:
user_content = user_content[:2000] + "... [truncated for token limit]"
logger.debug(f"Truncated user message from {len(last_user_msg.content)} to 2000 chars")
filtered_messages = [ChatMessage(role="user", content=user_content)]
logger.info(f"✅ Compressed to {len(user_messages)} user message(s)")
else:
filtered_messages = []
logger.warning("No user messages found in initial request!")
else:
# Normal filtering for subsequent messages
filtered_messages = [msg for msg in messages if msg.role != "system"]
    # Re-check for assistant responses on the (possibly compressed) message list
    has_assistant_response = any(msg.role == "assistant" for msg in filtered_messages)
    # Add tool instructions on the first turn (before any assistant response).
    # NOTE: both modes currently insert the same instructions loaded from
    # config; the `tools` parameter is not expanded into the prompt here.
    if not has_assistant_response:
        tool_instructions = _load_tool_instructions()
        mode = "opencode tools (~27k tokens)" if _USE_OPENCODE_TOOLS else "local tool server (~125 tokens)"
        logger.debug(f"Using {mode} mode: {len(tool_instructions)} chars of tool instructions")
        filtered_messages.insert(0, ChatMessage(role="system", content=tool_instructions))
        logger.debug(f"Added tool instructions to system message ({mode})")
# Debug: Log the full prompt being sent to model
full_prompt = []
for msg in filtered_messages:
if msg.role == "system":
full_prompt.append(f"[SYSTEM] {msg.content[:200]}...")
elif msg.role == "user":
full_prompt.append(f"[USER] {msg.content}")
logger.debug(f"Prompt preview: {' | '.join(full_prompt)}")
for msg in filtered_messages:
role = msg.role
content = msg.content
if role == "system":
formatted.append(f"<|im_start|>system\n{content}<|im_end|>")
elif role == "user":
formatted.append(f"<|im_start|>user\n{content}<|im_end|>")
elif role == "assistant":
formatted.append(f"<|im_start|>assistant\n{content}<|im_end|>")
elif role == "tool":
tool_name = getattr(msg, 'name', 'tool')
formatted.append(f"<|im_start|>tool\n{tool_name}: {content}<|im_end|>")
formatted.append("<|im_start|>assistant\n")
result = "\n".join(formatted)
# Log final token count
final_tokens = len(TOKEN_ENCODING.encode(result))
logger.info(f"📊 Final prompt size: {final_tokens} tokens (reduced from {current_tokens})")
return result
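# Sketch of the ChatML output for a trivial first-turn exchange (the system
# block comes from the tool instructions injected above; content abridged):
#   <|im_start|>system
#   You MUST use tools. ...<|im_end|>
#   <|im_start|>user
#   list the files here<|im_end|>
#   <|im_start|>assistant
# The trailing open assistant tag cues the model to continue from there.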
async def execute_tool_server_side(tool_name: str, tool_args: dict, working_dir: Optional[str] = None) -> str:
"""Execute a tool using the configured tool executor (local or remote).
Args:
tool_name: Name of the tool to execute
tool_args: Arguments for the tool
working_dir: The working directory to use for file operations and bash commands.
"""
# Determine working directory
if working_dir is None:
# Try environment variable first
env_dir = os.getenv('LOCAL_SWARM_CLIENT_WORKING_DIR')
if env_dir:
working_dir = env_dir
logger.debug(f" 🌍 Using client working dir from LOCAL_SWARM_CLIENT_WORKING_DIR: {working_dir}")
else:
# Auto-detect project root from server's cwd (fallback)
working_dir = _discover_project_root()
logger.debug(f" ⚠️ No client working dir provided, auto-detected: {working_dir}")
logger.debug(f" 💡 For correct file locations, set X-Client-Working-Dir header or LOCAL_SWARM_CLIENT_WORKING_DIR env var")
# Inject working_dir into tool_args if provided
if working_dir is not None:
# Make a copy to avoid mutating original
tool_args = dict(tool_args)
# For bash, use 'cwd' parameter; for read/write, use 'working_dir'
if tool_name == 'bash':
tool_args['cwd'] = working_dir
else:
tool_args['working_dir'] = working_dir
executor = get_tool_executor()
if executor is None:
# Fallback to local execution if no executor configured
logger.debug(f" ⚠️ No tool executor configured, creating local fallback")
executor = ToolExecutor(tool_host_url=None)
set_tool_executor(executor)
else:
# Log which mode we're using
if executor.tool_host_url:
logger.debug(f" 🔗 Using remote tool host: {executor.tool_host_url}")
else:
logger.debug(f" 🏠 Using local tool execution")
logger.debug(f" 📍 Using working directory: {working_dir}")
return await executor.execute(tool_name, tool_args)
def _discover_project_root(start_dir: Optional[str] = None) -> str:
"""Discover the project root directory by looking for common markers."""
if start_dir is None:
start_dir = os.getcwd()
current = os.path.abspath(start_dir)
# Common project root markers
markers = ['.git', 'package.json', 'pyproject.toml', 'Cargo.toml', 'go.mod',
'requirements.txt', 'setup.py', 'pom.xml', 'build.gradle', '.project', '.venv']
while True:
try:
if any(os.path.exists(os.path.join(current, marker)) for marker in markers):
return current
except Exception:
pass # Permission errors, just skip
parent = os.path.dirname(current)
if parent == current: # Reached filesystem root
break
current = parent
return start_dir
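# Example walk (hypothetical paths): starting from /home/dev/proj/src/api,
# each directory is checked upward for a marker:
#   /home/dev/proj/src/api  -> no marker
#   /home/dev/proj/src      -> no marker
#   /home/dev/proj          -> contains .git -> returned
# If no marker is found before the filesystem root, start_dir is returned.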
def _ensure_tool_arguments(tool_name: str, args_dict: dict) -> dict:
"""Ensure tool arguments have all required fields.
For bash tool: inject 'description' field if missing.
"""
if tool_name == 'bash' and 'description' not in args_dict:
# Generate description from command
command = args_dict.get('command', '')
# Extract first word or short description
desc = command.split()[0] if command else 'Execute command'
args_dict['description'] = desc
return args_dict
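# Behavior sketch for the bash description injection:
#   _ensure_tool_arguments("bash", {"command": "npm install"})
#   # -> {"command": "npm install", "description": "npm"}
# Other tools pass through unchanged.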
def parse_tool_calls(text: str) -> tuple:
"""Parse tool calls from model output using the standardized format.
Supports multiple formats for compatibility with different model sizes:
1. Standard: TOOL: name\nARGUMENTS: {"key": "value"}
2. Markdown: ```bash command```
3. Numbered lists: 1. command
4. Inline: npm install ...
Returns:
tuple: (content_without_tools, list_of_tool_calls or None)
"""
    import re  # json is already imported at module level
    # Priority 1: Standard format TOOL: name\nARGUMENTS: {...}
    # NOTE: the {[^}]*} group only matches flat JSON objects; nested braces
    # inside arguments will not be captured by this pattern.
    tool_pattern = r'TOOL:\s*(\w+)\s*\nARGUMENTS:\s*(\{[^}]*\})'
tool_matches = list(re.finditer(tool_pattern, text, re.IGNORECASE))
if tool_matches:
tool_calls = []
for i, tool_match in enumerate(tool_matches):
tool_name = tool_match.group(1)
args_str = tool_match.group(2)
try:
args_dict = json.loads(args_str)
# Ensure required fields are present
args_dict = _ensure_tool_arguments(tool_name, args_dict)
tool_calls.append({
"id": f"call_{i+1}",
"type": "function",
"function": {
"name": tool_name,
"arguments": json.dumps(args_dict)
}
})
except json.JSONDecodeError:
continue
if tool_calls:
first_start = tool_matches[0].start()
content = text[:first_start].strip()
return content, tool_calls
# Priority 2: Markdown code blocks (```bash command```)
markdown_pattern = r'```(?:bash|shell|sh)?\s*\n(.*?)\n```'
markdown_matches = list(re.finditer(markdown_pattern, text, re.DOTALL))
if markdown_matches:
tool_calls = []
for i, match in enumerate(markdown_matches):
code_content = match.group(1).strip()
if code_content:
args_dict = {"command": code_content}
args_dict = _ensure_tool_arguments("bash", args_dict)
tool_calls.append({
"id": f"call_{i+1}",
"type": "function",
"function": {
"name": "bash",
"arguments": json.dumps(args_dict)
}
})
if tool_calls:
first_start = markdown_matches[0].start()
content = text[:first_start].strip()
return content, tool_calls
# Priority 3: Look for command lines anywhere in text (for 7B models)
# Match lines containing common bash commands with their arguments
command_lines = []
for line in text.split('\n'):
line = line.strip()
# Match commands like: npm install, npx create-react-app, mkdir myapp, create-react-app, etc.
if re.match(r'^(npm|npx|mkdir|cd|ls|cat|echo|git|python|pip|node|yarn|create-react-app)\s+', line):
command_lines.append(line)
if command_lines:
# Create a single tool call with all commands chained
combined_command = ' && '.join(command_lines)
args_dict = {"command": combined_command}
args_dict = _ensure_tool_arguments("bash", args_dict)
tool_calls = [{
"id": "call_1",
"type": "function",
"function": {
"name": "bash",
"arguments": json.dumps(args_dict)
}
}]
return "", tool_calls
# Priority 4: Look for standalone bash commands (last resort)
# Match lines that start with common bash commands
standalone_pattern = r'(?:^|\n)(npm\s+\w+|npx\s+\w+|mkdir\s+\w+|cd\s+\w+|git\s+\w+)(?:\s|$)'
standalone_matches = list(re.finditer(standalone_pattern, text, re.MULTILINE))
if standalone_matches:
commands = [match.group(1).strip() for match in standalone_matches]
if commands:
combined_command = ' && '.join(commands)
args_dict = {"command": combined_command}
args_dict = _ensure_tool_arguments("bash", args_dict)
tool_calls = [{
"id": "call_1",
"type": "function",
"function": {
"name": "bash",
"arguments": json.dumps(args_dict)
}
}]
return "", tool_calls
# Priority 5: Look for URLs mentioned in text (for webfetch)
# Match common URL patterns like https://github.com/...
url_pattern = r'https?://[^\s<>"\')\]]+[a-zA-Z0-9]'
url_matches = list(re.finditer(url_pattern, text))
if url_matches:
urls = [match.group(0) for match in url_matches]
if urls:
# Create webfetch tool calls for each URL
tool_calls = []
for i, url in enumerate(urls):
tool_calls.append({
"id": f"call_{i+1}",
"type": "function",
"function": {
"name": "webfetch",
"arguments": json.dumps({"url": url, "format": "markdown"})
}
})
return "", tool_calls
return text, None
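# Parser behavior sketch, in priority order (inputs are illustrative):
#   parse_tool_calls('Sure.\nTOOL: bash\nARGUMENTS: {"command": "ls"}')
#   # -> ("Sure.", [{"id": "call_1", "type": "function",
#   #                "function": {"name": "bash",
#   #                             "arguments": '{"command": "ls", "description": "ls"}'}}])
#   parse_tool_calls("Just prose, no commands or URLs.")
#   # -> ("Just prose, no commands or URLs.", None)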
# Keep old function for backward compatibility
def format_messages(messages: list) -> str:
"""Format chat messages into a single prompt using ChatML format."""
return format_messages_with_tools(messages, None)
@router.get("/v1/models", response_model=ModelListResponse)
async def list_models():
"""List available models."""
if swarm_manager is None:
raise HTTPException(status_code=503, detail="Swarm not initialized")
status = swarm_manager.get_status()
return ModelListResponse(
data=[
ModelInfo(
id="local-swarm",
created=int(time.time()),
owned_by="local-swarm"
),
ModelInfo(
id=status.model_name.lower().replace(" ", "-"),
created=int(time.time()),
owned_by="local-swarm"
)
]
)
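# Example request against a locally running instance (host/port are
# assumptions, not fixed by this module):
#   curl http://localhost:8000/v1/models
# Returns a ModelListResponse with the fixed "local-swarm" id plus a
# slugified form of the active model's name.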
@router.post("/v1/tools/execute")
async def execute_tool(request: dict):
"""
Execute a tool (for remote tool execution).
This endpoint allows other swarm instances to execute tools
on a centralized tool host.
"""
import traceback
tool_name = request.get("tool", "")
tool_args = request.get("arguments", {})
logger.debug(f"\n{'='*60}")
logger.debug(f"🔧 TOOL SERVER: Received request")
logger.debug(f" Tool: {tool_name}")
logger.debug(f" Arguments: {tool_args}")
# Extract working_dir if provided (for file operations)
working_dir = tool_args.get('working_dir') or tool_args.get('cwd')
if working_dir:
logger.debug(f" Working directory: {working_dir}")
else:
logger.debug(f" Working directory: (using server default)")
logger.debug(f"{'='*60}")
# Create a temporary local executor for this request
executor = ToolExecutor(tool_host_url=None)
try:
logger.debug(f"🔧 TOOL SERVER: Executing {tool_name}...")
# Merge working_dir into tool_args if needed (executor will handle it)
# For bash, we need to rename 'working_dir' to 'cwd' if present
if 'working_dir' in tool_args and tool_name == 'bash':
# bash uses 'cwd' parameter
args_to_execute = dict(tool_args)
args_to_execute['cwd'] = tool_args['working_dir']
# Remove working_dir to avoid confusion
args_to_execute.pop('working_dir', None)
result = await executor.execute(tool_name, args_to_execute)
else:
result = await executor.execute(tool_name, tool_args)
logger.debug(f"🔧 TOOL SERVER: {tool_name} completed")
logger.debug(f" Result length: {len(result)} chars")
# Show tail of result for debugging
if result:
tail_length = 500
if len(result) > tail_length:
logger.debug(f" Result tail: ...{result[-tail_length:]}")
else:
logger.debug(f" Full result: {result}")
else:
logger.debug(f" Result: (empty)")
logger.debug(f"{'='*60}\n")
return {"result": result}
except Exception as e:
logger.debug(f"🔧 TOOL SERVER: Error executing {tool_name}")
logger.debug(f" Exception: {type(e).__name__}: {str(e)}")
logger.debug(f" Traceback: {traceback.format_exc()}")
logger.debug(f"{'='*60}\n")
return {"result": f"Error: {str(e)}"}
@router.post("/v1/chat/completions")
async def chat_completions(request: ChatCompletionRequest, fastapi_request: Request):
"""
Generate chat completion.
Supports both regular and streaming responses.
"""
if swarm_manager is None:
raise HTTPException(status_code=503, detail="Swarm not initialized")
if not swarm_manager.get_status().is_running:
raise HTTPException(status_code=503, detail="Swarm not running")
# Get client working directory from header (if provided by client like opencode)
client_working_dir = fastapi_request.headers.get("X-Client-Working-Dir")
if client_working_dir:
logger.debug(f" 📍 Client working directory from header: {client_working_dir}")
else:
client_working_dir = None
logger.debug(f" 📍 No X-Client-Working-Dir header, using auto-detection")
# Format messages into prompt
# Mode 1: Local tool server (default) - ignore client tools, use brief instructions (~125 tokens)
# Mode 2: Opencode tools - use client tools with full definitions (~27k tokens)
if _USE_OPENCODE_TOOLS:
# Include client tools in prompt (full capabilities, more tokens)
# Sanitize tools to fix invalid schemas (e.g., remove extra 'description' from properties)
sanitized_tools = request.tools
if sanitized_tools:
for tool in sanitized_tools:
if tool.type == "function" and tool.function.parameters:
params = tool.function.parameters
# Remove invalid 'description' from properties if present
if 'properties' in params and 'description' in params.get('properties', {}):
invalid_props = ['description']
# Also remove 'description' from required if present
if 'required' in params:
params['required'] = [r for r in params.get('required', []) if r not in invalid_props]
# Remove invalid properties
params['properties'] = {k: v for k, v in params.get('properties', {}).items() if k not in invalid_props}
logger.debug(f" 🔧 Sanitized tool '{tool.function.name}': removed {invalid_props} from properties/required")
prompt = format_messages_with_tools(request.messages, sanitized_tools)
has_tools = sanitized_tools is not None and len(sanitized_tools) > 0
logger.debug(f"\n{'='*60}")
logger.debug(f"REQUEST: has_tools={has_tools}, stream={request.stream}")
logger.debug(f"MODE: opencode tools (~27k tokens in prompt)")
if has_tools:
logger.debug(f"TOOLS: {sanitized_tools}")
logger.debug(f"{'='*60}")
else:
# Ignore client tools to save tokens (~27k savings!)
# Model uses brief tool instructions instead (~125 tokens)
prompt = format_messages_with_tools(request.messages, None)
has_tools = request.tools is not None and len(request.tools) > 0
logger.debug(f"\n{'='*60}")
logger.debug(f"REQUEST: has_tools={has_tools}, stream={request.stream}")
logger.debug(f"MODE: local tool server (~125 tokens, saving ~27k tokens!)")
if has_tools:
logger.debug(f"NOTE: Client sent tools but ignored to save tokens")
logger.debug(f"{'='*60}")
# Generate ID
completion_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"
created = int(time.time())
# Calculate prompt tokens once for usage reporting
prompt_tokens = len(TOKEN_ENCODING.encode(prompt))
if request.stream:
# Check if federation is enabled with peers
peers_count = 0
if federated_swarm is not None and federated_swarm.discovery is not None:
peers_count = len(federated_swarm.discovery.get_peers())
use_federation = peers_count > 0
if use_federation:
# Use federation for ALL requests (with or without tools)
logger.info(f"🌐 Using federation with {peers_count} peer(s) - waiting for consensus...")
# Run federation and get consensus result
fed_result = await federated_swarm.generate_with_federation(
prompt=prompt,
max_tokens=request.max_tokens or 1024,
temperature=request.temperature or 0.7,
min_peers=0
)
logger.info(f" ✓ Federation consensus complete (strategy: {fed_result.strategy}, confidence: {fed_result.local_confidence:.2f})")
logger.info(f" 🏆 Winner: {fed_result.winner}")
logger.info(f" 📝 Using federated response from {len(fed_result.peer_votes) + 1} nodes")
# Use the federated consensus result
content = fed_result.final_response
# Check if tools were requested and parse tool calls from federated response
if has_tools:
logger.debug(" 🔧 Parsing tool calls from federated response...")
content_parsed, tool_calls_parsed = parse_tool_calls(content)
if tool_calls_parsed:
logger.debug(f" 🔧 Found {len(tool_calls_parsed)} tool call(s) in federated response")
                    # Stream the parsed tool-call dicts directly in a
                    # dedicated generator (no ToolCall objects needed here)
async def tool_calls_fed_stream_generator() -> AsyncIterator[str]:
"""Generate SSE stream with tool calls from federation."""
first_chunk = ChatCompletionStreamResponse(
id=completion_id,
created=created,
model=request.model,
choices=[
ChatCompletionStreamChoice(
delta={"role": "assistant"}
)
]
)
yield f"data: {first_chunk.model_dump_json()}\n\n"
if content_parsed:
content_chunk = ChatCompletionStreamResponse(
id=completion_id,
created=created,
model=request.model,
choices=[
ChatCompletionStreamChoice(
delta={"content": content_parsed}
)
]
)
yield f"data: {content_chunk.model_dump_json()}\n\n"
tool_calls_delta = []
for i, tc in enumerate(tool_calls_parsed):
tool_calls_delta.append({
"index": i,
"id": tc["id"],
"type": "function",
"function": {
"name": tc["function"]["name"],
"arguments": tc["function"]["arguments"]
}
})
fed_completion_tokens = len(TOKEN_ENCODING.encode(content)) if content else 0
fed_total_tokens = prompt_tokens + fed_completion_tokens
final_chunk = ChatCompletionStreamResponse(
id=completion_id,
created=created,
model=request.model,
choices=[
ChatCompletionStreamChoice(
delta={"tool_calls": tool_calls_delta},
finish_reason="tool_calls"
)
],
usage=UsageInfo(
prompt_tokens=prompt_tokens,
completion_tokens=fed_completion_tokens,
total_tokens=fed_total_tokens
)
)
yield f"data: {final_chunk.model_dump_json()}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(
tool_calls_fed_stream_generator(),
media_type="text/event-stream"
)
# Calculate token counts
completion_tokens = len(TOKEN_ENCODING.encode(content)) if content else 0
total_tokens = prompt_tokens + completion_tokens
# Stream the federated response
async def federation_stream_generator() -> AsyncIterator[str]:
"""Generate SSE stream with federated content."""
# Send role chunk
first_chunk = ChatCompletionStreamResponse(
id=completion_id,
created=created,
model=request.model,
choices=[
ChatCompletionStreamChoice(
delta={"role": "assistant"}
)
]
)
yield f"data: {first_chunk.model_dump_json()}\n\n"
# Send content in chunks
chunk_size = 100
for i in range(0, len(content), chunk_size):
chunk = content[i:i+chunk_size]
stream_chunk = ChatCompletionStreamResponse(
id=completion_id,
created=created,
model=request.model,
choices=[
ChatCompletionStreamChoice(
delta={"content": chunk}
)
]
)
yield f"data: {stream_chunk.model_dump_json()}\n\n"
# Send final chunk with usage
final_chunk = ChatCompletionStreamResponse(
id=completion_id,
created=created,
model=request.model,
choices=[
ChatCompletionStreamChoice(
delta={},
finish_reason="stop"
)
],
usage=UsageInfo(
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
total_tokens=total_tokens
)
)
yield f"data: {final_chunk.model_dump_json()}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(
federation_stream_generator(),
media_type="text/event-stream"
)
        elif has_tools:
            # Real-time streaming with tools. The streaming loop must live in a
            # nested async generator: a bare `yield` at this level would turn
            # the whole route handler into a generator and break the response.
            logger.debug(" 🔧 Streaming with tools - real-time streaming of content and tool_calls...")
            async def tools_stream_generator() -> AsyncIterator[str]:
                """Generate SSE stream with incremental content and tool_calls."""
                # Send first chunk with role, matching the other stream branches
                first_chunk = ChatCompletionStreamResponse(
                    id=completion_id,
                    created=created,
                    model=request.model,
                    choices=[
                        ChatCompletionStreamChoice(
                            delta={"role": "assistant"}
                        )
                    ]
                )
                yield f"data: {first_chunk.model_dump_json()}\n\n"
                full_response = ""
                last_tool_calls = []
                accumulated_content = ""
                async for chunk in swarm_manager.generate_stream(
                    prompt=prompt,
                    max_tokens=request.max_tokens or 1024,
                    temperature=request.temperature or 0.7
                ):
                    full_response += chunk
                    content, current_tool_calls = parse_tool_calls(full_response)
                    # parse_tool_calls returns None when no tool calls are found
                    current_tool_calls = current_tool_calls or []
                    new_content = content[len(accumulated_content):] if content else ""
                    if new_content:
                        accumulated_content += new_content
                        content_chunk = ChatCompletionStreamResponse(
                            id=completion_id,
                            created=created,
                            model=request.model,
                            choices=[
                                ChatCompletionStreamChoice(
                                    delta={"content": new_content}
                                )
                            ]
                        )
                        yield f"data: {content_chunk.model_dump_json()}\n\n"
                        logger.debug(f" 💬 Sent {len(new_content)} chars of content")
                    new_tool_calls = [tc for tc in current_tool_calls if tc not in last_tool_calls]
                    if new_tool_calls:
                        # Keep tool_call indices monotonically increasing across batches
                        index_offset = len(last_tool_calls)
                        last_tool_calls = current_tool_calls
                        logger.debug(f" 🔧 Streaming {len(new_tool_calls)} new tool call(s)")
                        tool_calls_delta = []
                        for i, tc in enumerate(new_tool_calls):
                            tool_calls_delta.append({
                                "index": index_offset + i,
                                "id": tc.get("id", ""),
                                "type": "function",
                                "function": {
                                    "name": tc.get("function", {}).get("name", ""),
                                    # OpenAI-style deltas carry arguments as a JSON string
                                    "arguments": tc.get("function", {}).get("arguments", "{}")
                                }
                            })
                        final_chunk = {
                            "id": completion_id,
                            "object": "chat.completion.chunk",
                            "created": created,
                            "model": request.model,
                            "choices": [
                                {
                                    "index": 0,
                                    "delta": {"tool_calls": tool_calls_delta},
                                    "finish_reason": "tool_calls"
                                }
                            ]
                        }
                        # json is imported at module level
                        yield f"data: {json.dumps(final_chunk)}\n\n"
                        logger.debug(f" 🔧 Sent tool calls delta: {len(new_tool_calls)} calls")
                yield "data: [DONE]\n\n"
            return StreamingResponse(
                tools_stream_generator(),
                media_type="text/event-stream"
            )
else:
# Regular streaming without tools
async def stream_generator() -> AsyncIterator[str]:
"""Generate SSE stream."""
# Send first chunk with role
first_chunk = ChatCompletionStreamResponse(
id=completion_id,
created=created,
model=request.model,
choices=[
ChatCompletionStreamChoice(
delta={"role": "assistant"}
)
]
)
yield f"data: {first_chunk.model_dump_json()}\n\n"
# Stream content
async for chunk in swarm_manager.generate_stream(
prompt=prompt,
max_tokens=request.max_tokens or 1024,
temperature=request.temperature or 0.7
):
stream_chunk = ChatCompletionStreamResponse(
id=completion_id,
created=created,
model=request.model,
choices=[
ChatCompletionStreamChoice(
delta={"content": chunk}
)
]
)
yield f"data: {stream_chunk.model_dump_json()}\n\n"
# Send final chunk
final_chunk = ChatCompletionStreamResponse(
id=completion_id,
created=created,
model=request.model,
choices=[
ChatCompletionStreamChoice(
delta={},
finish_reason="stop"
)
]
)
yield f"data: {final_chunk.model_dump_json()}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(
stream_generator(),
media_type="text/event-stream"
)
else:
# Regular response with consensus
try:
# Use federation if enabled and peers are available
if federated_swarm is not None:
peers = federated_swarm.discovery.get_peers()
if peers:
logger.info(f"🌐 Using federation with {len(peers)} peer(s)...")
result = await federated_swarm.generate_with_federation(
prompt=prompt,
max_tokens=request.max_tokens or 1024,
temperature=request.temperature or 0.7,
min_peers=0 # Allow local fallback if no peers respond
)
response_text = result.final_response
tokens_generated = len(response_text.split()) # Rough estimate
# Parse tool calls if tools were provided
content = response_text
tool_calls = []
finish_reason = "stop"
if has_tools:
content, tool_calls_parsed = parse_tool_calls(response_text)
if tool_calls_parsed:
finish_reason = "tool_calls"
# Convert to ToolCall objects
from api.models import ToolCall
tool_calls = [
ToolCall(
id=tc.get("id", f"call_{i}"),
type=tc.get("type", "function"),
function=tc.get("function", {})
)
for i, tc in enumerate(tool_calls_parsed)
]
# Calculate accurate token counts using tiktoken
prompt_tokens = len(TOKEN_ENCODING.encode(prompt))
completion_tokens = len(TOKEN_ENCODING.encode(content))
total_tokens = prompt_tokens + completion_tokens
response_obj = ChatCompletionResponse(
id=completion_id,
created=created,
model=request.model,
choices=[
ChatCompletionChoice(
index=0,
message=ChatMessage(
role="assistant",
content=content,
tool_calls=tool_calls
),
finish_reason=finish_reason
)
],
usage=UsageInfo(
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
total_tokens=total_tokens
)
)
return response_obj
# Fallback to local generation
result = await swarm_manager.generate(
prompt=prompt,
max_tokens=request.max_tokens or 1024,
temperature=request.temperature or 0.7,
use_consensus=True
)
response_text = result.selected_response.text
tokens_generated = result.selected_response.tokens_generated
tokens_per_second = result.selected_response.tokens_per_second
logger.debug(f"DEBUG: Generated response (tokens={tokens_generated}, t/s={tokens_per_second:.1f})")
logger.debug(f"DEBUG: Response preview: {response_text[:200]}...")
# Parse tool calls if tools were provided
content = response_text
tool_calls = []
finish_reason = "stop"
if has_tools:
logger.debug(f"DEBUG: Parsing tool calls from response...")
content, tool_calls_parsed = parse_tool_calls(response_text)
logger.debug(f"DEBUG: parse_tool_calls returned: content_len={len(content)}, parsed={tool_calls_parsed is not None}")
if tool_calls_parsed:
logger.debug(f" 🔧 Model requesting {len(tool_calls_parsed)} tool(s)...")
executor = get_tool_executor()
if executor:
logger.debug(f" 🔗 Tool executor: {executor.tool_host_url or 'local'}")
else:
logger.debug(f" ⚠️ No tool executor configured!")
# Execute tools via configured executor (local or remote)
tool_results = []
for i, tc in enumerate(tool_calls_parsed):
tool_name = tc.get("function", {}).get("name", "")
tool_args_str = tc.get("function", {}).get("arguments", "{}")
try:
tool_args = json.loads(tool_args_str) if isinstance(tool_args_str, str) else tool_args_str
                        except (json.JSONDecodeError, TypeError):
                            tool_args = {}
logger.debug(f" [{i+1}/{len(tool_calls_parsed)}] Executing: {tool_name}({tool_args})")
# Execute tool via tool executor
result = await execute_tool_server_side(tool_name, tool_args, working_dir=client_working_dir)
tool_results.append(f"Tool '{tool_name}' result: {result}")
logger.debug(f" ✓ Completed: {result[:100]}..." if len(result) > 100 else f" ✓ Result: {result}")
# Return ONLY tool results as content
content = "\n\n".join(tool_results)
finish_reason = "stop"
tool_calls = [] # Clear tool_calls since we executed them
logger.debug(f" ✅ All tools executed, returning results")
else:
logger.debug(f"DEBUG: No tool calls parsed from response")
else:
logger.debug(f"DEBUG: No tools requested, returning normal response")
# Calculate accurate token counts using tiktoken
prompt_tokens = len(TOKEN_ENCODING.encode(prompt))
completion_tokens = len(TOKEN_ENCODING.encode(content))
total_tokens = prompt_tokens + completion_tokens
response_obj = ChatCompletionResponse(
id=completion_id,
created=created,
model=request.model,
choices=[
ChatCompletionChoice(
index=0,
message=ChatMessage(
role="assistant",
content=content,
tool_calls=tool_calls
),
finish_reason=finish_reason
)
],
usage=UsageInfo(
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
total_tokens=total_tokens,
tokens_per_second=tokens_per_second
)
)
return response_obj
except Exception as e:
raise HTTPException(status_code=500, detail=f"Generation failed: {str(e)}")
@router.get("/health", response_model=HealthResponse)
async def health_check():
"""Check API and swarm health."""
if swarm_manager is None:
return HealthResponse(
status="initializing",
version="0.1.0",
workers=0,
model="unknown"
)
status = swarm_manager.get_status()
return HealthResponse(
status="healthy" if status.is_running else "degraded",
version="0.1.0",
workers=status.healthy_workers,
model=status.model_name
)
@router.get("/v1/health", response_model=HealthResponse)
async def health_check_v1():
"""Health check at /v1/health endpoint."""
return await health_check()
# Global federation instance (set during startup)
federated_swarm = None
def set_federated_swarm(federation):
"""Set the global federation instance."""
global federated_swarm
federated_swarm = federation
@router.post("/v1/federation/vote")
async def federation_vote(request: dict):
"""
Receive a vote request from a peer swarm.
This endpoint allows other swarms to request our "best local" response
for federated consensus.
"""
if swarm_manager is None:
raise HTTPException(status_code=503, detail="Swarm not initialized")
if not swarm_manager.get_status().is_running:
raise HTTPException(status_code=503, detail="Swarm not running")
prompt = request.get("prompt", "")
max_tokens = request.get("max_tokens", 1024)
temperature = request.get("temperature", 0.7)
try:
# Generate with local consensus
result = await swarm_manager.generate(
prompt=prompt,
max_tokens=max_tokens,
temperature=temperature,
use_consensus=True
)
return {
"response": result.selected_response.text,
"confidence": result.confidence,
"latency_ms": result.selected_response.latency_ms,
"worker_count": len(result.all_responses),
"strategy": result.strategy,
"tokens_per_second": result.selected_response.tokens_per_second,
"tokens_generated": result.selected_response.tokens_generated
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Generation failed: {str(e)}")
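# Example vote exchange between peers (payload values are illustrative):
#   POST /v1/federation/vote
#   {"prompt": "<|im_start|>user\\n...<|im_end|>", "max_tokens": 256, "temperature": 0.7}
# The response carries the local consensus text plus confidence/latency
# metadata, which the caller's federation layer folds into its vote tally.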
@router.get("/v1/federation/status")
async def federation_status():
"""Get federation status."""
if federated_swarm is None:
return {
"enabled": False,
"message": "Federation not enabled"
}
status = await federated_swarm.get_federation_status()
return status
@router.get("/v1/federation/peers")
async def federation_peers():
"""Get list of discovered peers."""
if federated_swarm is None or federated_swarm.discovery is None:
return {"peers": []}
peers = federated_swarm.discovery.get_peers()
return {
"peers": [
{
"name": p.name,
"host": p.host,
"port": p.port,
"model_id": p.model_id,
"instances": p.instances,
"api_url": p.api_url
}
for p in peers
]
}
@router.get("/v1/federation/health")
async def federation_health():
"""Health check for federation endpoint."""
if swarm_manager is None:
raise HTTPException(status_code=503, detail="Swarm not initialized")
if not swarm_manager.get_status().is_running:
raise HTTPException(status_code=503, detail="Swarm not running")
return {
"status": "healthy",
"federation_enabled": federated_swarm is not None
}
@router.get("/v1/federation/diagnostics")
async def federation_diagnostics():
"""Get federation diagnostics for troubleshooting."""
import socket
diagnostics = {
"hostname": socket.gethostname(),
"federation_enabled": federated_swarm is not None,
"discovery_active": False,
"advertising_ip": None,
"local_ip": None,
"peer_count": 0,
"peers": [],
"mdns_service": "_local-swarm._tcp.local.",
"issues": []
}
if federated_swarm is None:
diagnostics["issues"].append("Federation not enabled")
return diagnostics
discovery = federated_swarm.discovery
if discovery:
diagnostics["discovery_active"] = discovery._running
diagnostics["advertising_ip"] = discovery._local_ip
diagnostics["local_ip"] = discovery._local_ip
peers = discovery.get_peers()
diagnostics["peer_count"] = len(peers)
diagnostics["peers"] = [
{
"name": p.name,
"host": p.host,
"port": p.port,
"api_url": p.api_url,
"last_seen": p.last_seen.isoformat() if p.last_seen else None
}
for p in peers
]
# Check for common issues
if discovery._local_ip == '127.0.0.1':
diagnostics["issues"].append(
"Advertising on localhost (127.0.0.1) - other machines cannot connect. "
"Check network interface or use --host to specify IP."
)
if len(peers) == 0:
diagnostics["issues"].append(
"No peers discovered. Ensure mDNS is not blocked by firewall/router. "
"Try using --peer flag to manually add peers."
)
else:
diagnostics["issues"].append("Discovery service not initialized")
return diagnostics