Graph Orchestration Architecture

Overview

DeepCritical implements a graph-based orchestration system for research workflows using Pydantic AI agents as nodes. This enables better parallel execution, conditional routing, and state management compared to simple agent chains.

Graph Patterns

Iterative Research Graph

The iterative research graph follows this pattern:

[Input] → [Thinking] → [Knowledge Gap] → [Decision: Complete?]
                                              ↓ No          ↓ Yes
                                    [Tool Selector]    [Writer]
                                    [Execute Tools] → [Loop Back]

Node IDs: thinking → knowledge_gap → continue_decision → tool_selector/writer → execute_tools → (loop back to thinking)

Special Node Handling:

  • execute_tools: State node that uses search_handler to execute searches and add evidence to workflow state
  • continue_decision: Decision node that routes based on the research_complete flag from KnowledgeGapOutput
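The loop above can be sketched in plain Python. This is a minimal illustration, not DeepCritical's implementation: LoopState, run_iterative, and the "two pieces of evidence are enough" rule are hypothetical stand-ins; only the node IDs and the research_complete flag come from the description above.

```python
from dataclasses import dataclass, field


@dataclass
class LoopState:
    """Hypothetical stand-in for the workflow state (not the real WorkflowState)."""
    iteration: int = 0
    research_complete: bool = False
    evidence: list[str] = field(default_factory=list)


def continue_decision(state: LoopState) -> str:
    """Route on the research_complete flag, as the decision node is described."""
    return "writer" if state.research_complete else "tool_selector"


def run_iterative(max_iterations: int = 5) -> list[str]:
    """Walk the node IDs in order and return the path that was taken."""
    state = LoopState()
    path: list[str] = []
    while state.iteration < max_iterations:
        state.iteration += 1
        path += ["thinking", "knowledge_gap", "continue_decision"]
        # Toy completion rule (assumption): two pieces of evidence are enough.
        state.research_complete = len(state.evidence) >= 2
        if continue_decision(state) == "writer":
            break
        path += ["tool_selector", "execute_tools"]
        # execute_tools adds evidence to the state before looping back.
        state.evidence.append(f"evidence from iteration {state.iteration}")
    # The writer runs on completion or when the iteration cap is reached.
    path.append("writer")
    return path
```

Calling run_iterative() walks the loop twice through execute_tools before the decision node routes to writer.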

Deep Research Graph

The deep research graph follows this pattern:

[Input] → [Planner] → [Store Plan] → [Parallel Loops] → [Collect Drafts] → [Synthesizer]
                                        ↓         ↓         ↓
                                     [Loop1]  [Loop2]  [Loop3]

Node IDs: planner → store_plan → parallel_loops → collect_drafts → synthesizer

Special Node Handling:

  • planner: Agent node that creates a ReportPlan with the report outline
  • store_plan: State node that stores the ReportPlan in context for the parallel loops
  • parallel_loops: Parallel node that executes an IterativeResearchFlow instance for each section
  • collect_drafts: State node that collects section drafts from the parallel loops
  • synthesizer: Agent node that calls LongWriterAgent.write_report() directly with the ReportDraft
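As a rough sketch of this pipeline, the following uses asyncio.gather() to run one research loop per planned section. The functions plan_sections, research_section, and deep_research are hypothetical placeholders for the planner, the per-section IterativeResearchFlow, and the overall graph; no real agents are called.

```python
import asyncio


async def plan_sections(query: str) -> list[str]:
    """Stand-in for the planner node: returns section titles for the report."""
    return [f"{query}: background", f"{query}: methods", f"{query}: outlook"]


async def research_section(title: str) -> str:
    """Stand-in for one iterative research loop; returns a section draft."""
    await asyncio.sleep(0)  # yield control, as a real agent call would
    return f"Draft for '{title}'"


async def deep_research(query: str) -> str:
    # planner + store_plan: create the plan and keep it for the loops.
    plan = await plan_sections(query)
    # parallel_loops: one research loop per section, run concurrently.
    drafts = await asyncio.gather(*(research_section(t) for t in plan))
    # collect_drafts: gather() preserves input order, so zip is safe.
    collected = dict(zip(plan, drafts))
    # synthesizer: join the drafts in plan order (a real writer would do more).
    return "\n".join(collected[title] for title in plan)
```

Running asyncio.run(deep_research("CRISPR")) returns three draft lines in the order of the plan.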

Deep Research


sequenceDiagram
    actor User
    participant GraphOrchestrator
    participant InputParser
    participant GraphBuilder
    participant GraphExecutor
    participant Agent
    participant BudgetTracker
    participant WorkflowState

    User->>GraphOrchestrator: run(query)
    GraphOrchestrator->>InputParser: detect_research_mode(query)
    InputParser-->>GraphOrchestrator: mode (iterative/deep)
    GraphOrchestrator->>GraphBuilder: build_graph(mode)
    GraphBuilder-->>GraphOrchestrator: ResearchGraph
    GraphOrchestrator->>WorkflowState: init_workflow_state()
    GraphOrchestrator->>BudgetTracker: create_budget()
    GraphOrchestrator->>GraphExecutor: _execute_graph(graph)
    
    loop For each node in graph
        GraphExecutor->>Agent: execute_node(agent_node)
        Agent->>Agent: process_input
        Agent-->>GraphExecutor: result
        GraphExecutor->>WorkflowState: update_state(result)
        GraphExecutor->>BudgetTracker: add_tokens(used)
        GraphExecutor->>BudgetTracker: check_budget()
        alt Budget exceeded
            GraphExecutor->>GraphOrchestrator: emit(error_event)
        else Continue
            GraphExecutor->>GraphOrchestrator: emit(progress_event)
        end
    end
    
    GraphOrchestrator->>User: AsyncGenerator[AgentEvent]

Iterative Research

sequenceDiagram
    participant IterativeFlow
    participant ThinkingAgent
    participant KnowledgeGapAgent
    participant ToolSelector
    participant ToolExecutor
    participant JudgeHandler
    participant WriterAgent

    IterativeFlow->>IterativeFlow: run(query)
    
    loop Until complete or max_iterations
        IterativeFlow->>ThinkingAgent: generate_observations()
        ThinkingAgent-->>IterativeFlow: observations
        
        IterativeFlow->>KnowledgeGapAgent: evaluate_gaps()
        KnowledgeGapAgent-->>IterativeFlow: KnowledgeGapOutput
        
        alt Research complete
            IterativeFlow->>WriterAgent: create_final_report()
            WriterAgent-->>IterativeFlow: final_report
        else Gaps remain
            IterativeFlow->>ToolSelector: select_agents(gap)
            ToolSelector-->>IterativeFlow: AgentSelectionPlan
            
            IterativeFlow->>ToolExecutor: execute_tool_tasks()
            ToolExecutor-->>IterativeFlow: ToolAgentOutput[]
            
            IterativeFlow->>JudgeHandler: assess_evidence()
            JudgeHandler-->>IterativeFlow: should_continue
        end
    end

Graph Structure

Nodes

Graph nodes represent different stages in the research workflow:

  1. Agent Nodes: Execute Pydantic AI agents
     • Input: Prompt/query
     • Output: Structured or unstructured response
     • Examples: KnowledgeGapAgent, ToolSelectorAgent, ThinkingAgent

  2. State Nodes: Update or read workflow state
     • Input: Current state
     • Output: Updated state
     • Examples: Update evidence, update conversation history

  3. Decision Nodes: Make routing decisions based on conditions
     • Input: Current state/results
     • Output: Next node ID
     • Examples: Continue research vs. complete research

  4. Parallel Nodes: Execute multiple nodes concurrently
     • Input: List of node IDs
     • Output: Aggregated results
     • Examples: Parallel iterative research loops
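One way to model these four node types is with small dataclasses. The class and field names below are illustrative assumptions (only state_updater and decision_function are named elsewhere in this document), so treat this as a sketch rather than the project's actual classes.

```python
from dataclasses import dataclass
from typing import Any, Callable, Union


@dataclass
class AgentNode:
    node_id: str
    agent: Any  # a Pydantic AI agent in the real system; any object here


@dataclass
class StateNode:
    node_id: str
    state_updater: Callable[[dict, Any], dict]  # (state, data) -> new state


@dataclass
class DecisionNode:
    node_id: str
    decision_function: Callable[[Any], str]  # result -> next node ID
    options: list[str]  # node IDs the decision may return


@dataclass
class ParallelNode:
    node_id: str
    parallel_node_ids: list[str]  # nodes to run concurrently


GraphNode = Union[AgentNode, StateNode, DecisionNode, ParallelNode]


def add_evidence(state: dict, data: Any) -> dict:
    """Example state updater: returns a new state with the evidence appended."""
    return {**state, "evidence": state.get("evidence", []) + [data]}


continue_node = DecisionNode(
    node_id="continue_decision",
    decision_function=lambda out: "writer" if out["research_complete"] else "tool_selector",
    options=["writer", "tool_selector"],
)
```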

Edges

Edges define transitions between nodes:

  1. Sequential Edges: Always traversed (no condition)
     • From: Source node
     • To: Target node
     • Condition: None (always True)

  2. Conditional Edges: Traversed based on condition
     • From: Source node
     • To: Target node
     • Condition: Callable that returns bool
     • Example: If research complete → go to writer, else → continue loop

  3. Parallel Edges: Used for parallel execution branches
     • From: Parallel node
     • To: Multiple target nodes
     • Execution: All targets run concurrently
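The three edge kinds can be expressed with a single structure in which the condition is optional. The Edge class and next_nodes helper below are hypothetical names used for illustration: a sequential edge has no condition, a conditional edge carries a callable returning bool, and a parallel fan-out is simply several edges leaving the same node.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class Edge:
    from_node: str
    to_node: str
    # None means the edge is always traversed (a sequential edge).
    condition: Optional[Callable[[Any], bool]] = None

    def is_traversable(self, data: Any) -> bool:
        return True if self.condition is None else bool(self.condition(data))


def next_nodes(edges: list[Edge], current: str, data: Any) -> list[str]:
    """Return every target reachable from `current` for the given data."""
    return [
        e.to_node
        for e in edges
        if e.from_node == current and e.is_traversable(data)
    ]


EDGES = [
    Edge("thinking", "knowledge_gap"),  # sequential
    Edge("continue_decision", "writer", lambda d: d["research_complete"]),
    Edge("continue_decision", "tool_selector", lambda d: not d["research_complete"]),
    Edge("parallel_loops", "loop1"),  # parallel fan-out
    Edge("parallel_loops", "loop2"),
]
```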

State Management

State is managed via WorkflowState, which is stored in a ContextVar so that concurrent executions (threads or asyncio tasks) stay isolated from one another:

  • Evidence: Collected evidence from searches
  • Conversation: Iteration history (gaps, tool calls, findings, thoughts)
  • Embedding Service: For semantic search

State transitions occur at state nodes, which update the workflow state of the current execution context.
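The isolation that ContextVar provides can be illustrated with two concurrent asyncio tasks. DemoState, init_state, and get_state are hypothetical names chosen for this sketch; the real WorkflowState holds more fields than a single evidence list.

```python
import asyncio
from contextvars import ContextVar
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class DemoState:
    """Hypothetical, simplified stand-in for WorkflowState."""
    evidence: list[str] = field(default_factory=list)


_state_var: ContextVar[Optional[DemoState]] = ContextVar("demo_state", default=None)


def init_state() -> DemoState:
    """Create a fresh state and bind it to the current context."""
    state = DemoState()
    _state_var.set(state)
    return state


def get_state() -> DemoState:
    """Return the state bound to the current context, creating it if needed."""
    state = _state_var.get()
    return state if state is not None else init_state()


async def worker(name: str) -> list[str]:
    init_state()  # each task binds its own state
    await asyncio.sleep(0)  # let the other task run in between
    get_state().evidence.append(name)
    return get_state().evidence


async def main() -> list[list[str]]:
    # gather() wraps each coroutine in a task with its own copy of the context.
    return await asyncio.gather(worker("a"), worker("b"))
```

Each task sees only its own evidence because it calls init_state() inside its own context. A state object bound before the tasks are created would be shared by reference, since copying a context does not copy the objects stored in it.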

Execution Flow

  1. Graph Construction: Build graph from nodes and edges using create_iterative_graph() or create_deep_graph()
  2. Graph Validation: Ensure graph is valid (no cycles, all nodes reachable) via ResearchGraph.validate_structure()
  3. Graph Execution: Traverse graph from entry node using GraphOrchestrator._execute_graph()
  4. Node Execution: Execute each node based on type:
     • Agent Nodes: Call agent.run() with transformed input
     • State Nodes: Update workflow state via state_updater function
     • Decision Nodes: Evaluate decision_function to get next node ID
     • Parallel Nodes: Execute all parallel nodes concurrently via asyncio.gather()
  5. Edge Evaluation: Determine next node(s) based on edges and conditions
  6. Parallel Execution: Use asyncio.gather() for parallel nodes
  7. State Updates: Update state at state nodes via GraphExecutionContext.update_state()
  8. Event Streaming: Yield AgentEvent objects during execution for the UI
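The traversal and event-streaming steps can be sketched as an async generator. Everything here is a simplified assumption: DemoEvent stands in for AgentEvent, nodes are plain async functions, and next_node is a hypothetical callback replacing edge evaluation. Validation, budgets, and parallel nodes are left out.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncGenerator, Awaitable, Callable, Optional


@dataclass
class DemoEvent:
    """Hypothetical stand-in for AgentEvent."""
    type: str
    node_id: str


NodeFn = Callable[[dict], Awaitable[Any]]


async def execute_graph(
    nodes: dict[str, NodeFn],
    next_node: Callable[[str, dict], Optional[str]],
    entry: str,
    max_steps: int = 20,
) -> AsyncGenerator[DemoEvent, None]:
    """Run nodes from `entry`, yielding an event before and after each one."""
    results: dict[str, Any] = {}
    current: Optional[str] = entry
    steps = 0
    while current is not None and steps < max_steps:
        yield DemoEvent("node_start", current)
        results[current] = await nodes[current](results)  # node execution
        yield DemoEvent("node_complete", current)
        current = next_node(current, results)  # edge evaluation
        steps += 1
    yield DemoEvent("complete", "")


async def demo() -> list[str]:
    async def thinking(results: dict) -> str:
        return "observations"

    async def writer(results: dict) -> str:
        return f"report based on {results['thinking']}"

    order = {"thinking": "writer", "writer": None}
    events = execute_graph(
        {"thinking": thinking, "writer": writer},
        lambda cur, _results: order[cur],
        entry="thinking",
    )
    return [f"{e.type}:{e.node_id}" async for e in events]
```

The max_steps guard is an addition of this sketch so that a looping graph cannot run forever.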

GraphExecutionContext

The GraphExecutionContext class manages execution state during graph traversal:

  • State: Current WorkflowState instance
  • Budget Tracker: BudgetTracker instance for budget enforcement
  • Node Results: Dictionary storing results from each node execution
  • Visited Nodes: Set of node IDs that have been executed
  • Current Node: ID of the node currently being executed

Methods:

  • set_node_result(node_id, result): Store result from node execution
  • get_node_result(node_id): Retrieve stored result
  • has_visited(node_id): Check if node was visited
  • mark_visited(node_id): Mark node as visited
  • update_state(updater, data): Update workflow state
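A minimal sketch of such a context object follows. The method names match the list above, but the bodies, the constructor signature, and the (state, data) -> state shape of the updater are assumptions made for illustration.

```python
from typing import Any, Callable


class ExecutionContextSketch:
    """Illustrative approximation of GraphExecutionContext (not the real class)."""

    def __init__(self, state: Any, budget_tracker: Any = None) -> None:
        self.state = state
        self.budget_tracker = budget_tracker
        self.node_results: dict[str, Any] = {}
        self.visited_nodes: set[str] = set()
        self.current_node: str = ""

    def set_node_result(self, node_id: str, result: Any) -> None:
        self.node_results[node_id] = result

    def get_node_result(self, node_id: str) -> Any:
        # Returns None for nodes that have not produced a result yet.
        return self.node_results.get(node_id)

    def has_visited(self, node_id: str) -> bool:
        return node_id in self.visited_nodes

    def mark_visited(self, node_id: str) -> None:
        self.visited_nodes.add(node_id)

    def update_state(self, updater: Callable[[Any, Any], Any], data: Any) -> None:
        # Assumption: the updater returns the new state instead of mutating it.
        self.state = updater(self.state, data)


ctx = ExecutionContextSketch(state={"evidence": []})
ctx.current_node = "execute_tools"
ctx.set_node_result("execute_tools", ["paper A"])
ctx.mark_visited("execute_tools")
ctx.update_state(lambda s, d: {**s, "evidence": s["evidence"] + d}, ["paper A"])
```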

Conditional Routing

Decision nodes evaluate conditions and return next node IDs:

  • Knowledge Gap Decision: If research_complete → writer, else → tool selector
  • Budget Decision: If budget exceeded → exit, else → continue
  • Iteration Decision: If max iterations → exit, else → continue
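The three decisions can be combined into one routing function, sketched below. RoutingInput and route are hypothetical names, and checking the budget before the iteration count and the knowledge gap is an assumption of this sketch, not something the document specifies.

```python
from dataclasses import dataclass


@dataclass
class RoutingInput:
    """Hypothetical bundle of the values the decisions depend on."""
    research_complete: bool
    budget_exceeded: bool
    iteration: int
    max_iterations: int


def route(data: RoutingInput) -> str:
    """Return the ID of the next node."""
    if data.budget_exceeded:  # budget decision
        return "exit"
    if data.iteration >= data.max_iterations:  # iteration decision
        return "exit"
    # Knowledge gap decision.
    return "writer" if data.research_complete else "tool_selector"
```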

Parallel Execution

Parallel nodes execute multiple nodes concurrently:

  • Each parallel branch runs independently
  • Results are aggregated after all branches complete
  • State is synchronized after parallel execution
  • Errors in one branch don't stop other branches
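One way to get this behaviour with asyncio is to pass return_exceptions=True to asyncio.gather(), so a failing branch is returned as an exception object instead of being raised. Whether DeepCritical does exactly this is not stated here, and run_branches is a hypothetical helper.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def run_branches(
    branches: dict[str, Callable[[], Awaitable[Any]]],
) -> tuple[dict[str, Any], dict[str, BaseException]]:
    """Run all branches concurrently and split results from failures."""
    results = await asyncio.gather(
        *(branch() for branch in branches.values()),
        return_exceptions=True,  # exceptions are returned, not raised
    )
    succeeded: dict[str, Any] = {}
    failed: dict[str, BaseException] = {}
    # gather() returns results in input order, so names line up with results.
    for name, result in zip(branches, results):
        if isinstance(result, BaseException):
            failed[name] = result
        else:
            succeeded[name] = result
    return succeeded, failed


async def ok_branch() -> str:
    return "draft"


async def failing_branch() -> str:
    raise RuntimeError("search failed")
```

With three branches where the second fails, the other two still return their drafts and the error is reported under the failing branch's name.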

Budget Enforcement

Budget constraints are enforced at decision nodes:

  • Token Budget: Track LLM token usage
  • Time Budget: Track elapsed time
  • Iteration Budget: Track iteration count

If any budget is exceeded, execution routes to the exit node.
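A tracker covering the three budgets might look like the sketch below. BudgetSketch and its fields are assumptions made for illustration; the real BudgetTracker interface may differ.

```python
import time
from dataclasses import dataclass, field


@dataclass
class BudgetSketch:
    """Hypothetical tracker for token, time, and iteration budgets."""
    token_limit: int
    time_limit_seconds: float
    iteration_limit: int
    tokens_used: int = 0
    iterations: int = 0
    started_at: float = field(default_factory=time.monotonic)

    def add_tokens(self, count: int) -> None:
        self.tokens_used += count

    def add_iteration(self) -> None:
        self.iterations += 1

    def exceeded(self) -> list[str]:
        """Return the names of all budgets that are over their limit."""
        reasons: list[str] = []
        if self.tokens_used > self.token_limit:
            reasons.append("tokens")
        if time.monotonic() - self.started_at > self.time_limit_seconds:
            reasons.append("time")
        if self.iterations > self.iteration_limit:
            reasons.append("iterations")
        return reasons


def budget_decision(budget: BudgetSketch) -> str:
    """Decision function: route to exit as soon as any budget is exceeded."""
    return "exit" if budget.exceeded() else "continue"
```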

Error Handling

Errors are handled at multiple levels:

  1. Node Level: Catch errors in individual node execution
  2. Graph Level: Handle errors during graph traversal
  3. State Level: Rollback state changes on error

Errors are logged and surfaced to the UI as error events.
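Node-level error handling with state rollback can be sketched by snapshotting the state before the node runs. The function run_node_with_rollback, the dict-based state, and the event dictionaries are hypothetical; this shows one possible rollback strategy, not necessarily the one used in the project.

```python
import copy
import logging
from typing import Callable

logger = logging.getLogger("graph_sketch")


def run_node_with_rollback(
    state: dict, node_id: str, node_fn: Callable[[dict], None]
) -> tuple[dict, dict]:
    """Run node_fn on state; on failure return the pre-run snapshot instead."""
    snapshot = copy.deepcopy(state)  # taken before the node can mutate state
    try:
        node_fn(state)
        return state, {"type": "node_complete", "node_id": node_id}
    except Exception as exc:
        logger.warning("node %s failed: %s", node_id, exc)
        # Discard partial changes by handing back the snapshot.
        return snapshot, {"type": "error", "node_id": node_id, "message": str(exc)}


def flaky_tool(state: dict) -> None:
    state["evidence"].append("partial result")  # mutation before the failure
    raise ValueError("tool timed out")
```

The caller must continue with the returned state, because the dict passed in may contain partial changes after a failure.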

Backward Compatibility

Graph execution is optional and controlled by a feature flag:

  • USE_GRAPH_EXECUTION=true: Use graph-based execution
  • USE_GRAPH_EXECUTION=false: Use agent chain execution (existing)

This allows gradual migration and fallback if needed.
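Reading the flag could look like the sketch below. It assumes the flag is an environment variable, that the default is false when it is unset, and that values such as "1" and "yes" also count as true. None of this is confirmed by the document, and the helper names are hypothetical.

```python
import os
from typing import Mapping, Optional


def use_graph_execution(env: Optional[Mapping[str, str]] = None) -> bool:
    """Return True when USE_GRAPH_EXECUTION is set to a truthy value."""
    env = os.environ if env is None else env
    raw = env.get("USE_GRAPH_EXECUTION", "false")  # assumed default: off
    return raw.strip().lower() in {"1", "true", "yes"}


def select_execution_mode(env: Optional[Mapping[str, str]] = None) -> str:
    """Pick between graph-based execution and the existing agent chain."""
    return "graph" if use_graph_execution(env) else "agent_chain"
```

Passing a plain dict as env makes the choice easy to test without touching the process environment.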

See Also