Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 85 additions & 19 deletions lib/diagnostics_channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

const {
ArrayPrototypeAt,
ArrayPrototypeIndexOf,
ArrayPrototypePush,
ArrayPrototypePushApply,
ArrayPrototypeSlice,
Expand All @@ -15,6 +14,7 @@ const {
ReflectApply,
SafeFinalizationRegistry,
SafeMap,
SafeSet,
SymbolDispose,
SymbolHasInstance,
} = primordials;
Expand All @@ -36,6 +36,19 @@ const { subscribers: subscriberCounts } = dc_binding;
const { WeakReference } = require('internal/util');
const { isPromise } = require('internal/util/types');

let suppressionStorage;

function getSuppressionsStorage() {
if (suppressionStorage === undefined) {
const { AsyncLocalStorage } = require('async_hooks');
suppressionStorage = new AsyncLocalStorage();
}
return suppressionStorage;
}

function withSuppressionsContext(set, fn, thisArg, args) {
return getSuppressionsStorage().run(set, fn.bind(thisArg), ...args);
}
// Can't delete when weakref count reaches 0 as it could increment again.
// Only GC can be used as a valid time to clean up the channels map.
class WeakRefMap extends SafeMap {
Expand Down Expand Up @@ -93,9 +106,16 @@ class RunStoresScope {

// Enter stores using withScope
if (activeChannel._stores) {
const activeKeys = getSuppressionsStorage().getStore();
for (const entry of activeChannel._stores.entries()) {
const store = entry[0];
const transform = entry[1];
const { transform, subscriberId = null } = entry[1];

// Skip this bound store if it opted into suppression and its key
// is active in the current async context.
if (subscriberId !== null && activeKeys?.has(subscriberId)) {
continue;
}

let newContext = data;
if (transform) {
Expand Down Expand Up @@ -127,16 +147,32 @@ class RunStoresScope {

// TODO(qard): should there be a C++ channel interface?
class ActiveChannel {
subscribe(subscription) {
subscribe(subscription, options = {}) {
validateFunction(subscription, 'subscription');
const subscriberId = options && options.subscriberId !== undefined ? options.subscriberId : null;
if (subscriberId !== null) {
const t = typeof subscriberId;
if (t === 'string' || t === 'number' || t === 'bigint' || t === 'boolean') {
throw new ERR_INVALID_ARG_TYPE('subscriberId', ['object', 'symbol', 'function'], subscriberId);
}
}

const handler = subscription;
this._subscribers = ArrayPrototypeSlice(this._subscribers);
ArrayPrototypePush(this._subscribers, subscription);
ArrayPrototypePush(this._subscribers, { handler, subscriberId });
channels.incRef(this.name);
if (this._index !== undefined) subscriberCounts[this._index]++;
}

unsubscribe(subscription) {
const index = ArrayPrototypeIndexOf(this._subscribers, subscription);
// Find subscriber entry by handler identity.
let index = -1;
for (let i = 0; i < (this._subscribers?.length || 0); i++) {
if (this._subscribers[i].handler === subscription) {
index = i;
break;
}
}
if (index === -1) return false;

const before = ArrayPrototypeSlice(this._subscribers, 0, index);
Expand All @@ -151,13 +187,21 @@ class ActiveChannel {
return true;
}

bindStore(store, transform) {
bindStore(store, transform, options = {}) {
const subscriberId = options && options.subscriberId !== undefined ? options.subscriberId : null;
if (subscriberId !== null) {
const t = typeof subscriberId;
if (t === 'string' || t === 'number' || t === 'bigint' || t === 'boolean') {
throw new ERR_INVALID_ARG_TYPE('subscriberId', ['object', 'symbol', 'function'], subscriberId);
}
}

const replacing = this._stores.has(store);
if (!replacing) {
channels.incRef(this.name);
if (this._index !== undefined) subscriberCounts[this._index]++;
}
this._stores.set(store, transform);
this._stores.set(store, { transform, subscriberId });
}

unbindStore(store) {
Expand All @@ -180,10 +224,14 @@ class ActiveChannel {

publish(data) {
const subscribers = this._subscribers;
const activeKeys = getSuppressionsStorage().getStore();
for (let i = 0; i < (subscribers?.length || 0); i++) {
try {
const onMessage = subscribers[i];
onMessage(data, this.name);
const { handler, subscriberId = null } = subscribers[i];
if (subscriberId !== null && activeKeys?.has(subscriberId)) {
continue;
}
handler(data, this.name);
} catch (err) {
process.nextTick(() => {
triggerUncaughtException(err, false);
Expand Down Expand Up @@ -221,18 +269,18 @@ class Channel {
prototype === ActiveChannel.prototype;
}

subscribe(subscription) {
subscribe(subscription, options) {
markActive(this);
this.subscribe(subscription);
this.subscribe(subscription, options);
}

unsubscribe() {
return false;
}

bindStore(store, transform) {
bindStore(store, transform, options) {
markActive(this);
this.bindStore(store, transform);
this.bindStore(store, transform, options);
}

unbindStore() {
Expand Down Expand Up @@ -366,12 +414,12 @@ class BoundedChannel {
this.end?.hasSubscribers;
}

subscribe(handlers) {
subscribe(handlers, options) {
for (let i = 0; i < boundedEvents.length; ++i) {
const name = boundedEvents[i];
if (!handlers[name]) continue;

this[name]?.subscribe(handlers[name]);
this[name]?.subscribe(handlers[name], options);
}
}

Expand Down Expand Up @@ -458,26 +506,26 @@ class TracingChannel {
this.error?.hasSubscribers;
}

subscribe(handlers) {
subscribe(handlers, options) {
// Subscribe to call window (start/end)
if (handlers.start || handlers.end) {
this.#callWindow.subscribe({
start: handlers.start,
end: handlers.end,
});
}, options);
}

// Subscribe to continuation window (asyncStart/asyncEnd)
if (handlers.asyncStart || handlers.asyncEnd) {
this.#continuationWindow.subscribe({
start: handlers.asyncStart,
end: handlers.asyncEnd,
});
}, options);
}

// Subscribe to error channel
if (handlers.error) {
this.error.subscribe(handlers.error);
this.error.subscribe(handlers.error, options);
}
}

Expand Down Expand Up @@ -633,10 +681,28 @@ function tracingChannel(nameOrChannels) {

dc_binding.linkNativeChannel((name) => channel(name));

function suppressed(key, fn, thisArg, ...args) {
validateFunction(fn, 'fn');

if (key === null || key === undefined) {
throw new ERR_INVALID_ARG_TYPE('key', ['object', 'symbol', 'function'], key);
}
const t = typeof key;
if (t === 'string' || t === 'number' || t === 'bigint' || t === 'boolean') {
throw new ERR_INVALID_ARG_TYPE('key', ['object', 'symbol', 'function'], key);
}

const currentSet = getSuppressionsStorage().getStore();
const next = currentSet ? new SafeSet(currentSet) : new SafeSet();
next.add(key);
return withSuppressionsContext(next, fn, thisArg, args);
}

module.exports = {
channel,
hasSubscribers,
subscribe,
suppressed,
tracingChannel,
unsubscribe,
boundedChannel,
Expand Down
7 changes: 6 additions & 1 deletion src/node_errors.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1064,7 +1064,12 @@ void PerIsolateMessageListener(Local<Message> message, Local<Value> error) {
filename,
message->GetLineNumber(env->context()).FromMaybe(-1),
msg);
USE(ProcessEmitWarningGeneric(env, warning, "V8"));
// Defer the warning to the next event loop iteration. This prevents
// crashes when V8 emits warnings during code evaluation with
// throwOnSideEffect.
env->SetImmediate([warning](Environment* env) {
ProcessEmitWarningGeneric(env, warning, "V8");
});
break;
}
case Isolate::MessageErrorLevel::kMessageError:
Expand Down
Loading
Loading