Hi,
I have exactly the same issue as you sjakovac. I have also created a ticket at smartbear for this issue. Unfortunately they require my project in order to investigate the issue but since I have lot of firewalls and certificates involved they will not be able to run exactly my project.
I suggest smartbear should do an own test on this and if they can get it to work they can create a webpage where they describe this. I mean, there are lot of people having the same issue.
However, in order to get around this issue I have been investigating some groovy scripting and I found this link which I have modified a bit and now I have a solution which works for me.
https://community.smartbear.com/t5/SoapUI-Open-Source/Need-groovy-script-to-read-messages-from-kafka-topic/td-p/196019
Here is the complete code. Hope it will help other people as well.
Btw, thanks to the guy sharing the original code with us.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
log.info("*************************************************************************")
log.info(" TOPIC NAME ")
log.info("*************************************************************************")
//SAVE RECEIVED DATA
def tsuite1 = testRunner.testCase.testSuite.project.getTestSuiteByName("TEST SUITE NAME");
def tcase1= tsuite1.getTestCaseByName("TEST CASE NAME");
db = tcase1.testSteps['Data Source'].dataSource.gridModel
int row = testRunner.testCase.testSteps['Data Source'].currentRow
def guid = "ABC12345DEF"
//KAFKA SETTINGS
String topic = "MY-TOPIC-NAME"
Properties properties = new Properties();
properties.put("group.id", "test");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("security.protocol","SSL");
properties.put("ssl.keystore.password", "XXX");
properties.put("bootstrap.servers", "XXX:6667");
properties.put("ssl.key.password", "XXX");
properties.put("ssl.truststore.password", "XXX");
properties.put("ssl.truststore.location", "C:\\MyCertificates\\certificate.jks");
properties.put("ssl.keystore.location", "C:\\MyCertificates\\certificate.jks");
//properties.put("session.timeout.ms", "10000");
properties.put("max.poll.records", "5");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList(topic));
long t = System.currentTimeMillis();
long end = t + 9000;
while (System.currentTimeMillis()<end){
log.info("START READING")
ConsumerRecords<String, String> records = consumer.poll(200);
for (ConsumerRecord<String, String> record : records){
log.info("key = " + record.key())
log.info("value = " + record.value());
if(record.value().contains(guid)){
log.info("found data")
db.setValueAt(topic, row, 1);
db.setValueAt(guid, row, 2);
}
}
log.info("STOP READING")
}
consumer.close();