Skip to content

Commit b682713

Browse files
committed
fix bug
1 parent ea0bc8a commit b682713

File tree

5 files changed

+216
-55
lines changed

5 files changed

+216
-55
lines changed

src/PipelineConext.js

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,21 @@
1+
import log from 'loglevel';
2+
13
const fs = require('fs');
24
const { promisify } = require('util');
35

46
const readFile = promisify(fs.readFile);
57
const writeFile = promisify(fs.writeFile);
8+
const logger = log.getLogger("meta-encryptor/PipelineContext");
69

710
export class PipelineContext {
811
constructor(options) {
912
this.context = {};
1013
this.options = options || {};
14+
this.runtime = {
15+
rawCommitted: 0,
16+
plainCommitted: 0,
17+
pendingBlocks: [] //[{rawSize, plainSize, remainingPlain}]
18+
};
1119
}
1220

1321
update(key, value) {
@@ -34,6 +42,9 @@ export class PipelineContextInFile extends PipelineContext {
3442
const meta = {};
3543
let offset = 0;
3644

45+
logger.debug("Context before save:", this.context);
46+
logger.debug("PipelineContextInFile::saveContext saving to ", this.filePath);
47+
3748
for (const [key, value] of Object.entries(this.context)) {
3849
if (Buffer.isBuffer(value)) {
3950
binaryChunks.push(value);
@@ -77,6 +88,9 @@ export class PipelineContextInFile extends PipelineContext {
7788
try {
7889
if (!fs.existsSync(this.filePath)) {
7990
this.context = {};
91+
this.runtime.rawCommitted = 0;
92+
this.runtime.plainCommitted = 0;
93+
this.runtime.pendingBlocks = [];
8094
return;
8195
}
8296

@@ -103,8 +117,16 @@ export class PipelineContextInFile extends PipelineContext {
103117
}
104118

105119
await promisify(fs.close)(fd);
120+
121+
const readStart = this.context.readStart || 0;
122+
const writeStart = this.context.writeStart || 0;
123+
this.runtime.rawCommitted = readStart;
124+
this.runtime.plainCommitted = writeStart;
125+
this.runtime.pendingBlocks = [];
106126
} catch (error) {
107127
console.error('PipelineContextInFile::loadContext error:', error.message);
128+
// 当上下文文件损坏或内容不完整时,视为无上下文,避免直接中断流程
129+
this.context = {};
108130
throw error;
109131
}
110132
}

src/Recoverable.js

Lines changed: 99 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ const {Readable, Writable} = require('stream');
44
const {SealedFileStream} = require('./SealedFileStream.js');
55
const {HeaderSize} = require('./limits.js');
66
const fs = require('fs');
7+
import log from 'loglevel';
8+
9+
const logger = log.getLogger("meta-encryptor/Recoverable");
710

811
export class RecoverableReadStream extends Readable {
912
constructor(filePath, context, options) {
@@ -23,6 +26,10 @@ export class RecoverableReadStream extends Readable {
2326
if (this.state === 'remaining') {
2427
this.push(null); // 表示流结束
2528
}
29+
// 读完后显式销毁底层 SealedFileStream,关闭 FileHandle
30+
if (this.inputStream && typeof this.inputStream.destroy === 'function') {
31+
this.inputStream.destroy();
32+
}
2633
});
2734
}
2835

@@ -33,9 +40,10 @@ export class RecoverableReadStream extends Readable {
3340
this.context.context.readStart === undefined ||
3441
Object.keys(this.context.context).length === 0
3542
) {
43+
logger.debug("No readStart in context, start from 0");
3644
return 0;
3745
}
38-
46+
logger.debug("Resuming read from position:", this.context.context['readStart']);
3947
return this.context.context['readStart'];
4048
}
4149
_getDataInContext() {
@@ -46,8 +54,10 @@ export class RecoverableReadStream extends Readable {
4654
this.context.context['data'] === null ||
4755
this.context.context['data'] === undefined
4856
) {
57+
logger.debug("No data in context, returning empty buffer");
4958
return Buffer.alloc(0);
5059
}
60+
logger.debug("Getting data from context, length:", this.context.context['data'].length);
5161
return this.context.context['data'];
5262
}
5363
_read(size) {
@@ -65,6 +75,7 @@ export class RecoverableReadStream extends Readable {
6575
this._read(size);
6676
});
6777
}
78+
logger.debug("Reading header, read so far:", this.headerRead);
6879
break;
6980
case 'contextData':
7081
this.context.context['status'] = 'context';
@@ -81,6 +92,7 @@ export class RecoverableReadStream extends Readable {
8192
this.state = 'remaining';
8293
this._read(size);
8394
}
95+
logger.debug("Reading context data, remaining length:", this.context.context['data'] ? this.context.context['data'].length : 0);
8496
break;
8597
case 'remaining':
8698
this.context.context['status'] = 'file';
@@ -94,8 +106,10 @@ export class RecoverableReadStream extends Readable {
94106
this.context.context['readStart'] = 0;
95107
}
96108
this.context.context['readStart'] += remainingChunk.length;
97-
//this.context.context['data'] = remainingChunk;
98-
//this.context.saveContext();
109+
const prevData = this.context.context['data'] || Buffer.alloc(0);
110+
this.context.context['data'] = Buffer.concat([prevData, remainingChunk]);
111+
logger.debug("Updated readStart in context to:", this.context.context['readStart'], " data length to:", this.context.context['data'].length);
112+
99113
this.push(remainingChunk);
100114
} else {
101115
if (this.inputStream.readableEnded) {
@@ -107,9 +121,18 @@ export class RecoverableReadStream extends Readable {
107121
});
108122
}
109123
}
124+
logger.debug("Reading remaining data from file");
110125
break;
111126
}
112127
}
128+
129+
_destroy(err, callback) {
130+
// 确保在流销毁时关闭底层 SealedFileStream 对应的 FileHandle
131+
if (this.inputStream && typeof this.inputStream.destroy === 'function') {
132+
this.inputStream.destroy();
133+
}
134+
callback(err);
135+
}
113136
}
114137

