Skip to content

NatsProvider

fastagency.adapters.nats.base.NatsProvider #

NatsProvider(
    nats_url: Optional[str] = None,
    user: Optional[str] = None,
    password: Optional[str] = None,
)

Bases: ProviderProtocol

Initialize the nats workflows.

PARAMETER DESCRIPTION
nats_url

The NATS URL. Defaults to None.

TYPE: Optional[str] DEFAULT: None

user

The user. Defaults to None.

TYPE: Optional[str] DEFAULT: None

password

The password. Defaults to None.

TYPE: Optional[str] DEFAULT: None

Source code in fastagency/adapters/nats/base.py
def __init__(
    self,
    nats_url: Optional[str] = None,
    user: Optional[str] = None,
    password: Optional[str] = None,
) -> None:
    """Initialize the nats workflows.

    Args:
        nats_url (Optional[str], optional): The NATS URL. Defaults to None.
        user (Optional[str], optional): The user. Defaults to None.
        password (Optional[str], optional): The password. Defaults to None.
    """
    self.nats_url = nats_url or "nats://localhost:4222"
    self.user = user
    self.password = password

    self.broker = NatsBroker(self.nats_url, user=self.user, password=self.password)
    self.app = FastStream(self.broker)

    self._initiate_chat_subject: str = "chat.server.initiate_chat"

    self.is_broker_running: bool = False

app instance-attribute #

app = FastStream(broker)

broker instance-attribute #

broker = NatsBroker(nats_url, user=user, password=password)

is_broker_running instance-attribute #

is_broker_running: bool = False

names property #

names: list[str]

nats_url instance-attribute #

nats_url = nats_url or 'nats://localhost:4222'

password instance-attribute #

password = password

user instance-attribute #

user = user

get_description #

get_description(name: str) -> str
Source code in fastagency/adapters/nats/base.py
def get_description(self, name: str) -> str:
    description = asyncio.run(self._get_description(name))
    logger.debug(f"Description: {description}")
    # return "Student and teacher learning chat"
    return description

run #

run(
    name: str,
    ui: UI,
    user_id: Optional[str] = None,
    **kwargs: Any
) -> str
Source code in fastagency/adapters/nats/base.py
def run(
    self,
    name: str,
    ui: UI,
    user_id: Optional[str] = None,
    **kwargs: Any,
) -> str:
    # subscribe to whatever topic you need
    # consume a message from the topic and call that visitor pattern (which is happening in NatsProvider)
    workflow_uuid = ui._workflow_uuid
    init_message = InitiateWorkflowModel(
        user_id=user_id,
        workflow_uuid=workflow_uuid,
        params=kwargs,
        name=name,
    )
    _from_server_subject = f"chat.client.messages.{user_id}.{workflow_uuid}"
    _to_server_subject = f"chat.server.messages.{user_id}.{workflow_uuid}"

    async def send_initiate_chat_msg() -> None:
        await self.broker.publish(init_message, self._initiate_chat_subject)
        logger.info("Initiate chat message sent")

    @asynccontextmanager
    async def lifespan() -> AsyncIterator[None]:
        async with self.broker:
            await self.broker.start()
            logger.debug("Broker started")
            try:
                yield
            finally:
                await self.broker.close()

    async def _setup_and_run() -> None:
        await send_initiate_chat_msg()
        await self._setup_subscriber(ui, _from_server_subject, _to_server_subject)
        while True:  # noqa: ASYNC110
            await asyncio.sleep(0.1)

    async def run_lifespan() -> None:
        if not self.is_broker_running:
            self.is_broker_running = True
            async with lifespan():
                await _setup_and_run()
        else:
            await _setup_and_run()

    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    loop.run_until_complete(run_lifespan())

    return "NatsWorkflows.run() completed"