Skip to main content

WebSocket Communication

PianoRhythm's real-time communication system is built on WebSockets, enabling low-latency musical collaboration, chat messaging, and synchronized room state management across multiple users.

Architecture Overview

Client-Side WebSocket Implementation

1. WebSocket Service (src/services/websocket.service.ts)

The main service that manages WebSocket connections and message handling:

/**
 * Client-side service that owns the WebSocket connection.
 *
 * Exposes the connection state as signals (`connected`, `initialized`), a
 * `connect` function, and an event bus that emits "connected", "closed" and
 * "error". After an error it schedules a reconnect with exponential backoff
 * (1s, 2s, 4s, ... capped at 30s) until MAX_RECONNECTION_ATTEMPTS is reached.
 *
 * NOTE(review): `disconnect`, `emitServerCommand` and `emitChatMessage` are
 * returned below but are not defined inside this function as shown; they are
 * expected to be in scope where this snippet is used.
 */
export default function WebsocketService() {
  const [connected, setConnected] = createSignal(false);
  const [initialized, setInitialized] = createSignal(false);
  const [connectionAttempts, setConnectionAttempts] = createSignal(0);

  const websocketEvents = createEventBus<"connected" | "closed" | "error">();

  // Identity of the most recent connect() call, reused by automatic retries.
  let lastIdentity: string | undefined;
  // Handle of the pending retry, so at most one retry is ever scheduled.
  let retryTimer: ReturnType<typeof setTimeout> | undefined;

  /**
   * Opens the socket for the given identity.
   * Logs and rethrows any failure so the caller can react to it.
   */
  const connect = async (wsIdentity: string) => {
    lastIdentity = wsIdentity;
    try {
      const apiServer = await getApiServerHost();
      // Anchor the replacement so only the URL scheme is rewritten
      // (http -> ws, https -> wss), never an "http" later in the host/path.
      const webSocketURL = `${apiServer.replace(/^http/, "ws")}/api/websocket`;

      console.log(`[WebsocketService] Connecting to: ${webSocketURL}/${wsIdentity}`);

      await appService().coreService()?.websocket_connect(
        `${webSocketURL}/${wsIdentity}`,
        onConnect,
        onError,
        onClose
      );
    } catch (error) {
      console.error("[WebsocketService] Connection failed:", error);
      throw error;
    }
  };

  /** Marks the socket as connected and resets the retry bookkeeping. */
  const onConnect = () => {
    console.log("[WebsocketService] Connected successfully");
    if (retryTimer !== undefined) {
      clearTimeout(retryTimer);
      retryTimer = undefined;
    }
    setConnected(true);
    setConnectionAttempts(0);
    websocketEvents.emit("connected");
  };

  /** Reports the error and schedules a reconnect with exponential backoff. */
  const onError = (error: string) => {
    console.error("[WebsocketService] Error:", error);
    websocketEvents.emit("error", error);

    const attempts = connectionAttempts();
    const identity = lastIdentity;
    if (attempts >= MAX_RECONNECTION_ATTEMPTS || identity === undefined) {
      return;
    }

    const delay = Math.min(1000 * Math.pow(2, attempts), 30000);
    if (retryTimer !== undefined) {
      clearTimeout(retryTimer);
    }
    retryTimer = setTimeout(() => {
      retryTimer = undefined;
      // The connection may have recovered while the timer was pending.
      if (connected()) return;
      // Functional update: never write back a stale captured counter.
      setConnectionAttempts((count) => count + 1);
      // connect() rethrows on failure; route that back through onError so the
      // backoff chain continues until the attempt limit is reached.
      connect(identity).catch((retryError) => onError(String(retryError)));
    }, delay);
  };

  /** Marks the socket as disconnected and notifies listeners. */
  const onClose = () => {
    console.log("[WebsocketService] Connection closed");
    setConnected(false);
    websocketEvents.emit("closed");
  };

  return {
    connected, initialized, connect, disconnect,
    emitServerCommand, emitChatMessage,
    websocketEvents
  };
}

2. Message Serialization

Protocol Buffers are used for efficient binary message serialization:

// Sending server commands
/**
 * Wraps a server command in the application-level action envelope, encodes
 * the envelope to bytes and sends it as a binary WebSocket frame.
 * Nothing is sent when `coreService()` returns null/undefined.
 */