115138
export class RecoverableWriteStream extends Writable {
@@ -131,14 +154,14 @@ export class RecoverableWriteStream extends Writable {
131154
flags: 'r+',
132155
start: writeStart
133156
};
134-
console.log(`Opening file for resuming write at position: ${writeStart}`);
157+
logger.debug(`Opening file ${filePath} for resuming write at position: ${writeStart}`);
135158
} else {
136159
// 新文件或从头开始 - 也用使用 'r+' 模式,否则会自动截断start后面的内容
137160
streamOptions = {
138161
flags: 'r+',
139162
start: 0
140163
};
141-
console.log(`Creating new file for writing`);
164+
logger.debug(`File is empty. Creating new file ${filePath} for writing`);
142165
}
143166
} else {
144167
// 文件不存在,创建新文件
@@ -150,20 +173,14 @@ export class RecoverableWriteStream extends Writable {
150173
flags: 'r+',
151174
start: 0
152175
};
153-
console.log(`Created new file for writing`);
176+
logger.debug(`File not exist.Created new file ${filePath} for writing`);
154177
}
155178
this.writeStream = new WriteStream(filePath, streamOptions);
156179

157180
this.writeStream.on('error', (err) => {
158181
this.emit('error', err);
159182
});
160183
this.writeStream.on('close', () => {
161-
// fs.truncate(this.filePath, this.context.context['writeStart'], (truncateErr) => {
162-
// if (truncateErr) {
163-
// this.emit('error', truncateErr);
164-
// } else {
165-
// }
166-
// });
167184
});
168185
}
169186

@@ -173,13 +190,14 @@ export class RecoverableWriteStream extends Writable {
173190
this.context.context === undefined ||
174191
Object.keys(this.context.context).length === 0
175192
) {
193+
logger.debug("No writeStart in context, start from 0");
176194
return 0;
177195
}
178196
let writeStart = this.context.context['writeStart'];
179197
if (!Number.isInteger(writeStart)) {
180198
writeStart = 0;
181199
}
182-
200+
logger.debug("Resuming write from position:", writeStart);
183201
return writeStart;
184202
}
185203

