File tree Expand file tree Collapse file tree 2 files changed +38
-0
lines changed
Expand file tree Collapse file tree 2 files changed +38
-0
lines changed Original file line number Diff line number Diff line change @@ -45,6 +45,32 @@ const recordProcessor = {
4545 } ) ;
4646 completeCallback ( ) ;
4747 } ,
48+
49+ shutdownRequested ( shutdownRequestedInput : kcl . ShutdownRequestedInput , completeCallback : kcl . Callback ) {
50+ shutdownRequestedInput . checkpointer . checkpoint ( ( _err ?: string ) => {
51+ completeCallback ( ) ;
52+ } ) ;
53+ } ,
4854} ;
4955
5056kcl ( recordProcessor ) . run ( ) ;
57+
58+ const recordProcessorWithoutShutdownRequested : kcl . RecordProcessor = {
59+ initialize ( _initializeInput : kcl . InitializeInput , completeCallback : kcl . Callback ) {
60+ completeCallback ( ) ;
61+ } ,
62+
63+ processRecords ( processRecordsInput : kcl . ProcessRecordsInput , completeCallback : kcl . Callback ) {
64+ completeCallback ( ) ;
65+ } ,
66+
67+ leaseLost ( _leaseLostInput : kcl . LeaseLossInput , completeCallback : kcl . Callback ) {
68+ completeCallback ( ) ;
69+ } ,
70+
71+ shardEnded ( shardEndedInput : kcl . ShardEndedInput , completeCallback : kcl . Callback ) {
72+ completeCallback ( ) ;
73+ } ,
74+ } ;
75+
76+ kcl ( recordProcessorWithoutShutdownRequested ) . run ( ) ;
Original file line number Diff line number Diff line change @@ -58,6 +58,8 @@ declare namespace KCLProcess {
5858
5959 interface ShardEndedInput extends CheckpointInput { } // eslint-disable-line @typescript-eslint/no-empty-interface
6060
61+ interface ShutdownRequestedInput extends CheckpointInput { } // eslint-disable-line @typescript-eslint/no-empty-interface
62+
6163 interface RecordProcessor {
6264 /**
6365 * Called once by the KCL before any calls to processRecords. Any initialization
@@ -110,6 +112,16 @@ declare namespace KCLProcess {
110112 * ended operations are completed.
111113 */
112114 shardEnded ( shardEndedInput : ShardEndedInput , completeCallback : Callback ) : void ;
115+ /**
116+ * Called by the KCL to indicate that this record processor should shut down.
117+ * This is called when the KCL is being shutdown using requestedShutdown.
118+ * Clients should checkpoint at this time if they wish to save their progress.
119+ *
120+ * @param shutdownRequestedInput - Shutdown request information with checkpointer.
121+ * @param completeCallback - The callback must be invoked once shutdown
122+ * requested operations are completed.
123+ */
124+ shutdownRequested ?( shutdownRequestedInput : ShutdownRequestedInput , completeCallback : Callback ) : void ;
113125 }
114126
115127 interface KCLInput {
You can’t perform that action at this time.
0 commit comments