你好,你需要实现一个内部的EventBus(比如不是基于RabbitMq的微服务交互)。所有发布者都将带有类型数据(MyData)的事件生成写入公共总线。订阅者侦听总线并按国家类型(MyData)过滤发布者。最好在订阅时返回 IDisposable。发布者也可以取消订阅所有订阅者。
没有时间寻找现成的解决方案,所以我很快做了一个 ReactiveExt 的包装器
public class ReactiveEventBus
{
public ISubject<object> Bus { get; set; }
public ReactiveEventBus()
{
Bus = new Subject<object>();
}
public void Publish<TMessage>(TMessage msg)
{
Bus.OnNext(msg);
}
public IDisposable Subscribe<TMessage>(Action<TMessage> action)
{
return Bus
.Where(item=> item.GetType() == typeof(TMessage))
.Subscribe((obj) =>
{
var val = (TMessage) obj;
action(val);
});
}
}
那些。订阅适用于所有发布,仅适用于必要的事件,我按类型过滤 Where。
boxing/unboxing
如果 TMessage 值是类型的,就会出现问题,因为 ( var val = (TMessage) obj)
在调用委托本身之前有一个对象的转换。但我不太可能有价值事件。
Where
如果订阅者很多且事件频繁发布,过滤 () 对 Rx 事件的性能有多大影响。
是否值得用现成的解决方案替换这个实现(MemBus 遇到的第一件事)?
过滤本身不会以任何方式影响性能。它只是一个条件运算符,尽管在几个包装器中......
另一件事很重要:这种方案中的每条消息都会立即发送给所有订阅者,然后部分丢弃。也就是说,订阅者越多 - 工作越慢。
如果您打算通过这样的总线发送大量数据,最好为每种类型的消息创建一个单独的主题。
PS,如果您决定继续使用当前架构 - 了解 OfType 方法的作用:-)