@@ -188,13 +206,74 @@ export class RecoverableWriteStream extends Writable {
188206
if (err) {
189207
callback(err);
190208
} else {
191-
//!We only update, don't save context
192-
this.context.context['writeStart'] = this._getWriteStartInContext() + chunk.length;
193-
callback();
209+
210+
//logger.debug("Updated writeStart in context to:", this.context.context['writeStart']);
211+
this._onPlaintextWritten(chunk.length).then(() => {
212+
callback();
213+
}).catch((error) => {
214+
callback(error);
215+
});
194216
}
195217
});
196218
}
197219

220+
_onPlaintextWritten(writtenBytes){
221+
if(!this.context || !this.context.runtime){
222+
return Promise.resolve();
223+
}
224+
225+
let remain = writtenBytes;
226+
const runtime = this.context.runtime;
227+
const blocks = runtime.pendingBlocks || [];
228+
229+
let hasCommittedBlock = false;
230+
let committedRawBytes = 0;
231+
232+
logger.debug("On plaintext written:", writtenBytes, " bytes. Current runtime:", runtime);
233+
while(remain > 0 && blocks.length > 0){
234+
logger.debug("Remaining to commit:", remain, " bytes. Current block:", blocks[0]);
235+
const block = blocks[0];
236+
const canConsume = Math.min(remain, block.remainingPlain);
237+
block.remainingPlain -= canConsume;
238+
remain -= canConsume;
239+
240+
if(block.remainingPlain === 0){
241+
// Block fully committed
242+
runtime.rawCommitted += block.rawSize;
243+
runtime.plainCommitted += block.plainSize;
244+
committedRawBytes += block.rawSize;
245+
blocks.shift();
246+
hasCommittedBlock = true;
247+
}
248+
}
249+
logger.debug("After committing, remaining to commit:", remain, " bytes. Updated runtime:", runtime);
250+
if(!hasCommittedBlock){
251+
logger.debug("No full block committed yet.");
252+
return Promise.resolve();
253+
}
254+
if(this.context.context){
255+
const buf = this.context.context['data'];
256+
if(Buffer.isBuffer(buf) && buf.length > 0 &&
257+
committedRawBytes > 0){
258+
if(committedRawBytes >= buf.length){
259+
// All data committed
260+
this.context.context['data'] = Buffer.alloc(0);
261+
}else{
262+
// Partial data committed
263+
this.context.context['data'] = buf.subarray(committedRawBytes);
264+
}
265+
}
266+
this.context.context['readStart'] = runtime.rawCommitted;
267+
this.context.context['writeStart'] = runtime.plainCommitted;
268+
logger.debug("After writing, updated readStart to:", this.context.context['readStart'],
269+
" writeStart to:", this.context.context['writeStart'],
270+
" data length to:", this.context.context['data'] ? this.context.context['data'].length : 0);
271+
//
272+
this.context.saveContext();
273+
}
274+
return Promise.resolve();
275+
}
276+
198277
_final(callback) {
199278
this.writeStream.on('finish', () => {
200279
const readStart = this.context.context['readStart'] || 0;
@@ -206,16 +285,20 @@ export class RecoverableWriteStream extends Writable {
206285
// 到达文件末尾,执行截断
207286
fs.truncate(this.filePath, writeStart, (truncateErr) => {
208287
if (truncateErr) {
288+
logger.warn("Error truncating file:", truncateErr);
209289
callback(truncateErr);
210290
} else {
291+
logger.debug("File truncated successfully to length:", writeStart);
211292
callback();
212293
}
213294
});
214295
} else {
215296
// 未到达文件末尾,不执行截断
297+
logger.debug("Not truncating file as not at the end. readStart + length:", readStart + length, ", fileSize:", this.fileSize);
216298
callback();
217299
}
218300
});
219301
this.writeStream.end();
302+
logger.debug("Finalizing write stream");
220303
}
221304
}

0 commit comments

Comments
 (0)