Commit 0fc7f21

Cleanup streamlit (#273)
* move producer creation outside producing call
* make topic with 1 partition
* teardown instructions
* rename files to something more classically pythony
* make cv-able
* mv sr out of data handler
* subscribe rather than assign
* rename import
* sql formatting
* comments
* note NYSE hrs

Co-authored-by: Cerchie <[email protected]>
1 parent ee0ae20 · commit 0fc7f21

3 files changed: +47 −30 lines changed

flink-streamlit/README.md

Lines changed: 18 additions & 5 deletions
````diff
@@ -8,6 +8,8 @@ In this project you'll produce stock trade events from the [Alpaca API markets](
 
 <img width="718" alt="graph of the 4 technologies" src="https://github.com/Cerchie/alpaca-kafka-flink-streamlit/assets/54046179/7600d717-69bc-46c5-8679-d8d65b9ce810">
 
+Note: Although it may change in the future, at the time this README was written, the NYSE is open from 9:30 to 4 EST. If you run this application outside of those hours, you may not see data coming through.
+
 
 ## Step 1: Get set up in Confluent Cloud
 
@@ -150,7 +152,8 @@ In the cell of the new workspace, you can start running SQL statements. Copy and
 ```sql
 CREATE TABLE tumble_interval_SPY
 (`symbol` STRING, `window_start` STRING,`window_end` STRING,`price` DOUBLE, PRIMARY KEY (`symbol`) NOT ENFORCED)
-WITH ('value.format' = 'json-registry');
+DISTRIBUTED BY (symbol) INTO 1 BUCKETS
+WITH ('value.format' = 'json-registry');
 ```
 - Click 'Run'.
 
@@ -187,11 +190,15 @@ Generate a key using the widget you'll find on the right of the screen on the ho
 
 ## Step 3: Get started running the app
 
-`git clone https://github.com/Cerchie/finnhub.git && cd finnhub`
+```
+git clone https://github.com/Cerchie/finnhub.git && cd finnhub
+```
 
 then
 
-`pip install -r requirements.txt`
+```
+pip install -r requirements.txt
+```
 
 Now, create a file in the root directory named `.streamlit/secrets.toml` (that initial `.` is part of the convention.)
 
@@ -210,6 +217,12 @@ Note that the `:` is necessary for `BASIC_AUTH_USER_INFO`.
 
 You'll need a [Streamlit account](https://streamlit.io/) as well for the [secrets to be in the environment](https://docs.streamlit.io/streamlit-community-cloud/deploy-your-app/secrets-management).
 
-Now, run `streamlit run alpacaviz.py` in your root dir in order to run the app.
+Now, run
+```streamlit run app.py```
+in your root dir in order to run the app.
+
+To deploy on Streamlit yourself, follow the [instructions here](https://docs.streamlit.io/streamlit-community-cloud/deploy-your-app) and make sure to [include the secrets](https://docs.streamlit.io/streamlit-community-cloud/deploy-your-app/secrets-management) in your settings.
+
+## Step 4: Teardown in Confluent Cloud
 
-To deploy on Streamlit yourself, follow the [instructions here](https://docs.streamlit.io/streamlit-community-cloud/deploy-your-app) and make sure to [include the secrets](https://docs.streamlit.io/streamlit-community-cloud/deploy-your-app/secrets-management) in your settings.
+To avoid wasting resources after following this exercise, you can teardown your environment in Confluent Cloud. To do that, navigate to your environment's page and click 'Delete' at the lower right-hand side. This will delete your environment and its associated resources.
````
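As a bridge between the README's `.streamlit/secrets.toml` step and the code below, here is a minimal sketch of how the app reads those values through Streamlit's `st.secrets`. It assumes only the three key names visible in this commit's `kafkaproducer.py` diff; the real secrets file may contain additional entries.

```python
# Minimal sketch: reading .streamlit/secrets.toml values via Streamlit's st.secrets.
# Only the key names visible in this commit are shown; the real file may hold more.
import streamlit as st

sasl_password = st.secrets["SASL_PASSWORD"]      # Kafka cluster API secret
sr_url = st.secrets["SR_URL"]                    # Schema Registry endpoint
sr_auth = st.secrets["BASIC_AUTH_USER_INFO"]     # "key:secret" -- the ':' is required

st.write("Secrets loaded:", all([sasl_password, sr_url, sr_auth]))
```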

flink-streamlit/alpacaviz.py renamed to flink-streamlit/app.py

Lines changed: 3 additions & 7 deletions
```diff
@@ -1,11 +1,9 @@
 import asyncio
 import json
-import random
-import string
 import pandas as pd
 import streamlit as st
 from confluent_kafka import Consumer, TopicPartition
-from setupsocket import on_select
+from kafkaproducer import on_select
 import altair as alt
 
 
@@ -46,10 +44,8 @@ async def display_quotes(component):
     window_history = []
     topic_name = option
 
-    # starting from a specific partition here, it may be different depending on the topic so try a few out or just start from the beginning with the auto.offset.reset config
-    partition = TopicPartition(f"tumble_interval_{topic_name}", 0, 7)
-    consumer.assign([partition])
-    consumer.seek(partition)
+    topic_name = f"tumble_interval_{option}"
+    consumer.subscribe(topic_name)
 
     while True:
         try:
```
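For readers skimming this change, here is a minimal, self-contained sketch of the subscribe-based pattern the new code moves to, as opposed to pinning a partition and offset with `assign()`/`seek()`. The broker address, group id, and offset policy are placeholders rather than values from this repo, and note that `confluent_kafka`'s `Consumer.subscribe()` expects a list of topic names.

```python
# Minimal sketch of the subscribe-based consumer pattern (placeholder config values).
from confluent_kafka import Consumer

consumer = Consumer(
    {
        "bootstrap.servers": "localhost:9092",  # placeholder broker
        "group.id": "streamlit-demo",           # placeholder consumer group
        "auto.offset.reset": "earliest",        # start from the beginning instead of seeking an offset
    }
)

# subscribe() takes a list of topics and lets the group coordinator assign partitions,
# unlike assign()/seek(), which pins the consumer to one partition and offset.
consumer.subscribe(["tumble_interval_SPY"])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        print(msg.key(), msg.value())
finally:
    consumer.close()
```

With `subscribe()`, partition assignment and the starting position are handled by the consumer group and `auto.offset.reset`, which is why the hard-coded partition/offset lines removed above are no longer needed.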

flink-streamlit/setupsocket.py renamed to flink-streamlit/kafkaproducer.py

Lines changed: 26 additions & 18 deletions
```diff
@@ -11,6 +11,7 @@
 
 # set up kafka client
 print("Setting up Kafka client")
+
 config_dict = {
     "bootstrap.servers": "pkc-921jm.us-east-2.aws.confluent.cloud:9092",
     "sasl.mechanisms": "PLAIN",
@@ -20,8 +21,20 @@
     "sasl.password": st.secrets["SASL_PASSWORD"],
 }
 
+
 client_config = config_dict
 
+# setting up the producer
+producer = Producer(client_config)
+
+srconfig = {
+    "url": st.secrets["SR_URL"],
+    "basic.auth.user.info": st.secrets["BASIC_AUTH_USER_INFO"],
+}
+
+# setting up the schema registry connection
+schema_registry_client = SchemaRegistryClient(srconfig)
+
 # schema for producer matching one in SPY topic in Confluent Cloud
 schema_str = """{
 "$id": "http://example.com/myURI.schema.json",
@@ -47,13 +60,6 @@
 }"""
 
 
-def delivery_report(err, event):
-    if err is not None:
-        print(f'Delivery failed on reading for {event.key().decode("utf8")}: {err}')
-    else:
-        print(f"delivered new event from producer")
-
-
 def serialize_custom_data(custom_data, ctx):
     return {
         "bid_timestamp": str(custom_data.timestamp),
@@ -62,20 +68,22 @@ def serialize_custom_data(custom_data, ctx):
     }
 
 
-async def quote_data_handler(stockname, data):
-    # this will run when `wss_client.subscribe_quotes(fn, stockname)` is called
+# setting up the JSON serializer
+json_serializer = JSONSerializer(
+    schema_str, schema_registry_client, serialize_custom_data
+)
 
-    producer = Producer(client_config)
-    srconfig = {
-        "url": st.secrets["SR_URL"],
-        "basic.auth.user.info": st.secrets["BASIC_AUTH_USER_INFO"],
-    }
 
-    schema_registry_client = SchemaRegistryClient(srconfig)
+def delivery_report(err, event):
+    if err is not None:
+        print(f'Delivery failed on reading for {event.key().decode("utf8")}: {err}')
+    else:
+        print(f"delivered new event from producer")
+
+
+async def quote_data_handler(stockname, data):
+    # this will run when `wss_client.subscribe_quotes(fn, stockname)` is called
 
-    json_serializer = JSONSerializer(
-        schema_str, schema_registry_client, serialize_custom_data
-    )
     producer.produce(
         topic=stockname,
         key=stockname,
```
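The hunk above ends mid-call, so as orientation only, here is a hedged sketch of how a module-level producer, Schema Registry client, and `JSONSerializer` are typically wired to a `produce()` call with `confluent-kafka`. The broker and Schema Registry addresses, the schema, the `Quote` dataclass, and the `send_quote` helper are illustrative stand-ins, not this repo's code.

```python
# Illustrative sketch only: module-level producer + JSON Schema serializer,
# with placeholder config, schema, and data class (not the repo's exact code).
from dataclasses import dataclass

from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.json_schema import JSONSerializer
from confluent_kafka.serialization import MessageField, SerializationContext

schema_str = """{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Quote",
  "type": "object",
  "properties": {
    "bid_timestamp": {"type": "string"},
    "price": {"type": "number"},
    "symbol": {"type": "string"}
  }
}"""


@dataclass
class Quote:  # placeholder for the incoming quote object
    timestamp: str
    price: float
    symbol: str


def quote_to_dict(quote, ctx):
    # shape the object into a dict matching the JSON schema above
    return {"bid_timestamp": quote.timestamp, "price": quote.price, "symbol": quote.symbol}


def delivery_report(err, event):
    if err is not None:
        print(f"Delivery failed for {event.key()}: {err}")
    else:
        print("delivered new event from producer")


# Created once at module level and reused for every quote.
producer = Producer({"bootstrap.servers": "localhost:9092"})                 # placeholder broker
schema_registry_client = SchemaRegistryClient({"url": "http://localhost:8081"})  # placeholder SR
json_serializer = JSONSerializer(schema_str, schema_registry_client, quote_to_dict)


def send_quote(quote: Quote) -> None:
    producer.produce(
        topic=quote.symbol,
        key=quote.symbol,
        value=json_serializer(
            quote, SerializationContext(quote.symbol, MessageField.VALUE)
        ),
        on_delivery=delivery_report,
    )
    producer.poll(0)  # serve delivery callbacks
```

Creating the `Producer` and `SchemaRegistryClient` once at import time, instead of inside `quote_data_handler`, avoids re-establishing connections for every incoming quote, which is the point of the "move producer creation outside producing call" item in the commit message.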

0 commit comments