Pub/Sub Python client sample

Publisher.py

#!/usr/bin/env python

import argparse
import datetime
import json
import time
import os
from google.api_core.exceptions import InvalidArgument
from google.cloud import pubsub_v1
# Placeholder value: replace with the ID of your Google Cloud project.
# It is passed to pub() by the script entry point below.
project_id = "PROJECT ID"

def get_callback(api_future, data):
    """Build a done-callback that reports the outcome of publishing ``data``.

    Args:
        api_future: Unused. Accepted only so existing call sites keep
            working; the future to inspect is the one handed to the
            returned callback when it is invoked.
        data: The message payload, captured so the callback can name it in
            its output.

    Returns:
        A one-argument callable. Given a completed future it prints the
        message ID on success; on failure it prints the error and re-raises
        it.
    """

    def callback(future):
        # Only the call that can fail with a publish error sits in the try,
        # so an error while printing the success line is not misreported.
        try:
            message_id = future.result()
        except Exception as exc:
            print("A problem occurred when publishing {}: {}\n".format(
                data, exc))
            # Re-raise the exception that was actually caught. Raising
            # ``future.exception()`` instead would turn into ``raise None``
            # (a TypeError) whenever that method returns None.
            raise
        print("Published message {} now has message ID {}".format(
            data, message_id))
    return callback


def pub(project_id, topic_name):
    """Publish one test message to a Pub/Sub topic and wait for the outcome.

    Args:
        project_id: ID of the Google Cloud project that owns the topic.
        topic_name: Short topic name; it is expanded to the full
            ``projects/{project_id}/topics/{topic_name}`` path here.

    Errors raised while sending, and ``InvalidArgument`` raised while
    waiting for the result, are reported on stdout instead of propagating.
    Any other error raised while waiting propagates to the caller.
    """
    # [START pubsub_quickstart_pub_client]
    # Initialize a Publisher client
    client = pubsub_v1.PublisherClient()
    # [END pubsub_quickstart_pub_client]
    # Create a fully qualified identifier in the form of
    # `projects/{project_id}/topics/{topic_name}`
    topic_path = client.topic_path(project_id, topic_name)

    data = b'test data'

    # Stays None when publish() itself fails, so the wait below can be
    # skipped instead of tripping over an unbound name.
    api_future = None

    # When you publish a message, the client returns a future.
    try:
        api_future = client.publish(topic_path, data=data)
        api_future.add_done_callback(get_callback(api_future, data))
        time.sleep(0.1)
    except Exception as exc:
        # Best-effort: report the failure (with its cause) and carry on.
        # Unlike a bare ``except:``, this lets KeyboardInterrupt and
        # SystemExit through.
        print("Catch exception when sending: {}".format(exc))
    print("Sent done")

    # Keep the main thread from exiting until the background message is
    # processed. result() blocks until the publish completes, so no
    # polling loop is needed.
    if api_future is not None:
        try:
            api_future.result()
        except InvalidArgument as e:
            print(type(e))
            print("Catch exception when running: {}".format(e))

    print("All done")


if __name__ == '__main__':
    # Set the credentials path (a placeholder here) in the environment
    # before pub() constructs its client.
    os.environ.update(GOOGLE_APPLICATION_CREDENTIALS="/path/to/cred.json")

    # Short name of the topic to publish the test message to.
    topic_name = 'topic'
    pub(project_id, topic_name)
# [END pubsub_quickstart_pub_all]

Subscriber.py

#!/usr/bin/env python

# [START pubsub_quickstart_sub_all]
import argparse
import json
import time
import os

# [START pubsub_quickstart_sub_deps]
from google.cloud import pubsub_v1
# [END pubsub_quickstart_sub_deps]
# Placeholder value: replace with the ID of your Google Cloud project.
# It is passed to the subscription helpers by the script entry point below.
project_id = "PROJECT ID"

def list_subscriptions_in_topic(project_id, topic_name):
    """Print each subscription attached to the given topic, one per line."""
    # [START pubsub_list_topic_subscriptions]
    from google.cloud import pubsub_v1

    # TODO project_id = "Your Google Cloud Project ID"
    # TODO topic_name = "Your Pub/Sub topic name"

    client = pubsub_v1.PublisherClient()
    # Expand the short topic name to its full resource path, then ask the
    # publisher client for the subscriptions attached to it.
    attached = client.list_topic_subscriptions(
        client.topic_path(project_id, topic_name))

    for entry in attached:
        print(entry)
    # [END pubsub_list_topic_subscriptions]

