Skip to main content

State Management Architecture

PianoRhythm Server implements a sophisticated state management system using Redis as the primary state store, with MongoDB for persistent data. This document details the state management architecture, data models, and caching strategies.

🎯 State Management Overview

Design Principles

  • Single Source of Truth - Redis serves as the authoritative state store
  • Performance First - In-memory operations for real-time requirements
  • Persistence - MongoDB backup for data durability
  • Consistency - Atomic operations and transactions where needed

State Categories

  • Session State - User connections and temporary data
  • Game State - Room data, user positions, real-time interactions
  • Social State - Friend relationships, chat messages, user status
  • Configuration State - Server settings, room configurations

🏗️ Redis Architecture

Data Structure Strategy

pianorhythm:
├── users:{user_id} # Hash - User session data
├── rooms:{room_id} # Hash - Room configuration
├── rooms:online # Sorted Set - Active rooms
├── rooms:users:{room_id} # Sorted Set - Users in room
├── rooms:chat:{room_id} # Sorted Set - Chat messages
├── users:friends:{user_id} # Set - Friend relationships
├── users:online # Sorted Set - Online users
└── server:settings # Hash - Server configuration

Key Naming Conventions

  • Prefix: pianorhythm: - Namespace isolation
  • Entity Types: users:, rooms:, server: - Clear categorization
  • Relationships: rooms:users:, users:friends: - Relationship mapping
  • Collections: online, chat, typing - Collection suffixes

📊 Data Models

User State Model

pub struct UserDbo {
pub user_id: UserId,
pub username: String,
pub user_tag: String,
pub socket_id: SocketId,
pub room_id: Option<RoomId>,
pub status: UserStatus,
pub roles: UserRoles,
pub settings: UserSettings,
pub billing_settings: UserBillingSettings,
pub last_seen: DateTime<Utc>,
}

Redis Storage:

  • Key: pianorhythm:users:{user_id}
  • Type: Hash
  • TTL: Session-based (extends on activity)

Room State Model

pub struct RoomStateDbo {
pub room_id: RoomId,
pub room_name: String,
pub room_owner: String,
pub room_type: RoomType,
pub settings: RoomSettings,
pub created_at: DateTime<Utc>,
pub last_activity: DateTime<Utc>,
}

Redis Storage:

  • Key: pianorhythm:rooms:{room_id}
  • Type: Hash
  • Additional: Sorted set entries for discovery

Chat Message Model

pub struct ChatMessageDbo {
pub message_id: String,
pub user_id: UserId,
pub username: String,
pub message: String,
pub timestamp: DateTime<Utc>,
pub message_type: ChatMessageType,
}

Redis Storage:

  • Key: pianorhythm:rooms:chat:{room_id}
  • Type: Sorted Set (scored by timestamp)
  • Retention: Configurable message history limit

🔄 State Operations

User State Operations

User Login

pub fn save_user(&self, user_dbo: &UserDbo) -> Result<(), StateError> {
let mut pipeline = self.store.create_pipeline();

// Save user data
pipeline.hset_multiple(&user_key, &user_hash_data);

// Add to online users
pipeline.zadd("pianorhythm:users:online", &user_dbo.user_id, timestamp);

// Set expiration
pipeline.expire(&user_key, SESSION_TTL);

pipeline.execute()
}

User Room Assignment

pub fn add_user_to_room(&self, room_id: &RoomId, socket_id: &SocketId) 
-> Result<(RoomStateDbo, Vec<UserInfo>, Option<RoomId>), StateError> {

let mut pipeline = self.store.create_pipeline();

// Remove from previous room
if let Some(prev_room) = self.get_user_room(socket_id)? {
pipeline.zrem(&format!("pianorhythm:rooms:users:{}", prev_room), socket_id);
}

// Add to new room
pipeline.zadd(&format!("pianorhythm:rooms:users:{}", room_id), socket_id, timestamp);

// Update user's room assignment
pipeline.hset(&format!("pianorhythm:users:{}", socket_id), "room_id", room_id);

pipeline.execute()
}

Room State Operations

Room Creation

pub fn save_room(&self, room_dbo: &RoomStateDbo) -> Result<(), StateError> {
let mut pipeline = self.store.create_pipeline();

// Save room data
pipeline.hset_multiple(&room_key, &room_hash_data);

// Add to online rooms
pipeline.zadd("pianorhythm:rooms:online", &room_dbo.room_id, timestamp);

// Add name mapping if named room
if !room_dbo.room_name.is_empty() {
pipeline.hset("pianorhythm:rooms:name-map", &room_dbo.room_name, &room_dbo.room_id);
}

pipeline.execute()
}

