FastAPI + Nats.io#
Combining the FastAPIAdapter and NatsAdapter in FastAgency provides the most scalable setup. It harnesses the power of the FastAPI framework to build and expose workflows as REST APIs while utilizing the Nats.io message broker for scalable and asynchronous communication. This setup is preferred for running large workloads in production.
Use Cases#
This section outlines the scenarios where it is particularly beneficial to combine the FastAPIAdapter and NatsAdapter.
When to Use the FastAPIAdapter and NatsAdapter Together:
- High User Demand: If you need to scale beyond what multiple workers of the FastAPIAdapter can achieve, you can use Nats.io with a message queue and multiple workers to consume and produce messages. This distributed message-queue architecture allows scaling not only across multiple workers but also across multiple machines and clusters.
- Observability: If you need the ability to audit workflow executions both in real time and retrospectively, the NatsAdapter provides the necessary infrastructure to enable this feature.
- Security features of FastAPI: If you want to leverage the security features of FastAPI, such as authentication and authorization, along with the distributed architecture of NATS, this setup is the most suitable option. Please check the securing your FastAPIAdapter documentation for more information.
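The queue-plus-workers pattern behind the first use case can be sketched with the standard library alone. This is only an in-process illustration under stated assumptions: `worker`, `run_workers`, and the request strings are hypothetical names, and an `asyncio.Queue` stands in for the Nats.io broker, which in a real deployment would let the workers run on separate machines.

```python
import asyncio


async def worker(name: str, queue: asyncio.Queue, results: list) -> None:
    """Consume workflow requests from the queue until cancelled."""
    while True:
        request = await queue.get()
        try:
            # Stand-in for running a workflow; yields control like real I/O would.
            await asyncio.sleep(0)
            results.append((name, request))
        finally:
            queue.task_done()


async def run_workers(requests: list, worker_count: int) -> list:
    """Fan the requests out to worker_count concurrent consumers."""
    queue: asyncio.Queue = asyncio.Queue()
    results: list = []
    for request in requests:
        queue.put_nowait(request)
    tasks = [
        asyncio.create_task(worker(f"worker-{i}", queue, results))
        for i in range(worker_count)
    ]
    await queue.join()  # returns once every request has been marked done
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return results


if __name__ == "__main__":
    done = asyncio.run(run_workers([f"request-{n}" for n in range(6)], worker_count=3))
    print(sorted(request for _, request in done))
```

Adding capacity in this model means raising `worker_count`; with a message broker the same idea extends to starting more worker processes on more hosts.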
Architecture Overview#
This section presents the high-level architecture of the two available setups that use the FastAPIAdapter and NatsAdapter together: one with a Mesop client and one with a custom client.
The system is composed of three main components:
1. Mesop Client App#
This application serves as the frontend interface for the system. It includes:
- MesopUI: A friendly web interface for users to interact with the workflows. It facilitates the communication with the user and the FastAPIProvider.
- FastAPIProvider: A component that facilitates communication between the Mesop client and the FastAPIAdapter.
The system is composed of three main components:
1. Custom Client App#
This application serves as the frontend interface for the system. It includes:
- Custom Client: A client application built in a different language (e.g., HTML/JavaScript, Go, Java, etc.) that facilitates communication between the user and the FastAPIAdapter.
This application handles all interactions with the FastAPIAdapter and presents the results back to the user.
2. FastAPI App#
This application is part of our system's backend and consists of:
- NatsProvider: Responsible for connecting to the NatsAdapter, receiving workflow initiation messages, and distributing them to the workers for execution.
- FastAPIAdapter: This component communicates with the NatsProvider and implements the routes and WebSocket endpoint for FastAPI.
- FastAPI: Provides the infrastructure for building and exposing AutoGen workflows via REST API.
3. Nats App#
This application is also part of our system's backend and consists of:
- NatsAdapter: This adapter connects to the NatsProvider and is responsible for communicating with AutoGen workflows.
- AutoGen Workflows: These workflows, defined using the AutoGen framework, embody the core logic and behavior of your application. They leverage agents to perform various tasks and accomplish specific goals.
This architecture promotes a clear separation of concerns between the user interface, the API layer, and the workflow execution logic, enhancing modularity and maintainability. The FastAPI framework provides a user-friendly and efficient REST API, while the NatsAdapter, combined with the Nats.io message broker, ensures scalability and asynchronous communication.
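To make the layering concrete, here is a minimal in-memory sketch of how a request might pass from the API layer through the broker layer to the workflow code. All names (`Provider`, `LocalWorkflows`, `ForwardingLayer`) are hypothetical and are not FastAgency classes; the sketch only assumes that each layer exposes the same small interface and forwards calls to the layer beneath it.

```python
from typing import Callable, Protocol


class Provider(Protocol):
    """Hypothetical interface: list workflows and run one by name."""

    def names(self) -> list[str]: ...

    def run(self, name: str, user_input: str) -> str: ...


class LocalWorkflows:
    """Innermost layer: holds the actual workflow functions."""

    def __init__(self) -> None:
        self._workflows: dict[str, Callable[[str], str]] = {}

    def add(self, name: str, func: Callable[[str], str]) -> None:
        self._workflows[name] = func

    def names(self) -> list[str]:
        return list(self._workflows)

    def run(self, name: str, user_input: str) -> str:
        return self._workflows[name](user_input)


class ForwardingLayer:
    """Wraps another provider and records each hop, standing in for a network boundary."""

    def __init__(self, label: str, inner: Provider, trace: list[str]) -> None:
        self._label = label
        self._inner = inner
        self._trace = trace

    def names(self) -> list[str]:
        return self._inner.names()

    def run(self, name: str, user_input: str) -> str:
        self._trace.append(self._label)  # note which layer handled the call
        return self._inner.run(name, user_input)


trace: list[str] = []
workflows = LocalWorkflows()
workflows.add("echo", lambda text: text.upper())
broker_layer = ForwardingLayer("broker", workflows, trace)
api_layer = ForwardingLayer("api", broker_layer, trace)

result = api_layer.run("echo", "hello")
print(result, trace)
```

Because every layer speaks the same interface, the client only ever talks to the outermost one, and any layer can be replaced or scaled without changing the others.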
Now, it's time to see the FastAPIAdapter and NatsAdapter in action together. Let's dive into an example and learn how to use it!
Installation#
We strongly recommend using Cookiecutter for setting up the project. Cookiecutter creates the project folder structure, default workflow, automatically installs all the necessary requirements, and creates a devcontainer that can be used with Visual Studio Code.
You can set up the project using Cookiecutter by following the project setup guide.
Alternatively, you can use pip + venv.
Before getting started, ensure that FastAgency is installed with support for the AutoGen runtime, along with the mesop, fastapi, server, and nats submodules by running the following command:
This command installs FastAgency with support for both the mesop and console interfaces for AutoGen workflows, with FastAPI serving incoming requests and independent workers, communicating over the Nats.io protocol, running the workflows.
Before getting started, ensure that FastAgency is installed with support for the AutoGen runtime, along with the fastapi, server, and nats submodules by running the following command:
This command installs FastAgency with FastAPI serving incoming requests and independent workers, communicating over the Nats.io protocol, running the workflows.
Example: Student and Teacher Learning Chat#
In this example, we'll create a simple learning chatbot where a student agent asks questions and a teacher agent responds, simulating a learning environment. We'll use MesopUI for the web interface and the FastAPIAdapter and NatsAdapter for serving and executing the workflows.
In this example, we'll create a simple learning chatbot where a student agent asks questions and a teacher agent responds, simulating a learning environment. We'll create a custom client using HTML and JavaScript for the web interface and the FastAPIAdapter and NatsAdapter for serving and executing the workflows.
Step-by-Step Breakdown#
As shown in the architecture overview, this setup requires three components (applications). Let's begin our code walkthrough, starting with the NATS App.
1. Define Workflow#
To get started, define the workflow that your application will use. This is where you specify how the agents interact and what they do. Here's a simple example of a workflow definition:
import os
from typing import Any
from autogen.agentchat import ConversableAgent
from fastagency import UI
from fastagency.runtimes.autogen import AutoGenWorkflows
llm_config = {
    "config_list": [
        {
            "model": "gpt-4o-mini",
            "api_key": os.getenv("OPENAI_API_KEY"),
        }
    ],
    "temperature": 0.8,
}

wf = AutoGenWorkflows()


@wf.register(name="simple_learning", description="Student and teacher learning chat")  # type: ignore[misc]
def simple_workflow(ui: UI, params: dict[str, Any]) -> str:
    initial_message = ui.text_input(
        sender="Workflow",
        recipient="User",
        prompt="I can help you learn about mathematics. What subject you would like to explore?",
    )

    student_agent = ConversableAgent(
        name="Student_Agent",
        system_message="You are a student willing to learn.",
        llm_config=llm_config,
    )
    teacher_agent = ConversableAgent(
        name="Teacher_Agent",
        system_message="You are a math teacher.",
        llm_config=llm_config,
    )

    chat_result = student_agent.initiate_chat(
        teacher_agent,
        message=initial_message,
        summary_method="reflection_with_llm",
        max_turns=3,
    )

    return str(chat_result.summary)
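The registration step relies on the decorator pattern: calling register with a name and a description returns a decorator that stores the function for later lookup. The following is a simplified, hypothetical `WorkflowRegistry`, not the AutoGenWorkflows implementation; it only mirrors the `names` and `get_description` members used elsewhere on this page, and its `run` method and the simplified workflow signature are assumptions made for illustration.

```python
from typing import Callable

WorkflowFunc = Callable[[dict], str]


class WorkflowRegistry:
    """Hypothetical, simplified stand-in for a workflow registry."""

    def __init__(self) -> None:
        self._funcs: dict[str, WorkflowFunc] = {}
        self._descriptions: dict[str, str] = {}

    def register(self, name: str, description: str) -> Callable[[WorkflowFunc], WorkflowFunc]:
        def decorator(func: WorkflowFunc) -> WorkflowFunc:
            if name in self._funcs:
                raise ValueError(f"workflow {name!r} is already registered")
            self._funcs[name] = func
            self._descriptions[name] = description
            return func  # the decorated function itself is left unchanged

        return decorator

    @property
    def names(self) -> list[str]:
        return list(self._funcs)

    def get_description(self, name: str) -> str:
        return self._descriptions[name]

    def run(self, name: str, params: dict) -> str:
        return self._funcs[name](params)


registry = WorkflowRegistry()


@registry.register(name="simple_learning", description="Student and teacher learning chat")
def simple_workflow(params: dict) -> str:
    # Placeholder body; the real workflow would drive the agents.
    return f"summary for topic: {params.get('topic', 'unknown')}"
```

A name-keyed registry like this is what makes it possible to list the available workflows and to start one from a message that carries only its name.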
2. Import Required Modules#
Next, import the required modules from FastAgency. These imports provide the essential building blocks for integrating with the client. Additionally, import the NatsAdapter class for workflow execution.
import os
from typing import Any
from fastagency.adapters.nats import NatsAdapter
from fastapi import FastAPI
from ..workflow import wf
3. Configure the NatsAdapter#
Create an instance of the NatsAdapter and pass your workflow to it. The adapter will handle the communication with the NatsProvider and distribute workflow execution to the workers.
nats_url = os.environ.get("NATS_URL", "nats://localhost:4222")
user: str = os.environ.get("FASTAGENCY_NATS_USER", "fastagency")
password: str = os.environ.get("FASTAGENCY_NATS_PASSWORD", "fastagency_nats_password")
adapter = NatsAdapter(provider=wf, nats_url=nats_url, user=user, password=password)
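Since the same three connection values are needed by more than one application, it can help to read them in one place. The sketch below is one possible way to do that with a small dataclass; `NatsSettings` and `from_env` are hypothetical names, while the environment variable names and default values are the ones used in the snippet above.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class NatsSettings:
    """Connection settings for the NATS server (hypothetical helper)."""

    url: str
    user: str
    password: str

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> "NatsSettings":
        # Fall back to the same defaults as the snippet above when a variable is unset.
        return cls(
            url=env.get("NATS_URL", "nats://localhost:4222"),
            user=env.get("FASTAGENCY_NATS_USER", "fastagency"),
            password=env.get("FASTAGENCY_NATS_PASSWORD", "fastagency_nats_password"),
        )


if __name__ == "__main__":
    settings = NatsSettings.from_env()
    print(settings.url, settings.user)
```

Passing the mapping explicitly, as in `NatsSettings.from_env({"NATS_URL": "nats://broker:4222"})`, keeps the helper easy to test without touching the real environment. The default password is suitable only for local experiments.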
4. Define FastAgency Application#
Add the NatsAdapter created above to a FastAPI application by passing its lifespan to the lifespan parameter: app = FastAPI(lifespan=adapter.lifespan).
5. Adapter Chaining#
Above, we created a Nats.io provider that starts brokers waiting to consume workflow initiation messages from the message broker.
Next, we set up a FastAPI service to interact with the Nats.io provider. This introduces the second component: the FastAPI App.
main_2_fastapi.py
from os import environ
from fastagency.adapters.fastapi import FastAPIAdapter
from fastagency.adapters.nats import NatsAdapter
from fastapi import FastAPI
nats_url = environ.get("NATS_URL", "nats://localhost:4222")
nats_user: str = "fastagency"
nats_password: str = environ.get("FASTAGENCY_NATS_PASSWORD", "fastagency_nats_password")
provider = NatsAdapter.create_provider(
nats_url=nats_url, user=nats_user, password=nats_password
)
adapter = FastAPIAdapter(
provider=provider,
)
app = FastAPI()
app.include_router(adapter.router)
Finally, the last component is the Mesop Client App, which uses the MesopUI to communicate with both the user and the FastAPIProvider.
main_3_mesop.py
main_fastapi_custom_client.py
from os import environ
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from fastagency.adapters.fastapi import FastAPIAdapter
from fastagency.adapters.nats import NatsAdapter
nats_url = environ.get("NATS_URL", "nats://localhost:4222")
nats_user: str = "fastagency"
nats_password: str = environ.get("FASTAGENCY_NATS_PASSWORD", "fastagency_nats_password") # type: ignore[assignment]
provider = NatsAdapter.create_provider(
nats_url=nats_url, user=nats_user, password=nats_password
)
adapter = FastAPIAdapter(
provider=provider,
)
# app = FastAPI(lifespan=provider.lifespan)
app = FastAPI()
app.include_router(adapter.router)
Finally, for simplicity, we will serve our custom HTML client as part of the same FastAPI App using FastAPI's HTMLResponse.
Note
- The below example uses a simple HTML with JavaScript, all in a single string and served directly from the FastAgency FastAPI app for simplicity.
- This approach is not suitable for production but ideal for demonstrating core concepts.
- In a real-world scenario, you'd use a separate frontend, built with frameworks like React or Vue.js, or other languages such as Java, Go, or Ruby, based on your project needs.
main_fastapi_custom_client.py
html = """
<!DOCTYPE html>
<html>
<head>
<title>FastAgency Chat App</title>
</head>
<body>
<h1>FastAgency Chat App</h1>
<div id="workflows"></div>
<ul id="messages"></ul>
<script>
const API_URL = 'http://127.0.0.1:8008/fastagency';
const WS_URL = 'ws://127.0.0.1:8008/fastagency/ws'; // nosemgrep
let socket;
async function fetchWorkflows() {
const response = await fetch(`${API_URL}/discovery`);
const workflows = await response.json();
const container = document.getElementById('workflows');
workflows.forEach(workflow => {
const button = document.createElement('button');
button.textContent = workflow.description;
button.onclick = () => startWorkflow(workflow.name);
container.appendChild(button);
});
}
async function startWorkflow(name) {
const payload = {
workflow_name: name,
workflow_uuid: generateUUID(),
user_id: null, // Set to null for single-user applications; otherwise, provide the appropriate user ID
params: {}
};
const response = await fetch(`${API_URL}/initiate_workflow`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(payload)
});
const workflowJson = await response.json();
connectWebSocket(workflowJson);
}
function connectWebSocket(workflowJson) {
socket = new WebSocket(WS_URL);
socket.onopen = () => {
const initMessage = {
name: workflowJson.name,
workflow_uuid: workflowJson.workflow_uuid,
user_id: workflowJson.user_id,
params: {}
};
socket.send(JSON.stringify(initMessage));
};
socket.onmessage = (event) => handleMessage(JSON.parse(event.data));
}
function handleMessage(message) {
const messagesList = document.getElementById('messages');
const li = document.createElement('li');
if (message.type === 'text_input') {
const response = prompt(message.content.prompt);
socket.send(response);
li.textContent = `${message.sender} -> ${message.recipient}: ${message.content.prompt}`;
} else {
li.textContent = `${message.sender} -> ${message.recipient}: ${message.content?.body || message?.type || JSON.stringify(message)}`;
}
messagesList.appendChild(li);
}
fetchWorkflows();
// Helper function for generating UUID
function generateUUID() {
return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
if (c === 'x') {
return (Math.random() * 16 | 0).toString(16);
} else {
return ((Math.random() * 16 | 0) & 0x3 | 0x8).toString(16);
}
});
}
</script>
</body>
</html>
"""
@app.get("/")
async def get() -> HTMLResponse:
    return HTMLResponse(html)
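The two JSON messages that the JavaScript client sends can also be built from any other language. Here is a minimal Python sketch of the same payloads; the field names are taken from the client code above, while the helper names `build_initiate_payload` and `build_websocket_init` are hypothetical, and the shape of the server response is assumed to match what the client reads from it (`name`, `workflow_uuid`, `user_id`).

```python
import json
import uuid
from typing import Any, Optional


def build_initiate_payload(
    workflow_name: str,
    user_id: Optional[str] = None,
    params: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Body for the POST request that initiates a workflow."""
    return {
        "workflow_name": workflow_name,
        "workflow_uuid": str(uuid.uuid4()),  # random version 4 UUID, like generateUUID()
        "user_id": user_id,  # None for single-user applications
        "params": params or {},
    }


def build_websocket_init(workflow_json: dict[str, Any]) -> str:
    """First WebSocket message: echoes the identifiers returned by the server."""
    return json.dumps(
        {
            "name": workflow_json["name"],
            "workflow_uuid": workflow_json["workflow_uuid"],
            "user_id": workflow_json["user_id"],
            "params": {},
        }
    )


if __name__ == "__main__":
    print(json.dumps(build_initiate_payload("simple_learning"), indent=2))
```

Using `uuid.uuid4()` avoids hand-rolling the UUID format, which is easy to get subtly wrong.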
6. Nats server setup#
The NatsAdapter requires a running NATS server. The easiest way to start the NATS server is by using Docker. FastAgency leverages the JetStream feature of NATS and also utilizes authentication.
websocket {
    # listen: localhost:9222
    port: 9222
    no_tls: true
    compress: true
}

jetstream {}

accounts {
    AUTH {
        jetstream: enabled
        users: [
            { user: fastagency, password: $FASTAGENCY_NATS_PASSWORD }
        ]
    }
    APP {
        jetstream: enabled
    }
    SYS {}
}

authorization {
    auth_callout {
        issuer: $NATS_PUB_NKEY
        auth_users: [ fastagency ]
        account: AUTH
    }
}

system_account: SYS
In the above Nats configuration, we define a user called fastagency, and its password is read from the environment variable FASTAGENCY_NATS_PASSWORD. We also enable JetStream in Nats and configure Nats to serve via the appropriate ports.
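The configuration references two environment variables, FASTAGENCY_NATS_PASSWORD and NATS_PUB_NKEY, so it may be worth checking that both are set before starting the server. The helper below is a small sketch of such a check; `missing_env_vars` and `REQUIRED_NATS_ENV` are hypothetical names and are not part of FastAgency or NATS.

```python
import os
from typing import Iterable, Mapping

# Variables referenced with a leading "$" in the configuration above.
REQUIRED_NATS_ENV = ("FASTAGENCY_NATS_PASSWORD", "NATS_PUB_NKEY")


def missing_env_vars(
    required: Iterable[str] = REQUIRED_NATS_ENV,
    env: Mapping[str, str] = os.environ,
) -> list[str]:
    """Return the required variables that are unset or empty."""
    return [name for name in required if not env.get(name)]


if __name__ == "__main__":
    missing = missing_env_vars()
    if missing:
        print("Set these variables before starting NATS:", ", ".join(missing))
    else:
        print("All NATS environment variables are set.")
```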
Complete Application Code#
Please copy and paste the following code into the same folder, using the file names exactly as mentioned below.
nats_server.conf
websocket {
    # listen: localhost:9222
    port: 9222
    no_tls: true
    compress: true
}

jetstream {}

accounts {
    AUTH {
        jetstream: enabled
        users: [
            { user: fastagency, password: $FASTAGENCY_NATS_PASSWORD }
        ]
    }
    APP {
        jetstream: enabled
    }
    SYS {}
}

authorization {
    auth_callout {
        issuer: $NATS_PUB_NKEY
        auth_users: [ fastagency ]
        account: AUTH
    }
}

system_account: SYS
workflow.py
import os
from typing import Any
from autogen.agentchat import ConversableAgent
from fastagency import UI
from fastagency.runtimes.autogen import AutoGenWorkflows
llm_config = {
    "config_list": [
        {
            "model": "gpt-4o-mini",
            "api_key": os.getenv("OPENAI_API_KEY"),
        }
    ],
    "temperature": 0.8,
}

wf = AutoGenWorkflows()


@wf.register(name="simple_learning", description="Student and teacher learning chat")  # type: ignore[misc]
def simple_workflow(ui: UI, params: dict[str, Any]) -> str:
    initial_message = ui.text_input(
        sender="Workflow",
        recipient="User",
        prompt="I can help you learn about mathematics. What subject you would like to explore?",
    )

    student_agent = ConversableAgent(
        name="Student_Agent",
        system_message="You are a student willing to learn.",
        llm_config=llm_config,
    )
    teacher_agent = ConversableAgent(
        name="Teacher_Agent",
        system_message="You are a math teacher.",
        llm_config=llm_config,
    )

    chat_result = student_agent.initiate_chat(
        teacher_agent,
        message=initial_message,
        summary_method="reflection_with_llm",
        max_turns=3,
    )

    return str(chat_result.summary)
main_1_nats.py
import os
from typing import Any
from fastagency.adapters.nats import NatsAdapter
from fastapi import FastAPI
from ..workflow import wf
nats_url = os.environ.get("NATS_URL", "nats://localhost:4222")
user: str = os.environ.get("FASTAGENCY_NATS_USER", "fastagency")
password: str = os.environ.get("FASTAGENCY_NATS_PASSWORD", "fastagency_nats_password")
adapter = NatsAdapter(provider=wf, nats_url=nats_url, user=user, password=password)
app = FastAPI(lifespan=adapter.lifespan)
# this is optional, but we would like to see the list of available workflows
@app.get("/")
def list_workflows() -> dict[str, Any]:
    return {"Workflows": {name: wf.get_description(name) for name in wf.names}}
# start the adapter with the following command
# uvicorn my_fastagency_app.deployment.main_1_nats:app --reload
main_2_fastapi.py
from os import environ
from fastagency.adapters.fastapi import FastAPIAdapter
from fastagency.adapters.nats import NatsAdapter
from fastapi import FastAPI
nats_url = environ.get("NATS_URL", "nats://localhost:4222")
nats_user: str = "fastagency"
nats_password: str = environ.get("FASTAGENCY_NATS_PASSWORD", "fastagency_nats_password")
provider = NatsAdapter.create_provider(
nats_url=nats_url, user=nats_user, password=nats_password
)
adapter = FastAPIAdapter(
provider=provider,
)
app = FastAPI()
app.include_router(adapter.router)
# this is optional, but we would like to see the list of available workflows
@app.get("/")
def read_root() -> dict[str, dict[str, str]]:
    return {
        "Workflows": {name: provider.get_description(name) for name in provider.names}
    }
# start the provider with the following command
# uvicorn my_fastagency_app.deployment.main_2_fastapi:app --host 0.0.0.0 --port 8008 --reload
main_3_mesop.py
from fastagency.adapters.fastapi import FastAPIAdapter
from fastagency.app import FastAgency
from fastagency.ui.mesop import MesopUI
fastapi_url = "http://localhost:8008"
provider = FastAPIAdapter.create_provider(
fastapi_url=fastapi_url,
)
ui = MesopUI()
app = FastAgency(
provider=provider,
ui=ui,
title="My FastAgency App",
)
# start the provider with the following command
# gunicorn my_fastagency_app.deployment.main_3_mesop:app -b 0.0.0.0:8888 --reload
main_2_fastapi_custom_client.py
from os import environ
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from fastagency.adapters.fastapi import FastAPIAdapter
from fastagency.adapters.nats import NatsAdapter
html = """
<!DOCTYPE html>
<html>
<head>
<title>FastAgency Chat App</title>
</head>
<body>
<h1>FastAgency Chat App</h1>
<div id="workflows"></div>
<ul id="messages"></ul>
<script>
const API_URL = 'http://127.0.0.1:8008/fastagency';
const WS_URL = 'ws://127.0.0.1:8008/fastagency/ws'; // nosemgrep
let socket;
async function fetchWorkflows() {
const response = await fetch(`${API_URL}/discovery`);
const workflows = await response.json();
const container = document.getElementById('workflows');
workflows.forEach(workflow => {
const button = document.createElement('button');
button.textContent = workflow.description;
button.onclick = () => startWorkflow(workflow.name);
container.appendChild(button);
});
}
async function startWorkflow(name) {
const payload = {
workflow_name: name,
workflow_uuid: generateUUID(),
user_id: null, // Set to null for single-user applications; otherwise, provide the appropriate user ID
params: {}
};
const response = await fetch(`${API_URL}/initiate_workflow`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(payload)
});
const workflowJson = await response.json();
connectWebSocket(workflowJson);
}
function connectWebSocket(workflowJson) {
socket = new WebSocket(WS_URL);
socket.onopen = () => {
const initMessage = {
name: workflowJson.name,
workflow_uuid: workflowJson.workflow_uuid,
user_id: workflowJson.user_id,
params: {}
};
socket.send(JSON.stringify(initMessage));
};
socket.onmessage = (event) => handleMessage(JSON.parse(event.data));
}
function handleMessage(message) {
const messagesList = document.getElementById('messages');
const li = document.createElement('li');
if (message.type === 'text_input') {
const response = prompt(message.content.prompt);
socket.send(response);
li.textContent = `${message.sender} -> ${message.recipient}: ${message.content.prompt}`;
} else {
li.textContent = `${message.sender} -> ${message.recipient}: ${message.content?.body || message?.type || JSON.stringify(message)}`;
}
messagesList.appendChild(li);
}
fetchWorkflows();
// Helper function for generating UUID
function generateUUID() {
return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
if (c === 'x') {
return (Math.random() * 16 | 0).toString(16);
} else {
return ((Math.random() * 16 | 0) & 0x3 | 0x8).toString(16);
}
});
}
</script>
</body>
</html>
"""
nats_url = environ.get("NATS_URL", "nats://localhost:4222")
nats_user: str = "fastagency"
nats_password: str = environ.get("FASTAGENCY_NATS_PASSWORD", "fastagency_nats_password") # type: ignore[assignment]
provider = NatsAdapter.create_provider(
nats_url=nats_url, user=nats_user, password=nats_password
)
adapter = FastAPIAdapter(
provider=provider,
)
# app = FastAPI(lifespan=provider.lifespan)
app = FastAPI()
app.include_router(adapter.router)
@app.get("/")
async def get() -> HTMLResponse:
    return HTMLResponse(html)
# start the provider with the following command
# uvicorn main_2_fastapi_custom_client:app --port 8008 --reload
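For clients written in Python rather than JavaScript, the display logic of handleMessage could look roughly like the sketch below. It is an assumption-laden illustration: `render_message` is a hypothetical helper, the message fields (`sender`, `recipient`, `type`, `content.prompt`, `content.body`) are the ones the JavaScript client reads, and the sample messages are made up.

```python
import json
from typing import Any


def render_message(message: dict[str, Any]) -> str:
    """Format one incoming message as 'sender -> recipient: text'."""
    content = message.get("content")
    fields = content if isinstance(content, dict) else {}
    if message.get("type") == "text_input":
        # Input requests carry the question to show to the user.
        text = fields.get("prompt", "")
    else:
        # Fall back from the body to the type, then to the raw message.
        text = fields.get("body") or message.get("type") or json.dumps(message)
    return f"{message.get('sender')} -> {message.get('recipient')}: {text}"


if __name__ == "__main__":
    sample = {
        "sender": "Workflow",
        "recipient": "User",
        "type": "text_input",
        "content": {"prompt": "What subject would you like to explore?"},
    }
    print(render_message(sample))
```

Unlike the browser client, this sketch only formats the message; answering a text_input request would still require sending the user's reply back over the WebSocket.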
Run Application#
Once everything is set up, you can run your FastAgency application using the following commands.
The NATS docker container is started automatically by Cookiecutter for this setup. In this setup, we need to run three commands in separate terminal windows:
- Start FastAPI application that provides a conversational workflow:
- Start FastAPI application integrated with a NATS messaging system:
- Start Mesop web interface using gunicorn:
First, install the package using a package manager such as pip and then run it. In this setup, we need to run four commands in separate terminal windows:
- Start NATS Docker container:
Terminal 1
- Start FastAPI application that provides a conversational workflow:
- Start FastAPI application integrated with a NATS messaging system:
- Start Mesop web interface using gunicorn:
- Start NATS Docker container:
Terminal 1
- Start FastAPI application that provides a conversational workflow:
- Start FastAPI application integrated with a NATS messaging system:
- Start Mesop web interface using waitress:
You need to run three commands in separate terminal windows:
- Start NATS Docker container:
Terminal 1
- Start FastAPI application that provides a conversational workflow:
- Start FastAPI application integrated with a NATS messaging system:
Output#
The outputs will vary based on the interface. Here is the output of the last terminal starting the UI:
INFO: Will watch for changes in these directories: ['/tmp/custom_fastapi_demo']
INFO: Uvicorn running on http://0.0.0.0:8008 (Press CTRL+C to quit)
INFO: Started reloader process [73937] using StatReload
INFO: Started server process [73940]
INFO: Waiting for application startup.
INFO: Application startup complete.
The FastAPI + Nats Adapter in FastAgency provides a highly scalable and flexible solution for building distributed applications. By leveraging the power of FastAPI for building REST APIs and the Nats.io MQ for asynchronous communication, you can create robust and efficient workflows that can handle high user demand and complex production setups.