From an Exposed HTTP Specification
To reproduce this example, you need a local Pub/Sub emulator running on port 8086. The project repository includes a docker-compose file that sets up the emulator.
Specification Example
The url attribute of the Server Object is the Google Cloud Platform project_id.
asyncapi: 2.0.0
info:
  title: User API
  version: '1.0.0'
  description: API to manage users
servers:
  development:
    url: asyncapi-local
    protocol: gcloud-pubsub
    description: Development Broker Server
channels:
  user-update:
    description: Topic for user updates
    publish:
      message:
        $ref: '#/components/messages/UserUpdate'
components:
  messages:
    UserUpdate:
      name: userUpdate
      title: User Update
      summary: Inform about users updates
      payload:
        type: object
        required:
          - id
        properties:
          id:
            type: string
          name:
            type: string
          age:
            type: integer
defaultContentType: application/json
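As a rough illustration of how the project_id in the url attribute relates to Pub/Sub resources, the sketch below builds fully qualified topic and subscription names in the standard Pub/Sub format. The helper names are hypothetical and not part of the asyncapi package, and it is an assumption that the channel name is used directly as the topic id.

```python
# Hypothetical helpers that only illustrate Pub/Sub resource naming.
# They are not part of the asyncapi package.


def topic_path(project_id: str, topic_id: str) -> str:
    """Build a fully qualified Pub/Sub topic name."""
    return f"projects/{project_id}/topics/{topic_id}"


def subscription_path(project_id: str, subscription_id: str) -> str:
    """Build a fully qualified Pub/Sub subscription name."""
    return f"projects/{project_id}/subscriptions/{subscription_id}"


# Assumption: the channel name "user-update" doubles as the topic id.
print(topic_path("asyncapi-local", "user-update"))
# The subscription id here is just an example value.
print(subscription_path("asyncapi-local", "user-update-custom-sub"))
```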
Expose the Specification
Assuming the specification above is saved as api-spec.yaml:
asyncapi-docs --path api-spec.yaml
Creating the subscribers module
# user_events.py
from typing import Any, Awaitable, Callable


async def receive_user_update(
    message: Any, ack_func: Callable[[], Awaitable[None]]
) -> None:
    print(f"Received update for user id={message.id}")
    await ack_func()
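Before wiring the handler to the emulator, it can be exercised on its own. The following is a minimal sketch, assuming the runner passes an object with an id attribute and an awaitable acknowledgement function, as the handler signature suggests. SimpleNamespace, fake_ack, and check_handler are stand-ins introduced here for illustration only.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, Awaitable, Callable


async def receive_user_update(
    message: Any, ack_func: Callable[[], Awaitable[None]]
) -> None:
    print(f"Received update for user id={message.id}")
    await ack_func()


async def check_handler() -> bool:
    """Call the handler with a fake message and report whether it acked."""
    acked = False

    async def fake_ack() -> None:
        # Hypothetical stand-in for the ack function the runner provides.
        nonlocal acked
        acked = True

    # SimpleNamespace stands in for the message object (an assumption).
    await receive_user_update(SimpleNamespace(id="fake-user"), fake_ack)
    return acked


print(asyncio.run(check_handler()))
```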
Start the subscriber to listen for events
This specification does not declare subscribers. This is intentional, because Google Pub/Sub accepts multiple subscribers with different subscription names on the same topic.
We will use the channels-subscribes argument of the subscriber runner to set the Pub/Sub subscription.
PUBSUB_EMULATOR_HOST=localhost:8086 PYTHONPATH=. asyncapi-subscriber \
--url http://localhost:5000/asyncapi.yaml \
--api-module user_events \
--channels-subscribes 'user-update:user-update-custom-sub=receive_user_update'
Waiting messages...
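The value passed to channels-subscribes in the command above appears to follow a channel:subscription=handler layout. The sketch below splits such a value into its three parts to make that layout explicit. It is inferred from this one example and is not the runner's actual parser; ChannelSubscription and parse_channel_subscribes are hypothetical names.

```python
from typing import NamedTuple


class ChannelSubscription(NamedTuple):
    channel: str
    subscription: str
    handler: str


def parse_channel_subscribes(value: str) -> ChannelSubscription:
    """Split 'channel:subscription=handler' into its three parts.

    Hypothetical helper; the layout is inferred from the example command.
    """
    target, equals, handler = value.partition("=")
    channel, colon, subscription = target.partition(":")
    if not (equals and colon and channel and subscription and handler):
        raise ValueError(
            f"expected 'channel:subscription=handler', got {value!r}"
        )
    return ChannelSubscription(channel, subscription, handler)


parsed = parse_channel_subscribes(
    "user-update:user-update-custom-sub=receive_user_update"
)
print(parsed)
```

Read this way, the command binds the user-update channel to a subscription named user-update-custom-sub and routes its messages to receive_user_update in the user_events module.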
Publishing Updates
# publish.py
import asyncio

from asyncapi import build_api

api = build_api('http://localhost:5000/asyncapi.yaml')
channel_id = 'user-update'
message = api.payload(channel_id, id='fake-user', name='Fake User', age=33)


async def publish() -> None:
    await api.connect()
    await api.publish(channel_id, message)
    await api.disconnect()


asyncio.run(publish())
print(f"Published update for user={message.id}")
python publish.py
Published update for user=fake-user
Receive Updates
Waiting messages...
Received update for user id=fake-user