docs: Add minimal, maintainable, modular code requirements

- AGENT_WORKER.md: Added Rule 3 for minimal, maintainable, modular code
- AGENT_REVIEW.md: Added strict enforcement check in Phase 2
- Emphasizes single responsibility, clean interfaces, and production quality
- Reviewers must block code that doesn't meet these standards
2026-02-25 12:30:18 +01:00
parent 5fa8cd4e0e
commit d22c52ec04
3 changed files with 196 additions and 26 deletions
AGENT_REVIEW.md +13
@@ -64,6 +64,19 @@
   - No circular imports
   - No duplicate code (>3 lines copied)
+- [ ] **Minimal, Maintainable, Modular Code**
+  - **Minimal:** Only code needed to solve the problem, no over-engineering
+  - **Maintainable:** Clear names, self-documenting, consistent style
+  - **Modular:** Single Responsibility Principle, loose coupling, clear interfaces
+  - **STRICT ENFORCEMENT:**
+    - Functions should do ONE thing (if a function does 2+ things, break it up)
+    - No monolithic blocks (>50 lines in one function)
+    - Clear separation of concerns
+    - Interfaces between modules are stable and well-defined
+    - Easy to understand for new maintainers
+    - No "temp" or "quick" solutions: production quality only
+  - **BLOCKING:** Code that is too complex, monolithic, or poorly structured must be rejected
 - [ ] **Error handling is robust**
   - No bare `except:` clauses
   - All errors have clear messages
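
For reference, a minimal sketch of the error-handling bar this checklist sets (the `load_config` helper and `ConfigError` type are illustrative assumptions, not code from this commit):

```python
import json

class ConfigError(Exception):
    """Raised when a config file cannot be loaded."""

# BAD: a bare except swallows the real failure and reports nothing useful
def load_config_bad(path: str) -> dict:
    try:
        with open(path) as f:
            return json.load(f)
    except:
        return {}

# GOOD: catch specific exceptions and fail with a clear, actionable message
def load_config(path: str) -> dict:
    try:
        with open(path) as f:
            return json.load(f)
    except FileNotFoundError as exc:
        raise ConfigError(f"Config file not found: {path}") from exc
    except json.JSONDecodeError as exc:
        raise ConfigError(f"Invalid JSON in {path}: {exc}") from exc
```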
AGENT_WORKER.md +58 -1
@@ -84,7 +84,64 @@ def test_parse_simple_tool():
 # Then write minimal code to pass
 ```
 
-### Rule 3: No Production Debugging
+### Rule 3: Minimal, Maintainable, Modular Code
+
+**Core Focus:** Keep code minimal, maintainable, and modular.
+
+#### Minimal
+- Write only the code needed to solve the problem
+- Avoid unnecessary abstractions or over-engineering
+- Keep functions small and focused (max 50 lines)
+- Prefer simple solutions over complex ones
+- Remove dead code and unused imports immediately
+
+#### Maintainable
+- Clear, descriptive variable and function names
+- One concept per file/module
+- Self-documenting code with minimal comments
+- Consistent code style throughout
+- Easy to understand for future maintainers
+
+#### Modular
+- Single Responsibility Principle: one purpose per module/function
+- Loose coupling between components
+- Clear, stable interfaces between modules
+- Easy to test in isolation
+- Reusable components where appropriate
+
+```python
+# BAD: Monolithic, complex, hard to maintain
+def process_user_request(request_data, validate=True, save=True, notify=True, format_output=False):
+    # 200+ lines doing everything
+    validation_result = validate_request(request_data)
+    if validation_result.is_valid:
+        if save:
+            db_connection = get_db_connection()
+            cursor = db_connection.cursor()
+            cursor.execute("INSERT INTO requests ...", request_data)
+            db_connection.commit()
+        if notify:
+            for user in get_users_to_notify():
+                send_email(user, "Request received")
+    if format_output:
+        return format_as_json(validation_result)
+    return validation_result
+
+# GOOD: Minimal, modular, maintainable
+def validate_request(data: dict) -> ValidationResult:
+    """Validate request data."""
+    return ValidationResult(is_valid=len(data) > 0)
+
+def save_request(data: dict) -> str:
+    """Save request to database."""
+    return db.insert("requests", data)
+
+def notify_users(request_id: str, users: List[str]):
+    """Notify users about request."""
+    for user in users:
+        send_email(user, f"Request {request_id} received")
+```
+
+### Rule 4: No Production Debugging
 - NEVER add `print()` statements for debugging
 - Use `logging` module with appropriate levels
 - Remove ALL debug logging before committing
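
A minimal sketch of what Rule 4 asks for, using leveled logging in place of `print()` (the `parse_tool_lines` helper and its `TOOL:` line convention are made up for illustration):

```python
import logging

logger = logging.getLogger(__name__)

def parse_tool_lines(raw: str) -> list:
    # GOOD: leveled logging; DEBUG output is filtered out by production config
    logger.debug("Raw model response: %r", raw)  # instead of print(raw)
    calls = [line for line in raw.splitlines() if line.startswith("TOOL:")]
    logger.info("Parsed %d tool call(s)", len(calls))
    return calls

if __name__ == "__main__":
    # Raise to DEBUG only while developing; keep INFO or higher in production
    logging.basicConfig(level=logging.INFO)
    parse_tool_lines("TOOL: read_file\nplain text")
```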
+125 -25
@@ -795,55 +795,86 @@ async def chat_completions(request: ChatCompletionRequest, fastapi_request: Requ
media_type="text/event-stream"
)
elif has_tools:
# Real-time streaming with tools - stream content and tool_calls as they're generated
logger.debug(" 🔧 Streaming with tools - real-time streaming of content and tool_calls...")
# For streaming with tools, return tool_calls to client (opencode) for execution
# This enables multi-turn conversations where client executes tools and sends results back
logger.debug(" 🔧 Streaming with tools - returning tool_calls to client for execution...")
# Collect full response
full_response = ""
last_tool_calls = []
accumulated_content = ""
async for chunk in swarm_manager.generate_stream(
prompt=prompt,
max_tokens=request.max_tokens or 1024,
temperature=request.temperature or 0.7
):
full_response += chunk
# Parse tool calls
content, tool_calls_parsed = parse_tool_calls(full_response)
if tool_calls_parsed:
logger.debug(f" 🔧 Found {len(tool_calls_parsed)} tool call(s) in streaming response")
logger.debug(f" 📤 Returning tool_calls to client for execution (finish_reason=tool_calls)")
content, current_tool_calls = parse_tool_calls(full_response)
# Convert to ToolCall objects and return to client (opencode)
from api.models import ToolCall
tool_calls = [
ToolCall(
id=tc.get("id", f"call_{i}"),
type=tc.get("type", "function"),
function=tc.get("function", {})
)
for i, tc in enumerate(tool_calls_parsed)
]
new_content = content[len(accumulated_content):] if content else ""
if new_content:
accumulated_content += new_content
content_chunk = ChatCompletionStreamResponse(
# Return tool_calls to client with finish_reason=tool_calls
# Client (opencode) will execute them and send results back
async def tool_calls_stream_generator() -> AsyncIterator[str]:
"""Generate SSE stream with tool_calls for client execution."""
# Send role chunk
first_chunk = ChatCompletionStreamResponse(
id=completion_id,
created=created,
model=request.model,
choices=[
ChatCompletionStreamChoice(
delta={"content": new_content}
delta={"role": "assistant"}
)
]
)
yield f"data: {content_chunk.model_dump_json()}\n\n"
logger.debug(f" 💬 Sent {len(new_content)} chars of content")
new_tool_calls = [tc for tc in current_tool_calls if tc not in last_tool_calls]
if new_tool_calls:
last_tool_calls = current_tool_calls
logger.debug(f" 🔧 Streaming {len(new_tool_calls)} new tool call(s)")
yield f"data: {first_chunk.model_dump_json()}\n\n"
# Send content if any
if content:
content_chunk = ChatCompletionStreamResponse(
id=completion_id,
created=created,
model=request.model,
choices=[
ChatCompletionStreamChoice(
delta={"content": content}
)
]
)
yield f"data: {content_chunk.model_dump_json()}\n\n"
# Send final chunk with tool_calls and finish_reason=tool_calls
# Format tool_calls as OpenAI streaming format
# OpenAI streaming format: tool_calls in delta with index, id, type, function
logger.debug(f" 🔧 Raw tool_calls_parsed: {tool_calls_parsed}")
tool_calls_delta = []
for i, tc in enumerate(new_tool_calls):
for i, tc in enumerate(tool_calls_parsed):
tool_calls_delta.append({
"index": i,
"id": tc.get("id", ""),
"id": tc["id"],
"type": "function",
"function": {
"name": tc.get("function", {}).get("name", ""),
"arguments": tc.get("function", {}).get("arguments", {})
"name": tc["function"]["name"],
"arguments": tc["function"]["arguments"]
}
})
logger.debug(f" 🔧 Sending tool_calls in delta: {tool_calls_delta}")
# Build response in OpenAI streaming format
final_delta = {"tool_calls": tool_calls_delta}
final_chunk = {
"id": completion_id,
@@ -858,12 +889,81 @@ async def chat_completions(request: ChatCompletionRequest, fastapi_request: Requ
}
]
}
import json
chunk_json = json.dumps(final_chunk)
logger.debug(f" 📤 Final chunk JSON: {chunk_json[:800]}")
yield f"data: {chunk_json}\n\n"
logger.debug(f" 🔧 Sent tool calls delta: {len(new_tool_calls)} calls")
yield "data: [DONE]\n\n"
return StreamingResponse(
tool_calls_stream_generator(),
media_type="text/event-stream"
)
yield "data: [DONE]\n\n"
# No tool calls found, return content as normal response
logger.debug(f" ️ No tool calls found, returning content as normal response")
logger.debug(f"\n{'='*60}")
logger.debug(f"RESPONSE (streaming+no-tools): content_preview={repr(content[:100])}")
logger.debug(f"{'='*60}\n")
async def content_stream_generator() -> AsyncIterator[str]:
"""Generate SSE stream with content."""
# Send role chunk
first_chunk = ChatCompletionStreamResponse(
id=completion_id,
created=created,
model=request.model,
choices=[
ChatCompletionStreamChoice(
delta={"role": "assistant"}
)
]
)
yield f"data: {first_chunk.model_dump_json()}\n\n"
# Send content in chunks (to simulate streaming)
chunk_size = 100
for i in range(0, len(content), chunk_size):
chunk = content[i:i+chunk_size]
stream_chunk = ChatCompletionStreamResponse(
id=completion_id,
created=created,
model=request.model,
choices=[
ChatCompletionStreamChoice(
delta={"content": chunk}
)
]
)
yield f"data: {stream_chunk.model_dump_json()}\n\n"
fed_completion_tokens = len(TOKEN_ENCODING.encode(content)) if content else 0
fed_total_tokens = prompt_tokens + fed_completion_tokens
from api.models import UsageInfo
final_chunk = ChatCompletionStreamResponse(
id=completion_id,
created=created,
model=request.model,
choices=[
ChatCompletionStreamChoice(
delta={},
finish_reason="stop"
)
],
usage=UsageInfo(
prompt_tokens=prompt_tokens,
completion_tokens=fed_completion_tokens,
total_tokens=fed_total_tokens
)
)
yield f"data: {final_chunk.model_dump_json()}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(
content_stream_generator(),
media_type="text/event-stream"
)
else:
# Regular streaming without tools
async def stream_generator() -> AsyncIterator[str]:
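
For orientation, the wire format the new `tool_calls_stream_generator` targets looks roughly like the standalone sketch below: an approximation of the OpenAI streaming chunk sequence (role chunk, optional content, then a `tool_calls` delta with `finish_reason="tool_calls"`). The ids, names, and message text are illustrative.

```python
import json
from typing import Iterator, Optional

def sse_tool_call_events(completion_id: str, model: str, created: int) -> Iterator[str]:
    """Sketch of the chunk sequence the new generator streams to the client."""
    def event(delta: dict, finish_reason: Optional[str] = None) -> str:
        chunk = {
            "id": completion_id,
            "object": "chat.completion.chunk",
            "created": created,
            "model": model,
            "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
        }
        return f"data: {json.dumps(chunk)}\n\n"

    # 1. Role chunk opens the assistant message
    yield event({"role": "assistant"})
    # 2. Optional content chunk carries any text produced before the tool calls
    yield event({"content": "Let me check that file."})
    # 3. Final chunk carries the tool_calls delta; finish_reason signals the
    #    client (e.g. opencode) to run the tools and send the results back
    yield event(
        {"tool_calls": [{
            "index": 0,
            "id": "call_0",
            "type": "function",
            "function": {"name": "read_file", "arguments": "{\"path\": \"a.txt\"}"},
        }]},
        finish_reason="tool_calls",
    )
    yield "data: [DONE]\n\n"
```

In this format `arguments` is a JSON-encoded string rather than a dict, which lines up with the diff dropping the old `{}` default in favor of `tc["function"]["arguments"]`.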