我有一个事件源source,它会在短时间内产生几个事件,然后保持沉默。我几乎写了自己的实现Window,在文档中找到了实现,但偶然发现了死锁。
这是我尝试转换IObservable<T>的方式IObservable<IList<T>(将我在此期间收到的所有事件都окна放入列表中):
source.Window(TimeSpan.FromSeconds(1)).Select(obs => obs.ToListObservable())
按照设计,它应该阻止结果事件的输出直到结束окна,但它只是死锁。
告诉我如何在不手动处理内部回调的情况下实现这一点IObservable
UPD:
我想将 2 个事件分层到一个窗口中。这样第一个事件开始窗口的倒计时,然后窗口本身通过 TimeSpan 关闭。用代码测试:
var source = Observable
.Interval(TimeSpan.FromMilliseconds(300));
var output = source.Window(TimeSpan.FromMilliseconds(700)).SelectMany(window => window.ToList());
using var subscription = output
.Subscribe(list => Console.WriteLine(">>" + string.Join("; ", list)));
Console.ReadLine();
UPD2:
我仍在尝试将“窗口”设置为像“开关”一样工作:打开它 - 它开始录制,关闭它 - 它结束录制并且可以打开它。
var state = false;
var output = source.Delay(TimeSpan.FromMilliseconds(50))
.Window(source.Where(next => state == false).Select(next=>
{
state = true;
return DateTime.UtcNow + TimeSpan.FromMilliseconds(610);
}), time => source.Where(next =>
{
var shellEnd = time < DateTime.UtcNow;
if (shellEnd)
{
state = false;
}
return shellEnd;
}))
.SelectMany(window => window.ToList());
这里仍然存在问题:
- 有时列表中有三个事件而不是两个
- 延迟
Delay往往无济于事。它应该稍微延迟事件调用,以便在事件传播之前打开/关闭窗口
该功能
Window非常强大,它允许您将序列分成小块(“窗口”),并手动控制这些窗口的边界。我们将需要一个重载Func<IObservable<TWindowClosing>> windowClosingSelector(参见描述,例如,在这里,滚动到“RxNET Window”并打开扰流板)。这个重载
Window是这样工作的。第一个窗口立即打开,并调用关闭事件生成器:生成器返回IObservable<T>,当第一个元素到达时,窗口关闭。立即打开一个新窗口,并再次请求生成器生成新的关闭事件。等等。作为关闭事件的生成器,我们可以方便地使用
Delay应用于初始序列的运算符:它将所有元素从当前元素开始移动一段固定的时间。结果,当调用生成器时,生成的第一个元素将是原始序列中当前尚未发出的下一个元素,并在时间上移动了所需的窗口宽度。也就是说,窗口将在第一个元素到达后的指定时间段后关闭。获得必要的窗口后,使用简单的
.ToList().结果代码:
请注意,我们无法缓存
source.Delay(windowDuration),因为我们需要调用时的状态。如果你的
source很冷并且为不同的订阅者产生不同的序列,你需要使用.Publish().RefCount(). 该代码基于流 inWindow和 inDelay相同的事实。