This example shows a sample Java class that is integrated into ETL jobs using
the Java input stage and sends messages to a JMS queue.
Step 1: Create a java class using the code in the attached file.
Step 2: Place the compiled Java class file on the DataStage server.
Step 3: Configure the Java input stage in the ETL job. (Create a row and send a sample record for testing.)
Step 4: Ensure the JMS connection factory name, queue name, and provider URL are set correctly in the code.
Example :
import com.ascentialsoftware.jds.Row;
import com.ascentialsoftware.jds.Stage;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import javax.jms.ConnectionFactory;
import javax.jms.Connection;
import javax.jms.Session;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.TextMessage;
//Import the classes to use JNDI.
import javax.naming.*;
import java.util.*;
/**
 * Sample DataStage Java stage that sends one fixed JMS text message for every
 * input row it reads, and logs each send to a result file.
 *
 * The result file path is the string returned by getUserProperties(). The JNDI
 * settings (context factory, provider URL, lookup names) are hard-coded in the
 * constants below and must be edited to match the target broker.
 */
public class JmsTest extends Stage
{
    /** JNDI name used to look up the JMS connection factory. */
    private static final String MYCF_LOOKUP_NAME = "Test_ConnectionFactory";

    /** JNDI name used to look up the destination queue. */
    private static final String MYQUEUE_LOOKUP_NAME = "CDC_Queue";

    /** Initial context factory class passed to JNDI. */
    private static final String JNDI_CONTEXT_FACTORY =
        "com.webmethods.jms.naming.WmJmsNamingCtxFactory";

    /** JNDI provider URL; the host and port placeholders must be filled in. */
    private static final String JNDI_PROVIDER_URL =
        "wmjmsnaming://Broker #1@<Host Name>:<Port Number>";

    /** Payload sent for every input row. */
    private static final String MESSAGE_TEXT = "Hello World From ECllipse";

    /** Number of input rows actually read by process(). */
    private int _rowCount;

    /** Writer for the result file; opened in initialize(), closed in terminate(). */
    private PrintWriter _resultWriter;

    /**
     * Resets the row counter and opens the result file named by the stage's
     * user properties. Calls fatal() if the file cannot be opened.
     */
    public void initialize() {
        trace("JmsTest.initialize");
        _rowCount = 0;
        _resultWriter = null;
        String userProperties = getUserProperties();
        try {
            _resultWriter = new PrintWriter(new FileWriter(userProperties), true);
        } catch (IOException eIO) {
            fatal("Cannot open '" + userProperties + "': " + eIO.getMessage());
        }
    }

    /**
     * Closes the result file so that everything written to it is flushed to disk.
     */
    public void terminate() {
        trace("JmsTest.terminate");
        if (_resultWriter != null) {
            _resultWriter.close();
            _resultWriter = null;
        }
    }

    /**
     * Reads one input row and, if a row is available, sends the sample JMS message.
     *
     * @return OUTPUT_STATUS_END_OF_DATA when readRow() returns null, otherwise
     *         OUTPUT_STATUS_NOT_READY
     */
    public int process() {
        Row inputRow = readRow();
        if (inputRow == null) {
            // No row currently available or end of data.
            // The function must return but it could be called again later on.
            // The stage actually ends when "terminate" is called.
            return OUTPUT_STATUS_END_OF_DATA;
        }
        // Count only rows that were actually read.
        _rowCount++;
        sendMessage(MESSAGE_TEXT);
        // No rows were written since there is no output link.
        return OUTPUT_STATUS_NOT_READY;
    }

    /**
     * Looks up the connection factory and queue through JNDI, sends one text
     * message, and releases the session, connection and naming context even
     * when a step fails. Failures are reported on standard output and do not
     * propagate to the caller.
     *
     * NOTE(review): a new JNDI context and JMS connection are created for every
     * row; consider caching them in initialize() if throughput matters.
     *
     * @param text body of the text message to send
     */
    private void sendMessage(String text) {
        Context ctx = null;
        Connection myConn = null;
        Session mySess = null;
        try {
            Properties env = new Properties();
            env.setProperty(Context.INITIAL_CONTEXT_FACTORY, JNDI_CONTEXT_FACTORY);
            env.setProperty(Context.PROVIDER_URL, JNDI_PROVIDER_URL);
            ctx = new InitialContext(env);

            ConnectionFactory myConnFactory =
                (javax.jms.ConnectionFactory) ctx.lookup(MYCF_LOOKUP_NAME);
            javax.jms.Queue myQueue = (javax.jms.Queue) ctx.lookup(MYQUEUE_LOOKUP_NAME);

            myConn = myConnFactory.createConnection();
            mySess = myConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer myMsgProducer = mySess.createProducer(myQueue);

            TextMessage myTextMsg = mySess.createTextMessage();
            myTextMsg.setText(text);
            // println (not print): an auto-flush PrintWriter only flushes on
            // println/printf/format, and each send gets its own line in the file.
            _resultWriter.println("Sending Message: " + myTextMsg.getText());
            myMsgProducer.send(myTextMsg);
        } catch (Exception jmse) {
            reportFailure(jmse);
        } finally {
            // Release each resource independently so that a failure while closing
            // one of them does not prevent the others from being closed.
            if (mySess != null) {
                try {
                    mySess.close();
                } catch (Exception closeError) {
                    reportFailure(closeError);
                }
            }
            if (myConn != null) {
                try {
                    myConn.close();
                } catch (Exception closeError) {
                    reportFailure(closeError);
                }
            }
            if (ctx != null) {
                try {
                    ctx.close();
                } catch (Exception closeError) {
                    reportFailure(closeError);
                }
            }
        }
    }

    /** Prints the exception and its stack trace; the stage keeps running. */
    private void reportFailure(Exception error) {
        System.out.println("Exception occurred : " + error.toString());
        error.printStackTrace();
    }
}
Step 1: Create a java class using the code in the attached file.
Step 2: Place the compiled Java class file on the DataStage server.
Step 3: Configure the Java input stage in the ETL job. (Create a row and send a sample record for testing.)
Step 4: Ensure the JMS connection factory name, queue name, and provider URL are set correctly in the code.
Example :
import com.ascentialsoftware.jds.Row;
import com.ascentialsoftware.jds.Stage;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import javax.jms.ConnectionFactory;
import javax.jms.Connection;
import javax.jms.Session;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.TextMessage;
//Import the classes to use JNDI.
import javax.naming.*;
import java.util.*;
/**
 * Sample DataStage Java stage that sends one fixed JMS text message for every
 * input row it reads, and logs each send to a result file.
 *
 * The result file path is the string returned by getUserProperties(). The JNDI
 * settings (context factory, provider URL, lookup names) are hard-coded in the
 * constants below and must be edited to match the target broker.
 */
public class JmsTest extends Stage
{
    /** JNDI name used to look up the JMS connection factory. */
    private static final String MYCF_LOOKUP_NAME = "Test_ConnectionFactory";

    /** JNDI name used to look up the destination queue. */
    private static final String MYQUEUE_LOOKUP_NAME = "CDC_Queue";

    /** Initial context factory class passed to JNDI. */
    private static final String JNDI_CONTEXT_FACTORY =
        "com.webmethods.jms.naming.WmJmsNamingCtxFactory";

    /** JNDI provider URL; the host and port placeholders must be filled in. */
    private static final String JNDI_PROVIDER_URL =
        "wmjmsnaming://Broker #1@<Host Name>:<Port Number>";

    /** Payload sent for every input row. */
    private static final String MESSAGE_TEXT = "Hello World From ECllipse";

    /** Number of input rows actually read by process(). */
    private int _rowCount;

    /** Writer for the result file; opened in initialize(), closed in terminate(). */
    private PrintWriter _resultWriter;

    /**
     * Resets the row counter and opens the result file named by the stage's
     * user properties. Calls fatal() if the file cannot be opened.
     */
    public void initialize() {
        trace("JmsTest.initialize");
        _rowCount = 0;
        _resultWriter = null;
        String userProperties = getUserProperties();
        try {
            _resultWriter = new PrintWriter(new FileWriter(userProperties), true);
        } catch (IOException eIO) {
            fatal("Cannot open '" + userProperties + "': " + eIO.getMessage());
        }
    }

    /**
     * Closes the result file so that everything written to it is flushed to disk.
     */
    public void terminate() {
        trace("JmsTest.terminate");
        if (_resultWriter != null) {
            _resultWriter.close();
            _resultWriter = null;
        }
    }

    /**
     * Reads one input row and, if a row is available, sends the sample JMS message.
     *
     * @return OUTPUT_STATUS_END_OF_DATA when readRow() returns null, otherwise
     *         OUTPUT_STATUS_NOT_READY
     */
    public int process() {
        Row inputRow = readRow();
        if (inputRow == null) {
            // No row currently available or end of data.
            // The function must return but it could be called again later on.
            // The stage actually ends when "terminate" is called.
            return OUTPUT_STATUS_END_OF_DATA;
        }
        // Count only rows that were actually read.
        _rowCount++;
        sendMessage(MESSAGE_TEXT);
        // No rows were written since there is no output link.
        return OUTPUT_STATUS_NOT_READY;
    }

    /**
     * Looks up the connection factory and queue through JNDI, sends one text
     * message, and releases the session, connection and naming context even
     * when a step fails. Failures are reported on standard output and do not
     * propagate to the caller.
     *
     * NOTE(review): a new JNDI context and JMS connection are created for every
     * row; consider caching them in initialize() if throughput matters.
     *
     * @param text body of the text message to send
     */
    private void sendMessage(String text) {
        Context ctx = null;
        Connection myConn = null;
        Session mySess = null;
        try {
            Properties env = new Properties();
            env.setProperty(Context.INITIAL_CONTEXT_FACTORY, JNDI_CONTEXT_FACTORY);
            env.setProperty(Context.PROVIDER_URL, JNDI_PROVIDER_URL);
            ctx = new InitialContext(env);

            ConnectionFactory myConnFactory =
                (javax.jms.ConnectionFactory) ctx.lookup(MYCF_LOOKUP_NAME);
            javax.jms.Queue myQueue = (javax.jms.Queue) ctx.lookup(MYQUEUE_LOOKUP_NAME);

            myConn = myConnFactory.createConnection();
            mySess = myConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer myMsgProducer = mySess.createProducer(myQueue);

            TextMessage myTextMsg = mySess.createTextMessage();
            myTextMsg.setText(text);
            // println (not print): an auto-flush PrintWriter only flushes on
            // println/printf/format, and each send gets its own line in the file.
            _resultWriter.println("Sending Message: " + myTextMsg.getText());
            myMsgProducer.send(myTextMsg);
        } catch (Exception jmse) {
            reportFailure(jmse);
        } finally {
            // Release each resource independently so that a failure while closing
            // one of them does not prevent the others from being closed.
            if (mySess != null) {
                try {
                    mySess.close();
                } catch (Exception closeError) {
                    reportFailure(closeError);
                }
            }
            if (myConn != null) {
                try {
                    myConn.close();
                } catch (Exception closeError) {
                    reportFailure(closeError);
                }
            }
            if (ctx != null) {
                try {
                    ctx.close();
                } catch (Exception closeError) {
                    reportFailure(closeError);
                }
            }
        }
    }

    /** Prints the exception and its stack trace; the stage keeps running. */
    private void reportFailure(Exception error) {
        System.out.println("Exception occurred : " + error.toString());
        error.printStackTrace();
    }
}
No comments:
Post a Comment