Sunday, October 20, 2013

Custom mediator for WSO2 ESB

There are some situations in our deployment scenarios, which it is required to do a custom transformation of incoming messages. This custom tasks are sometimes can not be achieved with the inbuilt mediators in WSO2 Enterprise Service Bus. So , in those kind of scenarios , we ll have to write a custom mediator to achieve this. With this post , I am explaining how to write a custom mediator for WSO2 ESB.

Writing a custom mediator for ESB , means we are writing a class meditor for ESB which contains the required logic in the mediate() method of the class. When writing a class mediator, there is  a basic step follow to be compatible to deploy in WSO2 ESB.  We need to extend our mediator from the abstract class "org.apache.synapse.mediators.AbstractMediator". In order to do that , you will have to add a dependency to your project for  synapse-core as follows;



    <dependencies>
        <dependency>
            <groupId>org.apache.synapse</groupId>
            <artifactId>synapse-core</artifactId>
            <version>${synpase.core.version}</version>
        </dependency>
    </dependencies>


Before digging in to the code it self, let me explain , what i am going to achieve with this custom mediator.

I have following incoming message ;


         <dat:addJobWorkLocation xmlns:dat="http://ws.wso2.org/dataservice">
            <dat:job_id>2037826</dat:job_id>
            <dat:preferred_city_code>,CMB,GM,KND,</dat:preferred_city_code>
            <dat:country_code>SL</dat:country_code>
         </dat:addJobWorkLocation>

 

From this incoming message , i need to generate messages with the same type by splitting the comma separated string passed with the local name "preferred city code". After generating , it should be like follows,




         <dat:addJobWorkLocation xmlns:dat="http://ws.wso2.org/dataservice">
            <dat:job_id>2037826</dat:job_id>
            <dat:preferred_city_code>CMB</dat:preferred_city_code>
            <dat:country_code>SL</dat:country_code>
         </dat:addJobWorkLocation>



         <dat:addJobWorkLocation xmlns:dat="http://ws.wso2.org/dataservice">
            <dat:job_id>2037826</dat:job_id>
            <dat:preferred_city_code>GM</dat:preferred_city_code>
            <dat:country_code>SL</dat:country_code>
         </dat:addJobWorkLocation>



         <dat:addJobWorkLocation xmlns:dat="http://ws.wso2.org/dataservice">
            <dat:job_id>2037826</dat:job_id>
            <dat:preferred_city_code>KND</dat:preferred_city_code>
            <dat:country_code>SL</dat:country_code>
         </dat:addJobWorkLocation>






Since we do not have an inbuilt mediator to fulfill this task, i am writing a custom mediator achieve that.
Name of my custom mediator is "SmartSplitMediator" and i am extending it from the AbstractMediator as follows.

public class SmartSplitMediator extends AbstractMediator {
...
}


Once i extended my class with AbstractMediator,  it is essential to implement the methods in the AbstractMediator class. So , i am implementing the method mediate in this class as follows.


public class SmartSplitMediator extends AbstractMediator {
...


   public boolean mediate(MessageContext messageContext) {
   ...
  }


...
}


Then i need to write my logic inside this mediate method. To achieve my target , i need to pass some variables in this class. So, for that task , i need to define those variables as class variables and add getters and setters for those variables. For this particular case, i need following variables.

  • parentLocalName
  • parentNamespace
  • parentNamespacePrefix
  • IteratingElementLocalName
  • variableStringLocalName
  • constantStringsLocalNames


I am adding these variables to my class as follows;


public class SmartSplitMediator extends AbstractMediator {

    private String parentLocalName;
    private String parentNamespace;
    private String parentNamespacePrefix;
    private String IteratingElementLocalName;
    private String variableStringLocalName;
    private String constantStringsLocalNames;
  
    public boolean mediate(MessageContext messageContext) {
        ...
    }

    public String getParentLocalName() {
        return parentLocalName;
    }

    public void setParentLocalName(String parentLocalName) {
        this.parentLocalName = parentLocalName;
    }

    public String getParentNamespace() {
        return parentNamespace;
    }

    public void setParentNamespace(String parentNamespace) {
        this.parentNamespace = parentNamespace;
    }

    public String getParentNamespacePrefix() {
        return parentNamespacePrefix;
    }

    public void setParentNamespacePrefix(String parentNamespacePrefix) {
        this.parentNamespacePrefix = parentNamespacePrefix;
    }

    public String getIteratingElementLocalName() {
        return IteratingElementLocalName;
    }

    public void setIteratingElementLocalName(String iteratingElementLocalName) {
        IteratingElementLocalName = iteratingElementLocalName;
    }

    public String getVariableStringLocalName() {
        return variableStringLocalName;
    }

