NatsAdapter
fastagency.adapters.nats.base.NatsAdapter #
NatsAdapter(
provider: ProviderProtocol,
*,
nats_url: Optional[str] = None,
user: Optional[str] = None,
password: Optional[str] = None,
super_conversation: Optional[NatsAdapter] = None
)
Bases: MessageProcessorMixin, CreateWorkflowUIMixin
Provider for NATS.
PARAMETER | DESCRIPTION |
---|---|
provider | The provider. TYPE: ProviderProtocol |
nats_url | The NATS URL. Defaults to None, in which case 'nats://localhost:4222' is used. |
user | The user. Defaults to None. |
password | The password. Defaults to None. |
super_conversation | The super conversation. Defaults to None. TYPE: Optional[NatsAdapter] |
Source code in fastagency/adapters/nats/base.py
super_conversation instance-attribute
#
super_conversation: Optional[NatsAdapter] = (
super_conversation
)
create #
create_provider classmethod
#
create_provider(
nats_url: Optional[str] = None,
user: Optional[str] = None,
password: Optional[str] = None,
) -> ProviderProtocol
Source code in fastagency/adapters/nats/base.py
create_subconversation #
create_subconversation() -> NatsAdapter
create_workflow_ui #
error #
error(
workflow_uuid: str,
sender: Optional[str] = None,
recipient: Optional[str] = None,
auto_reply: bool = False,
uuid: Optional[str] = None,
short: Optional[str] = None,
long: Optional[str] = None,
) -> Optional[str]
Source code in fastagency/messages.py
function_call_execution #
function_call_execution(
workflow_uuid: str,
sender: Optional[str] = None,
recipient: Optional[str] = None,
auto_reply: bool = False,
uuid: Optional[str] = None,
function_name: Optional[str] = None,
call_id: Optional[str] = None,
retval: Any = None,
) -> Optional[str]
Source code in fastagency/messages.py
keep_alive #
keep_alive(
workflow_uuid: str,
sender: Optional[str] = None,
recipient: Optional[str] = None,
auto_reply: bool = False,
uuid: Optional[str] = None,
) -> Optional[str]
Source code in fastagency/messages.py
lifespan async
#
lifespan(app: Any) -> AsyncIterator[None]
multiple_choice #
multiple_choice(
workflow_uuid: str,
sender: Optional[str] = None,
recipient: Optional[str] = None,
auto_reply: bool = False,
uuid: Optional[str] = None,
prompt: Optional[str] = None,
choices: Optional[list[str]] = None,
default: Optional[str] = None,
single: bool = True,
) -> Optional[str]
Source code in fastagency/messages.py
process_message #
Source code in fastagency/adapters/nats/base.py
start #
suggested_function_call #
suggested_function_call(
workflow_uuid: str,
sender: Optional[str] = None,
recipient: Optional[str] = None,
auto_reply: bool = False,
uuid: Optional[str] = None,
function_name: Optional[str] = None,
call_id: Optional[str] = None,
arguments: Optional[dict[str, Any]] = None,
) -> Optional[str]
Source code in fastagency/messages.py
system_message #
system_message(
workflow_uuid: str,
sender: Optional[str] = None,
recipient: Optional[str] = None,
auto_reply: bool = False,
uuid: Optional[str] = None,
message: Optional[dict[str, Any]] = None,
) -> Optional[str]
Source code in fastagency/messages.py
text_input #
text_input(
workflow_uuid: str,
sender: Optional[str] = None,
recipient: Optional[str] = None,
auto_reply: bool = False,
uuid: Optional[str] = None,
prompt: Optional[str] = None,
suggestions: Optional[list[str]] = None,
password: bool = False,
) -> Optional[str]
Source code in fastagency/messages.py
text_message #
text_message(
workflow_uuid: str,
sender: Optional[str] = None,
recipient: Optional[str] = None,
auto_reply: bool = False,
uuid: Optional[str] = None,
body: Optional[str] = None,
) -> Optional[str]
Source code in fastagency/messages.py
visit #
visit_error #
visit_function_call_execution #
visit_function_call_execution(
message: FunctionCallExecution,
) -> Optional[str]
visit_keep_alive #
visit_multiple_choice #
visit_multiple_choice(message: MultipleChoice) -> str
Source code in fastagency/adapters/nats/base.py
visit_suggested_function_call #
visit_suggested_function_call(
message: SuggestedFunctionCall,
) -> Optional[str]
visit_system_message #
visit_system_message(
message: SystemMessage,
) -> Optional[str]
visit_text_input #
Source code in fastagency/adapters/nats/base.py
visit_text_message #
visit_text_message(message: TextMessage) -> None
visit_workflow_completed #
visit_workflow_completed(
message: WorkflowCompleted,
) -> Optional[str]
visit_workflow_started #
visit_workflow_started(
message: WorkflowStarted,
) -> Optional[str]
workflow_completed #
workflow_completed(
workflow_uuid: str,
sender: Optional[str] = None,
recipient: Optional[str] = None,
auto_reply: bool = False,
uuid: Optional[str] = None,
result: Optional[str] = None,
) -> Optional[str]
Source code in fastagency/messages.py
workflow_started #
workflow_started(
workflow_uuid: str,
sender: Optional[str] = None,
recipient: Optional[str] = None,
auto_reply: bool = False,
uuid: Optional[str] = None,
name: Optional[str] = None,
description: Optional[str] = None,
params: Optional[dict[str, Any]] = None,
) -> Optional[str]