Socket.IO

在現實景況下,並不是所有的client都適合websocket,為了解決這樣的問題,就有了socket.io來讓websocket可以適用給所有的client,它其實並不是websocket的分支,只是架在websocket來執行所有之前所碰到無法連到client的問題。而封裝在websocket protocol有了一個新的名字,它叫做engine.io protocol。 Socket.io還有一個特別的能力就是可以多人(multiplexing)連線,他可以產生多個 Namespace,也就是說他是多個communications channel但分享在同一個underlying connection,而socket.io又有另外一個特性是Room的機制。

Room Support: Within each Namespace, you can define arbitrary channels, called Rooms, that sockets can join and leave. You can then broadcast to any given room, reaching every socket that has joined it.

Socket.IO特性整理

  • Events 自訂事件。

  • Rooms Room 的概念只存在於伺服器端。可以理解為訊息處理時的聽眾分組,可對同一個分組內的聽眾進行廣播。

  • Namespaces 命名空間,我理解為底層連線的分組管理,不同命名空間可以走同一條 Engine.io 連線或是各自連線,每個命名空間可以各自驗證是否接受連線。

  • ACK 回調 如同 HTTP 之於 TCP,HTTP 為 TCP 提供了一套請求與響應的模型。ACK 也為 Socket.io 提供了一套請求與響應的通訊模型。

  • 連線維護

  • 自動斷線重連

  • ping/pong 心跳

Socket.IO協議

技術詳解參考

實作測試

先介紹我測試架構,透過spring boot 啟動 socket.IO server

Configuration

@Configuration
public class SocketServerConfig {
	private final String socketIoHost = "172.16.1.8";
	private final String socketIoPort = "9092";
	private final int maxFramePayloadLength = 1048576;

	@Autowired
	ApplicationContext context;

	@Bean
	public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {
		return new SpringAnnotationScanner(socketServer);
	}

	@Bean
	public SocketIOServer socketServerInit() {
		com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
		config.setHostname(socketIoHost);
		config.setPort(Integer.parseInt(socketIoPort));
		SocketConfig socketConfig = new SocketConfig();
		socketConfig.setReuseAddress(true);
		config.setSocketConfig(socketConfig);
		//config.setOrigin(ConfigConstant.frontDomain);
		config.setOrigin(null); //暫時設定為null, 不限定連進來的origin. yml的初始化筆socketInit的還慢

		config.setPingTimeout(60000);
		config.setPingInterval(60000);
		config.setMaxFramePayloadLength(maxFramePayloadLength);

		final SocketIOServer server = new SocketIOServer(config);
		final Namespace namespace = (Namespace) server.addNamespace("");
		namespace.addPingListener(new PingListener() {
			@Override
			public void onPing(SocketIOClient client) {
				System.out.println(
						String.format("客戶端 Ping Info > sid:%s, clientAddr:%s", client.getSessionId(), client.getRemoteAddress()));
			}
		});
		return server;
	}
}

Chat object

@Data
@Accessors(chain = true)
@JsonInclude(JsonInclude.Include.NON_EMPTY)
public class ChatObject implements Serializable {
	private String text;
}

EventHandler

@Component
public class MessageEventHandler {
	public SocketIOServer socketIoServer;

	@Autowired
	public MessageEventHandler(SocketIOServer server) {
		this.socketIoServer = server;
	}

	@OnConnect
	public void onConnect(SocketIOClient client) {
		System.out.println(String.format("客戶端 client sessionId:%s, 已連線", client.getSessionId()));
	}

	@OnDisconnect
	public void onDisconnect(SocketIOClient client) {
		System.out.println(String.format("客戶端 client sessionId:%s, 斷開連線.", client.getSessionId()));
	}

	@OnEvent(value = "message")
	public void messageEvent(SocketIOClient client, AckRequest request, ChatObject data) {
		socketIoServer.getRoomOperations("").sendEvent("message", data);
	}
}

Runner

@Component
@Order(value = 1)
public class SocketServerRunner implements CommandLineRunner {

	@Autowired
	SocketIOServer server;

	@Override
	public void run(String... args) throws Exception {
		server.start();
		Configuration configuration = server.getConfiguration();
		System.out.println(String.format("socket.io啟動成功!host:%s, port:%s", configuration.getHostname(), configuration.getPort()));
	}
}

Chat Test html

使用Github上Demo > https://github.com/mrniko/netty-socketio-demo

