Examples
Example implementations of Agent Connect Framework with popular frameworks
This section provides examples of integrating the Agent Connect Framework with popular agent frameworks, demonstrating how you can connect your existing agents to watsonx Orchestrate.
Requirements
Before you proceed, install the packages that each example imports, such as fastapi, uvicorn, pydantic, and the agent framework that the example demonstrates (for example, langgraph and langchain-openai, or crewai). Each example also requires its own .env file for defining environment variables, which it loads with python-dotenv. For more information, see Python-dotenv.
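For instance, if you run the LangGraph or CrewAI examples below with OpenAI models, a minimal sketch of the environment setup might look like the following. OPENAI_API_KEY is an assumption based on the ChatOpenAI usage in those examples; adjust the variable names to your provider.
# Minimal environment setup sketch.
# Assumes a .env file in the working directory, for example containing:
#   OPENAI_API_KEY=<your key>
import os

from dotenv import load_dotenv

load_dotenv()  # read variables from .env into the process environment

# Fail fast if a required variable is missing
if not os.getenv("OPENAI_API_KEY"):
    raise RuntimeError("OPENAI_API_KEY is not set; add it to your .env file")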
LangGraph example
LangGraph is a library for building stateful, multi-actor applications with LLMs. Here’s an example of a LangGraph agent integrated with Agent Connect:
Basic LangGraph agent
import os
from dotenv import load_dotenv
from typing import Dict, List, Any, Optional
from fastapi import FastAPI, Request, Response, Header
from fastapi.responses import StreamingResponse
import uvicorn
import json
import time
import uuid
from pydantic import BaseModel, Field
from langchain_core.messages import HumanMessage, AIMessage
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
load_dotenv()
# Initialize FastAPI app
app = FastAPI(title="LangGraph Agent Connect Example")
# Define message schema
class Message(BaseModel):
role: str
content: str
    name: Optional[str] = None
class ChatRequest(BaseModel):
model: str
messages: List[Dict[str, Any]]
stream: bool = False
# Define a simple LangGraph agent
def create_agent():
# Initialize the LLM
llm = ChatOpenAI(
model="gpt-3.5-turbo",
temperature=0,
streaming=True
)
# Define the agent state
class AgentState(BaseModel):
messages: List[Any] = Field(default_factory=list)
# Define the agent nodes
def agent(state: AgentState):
messages = state.messages
response = llm.invoke(messages)
return {"messages": messages + [response]}
# Create the graph
workflow = StateGraph(AgentState)
workflow.add_node("agent", agent)
workflow.set_entry_point("agent")
workflow.add_edge("agent", END)
# Compile the graph
return workflow.compile()
# Create the agent
agent_executor = create_agent()
# Agent discovery endpoint
@app.get("/v1/agents")
async def discover_agents():
return {
"agents": [
{
"name": "LangGraph Example Agent",
"description": "A simple LangGraph agent that demonstrates Agent Connect integration",
"provider": {
"organization": "Your Organization",
"url": "https://your-organization.com"
},
"version": "1.0.0",
"documentation_url": "https://docs.example.com/langgraph-agent",
"capabilities": {
"streaming": True
}
}
]
}
# Chat completion endpoint
@app.post("/v1/chat")
async def chat_completion(request: ChatRequest, x_thread_id: str = Header(None)):
thread_id = x_thread_id or str(uuid.uuid4())
# Convert the messages to LangChain format
messages = []
for msg in request.messages:
if msg["role"] == "user":
messages.append(HumanMessage(content=msg["content"]))
elif msg["role"] == "assistant":
messages.append(AIMessage(content=msg["content"]))
# Handle streaming
if request.stream:
return StreamingResponse(
stream_response(messages, thread_id),
media_type="text/event-stream"
)
else:
# Execute the agent
result = agent_executor.invoke({"messages": messages})
final_message = result["messages"][-1]
# Format the response
response = {
"id": f"chatcmpl-{uuid.uuid4()}",
"object": "chat.completion",
"created": int(time.time()),
"model": request.model,
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": final_message.content
},
"finish_reason": "stop"
}
],
"usage": {
"prompt_tokens": 0,
"completion_tokens": 0,
"total_tokens": 0
}
}
return response
# Stream response function
async def stream_response(messages, thread_id):
# First, send a thinking step
thinking_step = {
"id": f"step-{uuid.uuid4()}",
"object": "thread.run.step.delta",
"thread_id": thread_id,
"model": "langgraph-agent",
"created": int(time.time()),
"choices": [
{
"delta": {
"role": "assistant",
"step_details": {
"type": "thinking",
"content": "Analyzing the request and formulating a response..."
}
}
}
]
}
yield f"event: thread.run.step.delta\n"
yield f"data: {json.dumps(thinking_step)}\n\n"
    # Execute the agent and get its response
    result = agent_executor.invoke({"messages": messages})
final_message = result["messages"][-1]
# Stream the final message
message_chunks = split_into_chunks(final_message.content)
for i, chunk in enumerate(message_chunks):
message_delta = {
"id": f"msg-{uuid.uuid4()}",
"object": "thread.message.delta",
"thread_id": thread_id,
"model": "langgraph-agent",
"created": int(time.time()),
"choices": [
{
"delta": {
"role": "assistant",
"content": chunk
}
}
]
}
yield f"event: thread.message.delta\n"
yield f"data: {json.dumps(message_delta)}\n\n"
# Helper function to split text into chunks
def split_into_chunks(text, chunk_size=10):
words = text.split()
chunks = []
current_chunk = []
for word in words:
current_chunk.append(word)
if len(current_chunk) >= chunk_size:
chunks.append(" ".join(current_chunk))
current_chunk = []
if current_chunk:
chunks.append(" ".join(current_chunk))
return chunks
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
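Once the script is running, you can exercise the discovery and chat endpoints locally. The following client is a sketch, assuming the requests library is installed and the server listens on port 8000 as configured above:
import requests

BASE_URL = "http://localhost:8000"

# Discover the agent's metadata and capabilities
print(requests.get(f"{BASE_URL}/v1/agents").json())

# Send a non-streaming chat request and print the assistant's reply
payload = {
    "model": "langgraph-agent",
    "messages": [{"role": "user", "content": "Hello! What can you help me with?"}],
    "stream": False
}
response = requests.post(f"{BASE_URL}/v1/chat", json=payload, headers={"x-thread-id": "example-thread"})
print(response.json()["choices"][0]["message"]["content"])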
LangGraph agent with tool calling
Extending the previous example, you can also give your LangGraph agent tool-calling capabilities:
import os
from dotenv import load_dotenv
from typing import Dict, List, Tuple, Any, Optional
from fastapi import FastAPI, Request, Response, Header
from fastapi.responses import StreamingResponse
import uvicorn
import json
import time
import uuid
import asyncio
from pydantic import BaseModel, Field
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, ToolCall
from langchain_core.tools import tool
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode
from langchain_openai import ChatOpenAI
load_dotenv()
# Initialize FastAPI app
app = FastAPI(title="LangGraph Agent Connect Example with Tools")
# Define message schema
class Message(BaseModel):
role: str
content: str
    name: Optional[str] = None
class ChatRequest(BaseModel):
model: str
messages: List[Dict[str, Any]]
stream: bool = False
# Define tools
@tool
def get_weather(location: str, unit: str = "celsius") -> str:
"""Get the current weather for a location."""
# In a real implementation, this would call a weather API
weather_data = {
"New York": {"temperature": 22, "condition": "sunny", "humidity": 45},
"London": {"temperature": 18, "condition": "cloudy", "humidity": 70},
"Tokyo": {"temperature": 28, "condition": "rainy", "humidity": 80},
}
if location in weather_data:
data = weather_data[location]
return json.dumps(data)
else:
return json.dumps({"error": f"Weather data not available for {location}"})
@tool
def search_knowledge_base(query: str) -> str:
"""Search the knowledge base for information."""
# In a real implementation, this would query a vector database
knowledge = {
"company history": "The company was founded in 2010 and has grown to 500 employees.",
"product features": "Our product offers AI-powered analytics, real-time reporting, and integration with major platforms.",
"pricing": "We offer three tiers: Basic ($10/month), Pro ($25/month), and Enterprise (custom pricing)."
}
for key, value in knowledge.items():
if query.lower() in key.lower():
return value
return "No information found for that query."
# Define a LangGraph agent with tools
def create_agent_with_tools():
    # Initialize the LLM and bind the tools so the model can emit tool calls
    llm = ChatOpenAI(
        model="gpt-3.5-turbo",
        temperature=0
    )
    tools = [get_weather, search_knowledge_base]
    llm_with_tools = llm.bind_tools(tools)
    # Define the agent state
    class AgentState(BaseModel):
        messages: List[Any] = Field(default_factory=list)
    # Agent node: invoke the LLM on the conversation so far
    def agent(state: AgentState):
        messages = state.messages
        # Add a system message with tool instructions if one is not already present
        if not any(isinstance(msg, SystemMessage) for msg in messages):
            system_message = SystemMessage(content="You are a helpful assistant with access to tools. Use them when appropriate.")
            messages = [system_message] + messages
        response = llm_with_tools.invoke(messages)
        return {"messages": messages + [response]}
    # Tools node: run the tool calls from the last AI message and append the results
    tool_node = ToolNode(tools)
    def tools_node(state: AgentState):
        result = tool_node.invoke({"messages": state.messages})
        return {"messages": state.messages + result["messages"]}
    # Route to the tools node only when the last message contains tool calls
    def should_continue(state: AgentState):
        last_message = state.messages[-1]
        if getattr(last_message, "tool_calls", None):
            return "tools"
        return END
    # Create the graph
    workflow = StateGraph(AgentState)
    workflow.add_node("agent", agent)
    workflow.add_node("tools", tools_node)
    workflow.set_entry_point("agent")
    workflow.add_conditional_edges("agent", should_continue, {"tools": "tools", END: END})
    workflow.add_edge("tools", "agent")
    # Compile the graph
    return workflow.compile()
# Create the agent
agent_executor = create_agent_with_tools()
# Agent discovery endpoint
@app.get("/v1/agents")
async def discover_agents():
return {
"agents": [
{
"name": "LangGraph Tool Agent",
"description": "A LangGraph agent with tool calling capabilities",
"provider": {
"organization": "Your Organization",
"url": "https://your-organization.com"
},
"version": "1.0.0",
"documentation_url": "https://docs.example.com/langgraph-tool-agent",
"capabilities": {
"streaming": True
}
}
]
}
# Chat completion endpoint
@app.post("/v1/chat")
async def chat_completion(request: ChatRequest, x_thread_id: str = Header(None)):
thread_id = x_thread_id or str(uuid.uuid4())
# Convert the messages to LangChain format
messages = []
for msg in request.messages:
if msg["role"] == "user":
messages.append(HumanMessage(content=msg["content"]))
elif msg["role"] == "assistant":
messages.append(AIMessage(content=msg["content"]))
# Handle streaming
if request.stream:
return StreamingResponse(
stream_response_with_tools(messages, thread_id),
media_type="text/event-stream"
)
else:
# Execute the agent
result = agent_executor.invoke({"messages": messages})
final_message = result["messages"][-1]
# Format the response
response = {
"id": f"chatcmpl-{uuid.uuid4()}",
"object": "chat.completion",
"created": int(time.time()),
"model": request.model,
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": final_message.content
},
"finish_reason": "stop"
}
],
"usage": {
"prompt_tokens": 0,
"completion_tokens": 0,
"total_tokens": 0
}
}
return response
# Stream response function with tool calling
async def stream_response_with_tools(messages, thread_id):
# First, send a thinking step
thinking_step = {
"id": f"step-{uuid.uuid4()}",
"object": "thread.run.step.delta",
"thread_id": thread_id,
"model": "langgraph-tool-agent",
"created": int(time.time()),
"choices": [
{
"delta": {
"role": "assistant",
"step_details": {
"type": "thinking",
"content": "Analyzing the request and determining if tools are needed..."
}
}
}
]
}
yield f"event: thread.run.step.delta\n"
yield f"data: {json.dumps(thinking_step)}\n\n"
# Add a small delay to simulate processing
await asyncio.sleep(0.5)
    # This is a simplified example - in a real implementation, you would
    # capture the tool calls and tool responses during execution and stream them
    # For demonstration, let's assume the agent uses the weather tool
tool_call_id = f"call-{uuid.uuid4()}"
# Stream a tool call
tool_call_step = {
"id": f"step-{uuid.uuid4()}",
"object": "thread.run.step.delta",
"thread_id": thread_id,
"model": "langgraph-tool-agent",
"created": int(time.time()),
"choices": [
{
"delta": {
"role": "assistant",
"step_details": {
"type": "tool_calls",
"tool_calls": [
{
"id": tool_call_id,
"name": "get_weather",
"args": {
"location": "New York",
"unit": "celsius"
}
}
]
}
}
}
]
}
yield f"event: thread.run.step.delta\n"
yield f"data: {json.dumps(tool_call_step)}\n\n"
# Add a small delay to simulate tool execution
await asyncio.sleep(0.5)
# Stream a tool response
tool_response_step = {
"id": f"step-{uuid.uuid4()}",
"object": "thread.run.step.delta",
"thread_id": thread_id,
"model": "langgraph-tool-agent",
"created": int(time.time()),
"choices": [
{
"delta": {
"role": "assistant",
"step_details": {
"type": "tool_response",
"content": '{"temperature": 22, "condition": "sunny", "humidity": 45}',
"name": "get_weather",
"tool_call_id": tool_call_id
}
}
}
]
}
yield f"event: thread.run.step.delta\n"
yield f"data: {json.dumps(tool_response_step)}\n\n"
# Add a small delay to simulate processing
await asyncio.sleep(0.5)
# Get the agent's final response
result = agent_executor.invoke({"messages": messages})
final_message = result["messages"][-1]
# Stream the final message
message_chunks = split_into_chunks(final_message.content)
for i, chunk in enumerate(message_chunks):
message_delta = {
"id": f"msg-{uuid.uuid4()}",
"object": "thread.message.delta",
"thread_id": thread_id,
"model": "langgraph-tool-agent",
"created": int(time.time()),
"choices": [
{
"delta": {
"role": "assistant",
"content": chunk
}
}
]
}
yield f"event: thread.message.delta\n"
yield f"data: {json.dumps(message_delta)}\n\n"
# Add a small delay to simulate streaming
await asyncio.sleep(0.05)
# Helper function to split text into chunks
def split_into_chunks(text, chunk_size=10):
words = text.split()
chunks = []
current_chunk = []
for word in words:
current_chunk.append(word)
if len(current_chunk) >= chunk_size:
chunks.append(" ".join(current_chunk))
current_chunk = []
if current_chunk:
chunks.append(" ".join(current_chunk))
return chunks
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
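To watch the intermediate thinking, tool_calls, and tool_response steps as they stream, you can consume the server-sent events directly. The following consumer is a minimal sketch, assuming httpx is installed and the server above runs on port 8000:
import json

import httpx

payload = {
    "model": "langgraph-tool-agent",
    "messages": [{"role": "user", "content": "What's the weather in New York?"}],
    "stream": True
}

# Stream the response and decode each SSE data line
with httpx.stream("POST", "http://localhost:8000/v1/chat", json=payload, timeout=60) as response:
    for line in response.iter_lines():
        if not line.startswith("data: "):
            continue
        event = json.loads(line[len("data: "):])
        delta = event["choices"][0]["delta"]
        if "step_details" in delta:
            print(f"[{delta['step_details']['type']}]", delta["step_details"].get("content", ""))
        elif "content" in delta:
            print(delta["content"], end=" ", flush=True)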
CrewAI example
CrewAI is a framework for orchestrating role-playing autonomous AI agents. Here’s an example of a CrewAI agent integrated with Agent Connect:
import os
from dotenv import load_dotenv
import queue
import threading
from typing import Dict, List, Any, Optional
from fastapi import FastAPI, Request, Response, Header
from fastapi.responses import StreamingResponse
import uvicorn
import json
import time
import uuid
import asyncio
from pydantic import BaseModel
from crewai import Agent, Task, Crew, Process
from crewai.utilities.events import (
CrewKickoffStartedEvent,
CrewKickoffCompletedEvent,
AgentExecutionStartedEvent,
AgentExecutionCompletedEvent,
ToolUsageStartedEvent,
ToolUsageFinishedEvent
)
from crewai.utilities.events.base_event_listener import BaseEventListener
# Load environment variables (such as the model provider API key) from the .env file
load_dotenv()
# Initialize FastAPI app
app = FastAPI(title="CrewAI Agent Connect Example")
# Define message schema
class ChatRequest(BaseModel):
model: str
messages: List[Dict[str, Any]]
stream: bool = False
# Create CrewAI agents
researcher = Agent(
role="Researcher",
goal="Find accurate and relevant information",
backstory="You are an expert researcher with a talent for finding information.",
verbose=True
)
writer = Agent(
role="Writer",
goal="Create engaging and informative content",
backstory="You are a skilled writer who can explain complex topics clearly.",
verbose=True
)
analyst = Agent(
role="Analyst",
goal="Analyze information and extract insights",
backstory="You are an analytical thinker who can identify patterns and insights.",
verbose=True
)
# Create a function to process requests with CrewAI
def process_with_crew(query):
# Create tasks
research_task = Task(
description=f"Research information about: {query}",
agent=researcher,
expected_output='A detailed research report.'
)
analysis_task = Task(
description="Analyze the research findings and extract key insights",
agent=analyst,
dependencies=[research_task],
expected_output='A summary of key findings.'
)
writing_task = Task(
description="Create a comprehensive response based on the research and analysis",
agent=writer,
dependencies=[analysis_task],
expected_output='A concise answer to the query with sources cited and insights discovered.'
)
# Create the crew
crew = Crew(
agents=[researcher, analyst, writer],
tasks=[research_task, analysis_task, writing_task],
process=Process.sequential
)
# Run the crew
result = crew.kickoff()
return result
# Agent discovery endpoint
@app.get("/v1/agents")
async def discover_agents():
return {
"agents": [
{
"name": "CrewAI Research Team",
"description": "A team of specialized agents that collaborate to research, analyze, and present information",
"provider": {
"organization": "Your Organization",
"url": "https://your-organization.com"
},
"version": "1.0.0",
"documentation_url": "https://docs.example.com/crewai-agent",
"capabilities": {
"streaming": True
}
}
]
}
# Chat completion endpoint
@app.post("/v1/chat")
async def chat_completion(request: ChatRequest, x_thread_id: str = Header(None)):
thread_id = x_thread_id or str(uuid.uuid4())
# Extract the user query from the messages
user_messages = [msg for msg in request.messages if msg["role"] == "user"]
if not user_messages:
return {"error": "No user message found"}
query = user_messages[-1]["content"]
# Handle streaming
if request.stream:
return StreamingResponse(
stream_crew_response(query, thread_id),
media_type="text/event-stream"
)
else:
# Process with CrewAI
result = process_with_crew(query)
# Format the response
response = {
"id": f"chatcmpl-{uuid.uuid4()}",
"object": "chat.completion",
"created": int(time.time()),
"model": request.model,
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": result
},
"finish_reason": "stop"
}
],
"usage": {
"prompt_tokens": 0,
"completion_tokens": 0,
"total_tokens": 0
}
}
return response
# Create a custom event listener for streaming CrewAI events
class CrewAIStreamingListener(BaseEventListener):
def __init__(self, thread_id):
super().__init__()
self.thread_id = thread_id
# Use a thread-safe queue for communication between threads
self.thread_queue = queue.Queue()
# Track events to prevent duplicates
self.processed_events = set()
print(f"CrewAIStreamingListener initialized with thread_id: {thread_id}")
def setup_listeners(self, crewai_event_bus):
print("Setting up event listeners")
@crewai_event_bus.on(CrewKickoffStartedEvent)
def on_crew_kickoff_started(source, event):
# Generate a unique event ID
event_id = f"kickoff-{id(event)}"
# Check if we've already processed this event
if event_id in self.processed_events:
print(f"Skipping duplicate event: {event_id}")
return
# Mark this event as processed
self.processed_events.add(event_id)
print(f"Event: CrewKickoffStartedEvent {event_id}")
thinking_step = {
"id": f"step-{uuid.uuid4()}",
"object": "thread.run.step.delta",
"thread_id": self.thread_id,
"model": "crewai-team",
"created": int(time.time()),
"choices": [
{
"delta": {
"role": "assistant",
"step_details": {
"type": "thinking",
"content": f"Assembling a team of agents to work on your request..."
}
}
}
]
}
# Use thread-safe queue.put() instead of asyncio.Queue
print(f"Putting CrewKickoffStartedEvent {event_id} in queue")
self.thread_queue.put((f"event: thread.run.step.delta\n", f"data: {json.dumps(thinking_step)}\n\n"))
@crewai_event_bus.on(AgentExecutionStartedEvent)
def on_agent_execution_started(source, event):
# Generate a unique event ID
event_id = f"agent-start-{id(event)}"
# Check if we've already processed this event
if event_id in self.processed_events:
print(f"Skipping duplicate event: {event_id}")
return
# Mark this event as processed
self.processed_events.add(event_id)
print(f"Event: AgentExecutionStartedEvent {event_id}")
agent_step = {
"id": f"step-{uuid.uuid4()}",
"object": "thread.run.step.delta",
"thread_id": self.thread_id,
"model": "crewai-team",
"created": int(time.time()),
"choices": [
{
"delta": {
"role": "assistant",
"step_details": {
"type": "thinking",
"content": f"{event.agent.role} is working on: {event.task.description}"
}
}
}
]
}
self.thread_queue.put((f"event: thread.run.step.delta\n", f"data: {json.dumps(agent_step)}\n\n"))
@crewai_event_bus.on(ToolUsageStartedEvent)
def on_tool_usage_started(source, event):
# Generate a unique event ID
event_id = f"tool-start-{id(event)}"
# Check if we've already processed this event
if event_id in self.processed_events:
print(f"Skipping duplicate event: {event_id}")
return
# Mark this event as processed
self.processed_events.add(event_id)
print(f"Event: ToolUsageStartedEvent {event_id}")
tool_step = {
"id": f"step-{uuid.uuid4()}",
"object": "thread.run.step.delta",
"thread_id": self.thread_id,
"model": "crewai-team",
"created": int(time.time()),
"choices": [
{
"delta": {
"role": "assistant",
"step_details": {
"type": "tool_calls",
"tool_calls": [
{
"id": f"call-{uuid.uuid4()}",
"name": event.tool_name,
"args": event.tool_input
}
]
}
}
}
]
}
self.thread_queue.put((f"event: thread.run.step.delta\n", f"data: {json.dumps(tool_step)}\n\n"))
@crewai_event_bus.on(ToolUsageFinishedEvent)
def on_tool_usage_finished(source, event):
# Generate a unique event ID
event_id = f"tool-finish-{id(event)}"
# Check if we've already processed this event
if event_id in self.processed_events:
print(f"Skipping duplicate event: {event_id}")
return
# Mark this event as processed
self.processed_events.add(event_id)
print(f"Event: ToolUsageFinishedEvent {event_id}")
tool_response_step = {
"id": f"step-{uuid.uuid4()}",
"object": "thread.run.step.delta",
"thread_id": self.thread_id,
"model": "crewai-team",
"created": int(time.time()),
"choices": [
{
"delta": {
"role": "assistant",
"step_details": {
"type": "tool_response",
"content": str(event.output),
"name": event.tool_name,
"tool_call_id": f"call-{uuid.uuid4()}"
}
}
}
]
}
self.thread_queue.put((f"event: thread.run.step.delta\n", f"data: {json.dumps(tool_response_step)}\n\n"))
@crewai_event_bus.on(AgentExecutionCompletedEvent)
def on_agent_execution_completed(source, event):
# Generate a unique event ID
event_id = f"agent-complete-{id(event)}"
# Check if we've already processed this event
if event_id in self.processed_events:
print(f"Skipping duplicate event: {event_id}")
return
# Mark this event as processed
self.processed_events.add(event_id)
print(f"Event: AgentExecutionCompletedEvent {event_id}")
# Stream the agent's output as a message
message_chunks = split_into_chunks(event.output)
for i, chunk in enumerate(message_chunks):
message_delta = {
"id": f"msg-{uuid.uuid4()}",
"object": "thread.message.delta",
"thread_id": self.thread_id,
"model": "crewai-team",
"created": int(time.time()),
"choices": [
{
"delta": {
"role": "assistant",
"content": chunk
}
}
]
}
self.thread_queue.put((f"event: thread.message.delta\n", f"data: {json.dumps(message_delta)}\n\n"))
@crewai_event_bus.on(CrewKickoffCompletedEvent)
def on_crew_kickoff_completed(source, event):
# Generate a unique event ID
event_id = f"kickoff-complete-{id(event)}"
# Check if we've already processed this event
if event_id in self.processed_events:
print(f"Skipping duplicate event: {event_id}")
return
# Mark this event as processed
self.processed_events.add(event_id)
print(f"Event: CrewKickoffCompletedEvent {event_id}")
# Signal that the streaming is complete
self.thread_queue.put(None)
# Stream response function for CrewAI
async def stream_crew_response(query, thread_id):
# Create the event listener
listener = CrewAIStreamingListener(thread_id)
# Start the crew execution in a separate thread
thread = threading.Thread(
target=process_with_crew_thread,
args=(query, listener)
)
thread.daemon = True
thread.start()
# Create an asyncio queue for the FastAPI streaming response
async_queue = asyncio.Queue()
# Start a background task to transfer items from thread queue to async queue
asyncio.create_task(transfer_queue_items(listener.thread_queue, async_queue))
# Stream events from the async queue
while True:
event_data = await async_queue.get()
if event_data is None:
break
event_type, event_content = event_data
yield event_type
yield event_content
# Function to transfer items from thread queue to async queue
async def transfer_queue_items(thread_queue, async_queue):
print("Starting queue transfer task")
while True:
try:
# Use blocking get with timeout to be more responsive
# but not consume too much CPU
item = thread_queue.get(block=True, timeout=0.1)
print(f"Received item from thread queue: {item[:50] if item is not None else None}")
# If we got None, it means we're done
if item is None:
print("Received None, signaling end of streaming")
await async_queue.put(None)
break
# Otherwise, put the item in the async queue
await async_queue.put(item)
print(f"Put item in async queue")
except queue.Empty:
# If timeout occurs, just continue the loop
await asyncio.sleep(0.01) # Short sleep to prevent CPU spinning
# Function to run CrewAI in a separate thread
def process_with_crew_thread(query, listener):
try:
print("Starting CrewAI execution in thread")
# Import the event bus to register the listener
from crewai.utilities.events import crewai_event_bus
# Register the listener with the event bus
print("Registering event listener")
listener.setup_listeners(crewai_event_bus)
print("Event listener registered")
# Create tasks
print("Creating tasks")
research_task = Task(
description=f"Research information about: {query}",
agent=researcher,
expected_output='A detailed research report.'
)
analysis_task = Task(
description="Analyze the research findings and extract key insights",
agent=analyst,
dependencies=[research_task],
expected_output='A summary of key findings.'
)
writing_task = Task(
description="Create a comprehensive response based on the research and analysis",
agent=writer,
dependencies=[analysis_task],
expected_output='A concise answer to the query with sources cited and insights discovered.'
)
# Create the crew
print("Creating crew")
crew = Crew(
agents=[researcher, analyst, writer],
tasks=[research_task, analysis_task, writing_task],
process=Process.sequential
)
# Run the crew directly in this thread
print("Starting crew.kickoff()")
result = crew.kickoff()
print("Crew execution completed")
return result
except Exception as e:
# In case of error, put a None in the queue to signal the end of streaming
print(f"Error in crew execution: {str(e)}")
listener.thread_queue.put(None)
# Helper function to split text into chunks
def split_into_chunks(text, chunk_size=10):
words = text.split()
chunks = []
current_chunk = []
for word in words:
current_chunk.append(word)
if len(current_chunk) >= chunk_size:
chunks.append(" ".join(current_chunk))
current_chunk = []
if current_chunk:
chunks.append(" ".join(current_chunk))
return chunks
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
Copilot example
This service lets you register a Microsoft Copilot Studio agent in watsonx Orchestrate through the Agent Connect Framework.
It takes messages from Orchestrate, uses the Direct Line API to authenticate, start a conversation, and exchange messages with the externally hosted agent, and then returns the agent's response in a format that is compatible with watsonx Orchestrate by implementing a chat completions endpoint.
Before you use this example, you must set the following environment variables in your .env file:
BOT_SECRET=<BOT SECRET>
TOKEN_ENDPOINT=https://directline.botframework.com/v3/directline/tokens/generate
You can get the BOT_SECRET from the Copilot Studio agent's Settings page, under Security > Web Channel Security.
import asyncio
import json
import os
import time
import uuid
from typing import Any, Dict, List, Optional
import httpx
from dotenv import load_dotenv
from fastapi import FastAPI, Header
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel
from dataclasses import dataclass
from datetime import datetime, timedelta
# Load environment variables before CopilotAgent reads them at class-definition time
load_dotenv()
@dataclass
class DirectLineToken:
value: str
created_at: datetime
class CopilotAgent:
"""
Handles authentication and conversations with Microsoft Copilot Studio via Direct Line API.
"""
TOKEN_ENDPOINT = os.getenv("TOKEN_ENDPOINT", "")
BOT_SECRET = os.getenv("BOT_SECRET", "")
BASE_URL = "https://directline.botframework.com/v3/directline"
def __init__(self) -> None:
self._token_cache: Optional[DirectLineToken] = None
self._conversation_id: str = ""
self._client = httpx.AsyncClient()
async def _get_direct_line_token(self) -> str:
if (
self._token_cache
and datetime.utcnow() - self._token_cache.created_at < timedelta(minutes=29)
):
return self._token_cache.value
resp = await self._client.post(
self.TOKEN_ENDPOINT, headers={"Authorization": f"Bearer {self.BOT_SECRET}"}
)
resp.raise_for_status()
token = resp.json()["token"]
self._token_cache = DirectLineToken(value=token, created_at=datetime.utcnow())
return token
async def _start_conversation(self) -> str:
if self._conversation_id != "":
return self._conversation_id
token = await self._get_direct_line_token()
resp = await self._client.post(
f"{self.BASE_URL}/conversations",
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
},
)
resp.raise_for_status()
if resp.json()["conversationId"]:
self._conversation_id = resp.json()["conversationId"]
return self._conversation_id
async def ask(self, user_text: str) -> str:
token = await self._get_direct_line_token()
convo_id = await self._start_conversation()
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}
send_payload = {"type": "message", "from": {"id": "user"}, "text": user_text}
send_resp = await self._client.post(
f"{self.BASE_URL}/conversations/{convo_id}/activities",
headers=headers,
json=send_payload,
)
send_resp.raise_for_status()
activity_id = send_resp.json()["id"]
timeout_s, start = 180, time.time()
while time.time() - start < timeout_s:
recv_resp = await self._client.get(
f"{self.BASE_URL}/conversations/{convo_id}/activities", headers=headers
)
recv_resp.raise_for_status()
for act in recv_resp.json().get("activities", []):
if (
act.get("type") == "message"
and act.get("from", {}).get("id") != "user"
and act.get("replyToId") == activity_id
):
return act.get("text", "")
await asyncio.sleep(0.5)
return "Copilot did not answer in time."
app = FastAPI()
class ChatRequest(BaseModel):
messages: List[Dict[str, Any]]
stream: bool = False
thread_state: Dict[str, Dict[str, Any]] = {}
@app.get("/v1/agents")
async def discover_agents():
return {
"agents": [
{
"name": "Copilot Studio Agent",
"description": "Microsoft Copilot Studio exposed via Agent Connect",
"provider": {
"organization": "Microsoft",
"url": "https://copilotstudio.microsoft.com",
},
"version": "1.0.0",
"documentation_url": "https://learn.microsoft.com/en-us/azure/bot-service/rest-api/bot-framework-rest-direct-line-3-0-api-reference?view=azure-bot-service-4.0",
"capabilities": {"streaming": True},
}
]
}
def _sse_event(data: str, event: str = "message") -> str:
return f"event: {event}\ndata: {data}\n\n"
@app.post("/v1/chat")
async def chat_completion(
request: ChatRequest, x_ibm_thread_id: str = Header(None)
):
    if not x_ibm_thread_id:
        return JSONResponse(status_code=400, content={"error": "Missing x-ibm-thread-id header."})
    thread_id = x_ibm_thread_id
if thread_id not in thread_state:
thread_state[thread_id] = {"history": [], "agent": CopilotAgent()}
state = thread_state[thread_id]
hist: List[Dict[str, Any]] = state["history"]
agent: CopilotAgent = state["agent"]
for msg in request.messages:
if msg not in hist:
hist.append(msg)
try:
user_msg = next(m["content"] for m in reversed(hist) if m["role"] == "user")
except StopIteration:
return JSONResponse(status_code=400, content={"error": "No user message."})
assistant_reply = await agent.ask(user_msg)
hist.append({"role": "assistant", "content": assistant_reply})
if request.stream:
async def event_stream():
yield _sse_event(
json.dumps(
{
"choices": [
{
"index": 0,
"delta": {"role": "assistant"},
}
]
}
)
)
for token in assistant_reply.split():
yield _sse_event(
json.dumps(
{
"choices": [
{
"index": 0,
"delta": {"content": token + " "},
}
]
}
)
)
await asyncio.sleep(0)
yield _sse_event(
json.dumps(
{"choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]}
)
)
yield "data: [DONE]\n\n"
return StreamingResponse(event_stream(), media_type="text/event-stream")
return {
"id": str(uuid.uuid4()),
"object": "chat.completion",
"created": int(time.time()),
"choices": [
{
"index": 0,
"message": {"role": "assistant", "content": assistant_reply},
"finish_reason": "stop",
}
],
}
if __name__ == "__main__":
    import uvicorn
    uvicorn.run("server:app", host="0.0.0.0", port=8083, reload=True)
Custom framework example
If you’re using a custom agent framework or building your own, you can implement the Agent Connect API directly to integrate your agent with watsonx Orchestrate. Here’s a minimal example to get you started:
import os
from typing import Dict, List, Any, Optional
from fastapi import FastAPI, Request, Response, Header
from fastapi.responses import StreamingResponse
import uvicorn
import json
import time
import uuid
import asyncio
from pydantic import BaseModel
# Initialize FastAPI app
app = FastAPI(title="Custom Agent Connect Example")
# Define message schema
class ChatRequest(BaseModel):
messages: List[Dict[str, Any]]
stream: bool = False
# Simple in-memory storage for conversation history
conversation_history = {}
# Agent discovery endpoint
@app.get("/v1/agents")
async def discover_agents():
return {
"agents": [
{
"name": "Custom Agent",
"description": "A custom agent that demonstrates the minimal implementation of Agent Connect",
"provider": {
"organization": "Your Organization",
"url": "https://your-organization.com"
},
"version": "1.0.0",
"documentation_url": "https://docs.example.com/custom-agent",
"capabilities": {
"streaming": True
}
}
]
}
# Chat completion endpoint
@app.post("/v1/chat")
async def chat_completion(request: ChatRequest, x_ibm_thread_id: str = Header(None)):
thread_id = x_ibm_thread_id or str(uuid.uuid4())
# Store or retrieve conversation history
if thread_id not in conversation_history:
conversation_history[thread_id] = []
# Add the new messages to history
for msg in request.messages:
if msg not in conversation_history[thread_id]:
conversation_history[thread_id].append(msg)
# Extract the user query from the messages
user_messages = [msg for msg in request.messages if msg["role"] == "user"]
if not user_messages:
return {"error": "No user message found"}
query = user_messages[-1]["content"]
# Handle streaming
if request.stream:
return StreamingResponse(
stream_custom_response(query, thread_id),
media_type="text/event-stream"
)
    else:
        # Generate a response (placeholder; replace with your actual agent logic)
        response_content = f"Received query: {query}"
        # Add the response to the conversation history
        conversation_history[thread_id].append({"role": "assistant", "content": response_content})
        # Return the response in the chat completion format used by the other examples
        return {
            "id": f"chatcmpl-{uuid.uuid4()}",
            "object": "chat.completion",
            "created": int(time.time()),
            "choices": [{"index": 0, "message": {"role": "assistant", "content": response_content}, "finish_reason": "stop"}]
        }
async def stream_custom_response(query: str, thread_id: str):
    # Placeholder streaming logic that emits thread.message.delta events; replace with your actual implementation
    for i in range(5):
        delta = {
            "object": "thread.message.delta",
            "thread_id": thread_id,
            "choices": [{"delta": {"role": "assistant", "content": f"Streaming response part {i + 1} for query: {query} "}}]
        }
        yield f"event: thread.message.delta\ndata: {json.dumps(delta)}\n\n"
        await asyncio.sleep(1)
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)