Skip to content

GCS Sink Connector not able to write as parquet when published data using golang client for pulsar.. works fine with java client #196

@mdsadimnadeem

Description

@mdsadimnadeem

@freeznet when I am writing data using pulsar golang client .. the below is the program I am using to write on pulsar topic and its able to publish on pulsar and I am also able to consume it .. but the stream native gcs sink connector is not able to write it as parquet on gcs and I am getting the below exception..

Slack Thread on Apache Pulsar Community : https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1640579795494300

Exception Stack Trace : `16:28:54.381 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/sadim-test-json3][/pulsar-poc/cloud-storage-sink] Subscribed to topic on pulsar-poc-broker-1.pulsar-poc-broker.pulsar-poc.svc.cluster.local/10.248.1.83:6650 -- consumer: 43
16:28:54.436 [pulsar-io-cloud-storage-sink-flush-0] INFO org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink - Flushing [SinkRecord(sourceRecord=PulsarRecord(topicName=Optional[persistent://public/default/sadim-test-json3], partition=0, message=Optional[org.apache.pulsar.client.impl.TopicMessageImpl@23bbe054], schema=AUTO_CONSUME({schemaVersion=,schemaType=BYTES}{schemaVersion=org.apache.pulsar.common.protocol.schema.LatestVersion@2dd916ed,schemaType=JSON}), failFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$322/0x0000000840688840@14f36ab9, ackFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$321/0x0000000840688440@495fc424), value=[B@50d82707)] buffered records to blob store
16:28:54.436 [pulsar-io-cloud-storage-sink-flush-0] ERROR org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink - Caught unexpected exception:
org.apache.avro.SchemaParseException: Cannot parse schema
at org.apache.avro.Schema.parse(Schema.java:1633) ~[java-instance.jar:?]
at org.apache.avro.Schema$Parser.parse(Schema.java:1430) ~[java-instance.jar:?]
at org.apache.avro.Schema$Parser.parse(Schema.java:1418) ~[java-instance.jar:?]
at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertToAvroSchema(AvroRecordUtil.java:103) ~[7k9dpWmnkw8dXf7zEFCwXw/:?]
at org.apache.pulsar.io.jcloud.format.ParquetFormat.initSchema(ParquetFormat.java:63) ~[7k9dpWmnkw8dXf7zEFCwXw/:?]
at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.unsafeFlush(BlobStoreAbstractSink.java:239) ~[7k9dpWmnkw8dXf7zEFCwXw/:?]
at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.flush(BlobStoreAbstractSink.java:219) ~[7k9dpWmnkw8dXf7zEFCwXw/:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:829) [?:?]
16:28:55.788 [pulsar-io-cloud-storage-sink-flush-0] INFO org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink - Skip flushing, because the pending flush queue is empty ...

accuknox-mysql-helm-temp/pulsar-test-dec22-test1/public/default/CiliumTelemetryDemo27Dec/2021-12-27`

