Advanced Workflow Examples¶
This section provides advanced usage examples that showcase DeepCritical's workflow composition, multi-agent coordination, and complex research scenarios.
Multi-Flow Integration¶
Comprehensive Research Pipeline¶
```python
import asyncio

from deepresearch.app import main


async def comprehensive_research():
    """Execute comprehensive research combining multiple flows."""
    # Multi-flow research question
    result = await main(
        question="Design and validate a novel therapeutic approach for Alzheimer's disease using AI and bioinformatics",
        flows={
            "prime": {"enabled": True},
            "bioinformatics": {"enabled": True},
            "deepsearch": {"enabled": True}
        },
        config_overrides={
            "prime": {
                "params": {
                    "adaptive_replanning": True,
                    "nested_loops": 3
                }
            },
            "bioinformatics": {
                "data_sources": {
                    "go": {"max_annotations": 500},
                    "pubmed": {"max_results": 100}
                }
            }
        }
    )

    print(f"Comprehensive research completed: {result.success}")
    if result.success:
        print(f"Key findings: {result.data['summary']}")


asyncio.run(comprehensive_research())
```
Cross-Domain Analysis¶
```python
import asyncio

from deepresearch.app import main


async def cross_domain_analysis():
    """Analyze relationships between different scientific domains."""
    result = await main(
        question="How do advances in machine learning impact drug discovery and protein engineering?",
        flows={
            "prime": {"enabled": True},
            "bioinformatics": {"enabled": True},
            "deepsearch": {"enabled": True}
        },
        execution_mode="multi_level_react",
        max_iterations=5
    )

    print(f"Cross-domain analysis completed: {result.success}")


asyncio.run(cross_domain_analysis())
```
Custom Agent Workflows¶
Multi-Agent Coordination¶
```python
import asyncio

from deepresearch.agents import MultiAgentOrchestrator, SearchAgent, RAGAgent


async def multi_agent_workflow():
    """Demonstrate multi-agent coordination."""
    # Create agent orchestrator
    orchestrator = MultiAgentOrchestrator()

    # Add specialized agents
    orchestrator.add_agent("search", SearchAgent())
    orchestrator.add_agent("rag", RAGAgent())

    # Define workflow
    workflow = [
        {"agent": "search", "task": "Find latest ML papers"},
        {"agent": "rag", "task": "Analyze research trends"},
        {"agent": "search", "task": "Find related applications"}
    ]

    # Execute workflow
    result = await orchestrator.execute_workflow(
        initial_query="Machine learning in drug discovery",
        workflow_sequence=workflow
    )

    print(f"Multi-agent workflow completed: {result.success}")


asyncio.run(multi_agent_workflow())
```
Agent Specialization¶
```python
import asyncio

from deepresearch.agents import BaseAgent, AgentType, AgentDependencies


class SpecializedAgent(BaseAgent):
    """Custom agent for specific domain expertise."""

    def __init__(self, domain: str):
        super().__init__(AgentType.CUSTOM, "anthropic:claude-sonnet-4-0")
        self.domain = domain

    async def execute(self, input_data, deps=None):
        """Execute with domain specialization."""
        # Route execution based on domain
        if self.domain == "drug_discovery":
            return await self._drug_discovery_analysis(input_data, deps)
        elif self.domain == "protein_engineering":
            return await self._protein_engineering_analysis(input_data, deps)
        else:
            return await super().execute(input_data, deps)

    async def _drug_discovery_analysis(self, input_data, deps):
        # Illustrative stub: prepend domain context, then defer to the base agent
        return await super().execute(f"[drug discovery] {input_data}", deps)

    async def _protein_engineering_analysis(self, input_data, deps):
        # Illustrative stub: prepend domain context, then defer to the base agent
        return await super().execute(f"[protein engineering] {input_data}", deps)


async def specialized_workflow():
    """Use specialized agents for domain-specific tasks."""
    # Create domain-specific agents
    drug_agent = SpecializedAgent("drug_discovery")
    protein_agent = SpecializedAgent("protein_engineering")

    # Execute specialized analysis
    drug_result = await drug_agent.execute(
        "Analyze ML applications in drug discovery",
        AgentDependencies()
    )
    protein_result = await protein_agent.execute(
        "Design proteins for therapeutic applications",
        AgentDependencies()
    )

    print(f"Drug discovery analysis: {drug_result.success}")
    print(f"Protein engineering analysis: {protein_result.success}")


asyncio.run(specialized_workflow())
```
Complex Configuration Scenarios¶
Environment-Specific Workflows¶
```python
import asyncio

from deepresearch.app import main


async def environment_specific_workflow():
    """Execute workflows optimized for different environments."""
    # Development environment
    dev_result = await main(
        question="Test research workflow",
        config_name="development",
        debug=True,
        verbose_logging=True
    )

    # Production environment
    prod_result = await main(
        question="Production research analysis",
        config_name="production",
        optimization_level="high",
        caching_enabled=True
    )

    print(f"Development test: {dev_result.success}")
    print(f"Production run: {prod_result.success}")


asyncio.run(environment_specific_workflow())
```
Batch Research Campaigns¶
```python
import asyncio

from deepresearch.app import main


async def batch_research_campaign():
    """Execute large-scale research campaigns."""
    # Define research campaign
    research_topics = [
        "AI in healthcare diagnostics",
        "Protein design for therapeutics",
        "Drug discovery optimization",
        "Bioinformatics data integration",
        "Machine learning interpretability"
    ]

    campaign_results = []
    for topic in research_topics:
        result = await main(
            question=topic,
            flows={
                "prime": {"enabled": True},
                "bioinformatics": {"enabled": True},
                "deepsearch": {"enabled": True}
            },
            batch_mode=True
        )
        campaign_results.append((topic, result))

    # Analyze campaign results
    success_count = sum(1 for _, result in campaign_results if result.success)
    print(f"Campaign completed: {success_count}/{len(research_topics)} successful")


asyncio.run(batch_research_campaign())
```
Advanced Tool Integration¶
Custom Tool Chains¶
```python
import asyncio

from deepresearch.tools import ToolRegistry


async def custom_tool_chain():
    """Create and execute custom tool chains."""
    registry = ToolRegistry.get_instance()

    # Define custom analysis chain
    tool_chain = [
        ("web_search", {
            "query": "machine learning applications",
            "num_results": 20
        }),
        ("content_extraction", {
            "urls": "web_search_results",
            "extract_metadata": True
        }),
        ("duplicate_removal", {
            "content": "content_extraction_results"
        }),
        ("quality_filtering", {
            "content": "duplicate_removal_results",
            "min_length": 500
        }),
        ("content_analysis", {
            "content": "quality_filtering_results",
            "analysis_types": ["sentiment", "topics", "entities"]
        })
    ]

    # Execute tool chain
    results = await registry.execute_tool_chain(tool_chain)

    print(f"Tool chain executed: {len(results)} steps")
    for i, result in enumerate(results):
        print(f"Step {i+1}: {'Success' if result.success else 'Failed'}")


asyncio.run(custom_tool_chain())
```
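Note how each step after the first references a previous step's output by name (for example, `"urls": "web_search_results"`); these strings are presumably resolved by the registry to the corresponding step's results at execution time rather than passed through as literals.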
Tool Result Processing¶
```python
import asyncio

from deepresearch.tools import ToolRegistry


async def tool_result_processing():
    """Process and analyze tool execution results."""
    registry = ToolRegistry.get_instance()

    # Execute multiple tools
    search_result = await registry.execute_tool("web_search", {
        "query": "AI applications",
        "num_results": 10
    })

    analysis_result = await registry.execute_tool("content_analysis", {
        "content": search_result.data,
        "analysis_types": ["topics", "sentiment"]
    })

    # Process combined results
    if search_result.success and analysis_result.success:
        combined_insights = {
            "search_summary": search_result.metadata,
            "content_analysis": analysis_result.data,
            "execution_metrics": {
                "search_time": search_result.execution_time,
                "analysis_time": analysis_result.execution_time
            }
        }
        print(f"Combined insights: {combined_insights}")


asyncio.run(tool_result_processing())
```
Workflow State Management¶
State Persistence¶
```python
import asyncio

from deepresearch.app import main


async def state_persistence_example():
    """Demonstrate workflow state persistence."""
    # Execute workflow with state tracking
    result = await main(
        question="Long-running research task",
        enable_state_persistence=True,
        state_save_interval=300,  # Save every 5 minutes
        state_file="research_state.json"
    )

    # Resume the workflow from the saved state if it was interrupted
    if result.interrupted:
        resumed_result = await main(
            resume_from_state="research_state.json",
            question="Continue research task"
        )
        print(f"Workflow resumed: {resumed_result.success}")


asyncio.run(state_persistence_example())
```
State Analysis¶
```python
import asyncio
import json

from deepresearch.datatypes import ResearchState


async def state_analysis_example():
    """Analyze workflow execution state."""
    # Load execution state
    with open("research_state.json", "r") as f:
        state_data = json.load(f)

    state = ResearchState(**state_data)

    # Analyze state
    analysis = {
        "total_steps": len(state.execution_history.entries),
        "successful_steps": sum(1 for entry in state.execution_history.entries if entry.success),
        "failed_steps": sum(1 for entry in state.execution_history.entries if not entry.success),
        "total_execution_time": state.execution_history.total_time,
        "agent_results": len(state.agent_results),
        "tool_outputs": len(state.tool_outputs)
    }

    print(f"State analysis: {analysis}")


asyncio.run(state_analysis_example())
```
Performance Optimization¶
Parallel Execution¶
```python
import asyncio

from deepresearch.app import main


async def parallel_execution():
    """Execute multiple research tasks in parallel."""
    # Define parallel tasks
    tasks = [
        main(question="Machine learning in healthcare"),
        main(question="Protein engineering advances"),
        main(question="Bioinformatics data integration"),
        main(question="AI ethics in research")
    ]

    # Execute in parallel
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Process results
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i+1} failed: {result}")
        else:
            print(f"Task {i+1} completed: {result.success}")


asyncio.run(parallel_execution())
```
Memory-Efficient Processing¶
```python
import asyncio

from deepresearch.app import main


async def memory_efficient_processing():
    """Execute large workflows with memory optimization."""
    result = await main(
        question="Large-scale research analysis",
        memory_optimization=True,
        chunk_size=1000,
        max_concurrent_operations=5,
        cleanup_intermediate_results=True,
        compression_enabled=True
    )

    print(f"Memory-efficient execution: {result.success}")


asyncio.run(memory_efficient_processing())
```
Error Recovery and Resilience¶
Comprehensive Error Handling¶
```python
import asyncio

from deepresearch.app import main


async def error_recovery_example():
    """Demonstrate comprehensive error recovery."""
    try:
        result = await main(
            question="Research task that may fail",
            error_recovery_strategy="comprehensive",
            max_retries=5,
            retry_delay=2.0,
            fallback_enabled=True
        )

        if result.success:
            print(f"Task completed: {result.data}")
        else:
            print(f"Task failed after retries: {result.error}")
            print(f"Error history: {result.error_history}")

    except Exception as e:
        print(f"Unhandled exception: {e}")
        # Implement fallback logic here


asyncio.run(error_recovery_example())
```
Graceful Degradation¶
```python
import asyncio

from deepresearch.app import main


async def graceful_degradation():
    """Execute workflows with graceful degradation."""
    result = await main(
        question="Complex research requiring multiple tools",
        graceful_degradation=True,
        critical_path_only=False,
        partial_results_acceptable=True
    )

    if result.partial_success:
        print(f"Partial results available: {result.partial_data}")
        print(f"Failed components: {result.failed_components}")
    elif result.success:
        print(f"Full success: {result.data}")
    else:
        print(f"Complete failure: {result.error}")


asyncio.run(graceful_degradation())
```
Monitoring and Observability¶
Execution Monitoring¶
```python
import asyncio

from deepresearch.app import main


async def execution_monitoring():
    """Monitor workflow execution in real time."""
    # Enable detailed monitoring
    result = await main(
        question="Research task with monitoring",
        monitoring_enabled=True,
        progress_reporting=True,
        metrics_collection=True,
        alert_thresholds={
            "execution_time": 300,  # 5 minutes
            "memory_usage": 0.8,    # 80%
            "error_rate": 0.1       # 10%
        }
    )

    # Access monitoring data
    if result.success:
        monitoring_data = result.monitoring_data
        print(f"Execution time: {monitoring_data.execution_time}")
        print(f"Memory usage: {monitoring_data.memory_usage}")
        print(f"Tool success rate: {monitoring_data.tool_success_rate}")


asyncio.run(execution_monitoring())
```
Performance Profiling¶
```python
import asyncio

from deepresearch.app import main


async def performance_profiling():
    """Profile workflow performance."""
    result = await main(
        question="Performance-intensive research task",
        profiling_enabled=True,
        detailed_metrics=True,
        bottleneck_detection=True
    )

    if result.success and result.profiling_data:
        profile = result.profiling_data
        print(f"Performance bottlenecks: {profile.bottlenecks}")
        print(f"Optimization suggestions: {profile.suggestions}")
        print(f"Resource usage patterns: {profile.resource_usage}")


asyncio.run(performance_profiling())
```
Integration Patterns¶
API Integration¶
```python
import asyncio

from deepresearch.app import main


async def api_integration():
    """Integrate with external APIs."""
    # External API endpoints (illustrative placeholders)
    external_data = {
        "protein_database": "https://api.uniprot.org",
        "literature_api": "https://api.pubmed.org",
        "structure_api": "https://api.pdb.org"
    }

    result = await main(
        question="Integrate external biological data sources",
        external_apis=external_data,
        api_timeout=30,
        api_retry_attempts=3
    )

    print(f"API integration completed: {result.success}")


asyncio.run(api_integration())
```
Database Integration¶
```python
import asyncio

from deepresearch.app import main


async def database_integration():
    """Integrate with research databases."""
    # Configure database connections
    db_config = {
        "neo4j": {
            "uri": "bolt://localhost:7687",
            "auth": {"user": "neo4j", "password": "password"}
        },
        "postgres": {
            "host": "localhost",
            "database": "research_db",
            "user": "researcher"
        }
    }

    result = await main(
        question="Query research database for related studies",
        database_connections=db_config,
        query_optimization=True
    )

    print(f"Database integration completed: {result.success}")


asyncio.run(database_integration())
```
Best Practices for Advanced Usage¶
- Workflow Composition: Combine flows strategically for complex research
- Resource Management: Monitor and optimize resource usage for large workflows (a minimal concurrency-and-retry sketch follows this list)
- Error Recovery: Implement comprehensive error handling and recovery strategies
- State Management: Use state persistence for long-running workflows
- Performance Monitoring: Track execution metrics and identify bottlenecks
- Integration Testing: Test integrations thoroughly before production use
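Several of these practices can be combined at the call site with only the standard library. The sketch below is a minimal illustration, assuming nothing beyond the `main` entry point used throughout this page: it bounds concurrency with an `asyncio.Semaphore` and retries failed tasks with exponential backoff. The concurrency limit, retry count, and backoff schedule are illustrative values, not DeepCritical defaults.

```python
import asyncio

from deepresearch.app import main

# Illustrative values; tune per deployment
MAX_CONCURRENT = 3
MAX_RETRIES = 2


async def run_with_limits(question: str, semaphore: asyncio.Semaphore):
    """Run one research task under a concurrency limit, retrying on failure."""
    async with semaphore:
        for attempt in range(MAX_RETRIES + 1):
            try:
                return await main(question=question)
            except Exception:
                if attempt == MAX_RETRIES:
                    raise
                # Simple exponential backoff between retries
                await asyncio.sleep(2 ** attempt)


async def bounded_campaign(questions: list[str]):
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)
    tasks = [run_with_limits(q, semaphore) for q in questions]
    return await asyncio.gather(*tasks, return_exceptions=True)


results = asyncio.run(bounded_campaign([
    "AI in healthcare diagnostics",
    "Protein design for therapeutics",
]))
```

The semaphore keeps memory and API usage predictable as the campaign list grows, while `return_exceptions=True` ensures one failing topic does not cancel the rest.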
Next Steps¶
After exploring these advanced examples:
- Custom Development: Create custom agents and tools for specific domains
- Workflow Optimization: Fine-tune configurations for your use cases
- Production Deployment: Set up production-ready workflows
- Monitoring Setup: Implement comprehensive monitoring and alerting
- Integration Expansion: Connect with additional external systems
Code Improvement Workflow Examples¶
Automatic Error Correction¶
```python
import asyncio

from DeepResearch.src.agents.code_execution_orchestrator import CodeExecutionOrchestrator


async def automatic_error_correction():
    """Demonstrate automatic code improvement and error correction."""
    orchestrator = CodeExecutionOrchestrator()

    # This intentionally problematic request will trigger error correction
    result = await orchestrator.iterative_improve_and_execute(
        user_message="Write a Python script that reads a CSV file and calculates statistics, but make sure it handles all possible errors",
        max_iterations=3
    )

    print(f"Success: {result.success}")
    print(f"Final code has {len(result.data['final_code'])} characters")
    print(f"Improvement attempts: {result.data['iterations_used']}")


asyncio.run(automatic_error_correction())
```
Code Analysis and Improvement¶
```python
import asyncio

from DeepResearch.src.agents.code_improvement_agent import CodeImprovementAgent


async def code_analysis_improvement():
    """Analyze and improve existing code."""
    agent = CodeImprovementAgent()

    # Code with intentional issues
    problematic_code = '''
def process_list(items):
    total = 0
    for item in items:
        total += item  # No error handling for non-numeric items
    return total / len(items)  # Division by zero if empty list

result = process_list([])
'''

    # Analyze the error
    analysis = await agent.analyze_error(
        code=problematic_code,
        error_message="ZeroDivisionError: division by zero",
        language="python"
    )

    print(f"Error Type: {analysis['error_type']}")
    print(f"Root Cause: {analysis['root_cause']}")

    # Improve the code
    improvement = await agent.improve_code(
        original_code=problematic_code,
        error_message="ZeroDivisionError: division by zero",
        language="python",
        improvement_focus="robustness"
    )

    print(f"Improved Code:\n{improvement['improved_code']}")


asyncio.run(code_analysis_improvement())
```
Multi-Language Code Generation with Error Handling¶
```python
import asyncio

from DeepResearch.src.agents.code_execution_orchestrator import CodeExecutionOrchestrator


async def multi_language_generation():
    """Generate and improve code in multiple languages."""
    orchestrator = CodeExecutionOrchestrator()

    # Python script with error correction
    python_result = await orchestrator.iterative_improve_and_execute(
        "Create a Python function that safely parses JSON data from a file",
        code_type="python",
        max_iterations=3
    )

    # Bash script with error correction
    bash_result = await orchestrator.iterative_improve_and_execute(
        "Write a bash script that checks if a directory exists and creates it if not",
        code_type="bash",
        max_iterations=2
    )

    print("Python script result:", python_result.success)
    print("Bash script result:", bash_result.success)


asyncio.run(multi_language_generation())
```
Performance Optimization and Code Enhancement¶
```python
import asyncio

from DeepResearch.src.agents.code_improvement_agent import CodeImprovementAgent


async def performance_optimization():
    """Optimize code for better performance."""
    agent = CodeImprovementAgent()

    # Inefficient code
    slow_code = '''
def fibonacci_recursive(n):
    if n <= 1:
        return n
    return fibonacci_recursive(n-1) + fibonacci_recursive(n-2)

# Calculate multiple values (very inefficient)
results = [fibonacci_recursive(i) for i in range(35)]
'''

    # Optimize for performance
    optimization = await agent.improve_code(
        original_code=slow_code,
        error_message="",  # No error, just optimization
        language="python",
        improvement_focus="optimize"
    )

    print("Optimization completed")
    print(f"Optimized code:\n{optimization['improved_code']}")


asyncio.run(performance_optimization())
```
Integration with Code Execution Workflow¶
```bash
# Complete workflow with automatic error correction
uv run deepresearch \
  flows.code_execution.enabled=true \
  question="Create a data analysis script that reads CSV, performs statistical analysis, and generates plots" \
  flows.code_execution.improvement.enabled=true \
  flows.code_execution.improvement.max_attempts=5 \
  flows.code_execution.execution.use_docker=true
```
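The dotted arguments are Hydra-style configuration overrides: each key path addresses one node of the flow configuration, so `flows.code_execution.improvement.max_attempts=5` caps the number of improvement iterations and `flows.code_execution.execution.use_docker=true` runs the generated code inside a Docker sandbox.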
Advanced Error Recovery Scenarios¶
```python
import asyncio

from DeepResearch.src.agents.code_execution_orchestrator import CodeExecutionOrchestrator


async def advanced_error_recovery():
    """Handle complex error scenarios with multiple improvement attempts."""
    orchestrator = CodeExecutionOrchestrator()

    # Complex request that may require multiple iterations
    result = await orchestrator.iterative_improve_and_execute(
        user_message="""
        Write a Python script that:
        1. Downloads data from a REST API
        2. Parses and validates the JSON response
        3. Performs statistical analysis on numeric fields
        4. Saves results to both CSV and JSON formats
        5. Includes comprehensive error handling for all operations
        """,
        max_iterations=5,  # Allow more attempts for complex tasks
        enable_improvement=True
    )

    print(f"Complex task completed: {result.success}")
    if result.success:
        print(f"Final code quality: {len(result.data['improvement_history'])} improvements made")
        print("Improvement history:")
        for i, improvement in enumerate(result.data['improvement_history'], 1):
            print(f"  {i}. {improvement['explanation'][:100]}...")


asyncio.run(advanced_error_recovery())
```
For more specialized examples, see Bioinformatics Tools and Integration Examples.