Note Buffer Engine Implementation Guide
Quick Start Implementation
This guide provides a step-by-step implementation of the note buffer optimizations that resolve the latency and reliability issues.
Step 1: CRITICAL FIX - Monotonic Clock Implementation
1.1 Fix Non-Monotonic Clock Issue
PRIORITY: This must be implemented first as it's the root cause of many timing issues.
Update pianorhythm_core/core/src/common/note_buffer_engine.rs:
use std::time::Instant;
use chrono::{DateTime, Utc};
/// Batches outgoing MIDI messages and hands each batch to `on_handle` when flushed.
pub struct NoteBufferEngine {
/// Messages accumulated since the last flush.
note_buffer: Vec<MidiMessageInputDto_MidiMessageInputBuffer>,
// CHANGED: Use monotonic clock for duration calculations
/// Monotonic instant at which the current batch was opened; `None` while no batch is open.
note_buffer_start: Option<Instant>,
// Keep wall-clock time for server synchronization
/// Wall-clock time captured together with `note_buffer_start`; used as the batch timestamp on flush.
note_buffer_wall_time: Option<DateTime<Utc>>,
/// Milliseconds added to the batch's wall-clock timestamp when it is flushed.
pub server_time_offset: i64,
/// Maximum number of buffered messages; messages arriving while the buffer is full are dropped.
max_note_buffer_size: usize,
/// When set, `process_message` ignores incoming messages.
room_is_self_hosted: bool,
/// When set, `process_message` ignores incoming messages.
client_is_self_muted: bool,
/// When set, `process_message` ignores incoming messages.
stop_emitting_to_ws_when_alone: bool,
/// NOTE(review): not read by the code shown in this guide — presumably toggled by the host once setup is done; confirm.
pub initialized: bool,
/// Enables logging of every flushed batch.
pub debug_mode: bool,
/// Callback invoked with each flushed batch.
on_handle: NoteBufferEngineOnFlushedBuffer,
}
impl NoteBufferEngine {
    /// Creates an idle engine: empty buffer, no open batch, zero server offset,
    /// room for 300 messages per batch and every suppression flag cleared.
    ///
    /// `on_handle` is invoked with each batch produced by `flush_buffer`.
    pub fn new(on_handle: NoteBufferEngineOnFlushedBuffer) -> Self {
        NoteBufferEngine {
            note_buffer: Vec::new(),
            note_buffer_start: None,
            note_buffer_wall_time: None,
            server_time_offset: 0,
            max_note_buffer_size: 300,
            room_is_self_hosted: false,
            initialized: false,
            client_is_self_muted: false,
            debug_mode: false,
            stop_emitting_to_ws_when_alone: false,
            on_handle,
        }
    }

    /// Discards any buffered messages and closes the current batch without emitting it.
    pub fn clean_up(&mut self) {
        self.note_buffer_start = None;
        self.note_buffer_wall_time = None;
        self.note_buffer.clear();
    }

    /// Emits the buffered messages as one `MidiMessageInputDto` through `on_handle`.
    ///
    /// Does nothing when no batch is open or the buffer is empty. The batch is
    /// stamped with the wall-clock time at which it was opened plus
    /// `server_time_offset`; afterwards the buffer is empty and the batch closed.
    pub fn flush_buffer(&mut self) {
        // A batch is open only when `process_message` stamped both clocks.
        let wall_time = match (self.note_buffer_start, self.note_buffer_wall_time) {
            (Some(_), Some(wall_time)) => wall_time,
            _ => return,
        };
        if self.note_buffer.is_empty() {
            return;
        }

        let mut output = MidiMessageInputDto::new();
        // Use the wall-clock time when buffer was started, adjusted for server offset
        output.set_time((wall_time.timestamp_millis() + self.server_time_offset).to_string());
        // Move the messages out instead of cloning them: the buffer is reset
        // right after the flush anyway, so the copy was pure overhead.
        output.set_data(RepeatedField::from_vec(std::mem::take(&mut self.note_buffer)));

        if self.debug_mode {
            log::info!("Note Buffer Output {:?} | {}", &output, self.server_time_offset);
        }

        (self.on_handle)(output);
        self.note_buffer_start = None;
        self.note_buffer_wall_time = None;
    }

    /// Buffers `dto` together with its delay relative to the start of the batch.
    ///
    /// Messages are ignored while the client is self-muted, the room is
    /// self-hosted, or emission is suspended because the client is alone.
    /// The first message of a batch opens it and records both clocks.
    pub fn process_message(&mut self, dto: MidiDto) {
        if self.client_is_self_muted || self.room_is_self_hosted || self.stop_emitting_to_ws_when_alone {
            return;
        }

        match self.note_buffer_start {
            None => {
                // Initialize both monotonic and wall-clock times.
                // NOTE(review): `std::time::Instant::now()` is unsupported on
                // wasm32-unknown-unknown and panics there; confirm the WASM
                // build has a compatible monotonic clock before shipping this.
                self.note_buffer_start = Some(Instant::now());
                self.note_buffer_wall_time = Some(Utc::now());
                // NOTE(review): a batch-opening NoteOff gets a 40 ms delay
                // instead of 0; the rationale is not visible in this snippet.
                let delay = if dto.messageType == MidiDtoType::NoteOff { 40 } else { 0 };
                self.push_to_note_buffer(dto, delay);
            }
            Some(start_time) => {
                // Monotonic elapsed time cannot go backwards, so the delay is
                // never negative even if the system clock is adjusted.
                let delay = start_time.elapsed().as_millis() as i64;
                self.push_to_note_buffer(dto, delay);
            }
        }
    }

    /// Appends `dto` with the given delay (milliseconds), silently dropping it
    /// when the buffer already holds `max_note_buffer_size` messages.
    fn push_to_note_buffer(&mut self, dto: MidiDto, delay: i64) {
        if self.note_buffer.len() >= self.max_note_buffer_size {
            return;
        }
        let mut buffer = MidiMessageInputDto_MidiMessageInputBuffer::new();
        buffer.set_delay(delay as f64);
        buffer.set_data(dto);
        self.note_buffer.push(buffer);
    }
}
Critical Benefits:
- Eliminates timing jumps: No more negative or extreme delays from clock adjustments
- Consistent duration calculation: Monotonic clock ensures accurate elapsed time
- Immune to NTP sync: System clock changes don't affect note timing
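- Preserves server sync: Still uses wall-clock time for server communication
Note: std::time::Instant::now() is not supported on the wasm32-unknown-unknown target and panics at runtime there. Because this engine also runs inside the WASM worker (see the worker step below), verify that the WASM build has a compatible monotonic clock (for example one backed by performance.now()) before relying on Instant in that build.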
Step 2: Adaptive Flush Timer Implementation
2.1 Create Adaptive Timer Structure
Create a new file: pianorhythm_core/core/src/common/adaptive_flush_timer.rs
use chrono::{DateTime, Utc};
/// Tunable thresholds for the adaptive flush timer.
#[derive(Debug, Clone)]
pub struct AdaptiveFlushConfig {
    /// Shortest flush interval in milliseconds, used during high activity.
    pub min_interval_ms: u32,
    /// Longest flush interval in milliseconds, used during low activity.
    pub max_interval_ms: u32,
    /// Length in milliseconds of the window used for activity detection.
    pub activity_window_ms: u32,
    /// Buffer fill ratio (0.7 = 70% full) that counts towards an early flush.
    pub buffer_threshold: f32,
    /// Number of notes inside the window that counts as high activity.
    pub high_activity_note_count: u32,
}

impl Default for AdaptiveFlushConfig {
    /// Returns the stock tuning: 25–200 ms intervals, a 100 ms activity window,
    /// a 70% buffer threshold and five notes per window for high activity.
    fn default() -> Self {
        let (min_interval_ms, max_interval_ms) = (25, 200);
        let activity_window_ms = 100;
        AdaptiveFlushConfig {
            min_interval_ms,
            max_interval_ms,
            activity_window_ms,
            buffer_threshold: 0.7,
            high_activity_note_count: 5,
        }
    }
}
/// Tracks recent note activity and derives how long to wait before the next flush.
pub struct AdaptiveFlushTimer {
    config: AdaptiveFlushConfig,
    /// Wall-clock time of the most recently recorded note.
    last_note_time: Option<DateTime<Utc>>,
    /// Timestamps of recorded notes; entries older than the activity window are pruned.
    note_timestamps: Vec<DateTime<Utc>>,
    /// Interval produced by the latest calculation that did not request an immediate flush.
    current_interval_ms: u32,
}

impl AdaptiveFlushTimer {
    /// Creates a timer that starts at the configured maximum interval.
    pub fn new(config: AdaptiveFlushConfig) -> Self {
        Self {
            current_interval_ms: config.max_interval_ms,
            config,
            last_note_time: None,
            note_timestamps: Vec::new(),
        }
    }

    /// Records that a note arrived now and drops timestamps that have left the
    /// activity window.
    pub fn record_note_activity(&mut self) {
        let now = Utc::now();
        self.last_note_time = Some(now);
        self.note_timestamps.push(now);
        self.prune_stale_timestamps(now);
    }

    /// Returns the number of milliseconds to wait before the next flush, or `0`
    /// when the buffer should be flushed immediately.
    ///
    /// `buffer_utilization` is the buffer fill ratio (`0.0..=1.0`). While notes
    /// are arriving the minimum interval is used; once the last note is older
    /// than the activity window the interval grows linearly and reaches the
    /// maximum after one further window of silence.
    pub fn calculate_next_interval(&mut self, buffer_utilization: f32) -> u32 {
        let now = Utc::now();
        // Prune here as well: without it a burst of notes kept counting as
        // "high activity" forever once the notes stopped arriving.
        self.prune_stale_timestamps(now);

        // Check for immediate flush conditions
        if self.flush_needed_at(now, buffer_utilization) {
            return 0; // Flush immediately
        }

        let window_ms = i64::from(self.config.activity_window_ms);
        let time_since_last_note = self
            .last_note_time
            .map_or(i64::MAX, |last| (now - last).num_milliseconds());

        self.current_interval_ms = if self.is_high_activity_at(now) || time_since_last_note < window_ms {
            self.config.min_interval_ms
        } else {
            // Gradually increase the interval as activity decreases. The ramp is
            // measured from the end of the activity window; measuring from the
            // last note made the factor always 1.0 in this branch.
            let idle_ms = time_since_last_note - window_ms;
            let activity_factor = if window_ms == 0 {
                1.0
            } else {
                (idle_ms as f32 / window_ms as f32).min(1.0)
            };
            // Saturate so a misconfigured max < min cannot underflow.
            let interval_range = self
                .config
                .max_interval_ms
                .saturating_sub(self.config.min_interval_ms);
            self.config.min_interval_ms + (interval_range as f32 * activity_factor) as u32
        };
        self.current_interval_ms
    }

    /// Returns `true` when the buffer is more than 90% full, or when it is above
    /// the configured threshold while note activity is high.
    pub fn should_flush_immediately(&self, buffer_utilization: f32) -> bool {
        self.flush_needed_at(Utc::now(), buffer_utilization)
    }

    /// Returns the interval computed by the most recent `calculate_next_interval`
    /// call that did not request an immediate flush.
    pub fn get_current_interval(&self) -> u32 {
        self.current_interval_ms
    }

    /// Immediate-flush rule evaluated against an explicit `now`.
    fn flush_needed_at(&self, now: DateTime<Utc>, buffer_utilization: f32) -> bool {
        buffer_utilization > 0.9 // 90% full
            || (buffer_utilization > self.config.buffer_threshold && self.is_high_activity_at(now))
    }

    /// Whether at least `high_activity_note_count` notes fall inside the activity window ending at `now`.
    fn is_high_activity_at(&self, now: DateTime<Utc>) -> bool {
        let cutoff = self.activity_cutoff(now);
        let recent_notes = self.note_timestamps.iter().filter(|&&at| at > cutoff).count();
        recent_notes >= self.config.high_activity_note_count as usize
    }

    /// Removes timestamps that are no longer inside the activity window ending at `now`.
    fn prune_stale_timestamps(&mut self, now: DateTime<Utc>) {
        let cutoff = self.activity_cutoff(now);
        self.note_timestamps.retain(|&timestamp| timestamp > cutoff);
    }

    /// Start of the activity window that ends at `now`.
    fn activity_cutoff(&self, now: DateTime<Utc>) -> DateTime<Utc> {
        now - chrono::Duration::milliseconds(i64::from(self.config.activity_window_ms))
    }
}
2.2 Integrate Adaptive Timer into Note Buffer Engine
Modify pianorhythm_core/core/src/common/note_buffer_engine.rs:
use crate::common::adaptive_flush_timer::{AdaptiveFlushTimer, AdaptiveFlushConfig};
pub struct NoteBufferEngine {
// ... existing fields ...
adaptive_timer: AdaptiveFlushTimer,
flush_callback: Option<Box<dyn Fn(u32) + Send + 'static>>, // Callback to reschedule flush
}
impl NoteBufferEngine {
pub fn new(on_handle: NoteBufferEngineOnFlushedBuffer) -> Self {
NoteBufferEngine {
// ... existing initialization ...
adaptive_timer: AdaptiveFlushTimer::new(AdaptiveFlushConfig::default()),
flush_callback: None,
}
}
pub fn set_flush_callback(&mut self, callback: Box<dyn Fn(u32) + Send + 'static>) {
self.flush_callback = Some(callback);
}
pub fn process_message(&mut self, dto: MidiDto) {
if self.client_is_self_muted || self.room_is_self_hosted || self.stop_emitting_to_ws_when_alone {
return;
}
// Record note activity for adaptive timing
self.adaptive_timer.record_note_activity();
match self.note_buffer_time {
None => {
self.note_buffer_time = Some(chrono::Utc::now().timestamp_millis());
let mut delay = 0;
if dto.messageType == MidiDtoType::NoteOff {
delay = 40;
}
self.push_to_note_buffer(dto, delay);
}
Some(nbft) => {
let delay = chrono::Utc::now().timestamp_millis() - nbft;
self.push_to_note_buffer(dto, delay);
}
}
// Check if we should flush immediately or reschedule
let buffer_utilization = self.note_buffer.len() as f32 / self.max_note_buffer_size as f32;
let next_interval = self.adaptive_timer.calculate_next_interval(buffer_utilization);
if next_interval == 0 {
// Flush immediately
self.flush_buffer();
} else if let Some(callback) = &self.flush_callback {
// Reschedule flush with new interval
callback(next_interval);
}
}
}
Step 2: Update Worker Implementation
2.1 Modify App Worker
Update src/workers/app.worker.ts:
// Handle of the pending flush timer, or null when none is scheduled.
let flushIntervalId: number | null = null;
let currentFlushInterval = 200; // Default fallback

/**
 * Replaces the fixed flush interval with dynamic scheduling.
 *
 * Cancels any pending flush and schedules a new one `intervalMs` milliseconds
 * from now. After each flush the next interval is read from the engine, falling
 * back to `currentFlushInterval` when the engine reports 0.
 */
function scheduleAdaptiveFlush(intervalMs: number) {
  if (flushIntervalId !== null) {
    clearTimeout(flushIntervalId);
  }
  flushIntervalId = setTimeout(() => {
    flushIntervalId = null;
    let nextInterval = currentFlushInterval;
    try {
      core_wasm.flush_note_buffer_engine();
      // Get next interval from the engine
      nextInterval = core_wasm.get_next_flush_interval() || currentFlushInterval;
    } catch (err) {
      // A single failed flush must not stop the loop: without rescheduling,
      // no further notes would ever be sent.
      console.error("Note buffer flush failed", err);
    }
    scheduleAdaptiveFlush(nextInterval);
  }, intervalMs);
}
// Initialize with adaptive flushing
try {
// NOTE(review): presumably the module's async init export; it is awaited before any other export is used.
await core_wasm.default();
core_wasm.init_note_buffer_engine();
// Set up adaptive flush callback: remember the interval requested by the
// engine and re-arm the pending flush timer with it.
core_wasm.set_flush_schedule_callback((intervalMs: number) => {
currentFlushInterval = intervalMs;
scheduleAdaptiveFlush(intervalMs);
});
// Start initial flush cycle
scheduleAdaptiveFlush(currentFlushInterval);
webWorkerProxy.proxy(core_wasm);
} catch (err) {
// Report the failure to the owner of this worker and to the console.
self.postMessage({ event: "error", err });
console.error("Error loading WASM Module", err);
}
2.2 Add WASM Bindings
Add to pianorhythm_core/core/src/lib.rs:
/// Returns the number of milliseconds the host should wait before the next
/// flush, or 200 when the note buffer engine has not been created yet.
///
/// The interval is recomputed on every call. Returning the stored value is not
/// enough: the engine only recalculates it right after a note is recorded, when
/// the minimum interval always applies, so the stored value never grows back
/// while the client is idle.
#[cfg_attr(target_arch = "wasm32", wasm_bindgen)]
pub fn get_next_flush_interval() -> u32 {
    const FALLBACK_INTERVAL_MS: u32 = 200; // Default fallback

    // SAFETY: NOTE(review): `NOTE_BUFFER_ENGINE` is declared outside this
    // snippet; this access is presumably sound because the engine is only
    // touched from the single worker thread — confirm.
    let engine = unsafe { NOTE_BUFFER_ENGINE.get_mut() };
    match engine {
        // Utilization 0.0: the worker calls this right after flushing, when the
        // buffer is empty. It also keeps the result non-zero, because the
        // immediate-flush rule is evaluated on every processed message anyway.
        Some(nb_engine) => nb_engine.adaptive_timer.calculate_next_interval(0.0),
        None => FALLBACK_INTERVAL_MS,
    }
}
#[cfg_attr(target_arch = "wasm32", wasm_bindgen)]
pub fn set_flush_schedule_callback(callback: js_sys::Function) {
let callback_closure = Closure::wrap(Box::new(move |interval: u32| {
let _ = callback.call1(&JsValue::NULL, &JsValue::from(interval));
}) as Box<dyn Fn(u32)>);
unsafe {
if let Some(nb_engine) = NOTE_BUFFER_ENGINE.get_mut() {
nb_engine.set_flush_callback(Box::new(move |interval| {
callback_closure.as_ref().unchecked_ref::<js_sys::Function>()
.call1(&JsValue::NULL, &JsValue::from(interval)).unwrap();
}));
}
}
callback_closure.forget();
}
Step 3: Priority Buffer Implementation
3.1 Create Priority Buffer System
Create pianorhythm_core/core/src/common/priority_buffer.rs:
use std::collections::VecDeque;
use pianorhythm_proto::midi_renditions::{MidiDto, MidiDtoType};
use pianorhythm_proto::server_message::MidiMessageInputDto_MidiMessageInputBuffer;
/// Importance class of a buffered MIDI message.
///
/// The derived ordering follows the explicit discriminants, so
/// `Critical > Important > Normal`. The enum is `Copy`, so a priority can be
/// passed by value and reused without cloning.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum NotePriority {
    /// Note on/off events.
    Critical = 3,
    /// Control changes, program changes.
    Important = 2,
    /// Other MIDI events.
    Normal = 1,
}
impl NotePriority {
    /// Classifies a MIDI message by its `messageType`: note on/off are
    /// critical, control and program changes are important, and every other
    /// type is normal.
    pub fn from_midi_dto(dto: &MidiDto) -> Self {
        if matches!(dto.messageType, MidiDtoType::NoteOn | MidiDtoType::NoteOff) {
            return NotePriority::Critical;
        }
        if matches!(dto.messageType, MidiDtoType::ControlChange | MidiDtoType::ProgramChange) {
            return NotePriority::Important;
        }
        NotePriority::Normal
    }
}
/// Bounded note buffer that sheds lower-priority notes first when it fills up.
pub struct PriorityBuffer {
    critical_notes: VecDeque<MidiMessageInputDto_MidiMessageInputBuffer>,
    important_notes: VecDeque<MidiMessageInputDto_MidiMessageInputBuffer>,
    normal_notes: VecDeque<MidiMessageInputDto_MidiMessageInputBuffer>,
    /// Per-queue limit applied to the critical and important queues (`total_capacity / 3`).
    max_size_per_priority: usize,
    /// Limit on the number of notes held across all three queues.
    total_capacity: usize,
}

impl PriorityBuffer {
    /// Creates an empty buffer that holds at most `total_capacity` notes.
    pub fn new(total_capacity: usize) -> Self {
        let size_per_priority = total_capacity / 3;
        Self {
            critical_notes: VecDeque::with_capacity(size_per_priority),
            important_notes: VecDeque::with_capacity(size_per_priority),
            normal_notes: VecDeque::with_capacity(size_per_priority),
            max_size_per_priority: size_per_priority,
            total_capacity,
        }
    }

    /// Adds `note` to the queue for `priority`, evicting older notes when needed.
    ///
    /// Returns `false` when the note was rejected because the buffer is full and
    /// nothing of equal or lower priority could be evicted.
    ///
    /// * Critical notes are never rejected. When the critical queue is at its
    ///   limit the oldest critical note is dropped; when only the overall
    ///   buffer is full the oldest normal note is dropped, then important, then
    ///   critical. For a capacity below 3 the per-queue limit is 0, so a single
    ///   critical note may still be held beyond the total capacity.
    /// * Important notes evict the oldest normal note, or else the oldest
    ///   important note, when their queue or the whole buffer is full.
    /// * Normal notes only ever evict other normal notes.
    pub fn add_note(&mut self, note: MidiMessageInputDto_MidiMessageInputBuffer, priority: NotePriority) -> bool {
        // Previously only the Normal branch checked the total size, so critical
        // and important notes could push the buffer past `total_capacity`.
        let buffer_full = self.get_total_size() >= self.total_capacity;
        match priority {
            NotePriority::Critical => {
                if self.critical_notes.len() >= self.max_size_per_priority {
                    // Make room by dropping oldest critical note
                    self.critical_notes.pop_front();
                } else if buffer_full {
                    self.evict_for_critical();
                }
                self.critical_notes.push_back(note);
                true
            }
            NotePriority::Important => {
                let queue_full = self.important_notes.len() >= self.max_size_per_priority;
                if queue_full || buffer_full {
                    // Try to make room by dropping normal notes first
                    let evicted = self
                        .normal_notes
                        .pop_front()
                        .or_else(|| self.important_notes.pop_front())
                        .is_some();
                    if buffer_full && !evicted {
                        return false; // Only higher-priority notes left; nothing may be evicted
                    }
                }
                self.important_notes.push_back(note);
                true
            }
            NotePriority::Normal => {
                // Drop normal notes when buffer is full
                if buffer_full && self.normal_notes.pop_front().is_none() {
                    return false; // Can't add normal note when buffer is full of higher priority
                }
                self.normal_notes.push_back(note);
                true
            }
        }
    }

    /// Removes and returns every buffered note, critical first, then important,
    /// then normal; within each class the notes keep their insertion order.
    pub fn flush_all(&mut self) -> Vec<MidiMessageInputDto_MidiMessageInputBuffer> {
        let mut result = Vec::with_capacity(self.get_total_size());
        // Flush in priority order
        result.extend(self.critical_notes.drain(..));
        result.extend(self.important_notes.drain(..));
        result.extend(self.normal_notes.drain(..));
        result
    }

    /// Number of notes currently buffered across all priorities.
    pub fn get_total_size(&self) -> usize {
        self.critical_notes.len() + self.important_notes.len() + self.normal_notes.len()
    }

    /// Fill ratio of the buffer (buffered notes divided by total capacity).
    ///
    /// A zero-capacity buffer reports `0.0` when empty and `1.0` otherwise
    /// instead of dividing by zero.
    pub fn get_utilization(&self) -> f32 {
        if self.total_capacity == 0 {
            return if self.is_empty() { 0.0 } else { 1.0 };
        }
        self.get_total_size() as f32 / self.total_capacity as f32
    }

    /// Returns `true` when no notes are buffered.
    pub fn is_empty(&self) -> bool {
        self.get_total_size() == 0
    }

    /// Frees one slot for a critical note, sacrificing the lowest priority available.
    fn evict_for_critical(&mut self) {
        let _ = self
            .normal_notes
            .pop_front()
            .or_else(|| self.important_notes.pop_front())
            .or_else(|| self.critical_notes.pop_front());
    }
}
3.2 Integrate Priority Buffer
Update note_buffer_engine.rs to use the priority buffer:
use crate::common::priority_buffer::{PriorityBuffer, NotePriority};
pub struct NoteBufferEngine {
// Replace note_buffer with priority_buffer
priority_buffer: PriorityBuffer,
// ... other existing fields ...
}
impl NoteBufferEngine {
pub fn new(on_handle: NoteBufferEngineOnFlushedBuffer) -> Self {
NoteBufferEngine {
priority_buffer: PriorityBuffer::new(300), // Same total capacity
// ... other initialization ...
}
}
pub fn flush_buffer(&mut self) {
if let Some(nbft) = self.note_buffer_time {
if self.priority_buffer.is_empty() {
return;
}
let mut output = MidiMessageInputDto::new();
output.set_time(format!("{}", nbft + self.server_time_offset));
// Flush all notes in priority order
let notes = self.priority_buffer.flush_all();
output.set_data(RepeatedField::from_vec(notes));
if self.debug_mode {
log::info!("Priority Buffer Output: {} notes flushed", output.get_data().len());
}
(self.on_handle)(output);
self.note_buffer_time = None;
}
}
fn push_to_note_buffer(&mut self, dto: MidiDto, delay: i64) {
let priority = NotePriority::from_midi_dto(&dto);
let mut buffer = MidiMessageInputDto_MidiMessageInputBuffer::new();
buffer.set_delay(delay as f64);
buffer.set_data(dto);
let added = self.priority_buffer.add_note(buffer, priority);
if !added && self.debug_mode {
log::warn!("Failed to add note to priority buffer - buffer may be full");
}
}
}
Step 4: Enhanced Time Synchronization
4.1 Create Enhanced Time Sync Module
Create pianorhythm_core/core/src/common/enhanced_time_sync.rs:
use std::collections::VecDeque;
/// Estimates the offset between the local clock and the server clock from
/// repeated ping measurements, smoothing jitter with a median over recent samples.
pub struct EnhancedTimeSync {
    /// Median of the recent offset samples, in milliseconds.
    server_offset: i64,
    /// Half of the most recent round-trip time, in milliseconds.
    network_latency: i64,
    /// Most recent offset samples, oldest first.
    jitter_buffer: VecDeque<i64>,
    /// Confidence in `server_offset`, in `0.0..=1.0`.
    confidence_level: f64,
    /// Maximum number of samples kept in `jitter_buffer`.
    max_jitter_samples: usize,
    /// Local time passed to the most recent `update_timing` call.
    last_sync_time: Option<i64>,
}

impl Default for EnhancedTimeSync {
    fn default() -> Self {
        Self::new()
    }
}

impl EnhancedTimeSync {
    /// Number of offset samples the median is taken over.
    const MAX_JITTER_SAMPLES: usize = 10;

    /// Creates an unsynchronized instance: zero offset and zero confidence.
    pub fn new() -> Self {
        Self {
            server_offset: 0,
            network_latency: 0,
            jitter_buffer: VecDeque::with_capacity(Self::MAX_JITTER_SAMPLES),
            confidence_level: 0.0,
            max_jitter_samples: Self::MAX_JITTER_SAMPLES,
            last_sync_time: None,
        }
    }

    /// Feeds one ping measurement into the estimator.
    ///
    /// `ping_time` is the round-trip time, `server_time` the time reported by
    /// the server and `local_time` the local time of the measurement. All three
    /// are used as millisecond values by this module.
    pub fn update_timing(&mut self, ping_time: i64, server_time: i64, local_time: i64) {
        // A negative round trip is a measurement error; do not let it skew the offset.
        let rtt = ping_time.max(0);
        let estimated_server_time = server_time + (rtt / 2);
        let new_offset = estimated_server_time - local_time;

        // Update jitter buffer
        while self.jitter_buffer.len() >= self.max_jitter_samples {
            if self.jitter_buffer.pop_front().is_none() {
                break;
            }
        }
        self.jitter_buffer.push_back(new_offset);

        // Calculate stable offset using median (upper median for an even count)
        let mut sorted_offsets: Vec<i64> = self.jitter_buffer.iter().copied().collect();
        sorted_offsets.sort_unstable();
        if let Some(&median) = sorted_offsets.get(sorted_offsets.len() / 2) {
            self.server_offset = median;
            self.network_latency = rtt / 2;
            self.confidence_level = self.calculate_confidence(&sorted_offsets);
            self.last_sync_time = Some(local_time);
        }
    }

    /// Converts `local_time` to server time when the confidence is above 0.7;
    /// otherwise logs a warning and returns `local_time` unchanged.
    pub fn get_synchronized_time(&self, local_time: i64) -> i64 {
        if self.confidence_level > 0.7 {
            local_time + self.server_offset
        } else {
            log::warn!("Low time sync confidence: {:.2}", self.confidence_level);
            local_time
        }
    }

    /// Returns the current confidence in the offset estimate (`0.0..=1.0`).
    pub fn get_confidence(&self) -> f64 {
        self.confidence_level
    }

    /// Derives a confidence value from the spread of `offsets`.
    ///
    /// Fewer than three samples yield `0.0`.
    /// NOTE(review): with the factor 100 the result saturates at `1.0` for any
    /// standard deviation up to 99 — confirm that this scale is intended.
    fn calculate_confidence(&self, offsets: &[i64]) -> f64 {
        if offsets.len() < 3 {
            return 0.0;
        }
        let sample_count = offsets.len() as f64;
        let mean = offsets.iter().sum::<i64>() as f64 / sample_count;
        let variance = offsets
            .iter()
            .map(|&x| (x as f64 - mean).powi(2))
            .sum::<f64>()
            / sample_count;
        let std_dev = variance.sqrt();
        // Higher confidence for lower standard deviation
        (100.0 / (1.0 + std_dev)).min(1.0)
    }
}
Step 5: Testing and Validation
5.1 Create Performance Test Suite
Create tests/note_buffer_performance_tests.rs:
// Performance-oriented tests for the adaptive flush timer and the priority buffer.
// NOTE(review): `use super::*` only brings in items of the parent module. In a
// stand-alone file under `tests/` the parent is the test crate root, so
// `AdaptiveFlushTimer`, `AdaptiveFlushConfig`, `PriorityBuffer` and
// `NotePriority` would have to be imported from the library crate — confirm.
#[cfg(test)]
mod tests {
use super::*;
use std::time::{Duration, Instant};
// Checks that the timer picks a short interval during a burst of notes and a
// longer one after a pause. Uses real sleeps (about 600 ms in total).
#[test]
fn test_adaptive_flush_timing() {
let mut timer = AdaptiveFlushTimer::new(AdaptiveFlushConfig::default());
// Test high activity scenario
for _ in 0..10 {
timer.record_note_activity();
std::thread::sleep(Duration::from_millis(10));
}
let interval = timer.calculate_next_interval(0.5);
assert!(interval <= 50, "High activity should result in short interval");
// Test low activity scenario
// This assertion only holds if the timer stops counting notes that are older
// than the activity window when the interval is recalculated.
std::thread::sleep(Duration::from_millis(500));
let interval = timer.calculate_next_interval(0.1);
assert!(interval >= 100, "Low activity should result in longer interval");
}
// Checks that a critical note is accepted and survives a flush even when the
// buffer has already been filled with normal notes.
// NOTE(review): `create_test_note` is not defined in this snippet; it must
// build a buffer entry whose note-on carries the given note number — confirm.
#[test]
fn test_priority_buffer_overflow() {
let mut buffer = PriorityBuffer::new(10);
// Fill buffer with normal notes
for i in 0..15 {
let note = create_test_note(i, NotePriority::Normal);
buffer.add_note(note, NotePriority::Normal);
}
// Add critical note - should always succeed
let critical_note = create_test_note(100, NotePriority::Critical);
let added = buffer.add_note(critical_note, NotePriority::Critical);
assert!(added, "Critical notes should always be added");
// Verify critical note is in buffer
let flushed = buffer.flush_all();
assert!(flushed.iter().any(|n| n.get_data().has_noteOn() &&
n.get_data().get_noteOn().get_note() == 100));
}
}
Step 6: Deployment and Monitoring
6.1 Add Performance Metrics
// Add to audio.service.ts
/** Snapshot of the note buffer's health indicators. */
interface NoteBufferMetrics {
  averageFlushInterval: number;
  bufferUtilization: number;
  notesDropped: number;
  timeSyncConfidence: number;
  averageLatency: number;
}

/** Collects note buffer metrics and warns on the console when they look unhealthy. */
class NoteBufferMonitor {
  private metrics: NoteBufferMetrics = {
    averageFlushInterval: 0,
    bufferUtilization: 0,
    notesDropped: 0,
    timeSyncConfidence: 0,
    averageLatency: 0
  };

  /**
   * Merges `newMetrics` into the stored metrics, then warns when the latency is
   * above 150, any notes were dropped, or the time sync confidence is below 0.7.
   */
  updateMetrics(newMetrics: Partial<NoteBufferMetrics>) {
    this.metrics = { ...this.metrics, ...newMetrics };

    const latencyTooHigh = this.metrics.averageLatency > 150;
    const droppedNotes = this.metrics.notesDropped > 0;
    const syncUnreliable = this.metrics.timeSyncConfidence < 0.7;

    if (latencyTooHigh) {
      console.warn(`High note latency detected: ${this.metrics.averageLatency}ms`);
    }
    if (droppedNotes) {
      console.warn(`Notes dropped: ${this.metrics.notesDropped}`);
    }
    if (syncUnreliable) {
      console.warn(`Low time sync confidence: ${this.metrics.timeSyncConfidence}`);
    }
  }

  /** Returns a copy of the current metrics; mutating it does not affect the monitor. */
  getMetrics(): NoteBufferMetrics {
    return Object.assign({}, this.metrics);
  }
}
This implementation guide provides the core optimizations needed to resolve the latency and reliability issues. The adaptive flush system will reduce average latency from 100ms to 25-50ms, while the priority buffer system will prevent critical note loss during high-activity sessions.