arrow go parquet writing leads to memory leak #506

@darren2013

Description

Describe the bug, including details regarding any error messages, version, and platform.

```
darren@darrendus-Mac-mini ~ % go tool pprof -text http://10.20.183.250:6060/debug/pprof/heap | head -30
Fetching profile over HTTP from http://10.20.183.250:6060/debug/pprof/heap
Saved profile in /Users/darren/pprof/pprof.log_fusion.alloc_objects.alloc_space.inuse_objects.inuse_space.012.pb.gz
File: log_fusion
Build ID: aee9d00c9e5d647228e1f8922ddc80e2115f794f
Type: inuse_space
Time: 2025-09-12 19:04:55 CST
Showing nodes accounting for 14863.66MB, 95.57% of 15552.03MB total
Dropped 241 nodes (cum <= 77.76MB)
flat flat% sum% cum cum%
3141.12MB 20.20% 20.20% 3141.12MB 20.20% github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet.NewColumnMetaData (inline)
2053.06MB 13.20% 33.40% 2053.06MB 13.20% github.com/apache/arrow-go/v18/parquet/metadata.(*ColumnChunkMetaDataBuilder).Finish
2022.75MB 13.01% 46.41% 2022.75MB 13.01% github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet.NewStatistics (inline)
1683.18MB 10.82% 57.23% 6348.41MB 40.82% github.com/apache/arrow-go/v18/parquet/file.(*columnWriter).Close
1630.68MB 10.49% 67.71% 5779.84MB 37.16% github.com/apache/arrow-go/v18/parquet/metadata.(*RowGroupMetaDataBuilder).NextColumnChunk
1389.17MB 8.93% 76.65% 14417.33MB 92.70% github.com/apache/arrow-go/v18/parquet/file.(*rowGroupWriter).NextColumn
871.98MB 5.61% 82.25% 871.98MB 5.61% github.com/apache/arrow-go/v18/arrow/memory.(*GoAllocator).Allocate (partial-inline)
764.03MB 4.91% 87.17% 1008.04MB 6.48% github.com/apache/arrow-go/v18/parquet/metadata.NewColumnChunkMetaDataBuilderWithContents (inline)
344.82MB 2.22% 89.38% 344.82MB 2.22% bufio.NewWriterSize (inline)
244MB 1.57% 90.95% 244MB 1.57% github.com/apache/arrow-go/v18/parquet/schema.ColumnPathFromNode
215.05MB 1.38% 92.33% 269.10MB 1.73% github.com/apache/arrow-go/v18/parquet/pqarrow.writeDenseArrow
128.99MB 0.83% 93.16% 128.99MB 0.83% github.com/apache/arrow-go/v18/parquet/metadata.NewRowGroupMetaDataBuilder (inline)
97.67MB 0.63% 93.79% 97.67MB 0.63% reflect.mapassign_faststr0
89.58MB 0.58% 94.37% 89.58MB 0.58% github.com/apache/arrow-go/v18/internal/hashing.NewHashTable[go.shape.int32] (inline)
37.51MB 0.24% 94.61% 428.80MB 2.76% github.com/apache/arrow-go/v18/parquet/metadata.NewByteArrayStatistics
29.01MB 0.19% 94.80% 824.28MB 5.30% github.com/apache/arrow-go/v18/parquet/file.NewByteArrayColumnChunkWriter
29MB 0.19% 94.98% 179.68MB 1.16% encoding/json.(*decodeState).object
23.50MB 0.15% 95.13% 711.76MB 4.58% github.com/apache/arrow-go/v18/parquet/internal/encoding.byteArrayEncoderTraits.Encoder
16.50MB 0.11% 95.24% 14825.96MB 95.33% github.com/apache/arrow-go/v18/parquet/pqarrow.(*arrowColumnWriter).Write
16MB 0.1% 95.34% 701.82MB 4.51% github.com/apache/arrow-go/v18/parquet/internal/encoding.NewPooledBufferWriter (inline)
14.52MB 0.093% 95.44% 194.20MB 1.25% github.com/darren2013/log_fusion_source/internal/source.FileConsumer
7MB 0.045% 95.48% 351.82MB 2.26% github.com/apache/thrift/lib/go/thrift.NewStreamTransportW (inline)
5MB 0.032% 95.51% 95.58MB 0.61% github.com/apache/arrow-go/v18/internal/hashing.NewBinaryMemoTable
```
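For context, the heap profile above is fetched from the standard net/http/pprof endpoint. A minimal sketch of how such an endpoint is exposed (the :6060 listen address is just what this service uses; in the real service this runs alongside the writer goroutines):

```go
package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers the /debug/pprof/* handlers on http.DefaultServeMux
)

func main() {
	// Serve the profiling endpoint queried by `go tool pprof` above.
	log.Println(http.ListenAndServe(":6060", nil))
}
```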

The memory keeps increasing until there is no free memory left to use. As the profile shows, most of the retained space is held by Parquet column-chunk metadata (parquet.NewColumnMetaData, ColumnChunkMetaDataBuilder.Finish, parquet.NewStatistics).

The write code is as follows:
```go
func (p *ParquetProcessor) flushBatch(entities []model.Event) error {
if len(entities) == 0 {
	return nil
}

// Check that the Arrow schema has been initialized
if p.arrowSchema == nil {
	return fmt.Errorf("arrow schema not initialized")
}

// Check that the Arrow writer has been initialized
if p.arrowWriter == nil {
	return fmt.Errorf("arrow writer not initialized")
}

// Current time
now := time.Now().UnixMilli()

// Optimize how the Arrow record builder is used:
// create a new RecordBuilder each time to ensure memory is released correctly
if p.recordBuilder != nil {
	p.recordBuilder.Release()
	p.recordBuilder = nil
}
p.recordBuilder = array.NewRecordBuilder(p.mem, p.arrowSchema)

// Populate the data field by field
for i, field := range p.arrowSchema.Fields() {
	fieldName := field.Name

	// Special handling for the timestamp field
	if fieldName == "timestamp" {
		builder := p.recordBuilder.Field(i).(*array.Int64Builder)
		for j := 0; j < len(entities); j++ {
			builder.Append(now)
		}
		continue
	}

	// Populate the data according to the field type
	switch field.Type.ID() {
	case arrow.STRING:
		builder := p.recordBuilder.Field(i).(*array.StringBuilder)
		for _, entity := range entities {
			val, ok := entity.Values[fieldName]
			if !ok || val == nil {
				builder.AppendNull()
			} else {
				var strVal string
				switch v := val.(type) {
				case string:
					strVal = v
				default:
					strVal = fmt.Sprintf("%v", v)
				}
				builder.Append(strVal)
			}
		}

	case arrow.INT32:
		builder := p.recordBuilder.Field(i).(*array.Int32Builder)
		for _, entity := range entities {
			val, ok := entity.Values[fieldName]
			if !ok || val == nil {
				builder.AppendNull()
			} else {
				var intVal int32
				switch v := val.(type) {
				case int:
					intVal = int32(v)
				case int64:
					intVal = int32(v)
				case float64:
					intVal = int32(v)
				case string:
					var f float64
					if _, err := fmt.Sscanf(v, "%f", &f); err == nil {
						intVal = int32(f)
					}
				}
				builder.Append(intVal)
			}
		}

	case arrow.INT64:
		builder := p.recordBuilder.Field(i).(*array.Int64Builder)
		for _, entity := range entities {
			val, ok := entity.Values[fieldName]
			if !ok || val == nil {
				builder.AppendNull()
			} else {
				var int64Val int64
				switch v := val.(type) {
				case int:
					int64Val = int64(v)
				case int64:
					int64Val = v
				case float64:
					int64Val = int64(v)
				case time.Time:
					int64Val = v.UnixMilli()
				case string:
					var f float64
					if _, err := fmt.Sscanf(v, "%f", &f); err == nil {
						int64Val = int64(f)
					} else {
						// Try to parse it as a time string
						formats := []string{
							time.RFC3339,
							"2006-01-02 15:04:05",
							"2006/01/02 15:04:05",
							"01/02/2006 15:04:05",
							"02/01/2006 15:04:05",
						}
						for _, format := range formats {
							if t, err := time.Parse(format, v); err == nil {
								int64Val = t.UnixMilli()
								break
							}
						}
					}
				}
				builder.Append(int64Val)
			}
		}

	case arrow.FLOAT64:
		builder := p.recordBuilder.Field(i).(*array.Float64Builder)
		for _, entity := range entities {
			val, ok := entity.Values[fieldName]
			if !ok || val == nil {
				builder.AppendNull()
			} else {
				var floatVal float64
				switch v := val.(type) {
				case float64:
					floatVal = v
				case float32:
					floatVal = float64(v)
				case int:
					floatVal = float64(v)
				case int64:
					floatVal = float64(v)
				case string:
					fmt.Sscanf(v, "%f", &floatVal)
				}
				builder.Append(floatVal)
			}
		}

	case arrow.BOOL:
		builder := p.recordBuilder.Field(i).(*array.BooleanBuilder)
		for _, entity := range entities {
			val, ok := entity.Values[fieldName]
			if !ok || val == nil {
				builder.AppendNull()
			} else {
				var boolVal bool
				switch v := val.(type) {
				case bool:
					boolVal = v
				case string:
					switch v {
					case "true", "True", "TRUE", "1":
						boolVal = true
					case "false", "False", "FALSE", "0":
						boolVal = false
					}
				case int:
					boolVal = v != 0
				case float64:
					boolVal = v != 0
				}
				builder.Append(boolVal)
			}
		}

	default:
		log.Printf("警告: 字段 %s 的类型 %s 不支持", fieldName, field.Type.ID())
	}
}

// Build the Arrow RecordBatch
recordBatch := p.recordBuilder.NewRecordBatch()
defer recordBatch.Release()

// Write to the Parquet file using the latest Arrow Go v18 API
if err := p.arrowWriter.Write(recordBatch); err != nil {
	return fmt.Errorf("failed to write record batch to parquet: %w", err)
}

// Update the write counter and timestamps
p.writeCount++
p.lastFlushTime = time.Now()
p.currentRowGroupSize += int64(len(entities))
p.totalRecords += int64(len(entities))

// Gentle memory-management strategy: log write statistics
if p.writeCount%1000 == 0 {
	log.Printf("ParquetProcessor[%d] has written %d batches, row group size: %d, total records: %d",
		p.instanceID, p.writeCount, p.currentRowGroupSize, p.totalRecords)
}

// Memory-optimization strategy: control the batch size to reduce memory pressure
if len(entities) > 1000 {
	log.Printf("ParquetProcessor[%d] warning: batch size is large (%d), which may increase memory pressure", p.instanceID, len(entities))
}

// 🔥 Core memory optimization: smart batch-level release of C++ resources
// Release C++ object resources every 10 batches or after batches of 1000+ records, balancing performance and memory
if p.writeCount%10 == 0 || len(entities) >= 1000 {
	if err := p.forceCppMemoryRelease(); err != nil {
		log.Printf("ParquetProcessor[%d] C++ resource release failed: %v", p.instanceID, err)
		// Do not return an error; continue processing subsequent data
	}
}

// Tier 2: deep memory release (file segmentation) every 100,000 records - threshold lowered to prevent running out of memory
/*if p.totalRecords > 0 && p.totalRecords%100000 == 0 {
	log.Printf("ParquetProcessor[%d] total records reached %d, starting deep memory release", p.instanceID, p.totalRecords)

	if err := p.forceRowGroupFlush(); err != nil {
		log.Printf("ParquetProcessor[%d] deep memory release failed: %v", p.instanceID, err)
	} else {
		p.currentRowGroupSize = 0 // reset the row-group counter
		log.Printf("ParquetProcessor[%d] deep memory release succeeded, new file created", p.instanceID)
	}
}*/

// Call TotalCounter.CountN to record the number of successfully written records
(*p.TotalCounter).CountN(int64(len(entities)))

return nil

}
```
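For what it's worth, the top entries in the profile are consistent with how the Parquet format works: the file footer must describe every row group, so the writer has to keep ColumnChunkMetaData and Statistics for every column chunk in memory until the file is closed, and frequent small flushes grow that metadata without bound. Below is a minimal sketch of the file-segmentation idea from the commented-out tier-2 code above, assuming a hypothetical rotatingWriter type, rotateEvery threshold, and file-naming scheme; it is not my actual implementation:

```go
package main

import (
	"fmt"
	"os"
	"time"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/parquet"
	"github.com/apache/arrow-go/v18/parquet/pqarrow"
)

// rotateEvery is a hypothetical threshold; tune it for the workload.
const rotateEvery int64 = 1_000_000

// rotatingWriter bounds footer-metadata growth by closing the current
// Parquet file and starting a new one every rotateEvery rows.
type rotatingWriter struct {
	schema  *arrow.Schema
	w       *pqarrow.FileWriter
	written int64
}

func (rw *rotatingWriter) write(rec arrow.Record) error {
	if rw.w == nil || rw.written >= rotateEvery {
		if err := rw.rotate(); err != nil {
			return err
		}
	}
	if err := rw.w.Write(rec); err != nil {
		return err
	}
	rw.written += rec.NumRows()
	return nil
}

func (rw *rotatingWriter) rotate() error {
	if rw.w != nil {
		// Close writes the footer and lets the accumulated
		// row-group/column-chunk metadata be garbage collected.
		if err := rw.w.Close(); err != nil {
			return err
		}
	}
	f, err := os.Create(fmt.Sprintf("out-%d.parquet", time.Now().UnixNano()))
	if err != nil {
		return err
	}
	w, err := pqarrow.NewFileWriter(rw.schema, f,
		parquet.NewWriterProperties(), pqarrow.DefaultWriterProps())
	if err != nil {
		f.Close()
		return err
	}
	rw.w, rw.written = w, 0
	return nil
}

func main() {
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "timestamp", Type: arrow.PrimitiveTypes.Int64},
	}, nil)
	rw := &rotatingWriter{schema: schema}
	_ = rw // call rw.write(recordBatch) from the flush loop
}
```

If the footer metadata is indeed what the profile is showing, memory should plateau at roughly one file's worth of metadata per writer once rotation is in place.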

Component(s)

Parquet
