Skip to content
Draft
2 changes: 2 additions & 0 deletions builds/e2e/templates/e2e-run.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ steps:
# Below tests were disabled and marked for re-enable when a blocking item was resolved.
# When it was resolved the tests were never enabled. We need to re-enable these.
$filter += '&FullyQualifiedName!~Provisioning&FullyQualifiedName!~SasOutOfScope&FullyQualifiedName!~X509ManualProvision&FullyQualifiedName!~AuthorizationPolicyUpdateTest&FullyQualifiedName!~AuthorizationPolicyExplicitPolicyTest'
$filter += '&(FullyQualifiedName~Microsoft.Azure.Devices.Edge.Test.Module|FullyQualifiedName~Microsoft.Azure.Devices.Edge.Test.Metrics|FullyQualifiedName~Microsoft.Azure.Devices.Edge.Test.IoTEdgeCheck)'
}
elseif ($test_type -eq 'nestededge_amqp')
{
Expand All @@ -67,6 +68,7 @@ steps:
# Below tests were disabled and marked for re-enable when a blocking item was resolved.
# When it was resolved the tests were never enabled. We need to re-enable these.
$filter += '&FullyQualifiedName!~Provisioning&FullyQualifiedName!~SasOutOfScope&FullyQualifiedName!~X509ManualProvision&FullyQualifiedName!~AuthorizationPolicyUpdateTest&FullyQualifiedName!~AuthorizationPolicyExplicitPolicyTest'
$filter += '&(FullyQualifiedName~QuickstartCerts|FullyQualifiedName~RouteMessageL3LeafToL4Module)'
}
elseif ($test_type -eq 'nestededge_isa95')
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,20 +123,66 @@ void HandleDeviceDisconnectedEvent()
void InternalConnectionStatusChangedHandler(ConnectionStatus status, ConnectionStatusChangeReason reason)
{
Events.ReceivedDeviceSdkCallback(this.identity, status, reason);
// @TODO: Ignore callback from Device SDK since it seems to be generating a lot of spurious Connected/NotConnected callbacks
/*

// TEMP: Bridge selected SDK callbacks into connectivity manager updates.
// Guards below prevent callback flapping from creating duplicate transitions.
if (status == ConnectionStatus.Connected)
{
this.deviceConnectivityManager.CallSucceeded();
this.HandleDeviceConnectedEvent();
if (!this.isConnected.Get())
{
this.HandleDeviceConnectedEvent();
Events.TempSdkBridgeApplied(this.identity, status, reason, "connected");
_ = this.ReportSdkConnectedAsync(status, reason);
}
else
{
Events.TempSdkBridgeIgnored(this.identity, status, reason, "already connected");
}
}
else if (status == ConnectionStatus.Disconnected || status == ConnectionStatus.Disabled)
else if (status == ConnectionStatus.Disconnected || status == ConnectionStatus.Disconnected_Retrying || status == ConnectionStatus.Disabled)
{
this.deviceConnectivityManager.CallTimedOut();
this.HandleDeviceDisconnectedEvent();
// Ignore intentional close transitions to avoid unnecessary connection churn.
if (reason == ConnectionStatusChangeReason.Client_Close)
{
Events.TempSdkBridgeIgnored(this.identity, status, reason, "client close");
return;
}

if (this.isConnected.Get())
{
this.HandleDeviceDisconnectedEvent();
Events.TempSdkBridgeApplied(this.identity, status, reason, "disconnected");
_ = this.ReportSdkDisconnectedAsync(status, reason);
}
else
{
Events.TempSdkBridgeIgnored(this.identity, status, reason, "already disconnected");
}
}
}

/// <summary>
/// Awaits <c>deviceConnectivityManager.CallSucceeded()</c> and logs, rather than propagates,
/// any exception it raises.
/// </summary>
/// <param name="status">SDK connection status, used only for the error log entry.</param>
/// <param name="reason">SDK status change reason, used only for the error log entry.</param>
async Task ReportSdkConnectedAsync(ConnectionStatus status, ConnectionStatusChangeReason reason)
{
    try
    {
        await this.deviceConnectivityManager.CallSucceeded();
    }
    catch (Exception reportFailure)
    {
        // Nothing is rethrown: the failure is only recorded through the event log.
        Events.TempSdkBridgeError(this.identity, status, reason, reportFailure);
    }
}

async Task ReportSdkDisconnectedAsync(ConnectionStatus status, ConnectionStatusChangeReason reason)
{
try
{
await this.deviceConnectivityManager.CallTimedOut();
}
catch (Exception ex)
{
Events.TempSdkBridgeError(this.identity, status, reason, ex);
}
this.connectionStatusChangedHandler?.Invoke(status, reason);
*/
}

