Changes from all commits
50 commits
4e12e1d
Changed S3Utils, S3MessageAdapter classes' constructors(adjusted docu…
Apr 13, 2021
38e0299
Merge branch 'master' into 1500-fix-unit-tests
Apr 15, 2021
47d9d33
1500-WebPublisher adjusted some documentation wording and added test_…
Apr 15, 2021
85a9096
1500-Adjusted documentation indentation in WebPublisher
Apr 15, 2021
85ada22
1500-Changed CsbExtractor class constructor(adjusted documentation) t…
Apr 16, 2021
15cfaa3
1500-Added unit tests for WebPublisher. Made sure using autospec=True
Apr 19, 2021
1e629ab
1500-Changed KafkaConsumer class constructors(adjusted documentation)…
Apr 28, 2021
b63bfe8
1500-Adjusted KafkaConsumer create_consumer to not do duplicate code,…
Apr 28, 2021
74e7f6f
1500-Added __init__.py to tests directory so was discoverable/module.
Apr 28, 2021
a07e642
1500-in KafkaConsumer renamed variables so tad more generic. Makes it…
Apr 28, 2021
da934c4
1500-Fixed KafkaConsumerTest (thought intellij refactor of var name w…
Apr 28, 2021
92e12c9
1500-KafkaConsumer consolidated config for deserializer.
Apr 28, 2021
ba17408
1500-Changed KafkaPublisher class constructors(adjusted documentation…
Apr 28, 2021
392788a
1500-Changed KafkaConsumerTest(s) to have vars named exp where it mak…
Apr 29, 2021
3426472
1500-KafkaConsumerTest improved the formatting of dicts, tested ret…
May 4, 2021
47c9dce
1500-Changed/added to KafkaPublisherTest(s).
May 4, 2021
921490a
1500-In KafkaPublisher cleaned up documentation, added method to cons…
May 4, 2021
a337c39
1500-Adjusted csb config variable name from file_identifier_prefix to…
May 4, 2021
5aab6d6
1500-Fixed lack of carriage returnin S3Utils for legibility.
May 4, 2021
5975e1d
1500-Changed SqsConsumer class constructor to take dictionary with ex…
May 7, 2021
204a2bd
1500-Decided to put "connect" back into SqsConsumer. Adjusted input p…
May 7, 2021
47b3e5b
1500-Fixed some bugs in SqsConsumer.
May 7, 2021
b2143ae
1500-Due to changing SqsConsumer class constructor to take dictionary…
May 7, 2021
11f8845
1500-fixed bug in tests/utils of message missing a carriage return. J…
May 11, 2021
9048e53
1500-Added logging to SqsHandlers and log_level method parameter. Adj…
May 11, 2021
5e0d3ba
1500-Added tests to SqsHandlersTest and removed config usage.
May 11, 2021
918c378
1500-Fixed SqsConsumerTest due to parameters into CB changing. skippe…
May 11, 2021
3f39966
1500-Removed unused conf variable.
May 11, 2021
4cffc38
1500-Removed unused var conf from classes.
May 11, 2021
8280a37
1500-Changed mock tests to not load configs but use mock data.
May 11, 2021
c16302e
1500-refactored S3Utils connect to take in type parameter instead of …
May 11, 2021
f8c5bd0
1500-Changed moto dependency to moto[all] because of some issues with…
May 11, 2021
ecfec1e
1500-added tests for different connect types for S3Utils
May 11, 2021
e4c7fb4
1500-Changed class constructors checking extra arguments and logging …
May 11, 2021
f5370ea
1500-Moved unit tests to tests/unit and integration tests to tests/in…
May 11, 2021
c93bab2
1500-Fixed one of the test_S3Utils tests that was commented out. Remo…
May 12, 2021
32a300a
1500-Updated python-client requirements boto3. Seems to be using an o…
May 12, 2021
a3f6e96
1500-Updated python-client requirements botocore to 1.20.71 due to co…
May 12, 2021
8fca7a9
1500-Changed circleci config for python client to try and update boto
May 12, 2021
bd38748
1500-added region_name to S3Utils connect for session. Suspect it was…
May 12, 2021
6a101f8
1500-Changing python-client circleci config to see if need to tell it…
May 12, 2021
927fb7e
1500-updated python-client requirements to install boto
May 12, 2021
12374a0
1500-Changed python-client integration test(s) to use environment var…
May 12, 2021
eb0646d
1500-Removed redundant log_level fields in all the configs. Put into …
May 13, 2021
aa0b9a9
1500-Changed the kafka config in the scripts for collection and granu…
May 14, 2021
3b14757
1500-Changed exception message to first be a string then passed into …
May 18, 2021
ebf71ee
1500-Adjusted exception thrown in S3Utils.connect for invalid type, w…
May 18, 2021
053df05
1500-Fixed log but in SqsConsumer of microseconds process time being …
May 18, 2021
5c66efa
2500-Added SqsHandlers create_upload_handler back with tests. Didn't …
May 18, 2021
d4b2013
1500-Changed references to psi_registry_url to registry_base_url
May 19, 2021
10 changes: 8 additions & 2 deletions .circleci/config.yml
@@ -107,10 +107,16 @@ jobs:
app-dir: ./onestop-python-client
pkg-manager: pip
- run:
name: "Run util tests"
name: "Run unit tests"
command: >
cd onestop-python-client/;
python -m unittest tests/util/*.py
python -m unittest discover -s test/unit
# This is commented out only because the OneStop we have running on cedardevs doesn't have its registry exposed. You can only reach it via sshing to another machine.
# - run:
# name: "Run integration tests"
# command: >
# cd onestop-python-client/;
# python -m unittest discover -s test/integration

orbs:
slack: circleci/[email protected]
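The updated CI step switches from running a glob of util tests to unittest discovery rooted at test/unit. For local runs, a minimal sketch of the same discovery driven from Python (the test/unit start directory is taken from the CI command above; run it from the onestop-python-client directory):

import unittest

# Mirror `python -m unittest discover -s test/unit` from the CircleCI step:
# discover every test module under test/unit and run it.
loader = unittest.TestLoader()
suite = loader.discover(start_dir="test/unit")
unittest.TextTestRunner(verbosity=2).run(suite)
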
2 changes: 1 addition & 1 deletion kubernetes/pyconsumer-pod.yaml
@@ -72,7 +72,7 @@ data:
headers: UNIQUE_ID,FILE_UUID,LON,LAT,DEPTH,TIME,PLATFORM_NAME,PROVIDER
type: COLLECTION
collection_id: fdb56230-87f4-49f2-ab83-104cfd073177
psi_registry_url: https://cedardevs.org/
registry_base_url: https://cedardevs.org/
access_bucket: https://archive-testing-demo.s3-us-east-2.amazonaws.com
#access_bucket: https://odp-noaa-nesdis-ncei-test.s3-us-west-2.amazonaws.com
file_identifier_prefix: "gov.noaa.ncei.csb:"
3 changes: 2 additions & 1 deletion onestop-python-client/config/aws-util-config-dev.yml
@@ -1,11 +1,12 @@
# Example config values for osim client
log_level: INFO

# AWS config values
sqs_url: https://sqs.us-east-2.amazonaws.com/798276211865/cloud-archive-client-sqs
sqs_name: 'foobar'
sqs_max_polls: 2
s3_region: "us-east-2"
s3_bucket: archive-testing-demo
s3_key: 'ABI-L1b-RadF/2019/298/15/OR_ABI-L1b-RadF-M6C15_G16_s20192981500369_e20192981510082_c20192981510166.nc'

#AWS config values for 2nd vault in different region
vault_name: archive-vault-new
3 changes: 1 addition & 2 deletions onestop-python-client/config/credentials-template.yml
@@ -9,5 +9,4 @@ registry:
username: rw_user
password: rw_user_pwd



log_level: INFO
@@ -1,4 +1,3 @@
log_level: INFO
format: csv
headers: UNIQUE_ID,FILE_UUID,LON,LAT,DEPTH,TIME,PLATFORM_NAME,PROVIDER
type: COLLECTION
@@ -9,7 +8,7 @@ registry_base_url: http://localhost/onestop/api/registry
onestop_base_url: http://localhost/onestop/api/search/search
access_bucket: https://archive-testing-demo.s3-us-east-2.amazonaws.com
#access_bucket: https://odp-noaa-nesdis-ncei-test.s3-us-west-2.amazonaws.com
file_identifier_prefix: "gov.noaa.ncei.csb:"
file_id_prefix: "gov.noaa.ncei.csb:"

prefixMap:
NESDIS/CSB: 'fdb56230-87f4-49f2-ab83-104cfd073177'
228 changes: 111 additions & 117 deletions onestop-python-client/onestop/KafkaConsumer.py
@@ -1,136 +1,123 @@
import logging
import yaml

from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.error import KafkaError
from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import StringDeserializer
from onestop.util.ClientLogger import ClientLogger

class KafkaConsumer:
"""
A class used to consume messages from Kafka

Attributes
----------
conf: yaml file
kafka-publisher-config-dev.yml
logger: Logger object
utilizes python logger library and creates logging for our specific needs
logger.info: Logger object
logging statement that occurs when the class is instantiated
metadata_type: str
type of metadata (COLLECTION or GRANULE)
brokers: str
brokers (kubernetes service)
group_id: str
Client group id string. All clients sharing the same group.id belong to the same group
auto_offset_reset: str
Action to take when there is no initial offset in offset store or the desired offset is out of range (smallest, earliest, beginning, largest, latest, end, error)
schema_registry: str
schema registry (kubernetes service)
security: boolean
defines if security is in place
collection_topic: str
collection topic you want to consume
granule_topic: str
granule topic you want to consume
metadata_type: str
type of metadata (COLLECTION or GRANULE)
brokers: str
brokers (kubernetes service)
group_id: str
Client group id string. All clients sharing the same group.id belong to the same group
auto_offset_reset: str
Action to take when there is no initial offset in offset store or the desired offset is out of range (smallest, earliest, beginning, largest, latest, end, error)
schema_registry: str
schema registry (kubernetes service)
security_enabled: boolean
Whether to use security for the kafka schema registry client.
security_caLoc: str
Kafka schema registry certification authority (CA) file location.
security_keyLoc: str
Kafka schema registry client's private key file location.
security_certLoc: str
Kafka schema registry client's public key file location.
collection_topic_consume: str
collection topic you want to consume
granule_topic_consume: str
granule topic you want to consume
logger: Logger object
utilizes python logger library and creates logging for our specific needs

Methods
-------
get_logger(log_name, create_file)
creates logger file

register_client()
registers to schema registry client based on configs
register_client()
registers to schema registry client based on configs

create_consumer(registry_client)
subscribes to topic defined in configs and creates a consumer to deserialize messages from topic
connect()
utilizes register_client() and create_consumer(registry_client) to connect to schema registry and allow for consumption of topics

connect()
utilizes register_client() and create_consumer(registry_client) to connect to schema registry and allow for consumption of topics
create_consumer(registry_client)
subscribes to topic defined in configs and creates a consumer to deserialize messages from topic

consume(metadata_consumer, handler)
asynchronously polls for messages in the connected topic, results vary depending on the handler function that is passed into it
consume(metadata_consumer, handler)
asynchronously polls for messages in the connected topic, results vary depending on the handler function that is passed into it
"""
conf = None

def __init__(self, conf_loc):
with open(conf_loc) as f:
self.conf = yaml.load(f, Loader=yaml.FullLoader)

self.logger = self.get_logger(self.__class__.__name__, False)
self.logger.info("Initializing " + self.__class__.__name__)
self.metadata_type = self.conf['metadata_type']
self.brokers = self.conf['brokers']
self.group_id = self.conf['group_id']
self.auto_offset_reset = self.conf['auto_offset_reset']
self.schema_registry = self.conf['schema_registry']
self.security = self.conf['security']['enabled']

self.collection_topic = self.conf['collection_topic_consume']
self.granule_topic = self.conf['granule_topic_consume']

if self.metadata_type not in ['COLLECTION', 'GRANULE']:
raise ValueError("metadata_type must be 'COLLECTION' or 'GRANULE'")

def get_logger(self, log_name, create_file):
def __init__(self, metadata_type, brokers, group_id, auto_offset_reset, schema_registry, security, collection_topic_consume, granule_topic_consume, log_level = 'INFO', **wildargs):
"""
Utilizes python logger library and creates logging

:param log_name: str
name of log to be created
:param create_file: boolean
defines whether of not you want a logger file to be created

:return: Logger object
Attributes
----------
metadata_type: str
type of metadata (COLLECTION or GRANULE)
brokers: str
brokers (kubernetes service)
group_id: str
Client group id string. All clients sharing the same group.id belong to the same group
auto_offset_reset: str
Action to take when there is no initial offset in offset store or the desired offset is out of range (smallest, earliest, beginning, largest, latest, end, error)
schema_registry: str
schema registry (kubernetes service) URL
security: dict
enabled boolean: Whether to use security for kafka schema registry client.
caLoc str: Kafka schema registry certification authority (CA) file location.
keyLoc str: Kafka schema registry client's private key file location.
certLoc str: Kafka schema registry client's public key file location.

collection_topic_consume: str
collection topic you want to consume
granule_topic_consume: str
granule topic you want to consume
log_level: str
What log level to use for this class
"""

# create logger
log = logging.getLogger()
self.metadata_type = metadata_type
self.brokers = brokers
self.group_id = group_id
self.auto_offset_reset = auto_offset_reset
self.schema_registry = schema_registry
self.security_enabled = security['enabled']

# create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
if self.security_enabled:
self.security_caLoc = security['caLoc']
self.security_keyLoc = security['keyLoc']
self.security_certLoc = security['certLoc']

if self.conf['log_level'] == "DEBUG":
log.setLevel(level=logging.DEBUG)
else:
if self.conf['log_level'] == "INFO":
log.setLevel(level=logging.INFO)
else:
log.setLevel(level=logging.ERROR)
self.collection_topic = collection_topic_consume
self.granule_topic = granule_topic_consume

fh = None
if create_file:
# create file handler for logger.
fh = logging.FileHandler(log_name)
fh.setFormatter(formatter)

# create console handler for logger.
ch = logging.StreamHandler()
ch.setFormatter(formatter)
if self.metadata_type not in ['COLLECTION', 'GRANULE']:
raise ValueError("metadata_type must be 'COLLECTION' or 'GRANULE'")

# add handlers to logger.
if create_file:
log.addHandler(fh)
self.logger = ClientLogger.get_logger(self.__class__.__name__, log_level, False)
self.logger.info("Initializing " + self.__class__.__name__)

log.addHandler(ch)
return log
if wildargs:
self.logger.warning("There were extra constructor arguments: " + str(wildargs))
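
Since the constructor now takes explicit keyword arguments (plus a catch-all for extras) instead of a config file path, a minimal instantiation sketch looks like the following. Every value shown is a hypothetical placeholder; only the parameter names come from the new signature:

from onestop.KafkaConsumer import KafkaConsumer

consumer = KafkaConsumer(
    metadata_type="GRANULE",                        # or "COLLECTION"
    brokers="kafka-broker:9092",                    # placeholder broker address
    group_id="example-group",                       # placeholder consumer group
    auto_offset_reset="earliest",
    schema_registry="http://schema-registry:8081",  # placeholder registry URL
    security={"enabled": False},                    # supply caLoc/keyLoc/certLoc when enabled is True
    collection_topic_consume="example-collection-topic",
    granule_topic_consume="example-granule-topic",
    log_level="DEBUG"
)
# Any unexpected keyword arguments are absorbed by **wildargs and logged as a warning.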

def register_client(self):
"""
Registers to schema registry client based on configs

:return: SchemaRegistryClient (confluent kafka library)
"""
reg_conf = {'url': self.schema_registry}
"""
conf = {'url': self.schema_registry}

if self.security:
reg_conf['ssl.ca.location'] = self.conf['security']['caLoc']
reg_conf['ssl.key.location'] = self.conf['security']['keyLoc']
reg_conf['ssl.certificate.location'] = self.conf['security']['certLoc']
if self.security_enabled:
conf['ssl.ca.location'] = self.security_caLoc
conf['ssl.key.location'] = self.security_keyLoc
conf['ssl.certificate.location'] = self.security_certLoc

registry_client = SchemaRegistryClient(reg_conf)
self.logger.info("Creating SchemaRegistryClient with configuration:"+str(conf))
registry_client = SchemaRegistryClient(conf)
return registry_client

def connect(self):
@@ -152,33 +139,38 @@ def create_consumer(self, registry_client):

:return: DeserializingConsumer object
"""
metadata_schema = None
topic = None
if self.metadata_type == "COLLECTION":
metadata_schema = registry_client.get_latest_version(self.collection_topic + '-value').schema.schema_str
topic = self.collection_topic

if self.metadata_type == "GRANULE":
metadata_schema = registry_client.get_latest_version(self.granule_topic + '-value').schema.schema_str
topic = self.granule_topic

metadata_deserializer = AvroDeserializer(metadata_schema, registry_client)

consumer_conf = {'bootstrap.servers': self.brokers}

if self.security:
consumer_conf['security.protocol'] = 'SSL'
consumer_conf['ssl.ca.location'] = self.conf['security']['caLoc']
consumer_conf['ssl.key.location'] = self.conf['security']['keyLoc']
consumer_conf['ssl.certificate.location'] = self.conf['security']['certLoc']
self.logger.debug("topic: "+str(topic))

meta_consumer_conf = consumer_conf
meta_consumer_conf['key.deserializer'] = StringDeserializer('utf-8')
meta_consumer_conf['value.deserializer'] = metadata_deserializer
meta_consumer_conf['group.id'] = self.group_id
meta_consumer_conf['auto.offset.reset'] = self.auto_offset_reset
# This topic naming scheme is how OneStop creates the topics.
latest_schema = registry_client.get_latest_version(topic + '-value')

metadata_consumer = DeserializingConsumer(meta_consumer_conf)
metadata_schema = latest_schema.schema.schema_str
self.logger.debug("metadata_schema: "+metadata_schema)
metadata_deserializer = AvroDeserializer(metadata_schema, registry_client)
conf = {
'bootstrap.servers': self.brokers,
'key.deserializer': StringDeserializer('utf-8'),
'value.deserializer': metadata_deserializer,
'group.id': self.group_id,
'auto.offset.reset': self.auto_offset_reset
}

if self.security_enabled:
conf['security.protocol'] = 'SSL'
conf['ssl.ca.location'] = self.security_caLoc
conf['ssl.key.location'] = self.security_keyLoc
conf['ssl.certificate.location'] = self.security_certLoc

self.logger.debug("conf: "+str(conf))
metadata_consumer = DeserializingConsumer(conf)
self.logger.debug("topic: "+str(topic))
metadata_consumer.subscribe([topic])
return metadata_consumer

@@ -197,20 +189,22 @@ def consume(self, metadata_consumer, handler):
while True:
try:
msg = metadata_consumer.poll(10)
self.logger.debug("Message received: "+str(msg))

if msg is None:
print('No Messages')
self.logger.info('No Messages')
continue

self.logger.debug("Message key="+str(msg.key())+" value="+str(msg.value()))
key = msg.key()
value = msg.value()


except KafkaError:
raise
try:
handler(key, value)
except Exception as e:
self.logger.error("Message handler failed: {}".format(e))
break
self.logger.debug("Closing metadata_consumer")
metadata_consumer.close()
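
A sketch of driving the consumer end to end, assuming the instance created in the earlier example and that connect() returns the DeserializingConsumer built by create_consumer (as the class docstring describes); the handler below is a hypothetical callback:

def print_handler(key, value):
    # Hypothetical handler: consume() calls it with each message's key and value.
    print("key={} value={}".format(key, value))

# connect() registers the schema registry client and creates the consumer
# subscribed to the configured topic.
metadata_consumer = consumer.connect()

# Polls the topic in a loop; a handler exception is logged and ends consumption,
# after which the consumer is closed.
consumer.consume(metadata_consumer, print_handler)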