5 Commits

Author SHA1 Message Date
sleepy 3dc06c73ef feat(federation): add winner tracking and token usage reporting
- Track which node won the consensus vote ("local" or the winning peer's name)
- Add winner to FederationResult dataclass
- Log winner in server logs
- Calculate and report token usage in federation streaming
- Fix prompt_tokens calculation in streaming path

Now opencode will show:
- Context tokens used
- Which node won the vote (in logs)
2026-02-24 23:40:41 +01:00
sleepy 989427c4d3 fix(federation): properly stream federated response
The federation case was setting the response but not returning
a StreamingResponse, so nothing was sent back to the client.

Added proper streaming generator for federation results that:
- Sends role chunk
- Streams content in chunks
- Sends final [DONE] chunk

This fixes the issue where opencode only saw local node output.
2026-02-24 23:36:48 +01:00
sleepy c9406974e9 fix(federation): wait for consensus and use federated result in streaming
Changed federation in streaming mode to:
- Wait for ALL nodes to complete generation
- Use the consensus result (not just local)
- Stream the federated response to client

This ensures voting from all nodes is properly considered.

Previous implementation streamed locally while federation ran
in background for logging only, which ignored the consensus.
2026-02-24 23:28:51 +01:00
sleepy b2328f761a feat(federation): add federation support to streaming path
Previously, federation only worked with non-streaming requests.
When opencode used streaming (which it does by default), only
the local swarm was queried, ignoring peer nodes.

Now when federation is enabled and peers exist:
- Start federation generation in background (parallel)
- Stream from local swarm immediately
- Log federation results when complete

This enables federation to work with opencode and other
streaming clients while maintaining fast streaming response.

Also added webfetch instructions to prevent hallucinating URLs.

Changes:
- Modified streaming path to detect and use federation
- Added asyncio import
- Updated tool instructions to prevent URL hallucination
2026-02-24 23:28:17 +01:00
sleepy 17000dc51e optimize(federation): run local and peer generation in parallel
Previously, the federation waited for local generation to complete
before asking peers to generate. This wasted time since peers sat
idle while the host generated.

Now local swarm and all peers generate simultaneously:
- Fire local generation AND peer requests at the same time
- Wait for all to complete with asyncio.gather()
- Then run global consensus

This roughly halves total generation time (from about 2x to about 1x a
single node's generation latency) when using federation with multiple nodes.

Changes:
- Modified generate_with_federation() to run tasks in parallel
- Updated logging to reflect parallel execution
- Added proper error handling for local generation failures
2026-02-24 23:12:49 +01:00
3 changed files with 158 additions and 45 deletions
+2
View File
@@ -9,4 +9,6 @@ ARGUMENTS: {"url": "https://example.com", "format": "markdown"}
Available tools: bash, webfetch
IMPORTANT: Only webfetch URLs that actually exist and are provided by the user. NEVER hallucinate or guess URLs. If a URL returns 404, stop trying to fetch it.
No explanations. No numbered lists. No markdown. Only tool calls.
+93 -3
View File
@@ -1,5 +1,6 @@
"""OpenAI-compatible API routes for Local Swarm."""
import asyncio
import json
import logging
import os
@@ -544,10 +545,99 @@ async def chat_completions(request: ChatCompletionRequest, fastapi_request: Requ
completion_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"
created = int(time.time())
# Calculate prompt tokens once for usage reporting
prompt_tokens = len(TOKEN_ENCODING.encode(prompt))
if request.stream:
# For streaming with tools, return tool_calls to client (opencode) for execution
# This enables multi-turn conversations where client executes tools and sends results back
if has_tools:
# Check if federation is enabled with peers
peers_count = 0
if federated_swarm is not None and federated_swarm.discovery is not None:
peers_count = len(federated_swarm.discovery.get_peers())
use_federation = peers_count > 0
if use_federation and not has_tools:
# Use federation - wait for ALL nodes to complete and use consensus result
logger.debug(f"🌐 Using federation with {peers_count} peer(s) - waiting for consensus...")
# Run federation and get consensus result
fed_result = await federated_swarm.generate_with_federation(
prompt=prompt,
max_tokens=request.max_tokens or 1024,
temperature=request.temperature or 0.7,
min_peers=0
)
logger.debug(f" ✓ Federation consensus complete (strategy: {fed_result.strategy}, confidence: {fed_result.local_confidence:.2f})")
logger.debug(f" 🏆 Winner: {fed_result.winner} (check logs above for details)")
logger.debug(f" 📝 Using federated response from {len(fed_result.peer_votes) + 1} nodes")
# Use the federated consensus result
content = fed_result.final_response
# Calculate token counts
completion_tokens = len(TOKEN_ENCODING.encode(content)) if content else 0
total_tokens = prompt_tokens + completion_tokens
# Stream the federated response
async def federation_stream_generator() -> AsyncIterator[str]:
"""Generate SSE stream with federated content."""
# Send role chunk
first_chunk = ChatCompletionStreamResponse(
id=completion_id,
created=created,
model=request.model,
choices=[
ChatCompletionStreamChoice(
delta={"role": "assistant"}
)
]
)
yield f"data: {first_chunk.model_dump_json()}\n\n"
# Send content in chunks
chunk_size = 100
for i in range(0, len(content), chunk_size):
chunk = content[i:i+chunk_size]
stream_chunk = ChatCompletionStreamResponse(
id=completion_id,
created=created,
model=request.model,
choices=[
ChatCompletionStreamChoice(
delta={"content": chunk}
)
]
)
yield f"data: {stream_chunk.model_dump_json()}\n\n"
# Send final chunk with usage
from api.models import UsageInfo
final_chunk = ChatCompletionStreamResponse(
id=completion_id,
created=created,
model=request.model,
choices=[
ChatCompletionStreamChoice(
delta={},
finish_reason="stop"
)
],
usage=UsageInfo(
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
total_tokens=total_tokens
)
)
yield f"data: {final_chunk.model_dump_json()}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(
federation_stream_generator(),
media_type="text/event-stream"
)
elif has_tools:
# For streaming with tools, return tool_calls to client (opencode) for execution
# This enables multi-turn conversations where client executes tools and sends results back
logger.debug(" 🔧 Streaming with tools - returning tool_calls to client for execution...")
# Collect full response
full_response = ""
+63 -42
View File
@@ -5,7 +5,7 @@ Handles communication between peer swarms for distributed consensus.
import asyncio
import time
from typing import List, Optional, Dict, Any
from typing import List, Optional, Dict, Any, Tuple
from dataclasses import dataclass
from network.discovery import PeerInfo
@@ -29,6 +29,7 @@ class FederationResult:
local_confidence: float
peer_votes: List[PeerVote]
strategy: str
winner: str = "" # Name of the winning node ("local" or peer name)
class FederationClient:
@@ -172,6 +173,8 @@ class FederatedSwarm:
) -> FederationResult:
"""
Generate with federation across peer swarms.
Optimized: Runs local and peer generation in parallel for maximum speed.
Args:
prompt: Input prompt
@@ -182,53 +185,68 @@ class FederatedSwarm:
Returns:
FederationResult with final response
"""
# Phase 1: Local generation and consensus
print(f" 🏠 Local swarm generating...")
local_result = await self.local_swarm.generate(
peers = self.discovery.get_peers()
if len(peers) == 0:
if min_peers > 0:
raise RuntimeError(f"Federation requires {min_peers} peers, but none found")
# Solo mode - just run local generation
print(f" 🏠 Solo mode - local swarm generating...")
local_result = await self.local_swarm.generate(
prompt=prompt,
max_tokens=max_tokens,
temperature=temperature,
use_consensus=True
)
return FederationResult(
final_response=local_result.selected_response.text,
local_confidence=local_result.confidence,
peer_votes=[],
strategy="solo"
)
# Parallel generation: Local swarm AND peers generate simultaneously
print(f" 🏠 Local swarm AND {len(peers)} peer(s) generating in parallel...")
# Start local generation
local_task = self.local_swarm.generate(
prompt=prompt,
max_tokens=max_tokens,
temperature=temperature,
use_consensus=True
)
local_best = local_result.selected_response
local_confidence = local_result.confidence
print(f" ✓ Local best (confidence: {local_confidence:.2f})")
# Phase 2: Collect peer votes
peers = self.discovery.get_peers()
if len(peers) == 0:
if min_peers > 0:
raise RuntimeError(f"Federation requires {min_peers} peers, but none found")
# Solo mode - just return local result
return FederationResult(
final_response=local_best.text,
local_confidence=local_confidence,
peer_votes=[],
strategy="solo"
)
print(f" 🌐 Requesting votes from {len(peers)} peer(s)...")
for peer in peers:
print(f" → Contacting {peer.name} at {peer.api_url}")
peer_votes = []
# Start peer requests
vote_tasks = [
self.federation_client.request_vote(peer, prompt, max_tokens, temperature)
for peer in peers
]
results = await asyncio.gather(*vote_tasks, return_exceptions=True)
for peer, result in zip(peers, results):
# Run everything in parallel
all_tasks = [local_task] + vote_tasks
results = await asyncio.gather(*all_tasks, return_exceptions=True)
# Separate local result from peer votes
local_result_raw = results[0]
if isinstance(local_result_raw, Exception):
print(f" ✗ Local swarm failed: {local_result_raw}")
raise RuntimeError(f"Local generation failed: {local_result_raw}")
from swarm.manager import ConsensusResult
local_result: ConsensusResult = local_result_raw # Now guaranteed not to be an exception
local_best = local_result.selected_response
local_confidence = local_result.confidence
print(f" ✓ Local completed (confidence: {local_confidence:.2f})")
# Collect peer votes
peer_votes = []
for peer, result in zip(peers, results[1:]):
if isinstance(result, Exception):
print(f" ✗ Peer {peer.name} failed: {result}")
elif result is not None:
peer_votes.append(result)
print(f" ✓ Peer {peer.name} voted (confidence: {result.confidence:.2f})")
print(f" ✓ Peer {peer.name} completed (confidence: {result.confidence:.2f})")
if len(peer_votes) == 0:
# No peers responded, use local result
@@ -240,16 +258,16 @@ class FederatedSwarm:
strategy="local_fallback"
)
# Phase 3: Global consensus
# Global consensus
print(f" 🗳️ Running global consensus ({len(peer_votes) + 1} votes)...")
final_response = self._weighted_vote(local_best.text, local_confidence, peer_votes)
final_response, winner = self._weighted_vote(local_best.text, local_confidence, peer_votes)
return FederationResult(
final_response=final_response,
local_confidence=local_confidence,
peer_votes=peer_votes,
strategy=self.consensus_strategy
strategy=self.consensus_strategy,
winner=winner
)
def _weighted_vote(
@@ -257,11 +275,14 @@ class FederatedSwarm:
local_response: str,
local_confidence: float,
peer_votes: List[PeerVote]
) -> str:
) -> Tuple[str, str]:
"""
Select best response using weighted voting.
Weights by confidence score. Higher confidence = more weight.
Returns:
Tuple of (selected_response, winner_name)
"""
# Collect all votes with their weights
all_votes = [(local_response, local_confidence, "local")]
@@ -292,12 +313,12 @@ class FederatedSwarm:
best_idx = max(range(len(scores)), key=lambda i: scores[i])
best = all_votes[best_idx]
print(f" ✓ Selected response from {best[2]} (quality score: {scores[best_idx]:.2f})")
return best[0]
return best[0], best[2]
# Default: weighted selection - pick highest confidence
best = max(all_votes, key=lambda x: x[1])
print(f" ✓ Selected response from {best[2]} (confidence: {best[1]:.2f})")
return best[0]
return best[0], best[2]
async def get_federation_status(self) -> Dict[str, Any]:
"""Get current federation status."""