Sunday, May 4, 2014

Apache Camel - EIP Pattern Splitter

The Splitter from the EIP patterns allows you split a message into a number of pieces and process them individually.

You need to specify a Splitter as split(). In earlier versions of Camel, you need to use splitter()

I have given samples how we can implement splitter with Java DSL

Sample 1 - Split the Message Body with @ sign.

 CamelContext context = new DefaultCamelContext();

  ProducerTemplate camelTemplate = context.createProducerTemplate();

  context.addRoutes(new RouteBuilder() {

   @Override
   public void configure() throws Exception {
    // this routes starts from the direct:start endpoint
    // the body is then splitted based on @ separator
    // the splitter in Camel supports InOut as well and for that we
    // need
    // to be able to aggregate what response we need to send back,
    // so we provide our
    // own strategy with the class AggregationStrategy.
    from("direct:start")
      .split(body().tokenize("@"), new AggregationStrategy())
      // each splitted message is then send to this bean where
      // we can process it
      .to("bean:common.SearchRequestService?method=handleOrder")
      // this is important to end the splitter route as we do
      // not want to do more routing
      // on each splitted message
      .end()
      // after we have splitted and handled each message we
      // want to send a single combined
      // response back to the original caller, so we let this
      // bean build it for us
      // this bean will receive the result of the aggregate
      // strategy: AggregationStrategy
      .to("bean:common.SearchRequestService?method=buildCombinedResponse");

   }
  });
  context.start();
  camelTemplate.sendBodyAndHeader("direct:start", "A@B@C",
    "searchCriteria", "headingValue");

  context.stop();

AggregationStrategy Class aggregate method.
 public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
  // put hotel search criteria together in old exchange 
  // by adding the search criteria from new exchange

  if (oldExchange == null) {
   // the first time we aggregate we only have the new exchange,
   // so we just return it
   return newExchange;
  }

  String requests = oldExchange.getIn().getBody(String.class);
  String newLine = newExchange.getIn().getBody(String.class);

  logger.info("Aggregate old requests: " + requests);
  logger.info("Aggregate new requests: " + newLine);

  // put Request together separating by semi colon
  requests = requests + ";" + newLine;
  // put combined Request back on old to preserve it
  oldExchange.getIn().setBody(requests);

  // return old as this is the one that has all the Request gathered until
  // now
  return oldExchange;
 }
Sample 2 - Split the message with split().method()
  CamelContext context = new DefaultCamelContext();

  ProducerTemplate camelTemplate = context.createProducerTemplate();
  context.addRoutes(new RouteBuilder() {
   @Override
   public void configure() throws Exception {
    from("direct:start").to("log:+++before+++?showHeaders=true")
      .split().method(MySplitterBean.class, "splitBody")
      .streaming().to("log:+++after+++?showHeaders=true")
      .choice().when(header("foo").contains("bar"))
      .to("mock:mock").otherwise().to("mock:error");
   }
  });
  context.start();
  camelTemplate.sendBodyAndHeader("direct:start", "msg1,msg2", "foo",
    "bar");

No comments:

Post a Comment

Note: Only a member of this blog may post a comment.