我有很多主题的 Kafka。并且有 Consumer Kafka 用于读取这些主题的记录:
public Set<ConsumerRecord> consumeKafka() {
consumer.subscribe(topics);
Set<ConsumerRecord> resultRecords = new HashSet<>();
int i = 0;
while (i++ < topicIteration) {
ConsumerRecords<Object, Object> records = consumer.poll(100);
System.out.println(records.partitions());
for (ConsumerRecord consumerRecord : records){
resultRecords.add(consumerRecord);
}
}
return resultRecords;
}
private Consumer consumerInit(String consumerId){
props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
if (isActualizationTopics) {
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
} else {
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, topicLimit);
}
return new KafkaConsumer<>(props);
}
为了不连续读取所有主题,我需要对主题列表进行排序,并从最长时间未读取数据的主题中获取数据。我知道我需要获取每个主题的最后偏移日期并对其进行排序?如何才能做到这一点?或者也许还有其他解决方法?
没有这种可能性。Kafka 仅适用于偏移量,对日期不感兴趣。您可以尝试通过将这些信息以 <topic, last_read_date> 格式存储在应用程序端来解决问题。