KafkaProducerConfig.java 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package com.tidecloud.dataacceptance.config;
  2. import java.util.HashMap;
  3. import java.util.Map;
  4. import org.apache.kafka.clients.producer.ProducerConfig;
  5. import org.apache.kafka.common.serialization.ByteArraySerializer;
  6. import org.apache.kafka.common.serialization.IntegerSerializer;
  7. import org.apache.kafka.common.serialization.StringSerializer;
  8. import org.springframework.beans.factory.annotation.Value;
  9. import org.springframework.context.annotation.Bean;
  10. import org.springframework.context.annotation.Configuration;
  11. import org.springframework.kafka.annotation.EnableKafka;
  12. import org.springframework.kafka.core.DefaultKafkaProducerFactory;
  13. import org.springframework.kafka.core.KafkaTemplate;
  14. import org.springframework.kafka.core.ProducerFactory;
  15. /**
  16. * Created by ryan on 18/4/27.
  17. */
  18. @Configuration
  19. @EnableKafka
  20. public class KafkaProducerConfig {
  21. @Value("${spring.kafka.bootstrap-servers}")
  22. private String kafkaBrokers;
  23. @Bean
  24. public ProducerFactory<Integer, byte[]> producerFactory() {
  25. return new DefaultKafkaProducerFactory<>(producerConfigs());
  26. }
  27. @Bean
  28. public ProducerFactory<Integer, String> stringProducerFactory() {
  29. return new DefaultKafkaProducerFactory<>(stringProducerConfigs());
  30. }
  31. @Bean
  32. public Map<String, Object> producerConfigs() {
  33. Map<String, Object> props = new HashMap<>();
  34. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers);
  35. props.put(ProducerConfig.RETRIES_CONFIG, 2);
  36. props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
  37. props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
  38. props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
  39. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
  40. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
  41. return props;
  42. }
  43. @Bean
  44. public Map<String, Object> stringProducerConfigs() {
  45. Map<String, Object> props = new HashMap<>();
  46. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers);
  47. props.put(ProducerConfig.RETRIES_CONFIG, 2);
  48. props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
  49. props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
  50. props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
  51. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
  52. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  53. return props;
  54. }
  55. @Bean
  56. public KafkaTemplate<Integer, byte[]> kafkaTemplate() {
  57. return new KafkaTemplate<Integer, byte[]>(producerFactory());
  58. }
  59. @Bean
  60. public KafkaTemplate<Integer, String> IntegerStringkafkaTemplate() {
  61. return new KafkaTemplate<Integer, String>(stringProducerFactory());
  62. }
  63. }