Skip to content

MessageProcessorProtocol

fastagency.messages.MessageProcessorProtocol #

Bases: Protocol

error #

error(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    short: Optional[str] = None,
    long: Optional[str] = None,
) -> Optional[str]
Source code in fastagency/messages.py
def error(
    self,
    # common parameters for all messages
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    # error specific parameters
    short: Optional[str] = None,
    long: Optional[str] = None,
) -> Optional[str]: ...

function_call_execution #

function_call_execution(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    function_name: Optional[str] = None,
    call_id: Optional[str] = None,
    retval: Any = None,
) -> Optional[str]
Source code in fastagency/messages.py
def function_call_execution(
    self,
    # common parameters for all messages
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    # function_call_execution specific parameters
    function_name: Optional[str] = None,
    call_id: Optional[str] = None,
    retval: Any = None,
) -> Optional[str]: ...

keep_alive #

keep_alive(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
) -> Optional[str]
Source code in fastagency/messages.py
def keep_alive(
    self,
    # common parameters for all messages
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
) -> Optional[str]: ...

multiple_choice #

multiple_choice(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    prompt: Optional[str] = None,
    choices: Optional[list[str]] = None,
    default: Optional[str] = None,
    single: bool = True,
) -> Optional[str]
Source code in fastagency/messages.py
def multiple_choice(
    self,
    # common parameters for all messages
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    # multiple_choice specific parameters
    prompt: Optional[str] = None,
    choices: Optional[list[str]] = None,
    default: Optional[str] = None,
    single: bool = True,
) -> Optional[str]: ...

process_message #

process_message(message: IOMessage) -> Optional[str]
Source code in fastagency/messages.py
def process_message(self, message: IOMessage) -> Optional[str]: ...

suggested_function_call #

suggested_function_call(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    function_name: Optional[str] = None,
    call_id: Optional[str] = None,
    arguments: Optional[dict[str, Any]] = None,
) -> Optional[str]
Source code in fastagency/messages.py
def suggested_function_call(
    self,
    # common parameters for all messages
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    # suggested_function_call specific parameters
    function_name: Optional[str] = None,
    call_id: Optional[str] = None,
    arguments: Optional[dict[str, Any]] = None,
) -> Optional[str]: ...

system_message #

system_message(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    message: Optional[dict[str, Any]] = None,
) -> Optional[str]
Source code in fastagency/messages.py
def system_message(
    self,
    # common parameters for all messages
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    # system_message specific parameters
    message: Optional[dict[str, Any]] = None,
) -> Optional[str]: ...

text_input #

text_input(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    prompt: Optional[str] = None,
    suggestions: Optional[list[str]] = None,
    password: bool = False,
) -> Optional[str]
Source code in fastagency/messages.py
def text_input(
    self,
    # common parameters for all messages
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    # text_input specific parameters
    prompt: Optional[str] = None,
    suggestions: Optional[list[str]] = None,
    password: bool = False,
) -> Optional[str]: ...

text_message #

text_message(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    body: Optional[str] = None,
) -> Optional[str]
Source code in fastagency/messages.py
def text_message(
    self,
    # common parameters for all messages
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    # text_message specific parameters
    body: Optional[str] = None,
) -> Optional[str]: ...

workflow_completed #

workflow_completed(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    result: Optional[str] = None,
) -> Optional[str]
Source code in fastagency/messages.py
def workflow_completed(
    self,
    # common parameters for all messages
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    # workflow_completed specific parameters
    result: Optional[str] = None,
) -> Optional[str]: ...

workflow_started #

workflow_started(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    name: Optional[str] = None,
    description: Optional[str] = None,
    params: Optional[dict[str, Any]] = None,
) -> Optional[str]
Source code in fastagency/messages.py
def workflow_started(
    self,
    # common parameters for all messages
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    # workflow_started specific parameters
    name: Optional[str] = None,
    description: Optional[str] = None,
    params: Optional[dict[str, Any]] = None,
) -> Optional[str]: ...