    public void setVariableStringLocalName(String variableStringLocalName) {
        this.variableStringLocalName = variableStringLocalName;
    }

    public String getConstantStringsLocalNames() {
        return constantStringsLocalNames;
    }

    public void setConstantStringsLocalNames(String constantStringsLocalNames) {
        this.constantStringsLocalNames = constantStringsLocalNames;
    }
}



Then the remaining part is to write the logic inside the mediate method. With the above passed in variables i have the following logic inside my mediate method.




    public boolean mediate(MessageContext messageContext) {

        SOAPFactory fac = OMAbstractFactory.getSOAP11Factory();
        SOAPBody soapBody = messageContext.getEnvelope().getBody();
        OMElement parentElement = (OMElement) soapBody.getFirstElement();
        String variableString = parentElement.getFirstChildWithName(new QName(getParentNamespace(),getVariableStringLocalName(),getParentNamespacePrefix())).getText();
      
//Here we are tokenizing the incoming string from commas
  StringTokenizer variableTokenizer = new StringTokenizer(variableString,",");

        OMElement newParentElement = fac.createOMElement(new QName(getParentNamespace(), getParentLocalName(), getParentNamespacePrefix()));
        while (variableTokenizer.hasMoreTokens()) {
            String variableValue = variableTokenizer.nextToken();
            if (variableValue.length() > 0) {
                OMElement secondLevelElement = fac.createOMElement(new QName(getParentNamespace(), getIteratingElementLocalName(), getParentNamespacePrefix()));
                StringTokenizer tokenizer = new StringTokenizer(getConstantStringsLocalNames(), ",");
                while (tokenizer.hasMoreTokens()) {
                    String constantLocalName = tokenizer.nextToken();
                    OMElement jobIdElement;
                    if (constantLocalName.equalsIgnoreCase(getVariableStringLocalName())) {
                        jobIdElement = fac.createOMElement(new QName(getParentNamespace(), constantLocalName, getParentNamespacePrefix()));
                        jobIdElement.setText(variableValue);
                    } else {
                        jobIdElement = fac.createOMElement(new QName(getParentNamespace(), constantLocalName, getParentNamespacePrefix()));
                        jobIdElement.setText(parentElement.getFirstChildWithName(new QName(getParentNamespace(),constantLocalName,getParentNamespacePrefix())).getText());
                    }
                    secondLevelElement.addChild(jobIdElement);
                }
                newParentElement.addChild(secondLevelElement);
            }
        }
//Now we have completed the preparation of the new body, We need to detach the existing body and attach the new body. We are doing it here.
        SOAPBody body = messageContext.getEnvelope().getBody();
        if (body.getFirstElement() != null) {
            body.getFirstElement().detach();
        }
        body.addChild(newParentElement);

        return true;
    }




After completing the logic in the above method we are done in implementation. Now we need to compile this mediator and deploy it in the  wso2esb-4.x.x/repository components/lib directory to be used in any of the sequences.

Once you deploy it in the above directory , you can use it as follows in any of the sequence;


     <inSequence>
            <log level="full"/>
            <class name="com.js.mediator.split.SmartSplitMediator">
               <property name="variableStringLocalName" value="preferred_city_code"/>
               <property name="parentLocalName" value="WorkAuthorization"/>
               <property name="constantStringsLocalNames" value="preferred_city_code,
country_code"/>

               <property name="parentNamespacePrefix" value="dat"/>
               <property name="parentNamespace" value="http://ws.wso2.org/dataservice"/>
               <property name="IteratingElementLocalName" value="addJobWorkLocation"/>
            </class>
            <log level="full"/>
            <iterate xmlns:dat="http://ws.wso2.org/dataservice"
                     id="foo"
                     expression="//dat:WorkAuthorization/dat:addJobWorkLocation"
                     sequential="true">
               <target>
                  <sequence>
                     <log level="full"/>
                     <drop/>
                  </sequence>
               </target>
            </iterate>
         </inSequence>   

   

Here i am attaching following items with related to this blog post.



Monday, September 23, 2013

How to detect CPU loading issues with JTOP , Jconsole plugin


Why we need JTOP plugin ?

With JConsole , we can detect that there are high CPU load. But it does not tell what are the possible root causes for this CPU load. So , with JTOP plugin , it lists all the top level CPU usages in the process.

How to use JTOP plugin with JConsole ?

When starting the JConsole, you need to give the path to the JTop plugin. 

Eg: My JavaHome is : /opt/software/jdk1.6.0_29

$jconsole -pluginpath /opt/software/jdk1.6.0_29/demo/management/JTop/JTop.jar

General way to start the JConsole with JTop is described in Evanthika's Blog.

