Skip to content

WIP: impl a streaming writer temporary file storage abstraction #2175

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion excelize.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,17 +101,24 @@ type charsetTranscoderFn func(charset string, input io.Reader) (rdr io.Reader, e
//
// CultureInfo specifies the country code for applying built-in language number
// format codes, which are affected by the system's local language settings.
//
// TmpDir specifies the temporary directory for creating temporary files, if the
// value is empty, the system default temporary directory will be used.
//
// StreamingTmpFile specifies a custom temporary file storage used by the
// streaming writer, if the value is nil, the default buffered writer, which
// is created with TmpDir, will be used to store the streaming data.
type Options struct {
MaxCalcIterations uint
Password string
RawCellValue bool
UnzipSizeLimit int64
UnzipXMLSizeLimit int64
TmpDir string
ShortDatePattern string
LongDatePattern string
LongTimePattern string
CultureInfo CultureName
TmpDir string
StreamingTmpFile *TmpFile
}

// OpenFile take the name of a spreadsheet file and returns a populated
Expand Down
10 changes: 7 additions & 3 deletions file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,13 @@ func TestWriteTo(t *testing.T) {
f, buf := File{Pkg: sync.Map{}}, bytes.Buffer{}
f.Pkg.Store("s", nil)
f.streams = make(map[string]*StreamWriter)
file, _ := os.Open("123")
f.streams["s"] = &StreamWriter{rawData: bufferedWriter{tmp: file}}
_, err := f.WriteTo(bufio.NewWriter(&buf))
file, err := os.Open("123")
assert.Error(t, err)

rawData := newBufferedWriter(f.options.TmpDir, nil)
rawData.tmp = file
f.streams["s"] = &StreamWriter{rawData: rawData}
_, err = f.WriteTo(bufio.NewWriter(&buf))
assert.Nil(t, err)
}
// Test write with temporary file
Expand Down
10 changes: 9 additions & 1 deletion lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,15 @@ func (f *File) readXML(name string) []byte {
return content.([]byte)
}
if content, ok := f.streams[name]; ok {
return content.rawData.buf.Bytes()
rawDataReader, err := content.rawData.Reader()
if err != nil {
return []byte{}
}
rawDataContent, err := io.ReadAll(rawDataReader)
if err != nil {
return []byte{}
}
return rawDataContent
}
return []byte{}
}
Expand Down
51 changes: 42 additions & 9 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type StreamWriter struct {
SheetID int
sheetWritten bool
worksheet *xlsxWorksheet
rawData bufferedWriter
rawData TmpFile
rows int
mergeCellsCount int
mergeCells strings.Builder
Expand Down Expand Up @@ -119,11 +119,17 @@ func (f *File) NewStreamWriter(sheet string) (*StreamWriter, error) {
if sheetID == -1 {
return nil, ErrSheetNotExist{sheet}
}

rawData := TmpFile(newBufferedWriter(f.options.TmpDir, nil))
if f.options.StreamingTmpFile != nil {
rawData = *f.options.StreamingTmpFile
}

sw := &StreamWriter{
file: f,
Sheet: sheet,
SheetID: sheetID,
rawData: bufferedWriter{tmpDir: f.options.TmpDir},
rawData: rawData,
}
var err error
sw.worksheet, err = f.workSheetReader(sheet)
Expand All @@ -138,7 +144,7 @@ func (f *File) NewStreamWriter(sheet string) (*StreamWriter, error) {
f.streams[sheetXMLPath] = sw

_, _ = sw.rawData.WriteString(xml.Header + `<worksheet` + templateNamespaceIDMap)
bulkAppendFields(&sw.rawData, sw.worksheet, 3, 4)
bulkAppendFields(sw.rawData, sw.worksheet, 3, 4)
return sw, err
}

Expand Down Expand Up @@ -429,7 +435,7 @@ func (sw *StreamWriter) SetRow(cell string, values []interface{}, opts ...RowOpt
_, _ = sw.rawData.WriteString(`</row>`)
return err
}
writeCell(&sw.rawData, c)
writeCell(sw.rawData, c)
}
_, _ = sw.rawData.WriteString(`</row>`)
return sw.rawData.Sync()
Expand Down Expand Up @@ -602,7 +608,7 @@ func setCellIntFunc(c *xlsxC, val interface{}) {
}

// writeCell constructs a cell XML and writes it to the buffer.
func writeCell(buf *bufferedWriter, c xlsxC) {
func writeCell(buf TmpFile, c xlsxC) {
_, _ = buf.WriteString(`<c`)
if c.XMLSpace.Value != "" {
_, _ = buf.WriteString(` xml:`)
Expand Down Expand Up @@ -663,7 +669,7 @@ func writeCell(buf *bufferedWriter, c xlsxC) {
// sheetData XML start element to the buffer.
func (sw *StreamWriter) writeSheetData() {
if !sw.sheetWritten {
bulkAppendFields(&sw.rawData, sw.worksheet, 5, 6)
bulkAppendFields(sw.rawData, sw.worksheet, 5, 6)
if sw.worksheet.Cols != nil {
_, _ = sw.rawData.WriteString("<cols>")
for _, col := range sw.worksheet.Cols.Col {
Expand Down Expand Up @@ -695,7 +701,7 @@ func (sw *StreamWriter) writeSheetData() {
func (sw *StreamWriter) Flush() error {
sw.writeSheetData()
_, _ = sw.rawData.WriteString(`</sheetData>`)
bulkAppendFields(&sw.rawData, sw.worksheet, 9, 16)
bulkAppendFields(sw.rawData, sw.worksheet, 9, 16)
mergeCells := strings.Builder{}
if sw.mergeCellsCount > 0 {
_, _ = mergeCells.WriteString(`<mergeCells count="`)
Expand All @@ -705,9 +711,9 @@ func (sw *StreamWriter) Flush() error {
_, _ = mergeCells.WriteString(`</mergeCells>`)
}
_, _ = sw.rawData.WriteString(mergeCells.String())
bulkAppendFields(&sw.rawData, sw.worksheet, 18, 39)
bulkAppendFields(sw.rawData, sw.worksheet, 18, 39)
_, _ = sw.rawData.WriteString(sw.tableParts)
bulkAppendFields(&sw.rawData, sw.worksheet, 41, 41)
bulkAppendFields(sw.rawData, sw.worksheet, 41, 41)
_, _ = sw.rawData.WriteString(`</worksheet>`)
if err := sw.rawData.Flush(); err != nil {
return err
Expand All @@ -733,11 +739,38 @@ func bulkAppendFields(w io.Writer, ws *xlsxWorksheet, from, to int) {
}
}

// TmpFile abstracts the temporary storage a StreamWriter writes its worksheet
// XML into. Implement it and set Options.StreamingTmpFile to keep the streamed
// data in a custom storage instead of the default buffered writer.
type TmpFile interface {
	// Write and WriteString append raw worksheet XML to the storage.
	io.Writer
	io.StringWriter

	// Sync is called by the stream writer at the end of every successfully
	// written row; a returned error is reported to the caller of SetRow.
	Sync() error

	// Flush is called by StreamWriter.Flush after the closing worksheet tag
	// has been written; a returned error aborts the flush.
	Flush() error

	// Reader returns a reader for the data stored so far. The worksheet XML
	// is obtained by reading it until EOF.
	Reader() (io.Reader, error)

	// Close closes the storage.
	// NOTE(review): presumably this also releases any underlying resources
	// such as a temp file; confirm against the concrete implementation.
	io.Closer
}

// newBufferedWriter creates a new bufferedWriter, which will write to a temp
// file if the buffer size exceeds its chunk size. The tmpDir is stored on the
// writer as given. The chunkSize is optional: when it is nil the default
// StreamChunkSize is used, otherwise the pointed-to value is used.
func newBufferedWriter(tmpDir string, chunkSize *int) *bufferedWriter {
	bw := &bufferedWriter{
		tmpDir:    tmpDir,
		chunkSize: StreamChunkSize,
	}
	// Override the default only when the caller supplied an explicit size.
	if chunkSize != nil {
		bw.chunkSize = *chunkSize
	}
	return bw
}

// bufferedWriter uses a temp file to store an extended buffer. Writes are
// always made to an in-memory buffer, which will always succeed. The buffer
// is written to the temp file with Sync, which may return an error.
// Therefore, Sync should be periodically called and the error checked.
type bufferedWriter struct {
chunkSize int

tmpDir string
tmp *os.File
buf bytes.Buffer
Expand Down
38 changes: 24 additions & 14 deletions stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,14 @@ func TestStreamWriter(t *testing.T) {
assert.NoError(t, streamWriter.rawData.Close())
assert.Error(t, streamWriter.Flush())

streamWriter.rawData.tmp, err = os.CreateTemp(os.TempDir(), "excelize-")
tmpFile, err := os.CreateTemp(os.TempDir(), "excelize-")
assert.NoError(t, err)
streamWriter.rawData.(*bufferedWriter).tmp = tmpFile

assert.NoError(t, err)
_, err = streamWriter.rawData.Reader()
assert.NoError(t, err)
assert.NoError(t, streamWriter.rawData.tmp.Close())
assert.NoError(t, os.Remove(streamWriter.rawData.tmp.Name()))
assert.NoError(t, streamWriter.rawData.Close())

// Test create stream writer with unsupported charset
file = NewFile()
Expand Down Expand Up @@ -441,38 +443,46 @@ func TestStreamWriterReader(t *testing.T) {
var (
err error
sw = StreamWriter{
rawData: bufferedWriter{},
rawData: newBufferedWriter("", nil),
}
)
sw.rawData.tmp, err = os.CreateTemp(os.TempDir(), "excelize-")

tmpFile, err := os.CreateTemp(os.TempDir(), "excelize-")
assert.NoError(t, err)
assert.NoError(t, sw.rawData.tmp.Close())

rawData := newBufferedWriter("", nil)
rawData.tmp = tmpFile
sw.rawData = rawData

assert.NoError(t, tmpFile.Close())
// Test that Reader returns an error when it stats a closed temp file
_, err = sw.rawData.Reader()
assert.Error(t, err)
_, err = sw.getRowValues(1, 1, 1)
assert.Error(t, err)
os.Remove(sw.rawData.tmp.Name())
err = os.Remove(tmpFile.Name())
assert.NoError(t, err)

bw := newBufferedWriter("", nil)
sw = StreamWriter{
file: NewFile(),
rawData: bufferedWriter{},
rawData: bw,
}
// Test getRowValues without expected row
sw.rawData.buf.WriteString("<worksheet><row r=\"1\"><c r=\"B1\"></c></row><worksheet/>")
bw.buf.WriteString("<worksheet><row r=\"1\"><c r=\"B1\"></c></row><worksheet/>")
_, err = sw.getRowValues(1, 1, 1)
assert.NoError(t, err)
sw.rawData.buf.Reset()
bw.buf.Reset()
// Test getRowValues with illegal cell reference
sw.rawData.buf.WriteString("<worksheet><row r=\"1\"><c r=\"A\"></c></row><worksheet/>")
bw.buf.WriteString("<worksheet><row r=\"1\"><c r=\"A\"></c></row><worksheet/>")
_, err = sw.getRowValues(1, 1, 1)
assert.Equal(t, newCellNameToCoordinatesError("A", newInvalidCellNameError("A")), err)
sw.rawData.buf.Reset()
bw.buf.Reset()
// Test getRowValues with invalid c element characters
sw.rawData.buf.WriteString("<worksheet><row r=\"1\"><c></row><worksheet/>")
bw.buf.WriteString("<worksheet><row r=\"1\"><c></row><worksheet/>")
_, err = sw.getRowValues(1, 1, 1)
assert.EqualError(t, err, "XML syntax error on line 1: element <c> closed by </row>")
sw.rawData.buf.Reset()
bw.buf.Reset()
}

func TestStreamWriterGetRowElement(t *testing.T) {
Expand Down
Loading