Script Python destiné à faciliter la mise en œuvre et la mesure de l'efficacité de Diretta en version DDS.
Code :
#!/usr/bin/env python3
"""
Diretta DDS Configuration Calculator with Historical Log Analysis
Optimized for systems with high-quality reference clocks
Key Focus: Clock STABILITY (jitter/phase noise), not just accuracy
"""
import argparse
import queue
import re
import shutil
import signal
import subprocess
import sys
import threading
import time
from collections import Counter
from datetime import datetime
from pathlib import Path
from typing import Optional

import numpy as np
class DirettaCalculator:
    """Calculate Diretta DDS configuration parameters.

    ``mtu`` is treated as a byte budget: the Ethernet header, the Diretta
    header and a VLAN tag are subtracted from it, and what is left
    (``available_payload``) is the room available for audio data.
    """

    # Per-frame overhead, in bytes
    ETH_HEADER = 14
    DIRETTA_HEADER = 2
    VLAN_OVERHEAD = 4
    FCS = 4

    def __init__(self, mtu: int = 9024):
        self.mtu = mtu
        self.overhead = self.ETH_HEADER + self.DIRETTA_HEADER + self.VLAN_OVERHEAD
        self.available_payload = mtu - self.overhead

    def calculate_frame_params(self, sample_rate: int, is_dsd: bool = True):
        """Calculate frame parameters for a given sample rate.

        Args:
            sample_rate: Sample rate in Hz; must be positive.
            is_dsd: True to size the frame for DSD (0.25 bytes per stereo
                sample), False for PCM (6 bytes per stereo sample).

        Returns:
            dict with the sample rate, samples and audio bytes per frame,
            total frame size, cycle time (in us and ms) and packet rate.

        Raises:
            ValueError: if ``sample_rate`` is not positive, or the MTU leaves
                no room for a single 8-byte-aligned chunk of audio.
        """
        if sample_rate <= 0:
            raise ValueError(f"sample_rate must be positive, got {sample_rate}")

        # Bytes per sample (stereo)
        bytes_per_sample = 0.25 if is_dsd else 6

        # Audio payload is rounded down to a multiple of 8 bytes.
        audio_bytes_max = (self.available_payload // 8) * 8
        if audio_bytes_max <= 0:
            raise ValueError(
                f"MTU {self.mtu} is too small: {self.overhead} bytes of overhead "
                f"leave no room for audio payload"
            )

        samples_per_frame = int(audio_bytes_max / bytes_per_sample)
        audio_bytes = int(samples_per_frame * bytes_per_sample)

        # Time covered by one frame, and the matching packet rate
        cycle_time_us = (samples_per_frame / sample_rate) * 1_000_000
        packet_rate_hz = sample_rate / samples_per_frame

        total_frame_size = (
            self.ETH_HEADER + self.DIRETTA_HEADER + audio_bytes + self.FCS
        )

        return {
            'sample_rate': sample_rate,
            'sample_rate_mhz': sample_rate / 1_000_000,
            'samples_per_frame': samples_per_frame,
            'audio_bytes': audio_bytes,
            'total_frame_size': total_frame_size,
            'cycle_time_us': cycle_time_us,
            'cycle_time_ms': cycle_time_us / 1000,
            'packet_rate_hz': packet_rate_hz,
            'bytes_per_sample': bytes_per_sample,
            'is_dsd': is_dsd,
        }

    def calculate_cycle_time_for_48k(self, is_dsd: bool = True):
        """Calculate CycleTime based on the 48 kHz rate family.

        Uses 48000 x 256 for DSD and 48000 x 8 for PCM.

        Returns:
            ``(cycle_time, params)`` where ``cycle_time`` is the frame cycle
            time in microseconds rounded to the nearest 5 us and ``params``
            is the dict from :meth:`calculate_frame_params`.

        Raises:
            ValueError: propagated from :meth:`calculate_frame_params`.
        """
        base_rate = 48000
        multiplier = 256 if is_dsd else 8
        params = self.calculate_frame_params(base_rate * multiplier, is_dsd)

        # Round to nearest 5 us for cleaner values
        cycle_time = int(round(params['cycle_time_us'] / 5) * 5)
        return cycle_time, params
class DirettaMTUDetector:
    """Detect the MTU in use from live Diretta traffic captured with tcpdump."""

    # "length" values at or above this are ignored as implausible. This also
    # filters tcpdump's own "snapshot length NNN" banner line, which reaches us
    # because stderr is merged into stdout.
    MAX_PLAUSIBLE_FRAME = 65536

    def __init__(self, interface: str = 'enp5s0'):
        self.interface = interface
        self.diretta_ethertype = '0x88b6'
        self.process = None

    def detect_mtu(self, capture_duration: int = 30, min_packets: int = 5,
                   quick_mode: bool = False) -> dict:
        """Capture Diretta packets and analyze frame sizes to determine MTU.

        tcpdump output is read on a helper thread so that ``capture_duration``
        is honoured even when no packet ever arrives.

        Args:
            capture_duration: Maximum time to capture packets (seconds).
            min_packets: Minimum packets wanted for reliable detection.
            quick_mode: If True, stop as soon as more than 80% of at least
                ``max(min_packets, 10)`` packets share one size. Otherwise
                stop after 100 packets.

        Returns:
            dict with ``success`` set. On success it carries the keys from
            :meth:`_analyze_frame_sizes` plus ``captured_packets`` and
            ``capture_duration``; on failure it carries ``error`` and, when
            nothing was captured, ``diagnostics``.
        """
        print(f"\n{'='*100}")
        print("DETECTING MTU FROM LIVE DIRETTA TRAFFIC")
        print(f"{'='*100}")
        print(f"Interface: {self.interface}")
        print(f"EtherType: {self.diretta_ethertype} (Diretta DDS)")
        print(f"Capture duration: {capture_duration} seconds (max)")
        print(f"Minimum packets: {min_packets}")
        if quick_mode:
            print("Quick mode: Will stop after detecting consistent frame size")
        print("")
        print("⚠ NOTE: Diretta must be actively streaming audio for detection to work!")
        print("")

        # Check if tcpdump is available
        if shutil.which('tcpdump') is None:
            return {
                'success': False,
                'error': 'tcpdump not found. Install with: sudo apt-get install tcpdump'
            }

        cmd = [
            'sudo', 'tcpdump',
            '-i', self.interface,
            '-e',
            '-n',
            '-q',       # Quieter output
            '-l',       # Line-buffer stdout so packets reach the pipe promptly
            '-s', '0',  # Full packet capture
            'ether', 'proto', self.diretta_ethertype
        ]
        print(f"Running: {' '.join(cmd)}")
        print("Capturing packets... Press Ctrl+C to stop early")
        print("")

        try:
            self.process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                bufsize=1  # Line buffered
            )
        except OSError as exc:
            return {
                'success': False,
                'error': f'Could not start tcpdump: {exc}'
            }

        frame_sizes = []
        size_counts = Counter()
        start_time = time.monotonic()
        deadline = start_time + capture_duration
        lines = queue.Queue()
        reader = threading.Thread(
            target=self._pump_lines,
            args=(self.process.stdout, lines),
            daemon=True,
        )
        reader.start()

        try:
            while True:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    print(f"\n⏱ Timeout reached ({capture_duration}s)")
                    break
                try:
                    line = lines.get(timeout=remaining)
                except queue.Empty:
                    print(f"\n⏱ Timeout reached ({capture_duration}s)")
                    break
                if line is None:
                    # tcpdump closed its output (exited or failed to start)
                    break

                size = self._parse_tcpdump_line(line)
                if size is None or not 0 < size < self.MAX_PLAUSIBLE_FRAME:
                    continue

                frame_sizes.append(size)
                size_counts[size] += 1
                packet_count = len(frame_sizes)
                top_size, top_count = size_counts.most_common(1)[0]

                # Show progress
                if packet_count <= 10 or packet_count % 10 == 0:
                    if packet_count >= 3:
                        print(f"✓ Packet {packet_count}: {size} bytes (most common: {top_size} bytes, {top_count} times)", end='\r')
                    else:
                        print(f"✓ Packet {packet_count}: {size} bytes", end='\r')

                if quick_mode:
                    # If >80% of packets are the same size, we're done
                    if (packet_count >= max(min_packets, 10)
                            and top_count / packet_count > 0.8):
                        print(f"\n✓ Quick mode: Detected stable frame size ({top_size} bytes in {top_count}/{packet_count} packets)")
                        break
                elif packet_count >= 100:
                    print(f"\n✓ Captured enough packets ({packet_count})")
                    break
        except KeyboardInterrupt:
            print("\n\n⚠ Capture interrupted by user")
        finally:
            self._stop_capture(reader)

        print("\n")

        if not frame_sizes:
            return {
                'success': False,
                'error': 'No Diretta packets captured. Is Diretta streaming audio?',
                'captured_packets': 0,
                'diagnostics': self._get_diagnostics()
            }

        if len(frame_sizes) < min_packets:
            print(f"⚠ Only captured {len(frame_sizes)} packets (wanted {min_packets}), but will analyze anyway...")

        analysis = self._analyze_frame_sizes(frame_sizes)
        analysis['success'] = True
        analysis['captured_packets'] = len(frame_sizes)
        analysis['capture_duration'] = time.monotonic() - start_time
        return analysis

    @staticmethod
    def _pump_lines(stream, sink) -> None:
        """Forward every line of *stream* to *sink*, then a ``None`` end marker."""
        try:
            for line in stream:
                sink.put(line)
        except (OSError, ValueError):
            # The pipe was closed underneath us while shutting down.
            pass
        finally:
            sink.put(None)

    def _stop_capture(self, reader=None) -> None:
        """Stop the tcpdump child, reap it and release its pipe."""
        proc = self.process
        if proc is None:
            return
        try:
            if proc.poll() is None:
                proc.terminate()
                try:
                    proc.wait(timeout=2)
                except subprocess.TimeoutExpired:
                    proc.kill()
                    proc.wait(timeout=2)
        except (PermissionError, subprocess.TimeoutExpired):
            # The child was started through sudo and may refuse our signals.
            print("\n⚠ Could not stop tcpdump; it may still be running")
        if reader is not None:
            reader.join(timeout=1)
        # Only close the pipe once the reader has left it; closing while the
        # reader is still blocked in a read could block this thread as well.
        if proc.stdout is not None and (reader is None or not reader.is_alive()):
            proc.stdout.close()
        self.process = None

    def _parse_tcpdump_line(self, line: str) -> Optional[int]:
        """Return the first ``length N`` value in a tcpdump line, or None."""
        match = re.search(r'length\s+(\d+)', line)
        if match:
            return int(match.group(1))
        return None

    def _analyze_frame_sizes(self, frame_sizes: list) -> dict:
        """Analyze captured frame sizes to determine MTU from the MOST COMMON size.

        Raises:
            ValueError: if ``frame_sizes`` is empty.
        """
        if not frame_sizes:
            raise ValueError("frame_sizes must contain at least one value")

        size_dist = Counter(frame_sizes)

        # The most common size is the MTU indicator; the maximum is not used
        # because outliers and malformed packets would skew it.
        most_common_size, most_common_count = size_dist.most_common(1)[0]
        most_common_pct = (most_common_count / len(frame_sizes)) * 100

        # Statistics (for reference)
        max_size = max(frame_sizes)
        min_size = min(frame_sizes)
        avg_size = int(np.mean(frame_sizes))
        std_size = int(np.std(frame_sizes))

        # tcpdump "length" includes Ethernet header but not FCS,
        # so actual frame on wire = length + 4 (FCS)
        estimated_mtu = most_common_size + 4

        # Round to common MTU values
        common_mtus = [1500, 1522, 9000, 9024, 9216]
        closest_mtu = min(common_mtus, key=lambda x: abs(x - estimated_mtu))
        distance = abs(closest_mtu - estimated_mtu)

        # Confidence reflects how consistent the captured sizes are
        if most_common_pct >= 90:
            confidence = "Very High"
        elif most_common_pct >= 70:
            confidence = "High"
        elif most_common_pct >= 50:
            confidence = "Medium"
        else:
            confidence = "Low"

        # If we're within 50 bytes of a common MTU, use that
        if distance < 50:
            detected_mtu = closest_mtu
            if distance < 10:
                mtu_confidence = f"{confidence} (matches standard MTU)"
            else:
                mtu_confidence = f"{confidence} (rounded to standard)"
        else:
            detected_mtu = estimated_mtu
            mtu_confidence = f"{confidence} (non-standard MTU)"

        # Outliers: packets more than twice the common size
        outliers = [s for s in frame_sizes if s > most_common_size * 2]

        return {
            'detected_mtu': detected_mtu,
            'mtu_confidence': mtu_confidence,
            'detection_method': 'most_common_size',
            'most_common_size': most_common_size,
            'most_common_count': most_common_count,
            'most_common_pct': most_common_pct,
            'max_frame_size': max_size,
            'min_frame_size': min_size,
            'avg_frame_size': avg_size,
            'std_frame_size': std_size,
            'outlier_count': len(outliers),
            'outlier_sizes': sorted(set(outliers)) if outliers else [],
            'size_distribution': dict(size_dist.most_common(10))
        }

    def _get_diagnostics(self) -> dict:
        """Collect diagnostic info about the network interface.

        Returns a dict that may contain ``interface_exists``, ``interface_up``,
        ``interface_mtu``, ``general_traffic`` and ``error``.
        """
        diagnostics = {}

        # Check if interface exists and is up
        try:
            link = subprocess.run(
                ['ip', 'link', 'show', self.interface],
                capture_output=True,
                text=True,
                timeout=5
            )
            diagnostics['interface_exists'] = link.returncode == 0
            if link.returncode == 0:
                diagnostics['interface_up'] = 'UP' in link.stdout
                mtu_match = re.search(r'mtu\s+(\d+)', link.stdout)
                if mtu_match:
                    diagnostics['interface_mtu'] = int(mtu_match.group(1))
        except (OSError, subprocess.SubprocessError) as exc:
            diagnostics['error'] = str(exc)

        # Check if we can see any traffic on the interface: tcpdump only
        # finishes within the timeout if it saw 5 packets.
        try:
            probe = subprocess.run(
                ['sudo', 'tcpdump', '-i', self.interface, '-c', '5', '-n'],
                capture_output=True,
                text=True,
                timeout=5
            )
            diagnostics['general_traffic'] = 'packets captured' in probe.stderr
        except (OSError, subprocess.SubprocessError):
            diagnostics['general_traffic'] = False

        return diagnostics

    def print_detection_report(self, result: dict):
        """Print a formatted MTU detection report for a :meth:`detect_mtu` result."""
        print(f"\n{'='*100}")
        print("MTU DETECTION RESULTS")
        print(f"{'='*100}")

        if not result.get('success', False):
            self._print_failure_report(result)
            return

        print(f"✓ Successfully analyzed {result['captured_packets']} Diretta packets")
        print(f" Capture duration: {result['capture_duration']:.1f} seconds")
        print(" Detection method: Using MOST COMMON frame size (ignores outliers)")
        print("")
        print(f"DETECTED MTU: {result['detected_mtu']} bytes (confidence: {result['mtu_confidence']})")
        print("")
        print("Frame Size Analysis:")
        print(f" Most Common Size: {result['most_common_size']} bytes ({result['most_common_pct']:.1f}% of packets) ← MTU indicator")
        print(f" Maximum frame: {result['max_frame_size']} bytes", end='')
        if result['outlier_count'] > 0:
            print(f" ⚠ OUTLIER - {result['outlier_count']} suspicious packets detected")
            if result['outlier_sizes']:
                print(f" Outlier sizes: {result['outlier_sizes']}")
        else:
            print()
        print(f" Minimum frame: {result['min_frame_size']} bytes")
        print(f" Average frame: {result['avg_frame_size']} bytes")
        print(f" Std deviation: {result['std_frame_size']} bytes")
        print("")
        print("Size Distribution:")
        by_count = sorted(result['size_distribution'].items(),
                          key=lambda item: item[1], reverse=True)
        for size, count in by_count:
            pct = (count / result['captured_packets']) * 100
            bar = '█' * min(50, int(pct / 2))
            marker = " ← MOST COMMON (MTU basis)" if size == result['most_common_size'] else ""
            outlier_mark = " ⚠ OUTLIER" if size in result.get('outlier_sizes', []) else ""
            print(f" {size:5d} bytes: {count:4d} packets ({pct:5.1f}%) {bar}{marker}{outlier_mark}")
        print(f"\n{'='*100}")

    def _print_failure_report(self, result: dict) -> None:
        """Print the error, any diagnostics and troubleshooting tips."""
        print(f"✗ Detection failed: {result.get('error', 'Unknown error')}")
        if 'captured_packets' in result:
            print(f" Packets captured: {result['captured_packets']}")

        if 'diagnostics' in result:
            print(f"\n{'='*100}")
            print("DIAGNOSTICS")
            print(f"{'='*100}")
            diag = result['diagnostics']
            if 'interface_exists' in diag:
                if diag['interface_exists']:
                    print(f"✓ Interface {self.interface} exists")
                    if 'interface_up' in diag:
                        if diag['interface_up']:
                            print("✓ Interface is UP")
                        else:
                            print(f"✗ Interface is DOWN - bring it up with: sudo ip link set {self.interface} up")
                    if 'interface_mtu' in diag:
                        print(f" Current MTU setting: {diag['interface_mtu']} bytes")
                else:
                    print(f"✗ Interface {self.interface} not found")
                    print(" Available interfaces:")
                    try:
                        links = subprocess.run(['ip', 'link', 'show'],
                                               capture_output=True, text=True)
                    except OSError:
                        links = None  # best effort: `ip` is unavailable
                    if links is not None:
                        for line in links.stdout.split('\n'):
                            # Header lines look like "2: eth0@if3: <...>"
                            if ': ' in line and not line.startswith(' '):
                                print(f" - {line.split(':')[1].strip().split('@')[0]}")
            if 'general_traffic' in diag:
                if diag['general_traffic']:
                    print("✓ General network traffic detected on interface")
                else:
                    print("✗ No general traffic detected - interface may not be receiving packets")

        print("\nTROUBLESHOOTING TIPS:")
        print(" 1. Make sure Diretta is actively streaming audio")
        print(" 2. Check interface name with: ip link show")
        print(f" 3. Test general traffic with: sudo tcpdump -i {self.interface} -c 10")
        print(" 4. Increase capture duration: --detect-mtu --duration 60")
        print(" 5. Try quick mode: --detect-mtu --quick")
