Nats.io#
The NatsAdapter in FastAgency enables seamless integration of your workflows with the Nats.io MQ message broker, providing a scalable and flexible solution for building distributed applications.
Use Cases#
When to Use the NatsAdapter:
- High User Demand: If you need to scale beyond what multiple workers of the FastAPIAdapter can achieve, you can use Nats.io with a message queue and multiple workers to consume and produce messages. This distributed message-queue architecture allows scaling not only across multiple workers but also across multiple machines and clusters.
- Observability: If you need the ability to audit workflow executions both in real time and retrospectively, the NatsAdapter provides the necessary infrastructure to enable this feature.
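The worker-scaling idea in the first use case can be illustrated with a small in-process analogy. This is only a sketch using Python's standard library: the function `run_workers` and the job names are hypothetical, and no NATS connection is involved. With Nats.io the queue would live in the broker, and the workers could run on different machines.

```python
import queue
import threading


def run_workers(jobs: list[str], num_workers: int) -> dict[str, str]:
    """Process jobs with a pool of workers that all consume from one shared queue."""
    pending: queue.Queue = queue.Queue()
    results: dict[str, str] = {}
    lock = threading.Lock()

    def worker(worker_id: int) -> None:
        while True:
            job = pending.get()
            if job is None:  # sentinel: no more work for this worker
                return
            with lock:
                results[job] = f"handled by worker {worker_id}"

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(num_workers)]
    for thread in threads:
        thread.start()
    for job in jobs:
        pending.put(job)
    for _ in threads:
        pending.put(None)  # one sentinel per worker
    for thread in threads:
        thread.join()
    return results


results = run_workers([f"workflow-{n}" for n in range(6)], num_workers=3)
```

Each job is handled exactly once, regardless of which worker picks it up. Adding workers increases throughput without changing the code that produces the jobs.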
Architecture Overview#
The following diagram illustrates the high-level architecture of an application using the NatsAdapter with MesopUI as its frontend:
The system consists of two main components:
Mesop Client App#
This application serves as the frontend interface for the system. It includes:
- MesopUI: A friendly web interface for users to interact with the workflows. It facilitates communication between the user and the NatsProvider.
- Nats Provider: The Nats.io MQ message broker responsible for handling message communication between different parts of the system.
This application handles all client interactions and presents the results back to the user.
Nats App#
This application forms the backend of the system and consists of:
- AutoGen Workflows: The workflows defined using the AutoGen framework. They are executed by the workers in the Nats Adapter.
- NatsAdapter: Communicates with AutoGen and makes the workflow messages available on the corresponding Nats topics.
Now, it's time to see the Nats Adapter using FastAgency in action. Let's dive into an example and learn how to use it!
Installation#
We strongly recommend using Cookiecutter for setting up the project. Cookiecutter creates the project folder structure, default workflow, automatically installs all the necessary requirements, and creates a devcontainer that can be used with Visual Studio Code.
You can set up the project using Cookiecutter by following the project setup guide.
Alternatively, you can use pip + venv. Before getting started, ensure that FastAgency is installed with support for the AutoGen runtime, along with the mesop, fastapi, server and nats submodules by running the following command:
pip install "fastagency[autogen,mesop,fastapi,server,nats]"
This command installs FastAgency with support for both the mesop and console interfaces for AutoGen workflows and the NatsAdapter for workflow execution.
Example: Student and Teacher Learning Chat#
In this example, we'll create a simple learning chatbot where a student agent asks questions and a teacher agent responds, simulating a learning environment. We'll use MesopUI
for the web interface and the NatsAdapter
for workflow execution.
Step-by-Step Breakdown#
1. Define Workflow#
Define the workflow that your application will use. This is where you specify how the agents interact and what they do. Here's an example workflow definition:
import os
from typing import Any

from autogen.agentchat import ConversableAgent
from fastagency import UI
from fastagency.runtimes.autogen import AutoGenWorkflows

# LLM settings shared by both agents; the API key is read from the environment
llm_config = {
    "config_list": [
        {
            "model": "gpt-4o-mini",
            "api_key": os.getenv("OPENAI_API_KEY"),
        }
    ],
    "temperature": 0.8,
}

wf = AutoGenWorkflows()


@wf.register(name="simple_learning", description="Student and teacher learning chat")  # type: ignore[misc]
def simple_workflow(ui: UI, params: dict[str, Any]) -> str:
    # Ask the user which topic the conversation should cover
    initial_message = ui.text_input(
        sender="Workflow",
        recipient="User",
        prompt="I can help you learn about mathematics. What subject would you like to explore?",
    )

    student_agent = ConversableAgent(
        name="Student_Agent",
        system_message="You are a student willing to learn.",
        llm_config=llm_config,
    )
    teacher_agent = ConversableAgent(
        name="Teacher_Agent",
        system_message="You are a math teacher.",
        llm_config=llm_config,
    )

    # The student opens the chat; the summary is produced by the LLM
    chat_result = student_agent.initiate_chat(
        teacher_agent,
        message=initial_message,
        summary_method="reflection_with_llm",
        max_turns=3,
    )

    return str(chat_result.summary)
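The decorator-based registration used above can be understood through a minimal, self-contained sketch. The class `WorkflowRegistry` and its `run` method are hypothetical and written only for illustration. They are not FastAgency's implementation, although `register`, `names` and `get_description` mirror the names used in this guide.

```python
from typing import Callable


class WorkflowRegistry:
    """Hypothetical registry that maps workflow names to functions and descriptions."""

    def __init__(self) -> None:
        self._workflows: dict[str, tuple[Callable[..., str], str]] = {}

    def register(self, name: str, description: str) -> Callable:
        # Returns a decorator that stores the function under the given name
        def decorator(func: Callable[..., str]) -> Callable[..., str]:
            self._workflows[name] = (func, description)
            return func

        return decorator

    @property
    def names(self) -> list[str]:
        return list(self._workflows)

    def get_description(self, name: str) -> str:
        return self._workflows[name][1]

    def run(self, name: str, **kwargs: str) -> str:
        return self._workflows[name][0](**kwargs)


registry = WorkflowRegistry()


@registry.register(name="echo", description="Returns the message unchanged")
def echo(message: str) -> str:
    return message


output = registry.run("echo", message="hello")
```

Registering by name lets a caller look up and start a workflow without importing the function directly, which is what allows an adapter to expose workflows to remote clients.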
2. Import Required Modules#
Next, import the required modules from FastAgency. These imports provide the essential building blocks for integrating MesopUI. Additionally, import the NatsAdapter class for workflow execution.
import os
from typing import Any
from fastagency.adapters.nats import NatsAdapter
from fastapi import FastAPI
from ..workflow import wf
3. Configure the Nats Adapter#
Create an instance of the NatsAdapter and pass your workflow to it. The adapter will handle the communication with the NatsProvider and distribute workflow execution to the workers.
nats_url = os.environ.get("NATS_URL", "nats://localhost:4222")
user: str = os.environ.get("FASTAGENCY_NATS_USER", "fastagency")
password: str = os.environ.get("FASTAGENCY_NATS_PASSWORD", "fastagency_nats_password")
adapter = NatsAdapter(provider=wf, nats_url=nats_url, user=user, password=password)
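The environment-variable pattern above can also be grouped into a single settings object, which makes the defaults easy to test. The following is a minimal sketch: the names `NatsSettings` and `load_nats_settings`, and the example URL, are hypothetical and not part of FastAgency. The variable names and default values are the ones used in this guide.

```python
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class NatsSettings:
    """Hypothetical container for the connection values used in this guide."""

    url: str
    user: str
    password: str


def load_nats_settings(env: Mapping[str, str]) -> NatsSettings:
    # Fall back to the same defaults as the snippet above when a variable is unset
    return NatsSettings(
        url=env.get("NATS_URL", "nats://localhost:4222"),
        user=env.get("FASTAGENCY_NATS_USER", "fastagency"),
        password=env.get("FASTAGENCY_NATS_PASSWORD", "fastagency_nats_password"),
    )


# Pass os.environ in a real application; a plain dict is used here for illustration
settings = load_nats_settings({"NATS_URL": "nats://nats.example.internal:4222"})
```

Because the function takes a mapping instead of reading `os.environ` directly, the same code works with the real environment and with test data.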
4. Define FastAgency Application#
Add the NatsAdapter to a FastAPI application by passing the adapter's lifespan to the lifespan parameter. The adapter will have all REST API and WebSocket routes for communicating with a client.
app = FastAPI(lifespan=adapter.lifespan)
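FastAPI's lifespan parameter accepts an async context manager that runs setup code before the application starts serving and teardown code when it shuts down. The sketch below shows that mechanism with the standard library only. `FakeAdapter` and `serve` are hypothetical stand-ins. How the real NatsAdapter implements its lifespan is not shown in this guide.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator


class FakeAdapter:
    """Hypothetical stand-in for an adapter; it only records lifecycle events."""

    def __init__(self) -> None:
        self.events: list[str] = []

    @asynccontextmanager
    async def lifespan(self, app: Any) -> AsyncIterator[None]:
        self.events.append("connect")  # runs before the app serves requests
        try:
            yield
        finally:
            self.events.append("disconnect")  # runs when the app shuts down


async def serve(adapter: FakeAdapter) -> None:
    # A web framework enters the lifespan context around its serving loop
    async with adapter.lifespan(app=None):
        adapter.events.append("serving")


fake_adapter = FakeAdapter()
asyncio.run(serve(fake_adapter))
```

Tying the connection to the application's lifespan means the broker connection is opened once at startup and closed cleanly at shutdown, even if the serving loop raises an error.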
5. Nats server setup#
The NatsAdapter requires a running NATS server. The easiest way to start the NATS server is by using Docker. FastAgency leverages the JetStream feature of NATS and also utilizes authentication.
websocket {
    # listen: localhost:9222
    port: 9222
    no_tls: true
    compress: true
}

jetstream {}

accounts {
    AUTH {
        jetstream: enabled
        users: [
            { user: fastagency, password: $FASTAGENCY_NATS_PASSWORD }
        ]
    }
    APP {
        jetstream: enabled
    }
    SYS {}
}

authorization {
    auth_callout {
        issuer: $NATS_PUB_NKEY
        auth_users: [ fastagency ]
        account: AUTH
    }
}

system_account: SYS
In the above Nats configuration, we define a user called fastagency, and its password is read from the environment variable FASTAGENCY_NATS_PASSWORD. We also enable JetStream in Nats and configure Nats to serve via the appropriate ports.
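Because the configuration refers to values such as $FASTAGENCY_NATS_PASSWORD, it can help to check that they are set before starting the server. The following sketch is one way to do that. The helper names `referenced_env_vars` and `missing_env_vars` are hypothetical, the sample is a shortened copy of the configuration above, and the check assumes that every `$NAME` reference is meant to come from the environment.

```python
import re
from typing import Mapping


def referenced_env_vars(config_text: str) -> list[str]:
    """Return the distinct $NAME references in a config, in order of appearance."""
    seen: list[str] = []
    for line in config_text.splitlines():
        code = line.split("#", 1)[0]  # ignore anything after a '#' comment marker
        for name in re.findall(r"\$([A-Za-z_][A-Za-z0-9_]*)", code):
            if name not in seen:
                seen.append(name)
    return seen


def missing_env_vars(config_text: str, env: Mapping[str, str]) -> list[str]:
    """Return the referenced variables that are unset or empty in env."""
    return [name for name in referenced_env_vars(config_text) if not env.get(name)]


sample_config = """
accounts {
    AUTH {
        users: [ { user: fastagency, password: $FASTAGENCY_NATS_PASSWORD } ]
    }
}
authorization {
    auth_callout { issuer: $NATS_PUB_NKEY }
}
"""

# Pass os.environ in a real check; a plain dict is used here for illustration
missing = missing_env_vars(sample_config, {"FASTAGENCY_NATS_PASSWORD": "secret"})
```

In this example only the password is provided, so `missing` contains `NATS_PUB_NKEY`.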
Complete Application Code#
Please copy and paste the following code into the same folder, using the file names exactly as mentioned below.
nats_server.conf
websocket {
    # listen: localhost:9222
    port: 9222
    no_tls: true
    compress: true
}

jetstream {}

accounts {
    AUTH {
        jetstream: enabled
        users: [
            { user: fastagency, password: $FASTAGENCY_NATS_PASSWORD }
        ]
    }
    APP {
        jetstream: enabled
    }
    SYS {}
}

authorization {
    auth_callout {
        issuer: $NATS_PUB_NKEY
        auth_users: [ fastagency ]
        account: AUTH
    }
}

system_account: SYS
workflow.py
import os
from typing import Any

from autogen.agentchat import ConversableAgent
from fastagency import UI
from fastagency.runtimes.autogen import AutoGenWorkflows

# LLM settings shared by both agents; the API key is read from the environment
llm_config = {
    "config_list": [
        {
            "model": "gpt-4o-mini",
            "api_key": os.getenv("OPENAI_API_KEY"),
        }
    ],
    "temperature": 0.8,
}

wf = AutoGenWorkflows()


@wf.register(name="simple_learning", description="Student and teacher learning chat")  # type: ignore[misc]
def simple_workflow(ui: UI, params: dict[str, Any]) -> str:
    # Ask the user which topic the conversation should cover
    initial_message = ui.text_input(
        sender="Workflow",
        recipient="User",
        prompt="I can help you learn about mathematics. What subject would you like to explore?",
    )

    student_agent = ConversableAgent(
        name="Student_Agent",
        system_message="You are a student willing to learn.",
        llm_config=llm_config,
    )
    teacher_agent = ConversableAgent(
        name="Teacher_Agent",
        system_message="You are a math teacher.",
        llm_config=llm_config,
    )

    # The student opens the chat; the summary is produced by the LLM
    chat_result = student_agent.initiate_chat(
        teacher_agent,
        message=initial_message,
        summary_method="reflection_with_llm",
        max_turns=3,
    )

    return str(chat_result.summary)
main_1_nats.py
import os
from typing import Any
from fastagency.adapters.nats import NatsAdapter
from fastapi import FastAPI
from ..workflow import wf
nats_url = os.environ.get("NATS_URL", "nats://localhost:4222")
user: str = os.environ.get("FASTAGENCY_NATS_USER", "fastagency")
password: str = os.environ.get("FASTAGENCY_NATS_PASSWORD", "fastagency_nats_password")
adapter = NatsAdapter(provider=wf, nats_url=nats_url, user=user, password=password)
app = FastAPI(lifespan=adapter.lifespan)
# this is optional, but we would like to see the list of available workflows
@app.get("/")
def list_workflows() -> dict[str, Any]:
    return {"Workflows": {name: wf.get_description(name) for name in wf.names}}
# start the adapter with the following command
# uvicorn my_fastagency_app.deployment.main_1_nats:app --reload
main_2_mesop.py
from os import environ
from fastagency.adapters.nats import NatsAdapter
from fastagency.app import FastAgency
from fastagency.ui.mesop.mesop import MesopUI
nats_url = environ.get("NATS_URL", "nats://localhost:4222")
nats_user: str = "fastagency"
nats_password: str = environ.get("FASTAGENCY_NATS_PASSWORD", "fastagency_nats_password") # type: ignore[assignment]
provider = NatsAdapter.create_provider(
    nats_url=nats_url, user=nats_user, password=nats_password
)
ui = MesopUI()
app = FastAgency(provider=provider, ui=ui)
# start the provider with the following command
# gunicorn main_2_mesop:app -b 0.0.0.0:8888 --reload
Run Application#
With the Cookiecutter setup, the NATS Docker container is started automatically, so we need to run two commands in separate terminal windows:
- Start the FastAPI application that provides the conversational workflow (the uvicorn command is given in the comment at the end of main_1_nats.py).
- Start the Mesop web interface using gunicorn (the command is given in the comment at the end of main_2_mesop.py).
With the pip + venv setup, first install the package using a package manager such as pip and then run it. In this setup, we need to run three commands in separate terminal windows:
- Start the NATS Docker container.
- Start the FastAPI application that provides the conversational workflow.
- Start the Mesop web interface using gunicorn, or using waitress as an alternative server.
Output#
The outputs will vary based on the interface. Here is the output of the last terminal starting the UI:
[2024-10-10 13:19:18 +0530] [23635] [INFO] Starting gunicorn 23.0.0
[2024-10-10 13:19:18 +0530] [23635] [INFO] Listening at: http://127.0.0.1:8888 (23635)
[2024-10-10 13:19:18 +0530] [23635] [INFO] Using worker: sync
[2024-10-10 13:19:18 +0530] [23645] [INFO] Booting worker with pid: 23645
The NatsAdapter in FastAgency provides a powerful and flexible way to integrate your workflows with the Nats.io message broker. By leveraging the scalability and distributed architecture of Nats, you can build highly scalable and production-ready applications. With its easy-to-use API and seamless integration with the MesopUI, the NatsAdapter simplifies the development process while enabling advanced features like conversation auditing.