Files
local_swarm/src/api/routes.py
T
sleepy dcca89d89a fix: OpenAI API compatibility for hollama and other clients
- Fixed ChatMessage.tool_calls to be Optional with default None (excluded when empty)
- Added logprobs field to ChatCompletionChoice (always included as null)
- Added stats and system_fingerprint to ChatCompletionResponse
- Fixed streaming response to use delta format (not message format)
- Fixed non-streaming response to include logprobs: null
- Updated tool instructions to include 'NO explanations'
- Added pytest-asyncio markers to async tests
- All 41 tests passing

This fixes the "Cannot read properties of undefined (reading 'content')" error in hollama and ensures compatibility with OpenAI clients.
2026-02-25 19:39:05 +01:00

328 lines
11 KiB
Python

"""OpenAI-compatible API routes for Local Swarm."""
import asyncio
import json
import logging
import os
import time
import uuid
from typing import AsyncIterator, Optional
from fastapi import APIRouter, HTTPException, Request
from fastapi.responses import StreamingResponse
from api.formatting import format_messages_with_tools, set_use_opencode_tools
from api.tool_parser import parse_tool_calls
from utils.project_discovery import discover_project_root
from utils.token_counter import count_tokens
from api.models import (
ChatCompletionRequest,
ChatCompletionResponse,
ChatCompletionChoice,
ChatCompletionStreamResponse,
ChatCompletionStreamChoice,
ChatMessage,
UsageInfo,
ModelListResponse,
ModelInfo,
HealthResponse,
)
from swarm.manager import SwarmManager
from tools.executor import get_tool_executor, set_tool_executor, ToolExecutor
# Set up logger
logger = logging.getLogger(__name__)
# FastAPI router that collects every endpoint defined in this module
router = APIRouter()
# Global swarm manager instance (set during startup)
swarm_manager: Optional[SwarmManager] = None
def set_swarm_manager(manager: SwarmManager):
    """Install *manager* as the module-wide swarm manager.

    Called once during application startup so the route handlers can
    reach the swarm without threading it through every request.
    """
    global swarm_manager
    swarm_manager = manager
def format_tool_description(tool) -> str:
    """Render one tool definition as a prompt-friendly text section.

    Produces a markdown-style heading with the tool's name and
    description, followed by one bullet per parameter showing its type,
    a "(required)" marker where applicable, and its description.
    """
    fn = tool.function
    parts = [f"### {fn.name}\n", f"Description: {fn.description}\n"]
    properties = (fn.parameters or {}).get('properties')
    if properties:
        parts.append("Parameters:\n")
        required_names = fn.parameters.get('required', [])
        for name, info in properties.items():
            marker = " (required)" if name in required_names else ""
            kind = info.get('type', 'any')
            about = info.get('description', 'No description')
            parts.append(f" - {name} ({kind}){marker}: {about}\n")
    return "".join(parts)
async def execute_tool_server_side(
    tool_name: str,
    tool_args: dict,
    working_dir: Optional[str] = None
) -> str:
    """Execute a tool through the configured executor (local or remote).

    Args:
        tool_name: Name of the tool to execute.
        tool_args: Arguments for the tool (never mutated; a copy is made
            before the working directory is injected).
        working_dir: Directory used for file operations and bash
            commands.  When omitted, falls back to the
            LOCAL_SWARM_CLIENT_WORKING_DIR environment variable and then
            to an auto-detected project root.
    """
    # Resolve the working directory when the caller did not supply one.
    if working_dir is None:
        env_dir = os.getenv('LOCAL_SWARM_CLIENT_WORKING_DIR')
        if env_dir:
            working_dir = env_dir
            logger.debug(f" 🌍 Using client working dir from LOCAL_SWARM_CLIENT_WORKING_DIR: {working_dir}")
        else:
            working_dir = discover_project_root()
            logger.debug(f" ⚠️ No client working dir provided, auto-detected: {working_dir}")
            logger.debug(f" 💡 For correct file locations, set X-Client-Working-Dir header or LOCAL_SWARM_CLIENT_WORKING_DIR env var")
    if working_dir is not None:
        # Copy so the caller's dict is untouched; bash takes 'cwd',
        # every other tool takes 'working_dir'.
        dir_key = 'cwd' if tool_name == 'bash' else 'working_dir'
        tool_args = {**tool_args, dir_key: working_dir}
    executor = get_tool_executor()
    if executor is None:
        # No executor was configured at startup — fall back to a local
        # one and register it for subsequent calls.
        logger.debug(f" ⚠️ No tool executor configured, creating local fallback")
        executor = ToolExecutor(tool_host_url=None)
        set_tool_executor(executor)
    elif executor.tool_host_url:
        logger.debug(f" 🔗 Using remote tool host: {executor.tool_host_url}")
    else:
        logger.debug(f" 🏠 Using local tool execution")
    logger.debug(f" 📍 Using working directory: {working_dir}")
    return await executor.execute(tool_name, tool_args)
