|
| 1 | +import oci |
| 2 | +import sys |
| 3 | +import base64 |
| 4 | +import time |
| 5 | + |
| 6 | +# ========================================================== |
| 7 | +# This file provides an example of basic streaming usage |
| 8 | +# * - List streams |
| 9 | +# * - Get a Stream |
| 10 | +# * - Create a Stream |
| 11 | +# * - Delete a Stream |
| 12 | +# * - Publish to a Stream |
| 13 | +# * - Consume from a stream partition using cursor |
| 14 | +# * - Consume from a stream using a group cursor |
| 15 | +# Documentation : https://docs.cloud.oracle.com/iaas/Content/Streaming/Concepts/streamingoverview.htm |
| 16 | + |
| 17 | +# Usage : python stream_example.py <compartment id> |
| 18 | + |
| 19 | +STREAM_NAME = "SdkExampleStream" |
| 20 | +PARTITIONS = 1 |
| 21 | + |
| 22 | + |
| 23 | +def publish_example_messages(client, stream_id): |
| 24 | + # Build up a PutMessagesDetails and publish some messages to the stream |
| 25 | + message_list = [] |
| 26 | + for i in range(100): |
| 27 | + key = "key" + str(i) |
| 28 | + value = "value" + str(i) |
| 29 | + encoded_key = base64.b64encode(key) |
| 30 | + encoded_value = base64.b64encode(value) |
| 31 | + message_list.append(oci.streaming.models.PutMessagesDetailsEntry(key=encoded_key, value=encoded_value)) |
| 32 | + |
| 33 | + print ("Publishing {} messages to the stream {} ".format(len(message_list), stream_id)) |
| 34 | + messages = oci.streaming.models.PutMessagesDetails(messages=message_list) |
| 35 | + put_message_result = client.put_messages(stream_id, messages) |
| 36 | + |
| 37 | + # The put_message_result can contain some useful metadata for handling failures |
| 38 | + for entry in put_message_result.data.entries: |
| 39 | + if entry.error: |
| 40 | + print ("Error ({}) : {}".format(entry.error, entry.error_message)) |
| 41 | + else: |
| 42 | + print ("Published message to partition {} , offset {}".format(entry.partition, entry.offset)) |
| 43 | + |
| 44 | + |
| 45 | +def get_or_create_stream(client, compartment_id, stream_name, partition, sac_composite): |
| 46 | + |
| 47 | + list_streams = client.list_streams(compartment_id, name=stream_name, |
| 48 | + lifecycle_state=oci.streaming.models.StreamSummary.LIFECYCLE_STATE_ACTIVE) |
| 49 | + if list_streams.data: |
| 50 | + # If we find an active stream with the correct name, we'll use it. |
| 51 | + print ("An active stream {} has been found".format(stream_name)) |
| 52 | + sid = list_streams.data[0].id |
| 53 | + return get_stream(sac_composite.client, sid) |
| 54 | + |
| 55 | + print (" No Active stream {} has been found; Creating it now. ".format(stream_name)) |
| 56 | + print (" Creating stream {} with {} partitions.".format(stream_name, partition)) |
| 57 | + |
| 58 | + # Create stream_details object that need to be passed while creating stream. |
| 59 | + stream_details = oci.streaming.models.CreateStreamDetails(name=stream_name, partitions=partition, |
| 60 | + compartment_id=compartment, retention_in_hours=24) |
| 61 | + |
| 62 | + # Since stream creation is asynchronous; we need to wait for the stream to become active. |
| 63 | + response = sac_composite.create_stream_and_wait_for_state( |
| 64 | + stream_details, wait_for_states=[oci.streaming.models.StreamSummary.LIFECYCLE_STATE_ACTIVE]) |
| 65 | + return response |
| 66 | + |
| 67 | + |
| 68 | +def get_stream(admin_client, stream_id): |
| 69 | + return admin_client.get_stream(stream_id) |
| 70 | + |
| 71 | + |
| 72 | +def delete_stream(client, stream_id): |
| 73 | + print (" Deleting Stream {}".format(stream_id)) |
| 74 | + # Stream deletion is an asynchronous operation, give it some time to complete. |
| 75 | + client.delete_stream_and_wait_for_state(stream_id, oci.streaming.models.StreamSummary.LIFECYCLE_STATE_DELETED) |
| 76 | + |
| 77 | + |
| 78 | +def get_cursor_by_partition(client, stream_id, partition): |
| 79 | + print("Creating a cursor for partition {}".format(partition)) |
| 80 | + cursor_details = oci.streaming.models.CreateCursorDetails( |
| 81 | + partition=partition, |
| 82 | + type=oci.streaming.models.CreateCursorDetails.TYPE_TRIM_HORIZON) |
| 83 | + response = client.create_cursor(stream_id, cursor_details) |
| 84 | + cursor = response.data.value |
| 85 | + return cursor |
| 86 | + |
| 87 | + |
| 88 | +def simple_message_loop(client, stream_id, initial_cursor): |
| 89 | + cursor = initial_cursor |
| 90 | + while True: |
| 91 | + get_response = client.get_messages(stream_id, cursor, limit=10) |
| 92 | + # No messages to process. return. |
| 93 | + if not get_response.data: |
| 94 | + return |
| 95 | + |
| 96 | + # Process the messages |
| 97 | + print(" Read {} messages".format(len(get_response.data))) |
| 98 | + for message in get_response.data: |
| 99 | + print("{}: {}".format(base64.b64decode(message.key), base64.b64decode(message.value))) |
| 100 | + |
| 101 | + # get_messages is a throttled method; clients should retrieve sufficiently large message |
| 102 | + # batches, as to avoid too many http requests. |
| 103 | + time.sleep(1) |
| 104 | + # use the next-cursor for iteration |
| 105 | + cursor = get_response.headers["opc-next-cursor"] |
| 106 | + |
| 107 | + |
| 108 | +def get_cursor_by_group(sc, sid, group_name, instance_name): |
| 109 | + print(" Creating a cursor for group {}, instance {}".format(group_name, instance_name)) |
| 110 | + cursor_details = oci.streaming.models.CreateGroupCursorDetails(group_name=group_name, instance_name=instance_name, |
| 111 | + type=oci.streaming.models. |
| 112 | + CreateGroupCursorDetails.TYPE_TRIM_HORIZON, |
| 113 | + commit_on_get=True) |
| 114 | + response = sc.create_group_cursor(sid, cursor_details) |
| 115 | + return response.data.value |
| 116 | + |
| 117 | + |
| 118 | +# Load the default configuration |
| 119 | +config = oci.config.from_file() |
| 120 | + |
| 121 | +# Create a StreamAdminClientCompositeOperations for composite operations. |
| 122 | +stream_admin_client = oci.streaming.StreamAdminClient(config) |
| 123 | +stream_admin_client_composite = oci.streaming.StreamAdminClientCompositeOperations(stream_admin_client) |
| 124 | + |
| 125 | +if len(sys.argv) != 2: |
| 126 | + raise RuntimeError('This example expects an ocid for the compartment in which streams should be created.') |
| 127 | + |
| 128 | +compartment = sys.argv[1] |
| 129 | + |
| 130 | +# We will reuse a stream if its already created. |
| 131 | +# This will utilize list_streams() to determine if a stream exists and return it, or create a new one. |
| 132 | +stream = get_or_create_stream(stream_admin_client, compartment, STREAM_NAME, |
| 133 | + PARTITIONS, stream_admin_client_composite).data |
| 134 | + |
| 135 | +print (" Created Stream {} with id : {}".format(stream.name, stream.id)) |
| 136 | + |
| 137 | +# Streams are assigned a specific endpoint url based on where they are provisioned. |
| 138 | +# Create a stream client using the provided message endpoint. |
| 139 | +stream_client = oci.streaming.StreamClient(config, service_endpoint=stream.messages_endpoint) |
| 140 | +s_id = stream.id |
| 141 | + |
| 142 | +# Publish some messages to the stream |
| 143 | +publish_example_messages(stream_client, s_id) |
| 144 | + |
| 145 | +# Use a cursor for getting messages; each get_messages call will return a next-cursor for iteration. |
| 146 | +# There are a couple kinds of cursors. |
| 147 | +# A cursor can be created at a given partition/offset. |
| 148 | +# This gives explicit offset management control to the consumer. |
| 149 | + |
| 150 | +print("Starting a simple message loop with a partition cursor") |
| 151 | +partition_cursor = get_cursor_by_partition(stream_client, s_id, partition="0") |
| 152 | +simple_message_loop(stream_client, s_id, partition_cursor) |
| 153 | + |
| 154 | +# A cursor can be created as part of a consumer group. |
| 155 | +# Committed offsets are managed for the group, and partitions |
| 156 | +# are dynamically balanced amongst consumers in the group. |
| 157 | +group_cursor = get_cursor_by_group(stream_client, s_id, "example-group", "example-instance-1") |
| 158 | +simple_message_loop(stream_client, s_id, group_cursor) |
| 159 | + |
| 160 | +# Cleanup; remember to delete streams which are not in use. |
| 161 | +delete_stream(stream_admin_client_composite, s_id) |
0 commit comments