Why i wanted to use JTOP?

When i was testing my server, i experienced a high CPU usage with that server. So i wanted to isolate the issue i have in that server. So i started the JConsole with JTop plugin and my out put was as in following image.




With that i could find that , CPU is highly consumed by these threads. So when i looked in to the thread with "Thread" tab in JConsole, i could identify the root cause for the problem.




So, to identify the root cause of the CPU overloading issue, JTOP plugin helped me a lot.

Monday, July 29, 2013

WSO2 Data Services Server (DSS) not consuming messages from JMS Queues

As we know , we can integrate wso2 data services server with wso2 message broker , in a way that Data services server consumes messages from the queues of the Message  broker.

When it comes to large scale like, more than 100 data services consume messages from Message broker, it will throw following exception and will not start to consume messages from queues.

WARN {org.apache.axis2.transport.jms.JMSListener} -  Polling tasks on destination : xxx of type queue for service xxx have not yet started after 3 seconds ..


The reason behind this warning and the situation is, controlled limitation of the system. In Data Services   server, it has been limited the number of threads which consume messages from jms queues from the implementation to 100.

It has been done by adding a configurable parameter to the start up script ( wso2server.sh) of the server. By default the value of the parameter snd_t_core=100. If any one getting above exception in a situation like more than 100 consumers, he can increase this parameter to overcome that situation as highlighted bellow;



    -Dcom.sun.jndi.ldap.connect.pool.authentication=simple  \
    -Dcom.sun.jndi.ldap.connect.pool.timeout=3000  \
    -Dorg.terracotta.quartz.skipUpdateCheck=true \
    -Dsnd_t_core=200 \
    -Dsnd_t_max=250 \
    -Djava.security.egd=file:/dev/./urandom \


Tuesday, July 23, 2013

How to start SimpleAxis2Server in debug mode..



In most of the wso2 products , we could find the simple axis2 server shipped in it's samples directory. I had a situation which i wanted to debug an axis2 service deployed in this simple axis2 server.


Yes, It is very simple thing to start the simple axis2 server in debug mode..... But i had to dig in to the start up script to find out how to do that.

we can start the simple axis2 server in debug  mode with following:

sh axis2server.sh  -xdebug

it will start the server in debug mode with listening to the port 8000

Tuesday, February 19, 2013

UnsupportedOperationException in Creating Topics in WSO2 Message Broker


Some times we are getting the following error when try to create topics in WSO2 Message Broker.


Exception in thread "main" java.lang.UnsupportedOperationException: The new addressing based sytanx is not supported for AMQP 0-8/0-9 versions

at org.wso2.andes.client.AMQSession_0_8.handleAddressBasedDestination(AMQSession_0_8.java:572)
at org.wso2.andes.client.AMQSession.registerConsumer(AMQSession.java:2838)
at org.wso2.andes.client.AMQSession.access$500(AMQSession.java:117)
at org.wso2.andes.client.AMQSession$4.execute(AMQSession.java:2031)
at org.wso2.andes.client.AMQSession$4.execute(AMQSession.java:1997)
at org.wso2.andes.client.AMQConnectionDelegate_8_0.executeRetrySupport(AMQConnectionDelegate_8_0.java:305)
at org.wso2.andes.client.AMQConnection.executeRetrySupport(AMQConnection.java:621)
at org.wso2.andes.client.failover.FailoverRetrySupport.execute(FailoverRetrySupport.java:102)
at org.wso2.andes.client.AMQSession.createConsumerImpl(AMQSession.java:1995)
at org.wso2.andes.client.AMQSession.createExclusiveConsumer(AMQSession.java:976)
at org.wso2.andes.client.AMQSession.createSubscriber(AMQSession.java:1443)
at org.wso2.andes.client.AMQTopicSessionAdaptor.createSubscriber(AMQTopicSessionAdaptor.java:63)
at Subscriber.subscribe(Subscriber.java:52)





The reason for above exception is an implementation limitation. As we know wso2 Message Broker
is supporting AMQP 0-91 Specification and the core of the wso2 Message Broker is Andes which uses the apache-qpid in transport level communication.

In Andes implementation , there is a limitation that, it does not support dynamic queues or topics. If some one needs to use dynamic topics or queues , there is a solution for that.

Normally We create a topic as follows:


session = topicConnection.createTopicSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(topicName);
TopicSubscriber subscriber = session.createSubscriber(topic);

But if we do this with wso2 Message broker, we get the above exception. 

So we can get rid of this by adding the "BURL" syntax, before the name of the topic. It is as


session = topicConnection.createTopicSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("BURL:"+topicName);
TopicSubscriber subscriber = session.createSubscriber(topic);