Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 3dc06c73ef | |||
| 989427c4d3 | |||
| c9406974e9 | |||
| b2328f761a | |||
| 17000dc51e |
@@ -9,4 +9,6 @@ ARGUMENTS: {"url": "https://example.com", "format": "markdown"}
|
|||||||
|
|
||||||
Available tools: bash, webfetch
|
Available tools: bash, webfetch
|
||||||
|
|
||||||
|
IMPORTANT: Only webfetch URLs that actually exist and are provided by the user. NEVER hallucinate or guess URLs. If a URL returns 404, stop trying to fetch it.
|
||||||
|
|
||||||
No explanations. No numbered lists. No markdown. Only tool calls.
|
No explanations. No numbered lists. No markdown. Only tool calls.
|
||||||
|
|||||||
+93
-3
@@ -1,5 +1,6 @@
|
|||||||
"""OpenAI-compatible API routes for Local Swarm."""
|
"""OpenAI-compatible API routes for Local Swarm."""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
@@ -544,10 +545,99 @@ async def chat_completions(request: ChatCompletionRequest, fastapi_request: Requ
|
|||||||
completion_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"
|
completion_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"
|
||||||
created = int(time.time())
|
created = int(time.time())
|
||||||
|
|
||||||
|
# Calculate prompt tokens once for usage reporting
|
||||||
|
prompt_tokens = len(TOKEN_ENCODING.encode(prompt))
|
||||||
|
|
||||||
if request.stream:
|
if request.stream:
|
||||||
# For streaming with tools, return tool_calls to client (opencode) for execution
|
# Check if federation is enabled with peers
|
||||||
# This enables multi-turn conversations where client executes tools and sends results back
|
peers_count = 0
|
||||||
if has_tools:
|
if federated_swarm is not None and federated_swarm.discovery is not None:
|
||||||
|
peers_count = len(federated_swarm.discovery.get_peers())
|
||||||
|
use_federation = peers_count > 0
|
||||||
|
|
||||||
|
if use_federation and not has_tools:
|
||||||
|
# Use federation - wait for ALL nodes to complete and use consensus result
|
||||||
|
logger.debug(f"🌐 Using federation with {peers_count} peer(s) - waiting for consensus...")
|
||||||
|
|
||||||
|
# Run federation and get consensus result
|
||||||
|
fed_result = await federated_swarm.generate_with_federation(
|
||||||
|
prompt=prompt,
|
||||||
|
max_tokens=request.max_tokens or 1024,
|
||||||
|
temperature=request.temperature or 0.7,
|
||||||
|
min_peers=0
|
||||||
|
)
|
||||||
|
|
||||||
|
logger.debug(f" ✓ Federation consensus complete (strategy: {fed_result.strategy}, confidence: {fed_result.local_confidence:.2f})")
|
||||||
|
logger.debug(f" 🏆 Winner: {fed_result.winner} (check logs above for details)")
|
||||||
|
logger.debug(f" 📝 Using federated response from {len(fed_result.peer_votes) + 1} nodes")
|
||||||
|
|
||||||
|
# Use the federated consensus result
|
||||||
|
content = fed_result.final_response
|
||||||
|
|
||||||
|
# Calculate token counts
|
||||||
|
completion_tokens = len(TOKEN_ENCODING.encode(content)) if content else 0
|
||||||
|
total_tokens = prompt_tokens + completion_tokens
|
||||||
|
|
||||||
|
# Stream the federated response
|
||||||
|
async def federation_stream_generator() -> AsyncIterator[str]:
|
||||||
|
"""Generate SSE stream with federated content."""
|
||||||
|
# Send role chunk
|
||||||
|
first_chunk = ChatCompletionStreamResponse(
|
||||||
|
id=completion_id,
|
||||||
|
created=created,
|
||||||
|
model=request.model,
|
||||||
|
choices=[
|
||||||
|
ChatCompletionStreamChoice(
|
||||||
|
delta={"role": "assistant"}
|
||||||
|
)
|
||||||
|
]
|
||||||
|
)
|
||||||
|
yield f"data: {first_chunk.model_dump_json()}\n\n"
|
||||||
|
|
||||||
|
# Send content in chunks
|
||||||
|
chunk_size = 100
|
||||||
|
for i in range(0, len(content), chunk_size):
|
||||||
|
chunk = content[i:i+chunk_size]
|
||||||
|
stream_chunk = ChatCompletionStreamResponse(
|
||||||
|
id=completion_id,
|
||||||
|
created=created,
|
||||||
|
model=request.model,
|
||||||
|
choices=[
|
||||||
|
ChatCompletionStreamChoice(
|
||||||
|
delta={"content": chunk}
|
||||||
|
)
|
||||||
|
]
|
||||||
|
)
|
||||||
|
yield f"data: {stream_chunk.model_dump_json()}\n\n"
|
||||||
|
|
||||||
|
# Send final chunk with usage
|
||||||
|
from api.models import UsageInfo
|
||||||
|
final_chunk = ChatCompletionStreamResponse(
|
||||||
|
id=completion_id,
|
||||||
|
created=created,
|
||||||
|
model=request.model,
|
||||||
|
choices=[
|
||||||
|
ChatCompletionStreamChoice(
|
||||||
|
delta={},
|
||||||
|
finish_reason="stop"
|
||||||
|
)
|
||||||
|
],
|
||||||
|
usage=UsageInfo(
|
||||||
|
prompt_tokens=prompt_tokens,
|
||||||
|
completion_tokens=completion_tokens,
|
||||||
|
total_tokens=total_tokens
|
||||||
|
)
|
||||||
|
)
|
||||||
|
yield f"data: {final_chunk.model_dump_json()}\n\n"
|
||||||
|
yield "data: [DONE]\n\n"
|
||||||
|
|
||||||
|
return StreamingResponse(
|
||||||
|
federation_stream_generator(),
|
||||||
|
media_type="text/event-stream"
|
||||||
|
)
|
||||||
|
elif has_tools:
|
||||||
|
# For streaming with tools, return tool_calls to client (opencode) for execution
|
||||||
|
# This enables multi-turn conversations where client executes tools and sends results back
|
||||||
logger.debug(" 🔧 Streaming with tools - returning tool_calls to client for execution...")
|
logger.debug(" 🔧 Streaming with tools - returning tool_calls to client for execution...")
|
||||||
# Collect full response
|
# Collect full response
|
||||||
full_response = ""
|
full_response = ""
|
||||||
|
|||||||
+63
-42
@@ -5,7 +5,7 @@ Handles communication between peer swarms for distributed consensus.
|
|||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import time
|
import time
|
||||||
from typing import List, Optional, Dict, Any
|
from typing import List, Optional, Dict, Any, Tuple
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
|
|
||||||
from network.discovery import PeerInfo
|
from network.discovery import PeerInfo
|
||||||
@@ -29,6 +29,7 @@ class FederationResult:
|
|||||||
local_confidence: float
|
local_confidence: float
|
||||||
peer_votes: List[PeerVote]
|
peer_votes: List[PeerVote]
|
||||||
strategy: str
|
strategy: str
|
||||||
|
winner: str = "" # Name of the winning node ("local" or peer name)
|
||||||
|
|
||||||
|
|
||||||
class FederationClient:
|
class FederationClient:
|
||||||
@@ -172,6 +173,8 @@ class FederatedSwarm:
|
|||||||
) -> FederationResult:
|
) -> FederationResult:
|
||||||
"""
|
"""
|
||||||
Generate with federation across peer swarms.
|
Generate with federation across peer swarms.
|
||||||
|
|
||||||
|
Optimized: Runs local and peer generation in parallel for maximum speed.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
prompt: Input prompt
|
prompt: Input prompt
|
||||||
@@ -182,53 +185,68 @@ class FederatedSwarm:
|
|||||||
Returns:
|
Returns:
|
||||||
FederationResult with final response
|
FederationResult with final response
|
||||||
"""
|
"""
|
||||||
# Phase 1: Local generation and consensus
|
peers = self.discovery.get_peers()
|
||||||
print(f" 🏠 Local swarm generating...")
|
|
||||||
local_result = await self.local_swarm.generate(
|
if len(peers) == 0:
|
||||||
|
if min_peers > 0:
|
||||||
|
raise RuntimeError(f"Federation requires {min_peers} peers, but none found")
|
||||||
|
|
||||||
|
# Solo mode - just run local generation
|
||||||
|
print(f" 🏠 Solo mode - local swarm generating...")
|
||||||
|
local_result = await self.local_swarm.generate(
|
||||||
|
prompt=prompt,
|
||||||
|
max_tokens=max_tokens,
|
||||||
|
temperature=temperature,
|
||||||
|
use_consensus=True
|
||||||
|
)
|
||||||
|
return FederationResult(
|
||||||
|
final_response=local_result.selected_response.text,
|
||||||
|
local_confidence=local_result.confidence,
|
||||||
|
peer_votes=[],
|
||||||
|
strategy="solo"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Parallel generation: Local swarm AND peers generate simultaneously
|
||||||
|
print(f" 🏠 Local swarm AND {len(peers)} peer(s) generating in parallel...")
|
||||||
|
|
||||||
|
# Start local generation
|
||||||
|
local_task = self.local_swarm.generate(
|
||||||
prompt=prompt,
|
prompt=prompt,
|
||||||
max_tokens=max_tokens,
|
max_tokens=max_tokens,
|
||||||
temperature=temperature,
|
temperature=temperature,
|
||||||
use_consensus=True
|
use_consensus=True
|
||||||
)
|
)
|
||||||
|
|
||||||
local_best = local_result.selected_response
|
# Start peer requests
|
||||||
local_confidence = local_result.confidence
|
|
||||||
|
|
||||||
print(f" ✓ Local best (confidence: {local_confidence:.2f})")
|
|
||||||
|
|
||||||
# Phase 2: Collect peer votes
|
|
||||||
peers = self.discovery.get_peers()
|
|
||||||
|
|
||||||
if len(peers) == 0:
|
|
||||||
if min_peers > 0:
|
|
||||||
raise RuntimeError(f"Federation requires {min_peers} peers, but none found")
|
|
||||||
|
|
||||||
# Solo mode - just return local result
|
|
||||||
return FederationResult(
|
|
||||||
final_response=local_best.text,
|
|
||||||
local_confidence=local_confidence,
|
|
||||||
peer_votes=[],
|
|
||||||
strategy="solo"
|
|
||||||
)
|
|
||||||
|
|
||||||
print(f" 🌐 Requesting votes from {len(peers)} peer(s)...")
|
|
||||||
for peer in peers:
|
|
||||||
print(f" → Contacting {peer.name} at {peer.api_url}")
|
|
||||||
|
|
||||||
peer_votes = []
|
|
||||||
vote_tasks = [
|
vote_tasks = [
|
||||||
self.federation_client.request_vote(peer, prompt, max_tokens, temperature)
|
self.federation_client.request_vote(peer, prompt, max_tokens, temperature)
|
||||||
for peer in peers
|
for peer in peers
|
||||||
]
|
]
|
||||||
|
|
||||||
results = await asyncio.gather(*vote_tasks, return_exceptions=True)
|
# Run everything in parallel
|
||||||
|
all_tasks = [local_task] + vote_tasks
|
||||||
for peer, result in zip(peers, results):
|
results = await asyncio.gather(*all_tasks, return_exceptions=True)
|
||||||
|
|
||||||
|
# Separate local result from peer votes
|
||||||
|
local_result_raw = results[0]
|
||||||
|
if isinstance(local_result_raw, Exception):
|
||||||
|
print(f" ✗ Local swarm failed: {local_result_raw}")
|
||||||
|
raise RuntimeError(f"Local generation failed: {local_result_raw}")
|
||||||
|
|
||||||
|
from swarm.manager import ConsensusResult
|
||||||
|
local_result: ConsensusResult = local_result_raw # Now guaranteed not to be an exception
|
||||||
|
local_best = local_result.selected_response
|
||||||
|
local_confidence = local_result.confidence
|
||||||
|
print(f" ✓ Local completed (confidence: {local_confidence:.2f})")
|
||||||
|
|
||||||
|
# Collect peer votes
|
||||||
|
peer_votes = []
|
||||||
|
for peer, result in zip(peers, results[1:]):
|
||||||
if isinstance(result, Exception):
|
if isinstance(result, Exception):
|
||||||
print(f" ✗ Peer {peer.name} failed: {result}")
|
print(f" ✗ Peer {peer.name} failed: {result}")
|
||||||
elif result is not None:
|
elif result is not None:
|
||||||
peer_votes.append(result)
|
peer_votes.append(result)
|
||||||
print(f" ✓ Peer {peer.name} voted (confidence: {result.confidence:.2f})")
|
print(f" ✓ Peer {peer.name} completed (confidence: {result.confidence:.2f})")
|
||||||
|
|
||||||
if len(peer_votes) == 0:
|
if len(peer_votes) == 0:
|
||||||
# No peers responded, use local result
|
# No peers responded, use local result
|
||||||
@@ -240,16 +258,16 @@ class FederatedSwarm:
|
|||||||
strategy="local_fallback"
|
strategy="local_fallback"
|
||||||
)
|
)
|
||||||
|
|
||||||
# Phase 3: Global consensus
|
# Global consensus
|
||||||
print(f" 🗳️ Running global consensus ({len(peer_votes) + 1} votes)...")
|
print(f" 🗳️ Running global consensus ({len(peer_votes) + 1} votes)...")
|
||||||
|
final_response, winner = self._weighted_vote(local_best.text, local_confidence, peer_votes)
|
||||||
final_response = self._weighted_vote(local_best.text, local_confidence, peer_votes)
|
|
||||||
|
|
||||||
return FederationResult(
|
return FederationResult(
|
||||||
final_response=final_response,
|
final_response=final_response,
|
||||||
local_confidence=local_confidence,
|
local_confidence=local_confidence,
|
||||||
peer_votes=peer_votes,
|
peer_votes=peer_votes,
|
||||||
strategy=self.consensus_strategy
|
strategy=self.consensus_strategy,
|
||||||
|
winner=winner
|
||||||
)
|
)
|
||||||
|
|
||||||
def _weighted_vote(
|
def _weighted_vote(
|
||||||
@@ -257,11 +275,14 @@ class FederatedSwarm:
|
|||||||
local_response: str,
|
local_response: str,
|
||||||
local_confidence: float,
|
local_confidence: float,
|
||||||
peer_votes: List[PeerVote]
|
peer_votes: List[PeerVote]
|
||||||
) -> str:
|
) -> Tuple[str, str]:
|
||||||
"""
|
"""
|
||||||
Select best response using weighted voting.
|
Select best response using weighted voting.
|
||||||
|
|
||||||
Weights by confidence score. Higher confidence = more weight.
|
Weights by confidence score. Higher confidence = more weight.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple of (selected_response, winner_name)
|
||||||
"""
|
"""
|
||||||
# Collect all votes with their weights
|
# Collect all votes with their weights
|
||||||
all_votes = [(local_response, local_confidence, "local")]
|
all_votes = [(local_response, local_confidence, "local")]
|
||||||
@@ -292,12 +313,12 @@ class FederatedSwarm:
|
|||||||
best_idx = max(range(len(scores)), key=lambda i: scores[i])
|
best_idx = max(range(len(scores)), key=lambda i: scores[i])
|
||||||
best = all_votes[best_idx]
|
best = all_votes[best_idx]
|
||||||
print(f" ✓ Selected response from {best[2]} (quality score: {scores[best_idx]:.2f})")
|
print(f" ✓ Selected response from {best[2]} (quality score: {scores[best_idx]:.2f})")
|
||||||
return best[0]
|
return best[0], best[2]
|
||||||
|
|
||||||
# Default: weighted selection - pick highest confidence
|
# Default: weighted selection - pick highest confidence
|
||||||
best = max(all_votes, key=lambda x: x[1])
|
best = max(all_votes, key=lambda x: x[1])
|
||||||
print(f" ✓ Selected response from {best[2]} (confidence: {best[1]:.2f})")
|
print(f" ✓ Selected response from {best[2]} (confidence: {best[1]:.2f})")
|
||||||
return best[0]
|
return best[0], best[2]
|
||||||
|
|
||||||
async def get_federation_status(self) -> Dict[str, Any]:
|
async def get_federation_status(self) -> Dict[str, Any]:
|
||||||
"""Get current federation status."""
|
"""Get current federation status."""
|
||||||
|
|||||||
Reference in New Issue
Block a user