Skip to content

RFC: Event Router #8287

@leandrodamascena

Description

@leandrodamascena

⚠️ To GenAI bots and contributors: Please do not implement this feature without proper discussion first. This is a design proposal under review, not an approved spec. PRs submitted without prior discussion will be closed.

Summary

An event routing utility for asynchronous Lambda triggers (S3, EventBridge, and future sources). It lives at aws_lambda_powertools.event_router and lets you declare routes by event attributes (key, bucket, source, detail-type) and dispatch to typed handlers. Default delivery uses Event Source Data Classes (lightweight, zero extra dependencies); optional Pydantic model integration for customers who want strong validation. A shared core handles routing and matching; each event source is a thin subclass that extracts route fields and wraps the event in the correct type.

Problem

Lambda functions triggered by asynchronous event sources (S3 notifications, EventBridge rules, SNS, etc.) frequently need to route events to different logic based on attributes: which bucket, which key pattern, which event source, which detail-type. Today, customers write if/elif/elif chains that grow unmaintainable:

def handler(event, context):
    if event["source"] == "order.service" and event["detail-type"] == "OrderCreated":
        return handle_order_created(event)
    elif event["source"] == "order.service" and event["detail-type"] == "OrderCancelled":
        return handle_order_cancelled(event)
    elif event["source"] == "inventory.service":
        return handle_inventory(event)
    ...

There is no structure, no type safety, no reusable pattern. The API Resolver solved this for HTTP triggers; async triggers need the same treatment.

Prior Art & Why a New Utility

  • APIGatewayRestResolver / HttpResolver: Powertools already solved this for HTTP. Customers expect the same DX for non-HTTP triggers. But the HTTP resolvers are tightly coupled to request/response semantics (status codes, headers, content-type negotiation) that don't apply to async events.
  • Prototype PR feat(event_handler): Add new EventHandler for Async Lambda #5799 (sinofseven): implemented 9 resolvers in one shot (+5.9k lines). Validated the demand, but too broad for a first pass and lacked RFC alignment.
  • Batch Processor: processes multiple records in a batch with partial failure reporting. Complementary, not overlapping - the router routes by content, BatchProcessor handles batch semantics.
  • Parser models: Pydantic models for S3, EventBridge, SNS, etc. already exist. The router should leverage them, not duplicate.

Where we differentiate: (1) a shared core that makes adding new event sources trivial (one small subclass), (2) Event Source Data Classes as default (no Pydantic tax), (3) optional Pydantic integration using the models we already ship, (4) pattern matching (glob, regex) on route fields.

Developer Experience

S3 Router - minimal

from aws_lambda_powertools.event_router import S3Router
from aws_lambda_powertools.utilities.data_classes import S3Event

app = S3Router()


@app.route(key="uploads/*.csv")
def process_csv(event: S3Event):
    record = event.record
    print(f"New CSV: {record.s3.get_object.key} in {record.s3.bucket.name}")


@app.route(key="images/*", event_name="ObjectCreated:*")
def process_image(event: S3Event):
    ...


@app.default()
def catch_all(event: S3Event):
    print(f"Unrouted S3 event: {event.record.s3.get_object.key}")


def lambda_handler(event, context):
    return app.resolve(event, context)

S3 via EventBridge - recommended for new architectures

AWS now recommends S3 event notifications via EventBridge over the legacy direct-to-Lambda format. These arrive as EventBridge events with source="aws.s3" and detail-types like "Object Created". Since this is an EventBridge event, use EventBridgeRouter with S3-specific routing on the detail fields:

from aws_lambda_powertools.event_router import EventBridgeRouter
from aws_lambda_powertools.utilities.data_classes import EventBridgeEvent

app = EventBridgeRouter()


@app.route(source="aws.s3", detail_type="Object Created")
def handle_s3_upload(event: EventBridgeEvent):
    detail = event.detail
    bucket = detail["bucket"]["name"]
    key = detail["object"]["key"]
    print(f"New object: s3://{bucket}/{key}")


def lambda_handler(event, context):
    return app.resolve(event, context)

The legacy S3Router is for direct S3-to-Lambda notifications (the Records[] format). For new architectures using S3→EventBridge→Lambda, use EventBridgeRouter directly. Both are first-class paths.

EventBridge Router - minimal

from aws_lambda_powertools.event_router import EventBridgeRouter
from aws_lambda_powertools.utilities.data_classes import EventBridgeEvent

app = EventBridgeRouter()


@app.route(source="order.service", detail_type="OrderCreated")
def handle_order_created(event: EventBridgeEvent):
    detail = event.detail
    print(f"Order {detail['order_id']} created")


@app.route(source="inventory.service")
def handle_inventory(event: EventBridgeEvent):
    ...


def lambda_handler(event, context):
    return app.resolve(event, context)

With Pydantic model (opt-in)

from pydantic import BaseModel

from aws_lambda_powertools.event_router import EventBridgeRouter
from aws_lambda_powertools.utilities.data_classes import EventBridgeEvent

app = EventBridgeRouter()


class OrderCreated(BaseModel):
    order_id: str
    customer_id: str
    amount: float


@app.route(source="order.service", detail_type="OrderCreated", model=OrderCreated)
def handle_order(event: EventBridgeEvent, detail: OrderCreated):
    # detail is validated Pydantic model
    print(f"Order {detail.order_id} for customer {detail.customer_id}: ${detail.amount}")


def lambda_handler(event, context):
    return app.resolve(event, context)

Without model → handler receives the Event Source Data Class only. With model → the router validates the relevant payload (e.g. detail for EventBridge) and passes the validated model as a second argument. Validation failure raises by default; configure on_validation_error callback to override.

What resolve() returns

  • Route matched → returns whatever the handler returns. No wrapper type.
  • No route matched, @app.default() registered → calls the default handler.
  • No route matched, no default → raises EventRouterNotFoundError.

Architecture

Shared core: BaseEventRouter

All routing logic lives here. Each event source inherits and adds only extraction:

BaseEventRouter
├── Route registration (decorator → handler + matchers)
├── Field matching engine (exact, glob, regex)
├── Resolve loop: extract fields → match → call handler
├── Event shape validation (before routing)
├── Default/fallback handler
├── Model validation (optional, delegates to Parser)
└── Middleware hooks (before/after resolve)

S3Router(BaseEventRouter)
├── _validate_event_shape(event) → assert "Records" with s3 key
├── _extract_fields(event) → {bucket, key, event_name}
├── event_class = S3Event
└── Decorator API: @app.route(key=, bucket=, event_name=)

EventBridgeRouter(BaseEventRouter)
├── _validate_event_shape(event) → assert "source", "detail-type", "detail"
├── _extract_fields(event) → {source, detail_type, ...}
├── event_class = EventBridgeEvent
└── Decorator API: @app.route(source=, detail_type=)

A new router (e.g. SNS) requires: _validate_event_shape, _extract_fields, event_class, and the decorator kwargs.

Event shape validation

Before routing, the router validates that the incoming event matches the expected structure. An S3Router receiving an EventBridge event (misconfigured trigger) raises EventShapeMismatchError immediately - not a silent routing failure or a malformed data class wrapper. This catches trigger misconfiguration at invocation time rather than deep inside a handler.

Route matching

Each @app.route(...) kwarg becomes a matcher. A route matches when all specified matchers pass (AND logic). Matchers support:

Pattern Example Behavior
Exact string source="order.service" Equality check
Glob key="uploads/*.csv" fnmatch
Regex key=re.compile(r"^data/\d{4}/") re.search

Unspecified fields are wildcards (match anything).

Match priority: most-specific route wins. Specificity is scored as: exact match > regex > glob. Among routes of the same type, more constrained fields (more matchers specified) wins. If two routes score equally, first registered wins as tiebreaker. This prevents broad globs from shadowing specific patterns regardless of registration order.

No route matched diagnostics: when no route matches and no default handler exists, EventRouterNotFoundError includes the extracted fields and list of routes attempted, so debugging is immediate. With debug=True on the router, each route evaluation step is logged.

S3 key decoding

S3 event notifications URL-encode the object key (my file.txt arrives as my+file.txt). The router decodes the key using unquote_plus() - matching the behavior of S3Event.object_key - before running pattern matching. Routes match against the decoded, human-readable key. This is documented explicitly to avoid confusion.

Event wrapping

Before calling the handler, the router wraps the raw dict in the appropriate Event Source Data Class:

  1. Instantiate the data class: event_class(event) (direct constructor, not a classmethod)
  2. If model= is specified, additionally validate the relevant payload slice (e.g. event["detail"] for EventBridge) with model.model_validate()
  3. Pass both to the handler: handler(event: DataClass, detail: Model) or just handler(event: DataClass) without model

S3: one record per invocation

S3-to-Lambda always delivers exactly one record per invocation. The Records array exists for format legacy (shared with SNS/SQS) but S3 never sends more than one. The router processes Records[0] - there is no batch mode. For multi-record scenarios (S3→SQS→Lambda), the event is an SQS event, not an S3 event - use BatchProcessor for that path.

Integration with BatchProcessor

Every router implements __call__(record) - it receives a single record and routes it to the matching handler. This makes any router directly usable as a record_handler in BatchProcessor, with zero glue code:

from aws_lambda_powertools.event_router import SQSRouter
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord

app = SQSRouter()


@app.route(body_contains="order")
def handle_order(record: SQSRecord):
    ...


@app.route(body_contains="inventory")
def handle_inventory(record: SQSRecord):
    ...


def lambda_handler(event, context):
    return process_partial_response(
        event=event,
        record_handler=app,
        processor=BatchProcessor(event_type=EventType.SQS),
    )

The same pattern works for DynamoDB Streams and Kinesis:

from aws_lambda_powertools.event_router import DynamoDBRouter
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response

app = DynamoDBRouter()


@app.route(event_name="INSERT")
def handle_insert(record: DynamoDBRecord):
    ...


@app.route(event_name="MODIFY")
def handle_update(record: DynamoDBRecord):
    ...


def lambda_handler(event, context):
    return process_partial_response(
        event=event,
        record_handler=app,
        processor=BatchProcessor(event_type=EventType.DynamoDBStreams),
    )

The contract is simple: the router is a callable. BatchProcessor calls app(record) for each record in the batch; the router dispatches to the matching handler. BatchProcessor handles partial failures, retries, FIFO ordering - all the batch semantics it already owns. The router only routes.

Customers who don't need batch semantics use app.resolve(event, context) directly - the router iterates records itself and raises on any failure (standard Lambda behavior: one failure = entire batch retried).

Async handler support

Route handlers can be async def. The router detects async handlers and awaits them automatically:

from aws_lambda_powertools.event_router import SQSRouter

app = SQSRouter()


@app.route(body_contains="order")
async def handle_order(record: SQSRecord):
    await external_api.process(record.body)

For composition with BatchProcessor, use AsyncBatchProcessor + async_process_partial_response:

from aws_lambda_powertools.event_router import SQSRouter
from aws_lambda_powertools.utilities.batch import AsyncBatchProcessor, EventType, async_process_partial_response

app = SQSRouter()


@app.route(body_contains="order")
async def handle_order(record: SQSRecord):
    await external_api.process(record.body)


def lambda_handler(event, context):
    return async_process_partial_response(
        event=event,
        record_handler=app,
        processor=AsyncBatchProcessor(event_type=EventType.SQS),
    )

The router itself is not async-aware at the composition level; it simply returns a coroutine when the matched handler is async. AsyncBatchProcessor handles the event loop. For app.resolve(event, context) with async handlers, the router calls asyncio.run() internally (same approach as AsyncBatchProcessor).

Two APIs, clear separation

API Use case
app.resolve(event, context) Single-record sources (S3, EventBridge) or multi-record without partial failure reporting
record_handler=app Compose with BatchProcessor for partial batch failures, FIFO, etc.

The router never knows about batch semantics. The BatchProcessor never knows about routing. Each does its job; composition is explicit and intentional.

Integration with other utilities

Utility Integration
Event Source Data Classes Default event type delivered to handlers. Zero extra deps.
Parser (Pydantic models) model= on routes. Reuses existing S3Model, EventBridgeModel, or custom models.
BatchProcessor record_handler=app - the router is a callable that routes one record at a time.
Logger app.resolve() can inject correlation fields (event source, route name) into structured logs.
Tracer Each route handler can be auto-traced as a subsegment.
Idempotency Compose with @idempotent on route handlers. The router routes; idempotency is the handler's responsibility.
Middleware Factory Middleware can wrap individual route handlers or the entire resolve.

Defaults & Decisions