const emitServerCommand = (command: ServerCommand) => {
  // Inner message: the command tagged with its action type.
  const wrappedCommand = ServerCommandActions.create({
    action: ServerCommandActions_Action.ServerCommand,
    serverCommand: command,
  });

  // Outer envelope understood by the message router.
  const envelope = AppStateActions.create({
    action: AppStateActions_Action.ServerCommandAction,
    serverCommandAction: wrappedCommand,
  });

  const payload = AppStateActions.encode(envelope).finish();
  appService().coreService()?.websocket_send_binary(payload);
};

// Sending chat messages
/**
 * Builds a SendMessage chat action for the given room, stamps it with the
 * current client time, wraps it in the application-level envelope and sends
 * the encoded bytes as a binary WebSocket frame.
 * Nothing is sent when `coreService()` returns null/undefined.
 */
const emitChatMessage = (content: string, roomId: string) => {
  const sendMessageAction = ChatActions.create({
    action: ChatActions_Action.SendMessage,
    content,
    roomId,
    timestamp: Date.now(),
  });

  const envelope = AppStateActions.create({
    action: AppStateActions_Action.ChatAction,
    chatAction: sendMessageAction,
  });

  const payload = AppStateActions.encode(envelope).finish();
  appService().coreService()?.websocket_send_binary(payload);
};

3. Message Handling

Incoming messages are processed by the core engine and dispatched to the appropriate services:

// Core engine message handler (Rust)
/// Decodes an incoming binary WebSocket frame as `AppStateActions` and
/// dispatches it to the handler for its action type.
///
/// Frames that fail to decode are logged and dropped instead of being
/// ignored silently; unknown action types are logged as warnings.
#[wasm_bindgen]
pub fn handle_websocket_message(data: &[u8]) {
    let action = match AppStateActions::decode(data) {
        Ok(action) => action,
        Err(_) => {
            // Only the frame size is logged: the error type is not visible
            // here, so no formatting trait is assumed for it.
            log::warn!("Failed to decode websocket message ({} bytes)", data.len());
            return;
        }
    };

    match action.action() {
        AppStateActions_Action::ChatAction => {
            handle_chat_action(action.get_chatAction());
        }
        AppStateActions_Action::UserAction => {
            handle_user_action(action.get_userAction());
        }
        AppStateActions_Action::RoomAction => {
            handle_room_action(action.get_roomAction());
        }
        AppStateActions_Action::SynthAction => {
            handle_synth_action(action.get_audioSynthAction());
        }
        _ => {
            log::warn!("Unknown action type: {:?}", action.action());
        }
    }
}

Server-Side WebSocket Implementation

1. WebSocket Server Setup

// Server WebSocket handler
import { createCrossWS } from 'crossws';

/**
 * WebSocket server instance. Its hooks keep one session record per peer in
 * `userSessions`, forward binary frames to `handleWebSocketMessage`, and
 * clean up through `handleUserDisconnect` when a peer goes away.
 */
const websocket = createCrossWS({
  hooks: {
    /** Registers a fresh, unauthenticated session for the new peer. */
    open(peer) {
      console.log(`[WebSocket] Client connected: ${peer.id}`);

      // Initialize user session
      const userSession = {
        peerId: peer.id,
        userId: null,
        roomId: null,
        lastActivity: Date.now()
      };

      userSessions.set(peer.id, userSession);
    },

    /** Converts the frame to bytes and hands it to the message router. */
    message(peer, message) {
      // Every inbound frame counts as activity. Without this refresh the
      // timestamp set in open() is never updated, so the connection health
      // monitor would treat an active peer as timed out.
      const session = userSessions.get(peer.id);
      if (session) {
        session.lastActivity = Date.now();
      }

      try {
        const data = new Uint8Array(message.rawData());
        // NOTE(review): handleWebSocketMessage is async and not awaited here;
        // this try/catch only covers the synchronous byte conversion.
        handleWebSocketMessage(peer, data);
      } catch (error) {
        console.error(`[WebSocket] Message handling error:`, error);
        peer.send(JSON.stringify({
          type: 'error',
          message: 'Invalid message format'
        }));
      }
    },

    /** Logs the disconnect and releases the peer's server-side state. */
    close(peer, details) {
      console.log(`[WebSocket] Client disconnected: ${peer.id}`);
      handleUserDisconnect(peer.id);
    },

    /** Logs transport-level errors for the peer. */
    error(peer, error) {
      console.error(`[WebSocket] Error for peer ${peer.id}:`, error);
    }
  }
});

