diff --git a/benchmark/webstreams/internal-pipe.js b/benchmark/webstreams/internal-pipe.js
new file mode 100644
index 00000000000000..f81566a931fe16
--- /dev/null
+++ b/benchmark/webstreams/internal-pipe.js
@@ -0,0 +1,106 @@
+'use strict';
+const common = require('../common.js');
+const fsp = require('fs/promises');
+const path = require('path');
+const os = require('os');
+const { pipeline } = require('stream/promises');
+const {
+ ReadableStream,
+ WritableStream,
+} = require('node:stream/web');
+
+const bench = common.createBenchmark(main, {
+ type: [
+ 'node-streams',
+ 'webstream-js',
+ 'webstream-file-read',
+ ],
+ size: [1024, 16384, 65536],
+ n: [1e4, 1e5],
+});
+
+async function main({ type, size, n }) {
+ const chunk = Buffer.alloc(size, 'x');
+ const totalBytes = size * n;
+
+ switch (type) {
+ case 'node-streams': {
+ // Baseline: Node.js streams
+ let received = 0;
+ const readable = new (require('stream').Readable)({
+ read() {
+ for (let i = 0; i < 100 && received < n; i++) {
+ this.push(chunk);
+ received++;
+ }
+ if (received >= n) this.push(null);
+ },
+ });
+
+ const writable = new (require('stream').Writable)({
+ write(data, enc, cb) { cb(); },
+ });
+
+ bench.start();
+ await pipeline(readable, writable);
+ bench.end(totalBytes);
+ break;
+ }
+
+ case 'webstream-js': {
+ // Web streams with pure JS source/sink
+ let sent = 0;
+ const rs = new ReadableStream({
+ pull(controller) {
+ if (sent++ < n) {
+ controller.enqueue(chunk);
+ } else {
+ controller.close();
+ }
+ },
+ });
+
+ const ws = new WritableStream({
+ write() {},
+ close() { bench.end(totalBytes); },
+ });
+
+ bench.start();
+ await rs.pipeTo(ws);
+ break;
+ }
+
+ case 'webstream-file-read': {
+ // Create a temporary file with test data
+ const tmpDir = os.tmpdir();
+ const tmpFile = path.join(tmpDir, `bench-webstream-${process.pid}.tmp`);
+
+ // Write test data to file
+ const fd = await fsp.open(tmpFile, 'w');
+ for (let i = 0; i < n; i++) {
+ await fd.write(chunk);
+ }
+ await fd.close();
+
+ // Read using readableWebStream
+ const readFd = await fsp.open(tmpFile, 'r');
+ const rs = readFd.readableWebStream({ type: 'bytes' });
+
+ const ws = new WritableStream({
+ write() {},
+ close() {
+ bench.end(totalBytes);
+ // Cleanup
+ readFd.close().then(() => fsp.unlink(tmpFile));
+ },
+ });
+
+ bench.start();
+ await rs.pipeTo(ws);
+ break;
+ }
+
+ default:
+ throw new Error(`Unknown type: ${type}`);
+ }
+}
diff --git a/lib/internal/test_runner/reporter/junit.js b/lib/internal/test_runner/reporter/junit.js
index 130ff60af405ad..25f5667c59e273 100644
--- a/lib/internal/test_runner/reporter/junit.js
+++ b/lib/internal/test_runner/reporter/junit.js
@@ -5,9 +5,6 @@ const {
ArrayPrototypeMap,
ArrayPrototypePush,
ArrayPrototypeSome,
- Date,
- DateNow,
- DatePrototypeToISOString,
NumberPrototypeToFixed,
ObjectEntries,
RegExpPrototypeSymbolReplace,
@@ -115,11 +112,6 @@ module.exports = async function* junitReporter(source) {
currentTest.attrs.tests = nonCommentChildren.length;
currentTest.attrs.failures = ArrayPrototypeFilter(currentTest.children, isFailure).length;
currentTest.attrs.skipped = ArrayPrototypeFilter(currentTest.children, isSkipped).length;
- // A suite's `test:start` is emitted lazily (when its first subtest
- // reports), so derive the start time from the end minus the measured
- // duration rather than stamping the (late) test:start moment.
- currentTest.attrs.timestamp =
- DatePrototypeToISOString(new Date(DateNow() - event.data.details.duration_ms));
currentTest.attrs.hostname = HOSTNAME;
} else {
currentTest.tag = 'testcase';
diff --git a/lib/internal/webstreams/adapters.js b/lib/internal/webstreams/adapters.js
index 1ade5d32951ff5..d5b4fa0101a7ed 100644
--- a/lib/internal/webstreams/adapters.js
+++ b/lib/internal/webstreams/adapters.js
@@ -35,6 +35,10 @@ const {
ByteLengthQueuingStrategy,
} = require('internal/webstreams/queuingstrategies');
+const {
+ kStreamBase,
+} = require('internal/webstreams/util');
+
const {
Writable,
Readable,
@@ -1004,7 +1008,7 @@ function newWritableStreamFromStreamBase(streamBase, strategy) {
return promise.promise;
}
- return new WritableStream({
+ const stream = new WritableStream({
write(chunk, controller) {
current = current !== undefined ?
PromisePrototypeThen(
@@ -1025,6 +1029,10 @@ function newWritableStreamFromStreamBase(streamBase, strategy) {
return promise.promise;
},
}, strategy);
+
+ stream[kStreamBase] = streamBase;
+
+ return stream;
}
/**
@@ -1075,7 +1083,7 @@ function newReadableStreamFromStreamBase(streamBase, strategy, options = kEmptyO
}
};
- return new ReadableStream({
+ const stream = new ReadableStream({
start(c) { controller = c; },
pull() {
@@ -1098,6 +1106,10 @@ function newReadableStreamFromStreamBase(streamBase, strategy, options = kEmptyO
return promise.promise;
},
}, strategy);
+
+ stream[kStreamBase] = streamBase;
+
+ return stream;
}
module.exports = {
diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js
index ee496f30e5934e..04433baf4e1714 100644
--- a/lib/internal/webstreams/readablestream.js
+++ b/lib/internal/webstreams/readablestream.js
@@ -32,6 +32,7 @@ const {
const {
AbortError,
+ ErrnoException,
codes: {
ERR_ARG_NOT_ITERABLE,
ERR_ILLEGAL_CONSTRUCTOR,
@@ -51,6 +52,13 @@ const {
markPromiseAsHandled,
} = internalBinding('util');
+const {
+ kReadBytesOrError,
+ streamBaseState,
+} = internalBinding('stream_wrap');
+
+const { UV_EOF } = internalBinding('uv');
+
const {
isArrayBufferView,
isDataView,
@@ -123,6 +131,7 @@ const {
resetQueue,
resolvedRecord,
setPromiseHandled,
+ kStreamBase,
} = require('internal/webstreams/util');
const {
@@ -134,6 +143,7 @@ const {
isWritableStreamDefaultWriter,
writableStreamAbort,
+ writableStreamClose,
writableStreamCloseQueuedOrInFlight,
writableStreamDefaultWriterCloseWithErrorPropagation,
writableStreamDefaultWriterRelease,
@@ -1462,6 +1472,83 @@ function readableStreamPipeTo(
preventCancel,
signal) {
+ const sourceStreamBase = source?.[kStreamBase];
+ const destStreamBase = dest?.[kStreamBase];
+
+ if (sourceStreamBase !== undefined &&
+ destStreamBase !== undefined &&
+ signal === undefined &&
+ !preventClose &&
+ !preventAbort &&
+ !preventCancel) {
+ // Native C++ StreamPipe path for internal-to-internal piping.
+ // Ref: https://github.com/nodejs/performance/issues/134
+ const promise = PromiseWithResolvers();
+
+ source[kState].disturbed = true;
+
+ let pipeError = null;
+ let isComplete = false;
+ const originalSourceOnread = sourceStreamBase.onread;
+
+ sourceStreamBase.onread = (arrayBuffer) => {
+ const nread = streamBaseState[kReadBytesOrError];
+ if (nread < 0 && nread !== UV_EOF) {
+ pipeError = new ErrnoException(nread, 'read');
+ }
+ if (originalSourceOnread) {
+ return originalSourceOnread(arrayBuffer);
+ }
+ };
+
+ function finalize(error) {
+ if (isComplete) return;
+ isComplete = true;
+ sourceStreamBase.onread = originalSourceOnread;
+
+ if (error) {
+ if (source[kState].state === 'readable') {
+ readableStreamError(source, error);
+ }
+ if (dest[kState].state === 'writable') {
+ writableStreamAbort(dest, error);
+ }
+ promise.reject(error);
+ } else {
+ if (source[kState].state === 'readable') {
+ readableStreamClose(source);
+ }
+ if (dest[kState].state === 'writable' &&
+ !writableStreamCloseQueuedOrInFlight(dest)) {
+ PromisePrototypeThen(
+ writableStreamClose(dest),
+ () => promise.resolve(),
+ (err) => promise.reject(err),
+ );
+ } else {
+ promise.resolve();
+ }
+ }
+ }
+
+ try {
+ const pipe = new StreamPipe(sourceStreamBase, destStreamBase);
+ pipe.onunpipe = () => {
+ if (pipeError) {
+ finalize(pipeError);
+ }
+ };
+ pipe.oncomplete = () => finalize(pipeError);
+ pipe.start();
+ } catch (error) {
+ sourceStreamBase.onread = originalSourceOnread;
+ promise.reject(error);
+ }
+
+ return promise.promise;
+ }
+
+ // Use JS-based piping
let reader;
let writer;
let disposable;
diff --git a/lib/internal/webstreams/util.js b/lib/internal/webstreams/util.js
index 836e1de130505b..9f4efaf3d14f2c 100644
--- a/lib/internal/webstreams/util.js
+++ b/lib/internal/webstreams/util.js
@@ -47,6 +47,7 @@ const {
const kState = Symbol('kState');
const kType = Symbol('kType');
+const kStreamBase = Symbol('kStreamBase');
const AsyncIterator = {
__proto__: AsyncIteratorPrototype,
@@ -280,4 +281,5 @@ module.exports = {
resetQueue,
resolvedRecord,
setPromiseHandled,
+ kStreamBase,
};
diff --git a/test/common/assertSnapshot.js b/test/common/assertSnapshot.js
index 4fdd0be0ee32ae..9a7482020e098a 100644
--- a/test/common/assertSnapshot.js
+++ b/test/common/assertSnapshot.js
@@ -226,7 +226,6 @@ function replaceJunitDuration(str) {
.replaceAll(/time="[0-9.]+"/g, 'time="*"')
.replaceAll(/duration_ms [0-9.]+/g, 'duration_ms *')
.replaceAll(`hostname="${hostname()}"`, 'hostname="HOSTNAME"')
- .replaceAll(/timestamp="[^"]*"/g, 'timestamp="*"')
.replaceAll(/file="[^"]*"/g, 'file="*"');
}
diff --git a/test/fixtures/test-runner/output/junit_reporter.snapshot b/test/fixtures/test-runner/output/junit_reporter.snapshot
index cef5f0b52da186..1142b5b31ff2e7 100644
--- a/test/fixtures/test-runner/output/junit_reporter.snapshot
+++ b/test/fixtures/test-runner/output/junit_reporter.snapshot
@@ -128,7 +128,7 @@ true !== false
-
+
Error [ERR_TEST_FAILURE]: thrown from subtest sync throw fail
@@ -151,15 +151,15 @@ Error [ERR_TEST_FAILURE]: thrown from subtest sync throw fail
[Error [ERR_TEST_FAILURE]: Symbol(thrown symbol from sync throw non-error fail)] { code: 'ERR_TEST_FAILURE', failureType: 'testCodeFailure', cause: Symbol(thrown symbol from sync throw non-error fail) }
-
+
-
+
-
+
@@ -266,7 +266,7 @@ Error [ERR_TEST_FAILURE]: thrown from callback async throw
-
+
@@ -288,7 +288,7 @@ Error [ERR_TEST_FAILURE]: thrown from callback async throw
}
-
+
Error [ERR_TEST_FAILURE]: thrown from subtest sync throw fails at first
diff --git a/test/parallel/test-runner-reporters.js b/test/parallel/test-runner-reporters.js
index 7fed79d45b48fd..50a47578a1da7e 100644
--- a/test/parallel/test-runner-reporters.js
+++ b/test/parallel/test-runner-reporters.js
@@ -200,12 +200,6 @@ describe('node:test reporters', { concurrency: true }, () => {
assert.strictEqual(child.stdout.toString(), '');
const fileContents = fs.readFileSync(file, 'utf8');
assert.match(fileContents, //);
- // The exact timestamp format is intentionally not pinned here (still under
- // discussion); assert only that the value is present and a real date.
- const { 1: timestamp } = fileContents.match(/]*timestamp="([^"]+)"/) ?? [];
- assert.ok(timestamp, 'testsuite should have a timestamp attribute');
- assert.match(timestamp, /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}/);
- assert.ok(!Number.isNaN(Date.parse(timestamp)), `expected a valid date, got ${timestamp}`);
assert.match(fileContents, /\s*/);
assert.match(fileContents, //);
assert.match(fileContents, //);
diff --git a/test/parallel/test-whatwg-webstreams-internal-pipe.js b/test/parallel/test-whatwg-webstreams-internal-pipe.js
new file mode 100644
index 00000000000000..d115474c4ea1f8
--- /dev/null
+++ b/test/parallel/test-whatwg-webstreams-internal-pipe.js
@@ -0,0 +1,123 @@
+// Flags: --expose-internals --no-warnings
+'use strict';
+
+// Tests for internal StreamBase pipe optimization (nodejs/performance#134)
+
+const common = require('../common');
+const assert = require('assert');
+const { internalBinding } = require('internal/test/binding');
+
+const {
+ newWritableStreamFromStreamBase,
+ newReadableStreamFromStreamBase,
+} = require('internal/webstreams/adapters');
+
+const { kStreamBase } = require('internal/webstreams/util');
+const { JSStream } = internalBinding('js_stream');
+
+// kStreamBase marker is attached to ReadableStream
+{
+ const stream = new JSStream();
+ const readable = newReadableStreamFromStreamBase(stream);
+ assert.strictEqual(readable[kStreamBase], stream);
+ stream.emitEOF();
+}
+
+// kStreamBase marker is attached to WritableStream
+{
+ const stream = new JSStream();
+ stream.onwrite = common.mustNotCall();
+ stream.onshutdown = (req) => req.oncomplete();
+
+ const writable = newWritableStreamFromStreamBase(stream);
+ assert.strictEqual(writable[kStreamBase], stream);
+ writable.close();
+}
+
+// Regular JS streams don't have kStreamBase
+{
+ const { ReadableStream, WritableStream } = require('stream/web');
+
+ const rs = new ReadableStream({
+ pull(controller) {
+ controller.enqueue('chunk');
+ controller.close();
+ },
+ });
+
+ const ws = new WritableStream({ write() {} });
+
+ assert.strictEqual(rs[kStreamBase], undefined);
+ assert.strictEqual(ws[kStreamBase], undefined);
+
+ rs.pipeTo(ws).then(common.mustCall());
+}
+
+// Mixed streams use standard JS path
+{
+ const stream = new JSStream();
+ stream.onshutdown = (req) => req.oncomplete();
+ const readable = newReadableStreamFromStreamBase(stream);
+
+ const { WritableStream } = require('stream/web');
+ const chunks = [];
+ const ws = new WritableStream({
+ write(chunk) { chunks.push(chunk); },
+ });
+
+ assert.ok(readable[kStreamBase]);
+ assert.strictEqual(ws[kStreamBase], undefined);
+
+ const pipePromise = readable.pipeTo(ws);
+ stream.readBuffer(Buffer.from('hello'));
+ stream.emitEOF();
+
+ pipePromise.then(common.mustCall(() => {
+ assert.strictEqual(chunks.length, 1);
+ }));
+}
+
+// Verify kStreamBase symbol identity
+{
+ const { kStreamBase: kStreamBase2 } = require('internal/webstreams/util');
+ assert.strictEqual(kStreamBase, kStreamBase2);
+}
+
+// FileHandle.readableWebStream() uses async reads, not StreamBase
+{
+ const fs = require('fs/promises');
+ const path = require('path');
+ const os = require('os');
+
+ async function testFileStreamPipe() {
+ const tmpDir = os.tmpdir();
+ const testFile = path.join(tmpDir, `test-webstream-pipe-${process.pid}.txt`);
+ const testData = 'Hello, WebStreams pipe!';
+
+ await fs.writeFile(testFile, testData);
+
+ try {
+ const fileHandle = await fs.open(testFile, 'r');
+ const readable = fileHandle.readableWebStream();
+
+ assert.strictEqual(readable[kStreamBase], undefined);
+
+ const chunks = [];
+ const writable = new (require('stream/web').WritableStream)({
+ write(chunk) { chunks.push(chunk); },
+ });
+
+ await readable.pipeTo(writable);
+ await fileHandle.close();
+
+ const result = Buffer.concat(chunks.map((c) =>
+ (c instanceof Uint8Array ? Buffer.from(c) : Buffer.from(c)),
+ )).toString();
+ assert.strictEqual(result, testData);
+ } finally {
+ await fs.unlink(testFile).catch(() => {});
+ }
+ }
+
+ testFileStreamPipe().then(common.mustCall());
+}