Spring Boot + Camel + Atmosphere-websocket

We all know that clear documentation with good, simple examples can save hours of development time. That is why I decided to create simple examples using a technology that is popular right now: Spring Boot with the Camel Atmosphere-Websocket component. You can read about the Atmosphere project here.

Let's get to the task.

You need to build an integration project that consumes from and produces to a websocket endpoint. Camel provides three components to solve this kind of problem:

All of them have their own benefits and drawbacks. We are not going to compare them in this blog post; instead we will concentrate on an implementation with atmosphere-websocket.

Step 1: pom.xml

Finding the right combination of libraries is very important. Most of the problems you will encounter are caused by incompatible versions.

Here are the versions that are compatible at the time of writing.

<java.version>1.8</java.version>
<spring-boot.version>1.5.6.RELEASE</spring-boot.version>
<camel-spring-boot.version>2.20.0</camel-spring-boot.version>
<maven-compiler-plugin.version>3.6.1</maven-compiler-plugin.version>
<atmosphere.version>2.4.10</atmosphere.version>
<jetty.version>9.4.6.v20170531</jetty.version>
<!-- Option 1: Tomcat (the default embedded container) -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Option 2: Jetty (use these instead of the Tomcat dependency above) -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jetty</artifactId>
</dependency>
<dependency>
    <groupId>org.eclipse.jetty.websocket</groupId>
    <artifactId>websocket-api</artifactId>
    <version>${jetty.version}</version>
</dependency>
<dependency>
    <groupId>org.eclipse.jetty.websocket</groupId>
    <artifactId>websocket-client</artifactId>
    <version>${jetty.version}</version>
</dependency>
<dependency>
    <groupId>org.eclipse.jetty.websocket</groupId>
    <artifactId>websocket-common</artifactId>
    <version>${jetty.version}</version>
</dependency>
<dependency>
    <groupId>org.eclipse.jetty.websocket</groupId>
    <artifactId>websocket-servlet</artifactId>
    <version>${jetty.version}</version>
</dependency>

Note!
Camel 2.20.0 is not released yet, so you might want to wait.

While creating these examples I had to fix the Camel atmosphere-websocket component itself; these changes will be released soon. You can also check out the Camel sources, build them, and try it yourself.

These changes make the component work the way the Atmosphere framework intends. You can view them here: github

Step 2: @Configuration

I have created a configuration class which basically does two things:

  • Registration of atmosphere-websocket servlet as a bean
@Bean
public ServletRegistrationBean camelWebSocketServlet() {
    ServletRegistrationBean bean = new ServletRegistrationBean(new CamelWebSocketServlet(), "/camel/*");

    // Always set the name to "CamelWsServlet"; nothing else should be used.
    // If you use another name your consumer will not be registered and nothing will work.
    bean.setName("CamelWsServlet");
    bean.setLoadOnStartup(0);
    // Needs to occur before the EmbeddedAtmosphereInitializer
    bean.setOrder(Ordered.HIGHEST_PRECEDENCE);
    return bean;
}
  • Creation of EmbeddedAtmosphereInitializer as a bean
@Bean
public EmbeddedAtmosphereInitializer atmosphereInitializer() {
    return new EmbeddedAtmosphereInitializer();
}

private static class EmbeddedAtmosphereInitializer extends ContainerInitializer
        implements ServletContextInitializer {

    @Override
    public void onStartup(ServletContext servletContext) throws ServletException {
        onStartup(Collections.<Class<?>> emptySet(), servletContext);
    }
}

In case you want to enable the events notification feature, you should provide additional parameters during the servlet registration (see example 2).

// Enables events notification feature
Map<String,String> params = new HashMap<>();
params.put("events","true");
bean.setInitParameters(params);

Step 3: RouteBuilder

There is nothing special in example 1; it is a regular Camel route DSL. Example 2 has events notification enabled. It might be useful to structure the route as follows:

public void configure() {
    from("atmosphere-websocket:///chat")
        .choice()
            .when(header(WebsocketConstants.EVENT_TYPE).isEqualTo(WebsocketConstants.ONOPEN_EVENT_TYPE))
                .process(websocketSessionProcessor)
                .to("atmosphere-websocket:///chat")
            .when(header(WebsocketConstants.EVENT_TYPE).isEqualTo(WebsocketConstants.ONCLOSE_EVENT_TYPE))
                .process(websocketSessionProcessor)
            .when(header(WebsocketConstants.EVENT_TYPE).isEqualTo(WebsocketConstants.ONERROR_EVENT_TYPE))
                .process(websocketSessionProcessor)
            .when(header(WebsocketConstants.ERROR_TYPE).isEqualTo(WebsocketConstants.MESSAGE_NOT_SENT_ERROR_TYPE))
                .process(websocketNotDeliveredMessageProcessor)
            .otherwise()
                .process(websocketMessageProcessor).to("seda:messages");
}

As you can see, once the events notification feature is enabled we start receiving onOpen/onClose/onError events, as well as an event when a message has not been sent. The message headers will then contain EVENT_TYPE or ERROR_TYPE, and CONNECTION_KEY.

EVENT_TYPE takes one of the following values:

int ONOPEN_EVENT_TYPE = 1;
int ONCLOSE_EVENT_TYPE = 0;
int ONERROR_EVENT_TYPE = -1;

CONNECTION_KEY is the id of each connection. You can think of it as a user connection and build several features on top of this idea. For example, if you are building a social network messenger, you may want to indicate which users are currently active, like the green dots in Facebook.

Then it is easy to light up the green dot on an onOpen event and turn it off when you get an onClose/onError event with the same connection key.
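
The green-dot idea can be sketched in plain Java, independent of Camel. PresenceTracker below is a hypothetical helper (it is not part of Camel or Atmosphere). It assumes the EVENT_TYPE values listed above, and that one of your processors passes in the CONNECTION_KEY and EVENT_TYPE header values of each incoming event.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of Camel: tracks which connections are currently open.
final class PresenceTracker {
    // Values copied from the EVENT_TYPE states listed in this post.
    static final int ONOPEN_EVENT_TYPE = 1;
    static final int ONCLOSE_EVENT_TYPE = 0;
    static final int ONERROR_EVENT_TYPE = -1;

    // Connection keys that currently have a "green dot".
    private final Set<String> online = ConcurrentHashMap.newKeySet();

    /** Call with the CONNECTION_KEY and EVENT_TYPE header values of an incoming event. */
    void onEvent(String connectionKey, int eventType) {
        if (eventType == ONOPEN_EVENT_TYPE) {
            online.add(connectionKey);      // light up the green dot
        } else if (eventType == ONCLOSE_EVENT_TYPE || eventType == ONERROR_EVENT_TYPE) {
            online.remove(connectionKey);   // turn it off
        }
        // Any other value is ignored.
    }

    boolean isOnline(String connectionKey) {
        return online.contains(connectionKey);
    }

    int onlineCount() {
        return online.size();
    }
}
```

In the route above, something like websocketSessionProcessor would be the natural place to call onEvent, though how you wire it in is up to your application.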

MESSAGE_NOT_SENT_ERROR_TYPE indicates that a message you sent to the websocket component was not delivered to a specific user (connection key) because the connection disappeared (it was closed, for example). The message carrying this header is the message that could not be delivered, and it contains all the information needed for correct error handling (recipient connection key, message text, etc.), so that you can fix the error and redeliver it later.
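
One possible way to keep undelivered messages around for later redelivery is a small in-memory store keyed by the recipient connection key. This is only a sketch in plain Java: UndeliveredMessageStore and its methods are hypothetical names, not part of Camel, and it assumes your not-delivered processor hands over the connection key and the message text as strings.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Camel: parks messages that could not be delivered.
final class UndeliveredMessageStore {
    // Recipient connection key -> message bodies, in the order they failed.
    private final Map<String, List<String>> pending = new HashMap<>();

    /** Remember a message body that could not be delivered to the given connection key. */
    synchronized void park(String connectionKey, String body) {
        pending.computeIfAbsent(connectionKey, k -> new ArrayList<>()).add(body);
    }

    /** Remove and return everything parked for the key; empty list if there is nothing. */
    synchronized List<String> drain(String connectionKey) {
        List<String> parked = pending.remove(connectionKey);
        return parked == null ? new ArrayList<>() : parked;
    }
}
```

Deciding when to call drain (for example, once you know which user a connection key belonged to and that user is reachable again) is an application concern that this sketch leaves open.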

These features have existed in the Camel component for two years, but were undocumented. You can get more information from JIRA: CAMEL-9364, CAMEL-9392, CAMEL-9393.

It is up to you how to use these features.

The code is available on my git.

Written by: Pavlo Kletsko