Skip to content

From a Python Specification

To reproduce this example you will need a local pubsub emulator running on port 8086. The project repository has a docker-compose file setting up the emulator.

Specification Example

The url attribute of the Server Object is the Google Cloud Platform project_id.

# specification.py

import dataclasses
from typing import Optional

import asyncapi


@dataclasses.dataclass
class UserUpdatePayload:
    id: str
    name: Optional[str] = None
    age: Optional[int] = None


dev_server = asyncapi.Server(
    url='asyncapi-local',
    protocol=asyncapi.ProtocolType.GCLOUD_PUBSUB,
    description='Development Broker Server',
)
message = asyncapi.Message(
    name='userUpdate',
    title='User Update',
    summary='Inform about users updates',
    payload=UserUpdatePayload,
)
user_update_channel = asyncapi.Channel(
    description='Topic for user updates',
    publish=asyncapi.Operation(message=message),
)

spec = asyncapi.Specification(
    info=asyncapi.Info(
        title='User API', version='1.0.0', description='API to manage users',
    ),
    servers={'development': dev_server},
    channels={'user-update': user_update_channel},
    components=asyncapi.Components(messages={'UserUpdate': message}),
)

Creating subscribers module

# user_events.py

from typing import Awaitable, Callable

import specification


spec = specification.spec


async def receive_user_update(
    message: specification.UserUpdatePayload,
    ack_func: Callable[[], Awaitable[None]],
) -> None:
    print(f"Received update for user id={message.id}")
    await ack_func()

Start subscriber to listen events

This specification don't declares subscribers. It is intentional because google pubsub can accept multiple subscribers with differents channel names on the same topic.

We will use the channels-subscribes argument of the subscriber runner to set the pubsub subscription.

PUBSUB_EMULATOR_HOST=localhost:8086 PYTHONPATH=. asyncapi-subscriber \
    --api-module user_events \
    --channels-subscribes 'user-update:user-update-custom-sub=receive_user_update'
Waiting messages...

Publishing Updates

# publish.py

import asyncio

from asyncapi import build_api_auto_spec


api = build_api_auto_spec('specification')
channel_id = 'user-update'
message = api.payload(channel_id, id='fake-user', name='Fake User', age=33)


async def publish() -> None:
    await api.connect()
    await api.publish(channel_id, message)
    await api.disconnect()


asyncio.run(publish())

print(f"Published update for user={message.id}")
python publish.py

Published update for user=fake-user

Receive Updates

Waiting messages...
Received update for user id=fake-user