Chat Message Storage

pub fn save_chat_message(&self, room_id: &RoomId, message: &ChatMessageDbo) 
-> Result<bool, StateError> {

let chat_key = format!("pianorhythm:rooms:chat:{}", room_id);
let message_json = serde_json::to_string(message)?;
let score = message.timestamp.timestamp_millis() as f64;

// Add message to sorted set
self.store.zadd(&chat_key, &message_json, score)?;

// Trim old messages (keep last 100)
self.store.zremrangebyrank(&chat_key, 0, -101)?;

Ok(true)
}

🚀 Performance Optimizations

Connection Pooling

pub struct RedisDBStore {
pool: r2d2::Pool<RedisConnectionManager>,
config: Arc<PianoRhythmConfig>,
}

impl RedisDBStore {
pub fn new(pool: r2d2::Pool<RedisConnectionManager>) -> Self {
Self { pool, config }
}

fn get_connection(&self) -> Result<r2d2::PooledConnection<RedisConnectionManager>, StateError> {
self.pool.get().map_err(|e| StateError::ConnectionError(e.to_string()))
}
}

Pipeline Operations

pub fn create_pipeline(&self) -> RedisPipeline {
RedisPipeline::new(self.get_connection()?)
}

// Batch multiple operations
let mut pipeline = state.create_pipeline();
pipeline.hset("key1", "field1", "value1");
pipeline.zadd("key2", "member1", 1.0);
pipeline.expire("key1", 3600);
pipeline.execute()?; // Single round-trip

Caching Strategies

Hot Data Caching

  • User Sessions - Cached for session duration
  • Active Rooms - Cached with activity-based TTL
  • Friend Lists - Cached with manual invalidation

Cache Invalidation

pub fn invalidate_user_cache(&self, user_id: &UserId) {
// Remove from cache
self.store.del(&format!("pianorhythm:cache:user:{}", user_id));

// Publish invalidation event
self.publish_cache_invalidation("user", user_id);
}

🔄 Data Synchronization

Redis to MongoDB Sync

pub async fn sync_user_to_mongodb(&self, user_dbo: &UserDbo) -> Result<(), StateError> {
// Update MongoDB with current Redis state
self.users_service.update_user(user_dbo).await?;

// Log sync operation
debug!("Synced user {} to MongoDB", user_dbo.user_id);

Ok(())
}

Cross-Server Synchronization

pub fn publish_state_change(&self, change: StateChange) {
let channel = format!("pianorhythm:state-changes:{}", self.config.server_name);
let message = serde_json::to_string(&change).unwrap();

self.store.publish(&channel, message);
}

🛡️ Data Consistency

Atomic Operations

pub fn atomic_room_join(&self, user_id: &UserId, room_id: &RoomId) -> Result<(), StateError> {
let mut pipeline = self.store.create_pipeline();
pipeline.atomic(); // Start transaction

// Check room capacity
let current_users = pipeline.zcard(&format!("pianorhythm:rooms:users:{}", room_id));

// Conditional operations based on capacity
pipeline.execute_conditional(|results| {
let user_count: i64 = results[0];
user_count < MAX_ROOM_CAPACITY
})
}

Conflict Resolution

  • Last Write Wins - For user preferences and settings
  • Merge Strategy - For complex state objects
  • Version Vectors - For distributed conflict detection

📊 Monitoring and Metrics

Redis Metrics

  • Connection Pool Usage - Monitor pool exhaustion
  • Command Latency - Track operation performance
  • Memory Usage - Monitor Redis memory consumption
  • Hit/Miss Ratios - Cache effectiveness metrics

State Metrics

pub struct StateMetrics {
pub active_users: i64,
pub active_rooms: i64,
pub total_connections: i64,
pub cache_hit_rate: f64,
pub average_response_time: Duration,
}

🔧 Configuration

Redis Configuration

[redis]
url = "redis://localhost:6379"
pool_size = 20
timeout = 5000
retry_attempts = 3

State Management Configuration

pub struct StateConfig {
pub session_ttl: Duration,
pub room_ttl: Duration,
pub chat_history_limit: usize,
pub cache_size_limit: usize,
}

This state management architecture provides high-performance, consistent, and scalable data operations while maintaining data integrity and supporting real-time multiplayer gaming requirements.