Middlewares¶
Custom middlewares¶
- class walnats.middlewares.Middleware¶
Middlewares are callbacks that are triggered whe handling a message.
In many cases, like when you want to suppress specific errors from the handler, you should use decorators on handlers instead of middlewares. Middlewares are useful when you need an additional context, like how many times the event was retried.
Hooks can be both sync and async. In both cases, be mindful that they run in the same thread as the main scheduler for the important tasks. So, every hook must be fast and safe.
- on_start(ctx: Context) Coroutine[None, None, None] | None ¶
Triggered right before running the handler.
If asynchronous, it can be executed when handler is already running or even finished.
- on_failure(ctx: ErrorContext) Coroutine[None, None, None] | None ¶
Triggered if the handler failed with an exception.
If asynchronous, it can be executed when already outside of the except block, and so the traceback might not be available.
It is possible for on_failure to be triggered before on_start. For example, if message decoding fails.
Context¶
- class walnats.types.Context¶
Context passed into
walnats.middlewares.Middleware.on_start()
callback.- message: object¶
Decoded message payload.
- actor: Actor¶
Actor object from which the callback is triggered. The most common things you want from it are
ctx.actor.name
andctx.actor.event.name
.
- property metadata: Msg.Metadata¶
Message metadata provided by Nats.
- property seq_number: int¶
Sequence ID of the message in Nats JetStream.
- property attempts: int¶
The number of times the message was tried to be delivered.
1 if this is the first delivery. Always positive.
It is possible that there already were delivery attempts but the handler wasn’t triggered. For instance, if nats.py client lost the message because of a bug or message body deserialization has failed. So, the number of delivery attempts might be higher than the number of times the handler was triggered for the message.
- property trace_id: str | None¶
The
trace_id
provided when emitting the message.This value is typically used for distributed tracing.
- class walnats.types.ErrorContext¶
Context passed into
walnats.middlewares.Middleware.on_failure
callback.- message: object | None¶
Decoded message payload. Will be None if the failure is raise while trying to decode the message payload.
- actor: Actor¶
Actor object from which the callback is triggered. The most common things you want from it are
ctx.actor.name
andctx.actor.event.name
.
- exception: Exception | asyncio.CancelledError¶
The exception raised.
- class walnats.types.OkContext¶
Context passed into
walnats.middlewares.Middleware.on_success()
callback.- actor: Actor¶
Actor object from which the callback is triggered. The most common things you want from it are
ctx.actor.name
andctx.actor.event.name
.
- message: object¶
Decoded message payload.
- duration: float¶
How long it took for handler to finish the job. It also includes time spent in
await
, so be mindful of it when using it to reason about handler performance.
Integrations¶
Integrations with third-party services to provide observability for running actors.
- class walnats.middlewares.OpenTelemetryTraceMiddleware(tracer: 'opentelemetry.trace.Tracer')¶
- class walnats.middlewares.PrometheusMiddleware¶
Store Prometheus metrics.
Requires prometheus-client package to be installed.
- class walnats.middlewares.SentryMiddleware¶
Report actor failures into Sentry using the official Sentry SDK.
The failure report will include tags:
actor: actor name.
event: event name.
Also, the “additional data” section will have:
delivered_at: timestamp when Nats delivered the message to the consumer.
stream_seq_id: sequence ID of the message in the Nats JetStream stream.
Requires sentry-sdk package to be installed.
Misc¶
Miscellaneous middlewares that don’t depend on third-party packages or services.
- class walnats.middlewares.CurrentContextMiddleware¶
Middleware that records the current context on start.
This is a small trick to make the context available from handler or decorators when you need additional information about the Nats message received. We don’t pass context in handlers explicitly by default because most of the handlers should care only about the actual message payload, independent of how it was delivered.
- class walnats.middlewares.ExtraLogMiddleware(logger: logging.Logger | logging.LoggerAdapter = <Logger walnats.middlewares (WARNING)>)¶
Write logs with
extra
fields usinglogging
.The
extra
fields aren’t shown by default, you need to specifically configure a logs formatter that supports it. For example,python-json-logger
.
- class walnats.middlewares.TextLogMiddleware(logger: logging.Logger | logging.LoggerAdapter = <Logger walnats.middlewares (WARNING)>)¶
Write plain text logs using
logging
.The middleware might be useful for debugging. For the prod, consider using
walnats.middlewares.ExtraLogMiddleware
.By default, DEBUG-level logs are not shown. You need to enable them explicitly:
import logging logger = logging.getLogger() logger.setLevel(logging.DEBUG)
Wrappers¶
Wrappers are middlewares that wrap another middleware, usually to provide some kind of flow control.
- class walnats.middlewares.ErrorThresholdMiddleware¶
Trigger on_failure only if there are too many sequential errors.
The errors are counted per actor, per message, and overall. The counter is reset when there is at least one successfully processed message.
- Parameters:
middleware – wrapped middleware to triger when errors accumulate.
total_failures – how many errors have to accumulate over all actors before the on_failure of the wrapped middleware is triggered. Use it to detect when the whole system is unhealthy.
actor_failures – how many errors have to accumulate in a single actor before the on_failure of the wrapped middleware is triggered. Use it to detect an unhealthy actor.
message_failures – how many times the message have to fail before on_failure of the wrapped middleware is triggered. Use it to detect an unhealthy message.
- class walnats.middlewares.FrequencyMiddleware¶
Trigger middleware only once in the given timeframe.
The on_start and on_success are triggered only once per actor in the given timeframe. The on_failure is triggered once per error message per actor in the given timeframe.
Use it to avoid spamming your notifications channel with many copies of the same message when shit hits the fan.
- Parameters:
middleware – middleware to trigger.
timeframe – how long (in seconds) the deduplication window should be.