RabbitMQ

Topics:

RabbitMQ is open source message broker software that implements version 0-9-1 of the Advanced Message Queuing Protocol (AMQP). The listener supports AMQP version 0-9-1 exclusively since that is the version supported natively by RabbitMQ.

Note: AMQP version 1.0 is a newer version, but it is not compatible and should be considered a different protocol, despite the similar name.

iWay Service Manager (iSM) provides native bi-directional support for communication with RabbitMQ to read/write messages. This does not require any additional layer for communication such as Java Message Service (JMS), which can also be used as an alternative, but does have its limitations.

Configuring the RabbitMQ Listener (com.ibi.edaqm.XDRabbitMQMaster)

Syntax:

com.ibi.edaqm.XDRabbitMQMaster

Description:

The RabbitMQ listener uses the Java client library to communicate with the RabbitMQ server. The required third-party library is automatically installed with the listener. There are no extra installation steps that are required.

Each worker runs a consumer thread to accept messages from the designated queue. After a message is processed, a response can optionally be sent. The listener always reads directly from a queue. If applicable, the response is sent to an exchange. The exchange will route the message to the appropriate queues.

The listener expects the queue and exchange to be pre-configured in the RabbitMQ server. The listener does not declare those objects automatically.

Parameters:

The following table describes the parameters of the RabbitMQ Listener (com.ibi.edaqm.XDRabbitMQMaster).

Parameter

Description

Broker URI *

The Broker URI, in the following format:

scheme://host:port/virtualhost

where:

scheme

Is AMQP or AMQPS.

port

Is optional defaulting to 5672 for AMQP and 5671 for AMQPS.

virtualhost

Is also optional.

SSL Context Provider

The iWay Security Provider for SSL Context. If you are using AMQPS and SSL Context Provider is left blank, then the default provider will be used.

User *

User ID for the Broker.

Password *

Password that is associated with the user ID for the Broker.

Queue Name *

Messages will be consumed from this queue.

Auto Acknowledgement

Determines whether the acknowledgement is sent automatically or is delayed until the message is processed.

By default, this parameter is set to false.

Request Header Namespace

The special register namespace into which protocol headers from the incoming request will be saved.

By default, this parameter is set to Default Namespace.

Default Reply

Outgoing Exchange Name

The exchange where the outgoing message will be published.

Routing Key

The routing key of the outgoing message.

Mandatory

Determines what happens when a message is routed to zero queues. When set to true, an error is returned. When set to false, the message is discarded.

By default, this parameter is set to false.

App ID

Creating application ID.

Content Encoding

MIME Content Encoding of the payload.

Content Type

MIME Content Type of the payload.

Correlation ID

Application correlation identifier.

Delivery Mode

Determines whether the message is persistent (2) or non-persistent (1).

By default, this parameter is set to 1 (persistent).

Expiration

The message time to live in the queue, non-negative integral number of milliseconds.

Message ID

Application message identifier.

Priority

Message priority (0 to 255).

Response Reply To

Destination to reply to this response.

Timestamp

The value of the timestamp property in the outgoing message, in milliseconds since the epoch.

Type

Message type name.

User ID

Creating user ID.

Response Header Namespace

The special register namespace from which protocol headers for the outbound response will be taken.

By default, this parameter is set to Default Namespace.

Tuning

Multithreading

Indicates the number of worker threads (documents or requests) that iWay Service Manager can handle in parallel. Setting this to a value of greater than 1 enables the listener to handle a second request while an earlier request is still being processed. The total throughput of a system can be affected by the number of threads operating. Increasing the number of parallel operations may not necessarily improve throughput.

The default is 1.

The max value is 99.

Optimize Favoring

Use this option to customize how the listener performs. For smaller transactions, select performance. For large input documents that could monopolize the amount of memory used by iWay Service Manager, select memory.

Events

Failed ReplyTo Flow

Name of a published process flow to run if a message cannot be emitted on an address in its reply address list.

Dead Letter Flow

Name of a published process flow to run if an error cannot be emitted on an address in its error address list.

Channel Failure Flow

Name of a published process flow to run if this channel cannot start or fails during message handling. iWay Service Manager will attempt to call this process flow during channel shut down due to the error.

Parse Failure Flow

Name of published process flow to run if XML parsing fails for incoming message.

Channel Startup Flow

Name of published process flow to run prior to starting the channel.

Channel Shutdown Flow

Name of published process flow to run when the channel is shut down.

Other

Whitespace Normalization

Specifies how the parser treats whitespace in Element content. Choose preserve to turn off all normalization as prescribed by the XML Specification. Choose condense to remove extra whitespace in pretty printed documents and for compatibility with earlier versions.

Accepts non-XML (flat) only

If set to true, the input data is sent directly to the business logic step. The data is not preparsed, parsed, or validated. This flag is used primarily to send non-XML to the business logic or replyTo without processing it.

Execution Time Limit

The maximum time that a request may take to complete. Used to prevent runaway requests. Any request that takes longer to complete than this value will be attempted to be terminated.

Default Java File Encoding

The default encoding if incoming message is not self-declaring (that is, XML).

Agent Precedence

Sets the order by which iWay Service Manager selects agents. iWay Service Manager selects the agent or agents to process the document by searching through the configuration dictionary. Usually, it looks for a document entry in the configuration and when a match is found, the agent specified in that document entry is selected. If a matching document entry is not found, or no agent is specified, the engine looks in the input protocol configuration (listener). To have the processing agent taken directly from the listener (thus ignoring the document entry), use <listener> overrides <document>.

Possible values are <document> overrides <listener> and <listener> overrides <document>.

The default value is <document> overrides <listener>.

Always reply to listener default

If set to true, the default reply definition is used in addition to defined reply-to and error-to destinations.

Error Documents treated normally

If set to true, error documents are processed by any configured preemitters.

Listener is Transaction Manager

If set to true, agents run within a local transaction.

Record in Activity Log(s)

If set to true, activity on this channel will be recorded in the activity logs, otherwise the activity will not be recorded.

AES Key

If the channel will receive encrypted AFTI messages, set the AES key (maximum 16 characters) to be used for decrypting.

Startup Dependencies

A comma-separated list of channel names that must be started before this one is called.

The Broker URI has the following format:

scheme://host:port/virtualhost

The scheme is AMQP for communication in clear with the port defaulting to 5672. For secure communication over TLS, the scheme is AMQPS with the port defaulting to 5671. The virtual host is a RabbitMQ concept that allows multiple configurations for the same RabbitMQ server. When /virtualhost is not specified, the virtual host defaults to /. The syntax scheme://host:port is the only way to specify / as the virtual host, since ending the URI with / is not allowed.

When using AMQPS, the TLS configuration is specified with an SSL Context provider. This provider is created in the iSM Administration Console within the Security Provider page under the Server tab.

The user name and password are the credentials to access the RabbitMQ server. The user is created in the RabbitMQ console within the Users page under the Admin tab. Make sure the user is given permission to access the chosen virtual host. The RabbitMQ console can usually be accessed at:

http://host:15672

The Auto Acknowledgement parameter controls whether the acknowledgement is sent automatically or is delayed until the message is processed.

If there are no other replies configured, then the default reply will be sent. The destination of the default reply is controlled by the Outgoing Exchange Name parameter with the value specified by the Routing Key. If the Outgoing Exchange Name parameter has no value specified, then it defaults to the amqp.replyTo special register (SREG). The Mandatory parameter determines what happens when the message is routed to zero queues. An error is returned if the Mandatory parameter is set to true, otherwise the message is silently discarded. User-configured headers can be added by specifying a value for the Response Header Namespace parameter. Each register of type HDR in that namespace will create a header of the same name.

