
GCloud PubSub Server Bindings

The AsyncAPI Specification allows custom properties for the protocol that a server runs. Here we use custom parameters to control the pubsub EventsHandler.

To reproduce this example, you need a local pubsub emulator running on port 8086. The project repository has a docker-compose file that sets up the emulator.
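Before running the steps below, it can help to confirm that something is listening on the emulator port. The following is a minimal sketch using only the Python standard library. The helper name `emulator_reachable` is hypothetical and not part of this project, and the check only confirms that a TCP connection can be opened, not that the listener is a pubsub emulator.

```python
import socket


def emulator_reachable(host: str = "localhost", port: int = 8086, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port can be opened.

    Hypothetical helper: it does not verify that the listener is really
    a pubsub emulator, only that the port accepts connections.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host not resolvable.
        return False


if __name__ == "__main__":
    print(f"Emulator reachable: {emulator_reachable()}")
```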

Specification Example

The url attribute of the Server Object is the Google Cloud Platform project_id.

asyncapi: 2.0.0

info:
  title: User API
  version: '1.0.0'
  description: API to manage users

servers:
  development:
    url: asyncapi-local
    protocol: gcloud-pubsub
    description: Development Broker Server

channels:
  user-update:
    description: Topic for user updates
    publish:
      message:
        $ref: '#/components/messages/UserUpdate'

components:
  messages:
    UserUpdate:
      name: userUpdate
      title: User Update
      summary: Inform about user updates
      payload:
        type: object
        required:
          - id
        properties:
          id:
            type: string
          name:
            type: string
          age:
            type: integer

defaultContentType: application/json

Expose the Specification

Assuming that the above specification has the name api-spec.yaml:

asyncapi-docs --path api-spec.yaml

Creating subscribers module

# user_events.py

from typing import Any


async def receive_user_update(message: Any) -> None:
    print(f"Received update for user id={message.id}")

Start the subscriber to listen for events

For pubsub custom attributes we will use the server-bindings argument of the subscriber runner. The pubsub EventsHandler accepts two parameters: consumer_wait_time and consumer_ack_messages. The first sets how long the handler waits when there are no messages to consume. The default value is 1 second.

The second parameter tells the EventsHandler whether to acknowledge each message. The default value is True. When consumer_ack_messages is False, the acknowledge function is passed to the subscriber in its keyword arguments under the name ack_func.
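As an illustration of the consumer_ack_messages=False case, here is a minimal sketch of a subscriber that acknowledges the message itself. It relies on two assumptions that the text above does not confirm: that ack_func takes no arguments, and that it may be either a plain function or a coroutine function. The sketch handles both cases and does nothing if ack_func is absent.

```python
# user_events.py (manual acknowledgement variant, a sketch)

import inspect
from typing import Any


async def receive_user_update(message: Any, **kwargs: Any) -> None:
    print(f"Received update for user id={message.id}")

    # ack_func is only expected when consumer_ack_messages is False.
    ack_func = kwargs.get("ack_func")
    if ack_func is None:
        return

    # Assumption: ack_func takes no arguments. Its result is awaited
    # only if it turns out to be awaitable.
    result = ack_func()
    if inspect.isawaitable(result):
        await result
```

Acknowledging only after the handler has finished its work means that a message is not marked as consumed if processing raises an exception first.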

This specification does not declare subscribers. This is intentional, because Google pubsub accepts multiple subscribers with different channel names on the same topic.

We will use the channels-subscribes argument of the subscriber runner to set the pubsub subscription.

PUBSUB_EMULATOR_HOST=localhost:8086 PYTHONPATH=. asyncapi-subscriber \
    --url http://localhost:5000/asyncapi.yaml \
    --api-module user_events \
    --channels-subscribes 'user-update:user-update-custom-sub=receive_user_update' \
    --server-bindings 'gcloud-pubsub:consumer_wait_time=0.1;consumer_ack_messages=1'
Waiting messages...
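To make the two argument formats in the command above easier to read, the sketch below splits them into their parts. This is not the parser used by the subscriber runner. The function names are hypothetical, and reading the channels-subscribes value as channel, subscription name, and handler function is an interpretation based on the example. Values are kept as strings because the conversion rules are not described here.

```python
from typing import Dict, Tuple


def parse_server_bindings(value: str) -> Tuple[str, Dict[str, str]]:
    """Split 'protocol:key=value;key=value' into (protocol, {key: value})."""
    protocol, _, params = value.partition(":")
    bindings: Dict[str, str] = {}
    for item in params.split(";"):
        if item:
            key, _, val = item.partition("=")
            bindings[key] = val
    return protocol, bindings


def parse_channel_subscribe(value: str) -> Tuple[str, str, str]:
    """Split 'channel:subscription=handler' into its three parts."""
    channel, _, rest = value.partition(":")
    subscription, _, handler = rest.partition("=")
    return channel, subscription, handler


if __name__ == "__main__":
    print(parse_server_bindings(
        "gcloud-pubsub:consumer_wait_time=0.1;consumer_ack_messages=1"
    ))
    print(parse_channel_subscribe(
        "user-update:user-update-custom-sub=receive_user_update"
    ))
```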

Publishing Updates

# publish.py

import asyncio

from asyncapi import build_api


api = build_api('http://localhost:5000/asyncapi.yaml')
channel_id = 'user-update'
message = api.payload(channel_id, id='fake-user', name='Fake User', age=33)


async def publish() -> None:
    await api.connect()
    await api.publish(channel_id, message)
    await api.disconnect()


asyncio.run(publish())

print(f"Published update for user={message.id}")

python publish.py

Published update for user=fake-user
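If publishing can fail, it may be worth making sure the connection is always closed. The following sketch wraps the same connect, publish, and disconnect calls used in publish.py in a try/finally block. The function name `publish_once` is hypothetical, and the api object is passed in as a parameter, so the sketch assumes nothing about the library beyond those three awaitable methods.

```python
from typing import Any


async def publish_once(api: Any, channel_id: str, message: Any) -> None:
    """Publish one message and disconnect even if publishing raises."""
    await api.connect()
    try:
        await api.publish(channel_id, message)
    finally:
        # Runs on success and on failure, so the connection is not leaked.
        await api.disconnect()
```

With the objects from publish.py, this would be called as `asyncio.run(publish_once(api, channel_id, message))`.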

Receive Updates

Waiting messages...
Received update for user id=fake-user