南京网站建设招标,做网站想注册商标是哪一类,网站如何建设数据库,渠道网站server:port: 8080
spring:kafka:bootstrap-servers: 192.168.79.104:9092producer: # 生产者retries: 3 # 设置大于 0 的值#xff0c;则客户端会将发送失败的记录重新发送batch-size: 16384buffer-memory: 33554432acks: 1# 指定消息key和消息体的编解码方式key-serializer:…server:port: 8080
spring:kafka:bootstrap-servers: 192.168.79.104:9092producer: # 生产者retries: 3 # 设置大于 0 的值则客户端会将发送失败的记录重新发送batch-size: 16384buffer-memory: 33554432acks: 1# 指定消息key和消息体的编解码方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: default-groupenable-auto-commit: falseauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializermax-poll-records: 500listener:# 当每一条记录被消费者监听器ListenerConsumer处理之后提交# RECORD# 当每一批poll()的数据被消费者监听器ListenerConsumer处理之后提交# BATCH# 当每一批poll()的数据被消费者监听器ListenerConsumer处理之后距离上次提交时间大于TIME时提交# TIME# 当每一批poll()的数据被消费者监听器ListenerConsumer处理之后被处理record数量大于等于COUNT时提交# COUNT# TIME | COUNT 有一个条件满足时提交# COUNT_TIME# 当每一批poll()的数据被消费者监听器ListenerConsumer处理之后, 手动调用Acknowledgment.acknowledge()后提交# MANUAL# 手动调用Acknowledgment.acknowledge()后立即提交一般使用这种# MANUAL_IMMEDIATEack-mode: MANUAL_IMMEDIATEredis:host: 192.168.79.104port: 6379password: 123321lettuce:pool:max-active: 10max-idle: 10min-idle: 1time-between-eviction-runs: 10s
Configuration
public class KafkaProducerConfig {Value(${spring.kafka.bootstrap-servers})private String bootstrapServers;Beanpublic MapString, Object producerConfigs() {MapString, Object props new HashMap();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}Beanpublic ProducerFactoryString, String producerFactory() {return new DefaultKafkaProducerFactory(producerConfigs());}Beanpublic KafkaTemplateString, String kafkaTemplate() {return new KafkaTemplate(producerFactory());}}RestController
public class KafkaController {Autowiredprivate KafkaTemplateString, String kafkaTemplate;PostMapping(/send)public void sendMessage(RequestBody String message) {kafkaTemplate.send(my-topic, message);}}Configuration
EnableKafka
public class KafkaConsumerConfig {Value(${spring.kafka.bootstrap-servers})private String bootstrapServers;Value(${spring.kafka.consumer.group-id})private String groupId;Beanpublic MapString, Object consumerConfigs() {MapString, Object props new HashMap();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}Beanpublic ConsumerFactoryString, String consumerFactory() {return new DefaultKafkaConsumerFactory(consumerConfigs());}Beanpublic ConcurrentKafkaListenerContainerFactoryString, String kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactoryString, String factory new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(consumerFactory());return factory;}}Service
public class KafkaConsumer {KafkaListener(topics my-topic, groupId default-group)public void consume(String message) {System.out.println(Received message: message);}}在上面的代码中我们使用 KafkaListener 注解声明了一个消费者方法用于接收从 my-topic 主题中读取的消息。在这里我们将消费者组 ID 设置为default-group。 现在我们已经完成了 Kafka 生产者和消费者的设置。我们可以使用 mvn spring-boot:run 命令启动应用程序并使用 curl 命令发送 POST 请求到 http://localhost:8080/send 端点以将消息发送到 Kafka。然后我们可以在控制台上查看消费者接收到的消息。 这就是使用 Spring Boot 和 Kafka 的基本设置。我们可以根据需要进行更改和扩展以满足特定的需求。