diff --git a/amazon_kclpy/kcl.py b/amazon_kclpy/kcl.py
index e77a2fe..1ba99d3 100644
--- a/amazon_kclpy/kcl.py
+++ b/amazon_kclpy/kcl.py
@@ -1,5 +1,5 @@
 '''
-Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+Copyright 2014-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 
 Licensed under the Amazon Software License (the "License").
 You may not use this file except in compliance with the License.
@@ -12,7 +12,41 @@
 express or implied. See the License for the specific language governing
 permissions and limitations under the License.
 '''
-import abc, base64, io, json, os, random, sys, time, traceback
+import abc
+import json
+import sys
+import traceback
+
+
+class CheckpointError(Exception):
+    '''
+    Error class used for wrapping exception names passed through the input file.
+    '''
+    def __init__(self, value):
+        '''
+        :type value: str
+        :param value: The name of the exception that was received while checkpointing. For more details see
+            https://github.com/awslabs/amazon-kinesis-client/tree/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/exceptions
+            Any of those exceptions' names could be returned by the MultiLangDaemon as a response to a checkpoint action.
+        '''
+        self.value = value
+
+    def __str__(self):
+        return repr(self.value)
+
+
+class InvalidStateException(CheckpointError):
+    def __init__(self, value='InvalidStateException'):
+        super(InvalidStateException, self).__init__(value)
+
+
+class ShutdownException(CheckpointError):
+    pass
+
+
+class ThrottlingException(CheckpointError):
+    pass
+
 
 class _IOHandler(object):
     '''
@@ -87,22 +121,6 @@ def write_action(self, response):
         self.write_line(json.dumps(response))
 
 
-class CheckpointError(Exception):
-    '''
-    Error class used for wrapping exception names passed through the input file.
-    '''
-    def __init__(self, value):
-        '''
-        :type value: str
-        :param value: The name of the exception that was received while checkpointing. For more details see
-            https://github.com/awslabs/amazon-kinesis-client/tree/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/exceptions
-            Any of those exceptions' names could be returned by the MultiLangDaemon as a response to a checkpoint action.
-        '''
-        self.value = value
-
-    def __str__(self):
-        return repr(self.value)
-
 class Checkpointer(object):
     '''
     A checkpointer class which allows you to make checkpoint requests. A checkpoint marks a point in a shard
@@ -118,6 +136,15 @@ def __init__(self, io_handler):
         '''
         self.io_handler = io_handler
 
+    def _raise_exception_by_str(self, error):
+        exception_classes = {
+            'InvalidStateException': InvalidStateException,
+            'ShutdownException': ShutdownException,
+            'ThrottlingException': ThrottlingException
+        }
+        exception_class = exception_classes.get(error, CheckpointError)
+        raise exception_class(error)
+
     def _get_action(self):
         '''
         Gets the next json message from STDIN
@@ -139,12 +166,12 @@ def checkpoint(self, sequenceNumber=None):
         :type sequenceNumber: str
         :param sequenceNumber: The sequence number to checkpoint at or None if you want to checkpoint at the farthest record
         '''
-        response = {"action" : "checkpoint", "checkpoint" : sequenceNumber}
+        response = {"action": "checkpoint", "checkpoint": sequenceNumber}
         self.io_handler.write_action(response)
         action = self._get_action()
         if action.get('action') == 'checkpoint':
-            if action.get('error') != None:
-                raise CheckpointError(action.get('error'))
+            if action.get('error') is not None:
+                self._raise_exception_by_str(action.get('error'))
         else:
             '''
             We are in an invalid state. We will raise a checkpoint exception
@@ -153,7 +180,8 @@ def checkpoint(self, sequenceNumber=None):
             exception. Note that the documented guidance is that this exception
             is NOT retryable so the client code should exit.
             '''
-            raise CheckpointError('InvalidStateException')
+            raise InvalidStateException()
+
 
 # RecordProcessor base class
 class RecordProcessorBase(object):
@@ -212,12 +240,14 @@ def shutdown(self, checkpointer, reason):
         '''
         return
 
+
 class MalformedAction(Exception):
     '''
     Raised when an action given by the MultiLangDaemon doesn't have all the appropriate attributes.
     '''
     pass
 
+
 class KCLProcess(object):
 
     def __init__(self, record_processor, inputfile=sys.stdin, outputfile=sys.stdout, errorfile=sys.stderr):
@@ -282,7 +312,7 @@ def _report_done(self, response_for=None):
 
         :param response_for: Required parameter; the action that this status message is confirming completed.
         '''
-        self.io_handler.write_action({"action" : "status", "responseFor" : response_for})
+        self.io_handler.write_action({"action": "status", "responseFor": response_for})
 
     def _handle_a_line(self, line):
         '''
@@ -298,7 +328,6 @@ def _handle_a_line(self, line):
         self._perform_action(action)
         self._report_done(action.get('action'))
 
-
     def run(self):
         '''
         Starts this KCL processor's main loop.
@@ -314,5 +343,3 @@ def run(self):
             line = self.io_handler.read_line()
             if line:
                 self._handle_a_line(line)
-
-
diff --git a/samples/sample_kclpy_app.py b/samples/sample_kclpy_app.py
index 0d0970c..fa541b5 100755
--- a/samples/sample_kclpy_app.py
+++ b/samples/sample_kclpy_app.py
@@ -14,9 +14,12 @@
 permissions and limitations under the License.
 '''
 from __future__ import print_function
-import sys, time, json, base64
+import sys
+import time
+import base64
 from amazon_kclpy import kcl
 
+
 class RecordProcessor(kcl.RecordProcessorBase):
     '''
     A RecordProcessor processes a shard in a stream. Its methods will be called with this pattern:
@@ -54,28 +57,24 @@ def checkpoint(self, checkpointer, sequence_number=None):
             try:
                 checkpointer.checkpoint(sequence_number)
                 return
-            except kcl.CheckpointError as e:
-                if 'ShutdownException' == e.value:
-                    '''
-                    A ShutdownException indicates that this record processor should be shutdown. This is due to
-                    some failover event, e.g. another MultiLangDaemon has taken the lease for this shard.
-                    '''
-                    print('Encountered shutdown execption, skipping checkpoint')
+            except kcl.ShutdownException:
+                '''
+                A ShutdownException indicates that this record processor should be shutdown. This is due to
+                some failover event, e.g. another MultiLangDaemon has taken the lease for this shard.
+                '''
+                print('Encountered shutdown execption, skipping checkpoint')
+                return
+            except kcl.InvalidStateException:
+                sys.stderr.write('MultiLangDaemon reported an invalid state while checkpointing.\n')
+            except kcl.ThrottlingException:
+                if self.CHECKPOINT_RETRIES - 1 == n:
+                    sys.stderr.write('Failed to checkpoint after {n} attempts, giving up.\n'.format(n=n))
                     return
-                elif 'ThrottlingException' == e.value:
-                    '''
-                    A ThrottlingException indicates that one of our dependencies is is over burdened, e.g. too many
-                    dynamo writes. We will sleep temporarily to let it recover.
-                    '''
-                    if self.CHECKPOINT_RETRIES - 1 == n:
-                        sys.stderr.write('Failed to checkpoint after {n} attempts, giving up.\n'.format(n=n))
-                        return
-                    else:
-                        print('Was throttled while checkpointing, will attempt again in {s} seconds'.format(s=self.SLEEP_SECONDS))
-                elif 'InvalidStateException' == e.value:
-                    sys.stderr.write('MultiLangDaemon reported an invalid state while checkpointing.\n')
-                else: # Some other error
-                    sys.stderr.write('Encountered an error while checkpointing, error was {e}.\n'.format(e=e))
+                else:
+                    print('Was throttled while checkpointing, will attempt again in {s} seconds'.format(s=self.SLEEP_SECONDS))
+            except kcl.CheckpointError as e:
+                sys.stderr.write('Encountered an error while checkpointing, error was {e}.\n'.format(e=e))
+
             time.sleep(self.SLEEP_SECONDS)
 
     def process_record(self, data, partition_key, sequence_number):
@@ -118,7 +117,7 @@ def process_records(self, records, checkpointer):
                 seq = int(seq)
                 key = record.get('partitionKey')
                 self.process_record(data, key, seq)
-                if self.largest_seq == None or seq > self.largest_seq:
+                if self.largest_seq is None or seq > self.largest_seq:
                     self.largest_seq = seq
             # Checkpoints every 60 seconds
             if time.time() - self.last_checkpoint_time > self.CHECKPOINT_FREQ_SECONDS:
@@ -149,7 +148,7 @@ def shutdown(self, checkpointer, reason):
                 # shard id
                 print('Was told to terminate, will attempt to checkpoint.')
                 self.checkpoint(checkpointer, None)
-            else: # reason == 'ZOMBIE'
+            else:  # reason == 'ZOMBIE'
                 print('Shutting down due to failover. Will not checkpoint.')
         except:
             pass