I find myself having to loop through a lot of async calls all over the place in my code base. The current pattern we are using is to declare SemaphoreSlim(maxcount) and then await sem.WaitAsync(), Create a new task, add it to a List and then repeat. To control the release the new task itself has a reference to sem and does the release on final. There is a step in there to look for and remove completed tasks. This is a pattern that was inherited and is used for multiple different types of async calls. As I develop new code I was hoping to simply this into a single helper class where I can just queue the work until I hit a set limit and then have to wait to add the next once a slot as freed up.
The calls are all async (as this is a 3rd party lib that we don’t control). Also, the library I maintain has to be used in both winform and asp.net processes, so keeping it async seems ideal. The targets are usually waiting web services (5-10s range), in which we don’t want to slam, but at the same time we typically have 1000’s of items in the queue (3rd part has no bulk update implementation — one element at a time type situation).
This is what I have come up with (naming is still a work in progress):
// Assume this is the parameter to the method we're calling over and over again private class _ProcessArg { public Guid ID { get; set; } } // The method that we are calling over and over again private async Task _Process(_ProcessArg arg, CancellationToken ct) { await Task.Run(() => { System.Diagnostics.Debug.WriteLine(arg.ID.ToString())}); } // The main loop where we generated the data for the call. private async Task _RunMainLoop(CancellationToken ct) { int maxThreads = 10; ConcurrentQueue<Guid> queue = new ConcurrentQueue<Guid>(); // Typically this would be the database load/whatnot for (int i = 0; i < 100000; i++) { queue.Enqueue(Guid.NewGuid()); } AsyncTaskMutex mutex = new AsyncTaskMutex(maxThreads); while (true) { Guid id; if (queue.TryDequeue(out id)) { await mutex.QueueTask<_ProcessArg>(_Process, new _ProcessArg { ID = id, }, ct); } else { await mutex.DrainQueue(ct); break; } } } // Class I'm looking for peer review on public class AsyncTaskMutex { private SemaphoreSlim _sem; private List<Task> _tasks; public AsyncTaskMutex() : this(10) { } public AsyncTaskMutex(int maxTasks) { _sem = new SemaphoreSlim(maxTasks, maxTasks); _tasks = new List<Task>(); } public async Task DrainQueue(CancellationToken ct) { await Task.WhenAll(_tasks); _tasks.RemoveAll(t => t.IsCompleted); } public async Task QueueTask<T>(Func<T, CancellationToken, Task> func, T args, CancellationToken ct = default(CancellationToken)) { await _sem.WaitAsync(ct); try { Task task = func(args, ct); task.GetAwaiter().OnCompleted(_OnCompleted); _tasks.Add(task); } catch (OperationCanceledException) { // Intentional ignore return; } } private void _OnCompleted() { _sem.Release(1); _tasks.RemoveAll(t => t.IsCompleted); } }