Trifork Blog

Running ActiveMQ using Spring

August 12th, 2010

Apache ActiveMQ is an open source messaging framework. The ActiveMQ web site is not really clear on how to integrate it with the Spring framework. Therefore, I decided to write this post to explain how to use ActiveMQ in combination with Spring and clarify some points.
The good news is that you can run JMS inside a servlet container (e.g. Apache Tomcat) without the need for a JCA adapter. This means you do not need Jencks or something similar.

Note:
This is a repost of a blog item that was originally posted in the Func knowledge base by Diego Castorina on January 29, 2010.

To get started, you can embed the configuration of a complete running environment in a Spring configuration file. Here is the example we’ll use for the remainder of this blog post:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
           http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-2.5.xsd">
  <amq:broker brokerName="test-broker" start="true">
    <amq:persistenceAdapter>
      <amq:amqPersistenceAdapter directory="/opt/activemq" maxFileLength="32mb"/>
    </amq:persistenceAdapter>
    <amq:transportConnectors>
      <amq:transportConnector uri="tcp://localhost:7171"/>
    </amq:transportConnectors>
  </amq:broker>
  <amq:connectionFactory id="amqConnectionFactory"  brokerURL="vm://test-broker"/>
  <bean class="org.springframework.jms.connection.CachingConnectionFactory" id="connectionFactory">
    <constructor-arg ref="amqConnectionFactory"/>
    <property name="sessionCacheSize" value="100"/>
  </bean>
  <amq:queue id="testQueue" physicalName="testQueue"/>
  <bean class="org.springframework.jms.core.JmsTemplate"  id="jmsTemplate">
    <constructor-arg ref="connectionFactory"/>
  </bean>
  <bean class="packageName.ClassName" id="queueProducer">
    <property name="jmsTemplate" ref="jmsTemplate"/>
    <property name="queueName" value="testQueue"/>
  </bean>
  <bean class="packageName.ClassName2" id="queueListener"/>
  <jms:listener-container concurrency="10" connection-factory="connectionFactory">
    <jms:listener destination="testQueue" ref="queueListener"/>
  </jms:listener-container>
</beans>

That’s a lot of configuration, so let’s go through it piece by piece:

<amq:broker brokerName="test-broker" start="true">
  <amq:persistenceAdapter>
    <amq:amqPersistenceAdapter directory="/opt/activemq" maxFileLength="32mb"/>
  </amq:persistenceAdapter>
  <amq:transportConnectors>
    <amq:transportConnector uri="tcp://localhost:7171"/>
  </amq:transportConnectors>
</amq:broker>

This defines an embedded JMS broker called test-broker. It listens on port 7171 over TCP and persists its data with the default AMQ Message Store in the /opt/activemq directory.

<amq:connectionFactory id="amqConnectionFactory" brokerURL="vm://test-broker"/>

This defines the JMS connection factory, which connects to the broker using the VM transport. Communication therefore happens inside the JVM, which avoids network overhead.
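
For comparison, a client running in a different JVM cannot use the VM transport and would have to go through the TCP connector declared on the broker. A minimal sketch, assuming the client runs on the same host as the broker (the bean id remoteConnectionFactory is made up for illustration and is not part of the example configuration):

<!-- Hypothetical factory for a client in another JVM; uses the TCP connector on port 7171 -->
<amq:connectionFactory id="remoteConnectionFactory" brokerURL="tcp://localhost:7171"/>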

<amq:queue id="testQueue" physicalName="testQueue"/>

Defines the queue that we are going to use.

<bean class="org.springframework.jms.connection.CachingConnectionFactory" id="connectionFactory">
   <constructor-arg ref="amqConnectionFactory"/>
   <property name="sessionCacheSize" value="100"/>
</bean>

This is the connection factory our application actually uses. It reuses a single connection and caches sessions and even MessageProducers. It is important to set the sessionCacheSize property explicitly, since its default value is 1.

<bean class="org.springframework.jms.core.JmsTemplate" id="jmsTemplate">
   <constructor-arg ref="connectionFactory"/>
</bean>

For those who know the Spring support for JDBC and Hibernate, the JmsTemplate class should look familiar, since its design is very similar to that of the JdbcTemplate and HibernateTemplate classes.
It hides most of the low-level JMS details (e.g. obtaining a session or handling acknowledgments). By default, sessions are not transacted and use auto-acknowledge mode.
The connectionFactory is passed in as a constructor argument.
One of the most important methods of JmsTemplate is send(String destinationName, MessageCreator messageCreator). MessageCreator is an interface defined by Spring with a single method, createMessage(Session session). Its return value is the JMS message that will be sent to destinationName.
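
If the defaults do not fit, the template can be configured differently. The following is only a sketch, assuming you want locally transacted sessions; the bean id transactedJmsTemplate is made up for illustration, and sessionTransacted is a standard JmsTemplate property:

<!-- Hypothetical variant of the template that uses transacted sessions instead of auto-acknowledge -->
<bean class="org.springframework.jms.core.JmsTemplate" id="transactedJmsTemplate">
  <constructor-arg ref="connectionFactory"/>
  <property name="sessionTransacted" value="true"/>
</bean>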

<bean class="packageName.ClassName" id="queueProducer">
    <property name="jmsTemplate" ref="jmsTemplate"></property>
    <property name="queueName" value="testQueue"></property>
</bean>

This, finally, is an object defined within our own application: the producer, which creates messages and sends them to the queue through the specified JmsTemplate.

<bean class="packageName.ClassName2" id="queueListener"/>

This bean is the consumer of our message queue. It needs to implement the javax.jms.MessageListener interface (or a Spring-specific interface such as SessionAwareMessageListener).
In Spring terms it is a Message-Driven POJO. The difference from a standard Message-Driven Bean is that it does not need to run in an EJB container.
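
If you would rather not implement a JMS interface at all, the jms:listener element used in the full configuration above also accepts a method attribute, in which case Spring wraps the bean in a MessageListenerAdapter. A sketch, assuming a hypothetical class packageName.PlainHandler with a public void handleMessage(String text) method that receives the body of a text message:

<!-- Hypothetical plain Java class; it implements no JMS or Spring interface -->
<bean class="packageName.PlainHandler" id="plainHandler"/>
<jms:listener-container connection-factory="connectionFactory">
  <!-- Spring calls plainHandler.handleMessage(...) for each message on testQueue -->
  <jms:listener destination="testQueue" ref="plainHandler" method="handleMessage"/>
</jms:listener-container>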

<jms:listener-container concurrency="10" connection-factory="connectionFactory">
    <jms:listener destination="testQueue" ref="queueListener"></jms:listener>
</jms:listener-container>

This is the bean that makes the magic happen: it registers each listener on its queue and, whenever a message arrives, invokes the listener's callback asynchronously using a Spring TaskExecutor. The concurrency attribute defines how many MessageConsumer instances are created for each listener. If you use a value greater than 1, remember to set the ActiveMQ prefetch limit to 1; otherwise one consumer can buffer a batch of messages while the other consumers sit idle.
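
One way to set the prefetch limit is through the broker URL of the ActiveMQ connection factory. A sketch, assuming the jms.prefetchPolicy.queuePrefetch URL option is supported by the ActiveMQ version in use:

<!-- Same factory as in the example, with the queue prefetch limited to one message per consumer -->
<amq:connectionFactory id="amqConnectionFactory" brokerURL="vm://test-broker?jms.prefetchPolicy.queuePrefetch=1"/>

Because the CachingConnectionFactory wraps this factory, the listener container picks up the setting without any further changes.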

Have fun 🙂

2 Responses

  1. August 12, 2010 at 14:35 by Benjamin Darfler

    Nice writeup, I wrote something similar up at http://codedependents.com/2009/10/16/efficient-lightweight-jms-with-spring-and-activemq/ and I have a few more additions to make to it coming up. For instance, I figured out when you want to use the CachingConnectionFactory from Spring vs the PooledConnectionFactory from amq.
    I’ve also gone back and edited the activemq.apache.org wiki to hopefully make more of this clear. I’d love any feedback you have on the online documentation.

  2. September 3, 2010 at 19:52 by Kshitiz

    Hi,

    Your post is informative, thanks !

    I am aiming to create a notification server (a pub/sub and p2p model) and am excited to use Spring Integration’s features. To make that notification server an isolated black box, I’m thinking of exposing its functionality through REST-based APIs. Since ActiveMQ is highly configurable, I will probably use it for the message management part.

    Do you think it’s a feasible and wise decision to go for an architecture like this, considering scalability and extensibility?

    External systems > Restful APIs on Tomcat > Spring Integration acting as lightweight ESB > ActiveMQ (+MySql for message persistence and tomcat i.e. not the ‘embedded’ jetty) > Spring Integration > Restful APIs > External systems

    What pros and cons do you see in this scenario?

    Many thanks,
    Kshitiz