Writing an iWay Service Manager Listener

Topics:

iWay Service Manager listeners receive messages on an appropriate protocol, and pass them on to the document processing sequence. Good practice dissociates the receipt of the message of its handling, which is performed by the processing sequence.

Listeners always consist of two classes:

The example shown is for a channel master/worker pair that watches clock ticks. While functionally and syntactically correct, it is simplified for example use.

The developed channel master/worker pair directly extends XDMaster and XDWorker, which are documented in the supplied Javadoc. iWay strongly discourages direct extension of superclassed protocols such as XDFileMaster and XDFileWorker should you need to develop your own variation. The implementation of specific protocols can vary from release to release and direct extension can result in your protocol not performing correctly as releases change.

XDTickMaster

package com.ibi.edaqm;
import java.util.*;
import java.net.*;
import java.io.*;
import com.ibi.config.MasterInfoTick;
public class XDTickMaster extends XDMaster implements Runnable
{
  public XDTickMaster()
  {
    init("Tick");
  }
  public XDTickMaster (HashMap xdm, XDManager mgr, XDControl xdc)
  {
    super(xdm, mgr, xdc);   // store properties and set common values
    init("Tick");
  }
  private static final int TREE=0;
  private static final int BYTES=1;
  private byte[]  inputArray= null;
  private XDNode  treeRoot= null;
  private String fileName = null;
  private int inputType = 0;
  /**
   * Initialize the master (channel). Gather parms
   * and check that the master can run. 
   * 
   * @exception XDException
   */
  public void init() throws XDException
  {
    initGlobalFields(); // fills in general parameter we need for execution
    fileName = property(MasterInfoTick.FILE);
    String t = property(MasterInfoTick.INTYPE);
    if (!isPresent(t))
    {
      t="tree";
    }
    if (t.equals("tree"))
    {
      inputType = TREE;
    }
    else if (t.equals("bytes"))
    {
      inputType=BYTES;
    }
    else
    {
       throw new XDException("Specified input type '"+t+"' not valid");
    }
    if (isPresent(fileName))
    {
      File f = new File(fileName);
      if (!f.exists())
      {
        throw new XDException("Specified file "+fileName+" does not exist");
      }
      if (inputType == TREE)
      {
        try
        {
          XDParser parser = new XDParser();
          parser.parseIt(new FileInputStream(f));
          treeRoot = parser.getResult();
          parser.reset();
          parser=null;
        }
        catch (Exception e)
        {
          throw new XDException("Specified file "+fileName+"cannot be parsed: "+e);
        }
      }
      else
      {
        try
        {
          inputArray = XD.getFileContentsByte(f);
        }
        catch (Exception e)
        {
          throw new XDException("Specified file "+fileName+"cannot be loaded: "+e);
        }
      }
      f=null;
    }
    if (treeRoot == null)
    {
      treeRoot = new XDNode("tick"); A simple root node
      fileName="default";
    }
    logger.info("Tick listener started");
  }   
  /**
   * The run loop. Process a message and await the next one
   */
  public void run()
  {   
    boolean firsttime = true;
    for (;!stopActivity;)
    {
      try
      {
        if (!firsttime)
        {
          if (awaitTimeoutCycle())
          break;
        }
        firsttime = false;
        XDTickWorker w = null;
        w = (XDTickWorker) waitForWorker();  // hold on until we have a worker
        if (inputType == TREE)
        {
          XDNode myRoot = XDNode.cloneTree(treeRoot);
          w.setRecord(myRoot,fileName);
        }
        else if (inputType == BYTES)
        {
          byte[] b = new byte[inputArray.length];
          System.arraycopy(inputArray,0,b,0, inputArray.length);
          w.setRecord(b,fileName);
         }
       }
       catch (Exception e)
       {
         logger.debug("Tick - Exception in run loop: "+e);
       }
     }
     setState(ST_STOPPED);   
  }   
  /**
   * This call is made by the manager after init() to prepare
   * for running
   * 
   * @exception Exception If error occurs
   */
  public void startMaster() throws Exception
  {
    setupMaster();  // call to start statistics, etc
    for (int i=0; i<autoStartWorkers;i++ )
    {
      makeOneWorker();
    }
  } 
  /**
   * The master(channel) is to stop. This allows the master to
   * close operations, stop listening, and control whether the
   * workers should "spin out" any remaining work. Unless you need to do
   * a "spin out" you do not need this call.
   * 
   * @return Reserved for future use. Always return zero.
   */
  public int stopAll()   
  {
    super.stopAll(); // first shut everything down
    return 0;
  }
}

