Subscriber

The subscriber runs a set of actors. It’s up to you to provide a CLI for the subscriber (or whatever other way you want to run it), connect to the database, decide which actors to run where, and so on.

What you need to do:

  1. Collect all actors you want to run into the walnats.Actors registry.

  2. Call register to create Nats JetStream consumers.

  3. Call listen to start listening for events.

actors = walnats.Actors(SEND_EMAIL, SEND_SMS)
async with actors.connect() as conn:
    await conn.register()
    await conn.listen()
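Since walnats leaves the command-line interface to you, here is one possible sketch using argparse from the standard library. The flag names and the run() helper are hypothetical and not part of walnats; the code only relies on walnats.Actors, Actors.get(), connect(), register(), and listen() as documented on this page.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    # Hypothetical CLI flags; walnats itself does not define any of them.
    parser = argparse.ArgumentParser(description="Run walnats actors.")
    parser.add_argument(
        "--actor", dest="actors", action="append",
        help="name of an actor to run (repeatable); all actors if omitted",
    )
    parser.add_argument(
        "--server", action="append",
        help="Nats server URL (repeatable); walnats default if omitted",
    )
    parser.add_argument("--max-jobs", type=int, default=16)
    parser.add_argument("--burst", action="store_true")
    return parser


async def run(args: argparse.Namespace, all_actors: list) -> None:
    # Imported here so that the parser can be built without walnats installed.
    import walnats

    registry = walnats.Actors(*all_actors)
    if args.actors:
        selected = []
        for name in args.actors:
            actor = registry.get(name)
            if actor is None:
                raise SystemExit(f"unknown actor: {name}")
            selected.append(actor)
        registry = walnats.Actors(*selected)

    # Pass `server` only when given, so walnats falls back to DEFAULT_SERVER.
    connect_kwargs = {"server": args.server} if args.server else {}
    async with registry.connect(**connect_kwargs) as conn:
        await conn.register()
        await conn.listen(burst=args.burst, max_jobs=args.max_jobs)
```

An entry point could then call asyncio.run(run(build_parser().parse_args(), [SEND_EMAIL, SEND_SMS])).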

Below are the API docs listing all options available for connecting and listening. You can skip them and go directly to the sections on designing for high load and for failure.

API

class walnats.Actors(*actors: Actor)

Registry of walnats.Actor instances.

actors = walnats.Actors(actor1, actor2, actor3)
async with actors.connect() as conn:
    ...

get(name: str) → Actor | None

Get a walnats.Actor with the given name from the registered actors, or None if there is no such actor.

connect(server: list[str] | str | nats.NATS = DEFAULT_SERVER, close: bool = True) → AsyncIterator[ConnectedActors]

Context manager that keeps the connection to the Nats server open.

Parameters:
  • server – Nats server URL, list of URLs, or an already connected server (a nats.NATS instance).

  • close – Close the connection on exit from the context. Set to False if you explicitly pass a server instance and want to keep using it after leaving the context.
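When the application already holds a Nats connection (for example, one created with nats-py’s nats.connect()), it can be passed as server together with close=False so that it survives the context. A minimal sketch; the function name is hypothetical, and actors stands for any walnats.Actors instance.

```python
async def listen_on_shared_connection(actors, nc) -> None:
    """Run actors on an existing nats.NATS client without closing it.

    `actors` is a walnats.Actors instance and `nc` an already connected
    nats.NATS client (for example, from `await nats.connect(url)`).
    """
    async with actors.connect(nc, close=False) as conn:
        await conn.register()
        # burst=True: handle what is already queued, then return.
        await conn.listen(burst=True)
    # Because of close=False, `nc` is still open here; closing it is up to the caller.
```

The caller stays responsible for the connection and can shut it down with await nc.drain() or await nc.close() once it is done with it.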

class walnats.types.ConnectedActors

A registry of walnats.Actor instances.

Like walnats.Actors but connected to Nats server.

Use it to listen to events. Don’t instantiate it directly; use walnats.Actors.connect() instead.

async with actors.connect() as conn:
    await conn.register()
    await conn.listen()

async register() → None

Create Nats JetStream consumers for the actors.

async with actors.connect() as conn:
    await conn.register()

async listen(*, burst: bool = False, max_polls: int | None = None, poll_delay: float = 2, batch: int = 1, max_jobs: int = 16, max_processes: int | None = None, max_threads: int | None = None) → None

Listen to Nats for events for all registered actors.

Parameters:
  • burst – poll events for all actors only once, handle everything polled, and exit. It’s useful for testing, when you know there are already messages in the queue when you start listening and that’s all you want to process.

  • max_polls – how many polling requests can be active simultaneously. Defaults to the number of actors. You might want to set it to a lower value if you have more actors than free network sockets in the system. Then actors will take turns polling, each turn taking poll_delay.

  • poll_delay – how long each poll request waits for messages. Low values produce more requests but work better with a low max_polls and detect broken connections sooner. See the “keepalive” article on Wikipedia.

  • batch – the maximum number of messages to pull in a single poll request. Higher values reduce the number of network requests (and so give better performance) but can leave messages waiting on one instance for a free job slot while another instance could have handled them right away. In other words, leave it at 1 (the default) if you scale horizontally.

  • max_jobs – how many jobs (handlers) can be running at the same time. Higher values put more strain on the CPU but give better performance if the handlers are IO-bound and spend a lot of time in await.

  • max_processes – if an Actor is configured to run in a process, this is how many processes at most can be running at the same time. Defaults to the number of processors on the machine.

  • max_threads – if an Actor is configured to run in a thread, this is how many threads at most can be running at the same time. Defaults to the number of processors on the machine + 4 but at most 32.
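The documented defaults can be spelled out as plain arithmetic. The helper below only illustrates the rules listed above; it is not walnats code, and its name is hypothetical. The max_threads rule is the same min(32, cpu_count + 4) formula that Python’s concurrent.futures.ThreadPoolExecutor has used for its own default since Python 3.8.

```python
from __future__ import annotations

import os


def default_listen_limits(n_actors: int, cpu_count: int | None = None) -> dict[str, int]:
    """Spell out the documented defaults of ConnectedActors.listen()."""
    cpus = cpu_count if cpu_count is not None else (os.cpu_count() or 1)
    return {
        "max_polls": n_actors,             # one poll request per actor
        "max_processes": cpus,             # one process per processor
        "max_threads": min(32, cpus + 4),  # processors + 4, capped at 32
    }


print(default_listen_limits(n_actors=3, cpu_count=8))
# {'max_polls': 3, 'max_processes': 8, 'max_threads': 12}
```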
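To see how max_polls and poll_delay interact, here is a toy model in plain asyncio: each actor’s poll request is an asyncio.sleep(poll_delay), and an asyncio.Semaphore with max_polls slots limits how many run at once. It is a simplified model of the behavior described above, not walnats’s implementation.

```python
import asyncio


async def simulate_polling(actors: list[str], max_polls: int, poll_delay: float):
    """Toy model: actors share `max_polls` slots, each poll lasting `poll_delay`."""
    slots = asyncio.Semaphore(max_polls)
    loop = asyncio.get_running_loop()
    start = loop.time()
    active = peak = 0
    done_at: dict[str, float] = {}

    async def poll(name: str) -> None:
        nonlocal active, peak
        async with slots:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(poll_delay)  # stands in for a pull request waiting for messages
            active -= 1
        done_at[name] = loop.time() - start

    await asyncio.gather(*(poll(name) for name in actors))
    return peak, done_at


peak, done_at = asyncio.run(simulate_polling(["a", "b", "c", "d"], max_polls=2, poll_delay=0.1))
print(peak)  # 2
print(f"last poll finished after {max(done_at.values()):.2f}s")  # two turns, so at least 2 * poll_delay
```

With four actors and two slots, the actors poll in two turns, so every actor sees new messages at most once per two poll_delay periods.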

Design for high load

Walnats does its best to balance the load between actors, but the exact numbers and strategies highly depend on the kind of jobs you have, and that’s something only you know.

  • You can run multiple processes, each with its own walnats event listener, either directly with multiprocessing or by running the whole application multiple times. If you use supervisord, consider adjusting the numprocs option based on how many cores the target machine has.

  • The whole idea of async/await is to switch to other work while waiting for an IO-bound response. For example, while we’re waiting for the database to respond to one query, we can prepare and send another. So, make sure there is always work to do while in await. You can do that by increasing the max_jobs value in both individual actors and listen, running more actors on the same machine, and actively using async/await in your code, with asyncio.gather and the like.

  • Past a certain point, increasing max_jobs doesn’t bring any benefit. That is the point where there is already more than enough work to do while in await, so blocking operations start making each pause at await much longer than needed. Every job gets slower, and you are better off scaling with more processes or machines instead.

  • Keep in mind that all system resources are limited, and some limits are lower than you might think, for example, the number of files or network connections that can be open at the same time. Here too, a smaller max_jobs (and, for network connections, a smaller max_polls) might help.

  • If you have a long-running CPU-bound task, make sure to run it in a separate process pool by specifying execute_in.
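The following toy benchmark illustrates the max_jobs and asyncio.gather advice in plain asyncio: asyncio.sleep stands in for a database query, and an asyncio.Semaphore plays the role of max_jobs. The names are hypothetical and the timings only show the trend, not walnats’s actual scheduling.

```python
import asyncio
import time


async def fake_query(value: int) -> int:
    await asyncio.sleep(0.05)  # stands in for waiting on a database response
    return value


async def handler(event: int) -> list[int]:
    # Independent queries are awaited together instead of one after another.
    return list(await asyncio.gather(fake_query(event), fake_query(event * 10)))


async def run_jobs(events: list[int], max_jobs: int) -> list[list[int]]:
    slots = asyncio.Semaphore(max_jobs)  # plays the role of max_jobs

    async def job(event: int) -> list[int]:
        async with slots:
            return await handler(event)

    return list(await asyncio.gather(*(job(e) for e in events)))


def timed(events: list[int], max_jobs: int) -> tuple[list[list[int]], float]:
    start = time.perf_counter()
    results = asyncio.run(run_jobs(events, max_jobs))
    return results, time.perf_counter() - start


events = list(range(8))
_, serial = timed(events, max_jobs=1)   # jobs wait for each other
_, overlap = timed(events, max_jobs=8)  # all jobs wait on "IO" together
print(f"max_jobs=1: {serial:.2f}s, max_jobs=8: {overlap:.2f}s")
```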
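Blocking calls are what make the pause at await longer than needed: while one coroutine runs a blocking call such as time.sleep, the event loop cannot resume anything else. A small demonstration in plain asyncio (the function name is hypothetical): a 10 ms asyncio.sleep resumes only after the blocking call in another task has finished.

```python
import asyncio
import time


async def wakeup_delay(blocking_seconds: float) -> float:
    """How late a 10 ms asyncio.sleep() resumes while another task blocks the loop."""

    async def blocker() -> None:
        time.sleep(blocking_seconds)  # blocking: the event loop cannot switch away

    loop = asyncio.get_running_loop()
    task = asyncio.create_task(blocker())
    start = loop.time()
    await asyncio.sleep(0.01)  # asks for a 10 ms pause
    late_by = loop.time() - start - 0.01
    await task
    return late_by


print(f"resumed {asyncio.run(wakeup_delay(0.2)):.2f}s late")
```

With blocking_seconds=0.2, the 10 ms pause turns into roughly 200 ms, and the same happens to every job sharing that event loop.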
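In walnats, the process pool is chosen per actor with execute_in. For reference, the standard-library mechanism for moving CPU-bound work off the event loop is loop.run_in_executor with a concurrent.futures.ProcessPoolExecutor, sketched below. This is the plain asyncio technique, not walnats’s API, and the helper names are hypothetical.

```python
import asyncio
import math
from concurrent.futures import Executor, ProcessPoolExecutor


async def offload(pool: Executor, func, *args):
    """Run a blocking `func(*args)` in `pool` without blocking the event loop."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(pool, func, *args)


async def main() -> int:
    # Worker processes sidestep the GIL for CPU-bound work. The function and its
    # arguments are pickled to reach the worker, so they must be picklable
    # (math.factorial is).
    with ProcessPoolExecutor(max_workers=2) as pool:
        big = await offload(pool, math.factorial, 20_000)
    return big.bit_length()
```

Call asyncio.run(main()) from under an if __name__ == "__main__": guard, which multiprocessing requires on platforms that start workers with the spawn method.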
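On Unix-like systems, the open-files limit (which also covers network sockets) can be read with the standard resource module. The helper below is a hypothetical rule of thumb, not something walnats does: it caps max_jobs so that jobs which each hold a few files or sockets stay under that limit, keeping some headroom for everything else the process has open.

```python
from __future__ import annotations

import resource  # Unix only


def open_files_limit() -> int | None:
    """Soft RLIMIT_NOFILE of this process, or None if it is unlimited."""
    soft, _hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    return None if soft == resource.RLIM_INFINITY else soft


def cap_max_jobs(wanted: int, files_per_job: int, limit: int | None, reserve: int = 64) -> int:
    """Hypothetical heuristic: keep max_jobs * files_per_job below the limit."""
    if limit is None:
        return wanted
    return max(1, min(wanted, (limit - reserve) // files_per_job))


print(cap_max_jobs(wanted=500, files_per_job=4, limit=1024))  # 240
```

The result could then be passed as listen(max_jobs=cap_max_jobs(..., limit=open_files_limit())).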

Design for failure

  1. Keep in mind that your code can explode at any point, especially if you work a lot with external services and third-party APIs. Walnats will take care of retries, but it’s on you to make sure nothing is left half-done in case of a failure (the “Atomicity” part of ACID). For database queries, use a transaction. For network requests, keep them close to the end of the handler and retry just the requests in case of failures. For file writes, write to a temporary file first and then move it into the right place.

  2. Make sure to have a reasonable ack_wait value. If it is too high, a message held by a failed instance is redelivered only after a long delay, so the event might arrive when nobody needs the result anymore (in a real-time system). If it is too low, walnats might not get enough time to send an “in progress” pulse to Nats, so the message gets redelivered to another instance of the actor even though the first one hasn’t failed.

  3. The job may freeze. Make sure you specify timeouts for all network requests and that the actor itself has a low enough job_timeout value.

  4. Some errors are caused by deterministic bugs, so no amount of retries can fix them. Make sure to specify max_attempts for the actor and limits for the event.

  5. Make sure you have error reporting in place. Walnats provides out-of-the-box middleware for Sentry (walnats.middlewares.SentryMiddleware), but you can always write your own middleware for whatever service you use.
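The file-write advice in item 1 comes down to writing a temporary file in the same directory and then moving it over the target with os.replace, which is atomic on POSIX systems when both paths are on the same filesystem. A crash then leaves either the old file or the new one, never a half-written one. A minimal sketch (the helper name is hypothetical):

```python
import os
import tempfile


def write_atomically(path: str, data: str) -> None:
    """Replace `path` with `data` so readers never see a half-written file."""
    directory = os.path.dirname(os.path.abspath(path))
    # Same directory, so the final os.replace() does not cross filesystems.
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".tmp-")
    try:
        with os.fdopen(fd, "w") as tmp:
            tmp.write(data)
            tmp.flush()
            os.fsync(tmp.fileno())  # make sure the bytes reach the disk first
        os.replace(tmp_path, path)  # atomic rename over the old file (POSIX)
    except BaseException:
        os.unlink(tmp_path)  # clean up the temporary file on any failure
        raise
```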
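For the database part of item 1, a transaction makes the handler’s writes all-or-nothing. Below is a sketch with the standard sqlite3 module, where using the connection as a context manager commits on success and rolls back if the block raises. The table and the handler are hypothetical, and other database drivers have their own transaction APIs.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    with conn:
        conn.execute("CREATE TABLE accounts (id TEXT PRIMARY KEY, balance INTEGER)")
        conn.executemany("INSERT INTO accounts VALUES (?, ?)", [("alice", 100), ("bob", 0)])
    return conn


def handle_transfer(conn: sqlite3.Connection, src: str, dst: str, amount: int,
                    crash_midway: bool = False) -> None:
    """Hypothetical handler: both updates are applied, or neither is."""
    with conn:  # commits on success, rolls back if the block raises
        conn.execute("UPDATE accounts SET balance = balance - ? WHERE id = ?", (amount, src))
        if crash_midway:
            raise RuntimeError("simulated crash between the two updates")
        conn.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?", (amount, dst))


def balances(conn: sqlite3.Connection) -> dict[str, int]:
    return dict(conn.execute("SELECT id, balance FROM accounts"))
```

If the simulated crash happens, the debit is rolled back, so a retry of the whole job starts from a clean state.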
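For item 3, a per-request timeout inside an async handler can be enforced with asyncio.wait_for, which cancels the awaited operation and raises asyncio.TimeoutError (an alias of the built-in TimeoutError since Python 3.11) once the limit is exceeded. The actor’s job_timeout bounds the job as a whole; this sketch only shows the per-request side, with a hypothetical request that never answers.

```python
import asyncio


async def frozen_request() -> str:
    await asyncio.sleep(3600)  # stands in for a request that never answers
    return "response"


async def call_with_timeout(coro, seconds: float):
    """Await `coro`, cancelling it if it takes longer than `seconds`."""
    return await asyncio.wait_for(coro, timeout=seconds)


async def main() -> str:
    try:
        return await call_with_timeout(frozen_request(), seconds=0.1)
    except asyncio.TimeoutError:
        return "timed out"


print(asyncio.run(main()))  # timed out
```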
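Item 1 also suggests retrying just the network requests at the end of a handler. A minimal retry helper with exponential backoff could look like the sketch below. The helper, the exception types it retries on, and the delays are assumptions to adapt; other errors (such as deterministic bugs, see item 4) propagate immediately, and walnats’s own retries, bounded by max_attempts, still apply to the job as a whole.

```python
import asyncio


async def retry(request, attempts: int = 3, base_delay: float = 0.5,
                retry_on: tuple = (ConnectionError, asyncio.TimeoutError)):
    """Call `request()` up to `attempts` times, backing off exponentially."""
    for attempt in range(1, attempts + 1):
        try:
            return await request()
        except retry_on:
            if attempt == attempts:
                raise  # give up; the actor's own retries take over from here
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))  # 0.5 s, 1 s, ...
```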