async Task<T> InvokeFunc<T>(Func<Task<T>> func, string operation, bool useForConnectivityCheck = true)
Expand Down Expand Up @@ -209,7 +255,10 @@ enum EventIds
OperationFailed,
OperationSucceeded,
ChangingStatus,
FailOverDetected
FailOverDetected,
TempSdkBridgeApplied,
TempSdkBridgeIgnored,
TempSdkBridgeError
}

public static void ReceivedDeviceSdkCallback(IIdentity identity, ConnectionStatus status, ConnectionStatusChangeReason reason)
Expand Down Expand Up @@ -241,6 +290,21 @@ public static void FailOverDetected(IIdentity identity, string operation, Except
{
Log.LogInformation((int)EventIds.FailOverDetected, ex, $"Operation {operation} failed for {identity.Id} because of fail-over");
}

/// <summary>
/// Writes a warning-level entry tagged <c>[TEMP SdkReconnectBridgeApplied]</c> containing the
/// action, SDK status, change reason and identity id.
/// </summary>
public static void TempSdkBridgeApplied(IIdentity identity, ConnectionStatus status, ConnectionStatusChangeReason reason, string action)
{
    string message = $"[TEMP SdkReconnectBridgeApplied] Action={action}, status={status}, reason={reason}, identity={identity.Id}";
    Log.LogWarning((int)EventIds.TempSdkBridgeApplied, message);
}

/// <summary>
/// Writes a debug-level entry tagged <c>[TEMP SdkReconnectBridgeIgnored]</c> containing the
/// note, SDK status, change reason and identity id.
/// </summary>
public static void TempSdkBridgeIgnored(IIdentity identity, ConnectionStatus status, ConnectionStatusChangeReason reason, string note)
{
    string message = $"[TEMP SdkReconnectBridgeIgnored] Note={note}, status={status}, reason={reason}, identity={identity.Id}";
    Log.LogDebug((int)EventIds.TempSdkBridgeIgnored, message);
}

/// <summary>
/// Writes a warning-level entry tagged <c>[TEMP SdkReconnectBridgeError]</c>, attaching the
/// exception together with the SDK status, change reason and identity id.
/// </summary>
public static void TempSdkBridgeError(IIdentity identity, ConnectionStatus status, ConnectionStatusChangeReason reason, Exception ex)
{
    string message = $"[TEMP SdkReconnectBridgeError] Failed to report SDK callback. status={status}, reason={reason}, identity={identity.Id}";
    Log.LogWarning((int)EventIds.TempSdkBridgeError, ex, message);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,15 @@ public Task<ulong> GetMessageCountFromOffset(string endpointId, long offset)
return sequentialStore.GetCountFromOffset(offset);
}

/// <summary>
/// Requests a cleanup pass outside the regular schedule by forwarding the call
/// to <c>messagesCleaner</c>.
/// </summary>
public void TriggerCleanup() => this.messagesCleaner.TriggerCleanup();

public void Dispose()
{
this.Dispose(true);
Expand Down Expand Up @@ -202,6 +211,7 @@ class CleanupProcessor : IDisposable
readonly MessageStore messageStore;
readonly Timer ensureCleanupTaskTimer;
readonly CancellationTokenSource cancellationTokenSource;
readonly SemaphoreSlim cleanupWakeSignal;
readonly bool checkEntireQueueOnCleanup;
readonly int messageCleanupIntervalSecs;
readonly IMetricsCounter expiredCounter;
Expand All @@ -212,6 +222,7 @@ public CleanupProcessor(MessageStore messageStore, bool checkEntireQueueOnCleanu
this.checkEntireQueueOnCleanup = checkEntireQueueOnCleanup;
this.messageStore = messageStore;
this.cancellationTokenSource = new CancellationTokenSource();
this.cleanupWakeSignal = new SemaphoreSlim(0);
this.messageCleanupIntervalSecs = messageCleanupIntervalSecs;
this.expiredCounter = Metrics.Instance.CreateCounter(
"messages_dropped",
Expand All @@ -225,11 +236,25 @@ public void Dispose()
{
this.ensureCleanupTaskTimer?.Dispose();
this.cancellationTokenSource?.Cancel();
this.cleanupWakeSignal?.Release();
// wait for 30 secs for the cleanup task to finish.
this.cleanupTask?.Wait(TimeSpan.FromSeconds(30));
this.cleanupWakeSignal?.Dispose();
// Not disposing the cleanup task, in case it is not completed yet.
}

/// <summary>
/// Wakes the cleanup loop so that it runs without waiting for the regular sleep interval.
/// Intended to be called when the cloud connection is restored.
/// </summary>
/// <remarks>
/// The call is a no-op once <see cref="Dispose"/> has requested cancellation, because the wake
/// signal may already be disposed by then. Repeated calls made while a wake signal is still
/// pending are coalesced into that single pending signal.
/// </remarks>
public void TriggerCleanup()
{
    // Dispose() cancels the token source before it disposes the wake signal, so the
    // cancellation flag tells us whether it is still safe to touch the semaphore.
    if (this.cancellationTokenSource.IsCancellationRequested)
    {
        return;
    }

    this.EnsureCleanupTask(null);
    Events.CleanupTriggeredByConnectionRecovery();

    try
    {
        // One pending signal is enough to wake the loop. Releasing on every call would queue
        // one extra immediate cleanup pass per call when many connections recover at once.
        if (this.cleanupWakeSignal.CurrentCount == 0)
        {
            this.cleanupWakeSignal.Release();
            Events.TempCleanupWakeSignal();
        }
    }
    catch (ObjectDisposedException)
    {
        // Dispose() raced with this call after the check above; there is nothing left to wake.
    }
}

void EnsureCleanupTask(object state)
{
if (this.cleanupTask == null || this.cleanupTask.IsCompleted)
Expand Down Expand Up @@ -306,9 +331,16 @@ private async Task CleanQueue(bool checkEntireQueueOnCleanup)
Events.CleanupTaskStarted(messageQueueId);
CheckpointData checkpointData = await this.messageStore.checkpointStore.GetCheckpointDataAsync(messageQueueId, CancellationToken.None);
ISequentialStore<MessageRef> sequentialStore = endpointSequentialStore.Value;
long queueHeadOffset = sequentialStore.GetHeadOffset(this.cancellationTokenSource.Token);
long unreadStartOffset = checkpointData.Offset + 1;
Events.TempCleanupCorrelation(messageQueueId, checkpointData.Offset, unreadStartOffset, queueHeadOffset);
Events.CleanupCheckpointState(messageQueueId, checkpointData);
int cleanupEntityStoreCount = 0;

// Track orphaned messages for observability
int orphanedMessageCount = 0;
DateTime earliestOrphanedMessageTime = DateTime.MaxValue;

async Task<bool> DeleteMessageCallback(long offset, MessageRef messageRef)
{
var expiry = messageRef.TimeStamp + messageRef.TimeToLive;
Expand All @@ -317,6 +349,16 @@ async Task<bool> DeleteMessageCallback(long offset, MessageRef messageRef)
return false;
}

// Detect orphaned messages (expired but can't clean due to offset gap)
if (offset > checkpointData.Offset && expiry <= DateTime.UtcNow)
{
orphanedMessageCount++;
if (messageRef.TimeStamp < earliestOrphanedMessageTime)
{
earliestOrphanedMessageTime = messageRef.TimeStamp;
}
}

var message = await this.TryDecrementRefCountUpdate(messageRef.EdgeMessageId, messageQueueId);

await message.ForEachAsync(async msg =>
Expand Down Expand Up @@ -378,14 +420,23 @@ await message.ForEachAsync(async msg =>
totalCleanupCount += cleanupCount;
totalCleanupStoreCount += cleanupEntityStoreCount;
Events.CleanupCompleted(messageQueueId, cleanupCount, cleanupEntityStoreCount, totalCleanupCount, totalCleanupStoreCount);

// Log orphaned messages for observability
if (orphanedMessageCount > 0)
{
TimeSpan oldestOrphanAge = DateTime.UtcNow - earliestOrphanedMessageTime;
Events.OrphanedMessagesDetected(messageQueueId, orphanedMessageCount, checkpointData.Offset, oldestOrphanAge);
}
}
catch (Exception ex)
{
Events.ErrorCleaningMessagesForEndpoint(ex, messageQueueId);
}
}

await Task.Delay(this.GetCleanupTaskSleepTime());
await Task.WhenAny(
Task.Delay(this.GetCleanupTaskSleepTime(), this.cancellationTokenSource.Token),
this.cleanupWakeSignal.WaitAsync(this.cancellationTokenSource.Token));
}
}

Expand Down Expand Up @@ -415,10 +466,14 @@ enum EventIds
GettingNextBatch,
ObtainedNextBatch,
CleanupCheckpointState,
TempCleanupCorrelation,
MessageAdded,
ErrorGettingMessagesBatch,
CreatedCleanupProcessor,
ErrorUpdatingMessageForEndpoint
ErrorUpdatingMessageForEndpoint,
CleanupTriggeredByConnectionRecovery,
TempCleanupWakeSignal,
OrphanedMessagesDetected
}

public static void MessageStoreCreated()
Expand All @@ -441,6 +496,16 @@ public static void CleanupTaskInitialized()
Log.LogInformation((int)EventIds.CleanupTaskStarted, "Started task to cleanup processed and stale messages");
}

/// <summary>
/// Writes an information-level entry stating that cleanup is being triggered because of
/// cloud connection recovery.
/// </summary>
public static void CleanupTriggeredByConnectionRecovery()
{
    const string Message = "Triggering cleanup due to cloud connection recovery to retry pending checkpoint commits";
    Log.LogInformation((int)EventIds.CleanupTriggeredByConnectionRecovery, Message);
}

/// <summary>
/// Writes a warning-level entry tagged <c>[TEMP CleanupWakeSignal]</c> noting that the
/// cleanup wake signal was sent.
/// </summary>
public static void TempCleanupWakeSignal()
{
    const string Message = "[TEMP CleanupWakeSignal] Cleanup wake signal sent due to connection recovery. Cleanup loop should run immediately.";
    Log.LogWarning((int)EventIds.TempCleanupWakeSignal, Message);
}

public static void ErrorCleaningMessagesForEndpoint(Exception ex, string endpointId)
{
Log.LogWarning((int)EventIds.ErrorCleaningMessagesForEndpoint, ex, Invariant($"Error cleaning up messages for endpoint {endpointId}"));
Expand All @@ -462,6 +527,11 @@ public static void CleanupCompleted(string endpointId, int queueMessagesCount, i
Log.LogDebug((int)EventIds.CleanupCompleted, Invariant($"Total messages cleaned up from queue for endpoint {endpointId} = {totalQueueMessagesCount}, and total messages cleaned up for message store = {totalStoreMessagesCount}."));
}

/// <summary>
/// Writes a warning-level entry reporting how many orphaned messages were counted for an
/// endpoint, the checkpoint offset at that time, and the age of the oldest one in seconds.
/// </summary>
/// <param name="endpointId">Endpoint whose queue was inspected.</param>
/// <param name="orphanedCount">Number of orphaned messages counted.</param>
/// <param name="checkpointOffset">Checkpoint offset included in the log entry.</param>
/// <param name="oldestAge">Age of the oldest orphaned message; logged as total seconds with one decimal.</param>
public static void OrphanedMessagesDetected(string endpointId, int orphanedCount, long checkpointOffset, TimeSpan oldestAge)
{
    string message = Invariant($"Detected {orphanedCount} orphaned message(s) in endpoint {endpointId}. Checkpoint offset={checkpointOffset}, oldest message age={oldestAge.TotalSeconds:F1}s. Messages are stuck in store because checkpoint has not advanced. This indicates a potential message acknowledgment failure during network disruption. Checkpoint retries or the cleanup trigger on connection recovery should resolve this.");
    Log.LogWarning((int)EventIds.OrphanedMessagesDetected, message);
}

public static void ErrorGettingMessagesBatch(string entityName, Exception ex)
{
Log.LogWarning((int)EventIds.ErrorGettingMessagesBatch, ex, $"Error getting next batch for endpoint {entityName}.");
Expand Down Expand Up @@ -502,6 +572,13 @@ internal static void CleanupCheckpointState(string endpointId, CheckpointData ch
Log.LogDebug((int)EventIds.CleanupCheckpointState, Invariant($"Checkpoint for endpoint {endpointId} is {checkpointData.Offset}"));
}

/// <summary>
/// Writes an information-level entry tagged <c>[TEMP CleanupCorrelation]</c> with the endpoint id,
/// checkpoint offset, unread start offset and queue head offset.
/// </summary>
internal static void TempCleanupCorrelation(string endpointId, long checkpointOffset, long unreadStartOffset, long queueHeadOffset)
{
    string message = Invariant($"[TEMP CleanupCorrelation] Endpoint={endpointId}, checkpointOffset={checkpointOffset}, unreadStartOffset={unreadStartOffset}, queueHeadOffset={queueHeadOffset}");
    Log.LogInformation((int)EventIds.TempCleanupCorrelation, message);
}

internal static void MessageAdded(long offset, string edgeMessageId, string endpointId)
{
// Print only after every 1000th message to avoid flooding logs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,7 @@ protected override void Load(ContainerBuilder builder)
var connectionManagerTask = c.Resolve<Task<IConnectionManager>>();
var subscriptionProcessorTask = c.Resolve<Task<ISubscriptionProcessor>>();
var deviceScopeIdentitiesCacheTask = c.Resolve<Task<IDeviceScopeIdentitiesCache>>();
var messageStoreTask = this.isStoreAndForwardEnabled ? c.Resolve<Task<IMessageStore>>() : null;
Router router = await routerTask;
ITwinManager twinManager = await twinManagerTask;
IConnectionManager connectionManager = await connectionManagerTask;
Expand All @@ -539,6 +540,18 @@ protected override void Load(ContainerBuilder builder)
invokeMethodHandler,
subscriptionProcessor,
deviceScopeIdentitiesCache);

// Subscribe MessageStore to connection recovery events
// to trigger cleanup when cloud connection is restored
if (messageStoreTask != null)
{
IMessageStore messageStore = await messageStoreTask;
connectionManager.CloudConnectionEstablished += (sender, identity) =>
{
messageStore.TriggerCleanup();
};
}

return hub;
})
.As<Task<IEdgeHub>>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,11 @@ public interface IMessageStore : IDisposable
/// Returns the number of messages in the store from a offset
/// </summary>
Task<ulong> GetMessageCountFromOffset(string endpointId, long offset);

/// <summary>
/// Triggers an immediate cleanup attempt. Called when cloud connection is restored
/// to retry checkpoint commits for messages that were sent but not acknowledged.
/// </summary>
/// <remarks>
/// The method is synchronous and returns no value, so callers receive no indication of
/// whether or when the cleanup attempt actually ran.
/// NOTE(review): whether a cleanup pass really retries checkpoint commits is not visible from
/// this declaration; confirm against the implementation.
/// </remarks>
void TriggerCleanup();
}
}
Loading
Loading