您好,在处理 AspNetCore 2.2 上的 Web 服务的到达数据之前,需要执行一项任务。
该服务启动多达 100 个通过 POST 请求接收数据的并行设备。我不会描述设备如何处理这些数据。
在处理这些数据之前有一项任务要做。例如,在输入中,有一个字符串类型的字段“名称”,其值为“MyLongName”。工程师制定了如何转换此字符串的规则。
- 如果超过限制修剪线
- 在正确的位置插入子字符串
- 更改案例
- ...
那些。构建了一个动作管道来处理该类型。
public class MiddleWareInData<TIn> : IDisposable
{
public List<StringHandlerMiddleWare> StringHandlers { get; }
public List<DateTimeHandlerMiddleWare> DateTimeHandlers { get; }
private void HandleInvoke(IEnumerable<TIn> datas)
{
foreach (var data in datas)
{
//ОБРАБОТЧИКИ String
Parallel.ForEach(StringHandlers, (stringHandler) =>
{
//Псевдо код обработки !!!
var str = "Начальная строка"; //Найденное свойство в типе через рефлексию
var res = stringHandler.Convert(str); //Преобразование
str = res; //перезаписали занчение свойства
});
//ОБРАБОТЧИКИ DateTime
//foreach (var dateTimeHandler in DateTimeHandlers)
//{
//}
}
}
}
public class StringHandlerMiddleWare : BaseHandlerMiddleWare<string>
{
public StringHandlerMiddleWare(StringHandlerMiddleWareOption option)
{
PropName = option.PropName;
if (option.InseartStringConverterOption != null)
{
Converters.Add(new InseartStringConverter(option.InseartStringConverterOption));
}
if (option.LimitStringConverterOption != null)
{
Converters.Add(new LimitStringComverter(option.LimitStringConverterOption));
}
if (option.ReplaceEmptyStringConverterOption != null)
{
Converters.Add(new ReplaceEmptyStringConverter(option.ReplaceEmptyStringConverterOption));
}
}
}
public abstract class BaseHandlerMiddleWare<T>
{
public string PropName { get; protected set; }
protected readonly List<IConverterMiddleWare<T>> Converters = new List<IConverterMiddleWare<T>>();
public virtual T Convert(T inProp)
{
var processedPrallel = Converters
.AsParallel()
.AsOrdered();
foreach (var converter in processedPrallel)
{
inProp = converter.Convert(inProp);
}
return inProp;
}
}
在 MiddleWareInData 类中,数据进入 HandleInvoke 方法,在该方法中它们开始按顺序处理,一次 1。我决定并行处理。最简单的方法是使用Parallel.ForEach,跨类型并行处理(一个数据单元的所有String类型都是并行处理的),在String类型的处理里面,启动了一个类型处理流水线,也可以运行在使用 PLINQ 并行(保持转换器调用的顺序)。您还可以使用 Parallel.ForEach 并行开始迭代 stringHandler 的数据本身。
那些。我们得到一个从 1 开始并行处理的输入数据列表。在这个数据单元的处理程序中,所有字符串类型都是并行处理的。字符串处理管道也是并行的(保留调用的顺序)。
我考虑过使用 MapReduce,但处理结束时不需要 Reduce(卷积),最后会有与输入相同的数据列表,只有转换后的数据。这种方式并行化处理是正常的,还是需要手动限制线程数,比如Parallel.ForEach。或者也许在某处使用 Task-i。
我将举一个具体的例子来分析并行操作设备100个。每个设备接受一个包含 50 个项目的列表。每个元素由 10 个 Handler 处理(需要更改 10 个 String 类型) 每个元素的处理管道由 5 个阶段组成。
我真的不明白您将如何开始并行处理保序转换器。事实上,如果动作本身的顺序被保留,那么这不再是并行处理,而是顺序处理。
我想你会发现这个例子很有帮助:
假设有这样一个代表
我为它添加了一些功能
在此之后,您可以组装一个用于处理字符串的管道
管道将通过顺序启动处理程序来工作,您可以像这样检查它
输出将是这样的
此外,对于多行的并行处理,您可以调用它
结果
但在这种情况下,行是并行处理的,结果可以是任何顺序。例如,字符串
start number is 1 end
可以被处理并出现在字符串之前的结果中start number is 0 end
。如果需要保留输出中输入行的顺序,可以添加AsOrdered()
输出将被排序
至于管道本身的并行化,正如我所说,既然你可以严格在前一个操作完成后开始下一个操作,那么你有一个串行管道,而不是并行管道,你不能为同一条线路运行转换器平行。
还要记住,添加多线程并不总是会带来更快的性能。例如,如果您向线程发送大量小操作,那么处理器只会在线程之间进行切换,因为在线程之间切换是一项昂贵的操作。
仅当您确定自己在做什么时,手动限制线程数也是值得的。默认调度程序足够聪明,通常可以信任它来完成这项工作。
因此,如果你在输入端有一组行,而且行数很多,我会冷静地将这组委托给并行 PLINQ,并且不会发明任何东西。使传送带的台阶平行是没有意义的。此外,如果您想要速度并且您有很多转换规则并且字符串本身很大,那么可能首先,您需要寻找优化管道本身的方向,因为对字符串的任何编辑都会创建一个新字符串并在其上花费时间,与字符串的长度成正比。我建议朝那个方向看,也许调用 StringBuilder 而不是字符串,或者只是一个字符数组。但无论如何,任何优化都必须经过权衡、衡量并证明它在战斗条件下确实有效。