@router.get("/v1/models", response_model=ModelListResponse)
async def list_models():
    """List available models.

    Exposes both the generic "local-swarm" alias and a slugified form of
    the concrete model name reported by the swarm manager.
    """
    if swarm_manager is None:
        raise HTTPException(status_code=503, detail="Swarm not initialized")
    status = swarm_manager.get_status()
    model_ids = ["local-swarm", status.model_name.lower().replace(" ", "-")]
    return ModelListResponse(
        data=[
            ModelInfo(id=model_id, created=int(time.time()), owned_by="local-swarm")
            for model_id in model_ids
        ]
    )
@router.get("/health", response_model=HealthResponse)
async def health_check():
    """Check API and swarm health.

    Before startup completes there is no swarm manager yet, so report
    "initializing" instead of failing; afterwards reflect the swarm's
    running state and worker count.
    """
    if swarm_manager is None:
        return HealthResponse(
            status="initializing",
            version="0.1.0",
            workers=0,
            model="none"
        )
    status = swarm_manager.get_status()
    state = "healthy" if status.is_running else "stopped"
    return HealthResponse(
        status=state,
        version="0.1.0",
        workers=status.healthy_workers,
        model=status.model_name
    )
@router.post("/v1/tools/execute")
async def execute_tool_endpoint(request: dict):
    """Execute a tool (for remote tool execution).

    This endpoint allows other swarm instances to execute tools
    on a centralized tool host.  Errors are reported in-band as a
    "Error: ..." result string rather than as HTTP failures.
    """
    import traceback
    tool_name = request.get("tool", "")
    tool_args = request.get("arguments", {})
    logger.debug(f"\n{'='*60}")
    logger.debug(f"🔧 TOOL SERVER: Received request")
    logger.debug(f" Tool: {tool_name}")
    logger.debug(f" Arguments: {tool_args}")
    # Tools always run locally on this host, regardless of how this
    # instance's own executor is configured.
    executor = ToolExecutor(tool_host_url=None)
    try:
        args_to_execute = tool_args
        if tool_name == 'bash' and 'working_dir' in tool_args:
            # bash expects 'cwd' instead of 'working_dir'; copy before renaming
            args_to_execute = dict(tool_args)
            args_to_execute['cwd'] = args_to_execute.pop('working_dir')
        result = await executor.execute(tool_name, args_to_execute)
        logger.debug(f"🔧 TOOL SERVER: {tool_name} completed")
        logger.debug(f" Result length: {len(result)} chars")
        logger.debug(f"{'='*60}\n")
        return {"result": result}
    except Exception as e:
        logger.debug(f"🔧 TOOL SERVER: Error executing {tool_name}")
        logger.debug(f" Exception: {type(e).__name__}: {str(e)}")
        logger.debug(f" Traceback: {traceback.format_exc()}")
        logger.debug(f"{'='*60}\n")
        return {"result": f"Error: {str(e)}"}
# Global flag for tool mode - re-export for compatibility
# NOTE(review): the def below shadows the same-named function imported
# from api.formatting at the top of this file — apparently intentional,
# so callers importing from this module hit this wrapper instead.
_USE_OPENCODE_TOOLS = False
def set_use_opencode_tools(value: bool):
    """Set whether to use opencode's tool definitions."""
    global _USE_OPENCODE_TOOLS
    _USE_OPENCODE_TOOLS = value
    # Local aliased import is required: the module-level name from
    # api.formatting is shadowed by this very function, so forward the
    # flag to the formatting module through a fresh import.
    from api.formatting import set_use_opencode_tools as _set_formatting_tool_mode
    _set_formatting_tool_mode(value)
