RError.com

RError.com Logo RError.com Logo

RError.com Navigation

  • 主页

Mobile menu

Close
  • 主页
  • 系统&网络
    • 热门问题
    • 最新问题
    • 标签
  • Ubuntu
    • 热门问题
    • 最新问题
    • 标签
  • 帮助
主页 / 问题 / 1303748
Accepted
aepot
aepot
Asked:2022-07-10 05:49:04 +0000 UTC2022-07-10 05:49:04 +0000 UTC 2022-07-10 05:49:04 +0000 UTC

批量异步调用,在没有信号量的情况下限制并发调用的数量

  • 772

相当简单的任务 - 有 N 个有序请求必须异步执行,同时执行的请求数量有限制,然后从它们中取出它们以及 N 个有序响应。

实际上,当有很多请求时,这可以与网络或数据库一起使用,但您不想为您的计算机、网络或服务器安排灾难。因此,对同时活动的异步任务的数量进行了限制。

正面的解决方案看起来很明显。

public async Task<Tout[]> RunSemaphoreAsync<Tin, Tout>(IEnumerable<Tin> items, Func<Tin, Task<Tout>> func, int degree)
{
    using SemaphoreSlim semaphore = new(degree);
    return await Task.WhenAll(items.Select(async item => {
        await semaphore.WaitAsync();
        try
        {
            return await func(item);
        }
        finally
        {
            semaphore.Release();
        }
    }));
}

使用示例:

让它成为一项接受某事、做某事并回馈某事的工作任务。

public async Task<int> RunJobAsync(int n)
{
    await Task.Yield(); 
    return n + 1;
}

这是一个例子。

IEnumerable<int> numbers = Enumerable.Range(0, 100);
int[] result = await RunSemaphoreAsync(numbers, RunJobAsync, Environment.ProcessorCount * 2);
Console.WriteLine(string.Join(",", result));

一切都像发条一样,快速且按预期工作。


上面的一切都很好,但是有一天,当我再次阅读生产者/消费者模式的各种实现时,我想到了使用工人而不是信号量的想法。其实,为什么不呢。

这是方法。

public async Task<Tout[]> RunWorkersAsync<Tin, Tout>(IEnumerable<Tin> items, Func<Tin, Task<Tout>> func, int degree)
{
    List<Task<Tout>> tasks = new();
    using (var source = items.GetEnumerator())
    {
        Task[] jobs = new Task[degree];
        for (int i = 0; i < degree; i++)
        {
            jobs[i] = ((Func<Task>)(async () =>
            {
                while (true)
                {
                    Task<Tout> task;
                    lock (source)
                    {
                        if (source.MoveNext())
                        {
                            task = func(source.Current);
                            tasks.Add(task);
                        }
                        else
                            break;
                    }
                    await task;
                }
            }))();
        }
        await Task.WhenAll(jobs);
    }
    return tasks.Select(t => t.Result).ToArray();
}

和第一个候选人一样漂亮。那么什么更好呢?


我决定测量开销。
我不是编写基准测试的高手,但这什么时候阻止了任何人?:)

class Program
{
    static void Main(string[] args)
    {
        var result = BenchmarkRunner.Run<MyBenchmarks>();
        Console.ReadKey();
    }
}

[MemoryDiagnoser]
public class MyBenchmarks
{
    private readonly List<int> numbers = Enumerable.Range(0, 2000).ToList();
    private readonly int degree = Environment.ProcessorCount * 2;

    [Benchmark]
    public Task SemaphoreTest()
    {
        return RunSemaphoreAsync(numbers, RunJobAsync, degree);
    }

    [Benchmark]
    public Task WorkersTest()
    {
        return RunWorkersAsync(numbers, RunJobAsync, degree);
    }

    public async Task<int> RunJobAsync(int n)
    {
        await Task.Yield(); 
        return n + 1;
    }

    public async Task<Tout[]> RunSemaphoreAsync<Tin, Tout>(IEnumerable<Tin> items, Func<Tin, Task<Tout>> func, int degree)
    {
        using SemaphoreSlim semaphore = new(degree);
        return await Task.WhenAll(items.Select(async item => {
            await semaphore.WaitAsync();
            try
            {
                return await func(item);
            }
            finally
            {
                semaphore.Release();
            }
        }));
    }

    public async Task<Tout[]> RunWorkersAsync<Tin, Tout>(IEnumerable<Tin> items, Func<Tin, Task<Tout>> func, int degree)
    {
        List<Task<Tout>> tasks = new();
        using (var source = items.GetEnumerator())
        {
            Task[] jobs = new Task[degree];
            for (int i = 0; i < degree; i++)
            {
                jobs[i] = ((Func<Task>)(async () =>
                {
                    while (true)
                    {
                        Task<Tout> task;
                        lock (source)
                        {
                            if (source.MoveNext())
                            {
                                task = func(source.Current);
                                tasks.Add(task);
                            }
                            else
                                break;
                        }
                        await task;
                    }
                }))();
            }
            await Task.WhenAll(jobs);
        }
        return tasks.Select(t => t.Result).ToArray();
    }
}

在这里我得到了一个有趣的结果

BenchmarkDotNet=v0.13.0, OS=Windows 10.0.19043.1081 (21H1/May2021Update)
Intel Core i7-4700HQ CPU 2.40GHz (Haswell), 1 CPU, 8 logical and 4 physical cores
.NET SDK=5.0.301
  [Host]     : .NET 5.0.7 (5.0.721.25508), X64 RyuJIT
  DefaultJob : .NET 5.0.7 (5.0.721.25508), X64 RyuJIT
方法 意思是 错误 标准差 0代 第一代 第 2 代 已分配
信号量测试 1,780.2 我们 4.23 我们 3.95 我们 140.6250 41.0156 - 519KB
工人测试 943.2us 18.37 我们 26.92 我们 74.2188 19.5313 - 262KB

眼睛拒绝相信。为什么带有信号量的方法只是像这样合并,或者可能是一个歪曲的测试或歪曲的实现?请考虑。

PS我没有立即在SO上运行,但首先我寻找关于缓慢的投诉SemaphoreSlim......并没有找到它,但我发现了这个:https ://github.com/dotnet/runtime/pull/55262 。换句话说,红绿灯在 .NET 6 中会飞一点。

c#
  • 2 2 个回答
  • 10 Views

