Hi Team,
Below is the code I'm using for now.
String topic = "sample";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topic));
log.info("Subscribed to topic " + topic);
long t = System.currentTimeMillis();
long end = t + 9000;
while (System.currentTimeMillis() < end)
{
    ConsumerRecords<String, String> records = consumer.poll(9000);
    for (ConsumerRecord<String, String> record : records)
    {
        log.info("offset = " + record.offset() + " value = " + record.value());
    }
    consumer.commitSync();
}
consumer.close();
The test step returns neither an error message nor the Kafka message. Instead, it runs for some time and exits without returning anything.
Can anyone help me resolve this issue? Thanks in advance for your help.