class DirettaMonitor:
    """Analyze Diretta operation for jitter and stability using historical logs."""

    def __init__(self, service_name: str = "diretta_sync_host"):
        self.service_name = service_name

    def analyze_recent_logs(self, num_lines: int) -> str:
        """Analyze the last ``num_lines`` journalctl entries of the service.

        Returns:
            A multi-line text report. If journalctl fails, is not installed,
            or prints nothing, the report only states that no logs were found.
        """
        report = [
            "=" * 100,
            f"DIRETTA STABILITY ANALYSIS (LAST {num_lines:,} LOG ENTRIES)",
            "=" * 100,
            f"Service: {self.service_name}",
            f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            "Analyzing logs...",
        ]

        # Argument list instead of a shell string: the service name is passed
        # to journalctl verbatim and never interpreted by a shell.
        cmd = ['journalctl', '-u', self.service_name,
               '-n', str(num_lines), '--no-pager']
        try:
            logs = subprocess.check_output(cmd, text=True, stderr=subprocess.DEVNULL)
        except (subprocess.CalledProcessError, OSError):
            logs = ""

        if not logs:
            report.append(f"✗ No logs found for service '{self.service_name}'.")
            return "\n".join(report)

        report.append("✓ Logs captured. Performing analysis.")
        report.append("")
        report.extend(self._jitter_section(logs, num_lines))
        report.append("")
        report.extend(self._drift_section(logs, num_lines))
        report.append("\n" + "=" * 100)
        return "\n".join(report)

    def _jitter_section(self, logs: str, num_lines: int) -> list:
        """Build the report lines for section 1 (cycle-time jitter)."""
        cycle_times = self.extract_cycle_times(logs)
        if not cycle_times:
            return [f"1. NETWORK TIMING JITTER: No 'cy=' data found in the last {num_lines:,} log entries."]

        cycle_stats = self.analyze_stability(cycle_times, "Cycle Time")

        # IQR gives a more robust assessment than the raw range
        iqr_jitter = cycle_stats['iqr']
        if iqr_jitter < 1.0:
            quality_assessment = "✓ Excellent (IQR < 1μs)"
        elif iqr_jitter < 5.0:
            quality_assessment = "✓ Good (IQR < 5μs)"
        elif iqr_jitter < 20.0:
            quality_assessment = "○ Fair (IQR < 20μs)"
        else:
            quality_assessment = "✗ Poor (IQR >= 20μs)"

        return [
            "1. NETWORK TIMING JITTER (from 'cy=...' values)",
            "-" * 100,
            f" Assessment: {quality_assessment}",
            f" Samples found: {cycle_stats['count']:,}",
            f" Average Cycle: {cycle_stats['mean']:.3f} μs",
            f" Standard Deviation: {cycle_stats['stdev']:.3f} μs <- (primary indicator of stability)",
            f" Interquartile Range (IQR): {cycle_stats['iqr']:.3f} μs <- (robust jitter measurement)",
            f" Min / Max Cycle: {cycle_stats['min']:.3f} μs / {cycle_stats['max']:.3f} μs",
            f" Peak-to-Peak Range: {cycle_stats['range']:.3f} μs",
            "-" * 100,
        ]

    def _drift_section(self, logs: str, num_lines: int) -> list:
        """Build the report lines for section 2 (buffer-level drift)."""
        buffer_levels = self.extract_buffer_levels(logs)
        if not buffer_levels:
            return [f"2. CLOCK DRIFT: No 'period=' data found in the last {num_lines:,} log entries."]

        buffer_stats = self.analyze_stability(buffer_levels, "Buffer Level")
        drift_range = buffer_stats['range']
        if drift_range <= 1:
            quality = "✓ Optimal"
        elif drift_range <= 3:
            quality = "○ Good"
        else:
            quality = "✗ High"

        return [
            "2. CLOCK DRIFT (from 'period=...' values)",
            "-" * 100,
            f" Assessment: {quality} ({drift_range} period drift)",
            f" Samples found: {buffer_stats['count']:,}",
            f" Min / Max Level: {int(buffer_stats['min'])} / {int(buffer_stats['max'])}",
            "-" * 100,
        ]

    def extract_cycle_times(self, logs: str):
        """Extract cycle time values (in microseconds) from logs.

        Matches both ``start cycle=<n>usec`` (taken as-is) and ``cy=<n>``
        (divided by 1,000,000).
        """
        cycle_times = []
        for match in re.finditer(r'start cycle=(\d+)usec|cy=(\d+)', logs):
            usec_text, cy_text = match.group(1), match.group(2)
            if usec_text:
                cycle_times.append(int(usec_text))
            elif cy_text:
                # 'cy' values are in picoseconds, convert to microseconds
                cycle_times.append(int(cy_text) / 1_000_000)
        return cycle_times

    def extract_buffer_levels(self, logs: str):
        """Extract the integer ``period=<n>`` values from logs."""
        return [int(match.group(1)) for match in re.finditer(r'period=(\d+)', logs)]

    def analyze_stability(self, values: list, data_type: str) -> dict:
        """Compute count, mean, stdev, min, max, range, quartiles and IQR.

        ``data_type`` is accepted for interface compatibility but is not used.
        An empty list yields all-zero statistics; a single value yields that
        value as mean/min/max with zero spread.
        """
        count = len(values)
        if count == 0:
            return {
                'count': 0, 'mean': 0, 'stdev': 0,
                'min': 0, 'max': 0, 'range': 0,
                'q1': 0, 'q3': 0, 'iqr': 0
            }

        min_val = np.min(values)
        max_val = np.max(values)

        # Interquartile Range (IQR) is insensitive to outliers
        q1 = np.percentile(values, 25)
        q3 = np.percentile(values, 75)

        return {
            'count': count,
            'mean': np.mean(values),
            'stdev': np.std(values),
            'min': min_val,
            'max': max_val,
            'range': max_val - min_val,
            'q1': q1,
            'q3': q3,
            'iqr': q3 - q1
        }
