Skip to content

[BUG] NPE when schema not exist with useHumanReadableSchemaVersion enabled #842

@RobertIndie

Description

@RobertIndie

Describe the bug

│ pulsar-sink java.lang.NullPointerException: Cannot read the array length because "array" is null                                                                                                                                                  │
│ pulsar-sink     at java.nio.ByteBuffer.wrap(ByteBuffer.java:437) ~[?:?]                                                                                                                                                                           │
│ pulsar-sink     at org.apache.pulsar.io.jcloud.util.MetadataUtil.parseSchemaVersionFromBytes(MetadataUtil.java:169) ~[vFdZUZqkBvBdIxBFOEzjGw/:?]                                                                                                  │
│ pulsar-sink     at org.apache.pulsar.io.jcloud.util.MetadataUtil.extractedMetadata(MetadataUtil.java:88) ~[vFdZUZqkBvBdIxBFOEzjGw/:?]                                                                                                             │
│ pulsar-sink     at org.apache.pulsar.io.jcloud.format.JsonFormat.recordWriterBuf(JsonFormat.java:120) ~[vFdZUZqkBvBdIxBFOEzjGw/:?]                                                                                                                │
│ pulsar-sink     at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.bindValue(BlobStoreAbstractSink.java:324) ~[vFdZUZqkBvBdIxBFOEzjGw/:?]                                                                                                  │
│ pulsar-sink     at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.unsafeFlush(BlobStoreAbstractSink.java:272) ~[vFdZUZqkBvBdIxBFOEzjGw/:?]                                                                                                │
│ pulsar-sink     at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.flush(BlobStoreAbstractSink.java:216) ~[vFdZUZqkBvBdIxBFOEzjGw/:?]                                                                                                      │
│ pulsar-sink     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]                                                                                                                                                 │
│ pulsar-sink     at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]                                                                                                                                                                │
│ pulsar-sink     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]                                                                                                          │
│ pulsar-sink     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]                                                                                                                                         │
│ pulsar-sink     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]                                                                                                                                         │
│ pulsar-sink     at java.lang.Thread.run(Thread.java:840) ~[?:?]

To Reproduce
Steps to reproduce the behavior:

  1. Start a cloud storage sink connector with useHumanReadableSchemaVersion and withMetadata enabled
  2. Produce some none schema messages
        Producer<byte[]> producer = client.newProducer()
                .topic("persistent://public/default/test-s3-2")
                .create();

        for (int i = 0; i < 10; i++) {
            String message = "{\"test-message\": \"test-value\"}";
            MessageId msgID = producer.send(message.getBytes());
            System.out.println("Publish " + "my-message-" + i
                    + " and message ID " + msgID);
        }
  1. NPE will be thrown in the connector. The connector won't failed and hang forever.

Expected behavior
Should ignore parsing the no-exist schema.

Metadata

Metadata

Assignees

Labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions