Skip to content

Nats.io#

The NatsAdapter in FastAgency enables seamless integration of your workflows with the Nats.io MQ message broker, providing a scalable and flexible solution for building distributed applications.

Use Cases#

When to Use the NatsAdapter:

  • High User Demand: If you need to scale beyond what multiple workers of the FastAPIAdapter can achieve, you can use Nats.io with a message queue and multiple workers to consume and produce messages. This distributed message-queue architecture allows scaling not only across multiple workers but also across multiple machines and clusters.

  • Observability: If you need the ability to audit workflow executions both in realtime and retrospectively, the NatsAdapter provides the necessary infrastructure to enable this feature.

Architecture Overview#

The following diagram illustrates the high-level architecture of an application using the NatsAdapter with MesopUI as its frontend:

Mesop Nats

The system consists of two main components:

Mesop Client App#

This application serves as the frontend interface for the system. It includes:

  • MesopUI: A friendly web interface for users to interact with the workflows. It facilitates the communication with the user and the NatsProvider.
  • Nats Provider: The Nats.io MQ message broker responsible for handling message communication between different parts of the system.

This application handles all client interactions and presents the results back to the user.

Nats App#

This application forms the backend of the system and consists of:

  • AutoGen Workflows: The workflows defined using the AutoGen framework. They are executed by the workers in the Nats Adapter.
  • NatsAdapter: Communicates with AutoGen, and makes the workflow messages available on corresponding Nats topics.

Now, it's time to see the Nats Adapter using FastAgency in action. Let's dive into an example and learn how to use it!

Installation#

We strongly recommend using Cookiecutter for setting up the project. Cookiecutter creates the project folder structure, default workflow, automatically installs all the necessary requirements, and creates a devcontainer that can be used with Visual Studio Code.

You can setup the project using Cookiecutter by following the project setup guide.

Alternatively, you can use pip + venv. Before getting started, ensure that FastAgency is installed with support for the AutoGen runtime, along with the mesop, fastapi, server and nats submodules by running the following command:

pip install "fastagency[autogen,mesop,fastapi,server,nats]"

This command installs FastAgency with support for both the mesop and console interfaces for AutoGen workflows and the NatsAdapter for workflow execution.

Example: Student and Teacher Learning Chat#

In this example, we'll create a simple learning chatbot where a student agent asks questions and a teacher agent responds, simulating a learning environment. We'll use MesopUI for the web interface and the NatsAdapter for workflow execution.

Step-by-Step Breakdown#

1. Define Workflow#

Define the workflow that your application will use. This is where you specify how the agents interact and what they do. Here's an example workflow definition:

import os
from typing import Any

from autogen.agentchat import ConversableAgent
from fastagency import UI
from fastagency.runtimes.autogen import AutoGenWorkflows

llm_config = {
    "config_list": [
        {
            "model": "gpt-4o-mini",
            "api_key": os.getenv("OPENAI_API_KEY"),
        }
    ],
    "temperature": 0.8,
}

wf = AutoGenWorkflows()


@wf.register(name="simple_learning", description="Student and teacher learning chat")  # type: ignore[misc]
def simple_workflow(ui: UI, params: dict[str, Any]) -> str:
    initial_message = ui.text_input(
        sender="Workflow",
        recipient="User",
        prompt="I can help you learn about mathematics. What subject you would like to explore?",
    )

    student_agent = ConversableAgent(
        name="Student_Agent",
        system_message="You are a student willing to learn.",
        llm_config=llm_config,
    )
    teacher_agent = ConversableAgent(
        name="Teacher_Agent",
        system_message="You are a math teacher.",
        llm_config=llm_config,
    )

    chat_result = student_agent.initiate_chat(
        teacher_agent,
        message=initial_message,
        summary_method="reflection_with_llm",
        max_turns=3,
    )

    return str(chat_result.summary)

2. Import Required Modules#

Next, import the required modules from the FastAgency. These imports provide the essential building blocks for integrating MesopUI. Additionally, import the NatsAdapter class for workflow execution.

import os
from typing import Any

from fastagency.adapters.nats import NatsAdapter
from fastapi import FastAPI

from ..workflow import wf

3. Configure the Nats Adapter#

Create an instance of the NatsAdapter and pass your workflow to it. The adapter will handle the communication with the NatsProvider and distribute workflow execution to the workers.

nats_url = os.environ.get("NATS_URL", "nats://localhost:4222")
user: str = os.environ.get("FASTAGENCY_NATS_USER", "fastagency")
password: str = os.environ.get("FASTAGENCY_NATS_PASSWORD", "fastagency_nats_password")

adapter = NatsAdapter(provider=wf, nats_url=nats_url, user=user, password=password)

4. Define FastAgency Application#

Create a NatsAdapter and then add it to a FastAPI application using the lifespan parameter. The adapter will have all REST API and WebSocket routes for communicating with a client.

app = FastAPI(lifespan=adapter.lifespan)

5. Nats server setup#

The NatsAdapter requires a running NATS server. The easiest way to start the NATS server is by using Docker. FastAgency leverages the JetStream feature of NATS and also utilizes authentication.

websocket {
    # listen: localhost:9222
    port: 9222
    no_tls: true
    compress: true
}

jetstream {}

accounts {
  AUTH {
    jetstream: enabled
    users: [
      { user: fastagency, password: $FASTAGENCY_NATS_PASSWORD }
    ]
  }
  APP {
    jetstream: enabled
  }
  SYS {}
}

authorization {
  auth_callout {
    issuer: $NATS_PUB_NKEY
    auth_users: [ fastagency ]
    account: AUTH
  }
}

system_account: SYS

In the above Nats configuration, we define a user called fastagency, and its password is read from the environment variable FASTAGENCY_NATS_PASSWORD. We also enable JetStream in Nats and configure Nats to serve via the appropriate ports.

Complete Application Code#

Please copy and paste the following code into the same folder, using the file names exactly as mentioned below.

nats_server.conf
websocket {
    # listen: localhost:9222
    port: 9222
    no_tls: true
    compress: true
}

jetstream {}

accounts {
  AUTH {
    jetstream: enabled
    users: [
      { user: fastagency, password: $FASTAGENCY_NATS_PASSWORD }
    ]
  }
  APP {
    jetstream: enabled
  }
  SYS {}
}

authorization {
  auth_callout {
    issuer: $NATS_PUB_NKEY
    auth_users: [ fastagency ]
    account: AUTH
  }
}

system_account: SYS
workflow.py
import os
from typing import Any

from autogen.agentchat import ConversableAgent
from fastagency import UI
from fastagency.runtimes.autogen import AutoGenWorkflows

llm_config = {
    "config_list": [
        {
            "model": "gpt-4o-mini",
            "api_key": os.getenv("OPENAI_API_KEY"),
        }
    ],
    "temperature": 0.8,
}

wf = AutoGenWorkflows()


@wf.register(name="simple_learning", description="Student and teacher learning chat")  # type: ignore[misc]
def simple_workflow(ui: UI, params: dict[str, Any]) -> str:
    initial_message = ui.text_input(
        sender="Workflow",
        recipient="User",
        prompt="I can help you learn about mathematics. What subject you would like to explore?",
    )

    student_agent = ConversableAgent(
        name="Student_Agent",
        system_message="You are a student willing to learn.",
        llm_config=llm_config,
    )
    teacher_agent = ConversableAgent(
        name="Teacher_Agent",
        system_message="You are a math teacher.",
        llm_config=llm_config,
    )

    chat_result = student_agent.initiate_chat(
        teacher_agent,
        message=initial_message,
        summary_method="reflection_with_llm",
        max_turns=3,
    )

    return str(chat_result.summary)
main_1_nats.py
import os
from typing import Any

from fastagency.adapters.nats import NatsAdapter
from fastapi import FastAPI

from ..workflow import wf

nats_url = os.environ.get("NATS_URL", "nats://localhost:4222")
user: str = os.environ.get("FASTAGENCY_NATS_USER", "fastagency")
password: str = os.environ.get("FASTAGENCY_NATS_PASSWORD", "fastagency_nats_password")

