Publisher

The publisher is the code that emits events. It’s up to you how you structure and run the publisher. It might be a web service, an actor, a CLI tool, or anything else.

All you need to do:

  1. Collect all events you’re going to emit from this app into a walnats.Events registry.

  2. Call register to create or update Nats JetStream streams.

  3. Call emit at any point to emit an event.

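# USER_CREATED and USER_UPDATED are walnats.Event instances defined elsewhere.
events = walnats.Events(USER_CREATED, USER_UPDATED)
async with events.connect() as conn:
    # Create or update the Nats JetStream streams for the registered events.
    await conn.register()
    ...
    for user in new_users:
        await conn.emit(USER_CREATED, user)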

Designing for failure

This section gives a few insights into how to make event producers more resilient.

  • Outbox. If the nats client we use (nats.py) gets disconnected from the server, it tries to reconnect in the background. If you try sending a message while the connection is lost, that message is put into the pending queue and sent after the connection is restored. Hence, a successful emit doesn’t mean the message has been delivered to the Nats server: the client might be shut down before it restores the connection. You can adjust this behavior in several ways:

    • Call walnats.types.ConnectedEvents.emit() with sync=True. Then walnats will make sure the message is delivered to Nats JetStream before returning.

    • Call nats.connect() with pending_size=0 (and pass this connection into walnats.Events.connect). The argument sets the maximum size, in bytes, of the pending messages queue. Setting it to zero disables the outbox altogether. If you try to emit an event while the connection is lost and the limit is reached, the client raises nats.errors.OutboundBufferLimitError.

  • Transactions. Use transactions for database changes. For example, for SQLAlchemy, see Transactions and Connection Management. That way, if emit fails, the database changes will be rolled back, and the whole operation can be safely retried.

  • Deduplication. If the operation that emits an event is retried, emit may be called twice for the same event. To avoid emitting the same event twice, pass a uid argument to walnats.types.ConnectedEvents.emit() (it has to be a unique identifier of the message), and Nats will ignore duplicates. The default deduplication window is 2 minutes.

If you want to know everything about what publishers can do with events, check out the API docs below. If you’re just getting started, feel free to skip to the next chapter: Actors.

API

class walnats.Events(*events: BaseEvent)

Registry of walnats.Event instances.

events = walnats.Events(event1, event2, event3)
async with events.connect() as conn:
    ...

get(name: str) -> BaseEvent | None

Get a walnats.Event from the list of registered events by name.

connect(server: list[str] | str | nats.NATS = DEFAULT_SERVER, close: bool = True) -> AsyncIterator[ConnectedEvents]

Context manager that keeps a connection to the Nats server open.

Parameters:
  • server – Nats server URL, list of URLs, or an already connected nats.NATS instance.

  • close – Close the connection on exit from the context. Set to False if you explicitly pass a server instance and want to keep using it after leaving the context.

class walnats.types.ConnectedEvents

A registry of walnats.Event instances.

Like walnats.Events but connected to Nats server.

Use it to emit events. Don’t instantiate it directly, use walnats.Events.connect() instead.

async with events.connect() as conn:
    await conn.register()
    await conn.emit(USER_CREATED, user)

async register(*, create: bool = True, update: bool = True) -> None

Create Nats JetStream streams for registered events.

await conn.register()

Parameters:
  • create – create the stream if it doesn’t exist yet.

  • update – update the stream if it already exists.

async emit(event: Event[T], message: T, *, uid: str | None = None, trace_id: str | None = None, delay: float | None = None, meta: CloudEvent | dict[str, str] | None = None, sync: bool = False) -> None

Send a walnats.Event into Nats. The event must be registered first.

The emitted event will be broadcast to all actors that are interested in it.

await conn.emit(USER_CREATED, user)

Parameters:
  • event – registered event to which the message belongs.

  • message – the message payload to send.

  • uid – unique ID of the message. If provided, it is used to ensure that the same message is not delivered twice. Duplicate messages with the same ID will be ignored. The deduplication window in Nats JetStream is 2 minutes by default.

  • trace_id – the ID of the request to use for distributed tracing. It doesn’t have any effect on actors but can be used by tracing middlewares, such as walnats.middlewares.ZipkinMiddleware.

  • delay – the minimum delay from now before the message can be processed by an actor (in seconds). Internally, the message will be first delivered to the actor immediately and the actor will put the message back with the delay without triggering the handler or any middlewares.

  • meta – either a dict of headers to include in Nats message or walnats.CloudEvent to include headers described by CloudEvents spec. Keep in mind that the spec for Nats is still WIP. This meta information doesn’t get into the handler but can be used by middlewares or third-party tools.

  • sync – make sure that the event is delivered into Nats JetStream. Turn it on when the message is very important. Keep it off for better performance or when the producer can’t handle failures at the moment the message is emitted (for example, after you’ve already sent a POST request to a third-party API).

Raises:
  • nats.errors.ConnectionClosedError – The connection is already closed. An unlikely error that can happen if you passed a Nats connection into walnats.Events.connect() and then closed that connection.

  • nats.errors.OutboundBufferLimitError – The connection to Nats server is lost, and the pending_size limit of the Nats connection is reached.

  • nats.errors.MaxPayloadError – The binary payload of the message is too big. The default limit is 1 MB.

CloudEvents

CloudEvents is a specification that describes the format of metadata for events. Walnats doesn’t care about most of the data there but it can be useful if you use some third-party tools or consumers that can benefit from such metadata. If that’s the case, you can pass a walnats.CloudEvent instance as an argument into walnats.types.ConnectedEvents.emit(). This metadata will be included in the Nats message headers according to the WIP specification: NATS Protocol Binding for CloudEvents.

class walnats.CloudEvent(id: str, source: str, type: str, specversion: str = '1.0', datacontenttype: str | None = None, dataschema: str | None = None, subject: str | None = None, time: datetime | None = None, dataref: str | None = None, partitionkey: str | None = None, sampledrate: int | None = None, sequence: str | None = None, traceparent: str | None = None, tracestate: str | None = None)

Event metadata as described in CloudEvents spec.

The spec: https://github.com/cloudevents/spec/blob/v1.0/spec.md