I would like some general feedback about my implementation and thoughts on possible issues it could cause, but most importantly, can you see problems with this construct causing deadlocks?
ProducerConsumerCollection:
public static class ProducerConsumerCollection { public static ProducerConsumerCollection<T, ConcurrentQueue<T>> NewQueue<T>(int maxCount) => new ProducerConsumerCollection<T, ConcurrentQueue<T>>(maxCount); public static ProducerConsumerCollection<T, ConcurrentStack<T>> NewStack<T>(int maxCount) => new ProducerConsumerCollection<T, ConcurrentStack<T>>(maxCount); public static ProducerConsumerCollection<T, ConcurrentBag<T>> NewBag<T>(int maxCount) => new ProducerConsumerCollection<T, ConcurrentBag<T>>(maxCount); } public sealed class ProducerConsumerCollection<T,TCollection> where TCollection : IProducerConsumerCollection<T>, new() { readonly TCollection _collection = new TCollection(); readonly AwaitableSignal<bool> _signalAdded = new AwaitableSignal<bool>(); readonly AwaitableSignal<bool> _signalTaken = new AwaitableSignal<bool>(); /// <summary> /// The maximum number of items this collection should store. /// </summary> public int MaxCount { get; } public ProducerConsumerCollection(int maxCount) { if (maxCount < 1) throw new ArgumentOutOfRangeException(nameof(maxCount)); MaxCount = maxCount; } /// <summary> /// Cancel any pending <see cref="TakeAsync"/> calls. /// </summary> public void CancelTake() => _signalAdded.ReleaseAll(false); /// <summary> /// Cancel any pending <see cref="AddAsync"/> calls. /// </summary> public void CancelAdd() => _signalTaken.ReleaseAll(false); /// <summary> /// Adds a new item to the collection. If <see cref="MaxCount"/> is already reached, /// the method waits asynchroneously until items are taken and space is available again. /// </summary> /// <returns><c>true</c> if the item was added to the collection, otherwise <c>false</c>.</returns> public async Task<bool> AddAsync(T item) { while (_collection.Count >= MaxCount) { Logger.Write<ProducerConsumerCollection<T, TCollection>>("MaxCount reached", System.Diagnostics.TraceEventType.Warning); if (!await _signalTaken) return false; } if (_collection.TryAdd(item)) { _signalAdded.Signal(true); return true; } else Logger.Write<ProducerConsumerCollection<T, TCollection>>("Collection.TryAdd failed.", System.Diagnostics.TraceEventType.Warning); return false; } /// <summary> /// Adds new items to the collection. If <see cref="MaxCount"/> is already reached, /// the method waits asynchroneously until items are taken and space is available again. /// </summary> /// <returns>The number of items that where successfully added to the collection.</returns> public async Task<int> AddAsync(IEnumerable<T> items) { int numPushed = 0; bool needToSignal = false; foreach (var item in items) { while (_collection.Count >= MaxCount) { Logger.Write<ProducerConsumerCollection<T, TCollection>>("MaxCount reached", System.Diagnostics.TraceEventType.Warning); if (needToSignal) { _signalAdded.Signal(true); needToSignal = false; } if (!await _signalTaken) break; } if (!_collection.TryAdd(item)) { Logger.Write<ProducerConsumerCollection<T, TCollection>>("Collection.TryAdd failed.", System.Diagnostics.TraceEventType.Warning); break; } needToSignal = true; numPushed++; } if (needToSignal) _signalAdded.Signal(true); return numPushed; } /// <summary> /// Takes the next item from the collection. If there are no items, /// the method waits asynchroneously until new items are added. /// Throws a <see cref="TaskCanceledException"/> if cancelled by means of <see cref="CancelTake"/>. /// </summary> /// <returns>The next item.</returns> public async Task<T> TakeAsync() { if (_collection.TryTake(out var item)) { _signalTaken.Signal(true); return item; } while (await _signalAdded) { if (_collection.TryTake(out item)) { _signalTaken.Signal(true); return item; } } if (_collection.TryTake(out item)) { _signalTaken.Signal(true); return item; } throw new TaskCanceledException(); } /// <summary> /// Takes the next item from the collection. If there are no items, /// the method waits asynchroneously until new items are added. /// </summary> /// <returns><paramref name="valueIfCancelled"/> if cancelled by means of <see cref="CancelTake"/>, otherwise the next item.</returns> public async Task<T> TakeAsync(T valueIfCancelled) { if (_collection.TryTake(out var item)) { _signalTaken.Signal(true); return item; } while (await _signalAdded) { if (_collection.TryTake(out item)) { _signalTaken.Signal(true); return item; } } if (_collection.TryTake(out item)) { _signalTaken.Signal(true); return item; } return valueIfCancelled; }
AwaitableSignal:
public sealed class AwaitableSignal<T> { readonly IProducerConsumerCollection<Action> _continuations = new ConcurrentQueue<Action>(); T _value; bool _releaseAll; void AddContinuation(Action continuation) { if (!_releaseAll) _continuations.TryAdd(continuation); else continuation(); } public Awaiter GetAwaiter() => new Awaiter(this); public void Signal(T value, int maxAwaitersReleased = int.MaxValue) { _value = value; _releaseAll = false; for (int i = 0; i < maxAwaitersReleased; i++) { if (!_continuations.TryTake(out var scheduleContinuation)) break; scheduleContinuation(); } } public void ReleaseAll(T value) { _value = value; _releaseAll = true; while (_continuations.TryTake(out var scheduleContinuation)) scheduleContinuation(); } public struct Awaiter : INotifyCompletion { readonly AwaitableSignal<T> _source; public T GetResult() => _source._value; public bool IsCompleted => false; internal Awaiter(AwaitableSignal<T> source) { _source = source; } public void OnCompleted(Action continuation) { if (continuation == null) throw new ArgumentNullException(nameof(continuation)); // Get the current SynchronizationContext, and if there is one, // post the continuation to it. However, treat the base type // as if there wasn't a SynchronizationContext, since that's what it // logically represents. var syncCtx = SynchronizationContext.Current; Action runContinuation; if (syncCtx != null && syncCtx.GetType() != typeof(SynchronizationContext)) runContinuation = () => syncCtx.Post(RunAction, continuation); else { // If we're targeting the default scheduler, queue to the thread pool, so that we go into the global // queue. As we're going into the global queue, we might as well use QUWI, which for the global queue is // just a tad faster than task, due to a smaller object getting allocated and less work on the execution path. TaskScheduler scheduler = TaskScheduler.Current; if (scheduler == TaskScheduler.Default) runContinuation = () => ThreadPool.QueueUserWorkItem(RunAction, continuation); else // We're targeting a custom scheduler, so queue a task. runContinuation = () => Task.Factory.StartNew(continuation, default(CancellationToken), TaskCreationOptions.PreferFairness, scheduler); } _source.AddContinuation(runContinuation); } static void RunAction(object state) => ((Action)state)(); } }
Example usage:
class Program { static async Task Main(string[] args) { var collection = ProducerConsumerCollection.NewQueue<int>(int.MaxValue); var t1 = Task.Run(async () => { var rand = new Random(); for (int i = 0; i < 100000; i++) { Console.WriteLine($ "Adding: {i}"); await collection.AddAsync(i); Console.WriteLine("Added"); await Task.Delay(rand.Next(50)); } }); var t2 = Task.Run(async () => { var rand = new Random(); while (true) { Console.WriteLine("Taking"); var val = await collection.TakeAsync(-1); if (val < 0) { Console.WriteLine("Done"); break; } else Console.WriteLine($ "Taken: {val}"); await Task.Delay(rand.Next(30)); } }); await Task.WhenAll(t1, t2); } }