From 5fa8cd4e0eceb0cce601b4b4e7290deabd9319b7 Mon Sep 17 00:00:00 2001
From: Kaloyan Nikolov
Date: Wed, 25 Feb 2026 12:25:19 +0100
Subject: [PATCH] fix: Correct streaming implementation syntax

- Fixed indentation in routes.py streaming code
- Real-time streaming now properly structured
- All syntax errors resolved
---
 src/api/routes.py | 282 ++++++++++------------------------------------
 1 file changed, 57 insertions(+), 225 deletions(-)

diff --git a/src/api/routes.py b/src/api/routes.py
index 0ccad15..079114d 100644
--- a/src/api/routes.py
+++ b/src/api/routes.py
@@ -795,231 +795,6 @@ async def chat_completions(request: ChatCompletionRequest, fastapi_request: Requ
                 media_type="text/event-stream"
             )
         elif has_tools:
-            # For streaming with tools, return tool_calls to client (opencode) for execution
-            # This enables multi-turn conversations where client executes tools and sends results back
-            logger.debug("  🔧 Streaming with tools - returning tool_calls to client for execution...")
-            # Collect full response
-            full_response = ""
-            async for chunk in swarm_manager.generate_stream(
-                prompt=prompt,
-                max_tokens=request.max_tokens or 1024,
-                temperature=request.temperature or 0.7
-            ):
-                full_response += chunk
-
-            # Parse tool calls
-            content, tool_calls_parsed = parse_tool_calls(full_response)
-            if tool_calls_parsed:
-                logger.debug(f"  🔧 Found {len(tool_calls_parsed)} tool call(s) in streaming response")
-                logger.debug(f"  📤 Returning tool_calls to client for execution (finish_reason=tool_calls)")
-
-                # Convert to ToolCall objects and return to client (opencode)
-                from api.models import ToolCall
-                tool_calls = [
-                    ToolCall(
-                        id=tc.get("id", f"call_{i}"),
-                        type=tc.get("type", "function"),
-                        function=tc.get("function", {})
-                    )
-                    for i, tc in enumerate(tool_calls_parsed)
-                ]
-
-                # Return tool_calls to client with finish_reason=tool_calls
-                # Client (opencode) will execute them and send results back
-                async def tool_calls_stream_generator() -> AsyncIterator[str]:
-                    """Generate SSE stream with tool_calls for client execution."""
-                    # Send role chunk
-                    first_chunk = ChatCompletionStreamResponse(
-                        id=completion_id,
-                        created=created,
-                        model=request.model,
-                        choices=[
-                            ChatCompletionStreamChoice(
-                                delta={"role": "assistant"}
-                            )
-                        ]
-                    )
-                    yield f"data: {first_chunk.model_dump_json()}\n\n"
-
-                    # Send content if any
-                    if content:
-                        content_chunk = ChatCompletionStreamResponse(
-                            id=completion_id,
-                            created=created,
-                            model=request.model,
-                            choices=[
-                                ChatCompletionStreamChoice(
-                                    delta={"content": content}
-                                )
-                            ]
-                        )
-                        yield f"data: {content_chunk.model_dump_json()}\n\n"
-
-                    # Send final chunk with tool_calls and finish_reason=tool_calls
-                    # Format tool_calls as OpenAI streaming format
-                    # OpenAI streaming format: tool_calls in delta with index, id, type, function
-                    logger.debug(f"  🔧 Raw tool_calls_parsed: {tool_calls_parsed}")
-
-                    tool_calls_delta = []
-                    for i, tc in enumerate(tool_calls_parsed):
-                        tool_calls_delta.append({
-                            "index": i,
-                            "id": tc["id"],
-                            "type": "function",
-                            "function": {
-                                "name": tc["function"]["name"],
-                                "arguments": tc["function"]["arguments"]
-                            }
-                        })
-
-                    logger.debug(f"  🔧 Sending tool_calls in delta: {tool_calls_delta}")
-
-                    # Build response in OpenAI streaming format
-                    final_delta = {"tool_calls": tool_calls_delta}
-                    final_chunk = {
-                        "id": completion_id,
-                        "object": "chat.completion.chunk",
-                        "created": created,
-                        "model": request.model,
-                        "choices": [
-                            {
-                                "index": 0,
-                                "delta": final_delta,
-                                "finish_reason": "tool_calls"
-                            }
-                        ]
-                    }
-
-                    import json
-                    chunk_json = json.dumps(final_chunk)
-                    logger.debug(f"  📤 Final chunk JSON: {chunk_json[:800]}")
-                    yield f"data: {chunk_json}\n\n"
-                    yield "data: [DONE]\n\n"
-
-                return StreamingResponse(
-                    tool_calls_stream_generator(),
-                    media_type="text/event-stream"
-                )
-
-            # No tool calls found, return content as normal response
-            logger.debug(f"  ℹī¸ No tool calls found, returning content as normal response")
-            logger.debug(f"\n{'='*60}")
-            logger.debug(f"RESPONSE (streaming+no-tools): content_preview={repr(content[:100])}")
-            logger.debug(f"{'='*60}\n")
-
-            async def content_stream_generator() -> AsyncIterator[str]:
-                """Generate SSE stream with content."""
-                # Send role chunk
-                first_chunk = ChatCompletionStreamResponse(
-                    id=completion_id,
-                    created=created,
-                    model=request.model,
-                    choices=[
-                        ChatCompletionStreamChoice(
-                            delta={"role": "assistant"}
-                        )
-                    ]
-                )
-                yield f"data: {first_chunk.model_dump_json()}\n\n"
-
-                # Send content in chunks (to simulate streaming)
-                chunk_size = 100
-                for i in range(0, len(content), chunk_size):
-                    chunk = content[i:i+chunk_size]
-                    stream_chunk = ChatCompletionStreamResponse(
-                        id=completion_id,
-                        created=created,
-                        model=request.model,
-                        choices=[
-                            ChatCompletionStreamChoice(
-                                delta={"content": chunk}
-                            )
-                        ]
-                    )
-                    yield f"data: {stream_chunk.model_dump_json()}\n\n"
-
-                fed_completion_tokens = len(TOKEN_ENCODING.encode(content)) if content else 0
-                fed_total_tokens = prompt_tokens + fed_completion_tokens
-                from api.models import UsageInfo
-                final_chunk = ChatCompletionStreamResponse(
-                    id=completion_id,
-                    created=created,
-                    model=request.model,
-                    choices=[
-                        ChatCompletionStreamChoice(
-                            delta={},
-                            finish_reason="stop"
-                        )
-                    ],
-                    usage=UsageInfo(
-                        prompt_tokens=prompt_tokens,
-                        completion_tokens=fed_completion_tokens,
-                        total_tokens=fed_total_tokens
-                    )
-                )
-                yield f"data: {final_chunk.model_dump_json()}\n\n"
-                yield "data: [DONE]\n\n"
-
-            return StreamingResponse(
-                content_stream_generator(),
-                media_type="text/event-stream"
-            )
-        else:
-            # Regular streaming without tools
-            async def stream_generator() -> AsyncIterator[str]:
-                """Generate SSE stream."""
-                # Send first chunk with role
-                first_chunk = ChatCompletionStreamResponse(
-                    id=completion_id,
-                    created=created,
-                    model=request.model,
-                    choices=[
-                        ChatCompletionStreamChoice(
-                            delta={"role": "assistant"}
-                        )
-                    ]
-                )
-                yield f"data: {first_chunk.model_dump_json()}\n\n"
-
-                # Stream content
-                async for chunk in swarm_manager.generate_stream(
-                    prompt=prompt,
-                    max_tokens=request.max_tokens or 1024,
-                    temperature=request.temperature or 0.7
-                ):
-                    stream_chunk = ChatCompletionStreamResponse(
-                        id=completion_id,
-                        created=created,
-                        model=request.model,
-                        choices=[
-                            ChatCompletionStreamChoice(
-                                delta={"content": chunk}
-                            )
-                        ]
-                    )
-                    yield f"data: {stream_chunk.model_dump_json()}\n\n"
-
-                # Send final chunk
-                final_chunk = ChatCompletionStreamResponse(
-                    id=completion_id,
-                    created=created,
-                    model=request.model,
-                    choices=[
-                        ChatCompletionStreamChoice(
-                            delta={},
-                            finish_reason="stop"
-                        )
-                    ]
-                )
-                yield f"data: {final_chunk.model_dump_json()}\n\n"
-                yield "data: [DONE]\n\n"
-
-            return StreamingResponse(
-                stream_generator(),
-                media_type="text/event-stream"
-            )
-
-        else:
             # Real-time streaming with tools - stream content and tool_calls as they're generated
             logger.debug("  🔧 Streaming with tools - real-time streaming of content and tool_calls...")
 