The properties in the Default Reply group support deferred evaluation (except for Response Header Namespace). This means prepending the expression with a backtick character (`) causes the expression to be evaluated just before the reply is sent. This makes it possible to get different values for each reply, which might depend on the input message. For example, the default value of the Outgoing Exchange Name parameter can be emulated with the following expression:

`_SREG(amqp.replyTo)

If the backtick character is absent, then the parameter is evaluated once when the listener is initialized, just like the other parameters.

Special Registers (SREGs)

There is a fixed list of message properties as documented in the AMQP specification. The service stores these properties in Special Registers (SREGs) in the AMQP namespace.

The following table describes the SREGs assigned by the RabbitMQ Read service based on the properties in the message envelope.

SREG Name

Description

amqp.deliveryTag

The server-assigned and channel-specific delivery tag.

amqp.exchange

The name of the exchange that routed this message.

amqp.routingKey

The key used to route this message.

amqp.isRedeliver

The boolean redelivery flag.

The following table describes the special registers assigned by the RabbitMQ Read service based on the properties in the message.

SREG Name

Description

amqp.appId

Creating application ID.

amqp.contentEncoding

MIME content encoding.

amqp.contentType

MIME content type.

amqp.correlationId

Application correlation identifier.

amqp.deliveryMode

A value of 1 indicates non-persistent. A value of 2 indicates persistent.

amqp.exiration

Message time to live in the queue, integer >= 0 in milliseconds.

amqp.messageId

Application message identifier.

amqp.priority

Message priority, 0 to 255.

amqp.replyTo

Destination to reply to.

amqp.timestamp

Message timestamp, number of seconds since the epoch.

amqp.type

Message type name.

amqp.userId

Creating user ID.

Unlike message properties, message headers are not standardized. They are under the sending application control. When present, message headers are stored as special registers in the specified Request Header Namespace. For example, if a header is named hdr1 and the Request Header Namespace is ns, then the special register will be called ns.hdr1.

SSL Configuration

The initial configuration of RabbitMQ declares a regular TCP listener listening on port 5672. The configuration file must be edited if an SSL listener is required. On Windows, the location of the configuration file is:

%APPDATA%\RabbitMQ\rabbitmq.confi

Here is a sample configuration with an SSL listener listening on port 5671 and no TCP listeners.

[
  {rabbit, [
     {tcp_listeners, []},
     {ssl_listeners, [5671]},
     {ssl_options, [{cacertfile,"C:\\rabbitmq\\ssl\\cacert.pem"},
                    {certfile,"C:\\rabbitmq\\ssl\\cert.pem"},
                    {keyfile,"C:\\rabbitmq\\ssl\\key.pem"},
                    {verify,verify_none},
                    {fail_if_no_peer_cert,false}]}
   ]}
].

The Broker URI to access this server has the following format:

amqps://hostname:5671/vhost

The Java keytool command can be used to extract a certificate from a keystore. The following command should be entered on one line (shown on multiple lines for display purposes only):

keytool -exportcert -rfc -file cert.pem -alias myalias -keystore mystore.p12 -storepass secret -storetype PKCS12

Unfortunately, the keytool command does not have an option to export a private key. This can be done with openssl:

openssl pkcs12 -in mystore.p12 -out keys.pem -clcerts -nocerts -nodes

Edit keys.pem with a text editor to keep only the key you want. Then run the following command:

openssl rsa -in keys.pem -out key.pem

RabbitMQ Emit Service (com.ibi.agents.XDRabbitMQEmitAgent)

Syntax:

com.ibi.agents.XDRabbitMQEmitAgent

Description:

This service emits a document to a RabbitMQ server using version 0-9-1 of the Advanced Message Queuing Protocol (AMQP). The Broker URI is the server address. The destination is a combination of the Outgoing Exchange Name and the Routing Key.

The Broker URI has the following format:

scheme://host:port/virtualhost

The scheme is AMQP for communication in clear with the port defaulting to 5672. For secure communication over TLS, the scheme is AMQPS with the port defaulting to 5671. The virtual host is a RabbitMQ concept that allows multiple configurations for the same RabbitMQ server. When /virtualhost is not specified, the virtual host defaults to /. The syntax scheme://host:port is the only way to specify / as the virtual host, since ending the URI with / is not allowed.

When using AMQPS, the TLS configuration is specified with an SSL Context provider. This provider is created in the iSM Administration Console within the Security Provider page under the Server tab.

The user name and password are the credentials to access the RabbitMQ server. The user is created in the RabbitMQ console within the Users page under the Admin tab. Make sure the user is given permission to access the chosen virtual host. The RabbitMQ console can usually be accessed at:

http://host:15672

In version 0-9-1 of AMQP, the message is not sent directly to a queue. Instead, the sender sends the message to an exchange and associates a Routing Key. It is the responsibility of the exchange to route the message to all the queues with a binding that matches the message and the routing key. The queue bindings are part of the exchange configuration on the server. The matching rule depends on the exchange type as follows.

Exchange Type

Matching Rule

direct

The routing key must equal the binding key.

fanout

The routing key is ignored and the binding always matches.

headers

The routing key is ignored. The binding specifies a header name and the value it must have to match. It is possible to specify multiple headers at the same time, in which case the x match attribute determines if 'any' or 'all' headers must match.

topic

The routing key is matched with wildcards.

The Mandatory property determines what happens when the message does not match any queue bindings and therefore was routed to zero queues. When this happens, an error is returned if mandatory is set, otherwise the message is silently discarded.

The RabbitMQ Emit Service expects the exchange, the queue bindings, and the destination queues to be pre-configured on the server. The service does not declare those objects automatically.

AMQP defines a set of properties that are always part of the message. Most properties are open to interpretation by the receiving application. These are: App ID, Content Encoding, Content Type, Correlation ID, Message ID, Response Reply To, Timestamp, Type and User ID. For example, the RabbitMQ server considers the payload to be a byte string. The receiving application may use the Content Encoding and Content Type to reconstruct the message, but that is just by convention.

The properties that affect the broker are: Delivery Mode, Expiration and Priority. The delivery mode is Persistent or Non-Persistent. Persistent messages held in a durable queue will survive a broker restart.

The Expiration is the time the message can remain in the queue before it expires. Expired messages are either discarded or dead-lettered, but only when they reach the head of the queue. Until then, they occupy storage and are counted in the queue statistics. The Priority affects the order messages are delivered by a priority queue. The AMQP specification says the priority is from 0 to 9 but RabbitMQ supports priorities from 0 to 255.

Unlike message properties, message headers are not standardized. They are under the sending application control. When the Request Header Namespace is specified, every special register of type HDR it contains will create a message header of the same name. For example, if the Request Header Namespace is ns and there is special register of type HDR called ns.hdr1 with value v1, then a message header called hdr1 will be created with value v1.

The Avoid Preemitter parameter determines whether preemitters will be executed. The default is to skip the preemitters.

The Return Document parameter determines the contents of the output document. Choose status to return a status document, or input to return the input document of the service.

Parameters:

The following table describes the parameters of the RabbitMQ Emit service (com.ibi.agents.XDRabbitMQEmitAgent).

Parameter

Description

Outgoing Exchange Name

The exchange where the outgoing message will be published.

Broker URI *

The Broker URI, in the following format:

scheme://host:port/virtualhost

where:

scheme

Is AMQP or AMQPS.

port

Is optional defaulting to 5672 for AMQP and 5671 for AMQPS.

virtualhost

Is also optional.

SSL Context Provider

The iWay Security Provider for SSL Context. If you are using AMQPS and SSL Context Provider is left blank, then the default provider will be used.

User *

User ID for the Broker.

Password *

Password that is associated with the user ID for the Broker.

Routing Key

The routing key of the outgoing message.

Mandatory

Determines what happens when a message is routed to zero queues. When set to true, an error is returned. When set to false, the message is discarded.

By default, this parameter is set to false.

App ID

Creating application ID.

Content Encoding

MIME Content Encoding of the payload.

Content Type

MIME Content Type of the payload.

Correlation ID

Application correlation identifier.

Delivery Mode

Determines whether the message is persistent (2) or non-persistent (1).

By default, this parameter is set to 1 (persistent).

Expiration

The message time to live in the queue, non-negative integral number of milliseconds.

Message ID

Application message identifier.

Priority

Message priority (0 to 255).

Response Reply To

Destination to reply to this response.

Timestamp

The value of the timestamp property in the outgoing message, in milliseconds since the epoch.

Type

Message type name.

User ID

Creating user ID.

Request Header Namespace

The special register namespace from which protocol headers for the outbound request will be taken.

By default, this parameter is set to none.

Avoid Preemitter

Determines whether any preemitter should be avoided.

By default, this parameter is set to true.

Return Document

Determines whether the output document will be a status document or the input document.

By default, this parameter is set to status.

Edges:

The following table lists and describes the line edges that are returned by the RabbitMQ Emit service (com.ibi.agents.XDRabbitMQEmitAgent).

Line Edge

Description

success

The message was successfully sent.

fail_parse

An iFL expression could not be evaluated.

fail_connect

The service could not connect to the broker.

fail_operation

The operation could not be completed successfully.

RabbitMQ Read Service (com.ibi.agents.XDRabbitMQReadAgent)

Syntax:

com.ibi.agents.XDRabbitMQReadAgent

Description:

This service reads a single message from a RabbitMQ queue using version 0-9-1 of the Advanced Message Queuing Protocol (AMQP).

The Broker URI has the following format:

scheme://host:port/virtualhost

The scheme is AMQP for communication in clear with the port defaulting to 5672. For secure communication over TLS, the scheme is AMQPS with the port defaulting to 5671. The virtual host is a RabbitMQ concept that allows multiple configurations for the same RabbitMQ server. When /virtualhost is not specified, the virtual host defaults to /. The syntax scheme://host:port is the only way to specify / as the virtual host, since ending the URI with / is not allowed.

When using AMQPS, the TLS configuration is specified with an SSL Context provider. This provider is created in the iSM Administration Console within the Security Provider page under the Server tab.

The user name and password are the credentials to access the RabbitMQ server. The user is created in the RabbitMQ console within the Users page under the Admin tab. Make sure the user is given permission to access the chosen virtual host. The RabbitMQ console can usually be accessed at:

http://host:15672

The Queue Name identifies which queue will be read.

The Timeout is the period to wait in milliseconds for the message to become available. When the timeout expires, the service returns the input document with the edge noMessage. A value of zero (0) indicates to return immediately.

Parameters:

The following table describes the parameters of the RabbitMQ Read service (com.ibi.agents.XDRabbitMQReadAgent).

Parameter

Description

Broker URI *

The Broker URI, in the following format:

scheme://host:port/virtualhost

where:

scheme

Is AMQP or AMQPS.

port

Is optional defaulting to 5672 for AMQP and 5671 for AMQPS.

virtualhost

Is also optional.

SSL Context Provider

The iWay Security Provider for SSL Context. If you are using AMQPS and SSL Context Provider is left blank, then the default provider will be used.

User *

User ID for the Broker.

Password *

Password that is associated with the user ID for the Broker.

Queue Name *

The name of the queue from which messages will be consumed.

Request Header Namespace

The special register namespace into which protocol headers from the incoming request will be saved.

By default, this parameter is set to Default Namespace.

Timeout

Period to wait in milliseconds for the message to become available. A value of zero (0) indicates to return immediately.

By default, this parameter is set to 5000.

Edges:

The following table lists and describes the line edges that are returned by the RabbitMQ Read service (com.ibi.agents.XDRabbitMQReadAgent).

Line Edge

Description

success

The message was successfully sent.

fail_parse

An iFL expression could not be evaluated.

fail_connect

The service could not connect to the broker.

fail_operation

The operation could not be completed successfully.

fail_partner

The operation failed because of an error from the peer.

noMessage

The timeout expired before a message became available.