# Global federated swarm instance (set during startup)
federated_swarm = None
def set_federated_swarm(swarm):
    """Install *swarm* as the module-wide federated swarm instance.

    Invoked once during application startup; the chat completion
    handler reads the global when dispatching requests.
    """
    global federated_swarm
    federated_swarm = swarm
async def _stream_response(response: ChatCompletionResponse):
    """Stream a chat completion response as Server-Sent Events.

    For compatibility with OpenAI format, we use delta format for streaming.
    The response is sent as a single chunk since we don't support
    true token-by-token streaming yet.

    Args:
        response: The fully generated (non-streaming) completion to
            re-shape into delta chunks.

    Yields:
        SSE-formatted strings: one ``data: {...}`` chunk with the delta
        payload, then the ``data: [DONE]`` terminator.
    """
    # Removed the dead local `import json` (never used) and the local
    # re-import of the stream models, which are already imported at the
    # top of this module.
    # Convert to streaming format with delta
    message = response.choices[0].message
    choice = ChatCompletionStreamChoice(
        index=0,
        delta={"content": message.content},
        finish_reason="stop"
    )
    stream_response = ChatCompletionStreamResponse(
        id=response.id,
        created=response.created,
        model=response.model,
        choices=[choice]
    )
    # Send as SSE event; exclude_none keeps empty optional fields out of
    # the payload.
    data = stream_response.model_dump_json(exclude_none=True)
    logger.debug(f"Streaming SSE data (delta format): {len(data)} chars")
    yield f"data: {data}\n\n"
    # Send done event (OpenAI stream terminator)
    yield "data: [DONE]\n\n"
    logger.debug(f"Streaming complete")
@router.post("/v1/chat/completions")
async def chat_completions(request: ChatCompletionRequest, fastapi_request: Request):
    """Generate chat completion.

    Delegates generation to the chat handler, then returns either an
    SSE stream (when the client requested streaming) or a JSON body
    with OpenAI-compatible null-field handling.
    """
    from api.chat_handlers import handle_chat_completion
    if swarm_manager is None:
        raise HTTPException(status_code=503, detail="Swarm not initialized")
    if not swarm_manager.get_status().is_running:
        raise HTTPException(status_code=503, detail="Swarm not running")
    # The client may declare its project directory so file tools resolve
    # paths on the client's side of the connection.
    client_working_dir = fastapi_request.headers.get("X-Client-Working-Dir")
    try:
        logger.info(f"📥 Processing chat completion request...")
        logger.info(f" Stream: {request.stream}")
        logger.info(f" Model: {request.model}")
        response = await handle_chat_completion(
            request=request,
            swarm_manager=swarm_manager,
            federated_swarm=federated_swarm,
            client_working_dir=client_working_dir,
            use_opencode_tools=_USE_OPENCODE_TOOLS
        )
        logger.info(f"✅ Response generated successfully")
        logger.debug(f"Response object type: {type(response)}")
        if request.stream:
            logger.info(f"🌊 Returning streaming response")
            return StreamingResponse(
                _stream_response(response),
                media_type="text/event-stream"
            )
        # Non-streaming JSON with OpenAI-compatible None handling:
        # tool_calls is omitted when None, while logprobs is forced to
        # appear (as null) in every choice.
        from fastapi.responses import JSONResponse
        logger.info(f"📋 Returning JSON response")
        payload = response.model_dump(exclude_none=True)
        for choice in payload.get('choices', []):
            choice.setdefault('logprobs', None)
        logger.debug(f"Response dict: {payload}")
        return JSONResponse(content=payload, status_code=200)
    except Exception as e:
        logger.exception("Error in chat completion")
        logger.error(f"Error type: {type(e).__name__}")
        logger.error(f"Error message: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Generation failed: {str(e)}")