RError.com

RError.com Logo RError.com Logo

RError.com Navigation

  • 主页

Mobile menu

Close
  • 主页
  • 系统&网络
    • 热门问题
    • 最新问题
    • 标签
  • Ubuntu
    • 热门问题
    • 最新问题
    • 标签
  • 帮助
主页 / 问题 / 1537272
Accepted
Dany
Dany
Asked:2023-08-23 21:30:04 +0000 UTC2023-08-23 21:30:04 +0000 UTC 2023-08-23 21:30:04 +0000 UTC

使用数百万个任务会导致 System.OutOfMemoryException (C#)

  • 772

我有 600 万个小文件(平均大小约为 15 字节),我需要读取这些文件,然后使用处理器进行处理。我之前已经用 Task.Factory 实现过这个功能,并且它在 ASP.NET Core 2.1 上运行没有问题,耗时大约 20 个小时。

我现在已将应用程序移植到 asp.net 6,并且在测试服务器上,我的 Web 应用程序在运行这些文件操作并崩溃后停止响应任何请求。我在日志中看到错误System.OutOfMemoryException。

我相信我的实现方式还很不理想。我想知道任何其他方法来多线程这项工作或您对当前代码的评论。

控制器方法ImportSignatures:

/// <summary>
/// POST endpoint that runs the signing-certificate import and returns its result as JSON.
/// Any exception is logged and converted into a result built from the exception message
/// and the service's current cancellation flag.
/// </summary>
[HttpPost("ImportSignatures")]
public async Task<JsonResult> ImportSignatures()
{
    try
    {
        return Json(await SignatureImportService.ImportSigningCerts());
    }
    catch (Exception failure)
    {
        LogsHelper.WriteLog("api/Settings/ImportSignatures", failure);

        var failedResult = new ImportSigningCertsResult(failure.Message, SignatureImportService.WasCancelled);
        return Json(failedResult);
    }
}

方法ImportSigningCerts:

/// <summary>
/// Imports signing certificates from every "*.sig" file in the configured directory,
/// starting one task per file, and returns a summary result.
/// Resets the static progress fields at the start and clears <c>IsWorking</c> when done.
/// </summary>
/// <returns>A result carrying the summary text and, in the second branch, the cancellation flag.</returns>
public static async Task<ImportSigningCertsResult> ImportSigningCerts()
{
    LogsHelper.WriteEventLog("Запуск SignatureImportService");
    // Reset the shared (static) state that describes the current run.
    WasCancelled = false;
    IsWorking = true;
    ResultStr = "";
    totalSignatures = 0;
    processedSignatures = 0;

    var cancelMsg = "Импорт сертификатов был прерван. \n";
    var endMsg = "Импорт сертификатов успешно завершён. \n";
    // Paths of successfully processed files waiting to be deleted; guarded by s_toDeleteListLockObj.
    var toDelete = new List<string>();

    try
    {
        var configuration = SignatureImportConfiguration.FromCfg();

        using (s_tokenSource = new CancellationTokenSource())
        {
            // EnumerateFiles is lazy: Count() walks the directory once, the foreach below walks it again.
            IEnumerable<string> signatures = Directory.EnumerateFiles(configuration.Path, "*.sig");
            totalSignatures = signatures.Count();

            Store mainStore = StoreMan.GetStore("Main");
            var importStats = new ImportStats();
            // NOTE(review): one Task is appended per file and nothing is removed before WhenAll,
            // so this list grows with the number of files (millions, per the question).
            List<Task> tasks = new();

            int saveIndex = 1;
            const int proccessedForSave = 100000; // Number of processed signatures after which the store is saved and processed signatures are deleted
            CancellationToken token = s_tokenSource.Token;

            // Concurrency limit: at most "minimum worker threads" tasks hold the semaphore at once.
            ThreadPool.GetMinThreads(out int minWorkerThreads, out _);
            using SemaphoreSlim semaphore = new(minWorkerThreads);

            foreach (string path in signatures)
            {
                // NOTE(review): synchronous Wait() blocks the calling thread inside an async method.
                semaphore.Wait();

                // WasCancelled is presumably set by a cancel operation not shown here — confirm.
                if (WasCancelled)
                    break;

                tasks.Add(Task.Factory.StartNew(() =>
                {
                    try
                    {
                        token.ThrowIfCancellationRequested();

                        // Queue the file for deletion only when the import succeeded and cleaning is enabled.
                        if (UploadSigningCerts(mainStore, path, importStats))
                        {
                            if (configuration.NeedCleaning)
                            {
                                lock (s_toDeleteListLockObj)
                                    toDelete.Add(path);
                            }
                        }

                        Interlocked.Increment(ref processedSignatures);
                        // Intermediate save: once per proccessedForSave processed files, under a lock
                        // so that only one task performs the save for a given threshold.
                        lock (s_intermediateSaveLockObj)
                        {
                            if (processedSignatures > proccessedForSave * saveIndex)
                            {
                                LogsHelper.WriteEventLog("Промежуточное сохранение хранилища сертификатов...");

                                mainStore.WriteIfChanged();
                                StartRemovingSignatures(toDelete);
                                saveIndex++;
                            }
                        }
                    }
                    catch (Exception e)
                    {
                        // Cancellation is expected and not logged; everything else is logged and swallowed.
                        if (e is not OperationCanceledException)
                            LogsHelper.WriteLog("SignatureImportService/ImportSigningCerts:Task.Factory.StartNew", e);
                    }
                    finally
                    {
                        // Free the slot so the foreach loop can start the next task.
                        semaphore.Release();
                    }
                }, token));
            }

            try
            {
                await Task.WhenAll(tasks);
            }
            catch (OperationCanceledException) { }

            // Final save and cleanup run whether or not the import was cancelled.
            mainStore.WriteIfChanged();
            StartRemovingSignatures(toDelete);
            ResultStr = (WasCancelled ? cancelMsg : endMsg) + $"Certificates found: {importStats.all}. Was imported: {importStats.imported}." + (importStats.parsingFailed > 0 ? $" Unrecognized files: {importStats.parsingFailed}" : "");
        }

        LogsHelper.WriteEventLog(ResultStr);
        // NOTE(review): s_tokenSource was assigned by the using statement above, so it is null here
        // only if other code clears it — confirm which branch is actually reachable.
        return s_tokenSource == null ? new ImportSigningCertsResult(ResultStr) : new ImportSigningCertsResult(ResultStr, WasCancelled);
    }
    finally
    {
        IsWorking = false;
    }
}

方法UploadSigningCerts:

/// <summary>
/// Reads one signature file, extracts its certificates and processes each of them
/// (the per-certificate logic is elided in this listing).
/// </summary>
/// <param name="store">Target certificate store; not referenced in the visible part of the body.</param>
/// <param name="path">Path of the signature file to read.</param>
/// <param name="importStats">Shared counters; <c>all</c> and <c>errors</c> are updated atomically here.</param>
/// <returns>
/// <c>toBeDeleted</c> (initialised to <c>true</c>) when processing completes,
/// or <c>false</c> when any exception was thrown.
/// </returns>
private static bool UploadSigningCerts(Store store, string path, ImportStats importStats)
{
    bool toBeDeleted = true;
    CryptoClient client = CryptoServiceContext.DefaultInstance.CryptoClient;

    try
    {
        // The whole file is read into memory and parsed as a signed CMS message.
        List<CertInfo> certs = client.GetSignCmsInfo(File.ReadAllBytes(path)).Certs.ToList();

        Interlocked.Add(ref importStats.all, certs.Count);

        for (int i = 0; i < certs.Count; i++)
        {
            // Each certificate is handled under a single shared lock, so this part runs one at a time.
            lock (s_importLockObj)
            {
                // Code that validates each certificate from the file, decides whether to import it, and imports it into the store...
            }
        }
        return toBeDeleted;
    }
    catch (Exception e)
    {
        // Failures are logged and counted; the file is reported as "do not delete".
        LogsHelper.WriteLog("SignatureImportService/UploadSigningCerts", e);
        LogsHelper.WriteEventLog($"Ошибка импорта сертификата из подписи: {Path.GetFileName(path)};");
        Interlocked.Increment(ref importStats.errors);
        return false;
    }
}

方法StartRemovingSignatures:

/// <summary>
/// Takes a snapshot of the pending-deletion list, clears it, and deletes the
/// snapshotted files on a background task (fire-and-forget).
/// </summary>
/// <param name="toDelete">
/// Shared list of file paths queued for deletion. Worker tasks append to it under
/// <c>s_toDeleteListLockObj</c>, so every access here happens under the same lock.
/// </param>
private static void StartRemovingSignatures(List<string> toDelete)
{
    List<string> batch;
    lock (s_toDeleteListLockObj)
    {
        // The emptiness check is done inside the lock: List<T> is not thread-safe,
        // and other threads may be adding to it concurrently.
        if (toDelete.Count == 0)
        {
            return;
        }

        batch = new List<string>(toDelete);
        toDelete.Clear();
    }

    LogsHelper.WriteEventLog("Удаление успешно обработанных файлов подписей...");

    // Deliberately not awaited: deletion must not hold up the import.
    // Per-file failures are logged inside the task so they are never left unobserved.
    _ = Task.Run(() =>
    {
        foreach (string path in batch)
        {
            try
            {
                File.Delete(path);
            }
            catch (Exception e)
            {
                LogsHelper.WriteLog("ImportResult/DeleteSignatures", e);
            }
        }
    });
}

错误文本:

20.08.2023 11:58:01 api/Settings/ImportSignatures
Exception of type 'System.OutOfMemoryException' was thrown.
   at System.Threading.Tasks.Task.EnsureContingentPropertiesInitializedUnsafe()
   at System.Threading.Tasks.Task.AssignCancellationToken(CancellationToken cancellationToken, Task antecedent, TaskContinuation continuation)
   at System.Threading.Tasks.Task.TaskConstructorCore(Delegate action, Object state, CancellationToken cancellationToken, TaskCreationOptions creationOptions, InternalTaskOptions internalOptions, TaskScheduler scheduler)
   at Store.Services.SignatureImportService.<>c__DisplayClass20_0.<ImportSigningCerts>b__0(String path)
   at System.Collections.Generic.List`1.ForEach(Action`1 action)
   at Store.Services.SignatureImportService.ImportSigningCerts()
   at Store.Controllers.SettingsController.ImportSignatures()

