This post explains how topics work in WSO2 Message Broker (MB) by subscribing to a topic and publishing to it. For this we will write two Java clients:
  • TopicSubscriber.java to subscribe for messages
  • TopicPublisher.java to publish the messages

Let's Start.

[1] Get WSO2 MB from http://wso2.com/products/message-broker/
[2] Create a project named "Client" in your preferred IDE
[3] Add the jars below to the lib directory of the project (they can be found in the client lib directory of MB)

  • andes-client-0.13.wso2v4.jar
  • geronimo-jms_1.1_spec-1.1.0.wso2v1.jar
  • log4j-1.2.17.jar
  • org.wso2.carbon.event.client-4.0.0.jar
  • org.wso2.carbon.event.client.stub-4.0.0.jar
  • slf4j-1.5.10.wso2v1.jar
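
Before writing the clients, it can help to confirm that the jars above are really on the project classpath. The following is only a minimal sketch: `ClasspathCheck` and `findMissing` are hypothetical names made up for this post, not part of WSO2 MB. The two class names it probes are the ones the clients below rely on.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of WSO2 MB): reports which classes cannot be loaded.
class ClasspathCheck {

    // Returns the subset of classNames that is not available on the classpath.
    static List<String> findMissing(String... classNames) {
        List<String> missing = new ArrayList<>();
        for (String name : classNames) {
            try {
                // initialize=false: only locate the class, do not run its static initializers
                Class.forName(name, false, ClasspathCheck.class.getClassLoader());
            } catch (ClassNotFoundException | LinkageError e) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        List<String> missing = findMissing(
                "javax.jms.TopicConnectionFactory",
                "org.wso2.andes.jndi.PropertiesFileInitialContextFactory");
        if (missing.isEmpty()) {
            System.out.println("All required classes found");
        } else {
            System.out.println("Missing from classpath: " + missing);
        }
    }
}
```

If a class is reported as missing, the corresponding jar is most likely not in the lib directory or not added to the build path.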

[4] Create the class "TopicSubscriber.java" to subscribe for messages

 
package simple;

import java.util.Properties;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class TopicSubscriber {

 private String topicName = "news.sport";
 private String initialContextFactory = "org.wso2.andes.jndi."
+"PropertiesFileInitialContextFactory";
 private String connectionString = "amqp:"
+"//admin:admin@clientID/carbon?brokerlist='tcp://localhost:5672'";
 // volatile: written by the JMS listener thread, read by the main thread
 private volatile boolean messageReceived = false;

 public static void main(String[] args) {
  TopicSubscriber subscriber = new TopicSubscriber();
  subscriber.subscribeWithTopicLookup();
 }

 public void subscribeWithTopicLookup() {

  Properties properties = new Properties();
  TopicConnection topicConnection = null;
  properties.put("java.naming.factory.initial", initialContextFactory);
  properties.put("connectionfactory.QueueConnectionFactory",
    connectionString);
  properties.put("topic." + topicName, topicName);
  try {
   InitialContext ctx = new InitialContext(properties);
   TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory) ctx
     .lookup("QueueConnectionFactory");
   topicConnection = topicConnectionFactory.createTopicConnection();
   System.out
     .println("Create Topic Connection for Topic " + topicName);

   while (!messageReceived) {
    try {
     TopicSession topicSession = topicConnection
       .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

     Topic topic = (Topic) ctx.lookup(topicName);
     // start the connection
     topicConnection.start();

     // create a topic subscriber
     javax.jms.TopicSubscriber topicSubscriber = topicSession
       .createSubscriber(topic);

     TestMessageListener messageListener = new TestMessageListener();
     topicSubscriber.setMessageListener(messageListener);

     Thread.sleep(5000);
     topicSubscriber.close();
     topicSession.close();
    } catch (JMSException e) {
     e.printStackTrace();
    } catch (NamingException e) {
     e.printStackTrace();
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
   }

  } catch (NamingException e) {
   throw new RuntimeException("Error in initial context lookup", e);
  } catch (JMSException e) {
   throw new RuntimeException("Error in JMS operations", e);
  } finally {
   if (topicConnection != null) {
    try {
     topicConnection.close();
    } catch (JMSException e) {
     throw new RuntimeException(
       "Error in closing topic connection", e);
    }
   }
  }
 }

 public class TestMessageListener implements MessageListener {
  public void onMessage(Message message) {
   try {
    System.out.println("Got the Message : "
      + ((TextMessage) message).getText());
    messageReceived = true;
   } catch (JMSException e) {
    e.printStackTrace();
   }
  }
 }

}
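
The subscriber above polls a flag and re-creates the session and subscriber every five seconds until a message arrives. One possible alternative is to block on a `CountDownLatch` that the listener releases. The sketch below uses only the JDK so it runs without a broker; `FirstMessageWaiter`, `messageArrived` and `awaitFirstMessage` are hypothetical names, and the extra thread merely stands in for the JMS delivery thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: lets the main thread wait until a listener reports the first message.
class FirstMessageWaiter {

    private final CountDownLatch latch = new CountDownLatch(1);

    // Intended to be called from MessageListener.onMessage(...).
    void messageArrived() {
        latch.countDown();
    }

    // Blocks until a message has arrived or the timeout expires.
    // Returns true if a message arrived in time, false otherwise.
    boolean awaitFirstMessage(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            // restore the interrupt flag and report "no message"
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        FirstMessageWaiter waiter = new FirstMessageWaiter();
        // Stand-in for the JMS provider's delivery thread (assumption for this demo).
        Thread delivery = new Thread(waiter::messageArrived);
        delivery.start();
        boolean received = waiter.awaitFirstMessage(5, TimeUnit.SECONDS);
        System.out.println("Message received: " + received);
    }
}
```

In the real client, `onMessage` would call `messageArrived()` after printing the text, and `subscribeWithTopicLookup()` would create the session and subscriber once and then wait on the latch instead of looping.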

 
[5] Create the class "TopicPublisher.java" to publish the messages
 
package simple;

import javax.jms.*;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;

public class TopicPublisher {
 private String topicName = "news.sport";
 private String initialContextFactory = "org.wso2.andes.jndi."
 +"PropertiesFileInitialContextFactory";
 private String connectionString = "amqp:"
 +"//admin:admin@clientID/carbon?brokerlist='tcp://localhost:5672'";

 public static void main(String[] args) {
  TopicPublisher publisher = new TopicPublisher();
  publisher.publishWithTopicLookup();
 }

 public void publishWithTopicLookup() {
  Properties properties = new Properties();
  TopicConnection topicConnection = null;
  properties.put("java.naming.factory.initial", initialContextFactory);
  properties.put("connectionfactory.QueueConnectionFactory",
    connectionString);
  properties.put("topic." + topicName, topicName);

  try {
   // initialize
   // the required connection factories
   InitialContext ctx = new InitialContext(properties);
   TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory) ctx
     .lookup("QueueConnectionFactory");
   topicConnection = topicConnectionFactory.createTopicConnection();

   try {
    TopicSession topicSession = topicConnection.createTopicSession(
      false, Session.AUTO_ACKNOWLEDGE);
    // create or use the topic
    System.out.println("Use the Topic " + topicName);
    Topic topic = (Topic) ctx.lookup(topicName);
    javax.jms.TopicPublisher topicPublisher = topicSession
      .createPublisher(topic);

    String msg = "Hi, I am Test Message";
    TextMessage textMessage = topicSession.createTextMessage(msg);

    topicPublisher.publish(textMessage);
    System.out.println("Publishing message " + textMessage);
    topicPublisher.close();
    topicSession.close();

    Thread.sleep(20);
   } catch (InterruptedException e) {
    e.printStackTrace();
   }

  } catch (JMSException e) {
   throw new RuntimeException("Error in JMS operations", e);
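  } catch (NamingException e) {
   throw new RuntimeException("Error in initial context lookup", e);
  } finally {
   // close the connection so the client releases its broker resources
   if (topicConnection != null) {
    try {
     topicConnection.close();
    } catch (JMSException e) {
     e.printStackTrace();
    }
   }
  }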
 }

}
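
Both clients build the same JNDI properties: the initial context factory, a `connectionfactory.<name>` entry holding the AMQP connection URL, and a `topic.<name>` entry for the topic. As a rough sketch, that shared setup could be pulled into one place. `AndesJndiConfig` and its two methods are hypothetical helpers, and the URL layout is simply copied from the listings above rather than taken from any specification.

```java
import java.util.Properties;

// Hypothetical helper that assembles the JNDI settings used by both clients.
class AndesJndiConfig {

    // Builds the connection URL in the same layout as the listings above.
    static String buildConnectionUrl(String user, String password, String host, int port) {
        return "amqp://" + user + ":" + password
                + "@clientID/carbon?brokerlist='tcp://" + host + ":" + port + "'";
    }

    // Builds the properties that are passed to new InitialContext(properties).
    static Properties buildJndiProperties(String connectionUrl, String factoryName, String topicName) {
        Properties properties = new Properties();
        properties.put("java.naming.factory.initial",
                "org.wso2.andes.jndi.PropertiesFileInitialContextFactory");
        // factoryName is the key later used in ctx.lookup(factoryName)
        properties.put("connectionfactory." + factoryName, connectionUrl);
        // topicName is the key later used in ctx.lookup(topicName)
        properties.put("topic." + topicName, topicName);
        return properties;
    }

    public static void main(String[] args) {
        String url = buildConnectionUrl("admin", "admin", "localhost", 5672);
        Properties properties = buildJndiProperties(url, "QueueConnectionFactory", "news.sport");
        for (String key : properties.stringPropertyNames()) {
            System.out.println(key + " = " + properties.getProperty(key));
        }
    }
}
```

With a helper like this, a different host, port or topic name only has to be changed in one call instead of in both classes.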

 
[6] First run "TopicSubscriber.java", then run "TopicPublisher.java"

Here is the output from both:
TopicSubscriber::

Create Topic Connection for Topic news.sport
Got the Message : Hi, I am Test Message

TopicPublisher::

Use the Topic news.sport
Publishing message
Body: Hi, I am Test Message
JMS Correlation ID: null
JMS timestamp: 1359720212306
JMS expiration: 0
JMS priority: 4
JMS delivery mode: 2
JMS reply to: null
JMS Redelivered: false
JMS Destination: topic://amq.topic/news.sport/?routingkey='news.sport'&exclusive='true'&autodelete='true'
JMS Type: null
JMS MessageID: ID:d7915d2c-6ddc-3b8a-b1aa-7a63009c6cae
JMS Content-Type: text/plain
AMQ message number: -1
Properties:
 JMS_QPID_DESTTYPE = 2
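
Some of the numeric header values in this output are easier to read once mapped to their JMS names. In the JMS 1.1 API, `javax.jms.DeliveryMode.NON_PERSISTENT` is 1 and `PERSISTENT` is 2, and priorities run from 0 to 9 with 4 as the default (0 to 4 are normal, 5 to 9 are expedited). The sketch below hard-codes those constants so that it runs without the JMS jar; `JmsHeaderNames` is a hypothetical helper.

```java
// Hypothetical helper that turns numeric JMS header values into readable names.
class JmsHeaderNames {

    // Mirrors the constants of javax.jms.DeliveryMode (1 and 2).
    static String deliveryModeName(int mode) {
        switch (mode) {
            case 1:
                return "NON_PERSISTENT";
            case 2:
                return "PERSISTENT";
            default:
                return "UNKNOWN(" + mode + ")";
        }
    }

    // JMS defines priorities 0-4 as normal and 5-9 as expedited.
    static String priorityClass(int priority) {
        if (priority < 0 || priority > 9) {
            return "INVALID(" + priority + ")";
        }
        return priority <= 4 ? "NORMAL" : "EXPEDITED";
    }

    public static void main(String[] args) {
        // Values taken from the publisher output above.
        System.out.println("JMS delivery mode 2 = " + deliveryModeName(2));
        System.out.println("JMS priority 4 = " + priorityClass(4));
    }
}
```

So the message above was sent with the persistent delivery mode and the default priority.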

 [More] The output above is the full message that we sent to the TopicSubscriber. Any of the values shown there can be read from the message on the subscriber side.
Here is a sample that reads the timestamp and the message ID from the JMS message.

 public class TestMessageListener implements MessageListener {
  public void onMessage(Message message) {
   try {
    System.out.println("Got the Message  TimeStamp: "
      +  message.getJMSTimestamp());
    System.out.println("Got the Message JMS ID : "
      +  message.getJMSMessageID());
    messageReceived = true;
   } catch (JMSException e) {
    e.printStackTrace();
   }
  }
 } 
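
The value returned by `getJMSTimestamp()` is a `long` holding milliseconds since the Unix epoch, which is not very readable when printed directly. A minimal sketch of converting it with `java.time` (Java 8 or later) follows; `JmsTimestamp` is a hypothetical helper, and the sample value is the timestamp from the publisher output above.

```java
import java.time.Instant;

// Hypothetical helper that converts a JMS timestamp into a java.time.Instant.
class JmsTimestamp {

    // jmsTimestampMillis is the value returned by Message.getJMSTimestamp().
    static Instant toInstant(long jmsTimestampMillis) {
        return Instant.ofEpochMilli(jmsTimestampMillis);
    }

    public static void main(String[] args) {
        // Timestamp taken from the publisher output above.
        System.out.println(toInstant(1359720212306L)); // prints 2013-02-01T12:03:32.306Z
    }
}
```

Inside the listener, the same call could wrap `message.getJMSTimestamp()` before printing it.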
 
Comments

  1. Hi,

    How can we get the list of topics? I don't want to hard-code my topic name. Is there a way to get a list of all topics?

    Thanks.
    Naveen