def validate_settings(period_min: int, period_max: int,
                      period_size_min: int, period_size_max: int,
                      sync_buffer: int):
    """Validate buffer settings and return a list of advisory messages.

    Nothing is raised: problems are reported as strings prefixed with
    ``⚠`` (warning), ``ℹ`` (information) or ``✓`` (confirmation).

    Args:
        period_min: periodMin, minimum network buffer depth.
        period_max: periodMax, maximum network buffer depth.
        period_size_min: periodSizeMin, ALSA buffer minimum.
        period_size_max: periodSizeMax, ALSA buffer maximum.
        sync_buffer: syncBufferCount.

    Returns:
        list of message strings; empty when there is nothing to report.
    """
    warnings = []

    if period_min <= 0:
        # Without this guard the ratio below would divide by zero.
        warnings.append(f"⚠ periodMin ({period_min}) must be at least 1.")
    else:
        ratio = period_max / period_min
        if ratio > 3:
            warnings.append(f"⚠ Large periodMin/Max ratio ({period_min}:{period_max} = {ratio:.1f}:1) "
                            f"may cause inefficient buffering. Recommended: ≤2.5:1")

    if period_min > period_max:
        warnings.append(f"⚠ periodMin ({period_min}) > periodMax ({period_max}). "
                        f"The minimum must not exceed the maximum.")

    if sync_buffer < period_min:
        warnings.append(f"⚠ syncBufferCount ({sync_buffer}) < periodMin ({period_min}) "
                        f"may cause sync issues. Recommended: syncBuffer ≥ periodMin")

    if sync_buffer > period_max * 1.5:
        warnings.append(f"ℹ syncBufferCount ({sync_buffer}) >> periodMax ({period_max}) "
                        f"adds extra latency. Consider reducing if stability is good.")

    if period_size_min > period_size_max:
        warnings.append(f"⚠ periodSizeMin ({period_size_min}) > periodSizeMax ({period_size_max}). "
                        f"The minimum must not exceed the maximum.")

    if period_size_min == period_size_max:
        warnings.append(f"ℹ periodSizeMin = periodSizeMax ({period_size_min}) "
                        f"forces fixed ALSA buffer. Usually auto-selection works better.")

    if period_size_min > 4096:
        warnings.append(f"⚠ periodSizeMin ({period_size_min}) is high. "
                        f"May cause compatibility issues with some applications.")

    if period_size_max < 2048:
        warnings.append(f"⚠ periodSizeMax ({period_size_max}) is low. "
                        f"May limit buffer flexibility.")

    total_periods = period_max + sync_buffer
    if total_periods < 10:
        warnings.append(f"⚠ Total buffering ({total_periods} periods) is aggressive. "
                        f"Monitor closely for underruns on loaded systems.")
    elif total_periods > 20:
        warnings.append(f"ℹ Total buffering ({total_periods} periods) is very conservative. "
                        f"Higher latency (~{total_periods * 3:.0f}ms) but maximum stability.")

    if period_min == 4 and period_max == 8 and sync_buffer == 6:
        warnings.append("✓ Using recommended optimized settings (4/8/6)")

    return warnings