更新:代码已被编辑

c#
  • 1 1 个回答
  • 159 Views

1 个回答

  • Voted
  1. Best Answer
    aepot
    2023-08-26T03:01:05Z2023-08-26T03:01:05Z

    让我们尝试稳定这段代码

    • 需要去掉多余的委托(delegate)
    • 从一堆线程驱动文件系统是没有意义的,它是同步的,它不能在一个介质中同时执行一堆操作
    • 没有必要去修改线程池的设置,它是正常配置的,你只需要小心对待它
    • 你不能阻塞控制器线程,所以你挂掉了整个服务器,为此有异步操作
    • 许多锁 - 不利于代码理解和性能,请使用线程安全集合
    • 读取int变量的过程本身是原子的,因此读取时不需要锁,摆脱不必要的锁
    • 去掉静态成员(static)
    • 将同时运行的任务数量限制为服务器核心数量乘以二(从历史上看,这是最佳的,您应该从这个开始)
    • 您创建的每个任务都应该被等待(await),这样您就可以控制流程并且更容易捕获异常
    • 删除文件一般可以通过Producer/Consumer来完成,这样它们一出现在删除队列中就被删除,同时又不会拖慢主代码的速度。

    结果是这样的

    /// <summary>
    /// POST endpoint that creates an import service instance, runs the signing-certificate
    /// import on it and returns the outcome as JSON. Exceptions are logged and turned into
    /// a result built from the exception message and the service's cancellation flag.
    /// </summary>
    [HttpPost("ImportSignatures")]
    public async Task<JsonResult> ImportSignatures()
    {
        var service = new SignatureImportService();
        try
        {
            return Json(await service.ImportSigningCerts());
        }
        catch (Exception failure)
        {
            LogsHelper.WriteLog("api/Settings/ImportSignatures", failure);

            var failedResult = new ImportSigningCertsResult(failure.Message, service.WasCancelled);
            return Json(failedResult);
        }
    }
    
    /// <summary>
    /// Imports signing certificates from every "*.sig" file in the configured directory.
    /// Files are processed by thread-pool tasks, with at most <c>Environment.ProcessorCount * 2</c>
    /// running at a time; successfully imported files are handed to a background delete worker
    /// when cleaning is enabled.
    /// </summary>
    /// <remarks>
    /// Whether the run completes or is cancelled, the method waits for every started task,
    /// saves the store, completes the delete queue and awaits the delete worker before it
    /// returns. Nothing is left running against the disposed semaphore or token source.
    /// </remarks>
    /// <returns>A result with the summary text and a flag telling whether the run was cancelled.</returns>
    public async Task<ImportSigningCertsResult> ImportSigningCerts()
    {
        LogsHelper.WriteEventLog("Запуск SignatureImportService");

        const string cancelMsg = "Импорт сертификатов был прерван. \n";
        const string endMsg = "Импорт сертификатов успешно завершён. \n";
        try
        {
            using (_cts = new CancellationTokenSource())
            {
                var configuration = SignatureImportConfiguration.FromCfg();
                string[] signatures = Directory.GetFiles(configuration.Path, "*.sig");
                totalSignatures = signatures.Length;
                processedSignatures = 0;

                Store mainStore = StoreMan.GetStore("Main");
                var importStats = new ImportStats();
                List<Task> tasks = new();

                int saveIndex = 1;
                const int proccessedForSave = 100000; // Number of processed signatures after which the store is saved
                CancellationToken token = _cts.Token;
                Channel<string> channel = Channel.CreateUnbounded<string>();
                ChannelWriter<string> deleteQueue = channel.Writer;
                Task deleteTask = DeleteWorkerAsync(channel.Reader, token);
                object saveSync = new object();
                int maxJobs = Environment.ProcessorCount * 2;
                using SemaphoreSlim semaphore = new(maxJobs);
                try
                {
                    foreach (string path in signatures)
                    {
                        await semaphore.WaitAsync(token);
                        if (tasks.Count > maxJobs * 2)
                        {
                            // Drop finished tasks so the list cannot grow without bound.
                            tasks.RemoveAll(t => t.IsCompletedSuccessfully);
                        }
                        tasks.Add(Task.Run(() =>
                        {
                            try
                            {
                                token.ThrowIfCancellationRequested();

                                if (UploadSigningCerts(mainStore, path, importStats) && configuration.NeedCleaning)
                                {
                                    deleteQueue.TryWrite(path);
                                }
                                // Decide under the lock, save outside it, so other tasks are not
                                // held up while the store is being written.
                                bool needSave = false;
                                lock (saveSync)
                                {
                                    if (++processedSignatures > proccessedForSave * saveIndex)
                                    {
                                        saveIndex++;
                                        needSave = true;
                                    }
                                }
                                if (needSave)
                                {
                                    LogsHelper.WriteEventLog("Промежуточное сохранение хранилища сертификатов...");
                                    mainStore.WriteIfChanged();
                                }
                            }
                            catch (Exception e) when (e is not OperationCanceledException)
                            {
                                LogsHelper.WriteLog("SignatureImportService/ImportSigningCerts:Task.Factory.StartNew", e);
                            }
                            finally
                            {
                                semaphore.Release();
                            }
                        }, token));
                    }
                }
                catch (OperationCanceledException)
                {
                    // Cancellation while waiting for a free slot: stop scheduling new work.
                }

                // Always drain in-flight tasks. They still use the semaphore, the token and
                // the store, all of which go out of scope when this method returns.
                try
                {
                    await Task.WhenAll(tasks);
                }
                catch (OperationCanceledException)
                {
                    // Tasks cancelled before or during execution are expected after a cancel request.
                }

                // Persist whatever was imported, even when the run was cancelled part-way.
                mainStore.WriteIfChanged();

                // No more paths will be queued; let the delete worker finish and observe its outcome.
                deleteQueue.Complete();
                try
                {
                    await deleteTask;
                }
                catch (OperationCanceledException)
                {
                    // The delete worker reads with the same token and stops on cancellation.
                }

                string result = (token.IsCancellationRequested ? cancelMsg : endMsg) + $"Certificates found: {importStats.all}. Was imported: {importStats.imported}." + (importStats.parsingFailed > 0 ? $" Unrecognized files: {importStats.parsingFailed}" : "");
                LogsHelper.WriteEventLog(result);

                return new ImportSigningCertsResult(result, token.IsCancellationRequested);
            }
        }
        finally
        {
            _cts = null;
        }
    }
    
    /// <summary>
    /// Reads one signature file, extracts its certificates and iterates over them.
    /// The per-certificate logic is omitted from this listing (empty loop body).
    /// </summary>
    /// <param name="store">Target certificate store; not referenced in the visible part of the body.</param>
    /// <param name="path">Path of the signature file to read.</param>
    /// <param name="importStats">Shared counters; <c>all</c> and <c>errors</c> are updated atomically here.</param>
    /// <returns>
    /// <c>toBeDeleted</c> (initialised to <c>true</c>) when processing completes,
    /// or <c>false</c> when any exception was thrown.
    /// </returns>
    private bool UploadSigningCerts(Store store, string path, ImportStats importStats)
    {
        bool toBeDeleted = true;
        CryptoClient client = CryptoServiceContext.DefaultInstance.CryptoClient;
    
        try
        {
            CertInfo[] certs = client.GetSignCmsInfo(File.ReadAllBytes(path)).Certs.ToArray(); // it is unclear what kind of collection Certs is and why it needs to be copied here
    
            Interlocked.Add(ref importStats.all, certs.Length);
    
            for (int i = 0; i < certs.Length; i++)
            {
                // Placeholder: per-certificate validation and import code is not shown in this listing.
            }
            return toBeDeleted;
        }
        catch (Exception e)
        {
            // Failures are logged and counted; the file is reported as "do not delete".
            LogsHelper.WriteLog("SignatureImportService/UploadSigningCerts", e);
            LogsHelper.WriteEventLog($"Ошибка импорта сертификата из подписи: {Path.GetFileName(path)};");
            Interlocked.Increment(ref importStats.errors);
            return false;
        }
    }
    
    /// <summary>
    /// Consumer side of the delete queue: removes each file path read from the channel
    /// until the channel is completed or the token is cancelled.
    /// A failure to delete one file is logged and does not stop the worker.
    /// </summary>
    /// <param name="reader">Channel reader supplying the paths to delete.</param>
    /// <param name="token">Token that aborts waiting for further paths.</param>
    private async Task DeleteWorkerAsync(ChannelReader<string> reader, CancellationToken token)
    {
        // Wait until data is available (or the channel completes), then drain everything buffered.
        while (await reader.WaitToReadAsync(token))
        {
            while (reader.TryRead(out string queuedPath))
            {
                try
                {
                    // On a slow file system this call could be offloaded with Task.Run.
                    File.Delete(queuedPath);
                }
                catch (Exception deleteError)
                {
                    LogsHelper.WriteLog("ImportResult/DeleteSignatures", deleteError);
                }
            }
        }
    }
    

    bool变量也没有意义,你有一个线程安全的CTS,使用它,例如像这样

    /// <summary>
    /// True only while a token source is set and cancellation has been requested on it;
    /// false when no token source is set.
    /// </summary>
    public bool WasCancelled => _cts is { IsCancellationRequested: true };

    /// <summary>
    /// True only while a token source is set and cancellation has not been requested;
    /// false when no token source is set.
    /// </summary>
    public bool IsWorking => _cts is { IsCancellationRequested: false };

    // Token source of the import currently in progress, if any.
    private CancellationTokenSource _cts;
    

    接下来继续去掉静态成员(static)。如果您不需要动态添加和删除列表中的元素,请摆脱使用列表的习惯,改用数组。数组更轻、更快。

    这是该主题的另一读物:c # how to speed upAddingthreads using async-await

    • 2

