Graph Orchestration Architecture¶
Overview¶
DeepCritical implements a graph-based orchestration system for research workflows using Pydantic AI agents as nodes. This enables better parallel execution, conditional routing, and state management compared to simple agent chains.
Conversation History¶
DeepCritical supports multi-turn conversations through Pydantic AI's native message history format. The system maintains two types of history:
- User Conversation History: Multi-turn user interactions (from the Gradio chat interface), stored as list[ModelMessage]
- Research Iteration History: Internal research process state (the existing Conversation model)
Message History Flow¶
Gradio Chat History → convert_gradio_to_message_history() → GraphOrchestrator.run(message_history)
↓
GraphExecutionContext (stores message_history)
↓
Agent Nodes (receive message_history via agent.run())
↓
WorkflowState (persists user_message_history)
Usage¶
Message history is automatically converted from Gradio format and passed through the orchestrator:
# In app.py - automatic conversion
message_history = convert_gradio_to_message_history(history) if history else None
async for event in orchestrator.run(query, message_history=message_history):
    yield event
Agents receive message history through their run() methods:
# In agent execution
if message_history:
    result = await agent.run(input_data, message_history=message_history)
Graph Patterns¶
Iterative Research Graph¶
The iterative research graph follows this pattern:
[Input] → [Thinking] → [Knowledge Gap] → [Decision: Complete?]
                                           ↓ No        ↓ Yes
                                    [Tool Selector]   [Writer]
                                           ↓
                                    [Execute Tools] → [Loop Back]
Node IDs: thinking → knowledge_gap → continue_decision → tool_selector/writer → execute_tools → (loop back to thinking)
Special Node Handling:
- execute_tools: State node that uses search_handler to execute searches and add evidence to workflow state
- continue_decision: Decision node that routes based on the research_complete flag from KnowledgeGapOutput
Deep Research Graph¶
The deep research graph follows this pattern:
[Input] → [Planner] → [Store Plan] → [Parallel Loops] → [Collect Drafts] → [Synthesizer]
                                     ↓       ↓       ↓
                                  [Loop1] [Loop2] [Loop3]
Node IDs: planner → store_plan → parallel_loops → collect_drafts → synthesizer
Special Node Handling:
- planner: Agent node that creates a ReportPlan with the report outline
- store_plan: State node that stores the ReportPlan in context for the parallel loops
- parallel_loops: Parallel node that executes an IterativeResearchFlow instance for each section
- collect_drafts: State node that collects section drafts from the parallel loops
- synthesizer: Agent node that calls LongWriterAgent.write_report() directly with a ReportDraft
Deep Research¶
sequenceDiagram
actor User
participant GraphOrchestrator
participant InputParser
participant GraphBuilder
participant GraphExecutor
participant Agent
participant BudgetTracker
participant WorkflowState
User->>GraphOrchestrator: run(query)
GraphOrchestrator->>InputParser: detect_research_mode(query)
InputParser-->>GraphOrchestrator: mode (iterative/deep)
GraphOrchestrator->>GraphBuilder: build_graph(mode)
GraphBuilder-->>GraphOrchestrator: ResearchGraph
GraphOrchestrator->>WorkflowState: init_workflow_state()
GraphOrchestrator->>BudgetTracker: create_budget()
GraphOrchestrator->>GraphExecutor: _execute_graph(graph)
loop For each node in graph
GraphExecutor->>Agent: execute_node(agent_node)
Agent->>Agent: process_input
Agent-->>GraphExecutor: result
GraphExecutor->>WorkflowState: update_state(result)
GraphExecutor->>BudgetTracker: add_tokens(used)
GraphExecutor->>BudgetTracker: check_budget()
alt Budget exceeded
GraphExecutor->>GraphOrchestrator: emit(error_event)
else Continue
GraphExecutor->>GraphOrchestrator: emit(progress_event)
end
end
GraphOrchestrator->>User: AsyncGenerator[AgentEvent]
Iterative Research¶
sequenceDiagram
participant IterativeFlow
participant ThinkingAgent
participant KnowledgeGapAgent
participant ToolSelector
participant ToolExecutor
participant JudgeHandler
participant WriterAgent
IterativeFlow->>IterativeFlow: run(query)
loop Until complete or max_iterations
IterativeFlow->>ThinkingAgent: generate_observations()
ThinkingAgent-->>IterativeFlow: observations
IterativeFlow->>KnowledgeGapAgent: evaluate_gaps()
KnowledgeGapAgent-->>IterativeFlow: KnowledgeGapOutput
alt Research complete
IterativeFlow->>WriterAgent: create_final_report()
WriterAgent-->>IterativeFlow: final_report
else Gaps remain
IterativeFlow->>ToolSelector: select_agents(gap)
ToolSelector-->>IterativeFlow: AgentSelectionPlan
IterativeFlow->>ToolExecutor: execute_tool_tasks()
ToolExecutor-->>IterativeFlow: ToolAgentOutput[]
IterativeFlow->>JudgeHandler: assess_evidence()
JudgeHandler-->>IterativeFlow: should_continue
end
end
Graph Structure¶
Nodes¶
Graph nodes represent different stages in the research workflow:
- Agent Nodes: Execute Pydantic AI agents
  - Input: Prompt/query
  - Output: Structured or unstructured response
  - Examples: KnowledgeGapAgent, ToolSelectorAgent, ThinkingAgent
- State Nodes: Update or read workflow state
  - Input: Current state
  - Output: Updated state
  - Examples: Update evidence, update conversation history
- Decision Nodes: Make routing decisions based on conditions
  - Input: Current state/results
  - Output: Next node ID
  - Examples: Continue research vs. complete research
- Parallel Nodes: Execute multiple nodes concurrently
  - Input: List of node IDs
  - Output: Aggregated results
  - Examples: Parallel iterative research loops
Edges¶
Edges define transitions between nodes:
- Sequential Edges: Always traversed (no condition)
  - From: Source node
  - To: Target node
  - Condition: None (always True)
- Conditional Edges: Traversed based on condition
  - From: Source node
  - To: Target node
  - Condition: Callable that returns bool
  - Example: If research complete → go to writer, else → continue loop
- Parallel Edges: Used for parallel execution branches
  - From: Parallel node
  - To: Multiple target nodes
  - Execution: All targets run concurrently
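The node and edge types described above could be modeled roughly as follows. This is a minimal sketch, not DeepCritical's actual classes: the names `NodeKind`, `Node`, `Edge`, and `next_nodes` are hypothetical, and results are plain dicts for brevity.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Optional


class NodeKind(Enum):
    AGENT = "agent"
    STATE = "state"
    DECISION = "decision"
    PARALLEL = "parallel"


@dataclass(frozen=True)
class Node:
    node_id: str
    kind: NodeKind


@dataclass(frozen=True)
class Edge:
    source: str
    target: str
    # None models a sequential edge, which is always traversed.
    condition: Optional[Callable[[Any], bool]] = None

    def is_open(self, result: Any) -> bool:
        return self.condition is None or self.condition(result)


def next_nodes(edges: list[Edge], source: str, result: Any) -> list[str]:
    """Return the targets of every edge leaving `source` whose condition holds."""
    return [e.target for e in edges if e.source == source and e.is_open(result)]


# Hypothetical fragment of the iterative graph.
nodes = [
    Node("thinking", NodeKind.AGENT),
    Node("continue_decision", NodeKind.DECISION),
]
edges = [
    Edge("thinking", "knowledge_gap"),
    Edge("continue_decision", "writer", lambda r: r["research_complete"]),
    Edge("continue_decision", "tool_selector", lambda r: not r["research_complete"]),
]
```

Calling `next_nodes(edges, "continue_decision", {"research_complete": True})` selects only the writer branch, while the sequential edge out of `thinking` is followed regardless of the result.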
State Management¶
State is managed via WorkflowState, which is held in a ContextVar so that each execution context (thread or asyncio task) sees its own isolated state. The state holds:
- Evidence: Collected evidence from searches
- Conversation: Iteration history (gaps, tool calls, findings, thoughts)
- Embedding Service: For semantic search
State transitions occur at state nodes, which update the workflow state of the current execution context.
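To illustrate how a ContextVar isolates state between concurrent workflows, here is a small sketch. `WorkflowStateSketch`, `init_state`, and `get_state` are stand-ins invented for this example and may not match the real WorkflowState API.

```python
import asyncio
from contextvars import ContextVar
from dataclasses import dataclass, field


@dataclass
class WorkflowStateSketch:
    # Stand-in for the real WorkflowState; only evidence is modeled here.
    evidence: list[str] = field(default_factory=list)


_state_var: ContextVar[WorkflowStateSketch] = ContextVar("workflow_state")


def init_state() -> WorkflowStateSketch:
    state = WorkflowStateSketch()
    _state_var.set(state)
    return state


def get_state() -> WorkflowStateSketch:
    return _state_var.get()


async def run_workflow(name: str) -> list[str]:
    init_state()
    await asyncio.sleep(0)  # yield control so the two workflows interleave
    get_state().evidence.append(f"evidence from {name}")
    return get_state().evidence


async def main() -> list[list[str]]:
    # gather() wraps each coroutine in a Task, and every Task runs in its own
    # copy of the current context, so each set() call stays local to its task.
    return await asyncio.gather(run_workflow("a"), run_workflow("b"))
```

Each workflow ends up with only its own evidence, even though both tasks call the same module-level accessors.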
Execution Flow¶
- Graph Construction: Build graph from nodes and edges using create_iterative_graph() or create_deep_graph()
- Graph Validation: Ensure graph is valid (no cycles, all nodes reachable) via ResearchGraph.validate_structure()
- Graph Execution: Traverse graph from entry node using GraphOrchestrator._execute_graph()
- Node Execution: Execute each node based on type:
  - Agent Nodes: Call agent.run() with transformed input
  - State Nodes: Update workflow state via state_updater function
  - Decision Nodes: Evaluate decision_function to get next node ID
  - Parallel Nodes: Execute all parallel nodes concurrently via asyncio.gather()
- Edge Evaluation: Determine next node(s) based on edges and conditions
- Parallel Execution: Use asyncio.gather() for parallel nodes
- State Updates: Update state at state nodes via GraphExecutionContext.update_state()
- Event Streaming: Yield AgentEvent objects during execution for UI
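The traversal and event-streaming steps above can be sketched with a toy executor. This is a deliberately simplified illustration, not the real GraphOrchestrator: each node here is a hypothetical async function that mutates a dict and returns the next node ID directly (instead of going through separate edge objects), and events are plain strings rather than AgentEvent objects.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Optional

State = dict[str, Any]
NodeFn = Callable[[State], Awaitable[Optional[str]]]


async def thinking(state: State) -> Optional[str]:
    state["iterations"] += 1
    return "knowledge_gap"


async def knowledge_gap(state: State) -> Optional[str]:
    # Toy rule: pretend research is complete after two iterations.
    state["research_complete"] = state["iterations"] >= 2
    return "continue_decision"


async def continue_decision(state: State) -> Optional[str]:
    return "writer" if state["research_complete"] else "tool_selector"


async def tool_selector(state: State) -> Optional[str]:
    return "execute_tools"


async def execute_tools(state: State) -> Optional[str]:
    state["evidence"].append(f"evidence {state['iterations']}")
    return "thinking"  # loop back


async def writer(state: State) -> Optional[str]:
    state["report"] = f"report from {len(state['evidence'])} evidence item(s)"
    return None  # terminal node


NODES: dict[str, NodeFn] = {
    "thinking": thinking,
    "knowledge_gap": knowledge_gap,
    "continue_decision": continue_decision,
    "tool_selector": tool_selector,
    "execute_tools": execute_tools,
    "writer": writer,
}


async def execute_graph(entry: str, state: State, max_steps: int = 50) -> AsyncIterator[str]:
    """Walk the graph from `entry`, yielding one event per executed node."""
    node_id: Optional[str] = entry
    for _ in range(max_steps):  # hard cap so a faulty loop cannot run forever
        if node_id is None:
            return
        yield f"executing {node_id}"
        node_id = await NODES[node_id](state)


async def collect() -> tuple[list[str], State]:
    state: State = {"iterations": 0, "evidence": [], "research_complete": False}
    events = [event async for event in execute_graph("thinking", state)]
    return events, state
```

Because `execute_graph` is an async generator, a caller such as a UI handler can consume events with `async for` while the graph is still running.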
GraphExecutionContext¶
The GraphExecutionContext class manages execution state during graph traversal:
- State: Current WorkflowState instance
- Budget Tracker: BudgetTracker instance for budget enforcement
- Node Results: Dictionary storing results from each node execution
- Visited Nodes: Set of node IDs that have been executed
- Current Node: ID of the node currently being executed
Methods:
- set_node_result(node_id, result): Store result from node execution
- get_node_result(node_id): Retrieve stored result
- has_visited(node_id): Check if node was visited
- mark_visited(node_id): Mark node as visited
- update_state(updater, data): Update workflow state
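A rough sketch of a context object with the fields and methods listed above might look like this. The method names follow the list, but the class name `ExecutionContextSketch`, the dict-based state, the omission of the budget tracker, and the assumption that `updater` returns a new state are all simplifications made for illustration.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Optional


@dataclass
class ExecutionContextSketch:
    state: dict[str, Any]
    node_results: dict[str, Any] = field(default_factory=dict)
    visited_nodes: set[str] = field(default_factory=set)
    current_node: Optional[str] = None

    def set_node_result(self, node_id: str, result: Any) -> None:
        self.node_results[node_id] = result

    def get_node_result(self, node_id: str) -> Any:
        # Returns None for nodes that have not produced a result yet.
        return self.node_results.get(node_id)

    def has_visited(self, node_id: str) -> bool:
        return node_id in self.visited_nodes

    def mark_visited(self, node_id: str) -> None:
        self.visited_nodes.add(node_id)

    def update_state(self, updater: Callable[[dict[str, Any], Any], dict[str, Any]], data: Any) -> None:
        # Assumption: the updater returns the new state instead of mutating in place.
        self.state = updater(self.state, data)
```

A state node could then call, for example, `ctx.update_state(lambda s, d: {**s, "evidence": s["evidence"] + [d]}, new_item)` to append evidence without mutating the previous state.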
Conditional Routing¶
Decision nodes evaluate conditions and return next node IDs:
- Knowledge Gap Decision: If research_complete → writer, else → tool selector
- Budget Decision: If budget exceeded → exit, else → continue
- Iteration Decision: If max iterations → exit, else → continue
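The three decisions above can be combined into a single routing function, sketched below. `KnowledgeGapOutputSketch` is a stand-in carrying only the research_complete flag, the `"exit"` node ID is assumed, and checking the budget and iteration limits before the completion flag is a design assumption of this example rather than documented behavior.

```python
from dataclasses import dataclass


@dataclass
class KnowledgeGapOutputSketch:
    # Stand-in for KnowledgeGapOutput; only the routing flag is modeled.
    research_complete: bool


def continue_decision(
    output: KnowledgeGapOutputSketch,
    iteration: int,
    max_iterations: int,
    budget_exceeded: bool,
) -> str:
    """Return the ID of the next node to execute."""
    # Assumed precedence: hard limits win over the completion flag.
    if budget_exceeded or iteration >= max_iterations:
        return "exit"
    return "writer" if output.research_complete else "tool_selector"
```

Keeping the decision a pure function of its inputs makes each routing rule easy to unit test without running any agents.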
Parallel Execution¶
Parallel nodes execute multiple nodes concurrently:
- Each parallel branch runs independently
- Results are aggregated after all branches complete
- State is synchronized after parallel execution
- Errors in one branch don't stop other branches
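One way to get the behavior described above, where a failing branch does not cancel its siblings, is `asyncio.gather()` with `return_exceptions=True`. The sketch below assumes a hypothetical `research_section` coroutine standing in for one iterative research loop.

```python
import asyncio


async def research_section(title: str) -> str:
    # Hypothetical stand-in for one iterative research loop.
    await asyncio.sleep(0)
    if not title:
        raise ValueError("section title is empty")
    return f"draft for {title}"


async def run_parallel(titles: list[str]) -> tuple[list[str], list[BaseException]]:
    # return_exceptions=True makes gather() return raised exceptions as
    # results, so one failing branch does not abort the others.
    results = await asyncio.gather(
        *(research_section(t) for t in titles), return_exceptions=True
    )
    drafts = [r for r in results if not isinstance(r, BaseException)]
    errors = [r for r in results if isinstance(r, BaseException)]
    return drafts, errors
```

Results come back in the same order as the inputs, which keeps section drafts aligned with the report outline after failed branches are filtered out.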
Budget Enforcement¶
Budget constraints are enforced at decision nodes:
- Token Budget: Track LLM token usage
- Time Budget: Track elapsed time
- Iteration Budget: Track iteration count
If any budget is exceeded, execution routes to the exit node.
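As an illustration of tracking the three budgets together, consider the following sketch. `BudgetSketch`, `exceeded_budget`, and `budget_decision` are hypothetical names for this example and do not reflect the real BudgetTracker interface.

```python
import time
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class BudgetSketch:
    max_tokens: int
    max_seconds: float
    max_iterations: int
    tokens_used: int = 0
    iterations: int = 0
    started_at: float = field(default_factory=time.monotonic)

    def add_tokens(self, used: int) -> None:
        self.tokens_used += used

    def add_iteration(self) -> None:
        self.iterations += 1

    def exceeded_budget(self) -> Optional[str]:
        """Return the name of the first exceeded budget, or None."""
        if self.tokens_used > self.max_tokens:
            return "tokens"
        if time.monotonic() - self.started_at > self.max_seconds:
            return "time"
        if self.iterations > self.max_iterations:
            return "iterations"
        return None


def budget_decision(budget: BudgetSketch, next_node: str) -> str:
    # Route to the (assumed) exit node as soon as any budget is exceeded.
    return "exit" if budget.exceeded_budget() else next_node
```

Using `time.monotonic()` rather than wall-clock time keeps the time budget unaffected by system clock adjustments.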
Error Handling¶
Errors are handled at multiple levels:
- Node Level: Catch errors in individual node execution
- Graph Level: Handle errors during graph traversal
- State Level: Rollback state changes on error
Errors are logged and surfaced to the UI as error events.
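Node-level error handling with state rollback could be sketched as follows. This assumes a dict-based state and dict-shaped events, and uses a deep-copy snapshot as the rollback mechanism; the helper name `run_node_with_rollback` is hypothetical and the real implementation may work differently.

```python
import copy
import logging
from typing import Any, Callable

logger = logging.getLogger("graph_sketch")


def run_node_with_rollback(
    node_fn: Callable[[dict[str, Any]], None], state: dict[str, Any]
) -> dict[str, str]:
    """Run `node_fn` against `state`; on failure, restore the prior state."""
    snapshot = copy.deepcopy(state)
    try:
        node_fn(state)
        return {"type": "progress", "message": "node completed"}
    except Exception as exc:
        # Undo any partial changes the node made before it failed.
        state.clear()
        state.update(snapshot)
        logger.exception("node failed; state rolled back")
        return {"type": "error", "message": str(exc)}


def failing_node(state: dict[str, Any]) -> None:
    state["evidence"].append("partial result")
    raise RuntimeError("search failed")
```

Running `failing_node` through the helper leaves the state exactly as it was before the call and returns an error event that a UI layer can display.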
Backward Compatibility¶
Graph execution is optional via feature flag:
- USE_GRAPH_EXECUTION=true: Use graph-based execution
- USE_GRAPH_EXECUTION=false: Use agent chain execution (existing)
This allows gradual migration and fallback if needed.
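A flag like this might be read from the environment along these lines. The helper name `use_graph_execution` is hypothetical, and both the default of `false` when the variable is unset and the case-insensitive comparison are assumptions of this sketch, not documented behavior.

```python
import os
from typing import Mapping, Optional


def use_graph_execution(env: Optional[Mapping[str, str]] = None) -> bool:
    """Return True when USE_GRAPH_EXECUTION is set to "true"."""
    env = os.environ if env is None else env
    # Assumption: an unset flag falls back to the existing agent chain.
    return env.get("USE_GRAPH_EXECUTION", "false").strip().lower() == "true"
```

Accepting an optional mapping makes the check testable without modifying the real process environment.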
See Also¶
- Orchestrators - Overview of all orchestrator patterns
- Workflow Diagrams - Detailed workflow diagrams
- API Reference - Orchestrators - API documentation