NatsAdapter
  fastagency.adapters.nats.NatsAdapter #
 NatsAdapter(
    provider: ProviderProtocol,
    *,
    nats_url: Optional[str] = None,
    user: Optional[str] = None,
    password: Optional[str] = None,
    super_conversation: Optional[NatsAdapter] = None
)
 Bases: MessageProcessorMixin, CreateWorkflowUIMixin
Provider for NATS.
| PARAMETER | DESCRIPTION | 
|---|---|
 provider  |    The provider.   TYPE:   |  
 nats_url  |    The NATS URL. Defaults to None in which case 'nats://localhost:4222' is used.  |  
 user  |    The user. Defaults to None.  |  
 password  |    The password. Defaults to None.  |  
 super_conversation  |    The super conversation. Defaults to None.   TYPE:   |  
Source code in fastagency/adapters/nats/base.py
   super_conversation  instance-attribute  #
 super_conversation: Optional[NatsAdapter] = (
    super_conversation
)
  create #
     create_provider  classmethod  #
 create_provider(
    nats_url: Optional[str] = None,
    user: Optional[str] = None,
    password: Optional[str] = None,
) -> ProviderProtocol
Source code in fastagency/adapters/nats/base.py
    create_subconversation #
 create_subconversation() -> NatsAdapter
  create_workflow_ui #
     error #
 error(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    short: Optional[str] = None,
    long: Optional[str] = None,
) -> Optional[str]
Source code in fastagency/messages.py
   function_call_execution #
 function_call_execution(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    function_name: Optional[str] = None,
    call_id: Optional[str] = None,
    retval: Any = None,
) -> Optional[str]
Source code in fastagency/messages.py
   keep_alive #
 keep_alive(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
) -> Optional[str]
Source code in fastagency/messages.py
   lifespan  async  #
 lifespan(app: Any) -> AsyncIterator[None]
  multiple_choice #
 multiple_choice(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    prompt: Optional[str] = None,
    choices: Optional[list[str]] = None,
    default: Optional[str] = None,
    single: bool = True,
) -> Optional[str]
Source code in fastagency/messages.py
   process_message #
  Source code in fastagency/adapters/nats/base.py
   start #
     suggested_function_call #
 suggested_function_call(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    function_name: Optional[str] = None,
    call_id: Optional[str] = None,
    arguments: Optional[dict[str, Any]] = None,
) -> Optional[str]
Source code in fastagency/messages.py
   system_message #
 system_message(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    message: Optional[dict[str, Any]] = None,
) -> Optional[str]
Source code in fastagency/messages.py
   text_input #
 text_input(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    prompt: Optional[str] = None,
    suggestions: Optional[list[str]] = None,
    password: bool = False,
) -> Optional[str]
Source code in fastagency/messages.py
   text_message #
 text_message(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    body: Optional[str] = None,
) -> Optional[str]
Source code in fastagency/messages.py
   visit #
     visit_error #
     visit_function_call_execution #
 visit_function_call_execution(
    message: FunctionCallExecution,
) -> Optional[str]
  visit_keep_alive #
     visit_multiple_choice #
 visit_multiple_choice(message: MultipleChoice) -> str
Source code in fastagency/adapters/nats/base.py
   visit_suggested_function_call #
 visit_suggested_function_call(
    message: SuggestedFunctionCall,
) -> Optional[str]
  visit_system_message #
 visit_system_message(
    message: SystemMessage,
) -> Optional[str]
  visit_text_input #
  Source code in fastagency/adapters/nats/base.py
   visit_text_message #
 visit_text_message(message: TextMessage) -> None
  visit_workflow_completed #
 visit_workflow_completed(
    message: WorkflowCompleted,
) -> Optional[str]
  visit_workflow_started #
 visit_workflow_started(
    message: WorkflowStarted,
) -> Optional[str]
  workflow_completed #
 workflow_completed(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    result: Optional[str] = None,
) -> Optional[str]
Source code in fastagency/messages.py
   workflow_started #
 workflow_started(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    name: Optional[str] = None,
    description: Optional[str] = None,
    params: Optional[dict[str, Any]] = None,
) -> Optional[str]