相关问题

  • 使用嵌套类导出 xml 文件

  • 分层数据模板 [WPF]

  • 如何在 WPF 中为 ListView 手动创建列?

  • 在 2D 空间中,Collider 2D 挂在玩家身上,它对敌人的重量相同,我需要它这样当它们碰撞时,它们不会飞向不同的方向。统一

  • 如何在 c# 中使用 python 神经网络来创建语音合成?

  • 如何知道类中的方法是否属于接口?

Sidebar

Stats

  • 问题 10021
  • Answers 30001
  • 最佳答案 8000
  • 用户 6900
  • 常问
  • 回答
  • Marko Smith

    我看不懂措辞

    • 1 个回答
  • Marko Smith

    请求的模块“del”不提供名为“default”的导出

    • 3 个回答
  • Marko Smith

    "!+tab" 在 HTML 的 vs 代码中不起作用

    • 5 个回答
  • Marko Smith

    我正在尝试解决“猜词”的问题。Python

    • 2 个回答
  • Marko Smith

    可以使用哪些命令将当前指针移动到指定的提交而不更改工作目录中的文件?

    • 1 个回答
  • Marko Smith

    Python解析野莓

    • 1 个回答
  • Marko Smith

    问题:“警告:检查最新版本的 pip 时出错。”

    • 2 个回答
  • Marko Smith

    帮助编写一个用值填充变量的循环。解决这个问题

    • 2 个回答
  • Marko Smith

    尽管依赖数组为空,但在渲染上调用了 2 次 useEffect

    • 2 个回答
  • Marko Smith

    数据不通过 Telegram.WebApp.sendData 发送

    • 1 个回答
  • Martin Hope
    Alexandr_TT 2020年新年大赛! 2020-12-20 18:20:21 +0000 UTC
  • Martin Hope
    Alexandr_TT 圣诞树动画 2020-12-23 00:38:08 +0000 UTC
  • Martin Hope
    Air 究竟是什么标识了网站访问者? 2020-11-03 15:49:20 +0000 UTC
  • Martin Hope
    Qwertiy 号码显示 9223372036854775807 2020-07-11 18:16:49 +0000 UTC
  • Martin Hope
    user216109 如何为黑客设下陷阱,或充分击退攻击? 2020-05-10 02:22:52 +0000 UTC
  • Martin Hope
    Qwertiy 并变成3个无穷大 2020-11-06 07:15:57 +0000 UTC
  • Martin Hope
    koks_rs 什么是样板代码? 2020-10-27 15:43:19 +0000 UTC
  • Martin Hope
    Sirop4ik 向 git 提交发布的正确方法是什么? 2020-10-05 00:02:00 +0000 UTC
  • Martin Hope
    faoxis 为什么在这么多示例中函数都称为 foo? 2020-08-15 04:42:49 +0000 UTC
  • Martin Hope
    Pavel Mayorov 如何从事件或回调函数中返回值?或者至少等他们完成。 2020-08-11 16:49:28 +0000 UTC

热门标签

javascript python java php c# c++ html android jquery mysql

Explore

  • 主页
  • 问题
    • 热门问题
    • 最新问题
  • 标签
  • 帮助

Footer

RError.com

关于我们

  • 关于我们
  • 联系我们

Legal Stuff

  • Privacy Policy

帮助

© 2023 RError.com All Rights Reserve   沪ICP备12040472号-5