
Graph Orchestration Architecture

Overview

DeepCritical implements a graph-based orchestration system for research workflows, using Pydantic AI agents as nodes. Compared with simple agent chains, this gives better parallel execution, conditional routing, and state management.

Conversation History

DeepCritical supports multi-turn conversations through Pydantic AI's native message history format. The system maintains two types of history:

  1. User Conversation History: Multi-turn user interactions (from Gradio chat interface) stored as list[ModelMessage]
  2. Research Iteration History: Internal research process state (existing Conversation model)

Message History Flow

Gradio Chat History → convert_gradio_to_message_history() → GraphOrchestrator.run(message_history)
    ↓
GraphExecutionContext (stores message_history)
    ↓
Agent Nodes (receive message_history via agent.run())
    ↓
WorkflowState (persists user_message_history)

Usage

Message history is automatically converted from Gradio format and passed through the orchestrator:

# In app.py - automatic conversion
message_history = convert_gradio_to_message_history(history) if history else None
async for event in orchestrator.run(query, message_history=message_history):
    yield event

Agents receive message history through their run() methods:

# In agent execution
if message_history:
    result = await agent.run(input_data, message_history=message_history)


Graph Patterns

Iterative Research Graph

The iterative research graph follows this pattern:

[Input] → [Thinking] → [Knowledge Gap] → [Decision: Complete?]
                                              ↓ No          ↓ Yes
                                    [Tool Selector]    [Writer]
                                    [Execute Tools] → [Loop Back]

Node IDs: thinking → knowledge_gap → continue_decision → tool_selector/writer → execute_tools → (loop back to thinking)

Special Node Handling:

  • execute_tools: State node that uses search_handler to execute searches and add evidence to workflow state
  • continue_decision: Decision node that routes based on the research_complete flag from KnowledgeGapOutput

Deep Research Graph

The deep research graph follows this pattern:

[Input] → [Planner] → [Store Plan] → [Parallel Loops] → [Collect Drafts] → [Synthesizer]
                                        ↓         ↓         ↓
                                     [Loop1]  [Loop2]  [Loop3]

Node IDs: planner → store_plan → parallel_loops → collect_drafts → synthesizer

Special Node Handling:

  • planner: Agent node that creates a ReportPlan with the report outline
  • store_plan: State node that stores the ReportPlan in context for the parallel loops
  • parallel_loops: Parallel node that executes one IterativeResearchFlow instance per section
  • collect_drafts: State node that collects section drafts from the parallel loops
  • synthesizer: Agent node that calls LongWriterAgent.write_report() directly with a ReportDraft

Deep Research


sequenceDiagram
    actor User
    participant GraphOrchestrator
    participant InputParser
    participant GraphBuilder
    participant GraphExecutor
    participant Agent
    participant BudgetTracker
    participant WorkflowState

    User->>GraphOrchestrator: run(query)
    GraphOrchestrator->>InputParser: detect_research_mode(query)
    InputParser-->>GraphOrchestrator: mode (iterative/deep)
    GraphOrchestrator->>GraphBuilder: build_graph(mode)
    GraphBuilder-->>GraphOrchestrator: ResearchGraph
    GraphOrchestrator->>WorkflowState: init_workflow_state()
    GraphOrchestrator->>BudgetTracker: create_budget()
    GraphOrchestrator->>GraphExecutor: _execute_graph(graph)
    
    loop For each node in graph
        GraphExecutor->>Agent: execute_node(agent_node)
        Agent->>Agent: process_input
        Agent-->>GraphExecutor: result
        GraphExecutor->>WorkflowState: update_state(result)
        GraphExecutor->>BudgetTracker: add_tokens(used)
        GraphExecutor->>BudgetTracker: check_budget()
        alt Budget exceeded
            GraphExecutor->>GraphOrchestrator: emit(error_event)
        else Continue
            GraphExecutor->>GraphOrchestrator: emit(progress_event)
        end
    end
    
    GraphOrchestrator->>User: AsyncGenerator[AgentEvent]

Iterative Research

sequenceDiagram
    participant IterativeFlow
    participant ThinkingAgent
    participant KnowledgeGapAgent
    participant ToolSelector
    participant ToolExecutor
    participant JudgeHandler
    participant WriterAgent

    IterativeFlow->>IterativeFlow: run(query)
    
    loop Until complete or max_iterations
        IterativeFlow->>ThinkingAgent: generate_observations()
        ThinkingAgent-->>IterativeFlow: observations
        
        IterativeFlow->>KnowledgeGapAgent: evaluate_gaps()
        KnowledgeGapAgent-->>IterativeFlow: KnowledgeGapOutput
        
        alt Research complete
            IterativeFlow->>WriterAgent: create_final_report()
            WriterAgent-->>IterativeFlow: final_report
        else Gaps remain
            IterativeFlow->>ToolSelector: select_agents(gap)
            ToolSelector-->>IterativeFlow: AgentSelectionPlan
            
            IterativeFlow->>ToolExecutor: execute_tool_tasks()
            ToolExecutor-->>IterativeFlow: ToolAgentOutput[]
            
            IterativeFlow->>JudgeHandler: assess_evidence()
            JudgeHandler-->>IterativeFlow: should_continue
        end
    end

Graph Structure

Nodes

Graph nodes represent different stages in the research workflow:

  1. Agent Nodes: Execute Pydantic AI agents
     • Input: Prompt/query
     • Output: Structured or unstructured response
     • Examples: KnowledgeGapAgent, ToolSelectorAgent, ThinkingAgent

  2. State Nodes: Update or read workflow state
     • Input: Current state
     • Output: Updated state
     • Examples: Update evidence, update conversation history

  3. Decision Nodes: Make routing decisions based on conditions
     • Input: Current state/results
     • Output: Next node ID
     • Examples: Continue research vs. complete research

  4. Parallel Nodes: Execute multiple nodes concurrently
     • Input: List of node IDs
     • Output: Aggregated results
     • Examples: Parallel iterative research loops
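
As a rough illustration of the four node types above, they could be modelled as small dataclasses. This is a minimal sketch, not DeepCritical's actual classes: the field names (agent, state_updater, decision_function, options, parallel_node_ids) are assumptions chosen to match the terms used in this document, and plain callables stand in for Pydantic AI agents.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class AgentNode:
    node_id: str
    agent: Callable[[str], Any]  # stand-in for a Pydantic AI agent


@dataclass(frozen=True)
class StateNode:
    node_id: str
    state_updater: Callable[[dict, Any], dict]  # (state, data) -> updated state


@dataclass(frozen=True)
class DecisionNode:
    node_id: str
    decision_function: Callable[[Any], str]  # result -> next node ID
    options: tuple[str, ...]  # node IDs this decision may return


@dataclass(frozen=True)
class ParallelNode:
    node_id: str
    parallel_node_ids: tuple[str, ...]  # nodes to run concurrently


# Hypothetical decision node mirroring continue_decision.
continue_decision = DecisionNode(
    node_id="continue_decision",
    decision_function=lambda r: "writer" if r["research_complete"] else "tool_selector",
    options=("writer", "tool_selector"),
)
next_id = continue_decision.decision_function({"research_complete": True})
```

Keeping each node type as its own small record makes the executor's dispatch on node type explicit.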

Edges

Edges define transitions between nodes:

  1. Sequential Edges: Always traversed (no condition)
     • From: Source node
     • To: Target node
     • Condition: None (always True)

  2. Conditional Edges: Traversed based on condition
     • From: Source node
     • To: Target node
     • Condition: Callable that returns bool
     • Example: If research complete → go to writer, else → continue loop

  3. Parallel Edges: Used for parallel execution branches
     • From: Parallel node
     • To: Multiple target nodes
     • Execution: All targets run concurrently
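
The edge semantics above can be sketched with a single Edge record whose optional condition distinguishes sequential from conditional edges. The Edge class and the next_nodes() helper are hypothetical names used only for illustration; they are not taken from the DeepCritical code base.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass(frozen=True)
class Edge:
    from_node: str
    to_node: str
    # None means a sequential edge that is always traversed.
    condition: Optional[Callable[[Any], bool]] = None

    def is_open(self, result: Any) -> bool:
        return True if self.condition is None else bool(self.condition(result))


def next_nodes(edges: list[Edge], current: str, result: Any) -> list[str]:
    """Return the targets of every edge leaving `current` whose condition holds."""
    return [e.to_node for e in edges if e.from_node == current and e.is_open(result)]


edges = [
    Edge("thinking", "knowledge_gap"),
    Edge("continue_decision", "writer", lambda r: r["research_complete"]),
    Edge("continue_decision", "tool_selector", lambda r: not r["research_complete"]),
]
targets = next_nodes(edges, "continue_decision", {"research_complete": False})
```

A parallel edge fits the same shape: several unconditional edges leaving one parallel node, all of which are returned by next_nodes().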

State Management

State is managed via WorkflowState, which is stored in a ContextVar so that concurrent executions stay isolated from one another:

  • Evidence: Collected evidence from searches
  • Conversation: Iteration history (gaps, tool calls, findings, thoughts)
  • Embedding Service: For semantic search

State transitions occur at state nodes, which update the workflow state of the current execution context.
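
To show why a ContextVar gives this isolation, here is a minimal sketch using only the standard library. The WorkflowState below is a simplified stand-in with a single evidence field. init_workflow_state() is named in the sequence diagram above, but its signature here is assumed, and get_workflow_state() is a hypothetical helper.

```python
import asyncio
from contextvars import ContextVar
from dataclasses import dataclass, field


@dataclass
class WorkflowState:
    evidence: list = field(default_factory=list)


_workflow_state: ContextVar = ContextVar("workflow_state")


def init_workflow_state() -> WorkflowState:
    """Create a fresh state and bind it to the current context."""
    state = WorkflowState()
    _workflow_state.set(state)
    return state


def get_workflow_state() -> WorkflowState:
    return _workflow_state.get()


async def research(topic: str) -> list:
    init_workflow_state()
    await asyncio.sleep(0)  # yield control so the two tasks interleave
    get_workflow_state().evidence.append(topic)
    return get_workflow_state().evidence


async def main() -> list:
    # gather() wraps each coroutine in a task, and each task runs in its
    # own copy of the context, so the two states never mix.
    return await asyncio.gather(research("a"), research("b"))


results = asyncio.run(main())
```

Each task sees only the state it initialised, even though both tasks interleave on the same event loop.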

Execution Flow

  1. Graph Construction: Build graph from nodes and edges using create_iterative_graph() or create_deep_graph()
  2. Graph Validation: Ensure graph is valid (no cycles, all nodes reachable) via ResearchGraph.validate_structure()
  3. Graph Execution: Traverse graph from entry node using GraphOrchestrator._execute_graph()
  4. Node Execution: Execute each node based on type:
     • Agent Nodes: Call agent.run() with transformed input
     • State Nodes: Update workflow state via state_updater function
     • Decision Nodes: Evaluate decision_function to get next node ID
     • Parallel Nodes: Execute all parallel nodes concurrently via asyncio.gather()
  5. Edge Evaluation: Determine next node(s) based on edges and conditions
  6. Parallel Execution: Use asyncio.gather() for parallel nodes
  7. State Updates: Update state at state nodes via GraphExecutionContext.update_state()
  8. Event Streaming: Yield AgentEvent objects during execution for UI
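
The traversal described above can be approximated by a small synchronous loop. This is a simplified sketch under several assumptions: run_graph() is a hypothetical function (not GraphOrchestrator._execute_graph()), node callables take (state, result) rather than real agents, parallel nodes are omitted, and plain node IDs are yielded in place of AgentEvent objects.

```python
from typing import Any, Callable, Dict, Iterator, Optional, Tuple

Node = Tuple[str, Callable[[dict, Any], Any]]  # (kind, callable(state, result))


def run_graph(
    nodes: Dict[str, Node],
    edges: Dict[str, str],
    entry: str,
    state: dict,
    max_steps: int = 20,
) -> Iterator[str]:
    """Walk the graph from `entry`, yielding each visited node ID as an event."""
    current: Optional[str] = entry
    result: Any = None
    for _ in range(max_steps):
        if current is None:
            return
        kind, fn = nodes[current]
        yield current
        if kind == "agent":
            result = fn(state, result)    # agent nodes produce a new result
            current = edges.get(current)
        elif kind == "state":
            fn(state, result)             # state nodes mutate workflow state
            current = edges.get(current)
        elif kind == "decision":
            current = fn(state, result)   # decision nodes return the next node ID
        else:
            raise ValueError(f"unknown node kind: {kind}")


state = {"evidence": []}
nodes: Dict[str, Node] = {
    "thinking": ("agent", lambda s, r: "observations"),
    "knowledge_gap": ("agent", lambda s, r: {"research_complete": len(s["evidence"]) >= 1}),
    "continue_decision": ("decision", lambda s, r: "writer" if r["research_complete"] else "tool_selector"),
    "tool_selector": ("agent", lambda s, r: ["search"]),
    "execute_tools": ("state", lambda s, r: s["evidence"].append("finding")),
    "writer": ("agent", lambda s, r: "final report"),
}
edges = {
    "thinking": "knowledge_gap",
    "knowledge_gap": "continue_decision",
    "tool_selector": "execute_tools",
    "execute_tools": "thinking",
}
trace = list(run_graph(nodes, edges, "thinking", state))
```

The max_steps guard is an added safety limit for the sketch, since the iterative graph loops back to thinking until the decision node routes to writer.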

GraphExecutionContext

The GraphExecutionContext class manages execution state during graph traversal:

  • State: Current WorkflowState instance
  • Budget Tracker: BudgetTracker instance for budget enforcement
  • Node Results: Dictionary storing results from each node execution
  • Visited Nodes: Set of node IDs that have been executed
  • Current Node: ID of the node currently being executed

Methods:

  • set_node_result(node_id, result): Store result from node execution
  • get_node_result(node_id): Retrieve stored result
  • has_visited(node_id): Check if node was visited
  • mark_visited(node_id): Mark node as visited
  • update_state(updater, data): Update workflow state
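
A minimal sketch of a class with this shape is shown below. The method names follow the list above. The attribute names and the behaviour of update_state() (calling updater(state, data) and storing the return value) are assumptions, and the real class may differ.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Optional, Set


@dataclass
class GraphExecutionContext:
    state: Any                      # current WorkflowState (any object here)
    budget_tracker: Any = None      # BudgetTracker instance in the real system
    node_results: Dict[str, Any] = field(default_factory=dict)
    visited_nodes: Set[str] = field(default_factory=set)
    current_node: Optional[str] = None

    def set_node_result(self, node_id: str, result: Any) -> None:
        self.node_results[node_id] = result

    def get_node_result(self, node_id: str) -> Any:
        return self.node_results.get(node_id)

    def has_visited(self, node_id: str) -> bool:
        return node_id in self.visited_nodes

    def mark_visited(self, node_id: str) -> None:
        self.visited_nodes.add(node_id)

    def update_state(self, updater: Callable[[Any, Any], Any], data: Any) -> None:
        # Assumption: the updater returns the new state.
        self.state = updater(self.state, data)


ctx = GraphExecutionContext(state={"evidence": []})
ctx.current_node = "execute_tools"
ctx.set_node_result("execute_tools", ["finding"])
ctx.mark_visited("execute_tools")
ctx.update_state(lambda s, d: {**s, "evidence": s["evidence"] + d}, ["finding"])
```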

Conditional Routing

Decision nodes evaluate conditions and return next node IDs:

  • Knowledge Gap Decision: If research_complete → writer, else → tool selector
  • Budget Decision: If budget exceeded → exit, else → continue
  • Iteration Decision: If max iterations → exit, else → continue
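
The three decisions above could be combined into one routing function, sketched below. The route() helper, the order in which the checks run, and the "exit" node ID are assumptions for illustration. KnowledgeGapOutput is reduced here to a stand-in carrying only the research_complete flag.

```python
from dataclasses import dataclass


@dataclass
class KnowledgeGapOutput:
    # Stand-in: the real model likely carries more fields.
    research_complete: bool


def route(
    gap: KnowledgeGapOutput,
    budget_exceeded: bool,
    iteration: int,
    max_iterations: int,
) -> str:
    """Return the ID of the next node to execute."""
    if budget_exceeded:
        return "exit"
    if iteration >= max_iterations:
        return "exit"
    return "writer" if gap.research_complete else "tool_selector"


next_node = route(KnowledgeGapOutput(research_complete=False), False, 1, 5)
```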

Parallel Execution

Parallel nodes execute multiple nodes concurrently:

  • Each parallel branch runs independently
  • Results are aggregated after all branches complete
  • State is synchronized after parallel execution
  • Errors in one branch don't stop other branches
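
One way to get the behaviour listed above with asyncio.gather() is to pass return_exceptions=True, so that a failing branch is reported as a value instead of cancelling the aggregation. Whether DeepCritical uses this exact flag is an assumption. run_branch() and run_parallel() are hypothetical names, and the forced failure for "methods" exists only to demonstrate error isolation.

```python
import asyncio
from typing import Dict, List, Tuple


async def run_branch(section: str) -> str:
    """Stand-in for one iterative research loop."""
    if section == "methods":
        raise RuntimeError("search failed")
    await asyncio.sleep(0)
    return f"draft for {section}"


async def run_parallel(sections: List[str]) -> Tuple[Dict[str, str], Dict[str, BaseException]]:
    # return_exceptions=True keeps one failure from discarding the other results.
    outcomes = await asyncio.gather(
        *(run_branch(s) for s in sections), return_exceptions=True
    )
    drafts = {s: o for s, o in zip(sections, outcomes) if not isinstance(o, BaseException)}
    errors = {s: o for s, o in zip(sections, outcomes) if isinstance(o, BaseException)}
    return drafts, errors


drafts, errors = asyncio.run(run_parallel(["intro", "methods", "results"]))
```

Results come back in the same order as the inputs, which is what makes the zip() with sections safe.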

Budget Enforcement

Budget constraints are enforced at decision nodes:

  • Token Budget: Track LLM token usage
  • Time Budget: Track elapsed time
  • Iteration Budget: Track iteration count

If any budget is exceeded, execution routes to the exit node.
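
A tracker covering the three budgets might look like the sketch below. add_tokens() mirrors the call shown in the sequence diagram. The constructor parameters, add_iteration(), and is_exceeded() are hypothetical and do not describe the real BudgetTracker interface.

```python
import time
from dataclasses import dataclass, field


@dataclass
class BudgetTracker:
    max_tokens: int
    max_seconds: float
    max_iterations: int
    tokens_used: int = 0
    iterations: int = 0
    started_at: float = field(default_factory=time.monotonic)

    def add_tokens(self, used: int) -> None:
        self.tokens_used += used

    def add_iteration(self) -> None:
        self.iterations += 1

    def is_exceeded(self) -> bool:
        """True as soon as any one of the three budgets is over its limit."""
        elapsed = time.monotonic() - self.started_at
        return (
            self.tokens_used > self.max_tokens
            or elapsed > self.max_seconds
            or self.iterations > self.max_iterations
        )


tracker = BudgetTracker(max_tokens=100, max_seconds=60.0, max_iterations=3)
tracker.add_tokens(60)
within_budget = not tracker.is_exceeded()
tracker.add_tokens(60)
over_budget = tracker.is_exceeded()
```

time.monotonic() is used for the time budget because it is unaffected by system clock adjustments.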

Error Handling

Errors are handled at multiple levels:

  1. Node Level: Catch errors in individual node execution
  2. Graph Level: Handle errors during graph traversal
  3. State Level: Rollback state changes on error

Errors are logged and emitted as error events for the UI.
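
Node-level handling with state rollback could be sketched as follows. The snapshot-and-restore approach, the execute_with_rollback() name, and the dictionary-shaped events are all assumptions. The source does not say how rollback is implemented, and real events would be AgentEvent objects.

```python
import copy
import logging
from typing import Callable

logger = logging.getLogger("graph")


def execute_with_rollback(node_id: str, fn: Callable[[dict], None], state: dict) -> dict:
    """Run one node and restore the previous state if it raises."""
    snapshot = copy.deepcopy(state)
    try:
        fn(state)
        return {"type": "progress", "node": node_id}
    except Exception as exc:
        # Roll back in place so other holders of `state` see the restored data.
        state.clear()
        state.update(snapshot)
        logger.error("node %s failed: %s", node_id, exc)
        return {"type": "error", "node": node_id, "message": str(exc)}


def failing_node(state: dict) -> None:
    state["evidence"].append("partial")  # mutation that must be undone
    raise RuntimeError("tool timeout")


state = {"evidence": ["earlier finding"]}
event = execute_with_rollback("execute_tools", failing_node, state)
```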

Backward Compatibility

Graph execution is optional and controlled by a feature flag:

  • USE_GRAPH_EXECUTION=true: Use graph-based execution
  • USE_GRAPH_EXECUTION=false: Use agent chain execution (existing)

This allows gradual migration and fallback if needed.
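
Reading the flag from the environment might look like the sketch below. The use_graph_execution() helper, the case-insensitive parsing, and the default of false when the variable is unset are assumptions. The project may load this setting through its own configuration layer instead.

```python
import os
from typing import Mapping, Optional


def use_graph_execution(env: Optional[Mapping[str, str]] = None) -> bool:
    """Return True only when USE_GRAPH_EXECUTION is set to 'true'."""
    env = os.environ if env is None else env
    return env.get("USE_GRAPH_EXECUTION", "false").strip().lower() == "true"


mode = "graph" if use_graph_execution({"USE_GRAPH_EXECUTION": "true"}) else "agent_chain"
```

Accepting an explicit mapping keeps the helper easy to test without modifying the process environment.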

See Also