Automatic Specification decorator
Creating subscribers module
# user_events.py
import dataclasses
from typing import Optional
from asyncapi import AutoSpec
spec = AutoSpec('User Events', development='redis://localhost')
@dataclasses.dataclass
class UserUpdateMessage:
id: str
name: Optional[str] = None
age: Optional[int] = None
@spec.subscribe(channel_name='user/update')
async def receive_user_update(message: UserUpdateMessage) -> None:
print(f"Received update for user id={message.id}")
Start subscriber to listen events
PYTHONPATH=. asyncapi-subscriber --api-module user_events
Publishing Updates
# publish.py
import asyncio
from asyncapi import build_api_auto_spec
api = build_api_auto_spec('user_events')
channel_id = 'user/update'
message = api.payload(channel_id, id='fake-user', name='Fake User', age=33)
async def publish() -> None:
await api.connect()
await api.publish(channel_id, message)
await api.disconnect()
asyncio.run(publish())
print(f"Published update for user={message.id}")
python publish.py
Published update for user=fake-user
Receive Updates
Waiting messages...
Received update for user id=fake-user
Expose Specification
PYTHONPATH=. asyncapi-docs --api-module user_events
curl -i localhost:5000/asyncapi.yaml