Processing concurrent task results with IAsyncEnumerable

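Here’s a simple snippet of code that enumerates the results of a set of concurrent tasks as they complete. Unlike Task.WhenAll, which returns results only after every task has finished, it lets you start processing each result as soon as it is ready, which can improve responsiveness and code clarity.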

using System.Collections.Generic;
using System.Threading.Tasks;

public static class Utils
{
    /// <summary>
    /// Yields each task's result as soon as that task completes.
    /// </summary>
    /// <typeparam name="T">The return type of the tasks.</typeparam>
    /// <param name="tasks">The tasks to await.</param>
    /// <returns>An async enumerable of task results, in completion order.</returns>
    public static async IAsyncEnumerable<T> StreamAsync<T>(this IEnumerable<Task<T>> tasks)
    {
        var set = new HashSet<Task<T>>(tasks);
        while (set.Count > 0)
        {
            var task = await Task.WhenAny(set);
            set.Remove(task);
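            // The task has already completed, so this await returns immediately;
            // unlike .Result, it rethrows the original exception unwrapped.
            yield return await task;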
        }
    }
}

The following console application gives a quick demo of StreamAsync. You’ll see that the results of all 100 created tasks are printed as they complete, in completion order rather than creation order.

using System;
using System.Linq;
using System.Threading.Tasks;

internal static class Program
{
    public static async Task Main(string[] args)
    {
        var random = new Random();

        var tasks = Enumerable.Range(0, 100)
            .Select(async i =>
            {
                var time = random.Next(100, 10000);
                await Task.Delay(time);
                return time;
            });

        await foreach (var delay in tasks.StreamAsync())
        {
            // Each result (the delay in milliseconds) is printed as soon as its task completes.
            Console.WriteLine(delay);
        }
    }
}
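
For contrast, here is a rough sketch of the same idea written with Task.WhenAll. The class name WhenAllContrast and the method RunAsync are hypothetical and only serve the illustration. The point is that nothing can be printed until the slowest task has finished, and the results come back in the order the tasks were created.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

internal static class WhenAllContrast
{
    // Hypothetical helper for illustration: starts one delay task per entry
    // and returns the delays once ALL of them have completed.
    public static async Task<int[]> RunAsync(int[] delaysMs)
    {
        var tasks = delaysMs.Select(async ms =>
        {
            await Task.Delay(ms);
            return ms;
        });

        // Task.WhenAll enumerates (and so starts) every task, then completes
        // only when the slowest one is done.
        int[] results = await Task.WhenAll(tasks);

        // The array follows the input order, not the completion order.
        foreach (var ms in results)
        {
            Console.WriteLine(ms);
        }

        return results;
    }
}
```

Calling RunAsync(new[] { 300, 100, 200 }) prints 300, 100, 200 only after the longest delay has elapsed, whereas the streaming helper would be expected to hand back 100 first, as soon as that task completes.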

This is yet another great example of the power of some of the newer features of the C# language, in this case the async streams introduced in C# 8. I used a function like this to process concurrent outgoing web API requests that read comments from the Reddit API, so the result of each request could be read as soon as it completed.
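
One caveat for large task sets: every Task.WhenAny call has to register with all of the remaining tasks, so the total work of the loop grows roughly quadratically with the number of tasks. On .NET 9 or later, the built-in Task.WhenEach provides the same as-they-complete enumeration. The following is a minimal sketch assuming a .NET 9 target. The names WhenEachSketch and StreamWithWhenEachAsync are hypothetical and not part of the original helper.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class WhenEachSketch
{
    // Assumption: the project targets .NET 9 or later, where Task.WhenEach exists.
    public static async IAsyncEnumerable<T> StreamWithWhenEachAsync<T>(
        this IEnumerable<Task<T>> tasks)
    {
        // Task.WhenEach yields each task as it completes.
        await foreach (var completed in Task.WhenEach(tasks))
        {
            // The yielded task is already complete; awaiting it unwraps the
            // result or rethrows the task's exception.
            yield return await completed;
        }
    }
}
```

It is consumed the same way as StreamAsync, with await foreach over the returned sequence.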