Monday, September 1, 2014

Making JMS Connection from DataStage ETL Jobs

This example shows a sample Java class that is integrated into an ETL job using the Java input Stage and sends messages to JMS.
Step 1: Create a Java class using the code in the attached file.
Step 2: Place the Java class file on the DataStage server.
Step 3: Configure the Java input Stage in the ETL job (create a row and send a sample record for testing).
Step 4: Ensure the JMS connection factory name and the provider URL are set correctly in the code.


Example :

import com.ascentialsoftware.jds.Row;
import com.ascentialsoftware.jds.Stage;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import javax.jms.ConnectionFactory;
import javax.jms.Connection;
import javax.jms.Session;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.TextMessage;
//Import the classes to use JNDI.
import javax.naming.*;

import java.util.*;

public class JmsTest extends Stage
{
public void initialize() {
trace("UpperCaseTarget.initialize");
_rowCount = 0;
_resultWriter = null;
String userProperties = getUserProperties();
try {
_resultWriter = new PrintWriter(new FileWriter(userProperties), true);
} catch (IOException eIO) {
fatal("Cannot open '" + userProperties + "': " + eIO.getMessage());
}
}
public void terminate() {
trace("UpperCaseTarget.terminate");
}
public int process() {
// Read a row, convert all its columns to upper case
// surrounded by double-quotes and delimited by commas,
// and write the result to a file. The file path is given
// in the user's properties of the stage.
_rowCount++;
Row inputRow = readRow();
if (inputRow == null) {
//34 Java Pack Guide
// No row currently available or end of data.
// The function must return but it could be called again later on.
// The stage actually ends when "terminate" is called.
return OUTPUT_STATUS_END_OF_DATA;
}

try {

    ConnectionFactory myConnFactory;
    Queue myQueue;
        String MYCF_LOOKUP_NAME = "Test_ConnectionFactory";
        String MYQUEUE_LOOKUP_NAME = "CDC_Queue";

        Hashtable env;
        Context ctx = null;

        env = new Hashtable();

        env.put(Context.INITIAL_CONTEXT_FACTORY, "com.webmethods.jms.naming.WmJmsNamingCtxFactory");
        env.put(Context.PROVIDER_URL, "wmjmsnaming://Broker #1@<Host Name>:<Port Number>");

        ctx = new InitialContext(env);

        myConnFactory = (javax.jms.ConnectionFactory) ctx.lookup(MYCF_LOOKUP_NAME);

        myQueue = (javax.jms.Queue)ctx.lookup(MYQUEUE_LOOKUP_NAME);

    Connection myConn = myConnFactory.createConnection();

    Session mySess = myConn.createSession(false, Session.AUTO_ACKNOWLEDGE);

    MessageProducer myMsgProducer = mySess.createProducer(myQueue);

    TextMessage myTextMsg = mySess.createTextMessage();
    myTextMsg.setText("Hello World From ECllipse");
    _resultWriter.print("Sending Message: " + myTextMsg.getText());
    myMsgProducer.send(myTextMsg);

    mySess.close();
    myConn.close();

} catch (Exception jmse) {
    System.out.println("Exception occurred : " + jmse.toString());
    jmse.printStackTrace();
}
// No rows were written since there is no output link.
return OUTPUT_STATUS_NOT_READY;
}
private int _rowCount;
private PrintWriter _resultWriter;
}

No comments:

Post a Comment