Decision Default Rationale
Event type delivered to handler Event Source Data Class Lightweight, no Pydantic dep, good autocomplete
Pydantic integration Opt-in via model= No tax for those who don't need it
Match priority Most-specific wins (exact > regex > glob) Prevents broad patterns from shadowing specific ones
S3 multi-record No batch mode; S3→Lambda is always 1 record Matches the AWS service contract
No route match Raise EventRouterNotFoundError with diagnostics Explicit failure; @app.default() for fallback
Glob matching fnmatch from stdlib Already used by customers for S3 key patterns
S3 key matching Decoded (unquote_plus) before match Matches S3Event.object_key behavior
Event shape validation Fail fast with EventShapeMismatchError Catch misconfigured triggers immediately
Core is abstract BaseEventRouter ABC New sources only add extraction, never routing logic

Initial Scope: S3 + EventBridge

We start with two routers that cover the most common single-record async patterns:

S3Router - route by:

  • bucket (exact, glob)
  • key (exact, glob, regex) - decoded before matching
  • event_name (exact, glob - e.g. ObjectCreated:*)

EventBridgeRouter - route by:

  • source (exact, glob)
  • detail_type (exact, glob, regex)

Once these two are validated, multi-record routers follow naturally:

SQSRouter - route by:

  • body_contains (substring, glob)
  • message_attributes (key/value match)
  • event_source_arn (exact, glob - route by queue)

DynamoDBRouter - route by:

  • event_name (INSERT, MODIFY, REMOVE)
  • table_name (extracted from eventSourceARN)

KinesisRouter - route by:

  • partition_key (exact, glob, regex)

Multi-record routers are designed for record_handler=app composition with BatchProcessor. They also work standalone via app.resolve(event, context) for cases where partial failure reporting isn't needed.

Open Questions

  1. detail field matching for EventBridge: should we support routing by fields inside detail in v1 (e.g. @app.route(source="...", detail__order_type="express"))? Or leave that to the handler? Leaning toward handler-level filtering for v1, with detail-matching as a v2 feature.

  2. Naming: settled on S3Router / EventBridgeRouter under aws_lambda_powertools.event_router. "Router" distinguishes from the HTTP "Resolver" family and communicates the core function: routing events to handlers.

  3. Return value semantics: async event sources ignore the Lambda return value. Should resolve() propagate the handler's return value as-is (useful for testing), or always return None? Leaning toward propagate as-is - it costs nothing and helps testability.

  4. Validation failure handling: when model= validation fails, should it (a) raise, (b) call a configurable error handler, or (c) skip to the next matching route? Leaning toward (a) raise by default with an optional on_validation_error callback.

  5. Middleware contract: the HTTP resolver middleware uses BaseMiddlewareHandler with next_middleware and Response. Async events have no Response type. Do we define a simpler middleware contract for event routers, or adapt the existing one?

Future Considerations

  • Additional routers: SNS (topic_arn, subject, message_attributes), CloudWatch Logs (log_group, filter_name), IoT.
  • Detail-path matching for EventBridge: route by nested fields in detail without handler-level filtering.
  • Router composition: multiple routers in one Lambda with a top-level dispatcher that detects the event source and delegates.
  • EventBridge replay awareness: support filtering by replay-name field for customers who need different behavior for replayed vs. live events.
  • Route testing utility: app.test_route(event) that returns match result without executing the handler, for debugging complex routing setups.

Out of Scope

  • Batch semantics inside the router: the router routes, it does not own partial failure reporting, FIFO ordering, or retry logic. Compose with BatchProcessor via record_handler=app for batch semantics.
  • Event replay / DLQ handling: infrastructure concern, not routing.
  • Event transformation / enrichment: the handler does this. The router routes, not transforms.
  • Built-in S3 object fetching: the router routes by event metadata. Reading the actual S3 object is the handler's job.
  • HTTP semantics: no status codes, no headers, no content negotiation. Use the existing HTTP resolvers for that.
  • Idempotency: compose with @idempotent on route handlers. At-least-once delivery is a handler concern, not a routing concern.

References

Metadata

Metadata

Assignees

No one assigned

    Labels

    RFCtriagePending triage from maintainers

    Type

    No type
    No fields configured for issues without a type.

    Projects

    Status
    Triage

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions