Skip to content

Commit 3e89160

Browse files
authored
Merge pull request #149 from oracle-devrel/fharris-patch-1
oci streaming exercices with Go and Java
2 parents 4e7d79b + 863ee4e commit 3e89160

File tree

5 files changed

+267
-0
lines changed

5 files changed

+267
-0
lines changed
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package com.oci.stream;
2+
3+
import com.google.common.util.concurrent.Uninterruptibles;
4+
import com.oracle.bmc.ConfigFileReader;
5+
import com.oracle.bmc.auth.AuthenticationDetailsProvider;
6+
import com.oracle.bmc.auth.ConfigFileAuthenticationDetailsProvider;
7+
import com.oracle.bmc.streaming.StreamClient;
8+
import com.oracle.bmc.streaming.model.CreateGroupCursorDetails;
9+
import com.oracle.bmc.streaming.model.Message;
10+
import com.oracle.bmc.streaming.requests.CreateGroupCursorRequest;
11+
import com.oracle.bmc.streaming.requests.GetMessagesRequest;
12+
import com.oracle.bmc.streaming.responses.CreateGroupCursorResponse;
13+
import com.oracle.bmc.streaming.responses.GetMessagesResponse;
14+
15+
import java.util.concurrent.TimeUnit;
16+
17+
import static java.nio.charset.StandardCharsets.UTF_8;
18+
19+
20+
public class Consumer {
21+
public static void main(String[] args) throws Exception {
22+
final String configurationFilePath = "/home/fernando_h/.oci/config";
23+
final String profile = "DEFAULT";
24+
final String ociStreamOcid = "ocid1.stream.oc1.eu-frankfurt-1.amaaaaaaue...";
25+
final String ociMessageEndpoint = "https://cell-1.streaming.eu-frankfurt-1.oci.oraclecloud.com";
26+
27+
final ConfigFileReader.ConfigFile configFile = ConfigFileReader.parseDefault();
28+
final AuthenticationDetailsProvider provider =
29+
new ConfigFileAuthenticationDetailsProvider(configFile);
30+
31+
// Streams are assigned a specific endpoint url based on where they are provisioned.
32+
// Create a stream client using the provided message endpoint.
33+
StreamClient streamClient = StreamClient.builder().endpoint(ociMessageEndpoint).build(provider);
34+
35+
// A cursor can be created as part of a consumer group.
36+
// Committed offsets are managed for the group, and partitions
37+
// are dynamically balanced amongst consumers in the group.
38+
System.out.println("Starting a simple message loop with a group cursor");
39+
String groupCursor =
40+
getCursorByGroup(streamClient, ociStreamOcid, "exampleGroup", "exampleInstance-1");
41+
simpleMessageLoop(streamClient, ociStreamOcid, groupCursor);
42+
43+
}
44+
45+
private static void simpleMessageLoop(
46+
StreamClient streamClient, String streamId, String initialCursor) {
47+
String cursor = initialCursor;
48+
for (int i = 0; i < 10; i++) {
49+
50+
GetMessagesRequest getRequest =
51+
GetMessagesRequest.builder()
52+
.streamId(streamId)
53+
.cursor(cursor)
54+
.limit(25)
55+
.build();
56+
57+
GetMessagesResponse getResponse = streamClient.getMessages(getRequest);
58+
59+
// process the messages
60+
System.out.println(String.format("Read %s messages.", getResponse.getItems().size()));
61+
for (Message message : ((GetMessagesResponse) getResponse).getItems()) {
62+
System.out.println(
63+
String.format(
64+
"%s: %s",
65+
message.getKey() == null ? "Null" :new String(message.getKey(), UTF_8),
66+
new String(message.getValue(), UTF_8)));
67+
}
68+
69+
// getMessages is a throttled method; clients should retrieve sufficiently large message
70+
// batches, as to avoid too many http requests.
71+
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
72+
73+
// use the next-cursor for iteration
74+
cursor = getResponse.getOpcNextCursor();
75+
}
76+
}
77+
78+
private static String getCursorByGroup(
79+
StreamClient streamClient, String streamId, String groupName, String instanceName) {
80+
System.out.println(
81+
String.format(
82+
"Creating a cursor for group %s, instance %s.", groupName, instanceName));
83+
84+
CreateGroupCursorDetails cursorDetails =
85+
CreateGroupCursorDetails.builder()
86+
.groupName(groupName)
87+
.instanceName(instanceName)
88+
.type(CreateGroupCursorDetails.Type.TrimHorizon)
89+
.commitOnGet(true)
90+
.build();
91+
92+
CreateGroupCursorRequest createCursorRequest =
93+
CreateGroupCursorRequest.builder()
94+
.streamId(streamId)
95+
.createGroupCursorDetails(cursorDetails)
96+
.build();
97+
98+
CreateGroupCursorResponse groupCursorResponse =
99+
streamClient.createGroupCursor(createCursorRequest);
100+
return groupCursorResponse.getCursor().getValue();
101+
}
102+
103+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
module example/producer
2+
3+
go 1.18
4+
5+
require (
6+
github.com/oracle/oci-go-sdk/v49 v49.2.0 // indirect
7+
github.com/sony/gobreaker v0.4.2-0.20210216022020-dd874f9dd33b // indirect
8+
)
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
2+
github.com/oracle/oci-go-sdk/v49 v49.2.0 h1:l4PUk81EKdTDD4mDg5wrELpdWFqYeE9KYejfNgtsyUI=
3+
github.com/oracle/oci-go-sdk/v49 v49.2.0/go.mod h1:E8q2DXmXnSozLdXHUFF+o3L2gzcWbiFIPFYOYWdqOfc=
4+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
5+
github.com/sony/gobreaker v0.4.2-0.20210216022020-dd874f9dd33b h1:br+bPNZsJWKicw/5rALEo67QHs5weyD5tf8WST+4sJ0=
6+
github.com/sony/gobreaker v0.4.2-0.20210216022020-dd874f9dd33b/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY=
7+
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
8+
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
9+
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
10+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
11+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strconv"
7+
8+
"github.com/oracle/oci-go-sdk/v49/common"
9+
"github.com/oracle/oci-go-sdk/v49/example/helpers"
10+
"github.com/oracle/oci-go-sdk/v49/streaming"
11+
)
12+
13+
const ociMessageEndpoint = "https://cell-1.streaming.eu-frankfurt-1.oci.oraclecloud.com"
14+
const ociStreamOcid = "ocid1.stream.oc1.eu-frankfurt-1.amaaaaaauevftmqaikcwu43ouqq6lz2jhfrwevh3yrh2u6q4zpzcss5zvvuq"
15+
const ociConfigFilePath = "/home/fernando_h/.oci/config"
16+
const ociProfileName = "DEFAULT"
17+
18+
func main() {
19+
fmt.Println("Go oci oss sdk example producer")
20+
putMsgInStream(ociMessageEndpoint, ociStreamOcid)
21+
}
22+
23+
func putMsgInStream(streamEndpoint string, streamOcid string) {
24+
fmt.Println("Stream endpoint for put msg api is: " + streamEndpoint)
25+
26+
provider, err := common.ConfigurationProviderFromFileWithProfile(ociConfigFilePath, ociProfileName, "")
27+
helpers.FatalIfError(err)
28+
29+
streamClient, err := streaming.NewStreamClientWithConfigurationProvider(provider, streamEndpoint)
30+
helpers.FatalIfError(err)
31+
32+
// Create a request and dependent object(s).
33+
for i := 0; i < 5; i++ {
34+
putMsgReq := streaming.PutMessagesRequest{StreamId: common.String(streamOcid),
35+
PutMessagesDetails: streaming.PutMessagesDetails{
36+
// we are batching 2 messages for each Put Request
37+
Messages: []streaming.PutMessagesDetailsEntry{
38+
{Key: []byte("key dummy-0-" + strconv.Itoa(i)),
39+
Value: []byte("value dummy-" + strconv.Itoa(i))},
40+
{Key: []byte("key dummy-1-" + strconv.Itoa(i)),
41+
Value: []byte("value dummy-" + strconv.Itoa(i))}}},
42+
}
43+
44+
// Send the request using the service client
45+
putMsgResp, err := streamClient.PutMessages(context.Background(), putMsgReq)
46+
helpers.FatalIfError(err)
47+
48+
// Retrieve value from the response.
49+
fmt.Println(putMsgResp)
50+
}
51+
52+
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
# Start by cloning this repository into your OCI Cloud Shell:
2+
3+
```git
4+
git clone https://github.com/fharris/ocistreaming-drills
5+
cd ocistreaming-drills/
6+
```
7+
8+
# Producing messages to OCI Streaming with Go
9+
10+
11+
*Example from documentation*
12+
https://docs.oracle.com/en-us/iaas/Content/Streaming/Tasks/streaming-quickstart-oci-sdk-for-go.htm
13+
https://github.com/oracle/oci-go-sdk
14+
15+
16+
```shell
17+
vi producer.go
18+
```
19+
20+
Update variables according to your OCI configuration file and OCI Streaming Service streamOCID and Endpoint:
21+
```Go
22+
const ociMessageEndpoint = "https://cell-1.streaming.eu-frankfurt-1.oci.oraclecloud.com"
23+
const ociStreamOcid = "ocid1.stream.oc1.eu-frankfurt-1.amaaaaa..."
24+
const ociConfigFilePath = "/home/fernando_h/.oci/config"
25+
const ociProfileName = "DEFAULT"
26+
```
27+
28+
You might need to update dependencies in mod with below command :
29+
```
30+
go get -d github.com/oracle/oci-go-sdk/v49@latest
31+
```
32+
33+
Then run the application:
34+
```
35+
go run .
36+
```
37+
38+
You sould be able to see a bunch of new messages being sent to OCI Streaming.
39+
40+
41+
# Consuming messages from OCI Streaming with Java
42+
43+
*Example from documentation*
44+
https://docs.oracle.com/en-us/iaas/Content/Streaming/Tasks/streaming-quickstart-oci-sdk-for-java.htm#java-sdk-streaming-quickstart
45+
https://github.com/oracle/oci-java-sdk
46+
47+
48+
```shell
49+
mvn -B archetype:generate -DgroupId=com.oci.stream -DartifactId=Consumer -DarchetypeArtifactId=maven-archetype-quickstart -DarchetypeVersion=1.4
50+
cd Consumer
51+
vi pom.xml
52+
```
53+
54+
Add dependencies to *pom* file:
55+
```xml
56+
<dependencies>
57+
<dependency>
58+
<groupId>com.oracle.oci.sdk</groupId>
59+
<artifactId>oci-java-sdk-common</artifactId>
60+
<version>1.33.2</version>
61+
</dependency>
62+
<dependency>
63+
<groupId>com.oracle.oci.sdk</groupId>
64+
<artifactId>oci-java-sdk-streaming</artifactId>
65+
<version>1.33.2</version>
66+
</dependency>
67+
</dependencies>
68+
```
69+
70+
Copy class Consumer.Java to mvn project location:
71+
```shell
72+
cp ../Consumer.java src/main/java/com/oci/stream/
73+
```
74+
75+
You sould need to update de oci configuration file and the oci stream id and message endpoint:
76+
```java
77+
final String configurationFilePath = "/home/fernando_h/.oci/config";
78+
final String profile = "DEFAULT";
79+
final String ociStreamOcid = "ocid1.stream.oc1.eu-frankfurt-1.amaaaaaaue...";
80+
final String ociMessageEndpoint = "https://cell-1.streaming.eu-frankfurt-1.oci.oraclecloud.com";
81+
```
82+
83+
Open Consumer.java and update OCI configuration:
84+
```
85+
vi src/main/java/com/oci/stream/Consumer.java
86+
```
87+
88+
Finally, run the application with maven and see the messages previously produced with the Go producer being processed:
89+
```
90+
mvn install exec:java -Dexec.mainClass=com.oci.stream.Consumer
91+
```
92+
93+
You should be able to see the output with the captured messages

0 commit comments

Comments
 (0)