我有一个类似的机制pub/sub
,通过 实现Queue<T>
,我delegate
将消息放入队列中,然后在一般循环中开始处理它们,Dataflow.ActionBlock
通过以下方式限制最大块数MaxDegreeOfParallelism
:
Console.WriteLine($"Processing {ClosedOrders.Count} closed orders ...");
var closedOrdersBlock = new ActionBlock<OrderMessage>(OnPositionClosed,
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });
while (ClosedOrders.Count > 0)
{
ClosedOrders.TryDequeue(out var closedOrder);
if (closedOrder != null)
closedOrdersBlock.Post(closedOrder);
}
closedOrdersBlock.Complete();
closedOrdersBlock.Completion.Wait();
这是委托处理:
protected virtual void OnPositionClose(OrderMessage message)
{
try
{
foreach (var orderToClose in ClosedOrders.ToList())
{
if (message.Order == null || orderToClose.Order == null ||
orderToClose.Order.Ticket != message.Order.Ticket) continue;
Console.WriteLine("Duplicate order for closing received for ticket " + orderToClose.Order.Ticket +
", skipping");
return;
}
ClosedOrders.Enqueue(message);
}
catch (Exception ex)
{
Console.WriteLine($"Failed to enqueue order with ticket {message.Order?.Ticket} for closing: {ex.Message}");
Console.WriteLine($"Stack trace: {ex.StackTrace}");
}
}
发生错误
“未能将关闭顺序排入队列:集合已修改;枚举操作可能无法执行。”
,据我了解,发生这种情况是因为我在处理队列时向队列添加了新条目,尽管异常表明这种情况发生在 foreach 循环中的委托本身中。这意味着什么?我没有对其进行任何更改,我只是浏览列表并检查数据,但我不会以任何方式更改它。
或者这是因为在处理过程中,新记录被放入队列中?