我设计了一个可以包含无效元素的反应流。按照设计,流应该简单地跳过它们。为此,我使用Observable.empty
. 例如:
Observable.fromIterable(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9))
.flatMap(i =>
if (i % 2 == 0) { // Bad i
Observable.empty
} else
Observable.pure(i)
)
.foreachL(i => print(s"Good i: $i")) /*Output: Good i: 1
Good i: 3
Good i: 5
Good i: 7
Good i: 9*/
这段代码基本上可以工作,尽管它按顺序处理元素。但我积极使用长 IO 操作,所以我决定从.mapParallelUnordered
. 结果是这样的:
Observable.fromIterable(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9))
.mapParallelOrdered(3)(i =>
if (i % 2 == 0) {
Task.raiseError(new Exception(s"Bad i: $i"))
} else
Task.pure(i)
)
.foreachL(i => print(s"Good i: $i")) /*Output: Good i: 1*/
问题是它会Task.raiseError...
杀死整个线程,导致它在第一个偶数处停止。
在第二种情况下,您如何跳过Observable.empty
无效元素的反应流程 (a la ) 的所有其他步骤,同时保持流程的执行?
让我们举一个更清楚的例子。
有一项严肃的任务需要很长时间才能思考,并且可能会因错误而失败:
对于每个元素,执行任务,将结果包装在 中
Observable
,获取Observable[Observable[Long]]
无序结果,然后折叠到Observable[Long]
.transform
您可以处理错误,重新启动任务等,而不是简单的。