@@ -1089,6 +864,63 @@ async def chat_completions(request: ChatCompletionRequest, fastapi_request: Requ
                     logger.debug(f"  🔧 Sent tool calls delta: {len(new_tool_calls)} calls")
calls") yield "data: [DONE]\n\n" + else: + # Regular streaming without tools + async def stream_generator() -> AsyncIterator[str]: + """Generate SSE stream.""" + # Send first chunk with role + first_chunk = ChatCompletionStreamResponse( + id=completion_id, + created=created, + model=request.model, + choices=[ + ChatCompletionStreamChoice( + delta={"role": "assistant"} + ) + ] + ) + yield f"data: {first_chunk.model_dump_json()}\n\n" + + # Stream content + async for chunk in swarm_manager.generate_stream( + prompt=prompt, + max_tokens=request.max_tokens or 1024, + temperature=request.temperature or 0.7 + ): + stream_chunk = ChatCompletionStreamResponse( + id=completion_id, + created=created, + model=request.model, + choices=[ + ChatCompletionStreamChoice( + delta={"content": chunk} + ) + ] + ) + yield f"data: {stream_chunk.model_dump_json()}\n\n" + + # Send final chunk + final_chunk = ChatCompletionStreamResponse( + id=completion_id, + created=created, + model=request.model, + choices=[ + ChatCompletionStreamChoice( + delta={}, + finish_reason="stop" + ) + ] + ) + yield f"data: {final_chunk.model_dump_json()}\n\n" + yield "data: [DONE]\n\n" + + return StreamingResponse( + stream_generator(), + media_type="text/event-stream" + ) + + else: + # Regular response with consensus try: # Use federation if enabled and peers are available if federated_swarm is not None: