
Securing the FastAPIAdapter#

When exposing your FastAgency workflows using the FastAPIAdapter, it's crucial to ensure the security of your API. Implementing proper security practices will protect your data, workflows, and client applications from unauthorized access, attacks, and data breaches.

This section will demonstrate how to secure your FastAPIAdapter.

Authentication#

Authentication is the process of verifying who a user is. For securing your FastAPIAdapter, we recommend using OAuth2 with password flow or API keys. In this section, we’ll demonstrate two variations of the OAuth2 password flow with FastAPI: a simple version in which the token is just the username, and a version that uses JWT tokens and hashed passwords.

Example: OAuth2 Password Flow#

Let's first look at the full code for adding OAuth2 password flow security to your FastAPIAdapter, and then go through it step by step.

main.py (simple OAuth2 password flow)
from typing import Annotated, Any, Optional, Union

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel

from fastagency.adapters.fastapi import FastAPIAdapter

from .workflows import wf

app = FastAPI(title="FastAPI with FastAgency")

################################################################################
#
# Taken from https://fastapi.tiangolo.com/tutorial/security/simple-oauth2/
#

fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "fakehashedsecret",  # pragma: allowlist secret
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Wonderson",
        "email": "alice@example.com",
        "hashed_password": "fakehashedsecret2",  # pragma: allowlist secret
        "disabled": True,
    },
}


def fake_hash_password(password: str) -> str:
    return "fakehashed" + password


oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None


class UserInDB(User):
    hashed_password: str


def get_user(db: dict[str, Any], username: str) -> Optional[UserInDB]:
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)
    return None


def fake_decode_token(token: str) -> Optional[UserInDB]:
    # This doesn't provide any security at all
    # Check the next version
    user = get_user(fake_users_db, token)
    return user


async def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)],
) -> Optional[User]:
    user = fake_decode_token(token)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user


async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)],
) -> Optional[User]:
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> dict[str, str]:
    user_dict = fake_users_db.get(form_data.username)
    if not user_dict:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    user = UserInDB(**user_dict)
    hashed_password = fake_hash_password(form_data.password)
    if not hashed_password == user.hashed_password:
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    return {"access_token": user.username, "token_type": "bearer"}


#
# End of code from https://fastapi.tiangolo.com/tutorial/security/simple-oauth2/
#
################################################################################


def get_user_id(
    current_user: Annotated[User, Depends(get_current_active_user)],
) -> Optional[str]:
    return current_user.username


adapter = FastAPIAdapter(provider=wf, get_user_id=get_user_id)
app.include_router(adapter.router)


# this is optional, but we would like to see the list of available workflows
@app.get("/")
def read_root() -> dict[str, dict[str, str]]:
    return {"Workflows": {name: wf.get_description(name) for name in wf.names}}

main.py (OAuth2 password flow with JWT tokens)
from datetime import datetime, timedelta, timezone
from typing import Annotated, Any, Optional, Union

import jwt
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel

from fastagency.adapters.fastapi import FastAPIAdapter

from .workflows import wf

app = FastAPI(title="FastAPI with FastAgency")

################################################################################
#
# Taken from https://fastapi.tiangolo.com/tutorial/security/oauth2-jwt/
#

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"  # pragma: allowlist secret
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",  # nosemgrep
        "disabled": False,
    }
}


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None


class UserInDB(User):
    hashed_password: str


def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)  # type: ignore


def get_user(db: dict[str, Any], username: str) -> Optional[UserInDB]:
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)
    return None


def authenticate_user(
    fake_db: dict[str, Any], username: str, password: str
) -> Union[bool, UserInDB]:
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]) -> UserInDB:
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except InvalidTokenError:
        raise credentials_exception from None
    user = get_user(fake_users_db, username=token_data.username)  # type: ignore
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)],
) -> User:
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: Union[str, None] = None


def create_access_token(
    data: dict[str, Any], expires_delta: Union[timedelta, None] = None
) -> str:
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)  # nosemgrep
    return encoded_jwt


@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, # type: ignore
        expires_delta=access_token_expires,
    )
    return Token(access_token=access_token, token_type="bearer")


#
# End of code from https://fastapi.tiangolo.com/tutorial/security/oauth2-jwt/
#
################################################################################


def get_user_id(
    current_user: Annotated[User, Depends(get_current_active_user)],
) -> Optional[str]:
    return current_user.username


adapter = FastAPIAdapter(provider=wf, get_user_id=get_user_id)
app.include_router(adapter.router)


# this is optional, but we would like to see the list of available workflows
@app.get("/")
def read_root() -> dict[str, dict[str, str]]:
    return {"Workflows": {name: wf.get_description(name) for name in wf.names}}

The main module imports the workflow from workflows.py, so make sure you have this file saved in the same directory as the main.py.

workflows.py
import os
from typing import Any

from autogen.agentchat import ConversableAgent

from fastagency import UI
from fastagency.runtimes.autogen import AutoGenWorkflows

__all__ = ["wf"]

llm_config = {
    "config_list": [
        {
            "model": "gpt-4o-mini",
            "api_key": os.getenv("OPENAI_API_KEY"),
        }
    ],
    "temperature": 0.8,
}

wf = AutoGenWorkflows()


@wf.register(name="simple_learning", description="Student and teacher learning chat")
def simple_workflow(ui: UI, params: dict[str, Any]) -> str:
    initial_message = ui.text_input(
        sender="Workflow",
        recipient="User",
        prompt="I can help you learn about mathematics. What subject would you like to explore?",
    )

    student_agent = ConversableAgent(
        name="Student_Agent",
        system_message="You are a student willing to learn.",
        llm_config=llm_config,
        # human_input_mode="ALWAYS",
    )
    teacher_agent = ConversableAgent(
        name="Teacher_Agent",
        system_message="You are a math teacher.",
        llm_config=llm_config,
        # human_input_mode="ALWAYS",
    )

    chat_result = student_agent.initiate_chat(
        teacher_agent,
        message=initial_message,
        summary_method="reflection_with_llm",
        max_turns=5,
    )

    return chat_result.summary  # type: ignore[no-any-return]

The project structure should look like this:

secured_fastapi_adapter
+-- __init__.py
+-- main.py
+-- workflows.py

You can now start your secured adapter using the following command:

uvicorn secured_fastapi_adapter.main:app --host 0.0.0.0 --port 8008

Now, let's go through the steps to build the OAuth2 security and add it to your FastAPIAdapter.

Step 0: Install dependencies#

pip install "fastagency[autogen,mesop,fastapi,server]"

To use JWT tokens, you need to install PyJWT and passlib in addition to fastagency.

pip install "fastagency[autogen,mesop,fastapi,server]" PyJWT "passlib[bcrypt]"

This command installs FastAgency with support for both the Console and Mesop interfaces for AutoGen workflows, but with FastAPI both serving input requests and running workflows.

Alternatively, you can use Cookiecutter, which is the preferred method. Cookiecutter creates the project folder structure and a default workflow, automatically installs all the necessary requirements, and creates a devcontainer that can be used with Visual Studio Code.

Step 1: Imports#

    from typing import Annotated, Any, Optional, Union

    from fastapi import Depends, FastAPI, HTTPException, status
    from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
    from pydantic import BaseModel

    from fastagency.adapters.fastapi import FastAPIAdapter

    from .workflows import wf

With JWT tokens, the imports are:

    from datetime import datetime, timedelta, timezone
    from typing import Annotated, Any, Optional, Union

    import jwt
    from fastapi import Depends, FastAPI, HTTPException, status
    from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
    from jwt.exceptions import InvalidTokenError
    from passlib.context import CryptContext
    from pydantic import BaseModel

    from fastagency.adapters.fastapi import FastAPIAdapter

    from .workflows import wf
  • wf: AutoGenWorkflows object that manages and registers workflows. Here, it holds a single workflow, simple_workflow, registered under the name "simple_learning".

  • FastAPIAdapter: We'll attach the adapter to the FastAPI app. It exposes the workflows as REST APIs.

Step 2: Initial Setup#

We start by defining the FastAPI app.

    app = FastAPI(title="FastAPI with FastAgency")

Step 3: Mock Database Setup#

This step sets up a mock database with two users (johndoe and alice). This is a simplified user store for the example, where each user has attributes like username, email, and hashed_password.

fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "fakehashedsecret",  # pragma: allowlist secret
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Wonderson",
        "email": "alice@example.com",
        "hashed_password": "fakehashedsecret2",  # pragma: allowlist secret
        "disabled": True,
    },
}

With JWT tokens, the mock database stores a bcrypt hash of the password:

fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",  # nosemgrep
        "disabled": False,
    }
}
In the JWT-based example, the password is stored in the database as a hash rather than as plaintext. To find out how this hash is generated, and for more details about securing your app with OAuth2 and hashed passwords, please visit the OAuth2 using JWT tokens FastAPI tutorial.

  • fake_users_db: A dictionary that represents a mock database of users. Each user has attributes like username (which we will later use as the user_id), hashed_password, and disabled (for checking active users).
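To make the idea of storing a hash instead of a plaintext password concrete, here is a minimal sketch that uses only the Python standard library. It swaps in PBKDF2-HMAC-SHA256 for the bcrypt scheme used in the JWT example, purely so the snippet runs without extra dependencies; the function names, the storage format, and the iteration count are assumptions made for illustration.

```python
import hashlib
import hmac
import os

# Assumed work factor for this sketch; choose a value suited to your hardware.
ITERATIONS = 100_000


def hash_password(password: str) -> str:
    """Return 'salt_hex$digest_hex' computed with PBKDF2-HMAC-SHA256."""
    salt = os.urandom(16)  # a fresh random salt for every password
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, ITERATIONS)
    return f"{salt.hex()}${digest.hex()}"


def verify_password(plain_password: str, stored: str) -> bool:
    """Recompute the digest with the stored salt and compare in constant time."""
    salt_hex, digest_hex = stored.split("$")
    candidate = hashlib.pbkdf2_hmac(
        "sha256", plain_password.encode("utf-8"), bytes.fromhex(salt_hex), ITERATIONS
    )
    return hmac.compare_digest(candidate, bytes.fromhex(digest_hex))


stored_hash = hash_password("secret")
print(verify_password("secret", stored_hash))
print(verify_password("not-the-password", stored_hash))
```

Because the salt is random, hashing the same password twice yields different stored values, yet both verify correctly.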

Step 4: OAuth2 Authentication Setup#

Here we configure OAuth2 with password flow and token-based authentication. This ensures that users authenticate with a username and password, and the app generates a token.

def fake_hash_password(password: str) -> str:
    return "fakehashed" + password


oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

OAuth2PasswordBearer: This is used to handle token-based authentication. tokenUrl="token" indicates that users will obtain a token by calling the /token endpoint.

fake_hash_password: A simple mock function to simulate password hashing.

With JWT tokens, a CryptContext replaces the mock hash function:

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

OAuth2PasswordBearer: This is used to handle token-based authentication. tokenUrl="token" indicates that users will obtain a token by calling the /token endpoint.

pwd_context: CryptContext which will be used for password hash verification.
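The JWT version of main.py hard-codes SECRET_KEY, with a comment suggesting `openssl rand -hex 32` to generate it. As a hedged sketch, the same kind of key can be generated in Python and read from the environment instead of the source file. The environment variable name `SECRET_KEY` and the helper names below are hypothetical choices for this example.

```python
import os
import secrets

# Hypothetical environment variable name used only in this sketch.
SECRET_KEY_ENV_VAR = "SECRET_KEY"


def generate_secret_key() -> str:
    """Return 32 random bytes as 64 hex characters, like `openssl rand -hex 32`."""
    return secrets.token_hex(32)


def load_secret_key() -> str:
    """Read the signing key from the environment and fail loudly if it is missing."""
    key = os.environ.get(SECRET_KEY_ENV_VAR)
    if not key:
        raise RuntimeError(f"Environment variable {SECRET_KEY_ENV_VAR} is not set")
    return key


if __name__ == "__main__":
    # Print a fresh key that can be exported before starting the server.
    print(generate_secret_key())
```

Keeping the key out of the source file means it does not end up in version control along with the code.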

Step 5: User Authentication Logic#

This step simulates user lookup, token decoding, and user validation. The token received in API requests is decoded to get the user, and the user’s information is validated for authorization.

class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None


class UserInDB(User):
    hashed_password: str


def get_user(db: dict[str, Any], username: str) -> Optional[UserInDB]:
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)
    return None


def fake_decode_token(token: str) -> Optional[UserInDB]:
    # This doesn't provide any security at all
    # Check the next version
    user = get_user(fake_users_db, token)
    return user


async def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)],
) -> Optional[User]:
    user = fake_decode_token(token)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user


async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)],
) -> Optional[User]:
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user
  • User and UserInDB: These Pydantic models represent user data. UserInDB extends User by adding a hashed_password field for password comparison.

  • get_user: This function retrieves user data from fake_users_db.

  • fake_decode_token: Simulates the process of decoding a token to extract user information (in this mock, the token is just the username).

With JWT tokens, the same step looks like this:

class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None


class UserInDB(User):
    hashed_password: str


def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)  # type: ignore


def get_user(db: dict[str, Any], username: str) -> Optional[UserInDB]:
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)
    return None


def authenticate_user(
    fake_db: dict[str, Any], username: str, password: str
) -> Union[bool, UserInDB]:
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]) -> UserInDB:
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except InvalidTokenError:
        raise credentials_exception from None
    user = get_user(fake_users_db, username=token_data.username)  # type: ignore
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)],
) -> User:
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user
  • User and UserInDB: These Pydantic models represent user data. UserInDB extends User by adding a hashed_password field for password comparison.

  • get_user: This function retrieves user data from fake_users_db.

  • verify_password: Verifies the user's password by comparing it to the password hash stored in fake_users_db.

Step 6: OAuth2 Token Login Endpoint#

This step defines the /token endpoint where users submit their username and password to receive an authentication token. The system checks if the password matches before generating the token.

@app.post("/token")
async def login(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> dict[str, str]:
    user_dict = fake_users_db.get(form_data.username)
    if not user_dict:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    user = UserInDB(**user_dict)
    hashed_password = fake_hash_password(form_data.password)
    if not hashed_password == user.hashed_password:
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    return {"access_token": user.username, "token_type": "bearer"}
  • /token: This endpoint handles user login. If the username and password match, it returns a token in the form of the username. Otherwise, an error is returned.

With JWT tokens, the endpoint issues a signed token instead:

class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: Union[str, None] = None


def create_access_token(
    data: dict[str, Any], expires_delta: Union[timedelta, None] = None
) -> str:
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)  # nosemgrep
    return encoded_jwt


@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, # type: ignore
        expires_delta=access_token_expires,
    )
    return Token(access_token=access_token, token_type="bearer")
  • create_access_token: Generates a JSON Web Token (JWT) that is returned to the user after they authenticate through the /token endpoint.
  • /token: This endpoint handles user login. If the username and password match, it returns a token in the form of a JWT. Otherwise, an error is returned.
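To show roughly what happens when a JWT is created and later validated, here is a standard-library sketch of HS256 signing and verification. It is an illustration of the token format only, not PyJWT's implementation; the names `encode_hs256` and `decode_hs256` and the `now` parameter are assumptions made for this example, and real applications should keep using a maintained library such as PyJWT.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Any, Optional


def _b64url_encode(data: bytes) -> str:
    # JWTs use URL-safe base64 without "=" padding.
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    # Restore the padding that was stripped during encoding.
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _sign(signing_input: str, secret: str) -> bytes:
    return hmac.new(
        secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256
    ).digest()


def encode_hs256(payload: dict[str, Any], secret: str) -> str:
    """Build 'header.payload.signature' for the given claims."""
    header = {"alg": "HS256", "typ": "JWT"}
    parts = [
        _b64url_encode(json.dumps(obj, separators=(",", ":")).encode("utf-8"))
        for obj in (header, payload)
    ]
    signing_input = ".".join(parts)
    return f"{signing_input}.{_b64url_encode(_sign(signing_input, secret))}"


def decode_hs256(token: str, secret: str, now: Optional[float] = None) -> dict[str, Any]:
    """Check the signature and the 'exp' claim, then return the claims."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
    except ValueError:
        raise ValueError("malformed token") from None
    if json.loads(_b64url_decode(header_b64)).get("alg") != "HS256":
        raise ValueError("unexpected algorithm")
    expected = _sign(f"{header_b64}.{payload_b64}", secret)
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("invalid signature")
    payload = json.loads(_b64url_decode(payload_b64))
    current = now if now is not None else time.time()
    if "exp" in payload and current >= payload["exp"]:
        raise ValueError("token expired")
    return payload


token = encode_hs256({"sub": "johndoe", "exp": int(time.time()) + 1800}, "demo-secret")
print(decode_hs256(token, "demo-secret")["sub"])
```

The payload is only encoded, not encrypted, so anyone holding the token can read the claims; the signature only guarantees that they were issued by a holder of the secret.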

Step 7: Securing Routes with OAuth2#

Here we secure the FastAPI routes by requiring the user to pass an OAuth2 token to authenticate and authorize requests.

async def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)],
) -> Optional[User]:
    user = fake_decode_token(token)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user


async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)],
) -> Optional[User]:
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user

With JWT tokens:

async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]) -> UserInDB:
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except InvalidTokenError:
        raise credentials_exception from None
    user = get_user(fake_users_db, username=token_data.username)  # type: ignore
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)],
) -> User:
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user
  • get_current_user: This function extracts the token from the request, decodes it, and returns the user object if valid. If the token is invalid, a 401 error is returned.

  • get_current_active_user: It checks whether the user account is active (disabled = False), raising an error if the user is inactive.
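As a conceptual illustration of "extracting the token from the request", the sketch below parses an Authorization header value of the form `Bearer <token>`. It is not FastAPI's code; the helper name `extract_bearer_token` is hypothetical, and in the application itself this work is done by the oauth2_scheme dependency.

```python
from typing import Optional


def extract_bearer_token(authorization: Optional[str]) -> Optional[str]:
    """Return the token from an 'Authorization: Bearer <token>' value, or None."""
    if not authorization:
        return None
    # Split on the first space: the scheme comes first, the credentials second.
    scheme, _, token = authorization.partition(" ")
    token = token.strip()
    if scheme.lower() != "bearer" or not token:
        return None
    return token


print(extract_bearer_token("Bearer johndoe"))
print(extract_bearer_token("Basic dXNlcjpwYXNz"))
```

A request whose header is missing or uses another scheme yields no token, which corresponds to the 401 response described above.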

Step 8: Custom Client ID Generation#

This step takes the username of the currently authenticated user and provides it to the adapter as the user_id. This will allow syncing client information during workflow interactions.

def get_user_id(
    current_user: Annotated[User, Depends(get_current_active_user)],
) -> Optional[str]:
    return current_user.username

get_user_id: Returns the user_id of the authenticated user, which is later used for authorization in the workflows.

Step 9: Connecting the Adapter and Securing It#

Finally, we connect the FastAPIAdapter to the FastAPI app. The get_user_id function is passed as a security dependency, ensuring that every internal API call is secured.

adapter = FastAPIAdapter(provider=wf, get_user_id=get_user_id)
app.include_router(adapter.router)

get_user_id: This function ensures that the user_id of the authenticated user is passed with every internal request, securing the workflows exposed by the adapter.

Step 10: Workflow Listing Endpoint#

We add an optional / endpoint to list the available workflows, which can be useful for understanding what workflows are currently available in the FastAPIAdapter.

@app.get("/")
def read_root() -> dict[str, dict[str, str]]:
    return {"Workflows": {name: wf.get_description(name) for name in wf.names}}

/: Lists the available workflows and their descriptions for easier access and understanding of what the API offers.

Final Explanation#

In this setup:

  • Authentication is handled using OAuth2, with token generation via a /token endpoint.

  • User information is stored in a mock database, and token-based authentication ensures that only valid users can interact with the workflows.

  • The get_user_id function is the central security mechanism, ensuring that each user’s actions are tracked and authorized via their user_id.

  • By attaching the get_user_id function to the FastAPIAdapter, we effectively secure all the internal API calls exposed by the workflows, providing a scalable and consistent approach to authorization and client sync.

Implementing and connecting a simple client to a secured FastAPIAdapter#

Now we will implement a client that can connect to your secured FastAPIAdapter. Here is the full code of the client which you can run using python simple_client.py in your console.

simple_client.py
import asyncio
import json
from typing import Any

import requests
import websockets
from asyncer import asyncify

from fastagency.messages import AskingMessage, IOMessage, WorkflowCompleted
from fastagency.ui.console import ConsoleUI

# API base URL
FASTAGENCY_URL = "http://localhost:8008"

# User credentials
CREDENTIALS = {
    "username": "johndoe",
    "password": "secret",  # pragma: allowlist secret
}


# Function to authenticate and get the OAuth token
def get_oauth_token() -> str:
    """Authenticate the user and return the access token."""
    response = requests.post(f"{FASTAGENCY_URL}/token", data=CREDENTIALS)
    response.raise_for_status()  # Ensure we handle errors
    return response.json().get("access_token")  # type: ignore


# Function to initiate the workflow
def initiate_workflow(token: str) -> dict[str, Any]:
    """Initiate the workflow and return the initial payload."""
    payload = {
        "workflow_name": "simple_learning",
        "workflow_uuid": "1234",  # You can generate this dynamically
        "user_id": None,
        "params": {"message": "Hello"},
    }

    headers = {"Authorization": f"Bearer {token}"}
    response = requests.post(
        f"{FASTAGENCY_URL}/fastagency/initiate_workflow", json=payload, headers=headers
    )
    response.raise_for_status()  # Ensure we handle errors
    return response.json()  # type: ignore


# Function to handle WebSocket communication
async def websocket_workflow(token: str, initial_payload: dict[str, Any]) -> None:
    """Establish a WebSocket connection and handle the workflow interaction."""
    websocket_url = f"ws{FASTAGENCY_URL[4:]}/fastagency/ws"
    ui = ConsoleUI()  # Initialize the UI for handling user interaction

    async with websockets.connect(
        websocket_url, extra_headers={"Authorization": f"Bearer {token}"}
    ) as websocket:
        # Send the initial payload to start the workflow
        await websocket.send(json.dumps(initial_payload))

        while True:
            # Receive messages from the WebSocket server
            response = await websocket.recv()
            message = IOMessage.create(**json.loads(response))

            # Process the received message and interact with the UI
            result = await asyncify(ui.process_message)(message)

            # Respond if the message requires further input
            if isinstance(message, AskingMessage) and result is not None:
                await websocket.send(result)
            elif isinstance(message, WorkflowCompleted):
                # Exit the loop when the workflow is completed
                break


# Main function to run the workflow
async def main() -> None:
    """Main function to orchestrate the workflow."""
    # Step 1: Authenticate to get the OAuth2 token
    token = get_oauth_token()

    # Step 2: Initiate the workflow and get the initial payload
    initial_payload = initiate_workflow(token)

    # Step 3: Handle WebSocket interaction
    await websocket_workflow(token, initial_payload)


if __name__ == "__main__":
    # Run the async main function
    asyncio.run(main())

Now let's do a step-by-step breakdown of the code, focusing on the OAuth2 token acquisition and WebSocket security. We'll explain each part with relevant code snippets and describe how the ConsoleUI is used for simplicity, but note that you can implement custom message processing logic.

Step 1: Authenticate and Get OAuth2 Token#

The first part of the process is obtaining an access token by authenticating with the FastAPI server using OAuth2.

    CREDENTIALS = {
        "username": "johndoe",
        "password": "secret",  # pragma: allowlist secret
    }


    # Function to authenticate and get the OAuth token
    def get_oauth_token() -> str:
        """Authenticate the user and return the access token."""
        response = requests.post(f"{FASTAGENCY_URL}/token", data=CREDENTIALS)
        response.raise_for_status()  # Ensure we handle errors
        return response.json().get("access_token")  # type: ignore
  • What happens:

    • The CREDENTIALS dictionary holds the username and password for a user (in this case, "johndoe" and "secret").
    • The get_oauth_token() function sends a POST request to the FastAPI authentication endpoint (/token) with these credentials.
    • If the authentication is successful, the FastAPI server returns a token as the access_token. This token will be used in the headers for subsequent requests.
  • Key Points:

    • This token secures the API calls and WebSocket connections by proving the user’s identity.
    • The raise_for_status() call ensures that any HTTP error (e.g., wrong credentials) raises an exception, which helps with debugging.
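The /token endpoint reads the credentials through OAuth2PasswordRequestForm, so they must be sent as form fields rather than as JSON; this is why the client passes `data=CREDENTIALS`. The following sketch builds an equivalent request with the standard library only, without sending it. The helper name `build_token_request` is an assumption made for illustration.

```python
from urllib.parse import urlencode
from urllib.request import Request


def build_token_request(base_url: str, username: str, password: str) -> Request:
    """Build (but do not send) a form-encoded POST request for the /token endpoint."""
    body = urlencode({"username": username, "password": password}).encode("ascii")
    return Request(
        f"{base_url}/token",
        data=body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


request = build_token_request("http://localhost:8008", "johndoe", "secret")
print(request.get_method(), request.full_url)
print(request.data)
```

Passing the request to `urllib.request.urlopen` would send it; that step is left out here so the snippet runs without a server.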

Step 2: Initiate the Workflow#

After obtaining the token, we initiate a workflow by sending a POST request to the server, passing the token in the headers for authorization.

    def initiate_workflow(token: str) -> dict[str, Any]:
        """Initiate the workflow and return the initial payload."""
        payload = {
            "workflow_name": "simple_learning",
            "workflow_uuid": "1234",  # You can generate this dynamically
            "user_id": None,
            "params": {"message": "Hello"},
        }

        headers = {"Authorization": f"Bearer {token}"}
        response = requests.post(
            f"{FASTAGENCY_URL}/fastagency/initiate_workflow", json=payload, headers=headers
        )
        response.raise_for_status()  # Ensure we handle errors
        return response.json()  # type: ignore
  • What happens:

    • This function constructs a payload that includes the workflow name, a UUID (in this case, hardcoded to "1234"), and some parameters (a message saying "Hello").
    • It sends a POST request to the /fastagency/initiate_workflow endpoint, which triggers the desired workflow on the server.
    • The OAuth2 token is added in the headers for authorization: "Authorization": f"Bearer {token}".
  • Key Points:

    • The payload fields, such as the user ID and the message, can be set dynamically. Modify this payload to fit your specific workflow.
    • The server returns an initial payload that will be used to start communication over WebSockets.
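
The comment in the snippet notes that the workflow UUID can be generated dynamically. The sketch below shows one way to do that with the standard library. The helper names `build_initiate_payload` and `bearer_headers` are hypothetical, and the hex UUID format is an assumption: check which format your server expects.

```python
import uuid
from typing import Any, Optional


def build_initiate_payload(
    workflow_name: str, message: str, user_id: Optional[str] = None
) -> dict[str, Any]:
    """Build the request body with a freshly generated workflow UUID."""
    return {
        "workflow_name": workflow_name,
        # Assumption: a 32-character hex string is an acceptable UUID format.
        "workflow_uuid": uuid.uuid4().hex,
        "user_id": user_id,
        "params": {"message": message},
    }


def bearer_headers(token: str) -> dict[str, str]:
    """Build the Authorization header used by the HTTP and WebSocket calls."""
    return {"Authorization": f"Bearer {token}"}


payload = build_initiate_payload("simple_learning", "Hello")
headers = bearer_headers("example-token")  # placeholder token for illustration
```

Generating a new UUID per call keeps separate workflow runs distinguishable, which a hardcoded value such as "1234" does not.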

Step 3: Establish WebSocket Connection#

Now, we use the WebSocket protocol to establish a real-time connection with the FastAPI server and handle the workflow’s communication.

        websocket_url = f"ws{FASTAGENCY_URL[4:]}/fastagency/ws"
        ui = ConsoleUI()  # Initialize the UI for handling user interaction

        async with websockets.connect(
            websocket_url, extra_headers={"Authorization": f"Bearer {token}"}
        ) as websocket:
            # Send the initial payload to start the workflow
            await websocket.send(json.dumps(initial_payload))
  • What happens:

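    • The WebSocket URL is derived from FASTAGENCY_URL by replacing its leading "http" with "ws", so http:// becomes ws:// and https:// becomes wss://.
    • The WebSocket connection is made to the FastAPI WebSocket endpoint (/fastagency/ws).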
    • The Authorization header with the Bearer token ensures that the WebSocket connection is secured and authenticated.
    • The initial_payload obtained from the previous step is sent to the WebSocket server to start the workflow.
  • Key Points:

    • The WebSocket connection is protected by the OAuth2 token, just like the HTTP requests.
    • The ConsoleUI is used here to keep things simple, but this UI can be replaced with a custom implementation for processing incoming messages from the server.
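
The string slicing used to build `websocket_url` is compact but assumes the base URL starts with `http`. As an illustrative alternative, the sketch below derives the same URL with `urllib.parse` and rejects other schemes. The function name `to_websocket_url` and the `localhost:8008` address are assumptions made for this example only.

```python
from urllib.parse import urlsplit, urlunsplit

# Map each HTTP scheme to its WebSocket counterpart.
SCHEME_MAP = {"http": "ws", "https": "wss"}


def to_websocket_url(base_url: str, path: str = "/fastagency/ws") -> str:
    """Convert an http(s) base URL into the matching ws(s) endpoint URL."""
    parts = urlsplit(base_url)
    if parts.scheme not in SCHEME_MAP:
        raise ValueError(f"unsupported URL scheme: {parts.scheme!r}")
    # Drop any trailing slash on the base path before appending the endpoint.
    full_path = parts.path.rstrip("/") + path
    return urlunsplit((SCHEME_MAP[parts.scheme], parts.netloc, full_path, "", ""))


websocket_url = to_websocket_url("http://localhost:8008")
```

Note that the keyword for passing headers to `websockets.connect` depends on the installed version of the websockets library: the snippet above uses `extra_headers`, while newer releases of the library's asyncio client name it `additional_headers`. Check the documentation for the version you have installed.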

Step 4: Handling WebSocket Messages#

Once the WebSocket connection is established, the client listens for messages from the server, processes them, and sends appropriate responses if required.

            while True:
                # Receive messages from the WebSocket server
                response = await websocket.recv()
                message = IOMessage.create(**json.loads(response))

                # Process the received message and interact with the UI
                result = await asyncify(ui.process_message)(message)

                # Respond if the message requires further input
                if isinstance(message, AskingMessage) and result is not None:
                    await websocket.send(result)
                elif isinstance(message, WorkflowCompleted):
                    # Exit the loop when the workflow is completed
                    break
  • What happens:

    • The client continuously listens for messages using await websocket.recv().
    • The received messages are processed by the ConsoleUI (or custom message handling logic).
    • If the server sends an AskingMessage (a message requesting input), the client responds appropriately.
    • When the workflow is completed (WorkflowCompleted), the loop exits, and leaving the async with block closes the connection.
  • Key Points:

    • The ConsoleUI simplifies the example by handling basic message processing. It can be replaced with custom UI logic for more complex workflows.
    • asyncify is applied here as a wrapper rather than a decorator: asyncify(ui.process_message) returns an awaitable version of the blocking function, so the event loop can keep handling other asynchronous tasks while the UI waits for user input.
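
To make the shape of this loop easier to follow, here is a self-contained sketch that runs without a server. Everything in it is a stand-in: `FakeWebSocket` replaces the real connection, the `"text"`, `"ask"` and `"done"` message types replace the IOMessage classes, and the standard library's `asyncio.to_thread` is used in place of `asyncify` to run the blocking handler off the event loop. It is not the FastAgency implementation.

```python
import asyncio
import json
from typing import Any, Optional


class FakeWebSocket:
    """Hypothetical stand-in for a WebSocket connection, for illustration only."""

    def __init__(self, incoming: list[str]) -> None:
        self._incoming = list(incoming)
        self.sent: list[str] = []

    async def recv(self) -> str:
        return self._incoming.pop(0)

    async def send(self, data: str) -> None:
        self.sent.append(data)


def handle_message(message: dict[str, Any]) -> Optional[str]:
    """Blocking handler: return a reply for questions, otherwise None."""
    if message["type"] == "ask":
        return "yes"  # a real UI would prompt the user here
    return None


async def message_loop(websocket: FakeWebSocket) -> None:
    while True:
        message = json.loads(await websocket.recv())
        # Run the blocking handler in a worker thread so the event loop stays free.
        reply = await asyncio.to_thread(handle_message, message)
        if message["type"] == "ask" and reply is not None:
            await websocket.send(reply)
        elif message["type"] == "done":
            break  # the workflow has finished


def run_demo() -> list[str]:
    """Feed three messages through the loop and return what the client sent."""
    ws = FakeWebSocket(
        [
            json.dumps({"type": "text", "body": "Hi"}),
            json.dumps({"type": "ask", "body": "Continue?"}),
            json.dumps({"type": "done"}),
        ]
    )
    asyncio.run(message_loop(ws))
    return ws.sent
```

Calling `run_demo()` processes an informational message, answers the one question, and stops at the completion message, mirroring the three branches of the loop above.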

Step 5: Running the Workflow#

Finally, the entire process is orchestrated in the main() function, which ties everything together.

    async def main() -> None:
        """Main function to orchestrate the workflow."""
        # Step 1: Authenticate to get the OAuth2 token
        token = get_oauth_token()

        # Step 2: Initiate the workflow and get the initial payload
        initial_payload = initiate_workflow(token)

        # Step 3: Handle WebSocket interaction
        await websocket_workflow(token, initial_payload)


    if __name__ == "__main__":
        # Run the async main function
        asyncio.run(main())
  • What happens:
    • The main() function first authenticates the user to get the OAuth2 token.
    • It then initiates the workflow by sending the appropriate request and gets the initial WebSocket payload.
    • Finally, it handles the WebSocket communication, processing messages and interacting with the workflow in real time.

Summary#

  • Token Acquisition: An OAuth2 token is obtained via the /token endpoint and used to secure both API and WebSocket interactions.
  • WebSocket Security: WebSocket communication is secured by passing the OAuth2 token in the Authorization header.
  • Message Processing: The ConsoleUI is used for simplicity, but you can implement your own message-handling logic to interact with the workflow.

By using OAuth2 for authentication and WebSockets for real-time communication, this example demonstrates how you can build a secure, interactive client to communicate with a FastAPI-based workflow system.