import org.apache.kafka.clients.consumer.ConsumerConfigimport org.apache.kafka.tools.OffsetManagerimport java.{util => ju}object Test { private def buildDefaultConfiguration(brokers: String, groupId: String): ju.HashMap[String, Object] = { val props = new ju.HashMap[String, Object] props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers) props.put(ConsumerConfig.CLIENT_ID_CONFIG, "monitor-test-2") props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId) props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000") props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest") props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000") props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer") props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer") props } private def addSecure(userName: String, password: String,props: ju.HashMap[String, Object]): ju.HashMap[String, Object] = { props.put("security.protocol", "SASL_PLAINTEXT") props.put("sasl.mechanism", "PLAIN") props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + userName + "\" password=\"" + password + "\";") props } def main(args: Array[String]): Unit = { val brokers = "kafka 集群列表" val topic = "设置的Topic" val username = "xxx" val password = "xxxxx" val groupId = "需要设置的group" val kafkaParam = addSecure(username, password, buildDefaultConfiguration(brokers, groupId)) val manager = OffsetManager(topic, kafkaParam)// new KafkaConsumer(new Properties()) manager.resetOffsetToLatest() }} |
转载请注明:SuperIT » kafka 设置消费group为最新offset
