Skip to main content

State Management Architecture

PianoRhythm Server implements a sophisticated state management system using Redis as the primary state store, with MongoDB for persistent data. This document details the state management architecture, data models, and caching strategies.

๐ŸŽฏ State Management Overviewโ€‹

Design Principlesโ€‹

  • Single Source of Truth - Redis serves as the authoritative state store
  • Performance First - In-memory operations for real-time requirements
  • Persistence - MongoDB backup for data durability
  • Consistency - Atomic operations and transactions where needed

State Categoriesโ€‹

  • Session State - User connections and temporary data
  • Game State - Room data, user positions, real-time interactions
  • Social State - Friend relationships, chat messages, user status
  • Configuration State - Server settings, room configurations

๐Ÿ—๏ธ Redis Architectureโ€‹

Data Structure Strategyโ€‹

pianorhythm:
โ”œโ”€โ”€ users:{user_id} # Hash - User session data
โ”œโ”€โ”€ rooms:{room_id} # Hash - Room configuration
โ”œโ”€โ”€ rooms:online # Sorted Set - Active rooms
โ”œโ”€โ”€ rooms:users:{room_id} # Sorted Set - Users in room
โ”œโ”€โ”€ rooms:chat:{room_id} # Sorted Set - Chat messages
โ”œโ”€โ”€ users:friends:{user_id} # Set - Friend relationships
โ”œโ”€โ”€ users:online # Sorted Set - Online users
โ””โ”€โ”€ server:settings # Hash - Server configuration

Key Naming Conventionsโ€‹

  • Prefix: pianorhythm: - Namespace isolation
  • Entity Types: users:, rooms:, server: - Clear categorization
  • Relationships: rooms:users:, users:friends: - Relationship mapping
  • Collections: online, chat, typing - Collection suffixes

๐Ÿ“Š Data Modelsโ€‹

User State Modelโ€‹

pub struct UserDbo {
pub user_id: UserId,
pub username: String,
pub user_tag: String,
pub socket_id: SocketId,
pub room_id: Option<RoomId>,
pub status: UserStatus,
pub roles: UserRoles,
pub settings: UserSettings,
pub billing_settings: UserBillingSettings,
pub last_seen: DateTime<Utc>,
}

Redis Storage:

  • Key: pianorhythm:users:{user_id}
  • Type: Hash
  • TTL: Session-based (extends on activity)

Room State Modelโ€‹

pub struct RoomStateDbo {
pub room_id: RoomId,
pub room_name: String,
pub room_owner: String,
pub room_type: RoomType,
pub settings: RoomSettings,
pub created_at: DateTime<Utc>,
pub last_activity: DateTime<Utc>,
}

Redis Storage:

  • Key: pianorhythm:rooms:{room_id}
  • Type: Hash
  • Additional: Sorted set entries for discovery

Chat Message Modelโ€‹

pub struct ChatMessageDbo {
pub message_id: String,
pub user_id: UserId,
pub username: String,
pub message: String,
pub timestamp: DateTime<Utc>,
pub message_type: ChatMessageType,
}

Redis Storage:

  • Key: pianorhythm:rooms:chat:{room_id}
  • Type: Sorted Set (scored by timestamp)
  • Retention: Configurable message history limit

๐Ÿ”„ State Operationsโ€‹

User State Operationsโ€‹

User Loginโ€‹

pub fn save_user(&self, user_dbo: &UserDbo) -> Result<(), StateError> {
let mut pipeline = self.store.create_pipeline();

// Save user data
pipeline.hset_multiple(&user_key, &user_hash_data);

// Add to online users
pipeline.zadd("pianorhythm:users:online", &user_dbo.user_id, timestamp);

// Set expiration
pipeline.expire(&user_key, SESSION_TTL);

pipeline.execute()
}

User Room Assignmentโ€‹

pub fn add_user_to_room(&self, room_id: &RoomId, socket_id: &SocketId) 
-> Result<(RoomStateDbo, Vec<UserInfo>, Option<RoomId>), StateError> {

let mut pipeline = self.store.create_pipeline();

// Remove from previous room
if let Some(prev_room) = self.get_user_room(socket_id)? {
pipeline.zrem(&format!("pianorhythm:rooms:users:{}", prev_room), socket_id);
}

// Add to new room
pipeline.zadd(&format!("pianorhythm:rooms:users:{}", room_id), socket_id, timestamp);

// Update user's room assignment
pipeline.hset(&format!("pianorhythm:users:{}", socket_id), "room_id", room_id);

pipeline.execute()
}

Room State Operationsโ€‹

Room Creationโ€‹

pub fn save_room(&self, room_dbo: &RoomStateDbo) -> Result<(), StateError> {
let mut pipeline = self.store.create_pipeline();

// Save room data
pipeline.hset_multiple(&room_key, &room_hash_data);

// Add to online rooms
pipeline.zadd("pianorhythm:rooms:online", &room_dbo.room_id, timestamp);

// Add name mapping if named room
if !room_dbo.room_name.is_empty() {
pipeline.hset("pianorhythm:rooms:name-map", &room_dbo.room_name, &room_dbo.room_id);
}

pipeline.execute()
}

Chat Message Storageโ€‹

pub fn save_chat_message(&self, room_id: &RoomId, message: &ChatMessageDbo) 
-> Result<bool, StateError> {

let chat_key = format!("pianorhythm:rooms:chat:{}", room_id);
let message_json = serde_json::to_string(message)?;
let score = message.timestamp.timestamp_millis() as f64;

// Add message to sorted set
self.store.zadd(&chat_key, &message_json, score)?;

// Trim old messages (keep last 100)
self.store.zremrangebyrank(&chat_key, 0, -101)?;

Ok(true)
}

๐Ÿš€ Performance Optimizationsโ€‹

Connection Poolingโ€‹

pub struct RedisDBStore {
pool: r2d2::Pool<RedisConnectionManager>,
config: Arc<PianoRhythmConfig>,
}

impl RedisDBStore {
pub fn new(pool: r2d2::Pool<RedisConnectionManager>) -> Self {
Self { pool, config }
}

fn get_connection(&self) -> Result<r2d2::PooledConnection<RedisConnectionManager>, StateError> {
self.pool.get().map_err(|e| StateError::ConnectionError(e.to_string()))
}
}

Pipeline Operationsโ€‹

pub fn create_pipeline(&self) -> RedisPipeline {
RedisPipeline::new(self.get_connection()?)
}

// Batch multiple operations
let mut pipeline = state.create_pipeline();
pipeline.hset("key1", "field1", "value1");
pipeline.zadd("key2", "member1", 1.0);
pipeline.expire("key1", 3600);
pipeline.execute()?; // Single round-trip

Caching Strategiesโ€‹

Hot Data Cachingโ€‹

  • User Sessions - Cached for session duration
  • Active Rooms - Cached with activity-based TTL
  • Friend Lists - Cached with manual invalidation

Cache Invalidationโ€‹

pub fn invalidate_user_cache(&self, user_id: &UserId) {
// Remove from cache
self.store.del(&format!("pianorhythm:cache:user:{}", user_id));

// Publish invalidation event
self.publish_cache_invalidation("user", user_id);
}

๐Ÿ”„ Data Synchronizationโ€‹

Redis to MongoDB Syncโ€‹

pub async fn sync_user_to_mongodb(&self, user_dbo: &UserDbo) -> Result<(), StateError> {
// Update MongoDB with current Redis state
self.users_service.update_user(user_dbo).await?;

// Log sync operation
debug!("Synced user {} to MongoDB", user_dbo.user_id);

Ok(())
}

Cross-Server Synchronizationโ€‹

pub fn publish_state_change(&self, change: StateChange) {
let channel = format!("pianorhythm:state-changes:{}", self.config.server_name);
let message = serde_json::to_string(&change).unwrap();

self.store.publish(&channel, message);
}

๐Ÿ›ก๏ธ Data Consistencyโ€‹

Atomic Operationsโ€‹

pub fn atomic_room_join(&self, user_id: &UserId, room_id: &RoomId) -> Result<(), StateError> {
let mut pipeline = self.store.create_pipeline();
pipeline.atomic(); // Start transaction

// Check room capacity
let current_users = pipeline.zcard(&format!("pianorhythm:rooms:users:{}", room_id));

// Conditional operations based on capacity
pipeline.execute_conditional(|results| {
let user_count: i64 = results[0];
user_count < MAX_ROOM_CAPACITY
})
}

Conflict Resolutionโ€‹

  • Last Write Wins - For user preferences and settings
  • Merge Strategy - For complex state objects
  • Version Vectors - For distributed conflict detection

๐Ÿ“Š Monitoring and Metricsโ€‹

Redis Metricsโ€‹

  • Connection Pool Usage - Monitor pool exhaustion
  • Command Latency - Track operation performance
  • Memory Usage - Monitor Redis memory consumption
  • Hit/Miss Ratios - Cache effectiveness metrics

State Metricsโ€‹

pub struct StateMetrics {
pub active_users: i64,
pub active_rooms: i64,
pub total_connections: i64,
pub cache_hit_rate: f64,
pub average_response_time: Duration,
}

๐Ÿ”ง Configurationโ€‹

Redis Configurationโ€‹

[redis]
url = "redis://localhost:6379"
pool_size = 20
timeout = 5000
retry_attempts = 3

State Management Configurationโ€‹

pub struct StateConfig {
pub session_ttl: Duration,
pub room_ttl: Duration,
pub chat_history_limit: usize,
pub cache_size_limit: usize,
}

This state management architecture provides high-performance, consistent, and scalable data operations while maintaining data integrity and supporting real-time multiplayer gaming requirements.