RError.com

RError.com Logo RError.com Logo

RError.com Navigation

  • 主页

Mobile menu

Close
  • 主页
  • 系统&网络
    • 热门问题
    • 最新问题
    • 标签
  • Ubuntu
    • 热门问题
    • 最新问题
    • 标签
  • 帮助
主页 / 问题 / 972636
Accepted
Oleg
Oleg
Asked:2020-04-22 21:14:34 +0000 UTC2020-04-22 21:14:34 +0000 UTC 2020-04-22 21:14:34 +0000 UTC

莫尼克斯。如何处理 .mapParallelUnordered 方法中的错误

  • 772

我设计了一个可以包含无效元素的反应流。按照设计,流应该简单地跳过它们。为此,我使用Observable.empty. 例如:

  Observable.fromIterable(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9))
    .flatMap(i =>
      if (i % 2 == 0) {   // Bad i
        Observable.empty
      } else
        Observable.pure(i)
    )
    .foreachL(i => print(s"Good i: $i"))   /*Output: Good i: 1
                                                     Good i: 3
                                                     Good i: 5
                                                     Good i: 7
                                                     Good i: 9*/

这段代码基本上可以工作,尽管它按顺序处理元素。但我积极使用长 IO 操作,所以我决定从.mapParallelUnordered. 结果是这样的:

  Observable.fromIterable(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9))
    .mapParallelOrdered(3)(i =>
      if (i % 2 == 0) {
        Task.raiseError(new Exception(s"Bad i: $i"))
      } else
        Task.pure(i)
    )
    .foreachL(i => print(s"Good i: $i"))    /*Output: Good i: 1*/

问题是它会Task.raiseError...杀死整个线程,导致它在第一个偶数处停止。

在第二种情况下,您如何跳过Observable.empty无效元素的反应流程 (a la ) 的所有其他步骤,同时保持流程的执行?

scala
  • 1 1 个回答
  • 10 Views

1 个回答

  • Voted
  1. Best Answer
    extrn
    2020-04-23T06:23:05Z2020-04-23T06:23:05Z

    让我们举一个更清楚的例子。

    有一项严肃的任务需要很长时间才能思考,并且可能会因错误而失败:

    def process(x: Long): Task[Long] = Task.eval {
      println(s"Start processing $x")
      Thread.sleep(Random.nextInt(2000))
    
      val result =
        if (x % 2 != 0) throw new Error(s"Error $x is odd")
        else x
    
      println(s"Finish processing $x")
      result
    }
    

    对于每个元素,执行任务,将结果包装在 中Observable,获取Observable[Observable[Long]]无序结果,然后折叠到Observable[Long].

    val result: Observable[Long] =
      Observable.range(1, 10)
        .mapParallelUnordered(10) { x =>
          process(x).transform(Observable.pure, _ => Observable.empty)
        }.flatten
    

    transform您可以处理错误,重新启动任务等,而不是简单的。

    process(x)
      .map(Observable.pure)
      .onErrorRestart(2)
      .onErrorRecover {
        case NonFatal(ex) =>
          println(ex)
          Observable.empty
      }
    
    • 2

相关问题

  • 使用环形光学递归遍历 JSON

  • 如何在 build.sbt 中为项目设置 Java 选项

  • 如何在自定义 sbt 存储库中搜索特定依赖项?

  • 在Scala中按条件从另外两个创建列表

Sidebar

Stats

  • 问题 10021
  • Answers 30001
  • 最佳答案 8000
  • 用户 6900
  • 常问
  • 回答
  • Marko Smith

    根据浏览器窗口的大小调整背景图案的大小

    • 2 个回答
  • Marko Smith

    理解for循环的执行逻辑

    • 1 个回答
  • Marko Smith

    复制动态数组时出错(C++)

    • 1 个回答
  • Marko Smith

    Or and If,elif,else 构造[重复]

    • 1 个回答
  • Marko Smith

    如何构建支持 x64 的 APK

    • 1 个回答
  • Marko Smith

    如何使按钮的输入宽度?

    • 2 个回答
  • Marko Smith

    如何显示对象变量的名称?

    • 3 个回答
  • Marko Smith

    如何循环一个函数?

    • 1 个回答
  • Marko Smith

    LOWORD 宏有什么作用?

    • 2 个回答
  • Marko Smith

    从字符串的开头删除直到并包括一个字符

    • 2 个回答
  • Martin Hope
    Alexandr_TT 2020年新年大赛! 2020-12-20 18:20:21 +0000 UTC
  • Martin Hope
    Alexandr_TT 圣诞树动画 2020-12-23 00:38:08 +0000 UTC
  • Martin Hope
    Air 究竟是什么标识了网站访问者? 2020-11-03 15:49:20 +0000 UTC
  • Martin Hope
    Qwertiy 号码显示 9223372036854775807 2020-07-11 18:16:49 +0000 UTC
  • Martin Hope
    user216109 如何为黑客设下陷阱,或充分击退攻击? 2020-05-10 02:22:52 +0000 UTC
  • Martin Hope
    Qwertiy 并变成3个无穷大 2020-11-06 07:15:57 +0000 UTC
  • Martin Hope
    koks_rs 什么是样板代码? 2020-10-27 15:43:19 +0000 UTC
  • Martin Hope
    Sirop4ik 向 git 提交发布的正确方法是什么? 2020-10-05 00:02:00 +0000 UTC
  • Martin Hope
    faoxis 为什么在这么多示例中函数都称为 foo? 2020-08-15 04:42:49 +0000 UTC
  • Martin Hope
    Pavel Mayorov 如何从事件或回调函数中返回值?或者至少等他们完成。 2020-08-11 16:49:28 +0000 UTC

热门标签

javascript python java php c# c++ html android jquery mysql

Explore

  • 主页
  • 问题
    • 热门问题
    • 最新问题
  • 标签
  • 帮助

Footer

RError.com

关于我们

  • 关于我们
  • 联系我们

Legal Stuff

  • Privacy Policy

帮助

© 2023 RError.com All Rights Reserve   沪ICP备12040472号-5