Leave a Comment
Publisher.py
#!/usr/bin/env python
import argparse
import datetime
import json
import time
import os
from google.api_core.exceptions import InvalidArgument
from google.cloud import pubsub_v1
project_id = "PROJECT ID"
def get_callback(api_future, data):
"""Wrap message data in the context of the callback function."""
def callback(api_future):
try:
print("Published message {} now has message ID {}".format(
data, api_future.result()))
except Exception:
print("A problem occurred when publishing {}: {}\n".format(
data, api_future.exception()))
raise api_future.exception()
return callback
def pub(project_id, topic_name):
"""Publishes a message to a Pub/Sub topic."""
# [START pubsub_quickstart_pub_client]
# Initialize a Publisher client
client = pubsub_v1.PublisherClient()
# [END pubsub_quickstart_pub_client]
# Create a fully qualified identifier in the form of
# `projects/{project_id}/topics/{topic_name}`
topic_path = client.topic_path(project_id, topic_name)
data = b'test data'
# When you publish a message, the client returns a future.
try:
api_future = client.publish(topic_path, data=data)
api_future.add_done_callback(get_callback(api_future, data))
time.sleep(0.1)
except:
print("Catch exception when sending: ")
print("Sent done")
# Keep the main thread from exiting until background message
# is processed.
try:
while api_future.running():
time.sleep(0.1)
api_future.result()
except InvalidArgument as e:
print(type(e))
print("Catch exception when running: {}".format(e))
print("All done")
if __name__ == '__main__':
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "/path/to/cred.json"
topic_name = 'topic'
pub(project_id, topic_name)
# [END pubsub_quickstart_pub_all]
Subscriber.py
#!/usr/bin/env python
# [START pubsub_quickstart_sub_all]
import argparse
import json
import time
import os
# [START pubsub_quickstart_sub_deps]
from google.cloud import pubsub_v1
# [END pubsub_quickstart_sub_deps]
project_id = "PROJECT ID"
def list_subscriptions_in_topic(project_id, topic_name):
"""Lists all subscriptions for a given topic."""
# [START pubsub_list_topic_subscriptions]
from google.cloud import pubsub_v1
# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)
for subscription in publisher.list_topic_subscriptions(topic_path):
print(subscription)
# [END pubsub_list_topic_subscriptions]
def create_subscription(project_id, topic_name, subscription_name):
"""Create a new pull subscription on the given topic."""
# [START pubsub_create_pull_subscription]
from google.cloud import pubsub_v1
# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"
# TODO subscription_name = "Your Pub/Sub subscription name"
subscriber = pubsub_v1.SubscriberClient()
topic_path = subscriber.topic_path(project_id, topic_name)
subscription_path = subscriber.subscription_path(
project_id, subscription_name)
subscription = subscriber.create_subscription(
subscription_path, topic_path)
print('Subscription created: {}'.format(subscription))
# [END pubsub_create_pull_subscription]
def delete_subscription(project_id, subscription_name):
"""Deletes an existing Pub/Sub topic."""
# [START pubsub_delete_subscription]
from google.cloud import pubsub_v1
# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project_id, subscription_name)
subscriber.delete_subscription(subscription_path)
print('Subscription deleted: {}'.format(subscription_path))
def sub(project_id, subscription_name):
client = pubsub_v1.SubscriberClient()
subscription_path = client.subscription_path(
project_id, subscription_name)
def callback(message):
print('Received message {} of message ID {}'.format(
message, message.message_id))
print(message.data)
message.ack()
print('Acknowledged message of message ID {}\n'.format(
message.message_id))
print('Done')
flow_control = pubsub_v1.types.FlowControl(max_messages=10)
future = client.subscribe(subscription_path, callback=callback, flow_control=flow_control)
print('Listening for messages on {}..\n'.format(subscription_path))
# Keep the main thread from exiting so the subscriber can
# process messages in the background.
while True:
time.sleep(1)
if __name__ == '__main__':
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "/path/to/cred.json"
topic_name = 'topic'
subscription_name = 'subscription'
list_subscriptions_in_topic(project_id, topic_name)
# create_subscription(project_id, topic_name, subscription_name)
try:
sub(project_id, subscription_name)
finally:
pass #delete_subscription(project_id, subscription_name)
# [END pubsub_quickstart_sub_all]