
Commit 8f12c04

Replace System.out with Powertools Logging.

1 parent a5689e9 · 1 file changed


docs/utilities/kafka.md

Lines changed: 82 additions & 32 deletions
@@ -132,24 +132,29 @@ The Kafka utility transforms raw Lambda Kafka events into an intuitive format fo
 
 === "Avro Messages"
 
-    ```java hl_lines="13 16"
+    ```java hl_lines="18 21"
     package org.example;
 
     import com.amazonaws.services.lambda.runtime.Context;
     import com.amazonaws.services.lambda.runtime.RequestHandler;
     import org.apache.kafka.clients.consumer.ConsumerRecord;
     import org.apache.kafka.clients.consumer.ConsumerRecords;
+    import org.slf4j.Logger;
+    import org.slf4j.LoggerFactory;
     import software.amazon.lambda.powertools.kafka.Deserialization;
     import software.amazon.lambda.powertools.kafka.DeserializationType;
+    import software.amazon.lambda.powertools.logging.Logging;
 
     public class AvroKafkaHandler implements RequestHandler<ConsumerRecords<String, User>, String> {
+        private static final Logger LOGGER = LoggerFactory.getLogger(AvroKafkaHandler.class);
 
         @Override
+        @Logging
         @Deserialization(type = DeserializationType.KAFKA_AVRO)
         public String handleRequest(ConsumerRecords<String, User> records, Context context) {
             for (ConsumerRecord<String, User> record : records) {
                 User user = record.value(); // User class is auto-generated from Avro schema
-                System.out.printf("Processing user: %s, age %d%n", user.getName(), user.getAge());
+                LOGGER.info("Processing user: {}, age {}", user.getName(), user.getAge());
             }
             return "OK";
         }
@@ -158,24 +163,29 @@ The Kafka utility transforms raw Lambda Kafka events into an intuitive format fo
 
 === "Protocol Buffers"
 
-    ```java hl_lines="13 16"
+    ```java hl_lines="18 21"
     package org.example;
 
     import com.amazonaws.services.lambda.runtime.Context;
     import com.amazonaws.services.lambda.runtime.RequestHandler;
     import org.apache.kafka.clients.consumer.ConsumerRecord;
     import org.apache.kafka.clients.consumer.ConsumerRecords;
+    import org.slf4j.Logger;
+    import org.slf4j.LoggerFactory;
     import software.amazon.lambda.powertools.kafka.Deserialization;
     import software.amazon.lambda.powertools.kafka.DeserializationType;
+    import software.amazon.lambda.powertools.logging.Logging;
 
     public class ProtobufKafkaHandler implements RequestHandler<ConsumerRecords<String, UserProto.User>, String> {
+        private static final Logger LOGGER = LoggerFactory.getLogger(ProtobufKafkaHandler.class);
 
         @Override
+        @Logging
         @Deserialization(type = DeserializationType.KAFKA_PROTOBUF)
         public String handleRequest(ConsumerRecords<String, UserProto.User> records, Context context) {
             for (ConsumerRecord<String, UserProto.User> record : records) {
                 UserProto.User user = record.value(); // UserProto.User class is auto-generated from Protocol Buffer schema
-                System.out.printf("Processing user: %s, age %d%n", user.getName(), user.getAge());
+                LOGGER.info("Processing user: {}, age {}", user.getName(), user.getAge());
             }
             return "OK";
         }
@@ -184,24 +194,29 @@ The Kafka utility transforms raw Lambda Kafka events into an intuitive format fo
 
 === "JSON Messages"
 
-    ```java hl_lines="13 16"
+    ```java hl_lines="18 21"
     package org.example;
 
     import com.amazonaws.services.lambda.runtime.Context;
     import com.amazonaws.services.lambda.runtime.RequestHandler;
     import org.apache.kafka.clients.consumer.ConsumerRecord;
     import org.apache.kafka.clients.consumer.ConsumerRecords;
+    import org.slf4j.Logger;
+    import org.slf4j.LoggerFactory;
     import software.amazon.lambda.powertools.kafka.Deserialization;
     import software.amazon.lambda.powertools.kafka.DeserializationType;
+    import software.amazon.lambda.powertools.logging.Logging;
 
     public class JsonKafkaHandler implements RequestHandler<ConsumerRecords<String, User>, String> {
+        private static final Logger LOGGER = LoggerFactory.getLogger(JsonKafkaHandler.class);
 
         @Override
+        @Logging
         @Deserialization(type = DeserializationType.KAFKA_JSON)
         public String handleRequest(ConsumerRecords<String, User> records, Context context) {
             for (ConsumerRecord<String, User> record : records) {
                 User user = record.value(); // Deserialized JSON object into User POJO
-                System.out.printf("Processing user: %s, age %d%n", user.getName(), user.getAge());
+                LOGGER.info("Processing user: {}, age {}", user.getName(), user.getAge());
             }
             return "OK";
         }
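
A note for readers of this diff: the JSON tab above deserializes into a `User` POJO that the docs define elsewhere. A minimal sketch of the shape the example assumes (field names inferred from the getters used above; not part of this commit):

```java
// Hypothetical User POJO matching the getters called in the handler;
// the KAFKA_JSON deserializer binds JSON fields to these bean properties.
public class User {
    private String name;
    private int age;

    public User() {} // no-args constructor for JSON binding

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
}
```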
@@ -218,28 +233,33 @@ The `@Deserialization` annotation deserializes both keys and values based on you
 
 === "Key and Value Deserialization"
 
-    ```java hl_lines="17"
+    ```java hl_lines="22"
     package org.example;
 
     import com.amazonaws.services.lambda.runtime.Context;
     import com.amazonaws.services.lambda.runtime.RequestHandler;
     import org.apache.kafka.clients.consumer.ConsumerRecord;
     import org.apache.kafka.clients.consumer.ConsumerRecords;
+    import org.slf4j.Logger;
+    import org.slf4j.LoggerFactory;
     import software.amazon.lambda.powertools.kafka.Deserialization;
     import software.amazon.lambda.powertools.kafka.DeserializationType;
+    import software.amazon.lambda.powertools.logging.Logging;
 
     public class KeyValueKafkaHandler implements RequestHandler<ConsumerRecords<ProductKey, ProductInfo>, String> {
+        private static final Logger LOGGER = LoggerFactory.getLogger(KeyValueKafkaHandler.class);
 
         @Override
+        @Logging
         @Deserialization(type = DeserializationType.KAFKA_AVRO)
         public String handleRequest(ConsumerRecords<ProductKey, ProductInfo> records, Context context) {
             for (ConsumerRecord<ProductKey, ProductInfo> record : records) {
                 // Access both deserialized components
                 ProductKey key = record.key(); // ProductKey class is auto-generated from Avro schema
                 ProductInfo product = record.value(); // ProductInfo class is auto-generated from Avro schema
 
-                System.out.printf("Processing product ID: %s%n", key.getProductId());
-                System.out.printf("Product: %s - $%.2f%n", product.getName(), product.getPrice());
+                LOGGER.info("Processing product ID: {}", key.getProductId());
+                LOGGER.info("Product: {} - ${}", product.getName(), product.getPrice());
             }
             return "OK";
         }
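
Worth flagging in review: `%.2f` formatted prices to two decimal places, while SLF4J's `{}` placeholder substitutes the argument's `toString()`. If fixed-precision output matters, the price can be pre-formatted; a sketch, not part of this commit:

```java
// {} renders the raw double (e.g. "19.5"); pre-format to keep the
// two-decimal output that %.2f previously guaranteed.
LOGGER.info("Product: {} - ${}", product.getName(),
        String.format("%.2f", product.getPrice()));
```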
@@ -248,31 +268,36 @@ The `@Deserialization` annotation deserializes both keys and values based on you
 
 === "Value-Only Deserialization"
 
-    ```java hl_lines="17"
+    ```java hl_lines="22"
     package org.example;
 
     import com.amazonaws.services.lambda.runtime.Context;
     import com.amazonaws.services.lambda.runtime.RequestHandler;
     import org.apache.kafka.clients.consumer.ConsumerRecord;
     import org.apache.kafka.clients.consumer.ConsumerRecords;
+    import org.slf4j.Logger;
+    import org.slf4j.LoggerFactory;
     import software.amazon.lambda.powertools.kafka.Deserialization;
     import software.amazon.lambda.powertools.kafka.DeserializationType;
+    import software.amazon.lambda.powertools.logging.Logging;
 
     public class ValueOnlyKafkaHandler implements RequestHandler<ConsumerRecords<String, Order>, String> {
+        private static final Logger LOGGER = LoggerFactory.getLogger(ValueOnlyKafkaHandler.class);
 
         @Override
+        @Logging
         @Deserialization(type = DeserializationType.KAFKA_JSON)
         public String handleRequest(ConsumerRecords<String, Order> records, Context context) {
             for (ConsumerRecord<String, Order> record : records) {
                 // Key remains as string (if present)
                 String key = record.key();
                 if (key != null) {
-                    System.out.printf("Message key: %s%n", key);
+                    LOGGER.info("Message key: {}", key);
                 }
 
                 // Value is deserialized as JSON
                 Order order = record.value();
-                System.out.printf("Order #%s - Total: $%.2f%n", order.getOrderId(), order.getTotal());
+                LOGGER.info("Order #{} - Total: ${}", order.getOrderId(), order.getTotal());
             }
             return "OK";
         }
@@ -289,19 +314,24 @@ When working with primitive data types (strings, integers, etc.) rather than str
 
 === "Primitive key"
 
-    ```java hl_lines="17 19"
+    ```java hl_lines="18 22"
     package org.example;
 
     import com.amazonaws.services.lambda.runtime.Context;
     import com.amazonaws.services.lambda.runtime.RequestHandler;
     import org.apache.kafka.clients.consumer.ConsumerRecord;
     import org.apache.kafka.clients.consumer.ConsumerRecords;
+    import org.slf4j.Logger;
+    import org.slf4j.LoggerFactory;
     import software.amazon.lambda.powertools.kafka.Deserialization;
     import software.amazon.lambda.powertools.kafka.DeserializationType;
+    import software.amazon.lambda.powertools.logging.Logging;
 
     public class PrimitiveKeyHandler implements RequestHandler<ConsumerRecords<Integer, Customer>, String> {
+        private static final Logger LOGGER = LoggerFactory.getLogger(PrimitiveKeyHandler.class);
 
         @Override
+        @Logging
         @Deserialization(type = DeserializationType.KAFKA_JSON)
         public String handleRequest(ConsumerRecords<Integer, Customer> records, Context context) {
             for (ConsumerRecord<Integer, Customer> record : records) {
@@ -311,9 +341,9 @@ When working with primitive data types (strings, integers, etc.) rather than str
                 // Value is deserialized as JSON
                 Customer customer = record.value();
 
-                System.out.printf("Key: %d%n", key);
-                System.out.printf("Name: %s%n", customer.getName());
-                System.out.printf("Email: %s%n", customer.getEmail());
+                LOGGER.info("Key: {}", key);
+                LOGGER.info("Name: {}", customer.getName());
+                LOGGER.info("Email: {}", customer.getEmail());
             }
             return "OK";
         }
@@ -322,19 +352,24 @@ When working with primitive data types (strings, integers, etc.) rather than str
 
 === "Primitive key and value"
 
-    ```java hl_lines="17 20"
+    ```java hl_lines="18 22"
     package org.example;
 
     import com.amazonaws.services.lambda.runtime.Context;
     import com.amazonaws.services.lambda.runtime.RequestHandler;
     import org.apache.kafka.clients.consumer.ConsumerRecord;
     import org.apache.kafka.clients.consumer.ConsumerRecords;
+    import org.slf4j.Logger;
+    import org.slf4j.LoggerFactory;
     import software.amazon.lambda.powertools.kafka.Deserialization;
     import software.amazon.lambda.powertools.kafka.DeserializationType;
+    import software.amazon.lambda.powertools.logging.Logging;
 
     public class PrimitiveHandler implements RequestHandler<ConsumerRecords<String, String>, String> {
+        private static final Logger LOGGER = LoggerFactory.getLogger(PrimitiveHandler.class);
 
         @Override
+        @Logging
         @Deserialization(type = DeserializationType.KAFKA_JSON)
         public String handleRequest(ConsumerRecords<String, String> records, Context context) {
             for (ConsumerRecord<String, String> record : records) {
@@ -344,8 +379,8 @@ When working with primitive data types (strings, integers, etc.) rather than str
                 // Value is automatically deserialized as String
                 String value = record.value();
 
-                System.out.printf("Key: %s%n", key);
-                System.out.printf("Value: %s%n", value);
+                LOGGER.info("Key: {}", key);
+                LOGGER.info("Value: {}", value);
             }
             return "OK";
         }
@@ -404,32 +439,37 @@ Each Kafka record contains important metadata that you can access alongside the
     import org.apache.kafka.clients.consumer.ConsumerRecord;
     import org.apache.kafka.clients.consumer.ConsumerRecords;
     import org.apache.kafka.common.header.Header;
+    import org.slf4j.Logger;
+    import org.slf4j.LoggerFactory;
     import software.amazon.lambda.powertools.kafka.Deserialization;
     import software.amazon.lambda.powertools.kafka.DeserializationType;
+    import software.amazon.lambda.powertools.logging.Logging;
 
     public class MetadataKafkaHandler implements RequestHandler<ConsumerRecords<String, Customer>, String> {
+        private static final Logger LOGGER = LoggerFactory.getLogger(MetadataKafkaHandler.class);
 
         @Override
+        @Logging
         @Deserialization(type = DeserializationType.KAFKA_AVRO)
         public String handleRequest(ConsumerRecords<String, Customer> records, Context context) {
             for (ConsumerRecord<String, Customer> record : records) {
                 // Log record coordinates for tracing
-                System.out.printf("Processing message from topic '%s'%n", record.topic());
-                System.out.printf(" Partition: %d, Offset: %d%n", record.partition(), record.offset());
-                System.out.printf(" Produced at: %d%n", record.timestamp());
+                LOGGER.info("Processing message from topic '{}'", record.topic());
+                LOGGER.info(" Partition: {}, Offset: {}", record.partition(), record.offset());
+                LOGGER.info(" Produced at: {}", record.timestamp());
 
                 // Process message headers
                 if (record.headers() != null) {
                     for (Header header : record.headers()) {
-                        System.out.printf(" Header: %s = %s%n",
+                        LOGGER.info(" Header: {} = {}",
                             header.key(), new String(header.value()));
                     }
                 }
 
                 // Access the Avro deserialized message content
                 Customer customer = record.value(); // Customer class is auto-generated from Avro schema
-                System.out.printf("Processing order for: %s%n", customer.getName());
-                System.out.printf("Order total: $%.2f%n", customer.getOrderTotal());
+                LOGGER.info("Processing order for: {}", customer.getName());
+                LOGGER.info("Order total: ${}", customer.getOrderTotal());
             }
             return "OK";
         }
@@ -477,10 +517,11 @@ Handle errors gracefully when processing Kafka messages to ensure your applicati
 
     public class ErrorHandlingKafkaHandler implements RequestHandler<ConsumerRecords<String, Order>, String> {
 
-        private static final Logger logger = LoggerFactory.getLogger(ErrorHandlingKafkaHandler.class);
+        private static final Logger LOGGER = LoggerFactory.getLogger(ErrorHandlingKafkaHandler.class);
         private static final Metrics metrics = MetricsFactory.getMetricsInstance();
 
         @Override
+        @Logging
         @FlushMetrics(namespace = "KafkaProcessing", service = "order-processing")
         @Deserialization(type = DeserializationType.KAFKA_AVRO)
         public String handleRequest(ConsumerRecords<String, Order> records, Context context) {
@@ -494,10 +535,9 @@ Handle errors gracefully when processing Kafka messages to ensure your applicati
                     processOrder(order);
                     successfulRecords++;
                     metrics.addMetric("ProcessedRecords", 1, MetricUnit.COUNT);
-
                 } catch (Exception e) {
                     failedRecords++;
-                    logger.error("Error processing Kafka message from topic: {}, partition: {}, offset: {}",
+                    LOGGER.error("Error processing Kafka message from topic: {}, partition: {}, offset: {}",
                         record.topic(), record.partition(), record.offset(), e);
                     metrics.addMetric("ProcessingErrors", 1, MetricUnit.COUNT);
                     // Optionally send to DLQ or error topic
@@ -511,7 +551,7 @@ Handle errors gracefully when processing Kafka messages to ensure your applicati
 
         private void processOrder(Order order) {
             // Your business logic here
-            System.out.printf("Processing order: %s%n", order.getOrderId());
+            LOGGER.info("Processing order: {}", order.getOrderId());
         }
 
         private void sendToDlq(ConsumerRecord<String, Order> record) {
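
The `sendToDlq` body is cut off at the hunk boundary above. For orientation only, one plausible implementation publishes the failed record's coordinates to an SQS dead-letter queue; the client, environment variable, and payload shape below are assumptions, not from this commit:

```java
// Hypothetical DLQ helper using the AWS SDK for Java v2
// (software.amazon.awssdk.services.sqs); reuse the client across invocations.
private static final SqsClient SQS = SqsClient.create();

private void sendToDlq(ConsumerRecord<String, Order> record) {
    SQS.sendMessage(SendMessageRequest.builder()
            .queueUrl(System.getenv("DLQ_QUEUE_URL")) // assumed env var
            .messageBody(String.format(
                    "{\"topic\":\"%s\",\"partition\":%d,\"offset\":%d}",
                    record.topic(), record.partition(), record.offset()))
            .build());
}
```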
@@ -535,14 +575,18 @@ The Idempotency utility automatically stores the result of each successful opera
     import com.amazonaws.services.lambda.runtime.RequestHandler;
     import org.apache.kafka.clients.consumer.ConsumerRecord;
     import org.apache.kafka.clients.consumer.ConsumerRecords;
+    import org.slf4j.Logger;
+    import org.slf4j.LoggerFactory;
     import software.amazon.lambda.powertools.kafka.Deserialization;
     import software.amazon.lambda.powertools.kafka.DeserializationType;
     import software.amazon.lambda.powertools.idempotency.Idempotency;
     import software.amazon.lambda.powertools.idempotency.IdempotencyConfig;
     import software.amazon.lambda.powertools.idempotency.Idempotent;
     import software.amazon.lambda.powertools.idempotency.persistence.dynamodb.DynamoDBPersistenceStore;
+    import software.amazon.lambda.powertools.logging.Logging;
 
     public class IdempotentKafkaHandler implements RequestHandler<ConsumerRecords<String, Payment>, String> {
+        private static final Logger LOGGER = LoggerFactory.getLogger(IdempotentKafkaHandler.class);
 
         public IdempotentKafkaHandler() {
             // Configure idempotency with DynamoDB persistence store
@@ -555,6 +599,7 @@ The Idempotency utility automatically stores the result of each successful opera
         }
 
         @Override
+        @Logging
         @Deserialization(type = DeserializationType.KAFKA_JSON)
         public String handleRequest(ConsumerRecords<String, Payment> records, Context context) {
             for (ConsumerRecord<String, Payment> record : records) {
@@ -569,7 +614,7 @@ The Idempotency utility automatically stores the result of each successful opera
 
         @Idempotent
         private void processPayment(Payment payment) {
-            System.out.printf("Processing payment %s%n", payment.getPaymentId());
+            LOGGER.info("Processing payment {}", payment.getPaymentId());
 
             // Your business logic here
             PaymentService.process(payment.getPaymentId(), payment.getCustomerId(), payment.getAmount());
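
The constructor body that registers the persistence store is elided between the idempotency hunks. Based on the imports shown (`Idempotency`, `IdempotencyConfig`, `DynamoDBPersistenceStore`), it plausibly resembles the sketch below; the table name and JMESPath key are assumptions:

```java
// Rough shape of the elided constructor; values are illustrative only.
public IdempotentKafkaHandler() {
    Idempotency.config()
            .withPersistenceStore(DynamoDBPersistenceStore.builder()
                    .withTableName("idempotency_store")
                    .build())
            .withConfig(IdempotencyConfig.builder()
                    .withEventKeyJMESPath("paymentId") // assumed dedupe key
                    .build())
            .configure();
}
```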
@@ -625,26 +670,31 @@ When using binary serialization formats across multiple programming languages, e
 
 === "Using Python naming convention"
 
-    ```java hl_lines="28 31 34 35 37 38 51"
+    ```java hl_lines="33 36 39 42 56"
     package org.example;
 
     import com.amazonaws.services.lambda.runtime.Context;
     import com.amazonaws.services.lambda.runtime.RequestHandler;
     import org.apache.kafka.clients.consumer.ConsumerRecord;
     import org.apache.kafka.clients.consumer.ConsumerRecords;
+    import org.slf4j.Logger;
+    import org.slf4j.LoggerFactory;
     import software.amazon.lambda.powertools.kafka.Deserialization;
     import software.amazon.lambda.powertools.kafka.DeserializationType;
+    import software.amazon.lambda.powertools.logging.Logging;
     import com.fasterxml.jackson.annotation.JsonProperty;
     import java.time.Instant;
 
     public class CrossLanguageKafkaHandler implements RequestHandler<ConsumerRecords<String, OrderEvent>, String> {
+        private static final Logger LOGGER = LoggerFactory.getLogger(CrossLanguageKafkaHandler.class);
 
         @Override
+        @Logging
         @Deserialization(type = DeserializationType.KAFKA_JSON)
         public String handleRequest(ConsumerRecords<String, OrderEvent> records, Context context) {
             for (ConsumerRecord<String, OrderEvent> record : records) {
                 OrderEvent order = record.value(); // OrderEvent class handles JSON with Python field names
-                System.out.printf("Processing order %s from %s%n",
+                LOGGER.info("Processing order {} from {}",
                     order.getOrderId(), order.getOrderDate());
             }
             return "OK";
