Андрей Козицкий Asked:2020-06-21 18:23:01 +0800 CST2020-06-21 18:23:01 +0800 CST 2020-06-21 18:23:01 +0800 CST 如何在批处理模式下使用 spark 从 kafka 中减去完整主题 772 在 spark/kafka 上的 google 中有流式传输的示例,如何在批处理模式下使用 spark(任何 api java/scala)从 kafka 中减去数据。 hadoop 1 个回答 Voted Best Answer Alex Chermenin 2020-06-27T20:34:28+08:002020-06-27T20:34:28+08:00 您可以在流模式下读取 Kafka 主题,也可以在批处理模式下执行查询,指定开始和结束偏移量,如下所示: val df = spark .read .format("kafka") .option("kafka.bootstrap.servers", "host1:port1,host2:port2") .option("subscribePattern", "topic.*") .option("startingOffsets", "earliest") .option("endingOffsets", "latest") .load() df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)] 您可以在文档中找到更多详细信息:https ://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#creating-a-kafka-source-for-batch-queries
您可以在流模式下读取 Kafka 主题,也可以在批处理模式下执行查询,指定开始和结束偏移量,如下所示:
您可以在文档中找到更多详细信息:https ://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#creating-a-kafka-source-for-batch-queries