For the example XDTickMaster:

  1. Tell iWay Service Manager your protocol.
  2. Initialize the master initGlobalFields() to load common properties, then load your own. All properties are visible to you by the property() call.
  3. Start up the routine. If you throw an exception, iWay Service Manager retries your master later.
  4. The manager starts a thread to do the work. If you want the manager to simply call your run method without giving it a separate thread, add the metadata method:
    boolean isRunnable()
    {
        returns false;
    }

    The default is true.

  5. When the manager needs to stop your master, it calls two methods. The super.stopAll() gets the stopActivity flag in XDMaster, which this code tests at the top of its run loop.
  6. awaitTimeoutCycle() helps you poll by "sleeping" for the timeout period established in your configuration. Each second it checks for the stopActivity flag, and returns immediately if it is set. You can provide your own logic here to configure the loop. awaitTimeoutCycle() is a convenience, but it is not required.

XDTickWorker

package com.ibi.edaqm;
import java.io.*;
import java.net.*;
import java.util.*;
import com.ibi.common.ISpecRegManager;
public class XDTickWorker extends XDWorker
{
  private XDMaster Master;
  XDTickWorker(XDTickMaster master) throws Exception
  {
    super(master);
    this.Master = master;
  }
  void resetWorker()
  {
    resetBaseWorker();
    isBusy = false;
    makeWorkerAvailable();
  }
  public void run()
  {
    int  rc=0;
    for (;;)
    {
      if (stopFlag || Master.stopActivity)
      {
        break;
      }
      try
      {
        try
        {
          waitForDocument();
          if (stopFlag)   // in case stopFlag set while waiting
          {
            break;
          }
        }
        catch (InterruptedException e)
        {
          logger.error("Exception waiting for document: "+e);
          resetWorker();
          continue;
        }
        xddIn.reset();
        rc = XDException.SUCCESS;
        try
        {
          parseAndValidate(xddIn);
        }
        catch (XDException e)
        {
          rc = e.getType();  // what class of exception do we have?
          logger.error("Exception from ParseAndValidate rc="+rc+": " + e);
          rc = XDException.FAIL;
        }
        catch (Exception e)
        {
          logger.error("Exception from ParseAndValidate: " + e,e);
          rc = XDException.FAIL;
        }
        if (rc == XDException.FAIL) // was an error created?
        {
          resetWorker();
          continue;
        }
        else if (rc == XDException.PENDED) // document already sent to pending queue, finished for now
        {
          resetWorker();
          continue;
        }
        rc = invokeAgent();
        resetWorker();
      }
      catch (Exception e)
      {
        logger.error("Exception during Tick Worker loop: "+e,e);
        resetWorker();
        continue;
      }
    }
    if (stopFlag)
    {
       markStopped();
       logger.info("Tick worker stop");
    }
  }
  /**
   * This stop method completes any operation in progress and causes 
   * the worker to terminate operations.
   */
  public synchronized void stop()
  {
    logger.debug("Tick stop received");
    stopFlag = true;
    notify();    // notify myself to break wait for a message */
  }
  /**
   *  Accept a message for working.
   * @param  Root of the tree that constitutes this message
   * @param  Name of the file from which the tree was originally taken
   * 
   */
  synchronized void setRecord(XDNode root,String fileName)
  {
    orgdata.store(root);
    ISpecRegManager srm = getSRM(); // get the context Special Register Manager
    srm.storeSpecialRegister("source",fileName,XDSpecReg.USER);
    srm.storeSpecialRegister("type","tree",XDSpecReg.USER);
    isBusy = true;
    notify();
  }
  synchronized void setRecord(byte[] input ,String fileName)
  {
    orgdata.store(input);
    ISpecRegManager srm = getSRM(); // get the context Special Register Manager
    srm.storeSpecialRegister("source",fileName,XDSpecReg.USER);
    srm.storeSpecialRegister("type","array",XDSpecReg.USER);
    isBusy = true;
    notify();
  }
  /**
   * Wait for a document to arrive or for the stop to be requested. 
   * 
   * @exception InterruptedException
   */
  synchronized void waitForDocument() throws InterruptedException
  {
    while (!isBusy && !stopFlag)
    {
      try
      {
         wait(250); // wake up and see if a stop is requested
      }
      catch (InterruptedException e)
      {
        logger.debug("waitforClockingDocument interrupted");
        break;
      }
    }
  }
}

For the example XDTickWorker:

  1. The worker is threaded at two points. Notice that it watches for the stopFlag to rejoin the master thread when told to stop. It finishes any work in process and then returns.
  2. The stop() method is called by the master when the master is ready to stop. This example signals to run loop to stop before starting another piece of work.
  3. The master passes work to the worker (process loop) by calling here. Set isBusy to prevent the worker from being selected for another message while this one is in process.
  4. When the message is complete, this tells the master that the worker can be reassigned to another message.