def create_subscription(project_id, topic_name, subscription_name):
    """Create a pull subscription on a topic and print what was created.

    Args:
        project_id: ID of the Google Cloud project.
        topic_name: Short name of the topic to subscribe to.
        subscription_name: Short name for the new subscription.
    """
    # [START pubsub_create_pull_subscription]
    from google.cloud import pubsub_v1

    # TODO project_id = "Your Google Cloud Project ID"
    # TODO topic_name = "Your Pub/Sub topic name"
    # TODO subscription_name = "Your Pub/Sub subscription name"

    client = pubsub_v1.SubscriberClient()

    # Expand both short names to full resource paths before the API call.
    new_subscription_path = client.subscription_path(
        project_id, subscription_name)
    parent_topic_path = client.topic_path(project_id, topic_name)

    created = client.create_subscription(
        new_subscription_path, parent_topic_path)

    print('Subscription created: {}'.format(created))
    # [END pubsub_create_pull_subscription]

def delete_subscription(project_id, subscription_name):
    """Delete an existing Pub/Sub subscription and print its resource path.

    Args:
        project_id: ID of the Google Cloud project.
        subscription_name: Short name of the subscription to delete.
    """
    # [START pubsub_delete_subscription]
    from google.cloud import pubsub_v1

    # TODO project_id = "Your Google Cloud Project ID"
    # TODO subscription_name = "Your Pub/Sub subscription name"

    client = pubsub_v1.SubscriberClient()
    doomed_path = client.subscription_path(project_id, subscription_name)

    client.delete_subscription(doomed_path)

    print('Subscription deleted: {}'.format(doomed_path))
    # [END pubsub_delete_subscription]

def sub(project_id, subscription_name):
    """Receive and acknowledge messages from a subscription.

    Each received message is printed and acknowledged by a background
    callback. The call keeps the main thread alive until the subscribe
    future finishes or the user interrupts.

    Args:
        project_id: ID of the Google Cloud project.
        subscription_name: Short name of the subscription to pull from.

    Raises:
        KeyboardInterrupt: Re-raised after the subscribe future has been
            cancelled.
        Exception: Whatever error the subscribe future finished with, if
            any, instead of being silently ignored.
    """
    client = pubsub_v1.SubscriberClient()
    subscription_path = client.subscription_path(
        project_id, subscription_name)

    def callback(message):
        print('Received message {} of message ID {}'.format(
            message, message.message_id))
        print(message.data)
        message.ack()
        print('Acknowledged message of message ID {}\n'.format(
            message.message_id))
        print('Done')

    flow_control = pubsub_v1.types.FlowControl(max_messages=10)

    future = client.subscribe(
        subscription_path, callback=callback, flow_control=flow_control)
    print('Listening for messages on {}..\n'.format(subscription_path))

    # Keep the main thread from exiting so the subscriber can process
    # messages in the background. Polling done() with a short sleep keeps
    # the loop responsive to Ctrl-C; once the future has finished, result()
    # surfaces any error it ended with.
    try:
        while not future.done():
            time.sleep(1)
        future.result()
    except KeyboardInterrupt:
        # Stop the background pull before handing the interrupt back.
        future.cancel()
        raise


if __name__ == '__main__':
    # Set the credentials path (a placeholder here) in the environment
    # before any Pub/Sub client is constructed.
    os.environ.update(GOOGLE_APPLICATION_CREDENTIALS="/path/to/cred.json")

    subscription_name = 'subscription'
    topic_name = 'topic'

    list_subscriptions_in_topic(project_id, topic_name)

    # Subscription creation is left disabled.
    # create_subscription(project_id, topic_name, subscription_name)
    try:
        sub(project_id, subscription_name)
    finally:
        # Cleanup hook: the delete call is left disabled, so nothing runs.
        pass  # delete_subscription(project_id, subscription_name)
# [END pubsub_quickstart_sub_all]

Leave a Reply

Your email address will not be published. Required fields are marked *