mirror of
https://github.com/microsoft/PowerToys.git
synced 2026-04-03 17:56:44 +02:00
CmdPal: Add adaptive parallel fallback processing and consistent updates (#42273)
## Summary of the Pull Request This PR replaces sequential fallback processing with an adaptive parallel dispatch model and isolates fallback work onto a dedicated thread pool, preventing misbehaving extensions from starving the .NET ThreadPool on blocked synchronous COM RPC call. The other major change is allowing MainListPage to allow take control over the debounce of search updates directly, reducing latency and improving smoothness of the search. - Adds `DedicatedThreadPool` — an elastic pool of background threads (min 2, max 32) that expand on demand when all threads are blocked in COM calls and shrink after 30s idle. - Extracts all fallback dispatch machinery (adaptive workers, per-command inflight tracking, pending-retry slots) from MainListPage into a standalone FallbackUpdateManager. - Prevents one fallback from monopolizing all threads by capping concurrent in-flight calls per fallback to 4. - Starts with low degree of parallelism (2) and gently scales up to half of CPU cores per batch. If a fallback takes more than 200ms, another worker is spawned so remaining commands aren't blocked. - Adds `ThrottledDebouncedAction` to coalesce rapid `RaiseItemsChanged` calls from fallback completions and user input (100ms for external events, 50ms adjusted for keystrokes), replacing unbatched direct calls. - Bypasses the UI-layer debounce timer for the main list page since it now handles its own throttling, eliminating double-debounce latency. - Introduces diagnostics for fallbacks and timing hidden behind feature flags. <!-- Please review the items on the PR checklist before submitting--> ## PR Checklist - [x] Closes: #42286 - [x] Related to: #44407 - [ ] **Communication:** I've discussed this with core contributors already. If the work hasn't been agreed, this work might be rejected - [ ] **Tests:** Added/updated and all pass - [ ] **Localization:** All end-user-facing strings can be localized - [ ] **Dev docs:** Added/updated - [ ] **New binaries:** Added on the required places - [ ] [JSON for signing](https://github.com/microsoft/PowerToys/blob/main/.pipelines/ESRPSigning_core.json) for new binaries - [ ] [WXS for installer](https://github.com/microsoft/PowerToys/blob/main/installer/PowerToysSetup/Product.wxs) for new binaries and localization folder - [ ] [YML for CI pipeline](https://github.com/microsoft/PowerToys/blob/main/.pipelines/ci/templates/build-powertoys-steps.yml) for new test projects - [ ] [YML for signed pipeline](https://github.com/microsoft/PowerToys/blob/main/.pipelines/release.yml) - [ ] **Documentation updated:** If checked, please file a pull request on [our docs repo](https://github.com/MicrosoftDocs/windows-uwp/tree/docs/hub/powertoys) and link it here: #xxx <!-- Provide a more detailed description of the PR, other things fixed, or any additional comments/features here --> ## Detailed Description of the Pull Request / Additional comments <!-- Describe how you validated the behavior. Add automated tests wherever possible, but list manual validation steps taken as well --> ## Validation Steps Performed
This commit is contained in:
@@ -15,7 +15,7 @@ internal static class BatchUpdateManager
|
||||
// 30 ms chosen empirically to balance responsiveness and batching:
|
||||
// - Keeps perceived latency low (< ~50 ms) for user-visible updates.
|
||||
// - Still allows multiple COM/background events to be coalesced into a single batch.
|
||||
private static readonly TimeSpan BatchDelay = TimeSpan.FromMilliseconds(30);
|
||||
private static readonly TimeSpan BatchDelay = TimeSpan.FromMilliseconds(40);
|
||||
private static readonly ConcurrentQueue<IBatchUpdateTarget> DirtyQueue = [];
|
||||
private static readonly Timer Timer = new(static _ => Flush(), null, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
// The Microsoft Corporation licenses this file to you under the MIT license.
|
||||
// See the LICENSE file in the project root for more information.
|
||||
|
||||
using Microsoft.CmdPal.UI.ViewModels.MainPage;
|
||||
using Microsoft.CommandPalette.Extensions;
|
||||
|
||||
namespace Microsoft.CmdPal.UI.ViewModels;
|
||||
@@ -22,6 +23,7 @@ public class CommandPalettePageViewModelFactory
|
||||
{
|
||||
return page switch
|
||||
{
|
||||
MainListPage listPage => new ListViewModel(listPage, _scheduler, host, providerContext, _contextMenuFactory) { IsRootPage = !nested, IsMainPage = true },
|
||||
IListPage listPage => new ListViewModel(listPage, _scheduler, host, providerContext, _contextMenuFactory) { IsRootPage = !nested },
|
||||
IContentPage contentPage => new CommandPaletteContentPageViewModel(contentPage, _scheduler, host, providerContext),
|
||||
_ => null,
|
||||
|
||||
@@ -2,13 +2,17 @@
|
||||
// The Microsoft Corporation licenses this file to you under the MIT license.
|
||||
// See the LICENSE file in the project root for more information.
|
||||
|
||||
using System.Collections.Immutable;
|
||||
/*
|
||||
#define CMDPAL_FF_MAINPAGE_TIME_RAISE_ITEMS
|
||||
*/
|
||||
|
||||
using System.Collections.Specialized;
|
||||
using System.Diagnostics;
|
||||
using CommunityToolkit.Mvvm.Messaging;
|
||||
using ManagedCommon;
|
||||
using Microsoft.CmdPal.Common.Helpers;
|
||||
using Microsoft.CmdPal.Common.Text;
|
||||
using Microsoft.CmdPal.Core.Common.Helpers;
|
||||
using Microsoft.CmdPal.Ext.Apps;
|
||||
using Microsoft.CmdPal.Ext.Apps.Programs;
|
||||
using Microsoft.CmdPal.UI.ViewModels.Commands;
|
||||
@@ -25,8 +29,17 @@ namespace Microsoft.CmdPal.UI.ViewModels.MainPage;
|
||||
/// </summary>
|
||||
public sealed partial class MainListPage : DynamicListPage,
|
||||
IRecipient<ClearSearchMessage>,
|
||||
IRecipient<UpdateFallbackItemsMessage>, IDisposable
|
||||
IRecipient<UpdateFallbackItemsMessage>,
|
||||
IDisposable
|
||||
{
|
||||
// Throttle for raising items changed events from external sources
|
||||
private static readonly TimeSpan RaiseItemsChangedThrottle = TimeSpan.FromMilliseconds(100);
|
||||
|
||||
// Throttle for raising items changed events from user input - we want this to feel more responsive, so a shorter throttle.
|
||||
private static readonly TimeSpan RaiseItemsChangedThrottleForUserInput = TimeSpan.FromMilliseconds(50);
|
||||
|
||||
private readonly FallbackUpdateManager _fallbackUpdateManager;
|
||||
private readonly ThrottledDebouncedAction _refreshThrottledDebouncedAction;
|
||||
private readonly TopLevelCommandManager _tlcManager;
|
||||
private readonly AliasManager _aliasManager;
|
||||
private readonly SettingsModel _settings;
|
||||
@@ -54,11 +67,16 @@ public sealed partial class MainListPage : DynamicListPage,
|
||||
|
||||
private int AppResultLimit => AllAppsCommandProvider.TopLevelResultLimit;
|
||||
|
||||
private InterlockedBoolean _fullRefreshRequested;
|
||||
private InterlockedBoolean _refreshRunning;
|
||||
private InterlockedBoolean _refreshRequested;
|
||||
|
||||
private CancellationTokenSource? _cancellationTokenSource;
|
||||
|
||||
#if CMDPAL_FF_MAINPAGE_TIME_RAISE_ITEMS
|
||||
private DateTimeOffset _last = DateTimeOffset.UtcNow;
|
||||
#endif
|
||||
|
||||
public MainListPage(
|
||||
TopLevelCommandManager topLevelCommandManager,
|
||||
SettingsModel settings,
|
||||
@@ -82,16 +100,52 @@ public sealed partial class MainListPage : DynamicListPage,
|
||||
_tlcManager.PropertyChanged += TlcManager_PropertyChanged;
|
||||
_tlcManager.TopLevelCommands.CollectionChanged += Commands_CollectionChanged;
|
||||
|
||||
_refreshThrottledDebouncedAction = new ThrottledDebouncedAction(
|
||||
() =>
|
||||
{
|
||||
try
|
||||
{
|
||||
#if CMDPAL_FF_MAINPAGE_TIME_RAISE_ITEMS
|
||||
var delta = DateTimeOffset.UtcNow - _last;
|
||||
_last = DateTimeOffset.UtcNow;
|
||||
Logger.LogDebug($"UpdateFallbacks: RaiseItemsChanged, delta {delta}");
|
||||
|
||||
var sw = Stopwatch.StartNew();
|
||||
#endif
|
||||
if (_fullRefreshRequested.Clear())
|
||||
{
|
||||
// full refresh
|
||||
RaiseItemsChanged();
|
||||
}
|
||||
else
|
||||
{
|
||||
// preserve selection
|
||||
RaiseItemsChanged(ListViewModel.IncrementalRefresh);
|
||||
}
|
||||
|
||||
#if CMDPAL_FF_MAINPAGE_TIME_RAISE_ITEMS
|
||||
Logger.LogInfo($"UpdateFallbacks: RaiseItemsChanged took {sw.Elapsed}");
|
||||
#endif
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Logger.LogError("Unhandled exception in MainListPage refresh debounced action", ex);
|
||||
}
|
||||
},
|
||||
RaiseItemsChangedThrottle);
|
||||
|
||||
_fallbackUpdateManager = new FallbackUpdateManager(() => RequestRefresh(fullRefresh: false));
|
||||
|
||||
// The all apps page will kick off a BG thread to start loading apps.
|
||||
// We just want to know when it is done.
|
||||
var allApps = AllAppsCommandProvider.Page;
|
||||
allApps.PropChanged += (s, p) =>
|
||||
{
|
||||
if (p.PropertyName == nameof(allApps.IsLoading))
|
||||
{
|
||||
if (p.PropertyName == nameof(allApps.IsLoading))
|
||||
{
|
||||
IsLoading = ActuallyLoading();
|
||||
}
|
||||
};
|
||||
IsLoading = ActuallyLoading();
|
||||
}
|
||||
};
|
||||
|
||||
WeakReferenceMessenger.Default.Register<ClearSearchMessage>(this);
|
||||
WeakReferenceMessenger.Default.Register<UpdateFallbackItemsMessage>(this);
|
||||
@@ -120,10 +174,20 @@ public sealed partial class MainListPage : DynamicListPage,
|
||||
}
|
||||
else
|
||||
{
|
||||
RaiseItemsChanged(ListViewModel.IncrementalRefresh);
|
||||
RequestRefresh(fullRefresh: false);
|
||||
}
|
||||
}
|
||||
|
||||
private void RequestRefresh(bool fullRefresh, TimeSpan? interval = null)
|
||||
{
|
||||
if (fullRefresh)
|
||||
{
|
||||
_fullRefreshRequested.Set();
|
||||
}
|
||||
|
||||
_refreshThrottledDebouncedAction.Invoke(interval);
|
||||
}
|
||||
|
||||
private void ReapplySearchInBackground()
|
||||
{
|
||||
_refreshRequested.Set();
|
||||
@@ -151,7 +215,7 @@ public sealed partial class MainListPage : DynamicListPage,
|
||||
}
|
||||
|
||||
var currentSearchText = SearchText;
|
||||
UpdateSearchText(currentSearchText, currentSearchText);
|
||||
UpdateSearchTextCore(currentSearchText, currentSearchText, isUserInput: false);
|
||||
}
|
||||
while (_refreshRequested.Value);
|
||||
}
|
||||
@@ -243,6 +307,11 @@ public sealed partial class MainListPage : DynamicListPage,
|
||||
}
|
||||
|
||||
public override void UpdateSearchText(string oldSearch, string newSearch)
|
||||
{
|
||||
UpdateSearchTextCore(oldSearch, newSearch, isUserInput: true);
|
||||
}
|
||||
|
||||
private void UpdateSearchTextCore(string oldSearch, string newSearch, bool isUserInput)
|
||||
{
|
||||
var stopwatch = Stopwatch.StartNew();
|
||||
|
||||
@@ -297,7 +366,7 @@ public sealed partial class MainListPage : DynamicListPage,
|
||||
// prefilter fallbacks
|
||||
var globalFallbacks = _settings.GetGlobalFallbacks();
|
||||
var specialFallbacks = new List<TopLevelViewModel>(globalFallbacks.Length);
|
||||
var commonFallbacks = new List<TopLevelViewModel>();
|
||||
var commonFallbacks = new List<TopLevelViewModel>(commands.Count - globalFallbacks.Length);
|
||||
|
||||
foreach (var s in commands)
|
||||
{
|
||||
@@ -316,10 +385,7 @@ public sealed partial class MainListPage : DynamicListPage,
|
||||
}
|
||||
}
|
||||
|
||||
// start update of fallbacks; update special fallbacks separately,
|
||||
// so they can finish faster
|
||||
UpdateFallbacks(SearchText, specialFallbacks, token);
|
||||
UpdateFallbacks(SearchText, commonFallbacks, token);
|
||||
_fallbackUpdateManager.BeginUpdate(SearchText, [.. specialFallbacks, .. commonFallbacks], token);
|
||||
|
||||
if (token.IsCancellationRequested)
|
||||
{
|
||||
@@ -327,11 +393,13 @@ public sealed partial class MainListPage : DynamicListPage,
|
||||
}
|
||||
|
||||
// Cleared out the filter text? easy. Reset _filteredItems, and bail out.
|
||||
if (string.IsNullOrEmpty(newSearch))
|
||||
if (string.IsNullOrWhiteSpace(newSearch))
|
||||
{
|
||||
_filteredItemsIncludesApps = _includeApps;
|
||||
ClearResults();
|
||||
RaiseItemsChanged();
|
||||
var wasAlreadyEmpty = string.IsNullOrWhiteSpace(oldSearch);
|
||||
RequestRefresh(fullRefresh: true, interval: wasAlreadyEmpty ? null : TimeSpan.Zero);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -466,49 +534,35 @@ public sealed partial class MainListPage : DynamicListPage,
|
||||
}
|
||||
}
|
||||
|
||||
#if CMDPAL_FF_MAINPAGE_TIME_RAISE_ITEMS
|
||||
var filterDoneTimestamp = stopwatch.ElapsedMilliseconds;
|
||||
Logger.LogDebug($"Filter with '{newSearch}' in {filterDoneTimestamp}ms");
|
||||
#endif
|
||||
if (isUserInput)
|
||||
{
|
||||
// Make sure that the throttle delay is consistent from the user's perspective, even if filtering
|
||||
// takes a long time. If we always use the full throttle duration, then a slow filter could make the UI feel sluggish.
|
||||
var adjustedInterval = RaiseItemsChangedThrottleForUserInput - stopwatch.Elapsed;
|
||||
if (adjustedInterval < TimeSpan.Zero)
|
||||
{
|
||||
adjustedInterval = TimeSpan.Zero;
|
||||
}
|
||||
|
||||
RaiseItemsChanged();
|
||||
RequestRefresh(fullRefresh: true, adjustedInterval);
|
||||
}
|
||||
else
|
||||
{
|
||||
RequestRefresh(fullRefresh: true);
|
||||
}
|
||||
|
||||
#if CMDPAL_FF_MAINPAGE_TIME_RAISE_ITEMS
|
||||
var listPageUpdatedTimestamp = stopwatch.ElapsedMilliseconds;
|
||||
Logger.LogDebug($"Render items with '{newSearch}' in {listPageUpdatedTimestamp}ms /d {listPageUpdatedTimestamp - filterDoneTimestamp}ms");
|
||||
#endif
|
||||
|
||||
stopwatch.Stop();
|
||||
}
|
||||
}
|
||||
|
||||
private void UpdateFallbacks(string newSearch, IReadOnlyList<TopLevelViewModel> commands, CancellationToken token)
|
||||
{
|
||||
_ = Task.Run(
|
||||
() =>
|
||||
{
|
||||
var needsToUpdate = false;
|
||||
|
||||
foreach (var command in commands)
|
||||
{
|
||||
if (token.IsCancellationRequested)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var changedVisibility = command.SafeUpdateFallbackTextSynchronous(newSearch);
|
||||
needsToUpdate = needsToUpdate || changedVisibility;
|
||||
}
|
||||
|
||||
if (needsToUpdate)
|
||||
{
|
||||
if (token.IsCancellationRequested)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
RaiseItemsChanged(ListViewModel.IncrementalRefresh);
|
||||
}
|
||||
},
|
||||
token);
|
||||
}
|
||||
|
||||
private bool ActuallyLoading()
|
||||
{
|
||||
var allApps = AllAppsCommandProvider.Page;
|
||||
@@ -644,7 +698,10 @@ public sealed partial class MainListPage : DynamicListPage,
|
||||
|
||||
public void Receive(ClearSearchMessage message) => SearchText = string.Empty;
|
||||
|
||||
public void Receive(UpdateFallbackItemsMessage message) => RaiseItemsChanged(_tlcManager.TopLevelCommands.Count);
|
||||
public void Receive(UpdateFallbackItemsMessage message)
|
||||
{
|
||||
RequestRefresh(fullRefresh: false);
|
||||
}
|
||||
|
||||
private void SettingsChangedHandler(SettingsModel sender, object? args) => HotReloadSettings(sender);
|
||||
|
||||
@@ -654,6 +711,7 @@ public sealed partial class MainListPage : DynamicListPage,
|
||||
{
|
||||
_cancellationTokenSource?.Cancel();
|
||||
_cancellationTokenSource?.Dispose();
|
||||
_fallbackUpdateManager.Dispose();
|
||||
|
||||
_tlcManager.PropertyChanged -= TlcManager_PropertyChanged;
|
||||
_tlcManager.TopLevelCommands.CollectionChanged -= Commands_CollectionChanged;
|
||||
|
||||
@@ -0,0 +1,302 @@
|
||||
// Copyright (c) Microsoft Corporation
|
||||
// The Microsoft Corporation licenses this file to you under the MIT license.
|
||||
// See the LICENSE file in the project root for more information.
|
||||
|
||||
using System.Collections.Concurrent;
|
||||
using Microsoft.CmdPal.Common.Helpers;
|
||||
|
||||
namespace Microsoft.CmdPal.UI.ViewModels;
|
||||
|
||||
/// <summary>
|
||||
/// An elastic pool of dedicated background threads for running blocking work
|
||||
/// off the ThreadPool. Starts with <c>minThreads</c> always-alive threads and
|
||||
/// expands up to <c>maxThreads</c> on demand. Threads above the minimum exit
|
||||
/// automatically after <c>idleTimeout</c> with no work. Items are processed
|
||||
/// FIFO; cancelled items are skipped at dequeue time.
|
||||
/// </summary>
|
||||
internal sealed partial class DedicatedThreadPool : IDisposable
|
||||
{
|
||||
private const int DrainTimeoutMs = 3000;
|
||||
|
||||
private readonly BlockingCollection<Action> _workQueue = new();
|
||||
private readonly int _minThreads;
|
||||
private readonly int _maxThreads;
|
||||
private readonly TimeSpan _idleTimeout;
|
||||
private readonly string _name;
|
||||
|
||||
// Total live threads (Interlocked). Owned by the thread that wins the CAS.
|
||||
private int _threadCount;
|
||||
|
||||
// Threads currently blocked in TryTake waiting for work (Interlocked).
|
||||
// Used as the expansion trigger: if zero, all threads are busy.
|
||||
private int _idleCount;
|
||||
|
||||
// Ever-increasing counter for unique thread names across expand/shrink cycles.
|
||||
private int _nextThreadId;
|
||||
|
||||
private InterlockedBoolean _disposed;
|
||||
|
||||
public DedicatedThreadPool(int minThreads, int maxThreads, string name = "DedicatedWorker", TimeSpan? idleTimeout = null)
|
||||
{
|
||||
ArgumentOutOfRangeException.ThrowIfNegativeOrZero(minThreads);
|
||||
ArgumentOutOfRangeException.ThrowIfLessThan(maxThreads, minThreads);
|
||||
|
||||
_minThreads = minThreads;
|
||||
_maxThreads = maxThreads;
|
||||
_name = name;
|
||||
_idleTimeout = idleTimeout ?? TimeSpan.FromSeconds(30);
|
||||
|
||||
_threadCount = minThreads;
|
||||
for (var i = 0; i < minThreads; i++)
|
||||
{
|
||||
StartThread();
|
||||
}
|
||||
}
|
||||
|
||||
private void StartThread()
|
||||
{
|
||||
var id = Interlocked.Increment(ref _nextThreadId);
|
||||
var thread = new Thread(WorkerLoop)
|
||||
{
|
||||
IsBackground = true,
|
||||
Name = $"{_name}-{id}",
|
||||
Priority = ThreadPriority.BelowNormal,
|
||||
};
|
||||
thread.Start();
|
||||
}
|
||||
|
||||
private void WorkerLoop()
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
Interlocked.Increment(ref _idleCount);
|
||||
|
||||
bool got;
|
||||
Action? action;
|
||||
try
|
||||
{
|
||||
got = _workQueue.TryTake(out action, _idleTimeout);
|
||||
}
|
||||
catch (ObjectDisposedException)
|
||||
{
|
||||
// Pool was disposed while we were waiting.
|
||||
Interlocked.Decrement(ref _idleCount);
|
||||
Interlocked.Decrement(ref _threadCount);
|
||||
return;
|
||||
}
|
||||
|
||||
Interlocked.Decrement(ref _idleCount);
|
||||
|
||||
if (got)
|
||||
{
|
||||
try
|
||||
{
|
||||
action!();
|
||||
}
|
||||
catch (Exception)
|
||||
{
|
||||
// QueueAsync wraps work in its own try-catch, so this should
|
||||
// never fire. Keep the thread alive defensively.
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
// TryTake timed out (no work for idleTimeout).
|
||||
if (_workQueue.IsCompleted)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
// Try to shrink: exit if we're above the minimum.
|
||||
// CAS ensures exactly one thread wins each decrement race.
|
||||
while (true)
|
||||
{
|
||||
var count = _threadCount;
|
||||
if (count <= _minThreads)
|
||||
{
|
||||
break; // At minimum — stay alive.
|
||||
}
|
||||
|
||||
if (Interlocked.CompareExchange(ref _threadCount, count - 1, count) == count)
|
||||
{
|
||||
return; // Decremented successfully — this thread exits.
|
||||
}
|
||||
|
||||
// Another thread changed _threadCount concurrently; retry.
|
||||
}
|
||||
}
|
||||
|
||||
Interlocked.Decrement(ref _threadCount);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Queue a blocking work item. Returns a <see cref="Task"/> that
|
||||
/// completes when the work finishes on a dedicated thread.
|
||||
/// If <paramref name="cancellationToken"/> is already cancelled when
|
||||
/// the item reaches the front of the queue, it is skipped immediately.
|
||||
/// Spawns an extra thread (up to <c>maxThreads</c>) if all current
|
||||
/// threads are occupied.
|
||||
/// </summary>
|
||||
public Task QueueAsync(Action work, CancellationToken cancellationToken = default)
|
||||
{
|
||||
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
try
|
||||
{
|
||||
_workQueue.Add(
|
||||
() =>
|
||||
{
|
||||
if (cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
tcs.TrySetCanceled(cancellationToken);
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
work();
|
||||
tcs.TrySetResult();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
tcs.TrySetException(ex);
|
||||
}
|
||||
},
|
||||
cancellationToken);
|
||||
|
||||
// If no thread is idle, all are blocked in COM calls — try to expand.
|
||||
if (Volatile.Read(ref _idleCount) == 0)
|
||||
{
|
||||
TryExpand();
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
tcs.TrySetCanceled(cancellationToken);
|
||||
}
|
||||
catch (ObjectDisposedException)
|
||||
{
|
||||
tcs.TrySetCanceled(CancellationToken.None);
|
||||
}
|
||||
catch (InvalidOperationException)
|
||||
{
|
||||
// CompleteAdding was called — pool is shutting down.
|
||||
tcs.TrySetCanceled(CancellationToken.None);
|
||||
}
|
||||
|
||||
return tcs.Task;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Queue a blocking work item. Returns a <see cref="Task{T}"/> that
|
||||
/// completes when the work finishes on a dedicated thread.
|
||||
/// If <paramref name="cancellationToken"/> is already cancelled when
|
||||
/// the item reaches the front of the queue, it is skipped immediately.
|
||||
/// Spawns an extra thread (up to <c>maxThreads</c>) if all current
|
||||
/// threads are occupied.
|
||||
/// </summary>
|
||||
public Task<T> QueueAsync<T>(Func<T> work, CancellationToken cancellationToken = default)
|
||||
{
|
||||
var tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
try
|
||||
{
|
||||
_workQueue.Add(
|
||||
() =>
|
||||
{
|
||||
if (cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
tcs.TrySetCanceled(cancellationToken);
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
tcs.TrySetResult(work());
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
tcs.TrySetException(ex);
|
||||
}
|
||||
},
|
||||
cancellationToken);
|
||||
|
||||
// If no thread is idle, all are blocked in COM calls — try to expand.
|
||||
if (Volatile.Read(ref _idleCount) == 0)
|
||||
{
|
||||
TryExpand();
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
tcs.TrySetCanceled(cancellationToken);
|
||||
}
|
||||
catch (ObjectDisposedException)
|
||||
{
|
||||
tcs.TrySetCanceled(CancellationToken.None);
|
||||
}
|
||||
catch (InvalidOperationException)
|
||||
{
|
||||
// CompleteAdding was called — pool is shutting down.
|
||||
tcs.TrySetCanceled(CancellationToken.None);
|
||||
}
|
||||
|
||||
return tcs.Task;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Attempt to spawn one additional thread, up to <c>maxThreads</c>.
|
||||
/// CAS on <c>_threadCount</c> ensures at most one thread wins per slot.
|
||||
/// </summary>
|
||||
private void TryExpand()
|
||||
{
|
||||
if (_disposed.Value)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
while (true)
|
||||
{
|
||||
var count = _threadCount;
|
||||
if (count >= _maxThreads)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
if (Interlocked.CompareExchange(ref _threadCount, count + 1, count) == count)
|
||||
{
|
||||
StartThread();
|
||||
return;
|
||||
}
|
||||
|
||||
// Another concurrent expand won this slot; recheck the ceiling.
|
||||
}
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
if (!_disposed.Set())
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
_workQueue.CompleteAdding();
|
||||
|
||||
// Give worker threads a chance to drain remaining items and exit.
|
||||
// After CompleteAdding, idle threads see IsCompleted and exit
|
||||
// quickly, but threads blocked in long COM calls won't return
|
||||
// until their call finishes — don't wait forever.
|
||||
var deadline = Environment.TickCount64 + DrainTimeoutMs;
|
||||
var spin = default(SpinWait);
|
||||
while (Volatile.Read(ref _threadCount) > 0 && Environment.TickCount64 < deadline)
|
||||
{
|
||||
spin.SpinOnce();
|
||||
}
|
||||
|
||||
// Dispose the queue even if threads are still alive. Threads
|
||||
// blocked in TryTake will get ObjectDisposedException and exit
|
||||
// via the catch in WorkerLoop. Threads busy in action!() will
|
||||
// finish their item, then hit ObjectDisposedException on the
|
||||
// next TryTake and exit.
|
||||
_workQueue.Dispose();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,292 @@
|
||||
// Copyright (c) Microsoft Corporation
|
||||
// The Microsoft Corporation licenses this file to you under the MIT license.
|
||||
// See the LICENSE file in the project root for more information.
|
||||
|
||||
/*
|
||||
#define CMDPAL_FF_MAINPAGE_TIME_FALLBACK_UPDATES
|
||||
*/
|
||||
|
||||
using System.Collections.Concurrent;
|
||||
using System.Diagnostics;
|
||||
using ManagedCommon;
|
||||
using Microsoft.CmdPal.UI.ViewModels.Commands;
|
||||
|
||||
namespace Microsoft.CmdPal.UI.ViewModels;
|
||||
|
||||
/// <summary>
|
||||
/// Manages adaptive dispatch of fallback update work on a dedicated thread pool.
|
||||
/// Tracks per-command inflight calls, pending-retry slots, and enforces a per-batch
|
||||
/// sibling-spawn cap to prevent runaway thread expansion.
|
||||
/// </summary>
|
||||
internal sealed partial class FallbackUpdateManager : IDisposable
|
||||
{
|
||||
// For individual fallback item updates - if an item takes longer than this, we will detach it
|
||||
// and continue with others.
|
||||
private static readonly TimeSpan FallbackItemSlowTimeout = TimeSpan.FromMilliseconds(200);
|
||||
|
||||
#if CMDPAL_FF_MAINPAGE_TIME_FALLBACK_UPDATES
|
||||
// For reporting only - if an item takes longer than this, we'll log it.
|
||||
private static readonly TimeSpan FallbackItemUltraSlowTimeout = TimeSpan.FromMilliseconds(1000);
|
||||
#endif
|
||||
|
||||
// Initial number of workers to use for fallback updates.
|
||||
private const int InitialFallbackWorkers = 2;
|
||||
|
||||
// Upper limit of threads in case things go awry
|
||||
private const int MaximumFallbackWorkersMaxThreads = 32;
|
||||
|
||||
// Per-command limit on concurrent in-flight COM calls. Prevents a single
|
||||
// misbehaving extension from monopolizing the pool across overlapping query batches.
|
||||
private const int MaxInflightPerFallback = 4;
|
||||
|
||||
// Per-batch cap on sibling workers
|
||||
private static readonly int MaxWorkersPerBatch = Math.Max(2, Environment.ProcessorCount / 2);
|
||||
|
||||
private readonly ConcurrentDictionary<string, InflightCounter> _inflightFallbacks = new();
|
||||
|
||||
// Dedicated background threads for fallback COM/RPC calls so they never block the
|
||||
// ThreadPool. Stuck extensions consume a dedicated thread, not a pool thread.
|
||||
// Max is intentionally above ProcessorCount: blocked threads consume no CPU, so
|
||||
// core count is not the right ceiling. Pool expands on demand and shrinks when idle.
|
||||
private readonly DedicatedThreadPool _fallbackThreadPool = new(minThreads: InitialFallbackWorkers, maxThreads: MaximumFallbackWorkersMaxThreads, name: "Fallbacks");
|
||||
|
||||
private readonly Action _onFallbackChanged;
|
||||
|
||||
#if CMDPAL_FF_MAINPAGE_TIME_FALLBACK_UPDATES
|
||||
private ulong _updateBatchCounter;
|
||||
#endif
|
||||
|
||||
internal FallbackUpdateManager(Action onFallbackChanged)
|
||||
{
|
||||
_onFallbackChanged = onFallbackChanged;
|
||||
}
|
||||
|
||||
internal void BeginUpdate(string query, IReadOnlyList<TopLevelViewModel> commands, CancellationToken cancellationToken)
|
||||
{
|
||||
if (commands.Count == 0 || string.IsNullOrWhiteSpace(query))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
#if CMDPAL_FF_MAINPAGE_TIME_FALLBACK_UPDATES
|
||||
var batchNumber = _updateBatchCounter++;
|
||||
Logger.LogDebug($"UpdateFallbacks: Batch start {batchNumber} for query '{query}'");
|
||||
#endif
|
||||
|
||||
// Adaptive dispatch on dedicated threads — same semantics as the old
|
||||
// ParallelHelper.AdaptiveForEachAdaptiveAsync, but without any ThreadPool involvement:
|
||||
// - Start 2 workers; each claims commands via a shared atomic index (FIFO, no double-work).
|
||||
// - If a command is slow (> FallbackItemSlowTimeout), the worker spawns a sibling so
|
||||
// remaining fast commands aren't blocked waiting in the worker's loop.
|
||||
// - _onFallbackChanged is called on the dedicated thread when a result changes
|
||||
var sharedIndex = 0;
|
||||
var totalCommands = commands.Count;
|
||||
var startingWorkers = Math.Min(InitialFallbackWorkers, totalCommands);
|
||||
var activeWorkerCount = startingWorkers;
|
||||
|
||||
void Worker()
|
||||
{
|
||||
while (!cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
var i = Interlocked.Increment(ref sharedIndex) - 1;
|
||||
if (i >= totalCommands)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var command = commands[i];
|
||||
var counter = _inflightFallbacks.GetOrAdd(command.Id, static _ => new InflightCounter());
|
||||
if (!counter.TryClaim(MaxInflightPerFallback))
|
||||
{
|
||||
// At capacity — store this query as a pending retry so it runs
|
||||
// when one of the in-flight calls finishes. Latest query wins.
|
||||
var pendingCommand = command;
|
||||
var pendingQuery = query;
|
||||
var pendingCt = cancellationToken;
|
||||
counter.SetPending(() => RetryFallbackUpdate(pendingCommand, pendingQuery, pendingCt, counter), pendingCt);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Arm a timer: if this item is still running after FallbackItemSlowTimeout,
|
||||
// spawn a sibling worker WHILE we're blocked in the COM call so remaining
|
||||
// commands don't have to wait for us to finish first.
|
||||
// Linking to cancellationToken cancels the timer immediately when the outer
|
||||
// query is abandoned — preventing stale siblings from being scheduled.
|
||||
// Disposing the linked CTS at iteration end removes the link registration.
|
||||
using var expandCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
|
||||
expandCts.CancelAfter(FallbackItemSlowTimeout);
|
||||
expandCts.Token.Register(() =>
|
||||
{
|
||||
// Fires on timeout (slow item) OR on outer cancellation.
|
||||
// Only spawn a sibling on timeout — when the outer query is still active.
|
||||
if (!cancellationToken.IsCancellationRequested && Volatile.Read(ref sharedIndex) < totalCommands)
|
||||
{
|
||||
// Per-batch cap — restore the constraint from ParallelHelper
|
||||
var current = Volatile.Read(ref activeWorkerCount);
|
||||
if (current < MaxWorkersPerBatch
|
||||
&& Interlocked.CompareExchange(ref activeWorkerCount, current + 1, current) == current)
|
||||
{
|
||||
_ = _fallbackThreadPool.QueueAsync(Worker, cancellationToken);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
var changed = false;
|
||||
try
|
||||
{
|
||||
#if CMDPAL_FF_MAINPAGE_TIME_FALLBACK_UPDATES
|
||||
var sw = Stopwatch.StartNew();
|
||||
Logger.LogDebug($"UpdateFallbacks: Worker: command id '{command.Id}', '{command.DisplayTitle}' updating with '{query}'");
|
||||
#endif
|
||||
changed = command.SafeUpdateFallbackTextSynchronous(query);
|
||||
#if CMDPAL_FF_MAINPAGE_TIME_FALLBACK_UPDATES
|
||||
var elapsed = sw.Elapsed;
|
||||
var tail = elapsed > FallbackItemSlowTimeout ? " is slow" : string.Empty;
|
||||
if (elapsed > FallbackItemUltraSlowTimeout)
|
||||
{
|
||||
tail += " <---------------- (ultra slow)";
|
||||
}
|
||||
|
||||
if (cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
Logger.LogDebug($"UpdateFallbacks: Worker: command id '{command.Id}', '{command.DisplayTitle}' updated with '{query}' processed in {elapsed}, has {(changed ? "changed" : "not changed")} and title is '{command.Title}'{tail}");
|
||||
#endif
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Logger.LogError($"UpdateFallbacks: Worker: command id '{command.Id}', '{command.DisplayTitle}' failed to update fallback text with '{query}'", ex);
|
||||
}
|
||||
finally
|
||||
{
|
||||
counter.Release();
|
||||
DispatchPending(counter.TakePending());
|
||||
}
|
||||
|
||||
// Guard against a stale refresh if the COM call returned after cancellation.
|
||||
if (changed && !cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
_onFallbackChanged();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Dispatches a pending work item to the dedicated pool. The pending's
|
||||
// own CT is forwarded so the pool can skip it at dequeue time when the
|
||||
// originating query batch has been superseded by a newer keystroke.
|
||||
void DispatchPending(PendingWork? pending)
|
||||
{
|
||||
if (pending == null)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
_ = _fallbackThreadPool.QueueAsync(pending.Work, pending.CancellationToken);
|
||||
}
|
||||
|
||||
for (var i = 0; i < startingWorkers; i++)
|
||||
{
|
||||
_ = _fallbackThreadPool.QueueAsync(Worker, cancellationToken);
|
||||
}
|
||||
|
||||
return;
|
||||
|
||||
// One-shot retry for a command that was skipped due to MaxInflightPerFallback.
|
||||
// Claims a slot, runs the COM call, releases, and propagates the next pending (if any).
|
||||
void RetryFallbackUpdate(TopLevelViewModel cmd, string q, CancellationToken ct, InflightCounter ctr)
|
||||
{
|
||||
if (ct.IsCancellationRequested)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
if (!ctr.TryClaim(MaxInflightPerFallback))
|
||||
{
|
||||
// Still at capacity (a newer worker claimed the freed slot first).
|
||||
// The pending was already consumed from TakePending, so it's dropped here.
|
||||
return;
|
||||
}
|
||||
|
||||
var changed = false;
|
||||
try
|
||||
{
|
||||
changed = cmd.SafeUpdateFallbackTextSynchronous(q);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Logger.LogError($"UpdateFallbacks: Pending retry: command id '{cmd.Id}', '{cmd.DisplayTitle}' failed with '{q}'", ex);
|
||||
}
|
||||
finally
|
||||
{
|
||||
ctr.Release();
|
||||
DispatchPending(ctr.TakePending());
|
||||
}
|
||||
|
||||
if (changed && !ct.IsCancellationRequested)
|
||||
{
|
||||
_onFallbackChanged();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
_fallbackThreadPool.Dispose();
|
||||
_inflightFallbacks.Clear();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// A pending work item paired with the cancellation token of the query
|
||||
/// batch that created it, so the pool can skip it at dequeue time when
|
||||
/// a newer keystroke has already superseded the query.
|
||||
/// </summary>
|
||||
private sealed record PendingWork(Action Work, CancellationToken CancellationToken);
|
||||
|
||||
/// <summary>
|
||||
/// Thread-safe counter for tracking concurrent in-flight calls per command,
|
||||
/// with a single pending retry slot for queries that couldn't claim immediately.
|
||||
/// </summary>
|
||||
private sealed class InflightCounter
|
||||
{
|
||||
private int _count;
|
||||
|
||||
// Latest pending work item. Only one is stored; newer queries overwrite older ones.
|
||||
private PendingWork? _pendingWork;
|
||||
|
||||
/// <summary>
|
||||
/// Try to claim a slot. Returns true if the count was below
|
||||
/// <paramref name="max"/> and was incremented; false if at capacity.
|
||||
/// </summary>
|
||||
public bool TryClaim(int max)
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
var current = Volatile.Read(ref _count);
|
||||
if (current >= max)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
if (Interlocked.CompareExchange(ref _count, current + 1, current) == current)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Stores a pending work item to run when the next slot opens.
|
||||
/// Overwrites any previously stored item — latest query always wins.
|
||||
/// </summary>
|
||||
public void SetPending(Action work, CancellationToken ct) => Interlocked.Exchange(ref _pendingWork, new PendingWork(work, ct));
|
||||
|
||||
/// <summary>
|
||||
/// Atomically removes and returns any pending work item, or null if none.
|
||||
/// </summary>
|
||||
public PendingWork? TakePending() => Interlocked.Exchange(ref _pendingWork, null);
|
||||
|
||||
public void Release() => Interlocked.Decrement(ref _count);
|
||||
}
|
||||
}
|
||||
@@ -70,6 +70,8 @@ public partial class ListViewModel : PageViewModel, IDisposable
|
||||
|
||||
public bool IsMainPage { get; init; }
|
||||
|
||||
public bool HasCustomDebounceLogic => IsMainPage;
|
||||
|
||||
private bool _isDynamic;
|
||||
|
||||
private Task? _initializeItemsTask;
|
||||
|
||||
Reference in New Issue
Block a user