Actors

Actors listen to events and do something (“act”) when an event occurs.

Implement handlers

The handler is a function (sync or async, async is better) that accepts the event and acts on it. For example:

async def send_email(user: User) -> None:
    ...

That’s it. The handler knows nothing about Nats, Walnats, or the publisher. That means it’s dead simple to test, refactor, or reuse with another framework. This pattern is known as Functional core, imperative shell. The handler is the functional core, and walnats is the imperative shell that you don’t need to test (at least not in the unit tests).

Declare actors

The actor connects together the event and the handler. Each actor also has a name that must be unique for all actors subscribed to the same event.

WELCOME_USER = walnats.Actor(
    'welcome-user', USER_REGISTERED, send_email,
)

That’s all you need to get started. Below are the API docs covering different configuration options you can specify for an actor. They allow you to adjust the actor’s behavior in case there are too many messages or a handler failure. We’ll also talk about handling high loads and failures in the Subscriber section. So, you can skip the rest of the page for now if you’re only getting started.

API

class walnats.Actor

A subscriber group that listens to a specific walnats.Event.

async def send_email(user: User) -> None:
    ...

SEND_EMAIL = walnats.Actor('send-email', USER_CREATED, send_email)

The following options are submitted into Nats JetStream and so cannot be ever changed after the actor is registered for the first time:

  • description

  • ack_wait

  • max_attempts

  • max_ack_pending

name: str

The actor name. It is used as durable consumer name in Nats, and so must be unique per event and never change. If you ever change the name, a consumer with the old name will be left in Nats JetStream and will accumulate events (until a limit is reached, and you should have Limits).

event: BaseEvent[T, R]

The event to listen to. Exactly one instance of the same Actor will receive a message. That means, you can run the same actor multiple times on different machines, and only one will receive a message. And not a single message will be lost.

handler: Callable[[T], Awaitable[R] | R]

The function to call when a message is received.

description: str | None = None

The actor description, will be attached to the consumer name in Nats. Can be useful if you use observability tools for Nats.

ack_wait: float = 16

How many seconds to wait from the last update before trying to redeliver the message. Before calling the handler, a task will be started that periodically sends a pulse into Nats saying that the job is in progress. The pulse, hovewer, might not arrive in Nats if the network or machine dies or something has blocked the scheduler for too long.

max_attempts: int | None = None

How many attempts Nats will make to deliver the message. The message is redelivered if the handler fails to handle it.

max_ack_pending: int = 1000

How many messages can be in progress simultaneously across the whole system. If the limit is reached, delivery of messages is suspended.

middlewares: tuple[Middleware, ...] = ()

Callbacks that are triggered at different stages of message handling. Most of the time, you’ll need regular decorators instead. Middlewares are useful when you need an additional context, like how many times the message was redelivered. In particular, for logs, metrics, alerts. Middlewares cannot be used for flow control.

max_jobs: int = 16

How many jobs can be running simultaneously in this actor on this machine. The best number depends on available resources and the handler performance. Keep it low for slow handlers, keep it high for highly concurrent handlers.

job_timeout: float = 32

How long at most the handler execution can take for a single message. If this timeout is reached, asyncio.CancelledError is raised in handler, and then all the same things happen as for regular failure: on_failure hooks, log message, nak. Doesn’t do anything for sync jobs without execute_in specified.

execute_in: ExecuteIn = 'main'

Run the handler in the current thread, in a separate thread pool, in a process pool.

retry_delay: Sequence[float] = (0.5, 1, 2, 4)

A sequence of delays (in seconds) for each retry. If the attempt number is higher than the sequence len, the las item (-1) will be used. The default value is (1, 2, 4, 8), so third retry will be 4s, 5th will be 8s, and 12th will still be 8.

The delay is used only when the message is explicitly nak’ed. If instead the whole instance explodes or there is a network issue, the message will be redelivered as soon as ack_wait is reached.

pulse: bool = True

Keep sending pulse into Nats JetStream while processing the message. The pulse signal makes sure that the message won’t be redelivered to another instance of actor while this one is in progress. Disabling the pulse will prevent the message being stuck if a handler stucks, but that also means the message must be processed faster that ack_wait.

priority: Priority = 1

Priority of the actor compared to other actors. Actors with a higher priority have a higher chance to get started earlier. Longer an actor waits its turn, higher its priority gets.

property consumer_name: str

Durable name for Nats JetStream consumer.

class walnats.Priority

Priority denotes how fast an Actor can acquire the mutex.

Actors with a higher priority start faster when the number of concurrent jobs reaches the value specified by max_jobs argument of walnats.types.ConnectedActors.listen() method.

SEND_EMAIL = walnats.Actor(
    'send-email', USER_CREATED, send_email,
    priority=walnats.Priority.HIGH,
)
HIGH = 0

Start as soon as possible.

NORMAL = 1

Start if there are no HIGH priority actors.

LOW = 2

Start if there are no HIGH or NORMAL priority actors.

acquire(sem: Semaphore) AsyncIterator[None]

Acquire semaphore with priority.

class walnats.ExecuteIn

Run the handler in the current thread, in a thread pool, in a process pool.

This is the enum of possible values for walnats.Actor.execute_in.

def generate_daily_report(_) -> None:
    ...

GENERATE_DAILY_REPORT = walnats.Actor(
    'generate-daily-report', DAY_PASSED, generate_daily_report,
    execute_in=walnats.ExecuteIn.PROCESS,
)
MAIN = 'main'

The default behavior, run the handler in the current (“main”) thread.

Use it for async/await handlers or sync but very fast handlers.

THREAD = 'thread'

Run the handler in a thread pool.

Use it for slow IO-bound non-async/await handlers.

The number of threads can be configured with max_threads argument of walnats.types.ConnectedActors.listen().

PROCESS = 'process'

Run the handler in a process pool.

Use it for slow CPU-bound handlers. The handler must be non-async/await.

The number of threads can be configured with max_processes argument of walnats.types.ConnectedActors.listen().