diff --git a/lib/diagnostics_channel.js b/lib/diagnostics_channel.js index 8d2d374dc8e6ae..cfcbb2ea1f326b 100644 --- a/lib/diagnostics_channel.js +++ b/lib/diagnostics_channel.js @@ -2,7 +2,6 @@ const { ArrayPrototypeAt, - ArrayPrototypeIndexOf, ArrayPrototypePush, ArrayPrototypePushApply, ArrayPrototypeSlice, @@ -15,6 +14,7 @@ const { ReflectApply, SafeFinalizationRegistry, SafeMap, + SafeSet, SymbolDispose, SymbolHasInstance, } = primordials; @@ -36,6 +36,19 @@ const { subscribers: subscriberCounts } = dc_binding; const { WeakReference } = require('internal/util'); const { isPromise } = require('internal/util/types'); +let suppressionStorage; + +function getSuppressionsStorage() { + if (suppressionStorage === undefined) { + const { AsyncLocalStorage } = require('async_hooks'); + suppressionStorage = new AsyncLocalStorage(); + } + return suppressionStorage; +} + +function withSuppressionsContext(set, fn, thisArg, args) { + return getSuppressionsStorage().run(set, fn.bind(thisArg), ...args); +} // Can't delete when weakref count reaches 0 as it could increment again. // Only GC can be used as a valid time to clean up the channels map. class WeakRefMap extends SafeMap { @@ -93,9 +106,16 @@ class RunStoresScope { // Enter stores using withScope if (activeChannel._stores) { + const activeKeys = getSuppressionsStorage().getStore(); for (const entry of activeChannel._stores.entries()) { const store = entry[0]; - const transform = entry[1]; + const { transform, subscriberId = null } = entry[1]; + + // Skip this bound store if it opted into suppression and its key + // is active in the current async context. + if (subscriberId !== null && activeKeys?.has(subscriberId)) { + continue; + } let newContext = data; if (transform) { @@ -127,16 +147,32 @@ class RunStoresScope { // TODO(qard): should there be a C++ channel interface? class ActiveChannel { - subscribe(subscription) { + subscribe(subscription, options = {}) { validateFunction(subscription, 'subscription'); + const subscriberId = options && options.subscriberId !== undefined ? options.subscriberId : null; + if (subscriberId !== null) { + const t = typeof subscriberId; + if (t === 'string' || t === 'number' || t === 'bigint' || t === 'boolean') { + throw new ERR_INVALID_ARG_TYPE('subscriberId', ['object', 'symbol', 'function'], subscriberId); + } + } + + const handler = subscription; this._subscribers = ArrayPrototypeSlice(this._subscribers); - ArrayPrototypePush(this._subscribers, subscription); + ArrayPrototypePush(this._subscribers, { handler, subscriberId }); channels.incRef(this.name); if (this._index !== undefined) subscriberCounts[this._index]++; } unsubscribe(subscription) { - const index = ArrayPrototypeIndexOf(this._subscribers, subscription); + // Find subscriber entry by handler identity. + let index = -1; + for (let i = 0; i < (this._subscribers?.length || 0); i++) { + if (this._subscribers[i].handler === subscription) { + index = i; + break; + } + } if (index === -1) return false; const before = ArrayPrototypeSlice(this._subscribers, 0, index); @@ -151,13 +187,21 @@ class ActiveChannel { return true; } - bindStore(store, transform) { + bindStore(store, transform, options = {}) { + const subscriberId = options && options.subscriberId !== undefined ? options.subscriberId : null; + if (subscriberId !== null) { + const t = typeof subscriberId; + if (t === 'string' || t === 'number' || t === 'bigint' || t === 'boolean') { + throw new ERR_INVALID_ARG_TYPE('subscriberId', ['object', 'symbol', 'function'], subscriberId); + } + } + const replacing = this._stores.has(store); if (!replacing) { channels.incRef(this.name); if (this._index !== undefined) subscriberCounts[this._index]++; } - this._stores.set(store, transform); + this._stores.set(store, { transform, subscriberId }); } unbindStore(store) { @@ -180,10 +224,14 @@ class ActiveChannel { publish(data) { const subscribers = this._subscribers; + const activeKeys = getSuppressionsStorage().getStore(); for (let i = 0; i < (subscribers?.length || 0); i++) { try { - const onMessage = subscribers[i]; - onMessage(data, this.name); + const { handler, subscriberId = null } = subscribers[i]; + if (subscriberId !== null && activeKeys?.has(subscriberId)) { + continue; + } + handler(data, this.name); } catch (err) { process.nextTick(() => { triggerUncaughtException(err, false); @@ -221,18 +269,18 @@ class Channel { prototype === ActiveChannel.prototype; } - subscribe(subscription) { + subscribe(subscription, options) { markActive(this); - this.subscribe(subscription); + this.subscribe(subscription, options); } unsubscribe() { return false; } - bindStore(store, transform) { + bindStore(store, transform, options) { markActive(this); - this.bindStore(store, transform); + this.bindStore(store, transform, options); } unbindStore() { @@ -366,12 +414,12 @@ class BoundedChannel { this.end?.hasSubscribers; } - subscribe(handlers) { + subscribe(handlers, options) { for (let i = 0; i < boundedEvents.length; ++i) { const name = boundedEvents[i]; if (!handlers[name]) continue; - this[name]?.subscribe(handlers[name]); + this[name]?.subscribe(handlers[name], options); } } @@ -458,13 +506,13 @@ class TracingChannel { this.error?.hasSubscribers; } - subscribe(handlers) { + subscribe(handlers, options) { // Subscribe to call window (start/end) if (handlers.start || handlers.end) { this.#callWindow.subscribe({ start: handlers.start, end: handlers.end, - }); + }, options); } // Subscribe to continuation window (asyncStart/asyncEnd) @@ -472,12 +520,12 @@ class TracingChannel { this.#continuationWindow.subscribe({ start: handlers.asyncStart, end: handlers.asyncEnd, - }); + }, options); } // Subscribe to error channel if (handlers.error) { - this.error.subscribe(handlers.error); + this.error.subscribe(handlers.error, options); } } @@ -633,10 +681,28 @@ function tracingChannel(nameOrChannels) { dc_binding.linkNativeChannel((name) => channel(name)); +function suppressed(key, fn, thisArg, ...args) { + validateFunction(fn, 'fn'); + + if (key === null || key === undefined) { + throw new ERR_INVALID_ARG_TYPE('key', ['object', 'symbol', 'function'], key); + } + const t = typeof key; + if (t === 'string' || t === 'number' || t === 'bigint' || t === 'boolean') { + throw new ERR_INVALID_ARG_TYPE('key', ['object', 'symbol', 'function'], key); + } + + const currentSet = getSuppressionsStorage().getStore(); + const next = currentSet ? new SafeSet(currentSet) : new SafeSet(); + next.add(key); + return withSuppressionsContext(next, fn, thisArg, args); +} + module.exports = { channel, hasSubscribers, subscribe, + suppressed, tracingChannel, unsubscribe, boundedChannel, diff --git a/src/node_errors.cc b/src/node_errors.cc index 63db97f6a56db0..2c0fc0c33c0266 100644 --- a/src/node_errors.cc +++ b/src/node_errors.cc @@ -1064,12 +1064,8 @@ void PerIsolateMessageListener(Local message, Local error) { filename, message->GetLineNumber(env->context()).FromMaybe(-1), msg); - // Defer the warning to the next event loop iteration. This prevents - // crashes when V8 emits warnings during code evaluation with - // throwOnSideEffect. - env->SetImmediate([warning](Environment* env) { - ProcessEmitWarningGeneric(env, warning, "V8"); - }); + // Emit the warning immediately. + USE(ProcessEmitWarningGeneric(env, warning, "V8")); break; } case Isolate::MessageErrorLevel::kMessageError: diff --git a/test/parallel/test-diagnostics-channel-suppression.js b/test/parallel/test-diagnostics-channel-suppression.js new file mode 100644 index 00000000000000..cd3523848a8127 --- /dev/null +++ b/test/parallel/test-diagnostics-channel-suppression.js @@ -0,0 +1,202 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const { channel, suppressed } = require('node:diagnostics_channel'); +const { AsyncLocalStorage } = require('async_hooks'); + +// Test 1: Basic suppression - subscriber with subscriberId is skipped inside suppressed() +{ + const key = Symbol('tracer'); + const ch = channel('test-suppression-basic'); + const handler = common.mustNotCall(); + ch.subscribe(handler, { subscriberId: key }); + + suppressed(key, common.mustCall(() => { + ch.publish({}); + })); + + ch.unsubscribe(handler); +} + +// Test 2: Non-opted subscriber fires even inside suppressed() scope +{ + const key = Symbol('tracer2'); + const ch = channel('test-suppression-nonopted'); + const optedHandler = common.mustNotCall(); + const regularHandler = common.mustCall(); + ch.subscribe(optedHandler, { subscriberId: key }); + ch.subscribe(regularHandler); // no suppression + + suppressed(key, common.mustCall(() => { + ch.publish({}); + })); + + ch.unsubscribe(optedHandler); + ch.unsubscribe(regularHandler); +} + +// Test 3: Two APMs with different keys don't suppress each other +{ + const k1 = Symbol('k1'); + const k2 = Symbol('k2'); + const ch = channel('test-suppression-two-keys'); + let h1Calls = 0; + let h2Calls = 0; + const h1 = common.mustCall(() => { h1Calls++; }, 1); + const h2 = common.mustCall(() => { h2Calls++; }, 1); + ch.subscribe(h1, { subscriberId: k1 }); + ch.subscribe(h2, { subscriberId: k2 }); + + suppressed(k1, common.mustCall(() => { + ch.publish({}); + })); + + assert.strictEqual(h1Calls, 0); + assert.strictEqual(h2Calls, 1); + + suppressed(k2, common.mustCall(() => { + ch.publish({}); + })); + + assert.strictEqual(h1Calls, 1); + assert.strictEqual(h2Calls, 1); + + ch.unsubscribe(h1); + ch.unsubscribe(h2); +} + +// Test 4: Nested suppressed() calls (same key, different keys) +{ + const k1 = Symbol('nested1'); + const k2 = Symbol('nested2'); + const ch = channel('test-suppression-nested'); + const h1 = common.mustNotCall(); + let h2Calls = 0; + const h2 = common.mustCall(() => { h2Calls++; }, 2); + ch.subscribe(h1, { subscriberId: k1 }); + ch.subscribe(h2, { subscriberId: k2 }); + + suppressed(k1, common.mustCall(() => { + // Inside k1, h1 skipped, h2 runs + ch.publish({}); + assert.strictEqual(h2Calls, 1); + + suppressed(k2, common.mustCall(() => { + // Inside both, both skipped + ch.publish({}); + assert.strictEqual(h2Calls, 1); + })); + + // Back to only k1 + ch.publish({}); + assert.strictEqual(h2Calls, 2); + })); + + ch.unsubscribe(h1); + ch.unsubscribe(h2); +} + +// Test 5: suppressed() across a Promise boundary +{ + const key = Symbol('promise'); + const ch = channel('test-suppression-promise'); + const handler = common.mustNotCall(); + ch.subscribe(handler, { subscriberId: key }); + const done = common.mustCall(); + + suppressed(key, common.mustCall(async () => { + await Promise.resolve(); + ch.publish({}); + })).then(common.mustCall(() => { + ch.unsubscribe(handler); + done(); + })); +} + +// Test 6: suppressed() across setImmediate and queueMicrotask +{ + const key = Symbol('timers'); + const ch = channel('test-suppression-timers'); + const handler = common.mustNotCall(); + ch.subscribe(handler, { subscriberId: key }); + const done = common.mustCall(); + + suppressed(key, common.mustCall(async () => { + await new Promise((resolve) => { + setImmediate(common.mustCall(() => { + ch.publish({}); + + queueMicrotask(common.mustCall(() => { + ch.publish({}); + resolve(); + })); + })); + }); + })).then(common.mustCall(() => { + ch.unsubscribe(handler); + done(); + })); +} + +// Test 7: unsubscribe() works correctly after using subscriberId +{ + const key = Symbol('unsub'); + const ch = channel('test-suppression-unsubscribe'); + const handler = common.mustNotCall(); + ch.subscribe(handler, { subscriberId: key }); + ch.unsubscribe(handler); + + // Should not throw and should not be called + suppressed(key, common.mustCall(() => { + ch.publish({}); + })); + +} + +// Test 8: bindStore with subscriberId is skipped inside suppressed() +{ + const key = Symbol('store'); + const ch = channel('test-suppression-store'); + const als = new AsyncLocalStorage(); + + const handler = common.mustCall(() => { + assert.strictEqual(als.getStore(), undefined); + }); + + ch.subscribe(handler); + ch.bindStore(als, common.mustNotCall(), { subscriberId: key }); + + suppressed(key, common.mustCall(() => { + ch.publish({}); + })); + + ch.unsubscribe(handler); + ch.unbindStore(als); +} + +// Test 9: Wrong type for subscriberId throws ERR_INVALID_ARG_TYPE +{ + const ch = channel('test-suppression-wrong-type'); + const bad = 'not-allowed'; + assert.throws(() => ch.subscribe(() => {}, { subscriberId: bad }), { + name: 'TypeError' + }); + const als = new AsyncLocalStorage(); + assert.throws(() => ch.bindStore(als, (d) => d, { subscriberId: bad }), { + name: 'TypeError' + }); +} + +// Test 10: suppressed() return value passes through fn's return value +{ + const key = Symbol('return'); + const receiver = { value: 41 }; + const result = suppressed(key, common.mustCall(function(a, b) { + assert.strictEqual(this, receiver); + assert.strictEqual(a, 'a'); + assert.strictEqual(b, 'b'); + return this.value + 1; + }), receiver, 'a', 'b'); + assert.strictEqual(result, 42); +}