相当简单的任务 - 有 N 个有序请求必须异步执行,同时执行的请求数量有限制,然后从它们中取出它们以及 N 个有序响应。
实际上,当有很多请求时,这可以与网络或数据库一起使用,但您不想为您的计算机、网络或服务器安排灾难。因此,对同时活动的异步任务的数量进行了限制。
正面的解决方案看起来很明显。
public async Task<Tout[]> RunSemaphoreAsync<Tin, Tout>(IEnumerable<Tin> items, Func<Tin, Task<Tout>> func, int degree)
{
using SemaphoreSlim semaphore = new(degree);
return await Task.WhenAll(items.Select(async item => {
await semaphore.WaitAsync();
try
{
return await func(item);
}
finally
{
semaphore.Release();
}
}));
}
使用示例:
让它成为一项接受某事、做某事并回馈某事的工作任务。
public async Task<int> RunJobAsync(int n)
{
await Task.Yield();
return n + 1;
}
这是一个例子。
IEnumerable<int> numbers = Enumerable.Range(0, 100);
int[] result = await RunSemaphoreAsync(numbers, RunJobAsync, Environment.ProcessorCount * 2);
Console.WriteLine(string.Join(",", result));
一切都像发条一样,快速且按预期工作。
上面的一切都很好,但是有一天,当我再次阅读生产者/消费者模式的各种实现时,我想到了使用工人而不是信号量的想法。其实,为什么不呢。
这是方法。
public async Task<Tout[]> RunWorkersAsync<Tin, Tout>(IEnumerable<Tin> items, Func<Tin, Task<Tout>> func, int degree)
{
List<Task<Tout>> tasks = new();
using (var source = items.GetEnumerator())
{
Task[] jobs = new Task[degree];
for (int i = 0; i < degree; i++)
{
jobs[i] = ((Func<Task>)(async () =>
{
while (true)
{
Task<Tout> task;
lock (source)
{
if (source.MoveNext())
{
task = func(source.Current);
tasks.Add(task);
}
else
break;
}
await task;
}
}))();
}
await Task.WhenAll(jobs);
}
return tasks.Select(t => t.Result).ToArray();
}
和第一个候选人一样漂亮。那么什么更好呢?
我决定测量开销。
我不是编写基准测试的高手,但这什么时候阻止了任何人?:)
class Program
{
static void Main(string[] args)
{
var result = BenchmarkRunner.Run<MyBenchmarks>();
Console.ReadKey();
}
}
[MemoryDiagnoser]
public class MyBenchmarks
{
private readonly List<int> numbers = Enumerable.Range(0, 2000).ToList();
private readonly int degree = Environment.ProcessorCount * 2;
[Benchmark]
public Task SemaphoreTest()
{
return RunSemaphoreAsync(numbers, RunJobAsync, degree);
}
[Benchmark]
public Task WorkersTest()
{
return RunWorkersAsync(numbers, RunJobAsync, degree);
}
public async Task<int> RunJobAsync(int n)
{
await Task.Yield();
return n + 1;
}
public async Task<Tout[]> RunSemaphoreAsync<Tin, Tout>(IEnumerable<Tin> items, Func<Tin, Task<Tout>> func, int degree)
{
using SemaphoreSlim semaphore = new(degree);
return await Task.WhenAll(items.Select(async item => {
await semaphore.WaitAsync();
try
{
return await func(item);
}
finally
{
semaphore.Release();
}
}));
}
public async Task<Tout[]> RunWorkersAsync<Tin, Tout>(IEnumerable<Tin> items, Func<Tin, Task<Tout>> func, int degree)
{
List<Task<Tout>> tasks = new();
using (var source = items.GetEnumerator())
{
Task[] jobs = new Task[degree];
for (int i = 0; i < degree; i++)
{
jobs[i] = ((Func<Task>)(async () =>
{
while (true)
{
Task<Tout> task;
lock (source)
{
if (source.MoveNext())
{
task = func(source.Current);
tasks.Add(task);
}
else
break;
}
await task;
}
}))();
}
await Task.WhenAll(jobs);
}
return tasks.Select(t => t.Result).ToArray();
}
}
在这里我得到了一个有趣的结果
BenchmarkDotNet=v0.13.0, OS=Windows 10.0.19043.1081 (21H1/May2021Update)
Intel Core i7-4700HQ CPU 2.40GHz (Haswell), 1 CPU, 8 logical and 4 physical cores
.NET SDK=5.0.301
[Host] : .NET 5.0.7 (5.0.721.25508), X64 RyuJIT
DefaultJob : .NET 5.0.7 (5.0.721.25508), X64 RyuJIT
| 方法 | 意思是 | 错误 | 标准差 | 0代 | 第一代 | 第 2 代 | 已分配 |
|---|---|---|---|---|---|---|---|
| 信号量测试 | 1,780.2 我们 | 4.23 我们 | 3.95 我们 | 140.6250 | 41.0156 | - | 519KB |
| 工人测试 | 943.2us | 18.37 我们 | 26.92 我们 | 74.2188 | 19.5313 | - | 262KB |
眼睛拒绝相信。为什么带有信号量的方法只是像这样合并,或者可能是一个歪曲的测试或歪曲的实现?请考虑。
PS我没有立即在SO上运行,但首先我寻找关于缓慢的投诉SemaphoreSlim......并没有找到它,但我发现了这个:https ://github.com/dotnet/runtime/pull/55262 。换句话说,红绿灯在 .NET 6 中会飞一点。
RunWorkersAsync 和 RunSemaphoreAsync 的代码以不同的方式在线程之间分配工作。
在 RunWorkersAsync 中,每个worker 的 while 循环的第一个操作只在原线程中处理,之后在多个线程中进行迭代,因为 调用 await 后的所有内容都在池中的线程上处理。代码中不再有任何指向某个公共线程的链接。
在 RunSemaphoreAsync 中,在原始线程中,对于每个元素,它执行
因此,在本实施例中,主流成为瓶颈。处理程序清理工作的速度不能比这个线程创建它的速度更快。
这可以通过在循环开始时强制屈服来解决:
结果会好一点,但作为单一流的瓶颈本身仍然不会消失。
我决定尝试申请频道 - 频道。代码变成了这样:
同时执行的任务数在
CreateBounded.表现一般。
原因与信号量基本相同:任务是由单个线程创建的,正如 PashaPash 所解释的那样。
除非产生一点垃圾。它甚至没有到达第一代。
我还想指出,
await Task.Yield();在我的机器上使用信号量的选项在误差范围内甚至更糟都可以正常工作。我决定写的主要原因是这个。经过测试的方法接受
IEnumerable- 这很好,可以从任何地方动态生成或获取值。但是结果会立即以数组的形式完整返回。因此,异步性会部分丢失,因为在创建/接收该数组时获得了一个块。