Python AsyncAPI

asyncapi-python

A Python library that translates an AsyncAPI specification into Python code, without code generation.


AsyncAPI Pattern: https://asyncapi.io


Documentation: https://dutradda.github.io/asyncapi-python/

Source Code: https://github.com/dutradda/asyncapi-python


Key Features

  • Reads an AsyncAPI specification and creates publishers and subscribers from it

  • Supports declaring the specification with dataclasses

  • Provides an application for creating subscribers

  • Supports the kafka, redis and postgres protocols (the same as the broadcaster library)

  • Adds extra support for the Google Cloud Pub/Sub service

  • Exposes the auto-generated specification over HTTP

Requirements

  • Python 3.8+
  • broadcaster
  • jsondaora
  • requests (optional, for HTTP specifications)
  • typer (optional, for the subscriber application)
  • pyyaml (optional, for YAML specifications)
  • apidaora (optional, for exposing the specification)

  • Package extra installs:

    • http
    • yaml
    • kafka
    • redis
    • postgres
    • subscriber
    • docs
    • google-cloud-pubsub

Installation

$ pip install asyncapi[http,yaml,redis,subscriber,docs]

YAML Specification Example

asyncapi: 2.0.0

info:
  title: User API
  version: '1.0.0'
  description: API to manage users

servers:
  development:
    url: localhost
    protocol: redis
    description: Development Broker Server

channels:
  user/update:
    description: Topic for user updates
    subscribe:
      operationId: receive_user_update
      message:
        $ref: '#/components/messages/UserUpdate'
    publish:
      message:
        $ref: '#/components/messages/UserUpdate'

components:
  messages:
    UserUpdate:
      name: userUpdate
      title: User Update
      summary: Inform about users updates
      payload:
        type: object
        required:
          - id
        properties:
          id:
            type: string
          name:
            type: string
          age:
            type: integer

defaultContentType: application/json
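
To make the payload schema above concrete, here is a minimal sketch of what checking a message against it could look like. It uses only the standard library and is not how asyncapi-python validates messages; the names `USER_UPDATE_SCHEMA`, `JSON_TYPES` and `validate_payload` are hypothetical and exist only for this illustration.

```python
from typing import Any, Dict, List

# Payload schema copied from components/messages/UserUpdate in the YAML above.
USER_UPDATE_SCHEMA: Dict[str, Any] = {
    "type": "object",
    "required": ["id"],
    "properties": {
        "id": {"type": "string"},
        "name": {"type": "string"},
        "age": {"type": "integer"},
    },
}

# JSON Schema type names mapped to Python types (only the ones used here).
JSON_TYPES = {"string": str, "integer": int}


def validate_payload(payload: Dict[str, Any], schema: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the payload is valid."""
    errors = []
    for field in schema.get("required", []):
        if field not in payload:
            errors.append(f"missing required field: {field}")
    for field, value in payload.items():
        spec = schema["properties"].get(field)
        if spec is None:
            continue  # JSON Schema allows additional properties by default
        expected = JSON_TYPES[spec["type"]]
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, expected):
            errors.append(f"{field}: expected {spec['type']}")
    return errors


valid = {"id": "fake-user", "name": "Fake User", "age": 33}
invalid = {"name": "Fake User", "age": "33"}
print(validate_payload(valid, USER_UPDATE_SCHEMA))
print(validate_payload(invalid, USER_UPDATE_SCHEMA))
```

The first call prints an empty list. The second reports the missing `id` and the string passed where an integer is expected.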

Creating subscribers module

# user_events.py

from typing import Any


async def receive_user_update(message: Any) -> None:
    print(f"Received update for user id={message.id}")
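
The function name matches the operationId in the specification (receive_user_update). As a rough sketch of how such a lookup could work, assuming the subscriber resolves handlers by name (the library's actual mechanism may differ), the snippet below uses a stand-in for the user_events module. `resolve_handler` and `received` are hypothetical names used only here.

```python
import asyncio
import inspect
import types
from typing import Any, Awaitable, Callable, List

received: List[str] = []  # records handled ids so the result can be inspected


async def receive_user_update(message: Any) -> None:
    received.append(message.id)
    print(f"Received update for user id={message.id}")


# Stand-in for the imported user_events module (assumption for this sketch).
user_events = types.SimpleNamespace(receive_user_update=receive_user_update)


def resolve_handler(module: Any, operation_id: str) -> Callable[[Any], Awaitable[None]]:
    """Look up the coroutine function whose name equals the operationId."""
    handler = getattr(module, operation_id, None)
    if handler is None or not inspect.iscoroutinefunction(handler):
        raise LookupError(f"no coroutine named {operation_id!r} in module")
    return handler


handler = resolve_handler(user_events, "receive_user_update")
asyncio.run(handler(types.SimpleNamespace(id="fake-user")))
```

If the operationId and the function name drift apart, a lookup like this fails, so it is worth keeping the two in sync.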

Start subscriber to listen for events

PYTHONPATH=. asyncapi-subscriber \
    --url api-spec.yaml \
    --api-module user_events
Waiting messages...

Publishing Updates

# publish.py

import asyncio

from asyncapi import build_api


api = build_api('api-spec.yaml')
channel_id = 'user/update'
message = api.payload(channel_id, id='fake-user', name='Fake User', age=33)


async def publish() -> None:
    await api.connect()
    await api.publish(channel_id, message)
    await api.disconnect()


asyncio.run(publish())

print(f"Published update for user={message.id}")

python publish.py

Published update for user=fake-user
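
In publish.py, disconnect is skipped if publish raises. A possible variation wraps the call in try/finally so the connection is always closed. The sketch below uses a hypothetical `StubApi` class in place of the real api object so it runs without a broker; only the connect, publish and disconnect call names are taken from the example above.

```python
import asyncio
from typing import Any, List


class StubApi:
    """Hypothetical stand-in with the connect/publish/disconnect calls used above."""

    def __init__(self, fail: bool = False) -> None:
        self.fail = fail
        self.calls: List[str] = []

    async def connect(self) -> None:
        self.calls.append("connect")

    async def publish(self, channel_id: str, message: Any) -> None:
        self.calls.append("publish")
        if self.fail:
            raise RuntimeError("broker rejected the message")

    async def disconnect(self) -> None:
        self.calls.append("disconnect")


async def publish_safely(api: Any, channel_id: str, message: Any) -> None:
    await api.connect()
    try:
        await api.publish(channel_id, message)
    finally:
        # Runs whether publish succeeded or raised.
        await api.disconnect()


ok_api = StubApi()
asyncio.run(publish_safely(ok_api, "user/update", {"id": "fake-user"}))
print(ok_api.calls)

failing_api = StubApi(fail=True)
try:
    asyncio.run(publish_safely(failing_api, "user/update", {"id": "fake-user"}))
except RuntimeError as error:
    print(f"publish failed: {error}")
print(failing_api.calls)
```

In both runs the recorded calls end with disconnect, including the run where publish fails.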

Receive Updates

Waiting messages...
Received update for user id=fake-user

Expose Specification

asyncapi-docs --path api-spec.yaml
curl -i localhost:5000/asyncapi.yaml

Python Specification Example

# specification.py

import dataclasses
from typing import Optional

import asyncapi


@dataclasses.dataclass
class UserUpdatePayload:
    id: str
    name: Optional[str] = None
    age: Optional[int] = None


dev_server = asyncapi.Server(
    url='localhost',
    protocol=asyncapi.ProtocolType.REDIS,
    description='Development Broker Server',
)
message = asyncapi.Message(
    name='userUpdate',
    title='User Update',
    summary='Inform about users updates',
    payload=UserUpdatePayload,
)
user_update_channel = asyncapi.Channel(
    description='Topic for user updates',
    subscribe=asyncapi.Operation(
        operation_id='receive_user_update', message=message,
    ),
    publish=asyncapi.Operation(message=message),
)

spec = asyncapi.Specification(
    info=asyncapi.Info(
        title='User API', version='1.0.0', description='API to manage users',
    ),
    servers={'development': dev_server},
    channels={'user/update': user_update_channel},
    components=asyncapi.Components(messages={'UserUpdate': message}),
)
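
The dataclass above plays the role of the payload schema in the YAML example. As an illustration of that correspondence, and not the library's own conversion, the sketch below derives a JSON-Schema-like dict from the dataclass with the standard library. `JSON_TYPE_NAMES` and `dataclass_to_schema` are hypothetical names, and only flat dataclasses with simple field types are handled.

```python
import dataclasses
from typing import Any, Dict, Optional, Union, get_args, get_origin, get_type_hints


@dataclasses.dataclass
class UserUpdatePayload:
    id: str
    name: Optional[str] = None
    age: Optional[int] = None


# Python types mapped to JSON Schema type names (simple types only).
JSON_TYPE_NAMES = {str: "string", int: "integer", float: "number", bool: "boolean"}


def dataclass_to_schema(cls: type) -> Dict[str, Any]:
    """Build an object schema; fields without a default are marked required."""
    hints = get_type_hints(cls)
    properties: Dict[str, Any] = {}
    required = []
    for field in dataclasses.fields(cls):
        annotation = hints[field.name]
        # Optional[X] is Union[X, None]; unwrap it to X.
        if get_origin(annotation) is Union:
            annotation = [a for a in get_args(annotation) if a is not type(None)][0]
        properties[field.name] = {"type": JSON_TYPE_NAMES[annotation]}
        has_default = (
            field.default is not dataclasses.MISSING
            or field.default_factory is not dataclasses.MISSING
        )
        if not has_default:
            required.append(field.name)
    return {"type": "object", "required": required, "properties": properties}


schema = dataclass_to_schema(UserUpdatePayload)
print(schema)
```

The result has the same required list and property types as the payload section of the YAML specification.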

Creating subscribers module

# py_spec_user_events.py

import specification


spec = specification.spec


async def receive_user_update(
    message: specification.UserUpdatePayload,
) -> None:
    print(f"Received update for user id={message.id}")

Start subscriber to listen for events

PYTHONPATH=. asyncapi-subscriber --api-module py_spec_user_events
Waiting messages...

Publishing Updates

# publish.py

import asyncio

from asyncapi import build_api_auto_spec


api = build_api_auto_spec('specification')
channel_id = 'user/update'
message = api.payload(channel_id, id='fake-user', name='Fake User', age=33)


async def publish() -> None:
    await api.connect()
    await api.publish(channel_id, message)
    await api.disconnect()


asyncio.run(publish())

print(f"Published update for user={message.id}")

python publish.py

Published update for user=fake-user

Receive Updates

Waiting messages...
Received update for user id=fake-user

Expose Specification

PYTHONPATH=. asyncapi-docs --api-module specification
curl -i localhost:5000/asyncapi.yaml