This is a simple integration I used with springboot, camel, kafka. I had to consume from a topic and do some logic to produce on another topic. I followed the same idea to develop a "topic switcher" that chose on what topic to produce (not in this post but the solution is immediate).
The routebuilder is the main camel class that call "getBody" method on a bean that do the magic. The bean call back the routebuilder to produce on Kafka.
//ROUTEBUILDER:
@Component
public class MyRouteBuilder extends RouteBuilder {
@Override
public void configure() throws Exception {
from("kafka:{{consumer.topic}}?brokers={{kafka.brokers}}&consumersCount={{consumer.consumersCount}}&groupId={{consumer.group}}")
.routeId("FromKafka")
.log("------- ROUTE BUILDER -------")
.onException(Exception.class)
.log(LoggingLevel.ERROR, "crash-data-stream", "Invalid Input")
.maximumRedeliveries(2).redeliveryDelay(30000)
.end()
.to("bean:externalBean?method=getBody(${body})");
from("direct:myproducer")
.routeId("myproducer")
.to("kafka:{{producer.topic}}?brokers={{kafka.brokers}}")
;
}
}
----------------------------------
//EXTERNAL BEAN
@Service
public class ExternalBean {
@EndpointInject("direct:myproducer")
ProducerTemplate producer;
public void getBody(String body) throws Exception {
//do stuff and send back to camel
myproducer.sendBody(body);
}
}
The routebuilder is the main camel class that call "getBody" method on a bean that do the magic. The bean call back the routebuilder to produce on Kafka.
//ROUTEBUILDER:
@Component
public class MyRouteBuilder extends RouteBuilder {
@Override
public void configure() throws Exception {
from("kafka:{{consumer.topic}}?brokers={{kafka.brokers}}&consumersCount={{consumer.consumersCount}}&groupId={{consumer.group}}")
.routeId("FromKafka")
.log("------- ROUTE BUILDER -------")
.onException(Exception.class)
.log(LoggingLevel.ERROR, "crash-data-stream", "Invalid Input")
.maximumRedeliveries(2).redeliveryDelay(30000)
.end()
.to("bean:externalBean?method=getBody(${body})");
from("direct:myproducer")
.routeId("myproducer")
.to("kafka:{{producer.topic}}?brokers={{kafka.brokers}}")
;
}
}
----------------------------------
//EXTERNAL BEAN
@Service
public class ExternalBean {
@EndpointInject("direct:myproducer")
ProducerTemplate producer;
public void getBody(String body) throws Exception {
//do stuff and send back to camel
myproducer.sendBody(body);
}
}
Comments