Czy WebSocket będzie dobry do setek tysiecy klientów?

1

Może to banał, ale wydaje mi się, że to powinno iść po HTTPS (łącznie z handshake websocket). Też łatwiej przesłać websocket np. przez proxy jak handshake był zrobiony po HTTPS.

1

Z websocketem nie da się zresztą inaczej niż wysłać od klienta "poświadczenia" jako message albo cześć URL (bo headerów się nie da -( ewentualnie cookie działa)).

A co do pytani a jetty ws Ratpack. Jetty to nadal Webserver, który działa na servletach i nie jest naturalnie "non-bloking".
Mi Jetty startuje około 100-200 ms. To jest super w porównaniu do Tomcat, ale nadal bieda z nedzą vs Ratpack (10-20 ms). A to zmienia zupełnie jakość testowania.

0

Raczej to po HTTPS powinno iść uwierzytelnianie, taki mi się wydaje, a potem tunelowowane połączenie WebSocket powinno już być bezpieczne.

Z drugiej strony po co używać czystych WebSocketów, skoro jest STOMP. Umożliwia również mechanizmy cache'owania podobnie jak HTTP (w nagłówku) więc to znacznie bardziej przyjazny protokół. Skalowanie STOMP'a powinno też być łatwiejsze przez np. RabbitMQ.

0

Kombinuje z Jetty Https, i nie mogę się połączyć do websocketu. Tak jakby zawieszał się na łączeniu.
Kod serwera:

public class ServerProcess extends WebSocketHandler {

	public static void main(String[] args) {
		int serverPort = ServerConfiguration.getConfiguration().getServerPort();
		HttpConfiguration http_config = new HttpConfiguration();
		http_config.setSecureScheme("https");
		http_config.setSecurePort(serverPort);

		HttpConfiguration https_config = new HttpConfiguration(http_config);
		https_config.addCustomizer(new SecureRequestCustomizer());
		SslContextFactory sslContextFactory = new SslContextFactory();
		sslContextFactory.setKeyStorePath("resource/keystore.jks");
		sslContextFactory.setKeyStorePassword("159357258456");

		Server server = new Server();
		ServerConnector connector = new ServerConnector(server,
				new SslConnectionFactory(sslContextFactory, HttpVersion.HTTP_1_1.asString()));
		connector.setPort(serverPort);
		server.addConnector(connector);

		try {
			server.start();
			server.dump(System.err);
			server.join();
		} catch (Exception e) {
		}
	}

	@Override
	public void configure(WebSocketServletFactory factory) {
		// TODO Auto-generated method stub
		factory.register(SocketService.class);
	}

}

Kod Handlera:

@ClientEndpoint
@ServerEndpoint(value = "/s")
public class SocketService {

	@OnOpen
	public void onWebSocketConnect(Session session) {
		System.out.println("Some client connected!");
	}

	@OnMessage
	public void onWebSocketText(String message) {
		System.out.println("Received TEXT message: " + message);
	}

	@OnClose
	public void onWebSocketClose(CloseReason reason) {
	}

	@OnError
	public void onWebSocketError(Throwable cause) {
		
	}
}

KOD CLIENTA:

public class CLient {

	public static void main(String[] args) {

		final String WS_URL = "ws://0.0.0.0:6036";

	    MyListener socket = new MyListener("Hello world");
	    SslContextFactory sslContextFactory = new SslContextFactory();
	    sslContextFactory.setKeyStorePath("resource/keystore.jks");
	    sslContextFactory.setKeyStorePassword("159357258456");
	    //sslContextFactory.setTrustAll(true); 
	    WebSocketClient client = new WebSocketClient(sslContextFactory);

	    try {
	        client.start();
	        URI uri = new URI(WS_URL);
	        ClientUpgradeRequest cur = new ClientUpgradeRequest();
	        client.connect(socket, uri, cur);
	        socket.awaitClose(5, TimeUnit.SECONDS);
	    } catch (Throwable t) {
	        t.printStackTrace();
	    } finally {
	        try {
	            client.stop();
	        } catch (Exception e) {
	            e.printStackTrace();
	        }
	    }
	}
}

KOD Listenera clienta:

public class MyListener implements WebSocketListener {
	 private Session mSession;
	    private final String mRequest;
	    private final CountDownLatch mCloseLatch;

	    public MyListener(String str) {
	        mCloseLatch = new CountDownLatch(1);
	        mRequest = str;
	    }

	    public boolean awaitClose(int duration, TimeUnit unit) throws InterruptedException {
	        return mCloseLatch.await(duration, unit);
	    }
	    
	    private void disconnect() {
	        if (mSession != null && mSession.isOpen()) {
	            mSession.close();
	        }
	        
	        mCloseLatch.countDown();
	    }
	    
	    @Override
	    public void onWebSocketConnect(Session session) {
	        System.out.printf("onWebSocketConnect: %s%n", session.getRemoteAddress());
	        
	        try {
	            System.out.printf("REQUEST: %n%s%n", mRequest);

	            session.getRemote().sendString("bla!!!");
	        } catch (IOException ex) {
	            System.err.println(ex);
	            return;
	        }
	        
	        mSession = session;
	    }

	    @Override
	    public void onWebSocketClose(int statusCode, String reason) {
	        System.out.printf("onWebSocketClose: %d %s%n", statusCode, reason);
	    }

	    @Override
	    public void onWebSocketText(String str) {
	        System.out.printf("RESPONSE: %n%s%n", str);
	        disconnect();
	    }
	    
	    @Override
	    public void onWebSocketBinary(byte[] arg0, int arg1, int arg2) {
	        System.out.println("onWebSocketBinary");
	        disconnect();
	    }

	    @Override
	    public void onWebSocketError(Throwable cause) {
	        System.out.printf("onWebSocketError: %s%n", cause);
	        disconnect();
	}
}

Co jest źle? Cuduje już kilka godzin i nic...

1

Jeśli masz mieć setki tysięcy równoczesnych długich połączeń na websocketach, to musisz mieć po stronie serwera coś, co obsługuje całe I/O asynchronicznie. Nie da się tego zrobić żadnym frameworkiem, który na jedno połączenie klienta rezerwuje jeden wątek, jak np. Java EE, Spring, czy inne frameworki RPC.

W przypadku Javy sprawdzonym rozwiązaniem w takich sytuacjach jest Netty. Milion równoczesnych połączeń TCP jeden serwer bez problemów uciągnie (a nawet 10 milionów), choć oczywiście może się okazać, że jeden serwer nie pociągnie całej logiki biznesowej, którą chcesz realizować, a wtedy będziesz musiał rozpraszać to na więcej serwerów. ;)

Tu masz przykłady jak to zrobić:
https://github.com/smallnest/C1000K-Servers

// dopisane: wygląda na to, że Jetty też da radę - jednak odkąd pamiętam do HTTP Netty zawsze było wydajniejsze od Jetty. Tj. Jetty było bardziej wysokopoziomowe i wygodniejsze w użyciu, Netty trudniejsze, ale bardziej zorientowane na pełną kontrolę i wydajność. Do ktosia, kto używał Jetty - czy Jetty umie już dane składować off-heap?

0

Ogarnąłem coś takiego i teoretycznie działa. Pytanie czy prawidłowo? Czy każde połączenie będzie obsługiwane w osobnym wątku? Jak to sprawdzić? Jeśli chodzi o same przetwarzanie już odebranych wiadomości lub wysyłanie powiadomień to jest w osobnych wątkach, tylko czy serwer jetty zaimplementowany w taki sposób będzie obsługiwał sockety wielowątkowo?

Server:

 import java.io.IOException;

import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.server.WebSocketHandler;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;

public class ServerProcess extends WebSocketHandler {

	public static void main(String[] args) throws IOException {
		int serverPort = ServerConfiguration.getConfiguration().getServerPort();
		SslContextFactory sslContextFactory = new SslContextFactory();
		sslContextFactory.setKeyStorePath("resource/keystore.jks");
		sslContextFactory.setKeyStorePassword("159357258456");
		SslConnectionFactory sslConne = new SslConnectionFactory(sslContextFactory, HttpVersion.HTTP_1_1.toString());
		
		HttpConfiguration http_config = new HttpConfiguration();
		http_config.setSecureScheme("https");
		http_config.setSecurePort(serverPort);
		http_config.setOutputBufferSize(Integer.MAX_VALUE);
		http_config.setRequestHeaderSize(8192);
		http_config.setResponseHeaderSize(8192);

		HttpConfiguration https_config = new HttpConfiguration(http_config);
		https_config.addCustomizer(new SecureRequestCustomizer());
		
		HttpConnectionFactory cf = new HttpConnectionFactory(https_config);
		
		Server server = new Server();
		ServerConnector connector = new ServerConnector(server, sslConne, cf);
		connector.setPort(serverPort);
		server.addConnector(connector);
		server.setHandler(new ServerProcess());

		try {
			server.start();
			server.dump(System.err);
			server.join();
		} catch (Exception e) {
		}
	}

	@Override
	public void configure(WebSocketServletFactory factory) {
		// TODO Auto-generated method stub
		factory.register(SocketService.class);
	}

}

Listener:

import java.util.UUID;
import javax.websocket.server.ServerEndpoint;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;

 @WebSocket
@ServerEndpoint(value = "/s")
public class SocketService {
	private UUID sessionUUID;
		
	@OnWebSocketMessage
	public void onWebSocketMessage(String message) {
		System.out.println("Received message: " + message + " od sesji: " + sessionUUID.toString());
	}

	@OnWebSocketClose
	public void onWebSocketClose(int arg0, String arg1) {
		// TODO Auto-generated method stub
		
	}

	@OnWebSocketConnect
	public void onWebSocketConnect(Session session) {
		// TODO Auto-generated method stub
		sessionUUID = ClientManager.getInstance().addSession(session);
	}

	@OnWebSocketError
	public void onWebSocketError(Throwable arg0) {
		// TODO Auto-generated method stub
		
	}

}
1
darksead napisał(a):

Czy każde połączenie będzie obsługiwane w osobnym wątku?

Wątki i połączenia są do siebie ortogonalne.

Jak to sprawdzić? Jeśli chodzi o same przetwarzanie już odebranych wiadomości lub wysyłanie powiadomień to jest w osobnych wątkach, tylko czy serwer jetty zaimplementowany w taki sposób będzie obsługiwał sockety wielowątkowo?

Nie wiem jak jest w Jetty, ale Netty obsługuje sockety przez select lub epoll i wywołuje Twój kod tylko jak przyjdzie wiadomość, którą masz obsłużyć możliwie najszybciej w sposób nieblokujący. Z kodu z Jetty wynika, że robi to tak samo. W efekcie wszystko może działać na jednym wątku multipleksowanym pomiędzy milion połączeń. Aby w pełni wykorzystać CPU wystarczy mieć jeden wątek na logiczny rdzeń procesora. Ze względu na efektywność cache odebraną wiadomość powinien przetwarzać od początku do końca ten sam wątek, więc nie powinieneś nic kombinować z wątkami. Swój kod umieszczasz w onWebSocketMessage i będzie działać.

0

Spring Project Reactor Reactive Netty WebFlux? ;)

https://mobile.twitter.com/snicoll/status/835137969711374336

1 użytkowników online, w tym zalogowanych: 0, gości: 1