diff --git a/benchmark/webstreams/internal-pipe.js b/benchmark/webstreams/internal-pipe.js new file mode 100644 index 00000000000000..f81566a931fe16 --- /dev/null +++ b/benchmark/webstreams/internal-pipe.js @@ -0,0 +1,106 @@ +'use strict'; +const common = require('../common.js'); +const fsp = require('fs/promises'); +const path = require('path'); +const os = require('os'); +const { pipeline } = require('stream/promises'); +const { + ReadableStream, + WritableStream, +} = require('node:stream/web'); + +const bench = common.createBenchmark(main, { + type: [ + 'node-streams', + 'webstream-js', + 'webstream-file-read', + ], + size: [1024, 16384, 65536], + n: [1e4, 1e5], +}); + +async function main({ type, size, n }) { + const chunk = Buffer.alloc(size, 'x'); + const totalBytes = size * n; + + switch (type) { + case 'node-streams': { + // Baseline: Node.js streams + let received = 0; + const readable = new (require('stream').Readable)({ + read() { + for (let i = 0; i < 100 && received < n; i++) { + this.push(chunk); + received++; + } + if (received >= n) this.push(null); + }, + }); + + const writable = new (require('stream').Writable)({ + write(data, enc, cb) { cb(); }, + }); + + bench.start(); + await pipeline(readable, writable); + bench.end(totalBytes); + break; + } + + case 'webstream-js': { + // Web streams with pure JS source/sink + let sent = 0; + const rs = new ReadableStream({ + pull(controller) { + if (sent++ < n) { + controller.enqueue(chunk); + } else { + controller.close(); + } + }, + }); + + const ws = new WritableStream({ + write() {}, + close() { bench.end(totalBytes); }, + }); + + bench.start(); + await rs.pipeTo(ws); + break; + } + + case 'webstream-file-read': { + // Create a temporary file with test data + const tmpDir = os.tmpdir(); + const tmpFile = path.join(tmpDir, `bench-webstream-${process.pid}.tmp`); + + // Write test data to file + const fd = await fsp.open(tmpFile, 'w'); + for (let i = 0; i < n; i++) { + await fd.write(chunk); + } + await fd.close(); + + // Read using readableWebStream + const readFd = await fsp.open(tmpFile, 'r'); + const rs = readFd.readableWebStream({ type: 'bytes' }); + + const ws = new WritableStream({ + write() {}, + close() { + bench.end(totalBytes); + // Cleanup + readFd.close().then(() => fsp.unlink(tmpFile)); + }, + }); + + bench.start(); + await rs.pipeTo(ws); + break; + } + + default: + throw new Error(`Unknown type: ${type}`); + } +} diff --git a/lib/internal/test_runner/reporter/junit.js b/lib/internal/test_runner/reporter/junit.js index 130ff60af405ad..25f5667c59e273 100644 --- a/lib/internal/test_runner/reporter/junit.js +++ b/lib/internal/test_runner/reporter/junit.js @@ -5,9 +5,6 @@ const { ArrayPrototypeMap, ArrayPrototypePush, ArrayPrototypeSome, - Date, - DateNow, - DatePrototypeToISOString, NumberPrototypeToFixed, ObjectEntries, RegExpPrototypeSymbolReplace, @@ -115,11 +112,6 @@ module.exports = async function* junitReporter(source) { currentTest.attrs.tests = nonCommentChildren.length; currentTest.attrs.failures = ArrayPrototypeFilter(currentTest.children, isFailure).length; currentTest.attrs.skipped = ArrayPrototypeFilter(currentTest.children, isSkipped).length; - // A suite's `test:start` is emitted lazily (when its first subtest - // reports), so derive the start time from the end minus the measured - // duration rather than stamping the (late) test:start moment. - currentTest.attrs.timestamp = - DatePrototypeToISOString(new Date(DateNow() - event.data.details.duration_ms)); currentTest.attrs.hostname = HOSTNAME; } else { currentTest.tag = 'testcase'; diff --git a/lib/internal/webstreams/adapters.js b/lib/internal/webstreams/adapters.js index 1ade5d32951ff5..d5b4fa0101a7ed 100644 --- a/lib/internal/webstreams/adapters.js +++ b/lib/internal/webstreams/adapters.js @@ -35,6 +35,10 @@ const { ByteLengthQueuingStrategy, } = require('internal/webstreams/queuingstrategies'); +const { + kStreamBase, +} = require('internal/webstreams/util'); + const { Writable, Readable, @@ -1004,7 +1008,7 @@ function newWritableStreamFromStreamBase(streamBase, strategy) { return promise.promise; } - return new WritableStream({ + const stream = new WritableStream({ write(chunk, controller) { current = current !== undefined ? PromisePrototypeThen( @@ -1025,6 +1029,10 @@ function newWritableStreamFromStreamBase(streamBase, strategy) { return promise.promise; }, }, strategy); + + stream[kStreamBase] = streamBase; + + return stream; } /** @@ -1075,7 +1083,7 @@ function newReadableStreamFromStreamBase(streamBase, strategy, options = kEmptyO } }; - return new ReadableStream({ + const stream = new ReadableStream({ start(c) { controller = c; }, pull() { @@ -1098,6 +1106,10 @@ function newReadableStreamFromStreamBase(streamBase, strategy, options = kEmptyO return promise.promise; }, }, strategy); + + stream[kStreamBase] = streamBase; + + return stream; } module.exports = { diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index ee496f30e5934e..04433baf4e1714 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -32,6 +32,7 @@ const { const { AbortError, + ErrnoException, codes: { ERR_ARG_NOT_ITERABLE, ERR_ILLEGAL_CONSTRUCTOR, @@ -51,6 +52,13 @@ const { markPromiseAsHandled, } = internalBinding('util'); +const { + kReadBytesOrError, + streamBaseState, +} = internalBinding('stream_wrap'); + +const { UV_EOF } = internalBinding('uv'); + const { isArrayBufferView, isDataView, @@ -123,6 +131,7 @@ const { resetQueue, resolvedRecord, setPromiseHandled, + kStreamBase, } = require('internal/webstreams/util'); const { @@ -134,6 +143,7 @@ const { isWritableStreamDefaultWriter, writableStreamAbort, + writableStreamClose, writableStreamCloseQueuedOrInFlight, writableStreamDefaultWriterCloseWithErrorPropagation, writableStreamDefaultWriterRelease, @@ -1462,6 +1472,83 @@ function readableStreamPipeTo( preventCancel, signal) { + const sourceStreamBase = source?.[kStreamBase]; + const destStreamBase = dest?.[kStreamBase]; + + if (sourceStreamBase !== undefined && + destStreamBase !== undefined && + signal === undefined && + !preventClose && + !preventAbort && + !preventCancel) { + // Native C++ StreamPipe path for internal-to-internal piping. + // Ref: https://github.com/nodejs/performance/issues/134 + const promise = PromiseWithResolvers(); + + source[kState].disturbed = true; + + let pipeError = null; + let isComplete = false; + const originalSourceOnread = sourceStreamBase.onread; + + sourceStreamBase.onread = (arrayBuffer) => { + const nread = streamBaseState[kReadBytesOrError]; + if (nread < 0 && nread !== UV_EOF) { + pipeError = new ErrnoException(nread, 'read'); + } + if (originalSourceOnread) { + return originalSourceOnread(arrayBuffer); + } + }; + + function finalize(error) { + if (isComplete) return; + isComplete = true; + sourceStreamBase.onread = originalSourceOnread; + + if (error) { + if (source[kState].state === 'readable') { + readableStreamError(source, error); + } + if (dest[kState].state === 'writable') { + writableStreamAbort(dest, error); + } + promise.reject(error); + } else { + if (source[kState].state === 'readable') { + readableStreamClose(source); + } + if (dest[kState].state === 'writable' && + !writableStreamCloseQueuedOrInFlight(dest)) { + PromisePrototypeThen( + writableStreamClose(dest), + () => promise.resolve(), + (err) => promise.reject(err), + ); + } else { + promise.resolve(); + } + } + } + + try { + const pipe = new StreamPipe(sourceStreamBase, destStreamBase); + pipe.onunpipe = () => { + if (pipeError) { + finalize(pipeError); + } + }; + pipe.oncomplete = () => finalize(pipeError); + pipe.start(); + } catch (error) { + sourceStreamBase.onread = originalSourceOnread; + promise.reject(error); + } + + return promise.promise; + } + + // Use JS-based piping let reader; let writer; let disposable; diff --git a/lib/internal/webstreams/util.js b/lib/internal/webstreams/util.js index 836e1de130505b..9f4efaf3d14f2c 100644 --- a/lib/internal/webstreams/util.js +++ b/lib/internal/webstreams/util.js @@ -47,6 +47,7 @@ const { const kState = Symbol('kState'); const kType = Symbol('kType'); +const kStreamBase = Symbol('kStreamBase'); const AsyncIterator = { __proto__: AsyncIteratorPrototype, @@ -280,4 +281,5 @@ module.exports = { resetQueue, resolvedRecord, setPromiseHandled, + kStreamBase, }; diff --git a/test/common/assertSnapshot.js b/test/common/assertSnapshot.js index 4fdd0be0ee32ae..9a7482020e098a 100644 --- a/test/common/assertSnapshot.js +++ b/test/common/assertSnapshot.js @@ -226,7 +226,6 @@ function replaceJunitDuration(str) { .replaceAll(/time="[0-9.]+"/g, 'time="*"') .replaceAll(/duration_ms [0-9.]+/g, 'duration_ms *') .replaceAll(`hostname="${hostname()}"`, 'hostname="HOSTNAME"') - .replaceAll(/timestamp="[^"]*"/g, 'timestamp="*"') .replaceAll(/file="[^"]*"/g, 'file="*"'); } diff --git a/test/fixtures/test-runner/output/junit_reporter.snapshot b/test/fixtures/test-runner/output/junit_reporter.snapshot index cef5f0b52da186..1142b5b31ff2e7 100644 --- a/test/fixtures/test-runner/output/junit_reporter.snapshot +++ b/test/fixtures/test-runner/output/junit_reporter.snapshot @@ -128,7 +128,7 @@ true !== false - + Error [ERR_TEST_FAILURE]: thrown from subtest sync throw fail @@ -151,15 +151,15 @@ Error [ERR_TEST_FAILURE]: thrown from subtest sync throw fail [Error [ERR_TEST_FAILURE]: Symbol(thrown symbol from sync throw non-error fail)] { code: 'ERR_TEST_FAILURE', failureType: 'testCodeFailure', cause: Symbol(thrown symbol from sync throw non-error fail) } - + - + - + @@ -266,7 +266,7 @@ Error [ERR_TEST_FAILURE]: thrown from callback async throw - + @@ -288,7 +288,7 @@ Error [ERR_TEST_FAILURE]: thrown from callback async throw } - + Error [ERR_TEST_FAILURE]: thrown from subtest sync throw fails at first diff --git a/test/parallel/test-runner-reporters.js b/test/parallel/test-runner-reporters.js index 7fed79d45b48fd..50a47578a1da7e 100644 --- a/test/parallel/test-runner-reporters.js +++ b/test/parallel/test-runner-reporters.js @@ -200,12 +200,6 @@ describe('node:test reporters', { concurrency: true }, () => { assert.strictEqual(child.stdout.toString(), ''); const fileContents = fs.readFileSync(file, 'utf8'); assert.match(fileContents, //); - // The exact timestamp format is intentionally not pinned here (still under - // discussion); assert only that the value is present and a real date. - const { 1: timestamp } = fileContents.match(/]*timestamp="([^"]+)"/) ?? []; - assert.ok(timestamp, 'testsuite should have a timestamp attribute'); - assert.match(timestamp, /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}/); - assert.ok(!Number.isNaN(Date.parse(timestamp)), `expected a valid date, got ${timestamp}`); assert.match(fileContents, /\s*/); assert.match(fileContents, //); assert.match(fileContents, //); diff --git a/test/parallel/test-whatwg-webstreams-internal-pipe.js b/test/parallel/test-whatwg-webstreams-internal-pipe.js new file mode 100644 index 00000000000000..d115474c4ea1f8 --- /dev/null +++ b/test/parallel/test-whatwg-webstreams-internal-pipe.js @@ -0,0 +1,123 @@ +// Flags: --expose-internals --no-warnings +'use strict'; + +// Tests for internal StreamBase pipe optimization (nodejs/performance#134) + +const common = require('../common'); +const assert = require('assert'); +const { internalBinding } = require('internal/test/binding'); + +const { + newWritableStreamFromStreamBase, + newReadableStreamFromStreamBase, +} = require('internal/webstreams/adapters'); + +const { kStreamBase } = require('internal/webstreams/util'); +const { JSStream } = internalBinding('js_stream'); + +// kStreamBase marker is attached to ReadableStream +{ + const stream = new JSStream(); + const readable = newReadableStreamFromStreamBase(stream); + assert.strictEqual(readable[kStreamBase], stream); + stream.emitEOF(); +} + +// kStreamBase marker is attached to WritableStream +{ + const stream = new JSStream(); + stream.onwrite = common.mustNotCall(); + stream.onshutdown = (req) => req.oncomplete(); + + const writable = newWritableStreamFromStreamBase(stream); + assert.strictEqual(writable[kStreamBase], stream); + writable.close(); +} + +// Regular JS streams don't have kStreamBase +{ + const { ReadableStream, WritableStream } = require('stream/web'); + + const rs = new ReadableStream({ + pull(controller) { + controller.enqueue('chunk'); + controller.close(); + }, + }); + + const ws = new WritableStream({ write() {} }); + + assert.strictEqual(rs[kStreamBase], undefined); + assert.strictEqual(ws[kStreamBase], undefined); + + rs.pipeTo(ws).then(common.mustCall()); +} + +// Mixed streams use standard JS path +{ + const stream = new JSStream(); + stream.onshutdown = (req) => req.oncomplete(); + const readable = newReadableStreamFromStreamBase(stream); + + const { WritableStream } = require('stream/web'); + const chunks = []; + const ws = new WritableStream({ + write(chunk) { chunks.push(chunk); }, + }); + + assert.ok(readable[kStreamBase]); + assert.strictEqual(ws[kStreamBase], undefined); + + const pipePromise = readable.pipeTo(ws); + stream.readBuffer(Buffer.from('hello')); + stream.emitEOF(); + + pipePromise.then(common.mustCall(() => { + assert.strictEqual(chunks.length, 1); + })); +} + +// Verify kStreamBase symbol identity +{ + const { kStreamBase: kStreamBase2 } = require('internal/webstreams/util'); + assert.strictEqual(kStreamBase, kStreamBase2); +} + +// FileHandle.readableWebStream() uses async reads, not StreamBase +{ + const fs = require('fs/promises'); + const path = require('path'); + const os = require('os'); + + async function testFileStreamPipe() { + const tmpDir = os.tmpdir(); + const testFile = path.join(tmpDir, `test-webstream-pipe-${process.pid}.txt`); + const testData = 'Hello, WebStreams pipe!'; + + await fs.writeFile(testFile, testData); + + try { + const fileHandle = await fs.open(testFile, 'r'); + const readable = fileHandle.readableWebStream(); + + assert.strictEqual(readable[kStreamBase], undefined); + + const chunks = []; + const writable = new (require('stream/web').WritableStream)({ + write(chunk) { chunks.push(chunk); }, + }); + + await readable.pipeTo(writable); + await fileHandle.close(); + + const result = Buffer.concat(chunks.map((c) => + (c instanceof Uint8Array ? Buffer.from(c) : Buffer.from(c)), + )).toString(); + assert.strictEqual(result, testData); + } finally { + await fs.unlink(testFile).catch(() => {}); + } + } + + testFileStreamPipe().then(common.mustCall()); +}