Middlewares

Custom middlewares

class walnats.middlewares.Middleware

Middlewares are callbacks that are triggered whe handling a message.

In many cases, like when you want to suppress specific errors from the handler, you should use decorators on handlers instead of middlewares. Middlewares are useful when you need an additional context, like how many times the event was retried.

Hooks can be both sync and async. In both cases, be mindful that they run in the same thread as the main scheduler for the important tasks. So, every hook must be fast and safe.

on_start(ctx: Context) Coroutine[None, None, None] | None

Triggered right before running the handler.

If asynchronous, it can be executed when handler is already running or even finished.

on_failure(ctx: ErrorContext) Coroutine[None, None, None] | None

Triggered if the handler failed with an exception.

If asynchronous, it can be executed when already outside of the except block, and so the traceback might not be available.

It is possible for on_failure to be triggered before on_start. For example, if message decoding fails.

on_success(ctx: OkContext) Coroutine[None, None, None] | None

Triggered when the handler successfully processed the message.

Context

class walnats.types.Context

Context passed into walnats.middlewares.Middleware.on_start() callback.

message: object

Decoded message payload.

actor: Actor

Actor object from which the callback is triggered. The most common things you want from it are ctx.actor.name and ctx.actor.event.name.

property metadata: Msg.Metadata

Message metadata provided by Nats.

property seq_number: int

Sequence ID of the message in Nats JetStream.

property attempts: int

The number of times the message was tried to be delivered.

1 if this is the first delivery. Always positive.

It is possible that there already were delivery attempts but the handler wasn’t triggered. For instance, if nats.py client lost the message because of a bug or message body deserialization has failed. So, the number of delivery attempts might be higher than the number of times the handler was triggered for the message.

property trace_id: str | None

The trace_id provided when emitting the message.

This value is typically used for distributed tracing.

class walnats.types.ErrorContext

Context passed into walnats.middlewares.Middleware.on_failure callback.

message: object | None

Decoded message payload. Will be None if the failure is raise while trying to decode the message payload.

actor: Actor

Actor object from which the callback is triggered. The most common things you want from it are ctx.actor.name and ctx.actor.event.name.

exception: Exception | asyncio.CancelledError

The exception raised.

class walnats.types.OkContext

Context passed into walnats.middlewares.Middleware.on_success() callback.

actor: Actor

Actor object from which the callback is triggered. The most common things you want from it are ctx.actor.name and ctx.actor.event.name.

message: object

Decoded message payload.

duration: float

How long it took for handler to finish the job. It also includes time spent in await, so be mindful of it when using it to reason about handler performance.

Integrations

Integrations with third-party services to provide observability for running actors.

class walnats.middlewares.OpenTelemetryTraceMiddleware(tracer: 'opentelemetry.trace.Tracer')
class walnats.middlewares.PrometheusMiddleware

Store Prometheus metrics.

Requires prometheus-client package to be installed.

class walnats.middlewares.SentryMiddleware

Report actor failures into Sentry using the official Sentry SDK.

The failure report will include tags:

  • actor: actor name.

  • event: event name.

Also, the “additional data” section will have:

  • delivered_at: timestamp when Nats delivered the message to the consumer.

  • stream_seq_id: sequence ID of the message in the Nats JetStream stream.

Requires sentry-sdk package to be installed.

class walnats.middlewares.StatsdMiddleware(client: DogStatsd)

Emit statsd metrics using Datadog statsd client.

We use Datadog statsd client because it is (compared to all alternatives) well maintained, type safe, and supports tags.

Requires datadog package to be installed.

class walnats.middlewares.ZipkinMiddleware(tracer: aiozipkin.Tracer, sampled: bool | None = None)

Emit Zipkin span.

Requires aiozipkin package to be installed.

Misc

Miscellaneous middlewares that don’t depend on third-party packages or services.

class walnats.middlewares.CurrentContextMiddleware

Middleware that records the current context on start.

This is a small trick to make the context available from handler or decorators when you need additional information about the Nats message received. We don’t pass context in handlers explicitly by default because most of the handlers should care only about the actual message payload, independent of how it was delivered.

class walnats.middlewares.ExtraLogMiddleware(logger: logging.Logger | logging.LoggerAdapter = <Logger walnats.middlewares (WARNING)>)

Write logs with extra fields using logging.

The extra fields aren’t shown by default, you need to specifically configure a logs formatter that supports it. For example, python-json-logger.

class walnats.middlewares.TextLogMiddleware(logger: logging.Logger | logging.LoggerAdapter = <Logger walnats.middlewares (WARNING)>)

Write plain text logs using logging.

The middleware might be useful for debugging. For the prod, consider using walnats.middlewares.ExtraLogMiddleware.

By default, DEBUG-level logs are not shown. You need to enable them explicitly:

import logging
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

Wrappers

Wrappers are middlewares that wrap another middleware, usually to provide some kind of flow control.

class walnats.middlewares.ErrorThresholdMiddleware

Trigger on_failure only if there are too many sequential errors.

The errors are counted per actor, per message, and overall. The counter is reset when there is at least one successfully processed message.

Parameters:
  • middleware – wrapped middleware to triger when errors accumulate.

  • total_failures – how many errors have to accumulate over all actors before the on_failure of the wrapped middleware is triggered. Use it to detect when the whole system is unhealthy.

  • actor_failures – how many errors have to accumulate in a single actor before the on_failure of the wrapped middleware is triggered. Use it to detect an unhealthy actor.

  • message_failures – how many times the message have to fail before on_failure of the wrapped middleware is triggered. Use it to detect an unhealthy message.

class walnats.middlewares.FrequencyMiddleware

Trigger middleware only once in the given timeframe.

The on_start and on_success are triggered only once per actor in the given timeframe. The on_failure is triggered once per error message per actor in the given timeframe.

Use it to avoid spamming your notifications channel with many copies of the same message when shit hits the fan.

Parameters:
  • middleware – middleware to trigger.

  • timeframe – how long (in seconds) the deduplication window should be.