Golang Client Code : `package main
import (
"context"
"time"
"fmt"
"github.com/apache/pulsar-client-go/pulsar"
"encoding/json"
"strconv"
)

func main() {
fmt.Println("hello world")
for i := 0; i < 500; i++ {
fmt.Println("sadim"+ strconv.Itoa(i))
writePulsar("ciliumjson","sadim111")
fmt.Println("sadimend"+ strconv.Itoa(i))
}
fmt.Println("hello world 2")
}

type AccuknoxJsonWrapperObject struct {
msg AccuknoxFinalJsonWrapperObject msg:"json"
}

type AccuknoxFinalJsonWrapperObject struct {
inputjson string
}

var (
exampleSchemaDef = "{"type":"record","name":"Example2","namespace":"test2","fields":[{"name":"inputjson","type":"string"}]}"
)

func PublishToPulsarTopic(inputjson string, topicName string) (result string) {
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
OperationTimeout: 30 * time.Second,
ConnectionTimeout: 30 * time.Second,
})
if err != nil {
fmt.Println("Could not instantiate Pulsar client: %v")
}

defer client.Close()

	JsonWrapperObject2 := AccuknoxFinalJsonWrapperObject{inputjson}
	fmt.Println("sadim1")
	fmt.Println(JsonWrapperObject2)
	JsonWrapperObject := AccuknoxJsonWrapperObject{JsonWrapperObject2}
	fmt.Println("sadim2")
	fmt.Println(JsonWrapperObject)
	
	file, _ := json.Marshal(JsonWrapperObject2)
	fmt.Println("sadim3")
	fmt.Println(file)
	file2, _ := json.Marshal(JsonWrapperObject)
	fmt.Println("sadim4")
	fmt.Println(file2)

properties := make(map[string]string)
properties["pulsar"] = "hello"
fmt.Println("sadim5")
fmt.Println(properties)

jsonSchemaWithProperties := pulsar.NewJSONSchema(exampleSchemaDef, properties)
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: topicName,
Schema: jsonSchemaWithProperties,
})

if err != nil {
fmt.Println(err)
}

_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
Value: (JsonWrapperObject2),
})

defer producer.Close()

if err != nil {
fmt.Println("Failed to publish message", err)
}
fmt.Println("Published message")

return ""

}
type testJSON struct {
InputJson string
}
var (
exampleSchemaDef5 = "{"type":"record","name":"testJSON","namespace":"test2"," +
""fields":[{"name":"InputJson","type":"testJSON"}]}"
)

var (
exSchema = "{"type":"record","schema":"","properties":{"InputJson":"testJSON"}}"
)
func writePulsar(inputjson string, topicName string) (result string) {
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
fmt.Println(err)
}
defer client.Close()

//a2 := testJSON{InputJson: inputjson}
//properties := map[string]testJSON{"InputJson" : a2}
properties2 := make(map[string]string)
properties2["InputJson"] = "testJSON"
jsonSchemaWithProperties := pulsar.NewJSONSchema(exampleSchemaDef5, properties2)
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: topicName,
Schema: jsonSchemaWithProperties,
})
if err != nil {
fmt.Println(err)
}

_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
Value: &testJSON{
InputJson: inputjson,
},
})
if err != nil {
fmt.Println(err)
}
producer.Close()
return ""
}`

Java Working Client which is able to write as parquet on pulsar code: `package accuknox.datalake;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.schema.JSONSchema;

public class App3 {
public static void main(String[] args) throws InterruptedException, IOException {
PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
/*
* Producer producer = client.newProducer(JSONSchema.of(String.class))
* .topic("persistent://public/default/cilium-telemetry-4") .create();
*/
//Schema.JSON
Producer producer = client.newProducer(JSONSchema.of(AccuknoxJsonObject.class)).topic("CiliumTelemetryDemo27Dec").create();
//Producer<byte[]> producer = client.newProducer().topic("cilium-test1").create();
FileReader fr = null;
BufferedReader br = null;
AccuknoxJsonObject jsonObject = null;
for (int i=0;i<=90;i++) {
try
{
File file = new File("./knoxfeedermockdata/flow.txt");
//File file = new File("./knoxfeedermockdata/cilium_alerts_tableMockData.txt");
//File file = new File("./knoxfeedermockdata/cilium_v3_All.txt");
System.out.println(file.getAbsolutePath());
fr=new FileReader(file);
br=new BufferedReader(fr);
String line;
while((line=br.readLine())!=null)
{
System.out.println(line);
//for(int i = 0 ; i< 1000000; i++)
jsonObject = new AccuknoxJsonObject(line);
producer.send(jsonObject);
}
System.gc();
//this.getRuntime().gc()
fr.close(); //closes the stream and release the resources
}
catch(IOException e)
{
System.err.print(e);
} catch(NullPointerException e2) {
System.err.print(e2);
} finally {
try {
if(br != null)
br.close();
if(fr != null)
fr.close();
} catch(NullPointerException ex) {
System.err.print(ex);
}
}
//Thread.sleep(200);
}
producer.close();
client.close();
}
}
`

cc: @sijie

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions