Skip to content

AWPThreadInfo

fastagency.adapters.awp.base.AWPThreadInfo #

AWPThreadInfo(
    run_agent_input: RunAgentInput, workflow_id: str
)

Represent an AWP thread.

PARAMETER DESCRIPTION
run_agent_input

The run agent input from the request.

TYPE: RunAgentInput

workflow_id

The workflow id.

TYPE: str

Source code in fastagency/adapters/awp/base.py
def __init__(self, run_agent_input: RunAgentInput, workflow_id: str) -> None:
    """Set up the state tracked for a single AWP thread.

    Args:
        run_agent_input (RunAgentInput): The run agent input from the
            request; its ``thread_id`` and ``run_id`` are copied onto
            this object as ``awp_id`` and ``run_id``.
        workflow_id (str): The workflow id.
    """
    # Pull the identifiers off the request up front.
    thread_id = run_agent_input.thread_id
    run_id = run_agent_input.run_id

    # Identity of this thread: the request itself plus the ids derived from it.
    self.run_agent_input = run_agent_input
    self.workflow_id = workflow_id
    self.awp_id = thread_id
    self.run_id = run_id

    # A new thread starts out flagged as active.
    self.active = True

    # Two fresh queues: one typed for BaseMessage items, one for str items.
    self.out_queue: Queue[BaseMessage] = Queue()
    self.input_queue: Queue[str] = Queue()
    self.encoder = EventEncoder()

    # Every message we have attempted to send during one run.
    self.sent_messages: list[BaseMessage] = []

active instance-attribute #

active = True

awp_id instance-attribute #

awp_id = thread_id

encoder instance-attribute #

encoder = EventEncoder()

input_queue instance-attribute #

input_queue: Queue[str] = Queue()

out_queue instance-attribute #

out_queue: Queue[BaseMessage] = Queue()

run_agent_input instance-attribute #

run_agent_input = run_agent_input

run_id instance-attribute #

run_id = run_id

sent_messages instance-attribute #

sent_messages: list[BaseMessage] = []

workflow_id instance-attribute #

workflow_id = workflow_id

has_text_input_widget #

has_text_input_widget() -> bool
Source code in fastagency/adapters/awp/base.py
def has_text_input_widget(self) -> bool:
    """Report whether a text input widget is available.

    Returns:
        bool: Always ``False`` in this implementation.
    """
    return False

next_message_id #

next_message_id() -> str
Source code in fastagency/adapters/awp/base.py
def next_message_id(self) -> str:
    """Return a fresh message id.

    Each call invokes ``uuid4()`` and returns the ``hex`` attribute of the
    result.

    Returns:
        str: The hex string of a newly generated ``uuid4()`` value.
    """
    # NOTE(review): assumes ``uuid4`` is ``uuid.uuid4``, whose ``.hex`` is
    # already a ``str``, so no extra ``str()`` conversion is needed.
    return uuid4().hex