BeeAI Framework

In this tutorial, we explore the power and flexibility of the beeai-framework by building a fully functional multi-agent system from the ground up. We walk through the essential components: custom agents, tools, memory management, and event monitoring, to show how BeeAI simplifies the development of intelligent, cooperative agents. Along the way, we demonstrate how these agents can perform complex tasks, such as market research, code analysis, and strategic planning, using a modular, production-ready pattern.
import subprocess
import sys
import asyncio
import json
from typing import Dict, List, Any, Optional
from datetime import datetime
import os
def install_packages():
    packages = [
        "beeai-framework",
        "requests",
        "beautifulsoup4",
        "numpy",
        "pandas",
        "pydantic"
    ]
    print("Installing required packages...")
    for package in packages:
        try:
            subprocess.check_call([sys.executable, "-m", "pip", "install", package])
            print(f"✅ {package} installed successfully")
        except subprocess.CalledProcessError as e:
            print(f"❌ Failed to install {package}: {e}")
    print("Installation complete!")

install_packages()
try:
    from beeai_framework import ChatModel
    from beeai_framework.agents import Agent
    from beeai_framework.tools import Tool
    from beeai_framework.workflows import Workflow
    BEEAI_AVAILABLE = True
    print("✅ BeeAI Framework imported successfully")
except ImportError as e:
    print(f"⚠️ BeeAI Framework import failed: {e}")
    print("Falling back to custom implementation...")
    BEEAI_AVAILABLE = False
We begin by installing all the required packages, including the beeai-framework, to ensure our environment is ready for multi-agent development. Once installed, we attempt to import BeeAI's core modules. If the import fails, we gracefully fall back to a custom implementation so the workflow keeps functioning.
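As a small, optional refinement (not part of the original script), we can skip pip installs for packages that are already importable. Note that a pip package name and its import name can differ, as with beautifulsoup4 and bs4; the mapping below is only a sketch.

import importlib.util

def is_installed(module_name: str) -> bool:
    """Return True if the module can already be imported without installing it."""
    return importlib.util.find_spec(module_name) is not None

# Map pip package names to import names before deciding what to install.
missing = [pkg for pkg, mod in [("beeai-framework", "beeai_framework"),
                                ("beautifulsoup4", "bs4"),
                                ("pandas", "pandas")] if not is_installed(mod)]
print("Missing packages:", missing)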
class MockChatModel:
    """Mock LLM for demonstration purposes"""
    def __init__(self, model_name: str = "mock-llm"):
        self.model_name = model_name

    async def generate(self, messages: List[Dict[str, str]]) -> str:
        """Generate a mock response"""
        last_message = messages[-1]['content'] if messages else ""
        if "market" in last_message.lower():
            return "Market analysis shows strong growth in AI frameworks with 42% YoY increase. Key competitors include LangChain, CrewAI, and AutoGen."
        elif "code" in last_message.lower():
            return "Code analysis reveals good structure with async patterns. Consider adding more error handling and documentation."
        elif "strategy" in last_message.lower():
            return "Strategic recommendation: Focus on ease of use, robust documentation, and enterprise features to compete effectively."
        else:
            return f"Analyzed: {last_message[:100]}... Recommendation: Implement best practices for scalability and maintainability."
class CustomTool:
    """Base class for custom tools"""
    def __init__(self, name: str, description: str):
        self.name = name
        self.description = description

    async def run(self, input_data: str) -> str:
        """Override this method in subclasses"""
        raise NotImplementedError
We define a MockChatModel to simulate LLM behavior when BeeAI is unavailable, allowing us to test and prototype workflows without relying on external APIs. Alongside it, we create a CustomTool base class, which serves as a blueprint for task-specific tools that our agents can use, laying the foundation for modular, tool-augmented agent capabilities.
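To illustrate how the base class is meant to be extended, here is a minimal, hypothetical example (a WordCountTool that is not part of the tutorial's workflow): subclass CustomTool, set a name and description, and override the async run method.

class WordCountTool(CustomTool):
    """Toy example tool that counts words in the input text."""
    def __init__(self):
        super().__init__(
            name="word_count",
            description="Counts the words in a piece of text"
        )

    async def run(self, input_data: str) -> str:
        return f"The input contains {len(input_data.split())} words."

# Quick sanity check (e.g. in a notebook cell):
# asyncio.run(WordCountTool().run("BeeAI makes multi-agent systems approachable"))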
class MarketResearchTool(CustomTool):
    """Custom tool for market research and competitor analysis"""
    def __init__(self):
        super().__init__(
            name="market_research",
            description="Analyzes market trends and competitor information"
        )
        self.market_data = {
            "AI_frameworks": {
                "competitors": ["LangChain", "CrewAI", "AutoGen", "Haystack", "Semantic Kernel"],
                "market_size": "$2.8B",
                "growth_rate": "42% YoY",
                "key_trends": ["Multi-agent systems", "Production deployment", "Tool integration", "Enterprise adoption"]
            },
            "enterprise_adoption": {
                "rate": "78%",
                "top_use_cases": ["Customer support", "Data analysis", "Code generation", "Document processing"],
                "challenges": ["Reliability", "Cost control", "Integration complexity", "Governance"]
            }
        }

    async def run(self, query: str) -> str:
        """Simulate market research based on the query"""
        query_lower = query.lower()
        if "competitor" in query_lower or "competition" in query_lower:
            data = self.market_data["AI_frameworks"]
            return f"""Market Analysis Results:
Key Competitors: {', '.join(data['competitors'])}
Market Size: {data['market_size']}
Growth Rate: {data['growth_rate']}
Key Trends: {', '.join(data['key_trends'])}
Recommendation: Focus on differentiating features like simplified deployment, better debugging tools, and enterprise-grade security."""
        elif "adoption" in query_lower or "enterprise" in query_lower:
            data = self.market_data["enterprise_adoption"]
            return f"""Enterprise Adoption Analysis:
Adoption Rate: {data['rate']}
Top Use Cases: {', '.join(data['top_use_cases'])}
Main Challenges: {', '.join(data['challenges'])}
Recommendation: Address reliability and cost control concerns through better monitoring and resource management features."""
        else:
            return "Market research available for: competitor analysis, enterprise adoption, or specific trend analysis. Please specify your focus area."
We implement the MarketResearchTool as a specialized extension of our CustomTool base class. This tool simulates real-world market intelligence by returning pre-defined insights on AI framework trends, key competitors, adoption rates, and industry challenges. With it, we equip our agents to make informed, data-driven recommendations during workflow execution.
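As a quick usage sketch (the query text is illustrative), we can also call the tool directly, outside any agent, to preview the canned enterprise-adoption insights it returns:

async def preview_market_tool():
    tool = MarketResearchTool()
    # Any query containing "adoption" or "enterprise" triggers the adoption branch.
    print(await tool.run("What is the enterprise adoption rate for AI frameworks?"))

# asyncio.run(preview_market_tool())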
class CodeAnalysisTool(CustomTool):
    """Custom tool for analyzing code patterns and suggesting improvements"""
    def __init__(self):
        super().__init__(
            name="code_analysis",
            description="Analyzes code structure and suggests improvements"
        )

    async def run(self, code_snippet: str) -> str:
        """Analyze code and provide insights"""
        analysis = {
            "lines": len(code_snippet.split('\n')),
            "complexity": "High" if len(code_snippet) > 500 else "Medium" if len(code_snippet) > 200 else "Low",
            "async_usage": "Yes" if "async" in code_snippet or "await" in code_snippet else "No",
            "error_handling": "Present" if "try:" in code_snippet or "except:" in code_snippet else "Missing",
            "documentation": "Good" if '"""' in code_snippet or "'''" in code_snippet else "Needs improvement",
            "imports": "Present" if "import " in code_snippet else "None detected",
            "classes": len([line for line in code_snippet.split('\n') if line.strip().startswith('class ')]),
            "functions": len([line for line in code_snippet.split('\n') if line.strip().startswith('def ') or line.strip().startswith('async def ')])
        }
        suggestions = []
        if analysis["error_handling"] == "Missing":
            suggestions.append("Add try-except blocks for error handling")
        if analysis["documentation"] == "Needs improvement":
            suggestions.append("Add docstrings and comments")
        if "print(" in code_snippet:
            suggestions.append("Consider using proper logging instead of print statements")
        if analysis["async_usage"] == "Yes" and "await" not in code_snippet:
            suggestions.append("Ensure proper await usage with async functions")
        if analysis["complexity"] == "High":
            suggestions.append("Consider breaking down into smaller functions")
        return f"""Code Analysis Report:
Structure:
- Lines of code: {analysis['lines']}
- Complexity: {analysis['complexity']}
- Classes: {analysis['classes']}
- Functions: {analysis['functions']}
Quality Metrics:
- Async usage: {analysis['async_usage']}
- Error handling: {analysis['error_handling']}
- Documentation: {analysis['documentation']}
Suggestions:
{chr(10).join(f"• {suggestion}" for suggestion in suggestions) if suggestions else "• Code looks good! Following best practices."}
Overall Score: {10 - len(suggestions) * 2}/10"""
class CustomAgent:
    """Custom agent implementation"""
    def __init__(self, name: str, role: str, instructions: str, tools: List[CustomTool], llm=None):
        self.name = name
        self.role = role
        self.instructions = instructions
        self.tools = tools
        self.llm = llm or MockChatModel()
        self.memory = []

    async def run(self, task: str) -> Dict[str, Any]:
        """Execute an agent task"""
        print(f"🤖 {self.name} ({self.role}) processing task...")
        self.memory.append({"type": "task", "content": task, "timestamp": datetime.now()})
        task_lower = task.lower()
        tool_used = None
        tool_result = None
        for tool in self.tools:
            if tool.name == "market_research" and ("market" in task_lower or "competitor" in task_lower):
                tool_result = await tool.run(task)
                tool_used = tool.name
                break
            elif tool.name == "code_analysis" and ("code" in task_lower or "analyze" in task_lower):
                tool_result = await tool.run(task)
                tool_used = tool.name
                break
        messages = [
            {"role": "system", "content": f"You are {self.role}. {self.instructions}"},
            {"role": "user", "content": task}
        ]
        if tool_result:
            messages.append({"role": "system", "content": f"Tool {tool_used} provided: {tool_result}"})
        response = await self.llm.generate(messages)
        self.memory.append({"type": "response", "content": response, "timestamp": datetime.now()})
        return {
            "agent": self.name,
            "task": task,
            "tool_used": tool_used,
            "tool_result": tool_result,
            "response": response,
            "success": True
        }
We now implement the CodeAnalysisTool, which enables our agents to assess code snippets based on structure, complexity, documentation, and error handling, and to generate suggestions for improving code quality. We also define the CustomAgent class, equipping each agent with its own role, instructions, memory, tools, and access to an LLM. This design lets each agent intelligently decide whether a tool is needed and then synthesize a response from both tool output and LLM reasoning, ensuring adaptable, context-aware behavior.
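Before wiring agents into a workflow, we can exercise a single CustomAgent on its own. The sketch below (the CodeReviewer name and task text are illustrative) attaches the CodeAnalysisTool and inspects the structured result the agent returns.

async def single_agent_example():
    reviewer = CustomAgent(
        name="CodeReviewer",
        role="Code Review Assistant",
        instructions="Review code snippets and summarize quality issues.",
        tools=[CodeAnalysisTool()],
        llm=MockChatModel()
    )
    # "analyze" and "code" in the task trigger the code_analysis tool.
    outcome = await reviewer.run("Please analyze this code: def add(a, b): return a + b")
    print(outcome["tool_used"])   # -> "code_analysis"
    print(outcome["response"])    # mock LLM synthesis of the tool output

# asyncio.run(single_agent_example())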
class WorkflowMonitor:
    """Monitor and log workflow events"""
    def __init__(self):
        self.events = []
        self.start_time = datetime.now()

    def log_event(self, event_type: str, data: Dict[str, Any]):
        """Log workflow events"""
        timestamp = datetime.now()
        self.events.append({
            "timestamp": timestamp,
            "duration": (timestamp - self.start_time).total_seconds(),
            "event_type": event_type,
            "data": data
        })
        print(f"[{timestamp.strftime('%H:%M:%S')}] {event_type}: {data.get('agent', 'System')}")

    def get_summary(self):
        """Get monitoring summary"""
        return {
            "total_events": len(self.events),
            "total_duration": (datetime.now() - self.start_time).total_seconds(),
            "event_types": list(set([e["event_type"] for e in self.events])),
            "events": self.events
        }
class CustomWorkflow:
    """Custom workflow implementation"""
    def __init__(self, name: str, description: str):
        self.name = name
        self.description = description
        self.agents = []
        self.monitor = WorkflowMonitor()

    def add_agent(self, agent: CustomAgent):
        """Add an agent to the workflow"""
        self.agents.append(agent)
        self.monitor.log_event("agent_added", {"agent": agent.name, "role": agent.role})

    async def run(self, tasks: List[str]) -> Dict[str, Any]:
        """Execute the workflow tasks"""
        self.monitor.log_event("workflow_started", {"tasks": len(tasks)})
        results = []
        context = {"shared_insights": []}
        for i, task in enumerate(tasks):
            agent = self.agents[i % len(self.agents)]
            if context["shared_insights"]:
                enhanced_task = f"{task}\n\nContext from previous analysis:\n" + "\n".join(context["shared_insights"][-2:])
            else:
                enhanced_task = task
            result = await agent.run(enhanced_task)
            results.append(result)
            context["shared_insights"].append(f"{agent.name}: {result['response'][:200]}...")
            self.monitor.log_event("task_completed", {
                "agent": agent.name,
                "task_index": i,
                "success": result["success"]
            })
        self.monitor.log_event("workflow_completed", {"total_tasks": len(tasks)})
        return {
            "workflow": self.name,
            "results": results,
            "context": context,
            "summary": self._generate_summary(results)
        }

    def _generate_summary(self, results: List[Dict[str, Any]]) -> str:
        """Generate a workflow summary"""
        summary_parts = []
        for result in results:
            summary_parts.append(f"• {result['agent']}: {result['response'][:150]}...")
        return f"""Workflow Summary for {self.name}:
{chr(10).join(summary_parts)}
Key Insights:
• Market opportunities identified in the AI framework space
• Technical architecture recommendations provided
• Strategic implementation plan outlined
• Multi-agent collaboration demonstrated successfully"""
We implement the WorkflowMonitor to log and track events throughout execution, giving us real-time visibility into the actions taken by each agent. With the CustomWorkflow class, we orchestrate the entire multi-agent process, assigning tasks, preserving shared context across agents, and capturing all relevant insights. This structure ensures that we not only execute tasks in a coordinated, transparent way but also generate a comprehensive summary that highlights collaboration and key outcomes.
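To make the orchestration concrete, here is a minimal, illustrative workflow (the agent name and tasks are hypothetical): a single agent handles two tasks, and the second task receives the shared-context insights appended after the first.

async def mini_workflow_example():
    wf = CustomWorkflow(name="Mini Demo", description="Two-task sanity check")
    wf.add_agent(CustomAgent(
        name="Analyst",
        role="General Analyst",
        instructions="Answer briefly with actionable points.",
        tools=[MarketResearchTool()],
        llm=MockChatModel()
    ))
    output = await wf.run([
        "Summarize the competitor landscape for AI agent frameworks.",
        "Suggest one market opportunity based on the previous analysis."
    ])
    # The second task's prompt is enhanced with the first agent response.
    print(output["summary"])

# asyncio.run(mini_workflow_example())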
async def advanced_workflow_demo():
    """Demonstrate an advanced multi-agent workflow"""
    print("🚀 Advanced Multi-Agent Workflow Demo")
    print("=" * 50)
    workflow = CustomWorkflow(
        name="Advanced Business Intelligence System",
        description="Multi-agent system for comprehensive business analysis"
    )
    market_agent = CustomAgent(
        name="MarketAnalyst",
        role="Senior Market Research Analyst",
        instructions="Analyze market trends, competitor landscape, and business opportunities. Provide data-driven insights with actionable recommendations.",
        tools=[MarketResearchTool()],
        llm=MockChatModel()
    )
    tech_agent = CustomAgent(
        name="TechArchitect",
        role="Technical Architecture Specialist",
        instructions="Evaluate technical solutions, code quality, and architectural decisions. Focus on scalability, maintainability, and best practices.",
        tools=[CodeAnalysisTool()],
        llm=MockChatModel()
    )
    strategy_agent = CustomAgent(
        name="StrategicPlanner",
        role="Strategic Business Planner",
        instructions="Synthesize market and technical insights into comprehensive strategic recommendations. Focus on ROI, risk assessment, and implementation roadmaps.",
        tools=[],
        llm=MockChatModel()
    )
    workflow.add_agent(market_agent)
    workflow.add_agent(tech_agent)
    workflow.add_agent(strategy_agent)
    tasks = [
        "Analyze the current AI framework market landscape and identify key opportunities for a new multi-agent framework targeting enterprise users.",
        """Analyze this code architecture pattern and provide a technical assessment:
async def multi_agent_workflow():
    agents = [ResearchAgent(), AnalysisAgent(), SynthesisAgent()]
    context = SharedContext()
    for agent in agents:
        try:
            result = await agent.run(context.get_task())
            if result.success:
                context.add_insight(result.data)
            else:
                context.add_error(result.error)
        except Exception as e:
            logger.error(f"Agent {agent.name} failed: {e}")
    return context.synthesize_recommendations()""",
        "Based on the market analysis and technical assessment, create a comprehensive strategic plan for launching a competitive AI framework with a focus on multi-agent capabilities and enterprise adoption."
    ]
    print("\n🔄 Executing Advanced Workflow...")
    result = await workflow.run(tasks)
    print("\n✅ Workflow Completed Successfully!")
    print("=" * 50)
    print("📊 COMPREHENSIVE ANALYSIS RESULTS")
    print("=" * 50)
    print(result["summary"])
    print("\n📈 WORKFLOW MONITORING SUMMARY")
    print("=" * 30)
    summary = workflow.monitor.get_summary()
    print(f"Total Events: {summary['total_events']}")
    print(f"Total Duration: {summary['total_duration']:.2f} seconds")
    print(f"Event Types: {', '.join(summary['event_types'])}")
    return workflow, result
async def simple_tool_demo():
    """Demonstrate individual tool functionality"""
    print("\n🛠️ Individual Tool Demo")
    print("=" * 30)
    market_tool = MarketResearchTool()
    code_tool = CodeAnalysisTool()
    print("Available Tools:")
    print(f"• {market_tool.name}: {market_tool.description}")
    print(f"• {code_tool.name}: {code_tool.description}")
    print("\n🔍 Market Research Analysis:")
    market_result = await market_tool.run("competitor analysis in AI frameworks")
    print(market_result)
    print("\n🔍 Code Analysis:")
    sample_code = '''
import asyncio
from typing import List, Dict

class AgentManager:
    """Manages multiple AI agents"""
    def __init__(self):
        self.agents = []
        self.results = []

    async def add_agent(self, agent):
        """Add an agent to the manager"""
        self.agents.append(agent)

    async def run_all(self, task: str) -> List[Dict]:
        """Run a task on all agents"""
        results = []
        for agent in self.agents:
            try:
                result = await agent.execute(task)
                results.append(result)
            except Exception as e:
                print(f"Agent failed: {e}")
                results.append({"error": str(e)})
        return results
'''
    code_result = await code_tool.run(sample_code)
    print(code_result)
We demonstrate two workflows. First, in the individual tool demo, we directly test the capabilities of our MarketResearchTool and CodeAnalysisTool, confirming that they generate relevant insights independently. Then, we bring everything together in the advanced workflow demo, where we deploy three specialized agents, MarketAnalyst, TechArchitect, and StrategicPlanner, to tackle business analysis tasks collaboratively.
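In a notebook or Colab cell where an event loop is already running, we can also kick off a single demo on its own (assuming nest_asyncio is installed, as in the main block below):

import nest_asyncio
nest_asyncio.apply()              # lets asyncio.run work inside an existing event loop
asyncio.run(simple_tool_demo())   # run just the tool demo, without the full workflow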
async def main():
    """Main demo function"""
    print("🐝 Advanced BeeAI Framework Tutorial")
    print("=" * 40)
    print("This tutorial demonstrates:")
    print("• Multi-agent workflows")
    print("• Custom tool development")
    print("• Memory management")
    print("• Event monitoring")
    print("• Production-ready patterns")
    if BEEAI_AVAILABLE:
        print("• Using the real BeeAI Framework")
    else:
        print("• Using the custom implementation (BeeAI not available)")
    print("=" * 40)
    await simple_tool_demo()
    print("\n" + "=" * 50)
    await advanced_workflow_demo()
    print("\n🎉 Tutorial Complete!")
    print("\nNext Steps:")
    print("1. Install the BeeAI Framework properly: pip install beeai-framework")
    print("2. Configure your preferred LLM (OpenAI, Anthropic, local models)")
    print("3. Explore the official BeeAI documentation")
    print("4. Build custom agents for your specific use case")
    print("5. Deploy to production with proper monitoring")

if __name__ == "__main__":
    try:
        import nest_asyncio
        nest_asyncio.apply()
        print("✅ Applied nest_asyncio for Colab compatibility")
    except ImportError:
        print("⚠️ nest_asyncio not available - may not work in some environments")
    asyncio.run(main())
We wrap up the tutorial with the main() function, which ties together everything we've built, demonstrating both tool-level capabilities and a full multi-agent business intelligence workflow. Whether we're running BeeAI natively or using the fallback setup, we ensure compatibility with environments like Google Colab using nest_asyncio. With this structure in place, we're ready to scale our agent systems, explore deeper use cases, and confidently deploy production-ready AI workflows.
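When the real framework is available, we could back our agents with an actual BeeAI ChatModel instead of the mock. The sketch below uses the ChatModel.from_name factory and an Ollama provider string as described in BeeAI's documentation; treat both as assumptions and verify them against your installed version. Also note that CustomAgent expects an async generate(messages) method, so a real model would need a thin adapter around BeeAI's own chat interface.

if BEEAI_AVAILABLE:
    # Assumed API: confirm the factory name and provider string in the BeeAI docs.
    model = ChatModel.from_name("ollama:granite3.1-dense:8b")
else:
    model = MockChatModel()
print(type(model).__name__)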
In conclusion, we've built and executed a robust multi-agent workflow using the BeeAI framework (or a custom equivalent), showcasing its potential in real-world business intelligence applications. We've seen how easy it is to create agents with specific roles, attach tools for task augmentation, and monitor execution in a transparent way.