def generate_config(cycle_time: int, period_min: int = 4, period_max: int = 8,
                    period_size_min: int = 2048, period_size_max: int = 8192,
                    sync_buffer: int = 6, latency_buffer: int = 0,
                    thred_mode: int = 257, interface: str = 'enp5s0',
                    cpu_send: int = 1, cpu_other: int = 2) -> str:
    """Generate Diretta configuration file content.

    Args:
        cycle_time: Value written as ``CycleTime``. ``CycleMinTime`` is
            derived from it as 99.5% of this value, truncated to an integer.
        period_min, period_max: Written as ``periodMin`` / ``periodMax``.
        period_size_min, period_size_max: Written as ``periodSizeMin`` /
            ``periodSizeMax``.
        sync_buffer: Written as ``syncBufferCount``.
        latency_buffer: Written as ``LatencyBuffer``.
        thred_mode: Written as ``ThredMode`` (the key is spelled this way in
            the emitted file).
        interface: Written as ``Interface``.
        cpu_send, cpu_other: Written as ``CpuSend`` / ``CpuOther``.

    Returns:
        The text of a ``[global]`` section, one ``key=value`` per line, with a
        trailing newline.
    """
    # CycleMinTime is 0.5% less than CycleTime
    cycle_min = int(cycle_time * 0.995)

    # Key order is the order emitted in the file.
    settings = [
        ("Interface", interface),
        ("TargetProfileLimitTime", 0),
        ("ThredMode", thred_mode),
        ("InfoCycle", 100000),
        ("FlexCycle", "max"),
        ("CycleTime", cycle_time),
        ("CycleMinTime", cycle_min),
        ("Debug", "stdout"),
        ("periodMax", period_max),
        ("periodMin", period_min),
        ("periodSizeMax", period_size_max),
        ("periodSizeMin", period_size_min),
        ("syncBufferCount", sync_buffer),
        ("alsaUnderrun", "enable"),
        ("unInitMemDet", "disable"),
        ("CpuSend", cpu_send),
        ("CpuOther", cpu_other),
        ("LatencyBuffer", latency_buffer),
    ]
    return "[global]\n" + "".join(f"{key}={value}\n" for key, value in settings)
def print_header():
    """Print the program banner.

    The box is built programmatically so every row has the same width as
    the border; a blank line is printed before and after it.
    """
    inner_width = 79
    captions = [
        "DIRETTA DDS CONFIGURATION & LOG ANALYSIS",
        "Optimized for High-Quality Reference Clocks",
        "",
        "Focus: Clock STABILITY (phase noise/jitter), not just accuracy",
    ]
    rows = ["╔" + "═" * inner_width + "╗"]
    rows.extend("║" + caption.center(inner_width) + "║" for caption in captions)
    rows.append("╚" + "═" * inner_width + "╝")
    print("\n" + "\n".join(rows) + "\n")
def print_clock_philosophy():
    """Print the explanatory text on clock stability versus accuracy.

    The wording is fixed; bullets are indented under their headings and the
    signal-chain diagram is laid out so that each annotation sits under the
    clock it describes.
    """
    rule = '=' * 100
    print(f"\n{rule}")
    print("CLOCK QUALITY FOR AUDIO: Stability vs Accuracy")
    print(rule)

    intro = [
        "CRITICAL DISTINCTION:",
        "  ❌ Clock ACCURACY (frequency error):",
        "     • How close to exactly 10.000000 MHz",
        "     • Important for: GPS, telecommunications, timing standards",
        "     • Less important for: Audio quality",
        "  ✓ Clock STABILITY (phase noise/jitter):",
        "     • Short-term frequency variations",
        "     • Measured in: dBc/Hz at offset frequencies (1Hz, 10Hz, 100Hz, etc.)",
        "     • CRITICAL for: Audio quality, imaging, soundstage",
        "WHY STABILITY MATTERS FOR AUDIO:",
        "  Phase noise at audio frequencies (1Hz - 20kHz) creates:",
        "     • Timing jitter in sample reconstruction",
        "     • Modulation noise that masks low-level details",
        "     • Loss of soundstage depth and imaging precision",
        "     • Subtle harshness in high frequencies",
        "THE DIRETTA CHALLENGE:",
        "  Your Mutec provides ultra-low jitter at the DAC (receiver end).",
        "  But what about jitter at the source (server end)?",
    ]

    # Column offsets are chosen so the left annotations fall under
    # "[Server Clock]" and the right ones under "[Mutec Clock]".
    diagram = [
        " " * 4 + "[Server Clock] → DDS Stream → [Mutec Clock] → [DAC]",
        " " * 5 + "(±100-500ps)" + " " * 20 + "(±1ps)",
        " " * 10 + "↑" + " " * 29 + "↑",
        " " * 3 + "Potential jitter" + " " * 12 + "Excellent stability",
        " " * 8 + "source",
    ]

    outro = [
        "  This analysis script helps you understand if server-side jitter",
        "  is affecting your system, and whether PTP synchronization would help.",
    ]

    print("\n" + "\n".join(intro + diagram + outro) + "\n")
