diff --git a/data-platform/open-source-data-platforms/oci-streaming/README.md b/data-platform/open-source-data-platforms/oci-streaming/README.md index 57297a5bb..540d6158b 100644 --- a/data-platform/open-source-data-platforms/oci-streaming/README.md +++ b/data-platform/open-source-data-platforms/oci-streaming/README.md @@ -11,7 +11,6 @@ Reviewed: 04.06.2024 # Team Publications -- [Revival of the Magical Suitcase](https://medium.com/@devpiotrekk/revival-of-the-magical-suitcase-73093af23f29) - [LiveLabs: Deploying OCI Streaming Service](https://apexapps.oracle.com/pls/apex/f?p=133:180:107188281482541::::wid:664) - [LiveLabs: How do I ingest messages into OCI streaming in real-time with Oracle GoldenGate for Big Data?](https://apexapps.oracle.com/pls/apex/r/dbpm/livelabs/run-workshop?p210_wid=3572&session=107188281482541) @@ -20,6 +19,13 @@ Reviewed: 04.06.2024 - [OCI Streaming Documentation](https://docs.oracle.com/en-us/iaas/Content/Streaming/Concepts/streamingoverview.htm) - [Overview of OCI Streaming - YouTube](https://www.youtube.com/watch?v=G8-E_j-uVak) +# Reusable Assets + +- [Revival of the Magical Suitcase](https://medium.com/@devpiotrekk/revival-of-the-magical-suitcase-73093af23f29) + +- [Fake Producer and Consumer for OCI Streaming](https://github.com/oracle-devrel/technology-engineering/tree/main/data-platform/open-source-data-platforms/oci-streaming/code-examples/fake-producer-consumer) + +- [Create and run Mosquitto & Node-RED, connecting to OCI Streaming](https://github.com/oracle-devrel/technology-engineering/tree/main/data-platform/open-source-data-platforms/oci-streaming/code-examples/mosquitto_node-red) # License diff --git a/data-platform/open-source-data-platforms/oci-streaming/code-examples/fake-producer-consumer/LICENSE b/data-platform/open-source-data-platforms/oci-streaming/code-examples/fake-producer-consumer/LICENSE new file mode 100644 index 000000000..62c949c4e --- /dev/null +++ 
b/data-platform/open-source-data-platforms/oci-streaming/code-examples/fake-producer-consumer/LICENSE @@ -0,0 +1,35 @@ +Copyright (c) 2024 Oracle and/or its affiliates. + +The Universal Permissive License (UPL), Version 1.0 + +Subject to the condition set forth below, permission is hereby granted to any +person obtaining a copy of this software, associated documentation and/or data +(collectively the "Software"), free of charge and under any and all copyright +rights in the Software, and any and all patent rights owned or freely +licensable by each licensor hereunder covering either (i) the unmodified +Software as contributed to or provided by such licensor, or (ii) the Larger +Works (as defined below), to deal in both + +(a) the Software, and +(b) any piece of software and/or hardware listed in the lrgrwrks.txt file if +one is included with the Software (each a "Larger Work" to which the Software +is contributed by such licensors), + +without restriction, including without limitation the rights to copy, create +derivative works of, display, perform, and distribute the Software and make, +use, sell, offer for sale, import, export, have made, and have sold the +Software and the Larger Work(s), and to sublicense the foregoing rights on +either these or other terms. + +This license is subject to the following condition: +The above copyright notice and either this complete permission notice or at +a minimum a reference to the UPL must be included in all copies or +substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. 
diff --git a/data-platform/open-source-data-platforms/oci-streaming/code-examples/fake-producer-consumer/README.md b/data-platform/open-source-data-platforms/oci-streaming/code-examples/fake-producer-consumer/README.md new file mode 100644 index 000000000..7c60223f1 --- /dev/null +++ b/data-platform/open-source-data-platforms/oci-streaming/code-examples/fake-producer-consumer/README.md @@ -0,0 +1,42 @@ +# Example of Producing to and Consuming from OCI Streaming + +Reviewed: 22.10.2024 + +1. Create a compute instance running Oracle Linux 7. +2. Run the commands below to install Git, clone the repo, and install the required Python packages: + ``` + sudo dnf install git-all -y + git clone https://github.com/bobpeulen/apache_kafka.git + sudo pip3 install kafka-python3 pandas numpy datetime + ``` + +3. Run the producer. Replace the example values with your own. + +``` +python apache_kafka/oci_streaming/producer.py \ +-tenancy_name 'oraemeadatamgmt' \ +-region 'eu-frankfurt-1' \ +-user_name 'OracleIdentityCloudService/bob.peulen@oracle.com' \ +-stream_name 'OpenSourceData_stream_1' \ +-stream_pool_ocid 'ocid1.streampool.oc1.eu-frankfurt-1.amaaaaaaeicj2tiacazj6xzvn7rkfdyci6w2io634erapt7ctpxtqxauvocmea' \ +-auth_token 'ADD YOUR TOKEN HERE' +``` + +4. Run the consumer with the same arguments. + ``` + python apache_kafka/oci_streaming/consumer.py \ + -tenancy_name 'oraemeadatamgmt' \ + -region 'eu-frankfurt-1' \ + -user_name 'OracleIdentityCloudService/bob.peulen@oracle.com' \ + -stream_name 'OpenSourceData_stream_1' \ + -stream_pool_ocid 'ocid1.streampool.oc1.eu-frankfurt-1.amaaaaaaeicj2tiacazj6xzvn7rkfdyci6w2io634erapt7ctpxtqxauvocmea' \ + -auth_token 'ADD YOUR TOKEN HERE' + ``` + +# License + +Copyright (c) 2024 Oracle and/or its affiliates. + +Licensed under the Universal Permissive License (UPL), Version 1.0. + +See [LICENSE](https://github.com/oracle-devrel/technology-engineering/blob/main/LICENSE) for more details. 
\ No newline at end of file diff --git a/data-platform/open-source-data-platforms/oci-streaming/code-examples/fake-producer-consumer/files/consumer.py b/data-platform/open-source-data-platforms/oci-streaming/code-examples/fake-producer-consumer/files/consumer.py new file mode 100644 index 000000000..5bd1ef195 --- /dev/null +++ b/data-platform/open-source-data-platforms/oci-streaming/code-examples/fake-producer-consumer/files/consumer.py @@ -0,0 +1,42 @@ +from kafka3 import KafkaConsumer, KafkaProducer +import pandas as pd +from time import sleep +import datetime +import numpy as np +from datetime import datetime, timedelta +import random +import argparse +import uuid + +#get arguments +parser = argparse.ArgumentParser() + +parser.add_argument("-tenancy_name", "--tenancy_name", help="tenancy_name", required=True) +parser.add_argument("-region", "--region", help="region",required=True) +parser.add_argument("-user_name", "--user_name", help="user_name including OracleIdentityCloudService", required=True) +parser.add_argument("-stream_name", "--stream_name", help="stream_name / topic", required=True) +parser.add_argument("-stream_pool_ocid", "--stream_pool_ocid", help="stream_pool_ocid", required=True) +parser.add_argument("-auth_token", "--auth_token", help="auth_token", required=True) + + +args = parser.parse_args() +tenancy_name = args.tenancy_name +region = args.region +user_name = args.user_name +stream_name = args.stream_name +stream_pool_ocid = args.stream_pool_ocid +auth_token = args.auth_token + + +def main(): + consumer = KafkaConsumer(stream_name, bootstrap_servers = f'cell-1.streaming.{region}.oci.oraclecloud.com:9092', + security_protocol = 'SASL_SSL', sasl_mechanism = 'PLAIN', + sasl_plain_username = f'{tenancy_name}/{user_name}/{stream_pool_ocid}', + sasl_plain_password = auth_token) + + for message in consumer: + print(f"{message.key}: {message.value}") + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git 
a/data-platform/open-source-data-platforms/oci-streaming/code-examples/fake-producer-consumer/files/producer.py b/data-platform/open-source-data-platforms/oci-streaming/code-examples/fake-producer-consumer/files/producer.py new file mode 100644 index 000000000..52596fd2d --- /dev/null +++ b/data-platform/open-source-data-platforms/oci-streaming/code-examples/fake-producer-consumer/files/producer.py @@ -0,0 +1,70 @@ +from kafka3 import KafkaConsumer, KafkaProducer +import pandas as pd +from time import sleep +import datetime +import numpy as np +from datetime import datetime, timedelta +import random +import argparse +import uuid + +#get arguments +parser = argparse.ArgumentParser() + +parser.add_argument("-tenancy_name", "--tenancy_name", help="tenancy_name", required=True) +parser.add_argument("-region", "--region", help="region",required=True) +parser.add_argument("-user_name", "--user_name", help="user_name including OracleIdentityCloudService", required=True) +parser.add_argument("-stream_name", "--stream_name", help="stream_name / topic", required=True) +parser.add_argument("-stream_pool_ocid", "--stream_pool_ocid", help="stream_pool_ocid", required=True) +parser.add_argument("-auth_token", "--auth_token", help="auth_token", required=True) + + +args = parser.parse_args() +tenancy_name = args.tenancy_name +region = args.region +user_name = args.user_name +stream_name = args.stream_name +stream_pool_ocid = args.stream_pool_ocid +auth_token = args.auth_token + + +def main(): + ## create connection + producer = KafkaProducer(bootstrap_servers = f'cell-1.streaming.{region}.oci.oraclecloud.com:9092', linger_ms = 50, batch_size = 2, + security_protocol = 'SASL_SSL', sasl_mechanism = 'PLAIN', + value_serializer = lambda v: v.encode('utf-8'), + sasl_plain_username = f'{tenancy_name}/{user_name}/{stream_pool_ocid}', + sasl_plain_password = auth_token) + #create random data + pd_list = [] + + for i in range(500): + + #set randomness, so each goes up and down together + 
randomness_sun_price = random.uniform(0.95, 1.015) + randomness_wspd = random.uniform(0.90, 0.95) + cust_id = str(uuid.uuid4()) + kwh_producing = round(random.uniform(0.11, 5.55), 2) + kwh_consuming = round(random.uniform(3.55, 19.55), 2) + + ##weather info + temp = randomness_sun_price + wspd = randomness_wspd + tsun = randomness_sun_price *1.22 + energy_price = randomness_sun_price *0.89 + + pd_list.append([cust_id, kwh_producing, kwh_consuming, temp, wspd, tsun, energy_price]) + + df_out = pd.DataFrame(pd_list, columns = ["cust_id", "kwh_producing", "kwh_consuming", "temp", "windspeed","sun_light", "energy_price"]) + + #push random data to stream + for index, row in df_out.iterrows(): + row_json = row.to_json(orient='records', lines=True) + producer.send(stream_name, value=row_json) + producer.flush() + print(f"Producing to stream: {row_json}") + sleep(5) + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/data-platform/open-source-data-platforms/oci-streaming/code-examples/fake-producer-consumer/files/producer_csv.py b/data-platform/open-source-data-platforms/oci-streaming/code-examples/fake-producer-consumer/files/producer_csv.py new file mode 100644 index 000000000..fd3eeaf24 --- /dev/null +++ b/data-platform/open-source-data-platforms/oci-streaming/code-examples/fake-producer-consumer/files/producer_csv.py @@ -0,0 +1,52 @@ +from kafka3 import KafkaConsumer, KafkaProducer +import pandas as pd +from time import sleep +import datetime +import numpy as np +from datetime import datetime, timedelta +import random +import argparse +import uuid + +#get arguments +parser = argparse.ArgumentParser() + +parser.add_argument("-tenancy_name", "--tenancy_name", help="tenancy_name", required=True) +parser.add_argument("-region", "--region", help="region",required=True) +parser.add_argument("-user_name", "--user_name", help="user_name including OracleIdentityCloudService", required=True) +parser.add_argument("-stream_name", "--stream_name", 
help="stream_name / topic", required=True) +parser.add_argument("-stream_pool_ocid", "--stream_pool_ocid", help="stream_pool_ocid", required=True) +parser.add_argument("-auth_token", "--auth_token", help="auth_token", required=True) + + +args = parser.parse_args() +tenancy_name = args.tenancy_name +region = args.region +user_name = args.user_name +stream_name = args.stream_name +stream_pool_ocid = args.stream_pool_ocid +auth_token = args.auth_token + + +def main(): + ## create connection + producer = KafkaProducer(bootstrap_servers = f'cell-1.streaming.{region}.oci.oraclecloud.com:9092', linger_ms = 50, batch_size = 2, + security_protocol = 'SASL_SSL', sasl_mechanism = 'PLAIN', + value_serializer = lambda v: v.encode('utf-8'), + sasl_plain_username = f'{tenancy_name}/{user_name}/{stream_pool_ocid}', + sasl_plain_password = auth_token) + + + df_out = pd.read_csv("./NAME_OF_CSV.csv") + + #push each CSV row to the stream + for index, row in df_out.iterrows(): + row_json = row.to_json(orient='records', lines=True) + producer.send(stream_name, value=row_json) + producer.flush() + print(f"Producing to stream: {row_json}") + sleep(5) + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/data-platform/open-source-data-platforms/oci-streaming/code-examples/mosquitto_node-red/LICENSE b/data-platform/open-source-data-platforms/oci-streaming/code-examples/mosquitto_node-red/LICENSE new file mode 100644 index 000000000..62c949c4e --- /dev/null +++ b/data-platform/open-source-data-platforms/oci-streaming/code-examples/mosquitto_node-red/LICENSE @@ -0,0 +1,35 @@ +Copyright (c) 2024 Oracle and/or its affiliates. 
+ +The Universal Permissive License (UPL), Version 1.0 + +Subject to the condition set forth below, permission is hereby granted to any +person obtaining a copy of this software, associated documentation and/or data +(collectively the "Software"), free of charge and under any and all copyright +rights in the Software, and any and all patent rights owned or freely +licensable by each licensor hereunder covering either (i) the unmodified +Software as contributed to or provided by such licensor, or (ii) the Larger +Works (as defined below), to deal in both + +(a) the Software, and +(b) any piece of software and/or hardware listed in the lrgrwrks.txt file if +one is included with the Software (each a "Larger Work" to which the Software +is contributed by such licensors), + +without restriction, including without limitation the rights to copy, create +derivative works of, display, perform, and distribute the Software and make, +use, sell, offer for sale, import, export, have made, and have sold the +Software and the Larger Work(s), and to sublicense the foregoing rights on +either these or other terms. + +This license is subject to the following condition: +The above copyright notice and either this complete permission notice or at +a minimum a reference to the UPL must be included in all copies or +substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. 
diff --git a/data-platform/open-source-data-platforms/oci-streaming/code-examples/mosquitto_node-red/README.md b/data-platform/open-source-data-platforms/oci-streaming/code-examples/mosquitto_node-red/README.md new file mode 100644 index 000000000..d019cc6f3 --- /dev/null +++ b/data-platform/open-source-data-platforms/oci-streaming/code-examples/mosquitto_node-red/README.md @@ -0,0 +1,103 @@ +# Create and run Mosquitto & Node-RED, connecting to OCI Streaming + +The steps below create a Mosquitto broker on OCI and configure it to handle incoming traffic from the KPN IoT platform. KPN requires CA-signed certificates, encrypted messages, and username/password authentication. + +- Create a compute instance with a CentOS 7 image. +- Follow these steps to update the repository file so that yum works: https://dev.to/franzwong/fix-cannot-find-a-valid-baseurl-for-repo-in-centos-1h07 +- Create a public DNS record that points to the public IP of the compute instance. This is needed to create the CA-signed certificates. + +- SSH into the compute instance. Install and enable Mosquitto: + + ``` + sudo yum -y install epel-release + sudo yum -y install mosquitto + sudo systemctl start mosquitto + sudo systemctl enable mosquitto + ``` + +- Firewall settings: + ``` + sudo firewall-cmd --permanent --add-service=http + sudo firewall-cmd --permanent --add-port=1883/tcp + sudo firewall-cmd --permanent --add-port=8883/tcp + sudo firewall-cmd --reload + ``` + +- Test the Mosquitto broker. Open two terminals and run one command in each: + ``` + mosquitto_sub -h localhost -t test_topic + mosquitto_pub -h localhost -t test_topic -m "hello world" + ``` + +- Create a password file by running the command below. In the example, 'bob' is the username. You are prompted for the password when you run it. + ``` + sudo mosquitto_passwd -c /etc/mosquitto/passwd bob + ``` + +- Create the CA-signed certificates. The public IP must already be registered in public DNS. When prompted for the domain, use the full domain name. + ``` + sudo yum -y install certbot + sudo certbot certonly --standalone + ``` + + +- Change the Mosquitto config settings. 
+ ``` + sudo rm /etc/mosquitto/mosquitto.conf + sudo nano /etc/mosquitto/mosquitto.conf + ``` + +- Add the below to the file. Replace the example domain name (mosquitto-demo.cooldemo.org) with the one you are using. + ``` + allow_anonymous false + password_file /etc/mosquitto/passwd + listener 8883 + certfile /etc/letsencrypt/live/mosquitto-demo.cooldemo.org/cert.pem + cafile /etc/letsencrypt/live/mosquitto-demo.cooldemo.org/fullchain.pem + keyfile /etc/letsencrypt/live/mosquitto-demo.cooldemo.org/privkey.pem + ``` + +- Restart Mosquitto. + ``` + sudo systemctl daemon-reload + sudo systemctl restart mosquitto + ``` + +- Test with credentials. + ``` + mosquitto_sub -h localhost -t kpnthings -u "bob" -P "password" -p 8883 + mosquitto_pub -h localhost -t "kpnthings" -m "hello world" -u "bob" -P "password" -p 8883 + ``` + +- Test with credentials and certificate. + ``` + mosquitto_pub -h mosquitto-demo.cooldemo.org -t kpnthings -m "hello again" -p 8883 --cafile /etc/ssl/certs/ca-bundle.crt -u "bob" -P "password" + mosquitto_sub -h mosquitto-demo.cooldemo.org -t kpnthings -p 8883 --cafile /etc/ssl/certs/ca-bundle.crt -u "bob" -P "password" + ``` + + +- Create a cron job that attempts certificate renewal every day. + ``` + sudo EDITOR=nano crontab -e + 15 3 * * * certbot renew --noninteractive --post-hook "systemctl restart mosquitto" + ``` + + +# Add a connection to the KPN IoT hub, as in the screenshot. + ![img_1](./images/1.png) + +# Create an OCI stream and a Kafka Connect configuration, and get the streaming credentials to connect, like: + ![img_2](./images/2.png) + +# Set up Node-RED and create a flow from Mosquitto to OCI Streaming + ![img_3](./images/3.png) + ![img_4](./images/4.png) + ![img_5](./images/5.png) + +# License + +Copyright (c) 2024 Oracle and/or its affiliates. + +Licensed under the Universal Permissive License (UPL), Version 1.0. + +See [LICENSE](https://github.com/oracle-devrel/technology-engineering/blob/main/LICENSE) for more details. 
\ No newline at end of file diff --git a/data-platform/open-source-data-platforms/oci-streaming/code-examples/mosquitto_node-red/images/1.png b/data-platform/open-source-data-platforms/oci-streaming/code-examples/mosquitto_node-red/images/1.png new file mode 100644 index 000000000..c5e47e1c1 Binary files /dev/null and b/data-platform/open-source-data-platforms/oci-streaming/code-examples/mosquitto_node-red/images/1.png differ diff --git a/data-platform/open-source-data-platforms/oci-streaming/code-examples/mosquitto_node-red/images/2.png b/data-platform/open-source-data-platforms/oci-streaming/code-examples/mosquitto_node-red/images/2.png new file mode 100644 index 000000000..fb8d7884f Binary files /dev/null and b/data-platform/open-source-data-platforms/oci-streaming/code-examples/mosquitto_node-red/images/2.png differ diff --git a/data-platform/open-source-data-platforms/oci-streaming/code-examples/mosquitto_node-red/images/3.png b/data-platform/open-source-data-platforms/oci-streaming/code-examples/mosquitto_node-red/images/3.png new file mode 100644 index 000000000..76693af5f Binary files /dev/null and b/data-platform/open-source-data-platforms/oci-streaming/code-examples/mosquitto_node-red/images/3.png differ diff --git a/data-platform/open-source-data-platforms/oci-streaming/code-examples/mosquitto_node-red/images/4.png b/data-platform/open-source-data-platforms/oci-streaming/code-examples/mosquitto_node-red/images/4.png new file mode 100644 index 000000000..e0338db49 Binary files /dev/null and b/data-platform/open-source-data-platforms/oci-streaming/code-examples/mosquitto_node-red/images/4.png differ diff --git a/data-platform/open-source-data-platforms/oci-streaming/code-examples/mosquitto_node-red/images/5.png b/data-platform/open-source-data-platforms/oci-streaming/code-examples/mosquitto_node-red/images/5.png new file mode 100644 index 000000000..3eb0adf55 Binary files /dev/null and 
b/data-platform/open-source-data-platforms/oci-streaming/code-examples/mosquitto_node-red/images/5.png differ
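Note on the connection settings in `producer.py`, `producer_csv.py`, and `consumer.py`: all three scripts build the same bootstrap server and SASL username from the command-line arguments. As a minimal sketch of that shared logic, the hypothetical helper below (`oci_streaming_kafka_settings` is not part of this diff) collects those settings in one place. It assumes, as the scripts do, that the stream pool is reachable at the `cell-1` endpoint of the region.

```python
def oci_streaming_kafka_settings(tenancy_name, region, user_name,
                                 stream_pool_ocid, auth_token):
    """Return the Kafka client keyword arguments used by the scripts in this diff.

    Hypothetical helper for illustration only; the values mirror what
    producer.py and consumer.py pass to KafkaProducer / KafkaConsumer.
    """
    return {
        # Assumption taken from the scripts: the pool lives on the cell-1 endpoint.
        "bootstrap_servers": f"cell-1.streaming.{region}.oci.oraclecloud.com:9092",
        "security_protocol": "SASL_SSL",
        "sasl_mechanism": "PLAIN",
        # SASL username format used by the scripts: tenancy/user/stream pool OCID.
        "sasl_plain_username": f"{tenancy_name}/{user_name}/{stream_pool_ocid}",
        # The auth token is the SASL password.
        "sasl_plain_password": auth_token,
    }


if __name__ == "__main__":
    # Placeholder values only; substitute your own tenancy details.
    settings = oci_streaming_kafka_settings(
        tenancy_name="my_tenancy",
        region="eu-frankfurt-1",
        user_name="OracleIdentityCloudService/user@example.com",
        stream_pool_ocid="ocid1.streampool.oc1..example",
        auth_token="ADD YOUR TOKEN HERE",
    )
    print(settings["bootstrap_servers"])
    print(settings["sasl_plain_username"])
```

The returned dictionary could be unpacked into the existing calls, for example `KafkaConsumer(stream_name, **settings)`, with the producers adding their own `value_serializer`, `linger_ms`, and `batch_size`.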