Securing the FastAPIAdapter#
When exposing your FastAgency workflows using the FastAPIAdapter
, it's crucial to ensure the security of your API. Implementing proper security practices will protect your data, workflows, and client applications from unauthorized access, attacks, and data breaches.
This section will demonstrate how to secure your FastAPIAdapter
.
Authentication#
Authentication is the process of verifying who a user is. For securing your FastAPIAdapter
, we recommend using OAuth2 with password flow or API keys. In this section, we’ll demonstrate how to use the following security variations of OAuth2 with FastAPI:
Example: OAuth2 Password Flow#
Lets first take a look at the full code on how to add the OAuth2 security to your FastAPIAdapter
, and then go through the code step by step.
Here’s the full code on how you can add OAuth2 password flow to your FastAPIAdapter
:
main.py
from typing import Annotated, Any, Optional, Union
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from fastagency.adapters.fastapi import FastAPIAdapter
from .workflows import wf
app = FastAPI(title="FastAPI with FastAgency")
################################################################################
#
# Taken from https://fastapi.tiangolo.com/tutorial/security/simple-oauth2/
#
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "fakehashedsecret", # pragma: allowlist secret
"disabled": False,
},
"alice": {
"username": "alice",
"full_name": "Alice Wonderson",
"email": "alice@example.com",
"hashed_password": "fakehashedsecret2", # pragma: allowlist secret
"disabled": True,
},
}
def fake_hash_password(password: str) -> str:
return "fakehashed" + password
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
hashed_password: str
def get_user(db: dict[str, Any], username: str) -> Optional[UserInDB]:
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
return None
def fake_decode_token(token: str) -> Optional[UserInDB]:
# This doesn't provide any security at all
# Check the next version
user = get_user(fake_users_db, token)
return user
async def get_current_user(
token: Annotated[str, Depends(oauth2_scheme)],
) -> Optional[User]:
user = fake_decode_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return user
async def get_current_active_user(
current_user: Annotated[User, Depends(get_current_user)],
) -> Optional[User]:
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token")
async def login(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> dict[str, str]:
user_dict = fake_users_db.get(form_data.username)
if not user_dict:
raise HTTPException(status_code=400, detail="Incorrect username or password")
user = UserInDB(**user_dict)
hashed_password = fake_hash_password(form_data.password)
if not hashed_password == user.hashed_password:
raise HTTPException(status_code=400, detail="Incorrect username or password")
return {"access_token": user.username, "token_type": "bearer"}
#
# End of code from https://fastapi.tiangolo.com/tutorial/security/simple-oauth2/
#
################################################################################
def get_user_id(
current_user: Annotated[User, Depends(get_current_active_user)],
) -> Optional[str]:
return current_user.username
adapter = FastAPIAdapter(provider=wf, get_user_id=get_user_id)
app.include_router(adapter.router)
# this is optional, but we would like to see the list of available workflows
@app.get("/")
def read_root() -> dict[str, dict[str, str]]:
return {"Workflows": {name: wf.get_description(name) for name in wf.names}}
main.py
from datetime import datetime, timedelta, timezone
from typing import Annotated, Any, Optional, Union
import jwt
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel
from fastagency.adapters.fastapi import FastAPIAdapter
from .workflows import wf
app = FastAPI(title="FastAPI with FastAgency")
################################################################################
#
# Taken from https://fastapi.tiangolo.com/tutorial/security/oauth2-jwt/
#
# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" # pragma: allowlist secret
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW", # nosemgrep
"disabled": False,
}
}
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
hashed_password: str
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password) # type: ignore
def get_user(db: dict[str, Any], username: str) -> Optional[UserInDB]:
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
return None
def authenticate_user(
fake_db: dict[str, Any], username: str, password: str
) -> Union[bool, UserInDB]:
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]) -> UserInDB:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except InvalidTokenError:
raise credentials_exception from None
user = get_user(fake_users_db, username=token_data.username) # type: ignore
if user is None:
raise credentials_exception
return user
async def get_current_active_user(
current_user: Annotated[User, Depends(get_current_user)],
) -> User:
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Union[str, None] = None
def create_access_token(
data: dict[str, Any], expires_delta: Union[timedelta, None] = None
) -> str:
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) # nosemgrep
return encoded_jwt
@app.post("/token")
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, # type: ignore
expires_delta=access_token_expires,
)
return Token(access_token=access_token, token_type="bearer")
#
# End of code from https://fastapi.tiangolo.com/tutorial/security/oauth2-jwt/
#
################################################################################
def get_user_id(
current_user: Annotated[User, Depends(get_current_active_user)],
) -> Optional[str]:
return current_user.username
adapter = FastAPIAdapter(provider=wf, get_user_id=get_user_id)
app.include_router(adapter.router)
# this is optional, but we would like to see the list of available workflows
@app.get("/")
def read_root() -> dict[str, dict[str, str]]:
return {"Workflows": {name: wf.get_description(name) for name in wf.names}}
The main module imports the workflow from workflows.py
, so make sure you have this file saved in the same directory as the main.py
.
workflows.py
import os
from typing import Any
from autogen.agentchat import ConversableAgent
from fastagency import UI
from fastagency.runtimes.autogen import AutoGenWorkflows
__init__ = ["wf"]
llm_config = {
"config_list": [
{
"model": "gpt-4o-mini",
"api_key": os.getenv("OPENAI_API_KEY"),
}
],
"temperature": 0.8,
}
wf = AutoGenWorkflows()
@wf.register(name="simple_learning", description="Student and teacher learning chat")
def simple_workflow(ui: UI, params: dict[str, Any]) -> str:
initial_message = ui.text_input(
sender="Workflow",
recipient="User",
prompt="I can help you learn about mathematics. What subject you would like to explore?",
)
student_agent = ConversableAgent(
name="Student_Agent",
system_message="You are a student willing to learn.",
llm_config=llm_config,
# human_input_mode="ALWAYS",
)
teacher_agent = ConversableAgent(
name="Teacher_Agent",
system_message="You are a math teacher.",
llm_config=llm_config,
# human_input_mode="ALWAYS",
)
chat_result = student_agent.initiate_chat(
teacher_agent,
message=initial_message,
summary_method="reflection_with_llm",
max_turns=5,
)
return chat_result.summary # type: ignore[no-any-return]
The project structure should look like this:
You can now start your secured adapter using the following command:
Now, lets go through the steps on how to build the OAuth2 security and add it to your FastAPI provider.
Step 0: Install dependencies#
This command installs FastAgency with support for both the Console and Mesop interfaces for AutoGen workflows, but with FastAPI both serving input requests and running workflows.
Alternatively, you can use Cookiecutter, which is the preferred method. Cookiecutter creates the project folder structure, default workflow, automatically installs all the necessary requirements, and creates a devcontainer that can be used with Visual Studio Code.
Step 1: Imports#
from datetime import datetime, timedelta, timezone
from typing import Annotated, Any, Optional, Union
import jwt
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel
from fastagency.adapters.fastapi import FastAPIAdapter
from .workflows import wf
-
wf
:AutoGenWorkflows
object that manages and registers workflows. Here, we have a workflow with the one simple_workflow registered under the name "simple_learning". -
FastAPIAdapter
: We'll attach the adapter to the FastAPI app. It exposes the workflows as REST APIs.
Step 2: Initial Setup#
We start by defining the FastAPI
app.
Step 3: Mock Database Setup#
This step sets up a mock database with two users (johndoe and alice). This is a simplified user store for the example, where each user has attributes like username, email, and hashed_password.
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "fakehashedsecret", # pragma: allowlist secret
"disabled": False,
},
"alice": {
"username": "alice",
"full_name": "Alice Wonderson",
"email": "alice@example.com",
"hashed_password": "fakehashedsecret2", # pragma: allowlist secret
"disabled": True,
},
}
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW", # nosemgrep
"disabled": False,
}
}
fake_users_db
: A dictionary that represents a mock database of users. Each user has attributes likehashed_password
,disabled
(for checking active users), anduser_id
(which we will use for authorization).
Step 4: OAuth2 Authentication Setup#
Here we configure OAuth2 with password flow and token-based authentication. This ensures that users authenticate with a username and password, and the app generates a token.
def fake_hash_password(password: str) -> str:
return "fakehashed" + password
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
OAuth2PasswordBearer
: This is used to handle token-based authentication. tokenUrl="token"
indicates that users will obtain a token by calling the /token
endpoint.
fake_hash_password
: A simple mock function to simulate password hashing.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
OAuth2PasswordBearer
: This is used to handle token-based authentication. tokenUrl="token"
indicates that users will obtain a token by calling the /token
endpoint.
pwd_context
: CryptContext
which will be used for password hash verification.
Step 5: User Authentication Logic#
This step simulates user lookup, token decoding, and user validation. The token received in API requests is decoded to get the user, and the user’s information is validated for authorization.
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
hashed_password: str
def get_user(db: dict[str, Any], username: str) -> Optional[UserInDB]:
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
return None
def fake_decode_token(token: str) -> Optional[UserInDB]:
# This doesn't provide any security at all
# Check the next version
user = get_user(fake_users_db, token)
return user
async def get_current_user(
token: Annotated[str, Depends(oauth2_scheme)],
) -> Optional[User]:
user = fake_decode_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return user
async def get_current_active_user(
current_user: Annotated[User, Depends(get_current_user)],
) -> Optional[User]:
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
-
User and UserInDB
: These Pydantic models represent user data. UserInDB extends User by adding a hashed_password field for password comparison. -
get_user
: This function retrieves user data fromfake_users_db
. -
fake_decode_token
: Simulates the process of decoding a token to extract user information (in this mock, the token is just the username).
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
hashed_password: str
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password) # type: ignore
def get_user(db: dict[str, Any], username: str) -> Optional[UserInDB]:
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
return None
def authenticate_user(
fake_db: dict[str, Any], username: str, password: str
) -> Union[bool, UserInDB]:
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]) -> UserInDB:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except InvalidTokenError:
raise credentials_exception from None
user = get_user(fake_users_db, username=token_data.username) # type: ignore
if user is None:
raise credentials_exception
return user
async def get_current_active_user(
current_user: Annotated[User, Depends(get_current_user)],
) -> User:
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
-
User and UserInDB
: These Pydantic models represent user data. UserInDB extends User by adding a hashed_password field for password comparison. -
get_user
: This function retrieves user data fromfake_users_db
. -
verify_password
: Verifies the user paswword by comparing it to the password hash in thefake_users_db
Step 6: OAuth2 Token Login Endpoint#
This step defines the /token
endpoint where users submit their username and password to receive an authentication token. The system checks if the password matches before generating the token.
@app.post("/token")
async def login(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> dict[str, str]:
user_dict = fake_users_db.get(form_data.username)
if not user_dict:
raise HTTPException(status_code=400, detail="Incorrect username or password")
user = UserInDB(**user_dict)
hashed_password = fake_hash_password(form_data.password)
if not hashed_password == user.hashed_password:
raise HTTPException(status_code=400, detail="Incorrect username or password")
return {"access_token": user.username, "token_type": "bearer"}
/token
: This endpoint handles user login. If the username and password match, it returns a token in the form of the username. Otherwise, an error is returned.
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Union[str, None] = None
def create_access_token(
data: dict[str, Any], expires_delta: Union[timedelta, None] = None
) -> str:
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) # nosemgrep
return encoded_jwt
@app.post("/token")
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, # type: ignore
expires_delta=access_token_expires,
)
return Token(access_token=access_token, token_type="bearer")
create_access_token
: Generates a JSON Web Token (JWT) that will be returned to the user after authenticating through the/token
endpoint/token
: This endpoint handles user login. If the username and password match, it returns a token in the form of a JWT. Otherwise, an error is returned.
Step 7: Securing Routes with OAuth2#
Here we secure the FastAPI routes by requiring the user to pass an OAuth2 token to authenticate and authorize requests.
async def get_current_user(
token: Annotated[str, Depends(oauth2_scheme)],
) -> Optional[User]:
user = fake_decode_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return user
async def get_current_active_user(
current_user: Annotated[User, Depends(get_current_user)],
) -> Optional[User]:
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]) -> UserInDB:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except InvalidTokenError:
raise credentials_exception from None
user = get_user(fake_users_db, username=token_data.username) # type: ignore
if user is None:
raise credentials_exception
return user
async def get_current_active_user(
current_user: Annotated[User, Depends(get_current_user)],
) -> User:
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
-
get_current_user
: This function extracts the token from the request, decodes it, and returns the user object if valid. If the token is invalid, a 401 error is returned. -
get_current_active_user
: It checks whether the user account is active(disabled = False)
, raising an error if the user is inactive.
Step 8: Custom Client ID Generation#
This step extracts the user’s user_id from the current authenticated user and provides it to the adapter. This will allow syncing client information during workflow interactions.
get_user_id
: Returns the user_id of the authenticated user, which is later used for authorization in the workflows.
Step 9: Connecting the Adapter and Securing It#
Finally, we connect the FastAPIAdapter
to the FastAPI app. The get_user_id function is passed as a security dependency, ensuring that every internal API call is secured.
get_user_id
: This function ensures that the user_id
of the authenticated user is passed with every internal request, securing the workflows exposed by the adapter.
Step 10: Workflow Listing Endpoint#
We add an optional /
endpoint to list the available workflows, which can be useful for understanding what workflows are currently available in the FastAPIAdapter
.
/
: Lists the available workflows and their descriptions for easier access and understanding of what the API offers.
Final Explanation#
In this setup:
-
Authentication is handled using OAuth2, with token generation via a
/token
endpoint. -
User information is stored in a mock database, and token-based authentication ensures that only valid users can interact with the workflows.
-
The
get_user_id
function is the central security mechanism, ensuring that each user’s actions are tracked and authorized via theiruser_id
. -
By attaching the
get_user_id
function to theFastAPIAdapter
, we effectively secure all the internal API calls exposed by the workflows, providing a scalable and consistent approach to authorization and client sync.
Implementing and connecting a simple client to secured FastAPIAdapter#
Now we will implement a client that can connect to your secured FastAPIAdapter
. Here is the full code of the client which you can run using python simple_client.py
in your console.
simple_client.py
import asyncio
import json
from typing import Any
import requests
import websockets
from asyncer import asyncify
from fastagency.messages import AskingMessage, IOMessage, WorkflowCompleted
from fastagency.ui.console import ConsoleUI
# API base URL
FASTAGENCY_URL = "http://localhost:8008"
# User credentials
CREDENTIALS = {
"username": "johndoe",
"password": "secret", # pragma: allowlist secret
}
# Function to authenticate and get the OAuth token
def get_oauth_token() -> str:
"""Authenticate the user and return the access token."""
response = requests.post(f"{FASTAGENCY_URL}/token", data=CREDENTIALS)
response.raise_for_status() # Ensure we handle errors
return response.json().get("access_token") # type: ignore
# Function to initiate the workflow
def initiate_workflow(token: str) -> dict[str, Any]:
"""Initiate the workflow and return the initial payload."""
payload = {
"workflow_name": "simple_learning",
"workflow_uuid": "1234", # You can generate this dynamically
"user_id": None,
"params": {"message": "Hello"},
}
headers = {"Authorization": f"Bearer {token}"}
response = requests.post(
f"{FASTAGENCY_URL}/fastagency/initiate_workflow", json=payload, headers=headers
)
response.raise_for_status() # Ensure we handle errors
return response.json() # type: ignore
# Function to handle WebSocket communication
async def websocket_workflow(token: str, initial_payload: dict[str, Any]) -> None:
"""Establish a WebSocket connection and handle the workflow interaction."""
websocket_url = f"ws{FASTAGENCY_URL[4:]}/fastagency/ws"
ui = ConsoleUI() # Initialize the UI for handling user interaction
async with websockets.connect(
websocket_url, extra_headers={"Authorization": f"Bearer {token}"}
) as websocket:
# Send the initial payload to start the workflow
await websocket.send(json.dumps(initial_payload))
while True:
# Receive messages from the WebSocket server
response = await websocket.recv()
message = IOMessage.create(**json.loads(response))
# Process the received message and interact with the UI
result = await asyncify(ui.process_message)(message)
# Respond if the message requires further input
if isinstance(message, AskingMessage) and result is not None:
await websocket.send(result)
elif isinstance(message, WorkflowCompleted):
# Exit the loop when the workflow is completed
break
# Main function to run the workflow
async def main() -> None:
"""Main function to orchestrate the workflow."""
# Step 1: Authenticate to get the OAuth2 token
token = get_oauth_token()
# Step 2: Initiate the workflow and get the initial payload
initial_payload = initiate_workflow(token)
# Step 3: Handle WebSocket interaction
await websocket_workflow(token, initial_payload)
if __name__ == "__main__":
# Run the async main function
asyncio.run(main())
Now let's do a step-by-step breakdown of the code, focusing on the OAuth2 token acquisition and WebSocket security. We'll explain each part with relevant code snippets and describe how the ConsoleUI
is used for simplicity, but note that you can implement custom message processing logic.
Step 1: Authenticate and Get OAuth2 Token#
The first part of the process is obtaining an access token by authenticating with the FastAPI server using OAuth2.
CREDENTIALS = {
"username": "johndoe",
"password": "secret", # pragma: allowlist secret
}
# Function to authenticate and get the OAuth token
def get_oauth_token() -> str:
"""Authenticate the user and return the access token."""
response = requests.post(f"{FASTAGENCY_URL}/token", data=CREDENTIALS)
response.raise_for_status() # Ensure we handle errors
return response.json().get("access_token") # type: ignore
-
What happens:
- The
CREDENTIALS
dictionary holds the username and password for a user (in this case,"johndoe"
and"secret"
). - The
get_oauth_token()
function sends a POST request to the FastAPI authentication endpoint (/token
) with these credentials. - If the authentication is successful, the FastAPI server returns a token as the
access_token
. This token will be used in the headers for subsequent requests.
- The
-
Key Points:
- This token secures the API calls and WebSocket connections by proving the user’s identity.
- The
raise_for_status()
ensures any HTTP error (e.g., wrong credentials) will raise an exception, helping with debugging.
Step 2: Initiate the Workflow#
After obtaining the token, we initiate a workflow by sending a POST request to the server, passing the token in the headers for authorization.
def initiate_workflow(token: str) -> dict[str, Any]:
"""Initiate the workflow and return the initial payload."""
payload = {
"workflow_name": "simple_learning",
"workflow_uuid": "1234", # You can generate this dynamically
"user_id": None,
"params": {"message": "Hello"},
}
headers = {"Authorization": f"Bearer {token}"}
response = requests.post(
f"{FASTAGENCY_URL}/fastagency/initiate_workflow", json=payload, headers=headers
)
response.raise_for_status() # Ensure we handle errors
return response.json() # type: ignore
-
What happens:
- This function constructs a payload that includes the workflow name, a UUID (in this case, hardcoded to
"1234"
), and some parameters (a message saying"Hello"
). - It sends a POST request to the
/fastagency/initiate_workflow
endpoint, which triggers the desired workflow on the server. - The OAuth2 token is added in the headers for authorization:
"Authorization": f"Bearer {token}"
.
- This function constructs a payload that includes the workflow name, a UUID (in this case, hardcoded to
-
Key Points:
- The workflow can have dynamic parameters like user ID and message. You can modify this payload to fit your specific workflow.
- The server returns an initial payload that will be used to start communication over WebSockets.
Step 3: Establish WebSocket Connection#
Now, we use the WebSocket protocol to establish a real-time connection with the FastAPI server and handle the workflow’s communication.
websocket_url = f"ws{FASTAGENCY_URL[4:]}/fastagency/ws"
ui = ConsoleUI() # Initialize the UI for handling user interaction
async with websockets.connect(
websocket_url, extra_headers={"Authorization": f"Bearer {token}"}
) as websocket:
# Send the initial payload to start the workflow
await websocket.send(json.dumps(initial_payload))
-
What happens:
- The WebSocket connection is made to the FastAPI WebSocket endpoint (
/fastagency/ws
). - The
Authorization
header with theBearer
token ensures that the WebSocket connection is secured and authenticated. - The
initial_payload
obtained from the previous step is sent to the WebSocket server to start the workflow.
- The WebSocket connection is made to the FastAPI WebSocket endpoint (
-
Key Points:
- The WebSocket connection is protected by the OAuth2 token, just like the HTTP requests.
- The
ConsoleUI
is used here to keep things simple, but this UI can be replaced with a custom implementation for processing incoming messages from the server.
Step 4: Handling WebSocket Messages#
Once the WebSocket connection is established, the client listens for messages from the server, processes them, and sends appropriate responses if required.
while True:
# Receive messages from the WebSocket server
response = await websocket.recv()
message = IOMessage.create(**json.loads(response))
# Process the received message and interact with the UI
result = await asyncify(ui.process_message)(message)
# Respond if the message requires further input
if isinstance(message, AskingMessage) and result is not None:
await websocket.send(result)
elif isinstance(message, WorkflowCompleted):
# Exit the loop when the workflow is completed
break
-
What happens:
- The client continuously listens for messages using
await websocket.recv()
. - The received messages are processed by the
ConsoleUI
(or custom message handling logic). - If the server sends an
AskingMessage
(a message requesting input), the client responds appropriately. - When the workflow is completed (
WorkflowCompleted
), the loop breaks, and the connection is closed.
- The client continuously listens for messages using
-
Key Points:
- The
ConsoleUI
simplifies the example by handling basic message processing. It can be replaced with custom UI logic for more complex workflows. - The
asyncify
decorator is used to make theui.process_message
function non-blocking, allowing the program to handle other asynchronous tasks while waiting for user input.
- The
Step 5: Running the Workflow#
Finally, the entire process is orchestrated in the main()
function, which ties everything together.
async def main() -> None:
"""Main function to orchestrate the workflow."""
# Step 1: Authenticate to get the OAuth2 token
token = get_oauth_token()
# Step 2: Initiate the workflow and get the initial payload
initial_payload = initiate_workflow(token)
# Step 3: Handle WebSocket interaction
await websocket_workflow(token, initial_payload)
if __name__ == "__main__":
# Run the async main function
asyncio.run(main())
- What happens:
- The
main()
function first authenticates the user to get the OAuth2 token. - It then initiates the workflow by sending the appropriate request and gets the initial WebSocket payload.
- Finally, it handles the WebSocket communication, processing messages and interacting with the workflow in real-time.
- The
Summary#
- Token Acquisition: OAuth2 token is obtained via the
/token
endpoint, securing both API and WebSocket interactions. - WebSocket Security: WebSocket communication is secured by passing the OAuth2 token in the
Authorization
header. - Message Processing: The
ConsoleUI
is used for simplicity, but you can implement your own message-handling logic to interact with the workflow.
By using OAuth2 for authentication and WebSockets for real-time communication, this example demonstrates how you can build a secure, interactive client to communicate with a FastAPI-based workflow system.