Graph Orchestration Architecture¶
Overview¶
DeepCritical implements a graph-based orchestration system for research workflows using Pydantic AI agents as nodes. This enables better parallel execution, conditional routing, and state management compared to simple agent chains.
Graph Patterns¶
Iterative Research Graph¶
The iterative research graph follows this pattern:
[Input] → [Thinking] → [Knowledge Gap] → [Decision: Complete?]
↓ No ↓ Yes
[Tool Selector] [Writer]
↓
[Execute Tools] → [Loop Back]
Node IDs: thinking → knowledge_gap → continue_decision → tool_selector/writer → execute_tools → (loop back to thinking)
Special Node Handling: - execute_tools: State node that uses search_handler to execute searches and add evidence to workflow state - continue_decision: Decision node that routes based on research_complete flag from KnowledgeGapOutput
Deep Research Graph¶
The deep research graph follows this pattern:
[Input] → [Planner] → [Store Plan] → [Parallel Loops] → [Collect Drafts] → [Synthesizer]
↓ ↓ ↓
[Loop1] [Loop2] [Loop3]
Node IDs: planner → store_plan → parallel_loops → collect_drafts → synthesizer
Special Node Handling: - planner: Agent node that creates ReportPlan with report outline - store_plan: State node that stores ReportPlan in context for parallel loops - parallel_loops: Parallel node that executes IterativeResearchFlow instances for each section - collect_drafts: State node that collects section drafts from parallel loops - synthesizer: Agent node that calls LongWriterAgent.write_report() directly with ReportDraft
Deep Research¶
sequenceDiagram
actor User
participant GraphOrchestrator
participant InputParser
participant GraphBuilder
participant GraphExecutor
participant Agent
participant BudgetTracker
participant WorkflowState
User->>GraphOrchestrator: run(query)
GraphOrchestrator->>InputParser: detect_research_mode(query)
InputParser-->>GraphOrchestrator: mode (iterative/deep)
GraphOrchestrator->>GraphBuilder: build_graph(mode)
GraphBuilder-->>GraphOrchestrator: ResearchGraph
GraphOrchestrator->>WorkflowState: init_workflow_state()
GraphOrchestrator->>BudgetTracker: create_budget()
GraphOrchestrator->>GraphExecutor: _execute_graph(graph)
loop For each node in graph
GraphExecutor->>Agent: execute_node(agent_node)
Agent->>Agent: process_input
Agent-->>GraphExecutor: result
GraphExecutor->>WorkflowState: update_state(result)
GraphExecutor->>BudgetTracker: add_tokens(used)
GraphExecutor->>BudgetTracker: check_budget()
alt Budget exceeded
GraphExecutor->>GraphOrchestrator: emit(error_event)
else Continue
GraphExecutor->>GraphOrchestrator: emit(progress_event)
end
end
GraphOrchestrator->>User: AsyncGenerator[AgentEvent]
Iterative Research¶
sequenceDiagram
participant IterativeFlow
participant ThinkingAgent
participant KnowledgeGapAgent
participant ToolSelector
participant ToolExecutor
participant JudgeHandler
participant WriterAgent
IterativeFlow->>IterativeFlow: run(query)
loop Until complete or max_iterations
IterativeFlow->>ThinkingAgent: generate_observations()
ThinkingAgent-->>IterativeFlow: observations
IterativeFlow->>KnowledgeGapAgent: evaluate_gaps()
KnowledgeGapAgent-->>IterativeFlow: KnowledgeGapOutput
alt Research complete
IterativeFlow->>WriterAgent: create_final_report()
WriterAgent-->>IterativeFlow: final_report
else Gaps remain
IterativeFlow->>ToolSelector: select_agents(gap)
ToolSelector-->>IterativeFlow: AgentSelectionPlan
IterativeFlow->>ToolExecutor: execute_tool_tasks()
ToolExecutor-->>IterativeFlow: ToolAgentOutput[]
IterativeFlow->>JudgeHandler: assess_evidence()
JudgeHandler-->>IterativeFlow: should_continue
end
end Graph Structure¶
Nodes¶
Graph nodes represent different stages in the research workflow:
- Agent Nodes: Execute Pydantic AI agents
- Input: Prompt/query
- Output: Structured or unstructured response
-
Examples:
KnowledgeGapAgent,ToolSelectorAgent,ThinkingAgent -
State Nodes: Update or read workflow state
- Input: Current state
- Output: Updated state
-
Examples: Update evidence, update conversation history
-
Decision Nodes: Make routing decisions based on conditions
- Input: Current state/results
- Output: Next node ID
-
Examples: Continue research vs. complete research
-
Parallel Nodes: Execute multiple nodes concurrently
- Input: List of node IDs
- Output: Aggregated results
- Examples: Parallel iterative research loops
Edges¶
Edges define transitions between nodes:
- Sequential Edges: Always traversed (no condition)
- From: Source node
- To: Target node
-
Condition: None (always True)
-
Conditional Edges: Traversed based on condition
- From: Source node
- To: Target node
- Condition: Callable that returns bool
-
Example: If research complete → go to writer, else → continue loop
-
Parallel Edges: Used for parallel execution branches
- From: Parallel node
- To: Multiple target nodes
- Execution: All targets run concurrently
State Management¶
State is managed via WorkflowState using ContextVar for thread-safe isolation:
- Evidence: Collected evidence from searches
- Conversation: Iteration history (gaps, tool calls, findings, thoughts)
- Embedding Service: For semantic search
State transitions occur at state nodes, which update the global workflow state.
Execution Flow¶
- Graph Construction: Build graph from nodes and edges using
create_iterative_graph()orcreate_deep_graph() - Graph Validation: Ensure graph is valid (no cycles, all nodes reachable) via
ResearchGraph.validate_structure() - Graph Execution: Traverse graph from entry node using
GraphOrchestrator._execute_graph() - Node Execution: Execute each node based on type:
- Agent Nodes: Call
agent.run()with transformed input - State Nodes: Update workflow state via
state_updaterfunction - Decision Nodes: Evaluate
decision_functionto get next node ID - Parallel Nodes: Execute all parallel nodes concurrently via
asyncio.gather() - Edge Evaluation: Determine next node(s) based on edges and conditions
- Parallel Execution: Use
asyncio.gather()for parallel nodes - State Updates: Update state at state nodes via
GraphExecutionContext.update_state() - Event Streaming: Yield
AgentEventobjects during execution for UI
GraphExecutionContext¶
The GraphExecutionContext class manages execution state during graph traversal:
- State: Current
WorkflowStateinstance - Budget Tracker:
BudgetTrackerinstance for budget enforcement - Node Results: Dictionary storing results from each node execution
- Visited Nodes: Set of node IDs that have been executed
- Current Node: ID of the node currently being executed
Methods: - set_node_result(node_id, result): Store result from node execution - get_node_result(node_id): Retrieve stored result - has_visited(node_id): Check if node was visited - mark_visited(node_id): Mark node as visited - update_state(updater, data): Update workflow state
Conditional Routing¶
Decision nodes evaluate conditions and return next node IDs:
- Knowledge Gap Decision: If
research_complete→ writer, else → tool selector - Budget Decision: If budget exceeded → exit, else → continue
- Iteration Decision: If max iterations → exit, else → continue
Parallel Execution¶
Parallel nodes execute multiple nodes concurrently:
- Each parallel branch runs independently
- Results are aggregated after all branches complete
- State is synchronized after parallel execution
- Errors in one branch don't stop other branches
Budget Enforcement¶
Budget constraints are enforced at decision nodes:
- Token Budget: Track LLM token usage
- Time Budget: Track elapsed time
- Iteration Budget: Track iteration count
If any budget is exceeded, execution routes to exit node.
Error Handling¶
Errors are handled at multiple levels:
- Node Level: Catch errors in individual node execution
- Graph Level: Handle errors during graph traversal
- State Level: Rollback state changes on error
Errors are logged and yield error events for UI.
Backward Compatibility¶
Graph execution is optional via feature flag:
USE_GRAPH_EXECUTION=true: Use graph-based executionUSE_GRAPH_EXECUTION=false: Use agent chain execution (existing)
This allows gradual migration and fallback if needed.
See Also¶
- Orchestrators - Overview of all orchestrator patterns
- Workflow Diagrams - Detailed workflow diagrams
- API Reference - Orchestrators - API documentation