Safely SetResult in TaskCompletionSource

tl;dr: When writing a library, or whenever you don’t control the code that awaits your task, queue TaskCompletionSource.SetResult(…) calls in a new threadpool work item or inside of Task.Run, since SetResult will execute anything awaiting it inline.

The C# TaskCompletionSource is an incredibly powerful part of the Task Parallel Library. It’s a low-level building block for asynchronous design, allowing for the creation of more advanced async structures. Its default behavior, though, is unexpected and can lead to non-trivial program bugs, performance issues, and deadlocks.

var source = new TaskCompletionSource<bool>();

// ...

source.SetResult(true);

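An issue can arise when SetResult is called. By default, a TaskCompletionSource runs continuations inline, on the thread that calls SetResult. That covers the code after an await on the task (when no SynchronizationContext or custom scheduler was captured) and ContinueWith callbacks registered with TaskContinuationOptions.ExecuteSynchronously. In the usual case this is fine, but there are some scenarios where it is not expected.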
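To make the inline behavior concrete, here is a minimal sketch that records which thread calls SetResult and which thread runs the code after the await. The class and member names (InlineContinuationDemo, Run) are made up for this illustration. It assumes no SynchronizationContext is installed, which is why everything runs on a dedicated thread.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class InlineContinuationDemo
{
    // Returns the id of the thread that called SetResult and the id of the
    // thread that ran the code after the await.
    public static (int SetterThread, int ContinuationThread) Run()
    {
        int setterThread = 0;
        int continuationThread = 0;

        // A dedicated thread has no SynchronizationContext and no current task,
        // so nothing prevents the continuation from being inlined.
        var thread = new Thread(() =>
        {
            var source = new TaskCompletionSource<bool>();
            var done = new ManualResetEventSlim();

            async Task Waiter()
            {
                await source.Task;

                // Everything from here on is the continuation.
                continuationThread = Environment.CurrentManagedThreadId;
                done.Set();
            }

            // Runs synchronously up to the await, which registers the continuation.
            _ = Waiter();

            setterThread = Environment.CurrentManagedThreadId;
            source.SetResult(true);
            done.Wait();
        });

        thread.Start();
        thread.Join();
        return (setterThread, continuationThread);
    }
}
```

Under those assumptions both ids should be the same, meaning SetResult did not return until the awaiting method had finished running.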

The code sample below creates several tasks (Writer), each of which creates a “WorkItem” and places it into a queue. The work item has a task completion source on it. Once the work item is enqueued, each Writer waits for the task completion source to be completed, then sleeps for 1 second. Another task (Reader) is spun up, which just waits for a WorkItem to be enqueued, dequeues it, and sets a result on the TaskCompletionSource. Ideally, the Reader should be able to dequeue every item very quickly, as it’s not really doing any work.

// main
// count is the number of Writer tasks to start.
TaskCompletionSourceThroughput.Run(
    TaskCompletionSourceThroughput.TaskCompletionMode.PlainSetResult,
    count).Wait();

namespace Badflyer.TCSDeadlock
{
    using System;
    using System.Collections.Concurrent;
    using System.Diagnostics;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    internal class TaskCompletionSourceThroughput
    {
        public enum TaskCompletionMode
        {
            /// <summary>
            /// Just use regular old set result on a task completion source.
            /// </summary>
            PlainSetResult,

            /// <summary>
            /// Call set result, but create the TCS with TaskCreationOptions.RunContinuationsAsynchronously 
            /// </summary>
            SetResultWithAsyncContinuations,

            /// <summary>
            /// Call set result in a new threadpool action:  Task.Run(() => item.Completion.TrySetResult(item.Value));
            /// </summary>
            SetResultInTaskRun
        }

        private readonly int count;
        private readonly TaskCompletionMode mode;
        private readonly CancellationTokenSource cancellation;
        private readonly SemaphoreSlim gate;
        private readonly Stopwatch stopWatch;
        private readonly ConcurrentQueue<WorkItem<string>> workItems;

        private TaskCompletionSourceThroughput(TaskCompletionMode mode, int count)
        {
            this.count = count;
            this.mode = mode;
            this.cancellation = new CancellationTokenSource();
            this.gate = new SemaphoreSlim(0);
            this.stopWatch = Stopwatch.StartNew();
            this.workItems = new ConcurrentQueue<WorkItem<string>>();
        }

        /// <summary>
        /// Runs the test.
        /// </summary>
        /// <param name="mode">How to complete the task completion source during the test.</param>
        /// <param name="numberOfItems">The number of items to run with.</param>
        /// <returns>A task when the test is completed.</returns>
        internal static Task Run(TaskCompletionMode mode, int numberOfItems)
        {
            var test = new TaskCompletionSourceThroughput(mode, numberOfItems);
            return test.Run();
        }

        /// <summary>
        /// Run the test.
        /// </summary>
        /// <returns>A task that completes when the test does.</returns>
        private async Task Run()
        {
            // Start up these worker tasks which enqueue work items.
            var tasks = Enumerable.Range(0, count)
                .Select(i => Task.Run(() => Writer(i))).ToList();

            // Start up the reader task, which dequeues work items and completes them.
            var reader = Task.Run(Reader);

            await Task.WhenAll(tasks);
            this.cancellation.Cancel();

            // Wait for the reader to observe the cancellation and log that it finished.
            await reader;
        }

        /// <summary>
        /// Reads each item from the workitems list, and completes it as quickly as it can.
        /// </summary>
        private async Task Reader()
        {
            var token = this.cancellation.Token;
            this.LogMessage("Starting Reader.");
            while (false == token.IsCancellationRequested)
            {
                try
                {
                    await gate.WaitAsync(token);
                }
                catch (OperationCanceledException)
                {
                    break;
                }

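                // Every Release follows an Enqueue, so this should always succeed; skip defensively if not.
                if (!this.workItems.TryDequeue(out var item))
                {
                    continue;
                }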

                if (this.mode == TaskCompletionMode.SetResultInTaskRun)
                {
                    _ = Task.Run(() => item.Completion.TrySetResult(item.Value));
                }
                else
                {
                    item.Completion.TrySetResult(item.Value);
                }
            }

            this.LogMessage("Finished reader.");
        }

        /// <summary>
        /// Creates a new work item, enqueues it to the work items list, and waits for the reader task to complete the item.
        /// After the item is completed, it sleeps for 1 second.
        /// </summary>
        private async Task Writer(int index)
        {
            var item = new WorkItem<string>(this.mode, $"Finished Item {index}");
            this.workItems.Enqueue(item);
            this.gate.Release();

            var text = await item.Completion.Task;

            this.LogMessage(text);

            Thread.Sleep(1000);
        }

        /// <summary>
        /// Logs a message.
        /// </summary>
        /// <param name="message">The message.</param>
        private void LogMessage(string message)
        {
            Console.WriteLine("{0:0.000} [{1}] -> {2}", stopWatch.Elapsed.TotalSeconds, this.mode, message);
        }

        /// <summary>
        /// A simple work item class.
        /// </summary>
        /// <typeparam name="T">The type of the item result.</typeparam>
        private class WorkItem<T>
        {
            public WorkItem(TaskCompletionMode mode, T value)
            {
                this.Value = value;

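                if (mode == TaskCompletionMode.SetResultWithAsyncContinuations)
                {
                    // Note: this passes TaskContinuationOptions, not TaskCreationOptions. No constructor takes
                    // TaskContinuationOptions, so the value binds to TaskCompletionSource<T>(object state) and is
                    // stored as the task's AsyncState without applying any option. That is consistent with the
                    // serialized output shown later for this mode. To apply the option, pass
                    // TaskCreationOptions.RunContinuationsAsynchronously instead.
                    this.Completion = new TaskCompletionSource<T>(TaskContinuationOptions.RunContinuationsAsynchronously);
                }
                else
                {
                    this.Completion = new TaskCompletionSource<T>();
                }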
            }

            public T Value { get; }

            public TaskCompletionSource<T> Completion { get; }
        }
    }
}

The issue here is caused in “Writer” during Thread.Sleep(1000). It’s not exactly obvious, but since the TaskCompletionSource’s task was awaited, everything in the method after the await runs as a continuation of the completion source. Since by default the TaskCompletionSource runs its continuations inline when SetResult is called, this part is actually running on the same thread as “Reader”.

TaskCompletionSource.SetResult doesn’t seem like it should be an expensive call, but it can be, depending on what the continuation decides to do. So in this case, Reader, which should be a super fast loop releasing a bunch of other threads, actually serializes the execution of all of the work items.

// Here is the output of the program
1.212 [PlainSetResult] -> Finished Item 2
2.214 [PlainSetResult] -> Finished Item 0
3.220 [PlainSetResult] -> Finished Item 3
4.222 [PlainSetResult] -> Finished Item 4
5.236 [PlainSetResult] -> Finished reader.

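Now, TaskCompletionSource does provide a constructor option for this: TaskCreationOptions.RunContinuationsAsynchronously. Be careful with the similarly named TaskContinuationOptions.RunContinuationsAsynchronously, which is what the WorkItem constructor above passes. That compiles, but only because it binds to the TaskCompletionSource<T>(object state) constructor, so the option is silently ignored and continuations still run inline. The run below uses that mode, which would explain why it does not behave as expected.

// main

// This selects the mode that passes RunContinuationsAsynchronously to the TaskCompletionSource constructor.
// The intended call is: new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously)

TaskCompletionSourceThroughput.Run(TaskCompletionSourceThroughput.TaskCompletionMode.SetResultWithAsyncContinuations, count).Wait();

// The results still show the output running serialized.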

0.008 [SetResultWithAsyncContinuations] -> Starting Reader.
0.008 [SetResultWithAsyncContinuations] -> Finished Item 0
1.010 [SetResultWithAsyncContinuations] -> Finished Item 1
2.013 [SetResultWithAsyncContinuations] -> Finished Item 2
3.016 [SetResultWithAsyncContinuations] -> Finished Item 3
4.018 [SetResultWithAsyncContinuations] -> Finished Item 4
5.020 [SetResultWithAsyncContinuations] -> Finished reader.
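When this option appears to have no effect, one thing worth checking is which constructor overload was actually called. The sketch below (the names CompletionSourceOptionsDemo, Describe, and PrintAll are hypothetical) inspects the task behind two completion sources: one built with TaskContinuationOptions, as in the WorkItem class above, and one built with TaskCreationOptions.

```csharp
using System;
using System.Threading.Tasks;

public static class CompletionSourceOptionsDemo
{
    // Reports which options were applied to the underlying task and whether
    // a state object was stored on it.
    public static string Describe(TaskCompletionSource<bool> source)
    {
        object state = source.Task.AsyncState;
        string stateType = state == null ? "null" : state.GetType().Name;
        return $"CreationOptions={source.Task.CreationOptions}, AsyncState={stateType}";
    }

    public static void PrintAll()
    {
        // No constructor takes TaskContinuationOptions, so the enum value is boxed
        // and bound to TaskCompletionSource<T>(object state).
        var lookalike = new TaskCompletionSource<bool>(
            TaskContinuationOptions.RunContinuationsAsynchronously);

        // This binds to TaskCompletionSource<T>(TaskCreationOptions creationOptions).
        var intended = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        Console.WriteLine(Describe(lookalike));
        // prints "CreationOptions=None, AsyncState=TaskContinuationOptions"

        Console.WriteLine(Describe(intended));
        // prints "CreationOptions=RunContinuationsAsynchronously, AsyncState=null"
    }
}
```

With the TaskCreationOptions overload, SetResult queues the awaiting continuations to the thread pool instead of running them on the calling thread, so a loop like Reader is not held up by them.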

For best performance, if you know that your continuations will be running lots of work and you don’t want them to be serialized, you can always force the continuations to run in a different threadpool work item.

// Always complete the source from a new threadpool work item.
Task.Run(() => taskCompletionSource.TrySetResult(result));

Doing this will add a lot of overhead if continuations are really quick, but will drastically improve performance if they are slow.

// main

// This will complete the task completion source task in a new threadpool work item.

TaskCompletionSourceThroughput.Run(TaskCompletionSourceThroughput.TaskCompletionMode.SetResultInTaskRun, count).Wait();

// Results

0.034 [SetResultInTaskRun] -> Starting Reader.
0.036 [SetResultInTaskRun] -> Finished Item 1
0.036 [SetResultInTaskRun] -> Finished Item 0
0.037 [SetResultInTaskRun] -> Finished Item 2
0.039 [SetResultInTaskRun] -> Finished Item 3
0.040 [SetResultInTaskRun] -> Finished Item 4
1.188 [SetResultInTaskRun] -> Finished reader

As you can see, these actually ended up running in parallel.

The takeaway from this is that if you know that the functions awaiting your TaskCompletionSource are going to be doing heavy, slow work, or if you have no control over what they could be, then be sure to call SetResult in a new threadpool work item. Another good example of this is System.Threading.SemaphoreSlim, which always completes its waiting TaskCompletionSource in a new threadpool work item. https://referencesource.microsoft.com/#mscorlib/system/threading/SemaphoreSlim.cs,117283ce80a6dfb9
