仅使用 spark sql 时,对数据库的请求处理得非常快,但是当 JavaPairJDD 连接时,它开始变慢。
我想知道是否可以加快数据处理速度或以某种方式改进代码以使其不会冻结?
这是没有 JavaPairRDD 的运行
collectAsList at StackOverFlow.java:56, took 0.883860 s
在这里使用 JavaPairRDD
collectAsMap at StackOverFlow.java:61, took 128.340516 s
这是代码
public List<Order> getAllWithoutPairRDD(Optional<String> search, Optional<Integer> size) {
SparkSession session = SparkSession.builder().config(config).getOrCreate();
Properties properties = new Properties();
properties.setProperty("partitionColumn", "id");
properties.setProperty("fetchsize", "1000");
properties.setProperty("driver", "org.postgresql.Driver");
properties.setProperty("user", "postgres");
properties.setProperty("password", "password");
Dataset<Row> jdbc = session.read().jdbc("jdbc:postgresql://localhost/orders"
, "orders"
, "id"
, 1L
, 60000000L
, 100
, properties
);
Dataset<Row> ordersData = jdbc.select(col("*")).where(col("city_id").equalTo(3L)).limit(size.orElse(1));
JavaPairRDD<Timestamp, Integer> analyticPairRDD = ordersData.toJavaRDD().mapToPair((PairFunction<Row, Timestamp, Integer>) row -> new Tuple2<Timestamp, Integer>((Timestamp) row.get(0), 1));
JavaPairRDD<Timestamp,Integer> result = analyticPairRDD.groupByKey().mapValues(Iterables::size);
return ordersData.as(orderEncoder).collectAsList();
}
public Map<Timestamp, Integer> getAllWithPairRDD(Optional<String> search, Optional<Integer> size) {
SparkSession session = SparkSession.builder().config(config).getOrCreate();
Properties properties = new Properties();
properties.setProperty("partitionColumn", "id");
properties.setProperty("fetchsize", "1000");
properties.setProperty("driver", "org.postgresql.Driver");
properties.setProperty("user", "postgres");
properties.setProperty("password", "password");
Dataset<Row> jdbc = session.read().jdbc("jdbc:postgresql://localhost/orders"
, "orders"
, "id"
, 1L
, 60000000L
, 100
, properties
);
Dataset<Row> ordersData = jdbc.select(col("operation_date")).where(col("city_id").equalTo(3L)).limit(size.orElse(1));
JavaPairRDD<Timestamp, Integer> analyticPairRDD = ordersData.toJavaRDD().mapToPair((PairFunction<Row, Timestamp, Integer>) row -> new Tuple2<Timestamp, Integer>((Timestamp) row.get(0), 1));
JavaPairRDD<Timestamp,Integer> result = analyticPairRDD.groupByKey().mapValues(Iterables::size);
return result.collectAsMap();
}
@Bean
public SparkConf sparkConf() {
return new SparkConf()
.setAppName(appName)
.setMaster(masterUri)
.set("spark.executor.memory", "2g")
.set("spark.executor.cores", "10")
.set("spark.executor.memoryOverhead", "1g")
.set("spark.driver.cores", "10")
.set("spark.driver.memory", "3g")
.set("spark.yarn.am.memory", "2g")
.set("spark.yarn.am.cores", "4")
.set("spark.sql.shuffle.partitions", "1000")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
}
它很可能会因为数据必须分散在执行程序上而变慢 - 对 RDD 要求数据分散在密钥上,这可能是一个缓慢的过程,当然这取决于数据量......
但是在这种情况下,我认为没有必要使用 RDD——最好使用 Spark SQL 函数,或者甚至在纯 SQL 中工作,执行所有必要的聚合——然后 Spark 本身会优化执行,这与 RDD 不同的是,没有这样做.
PS在这种情况下还有很多其他问题:
spark.sql.shuffle.partitions设置得太高1000- 这意味着数据将分散在 1000 个部分(如果有随机播放),并且将为每个部分创建一个必须执行的任务,并且执行器上有 10 个核心,这将为 100 次迭代。通常此参数设置为执行器上的核心总数的 1 或 2 倍