feat: Add global tokens/sec reporting and reduce log level to INFO
- Add global t/sec metric that includes sync + voting overhead - Track total time from start to finish across all workers - Display global performance summary after federation completes - Reduce default logging level from DEBUG to INFO - Add tokens_generated to federation API responses - Update federation vote to report peer t/sec metrics This allows users to see both individual worker speeds and the effective speed including synchronization overhead.
This commit is contained in:
@@ -76,6 +76,7 @@ class UsageInfo(BaseModel):
|
|||||||
prompt_tokens: int = Field(default=0, description="Tokens in prompt")
|
prompt_tokens: int = Field(default=0, description="Tokens in prompt")
|
||||||
completion_tokens: int = Field(default=0, description="Tokens in completion")
|
completion_tokens: int = Field(default=0, description="Tokens in completion")
|
||||||
total_tokens: int = Field(default=0, description="Total tokens")
|
total_tokens: int = Field(default=0, description="Total tokens")
|
||||||
|
tokens_per_second: Optional[float] = Field(default=None, description="Generation speed in tokens per second")
|
||||||
|
|
||||||
|
|
||||||
class ChatCompletionResponse(BaseModel):
|
class ChatCompletionResponse(BaseModel):
|
||||||
|
|||||||
+12
-8
@@ -557,7 +557,7 @@ async def chat_completions(request: ChatCompletionRequest, fastapi_request: Requ
|
|||||||
|
|
||||||
if use_federation:
|
if use_federation:
|
||||||
# Use federation for ALL requests (with or without tools)
|
# Use federation for ALL requests (with or without tools)
|
||||||
logger.debug(f"🌐 Using federation with {peers_count} peer(s) - waiting for consensus...")
|
logger.info(f"🌐 Using federation with {peers_count} peer(s) - waiting for consensus...")
|
||||||
|
|
||||||
# Run federation and get consensus result
|
# Run federation and get consensus result
|
||||||
fed_result = await federated_swarm.generate_with_federation(
|
fed_result = await federated_swarm.generate_with_federation(
|
||||||
@@ -567,9 +567,9 @@ async def chat_completions(request: ChatCompletionRequest, fastapi_request: Requ
|
|||||||
min_peers=0
|
min_peers=0
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.debug(f" ✓ Federation consensus complete (strategy: {fed_result.strategy}, confidence: {fed_result.local_confidence:.2f})")
|
logger.info(f" ✓ Federation consensus complete (strategy: {fed_result.strategy}, confidence: {fed_result.local_confidence:.2f})")
|
||||||
logger.debug(f" 🏆 Winner: {fed_result.winner} (check logs above for details)")
|
logger.info(f" 🏆 Winner: {fed_result.winner}")
|
||||||
logger.debug(f" 📝 Using federated response from {len(fed_result.peer_votes) + 1} nodes")
|
logger.info(f" 📝 Using federated response from {len(fed_result.peer_votes) + 1} nodes")
|
||||||
|
|
||||||
# Use the federated consensus result
|
# Use the federated consensus result
|
||||||
content = fed_result.final_response
|
content = fed_result.final_response
|
||||||
@@ -952,7 +952,7 @@ async def chat_completions(request: ChatCompletionRequest, fastapi_request: Requ
|
|||||||
if federated_swarm is not None:
|
if federated_swarm is not None:
|
||||||
peers = federated_swarm.discovery.get_peers()
|
peers = federated_swarm.discovery.get_peers()
|
||||||
if peers:
|
if peers:
|
||||||
logger.debug(f"🌐 Using federation with {len(peers)} peer(s)...")
|
logger.info(f"🌐 Using federation with {len(peers)} peer(s)...")
|
||||||
result = await federated_swarm.generate_with_federation(
|
result = await federated_swarm.generate_with_federation(
|
||||||
prompt=prompt,
|
prompt=prompt,
|
||||||
max_tokens=request.max_tokens or 1024,
|
max_tokens=request.max_tokens or 1024,
|
||||||
@@ -1020,7 +1020,8 @@ async def chat_completions(request: ChatCompletionRequest, fastapi_request: Requ
|
|||||||
|
|
||||||
response_text = result.selected_response.text
|
response_text = result.selected_response.text
|
||||||
tokens_generated = result.selected_response.tokens_generated
|
tokens_generated = result.selected_response.tokens_generated
|
||||||
logger.debug(f"DEBUG: Generated response (tokens={tokens_generated})")
|
tokens_per_second = result.selected_response.tokens_per_second
|
||||||
|
logger.debug(f"DEBUG: Generated response (tokens={tokens_generated}, t/s={tokens_per_second:.1f})")
|
||||||
logger.debug(f"DEBUG: Response preview: {response_text[:200]}...")
|
logger.debug(f"DEBUG: Response preview: {response_text[:200]}...")
|
||||||
|
|
||||||
# Parse tool calls if tools were provided
|
# Parse tool calls if tools were provided
|
||||||
@@ -1088,7 +1089,8 @@ async def chat_completions(request: ChatCompletionRequest, fastapi_request: Requ
|
|||||||
usage=UsageInfo(
|
usage=UsageInfo(
|
||||||
prompt_tokens=prompt_tokens,
|
prompt_tokens=prompt_tokens,
|
||||||
completion_tokens=completion_tokens,
|
completion_tokens=completion_tokens,
|
||||||
total_tokens=total_tokens
|
total_tokens=total_tokens,
|
||||||
|
tokens_per_second=tokens_per_second
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
return response_obj
|
return response_obj
|
||||||
@@ -1166,7 +1168,9 @@ async def federation_vote(request: dict):
|
|||||||
"confidence": result.confidence,
|
"confidence": result.confidence,
|
||||||
"latency_ms": result.selected_response.latency_ms,
|
"latency_ms": result.selected_response.latency_ms,
|
||||||
"worker_count": len(result.all_responses),
|
"worker_count": len(result.all_responses),
|
||||||
"strategy": result.strategy
|
"strategy": result.strategy,
|
||||||
|
"tokens_per_second": result.selected_response.tokens_per_second,
|
||||||
|
"tokens_generated": result.selected_response.tokens_generated
|
||||||
}
|
}
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|||||||
@@ -20,6 +20,8 @@ class PeerVote:
|
|||||||
confidence: float
|
confidence: float
|
||||||
latency_ms: float
|
latency_ms: float
|
||||||
worker_count: int
|
worker_count: int
|
||||||
|
tokens_per_second: float = 0.0
|
||||||
|
tokens_generated: int = 0
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
@@ -30,6 +32,7 @@ class FederationResult:
|
|||||||
peer_votes: List[PeerVote]
|
peer_votes: List[PeerVote]
|
||||||
strategy: str
|
strategy: str
|
||||||
winner: str = "" # Name of the winning node ("local" or peer name)
|
winner: str = "" # Name of the winning node ("local" or peer name)
|
||||||
|
global_tokens_per_second: float = 0.0 # Includes sync + voting overhead
|
||||||
|
|
||||||
|
|
||||||
class FederationClient:
|
class FederationClient:
|
||||||
@@ -118,7 +121,9 @@ class FederationClient:
|
|||||||
response_text=data.get("response", ""),
|
response_text=data.get("response", ""),
|
||||||
confidence=data.get("confidence", 0.5),
|
confidence=data.get("confidence", 0.5),
|
||||||
latency_ms=data.get("latency_ms", latency_ms),
|
latency_ms=data.get("latency_ms", latency_ms),
|
||||||
worker_count=data.get("worker_count", 0)
|
worker_count=data.get("worker_count", 0),
|
||||||
|
tokens_per_second=data.get("tokens_per_second", 0.0),
|
||||||
|
tokens_generated=data.get("tokens_generated", 0)
|
||||||
)
|
)
|
||||||
|
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
@@ -207,22 +212,38 @@ class FederatedSwarm:
|
|||||||
|
|
||||||
# Solo mode - just run local generation
|
# Solo mode - just run local generation
|
||||||
print(f" 🏠 Solo mode - local swarm generating...")
|
print(f" 🏠 Solo mode - local swarm generating...")
|
||||||
|
solo_start_time = time.time()
|
||||||
local_result = await self.local_swarm.generate(
|
local_result = await self.local_swarm.generate(
|
||||||
prompt=prompt,
|
prompt=prompt,
|
||||||
max_tokens=max_tokens,
|
max_tokens=max_tokens,
|
||||||
temperature=temperature,
|
temperature=temperature,
|
||||||
use_consensus=True
|
use_consensus=True
|
||||||
)
|
)
|
||||||
|
solo_end_time = time.time()
|
||||||
|
total_elapsed = solo_end_time - solo_start_time
|
||||||
|
tokens_generated = local_result.selected_response.tokens_generated
|
||||||
|
global_tps = tokens_generated / total_elapsed if total_elapsed > 0 else 0.0
|
||||||
|
|
||||||
|
print(f"\n 📊 Global Performance:")
|
||||||
|
print(f" Total tokens: {tokens_generated}")
|
||||||
|
print(f" Total time: {total_elapsed:.2f}s")
|
||||||
|
print(f" Global speed: {global_tps:.1f} t/s")
|
||||||
|
|
||||||
return FederationResult(
|
return FederationResult(
|
||||||
final_response=local_result.selected_response.text,
|
final_response=local_result.selected_response.text,
|
||||||
local_confidence=local_result.confidence,
|
local_confidence=local_result.confidence,
|
||||||
peer_votes=[],
|
peer_votes=[],
|
||||||
strategy="solo"
|
strategy="solo",
|
||||||
|
global_tokens_per_second=global_tps
|
||||||
)
|
)
|
||||||
|
|
||||||
# Parallel generation: Local swarm AND peers generate simultaneously
|
# Parallel generation: Local swarm AND peers generate simultaneously
|
||||||
print(f" 🏠 Local swarm AND {len(peers)} peer(s) generating in parallel...")
|
print(f" 🏠 Local swarm AND {len(peers)} peer(s) generating in parallel...")
|
||||||
|
|
||||||
|
# Track timing for global t/sec calculation (includes sync + voting overhead)
|
||||||
|
federation_start_time = time.time()
|
||||||
|
total_tokens_generated = 0
|
||||||
|
|
||||||
# Start local generation
|
# Start local generation
|
||||||
local_task = self.local_swarm.generate(
|
local_task = self.local_swarm.generate(
|
||||||
prompt=prompt,
|
prompt=prompt,
|
||||||
@@ -251,7 +272,9 @@ class FederatedSwarm:
|
|||||||
local_result: ConsensusResult = local_result_raw # Now guaranteed not to be an exception
|
local_result: ConsensusResult = local_result_raw # Now guaranteed not to be an exception
|
||||||
local_best = local_result.selected_response
|
local_best = local_result.selected_response
|
||||||
local_confidence = local_result.confidence
|
local_confidence = local_result.confidence
|
||||||
print(f" ✓ Local completed (confidence: {local_confidence:.2f})")
|
local_tps = local_best.tokens_per_second
|
||||||
|
total_tokens_generated += local_best.tokens_generated
|
||||||
|
print(f" ✓ Local completed (confidence: {local_confidence:.2f}, {local_tps:.1f} t/s)")
|
||||||
|
|
||||||
# Collect peer votes
|
# Collect peer votes
|
||||||
peer_votes = []
|
peer_votes = []
|
||||||
@@ -260,28 +283,52 @@ class FederatedSwarm:
|
|||||||
print(f" ✗ Peer {peer.name} failed: {result}")
|
print(f" ✗ Peer {peer.name} failed: {result}")
|
||||||
elif result is not None:
|
elif result is not None:
|
||||||
peer_votes.append(result)
|
peer_votes.append(result)
|
||||||
print(f" ✓ Peer {peer.name} completed (confidence: {result.confidence:.2f})")
|
total_tokens_generated += result.tokens_generated if hasattr(result, 'tokens_generated') else 0
|
||||||
|
print(f" ✓ Peer {peer.name} completed (confidence: {result.confidence:.2f}, {result.tokens_per_second:.1f} t/s)")
|
||||||
|
|
||||||
if len(peer_votes) == 0:
|
if len(peer_votes) == 0:
|
||||||
# No peers responded, use local result
|
# No peers responded, use local result
|
||||||
print(" ⚠️ No peers responded, using local result")
|
print(" ⚠️ No peers responded, using local result")
|
||||||
|
|
||||||
|
# Calculate global t/sec even in fallback mode
|
||||||
|
federation_end_time = time.time()
|
||||||
|
total_elapsed_seconds = federation_end_time - federation_start_time
|
||||||
|
global_tps = total_tokens_generated / total_elapsed_seconds if total_elapsed_seconds > 0 else 0.0
|
||||||
|
|
||||||
|
print(f"\n 📊 Global Performance:")
|
||||||
|
print(f" Total tokens: {total_tokens_generated}")
|
||||||
|
print(f" Total time: {total_elapsed_seconds:.2f}s")
|
||||||
|
print(f" Global speed: {global_tps:.1f} t/s")
|
||||||
|
|
||||||
return FederationResult(
|
return FederationResult(
|
||||||
final_response=local_best.text,
|
final_response=local_best.text,
|
||||||
local_confidence=local_confidence,
|
local_confidence=local_confidence,
|
||||||
peer_votes=[],
|
peer_votes=[],
|
||||||
strategy="local_fallback"
|
strategy="local_fallback",
|
||||||
|
global_tokens_per_second=global_tps
|
||||||
)
|
)
|
||||||
|
|
||||||
# Global consensus
|
# Global consensus
|
||||||
print(f" 🗳️ Running global consensus ({len(peer_votes) + 1} votes)...")
|
print(f" 🗳️ Running global consensus ({len(peer_votes) + 1} votes)...")
|
||||||
final_response, winner = self._weighted_vote(local_best.text, local_confidence, peer_votes)
|
final_response, winner = self._weighted_vote(local_best.text, local_confidence, peer_votes)
|
||||||
|
|
||||||
|
# Calculate global tokens/sec including sync + voting overhead
|
||||||
|
federation_end_time = time.time()
|
||||||
|
total_elapsed_seconds = federation_end_time - federation_start_time
|
||||||
|
global_tps = total_tokens_generated / total_elapsed_seconds if total_elapsed_seconds > 0 else 0.0
|
||||||
|
|
||||||
|
print(f"\n 📊 Global Performance:")
|
||||||
|
print(f" Total tokens: {total_tokens_generated}")
|
||||||
|
print(f" Total time: {total_elapsed_seconds:.2f}s")
|
||||||
|
print(f" Global speed: {global_tps:.1f} t/s (includes sync + voting)")
|
||||||
|
|
||||||
return FederationResult(
|
return FederationResult(
|
||||||
final_response=final_response,
|
final_response=final_response,
|
||||||
local_confidence=local_confidence,
|
local_confidence=local_confidence,
|
||||||
peer_votes=peer_votes,
|
peer_votes=peer_votes,
|
||||||
strategy=self.consensus_strategy,
|
strategy=self.consensus_strategy,
|
||||||
winner=winner
|
winner=winner,
|
||||||
|
global_tokens_per_second=global_tps
|
||||||
)
|
)
|
||||||
|
|
||||||
def _weighted_vote(
|
def _weighted_vote(
|
||||||
|
|||||||
+12
-2
@@ -232,7 +232,7 @@ class SwarmManager:
|
|||||||
response = await worker.generate_with_progress(request)
|
response = await worker.generate_with_progress(request)
|
||||||
responses.append(response)
|
responses.append(response)
|
||||||
if not self.mcp_mode:
|
if not self.mcp_mode:
|
||||||
print(f" ✓ {worker.name} completed ({response.tokens_generated} tokens)")
|
print(f" ✓ {worker.name} completed ({response.tokens_generated} tokens, {response.tokens_per_second:.1f} t/s)")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
responses.append(e)
|
responses.append(e)
|
||||||
if not self.mcp_mode:
|
if not self.mcp_mode:
|
||||||
@@ -283,6 +283,11 @@ class SwarmManager:
|
|||||||
|
|
||||||
if not self.mcp_mode:
|
if not self.mcp_mode:
|
||||||
print(f" Got {len(valid_responses)} valid responses")
|
print(f" Got {len(valid_responses)} valid responses")
|
||||||
|
|
||||||
|
# Print performance summary
|
||||||
|
print(f"\n 📊 Performance Summary:")
|
||||||
|
for i, resp in enumerate(valid_responses, 1):
|
||||||
|
print(f" Worker {i}: {resp.tokens_generated} tokens @ {resp.tokens_per_second:.1f} t/s ({resp.latency_ms:.0f}ms)")
|
||||||
|
|
||||||
# Run consensus
|
# Run consensus
|
||||||
result = await self.consensus.select_best(valid_responses)
|
result = await self.consensus.select_best(valid_responses)
|
||||||
@@ -494,7 +499,7 @@ class SwarmManager:
|
|||||||
try:
|
try:
|
||||||
response = await worker.generate_with_progress(request)
|
response = await worker.generate_with_progress(request)
|
||||||
responses.append(response)
|
responses.append(response)
|
||||||
print(f" ✓ Response {i+1} completed ({response.tokens_generated} tokens)")
|
print(f" ✓ Response {i+1} completed ({response.tokens_generated} tokens, {response.tokens_per_second:.1f} t/s)")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
responses.append(e)
|
responses.append(e)
|
||||||
print(f" ✗ Response {i+1} failed: {e}")
|
print(f" ✗ Response {i+1} failed: {e}")
|
||||||
@@ -513,6 +518,11 @@ class SwarmManager:
|
|||||||
|
|
||||||
print(f" Got {len(valid_responses)} valid responses")
|
print(f" Got {len(valid_responses)} valid responses")
|
||||||
|
|
||||||
|
# Print performance summary
|
||||||
|
print(f"\n 📊 Performance Summary:")
|
||||||
|
for i, resp in enumerate(valid_responses, 1):
|
||||||
|
print(f" Seed {i}: {resp.tokens_generated} tokens @ {resp.tokens_per_second:.1f} t/s ({resp.latency_ms:.0f}ms)")
|
||||||
|
|
||||||
# Run consensus
|
# Run consensus
|
||||||
result = await self.consensus.select_best(valid_responses)
|
result = await self.consensus.select_best(valid_responses)
|
||||||
print(f" Selected response using '{result.strategy}' strategy (confidence: {result.confidence:.2f})")
|
print(f" Selected response using '{result.strategy}' strategy (confidence: {result.confidence:.2f})")
|
||||||
|
|||||||
@@ -7,11 +7,11 @@ import logging
|
|||||||
import sys
|
import sys
|
||||||
|
|
||||||
|
|
||||||
def setup_logging(level=logging.DEBUG):
|
def setup_logging(level=logging.INFO):
|
||||||
"""Set up logging configuration.
|
"""Set up logging configuration.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
level: Logging level (default: DEBUG for development)
|
level: Logging level (default: INFO)
|
||||||
"""
|
"""
|
||||||
# Create formatter
|
# Create formatter
|
||||||
formatter = logging.Formatter(
|
formatter = logging.Formatter(
|
||||||
|
|||||||
Reference in New Issue
Block a user