This section provides examples of integrating the Agent Connect Framework with popular agent frameworks, demonstrating how you can connect your existing agents to watsonx Orchestrate.

Requirements

Before you proceed, install the dependencies that each example imports, such as FastAPI, uvicorn, and the agent framework that the example uses.

The examples also require their own .env files for defining environment variables, which are loaded with python-dotenv. For more information, see Python-dotenv.
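For example, a minimal .env file for the LangGraph examples might look like the following. The OPENAI_API_KEY variable is an assumption based on the ChatOpenAI model used below; adjust the variables to match the providers and endpoints you actually use.

# .env (example values only)
OPENAI_API_KEY=<YOUR OPENAI API KEY>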

LangGraph example

LangGraph is a library for building stateful, multi-actor applications with LLMs. Here’s an example of a LangGraph agent integrated with Agent Connect:

Basic LangGraph agent

import os
from dotenv import load_dotenv
from typing import Dict, List, Any, Optional
from fastapi import FastAPI, Request, Response, Header
from fastapi.responses import StreamingResponse
import uvicorn
import json
import time
import uuid
from pydantic import BaseModel, Field
from langchain_core.messages import HumanMessage, AIMessage
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode
from langchain_openai import ChatOpenAI

load_dotenv()

# Initialize FastAPI app
app = FastAPI(title="LangGraph Agent Connect Example")

# Define message schema
class Message(BaseModel):
    role: str
    content: str
    name: Optional[str] = None
    
class ChatRequest(BaseModel):
    model: str
    messages: List[Dict[str, Any]]
    stream: bool = False
    
# Define a simple LangGraph agent
def create_agent():
    # Initialize the LLM
    llm = ChatOpenAI(
        model="gpt-3.5-turbo",
        temperature=0,
        streaming=True
    )
    
    # Define the agent state
    class AgentState(BaseModel):
        messages: List[Any] = Field(default_factory=list)
        
    # Define the agent nodes
    def agent(state: AgentState):
        messages = state.messages
        response = llm.invoke(messages)
        return {"messages": messages + [response]}
    
    # Create the graph
    workflow = StateGraph(AgentState)
    workflow.add_node("agent", agent)
    workflow.set_entry_point("agent")
    workflow.add_edge("agent", END)
    
    # Compile the graph
    return workflow.compile()

# Create the agent
agent_executor = create_agent()

# Agent discovery endpoint
@app.get("/v1/agents")
async def discover_agents():
    return {
        "agents": [
            {
                "name": "LangGraph Example Agent",
                "description": "A simple LangGraph agent that demonstrates Agent Connect integration",
                "provider": {
                    "organization": "Your Organization",
                    "url": "https://your-organization.com"
                },
                "version": "1.0.0",
                "documentation_url": "https://docs.example.com/langgraph-agent",
                "capabilities": {
                    "streaming": True
                }
            }
        ]
    }

# Chat completion endpoint
@app.post("/v1/chat")
async def chat_completion(request: ChatRequest, x_ibm_thread_id: str = Header(None)):
    thread_id = x_ibm_thread_id or str(uuid.uuid4())
    
    # Convert the messages to LangChain format
    messages = []
    for msg in request.messages:
        if msg["role"] == "user":
            messages.append(HumanMessage(content=msg["content"]))
        elif msg["role"] == "assistant":
            messages.append(AIMessage(content=msg["content"]))
    
    # Handle streaming
    if request.stream:
        return StreamingResponse(
            stream_response(messages, thread_id),
            media_type="text/event-stream"
        )
    else:
        # Execute the agent
        result = agent_executor.invoke({"messages": messages})
        final_message = result["messages"][-1]
        
        # Format the response
        response = {
            "id": f"chatcmpl-{uuid.uuid4()}",
            "object": "chat.completion",
            "created": int(time.time()),
            "model": request.model,
            "choices": [
                {
                    "index": 0,
                    "message": {
                        "role": "assistant",
                        "content": final_message.content
                    },
                    "finish_reason": "stop"
                }
            ],
            "usage": {
                "prompt_tokens": 0,
                "completion_tokens": 0,
                "total_tokens": 0
            }
        }
        
        return response

# Stream response function
async def stream_response(messages, thread_id):
    # First, send a thinking step
    thinking_step = {
        "id": f"step-{uuid.uuid4()}",
        "object": "thread.run.step.delta",
        "thread_id": thread_id,
        "model": "langgraph-agent",
        "created": int(time.time()),
        "choices": [
            {
                "delta": {
                    "role": "assistant",
                    "step_details": {
                        "type": "thinking",
                        "content": "Analyzing the request and formulating a response..."
                    }
                }
            }
        ]
    }
    
    yield f"event: thread.run.step.delta\n"
    yield f"data: {json.dumps(thinking_step)}\n\n"
    
    # Execute the agent (for simplicity, the complete response is generated
    # first and then streamed back to the caller in chunks)
    result = agent_executor.invoke({"messages": messages})
    final_message = result["messages"][-1]
    
    # Stream the final message
    message_chunks = split_into_chunks(final_message.content)
    
    for i, chunk in enumerate(message_chunks):
        message_delta = {
            "id": f"msg-{uuid.uuid4()}",
            "object": "thread.message.delta",
            "thread_id": thread_id,
            "model": "langgraph-agent",
            "created": int(time.time()),
            "choices": [
                {
                    "delta": {
                        "role": "assistant",
                        "content": chunk
                    }
                }
            ]
        }
        
        yield f"event: thread.message.delta\n"
        yield f"data: {json.dumps(message_delta)}\n\n"
        

# Helper function to split text into chunks
def split_into_chunks(text, chunk_size=10):
    words = text.split()
    chunks = []
    current_chunk = []
    
    for word in words:
        current_chunk.append(word)
        if len(current_chunk) >= chunk_size:
            chunks.append(" ".join(current_chunk))
            current_chunk = []
    
    if current_chunk:
        chunks.append(" ".join(current_chunk))
    
    return chunks

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
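Once the server is running, you can check both endpoints with a small client script. The sketch below uses httpx (also used in the Copilot example later in this section) and assumes the default host and port from the example above; the model value is just a label that is echoed back in the response.

import json

import httpx

BASE_URL = "http://localhost:8000"

# Discover the agent's metadata
print(httpx.get(f"{BASE_URL}/v1/agents").json())

# Send a non-streaming chat request
payload = {
    "model": "langgraph-agent",
    "messages": [{"role": "user", "content": "Hello! What can you do?"}],
    "stream": False
}
response = httpx.post(f"{BASE_URL}/v1/chat", json=payload, timeout=60)
print(json.dumps(response.json(), indent=2))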

LangGraph agent with tool calling

In addition to the previous example, you can also integrate LangGraph agents with tool calling:

import os
from dotenv import load_dotenv
from typing import Dict, List, Tuple, Any, Optional
from fastapi import FastAPI, Request, Response, Header
from fastapi.responses import StreamingResponse
import uvicorn
import json
import time
import uuid
import asyncio
from pydantic import BaseModel, Field
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, ToolMessage
from langchain_core.tools import tool
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode
from langchain_openai import ChatOpenAI

load_dotenv()

# Initialize FastAPI app
app = FastAPI(title="LangGraph Agent Connect Example with Tools")

# Define message schema
class Message(BaseModel):
    role: str
    content: str
    name: Optional[str] = None
    
class ChatRequest(BaseModel):
    model: str
    messages: List[Dict[str, Any]]
    stream: bool = False
    
# Define tools
@tool
def get_weather(location: str, unit: str = "celsius") -> str:
    """Get the current weather for a location."""
    # In a real implementation, this would call a weather API
    weather_data = {
        "New York": {"temperature": 22, "condition": "sunny", "humidity": 45},
        "London": {"temperature": 18, "condition": "cloudy", "humidity": 70},
        "Tokyo": {"temperature": 28, "condition": "rainy", "humidity": 80},
    }
    
    if location in weather_data:
        data = weather_data[location]
        return json.dumps(data)
    else:
        return json.dumps({"error": f"Weather data not available for {location}"})

@tool
def search_knowledge_base(query: str) -> str:
    """Search the knowledge base for information."""
    # In a real implementation, this would query a vector database
    knowledge = {
        "company history": "The company was founded in 2010 and has grown to 500 employees.",
        "product features": "Our product offers AI-powered analytics, real-time reporting, and integration with major platforms.",
        "pricing": "We offer three tiers: Basic ($10/month), Pro ($25/month), and Enterprise (custom pricing)."
    }
    
    for key, value in knowledge.items():
        if query.lower() in key.lower():
            return value
    
    return "No information found for that query."

# Define a LangGraph agent with tools
def create_agent_with_tools():
    # Define the tools
    tools = [get_weather, search_knowledge_base]
    
    # Initialize the LLM and bind the tools so the model can emit tool calls
    llm = ChatOpenAI(
        model="gpt-3.5-turbo",
        temperature=0
    ).bind_tools(tools)
    
    # Define the agent state
    class AgentState(BaseModel):
        messages: List[Any] = Field(default_factory=list)
        tool_calls: List[Dict[str, Any]] = Field(default_factory=list)
        tool_results: List[Tuple[str, str]] = Field(default_factory=list)
        
    # Define the agent nodes
    def agent(state: AgentState):
        messages = list(state.messages)
        
        # Add tool results to the conversation as tool messages, if any
        for tool_call_id, result in state.tool_results:
            messages.append(ToolMessage(content=result, tool_call_id=tool_call_id))
        
        # Add system message with tool instructions
        if not any(isinstance(msg, SystemMessage) for msg in messages):
            system_message = SystemMessage(content="You are a helpful assistant with access to tools. Use them when appropriate.")
            messages = [system_message] + messages
        
        # Invoke the LLM
        response = llm.invoke(messages)
        
        # Check if the response contains tool calls
        if getattr(response, "tool_calls", None):
            # response.tool_calls is a list of dicts with "name", "args", and "id" keys
            return {
                "messages": messages + [response],
                "tool_calls": list(response.tool_calls),
                "tool_results": []
            }
        
        return {"messages": messages + [response], "tool_calls": [], "tool_results": []}
    
    def tools_node(state: AgentState):
        # Execute each requested tool and collect its result
        tools_by_name = {t.name: t for t in tools}
        tool_results = []
        for call in state.tool_calls:
            output = tools_by_name[call["name"]].invoke(call["args"])
            tool_results.append((call["id"], str(output)))
        
        return {"messages": state.messages, "tool_calls": [], "tool_results": tool_results}
    
    # Route to the tools node only when the agent requested tool calls
    def should_continue(state: AgentState):
        return "tools" if state.tool_calls else END
    
    # Create the graph
    workflow = StateGraph(AgentState)
    workflow.add_node("agent", agent)
    workflow.add_node("tools", tools_node)
    workflow.set_entry_point("agent")
    workflow.add_conditional_edges("agent", should_continue, {"tools": "tools", END: END})
    workflow.add_edge("tools", "agent")
    
    # Compile the graph
    return workflow.compile()

# Create the agent
agent_executor = create_agent_with_tools()

# Agent discovery endpoint
@app.get("/v1/agents")
async def discover_agents():
    return {
        "agents": [
            {
                "name": "LangGraph Tool Agent",
                "description": "A LangGraph agent with tool calling capabilities",
                "provider": {
                    "organization": "Your Organization",
                    "url": "https://your-organization.com"
                },
                "version": "1.0.0",
                "documentation_url": "https://docs.example.com/langgraph-tool-agent",
                "capabilities": {
                    "streaming": True
                }
            }
        ]
    }

# Chat completion endpoint
@app.post("/v1/chat")
async def chat_completion(request: ChatRequest, x_ibm_thread_id: str = Header(None)):
    thread_id = x_ibm_thread_id or str(uuid.uuid4())
    
    # Convert the messages to LangChain format
    messages = []
    for msg in request.messages:
        if msg["role"] == "user":
            messages.append(HumanMessage(content=msg["content"]))
        elif msg["role"] == "assistant":
            messages.append(AIMessage(content=msg["content"]))
    
    # Handle streaming
    if request.stream:
        return StreamingResponse(
            stream_response_with_tools(messages, thread_id),
            media_type="text/event-stream"
        )
    else:
        # Execute the agent
        result = agent_executor.invoke({"messages": messages})
        final_message = result["messages"][-1]
        
        # Format the response
        response = {
            "id": f"chatcmpl-{uuid.uuid4()}",
            "object": "chat.completion",
            "created": int(time.time()),
            "model": request.model,
            "choices": [
                {
                    "index": 0,
                    "message": {
                        "role": "assistant",
                        "content": final_message.content
                    },
                    "finish_reason": "stop"
                }
            ],
            "usage": {
                "prompt_tokens": 0,
                "completion_tokens": 0,
                "total_tokens": 0
            }
        }
        
        return response

# Stream response function with tool calling
async def stream_response_with_tools(messages, thread_id):
    # First, send a thinking step
    thinking_step = {
        "id": f"step-{uuid.uuid4()}",
        "object": "thread.run.step.delta",
        "thread_id": thread_id,
        "model": "langgraph-tool-agent",
        "created": int(time.time()),
        "choices": [
            {
                "delta": {
                    "role": "assistant",
                    "step_details": {
                        "type": "thinking",
                        "content": "Analyzing the request and determining if tools are needed..."
                    }
                }
            }
        ]
    }
    
    yield f"event: thread.run.step.delta\n"
    yield f"data: {json.dumps(thinking_step)}\n\n"
    
    # Add a small delay to simulate processing
    await asyncio.sleep(0.5)
    
    # This is a simplified example - in a real implementation, you would
    # capture the tool calls and tool responses during execution and stream
    # them as they happen instead of simulating them as shown below
    
    # For demonstration, let's assume the agent uses the weather tool
    tool_call_id = f"call-{uuid.uuid4()}"
    
    # Stream a tool call
    tool_call_step = {
        "id": f"step-{uuid.uuid4()}",
        "object": "thread.run.step.delta",
        "thread_id": thread_id,
        "model": "langgraph-tool-agent",
        "created": int(time.time()),
        "choices": [
            {
                "delta": {
                    "role": "assistant",
                    "step_details": {
                        "type": "tool_calls",
                        "tool_calls": [
                            {
                                "id": tool_call_id,
                                "name": "get_weather",
                                "args": {
                                    "location": "New York",
                                    "unit": "celsius"
                                }
                            }
                        ]
                    }
                }
            }
        ]
    }
    
    yield f"event: thread.run.step.delta\n"
    yield f"data: {json.dumps(tool_call_step)}\n\n"
    
    # Add a small delay to simulate tool execution
    await asyncio.sleep(0.5)
    
    # Stream a tool response
    tool_response_step = {
        "id": f"step-{uuid.uuid4()}",
        "object": "thread.run.step.delta",
        "thread_id": thread_id,
        "model": "langgraph-tool-agent",
        "created": int(time.time()),
        "choices": [
            {
                "delta": {
                    "role": "assistant",
                    "step_details": {
                        "type": "tool_response",
                        "content": '{"temperature": 22, "condition": "sunny", "humidity": 45}',
                        "name": "get_weather",
                        "tool_call_id": tool_call_id
                    }
                }
            }
        ]
    }
    
    yield f"event: thread.run.step.delta\n"
    yield f"data: {json.dumps(tool_response_step)}\n\n"
    
    # Add a small delay to simulate processing
    await asyncio.sleep(0.5)
    
    # Get the agent's final response
    result = agent_executor.invoke({"messages": messages})
    final_message = result["messages"][-1]
    
    # Stream the final message
    message_chunks = split_into_chunks(final_message.content)
    
    for i, chunk in enumerate(message_chunks):
        message_delta = {
            "id": f"msg-{uuid.uuid4()}",
            "object": "thread.message.delta",
            "thread_id": thread_id,
            "model": "langgraph-tool-agent",
            "created": int(time.time()),
            "choices": [
                {
                    "delta": {
                        "role": "assistant",
                        "content": chunk
                    }
                }
            ]
        }
        
        yield f"event: thread.message.delta\n"
        yield f"data: {json.dumps(message_delta)}\n\n"
        
        # Add a small delay to simulate streaming
        await asyncio.sleep(0.05)

# Helper function to split text into chunks
def split_into_chunks(text, chunk_size=10):
    words = text.split()
    chunks = []
    current_chunk = []
    
    for word in words:
        current_chunk.append(word)
        if len(current_chunk) >= chunk_size:
            chunks.append(" ".join(current_chunk))
            current_chunk = []
    
    if current_chunk:
        chunks.append(" ".join(current_chunk))
    
    return chunks

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
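To see the intermediate step events, you can request a streamed response and print the raw server-sent events. This is a minimal client sketch that assumes the server above is running locally on port 8000.

import httpx

payload = {
    "model": "langgraph-tool-agent",
    "messages": [{"role": "user", "content": "What is the weather in New York?"}],
    "stream": True
}

# Each event arrives as an "event: ..." line followed by a "data: ..." line
with httpx.stream("POST", "http://localhost:8000/v1/chat", json=payload, timeout=120) as response:
    for line in response.iter_lines():
        if line:
            print(line)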

CrewAI example

CrewAI is a framework for orchestrating role-playing autonomous AI agents. Here’s an example of a CrewAI agent integrated with Agent Connect:

import os
import queue
import threading
from typing import Dict, List, Any, Optional
from fastapi import FastAPI, Request, Response, Header
from fastapi.responses import StreamingResponse
import uvicorn
import json
import time
import uuid
import asyncio
from pydantic import BaseModel
from crewai import Agent, Task, Crew, Process
from crewai.utilities.events import (
    CrewKickoffStartedEvent,
    CrewKickoffCompletedEvent,
    AgentExecutionStartedEvent,
    AgentExecutionCompletedEvent,
    ToolUsageStartedEvent,
    ToolUsageFinishedEvent
)
from crewai.utilities.events.base_event_listener import BaseEventListener

# Initialize FastAPI app
app = FastAPI(title="CrewAI Agent Connect Example")

# Define message schema
class ChatRequest(BaseModel):
    model: str
    messages: List[Dict[str, Any]]
    stream: bool = False

# Create CrewAI agents
researcher = Agent(
    role="Researcher",
    goal="Find accurate and relevant information",
    backstory="You are an expert researcher with a talent for finding information.",
    verbose=True
)

writer = Agent(
    role="Writer",
    goal="Create engaging and informative content",
    backstory="You are a skilled writer who can explain complex topics clearly.",
    verbose=True
)

analyst = Agent(
    role="Analyst",
    goal="Analyze information and extract insights",
    backstory="You are an analytical thinker who can identify patterns and insights.",
    verbose=True
)

# Create a function to process requests with CrewAI
def process_with_crew(query):
    # Create tasks
    research_task = Task(
        description=f"Research information about: {query}",
        agent=researcher,
        expected_output='A detailed research report.'
    )
    
    analysis_task = Task(
        description="Analyze the research findings and extract key insights",
        agent=analyst,
        dependencies=[research_task],
        expected_output='A summary of key findings.'
    )
    
    writing_task = Task(
        description="Create a comprehensive response based on the research and analysis",
        agent=writer,
        dependencies=[analysis_task],
        expected_output='A concise answer to the query with sources cited and insights discovered.'
    )
    
    # Create the crew
    crew = Crew(
        agents=[researcher, analyst, writer],
        tasks=[research_task, analysis_task, writing_task],
        process=Process.sequential
    )
    
    # Run the crew
    result = crew.kickoff()
    return result

# Agent discovery endpoint
@app.get("/v1/agents")
async def discover_agents():
    return {
        "agents": [
            {
                "name": "CrewAI Research Team",
                "description": "A team of specialized agents that collaborate to research, analyze, and present information",
                "provider": {
                    "organization": "Your Organization",
                    "url": "https://your-organization.com"
                },
                "version": "1.0.0",
                "documentation_url": "https://docs.example.com/crewai-agent",
                "capabilities": {
                    "streaming": True
                }
            }
        ]
    }

# Chat completion endpoint
@app.post("/v1/chat")
async def chat_completion(request: ChatRequest, x_thread_id: str = Header(None)):
    thread_id = x_thread_id or str(uuid.uuid4())
    
    # Extract the user query from the messages
    user_messages = [msg for msg in request.messages if msg["role"] == "user"]
    if not user_messages:
        return {"error": "No user message found"}
    
    query = user_messages[-1]["content"]
    
    # Handle streaming
    if request.stream:
        return StreamingResponse(
            stream_crew_response(query, thread_id),
            media_type="text/event-stream"
        )
    else:
        # Process with CrewAI
        result = process_with_crew(query)
        
        # Format the response
        response = {
            "id": f"chatcmpl-{uuid.uuid4()}",
            "object": "chat.completion",
            "created": int(time.time()),
            "model": request.model,
            "choices": [
                {
                    "index": 0,
                    "message": {
                        "role": "assistant",
                        "content": result
                    },
                    "finish_reason": "stop"
                }
            ],
            "usage": {
                "prompt_tokens": 0,
                "completion_tokens": 0,
                "total_tokens": 0
            }
        }
        
        return response


# Create a custom event listener for streaming CrewAI events
class CrewAIStreamingListener(BaseEventListener):
    def __init__(self, thread_id):
        super().__init__()
        self.thread_id = thread_id
        # Use a thread-safe queue for communication between threads
        self.thread_queue = queue.Queue()
        # Track events to prevent duplicates
        self.processed_events = set()
        print(f"CrewAIStreamingListener initialized with thread_id: {thread_id}")
        
    def setup_listeners(self, crewai_event_bus):
        print("Setting up event listeners")
        
        @crewai_event_bus.on(CrewKickoffStartedEvent)
        def on_crew_kickoff_started(source, event):
            # Generate a unique event ID
            event_id = f"kickoff-{id(event)}"
            
            # Check if we've already processed this event
            if event_id in self.processed_events:
                print(f"Skipping duplicate event: {event_id}")
                return
                
            # Mark this event as processed
            self.processed_events.add(event_id)
            
            print(f"Event: CrewKickoffStartedEvent {event_id}")
            thinking_step = {
                "id": f"step-{uuid.uuid4()}",
                "object": "thread.run.step.delta",
                "thread_id": self.thread_id,
                "model": "crewai-team",
                "created": int(time.time()),
                "choices": [
                    {
                        "delta": {
                            "role": "assistant",
                            "step_details": {
                                "type": "thinking",
                                "content": f"Assembling a team of agents to work on your request..."
                            }
                        }
                    }
                ]
            }
            # Use thread-safe queue.put() instead of asyncio.Queue
            print(f"Putting CrewKickoffStartedEvent {event_id} in queue")
            self.thread_queue.put((f"event: thread.run.step.delta\n", f"data: {json.dumps(thinking_step)}\n\n"))
        
        @crewai_event_bus.on(AgentExecutionStartedEvent)
        def on_agent_execution_started(source, event):
            # Generate a unique event ID
            event_id = f"agent-start-{id(event)}"
            
            # Check if we've already processed this event
            if event_id in self.processed_events:
                print(f"Skipping duplicate event: {event_id}")
                return
                
            # Mark this event as processed
            self.processed_events.add(event_id)
            
            print(f"Event: AgentExecutionStartedEvent {event_id}")
            agent_step = {
                "id": f"step-{uuid.uuid4()}",
                "object": "thread.run.step.delta",
                "thread_id": self.thread_id,
                "model": "crewai-team",
                "created": int(time.time()),
                "choices": [
                    {
                        "delta": {
                            "role": "assistant",
                            "step_details": {
                                "type": "thinking",
                                "content": f"{event.agent.role} is working on: {event.task.description}"
                            }
                        }
                    }
                ]
            }
            self.thread_queue.put((f"event: thread.run.step.delta\n", f"data: {json.dumps(agent_step)}\n\n"))
        
        @crewai_event_bus.on(ToolUsageStartedEvent)
        def on_tool_usage_started(source, event):
            # Generate a unique event ID
            event_id = f"tool-start-{id(event)}"
            
            # Check if we've already processed this event
            if event_id in self.processed_events:
                print(f"Skipping duplicate event: {event_id}")
                return
                
            # Mark this event as processed
            self.processed_events.add(event_id)
            
            print(f"Event: ToolUsageStartedEvent {event_id}")
            tool_step = {
                "id": f"step-{uuid.uuid4()}",
                "object": "thread.run.step.delta",
                "thread_id": self.thread_id,
                "model": "crewai-team",
                "created": int(time.time()),
                "choices": [
                    {
                        "delta": {
                            "role": "assistant",
                            "step_details": {
                                "type": "tool_calls",
                                "tool_calls": [
                                    {
                                        "id": f"call-{uuid.uuid4()}",
                                        "name": event.tool_name,
                                        "args": event.tool_input
                                    }
                                ]
                            }
                        }
                    }
                ]
            }
            self.thread_queue.put((f"event: thread.run.step.delta\n", f"data: {json.dumps(tool_step)}\n\n"))
        
        @crewai_event_bus.on(ToolUsageFinishedEvent)
        def on_tool_usage_finished(source, event):
            # Generate a unique event ID
            event_id = f"tool-finish-{id(event)}"
            
            # Check if we've already processed this event
            if event_id in self.processed_events:
                print(f"Skipping duplicate event: {event_id}")
                return
                
            # Mark this event as processed
            self.processed_events.add(event_id)
            
            print(f"Event: ToolUsageFinishedEvent {event_id}")
            tool_response_step = {
                "id": f"step-{uuid.uuid4()}",
                "object": "thread.run.step.delta",
                "thread_id": self.thread_id,
                "model": "crewai-team",
                "created": int(time.time()),
                "choices": [
                    {
                        "delta": {
                            "role": "assistant",
                            "step_details": {
                                "type": "tool_response",
                                "content": str(event.output),
                                "name": event.tool_name,
                                "tool_call_id": f"call-{uuid.uuid4()}"
                            }
                        }
                    }
                ]
            }
            self.thread_queue.put((f"event: thread.run.step.delta\n", f"data: {json.dumps(tool_response_step)}\n\n"))
        
        @crewai_event_bus.on(AgentExecutionCompletedEvent)
        def on_agent_execution_completed(source, event):
            # Generate a unique event ID
            event_id = f"agent-complete-{id(event)}"
            
            # Check if we've already processed this event
            if event_id in self.processed_events:
                print(f"Skipping duplicate event: {event_id}")
                return
                
            # Mark this event as processed
            self.processed_events.add(event_id)
            
            print(f"Event: AgentExecutionCompletedEvent {event_id}")
            # Stream the agent's output as a message
            message_chunks = split_into_chunks(event.output)
            
            for i, chunk in enumerate(message_chunks):
                message_delta = {
                    "id": f"msg-{uuid.uuid4()}",
                    "object": "thread.message.delta",
                    "thread_id": self.thread_id,
                    "model": "crewai-team",
                    "created": int(time.time()),
                    "choices": [
                        {
                            "delta": {
                                "role": "assistant",
                                "content": chunk
                            }
                        }
                    ]
                }
                self.thread_queue.put((f"event: thread.message.delta\n", f"data: {json.dumps(message_delta)}\n\n"))
        
        @crewai_event_bus.on(CrewKickoffCompletedEvent)
        def on_crew_kickoff_completed(source, event):
            # Generate a unique event ID
            event_id = f"kickoff-complete-{id(event)}"
            
            # Check if we've already processed this event
            if event_id in self.processed_events:
                print(f"Skipping duplicate event: {event_id}")
                return
                
            # Mark this event as processed
            self.processed_events.add(event_id)
            
            print(f"Event: CrewKickoffCompletedEvent {event_id}")
            # Signal that the streaming is complete
            self.thread_queue.put(None)

# Stream response function for CrewAI
async def stream_crew_response(query, thread_id):
    # Create the event listener
    listener = CrewAIStreamingListener(thread_id)
    
    # Start the crew execution in a separate thread
    thread = threading.Thread(
        target=process_with_crew_thread,
        args=(query, listener)
    )
    thread.daemon = True
    thread.start()
    
    # Create an asyncio queue for the FastAPI streaming response
    async_queue = asyncio.Queue()
    
    # Start a background task to transfer items from thread queue to async queue
    asyncio.create_task(transfer_queue_items(listener.thread_queue, async_queue))
    
    # Stream events from the async queue
    while True:
        event_data = await async_queue.get()
        if event_data is None:
            break
        
        event_type, event_content = event_data
        yield event_type
        yield event_content

# Function to transfer items from thread queue to async queue
async def transfer_queue_items(thread_queue, async_queue):
    print("Starting queue transfer task")
    while True:
        try:
            # Use blocking get with timeout to be more responsive
            # but not consume too much CPU
            item = thread_queue.get(block=True, timeout=0.1)
            print(f"Received item from thread queue: {item[:50] if item is not None else None}")
            
            # If we got None, it means we're done
            if item is None:
                print("Received None, signaling end of streaming")
                await async_queue.put(None)
                break
                
            # Otherwise, put the item in the async queue
            await async_queue.put(item)
            print(f"Put item in async queue")
            
        except queue.Empty:
            # If timeout occurs, just continue the loop
            await asyncio.sleep(0.01)  # Short sleep to prevent CPU spinning

# Function to run CrewAI in a separate thread
def process_with_crew_thread(query, listener):
    try:
        print("Starting CrewAI execution in thread")
        
        # Import the event bus to register the listener
        from crewai.utilities.events import crewai_event_bus
        
        # Register the listener with the event bus
        print("Registering event listener")
        listener.setup_listeners(crewai_event_bus)
        print("Event listener registered")
        
        # Create tasks
        print("Creating tasks")
        research_task = Task(
            description=f"Research information about: {query}",
            agent=researcher,
            expected_output='A detailed research report.'
        )
        
        analysis_task = Task(
            description="Analyze the research findings and extract key insights",
            agent=analyst,
            dependencies=[research_task],
            expected_output='A summary of key findings.'
        )
        
        writing_task = Task(
            description="Create a comprehensive response based on the research and analysis",
            agent=writer,
            dependencies=[analysis_task],
            expected_output='A concise answer to the query with sources cited and insights discovered.'
        )
        
        # Create the crew
        print("Creating crew")
        crew = Crew(
            agents=[researcher, analyst, writer],
            tasks=[research_task, analysis_task, writing_task],
            process=Process.sequential
        )
        
        # Run the crew directly in this thread
        print("Starting crew.kickoff()")
        result = crew.kickoff()
        print("Crew execution completed")
        
        return result
    except Exception as e:
        # In case of error, put a None in the queue to signal the end of streaming
        print(f"Error in crew execution: {str(e)}")
        listener.thread_queue.put(None)

# Helper function to split text into chunks
def split_into_chunks(text, chunk_size=10):
    words = text.split()
    chunks = []
    current_chunk = []
    
    for word in words:
        current_chunk.append(word)
        if len(current_chunk) >= chunk_size:
            chunks.append(" ".join(current_chunk))
            current_chunk = []
    
    if current_chunk:
        chunks.append(" ".join(current_chunk))
    
    return chunks

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
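The CrewAI agents in this example do not specify an llm, so they fall back to CrewAI's default OpenAI-backed model, which reads the OPENAI_API_KEY environment variable. The optional guard below is a small sketch that loads your .env file and fails fast at startup when the key is missing.

import os

from dotenv import load_dotenv

load_dotenv()

# Fail fast if the default CrewAI LLM has no credentials to work with
if not os.getenv("OPENAI_API_KEY"):
    raise RuntimeError("Set OPENAI_API_KEY in your .env file before starting the CrewAI example.")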

Copilot example

This service allows you to register a Microsoft Copilot Studio agent in watsonx Orchestrate via the Agent Connect Framework.

This service takes messages from Orchestrate, then uses the DirectLine API to authenticate, start a conversation, and send/receive messages with the externally hosted agent.

Finally, it returns the response of the agent using a format and specification that is compatible with watsonx Orchestrate by implementing a chat completions endpoint.

Before you use this example, you must set the following environment variables in your .env file:

BOT_SECRET=<BOT SECRET>
TOKEN_ENDPOINT=https://directline.botframework.com/v3/directline/tokens/generate

You can get the BOT_SECRET from the Copilot Studio agent's Settings page by navigating to Security > Web Channel Security.
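Before wiring the full service into watsonx Orchestrate, you can verify the secret and token endpoint with a short standalone check. This optional sketch reads the same variables from your .env file and requests a Direct Line token, mirroring what the CopilotAgent class below does internally.

import os

import httpx
from dotenv import load_dotenv

load_dotenv()

# Exchange the bot secret for a short-lived Direct Line token
resp = httpx.post(
    os.environ["TOKEN_ENDPOINT"],
    headers={"Authorization": f"Bearer {os.environ['BOT_SECRET']}"}
)
resp.raise_for_status()
print("Direct Line token received:", resp.json()["token"][:20] + "...")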

copilot.py
import asyncio
import json
import os
import time
import uuid
from typing import Any, Dict, List, Optional

import httpx
from dotenv import load_dotenv
from fastapi import FastAPI, Header
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel
from dataclasses import dataclass
from datetime import datetime, timedelta

load_dotenv()

@dataclass
class DirectLineToken:
    value: str
    created_at: datetime

class CopilotAgent:
    """
    Handles authentication and conversations with Microsoft Copilot Studio via Direct Line API.
    """

    TOKEN_ENDPOINT = os.getenv("TOKEN_ENDPOINT", "")
    BOT_SECRET = os.getenv("BOT_SECRET", "")
    BASE_URL = "https://directline.botframework.com/v3/directline"


    def __init__(self) -> None:
        self._token_cache: Optional[DirectLineToken] = None
        self._conversation_id: str = ""
        self._client = httpx.AsyncClient()

    async def _get_direct_line_token(self) -> str:
        if (
            self._token_cache
            and datetime.utcnow() - self._token_cache.created_at < timedelta(minutes=29)
        ):
            return self._token_cache.value

        resp = await self._client.post(
            self.TOKEN_ENDPOINT, headers={"Authorization": f"Bearer {self.BOT_SECRET}"}
        )
        resp.raise_for_status()
        token = resp.json()["token"]  
        self._token_cache = DirectLineToken(value=token, created_at=datetime.utcnow())
        return token

    async def _start_conversation(self) -> str:
        if self._conversation_id != "":
            return self._conversation_id

        token = await self._get_direct_line_token()
        resp = await self._client.post(
            f"{self.BASE_URL}/conversations",
            headers={
                "Authorization": f"Bearer {token}",
                "Content-Type": "application/json",
            },
        )
        resp.raise_for_status()
        if resp.json()["conversationId"]:
            self._conversation_id = resp.json()["conversationId"]
        return self._conversation_id

    async def ask(self, user_text: str) -> str:
        token = await self._get_direct_line_token()
        convo_id = await self._start_conversation()

        headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        }
        send_payload = {"type": "message", "from": {"id": "user"}, "text": user_text}
        send_resp = await self._client.post(
            f"{self.BASE_URL}/conversations/{convo_id}/activities",
            headers=headers,
            json=send_payload,
        )
        send_resp.raise_for_status()
        activity_id = send_resp.json()["id"]

        timeout_s, start = 180, time.time()
        while time.time() - start < timeout_s:
            recv_resp = await self._client.get(
                f"{self.BASE_URL}/conversations/{convo_id}/activities", headers=headers
            )
            recv_resp.raise_for_status()
            for act in recv_resp.json().get("activities", []):
                if (
                    act.get("type") == "message"
                    and act.get("from", {}).get("id") != "user"
                    and act.get("replyToId") == activity_id
                ):
                    return act.get("text", "")
            await asyncio.sleep(0.5)

        
        return "Copilot did not answer in time."


app = FastAPI()


class ChatRequest(BaseModel):
    messages: List[Dict[str, Any]]
    stream: bool = False


thread_state: Dict[str, Dict[str, Any]] = {}


@app.get("/v1/agents")
async def discover_agents():
    return {
        "agents": [
            {
                "name": "Copilot Studio Agent",
                "description": "Microsoft Copilot Studio exposed via Agent Connect",
                "provider": {
                    "organization": "Microsoft",
                    "url": "https://copilotstudio.microsoft.com",
                },
                "version": "1.0.0",
                "documentation_url": "https://learn.microsoft.com/en-us/azure/bot-service/rest-api/bot-framework-rest-direct-line-3-0-api-reference?view=azure-bot-service-4.0",
                "capabilities": {"streaming": True},
            }
        ]
    }


def _sse_event(data: str, event: str = "message") -> str:
    return f"event: {event}\ndata: {data}\n\n"


@app.post("/v1/chat")
async def chat_completion(
    request: ChatRequest, x_ibm_thread_id: str = Header(None)
):
    if not x_ibm_thread_id:
        return JSONResponse(status_code=400, content={"error": "Missing thread id header."})
    thread_id = x_ibm_thread_id
    if thread_id not in thread_state:
        thread_state[thread_id] = {"history": [], "agent": CopilotAgent()}


    state = thread_state[thread_id]
    hist: List[Dict[str, Any]] = state["history"]
    agent: CopilotAgent = state["agent"]

    for msg in request.messages:
        if msg not in hist:
            hist.append(msg)

    try:
        user_msg = next(m["content"] for m in reversed(hist) if m["role"] == "user")
    except StopIteration:
        return JSONResponse(status_code=400, content={"error": "No user message."})

    assistant_reply = await agent.ask(user_msg)

    hist.append({"role": "assistant", "content": assistant_reply})

    if request.stream:
        async def event_stream():
            yield _sse_event(
                json.dumps(
                    {
                        "choices": [
                            {
                                "index": 0,
                                "delta": {"role": "assistant"},
                            }
                        ]
                    }
                )
            )
            for token in assistant_reply.split():
                yield _sse_event(
                    json.dumps(
                        {
                            "choices": [
                                {
                                    "index": 0,
                                    "delta": {"content": token + " "},
                                }
                            ]
                        }
                    )
                )
                await asyncio.sleep(0)
            yield _sse_event(
                json.dumps(
                    {"choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]}
                )
            )
            yield "data: [DONE]\n\n"

        return StreamingResponse(event_stream(), media_type="text/event-stream")

    return {
        "id": str(uuid.uuid4()),
        "object": "chat.completion",
        "created": int(time.time()),
        "choices": [
            {
                "index": 0,
                "message": {"role": "assistant", "content": assistant_reply},
                "finish_reason": "stop",
            }
        ],
    }


if __name__ == "__main__":
    import uvicorn

    uvicorn.run("copilot:app", host="0.0.0.0", port=8083, reload=True)

Custom framework example

If you’re using a custom agent framework or building your own, you can implement the Agent Connect API to integrate your agent with watsonx Orchestrate. Here’s a minimal example to get you started:

import os
from typing import Dict, List, Any, Optional
from fastapi import FastAPI, Request, Response, Header
from fastapi.responses import StreamingResponse
import uvicorn
import json
import time
import uuid
import asyncio
from pydantic import BaseModel

# Initialize FastAPI app
app = FastAPI(title="Custom Agent Connect Example")

# Define message schema
class ChatRequest(BaseModel):
    messages: List[Dict[str, Any]]
    stream: bool = False

# Simple in-memory storage for conversation history
conversation_history = {}

# Agent discovery endpoint
@app.get("/v1/agents")
async def discover_agents():
    return {
        "agents": [
            {
                "name": "Custom Agent",
                "description": "A custom agent that demonstrates the minimal implementation of Agent Connect",
                "provider": {
                    "organization": "Your Organization",
                    "url": "https://your-organization.com"
                },
                "version": "1.0.0",
                "documentation_url": "https://docs.example.com/custom-agent",
                "capabilities": {
                    "streaming": True
                }
            }
        ]
    }

# Chat completion endpoint
@app.post("/v1/chat")
async def chat_completion(request: ChatRequest, x_ibm_thread_id: str = Header(None)):
    thread_id = x_ibm_thread_id or str(uuid.uuid4())
    
    # Store or retrieve conversation history
    if thread_id not in conversation_history:
        conversation_history[thread_id] = []
    
    # Add the new messages to history
    for msg in request.messages:
        if msg not in conversation_history[thread_id]:
            conversation_history[thread_id].append(msg)
    
    # Extract the user query from the messages
    user_messages = [msg for msg in request.messages if msg["role"] == "user"]
    if not user_messages:
        return {"error": "No user message found"}
    
    query = user_messages[-1]["content"]
    
    # Handle streaming
    if request.stream:
        return StreamingResponse(
            stream_custom_response(query, thread_id),
            media_type="text/event-stream"
        )
    else:
        # Generate a response (this is a placeholder, replace with actual logic)
        response_content = f"Received query: {query}"

        # Add the response to the conversation history
        conversation_history[thread_id].append({"role": "assistant", "content": response_content})

        # Return the response in the chat completion format expected by Agent Connect
        return {
            "id": f"chatcmpl-{uuid.uuid4()}",
            "object": "chat.completion",
            "created": int(time.time()),
            "choices": [
                {
                    "index": 0,
                    "message": {"role": "assistant", "content": response_content},
                    "finish_reason": "stop"
                }
            ]
        }

async def stream_custom_response(query: str, thread_id: str):
    # This is a placeholder for streaming logic, replace with actual implementation
    for i in range(5):
        message_delta = {
            "id": f"msg-{uuid.uuid4()}",
            "object": "thread.message.delta",
            "thread_id": thread_id,
            "created": int(time.time()),
            "choices": [
                {
                    "delta": {
                        "role": "assistant",
                        "content": f"Streaming response part {i + 1} for query: {query} "
                    }
                }
            ]
        }
        yield "event: thread.message.delta\n"
        yield f"data: {json.dumps(message_delta)}\n\n"
        await asyncio.sleep(1)

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
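Because this example keys its in-memory history on the thread id, you can exercise that behavior by sending several requests with the same X-IBM-THREAD-ID header (the header name that maps to the x_ibm_thread_id parameter above). The snippet below is a small test sketch that assumes the server runs locally on port 8000.

import httpx

headers = {"X-IBM-THREAD-ID": "demo-thread-1"}

# Reusing the same thread id keeps the conversation history together on the server
for text in ["Hello there", "Tell me more"]:
    resp = httpx.post(
        "http://localhost:8000/v1/chat",
        headers=headers,
        json={"messages": [{"role": "user", "content": text}], "stream": False}
    )
    print(resp.json())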