Microsoft Azure supports applications written in a variety of languages, one of which is Java. Azure Service Bus is its asynchronous messaging platform. Because it is based on the AMQP 1.0 standard, Service Bus can be used across the range of supported Azure platforms.
Microsoft has documentation that describes how to use Service Bus resources via the JMS specification. However, many Java applications use Spring as their application framework, and Spring JMS as a convenient template for the messaging boilerplate code.
This page will describe sample code that shows how to use Spring JMS for Queues and Topics defined in Azure Service Bus.
The full code is accessible on GitHub.
Code Description
The example defines two Java applications, app1 and app2. App1 publishes a message to a Queue (named "myqueue"), which app2 consumes. App2 publishes a message to a Topic (named "mytopic"), which app1 consumes.
Tools Used
Azure Service Bus
This uses Queues and Topics defined in a Service Bus namespace in an Azure account. No other messaging broker is required.
Apache Qpid
The Apache Qpid Project provides libraries that support the AMQP 1.0 protocol across a variety of languages. In this example, we'll be using Maven to retrieve the Qpid JMS Client, which will bring in all the necessary dependencies. The Qpid client is the library recommended by Microsoft for interacting with Service Bus. At the time of this writing, version 0.6.0 of the Qpid JMS Client is used.
Spring Boot
Spring Boot is not specifically needed to use Spring JMS. There are many non-Boot Spring applications that use Spring JMS. However, it is exceptionally easy to start using Spring Boot. Between Spring Boot and Spring JMS, there is very little code to write. Spring Boot, Spring JMS and the Qpid JMS client are doing most of the work.
Service Bus Setup
To support the test applications, I've created the Queue and Topic via the Azure Portal. There's some additional configuration that goes into the Topic, but that will be discussed later.
Connection Factory Definition
Spring Boot recommends defining and configuring Spring Beans in the code itself. While it still supports Spring configuration files as well, I've adopted the programmatic approach.
Because app1 and app2 end up with very similar messaging configuration, I've extracted this configuration out to a shared JAR file which is used by both app1 and app2.
@Configuration
public class MessagingConfig {

    // The application name doubles as the JMS client id.
    @Value("${spring.application.name}")
    private String clientId;

    @Bean
    public ConnectionFactory jmsConnectionFactory(MessageStoreDetails details) throws UnsupportedEncodingException {
        JmsConnectionFactory jmsConnectionFactory = new JmsConnectionFactory(details.getUrlString());
        jmsConnectionFactory.setUsername(details.getUsername());
        jmsConnectionFactory.setPassword(details.getPassword());
        jmsConnectionFactory.setClientID(clientId);
        jmsConnectionFactory.setReceiveLocalOnly(true);
        return new CachingConnectionFactory(jmsConnectionFactory);
    }

    ...
}
The @Configuration annotation marks this class as one that defines Spring configuration. The above code defines a Spring Bean that contains an instance of the javax.jms.ConnectionFactory interface. The JmsConnectionFactory is a class provided by the Qpid JMS Client library (org.apache.qpid.jms.JmsConnectionFactory to be specific).
The URL passed into the ConnectionFactory's constructor is the path to the Service Bus Namespace that hosts the Queue and Topic. The client id is required in order to consume messages from Topics (more details about topics in later sections of this page). The ReceiveLocalOnly setting affects how the Qpid JMS Connection interacts with the remote resources. It is not required in order to connect successfully to Service Bus.
The Username and Password come from the Shared Access Policies defined against the Service Bus Namespace. In this case, I created a policy called "ReadWrite", which has Send and Listen permissions. When configuring the ConnectionFactory, I used "ReadWrite" as the username, and the Primary Key for the password.
The specifics about the URL, Username and Password are provided by the MessageStoreDetails class, which is simply a Plain Old Java Object (POJO) defined as a Spring Bean.
package azure.example.common.messaging;

import java.io.UnsupportedEncodingException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class MessageStoreDetails {

    @Value("${namespace.host}")
    private String host;

    @Value("${namespace.username}")
    private String username;

    @Value("${namespace.password}")
    private String password;

    public String getUsername() {
        return username;
    }

    public String getPassword() {
        return password;
    }

    public String getUrlString() throws UnsupportedEncodingException {
        // amqps:// scheme plus the idle timeout that Service Bus needs.
        return String.format("amqps://%s?amqp.idleTimeout=3600000", host);
    }
}
The @Value annotations are just Spring's way of retrieving configuration for the application. Spring will pull most of these values out of a text file named application.properties, but they can also be defined via command-line options (which is done for the password value).
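To make that lookup order concrete, here is a minimal plain-Java sketch of the idea. It is not how Spring implements property resolution, and the class name PropertyOverrideDemo and its resolve method are made up for illustration. It loads text in application.properties format and then lets --key=value command-line arguments override it, which mirrors the precedence Spring Boot applies.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical illustration only: Spring Boot performs this resolution itself.
final class PropertyOverrideDemo {

    // Loads properties-file text, then applies --key=value arguments on top.
    static Properties resolve(String fileContents, String... args) {
        Properties resolved = new Properties();
        try {
            resolved.load(new StringReader(fileContents));
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        for (String arg : args) {
            int eq = arg.indexOf('=');
            if (arg.startsWith("--") && eq > 2) {
                // Split on the first '=' only, since access keys often end in '='.
                resolved.setProperty(arg.substring(2, eq), arg.substring(eq + 1));
            }
        }
        return resolved;
    }

    public static void main(String[] args) {
        // Stand-in for the contents of application.properties.
        String file = "namespace.host=azuresamples.servicebus.windows.net\n"
                + "namespace.username=ReadWrite\n";
        Properties resolved = resolve(file, args);
        // Print only the names, so the password never reaches the console.
        resolved.stringPropertyNames().forEach(
                name -> System.out.println(name + " is set"));
    }
}
```

Passing --namespace.password=... on the command line keeps the key out of the properties file, which is the reason the sample supplies the password that way.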
The getUrlString method simply uses the amqps:// url format to define the location of the Service Bus Namespace. The host name to use can be retrieved from the Azure Portal, when you display the Connection Information for the namespace.
Remember that we want to use the amqps:// url, not a sb:// url. The value for the namespace I used for this sample code is "azuresamples.servicebus.windows.net", and is defined as such in each application's properties file.
It seems that the default timeout value defined by the Qpid Client does not suit the Service Bus namespace. To get a successful login, I needed to add the query parameter ?amqp.idleTimeout=3600000 to the URL.
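If all you have is the connection string shown in the Azure Portal, the three values that MessageStoreDetails needs can be pulled out of it. The sketch below assumes the usual Endpoint=sb://...;SharedAccessKeyName=...;SharedAccessKey=... layout. The ServiceBusConnectionString class is hypothetical and is not part of the sample project.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the sample project.
final class ServiceBusConnectionString {
    private final String host;
    private final String username;
    private final String password;

    private ServiceBusConnectionString(String host, String username, String password) {
        this.host = host;
        this.username = username;
        this.password = password;
    }

    // Splits "Key=Value;Key=Value" pairs and extracts the host from the sb:// endpoint.
    static ServiceBusConnectionString parse(String connectionString) {
        Map<String, String> parts = new HashMap<>();
        for (String segment : connectionString.split(";")) {
            int eq = segment.indexOf('=');
            if (eq > 0) {
                // Split on the first '=' only, since access keys often end in '='.
                parts.put(segment.substring(0, eq).trim(), segment.substring(eq + 1).trim());
            }
        }
        String endpoint = required(parts, "Endpoint");
        String host = URI.create(endpoint).getHost();
        if (host == null) {
            throw new IllegalArgumentException("No host in endpoint: " + endpoint);
        }
        return new ServiceBusConnectionString(host,
                required(parts, "SharedAccessKeyName"),
                required(parts, "SharedAccessKey"));
    }

    private static String required(Map<String, String> parts, String key) {
        String value = parts.get(key);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("Missing " + key);
        }
        return value;
    }

    String getUsername() {
        return username;
    }

    String getPassword() {
        return password;
    }

    // Same URL shape as MessageStoreDetails.getUrlString(): amqps://, not sb://.
    String getUrlString() {
        return String.format("amqps://%s?amqp.idleTimeout=3600000", host);
    }
}
```

Only the host name ends up in the amqps:// URL. The policy name and key are meant to be handed to setUsername and setPassword on the connection factory, as in MessagingConfig, rather than appended to the URL.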
Queues
Using AMQP Queues with the Qpid JMS Client works pretty much the same as other JMS Message Brokers. The code for listening and writing to queues looks very familiar.
Publishing
A well-known class in the Spring JMS library is org.springframework.jms.core.JmsTemplate. It handles much of the boilerplate and repetitive work involved in using JMS components, so the calling code can focus on the application-specific logic while the common JMS calls stay encapsulated within the template. In this case, JmsTemplate is being used to publish a message onto the Queue "myqueue".
To achieve this, the MessagingConfig class that defined the ConnectionFactory also defines a JmsTemplate instance, which will be available as a Spring Bean. All it needs is a reference to a JMS ConnectionFactory. With the Spring auto-wiring, our ConnectionFactory will be passed into this method and injected into the JmsTemplate Spring Bean.
@Bean
public JmsTemplate jmsTemplate(ConnectionFactory jmsConnectionFactory) {
    JmsTemplate returnValue = new JmsTemplate();
    returnValue.setConnectionFactory(jmsConnectionFactory);
    return returnValue;
}
Now that an instance of JmsTemplate is available for injection as a Spring Bean, a new class in the app1 module named QueuePublisher will use that JmsTemplate instance to publish a message onto the Queue.
@Service
public class QueuePublisher {

    private final Logger logger = LoggerFactory.getLogger(QueuePublisher.class);

    @Autowired
    private JmsTemplate jmsTemplate;

    // Send the first ping as soon as the bean has been wired up.
    @PostConstruct
    public void afterConstruct() {
        sendPing();
    }

    public void sendPing() {
        logger.info("Sending ping");
        jmsTemplate.send("myqueue", (Session session) -> session.createTextMessage("ping"));
    }
}
The annotations are Spring annotations that control how the Bean is defined and configured. For the purposes of this page, the important code is the call to jmsTemplate in the sendPing() method. With a Java 8 lambda expression, it takes a single line of code to publish the message to the Queue. This is consistent with other Spring JMS applications that use a JMS Message Broker.
Consuming
There are a few different ways to consume messages from queues. This example has focused on annotation-based configuration, as it uses Spring Boot which, in its own words, favors Java-based configuration.
In order to receive messages from a Queue asynchronously using Java-based configuration, Spring has the JmsListener annotation. The JmsListener annotation allows you to define a method that will be called when a Message is received. The method must be defined so that the Message will be passed into it.
Applications that use the JmsListener annotation must provide a JmsListenerContainerFactory which will provide the components that listen to the queue and, when a message is received, pass the message onto the annotated method. In this example, the MessagingConfig class defines the default ContainerFactory, so that it uses the ConnectionFactory Bean already created.
@Bean
public JmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory) {
    DefaultJmsListenerContainerFactory returnValue = new DefaultJmsListenerContainerFactory();
    returnValue.setConnectionFactory(connectionFactory);
    return returnValue;
}
Now that the default JmsListenerContainerFactory is defined as a Spring Bean, methods can be annotated to receive messages. In app2, the QueueReceiver class does just that.
@Service
public class QueueReceiver {

    private final Logger logger = LoggerFactory.getLogger(QueueReceiver.class);

    @Autowired
    private TopicPublisher topicPublisher;

    // Called for every message that arrives on "myqueue".
    @JmsListener(destination = "myqueue")
    public void onMessage(String message) {
        logger.info("Received message from queue: {}", message);
        topicPublisher.sendPong();
    }
}
All the JmsListener annotation needs to define is the name of the Queue whose messages will be consumed. Additional filters can be applied, but those haven't been used in this example. The onMessage method will receive messages from the queue and call the TopicPublisher.sendPong() method.
Topics
Using Service Bus Queues through the Qpid JMS Client library will feel familiar to anyone who knows JMS: you publish and consume messages the same way. Topics, however, are where things start to differ, because an AMQP broker does not implement them the way a traditional JMS broker does.
In AMQP-speak, there are exchanges (and one type of exchange is a Topic exchange), and clients wishing to consume messages from the exchange create a Queue and bind that Queue to the Exchange. The client then listens for messages on that Queue. Strictly speaking, the exchange and binding vocabulary comes from earlier AMQP versions such as 0-9-1 and from brokers like Qpid's, rather than from the AMQP 1.0 specification itself, but it is a useful mental model here. I'm sure this is a gross over-simplification of the functionality, and it doesn't even touch on the filtering logic that can be applied within the Exchange to ensure messages are routed to the correct Queue.
For the sake of consuming "Topic" messages using Spring JMS, it's sufficient to say that the code you've used to consume messages off Topics from other pure JMS Brokers will not be enough.
The Qpid JMS Client does take a lot of that queue registration work out of your hands. If you test your code against the Qpid Broker, you can see this for yourself via its Web Console.
Publishing
Now, with all that said, publishing messages to Topics is exactly what you would do with other JMS brokers, and it works exactly the same as publishing messages to queues. In this case, the code uses an instance of the JmsTemplate class to send the message, configured no differently than for publishing messages to a Queue.
@Service
public class TopicPublisher {

    private final Logger logger = LoggerFactory.getLogger(TopicPublisher.class);

    @Autowired
    private JmsTemplate jmsTemplate;

    public void sendPong() {
        logger.info("Sending pong");
        jmsTemplate.send("mytopic", (Session session) -> session.createTextMessage("pong"));
    }
}
It is the same one-line call to publish a message.
Consuming
As alluded to earlier, consuming messages from Topics is where this library differs from "standard" JMS. In the Qpid example for Publish/Subscribe, the key piece of code creates a durable subscriber, passing a subscription name to the appropriate method. This subscription name is vital in order to consume messages from the Topic (Exchange): it forms the basis for a queue that will receive the messages that are published to the Topic (Exchange).
Essentially, this means that each potential consumer of messages from the Topic should have a distinct subscription name, and these subscription names need to be defined for the Topic in Service Bus. In this example application, the Topic "mytopic" has a single Subscription called "TopicReceiver".
Subscriptions for Topics can be created via the Azure Portal, or programmatically using any of the supported SDKs for Azure.
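One way to picture this is a topic that keeps a separate queue per subscription name. The following is an in-memory model only, and it assumes nothing about how Service Bus or Qpid implement topics internally. The InMemoryTopic class is invented for illustration.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

// In-memory stand-in for a topic; this is NOT the Service Bus or Qpid API.
final class InMemoryTopic {

    // One private queue per subscription name.
    private final Map<String, Queue<String>> subscriptions = new LinkedHashMap<>();

    // Registers a subscription; it only sees messages published from now on.
    void createSubscription(String name) {
        subscriptions.putIfAbsent(name, new ArrayDeque<>());
    }

    // Copies the message into the queue of every existing subscription.
    void publish(String message) {
        for (Queue<String> queue : subscriptions.values()) {
            queue.add(message);
        }
    }

    // Returns the next message for the subscription, or null if none is waiting.
    String receive(String subscriptionName) {
        Queue<String> queue = subscriptions.get(subscriptionName);
        if (queue == null) {
            throw new IllegalArgumentException("Unknown subscription: " + subscriptionName);
        }
        return queue.poll();
    }
}
```

In this model, two consumers that share a subscription name compete for the same messages, while consumers with distinct names each receive their own copy. That is why each consumer that wants every message needs its own subscription name.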
Once you know the Subscription for the Topic, you can use the JmsListener annotation in Spring JMS to register a listener that will consume messages on this topic. However, in addition to the name of the topic, the subscription name has to be provided as well.
@JmsListener(destination = "mytopic",
        containerFactory = "topicJmsListenerContainerFactory",
        subscription = "TopicReceiver")
public void onMessage(String message) {
    logger.info("Received message from topic: {}", message);
    try {
        // Pause before sending the next ping.
        Thread.sleep(3000L);
        queuePublisher.sendPing();
    } catch (InterruptedException ex) {
        // Restore the interrupt flag rather than silently swallowing it.
        Thread.currentThread().interrupt();
    }
}
As with queues, the destination value is the name of the topic. The subscription value is the name of the Subscription as it has been defined in Azure. The last important item is the containerFactory value. When a containerFactory value is not supplied, Spring uses the default container factory. However, remember that the code needs to create a durable subscription. To make Spring do that, the annotation references a container factory (the "topicJmsListenerContainerFactory") that has been configured accordingly. This container factory, specific to topics, is defined in the MessagingConfig class.
@Bean
public JmsListenerContainerFactory topicJmsListenerContainerFactory(ConnectionFactory connectionFactory) {
    DefaultJmsListenerContainerFactory returnValue = new DefaultJmsListenerContainerFactory();
    returnValue.setConnectionFactory(connectionFactory);
    // The one difference from the default factory: ask for durable subscriptions.
    returnValue.setSubscriptionDurable(Boolean.TRUE);
    return returnValue;
}
This container factory is almost the same as the default container factory. The difference is where SubscriptionDurable is set to TRUE.
In order to consume messages off of topics, the Qpid JMS Client requires that the Connection Factory is defined using a Client Id. Attempting to use a Qpid JMS Connection Factory without one for topics will fail with exceptions, so this is enforced by the Qpid library. This wasn't always the case, and has been introduced in more recent releases of the Qpid JMS Client. I presume that this is used in the internals of the components, but I don't really know the reason why it is required. What I do know is that it is required.
Summary
There are a few distinct differences between many standard JMS Message Brokers and the AMQP 1.0 standard, specifically around how to listen to topic messages. So, it does take some understanding of how the Qpid JMS Client handles this under the covers. However, for all of the above code, Spring and Qpid are doing the lion's share of the work, leaving the application to focus on its logic, which is exactly what you want from a framework.
I'm getting the following error when executing the sample code. Can someone help?
JMS message listener invoker needs to establish shared Connection
Failed to create JMS Provider instance for: amqps
Could not refresh JMS Connection for destination 'myqueue' - retrying using FixedBackOff{interval=5000, currentAttempts=0, maxAttempts=unlimited}. Cause: Failed to create connection to: amqps://test.servicebus.windows.net?amqp.idleTimeout=6000000
Have you created a Service Bus namespace named "test" with a queue named "myqueue"?
Hi Ed, thanks for the response. The issue was caused by a connectivity problem at my end. The service is working fine.
Hi Muthu, I am facing the same issue. What did you do to resolve the connectivity issue?
Basically, app1 and app2 are both failing because of this connection timeout issue.
Hi Ed,
Just trying the example: sending and receiving from a queue works fine; however, the DefaultMessageListenerContainer for TopicReceiver in app1 fails to start with
2017-05-10 17:22:42.344 WARN 3301 --- [windows.net:-1]] o.a.q.j.p.a.b.AmqpResourceBuilder : Open of resource:(JmsConsumerInfo: { ID:f2f0683d-c0cc-49d2-9f3d-2422664ffda3:1:1:1, destination = mytopic }) failed: Cannot open a Topic client for entity type Subscriber. TrackingId:419690b3-f0b0-404f-ab06-a0e15b862bae_B18, SystemTracker:mdemail5:topic:mytopic~15|topicSubscription, Timestamp:5/10/2017 4:22:42 PM TrackingId:4d98996896e74a0683c6720fc5cf1255_G7, SystemTracker:gateway6, Timestamp:5/10/2017 4:22:40 PM [condition = amqp:not-allowed]
I have a subscription created in the Azure console, which shows the messages sent to the topic from app2:
TopicReceiver | Active | 10 | false | 2
The queue and topic are set up as you describe:
myqueue | Queues | Active | UK South
mytopic | Topic | Active | UK South
Looks like I have missed something when creating the Topic/Subscription.
Any ideas?
Thanks
Mick
Found the problem: I used the defaults when creating the topic, and "Enable partitioning" was turned on.
I recreated the topic with partitioning turned off, and everything works.
Thanks for the example
Mick
I'm getting the exception below when I run App2:
o.s.j.l.DefaultMessageListenerContainer : Could not refresh JMS Connection for destination 'myqueue' - retrying using FixedBackOff{interval=5000, currentAttempts=5, maxAttempts=unlimited}. Cause: Failed to create connection to: amqps://samples.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<>; nested exception is java.net.ConnectException: Connection timed out: no further information: samples.servicebus.windows.net/
And when I run App1:
Failed to create connection to: amqps://samples.servicebus.windows.net/;
Hi Jaggu. I am facing a similar issue: "Connection timed out: no further information". Did you have any luck resolving it?
Hi. I would like to keep a check on the connection. For example, if the connection gets lost or fails to find the mentioned queue, I need to write a log entry to a file. Is there any way of doing that? Eagerly waiting for a reply. Great tutorial, by the way! :)
Hi. I'm glad it is of use. Off the top of my head, I can't say the answer definitively. However, the appeal of this approach (for me) is we can use the JMS standard interfaces in the code. If you look into that, you can see that you can provide an implementation of the ExceptionListener interface to the connection factory. This ExceptionListener (looking at the code for Qpid's JmsConnectionFactory) will be applied to any JMS Connection created by the factory.
Your ExceptionListener implementation can then do whatever processing you need to happen in those exception conditions. I can't state for certain if all of those exception conditions are what you are looking for. But that is where I would start investigating.
https://docs.oracle.com/javaee/1.4/api/javax/jms/ExceptionListener.html
Good luck!
Thanks for your reply. But I tried the ExceptionListener. Seems it doesn't work in all the cases. I finally got my way around by ditching the @JmsListener annotation and instead configuring the connections and sessions manually and catching any JMSException thrown during connection establishment. It's kinda weird that the ExceptionListener didn't work for me though. I'll do some further research to see what went wrong.
Hello Ed, thank you for this useful blog.
I'm currently using Service Bus, and my application has to listen for and receive messages from a queue. However, when using JMS, I have not found a way to set the ReceiveMode to PeekLock. Do you have any idea how I can set this ReceiveMode?
Thanks in advance
Hi Dani. I don't know off the top of my head. I would think that this is an issue around the Qpid JMS library. If you look at the Configuration section of the documentation (latest at the time of this writing is https://qpid.apache.org/releases/qpid-jms-0.38.0/docs/index.html), you can see everything that is available. It is my understanding that you pass these values as part of the URL string passed to the Connection (I do that already in MessageStoreDetails.java for other settings).
I had a quick look, and I'm not sure I can see something specifically matching this. There are some jms.* settings that look plausible. You might need to get the sources of the qpid library you're using and see what they do (if you use Maven, it's pretty easy to get the source jars and view in your IDE).
Thanks for your reply, I'll have a look and keep you informed if I find something.
Thanks again.
Hello Ed, is there any way to get this working over a proxy?
We use a proxy here and don't seem to have any trouble, but perhaps we're not set up the same as you are. What problems are you experiencing?
Off the top of my head, I suspect it would need some configuration of the Qpid JMS client. Looking at its documented configuration options (https://qpid.apache.org/releases/qpid-jms-0.38.0/docs/index.html), I don't see anything jumping out that looks like it handles proxies. Checking with them may prove helpful.
But, I never came across anything where I needed to add entries for proxies.
Hi Ed, I have a topic subscriber with sessions enabled, but your code does not support a session-enabled topic subscriber; it does work for me with a subscriber without sessions. Can you please help me handle a topic subscriber with sessions?
Hi Victor. I don't think I understand your question. In the example code, the connection factory is handling the sessions within the factory. Really, it's reusing connections without maintaining a specific session.
I would expect that if you are trying to use a specific session, that it would need to be supported in the Spring JMS annotations to instruct the code to use a specific session. However, I'm speculating right now. I am not sure why a session is required?
Is it possible to have something like a RabbitMQ Fan Out Exchange?
I love you!!!
I tried your code (with updated dependency versions) with Azure Service Bus, RabbitMQ and ActiveMQ Artemis and... everything is all right!!
Yes, the dependencies may be a bit old, but I'm glad it's still working and you find it helpful!
Hi. Thanks for your post. It is very useful.
I have one question though. I am provided with SAS key values for a Service Bus topic, but not for the namespace. Can I use SAS keys configured for the 'Topic' while configuring JmsConnectionFactory?
Hi. From my experience, I haven't found a way, via the Qpid JMS abstraction, to use a SAS Key against a Topic or Queue. It has been some time since I looked at this in anger, though. Perhaps the setup has been updated in Qpid which allows a SAS key to be defined as an attribute of the Queue or Topic being listened or written to.
I had a quick scan of the doco, and didn't see anything obvious. And from my experience, I was only able to use SAS keys that were defined against the namespace.
Hi,
I am getting the exception below while connecting to a session-enabled subscription:
It is not possible for an entity that requires sessions to create a non-sessionful message receiver
Hi Nilendra. I do not think I am following the question. Is the ConnectionFactory configuration you're using different from what I have used above?
I haven't done much with session-enabled subscriptions. But looking at the doco, it looks like that's starting to move more into Kafka stream functionality. And I don't know if that is going to grate against the existing JMS specification.
But I don't have first-hand experience with this. I would check Apache Qpid's documentation to see if the JMS specification will support those features available with session-enabled subscriptions. I can only assume that Apache Qpid's first priority is to implement the JMS specification. If that doesn't reflect the abilities of the session-enabled subscriptions, then it may not be exposed via Qpid.