TPL producer/consumer in FIFO order in C#
OK, after the edit: instead of adding the results to the BlockingCollection, add the Tasks themselves. This way the items are processed in order AND there is a maximum degree of parallelism, which prevents too many threads from kicking off and eating up all your memory.
https://dotnetfiddle.net/lUbSqB
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;

public class Program
{
    private static BlockingCollection<Task<int>> BlockingCollection { get; set; }

    public static void Producer(int numTasks)
    {
        Random r = new Random(7);
        for (int i = 0; i < numTasks; i++)
        {
            int closured = i;
            Task<int> task = new Task<int>(() =>
            {
                Thread.Sleep(r.Next(100));
                Console.WriteLine("Produced: " + closured);
                return closured;
            });
            // Add blocks when the bounded collection is full,
            // capping the number of in-flight tasks at maxParallelism.
            BlockingCollection.Add(task);
            task.Start();
        }
        BlockingCollection.CompleteAdding();
    }

    public static void Main()
    {
        int numTasks = 20;
        int maxParallelism = 3;
        BlockingCollection = new BlockingCollection<Task<int>>(maxParallelism);
        Task.Factory.StartNew(() => Producer(numTasks));
        // The consumer takes tasks in the order they were added,
        // so results come out in FIFO order regardless of completion order.
        foreach (var task in BlockingCollection.GetConsumingEnumerable())
        {
            task.Wait();
            Console.WriteLine(" Consumed: " + task.Result);
            task.Dispose();
        }
    }
}
And the results:
Produced: 0
Consumed: 0
Produced: 1
Consumed: 1
Produced: 3
Produced: 2
Consumed: 2
Consumed: 3
Produced: 4
Consumed: 4
Produced: 6
Produced: 5
Consumed: 5
Consumed: 6
Produced: 7
Consumed: 7
Produced: 8
Consumed: 8
Produced: 10
Produced: 9
Consumed: 9
Consumed: 10
Produced: 12
Produced: 13
Produced: 11
Consumed: 11
Consumed: 12
Consumed: 13
Produced: 15
Produced: 14
Consumed: 14
Consumed: 15
Produced: 17
Produced: 16
Produced: 18
Consumed: 16
Consumed: 17
Consumed: 18
Produced: 19
Consumed: 19
Producer producing data every second and consumer consuming every minute
In this example two tasks are created: a producer and a consumer. The first generates data every second and places it in the collection. The second extracts the data from the collection and processes it every minute.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Minimal stand-in for the Price type referenced below, so the snippet compiles:
public class Price
{
    public int Low { get; set; }
    public int High { get; set; }
}

var produced = new BlockingCollection<Price>();

var producer = Task.Run(async () =>
{
    try
    {
        var random = new Random();
        while (true)
        {
            produced.Add(new Price { Low = random.Next(500), High = random.Next(500, 1000) });
            await Task.Delay(1000);
        }
    }
    finally
    {
        produced.CompleteAdding();
    }
});

var consumer = Task.Run(() =>
{
    const int interval = 60; // seconds
    var values = new List<Price>();
    foreach (var value in produced.GetConsumingEnumerable())
    {
        values.Add(value);
        if (DateTime.UtcNow.Second % interval == 0)
        {
            Console.WriteLine(values.Average(p => p.High)); // do some work
            values.Clear();
        }
    }
});

Task.WaitAll(producer, consumer);
C# Producer/Consumer setup, Consumer never works if there's a UI?
In the example a console app is used. They call .Wait() to make sure the console app does not quit, but in your case you should probably await it instead, because .Wait() blocks the calling thread. Please share your actual code, though; as you might imagine, it is hard for us to tell what is going on if you don't post it.
In case anyone else was stuck like I was: changing consumer.Wait() to await consumer, as @PeterBons suggested, was the answer. In my case it still acts a bit funky, but the full functionality does work; there is just a bit more going on behind the scenes than I expected.
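As a minimal sketch of the difference (a console approximation; the actual deadlock behavior depends on the UI framework's SynchronizationContext, and the `consumer` task here is a stand-in for the real consumer):

```csharp
using System;
using System.Threading.Tasks;

public class Program
{
    public static async Task Main()
    {
        Task consumer = Task.Delay(500); // stand-in for the real consumer task

        // consumer.Wait();  // blocks the calling thread; on a UI thread with a
        //                   // SynchronizationContext this can deadlock, because
        //                   // continuations posted back to that thread never run.

        await consumer;      // yields the thread until the task completes
        Console.WriteLine("consumer finished");
    }
}
```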
Infinite producer/consumer via serial port data
First things first, starting multiple asynchronous operations and awaiting them one by one is wrong:
// Wrong
await producer;
await consumer;
The reason is that if the first operation fails, the second operation becomes fire-and-forget. Allowing tasks to escape your supervision and continue running unattended can only contribute to your program's instability. Nothing good can come of that.
// Correct
await Task.WhenAll(producer, consumer);
Now, regarding your main issue, which is how to make sure that a failure in one task causes the timely completion of the other task: my suggestion is to hook the failure of each task to the cancellation of a CancellationTokenSource. In addition, both tasks should watch the associated CancellationToken, and complete cooperatively as soon as possible after they receive a cancellation signal.
var cts = new CancellationTokenSource();
Task producer = StartProducerAsync(cts.Token).OnErrorCancel(cts);
Task consumer = StartConsumerAsync(cts.Token).OnErrorCancel(cts);
await Task.WhenAll(producer, consumer);
Here is the OnErrorCancel extension method:
public static Task OnErrorCancel(this Task task, CancellationTokenSource cts)
{
    return task.ContinueWith(t =>
    {
        if (t.IsFaulted) cts.Cancel();
        return t;
    }, default, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default).Unwrap();
}
Instead of doing this, you can also just add an all-enclosing try/catch block inside each task, and call cts.Cancel() in the catch.