Topics: |
iWay Service Manager listeners receive messages on an appropriate protocol and pass them on to the document processing sequence. Good practice separates the receipt of a message from its handling, which is performed by the processing sequence.
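Listeners always consist of two classes: a master (in this example, XDTickMaster, which extends XDMaster) and a worker (in this example, XDTickWorker, which extends XDWorker).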
The example shown is for a channel master/worker pair that watches clock ticks. While functionally and syntactically correct, it is simplified for example use.
The developed channel master/worker pair directly extends XDMaster and XDWorker, which are documented in the supplied Javadoc. iWay strongly discourages direct extension of superclassed protocols such as XDFileMaster and XDFileWorker should you need to develop your own variation. The implementation of specific protocols can vary from release to release and direct extension can result in your protocol not performing correctly as releases change.
package com.ibi.edaqm; import java.util.*; import java.net.*; import java.io.*; import com.ibi.config.MasterInfoTick; public class XDTickMaster extends XDMaster implements Runnable { public XDTickMaster() { init("Tick"); } public XDTickMaster (HashMap xdm, XDManager mgr, XDControl xdc) { super(xdm, mgr, xdc); // store properties and set common values init("Tick"); } private static final int TREE=0; private static final int BYTES=1; private byte[] inputArray= null; private XDNode treeRoot= null; private String fileName = null; private int inputType = 0; /** * Initialize the master (channel). Gather parms * and check that the master can run. * * @exception XDException */ public void init() throws XDException { initGlobalFields(); // fills in general parameter we need for execution fileName = property(MasterInfoTick.FILE); String t = property(MasterInfoTick.INTYPE);
if (!isPresent(t)) { t="tree"; } if (t.equals("tree")) { inputType = TREE; } else if (t.equals("bytes")) { inputType=BYTES; } else { throw new XDException("Specified input type '"+t+"' not valid"); } if (isPresent(fileName)) { File f = new File(fileName); if (!f.exists()) { throw new XDException("Specified file "+fileName+" does not exist"); } if (inputType == TREE) { try { XDParser parser = new XDParser(); parser.parseIt(new FileInputStream(f)); treeRoot = parser.getResult(); parser.reset(); parser=null; } catch (Exception e) { throw new XDException("Specified file "+fileName+"cannot be parsed: "+e); } }
else { try { inputArray = XD.getFileContentsByte(f); } catch (Exception e) { throw new XDException("Specified file "+fileName+"cannot be loaded: "+e); } } f=null; } if (treeRoot == null) { treeRoot = new XDNode("tick"); A simple root node fileName="default"; } logger.info("Tick listener started"); } /** * The run loop. Process a message and await the next one */ public void run() { boolean firsttime = true; for (;!stopActivity;) { try { if (!firsttime) { if (awaitTimeoutCycle()) break; } firsttime = false; XDTickWorker w = null; w = (XDTickWorker) waitForWorker(); // hold on until we have a worker if (inputType == TREE) { XDNode myRoot = XDNode.cloneTree(treeRoot); w.setRecord(myRoot,fileName); } else if (inputType == BYTES) { byte[] b = new byte[inputArray.length]; System.arraycopy(inputArray,0,b,0, inputArray.length); w.setRecord(b,fileName); } } catch (Exception e) { logger.debug("Tick - Exception in run loop: "+e); } } setState(ST_STOPPED); }
/** * This call is made by the manager after init() to prepare * for running * * @exception Exception If error occurs */ public void startMaster() throws Exception { setupMaster(); // call to start statistics, etc for (int i=0; i<autoStartWorkers;i++ ) { makeOneWorker(); } } /** * The master(channel) is to stop. This allows the master to * close operations, stop listening, and control whether the * workers should "spin out" any remaining work. Unless you need to do * a "spin out" you do not need this call. * * @return Reserved for future use. Always return zero. */ public int stopAll() { super.stopAll(); // first shut everything down return 0; } }
For the example XDTickMaster:
boolean isRunnable() { returns false; }
The default is true.
package com.ibi.edaqm; import java.io.*; import java.net.*; import java.util.*; import com.ibi.common.ISpecRegManager; public class XDTickWorker extends XDWorker { private XDMaster Master; XDTickWorker(XDTickMaster master) throws Exception { super(master); this.Master = master; } void resetWorker() { resetBaseWorker(); isBusy = false; makeWorkerAvailable(); } public void run() { int rc=0; for (;;) { if (stopFlag || Master.stopActivity) { break; } try { try { waitForDocument(); if (stopFlag) // in case stopFlag set while waiting { break; } }
catch (InterruptedException e) { logger.error("Exception waiting for document: "+e); resetWorker(); continue; } xddIn.reset(); rc = XDException.SUCCESS; try { parseAndValidate(xddIn); } catch (XDException e) { rc = e.getType(); // what class of exception do we have? logger.error("Exception from ParseAndValidate rc="+rc+": " + e); rc = XDException.FAIL; } catch (Exception e) { logger.error("Exception from ParseAndValidate: " + e,e); rc = XDException.FAIL; } if (rc == XDException.FAIL) // was an error created? { resetWorker(); continue; } else if (rc == XDException.PENDED) // document already sent to pending queue, finished for now { resetWorker(); continue; } rc = invokeAgent(); resetWorker(); } catch (Exception e) { logger.error("Exception during Tick Worker loop: "+e,e); resetWorker(); continue; } }
if (stopFlag) { markStopped(); logger.info("Tick worker stop"); } } /** * This stop method completes any operation in progress and causes * the worker to terminate operations. */ public synchronized void stop() { logger.debug("Tick stop received"); stopFlag = true; notify(); // notify myself to break wait for a message */ } /** * Accept a message for working. * @param Root of the tree that constitutes this message * @param Name of the file from which the tree was originally taken * */ synchronized void setRecord(XDNode root,String fileName) { orgdata.store(root); ISpecRegManager srm = getSRM(); // get the context Special Register Manager srm.storeSpecialRegister("source",fileName,XDSpecReg.USER); srm.storeSpecialRegister("type","tree",XDSpecReg.USER); isBusy = true; notify(); } synchronized void setRecord(byte[] input ,String fileName) { orgdata.store(input); ISpecRegManager srm = getSRM(); // get the context Special Register Manager srm.storeSpecialRegister("source",fileName,XDSpecReg.USER); srm.storeSpecialRegister("type","array",XDSpecReg.USER); isBusy = true; notify(); }
/** * Wait for a document to arrive or for the stop to be requested. * * @exception InterruptedException */ synchronized void waitForDocument() throws InterruptedException { while (!isBusy && !stopFlag) { try { wait(250); // wake up and see if a stop is requested } catch (InterruptedException e) { logger.debug("waitforClockingDocument interrupted"); break; } } } }
For the example XDTickWorker: