Skip to content

MessageProcessorMixin

fastagency.messages.MessageProcessorMixin #

Bases: ABC

error #

error(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    short: Optional[str] = None,
    long: Optional[str] = None,
) -> Optional[str]
Source code in fastagency/messages.py
def error(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    short: Optional[str] = None,
    long: Optional[str] = None,
) -> Optional[str]:
    uuid = uuid or str(uuid4().hex)
    return self.process_message(
        Error(
            sender=sender,
            recipient=recipient,
            auto_reply=auto_reply,
            uuid=uuid,
            workflow_uuid=workflow_uuid,
            short=short,
            long=long,
        )
    )

function_call_execution #

function_call_execution(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    function_name: Optional[str] = None,
    call_id: Optional[str] = None,
    retval: Any = None,
) -> Optional[str]
Source code in fastagency/messages.py
def function_call_execution(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    function_name: Optional[str] = None,
    call_id: Optional[str] = None,
    retval: Any = None,
) -> Optional[str]:
    uuid = uuid or str(uuid4().hex)
    return self.process_message(
        FunctionCallExecution(
            sender=sender,
            recipient=recipient,
            auto_reply=auto_reply,
            uuid=uuid,
            workflow_uuid=workflow_uuid,
            function_name=function_name,
            call_id=call_id,
            retval=retval,
        )
    )

keep_alive #

keep_alive(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
) -> Optional[str]
Source code in fastagency/messages.py
def keep_alive(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
) -> Optional[str]:
    uuid = uuid or str(uuid4().hex)
    return self.process_message(
        KeepAlive(
            sender=sender,
            recipient=recipient,
            auto_reply=auto_reply,
            uuid=uuid,
            workflow_uuid=workflow_uuid,
        )
    )

multiple_choice #

multiple_choice(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    prompt: Optional[str] = None,
    choices: Optional[list[str]] = None,
    default: Optional[str] = None,
    single: bool = True,
) -> Optional[str]
Source code in fastagency/messages.py
def multiple_choice(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    prompt: Optional[str] = None,
    choices: Optional[list[str]] = None,
    default: Optional[str] = None,
    single: bool = True,
) -> Optional[str]:
    uuid = uuid or str(uuid4().hex)
    choices = choices or []
    return self.process_message(
        MultipleChoice(
            sender=sender,
            recipient=recipient,
            auto_reply=auto_reply,
            uuid=uuid,
            workflow_uuid=workflow_uuid,
            prompt=prompt,
            choices=choices,
            default=default,
            single=single,
        )
    )

process_message #

process_message(message: IOMessage) -> Optional[str]
Source code in fastagency/messages.py
def process_message(self, message: IOMessage) -> Optional[str]:
    try:
        return self.visit(message)
    except Exception as e:
        # log the error and return None
        logger.error(f"Error processing message ({message}): {e}", exc_info=True)
        return None

suggested_function_call #

suggested_function_call(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    function_name: Optional[str] = None,
    call_id: Optional[str] = None,
    arguments: Optional[dict[str, Any]] = None,
) -> Optional[str]
Source code in fastagency/messages.py
def suggested_function_call(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    function_name: Optional[str] = None,
    call_id: Optional[str] = None,
    arguments: Optional[dict[str, Any]] = None,
) -> Optional[str]:
    uuid = uuid or str(uuid4().hex)
    arguments = arguments or {}
    return self.process_message(
        SuggestedFunctionCall(
            sender=sender,
            recipient=recipient,
            auto_reply=auto_reply,
            uuid=uuid,
            workflow_uuid=workflow_uuid,
            function_name=function_name,
            call_id=call_id,
            arguments=arguments,
        )
    )

system_message #

system_message(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    message: Optional[dict[str, Any]] = None,
) -> Optional[str]
Source code in fastagency/messages.py
def system_message(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    message: Optional[dict[str, Any]] = None,
) -> Optional[str]:
    uuid = uuid or str(uuid4().hex)
    message = message or {}
    return self.process_message(
        SystemMessage(
            sender=sender,
            recipient=recipient,
            auto_reply=auto_reply,
            uuid=uuid,
            workflow_uuid=workflow_uuid,
            message=message,
        )
    )

text_input #

text_input(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    prompt: Optional[str] = None,
    suggestions: Optional[list[str]] = None,
    password: bool = False,
) -> Optional[str]
Source code in fastagency/messages.py
def text_input(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    prompt: Optional[str] = None,
    suggestions: Optional[list[str]] = None,
    password: bool = False,
) -> Optional[str]:
    uuid = uuid or str(uuid4().hex)
    suggestions = suggestions or []
    return self.process_message(
        TextInput(
            sender=sender,
            recipient=recipient,
            auto_reply=auto_reply,
            uuid=uuid,
            workflow_uuid=workflow_uuid,
            prompt=prompt,
            suggestions=suggestions,
            password=password,
        )
    )

