Skip to content

AWPAdapter

fastagency.adapters.awp.AWPAdapter #

AWPAdapter(
    provider: ProviderProtocol,
    *,
    discovery_path: str = "/fastagency/discovery",
    awp_path: str = "/fastagency/awp",
    get_user_id: Optional[
        Callable[..., Optional[str]]
    ] = None
)

Bases: MessageProcessorMixin, CreateWorkflowUIMixin

Adapter that exposes a provider's workflows over the agent wire protocol (AWP).

PARAMETER DESCRIPTION
provider

The provider.

TYPE: ProviderProtocol

discovery_path

The discovery path. Defaults to "/fastagency/discovery".

TYPE: str DEFAULT: '/fastagency/discovery'

awp_path

The agent wire protocol path. Defaults to "/fastagency/awp".

TYPE: str DEFAULT: '/fastagency/awp'

get_user_id

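Callable used as a route dependency to resolve the current user id (or None). Defaults to None, in which case a callable that always returns None is used.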

TYPE: Optional[Callable[..., Optional[str]]] DEFAULT: None

Source code in fastagency/adapters/awp/base.py
def __init__(
    self,
    provider: ProviderProtocol,
    *,
    discovery_path: str = "/fastagency/discovery",
    awp_path: str = "/fastagency/awp",
    get_user_id: Optional[Callable[..., Optional[str]]] = None,
) -> None:
    """Adapter that serves a provider's workflows over AWP routes.

    Args:
        provider (ProviderProtocol): The provider stored on the adapter and used by the routes.
        discovery_path (str, optional): Path of the discovery route. Defaults to "/fastagency/discovery".
        awp_path (str, optional): The agent wire protocol path. Defaults to "/fastagency/awp".
        get_user_id (Optional[Callable[..., Optional[str]]], optional): Callable used as a
            route dependency to resolve the user id. Defaults to None, in which case a
            callable that always returns None is used.
    """
    self.provider = provider
    self.discovery_path = discovery_path
    self.awp_path = awp_path
    # Fall back to a resolver that always yields None when none was supplied.
    self.get_user_id = get_user_id or (lambda: None)
    # Registry of known threads, looked up by thread id elsewhere in the adapter.
    self._awp_threads: dict[str, AWPThreadInfo] = {}
    # Must run last: setup_routes() reads the paths and get_user_id set above.
    self.router = self.setup_routes()

awp_path instance-attribute #

awp_path = awp_path

discovery_path instance-attribute #

discovery_path = discovery_path

get_user_id instance-attribute #

get_user_id = get_user_id or (lambda: None)

provider instance-attribute #

provider = provider

router instance-attribute #

router = setup_routes()

create #

create(app: Runnable, import_string: str) -> Iterator[None]
Source code in fastagency/adapters/awp/base.py
@contextmanager
def create(self, app: Runnable, import_string: str) -> Iterator[None]:
    """Not supported by this adapter.

    Raises:
        NotImplementedError: Always, with the message ``"create"``.
    """
    unsupported = NotImplementedError("create")
    raise unsupported

create_provider classmethod #

create_provider(fastapi_url: str) -> ProviderProtocol
Source code in fastagency/adapters/awp/base.py
@classmethod
def create_provider(
    cls,
    fastapi_url: str,
) -> ProviderProtocol:
    """Not supported by this adapter.

    Args:
        fastapi_url: URL of the FastAPI server (unused).

    Raises:
        NotImplementedError: Always. The message names this method so the
            failure is not confused with the unrelated ``create`` method.
    """
    raise NotImplementedError("create_provider")

create_subconversation #

create_subconversation() -> UIBase
Source code in fastagency/adapters/awp/base.py
def create_subconversation(self) -> UIBase:
    """Return the UI to use for a subconversation: the adapter itself."""
    subconversation_ui = self
    return subconversation_ui

create_workflow_ui #

create_workflow_ui(workflow_uuid: str) -> UI
Source code in fastagency/base.py
def create_workflow_ui(self: UIBase, workflow_uuid: str) -> "UI":
    """Wrap this UI base in a ``UI`` bound to ``workflow_uuid``."""
    ui_kwargs = {"uibase": self, "workflow_uuid": workflow_uuid}
    return UI(**ui_kwargs)

end_of_thread #

end_of_thread(thread_id: str) -> None
Source code in fastagency/adapters/awp/base.py
def end_of_thread(self, thread_id: str) -> None:
    """Remove a thread from the registry and mark it inactive.

    Unknown thread ids are ignored silently.
    """
    finished = self._awp_threads.pop(thread_id, None)
    if not finished:
        return
    finished.active = False
    logger.info(f"Ended awp thread: {finished}")

error #

error(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    short: Optional[str] = None,
    long: Optional[str] = None,
) -> Optional[str]
Source code in fastagency/messages.py
def error(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    short: Optional[str] = None,
    long: Optional[str] = None,
) -> Optional[str]:
    """Build an ``Error`` message and pass it to ``process_message``.

    A random ``uuid4`` hex string is used as the message id when ``uuid`` is
    falsy. Returns whatever ``process_message`` returns.
    """
    message_uuid = uuid if uuid else uuid4().hex
    outgoing = Error(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=message_uuid,
        workflow_uuid=workflow_uuid,
        short=short,
        long=long,
    )
    return self.process_message(outgoing)

function_call_execution #

function_call_execution(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    function_name: Optional[str] = None,
    call_id: Optional[str] = None,
    retval: Any = None,
) -> Optional[str]
Source code in fastagency/messages.py
def function_call_execution(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    function_name: Optional[str] = None,
    call_id: Optional[str] = None,
    retval: Any = None,
) -> Optional[str]:
    """Build a ``FunctionCallExecution`` message and pass it to ``process_message``.

    A random ``uuid4`` hex string is used as the message id when ``uuid`` is
    falsy. Returns whatever ``process_message`` returns.
    """
    message_uuid = uuid if uuid else uuid4().hex
    outgoing = FunctionCallExecution(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=message_uuid,
        workflow_uuid=workflow_uuid,
        function_name=function_name,
        call_id=call_id,
        retval=retval,
    )
    return self.process_message(outgoing)

get_thread_info_of_awp #

get_thread_info_of_awp(
    awp_id: str,
) -> Optional[AWPThreadInfo]
Source code in fastagency/adapters/awp/base.py
def get_thread_info_of_awp(self, awp_id: str) -> Optional[AWPThreadInfo]:
    """Return the thread registered under ``awp_id``, or ``None`` when absent."""
    registry = self._awp_threads
    return registry.get(awp_id, None)

get_thread_info_of_workflow #

get_thread_info_of_workflow(
    workflow_uuid: str,
) -> Optional[AWPThreadInfo]
Source code in fastagency/adapters/awp/base.py
def get_thread_info_of_workflow(
    self, workflow_uuid: str
) -> Optional[AWPThreadInfo]:
    """Return the first registered thread whose ``workflow_id`` matches.

    Raises:
        RuntimeError: If no registered thread belongs to ``workflow_uuid``
            (the failure is also logged).
    """
    for candidate in self._awp_threads.values():
        if candidate.workflow_id == workflow_uuid:
            return candidate

    not_found = f"Workflow {workflow_uuid} not found in threads: {self._awp_threads}"
    logger.error(not_found)
    raise RuntimeError(not_found)

keep_alive #

keep_alive(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
) -> Optional[str]
Source code in fastagency/messages.py
def keep_alive(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
) -> Optional[str]:
    """Build a ``KeepAlive`` message and pass it to ``process_message``.

    A random ``uuid4`` hex string is used as the message id when ``uuid`` is
    falsy. Returns whatever ``process_message`` returns.
    """
    message_uuid = uuid if uuid else uuid4().hex
    outgoing = KeepAlive(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=message_uuid,
        workflow_uuid=workflow_uuid,
    )
    return self.process_message(outgoing)

multiple_choice #

multiple_choice(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    prompt: Optional[str] = None,
    choices: Optional[list[str]] = None,
    default: Optional[str] = None,
    single: bool = True,
) -> Optional[str]
Source code in fastagency/messages.py
def multiple_choice(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    prompt: Optional[str] = None,
    choices: Optional[list[str]] = None,
    default: Optional[str] = None,
    single: bool = True,
) -> Optional[str]:
    """Build a ``MultipleChoice`` message and pass it to ``process_message``.

    A random ``uuid4`` hex string is used as the message id when ``uuid`` is
    falsy, and a falsy ``choices`` becomes an empty list. Returns whatever
    ``process_message`` returns.
    """
    message_uuid = uuid if uuid else uuid4().hex
    options = choices if choices else []
    outgoing = MultipleChoice(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=message_uuid,
        workflow_uuid=workflow_uuid,
        prompt=prompt,
        choices=options,
        default=default,
        single=single,
    )
    return self.process_message(outgoing)

process_message #

process_message(message: IOMessage) -> Optional[str]
Source code in fastagency/messages.py
def process_message(self, message: IOMessage) -> Optional[str]:
    """Dispatch ``message`` through ``visit``, never letting an error escape.

    Returns:
        The visitor's result, or ``None`` when the visitor raised (the
        exception is logged with its traceback).
    """
    try:
        outcome = self.visit(message)
    except Exception as e:
        # log the error and return None
        logger.error(f"Error processing message ({message}): {e}", exc_info=True)
        return None
    else:
        return outcome

run_thread async #

run_thread(
    input: RunAgentInput, request: Request
) -> AsyncIterator[str]
Source code in fastagency/adapters/awp/base.py
async def run_thread(
    self, input: RunAgentInput, request: Request
) -> AsyncIterator[str]:
    """Stream the events queued for a thread until the run ends or the client leaves.

    Args:
        input: Run request; its ``thread_id`` selects the registered thread.
        request: Incoming request, polled to detect client disconnects.

    Yields:
        The result of ``self._sse_send`` for a run-started event, then for
        every message taken from the thread's ``out_queue``.

    Raises:
        RuntimeError: If ``input.thread_id`` is not a registered thread.
            Because this is an async generator, the error surfaces when
            iteration starts, not when the method is called.
    """
    thread_info = self._awp_threads.get(input.thread_id)
    if thread_info is None:
        logger.error(f"Thread {input.thread_id} not found")
        raise RuntimeError(f"Thread {input.thread_id} not found")

    # Announce the run before relaying any queued message.
    run_started = RunStartedEvent(
        type=EventType.RUN_STARTED,
        thread_id=thread_info.awp_id,
        run_id=thread_info.run_id,
    )
    yield self._sse_send(run_started, thread_info)

    while not await request.is_disconnected():
        try:
            # Short timeout so the disconnect check above runs at least every 0.5s.
            message = await asyncio.wait_for(
                thread_info.out_queue.get(), timeout=0.5
            )
            yield self._sse_send(message, thread_info)
            # A queued run-finished event ends this run; the thread stays registered.
            if isinstance(message, RunFinishedEvent):
                break
            # "thread_over" means the thread is done: close the run and
            # drop the thread from the registry.
            if isinstance(message, CustomEvent) and message.name == "thread_over":
                run_finished = RunFinishedEvent(
                    type=EventType.RUN_FINISHED,
                    thread_id=thread_info.awp_id,
                    run_id=thread_info.run_id,
                )
                yield self._sse_send(run_finished, thread_info)
                logger.info(f"Thread {input.thread_id} is over")
                self.end_of_thread(input.thread_id)
                break
        except asyncio.TimeoutError:
            await asyncio.sleep(
                0
            )  # Yield control briefly, might not be strictly needed
            continue  # Go back to the top and check if request is still open

    logger.info(f"run thread {input.thread_id} completed")

send_to_thread #

send_to_thread(thread_id: str, message: str) -> None
Source code in fastagency/adapters/awp/base.py
def send_to_thread(self, thread_id: str, message: str) -> None:
    """Queue ``message`` on the output queue of an active thread.

    Unknown or inactive threads are logged as errors and the message is dropped.
    """
    target = self._awp_threads.get(thread_id)
    if not target:
        logger.error(f"Thread {thread_id} not found")
        return
    if not target.active:
        logger.error(f"Thread {thread_id} is not active")
        return
    target.out_queue.put_nowait(message)

setup_routes #

setup_routes() -> APIRouter
Source code in fastagency/adapters/awp/base.py
def setup_routes(self) -> APIRouter:
    """Build the router exposing the AWP endpoint and the discovery endpoint.

    Returns:
        APIRouter: Router with a POST route at ``self.awp_path`` that starts
        or resumes a thread and streams its events, and a GET route at
        ``self.discovery_path`` that lists the provider's workflows.
    """
    router = APIRouter()

    # The event loop keeps only weak references to tasks; hold strong ones
    # here so a running workflow cannot be garbage-collected mid-execution.
    background_tasks: set[asyncio.Task[None]] = set()

    @router.post(self.awp_path)
    async def run_agent(
        input: RunAgentInput,
        request: Request,
        user_id: Optional[str] = Depends(self.get_user_id),
    ) -> StreamingResponse:
        """Resume the thread named in ``input`` or create it, then stream its events."""
        headers = {
            "Content-Type": "text/event-stream",
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Nginx: prevent buffering
        }

        if input.thread_id in self._awp_threads:
            ## existing thread, resume
            logger.info(f"Resuming thread: {input.thread_id}")
            logger.info(f"Messages: {input.messages}")
            thread_info = self._awp_threads[input.thread_id]
            # A resume request may carry no messages; only a trailing user
            # message is forwarded as the answer the workflow is waiting for.
            last_message = input.messages[-1] if input.messages else None
            if isinstance(last_message, UserMessage):
                thread_info.input_queue.put_nowait(last_message.content)
            return StreamingResponse(
                self.run_thread(input, request), headers=headers
            )

        ## new thread, create
        workflow_uuid: str = uuid4().hex

        thread_info = AWPThreadInfo(input, workflow_id=workflow_uuid)
        self._awp_threads[input.thread_id] = thread_info
        logger.info(f"Created new thread: {input.thread_id}")

        # NOTE(review): the workflow name is hard-coded; confirm whether it
        # should be derived from the request or from the provider instead.
        init_msg = InitiateWorkflowModel(
            user_id=user_id,
            workflow_uuid=workflow_uuid,
            params={},
            name="simple_learning",
        )

        async def process_messages_in_background(workflow_uuid: str) -> None:
            """Run the workflow via ``asyncify`` and always reset ``workflow_ids``."""

            def a_process_messages_in_background(
                workflow_uuid: str,
            ) -> None:
                workflow_ids.workflow_uuid = workflow_uuid
                self.provider.run(
                    name=init_msg.name,
                    ui=self.create_workflow_ui(workflow_uuid),
                    user_id=user_id if user_id else "None",
                    **init_msg.params,
                )

            try:
                await asyncify(a_process_messages_in_background)(workflow_uuid)
            except Exception as e:
                # Nobody awaits this task, so log the failure here instead
                # of leaving it as an unretrieved task exception.
                logger.error(
                    f"Error running workflow {workflow_uuid}: {e}", exc_info=True
                )
            finally:
                workflow_ids.workflow_uuid = None

        try:
            task = asyncio.create_task(
                process_messages_in_background(workflow_uuid)
            )
            background_tasks.add(task)
            task.add_done_callback(background_tasks.discard)
            logger.info(f"Started task: {task}")
        except Exception as e:
            # exc_info attaches the exception traceback (stack_info would
            # only record the logging call's own stack).
            logger.error(f"Error in awp endpoint: {e}", exc_info=True)
        return StreamingResponse(self.run_thread(input, request), headers=headers)

    @router.get(
        self.discovery_path,
        responses={
            404: {"detail": "Key Not Found"},
            504: {"detail": "Unable to connect to provider"},
        },
    )
    def discovery(
        user_id: Optional[str] = Depends(self.get_user_id),
    ) -> list[WorkflowInfo]:
        """List the provider's workflow names with their descriptions."""
        try:
            names = self.provider.names
        except FastAgencyConnectionError as e:
            raise HTTPException(status_code=504, detail=str(e)) from e

        try:
            descriptions = [self.provider.get_description(name) for name in names]
        except FastAgencyKeyError as e:
            raise HTTPException(status_code=404, detail=str(e)) from e

        return [
            WorkflowInfo(name=name, description=description)
            for name, description in zip(names, descriptions)
        ]

    return router

start #

start(
    *,
    app: Runnable,
    import_string: str,
    name: Optional[str] = None,
    params: dict[str, Any],
    single_run: bool = False
) -> None
Source code in fastagency/adapters/awp/base.py
def start(
    self,
    *,
    app: "Runnable",
    import_string: str,
    name: Optional[str] = None,
    params: dict[str, Any],
    single_run: bool = False,
) -> None:
    """Not supported by this adapter.

    Raises:
        NotImplementedError: Always, with the message ``"start"``.
    """
    unsupported = NotImplementedError("start")
    raise unsupported

suggested_function_call #

suggested_function_call(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    function_name: Optional[str] = None,
    call_id: Optional[str] = None,
    arguments: Optional[dict[str, Any]] = None,
) -> Optional[str]
Source code in fastagency/messages.py
def suggested_function_call(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    function_name: Optional[str] = None,
    call_id: Optional[str] = None,
    arguments: Optional[dict[str, Any]] = None,
) -> Optional[str]:
    """Build a ``SuggestedFunctionCall`` message and pass it to ``process_message``.

    A random ``uuid4`` hex string is used as the message id when ``uuid`` is
    falsy, and a falsy ``arguments`` becomes an empty dict. Returns whatever
    ``process_message`` returns.
    """
    message_uuid = uuid if uuid else uuid4().hex
    call_arguments = arguments if arguments else {}
    outgoing = SuggestedFunctionCall(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=message_uuid,
        workflow_uuid=workflow_uuid,
        function_name=function_name,
        call_id=call_id,
        arguments=call_arguments,
    )
    return self.process_message(outgoing)

system_message #

system_message(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    message: Optional[dict[str, Any]] = None,
) -> Optional[str]
Source code in fastagency/messages.py
def system_message(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    message: Optional[dict[str, Any]] = None,
) -> Optional[str]:
    """Build a ``SystemMessage`` and pass it to ``process_message``.

    A random ``uuid4`` hex string is used as the message id when ``uuid`` is
    falsy, and a falsy ``message`` becomes an empty dict. Returns whatever
    ``process_message`` returns.
    """
    message_uuid = uuid if uuid else uuid4().hex
    payload = message if message else {}
    outgoing = SystemMessage(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=message_uuid,
        workflow_uuid=workflow_uuid,
        message=payload,
    )
    return self.process_message(outgoing)

text_input #

text_input(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    prompt: Optional[str] = None,
    suggestions: Optional[list[str]] = None,
    password: bool = False,
) -> Optional[str]
Source code in fastagency/messages.py
def text_input(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    prompt: Optional[str] = None,
    suggestions: Optional[list[str]] = None,
    password: bool = False,
) -> Optional[str]:
    """Build a ``TextInput`` message and pass it to ``process_message``.

    A random ``uuid4`` hex string is used as the message id when ``uuid`` is
    falsy, and a falsy ``suggestions`` becomes an empty list. Returns
    whatever ``process_message`` returns.
    """
    message_uuid = uuid if uuid else uuid4().hex
    hints = suggestions if suggestions else []
    outgoing = TextInput(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=message_uuid,
        workflow_uuid=workflow_uuid,
        prompt=prompt,
        suggestions=hints,
        password=password,
    )
    return self.process_message(outgoing)

text_message #

text_message(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    body: Optional[str] = None,
) -> Optional[str]
Source code in fastagency/messages.py
def text_message(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    body: Optional[str] = None,
) -> Optional[str]:
    """Build a ``TextMessage`` and pass it to ``process_message``.

    A random ``uuid4`` hex string is used as the message id when ``uuid`` is
    falsy. Returns whatever ``process_message`` returns.
    """
    message_uuid = uuid if uuid else uuid4().hex
    outgoing = TextMessage(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=message_uuid,
        workflow_uuid=workflow_uuid,
        body=body,
    )
    return self.process_message(outgoing)

visit #

visit(message: IOMessage) -> Optional[str]
Source code in fastagency/messages.py
def visit(self, message: IOMessage) -> Optional[str]:
    """Dispatch ``message`` to ``visit_<message.type>``.

    Falls back to ``visit_default`` when no handler with that name exists.
    """
    handler_name = f"visit_{message.type}"
    fallback = self.visit_default
    handler = getattr(self, handler_name, fallback)
    return handler(message)

visit_default #

visit_default(message: IOMessage) -> Optional[str]
Source code in fastagency/adapters/awp/base.py
def visit_default(self, message: IOMessage) -> Optional[str]:
    """Fallback visitor: log the message and return ``None``.

    Non-``IOMessage`` inputs are reported as errors; the workflow id is then
    read from ``workflow_ids`` instead of from the message.
    """

    async def a_visit_default(
        self: AWPAdapter, message: IOMessage, workflow_uuid: str
    ) -> Optional[str]:
        logger.info(f"Default Visiting message: {message}")

        return None

    if not isinstance(message, IOMessage):
        logger.error(f"Dang! Message is not an IOMessage: {message}")
        logger.error(f"Message type: {type(message)}")
        current_workflow = workflow_ids.workflow_uuid
    else:
        current_workflow = message.workflow_uuid

    run_sync = syncify(a_visit_default)
    return run_sync(self, message, current_workflow)

visit_error #

visit_error(message: Error) -> Optional[str]
Source code in fastagency/messages.py
def visit_error(self, message: Error) -> Optional[str]:
    """Handle an ``Error`` message by delegating to ``visit_default``."""
    fallback_result = self.visit_default(message)
    return fallback_result

visit_function_call_execution #

visit_function_call_execution(
    message: FunctionCallExecution,
) -> Optional[str]
Source code in fastagency/messages.py
def visit_function_call_execution(
    self, message: FunctionCallExecution
) -> Optional[str]:
    """Handle a ``FunctionCallExecution`` message by delegating to ``visit_default``."""
    fallback_result = self.visit_default(message)
    return fallback_result

visit_input_request #

visit_input_request(message: InputRequestEvent) -> None
Source code in fastagency/adapters/awp/base.py
def visit_input_request(
    self, message: autogen.events.agent_events.InputRequestEvent
) -> None:
    """Finish the current run and answer the input request with the next user input.

    The workflow id is read from ``workflow_ids``. A run-finished event is
    queued on the thread's ``out_queue``, then the next item from its
    ``input_queue`` is passed to ``message.content.respond``.
    """

    async def a_visit_input_request(
        self: AWPAdapter,
        message: autogen.events.agent_events.InputRequestEvent,
        workflow_uuid: str,
    ) -> None:
        logger.info(f"Visiting input request: {message}")
        thread_info = self.get_thread_info_of_workflow(workflow_uuid)
        if thread_info is None:
            missing = f"Thread info not found for workflow {workflow_uuid}: {self._awp_threads}"
            logger.error(missing)
            raise KeyError(missing)

        pending_events = thread_info.out_queue
        ## send end of run message, so that the UI can acquire answer and call us back
        pending_events.put_nowait(
            RunFinishedEvent(
                type=EventType.RUN_FINISHED,
                thread_id=thread_info.awp_id,
                run_id=thread_info.run_id,
            )
        )
        answers = thread_info.input_queue
        respond = message.content.respond
        respond(await answers.get())

    current_workflow = workflow_ids.workflow_uuid
    run_sync = syncify(a_visit_input_request)
    run_sync(self, message, current_workflow)

visit_keep_alive #

visit_keep_alive(message: KeepAlive) -> Optional[str]
Source code in fastagency/messages.py
def visit_keep_alive(self, message: KeepAlive) -> Optional[str]:
    """Handle a ``KeepAlive`` message by delegating to ``visit_default``."""
    fallback_result = self.visit_default(message)
    return fallback_result

visit_multiple_choice #

visit_multiple_choice(
    message: MultipleChoice,
) -> Optional[str]
Source code in fastagency/messages.py
def visit_multiple_choice(self, message: MultipleChoice) -> Optional[str]:
    """Handle a ``MultipleChoice`` message by delegating to ``visit_default``."""
    fallback_result = self.visit_default(message)
    return fallback_result

visit_run_completion #

visit_run_completion(message: RunCompletionEvent) -> None
Source code in fastagency/adapters/awp/base.py
def visit_run_completion(
    self, message: autogen.events.agent_events.RunCompletionEvent
) -> None:
    """Queue a ``thread_over`` custom event on the thread of the current workflow.

    The workflow id is read from ``workflow_ids``.
    """

    async def a_visit_run_completion(
        self: AWPAdapter,
        message: autogen.events.agent_events.RunCompletionEvent,
        workflow_uuid: str,
    ) -> None:
        logger.info(f"Visiting run completion: {message}")
        thread_info = self.get_thread_info_of_workflow(workflow_uuid)
        if thread_info is None:
            logger.error(
                f"Thread info not found for workflow {workflow_uuid}: {self._awp_threads}"
            )
            return

        pending_events = thread_info.out_queue
        pending_events.put_nowait(
            CustomEvent(type=EventType.CUSTOM, name="thread_over", value={})
        )

    current_workflow = workflow_ids.workflow_uuid
    run_sync = syncify(a_visit_run_completion)
    return run_sync(self, message, current_workflow)

visit_suggested_function_call #

visit_suggested_function_call(
    message: SuggestedFunctionCall,
) -> Optional[str]
Source code in fastagency/messages.py
def visit_suggested_function_call(
    self, message: SuggestedFunctionCall
) -> Optional[str]:
    """Handle a ``SuggestedFunctionCall`` message by delegating to ``visit_default``."""
    fallback_result = self.visit_default(message)
    return fallback_result

visit_system_message #

visit_system_message(
    message: SystemMessage,
) -> Optional[str]
Source code in fastagency/messages.py
def visit_system_message(self, message: SystemMessage) -> Optional[str]:
    """Handle a ``SystemMessage`` by delegating to ``visit_default``."""
    fallback_result = self.visit_default(message)
    return fallback_result

visit_text #

visit_text(message: TextEvent) -> None
Source code in fastagency/adapters/awp/base.py
def visit_text(self, message: autogen.events.agent_events.TextEvent) -> None:
    """Relay a text event to its thread as start/content/end message events.

    The workflow id is read from ``workflow_ids``. Nothing is queued when the
    event's content is empty.
    """

    async def a_visit_text(
        self: AWPAdapter,
        # Annotated as TextEvent to match the outer signature (was
        # InputRequestEvent, a copy-paste slip).
        message: autogen.events.agent_events.TextEvent,
        workflow_uuid: str,
    ) -> None:
        logger.info(f"Visiting text event: {message}")
        thread_info = self.get_thread_info_of_workflow(workflow_uuid)
        if thread_info is None:
            logger.error(
                f"Thread info not found for workflow {workflow_uuid}: {self._awp_threads}"
            )
            return

        out_queue = thread_info.out_queue
        content = message.content
        uuid = str(content.uuid)
        # Only emit the start/content/end triple when there is text to send.
        if content.content:
            message_started = TextMessageStartEvent(
                type=EventType.TEXT_MESSAGE_START, message_id=uuid, role="assistant"
            )
            out_queue.put_nowait(message_started)

            message_content = TextMessageContentEvent(
                type=EventType.TEXT_MESSAGE_CONTENT,
                message_id=uuid,
                delta=content.content,
            )
            out_queue.put_nowait(message_content)

            message_end = TextMessageEndEvent(
                type=EventType.TEXT_MESSAGE_END, message_id=uuid
            )
            out_queue.put_nowait(message_end)

    workflow_uuid = workflow_ids.workflow_uuid
    syncify(a_visit_text)(self, message, workflow_uuid)

visit_text_input #

visit_text_input(message: TextInput) -> str
Source code in fastagency/adapters/awp/base.py
def visit_text_input(self, message: TextInput) -> str:
    """Show the prompt to the client, finish the run, and wait for the answer.

    Args:
        message: Text input request; its ``workflow_uuid`` selects the thread.

    Returns:
        The next item taken from the thread's ``input_queue``.

    Raises:
        RuntimeError: Propagated from ``get_thread_info_of_workflow`` when no
            thread belongs to the workflow.
    """

    async def a_visit_text_input(self: AWPAdapter, message: TextInput) -> str:
        workflow_uuid = message.workflow_uuid
        thread_info = self.get_thread_info_of_workflow(workflow_uuid)
        if thread_info is None:
            logger.error(
                f"Thread info not found for workflow {workflow_uuid}: {self._awp_threads}"
            )
            raise KeyError(
                f"Thread info not found for workflow {workflow_uuid}: {self._awp_threads}"
            )

        out_queue = thread_info.out_queue

        # Consistent with visit_text: emit the start/content/end triple only
        # when there is text to show. Otherwise a start event could be queued
        # and then left dangling if the content event cannot be built from an
        # empty delta (presumably rejected by the event model — confirm).
        if message.prompt:
            message_started = TextMessageStartEvent(
                type=EventType.TEXT_MESSAGE_START,
                message_id=message.uuid,
                role="assistant",
            )
            out_queue.put_nowait(message_started)

            message_content = TextMessageContentEvent(
                type=EventType.TEXT_MESSAGE_CONTENT,
                message_id=message.uuid,
                delta=message.prompt,
            )
            out_queue.put_nowait(message_content)

            message_end = TextMessageEndEvent(
                type=EventType.TEXT_MESSAGE_END, message_id=message.uuid
            )
            out_queue.put_nowait(message_end)

        if thread_info.has_text_input_widget():
            # todo : invoke function to get an answer
            ...

        ## send end of run message, so that the UI can acquire answer and call us back
        run_finished = RunFinishedEvent(
            type=EventType.RUN_FINISHED,
            thread_id=thread_info.awp_id,
            run_id=thread_info.run_id,
        )
        out_queue.put_nowait(run_finished)

        # wait for the answer to be sent back
        return await thread_info.input_queue.get()

    return syncify(a_visit_text_input)(self, message)

visit_text_message #

visit_text_message(message: TextMessage) -> None
Source code in fastagency/adapters/awp/base.py
def visit_text_message(self, message: TextMessage) -> None:
    """Relay a text message to its thread as start/content/end message events.

    Args:
        message: Text message; its ``workflow_uuid`` selects the thread and
            its ``body`` is sent as the content delta.

    Raises:
        RuntimeError: Propagated from ``get_thread_info_of_workflow`` when no
            thread belongs to the workflow.
    """

    async def a_visit_text_message(self: AWPAdapter, message: TextMessage) -> None:
        workflow_uuid = message.workflow_uuid
        thread_info = self.get_thread_info_of_workflow(workflow_uuid)
        if thread_info is None:
            logger.error(
                f"Thread info not found for workflow {workflow_uuid}: {self._awp_threads}"
            )
            return

        # Consistent with visit_text: skip messages without text instead of
        # queuing a start event whose content event may then fail to build
        # (an empty or None delta is presumably rejected — confirm).
        if not message.body:
            return

        out_queue = thread_info.out_queue

        message_started = TextMessageStartEvent(
            type=EventType.TEXT_MESSAGE_START,
            message_id=message.uuid,
            role="assistant",
        )
        out_queue.put_nowait(message_started)

        message_content = TextMessageContentEvent(
            type=EventType.TEXT_MESSAGE_CONTENT,
            message_id=message.uuid,
            delta=message.body,
        )
        out_queue.put_nowait(message_content)

        message_end = TextMessageEndEvent(
            type=EventType.TEXT_MESSAGE_END, message_id=message.uuid
        )
        out_queue.put_nowait(message_end)

    syncify(a_visit_text_message)(self, message)

visit_workflow_completed #

visit_workflow_completed(
    message: WorkflowCompleted,
) -> Optional[str]
Source code in fastagency/messages.py
def visit_workflow_completed(self, message: WorkflowCompleted) -> Optional[str]:
    """Handle a ``WorkflowCompleted`` message by delegating to ``visit_default``."""
    fallback_result = self.visit_default(message)
    return fallback_result

visit_workflow_started #

visit_workflow_started(
    message: WorkflowStarted,
) -> Optional[str]
Source code in fastagency/messages.py
def visit_workflow_started(self, message: WorkflowStarted) -> Optional[str]:
    """Handle a ``WorkflowStarted`` message by delegating to ``visit_default``."""
    fallback_result = self.visit_default(message)
    return fallback_result

workflow_completed #

workflow_completed(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    result: Optional[str] = None,
) -> Optional[str]
Source code in fastagency/messages.py
def workflow_completed(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    result: Optional[str] = None,
) -> Optional[str]:
    """Build a ``WorkflowCompleted`` message and pass it to ``process_message``.

    A random ``uuid4`` hex string is used as the message id when ``uuid`` is
    falsy. Returns whatever ``process_message`` returns.
    """
    message_uuid = uuid if uuid else uuid4().hex
    outgoing = WorkflowCompleted(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=message_uuid,
        workflow_uuid=workflow_uuid,
        result=result,
    )
    return self.process_message(outgoing)

workflow_started #

workflow_started(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    name: Optional[str] = None,
    description: Optional[str] = None,
    params: Optional[dict[str, Any]] = None,
) -> Optional[str]
Source code in fastagency/messages.py
def workflow_started(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    name: Optional[str] = None,
    description: Optional[str] = None,
    params: Optional[dict[str, Any]] = None,
) -> Optional[str]:
    """Build a ``WorkflowStarted`` message and pass it to ``process_message``.

    A random ``uuid4`` hex string is used as the message id when ``uuid`` is
    falsy, and a falsy ``params`` becomes an empty dict. Returns whatever
    ``process_message`` returns.
    """
    message_uuid = uuid if uuid else uuid4().hex
    workflow_params = params if params else {}
    outgoing = WorkflowStarted(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=message_uuid,
        workflow_uuid=workflow_uuid,
        name=name,
        description=description,
        params=workflow_params,
    )
    return self.process_message(outgoing)