微信搜索superit|邀请体验:大数据, 数据管理、OLAP分析与可视化平台 | 赞助作者:赞助作者

kafka 设置消费group为最新offset

kafka aide_941 7℃
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.tools.OffsetManager
import java.{util => ju}
/** Command-line helper that moves a Kafka consumer group's offsets for one topic to the latest position.
  *
  * The broker list, topic, credentials and group id in [[main]] are placeholders and must be
  * replaced with real values before running.
  */
object Test {

  /** Builds the base consumer configuration.
    *
    * @param brokers value stored under `bootstrap.servers`
    * @param groupId value stored under `group.id`
    * @return a new mutable map holding the consumer settings (byte-array deserializers,
    *         `auto.offset.reset=latest`, fixed client id `monitor-test-2`)
    */
  private def buildDefaultConfiguration(brokers: String, groupId: String): ju.HashMap[String, Object] = {
    val byteArrayDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    val props = new ju.HashMap[String, Object]
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "monitor-test-2")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, byteArrayDeserializer)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, byteArrayDeserializer)
    props
  }

  /** Adds SASL/PLAIN settings (over `SASL_PLAINTEXT`) to an existing configuration.
    *
    * The supplied map is mutated in place and also returned, so calls can be chained.
    *
    * @param userName user name embedded in the JAAS configuration string
    * @param password password embedded in the JAAS configuration string
    * @param props    configuration map to extend
    * @return the same `props` instance, now containing the security settings
    */
  private def addSecure(userName: String, password: String, props: ju.HashMap[String, Object]): ju.HashMap[String, Object] = {
    // Each put needs a comma between key and value; two adjacent string literals do not compile.
    props.put("security.protocol", "SASL_PLAINTEXT")
    props.put("sasl.mechanism", "PLAIN")
    // Triple-quoted interpolation avoids escaping the embedded double quotes.
    props.put(
      "sasl.jaas.config",
      s"""org.apache.kafka.common.security.plain.PlainLoginModule required username="$userName"  password="$password";"""
    )
    props
  }

  /** Entry point: assembles the secured consumer configuration and resets the group's offsets.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    // Placeholder values; replace with the real cluster, topic, credentials and group.
    val brokers = "kafka 集群列表"
    val topic = "设置的Topic"
    val username = "xxx"
    val password = "xxxxx"
    val groupId = "需要设置的group"
    val kafkaParam = addSecure(username, password, buildDefaultConfiguration(brokers, groupId))
    // NOTE(review): OffsetManager is not defined in this file; presumably resetOffsetToLatest()
    // commits the topic's end offsets for the configured group. Confirm against its source.
    val manager = OffsetManager(topic, kafkaParam)
    manager.resetOffsetToLatest()
  }
}

转载请注明:SuperIT » kafka 设置消费group为最新offset

喜欢 (6)or分享 (0)