A producer is an application that publishes messages to Kafka topics.
Spring Boot Kafka Producer Example:
Let's create a Spring Boot Kafka producer example.
In this tutorial we will not create or use any local Kafka topics; instead, we will use cloud Kafka topics created on Conduktor.
Create a Spring Boot application from Spring Initializr and add the Kafka dependency.
Create a producer Java class like the one below.
In the class, you need to set some properties, such as the bootstrap servers, the security protocol, and the SASL configuration, to connect to Conduktor. You can get the values for these properties from the Conduktor console.
Then specify the serializers for the message key and value.
Then create the Producer object and the ProducerRecord object.
Then send the data.
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaWithConduktorApplicationProducer {

    public static void main(String[] args) {
        SpringApplication.run(KafkaWithConduktorApplicationProducer.class, args);

        Properties properties = new Properties();

        // set producer properties (connection and authentication for the Conduktor playground)
        properties.setProperty("bootstrap.servers", "cluster.playground.cdkt.io:9092");
        properties.setProperty("security.protocol", "SASL_SSL");
        properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"prasanna\" password=\"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJodHRwczovL2F1dGguY29uZHVrdG9yLmlvIiwic291cmNlQXBwbGljYXRpb24iOiJhZG1pbiIsInVzZXJNYWlsIjpudWxsLCJwYXlsb2FkIjp7InZhbGlkRm9yVXNlcm5hbWUiOiJwcmFzYW5uYSIsIm9yZ2FuaXphdGlvbklkIjo3MDQ2OSwidXNlcklkIjpudWxsLCJmb3JFeHBpcmF0aW9uQ2hlY2siOiJjYTE3OTkxYi1hNDNlLTQxNTktOTQyNS0yYjU3ZDhjMDZlNzEifX0.xUqONoItEtNqos1G6B0_yNoYKZin8vxhZ-ej-2RCdaI\";");
        properties.setProperty("sasl.mechanism", "PLAIN");

        // serialize both the key and the value as strings
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", StringSerializer.class.getName());

        // create the Producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // create a Producer Record (topic, key, value)
        ProducerRecord<String, String> producerRecord =
                new ProducerRecord<>("Heapsteep_second_topic", "key 24", "Message 24");

        // send data (asynchronous; the record is buffered until flushed)
        producer.send(producerRecord);

        // flush and close the producer
        producer.flush();
        producer.close();
    }
}
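The connection settings in the class above are hard-coded, including the password. As a minimal sketch, assuming the credentials are supplied through environment variables instead, the same properties could be assembled by a small helper. The class name `ConduktorConnectionProps`, the method `build`, and the variable names `KAFKA_USERNAME` and `KAFKA_PASSWORD` are hypothetical and not part of the original project; only the property keys and fixed values come from the example above.

```java
import java.util.Properties;

public class ConduktorConnectionProps {

    // Builds the connection properties from caller-supplied values.
    // Assumption: username and password contain no double quotes,
    // since they are placed inside a quoted JAAS config string.
    public static Properties build(String bootstrapServers, String username, String password) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        properties.setProperty("security.protocol", "SASL_SSL");
        properties.setProperty("sasl.mechanism", "PLAIN");
        properties.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"" + username + "\" password=\"" + password + "\";");
        return properties;
    }

    public static void main(String[] args) {
        // Hypothetical environment variable names; the fallbacks are placeholders only.
        String username = System.getenv().getOrDefault("KAFKA_USERNAME", "demo-user");
        String password = System.getenv().getOrDefault("KAFKA_PASSWORD", "demo-password");

        Properties properties = build("cluster.playground.cdkt.io:9092", username, password);

        // Print a non-secret setting to confirm the properties were built.
        System.out.println("security.protocol=" + properties.getProperty("security.protocol"));
    }
}
```

The returned `Properties` object can then receive the serializer settings and be passed to the `KafkaProducer` constructor exactly as in the class above.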
Make sure you have created the topic mentioned above by logging in to the Conduktor console.
Now, run the application.
You will see one message pushed to that topic.
After this, the application terminates automatically.
Source code for this example can be found here: https://github.com/heapsteep/kafka-with-conduktor
Next, continue with the Kafka consumer tutorial: Kafka Consumer Example Using Spring Boot.
Spring Boot Kafka Consumer Example:
Let's now create a Spring Boot project for the Kafka consumer.
You can use the same project that we created in the example above. Add a new Java class like the one below:
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaWithConduktorApplicationConsumer {

    public static void main(String[] args) {
        SpringApplication.run(KafkaWithConduktorApplicationConsumer.class, args);

        Properties properties = new Properties();

        // connect to the Kafka Conduktor Playground
        properties.setProperty("bootstrap.servers", "cluster.playground.cdkt.io:9092");
        properties.setProperty("security.protocol", "SASL_SSL");
        properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"prasanna\" password=\"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJodHRwczovL2F1dGguY29uZHVrdG9yLmlvIiwic291cmNlQXBwbGljYXRpb24iOiJhZG1pbiIsInVzZXJNYWlsIjpudWxsLCJwYXlsb2FkIjp7InZhbGlkRm9yVXNlcm5hbWUiOiJwcmFzYW5uYSIsIm9yZ2FuaXphdGlvbklkIjo3MDQ2OSwidXNlcklkIjpudWxsLCJmb3JFeHBpcmF0aW9uQ2hlY2siOiJjYTE3OTkxYi1hNDNlLTQxNTktOTQyNS0yYjU3ZDhjMDZlNzEifX0.xUqONoItEtNqos1G6B0_yNoYKZin8vxhZ-ej-2RCdaI\";");
        properties.setProperty("sasl.mechanism", "PLAIN");

        // set consumer configs
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer", StringDeserializer.class.getName());
        properties.setProperty("group.id", "My Group Id 1");
        // with no committed offset for this group, start from the beginning of the topic
        properties.setProperty("auto.offset.reset", "earliest");

        // create a consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // subscribe to a topic
        consumer.subscribe(Arrays.asList("Heapsteep_second_topic"));

        // poll for data; this loop never exits on its own
        while (true) {
            System.out.println("Polling");
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Key: " + record.key() + ", Value: " + record.value());
                System.out.println("Partition: " + record.partition() + ", Offset: " + record.offset());
            }
        }
    }
}
Just right-click on the class and run it as a Java application.
You will see the messages printed in the console.
Please note that the class above does not gracefully shut down the consumer (we will do that in the next step), so it will keep polling indefinitely.
Source code is available at: https://github.com/heapsteep/kafka-with-conduktor
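The consumer class repeats the connection settings used by the producer and then adds four consumer-specific ones. As a rough sketch of that layering, the helper below copies an existing set of connection properties and adds the consumer settings on top. The class name `ConsumerSettings` and the method `forGroup` are hypothetical and not part of the original project; the deserializer is given by its fully qualified class name as a string, which Kafka accepts in place of `StringDeserializer.class.getName()`.

```java
import java.util.Properties;

public class ConsumerSettings {

    // Fully qualified name of Kafka's string deserializer.
    private static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    // Returns a new Properties object: the given connection settings
    // plus the consumer-specific settings from the example above.
    public static Properties forGroup(Properties connection, String groupId) {
        Properties properties = new Properties();
        properties.putAll(connection); // leave the caller's object unchanged
        properties.setProperty("key.deserializer", STRING_DESERIALIZER);
        properties.setProperty("value.deserializer", STRING_DESERIALIZER);
        properties.setProperty("group.id", groupId);
        // Applies only when the group has no committed offset yet:
        // "earliest" starts reading from the beginning of each partition.
        properties.setProperty("auto.offset.reset", "earliest");
        return properties;
    }

    public static void main(String[] args) {
        Properties connection = new Properties();
        connection.setProperty("bootstrap.servers", "cluster.playground.cdkt.io:9092");

        Properties consumerProps = forGroup(connection, "My Group Id 1");

        // Print the resulting settings in a stable order.
        consumerProps.stringPropertyNames().stream()
                .sorted()
                .forEach(name -> System.out.println(name + "=" + consumerProps.getProperty(name)));
    }
}
```

Because the consumer settings are added to a copy, the same connection properties can be reused for a producer without picking up `group.id` or the deserializers.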