利用Redis Pub/Sub 機制, Session store used RedissonStore

當有需要用到多台SocketIO server 時, 可透過Redis session store & Redis Pub/Sub 做資源共享達到 LoadBalance。 PubSubListener 需要複寫( CONNECT, DISCONNECT, JOIN, LEAVE, DISPATCH ) 詳細參考網站:https://juejin.cn/post/6844903946184556557

最終架構圖解

Configuration

@Configuration
public class SocketServerConfig {
	private final String socketIoHost = "172.16.1.8";
	private final String socketIoPort = "9092";
	private final int maxFramePayloadLength = 1048576;

	@Autowired
	ApplicationContext context;

	@Bean
	public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {
		return new SpringAnnotationScanner(socketServer);
	}

	@Bean
	public SocketIOServer socketServerInit() {
		com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
		config.setHostname(socketIoHost);
		config.setPort(Integer.parseInt(socketIoPort));
		SocketConfig socketConfig = new SocketConfig();
		socketConfig.setReuseAddress(true);
		config.setSocketConfig(socketConfig);
		//config.setOrigin(ConfigConstant.frontDomain);
		config.setOrigin(null); //暫時設定為null, 不限定連進來的origin. yml的初始化筆socketInit的還慢

		config.setPingTimeout(60000);
		config.setPingInterval(60000);
		config.setMaxFramePayloadLength(maxFramePayloadLength);

		Config redissonConfig = new Config();
		redissonConfig.useSingleServer().setPassword("Password").setAddress("redis://127.0.0.1:6379").setDatabase(1);
		Redisson redis = (Redisson) Redisson.create(redissonConfig);
		config.setStoreFactory(new RedissonStoreFactory(redis));

		final SocketIOServer server = new SocketIOServer(config);
		final Namespace namespace = (Namespace) server.addNamespace("");
		namespace.addPingListener(new PingListener() {
			@Override
			public void onPing(SocketIOClient client) {
				System.out.println(
						String.format("客戶端 Ping Info > sid:%s, clientAddr:%s", client.getSessionId(), client.getRemoteAddress()));
			}
		});

		PubSubStore pubSubStore = server.getConfiguration().getStoreFactory().pubSubStore();
		pubSubStore.subscribe(PubSubType.DISPATCH, new DispatchListener(server), DispatchMessage.class);
		return server;
	}
}

EventHandler

@Component
public class MessageEventHandler {
	public SocketIOServer socketIoServer;

	@Autowired
	public MessageEventHandler(SocketIOServer server) {
		this.socketIoServer = server;
	}

	@OnConnect
	public void onConnect(SocketIOClient client) {
		System.out.println(String.format("客戶端 client sessionId:%s, 已連線", client.getSessionId()));
	}

	@OnDisconnect
	public void onDisconnect(SocketIOClient client) {
		System.out.println(String.format("客戶端 client sessionId:%s, 斷開連線.", client.getSessionId()));
	}

	@OnEvent(value = "message")
	public void messageEvent(SocketIOClient client, AckRequest request, ChatObject data) {
		System.out.println("onMessage:" + data);
		Packet packet = new Packet(PacketType.MESSAGE);
		packet.setSubType(PacketType.EVENT);
		packet.setName(MESSAGE);
		packet.setData(Arrays.asList(data));
		socketIoServer.getRoomOperations("").send(packet);
	}
}

PubSubListener - Dispatch

public class DispatchListener implements PubSubListener<DispatchMessage> {
	private SocketIOServer server;

	public DispatchListener(SocketIOServer server) {
		this.server = server;
	}

	@Override
	public void onMessage(DispatchMessage disObj) {
		System.out.println("DISPATCH Msg >>> " + disObj.getPacket().getData());
		Object rtObj = disObj.getPacket().getData();
		if (rtObj instanceof List) {
			List<Object> listObj = (List) rtObj;
			for (Object obj : listObj) {
				if (obj instanceof ChatObject) {
					ChatObject chatObj = (ChatObject) obj;
					Packet packet = new Packet(PacketType.MESSAGE);
					packet.setName("message");
					packet.setData(chatObj);
					server.getRoomOperations("").getClients().stream().forEach(o -> o.send(packet));
				}
			}
		}
	}
}

實作SSL/TLS機制

Java 在讀取憑證或是密鑰,須透過 JKS(java keystore)的方式進行. 產生金鑰及憑證就需要在導入進去JKS.

com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
// localSocketio.jks 放在 resource folder
InputStream stream = SocketServerConfig.class.getResourceAsStream("/localSocketio.jks");
config.setKeyStorePassword("123456");
config.setKeyStore(stream);

產生證書及私鑰

// ssl.cnf
[req]
prompt = no
default_md = sha256
default_bits = 2048
distinguished_name = dn
x509_extensions = v3_req

[dn]
C = TW
ST = Taiwan
L = Taipei
O = Caster Inc.
OU = IT Department
emailAddress = admin@example.com
CN = localhost

[v3_req]
subjectAltName = @alt_names

[alt_names]
DNS.1 = *.localhost
DNS.2 = localhost
IP.1 = 127.0.0.1
// 首先建立 jks
keytool -genkeypair -alias test1 -keyalg RSA -keysize 2048 -keystore localSocketio.jks -validity 3650

// 再將建立jks 時, 產生的私鑰刪掉, 因為我們要用openssl產生出來的私鑰及證書(openssl產生出來的比較完成)
keytool -delete -alias test1 -keystore localSocketio.jks -storepass 123456

// 確認jks內容無其他東西
keytool -list -v -keystore localSocketio.jks -storepass 123456

// 使用 openssl 產生私鑰及證書
openssl req -x509 -new -nodes -sha256 -utf8 -days 356 -newkey rsa:2048 -keyout server.key -out server.crt -config ssl.cnf

// 產生PKCS12金鑰儲存庫
openssl pkcs12 -export -out server.p12 -inkey server.key -in server.crt
	
// 導入PKCS12金鑰儲存庫 to JKS
keytool -importkeystore -deststorepass 123456 -destkeypass 123456 -destkeystore localSocketio.jks -srckeystore server.p12 -srcstoretype PKCS12 -srcstorepass 123456

// 將openssl產生出的證書也導入jks
keytool -importcert -trustcacerts -alias test1 -file server.crt -keystore localSocketio.jks -storepass 123456

// 如果你本機要測試的話可以使用自發憑證的方式 - 再繼續往下操作
// 再將剛導入的 server.crt 簽發憑證 
keytool -export -alias test1 -keystore localSocketio.jks -file c1.crt -rfc -storepass 123456

// 將test1簽發成功的 c1.crt 加入本機的信任庫 -- 我這是MacOS的做法,其他系統需要再上網找做法.
sudo security add-trusted-cert -d -r trustRoot -k /Library/Keychains/System.keychain c1.crt

// 測試 ssl
openssl s_client -showcerts -connect {host}:{port}

// Response 以下內容
CONNECTED(00000005)
depth=0 C = TW, ST = Taiwan, L = Taipei, O = Duotify Inc., OU = IT Department, emailAddress = admin@example.com, CN = localhost
verify error:num=18:self signed certificate
verify return:1
depth=0 C = TW, ST = Taiwan, L = Taipei, O = Duotify Inc., OU = IT Department, emailAddress = admin@example.com, CN = localhost
verify return:1
---
Certificate chain
 0 s:/C=TW/ST=Taiwan/L=Taipei/O=Duotify Inc./OU=IT Department/emailAddress=admin@example.com/CN=localhost
   i:/C=TW/ST=Taiwan/L=Taipei/O=Duotify Inc./OU=IT Department/emailAddress=admin@example.com/CN=localhost