text_message #

text_message(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    body: Optional[str] = None,
) -> Optional[str]
Source code in fastagency/messages.py
def text_message(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    body: Optional[str] = None,
) -> Optional[str]:
    uuid = uuid or str(uuid4().hex)
    return self.process_message(
        TextMessage(
            sender=sender,
            recipient=recipient,
            auto_reply=auto_reply,
            uuid=uuid,
            workflow_uuid=workflow_uuid,
            body=body,
        )
    )

visit #

visit(message: IOMessage) -> Optional[str]
Source code in fastagency/messages.py
def visit(self, message: IOMessage) -> Optional[str]:
    method_name = f"visit_{message.type}"
    method = getattr(self, method_name, self.visit_default)
    return method(message)

visit_default abstractmethod #

visit_default(message: IOMessage) -> Optional[str]
Source code in fastagency/messages.py
@abstractmethod
def visit_default(self, message: IOMessage) -> Optional[str]: ...

visit_error #

visit_error(message: Error) -> Optional[str]
Source code in fastagency/messages.py
def visit_error(self, message: Error) -> Optional[str]:
    return self.visit_default(message)

visit_function_call_execution #

visit_function_call_execution(
    message: FunctionCallExecution,
) -> Optional[str]
Source code in fastagency/messages.py
def visit_function_call_execution(
    self, message: FunctionCallExecution
) -> Optional[str]:
    return self.visit_default(message)

visit_keep_alive #

visit_keep_alive(message: KeepAlive) -> Optional[str]
Source code in fastagency/messages.py
def visit_keep_alive(self, message: KeepAlive) -> Optional[str]:
    return self.visit_default(message)

visit_multiple_choice #

visit_multiple_choice(
    message: MultipleChoice,
) -> Optional[str]
Source code in fastagency/messages.py
def visit_multiple_choice(self, message: MultipleChoice) -> Optional[str]:
    return self.visit_default(message)

visit_suggested_function_call #

visit_suggested_function_call(
    message: SuggestedFunctionCall,
) -> Optional[str]
Source code in fastagency/messages.py
def visit_suggested_function_call(
    self, message: SuggestedFunctionCall
) -> Optional[str]:
    return self.visit_default(message)

visit_system_message #

visit_system_message(
    message: SystemMessage,
) -> Optional[str]
Source code in fastagency/messages.py
def visit_system_message(self, message: SystemMessage) -> Optional[str]:
    return self.visit_default(message)

visit_text_input #

visit_text_input(message: TextInput) -> Optional[str]
Source code in fastagency/messages.py
def visit_text_input(self, message: TextInput) -> Optional[str]:
    return self.visit_default(message)

visit_text_message #

visit_text_message(message: TextMessage) -> Optional[str]
Source code in fastagency/messages.py
def visit_text_message(self, message: TextMessage) -> Optional[str]:
    return self.visit_default(message)

visit_workflow_completed #

visit_workflow_completed(
    message: WorkflowCompleted,
) -> Optional[str]
Source code in fastagency/messages.py
def visit_workflow_completed(self, message: WorkflowCompleted) -> Optional[str]:
    return self.visit_default(message)

visit_workflow_started #

visit_workflow_started(
    message: WorkflowStarted,
) -> Optional[str]
Source code in fastagency/messages.py
def visit_workflow_started(self, message: WorkflowStarted) -> Optional[str]:
    return self.visit_default(message)

workflow_completed #

workflow_completed(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    result: Optional[str] = None,
) -> Optional[str]
Source code in fastagency/messages.py
def workflow_completed(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    result: Optional[str] = None,
) -> Optional[str]:
    uuid = uuid or str(uuid4().hex)
    return self.process_message(
        WorkflowCompleted(
            sender=sender,
            recipient=recipient,
            auto_reply=auto_reply,
            uuid=uuid,
            workflow_uuid=workflow_uuid,
            result=result,
        )
    )

workflow_started #

workflow_started(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    name: Optional[str] = None,
    description: Optional[str] = None,
    params: Optional[dict[str, Any]] = None,
) -> Optional[str]
Source code in fastagency/messages.py
def workflow_started(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    name: Optional[str] = None,
    description: Optional[str] = None,
    params: Optional[dict[str, Any]] = None,
) -> Optional[str]:
    uuid = uuid or str(uuid4().hex)
    params = params or {}
    return self.process_message(
        WorkflowStarted(
            sender=sender,
            recipient=recipient,
            auto_reply=auto_reply,
            uuid=uuid,
            workflow_uuid=workflow_uuid,
            name=name,
            description=description,
            params=params,
        )
    )