[ISSUE] streamIngestProtoRecords Function Issue - Fatal Error: Found Bad Pointer in Go Heap #12

@SignalXMarkWeidman

Description

I believe the underlying Go SDK issue in the streamIngestProtoRecords function was not fully resolved in the last version. When data is passed to Rust, it has to be pinned in place in the Go memory manager, or Go may move the data somewhere else. When that happens, the program crashes. I believe this is why I'm experiencing crashes (see debug log).

Once I implemented the streamIngestProtoRecords routine using Go's runtime.Pinner (as shown below), I was able to send billions of rows to ZeroBus and ran many tests without issue.

```go
// Requires the "runtime" and "unsafe" imports in addition to the cgo "C" import.
func streamIngestProtoRecords(streamPtr unsafe.Pointer, records [][]byte) (int64, error) {
	if len(records) == 0 {
		return -1, nil // Return special value for empty batch
	}

	// Create arrays of pointers and lengths
	recordPtrs := make([]*C.uint8_t, len(records))
	recordLens := make([]C.size_t, len(records))

	// Keep every record's backing array pinned until the FFI call has returned
	var p runtime.Pinner
	defer p.Unpin()

	for i, record := range records {
		// Empty records keep a nil pointer and a zero length
		if len(record) > 0 {
			unsafeRecord := (*C.uint8_t)(unsafe.SliceData(records[i]))
			p.Pin(unsafeRecord)
			recordPtrs[i] = unsafeRecord
			recordLens[i] = C.size_t(len(record))
		}
	}

	inRecords := (**C.uint8_t)(unsafe.SliceData(recordPtrs))
	inLengths := (*C.size_t)(unsafe.SliceData(recordLens))

	var cres C.CResult
	offset := C.zerobus_stream_ingest_proto_records(
		(*C.CZerobusStream)(streamPtr),
		inRecords,
		inLengths,
		C.size_t(len(records)),
		&cres,
	)

	if offset == -2 {
		return -1, nil // Empty batch
	}
	if offset < 0 {
		return -1, ffiResult(cres)
	}

	return int64(offset), nil
}
```

Reproduction
This issue is very difficult to reproduce in a unit test. You need to put a lot of pressure on the Go memory manager to force it. We are sending billions of rows to ZeroBus, 200k rows at a time. The streamIngestProtoRecords method is called with a 50k-element array, and the maximum number of inflight messages is set to 100k. However, I had the same result using 8k for both settings. We use multiple concurrent connections by running multiple processes.

Debug Logs
After running for 10 to 20 minutes, the program shuts down with the following error:

```
runtime: pointer 0xc000bb1f80 to unused region of span span.base()=0xc000bb0000 span.limit=0xc000bb1f80 span.state=1
fatal error: found bad pointer in Go heap (incorrect use of unsafe or cgo?)

runtime stack:
runtime.throw({0x7ff728c71ad9?, 0xc000003500?})
		C:/Users/nobody/sdk/go1.24.0/src/runtime/panic.go:1096 +0x4d fp=0x8b7e1ff8e0 sp=0x8b7e1ff8b0 pc=0x7ff7236fb3ad
runtime.badPointer(0x212e7e33028, 0xc000bb1f80, 0x0, 0x0)
		C:/Users/nobody/sdk/go1.24.0/src/runtime/mbitmap.go:1244 +0x165 fp=0x8b7e1ff930 sp=0x8b7e1ff8e0 pc=0x7ff72369e5a5
runtime.findObject(0x212a10eba50?, 0x8b7e1ff9a0?, 0x7ff723702c4d?)
		C:/Users/nobody/sdk/go1.24.0/src/runtime/mbitmap.go:1296 +0xb0 fp=0x8b7e1ff968 sp=0x8b7e1ff930 pc=0x7ff7236f9b90
runtime.wbBufFlush1(0xc000096f08)
		C:/Users/nobody/sdk/go1.24.0/src/runtime/mwbbuf.go:240 +0xd1 fp=0x8b7e1ff9b0 sp=0x8b7e1ff968 pc=0x7ff7236bf6b1
runtime.wbBufFlush.func1()
		C:/Users/nobody/sdk/go1.24.0/src/runtime/mwbbuf.go:181 +0x1e fp=0x8b7e1ff9c8 sp=0x8b7e1ff9b0 pc=0x7ff7236f603e
runtime.systemstack(0xc00094e000)
		C:/Users/nobody/sdk/go1.24.0/src/runtime/asm_amd64.s:514 +0x49 fp=0x8b7e1ff9d8 sp=0x8b7e1ff9c8 pc=0x7ff7237010a9

goroutine 119 gp=0xc000003500 m=9 mp=0xc000b38008 [running]:
runtime.systemstack_switch()
		C:/Users/nobody/sdk/go1.24.0/src/runtime/asm_amd64.s:479 +0x8 fp=0xc000d25388 sp=0xc000d25378 pc=0x7ff723701048
runtime.wbBufFlush()
		C:/Users/nobody/sdk/go1.24.0/src/runtime/mwbbuf.go:180 +0x45 fp=0xc000d253a0 sp=0xc000d25388 pc=0x7ff7236bf5c5
gcWriteBarrier()
		C:/Users/nobody/sdk/go1.24.0/src/runtime/asm_amd64.s:1800 +0x8d fp=0xc000d25420 sp=0xc000d253a0 pc=0x7ff7237007cd
github.com/databricks/zerobus-sdk-go.streamIngestProtoRecords(0x212a0cee270, {0xc0041a2008, 0x3e8, 0x40?})
		C:/Users/nobody/go/pkg/mod/github.com/databricks/zerobus-sdk-go@v0.2.1/ffi.go:520 +0x26c fp=0xc000d254d8 sp=0xc000d25420 pc=0x7ff726b5ae4c
github.com/databricks/zerobus-sdk-go.(*ZerobusStream).IngestRecordsOffset(0xc003fd90f8, {0xc0027e2008, 0x3e8, 0x40?})
		C:/Users/nobody/go/pkg/mod/github.com/databricks/zerobus-sdk-go@v0.2.1/zerobus.go:474 +0x1d8 fp=0xc000d25508 sp=0xc000d254d8 pc=0x7ff726b57998
```

Other Information

  • OS: Windows
  • Version: Go 1.24.0
