代码拉取完成,页面将自动刷新
简化 Kafka client 操作
使用约定和一些设计模式来简化 Kafka client 操作。
/**
 * Minimal producer example: builds a {@code MessageProducer} and sends one message with a
 * callback.
 */
class Demo {
    /**
     * Constructs a {@code MessageProducer} from a comma-separated server list and a topic name,
     * then sends the key/value pair {@code "hello"}/{@code "hello"}.
     *
     * <p>The callback logs the string form of whatever object the producer hands back.
     * NOTE(review): the exact type passed to the callback is not visible in this snippet.
     */
    public static void demo() {
        String servers = "server1:9092,server2:9092";
        String topic = "test";
        MessageProducer helloProducer = new MessageProducer(servers, topic);
        helloProducer.sendMessage("hello", "hello", response -> logger.info(response.toString()));
    }
}
/**
 * End-to-end walkthrough of the consumer lifecycle: listen, receive, suspend, terminate, and
 * hand over to a second consumer created with the same group id.
 */
class Demo {
    /**
     * Runs the walkthrough against the topic {@code test}.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>construct two {@code MessageConsumer} instances with identical arguments;</li>
     *   <li>start the first one listening with a handler that logs every record of the topic;</li>
     *   <li>send a map payload with a callback that logs offset, timestamp and partition;</li>
     *   <li>call {@code suspend()} and then {@code terminal()} on the first consumer;</li>
     *   <li>start the second consumer listening and send the (extended) payload again.</li>
     * </ol>
     * Each step is followed by a call to {@code sleep(5000)}.
     */
    public static void demo() {
        String servers = "server1:9092,server2:9092";
        String topic = "test";
        String groupId = "test";
        // Same pause is used between every phase; kept as int to match the original literal.
        int pauseMillis = 5000;

        logger.info("starting test consumer, servers: {}, topics: {}", servers, topic);

        // Both consumers are constructed up front, exactly as in the original ordering.
        // NOTE(review): the meaning of the third constructor argument (false) is not visible here.
        MessageConsumer primaryConsumer =
                new MessageConsumer(servers, groupId, false, Collections.singletonList(topic));
        MessageConsumer standbyConsumer =
                new MessageConsumer(servers, groupId, false, Collections.singletonList(topic));

        // Handler shared by both consumers: logs each record belonging to the topic.
        Consumer<ConsumerRecords<String, String>> recordLogger = batch ->
                batch.records(topic).forEach(rec -> logger.info(
                        "receive message, key: {}, value: {}, partition: {}, timestamp: {}, offset: {}",
                        rec.key(),
                        rec.value(),
                        rec.partition(),
                        rec.timestamp(),
                        rec.offset()));

        primaryConsumer.listening(recordLogger);

        MessageProducer producer = new MessageProducer(servers, topic);
        HashMap<Object, Object> payload = new HashMap<>();
        payload.put("1", "2");
        payload.put("3", "4");
        payload.put("5", "6");
        producer.sendMessage("msg key 1", payload, result -> {
            logger.info(
                    "send message, offset: {}, timestamp: {}, partition: {}",
                    result.offset(),
                    result.timestamp(),
                    result.partition());
        });
        logger.info("message receive test completed, servers: {}, topics: {}", servers, topic);
        sleep(pauseMillis);

        logger.info("consumer suspend test start, servers: {}, topics: {}", servers, topic);
        primaryConsumer.suspend();
        logger.info("consumer suspend test completed, servers: {}, topics: {}", servers, topic);
        sleep(pauseMillis);

        logger.info("consumer terminal test start, servers: {}, topics: {}", servers, topic);
        primaryConsumer.terminal();
        logger.info("consumer terminal test completed, servers: {}, topics: {}", servers, topic);
        sleep(pauseMillis);

        logger.info("new consumer join test start, servers: {}, topics: {}", servers, topic);
        standbyConsumer.listening(recordLogger);
        // The same map instance is reused with one extra entry for the second send.
        payload.put("7", "7");
        producer.sendMessage("msg key 2", payload);
        logger.info("new consumer join test completed, servers: {}, topics: {}", servers, topic);
        sleep(pauseMillis);
    }
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容不涉及不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违反国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。