Thursday, December 19, 2013

Lambda'ery WebSocket code (from UKTECH13 presentation)

At UKTECH13 I was asked to post the source code for the WebSocket endpoint used in my presentation, primarily because it used JDK 8 constructs that were unfamiliar to many. One of the very nice things about the language changes and the supporting library changes is the lack of if statements in the code.

I would note that in a real-world chat application it is unlikely that different rooms would have different resource URIs, but for the purposes of the presentation this made sense.

package websocket;

import static java.util.Collections.emptySet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;

import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

@ServerEndpoint(value = "/chat/{room}")
public class ChatService {

  private static final Set<Session> EMPTY_ROOM = emptySet();

  static final ConcurrentMap<String, Set<Session>> rooms =
    new ConcurrentHashMap<>(); 


  @OnOpen
  public void onOpen(Session peer, @PathParam("room") String room) {
    rooms.computeIfAbsent(room,
                          s -> new CopyOnWriteArraySet<Session>()).add(peer);
  }

  @OnClose
  public void onClose(Session peer, @PathParam("room") String room) {

    rooms.getOrDefault(room, EMPTY_ROOM).remove(peer);
    
  }

  @OnError
  public void onError(Session peer, Throwable th,
                      @PathParam("room") String room) {
    System.out.println("Peer error " + room + " " + th);
  }


  @OnMessage
  public void message(String message, Session peer,
                      @PathParam("room") String room) {

    // Send a message to all peers in a room who are not the current
    // peer and are still open. Send the message asynchronously to ensure
    // that the first client is not hung up. 

    rooms.getOrDefault(room, EMPTY_ROOM).parallelStream()
         .filter(s -> s != peer && s.isOpen())
         .forEach(s -> s.getAsyncRemote().sendObject(message));
  }

}
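
To make the "no if statements" point concrete, here is a minimal sketch contrasting the two styles for adding a peer to a room. It is not part of the presentation code: the class and method names are made up for illustration, and plain Strings stand in for Session objects so that it runs without a WebSocket container.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical demo class; Strings stand in for javax.websocket.Session.
class RoomRegistryDemo {

  static final ConcurrentMap<String, Set<String>> rooms =
    new ConcurrentHashMap<>();

  // Pre-JDK 8 style: explicit null checks around putIfAbsent.
  static void joinWithIf(String room, String peer) {
    Set<String> members = rooms.get(room);
    if (members == null) {
      Set<String> created = new CopyOnWriteArraySet<String>();
      members = rooms.putIfAbsent(room, created);
      if (members == null) {
        members = created;
      }
    }
    members.add(peer);
  }

  // JDK 8 style: computeIfAbsent creates the set only when it is missing.
  static void joinWithLambda(String room, String peer) {
    rooms.computeIfAbsent(room,
                          r -> new CopyOnWriteArraySet<String>()).add(peer);
  }

  public static void main(String[] args) {
    joinWithIf("java", "alice");
    joinWithLambda("java", "bob");
    joinWithLambda("scala", "carol");
    System.out.println(rooms.get("java").size());  // 2
    System.out.println(rooms.get("scala").size()); // 1
  }
}
```

Both methods leave the map in the same state; the second simply pushes the branching into the library.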

One of the problems with the above design is that there is no error logging when an invalid room is used. This could be useful to diagnose errors. Not wanting to use any conditional statements, you could use an Optional object:

import java.util.Optional;


  @OnClose
  public void onClose(Session peer, @PathParam("room") String room) {

    Optional.ofNullable(rooms.get(room))
      .orElseThrow(() -> new IllegalStateException("Cannot find room " + room))
      .remove(peer);
    
  }
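
As a rough, container-free sketch of how that lookup behaves for a known and an unknown room, the following stands Strings in for Session objects. The class name and the leave method are hypothetical and exist only for this example.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical demo class; Strings stand in for javax.websocket.Session.
class OptionalLookupDemo {

  static final Map<String, Set<String>> rooms = new ConcurrentHashMap<>();

  // Removes the peer, or throws if the room was never created.
  static boolean leave(String room, String peer) {
    return Optional.ofNullable(rooms.get(room))
      .orElseThrow(() -> new IllegalStateException("Cannot find room " + room))
      .remove(peer);
  }

  public static void main(String[] args) {
    rooms.computeIfAbsent("java",
                          r -> new CopyOnWriteArraySet<String>()).add("alice");

    System.out.println(leave("java", "alice")); // true

    try {
      leave("cobol", "alice");
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage()); // Cannot find room cobol
    }
  }
}
```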


Of course you might want this on all your Map objects, so you can use a default method to create a mixin and apply it to a map implementation with a trivial subclass.

import java.util.Optional;


  public interface OptionalMap<K,V> extends ConcurrentMap<K,V> {
     public default Optional<V> find(K key) {
        return Optional.ofNullable(get(key));
     }
  }

  public static class OptionalHashMap<K,V> extends ConcurrentHashMap<K,V>
   implements OptionalMap<K,V> {
       
  }
    

  static final OptionalMap<String, Set<Session>> rooms =
    new OptionalHashMap<>(); 


  @OnClose
  public void onClose(Session peer, @PathParam("room") String room) {

    rooms.find(room)
      .orElseThrow(() -> new IllegalStateException("Cannot find room " + room))
      .remove(peer);
    
  }
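
Because find returns an Optional, the result can also be chained without throwing. The sketch below repeats the mixin in a self-contained form and adds a hypothetical describe helper to show this; the wrapper class, the helper and the sample data are assumptions made for the example.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical wrapper class so the example compiles on its own.
class OptionalMapDemo {

  // The mixin: any ConcurrentMap gains find() through a default method.
  interface OptionalMap<K, V> extends ConcurrentMap<K, V> {
    default Optional<V> find(K key) {
      return Optional.ofNullable(get(key));
    }
  }

  // Trivial subclass that applies the mixin to ConcurrentHashMap.
  static class OptionalHashMap<K, V> extends ConcurrentHashMap<K, V>
    implements OptionalMap<K, V> {
  }

  // Illustrative helper: formats the entry, or reports that it is missing.
  static String describe(OptionalMap<String, Integer> map, String key) {
    return map.find(key)
      .map(v -> key + "=" + v)
      .orElse(key + " is missing");
  }

  public static void main(String[] args) {
    OptionalMap<String, Integer> sizes = new OptionalHashMap<>();
    sizes.put("java", 2);
    System.out.println(describe(sizes, "java"));  // java=2
    System.out.println(describe(sizes, "cobol")); // cobol is missing
  }
}
```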





Whilst working on my presentation it became apparent that it was also possible to use the "getOpenSessions" and "getUserProperties" methods to store discriminating data against the Session. I don't have enough experience yet to say which is the better design for a particular case.

package websocket;


import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

@ServerEndpoint(value = "/chat/{room}")
public class ChatService {

  private static final String ROOM_PROPERTY = "ROOM";


  @OnOpen
  public void onOpen(Session peer, @PathParam("room") String room) {
    peer.getUserProperties().put(ROOM_PROPERTY, room);
  }

  @OnClose
  public void onClose(Session peer, @PathParam("room") String room) {
    
    // No need to tidy up, as the data is stored against the peer
  }

  @OnError
  public void onError(Session peer, Throwable th,
                      @PathParam("room") String room) {
    System.out.println("Peer error " + room + " " + th);
  }


  @OnMessage
  public void message(String message, Session peer,
                      @PathParam("room") String room) {

    peer.getOpenSessions().parallelStream()
         .filter(s -> room.equals(s.getUserProperties().get(ROOM_PROPERTY)))
         .filter(s -> s != peer && s.isOpen())
         .forEach(s -> s.getAsyncRemote().sendObject(message));
  }

}

Wednesday, December 4, 2013

Lambda, will it serialize?

So I have been pondering an enhancement required on the Tyrus project that would allow a user to broadcast to a subset of clients connected to a URL across a cluster of machines. There are various ways of doing this, but since I was playing with JDK 8 this problem definitely looked like a nail.

To this end I created a simple unit test class that would take my filter, serialise it to disk, read it back in and then execute it. It has an instance field "VALUE" that we can reference directly or indirectly to find out what would cause the serialisation to fail.

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.Serializable;

import java.util.function.Predicate;

import org.junit.Test;

public class SerializablePredicateFilterTest {

  public String VALUE = "Bob";

  public interface SerializablePredicate<T> extends Predicate<T>, Serializable {

  }


  public <T> void filter(SerializablePredicate<T> sp, T value) throws IOException, ClassNotFoundException {


    sp.getClass().isLocalClass();

    File tempFile = File.createTempFile("lambda", "set");


    try (ObjectOutput oo = new ObjectOutputStream(new FileOutputStream(tempFile))) {
      oo.writeObject(sp);
    }


    try (ObjectInput oi = new ObjectInputStream(new FileInputStream(tempFile))) {
      SerializablePredicate<T> p = (SerializablePredicate<T>) oi.readObject();

      System.out.println(p.test(value));
    }


  }

}
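
The harness above goes through a temporary file, but the same round trip can be sketched in memory, which is handy for trying things out away from JUnit. This is only an illustration under that assumption: the class name, the roundTrip method and the sample predicate are not part of the original test.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Predicate;

// Hypothetical demo class, not part of the original unit test.
class RoundTripDemo {

  interface SerializablePredicate<T> extends Predicate<T>, Serializable {
  }

  // Serialises the predicate to a byte array and reads a copy back.
  @SuppressWarnings("unchecked")
  static <T> SerializablePredicate<T> roundTrip(SerializablePredicate<T> sp)
      throws IOException, ClassNotFoundException {

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(sp);
    }

    try (ObjectInputStream in = new ObjectInputStream(
           new ByteArrayInputStream(bytes.toByteArray()))) {
      return (SerializablePredicate<T>) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    // A non-capturing lambda: there is no state to drag along.
    SerializablePredicate<String> shorterThanFive = s -> s.length() < 5;

    SerializablePredicate<String> copy = roundTrip(shorterThanFive);
    System.out.println(copy.test("Bob"));   // true
    System.out.println(copy.test("Bobby")); // false
  }
}
```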


So just to calibrate, let's make sure that an anonymous inner class will fail, because one created in an instance method always contains a reference to the enclosing object, and the test class is not Serializable.

  @Test(expected = NotSerializableException.class)
  public void testAnonymousDirect() throws IOException, ClassNotFoundException {

    String value = VALUE;


    filter(new SerializablePredicate<String>() {

      @Override
      public boolean test(String t) {
        return value.length() > t.length();
      }
    }, "Bob");

  }


The same is true for local classes. What, you don't use local classes?

  @Test(expected = NotSerializableException.class)
  public void testLocalClass() throws IOException, ClassNotFoundException {

    class LocalPredicate implements SerializablePredicate<String> {
      @Override
      public boolean test(String t) {
        // The result does not matter; serialisation fails before this is called
        return false;
      }
    }

    filter(new LocalPredicate(), "Bobby");

  }


A standalone class will of course work; in this case it is a static nested class for convenience.

  public static class LengthPredicate implements SerializablePredicate<String> {

    private String value;


    public LengthPredicate(String value) {
      super();
      this.value = value;
    }

    public void setValue(String value) {
      this.value = value;
    }

    public String getValue() {
      return value;
    }

    @Override
    public boolean test(String t) {
      // Same comparison as the lambda versions of the test
      return value.length() > t.length();
    }
  }


  @Test
  public void testStaticInnerClass() throws IOException, ClassNotFoundException {

    filter(new LengthPredicate(VALUE), "Bobby");

  }


So let's get down with JDK 8. It turns out that my first try also fails, because referencing the VALUE field makes the lambda capture the enclosing test instance, but it does confirm that serialisation is quite happy to take a lambda in general.

  @Test(expected = NotSerializableException.class)
  public void testLambdaDirect() throws IOException, ClassNotFoundException {


    filter((String s) -> VALUE.length() > s.length(), "Bobby");

  }


A slight modification to copy the value into an effectively final local variable, and voilà, the lambda is now serialised and retrieved properly.

  @Test
  public void testLambdaInDirect() throws IOException, ClassNotFoundException {

    String value = VALUE;

    filter((String s) -> value.length() > s.length(), "Bobby");

  }
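
The difference between the last two tests comes down to what each lambda captures. The following is a minimal sketch of that outside JUnit, assuming a holder class that, like the test class, is not Serializable; the class and method names are made up for the example.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Predicate;

// Hypothetical demo class; deliberately NOT Serializable, like the test class.
class CaptureDemo {

  interface SerializablePredicate<T> extends Predicate<T>, Serializable {
  }

  String value = "Bob";

  // Reads the field, so the lambda captures "this" (a CaptureDemo).
  SerializablePredicate<String> viaField() {
    return s -> value.length() > s.length();
  }

  // Copies the field first, so the lambda captures only the String.
  SerializablePredicate<String> viaLocal() {
    String local = value;
    return s -> local.length() > s.length();
  }

  // Returns "ok" or the simple name of the exception raised while writing.
  static String trySerialize(Object candidate) {
    try (ObjectOutputStream out =
           new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(candidate);
      return "ok";
    } catch (IOException e) {
      return e.getClass().getSimpleName();
    }
  }

  public static void main(String[] args) {
    CaptureDemo demo = new CaptureDemo();
    System.out.println(trySerialize(demo.viaField())); // NotSerializableException
    System.out.println(trySerialize(demo.viaLocal())); // ok
  }
}
```

Anything the lambda captures is written out along with it, so the captured values have to be serialisable as well.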


And of course if the value is a simple method parameter it also works fine.


  @Test
  public void testLambdaParameter() throws IOException, ClassNotFoundException {

    invokeWithParameter(VALUE);

  }

  private void invokeWithParameter(String value) throws IOException, ClassNotFoundException {
    filter((String s) -> value.length() > s.length(), "Bobby");
  }


So the answer is yes, you can get it to serialise if you are a bit careful.