2. Message Routing

// Server-side message routing
/**
 * Decodes a binary frame into an `AppStateActions` envelope and awaits the
 * handler matching its action type. Unknown types are logged as warnings.
 * Any decode or handler failure is logged and reported back to the peer.
 */
const handleWebSocketMessage = async (peer: Peer, data: Uint8Array) => {
  try {
    const envelope = AppStateActions.decode(data);
    const kind = envelope.action;

    if (kind === AppStateActions_Action.ServerCommandAction) {
      await handleServerCommand(peer, envelope.serverCommandAction);
    } else if (kind === AppStateActions_Action.ChatAction) {
      await handleChatAction(peer, envelope.chatAction);
    } else if (kind === AppStateActions_Action.SynthAction) {
      await handleSynthAction(peer, envelope.audioSynthAction);
    } else if (kind === AppStateActions_Action.UserAction) {
      await handleUserAction(peer, envelope.userAction);
    } else {
      console.warn(`Unknown action type: ${envelope.action}`);
    }
  } catch (error) {
    console.error('Message processing error:', error);
    sendErrorMessage(peer, 'Failed to process message');
  }
};

3. Room Management

// Room-based message broadcasting
/**
 * Tracks which peers are in which room and broadcasts messages to the
 * members of a room.
 *
 * `rooms` maps a room id to the set of member peer ids; `userRooms` maps a
 * peer id to the single room it is currently in.
 *
 * NOTE(review): `leaveCurrentRoom` and `serializeMessage` are called on
 * `this` but are not defined in this snippet.
 */
class RoomManager {
  private rooms = new Map<string, Set<string>>();
  private userRooms = new Map<string, string>();

  /**
   * Moves the peer into `roomId` (leaving its current room first) and tells
   * the other members that it joined.
   * @returns always `true`
   */
  joinRoom(peerId: string, roomId: string): boolean {
    // Leave current room if any
    this.leaveCurrentRoom(peerId);

    // Create the room on first join; no non-null assertion needed afterwards.
    let members = this.rooms.get(roomId);
    if (!members) {
      members = new Set();
      this.rooms.set(roomId, members);
    }

    members.add(peerId);
    this.userRooms.set(peerId, roomId);

    // Broadcast user joined to room (the joining peer itself is excluded)
    this.broadcastToRoom(roomId, {
      type: 'user-joined',
      userId: peerId,
      timestamp: Date.now()
    }, peerId);

    return true;
  }

  /**
   * Removes the peer from `roomId`. Empty rooms are deleted; otherwise the
   * remaining members are told that the peer left.
   * Does nothing when the peer was not a member of that room.
   */
  leaveRoom(peerId: string, roomId: string): void {
    const room = this.rooms.get(roomId);

    // Set.delete reports whether the peer was actually a member, so no
    // 'user-left' is announced for a peer that was never in this room.
    if (room && room.delete(peerId)) {
      if (room.size === 0) {
        // Clean up empty rooms
        this.rooms.delete(roomId);
      } else {
        // Broadcast user left
        this.broadcastToRoom(roomId, {
          type: 'user-left',
          userId: peerId,
          timestamp: Date.now()
        });
      }
    }

    // Only drop the reverse mapping when it points at the room being left;
    // otherwise the peer's real current room would be forgotten.
    if (this.userRooms.get(peerId) === roomId) {
      this.userRooms.delete(peerId);
    }
  }

  /**
   * Serializes `message` once and sends it to every member of the room whose
   * connection is open, optionally skipping `excludePeer`.
   */
  broadcastToRoom(roomId: string, message: any, excludePeer?: string): void {
    const room = this.rooms.get(roomId);
    if (!room) return;

    const messageBytes = this.serializeMessage(message);

    for (const peerId of room) {
      if (peerId === excludePeer) continue;

      const peer = peers.get(peerId);
      // readyState 1 is the WebSocket OPEN state.
      if (peer && peer.readyState === 1) {
        peer.send(messageBytes);
      }
    }
  }
}

Real-Time Features

1. MIDI Event Synchronization

// Real-time MIDI event handling
/**
 * Relays a peer's note-on / note-off event to the other members of its room.
 * Ignored when the peer has no session or is not in a room; other synth
 * actions are ignored as well.
 */
const handleSynthAction = async (peer: Peer, synthAction: AudioSynthActions) => {
  const session = userSessions.get(peer.id);
  if (!session) return;
  if (!session.roomId) return;

  const kind = synthAction.action;

  if (kind === AudioSynthActions_Action.NoteOn) {
    // Broadcast note event to room; the sender is excluded.
    roomManager.broadcastToRoom(session.roomId, {
      type: 'midi-event',
      event: 'note-on',
      note: synthAction.note,
      velocity: synthAction.velocity,
      userId: session.userId,
      timestamp: Date.now()
    }, peer.id);
  } else if (kind === AudioSynthActions_Action.NoteOff) {
    // Note-off carries no velocity.
    roomManager.broadcastToRoom(session.roomId, {
      type: 'midi-event',
      event: 'note-off',
      note: synthAction.note,
      userId: session.userId,
      timestamp: Date.now()
    }, peer.id);
  }
};

2. Chat System

// Real-time chat messaging
/**
 * Handles chat actions from a peer that is currently in a room.
 *
 * - SendMessage: validates the content (non-empty, at most 500 characters),
 *   stores it through `chatService.saveMessage`, then broadcasts it to the
 *   whole room, sender included.
 * - TypingStart / TypingStop: relays the typing state to the other members.
 *
 * Ignored when the peer has no session or is not in a room.
 */
const handleChatAction = async (peer: Peer, chatAction: ChatActions) => {
  const session = userSessions.get(peer.id);
  if (!session || !session.roomId) return;

  // Snapshot the routing data up front: the session object is shared and may
  // change while the database write below is awaited.
  const roomId = session.roomId;
  const userId = session.userId;

  switch (chatAction.action) {
    case ChatActions_Action.SendMessage: {
      // Validate message
      if (!chatAction.content || chatAction.content.length > 500) {
        sendErrorMessage(peer, 'Invalid message content');
        return;
      }

      // One timestamp for both the stored record and the broadcast, so
      // clients and the database agree on when the message was sent.
      const timestamp = Date.now();

      // Store message in database
      const messageId = await chatService.saveMessage({
        content: chatAction.content,
        roomId,
        userId,
        timestamp
      });

      // Broadcast to the room the message was saved for
      roomManager.broadcastToRoom(roomId, {
        type: 'chat-message',
        id: messageId,
        content: chatAction.content,
        userId,
        timestamp
      });
      break;
    }

    case ChatActions_Action.TypingStart:
      roomManager.broadcastToRoom(roomId, {
        type: 'user-typing',
        userId,
        isTyping: true
      }, peer.id);
      break;

    case ChatActions_Action.TypingStop:
      roomManager.broadcastToRoom(roomId, {
        type: 'user-typing',
        userId,
        isTyping: false
      }, peer.id);
      break;
  }
};

3. User Presence

// User presence management
/**
 * Handles presence updates from a peer.
 *
 * - UpdateStatus: persists the status through `userService`, then announces
 *   it to everyone in the peer's room (sender included).
 * - UpdatePosition: relays the new x/y/z position to the other room members.
 *
 * Ignored when the peer has no session; nothing is broadcast when the peer
 * is not in a room.
 */
const handleUserAction = async (peer: Peer, userAction: UserActions) => {
  const session = userSessions.get(peer.id);
  if (!session) return;

  const kind = userAction.action;

  if (kind === UserActions_Action.UpdateStatus) {
    // Persist first; the broadcast only happens once the update resolved.
    await userService.updateUserStatus(session.userId, userAction.status);

    if (!session.roomId) return;
    roomManager.broadcastToRoom(session.roomId, {
      type: 'user-status-changed',
      userId: session.userId,
      status: userAction.status,
      timestamp: Date.now()
    });
    return;
  }

  if (kind === UserActions_Action.UpdatePosition) {
    if (!session.roomId) return;
    // Position updates are not echoed back to the sender.
    roomManager.broadcastToRoom(session.roomId, {
      type: 'user-position-changed',
      userId: session.userId,
      position: {
        x: userAction.positionX,
        y: userAction.positionY,
        z: userAction.positionZ
      },
      timestamp: Date.now()
    }, peer.id);
  }
};

Connection Management

1. Connection Health Monitoring

// Heartbeat system for connection health
/**
 * Periodically inspects every session's `lastActivity` timestamp: peers that
 * have been idle longer than the heartbeat interval are pinged, and peers
 * idle longer than the timeout threshold are closed and cleaned up.
 *
 * NOTE(review): `cleanupSession` is called on `this` but is not defined in
 * this snippet.
 */
class ConnectionHealthMonitor {
  private heartbeatInterval = 30000; // 30 seconds
  private timeoutThreshold = 60000; // 60 seconds
  // Handle of the running check loop; null while monitoring is stopped.
  private monitorTimer: ReturnType<typeof setInterval> | null = null;

  /**
   * Starts the periodic check. Calling it again while monitoring is already
   * running is a no-op, so timers never stack up.
   */
  startMonitoring(): void {
    if (this.monitorTimer !== null) return;

    this.monitorTimer = setInterval(() => {
      this.checkConnections();
    }, this.heartbeatInterval);
  }

  /** Stops the periodic check started by `startMonitoring`. */
  stopMonitoring(): void {
    if (this.monitorTimer === null) return;

    clearInterval(this.monitorTimer);
    this.monitorTimer = null;
  }

  /** Runs one pass over all sessions, pinging or timing out idle peers. */
  checkConnections(): void {
    const now = Date.now();

    for (const [peerId, session] of userSessions) {
      const timeSinceLastActivity = now - session.lastActivity;

      if (timeSinceLastActivity > this.timeoutThreshold) {
        console.log(`[WebSocket] Timeout detected for peer: ${peerId}`);
        this.handleTimeout(peerId);
      } else if (timeSinceLastActivity > this.heartbeatInterval) {
        // Send ping
        const peer = peers.get(peerId);
        if (peer) {
          peer.ping();
        }
      }
    }
  }

  /** Closes the peer's connection (if still known) and drops its session. */
  handleTimeout(peerId: string): void {
    const peer = peers.get(peerId);
    if (peer) {
      peer.close();
    }
    this.cleanupSession(peerId);
  }
}

2. Reconnection Logic

// Client-side reconnection handling
/**
 * Tries to re-establish the connection up to five times, waiting with
 * exponential backoff (1s, 2s, 4s, ... capped at 30s) before each attempt.
 * Stops as soon as a connect call succeeds or the socket reports connected;
 * after the final failure the user is notified.
 */
const handleReconnection = async () => {
  const maxAttempts = 5;
  const wait = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));

  for (let attempts = 0; attempts < maxAttempts && !connected(); ) {
    try {
      const delay = Math.min(1000 * 2 ** attempts, 30000);
      console.log(`[WebSocket] Reconnection attempt ${attempts + 1} in ${delay}ms`);

      await wait(delay);
      await connect(lastUsedIdentity);

      console.log('[WebSocket] Reconnection successful');
      return;
    } catch (error) {
      // Only failed attempts advance the counter.
      attempts += 1;
      console.error(`[WebSocket] Reconnection attempt ${attempts} failed:`, error);

      if (attempts < maxAttempts) continue;

      console.error('[WebSocket] Max reconnection attempts reached');
      // Show user notification about connection failure
      showConnectionFailureNotification();
    }
  }
};

Performance Optimization

1. Message Batching

// Batch multiple messages for efficiency
/**
 * Collects outgoing messages and sends them together: a batch is flushed as
 * soon as it holds `maxBatchSize` messages, or `batchDelay` milliseconds
 * after the first message of the batch was added, whichever comes first.
 *
 * NOTE(review): `createBatchedMessage` and `sendMessage` are called on
 * `this` but are not defined in this snippet.
 */
class MessageBatcher {
  private batch: Uint8Array[] = [];
  // ReturnType keeps this correct in browsers (number) and Node (Timeout).
  private batchTimeout: ReturnType<typeof setTimeout> | null = null;
  private maxBatchSize = 10;
  private batchDelay = 16; // ~60fps

  /** Queues a message, flushing immediately once the batch is full. */
  addMessage(message: Uint8Array): void {
    this.batch.push(message);

    if (this.batch.length >= this.maxBatchSize) {
      this.flushBatch();
    } else if (this.batchTimeout === null) {
      this.batchTimeout = setTimeout(() => {
        this.flushBatch();
      }, this.batchDelay);
    }
  }

  /**
   * Sends everything queued so far as one batched message.
   * If building or sending the batch throws, the messages are put back at
   * the front of the queue and the error is rethrown.
   */
  flushBatch(): void {
    // Reset the timer first so a failed send can never leave a stale handle
    // behind that would block future timed flushes.
    if (this.batchTimeout !== null) {
      clearTimeout(this.batchTimeout);
      this.batchTimeout = null;
    }

    if (this.batch.length === 0) return;

    // Detach the batch before sending so messages added while the send is
    // in progress land in a fresh batch instead of being wiped afterwards.
    const pending = this.batch;
    this.batch = [];

    try {
      // Create and send batched message
      this.sendMessage(this.createBatchedMessage(pending));
    } catch (error) {
      this.batch = pending.concat(this.batch);
      throw error;
    }
  }
}

2. Message Compression

// Optional message compression for large payloads
import { compress, decompress } from 'lz4js';

/**
 * Sends a message prefixed with a one-byte compression flag.
 * Payloads longer than COMPRESSION_THRESHOLD are compressed and flagged 1;
 * everything else is sent as-is and flagged 0.
 */
const sendCompressedMessage = (message: Uint8Array): void => {
  const useCompression = message.length > COMPRESSION_THRESHOLD;
  const body = useCompression ? compress(message) : message;

  // Frame layout: [flag byte][body bytes]
  const framed = new Uint8Array(1 + body.length);
  framed[0] = useCompression ? 1 : 0;
  framed.set(body, 1);

  websocket.send(framed);
};

Security Considerations

1. Message Validation

// Server-side message validation
/**
 * Gatekeeper for incoming actions. Rejects the message, and tells the peer
 * why, when the peer is not authenticated, is currently rate limited, or
 * sent a message larger than MAX_MESSAGE_SIZE.
 *
 * @returns `true` when the message may be processed, `false` otherwise
 */
const validateMessage = (action: AppStateActions, peer: Peer): boolean => {
  const session = userSessions.get(peer.id);

  // Check authentication
  if (!session || !session.userId) {
    sendErrorMessage(peer, 'Authentication required');
    return false;
  }

  // Rate limiting
  if (isRateLimited(peer.id)) {
    sendErrorMessage(peer, 'Rate limit exceeded');
    return false;
  }

  // Message size validation. The encoded size is measured with the same
  // encode(...).finish() API the senders use to produce the bytes.
  const encodedSize = AppStateActions.encode(action).finish().length;
  if (encodedSize > MAX_MESSAGE_SIZE) {
    sendErrorMessage(peer, 'Message too large');
    return false;
  }

  return true;
};

2. Rate Limiting

// Rate limiting implementation
/**
 * Sliding-window rate limiter keyed by peer id.
 *
 * Each peer may make at most `maxRequests` requests within any `windowMs`
 * millisecond window. The defaults (100 requests per 60 seconds) match the
 * previously hard-coded limits.
 */
class RateLimiter {
  // Timestamps (ms since epoch) of the accepted requests per peer.
  private requests = new Map<string, number[]>();

  /**
   * @param maxRequests maximum number of accepted requests per window
   * @param windowMs length of the sliding window in milliseconds
   */
  constructor(
    private readonly maxRequests = 100, // per window
    private readonly windowMs = 60000
  ) {}

  /**
   * Records a request for `peerId` if it is within the limit.
   * @returns `true` when the request is accepted, `false` when the peer has
   *          already used up its quota for the current window
   */
  isAllowed(peerId: string): boolean {
    const now = Date.now();
    const history = this.requests.get(peerId) ?? [];

    // Remove old requests outside the window
    const recent = history.filter(time => now - time < this.windowMs);
    const allowed = recent.length < this.maxRequests;

    if (allowed) {
      recent.push(now);
    }
    // Store the pruned list even on denial so expired timestamps are not
    // re-filtered on every subsequent call.
    this.requests.set(peerId, recent);

    return allowed;
  }
}

Next Steps