Skip to content

IONats

fastagency.studio.io.ionats.IONats #

IONats(
    user_id: str,
    thread_id: str,
    deployment_id: Optional[str] = "playground",
)

Bases: IOStream

Initialize the IO class.

Source code in fastagency/studio/io/ionats.py
def __init__(
    self, user_id: str, thread_id: str, deployment_id: Optional[str] = "playground"
) -> None:
    """Store the conversation identifiers and derive the two subject names.

    Args:
        user_id: Identifier interpolated into both subject names.
        thread_id: Identifier interpolated into both subject names.
        deployment_id: Identifier interpolated between the user and thread
            segments of both subject names. Defaults to "playground".
    """
    # The two subjects differ only in the "client"/"server" segment.
    request_subject = f"chat.client.messages.{user_id}.{deployment_id}.{thread_id}"
    receive_subject = f"chat.server.messages.{user_id}.{deployment_id}.{thread_id}"

    # Identifiers are kept verbatim on the instance.
    self._user_id = user_id
    self._thread_id = thread_id
    self._deployment_id = deployment_id

    self._input_request_subject = request_subject
    self._input_receive_subject = receive_subject

    # Callable used by the instance to publish outgoing messages.
    self._publisher = broker.publish

    # Holds messages handed over by ``handle_input`` until ``input`` reads them.
    self.queue: Queue = Queue()  # type: ignore[type-arg]

    # Declared here only; the value is assigned in ``create``.
    self.subscriber: "AsyncAPISubscriber"

queue instance-attribute #

queue: Queue = Queue()

subscriber instance-attribute #

subscriber: AsyncAPISubscriber

create async classmethod #

create(
    user_id: Union[str, UUID],
    thread_id: Union[str, UUID],
    deployment_id: Optional[
        Union[str, UUID]
    ] = "playground",
) -> IONats
Source code in fastagency/studio/io/ionats.py
@classmethod
async def create(
    cls,
    user_id: Union[str, UUID],
    thread_id: Union[str, UUID],
    deployment_id: Optional[Union[str, UUID]] = "playground",
) -> "IONats":
    """Construct an instance and start a subscriber on its receive subject.

    Every argument is passed through ``str()`` before it reaches the
    constructor, so a ``None`` deployment_id ends up as the text "None".

    Args:
        user_id: User identifier, as a string or UUID.
        thread_id: Thread identifier, as a string or UUID.
        deployment_id: Deployment identifier, as a string or UUID.
            Defaults to "playground".

    Returns:
        The new instance; its ``subscriber`` has been started.
    """
    instance = cls(
        user_id=str(user_id),
        thread_id=str(thread_id),
        deployment_id=str(deployment_id),
    )

    # The subscriber is created per instance, at call time, for the
    # instance-specific receive subject.
    subscription = broker.subscriber(
        subject=instance._input_receive_subject,
        stream=stream,
        deliver_policy=api.DeliverPolicy("all"),
    )
    instance.subscriber = subscription

    # Order matters: attach the handler, register with the broker, then start.
    subscription(instance.handle_input)
    broker.setup_subscriber(subscription)
    await subscription.start()

    return instance

handle_input async #

handle_input(
    body: InputResponseModel,
    msg: NatsMessage,
    logger: Logger,
) -> None
Source code in fastagency/studio/io/ionats.py
async def handle_input(
    self, body: InputResponseModel, msg: NatsMessage, logger: Logger
) -> None:
    """Log a received message and hand it over to ``self.queue``.

    Args:
        body: Payload of the message; only used in the log line.
        msg: The message object; put on ``self.queue`` unchanged.
        logger: Logger that receives one info-level record.
    """
    log_line = f"Received message in subject '{self._input_receive_subject}': {body}"
    logger.info(log_line)

    # The message is queued as-is; this method does not acknowledge it.
    self.queue.put(msg)

input #

input(prompt: str = '', *, password: bool = False) -> str

Read a line from the input stream.

PARAMETER DESCRIPTION
prompt

The prompt to display. Defaults to "".

TYPE: str DEFAULT: ''

password

Whether to read a password. Defaults to False.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
str

The line read from the input stream.

TYPE: str

Source code in fastagency/studio/io/ionats.py
def input(self, prompt: str = "", *, password: bool = False) -> str:
    """Read a line from the input stream.

    Publishes an input request on the request subject, then blocks until a
    reply message is available on ``self.queue``. The reply is acknowledged
    and the ``msg`` field of its decoded payload is returned.

    Args:
        prompt (str, optional): The prompt to display. Defaults to "".
        password (bool, optional): Whether to read a password. Defaults to False.

    Returns:
        str: The line read from the input stream.

    """
    # request a new input
    input_request_data = InputRequestModel(prompt=prompt, is_password=password)
    input_request_msg = ServerResponseModel(data=input_request_data, type="input")

    syncify(self._publisher)(input_request_msg, self._input_request_subject)

    # Wait for the reply to be propagated to the queue. A blocking get()
    # returns as soon as an item is put, so no empty()/sleep polling loop is
    # needed (assumes self.queue is a queue.Queue, whose get() blocks).
    msg: NatsMessage = self.queue.get()

    self.queue.task_done()
    syncify(msg.ack)()

    # The raw payload is UTF-8 encoded JSON; only its ``msg`` field is returned.
    retval = InputResponseModel.model_validate_json(
        msg.raw_message.data.decode("utf-8")
    ).msg

    return retval

print #

print(
    *objects: Any,
    sep: str = " ",
    end: str = "\n",
    flush: bool = False
) -> None

Print data to the output stream.

PARAMETER DESCRIPTION
objects

The data to print.

TYPE: any DEFAULT: ()

sep

The separator between objects. Defaults to " ".

TYPE: str DEFAULT: ' '

end

The end of the output. Defaults to "\n".

TYPE: str DEFAULT: '\n'

flush

Whether to flush the output. Defaults to False.

TYPE: bool DEFAULT: False

Source code in fastagency/studio/io/ionats.py
def print(
    self, *objects: Any, sep: str = " ", end: str = "\n", flush: bool = False
) -> None:
    r"""Publish the given objects as a single "print" message.

    The objects are converted with ``str()``, joined with ``sep`` and
    suffixed with ``end``; the resulting text is published on the same
    subject that ``input`` uses for its requests.

    Args:
        objects (any): The data to print.
        sep (str, optional): The separator between objects. Defaults to " ".
        end (str, optional): The end of the output. Defaults to "\n".
        flush (bool, optional): Accepted but not used by this method.
            Defaults to False.
    """
    text = sep.join(str(obj) for obj in objects) + end
    payload = ServerResponseModel(data=PrintModel(msg=text), type="print")

    publish = syncify(self._publisher)
    publish(payload, self._input_request_subject)