2 个回答

  • Voted
  1. Best Answer
    user177221
    2022-07-18T22:14:04Z2022-07-18T22:14:04Z

    RunWorkersAsync 和 RunSemaphoreAsync 的代码以不同的方式在线程之间分配工作。

    在 RunWorkersAsync 中,每个worker 的 while 循环的第一个操作只在原线程中处理,之后在多个线程中进行迭代,因为 调用 await 后的所有内容都在池中的线​​程上处理。代码中不再有任何指向某个公共线程的链接。

    在 RunSemaphoreAsync 中,在原始线程中,对于每个元素,它执行

    items.Select(async item => {
            semaphore.WaitAsync(); // await и продолжение - на потоке из пула
    

    因此,在本实施例中,主流成为瓶颈。处理程序清理工作的速度不能比这个线程创建它的速度更快。

    这可以通过在循环开始时强制屈服来解决:

    public async Task<Tout[]> RunSemaphoreAsync<Tin, Tout>(IEnumerable<Tin> items, Func<Tin, Task<Tout>> func, int degree)
    {
        using SemaphoreSlim semaphore = new(degree);
        return await Task.WhenAll(items.Select(async item =>
        {
            await Task.Yield();
            await semaphore.WaitAsync();
            try
            {
                return await func(item);
            }
            finally
            {
                semaphore.Release();
            }
        }));
    }
    

    结果会好一点,但作为单一流的瓶颈本身仍然不会消失。

    • 8
  2. Alexander Petrov
    2022-07-18T23:18:06Z2022-07-18T23:18:06Z

    我决定尝试申请频道 - 频道。代码变成了这样:

    public async Task<Tout[]> RunChannelAsync<Tin, Tout>(IEnumerable<Tin> items, Func<Tin, Task<Tout>> func, int degree)
    {
        var channel = Channel.CreateBounded<Task<Tout>>(degree);
    
        var writer = channel.Writer;
        var reader = channel.Reader;
    
        Task.Run(async () =>
        {
            foreach (Tin item in items)
                await writer.WriteAsync(func(item));
    
            writer.Complete();
        });
    
        var tasks = new List<Tout>();
    
        await foreach (Task<Tout> item in reader.ReadAllAsync())
            tasks.Add(await item);
    
        return tasks.ToArray();
    }
    

    同时执行的任务数在CreateBounded.

    表现一般。
    原因与信号量基本相同:任务是由单个线程创建的,正如 PashaPash 所解释的那样。

    方法 意思是 错误 标准差 0代 第一代 第 2 代 已分配
    信号量测试 1,148.9us 15.50 我们 14.50 我们 156.2500 48.8281 - 527KB
    信号量产量测试 1,154.4us 22.51 我们 40.59 我们 169.9219 68.3594 - 711KB
    工人测试 763.3 我们 5.61 我们 5.25us 87.8906 27.3438 - 260KB
    通道测试 991.8us 6.64 我们 6.21 我们 125.0000 - - 246KB

    除非产生一点垃圾。它甚至没有到达第一代。

    我还想指出,await Task.Yield();在我的机器上使用信号量的选项在误差范围内甚至更糟都可以正常工作。


    我决定写的主要原因是这个。经过测试的方法接受IEnumerable- 这很好,可以从任何地方动态生成或获取值。
    但是结果会立即以数组的形式完整返回。因此,异步性会部分丢失,因为在创建/接收该数组时获得了一个块。

    • 4

相关问题

  • 使用嵌套类导出 xml 文件

  • 分层数据模板 [WPF]

  • 如何在 WPF 中为 ListView 手动创建列?

  • 在 2D 空间中,Collider 2D 挂在玩家身上,它对敌人的重量相同,我需要它这样当它们碰撞时,它们不会飞向不同的方向。统一

  • 如何在 c# 中使用 python 神经网络来创建语音合成?

  • 如何知道类中的方法是否属于接口?

Sidebar

Stats

  • 问题 10021
  • Answers 30001
  • 最佳答案 8000
  • 用户 6900
  • 常问
  • 回答
  • Marko Smith

    表格填充不起作用

    • 2 个回答
  • Marko Smith

    提示 50/50,有两个,其中一个是正确的

    • 1 个回答
  • Marko Smith

    在 PyQt5 中停止进程

    • 1 个回答
  • Marko Smith

    我的脚本不工作

    • 1 个回答
  • Marko Smith

    在文本文件中写入和读取列表

    • 2 个回答
  • Marko Smith

    如何像屏幕截图中那样并排排列这些块?

    • 1 个回答
  • Marko Smith

    确定文本文件中每一行的字符数

    • 2 个回答
  • Marko Smith

    将接口对象传递给 JAVA 构造函数

    • 1 个回答
  • Marko Smith

    正确更新数据库中的数据

    • 1 个回答
  • Marko Smith

    Python解析不是css

    • 1 个回答
  • Martin Hope
    Alexandr_TT 2020年新年大赛! 2020-12-20 18:20:21 +0000 UTC
  • Martin Hope
    Alexandr_TT 圣诞树动画 2020-12-23 00:38:08 +0000 UTC
  • Martin Hope
    Air 究竟是什么标识了网站访问者? 2020-11-03 15:49:20 +0000 UTC
  • Martin Hope
    Qwertiy 号码显示 9223372036854775807 2020-07-11 18:16:49 +0000 UTC
  • Martin Hope
    user216109 如何为黑客设下陷阱,或充分击退攻击? 2020-05-10 02:22:52 +0000 UTC
  • Martin Hope
    Qwertiy 并变成3个无穷大 2020-11-06 07:15:57 +0000 UTC
  • Martin Hope
    koks_rs 什么是样板代码? 2020-10-27 15:43:19 +0000 UTC
  • Martin Hope
    Sirop4ik 向 git 提交发布的正确方法是什么? 2020-10-05 00:02:00 +0000 UTC
  • Martin Hope
    faoxis 为什么在这么多示例中函数都称为 foo? 2020-08-15 04:42:49 +0000 UTC
  • Martin Hope
    Pavel Mayorov 如何从事件或回调函数中返回值?或者至少等他们完成。 2020-08-11 16:49:28 +0000 UTC

热门标签

javascript python java php c# c++ html android jquery mysql

Explore

  • 主页
  • 问题
    • 热门问题
    • 最新问题
  • 标签
  • 帮助

Footer

RError.com

关于我们

  • 关于我们
  • 联系我们

Legal Stuff

  • Privacy Policy

帮助

© 2023 RError.com All Rights Reserve   沪ICP备12040472号-5