def main():
    """Command-line entry point.

    Runs exactly one of three modes, chosen from the parsed arguments:

    1. ``--detect-mtu``: capture live traffic and report the detected MTU.
    2. ``--analyze-sync`` / ``--analyze-memory``: print a stability report
       for each service requested (both are run if both are given).
    3. Otherwise: compute CycleTime from the MTU and print a configuration
       file. With ``--config-only`` nothing but the configuration text is
       printed, so the output can be redirected straight into a file.

    Invalid argument values are reported through ``parser.error`` (usage
    message and exit status 2).
    """
    parser = argparse.ArgumentParser(
        description='Diretta DDS Configuration Calculator with Log Analysis',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
%(prog)s # Default config generation
%(prog)s --detect-mtu # Auto-detect MTU from live traffic
%(prog)s --detect-mtu --quick # Quick MTU detection
%(prog)s --detect-mtu --duration 60 # Longer capture duration
%(prog)s --analyze-sync # Analyze last 1,000 sync log entries
%(prog)s --analyze-sync 10000 # Analyze last 10,000 sync log entries
%(prog)s --analyze-memory # Analyze last 1,000 memory log entries
%(prog)s --config-only # Just show the config file text
""")

    # Configuration Parameters
    calc_group = parser.add_argument_group('DDS Configuration')
    calc_group.add_argument('--mtu', type=int, default=9024, help='MTU size in bytes (default: 9024)')
    calc_group.add_argument('--detect-mtu', action='store_true', help='Auto-detect MTU from live Diretta traffic')
    calc_group.add_argument('--quick', action='store_true', help='Quick MTU detection mode (stops after finding stable size)')
    calc_group.add_argument('--duration', type=int, default=30, help='Capture duration in seconds for MTU detection (default: 30)')
    calc_group.add_argument('--pcm', action='store_true', help='Calculate for PCM instead of DSD')
    calc_group.add_argument('--period-min', type=int, default=4, help='periodMin: minimum network buffer depth (default: 4)')
    calc_group.add_argument('--period-max', type=int, default=8, help='periodMax: maximum network buffer depth (default: 8)')
    calc_group.add_argument('--sync-buffer', type=int, default=6, help='syncBufferCount: clock sync buffer (default: 6)')
    calc_group.add_argument('--latency-buffer', type=int, default=0, help='LatencyBuffer: additional latency (default: 0)')
    calc_group.add_argument('--period-size-min', type=int, default=2048, help='periodSizeMin: ALSA buffer minimum (default: 2048)')
    calc_group.add_argument('--period-size-max', type=int, default=8192, help='periodSizeMax: ALSA buffer maximum (default: 8192)')
    calc_group.add_argument('--thred-mode', type=int, default=257, help='ThredMode value (default: 257)')
    calc_group.add_argument('--interface', type=str, default='enp5s0', help='Network interface name (default: enp5s0)')
    calc_group.add_argument('--cpu-send', type=int, default=1, help='CPU core for send thread (default: 1)')
    calc_group.add_argument('--cpu-other', type=int, default=2, help='CPU core for other threads (default: 2)')

    # Analysis Parameters
    analysis_group = parser.add_argument_group('Log Analysis')
    analysis_group.add_argument('--analyze-sync', type=int, nargs='?', const=1000, default=None, metavar='N',
                                help='Analyze the last N log entries from diretta_sync_host service (default: 1000)')
    analysis_group.add_argument('--analyze-memory', type=int, nargs='?', const=1000, default=None, metavar='N',
                                help='Analyze the last N log entries from diretta_memoryplay_host service (default: 1000)')

    # Output Control
    output_group = parser.add_argument_group('Output Control')
    output_group.add_argument('--config-only', action='store_true',
                              help='Only generate and show the config file text')
    output_group.add_argument('--show-clock-info', action='store_true',
                              help='Show detailed clock quality information')

    args = parser.parse_args()

    # MTU Detection Mode
    if args.detect_mtu:
        if args.duration <= 0:
            parser.error("--duration must be a positive number of seconds")
        detector = DirettaMTUDetector(interface=args.interface)
        result = detector.detect_mtu(
            capture_duration=args.duration,
            min_packets=5,
            quick_mode=args.quick
        )
        detector.print_detection_report(result)
        if result.get('success'):
            print(f"\nℹ TIP: Use detected MTU with: python3 {sys.argv[0]} --mtu {result['detected_mtu']}")
        return

    # Analysis Mode - run every analysis that was asked for
    requested = [
        ("diretta_sync_host", args.analyze_sync),
        ("diretta_memoryplay_host", args.analyze_memory),
    ]
    analysed = False
    for service_name, num_lines in requested:
        if num_lines is None:
            continue
        monitor = DirettaMonitor(service_name=service_name)
        print(monitor.analyze_recent_logs(num_lines=num_lines))
        analysed = True
    if analysed:
        return

    # Configuration Mode
    if args.period_min <= 0:
        parser.error("--period-min must be at least 1")

    calculator = DirettaCalculator(mtu=args.mtu)
    is_dsd = not args.pcm
    format_type = "DSD" if is_dsd else "PCM"

    # Calculate cycle time based on 48kHz. An MTU that leaves no room for
    # audio payload shows up as one of these two exceptions.
    try:
        target_cycle_us, params_48k = calculator.calculate_cycle_time_for_48k(is_dsd)
    except (ValueError, ZeroDivisionError):
        parser.error(f"--mtu {args.mtu} is too small to carry any audio payload")

    # Calculate CycleMinTime (0.5% less than CycleTime)
    cycle_min_us = int(target_cycle_us * 0.995)

    warnings = validate_settings(args.period_min, args.period_max,
                                 args.period_size_min, args.period_size_max,
                                 args.sync_buffer)

    if not args.config_only:
        print_header()
        if args.show_clock_info:
            print_clock_philosophy()
        print(f"\n{'='*100}\nCONFIGURATION SUMMARY\n{'='*100}")
        print(f"Format: {format_type}")
        print(f"MTU: {args.mtu} bytes")
        if is_dsd:
            print(f"Base calculation: 48kHz (DSD256: {params_48k['sample_rate']:,} Hz)")
        else:
            print(f"Base calculation: 48kHz (PCM: {params_48k['sample_rate']:,} Hz)")
        print(f"Samples per frame: {params_48k['samples_per_frame']:,}")
        print(f"Target CycleTime: {target_cycle_us} μs (optimized for 48kHz)")
        print(f"CycleMinTime: {cycle_min_us} μs (0.5% lower margin)")
        print(f"Network Buffering: periodMin={args.period_min}, periodMax={args.period_max}")
        print(f"Clock Sync Buffer: syncBufferCount={args.sync_buffer}")
        print(f"Total buffering: {args.period_max + args.sync_buffer} periods")
        if warnings:
            print(f"\n{'='*100}\nCONFIGURATION NOTES\n{'='*100}\n")
            for warning in warnings:
                print(f" {warning}\n")

    config = generate_config(cycle_time=target_cycle_us, period_min=args.period_min,
                             period_max=args.period_max, period_size_min=args.period_size_min,
                             period_size_max=args.period_size_max, sync_buffer=args.sync_buffer,
                             latency_buffer=args.latency_buffer, thred_mode=args.thred_mode,
                             interface=args.interface,
                             cpu_send=args.cpu_send, cpu_other=args.cpu_other)

    if args.config_only:
        # Emit the bare file content only, with no banner and no extra newline.
        print(config, end='')
        return

    print(f"\n{'='*100}\nDIRETTA CONFIGURATION FILE\n{'='*100}\n")
    print(config)
    print(f"\n{'='*100}\nNEXT STEPS\n{'='*100}")
    print(f"""
1. SAVE CONFIGURATION:
Copy the above to: ~/DirettaAlsaHost/setting.inf
2. RESTART DIRETTA:
sudo systemctl restart diretta_sync_host.service
# or for memory mode:
sudo systemctl restart diretta_memoryplay_host.service
3. ANALYZE RECENT PERFORMANCE:
python3 {__file__} --analyze-sync 10000
# or for memory mode:
python3 {__file__} --analyze-memory 10000
4. INTERPRET RESULTS:
• Jitter IQR < 1.0 μs: Excellent
• Drift ≤ 1 period: Optimal for high-quality clocks
5. AUTO-DETECT MTU (optional):
python3 {__file__} --detect-mtu --quick
python3 {__file__} --detect-mtu --duration 60
""")
# Run the command-line interface only when executed as a script,
# not when the module is imported.
if __name__ == "__main__":
    main()
![[Image: banniereforumhifi.jpg]](https://i.postimg.cc/wxwWFvzj/banniereforumhifi.jpg)