adapter = NatsAdapter(provider=wf, nats_url=nats_url, user=user, password=password)

app = FastAPI(lifespan=adapter.lifespan)


# this is optional, but we would like to see the list of available workflows
@app.get("/")
def list_workflows() -> dict[str, Any]:
    return {"Workflows": {name: wf.get_description(name) for name in wf.names}}


# start the adapter with the following command
# uvicorn my_fastagency_app.deployment.main_1_nats:app --reload
main_2_mesop.py
from os import environ

from fastagency.adapters.nats import NatsAdapter
from fastagency.app import FastAgency
from fastagency.ui.mesop.mesop import MesopUI

nats_url = environ.get("NATS_URL", "nats://localhost:4222")
nats_user: str = "fastagency"
nats_password: str = environ.get("FASTAGENCY_NATS_PASSWORD", "fastagency_nats_password")  # type: ignore[assignment]

provider = NatsAdapter.create_provider(
    nats_url=nats_url, user=nats_user, password=nats_password
)

ui = MesopUI()

app = FastAgency(provider=provider, ui=ui)

# start the provider with the following command
# gunicorn main_2_mesop:app -b 0.0.0.0:8888 --reload

Run Application#

The NATS docker container is started automatically by Cookiecutter for this setup. In this setup, we need to run two commands in separate terminal windows:

  • Start FastAPI application that provides a conversational workflow:

Terminal 1

uvicorn main_1_nats:app --reload
  • Start Mesop web interface using gunicorn:

Terminal 2

gunicorn main_2_mesop:app -b 0.0.0.0:8888 --reload

First, install the package using package manager such as pip and then run it. In this setup, we need to run three commands in separate terminal windows:

  • Start NATS Docker container:

Terminal 1

docker run -d --name nats-fastagency --rm -p 4222:4222 -p 9222:9222 -p 8222:8222 -v $(pwd)/nats_server.conf:/etc/nats/nats_server.conf -e FASTAGENCY_NATS_PASSWORD='fastagency_nats_password' nats:latest -c /etc/nats/nats_server.conf
  • Start FastAPI application that provides a conversational workflow:

Terminal 2

pip install uvicorn
uvicorn main_1_nats:app --reload
  • Start Mesop web interface using gunicorn:

Terminal 3

pip install gunicorn
gunicorn main_2_mesop:app -b 0.0.0.0:8888 --reload
  • Start NATS Docker container:

Terminal 1

docker run -d --name nats-fastagency --rm -p 4222:4222 -p 9222:9222 -p 8222:8222 -v $(pwd)/nats_server.conf:/etc/nats/nats_server.conf -e FASTAGENCY_NATS_PASSWORD='fastagency_nats_password' nats:latest -c /etc/nats/nats_server.conf
  • Start FastAPI application that provides a conversational workflow:

Terminal 2

pip install uvicorn
uvicorn main_1_nats:app --reload
  • Start Mesop web interface using waitress:

Terminal 3

pip install waitress
waitress-serve --listen=0.0.0.0:8888 main_2_mesop:app

Output#

The outputs will vary based on the interface. Here is the output of the last terminal starting the UI:

[2024-10-10 13:19:18 +0530] [23635] [INFO] Starting gunicorn 23.0.0
[2024-10-10 13:19:18 +0530] [23635] [INFO] Listening at: http://127.0.0.1:8888 (23635)
[2024-10-10 13:19:18 +0530] [23635] [INFO] Using worker: sync
[2024-10-10 13:19:18 +0530] [23645] [INFO] Booting worker with pid: 23645

Initial message

The NatsAdapter in FastAgency provides a powerful and flexible way to integrate your workflows with the Nats.io message broker. By leveraging the scalability and distributed architecture of Nats, you can build highly scalable and production-ready applications. With its easy-to-use API and seamless integration with the MesopUI, the NatsAdapter simplifies the development process while enabling advanced features like conversation auditing.