我使用 flink cep 处理来自 kafka 的传入流。描述的模式。StreamExecutionEnviroment 中指定的并行度。在 flink 仪表板中,我看到一条链,其中消息处理本身的并行度 == 1,其他的值为 3。我直接向处理操作员指示并行度 = 3
CEP.pattern(stream, pattern)
.select(...)
.setParallelism(3)
为什么我收到错误
the parallelism of non parallel operator must be 1
这是因为 select 返回的流对象不支持大于 1 的值。我的任务是我只需要使用 flink cep 将消息从一个 kafka 轻弹到另一个 kafka。也许你不应该使用 select。如何并行化 CEP.pattern?
stream.keyBy 解决了这个问题。话题关闭。