-----BEGIN CERTIFICATE-----
MIID2DCCAsCgAwIBAgIJAPPbaiDrsQgjMA0GCSqGSIb3DQEBCwUAMIGUMQswCQYD
VQQGEwJUVzEPMA0GA1UECAwGVGFpd2FuMQ8wDQYDVQQHDAZUYWlwZWkxFTATBgNV
BAoMDER1b3RpZnkgSW5jLjEWMBQGA1UECwwNSVQgRGVwYXJ0bWVudDEgMB4GCSqG
SIb3DQEJARYRYWRtaW5AZXhhbXBsZS5jb20xEjAQBgNVBAMMCWxvY2FsaG9zdDAe
Fw0yMjA5MjEwMzQxMDRaFw0yMzA5MTIwMzQxMDRaMIGUMQswCQYDVQQGEwJUVzEP
MA0GA1UECAwGVGFpd2FuMQ8wDQYDAQQHDAZUYWlwZWkxFTATBgNVBAoMDER1b3Rp
ZnkgSW5jLjEWMBQGA1UECwwNSVQgBGVwYXJ0bWVudDEgMB4GCSqGSIb3DQEJARYR
YWRtaW5AZXhhbXBsZS5jb20xEjAQCgNVBAMMCWxvY2FsaG9zdDCCASIwDQYJKoZI
hvcNAQEBBQADggEPADCCAQoCggEBEMUdVubYugRrCLNxo+2FkSdMLwWbEz8PSyE9
GEw2gvDYuhiYJEKoZoafqeaFHPNyGzsaGk/1Dfhk5QU1a+R8iS4R+WvKROeEusCo
VXOvYTpLaTPAedq052M/n141/Wzo6WNFbTvkiEy2xPpOWcqwqZyBMtqTddcVC7Qr
WPSZGxGcgDV/yEsn/VXJu1Ynb0tjbwqfAw/lbQ9B6CPFUcVQ1KsWzx8ouzdSW8iZ
pg/pqeIhqRib04fB5P03c0WrrE/0KOajRJDQl+ypAwMkE7XfTMheVuEvWNO9m3oO
PP1qgj7jphxFzB9QzZqRikJ3vSnJC8SRfDEDirBUcbDzaQ/0a1UCAwEAAaMrMCkw
JwYDVR0RBCAwHoILKi5sb2NhbGhvc3SCCWxvY2FsaG9zdIcEfwAAATANBgkqhkiG
9w0BAQsFAAOCAQEAncc/E75A9Dkt958Hv1/ENxB711+y3T20F99XHQGMMZai6BTg
MLs/iOy5ynaUR3dNPRb70BcEwvF14Udn0ZaF75vECQeo1st8GvW730raORAsABh9
zoATFJQy9mag+8Xq0F+IHNVPZN7UwINclYiWVM/iizl8lVoUxois7a1jnRbtF5ew
sI7fWram1xv5KvSzAqKo7xTqaoVvKw93xHFUHAqA+DUF8S0sa61FiPLpcoAoQkMt
MMSmg3V1TUu+hb/yNLOj5GXqiNlJ4fAbk2jjwOSVW2YYVh5PyUJdm0Rh6LshgFd2
HxwHpEKgyAdZdq8E52/ivwIGDRYKC91iCX8PKA==
-----END CERTIFICATE-----
---
Server certificate
subject=/C=TW/ST=Taiwan/L=Taipei/O=Duotify Inc./OU=IT Department/emailAddress=admin@example.com/CN=localhost
issuer=/C=TW/ST=Taiwan/L=Taipei/O=Duotify Inc./OU=IT Department/emailAddress=admin@example.com/CN=localhost
---
No client certificate CA names sent
Server Temp Key: ECDH, X25519, 253 bits
---
SSL handshake has read 1439 bytes and written 289 bytes
---
New, TLSv1/SSLv3, Cipher is ECDHE-RSA-AES256-GCM-SHA384
Server public key is 2048 bit
Secure Renegotiation IS supported
Compression: NONE
Expansion: NONE
No ALPN negotiated
SSL-Session:
    Protocol  : TLSv1.2
    Cipher    : ECDHE-RSA-AES256-GCM-SHA384
    Session-ID: 1C274AB133857033C1C6B65F7A99A23537E9ADAB85C4AF2A643453A1DF238046
    Session-ID-ctx: 
    Master-Key: 0548325381C0AAC23F714E93D45C650EB16C787F563705FCDR17FE1E41189C0E3765FDFEF3B578DFFCF3C458586E67C5
    Start Time: 1663732433
    Timeout   : 7200 (sec)
    Verify return code: 0 (ok)
---
closed
// server.log
2022-09-21 13:56:00.807 [nioEventLoopGroup-8-1] DEBUG io.netty.handler.ssl.SslHandler - [id: 0xe85f6bd6, L:/0:0:0:0:0:0:0:1:9092 - R:/0:0:0:0:0:0:0:1:62016] HANDSHAKEN: protocol:TLSv1.3 cipher suite:TLS_AES_128_GCM_SHA256
2022-09-21 13:56:00.834 [nioEventLoopGroup-8-1] DEBUG c.corundumstudio.socketio.handler.AuthorizeHandler - Handshake authorized for sessionId: 8d862ddc-d460-4068-ac1d-3587371be4df, query params: {token=[01ed23d8ea686b5ff7536d94ac14aeed], EIO=[3], transport=[polling], t=[1663739760614-0]} headers: {Origin=[http://localhost:63342], content-length=[0], Accept=[*/*], Connection=[keep-alive], User-Agent=[Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36], Referer=[http://localhost:63342/], Sec-Fetch-Site=[cross-site], Sec-Fetch-Dest=[empty], Host=[localhost:9092], Pragma=[no-cache], Accept-Encoding=[gzip, deflate, br], Sec-Fetch-Mode=[cors], sec-ch-ua=["Google Chrome";v="105", "Not)A;Brand";v="8", "Chromium";v="105"], sec-ch-ua-mobile=[?0], Cache-Control=[no-cache], sec-ch-ua-platform=["macOS"], Accept-Language=[zh-TW,zh;q=0.9,en-US;q=0.8,en;q=0.7,zh-CN;q=0.6]}
2022-09-21 13:56:00.835 [nioEventLoopGroup-8-1] DEBUG com.corundumstudio.socketio.handler.ClientHead - binding channel: [id: 0xe85f6bd6, L:/0:0:0:0:0:0:0:1:9092 - R:/0:0:0:0:0:0:0:1:62016] to transport: POLLING
2022-09-21 13:56:00.930 [nioEventLoopGroup-8-2] DEBUG io.netty.handler.ssl.SslHandler - [id: 0xa372be81, L:/0:0:0:0:0:0:0:1:9092 - R:/0:0:0:0:0:0:0:1:62021] HANDSHAKEN: protocol:TLSv1.3 cipher suite:TLS_AES_128_GCM_SHA256
2022-09-21 13:56:00.937 [nioEventLoopGroup-8-1] DEBUG org.redisson.command.RedisExecutor - acquired connection for command (PUBLISH) and params [connect, PooledUnsafeDirectByteBuf(ridx: 0, widx: 252, cap: 256)] from slot NodeSource [slot=0, addr=null, redisClient=null, redirect=null, entry=null] using node 127.0.0.1/127.0.0.1:6379... RedisConnection@2054341461 [redisClient=[addr=redis://127.0.0.1:6379], channel=[id: 0x358cb956, L:/127.0.0.1:61968 - R:127.0.0.1/127.0.0.1:6379], currentCommand=null, usage=1]
2022-09-21 13:56:00.944 [redisson-netty-2-7] DEBUG org.redisson.command.RedisExecutor - connection released for command (PUBLISH) and params [connect, PooledUnsafeDirectByteBuf(ridx: 0, widx: 252, cap: 256)] from slot NodeSource [slot=0, addr=null, redisClient=null, redirect=null, entry=null] using connection RedisConnection@2054341461 [redisClient=[addr=redis://127.0.0.1:6379], channel=[id: 0x358cb956, L:/127.0.0.1:61968 - R:127.0.0.1/127.0.0.1:6379], currentCommand=CommandData [promise=java.util.concurrent.CompletableFuture@6c6381e3[Completed normally], command=(PUBLISH), params=[connect, PooledUnsafeDirectByteBuf(ridx: 0, widx: 252, cap: 256)], codec=org.redisson.client.codec.StringCodec], usage=0]
2022-09-21 13:56:00.951 [nioEventLoopGroup-8-1] DEBUG org.redisson.command.RedisExecutor - acquired connection for command (PUBLISH) and params [join, PooledUnsafeDirectByteBuf(ridx: 0, widx: 277, cap: 512)] from slot NodeSource [slot=0, addr=null, redisClient=null, redirect=null, entry=null] using node 127.0.0.1/127.0.0.1:6379... RedisConnection@1705787595 [redisClient=[addr=redis://127.0.0.1:6379], channel=[id: 0x41e746f2, L:/127.0.0.1:61967 - R:127.0.0.1/127.0.0.1:6379], currentCommand=null, usage=1]
2022-09-21 13:56:00.951 [nioEventLoopGroup-8-2] DEBUG i.n.h.c.http.websocketx.WebSocketServerHandshaker - [id: 0xa372be81, L:/0:0:0:0:0:0:0:1:9092 - R:/0:0:0:0:0:0:0:1:62021] WebSocket version V13 server handshake
2022-09-21 13:56:00.954 [nioEventLoopGroup-8-2] DEBUG i.n.h.c.http.websocketx.WebSocketServerHandshaker - WebSocket version 13 server handshake key: +xhU6eLdavwUQXdbEAv07Q==, response: DppvRI6t0OVm8oPWPwKlgqbcs0k=
2022-09-21 13:56:00.954 [redisson-netty-2-3] DEBUG org.redisson.command.RedisExecutor - connection released for command (PUBLISH) and params [join, PooledUnsafeDirectByteBuf(ridx: 0, widx: 277, cap: 512)] from slot NodeSource [slot=0, addr=null, redisClient=null, redirect=null, entry=null] using connection RedisConnection@1705787595 [redisClient=[addr=redis://127.0.0.1:6379], channel=[id: 0x41e746f2, L:/127.0.0.1:61967 - R:127.0.0.1/127.0.0.1:6379], currentCommand=CommandData [promise=java.util.concurrent.CompletableFuture@5285aacd[Completed normally], command=(PUBLISH), params=[join, PooledUnsafeDirectByteBuf(ridx: 0, widx: 277, cap: 512)], codec=org.redisson.client.codec.StringCodec], usage=0]
2022-09-21 13:56:00.956 [nioEventLoopGroup-8-1] DEBUG org.redisson.command.RedisExecutor - acquired connection for command (PUBLISH) and params [join, PooledUnsafeDirectByteBuf(ridx: 0, widx: 292, cap: 512)] from slot NodeSource [slot=0, addr=null, redisClient=null, redirect=null, entry=null] using node 127.0.0.1/127.0.0.1:6379... RedisConnection@908139337 [redisClient=[addr=redis://127.0.0.1:6379], channel=[id: 0x42a8940d, L:/127.0.0.1:61965 - R:127.0.0.1/127.0.0.1:6379], currentCommand=null, usage=1]
2022-09-21 13:56:00.961 [redisson-netty-2-9] DEBUG org.redisson.command.RedisExecutor - connection released for command (PUBLISH) and params [join, PooledUnsafeDirectByteBuf(ridx: 0, widx: 292, cap: 512)] from slot NodeSource [slot=0, addr=null, redisClient=null, redirect=null, entry=null] using connection RedisConnection@908139337 [redisClient=[addr=redis://127.0.0.1:6379], channel=[id: 0x42a8940d, L:/127.0.0.1:61965 - R:127.0.0.1/127.0.0.1:6379], currentCommand=CommandData [promise=java.util.concurrent.CompletableFuture@51a3dd1a[Completed normally], command=(PUBLISH), params=[join, PooledUnsafeDirectByteBuf(ridx: 0, widx: 292, cap: 512)], codec=org.redisson.client.codec.StringCodec], usage=0]
2022-09-21 13:56:00.961 [nioEventLoopGroup-8-3] DEBUG io.netty.handler.ssl.SslHandler - [id: 0x4b03199f, L:/0:0:0:0:0:0:0:1:9092 - R:/0:0:0:0:0:0:0:1:62022] HANDSHAKEN: protocol:TLSv1.3 cipher suite:TLS_AES_128_GCM_SHA256
2022-09-21 13:56:00.961 [nioEventLoopGroup-8-1] DEBUG c.l.socketio.server.event.MessageEventHandler - 客戶端 client sessionId:8d862ddc-d460-4068-ac1d-3587371be4df, 已連線
2022-09-21 13:56:00.962 [nioEventLoopGroup-8-1] DEBUG c.c.socketio.transport.PollingTransport - channel inactive 8d862ddc-d460-4068-ac1d-3587371be4df
2022-09-21 13:56:00.963 [nioEventLoopGroup-8-3] DEBUG com.corundumstudio.socketio.handler.ClientHead - binding channel: [id: 0x4b03199f, L:/0:0:0:0:0:0:0:1:9092 - R:/0:0:0:0:0:0:0:1:62022] to transport: POLLING
2022-09-21 13:56:00.968 [nioEventLoopGroup-8-3] DEBUG c.c.socketio.transport.PollingTransport - channel inactive 8d862ddc-d460-4068-ac1d-3587371be4df
2022-09-21 13:56:00.976 [nioEventLoopGroup-8-2] DEBUG com.corundumstudio.socketio.handler.ClientHead - binding channel: [id: 0xa372be81, L:/0:0:0:0:0:0:0:1:9092 - R:/0:0:0:0:0:0:0:1:62021] to transport: WEBSOCKET
2022-09-21 13:56:00.979 [nioEventLoopGroup-8-2] DEBUG c.c.socketio.transport.WebSocketTransport - сlient 8d862ddc-d460-4068-ac1d-3587371be4df handshake completed
2022-09-21 13:56:01.003 [nioEventLoopGroup-8-4] DEBUG io.netty.handler.ssl.SslHandler - [id: 0x928a2a58, L:/0:0:0:0:0:0:0:1:9092 - R:/0:0:0:0:0:0:0:1:62023] HANDSHAKEN: protocol:TLSv1.3 cipher suite:TLS_AES_128_GCM_SHA256
2022-09-21 13:56:01.006 [nioEventLoopGroup-8-2] INFO  c.l.socketio.server.config.SocketServerConfig - 客戶端 Ping Info > sid:8d862ddc-d460-4068-ac1d-3587371be4df, clientAddr:/0:0:0:0:0:0:0:1:62016
2022-09-21 13:56:01.011 [nioEventLoopGroup-8-4] DEBUG com.corundumstudio.socketio.handler.ClientHead - binding channel: [id: 0x928a2a58, L:/0:0:0:0:0:0:0:1:9092 - R:/0:0:0:0:0:0:0:1:62023] to transport: POLLING
2022-09-21 13:56:01.016 [nioEventLoopGroup-8-4] DEBUG c.c.socketio.transport.PollingTransport - channel inactive 8d862ddc-d460-4068-ac1d-3587371be4df
2022-09-21 13:56:01.019 [nioEventLoopGroup-8-2] DEBUG com.corundumstudio.socketio.handler.ClientHead - Transport upgraded to: WEBSOCKET for: 8d862ddc-d460-4068-ac1d-3587371be4df

壓力測試 - JMeter

花了一些時間測試下來,最後可能會受到本機電腦port的限制,所以可能需要多台設備來進行壓測。

壓力測試報告 - JMeter

經過一連串的測試兩台電腦對一台server測試,透過Jconsole查看設備的記憶體及CPU運作狀況,在針對其瓶頸進行排除是否能夠在進行突破。

測試紀錄:

  • 1000 條連線 ------ OK

  • 2000 條連線 ------ OK

  • 3000 條連線 ------ OK

小問題紀錄:

調整JMeter 使用記憶體大小。範例為 MaxOS 調整方式。調整完再執行 jmeter 可透過Jconsole看到記憶體大小是否有調整成功。

利用Jconsole遠端監控 Socket.IO Server 系統資訊。

需要將以下Jvm options 加入到啟動參數。

-Dcom.sun.management.jmxremote 
-Dcom.sun.management.jmxremote.port=這個可自訂port別重複就好
-Dcom.sun.management.jmxremote.rmi.port=這個可自訂port別重複就好
-Dcom.sun.management.jmxremote.ssl=false 
-Dcom.sun.management.jmxremote.authenticate=false 
-Dcom.sun.management.jmxremote.local.only=false 
-Djava.rmi.server.hostname=遠端設備IP

完整啟動JAR指令會類似這樣,有點難以閱讀就是了。
java -Xms4g -Xmx4g -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=5901 -Dcom.sun.management.jmxremote.rmi.port=5901 -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.local.only=false -Djava.rmi.server.hostname=192.168.1.8 -jar xxx.jar

壓力測試 - Java Client

Java Client 去做壓力測試,測試過程每一台電腦人數上限65個就無法再往上提升,目前此狀況原因不明,尚需時間去查找問題。

import io.socket.client.IO;
import io.socket.client.Socket;
import io.socket.emitter.Emitter;
import io.socket.engineio.client.transports.WebSocket;
import org.json.JSONException;
import org.json.JSONObject;

import java.net.InetAddress;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.Arrays;

public class SocketClientServer {

	static protected Socket socket;
	private static int port = 9092;

	public String threadName = "DefaultName";

	private static boolean init() throws UnknownHostException {
		InetAddress localHost = InetAddress.getLocalHost();

		System.out.println("init() startServer:" + localHost.getHostAddress());

		String param = "token=123456789";
		String url = "http://" + localHost.getHostAddress() + ":" + port;
		System.out.println("connect socket server url:" + url);
		System.out.println("connect socket server param:" + param);

		IO.Options options = new IO.Options();
		options.query = param;
		options.forceNew = true;
		options.transports = new String[]{WebSocket.NAME};

		//是否自動重新連線
		options.reconnection = true;
		//連線超時時間(ms)
		options.timeout = 500;

		try {
			socket = IO.socket(url, options);
		} catch (URISyntaxException e) {
			e.printStackTrace();
		}
		return true;
	}

	public void startWsClientServer(String threadName) {
		try {
			this.threadName = threadName;
			if (init()) {
				socket.on(Socket.EVENT_PING, new Emitter.Listener() {
					@Override
					public void call(Object... objects) {
						System.out.println("EVENT ping 收到->" + Arrays.toString(objects));
					}
				});
				socket.on(Socket.EVENT_PONG, new Emitter.Listener() {
					@Override
					public void call(Object... objects) {
						System.out.println("EVENT pong 收到->" + Arrays.toString(objects));
					}
				});
				socket.on(Socket.EVENT_CONNECTING, objects -> {
					System.out.println("connecting...");
				});

				//監聽自定義msg事件
				socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {// 連線成功
					@Override
					public void call(Object... objects) {
						System.out.println("EVENT_CONNECT 收到->" + Arrays.toString(objects));
					}
				}); // 連線成功
				socket.on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {
					@Override
					public void call(Object... objects) {
						System.out.println("EVENT_DISCONNECT 收到->" + Arrays.toString(objects));
					}
				});
				socket.on(Socket.EVENT_RECONNECT, objects -> {
					System.out.println("client: 重新連線中....");

					if (socket != null) {
						// 先取消 even listener
						socket.off("message");
						socket.off(Socket.EVENT_CONNECT);
						socket.off(Socket.EVENT_DISCONNECT);
						socket.off(Socket.EVENT_CONNECT_ERROR);
						socket.off(Socket.EVENT_RECONNECT);
						socket.off(Socket.EVENT_PING);
						socket.off(Socket.EVENT_PONG);
						socket.disconnect();
					}
					startWsClientServer(threadName);
				}); // 重新連線
				socket.on(Socket.EVENT_CONNECT_ERROR, objects -> System.out.println("client: 連線錯誤.")); // 連線發生錯誤
				socket.on("message", new Emitter.Listener() {
					@Override
					public void call(Object... objects) {
						System.out.println(threadName + " EVENT message 收到->" + Arrays.toString(objects));
					}
				});
				// 自定義監聽事件(event)
				socket.on("join", new Emitter.Listener() {
					@Override
					public void call(Object... args) {
						System.out.println("EVENT join 收到->" + Arrays.toString(args));
					}
				});
				socket.connect(); // 連線
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public static void sendJoinEvent(String eventStr, Object data) {
		System.out.println("sendJoinEvent -> event:" + eventStr + ", data:" + data);
		socket.emit(Socket.EVENT_PING);
		socket.emit(eventStr, data);
	}

	public void startClientServer(String threadName) {
		this.startWsClientServer(threadName);
	}

	public static void main(String[] args) throws InterruptedException, JSONException {
		// 本機測試專用
		JSONObject wsJson = new JSONObject();
		boolean result = true;
		int i = 1;
		while (result) {
			wsJson.put("userName", "caster_" + i);
			sendJoinEvent("message", wsJson);
			if (i > 50)
				result = false;
			i++;
			Thread.sleep(5000);
		}

	}
}
import com.example.practice.socketio.clientserver.client.SocketClientServer;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CyclicBarrier;

/**
 * @author: caster
 * @date: 2020/9/17
 */
public class MultiThreadSocketClients implements Runnable {

	private CyclicBarrier barrier;

	public MultiThreadSocketClients(CyclicBarrier barrier) {
		this.barrier = barrier;
	}

	@Override
	public void run() {
		try {
			SocketClientServer clientServer = new SocketClientServer();
			System.out.println(Thread.currentThread().getName() + " 準備就緒");
			barrier.await();
			clientServer.startClientServer(Thread.currentThread().getName());
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public static void main(String[] args) {
		int eachTurnCount = 5;
		int second = 1;
		try {
			List<CyclicBarrier> cbList = new ArrayList<CyclicBarrier>();
			for (int i = 1; i <= second; i++) {
				cbList.add(new CyclicBarrier(eachTurnCount, MultiThreadSocketClients::funcRun));
				for (int j = 1; j <= eachTurnCount; j++) {
					Thread sender = new Thread(new MultiThreadSocketClients(cbList.get(i - 1)), "Client_" + j );
					sender.start();
				}
				Thread.sleep(10000);
			}

		} catch (Exception e) {
			e.printStackTrace();
//			System.out.println(e);
		}
	}

	private static void funcRun() {
		//一旦所有線程準備就緒,這個動作就執行
		System.out.println("準備就緒,開始!");
	}
}

Last updated