Publish-Subscribe With ActiveMq And NMS | Re.Mark
Maybe your like
In my previous articles about using ActiveMq and NMS, I have looked at sending a message from a sender to a receiver and implementing request-response. In this article, I’m going to investigate how to use the publish-subscribe pattern with ActiveMq and NMS.
The Publish-Subscribe pattern
The Publish-Subscribe pattern (pub-sub) can be thought of as a distributed implementation of the Observer pattern. The Observer pattern defines a “dependency between objects so that when one object changes state, all its dependents are notified and updated automatically.” In Enterprise Integration Patterns, Publish-Subscribe is defined as a way for “the sender [to] broadcast an event to all interested receivers.” The JMS tutorial describes the basics of pub-sub as applied to JMS. It is important to note the difference between a subscriber and a durable subscriber. As subscriber will only receive messages that were sent while it is connected whereas a durable subscriber can receive messages sent while they were disconnected. Also worth noting is the difference between synchronous message consumption and asynchronous message consumption.
Before we go any further…
…I’m making a couple of assumptions. I’m assuming that you have already installed ActiveMq and have it running. I’m also assuming that you have downloaded Spring.Messaging.Nms. If you need more information about how to ensure these assumptions are valid, read my first article about ActiveMq and NMS.
A simple subscriber
Let’s get started by creating a simple subscriber. Create a Windows Class Library project. I called mine Core. Add references to:
- ActiveMQ
- NMS
Create a class called SimpleTopicSubscriber. The code for the class is listed below:
using System; using ActiveMQ; using ActiveMQ.Commands; using NMS; namespace Core { public delegate void MessageReceivedDelegate(string message); public class SimpleTopicSubscriber : IDisposable { private readonly string topicName = null; private readonly IConnectionFactory connectionFactory; private readonly IConnection connection; private readonly ISession session; private readonly IMessageConsumer consumer; private bool isDisposed = false; public event MessageReceivedDelegate OnMessageReceived; public SimpleTopicSubscriber(string topicName, string brokerUri, string clientId, string consumerId) { this.topicName = topicName; this.connectionFactory = new ConnectionFactory(brokerUri); this.connection = this.connectionFactory.CreateConnection(); this.connection.ClientId = clientId; this.connection.Start(); this.session = connection.CreateSession(); ActiveMQTopic topic = new ActiveMQTopic(topicName); this.consumer = this.session.CreateDurableConsumer(topic, consumerId, "2 > 1", false); this.consumer.Listener += new MessageListener(OnMessage); } public void OnMessage(IMessage message) { ITextMessage textMessage = message as ITextMessage; if (this.OnMessageReceived != null) { this.OnMessageReceived(textMessage.Text); } } #region IDisposable Members public void Dispose() { if (!this.isDisposed) { this.consumer.Dispose(); this.session.Dispose(); this.connection.Dispose(); this.isDisposed = true; } } #endregion } }This class creates a durable subscription and consumes messages asynchronously. All the work is done in the constructor. The assignment of the client id is important in making this subscription durable. Registering the OnMessage event handler provides the means to consume messages asynchronously. The CreateDurableConsumer method on the session takes four parameters:
- NMS.ITopic destination- the topic to which to subscribe
- string name – the name of the consumer
- string selector – I’m not sure what this parameter means yet. I have copied this setting from some test cases I found for NMS. And it seems to work. I’ll try and figure out what it does and update the article because I don’t like to depend on magic.
- bool noLocal – indicates whether you want to consume messages that originate locally. So, setting this to false means we’ll receive all messages sent to the topic.
It’s worth noting that there’s a bunch of stuff that’ll need disposing – the connection, session and message consumer. So, there’s a rudimentary implementation of IDisposable to clean up. Finally, note that there’s an event that allows this class to bubble up the text in each message received.
Subscribe in action
Let’s see how this subscriber works. Create a Windows Console application project. I called mine ActiveMqFirstSubscriber. This project needs references to:
- the Core project
- ActiveMQ
- NMS
The code for Program class is:
using System; using Core; namespace ActiveMqFirstSubscriber { class Program { const string TOPIC_NAME = "SampleSubscriptionTopic"; const string BROKER = "tcp://localhost:61616"; const string CLIENT_ID = "ActiveMqFirstSubscriber"; const string CONSUMER_ID = "ActiveMqFirstSubscriber"; static void Main(string[] args) { try { using (SimpleTopicSubscriber subscriber = new SimpleTopicSubscriber(TOPIC_NAME, BROKER, CLIENT_ID, CONSUMER_ID)) { subscriber.OnMessageReceived += new MessageReceivedDelegate(subscriber_OnMessageReceived); Console.WriteLine("Press any key to exit..."); Console.ReadKey(); } } catch(Exception ex) { Console.WriteLine(ex); Console.WriteLine("Press any key to exit..."); Console.ReadKey(); } } static void subscriber_OnMessageReceived(string message) { Console.WriteLine(message); } } }Run the console. You should see a line telling you that you can press any key to exit. Don’t exit yet. Run jconsole and connect to ActiveMq (instructions here.) Select the MBeans tab. Navigate to org.apache.activemq->localhost->Topic->SampleSubscriptionTopic->Operations->sendTextMessage as shown below:
![]()
In the text box next to the sendTextMessage button, enter some text. Not wanting to break with tradition, I entered “Hello”. You should see the text you entered show up in the console. Now press any key in the console window and it will exit. Send another message via jconsole (this time I sent “Hello again.”) Run the console and there’s your message.
A simple publisher
Now we can subscribe to messages, the next step is to write a publisher. Add a new class to the Core project and call it SimpleTopicPublisher. Here’s the code:
using System; using ActiveMQ; using ActiveMQ.Commands; using NMS; namespace Core { public class SimpleTopicPublisher : IDisposable { private readonly string topicName = null; private readonly IConnectionFactory connectionFactory; private readonly IConnection connection; private readonly ISession session; private readonly IMessageProducer producer; private bool isDisposed = false; public SimpleTopicPublisher(string topicName, string brokerUri) { this.topicName = topicName; this.connectionFactory = new ConnectionFactory(brokerUri); this.connection = this.connectionFactory.CreateConnection(); this.connection.Start(); this.session = connection.CreateSession(); ActiveMQTopic topic = new ActiveMQTopic(topicName); this.producer = this.session.CreateProducer(topic); } public void SendMessage(string message) { if (!this.isDisposed) { ITextMessage textMessage = this.session.CreateTextMessage(message); this.producer.Send(textMessage); } else { throw new ObjectDisposedException(this.GetType().FullName); } } #region IDisposable Members public void Dispose() { if (!this.isDisposed) { this.producer.Dispose(); this.session.Dispose(); this.connection.Dispose(); this.isDisposed = true; } } #endregion } }This code should all look pretty familiar after the subscriber code. We don’t need to assign a clientId to the connection. To be able to send messages, we create an IMessageProducer from the session. The SendMessage method provides client code the ability to send messages to the topic. Just like the subscriber, there’s a little cleaning up to do, so once again there’s a rudimentary implementation of IDisposable.
The publisher in action
Create a Windows Forms project. I called mine TopicPublisher. Create a form and call it MainForm. Set the text of the form to “Publisher”. Add a label called instructionLabel and set the text to “Enter a message.” Add a text box called messageTextBox. Add a button called sendButton and set the text to “Send Message”. Create event handlers for the click event of button and the load and closed events of the form. Here’s the code for the form:
using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Text; using System.Windows.Forms; using Core; namespace TopicPublisher { public partial class FirstMainForm : Form { const string TOPIC_NAME = "SampleSubscriptionTopic"; const string BROKER = "tcp://localhost:61616"; private SimpleTopicPublisher publisher; private readonly StringBuilder builder = new StringBuilder(); private delegate void SetTextCallback(string text); public FirstMainForm() { InitializeComponent(); } private void FirstMainForm_Load(object sender, EventArgs e) { try { this.publisher = new SimpleTopicPublisher(TOPIC_NAME, BROKER); } catch (Exception ex) { MessageBox.Show(ex.ToString()); this.Close(); } } private void sendButton_Click(object sender, EventArgs e) { this.publisher.SendMessage(this.messageTextBox.Text); } private void FirstMainForm_FormClosed(object sender, FormClosedEventArgs e) { try { this.publisher.Dispose(); } catch (Exception ex) { MessageBox.Show(ex.ToString()); } } } }Hopefully, this is all fairly obvious. We create a publisher on load and dispose of it when the form closes. When the button is clicked the text from the text box is sent to all subscribers. If, like me, you have these projects in one solution, you’ll need to set both the Console application and the Windows Forms application to run at startup. Go ahead and run the subscriber and the publisher. You should be able to send messages form the form to the console.
The next step
So we can send messages from a to b. And we know that if b is disconnected, it will receive the messages as soon as it reconnects. To make this a little more interesting, how about a form that is both a publisher and a subscriber?
A little light refactoring
The obvious thing about SimpleTopicPublisher and SImpleTopicSubscriber is that they use their own connections and sessions. If we are going to publish and subscribe from the same form, it’d make more sense to share connections and sessions. Here’s the new code for SimpleTopicSubscriber:
using System; using System.Collections.Generic; using System.Text; using ActiveMQ; using NMS; namespace Core { public delegate void MessageReceivedDelegate(string message); public class SimpleTopicSubscriber : IDisposable { private readonly IMessageConsumer consumer; private bool isDisposed = false; public event MessageReceivedDelegate OnMessageReceived; public SimpleTopicSubscriber(IMessageConsumer consumer) { this.consumer = consumer; this.consumer.Listener += new MessageListener(OnMessage); } public void OnMessage(IMessage message) { ITextMessage textMessage = message as ITextMessage; if (this.OnMessageReceived != null) { this.OnMessageReceived(textMessage.Text); } } #region IDisposable Members public void Dispose() { if (!this.isDisposed) { this.consumer.Dispose(); this.isDisposed = true; } } #endregion } }Now we pass an instance of IMessageConsumer in the constructor. We can make a similar change to SimpleTopicPubisher by passing an instance of IMessageProducer in the constructor. Here’s the code:
using System; using System.Collections.Generic; using System.Text; using ActiveMQ; using ActiveMQ.Commands; using NMS; namespace Core { public class SimpleTopicPublisher : IDisposable { private readonly IMessageProducer producer; private bool isDisposed = false; public SimpleTopicPublisher(IMessageProducer producer) { this.producer = producer; } public void SendMessage(string message) { if (!this.isDisposed) { ITextMessage textMessage = new ActiveMQTextMessage(message); this.producer.Send(textMessage); } else { throw new ObjectDisposedException(this.GetType().FullName); } } #region IDisposable Members public void Dispose() { if (!this.isDisposed) { this.producer.Dispose(); this.isDisposed = true; } } #endregion } }Since we probably don’t want to put all the code that creates IMessageConsumer instances and IMessageProducer interfaces into our form, we’ll need a new class that takes that responsibility. Add a new class to the Core project and call it TopicConnection. The code for the class is listed below:
using System; using System.Collections.Generic; using System.Text; using ActiveMQ; using ActiveMQ.Commands; using NMS; namespace Core { public class TopicConnection : IDisposable { private readonly IConnection connection; private readonly ISession session; private readonly ITopic topic; private bool isDisposed = false; public TopicConnection(IConnectionFactory connectionFactory, string clientId, string topicName) { this.connection = connectionFactory.CreateConnection(); this.connection.ClientId = clientId; this.connection.Start(); this.session = this.connection.CreateSession(); this.topic = new ActiveMQTopic(topicName); } public SimpleTopicPublisher CreateTopicPublisher() { IMessageProducer producer = this.session.CreateProducer(this.topic); return new SimpleTopicPublisher(producer); } public SimpleTopicSubscriber CreateSimpleTopicSubscriber(string consumerId) { IMessageConsumer consumer = this.session.CreateDurableConsumer(this.topic, consumerId, "2 > 1", false); return new SimpleTopicSubscriber(consumer); } #region IDisposable Members public void Dispose() { if (!this.isDisposed) { this.session.Dispose(); this.connection.Dispose(); this.isDisposed = true; } } #endregion } }This class has a method to create a SimpleTopicSubscriber and another to create a SimpleTopicPublisher. The constructor takes an instance of IConnectionFactory, the client id and the name of the topic. To complete this exercise, let’s add a class called TopicConnectionFactory. This is a simple class that does little more than hold a reference to an instance of IConnectionFactory. Here’s the code:
using System; using System.Collections.Generic; using System.Text; using ActiveMQ; using NMS; namespace Core { public class TopicConnectionFactory { private readonly IConnectionFactory connectionFactory; public TopicConnectionFactory(IConnectionFactory connectionFactory) { this.connectionFactory = connectionFactory; } public TopicConnection CreateConnection(string clientId, string topicName) { return new TopicConnection(this.connectionFactory, clientId, topicName); } } }Now the Core code is in better shape, let’s sort the form out. We want to be able to set the client id from the form, so add a label, a text box and a button. Call the label clientIdLabel and set the text to “Client Id”. Call the text box clientIdTextBox. Call the button connectButton and set the text to “Connect”. Wire up an event handler to the click event of the button. Here’s the code for the event handler:
private void connectButton_Click(object sender, EventArgs e) { try { this.clientId = this.clientIdTextBox.Text; this.consumerId = this.clientId; this.connection = this.connectionFactory.CreateConnection(this.clientId, TOPIC_NAME); this.publisher = this.connection.CreateTopicPublisher(); this.subscriber = this.connection.CreateSimpleTopicSubscriber(this.consumerId); this.subscriber.OnMessageReceived += new MessageReceivedDelegate(subscriber_OnMessageReceived); this.clientIdLabel.Enabled = false; this.clientIdTextBox.Enabled = false; this.connectButton.Enabled = false; this.messageTextBox.Enabled = true; this.instructionLabel.Enabled = true; this.historyTextBox.Enabled = true; this.submitButton.Enabled = true; } catch (Exception ex) { MessageBox.Show(ex.ToString()); this.Close(); } }So that the publish and subscribe bits are disabled (until a connection is established, remove the form load event and set the enabled property of messageTextBox, instructionLabel and submitButton to false. You can also see from the code above that I’ve added another text box called historyTextBox. This text box has the MultiLine property set to true. Here’s how the form looks:
![]()
Here’s the full listing for the form:
using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Text; using System.Windows.Forms; using Core; using ActiveMQ; namespace ActiveMqMessageChat { public partial class MainForm : Form { const string TOPIC_NAME = "SampleSubscriptionTopic"; const string BROKER = "tcp://localhost:61616"; private readonly TopicConnectionFactory connectionFactory = new TopicConnectionFactory(new ConnectionFactory(BROKER)); private TopicConnection connection; private SimpleTopicPublisher publisher; private SimpleTopicSubscriber subscriber; private string clientId; private string consumerId; private readonly StringBuilder builder = new StringBuilder(); private delegate void SetTextCallback(string text); public MainForm() { InitializeComponent(); } private void submitButton_Click(object sender, EventArgs e) { this.publisher.SendMessage(this.messageTextBox.Text); } private void subscriber_OnMessageReceived(string message) { this.builder.AppendLine(message); SetText(this.builder.ToString()); } private void SetText(string text) { // InvokeRequired required compares the thread ID of the // calling thread to the thread ID of the creating thread. // If these threads are different, it returns true. if (this.historyTextBox.InvokeRequired) { SetTextCallback d = new SetTextCallback(SetText); this.Invoke(d, new object[] { text }); } else { this.historyTextBox.Text = text; } } private void MainForm_FormClosed(object sender, FormClosedEventArgs e) { try { this.publisher.Dispose(); this.subscriber.Dispose(); this.connection.Dispose(); } catch { } } private void connectButton_Click(object sender, EventArgs e) { try { this.clientId = this.clientIdTextBox.Text; this.consumerId = this.clientId; this.connection = this.connectionFactory.CreateConnection(this.clientId, TOPIC_NAME); this.publisher = this.connection.CreateTopicPublisher(); this.subscriber = this.connection.CreateSimpleTopicSubscriber(this.consumerId); this.subscriber.OnMessageReceived += new MessageReceivedDelegate(subscriber_OnMessageReceived); this.clientIdLabel.Enabled = false; this.clientIdTextBox.Enabled = false; this.connectButton.Enabled = false; this.messageTextBox.Enabled = true; this.instructionLabel.Enabled = true; this.historyTextBox.Enabled = true; this.submitButton.Enabled = true; } catch (Exception ex) { MessageBox.Show(ex.ToString()); this.Close(); } } } }The launcher
To make this a touch simpler to run, add another form and call it LaunchForm. Make this form the form that gets run from the Program class. Add one button called launchButton and set the text to “Create Client”. Wire up an event handler so that every time the button is clicked, a new instance of MainForm is created.
OK. Time to run the new form. Create two instances and marvel at the ability to see messages sent to and fro. Remember the purpose of the client id. With two forms you can see the same message sent to two subscribers. If you’ve got the console running, too then you can see the same message being sent to three subscribers.
And finally
It doesn’t end here. The code in this article is an entry point to publish-subscribe with ActiveMQ and NMS. There’s a lot more you can do. Take a look at Virtual Destinations and Subscription Recovery Policy for starters.
Technorati Tags: activemq, nms, c#, software, design, development, opensourceShare this:
- X
- More
Tag » Activemq Topic String
-
Advisory Message - ActiveMQ - The Apache Software Foundation!
-
Apache ActiveMQ ™ -- Handling Advisory Messages
-
Queues Vs Topics And Examples With Java, Spring Boot And Apache ...
-
How To Configure Topic/Queue On Active MQ? | H2kinfosys Blog
-
Create A Durable Topic Subscriber For ActiveMQ - Level Up Coding
-
Chapter 11. ActiveMQ Broker Features In Action - LiveBook · Manning
-
Chapter 8. Addresses, Queues, And Topics Red Hat AMQ 7.2
-
A Java Object That Handles Sending/publishing A ... - Gists · GitHub
-
Better To Filter JMS Message By Topic Name Or String Property?
-
Differences Between Azure Service Bus And Apache ActiveMQ
-
Working Examples Of Using Java Message Service (JMS) With ...
-
Org.mand.tTopicName Java ...
-
Creating Destinations In A JMS Application - IBM
-
ActiveMQ Architecture And Key Metrics - Datadog