fs: port SonicBoom module to fs module as Utf8Stream

As a first step to porting portions of the pino structured
logger into the runtime, this commit ports the SonicBoom
module to the fs module as Utf8Stream.

This is a faithful port of the SonicBoom module with some
modern updates, such as converting to a Class and using
Symbol.dispose. The bulk of the implementation is unchanged
from the original.

PR-URL: https://github.com/nodejs/node/pull/58897
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Vinícius Lourenço Claro Cardoso <contact@viniciusl.com.br>
This commit is contained in:
James M Snell 2025-06-29 19:36:38 -07:00 committed by James M Snell
parent c8b1fbebac
commit 5335c101a9
18 changed files with 2888 additions and 0 deletions

View File

@ -7710,6 +7710,186 @@ added:
Type of file system. Type of file system.
### Class: `fs.Utf8Stream`
<!-- YAML
added: REPLACEME
-->
> Stability: 1 - Experimental
An optimized UTF-8 stream writer that allows for flushing all the internal
buffering on demand. It handles `EAGAIN` errors correctly, allowing for
customization, for example, by dropping content if the disk is busy.
#### Event: `'close'`
The `'close'` event is emitted when the stream is fully closed.
#### Event: `'drain'`
The `'drain'` event is emitted when the internal buffer has drained sufficiently
to allow continued writing.
#### Event: `'drop'`
The `'drop'` event is emitted when to maximal length is reached and that data
will not be written. The data that was dropped is passed as the first argument
to the event handle.
#### Event: `'error'`
The `'error'` event is emitted when an error occurs.
#### Event: `'finish'`
The `'finish'` event is emitted when the stream has been ended and all data has
been flushed to the underlying file.
#### Event: `'ready'`
The `'ready'` event is emitted when the stream is ready to accept writes.
#### Event: `'write'`
The `'write'` event is emitted when a write operation has completed. The number
of bytes written is passed as the first argument to the event handler.
#### `new fs.Utf8Stream([options])`
* `options` {Object}
* `append`: {boolean} Appends writes to dest file instead of truncating it.
**Default**: `true`.
* `contentMode`: {string} Which type of data you can send to the write
function, supported values are `'utf8'` or `'buffer'`. **Default**:
`'utf8'`.
* `dest`: {string} A path to a file to be written to (mode controlled by the
append option).
* `fd`: {number} A file descriptor, something that is returned by `fs.open()`
or `fs.openSync()`.
* `fs`: {Object} An object that has the same API as the `fs` module, useful
for mocking, testing, or customizing the behavior of the stream.
* `fsync`: {boolean} Perform a `fs.fsyncSync()` every time a write is
completed.
* `maxLength`: {number} The maximum length of the internal buffer. If a write
operation would cause the buffer to exceed `maxLength`, the data written is
dropped and a drop event is emitted with the dropped data
* `maxWrite`: {number} The maximum number of bytes that can be written;
**Default**: `16384`
* `minLength`: {number} The minimum length of the internal buffer that is
required to be full before flushing.
* `mkdir`: {boolean} Ensure directory for `dest` file exists when true.
**Default**: `false`.
* `mode`: {number|string} Specify the creating file mode (see `fs.open()`).
* `periodicFlush`: {number} Calls flush every `periodicFlush` milliseconds.
* `retryEAGAIN` {Function} A function that will be called when `write()`,
`writeSync()`, or `flushSync()` encounters an `EAGAIN` or `EBUSY` error.
If the return value is `true` the operation will be retried, otherwise it
will bubble the error. The `err` is the error that caused this function to
be called, `writeBufferLen` is the length of the buffer that was written,
and `remainingBufferLen` is the length of the remaining buffer that the
stream did not try to write.
* `err` {any} An error or `null`.
* `writeBufferLen` {number}
* `remainingBufferLen`: {number}
* `sync`: {boolean} Perform writes synchronously.
#### `utf8Stream.append`
* {boolean} Whether the stream is appending to the file or truncating it.
#### `utf8Stream.contentMode`
* {string} The type of data that can be written to the stream. Supported
values are `'utf8'` or `'buffer'`. **Default**: `'utf8'`.
#### `utf8Stream.destroy()`
Close the stream immediately, without flushing the internal buffer.
#### `utf8Stream.end()`
Close the stream gracefully, flushing the internal buffer before closing.
#### `utf8Stream.fd`
* {number} The file descriptor that is being written to.
#### `utf8Stream.file`
* {string} The file that is being written to.
#### `utf8Stream.flush(callback)`
* `callback` {Function}
* `err` {Error|null} An error if the flush failed, otherwise `null`.
Writes the current buffer to the file if a write was not in progress. Do
nothing if `minLength` is zero or if it is already writing.
#### `utf8Stream.flushSync()`
Flushes the buffered data synchronously. This is a costly operation.
#### `utf8Stream.fsync`
* {boolean} Whether the stream is performing a `fs.fsyncSync()` after every
write operation.
#### `utf8Stream.maxLength`
* {number} The maximum length of the internal buffer. If a write
operation would cause the buffer to exceed `maxLength`, the data written is
dropped and a drop event is emitted with the dropped data.
#### `utf8Stream.minLength`
* {number} The minimum length of the internal buffer that is required to be
full before flushing.
#### `utf8Stream.mkdir`
* {boolean} Whether the stream should ensure that the directory for the
`dest` file exists. If `true`, it will create the directory if it does not
exist. **Default**: `false`.
#### `utf8Stream.mode`
* {number|string} The mode of the file that is being written to.
#### `utf8Stream.periodicFlush`
* {number} The number of milliseconds between flushes. If set to `0`, no
periodic flushes will be performed.
#### `utf8Stream.reopen(file)`
* `file`: {string|Buffer|URL} A path to a file to be written to (mode
controlled by the append option).
Reopen the file in place, useful for log rotation.
#### `utf8Stream.sync`
* {boolean} Whether the stream is writing synchronously or asynchronously.
#### `utf8Stream.write(data)`
* `data` {string|Buffer} The data to write.
* Returns {boolean}
When the `options.contentMode` is set to `'utf8'` when the stream is created,
the `data` argument must be a string. If the `contentMode` is set to `'buffer'`,
the `data` argument must be a {Buffer}.
#### `utf8Stream.writing`
* {boolean} Whether the stream is currently writing data to the file.
#### `utf8Stream[Symbol.dispose]()`
Calls `utf8Stream.destroy()`.
### Class: `fs.WriteStream` ### Class: `fs.WriteStream`
<!-- YAML <!-- YAML

View File

@ -165,6 +165,11 @@ let ReadFileContext;
// monkeypatching. // monkeypatching.
let FileReadStream; let FileReadStream;
let FileWriteStream; let FileWriteStream;
let Utf8Stream;
function lazyLoadUtf8Stream() {
Utf8Stream ??= require('internal/streams/fast-utf8-stream');
}
// Ensure that callbacks run in the global context. Only use this function // Ensure that callbacks run in the global context. Only use this function
// for callbacks that are passed to the binding layer, callbacks that are // for callbacks that are passed to the binding layer, callbacks that are
@ -3329,6 +3334,11 @@ module.exports = fs = {
FileWriteStream = val; FileWriteStream = val;
}, },
get Utf8Stream() {
lazyLoadUtf8Stream();
return Utf8Stream;
},
// For tests // For tests
_toUnixTimestamp: toUnixTimestamp, _toUnixTimestamp: toUnixTimestamp,
}; };

View File

@ -0,0 +1,919 @@
'use strict';
// This file is derived from the original SonicBoom module
// MIT License
// Copyright (c) 2017 Matteo Collina
const {
ArrayPrototypePush,
AtomicsWait,
Int32Array,
MathMax,
SymbolDispose,
globalThis: {
Number,
SharedArrayBuffer,
},
} = primordials;
const {
Buffer,
} = require('buffer');
const fs = require('fs');
const EventEmitter = require('events');
const path = require('path');
const {
clearInterval,
setInterval,
setTimeout,
} = require('timers');
const {
codes: {
ERR_INVALID_ARG_TYPE,
ERR_INVALID_ARG_VALUE,
ERR_INVALID_STATE,
ERR_OPERATION_FAILED,
},
} = require('internal/errors');
const {
validateBoolean,
validateFunction,
validateObject,
validateOneOf,
validateString,
validateUint32,
} = require('internal/validators');
const BUSY_WRITE_TIMEOUT = 100;
const kEmptyBuffer = Buffer.allocUnsafe(0);
const kNil = new Int32Array(new SharedArrayBuffer(4));
function sleep(ms) {
// Also filters out NaN, non-number types, including empty strings, but allows bigints
const valid = ms > 0 && ms < Infinity;
if (valid === false) {
if (typeof ms !== 'number' && typeof ms !== 'bigint') {
throw new ERR_INVALID_ARG_TYPE('ms', ['number', 'bigint'], ms);
}
throw new ERR_INVALID_ARG_VALUE.RangeError('ms', ms,
'must be a number greater than 0 and less than Infinity');
}
AtomicsWait(kNil, 0, 0, Number(ms));
}
// 16 KB. Don't write more than docker buffer size.
// https://github.com/moby/moby/blob/513ec73831269947d38a644c278ce3cac36783b2/daemon/logger/copier.go#L13
const kMaxWrite = 16 * 1024;
const kContentModeBuffer = 'buffer';
const kContentModeUtf8 = 'utf8';
const kNullPrototype = { __proto__: null };
// Utf8Stream is a port of the original SonicBoom module
// (https://github.com/pinojs/sonic-boom) that provides a fast and efficient
// way to write UTF-8 encoded data to a file or stream.
class Utf8Stream extends EventEmitter {
#len = 0;
#fd = -1;
#bufs = [];
#lens = [];
#writing = false;
#ending = false;
#reopening = false;
#asyncDrainScheduled = false;
#flushPending = false;
#hwm = 16387; // 16 KB
#file = null;
#destroyed = false;
#minLength = 0;
#maxLength = 0;
#maxWrite = kMaxWrite;
#opening = false;
#periodicFlush = 0;
#periodicFlushTimer = undefined;
#sync = false;
#fsync = false;
#append = true;
#mode = 0o666;
#retryEAGAIN = () => true;
#mkdir = false;
#writingBuf = '';
#write;
#flush;
#flushSync;
#actualWrite;
#fsWriteSync;
#fsWrite;
#fs;
/**
* @typedef {object} Utf8StreamOptions
* @property {string} [dest] - path to the file to write to
* @property {number} [fd] - file descriptor to write to
* @property {number} [minLength] - minimum length of the internal buffer before a write is triggered
* @property {number} [maxLength] - maximum length of the internal buffer before writes are dropped
* @property {number} [maxWrite] - maximum size of a single write operation (default 16384)
* @property {number} [periodicFlush] - interval in ms to flush the stream periodically (default 0, disabled)
* @property {boolean} [sync] - if true, writes are performed synchronously (default false)
* @property {boolean} [fsync] - if true, fsync is called after every write (default false)
* @property {boolean} [append] - if true, data is appended to the file (default true, ignored
* if fd is provided)
* @property {string} [contentMode] - 'utf8' or 'buffer'
* @property {string} [mode] - file mode (permission and sticky bits), default is 0o666
* @property {boolean} [mkdir] - if true, the directory path will be created if it does not exist (default false)
* @property {Function} [retryEAGAIN] - function that receives (err, writingBufLength,
* remainingBufLength) and returns true if EAGAIN/EBUSY should be retried
* @property {object} [fs] - custom fs implementation, must provide write, writeSync, fsync,
* fsyncSync, close, open, mkdir, mkdirSync methods. Mostly useful for testing.
* @param {Utf8StreamOptions} [options]
*/
constructor(options = kNullPrototype) {
validateObject(options, 'options');
let {
fd,
} = options;
const {
dest,
minLength,
maxLength,
maxWrite,
periodicFlush,
sync,
append = true,
mkdir,
retryEAGAIN,
fsync,
contentMode = kContentModeUtf8,
mode,
// Provides for a custom fs implementation. Mostly useful for testing.
fs: overrideFs = {},
} = options;
super();
fd ??= dest;
validateObject(overrideFs, 'options.fs');
this.#fs = { ...fs, ...overrideFs };
validateFunction(this.#fs.write, 'options.fs.write');
validateFunction(this.#fs.writeSync, 'options.fs.writeSync');
validateFunction(this.#fs.fsync, 'options.fs.fsync');
validateFunction(this.#fs.fsyncSync, 'options.fs.fsyncSync');
validateFunction(this.#fs.close, 'options.fs.close');
validateFunction(this.#fs.open, 'options.fs.open');
validateFunction(this.#fs.mkdir, 'options.fs.mkdir');
validateFunction(this.#fs.mkdirSync, 'options.fs.mkdirSync');
this.#hwm = MathMax(minLength || 0, this.#hwm);
this.#minLength = minLength || 0;
this.#maxLength = maxLength || 0;
this.#maxWrite = maxWrite || kMaxWrite;
this.#periodicFlush = periodicFlush || 0;
this.#sync = sync || false;
this.#fsync = fsync || false;
this.#append = append || false;
this.#mode = mode;
this.#retryEAGAIN = retryEAGAIN || (() => true);
this.#mkdir = mkdir || false;
validateUint32(this.#hwm, 'options.hwm');
validateUint32(this.#minLength, 'options.minLength');
validateUint32(this.#maxLength, 'options.maxLength');
validateUint32(this.#maxWrite, 'options.maxWrite');
validateUint32(this.#periodicFlush, 'options.periodicFlush');
validateBoolean(this.#sync, 'options.sync');
validateBoolean(this.#fsync, 'options.fsync');
validateBoolean(this.#append, 'options.append');
validateBoolean(this.#mkdir, 'options.mkdir');
validateFunction(this.#retryEAGAIN, 'options.retryEAGAIN');
validateOneOf(contentMode, 'options.contentMode', [kContentModeBuffer, kContentModeUtf8]);
if (contentMode === kContentModeBuffer) {
this.#writingBuf = kEmptyBuffer;
this.#write = (...args) => this.#writeBuffer(...args);
this.#flush = (...args) => this.#flushBuffer(...args);
this.#flushSync = (...args) => this.#flushBufferSync(...args);
this.#actualWrite = (...args) => this.#actualWriteBuffer(...args);
this.#fsWriteSync = () => this.#fs.writeSync(this.#fd, this.#writingBuf);
this.#fsWrite = () => this.#fs.write(this.#fd, this.#writingBuf, (...args) => {
return this.#release(...args);
});
} else {
this.#writingBuf = '';
this.#write = (...args) => this.#writeUtf8(...args);
this.#flush = (...args) => this.#flushUtf8(...args);
this.#flushSync = (...args) => this.#flushSyncUtf8(...args);
this.#actualWrite = (...args) => this.#actualWriteUtf8(...args);
this.#fsWriteSync = () => this.#fs.writeSync(this.#fd, this.#writingBuf, 'utf8');
this.#fsWrite = () => this.#fs.write(this.#fd, this.#writingBuf, 'utf8', (...args) => {
return this.#release(...args);
});
}
// TODO(@jasnell): Support passing in an AbortSignal to cancel the stream.
// TODO(@jasnell): Support FileHandle here as well?
// TODO(@jasnell): The `dest` option is a path string. We may consider
// also supporting URL and Buffer types here as well like the rest of
// the fs module APIs do.
if (typeof fd === 'number') {
this.#fd = fd;
process.nextTick(() => this.emit('ready'));
} else if (typeof fd === 'string') {
this.#openFile(fd);
} else {
throw new ERR_INVALID_ARG_TYPE('fd', ['number', 'string'], fd);
}
if (this.#minLength >= this.#maxWrite) {
throw new ERR_INVALID_ARG_VALUE.RangeError('minLength', this.#minLength,
`should be smaller than maxWrite (${this.#maxWrite})`);
}
this.on('newListener', (name) => {
if (name === 'drain') {
this._asyncDrainScheduled = false;
}
});
if (this.#periodicFlush !== 0) {
this.#periodicFlushTimer = setInterval(() => this.flush(null), this.#periodicFlush);
this.#periodicFlushTimer.unref();
}
}
/**
* @param {string|Buffer} data
* @returns {boolean}
*/
write(data) {
return this.#write(data);
}
/**
* @callback FlushCallback
* @param {Error} [err] - Error if any
* @returns {void}
*/
/**
* @param {FlushCallback} [cb]
*/
flush(cb = function(_err) {}) { this.#flush(cb); }
flushSync() { return this.#flushSync(); }
/**
* @param {string} [file]
*/
reopen(file) {
if (this.#destroyed) {
throw new ERR_INVALID_STATE('Utf8Stream is destroyed');
}
if (this.#opening) {
this.once('ready', () => this.reopen(file));
return;
}
if (this.#ending) {
return;
}
if (!this.#file) {
throw new ERR_OPERATION_FAILED(
'Unable to reopen a file descriptor, you must pass a file to SonicBoom');
}
if (file) {
this.#file = file;
}
this.#reopening = true;
if (this.#writing) {
return;
}
const fd = this.#fd;
this.once('ready', () => {
if (fd !== this.#fd) {
this.#fs.close(fd, (err) => {
if (err) {
return this.emit('error', err);
}
});
}
});
this.#openFile(this.#file);
}
end() {
if (this.#destroyed) {
throw new ERR_INVALID_STATE('Utf8Stream is destroyed');
}
if (this.#opening) {
this.once('ready', () => {
this.end();
});
return;
}
if (this.#ending) {
return;
}
this.#ending = true;
if (this.#writing) {
return;
}
if (this.#len > 0 && this.#fd >= 0) {
this.#actualWrite();
} else {
this.#actualClose();
}
}
destroy() {
if (this.#destroyed) {
return;
}
this.#actualClose();
}
/** @type {number} */
get mode() { return this.#mode; }
/** @type {string|undefined} */
get file() { return this.#file; }
/** @type {number} */
get fd() { return this.#fd; }
/** @type {number} */
get minLength() { return this.#minLength; }
/** @type {number} */
get maxLength() { return this.#maxLength; }
/** @type {boolean} */
get writing() { return this.#writing; }
/** @type {boolean} */
get sync() { return this.#sync; }
/** @type {boolean} */
get fsync() { return this.#fsync; }
/** @type {boolean} */
get append() { return this.#append; }
/** @type {number} */
get periodicFlush() { return this.#periodicFlush; }
/** @type {'buffer'|'utf8'} */
get contentMode() {
return this.#writingBuf instanceof Buffer ? kContentModeBuffer : kContentModeUtf8;
}
/** @type {boolean} */
get mkdir() { return this.#mkdir; }
[SymbolDispose]() { this.destroy(); }
#release(err, n) {
if (err) {
if ((err.code === 'EAGAIN' || err.code === 'EBUSY') &&
this.#retryEAGAIN(err, this.#writingBuf.length, this.#len - this.#writingBuf.length)) {
if (this.#sync) {
// This error code should not happen in sync mode, because it is
// not using the underlining operating system asynchronous functions.
// However it happens, and so we handle it.
// Ref: https://github.com/pinojs/pino/issues/783
try {
sleep(BUSY_WRITE_TIMEOUT);
this.#release(undefined, 0);
} catch (err) {
this.#release(err);
}
} else {
// Let's give the destination some time to process the chunk.
setTimeout(() => this.#fsWrite(), BUSY_WRITE_TIMEOUT);
}
} else {
this.#writing = false;
this.emit('error', err);
}
return;
}
this.emit('write', n);
const releasedBufObj = releaseWritingBuf(this.#writingBuf, this.#len, n);
this.#len = releasedBufObj.len;
this.#writingBuf = releasedBufObj.writingBuf;
if (this.#writingBuf.length) {
if (!this.#sync) {
this.#fsWrite();
return;
}
try {
do {
const n = this.#fsWriteSync();
const releasedBufObj = releaseWritingBuf(this.#writingBuf, this.#len, n);
this.#len = releasedBufObj.len;
this.#writingBuf = releasedBufObj.writingBuf;
} while (this.#writingBuf.length);
} catch (err) {
this.#release(err);
return;
}
}
if (this.#fsync) {
this.#fs.fsyncSync(this.#fd);
}
const len = this.#len;
if (this.#reopening) {
this.#writing = false;
this.#reopening = false;
this.reopen();
} else if (len > this.#minLength) {
this.#actualWrite();
} else if (this.#ending) {
if (len > 0) {
this.#actualWrite();
} else {
this.#writing = false;
this.#actualClose();
}
} else {
this.#writing = false;
if (this.#sync) {
if (!this.#asyncDrainScheduled) {
this.#asyncDrainScheduled = true;
process.nextTick(() => this.#emitDrain());
}
} else {
this.emit('drain');
}
}
}
#openFile(file) {
this.#opening = true;
this.#writing = true;
this.#asyncDrainScheduled = false;
// NOTE: 'error' and 'ready' events emitted below only relevant when sonic.sync===false
// for sync mode, there is no way to add a listener that will receive these
const fileOpened = (err, fd) => {
if (err) {
this.#reopening = false;
this.#writing = false;
this.#opening = false;
if (this.#sync) {
process.nextTick(() => {
if (this.listenerCount('error') > 0) {
this.emit('error', err);
}
});
} else {
this.emit('error', err);
}
return;
}
const reopening = this.#reopening;
this.#fd = fd;
this.#file = file;
this.#reopening = false;
this.#opening = false;
this.#writing = false;
if (this.#sync) {
process.nextTick(() => this.emit('ready'));
} else {
this.emit('ready');
}
if (this.#destroyed) {
return;
}
// start
if ((!this.#writing && this.#len > this.#minLength) || this.#flushPending) {
this.#actualWrite();
} else if (reopening) {
process.nextTick(() => this.emit('drain'));
}
};
const flags = this.#append ? 'a' : 'w';
const mode = this.#mode;
if (this.#sync) {
try {
if (this.#mkdir) this.#fs.mkdirSync(path.dirname(file), { recursive: true });
const fd = this.#fs.openSync(file, flags, mode);
fileOpened(null, fd);
} catch (err) {
fileOpened(err);
throw err;
}
} else if (this.#mkdir) {
this.#fs.mkdir(path.dirname(file), { recursive: true }, (err) => {
if (err) return fileOpened(err);
this.#fs.open(file, flags, mode, fileOpened);
});
} else {
this.#fs.open(file, flags, mode, fileOpened);
}
}
#emitDrain() {
const hasListeners = this.listenerCount('drain') > 0;
if (!hasListeners) return;
this.#asyncDrainScheduled = false;
this.emit('drain');
}
#actualClose() {
if (this.#fd === -1) {
this.once('ready', () => this.#actualClose());
return;
}
if (this.#periodicFlushTimer !== undefined) {
clearInterval(this.#periodicFlushTimer);
}
this.#destroyed = true;
this.#bufs = [];
this.#lens = [];
const done = (err) => {
if (err) {
this.emit('error', err);
return;
}
if (this.#ending && !this.#writing) {
this.emit('finish');
}
this.emit('close');
};
const closeWrapped = () => {
// We skip errors in fsync
if (this.#fd !== 1 && this.#fd !== 2) {
this.#fs.close(this.#fd, done);
} else {
done();
}
};
try {
this.#fs.fsync(this.#fd, closeWrapped);
} catch {
// Intentionally empty.
}
}
#actualWriteBuffer() {
this.#writing = true;
this.#writingBuf = this.#writingBuf.length ? this.#writingBuf : mergeBuf(this.#bufs.shift(), this.#lens.shift());
if (this.#sync) {
try {
const written = this.#fs.writeSync(this.#fd, this.#writingBuf);
this.#release(null, written);
} catch (err) {
this.#release(err);
}
} else {
// fs.write will need to copy string to buffer anyway so
// we do it here to avoid the overhead of calculating the buffer size
// in releaseWritingBuf.
this.#writingBuf = Buffer.from(this.#writingBuf);
this.#fs.write(this.#fd, this.#writingBuf, (...args) => this.#release(...args));
}
}
#actualWriteUtf8() {
this.#writing = true;
this.#writingBuf ||= this.#bufs.shift() || '';
if (this.#sync) {
try {
const written = this.#fs.writeSync(this.#fd, this.#writingBuf, 'utf8');
this.#release(null, written);
} catch (err) {
this.#release(err);
}
} else {
this.#fs.write(this.#fd, this.#writingBuf, 'utf8', (...args) => this.#release(...args));
}
}
#flushBufferSync() {
if (this.#destroyed) {
throw new ERR_INVALID_STATE('Utf8Stream is destroyed');
}
if (this.#fd < 0) {
throw new ERR_INVALID_STATE('Invalid file descriptor');
}
if (!this.#writing && this.#writingBuf.length > 0) {
this.#bufs.unshift([this.#writingBuf]);
this.#writingBuf = kEmptyBuffer;
}
let buf = kEmptyBuffer;
while (this.#bufs.length || buf.length) {
if (buf.length <= 0) {
buf = mergeBuf(this.#bufs[0], this.#lens[0]);
}
try {
const n = this.#fs.writeSync(this.#fd, buf);
buf = buf.subarray(n);
this.#len = MathMax(this.#len - n, 0);
if (buf.length <= 0) {
this.#bufs.shift();
this.#lens.shift();
}
} catch (err) {
const shouldRetry = err.code === 'EAGAIN' || err.code === 'EBUSY';
if (shouldRetry && !this.#retryEAGAIN(err, buf.length, this.#len - buf.length)) {
throw err;
}
sleep(BUSY_WRITE_TIMEOUT);
}
}
}
#flushSyncUtf8() {
if (this.#destroyed) {
throw new ERR_INVALID_STATE('Utf8Stream is destroyed');
}
if (this.#fd < 0) {
throw new ERR_INVALID_STATE('Invalid file descriptor');
}
if (!this.#writing && this.#writingBuf.length > 0) {
this.#bufs.unshift(this.#writingBuf);
this.#writingBuf = '';
}
let buf = '';
while (this.#bufs.length || buf) {
if (buf.length <= 0) {
buf = this.#bufs[0];
}
try {
const n = this.#fs.writeSync(this.#fd, buf, 'utf8');
const releasedBufObj = releaseWritingBuf(buf, this.#len, n);
buf = releasedBufObj.writingBuf;
this.#len = releasedBufObj.len;
if (buf.length <= 0) {
this.#bufs.shift();
}
} catch (err) {
const shouldRetry = err.code === 'EAGAIN' || err.code === 'EBUSY';
if (shouldRetry && !this.#retryEAGAIN(err, buf.length, this.#len - buf.length)) {
throw err;
}
sleep(BUSY_WRITE_TIMEOUT);
}
}
try {
this.#fs.fsyncSync(this.#fd);
} catch {
// Skip the error. The fd might not support fsync.
}
}
#callFlushCallbackOnDrain(cb) {
this.#flushPending = true;
const onDrain = () => {
// Only if _fsync is false to avoid double fsync
if (!this.#fsync && !this.#destroyed) {
try {
this.#fs.fsync(this.#fd, (err) => {
this.#flushPending = false;
// If the fd is closed, we ignore the error.
if (err?.code === 'EBADF') {
cb();
return;
}
cb(err);
});
} catch (err) {
this.#flushPending = false;
cb(err);
}
} else {
this.#flushPending = false;
cb();
}
this.off('error', onError);
};
const onError = (err) => {
this.#flushPending = false;
cb(err);
this.off('drain', onDrain);
};
this.once('drain', onDrain);
this.once('error', onError);
}
#flushBuffer(cb) {
validateFunction(cb, 'cb');
if (this.#destroyed) {
const error = new ERR_INVALID_STATE('Utf8Stream is destroyed');
if (cb) {
cb(error);
return;
}
throw error;
}
if (this.#minLength <= 0) {
cb?.();
return;
}
if (cb) {
this.#callFlushCallbackOnDrain(cb);
}
if (this.#writing) {
return;
}
if (this.#bufs.length === 0) {
ArrayPrototypePush(this.#bufs, []);
ArrayPrototypePush(this.#lens, 0);
}
this.#actualWrite();
}
#flushUtf8(cb) {
validateFunction(cb, 'cb');
if (this.#destroyed) {
const error = new ERR_INVALID_STATE('Utf8Stream is destroyed');
if (cb) {
cb(error);
return;
}
throw error;
}
if (this.#minLength <= 0) {
cb?.();
return;
}
if (cb) {
this.#callFlushCallbackOnDrain(cb);
}
if (this.#writing) {
return;
}
if (this.#bufs.length === 0) {
ArrayPrototypePush(this.#bufs, '');
}
this.#actualWrite();
}
#writeBuffer(data) {
if (this.#destroyed) {
throw new ERR_INVALID_STATE('Utf8Stream is destroyed');
}
// TODO(@jasnell): Support any ArrayBufferView type here, not just Buffer.
if (!Buffer.isBuffer(data)) {
throw new ERR_INVALID_ARG_TYPE('data', 'Buffer', data);
}
const len = this.#len + data.length;
const bufs = this.#bufs;
const lens = this.#lens;
if (this.#maxLength && len > this.#maxLength) {
this.emit('drop', data);
return this.#len < this.#hwm;
}
if (
bufs.length === 0 ||
lens[lens.length - 1] + data.length > this.#maxWrite
) {
ArrayPrototypePush(bufs, []);
ArrayPrototypePush(lens, data.length);
} else {
ArrayPrototypePush(bufs[bufs.length - 1], data);
lens[lens.length - 1] += data.length;
}
this.#len = len;
if (!this.#writing && this.#len >= this.#minLength) {
this.#actualWrite();
}
return this.#len < this.#hwm;
}
#writeUtf8(data) {
if (this.#destroyed) {
throw new ERR_INVALID_STATE('Utf8Stream is destroyed');
}
validateString(data, 'data');
const len = this.#len + data.length;
const bufs = this.#bufs;
if (this.#maxLength && len > this.#maxLength) {
this.emit('drop', data);
return this.#len < this.#hwm;
}
if (
bufs.length === 0 ||
bufs[bufs.length - 1].length + data.length > this.#maxWrite
) {
ArrayPrototypePush(bufs, '' + data);
} else {
bufs[bufs.length - 1] += data;
}
this.#len = len;
if (!this.#writing && this.#len >= this.#minLength) {
this.#actualWrite();
}
return this.#len < this.#hwm;
}
}
/**
* Release the writingBuf after fs.write n bytes data
* @param {string | Buffer} writingBuf - currently writing buffer, usually be instance._writingBuf.
* @param {number} len - currently buffer length, usually be instance._len.
* @param {number} n - number of bytes fs already written
* @returns {{writingBuf: string | Buffer, len: number}} released writingBuf and length
*/
function releaseWritingBuf(writingBuf, len, n) {
// if Buffer.byteLength is equal to n, that means writingBuf contains no multi-byte character
if (typeof writingBuf === 'string' && Buffer.byteLength(writingBuf) !== n) {
// Since the fs.write callback parameter `n` means how many bytes the passed of string
// We calculate the original string length for avoiding the multi-byte character issue
n = Buffer.from(writingBuf).subarray(0, n).toString().length;
}
len = MathMax(len - n, 0);
writingBuf = writingBuf.slice(n);
return { writingBuf, len };
}
function mergeBuf(bufs, len) {
if (bufs.length === 0) {
return kEmptyBuffer;
}
if (bufs.length === 1) {
return bufs[0];
}
return Buffer.concat(bufs, len);
}
module.exports = Utf8Stream;

View File

@ -0,0 +1,67 @@
'use strict';
const common = require('../common');
const tmpdir = require('../common/tmpdir');
const {
ok,
strictEqual,
throws,
} = require('node:assert');
const {
openSync,
readFile,
readFileSync,
} = require('node:fs');
const { Utf8Stream } = require('node:fs');
const { isMainThread } = require('node:worker_threads');
const { join } = require('node:path');
let fileCounter = 0;
tmpdir.refresh();
if (isMainThread) {
process.umask(0o000);
}
function getTempFile() {
return join(tmpdir.path, `fastutf8stream-${process.pid}-${Date.now()}-${fileCounter++}.log`);
}
{
const dest = getTempFile();
const fd = openSync(dest, 'w');
const stream = new Utf8Stream({ fd, sync: false });
// Test successful write
ok(stream.write('hello world\n'));
stream.destroy();
throws(() => stream.write('hello world\n'), Error);
readFile(dest, 'utf8', common.mustSucceed((data) => {
strictEqual(data, 'hello world\n');
}));
stream.on('finish', common.mustNotCall());
stream.on('close', common.mustCall());
};
{
const dest = getTempFile();
const fd = openSync(dest, 'w');
const stream = new Utf8Stream({ fd, sync: true });
ok(stream.write('hello world\n'));
stream.destroy();
throws(() => stream.write('hello world\n'), Error);
const data = readFileSync(dest, 'utf8');
strictEqual(data, 'hello world\n');
};
{
const dest = getTempFile();
const stream = new Utf8Stream({ dest });
stream.destroy();
stream.on('close', common.mustCall());
}

View File

@ -0,0 +1,112 @@
'use strict';
const common = require('../common');
const tmpdir = require('../common/tmpdir');
const {
strictEqual,
} = require('node:assert');
const fs = require('node:fs');
const { Utf8Stream } = require('node:fs');
const { join } = require('node:path');
const { isMainThread } = require('node:worker_threads');
tmpdir.refresh();
let fileCounter = 0;
if (isMainThread) {
process.umask(0o000);
}
function getTempFile() {
return join(tmpdir.path, `fastutf8stream-${process.pid}-${Date.now()}-${fileCounter++}.log`);
}
runTests(false);
runTests(true);
function runTests(sync) {
{
const dest = getTempFile();
const stream = new Utf8Stream({ dest, minLength: 4096, sync });
stream.once('ready', common.mustCall(() => {
const after = `${dest}-moved`;
stream.reopen(after);
stream.write('after reopen\n');
stream.on('finish', common.mustCall(() => {
fs.readFile(after, 'utf8', common.mustSucceed((data) => {
strictEqual(data, 'after reopen\n');
}));
}));
stream.end();
}));
}
{
const dest = getTempFile();
const stream = new Utf8Stream({ dest, minLength: 4096, sync });
stream.once('ready', common.mustCall(() => {
stream.reopen(`${dest}-moved`);
const after = `${dest}-moved-moved`;
stream.reopen(after);
stream.write('after reopen\n');
stream.on('finish', common.mustCall(() => {
fs.readFile(after, 'utf8', common.mustSucceed((data) => {
strictEqual(data, 'after reopen\n');
}));
}));
stream.end();
}));
}
{
const dest = getTempFile();
const stream = new Utf8Stream({ dest, minLength: 4096, sync });
const after = dest + '-moved';
stream.reopen(after);
stream.write('after reopen\n');
stream.on('finish', common.mustCall(() => {
fs.readFile(after, 'utf8', common.mustSucceed((data) => {
strictEqual(data, 'after reopen\n');
}));
}));
stream.end();
}
{
const dest = getTempFile();
const stream = new Utf8Stream({ dest, sync });
const str = Buffer.alloc(10000).fill('a').toString();
let totalWritten = 0;
function writeData() {
if (totalWritten >= 10000) {
stream.end();
return;
}
const chunk = str.slice(totalWritten, totalWritten + 1000);
if (stream.write(chunk)) {
totalWritten += chunk.length;
setImmediate(common.mustCall(writeData));
} else {
stream.once('drain', common.mustCall(() => {
totalWritten += chunk.length;
setImmediate(common.mustCall(writeData));
}));
}
};
stream.on('finish', common.mustCall(() => {
fs.readFile(dest, 'utf8', common.mustSucceed((data) => {
strictEqual(data.length, 10000);
strictEqual(data, str);
}));
}));
writeData();
}
}

View File

@ -0,0 +1,105 @@
'use strict';
const common = require('../common');
const tmpdir = require('../common/tmpdir');
const {
ok,
strictEqual,
} = require('node:assert');
const {
openSync,
fsyncSync,
writeSync,
write,
} = require('node:fs');
const { join } = require('node:path');
const { Utf8Stream } = require('node:fs');
const { isMainThread } = require('node:worker_threads');
tmpdir.refresh();
if (isMainThread) {
process.umask(0o000);
}
let fileCounter = 0;
function getTempFile() {
return join(tmpdir.path, `fastutf8stream-${process.pid}-${Date.now()}-${fileCounter++}.log`);
}
runTests(false);
runTests(true);
function runTests(sync) {
{
const dest = getTempFile();
const fd = openSync(dest, 'w');
const fsOverride = {
fsync: common.mustNotCall(),
fsyncSync: common.mustCall(() => fsyncSync(fd)),
};
if (sync) {
fsOverride.writeSync = common.mustCall((...args) => writeSync(...args));
fsOverride.write = common.mustNotCall();
} else {
fsOverride.write = common.mustCall((...args) => write(...args));
fsOverride.writeSync = common.mustNotCall();
}
const stream = new Utf8Stream({
fd,
sync,
fsync: true,
minLength: 4096,
fs: fsOverride,
});
stream.on('ready', common.mustCall(() => {
ok(stream.write('hello world\n'));
stream.flush(common.mustSucceed(() => stream.end()));
}));
}
{
const dest = getTempFile();
const fd = openSync(dest, 'w');
const testError = new Error('fsync failed');
testError.code = 'ETEST';
const fsOverride = {
fsync: common.mustCall((fd, cb) => {
process.nextTick(() => cb(testError));
}, 2),
};
if (sync) {
fsOverride.writeSync = common.mustCall((...args) => {
return writeSync(...args);
});
} else {
fsOverride.write = common.mustCall((...args) => {
return write(...args);
});
}
const stream = new Utf8Stream({
fd,
sync,
minLength: 4096,
fs: fsOverride,
});
stream.on('ready', common.mustCall(() => {
ok(stream.write('hello world\n'));
stream.flush(common.mustCall((err) => {
ok(err, 'flush should return an error');
strictEqual(err.code, 'ETEST');
stream.end();
}));
}));
}
}

View File

@ -0,0 +1,139 @@
'use strict';
const common = require('../common');
const tmpdir = require('../common/tmpdir');
const {
ok,
strictEqual,
throws,
} = require('node:assert');
const {
openSync,
readFile,
readFileSync,
writeSync,
} = require('node:fs');
const { Utf8Stream } = require('node:fs');
const { join } = require('node:path');
const { isMainThread } = require('node:worker_threads');
tmpdir.refresh();
let fileCounter = 0;
if (isMainThread) {
process.umask(0o000);
}
function getTempFile() {
return join(tmpdir.path, `fastutf8stream-${process.pid}-${Date.now()}-${fileCounter++}.log`);
}
runTests(false);
runTests(true);
function runTests(sync) {
const dest = getTempFile();
const fd = openSync(dest, 'w');
const stream = new Utf8Stream({ fd, minLength: 4096, sync });
ok(stream.write('hello world\n'));
ok(stream.write('something else\n'));
stream.flushSync();
setImmediate(common.mustCall(() => {
stream.end();
const data = readFileSync(dest, 'utf8');
strictEqual(data, 'hello world\nsomething else\n');
stream.on('close', common.mustCall());
}));
}
{
const dest = getTempFile();
const fd = openSync(dest, 'w');
let reportEagain = true;
const fsOverride = {
writeSync: common.mustCall((...args) => {
if (reportEagain) {
reportEagain = false;
const err = new Error('EAGAIN');
err.code = 'EAGAIN';
throw err;
}
writeSync(...args);
}, 2),
};
const stream = new Utf8Stream({
fd,
sync: false,
minLength: 0,
fs: fsOverride,
});
stream.on('ready', common.mustCall(() => {
ok(stream.write('hello world\n'));
ok(stream.write('something else\n'));
stream.flushSync();
stream.end();
stream.on('finish', common.mustCall(() => {
readFile(dest, 'utf8', common.mustSucceed((data) => {
strictEqual(data, 'hello world\nsomething else\n');
}));
}));
}));
}
{
const dest = getTempFile();
const fd = openSync(dest, 'w');
let retryCallCount = 0;
const err = new Error('EAGAIN');
err.code = 'EAGAIN';
let reportError = true;
const fsOverride = {
writeSync: common.mustCall((...args) => {
if (reportError) {
reportError = false;
throw err;
}
return writeSync(...args);
}, 2),
};
const stream = new Utf8Stream({
fd,
sync: false,
minLength: 1000,
retryEAGAIN: (err, writeBufferLen, remainingBufferLen) => {
retryCallCount++;
strictEqual(err.code, 'EAGAIN');
strictEqual(writeBufferLen, 12);
strictEqual(remainingBufferLen, 0);
return false; // Don't retry
},
fs: fsOverride,
});
stream.on('ready', common.mustCall(() => {
ok(stream.write('hello world\n'));
throws(() => stream.flushSync(), err);
ok(stream.write('something else\n'));
stream.flushSync();
stream.end();
stream.on('finish', common.mustCall(() => {
readFile(dest, 'utf8', common.mustSucceed((data) => {
strictEqual(data, 'hello world\nsomething else\n');
strictEqual(retryCallCount, 1);
}));
}));
}));
}

View File

@ -0,0 +1,142 @@
'use strict';
const common = require('../common');
const tmpdir = require('../common/tmpdir');
const {
ok,
strictEqual,
} = require('node:assert');
const {
openSync,
readFile,
writeFileSync,
} = require('node:fs');
const { join } = require('node:path');
const { Utf8Stream } = require('node:fs');
const { isMainThread } = require('node:worker_threads');
tmpdir.refresh();
let fileCounter = 0;
if (isMainThread) {
process.umask(0o000);
}
function getTempFile() {
return join(tmpdir.path, `fastutf8stream-${process.pid}-${Date.now()}-${fileCounter++}.log`);
}
runTests(false);
runTests(true);
function runTests(sync) {
{
const dest = getTempFile();
writeFileSync(dest, 'hello world\n');
const stream = new Utf8Stream({ dest, append: false, sync });
stream.on('ready', () => {
ok(stream.write('something else\n'));
stream.flush();
stream.on('drain', common.mustCall(() => {
readFile(dest, 'utf8', common.mustSucceed((data) => {
strictEqual(data, 'something else\n');
stream.end();
}));
}));
});
}
{
const dest = join(getTempFile(), 'out.log');
const stream = new Utf8Stream({ dest, mkdir: true, sync });
stream.on('ready', common.mustCall(() => {
ok(stream.write('hello world\n'));
stream.flush();
stream.on('drain', common.mustCall(() => {
readFile(dest, 'utf8', common.mustSucceed((data) => {
strictEqual(data, 'hello world\n');
stream.end();
}));
}));
}));
}
{
const dest = getTempFile();
const fd = openSync(dest, 'w');
const stream = new Utf8Stream({ fd, minLength: 4096, sync });
stream.on('ready', common.mustCall(() => {
ok(stream.write('hello world\n'));
ok(stream.write('something else\n'));
stream.flush();
stream.on('drain', common.mustCall(() => {
readFile(dest, 'utf8', common.mustSucceed((data) => {
strictEqual(data, 'hello world\nsomething else\n');
stream.end();
}));
}));
}));
}
{
const dest = getTempFile();
const fd = openSync(dest, 'w');
const stream = new Utf8Stream({ fd, minLength: 4096, sync });
stream.on('ready', common.mustCall(() => {
stream.flush();
stream.on('drain', common.mustCall(() => {
stream.end();
}));
}));
}
{
const dest = getTempFile();
const fd = openSync(dest, 'w');
const stream = new Utf8Stream({ fd, minLength: 4096, sync });
stream.on('ready', common.mustCall(() => {
ok(stream.write('hello world\n'));
ok(stream.write('something else\n'));
stream.flush(common.mustSucceed(() => {
stream.end();
}));
}));
}
{
const dest = getTempFile();
const fd = openSync(dest, 'w');
const stream = new Utf8Stream({ fd, minLength: 4096, sync });
stream.on('ready', common.mustCall(() => {
stream.flush(common.mustSucceed(() => {
stream.end();
}));
}));
}
{
const dest = getTempFile();
const fd = openSync(dest, 'w');
const stream = new Utf8Stream({ fd, minLength: 0, sync });
stream.flush(common.mustSucceed(() => {
stream.end();
}));
}
{
const dest = getTempFile();
const fd = openSync(dest, 'w');
const stream = new Utf8Stream({ fd, minLength: 4096, sync });
stream.destroy();
stream.flush(common.mustCall(ok));
}
}

View File

@ -0,0 +1,74 @@
'use strict';
const common = require('../common');
const tmpdir = require('../common/tmpdir');
const {
ok,
strictEqual,
} = require('node:assert');
const {
fsyncSync,
openSync,
readFile,
readFileSync,
} = require('node:fs');
const { Utf8Stream } = require('node:fs');
const { join } = require('node:path');
const { isMainThread } = require('node:worker_threads');
tmpdir.refresh();
let fileCounter = 0;
if (isMainThread) {
process.umask(0o000);
}
function getTempFile() {
return join(tmpdir.path, `fastutf8stream-${process.pid}-${Date.now()}-${fileCounter++}.log`);
}
{
const dest = getTempFile();
const fd = openSync(dest, 'w');
const stream = new Utf8Stream({
fd,
sync: true,
fsync: true,
fs: {
fsyncSync: common.mustCall(fsyncSync, 2),
},
});
ok(stream.write('hello world\n'));
ok(stream.write('something else\n'));
stream.end();
const data = readFileSync(dest, 'utf8');
strictEqual(data, 'hello world\nsomething else\n');
}
{
const dest = getTempFile();
const fd = openSync(dest, 'w');
const stream = new Utf8Stream({
fd,
fsync: true,
fs: {
fsyncSync: common.mustCall(fsyncSync, 2),
}
});
ok(stream.write('hello world\n'));
ok(stream.write('something else\n'));
stream.end();
stream.on('finish', common.mustCall(() => {
readFile(dest, 'utf8', common.mustSucceed((data) => {
strictEqual(data, 'hello world\nsomething else\n');
}));
}));
stream.on('close', common.mustCall());
}

View File

@ -0,0 +1,49 @@
'use strict';
const common = require('../common');
const tmpdir = require('../common/tmpdir');
const {
ok,
throws,
} = require('node:assert');
const {
closeSync,
openSync,
} = require('node:fs');
const { Utf8Stream } = require('node:fs');
const { join } = require('node:path');
tmpdir.refresh();
let fileCounter = 0;
function getTempFile() {
return join(tmpdir.path, `fastutf8stream-${process.pid}-${Date.now()}-${fileCounter++}.log`);
}
const MAX_WRITE = 16 * 1024;
{
const dest = getTempFile();
const stream = new Utf8Stream({ dest, sync: false, minLength: 9999 });
ok(stream.write(Buffer.alloc(1500).fill('x').toString()));
ok(stream.write(Buffer.alloc(1500).fill('x').toString()));
ok(!stream.write(Buffer.alloc(MAX_WRITE).fill('x').toString()));
stream.on('drain', common.mustCall(() => stream.end()));
}
{
const dest = getTempFile();
const fd = openSync(dest, 'w');
throws(() => {
new Utf8Stream({
fd,
minLength: MAX_WRITE
});
}, Error);
closeSync(fd);
}

View File

@ -0,0 +1,111 @@
'use strict';
const common = require('../common');
const tmpdir = require('../common/tmpdir'); ;
const {
ok,
strictEqual,
} = require('node:assert');
const {
readFile,
statSync,
writeFileSync,
} = require('node:fs');
const { join } = require('node:path');
const { Utf8Stream } = require('node:fs');
const { isMainThread } = require('node:worker_threads');
tmpdir.refresh();
let fileCounter = 0;
if (isMainThread) {
process.umask(0o000);
}
function getTempFile() {
return join(tmpdir.path, `fastutf8stream-${process.pid}-${Date.now()}-${fileCounter++}.log`);
}
const isWindows = process.platform === 'win32';
runTests(false);
runTests(true);
function runTests(sync) {
{
const dest = getTempFile();
const mode = 0o666;
const stream = new Utf8Stream({ dest, sync, mode });
stream.on('ready', common.mustCall(() => {
ok(stream.write('hello world\n'));
ok(stream.write('something else\n'));
stream.end();
stream.on('finish', common.mustCall(() => {
readFile(dest, 'utf8', common.mustSucceed((data) => {
strictEqual(data, 'hello world\nsomething else\n');
// The actual mode may vary depending on the platform,
// so we check only the first bit.
strictEqual(statSync(dest).mode & 0o700, 0o600);
}));
}));
}));
}
{
const dest = getTempFile();
const defaultMode = 0o600;
const stream = new Utf8Stream({ dest, sync });
stream.on('ready', common.mustCall(() => {
ok(stream.write('hello world\n'));
ok(stream.write('something else\n'));
stream.end();
stream.on('finish', common.mustCall(() => {
readFile(dest, 'utf8', common.mustSucceed((data) => {
strictEqual(data, 'hello world\nsomething else\n');
strictEqual(statSync(dest).mode & 0o700, defaultMode);
}));
}));
}));
}
{
const dest = join(getTempFile(), 'out.log');
const mode = 0o666;
const stream = new Utf8Stream({ dest, mkdir: true, mode, sync });
stream.on('ready', common.mustCall(() => {
ok(stream.write('hello world\n'));
stream.flush();
stream.on('drain', common.mustCall(() => {
readFile(dest, 'utf8', common.mustSucceed((data) => {
strictEqual(data, 'hello world\n');
stream.end();
}));
}));
}));
}
{
const dest = getTempFile();
// Create file with writable mode first, then change mode after Utf8Stream creation
writeFileSync(dest, 'hello world\n', { encoding: 'utf8' });
const mode = isWindows ? 0o444 : 0o666;
const stream = new Utf8Stream({ dest, append: false, mode, sync });
stream.on('ready', common.mustCall(() => {
ok(stream.write('something else\n'));
stream.flush();
stream.on('drain', common.mustCall(() => {
readFile(dest, 'utf8', common.mustSucceed((data) => {
strictEqual(data, 'something else\n');
stream.end();
}));
}));
}));
}
}

View File

@ -0,0 +1,81 @@
'use strict';
const common = require('../common');
const tmpdir = require('../common/tmpdir');
const {
ok,
strictEqual,
} = require('node:assert');
const {
openSync,
readFile,
} = require('node:fs');
const { Utf8Stream } = require('node:fs');
const { join } = require('node:path');
const { isMainThread } = require('node:worker_threads');
tmpdir.refresh();
let fileCounter = 0;
if (isMainThread) {
process.umask(0o000);
}
function getTempFile() {
return join(tmpdir.path, `fastutf8stream-${process.pid}-${Date.now()}-${fileCounter++}.log`);
}
runTests(false);
runTests(true);
function runTests(sync) {
{
const dest = getTempFile();
const fd = openSync(dest, 'w');
const stream = new Utf8Stream({ fd, sync, minLength: 5000 });
ok(stream.write('hello world\n'));
setTimeout(common.mustCall(() => {
readFile(dest, 'utf8', common.mustSucceed((data) => {
strictEqual(data, '');
}));
}), 1500);
stream.destroy();
}
{
const dest = getTempFile();
const fd = openSync(dest, 'w');
// Test that periodicFlush property is set correctly
const stream1 = new Utf8Stream({ fd, sync, minLength: 5000 });
strictEqual(stream1.periodicFlush, 0);
stream1.destroy();
const fd2 = openSync(dest, 'w');
const stream2 = new Utf8Stream({ fd: fd2, sync, minLength: 5000, periodicFlush: 1000 });
strictEqual(stream2.periodicFlush, 1000);
stream2.destroy();
}
{
const dest = getTempFile();
const fd = openSync(dest, 'w');
const stream = new Utf8Stream({ fd, sync, minLength: 5000 });
ok(stream.write('hello world\n'));
// Manually flush to test that data can be written
stream.flush(common.mustSucceed());
setTimeout(common.mustCall(() => {
readFile(dest, 'utf8', common.mustSucceed((data) => {
strictEqual(data, 'hello world\n');
}));
}), 500);
stream.destroy();
}
}

View File

@ -0,0 +1,192 @@
'use strict';
const common = require('../common');
const tmpdir = require('../common/tmpdir');
const {
ok,
strictEqual,
throws,
} = require('node:assert');
const {
open,
openSync,
readFile,
renameSync,
} = require('node:fs');
const { Utf8Stream } = require('node:fs');
const { join } = require('node:path');
const { isMainThread } = require('node:worker_threads');
tmpdir.refresh();
let fileCounter = 0;
if (isMainThread) {
process.umask(0o000);
}
function getTempFile() {
return join(tmpdir.path, `fastutf8stream-${process.pid}-${Date.now()}-${fileCounter++}.log`);
}
runTests(false);
runTests(true);
function runTests(sync) {
{
const dest = getTempFile();
const stream = new Utf8Stream({ dest, sync });
ok(stream.write('hello world\n'));
ok(stream.write('something else\n'));
const after = dest + '-moved';
stream.once('drain', common.mustCall(() => {
renameSync(dest, after);
stream.reopen();
stream.once('ready', common.mustCall(() => {
ok(stream.write('after reopen\n'));
stream.once('drain', common.mustCall(() => {
readFile(after, 'utf8', common.mustSucceed((data) => {
strictEqual(data, 'hello world\nsomething else\n');
readFile(dest, 'utf8', common.mustSucceed((data) => {
strictEqual(data, 'after reopen\n');
stream.end();
}));
}));
}));
}));
}));
}
{
const dest = getTempFile();
const stream = new Utf8Stream({ dest, sync });
ok(stream.write('hello world\n'));
ok(stream.write('something else\n'));
stream.reopen();
stream.end();
stream.on('close', common.mustCall());
}
{
const dest = getTempFile();
const stream = new Utf8Stream({ dest, minLength: 0, sync });
ok(stream.write('hello world\n'));
ok(stream.write('something else\n'));
const after = dest + '-new';
stream.once('drain', common.mustCall(() => {
stream.reopen(after);
strictEqual(stream.file, after);
stream.once('ready', common.mustCall(() => {
ok(stream.write('after reopen\n'));
stream.once('drain', common.mustCall(() => {
readFile(dest, 'utf8', common.mustSucceed((data) => {
strictEqual(data, 'hello world\nsomething else\n');
readFile(after, 'utf8', common.mustSucceed((data) => {
strictEqual(data, 'after reopen\n');
stream.end();
}));
}));
}));
}));
}));
}
{
let throwOnNextOpen = false;
const err = new Error('open error');
const fsOverride = {};
if (sync) {
fsOverride.openSync = function(...args) {
if (throwOnNextOpen) {
throwOnNextOpen = false;
throw err;
}
return openSync(...args);
};
} else {
fsOverride.open = function(file, flags, mode, cb) {
if (throwOnNextOpen) {
throwOnNextOpen = false;
process.nextTick(() => cb(err));
return;
}
return open(file, flags, mode, cb);
};
}
const dest = getTempFile();
const stream = new Utf8Stream({
dest,
sync,
fs: fsOverride,
});
ok(stream.write('hello world\n'));
ok(stream.write('something else\n'));
const after = dest + '-moved';
stream.on('error', common.mustCall());
stream.once('drain', common.mustCall(() => {
renameSync(dest, after);
throwOnNextOpen = true;
if (sync) {
throws(() => stream.reopen(), err);
} else {
stream.reopen();
}
setTimeout(common.mustCall(() => {
ok(stream.write('after reopen\n'));
stream.end();
stream.on('finish', common.mustCall(() => {
readFile(after, 'utf8', common.mustSucceed((data) => {
strictEqual(data, 'hello world\nsomething else\nafter reopen\n');
}));
}));
}), 10);
}));
}
{
const dest = getTempFile();
const stream = new Utf8Stream({ dest, sync });
ok(stream.write('hello world\n'));
ok(stream.write('something else\n'));
const after = dest + '-moved';
stream.once('drain', common.mustCall(() => {
renameSync(dest, after);
stream.reopen();
stream.once('drain', common.mustCall(() => {
ok(stream.write('after reopen\n'));
stream.once('drain', common.mustCall(() => {
readFile(after, 'utf8', common.mustSucceed((data) => {
strictEqual(data, 'hello world\nsomething else\n');
readFile(dest, 'utf8', common.mustSucceed((data) => {
strictEqual(data, 'after reopen\n');
stream.end();
}));
}));
}));
}));
}));
}
}

View File

@ -0,0 +1,216 @@
'use strict';
const common = require('../common');
const tmpdir = require('../common/tmpdir');
const {
ok,
strictEqual,
} = require('node:assert');
const {
openSync,
writeSync,
write,
readFile,
} = require('node:fs');
const { Utf8Stream } = require('node:fs');
const { join } = require('node:path');
const { isMainThread } = require('node:worker_threads');
tmpdir.refresh();
let fileCounter = 0;
if (isMainThread) {
process.umask(0o000);
}
function getTempFile() {
return join(tmpdir.path, `fastutf8stream-${process.pid}-${Date.now()}-${fileCounter++}.log`);
}
runTests(false);
runTests(true);
function runTests(sync) {
{
const dest = getTempFile();
const fd = openSync(dest, 'w');
const fsOverride = {};
let errOnNext = true;
const err = new Error('EAGAIN');
err.code = 'EAGAIN';
if (sync) {
fsOverride.writeSync = common.mustCall((...args) => {
if (errOnNext) {
errOnNext = false;
throw err;
}
writeSync(...args);
}, 3);
} else {
fsOverride.write = common.mustCall((...args) => {
if (errOnNext) {
errOnNext = false;
const callback = args[args.length - 1];
process.nextTick(callback, err);
return;
}
write(...args);
}, 3);
}
const stream = new Utf8Stream({
fd,
sync,
minLength: 0,
fs: fsOverride,
});
stream.on('ready', common.mustCall(() => {
ok(stream.write('hello world\n'));
ok(stream.write('something else\n'));
stream.end();
stream.on('finish', common.mustCall(() => {
readFile(dest, 'utf8', common.mustSucceed((data) => {
strictEqual(data, 'hello world\nsomething else\n');
}));
}));
}));
}
}
{
const dest = getTempFile();
const fd = openSync(dest, 'w');
const err = new Error('EAGAIN');
err.code = 'EAGAIN';
let errorOnNext = true;
const stream = new Utf8Stream({
fd,
sync: false,
minLength: 12,
retryEAGAIN: (err, writeBufferLen, remainingBufferLen) => {
strictEqual(err.code, 'EAGAIN');
strictEqual(writeBufferLen, 12);
strictEqual(remainingBufferLen, 0);
return false; // Don't retry
},
fs: {
write: common.mustCall((...args) => {
if (errorOnNext) {
errorOnNext = false;
const callback = args[args.length - 1];
process.nextTick(callback, err);
return;
}
write(...args);
}, 3),
}
});
stream.on('ready', common.mustCall(() => {
stream.once('error', common.mustCall((err) => {
strictEqual(err.code, 'EAGAIN');
ok(stream.write('something else\n'));
}));
ok(stream.write('hello world\n'));
stream.end();
stream.on('finish', common.mustCall(() => {
readFile(dest, 'utf8', common.mustSucceed((data) => {
strictEqual(data, 'hello world\nsomething else\n');
}));
}));
}));
}
{
const dest = getTempFile();
const fd = openSync(dest, 'w');
const err = new Error('EBUSY');
err.code = 'EBUSY';
let errorOnNext = true;
const stream = new Utf8Stream({
fd,
sync: false,
minLength: 0,
fs: {
write: common.mustCall((...args) => {
if (errorOnNext) {
errorOnNext = false;
const callback = args[args.length - 1];
process.nextTick(callback, err);
return;
}
write(...args);
}, 3),
}
});
stream.on('ready', common.mustCall(() => {
ok(stream.write('hello world\n'));
ok(stream.write('something else\n'));
stream.end();
stream.on('finish', common.mustCall(() => {
readFile(dest, 'utf8', common.mustSucceed((data) => {
strictEqual(data, 'hello world\nsomething else\n');
}));
}));
}));
}
{
const dest = getTempFile();
const fd = openSync(dest, 'w');
const err = new Error('EBUSY');
err.code = 'EBUSY';
let errorOnNext = true;
const stream = new Utf8Stream({
fd,
sync: false,
minLength: 12,
retryEAGAIN: (err, writeBufferLen, remainingBufferLen) => {
strictEqual(err.code, 'EBUSY');
strictEqual(writeBufferLen, 12);
strictEqual(remainingBufferLen, 0);
return false; // Don't retry
},
fs: {
write: common.mustCall((...args) => {
if (errorOnNext) {
errorOnNext = false;
const callback = args[args.length - 1];
process.nextTick(callback, err);
return;
}
write(...args);
}, 3),
}
});
stream.on('ready', common.mustCall(() => {
stream.once('error', common.mustCall((err) => {
strictEqual(err.code, 'EBUSY');
ok(stream.write('something else\n'));
}));
ok(stream.write('hello world\n'));
stream.end();
stream.on('finish', common.mustCall(() => {
readFile(dest, 'utf8', common.mustSucceed((data) => {
strictEqual(data, 'hello world\nsomething else\n');
}));
}));
}));
}

View File

@ -0,0 +1,211 @@
'use strict';
const common = require('../common');
const tmpdir = require('../common/tmpdir');
const {
ok,
strictEqual,
throws,
} = require('node:assert');
const {
openSync,
closeSync,
readFile,
readSync,
readFileSync,
writeSync,
stat,
} = require('node:fs');
const { Utf8Stream } = require('node:fs');
const { join } = require('node:path');
const { isMainThread } = require('node:worker_threads');
tmpdir.refresh();
let fileCounter = 0;
if (isMainThread) {
process.umask(0o000);
}
function getTempFile() {
return join(tmpdir.path, `fastutf8stream-${process.pid}-${Date.now()}-${fileCounter++}.log`);
}
{
const dest = getTempFile();
const fd = openSync(dest, 'w');
let callCount = 0;
const stream = new Utf8Stream({
fd,
minLength: 0,
sync: true,
fs: {
writeSync: common.mustCall((...args) => {
if (callCount++ === 0) return 0;
writeSync(...args);
}, 3),
}
});
stream.on('ready', common.mustCall(() => {
ok(stream.write('hello world\n'));
ok(stream.write('something else\n'));
stream.end();
stream.on('finish', common.mustCall(() => {
readFile(dest, 'utf8', common.mustSucceed((data) => {
strictEqual(data, 'hello world\nsomething else\n');
}));
}));
}));
}
{
const dest = getTempFile();
const fd = openSync(dest, 'w');
let callCount = 0;
const stream = new Utf8Stream({
fd,
minLength: 100,
sync: false,
fs: {
writeSync: common.mustCall((...args) => {
if (callCount++ === 0) return 0;
return writeSync(...args);
}, 2),
}
});
stream.on('ready', common.mustCall(() => {
ok(stream.write('hello world\n'));
ok(stream.write('something else\n'));
stream.flushSync();
stream.on('write', common.mustNotCall());
stream.end();
stream.on('finish', common.mustCall(() => {
readFile(dest, 'utf8', common.mustSucceed((data) => {
strictEqual(data, 'hello world\nsomething else\n');
}));
}));
}));
}
{
const dest = getTempFile();
const fd = openSync(dest, 'w');
const stream = new Utf8Stream({
fd,
minLength: 0,
sync: true,
fs: {
writeSync: common.mustCall(writeSync, 2),
}
});
ok(stream.write('hello world\n'));
ok(stream.write('something else\n'));
stream.on('drain', common.mustCall(() => {
const data = readFileSync(dest, 'utf8');
strictEqual(data, 'hello world\nsomething else\n');
}));
}
{
const dest = getTempFile();
const fd = openSync(dest, 'w');
const stream = new Utf8Stream({ fd, minLength: 0, sync: true });
const buf = Buffer.alloc(1024).fill('x').toString(); // 1 KB
let length = 0;
// Reduce iterations to avoid test timeout
for (let i = 0; i < 1024; i++) {
length += buf.length;
stream.write(buf);
}
stream.end();
stream.on('finish', common.mustCall(() => {
stat(dest, common.mustSucceed((stat) => {
strictEqual(stat.size, length);
}));
}));
}
{
const dest = getTempFile();
const fd = openSync(dest, 'w');
const stream = new Utf8Stream({ fd, minLength: 0, sync: true });
let buf = Buffer.alloc((1024 * 16) - 2).fill('x'); // 16KB - 2B
const length = buf.length + 4;
buf = buf.toString() + '🌲'; // 16 KB + 4B emoji
stream.write(buf);
stream.end();
stream.on('finish', common.mustCall(() => {
stat(dest, common.mustSucceed((stat) => {
strictEqual(stat.size, length);
const char = Buffer.alloc(4);
const readFd = openSync(dest, 'r');
readSync(readFd, char, 0, 4, length - 4);
closeSync(readFd);
strictEqual(char.toString(), '🌲');
}));
}));
}
{
const dest = getTempFile();
const stream = new Utf8Stream({ dest, sync: true });
ok(stream.write('hello world\n'));
ok(stream.write('something else\n'));
stream.flushSync();
// If we get here without error, the test passes
stream.end();
}
throws(() => {
new Utf8Stream({ dest: '/path/to/nowhere', sync: true });
}, /ENOENT|EACCES/);
{
const dest = getTempFile();
const fd = openSync(dest, 'w');
const stream = new Utf8Stream({ fd, sync: true });
ok(stream.write('hello world 👀\n'));
ok(stream.write('another line 👀\n'));
// Check internal buffer length (may not be available in Utf8Stream)
// This is implementation-specific, so we just verify writes succeeded
ok(true, 'writes completed successfully');
stream.end();
}
{
const dest = getTempFile();
const fd = openSync(dest, 'w');
const stream = new Utf8Stream({ fd, sync: true, minLength: 20 });
let str = '';
for (let i = 0; i < 20; i++) {
ok(stream.write('👀'));
str += '👀';
}
// Check internal buffer length (implementation-specific)
ok(true, 'writes completed successfully');
readFile(dest, 'utf8', common.mustSucceed((data) => {
strictEqual(data, str);
}));
}

View File

@ -0,0 +1,273 @@
'use strict';
const common = require('../common');
const tmpdir = require('../common/tmpdir');
const {
ok,
strictEqual,
} = require('node:assert');
const {
openSync,
readFile,
createReadStream,
write,
writeSync,
stat,
} = require('node:fs');
const { Utf8Stream } = require('node:fs');
const { join } = require('node:path');
const { isMainThread } = require('node:worker_threads');
tmpdir.refresh();
let fileCounter = 0;
if (isMainThread) {
process.umask(0o000);
}
function getTempFile() {
return join(tmpdir.path, `fastutf8stream-${process.pid}-${Date.now()}-${fileCounter++}.log`);
}
runTests(false);
runTests(true);
function runTests(sync) {
{
const dest = getTempFile();
const fd = openSync(dest, 'w');
const stream = new Utf8Stream({ fd, sync });
stream.on('ready', common.mustCall(() => {
ok(stream.write('hello world\n'));
ok(stream.write('something else\n'));
stream.end();
stream.on('finish', common.mustCall(() => {
readFile(dest, 'utf8', common.mustSucceed((data) => {
strictEqual(data, 'hello world\nsomething else\n');
}));
}));
}));
}
{
const dest = getTempFile();
const fd = openSync(dest, 'w');
const stream = new Utf8Stream({ fd, sync });
stream.once('drain', common.mustCall(() => {
readFile(dest, 'utf8', common.mustSucceed((data) => {
strictEqual(data, 'hello world\n');
ok(stream.write('something else\n'));
stream.once('drain', common.mustCall(() => {
readFile(dest, 'utf8', common.mustSucceed((data) => {
strictEqual(data, 'hello world\nsomething else\n');
stream.end();
}));
}));
}));
}));
ok(stream.write('hello world\n'));
};
{
const dest = getTempFile();
const fd = openSync(dest, 'w');
const stream = new Utf8Stream({ fd, sync });
const source = createReadStream(__filename, { encoding: 'utf8' });
source.pipe(stream);
stream.on('finish', common.mustCall(() => {
readFile(__filename, 'utf8', common.mustSucceed((expected) => {
readFile(dest, 'utf8', common.mustSucceed((data) => {
strictEqual(data, expected);
}));
}));
}));
}
{
const dest = getTempFile();
const stream = new Utf8Stream({ dest, sync });
stream.on('ready', common.mustCall(() => {
ok(stream.write('hello world\n'));
ok(stream.write('something else\n'));
stream.end();
stream.on('finish', common.mustCall(() => {
readFile(dest, 'utf8', common.mustSucceed((data) => {
strictEqual(data, 'hello world\nsomething else\n');
}));
}));
}));
}
{
const dest = getTempFile();
const stream = new Utf8Stream({ dest, minLength: 4096, sync });
stream.on('ready', common.mustCall(() => {
ok(stream.write('hello world\n'));
ok(stream.write('something else\n'));
stream.on('drain', common.mustNotCall());
setTimeout(common.mustCall(() => {
readFile(dest, 'utf8', common.mustSucceed((data) => {
strictEqual(data, ''); // Should be empty due to minLength
stream.end();
stream.on('finish', common.mustCall(() => {
readFile(dest, 'utf8', common.mustSucceed((data) => {
strictEqual(data, 'hello world\nsomething else\n');
}));
}));
}));
}), 100);
}));
}
{
const dest = getTempFile();
const fd = openSync(dest, 'w');
let throwOnNext = true;
const fsOverride = {};
if (sync) {
fsOverride.writeSync = common.mustCall((...args) => {
if (throwOnNext) {
throw new Error('recoverable error');
}
return writeSync(...args);
}, 3);
} else {
fsOverride.write = common.mustCall((...args) => {
if (throwOnNext) {
const callback = args[args.length - 1];
process.nextTick(callback, new Error('recoverable error'));
return;
}
return write(...args);
}, 3);
}
const stream = new Utf8Stream({
fd,
minLength: 0,
sync,
fs: fsOverride,
});
stream.on('ready', common.mustCall(() => {
stream.on('error', common.mustCall());
ok(stream.write('hello world\n'));
setTimeout(common.mustCall(() => {
throwOnNext = false;
ok(stream.write('something else\n'));
stream.end();
stream.on('finish', common.mustCall(() => {
readFile(dest, 'utf8', common.mustSucceed((data) => {
strictEqual(data, 'hello world\nsomething else\n');
}));
}));
}), 10);
}));
}
{
const dest = getTempFile();
const stream = new Utf8Stream({ dest, sync });
stream.on('ready', common.mustCall(() => {
let length = 0;
stream.on('write', (bytes) => {
length += bytes;
});
ok(stream.write('hello world\n'));
ok(stream.write('something else\n'));
stream.end();
stream.on('finish', common.mustCall(() => {
readFile(dest, 'utf8', common.mustSucceed((data) => {
strictEqual(data, 'hello world\nsomething else\n');
strictEqual(length, 27);
}));
}));
}));
}
}
{
const dest = getTempFile();
const fd = openSync(dest, 'w');
let callCount = 0;
const stream = new Utf8Stream({
fd,
minLength: 0,
sync: false,
fs: {
write: common.mustCall((...args) => {
if (callCount++ === 0) {
const callback = args[args.length - 1];
process.nextTick(callback, null, 0);
return;
}
write(...args);
}, 3),
}
});
stream.on('ready', common.mustCall(() => {
ok(stream.write('hello world\n'));
ok(stream.write('something else\n'));
stream.end();
stream.on('finish', common.mustCall(() => {
readFile(dest, 'utf8', common.mustSucceed((data) => {
strictEqual(data, 'hello world\nsomething else\n');
}));
}));
}));
}
{
const dest = getTempFile();
const fd = openSync(dest, 'w');
const stream = new Utf8Stream({ fd, minLength: 0, sync: false });
const buf = Buffer.alloc(1024).fill('x').toString(); // 1 KB
let length = 0;
// Reduce iterations to avoid test timeout
for (let i = 0; i < 1024; i++) {
length += buf.length;
stream.write(buf);
}
stream.end();
stream.on('finish', common.mustCall(() => {
stat(dest, common.mustSucceed((stat) => {
strictEqual(stat.size, length);
}));
}));
}
{
const dest = getTempFile();
const stream = new Utf8Stream({ dest, maxLength: 65536 });
strictEqual(stream.maxLength, 65536);
stream.end();
}

View File

@ -67,6 +67,10 @@ const ignoreList = [
'R_OK', 'R_OK',
'F_OK', 'F_OK',
'Dir', 'Dir',
// the Utf8Stream is implemented in terms of functions
// on the fs module that have permission checks, so we don't
// need to check it here.
'Utf8Stream',
'FileReadStream', 'FileReadStream',
'FileWriteStream', 'FileWriteStream',
'_toUnixTimestamp', '_toUnixTimestamp',

View File

@ -156,4 +156,7 @@ addlicense "node-fs-extra" "lib/internal/fs/cp" "$licenseText"
licenseText="$(curl -sL https://raw.githubusercontent.com/mcollina/on-exit-leak-free/2a01c7e66c690aca17187b10b0cecbe43e083eb2/LICENSE)" licenseText="$(curl -sL https://raw.githubusercontent.com/mcollina/on-exit-leak-free/2a01c7e66c690aca17187b10b0cecbe43e083eb2/LICENSE)"
addlicense "on-exit-leak-free" "lib/internal/process/finalization" "$licenseText" addlicense "on-exit-leak-free" "lib/internal/process/finalization" "$licenseText"
licenseText="$(curl -sL https://raw.githubusercontent.com/pinojs/sonic-boom/refs/heads/master/LICENSE)"
addlicense "sonic-boom" "lib/internal/streams/fast-utf8-stream.js" "$licenseText"
mv "$tmplicense" "$licensefile" mv "$tmplicense" "$licensefile"