[Image: banniereforumhifi.jpg] (September 11) x

Note de ce sujet :
  • Moyenne : 5 (1 vote(s))
  • 1
  • 2
  • 3
  • 4
  • 5
Installation de HQPe/NAA/Diretta ALSA drivers sous Fedora
#31
Script Python destiné à faciliter la mise en œuvre et la mesure de l'efficacité de Diretta en version DDS

Code :
#!/usr/bin/env python3
"""
Diretta DDS Configuration Calculator with Historical Log Analysis
Optimized for systems with high-quality reference clocks

Key Focus: Clock STABILITY (jitter/phase noise), not just accuracy
"""

import argparse
import re
import subprocess
from pathlib import Path
import numpy as np
import time
import sys
import signal

class DirettaCalculator:
    """Derive Diretta DDS frame sizing and cycle timing from a link MTU."""

    # Per-frame byte costs used by the payload arithmetic below
    ETH_HEADER = 14
    DIRETTA_HEADER = 2
    VLAN_OVERHEAD = 4
    FCS = 4

    def __init__(self, mtu: int = 9024):
        """Store the MTU and the payload left after header/VLAN overhead."""
        header_bytes = self.ETH_HEADER + self.DIRETTA_HEADER + self.VLAN_OVERHEAD
        self.mtu = mtu
        self.overhead = header_bytes
        self.available_payload = mtu - header_bytes

    def calculate_frame_params(self, sample_rate: int, is_dsd: bool = True):
        """Return a dict of frame sizing and timing values for one sample rate.

        Args:
            sample_rate: Samples per second of the stream.
            is_dsd: True uses 0.25 bytes per stereo sample, False uses 6.

        Returns:
            dict with sample rate, samples/bytes per frame, total frame size,
            cycle time (μs and ms), packet rate and the inputs used.
        """
        # Bytes consumed by one stereo sample
        bytes_per_sample = 0.25 if is_dsd else 6

        # Payload is trimmed down to a multiple of 8 bytes before filling it
        usable_bytes = int(self.available_payload / 8) * 8
        frame_samples = int(usable_bytes / bytes_per_sample)
        payload_bytes = int(frame_samples * bytes_per_sample)

        # Duration of the audio carried by a single frame
        frame_duration_us = (frame_samples / sample_rate) * 1_000_000
        frames_per_second = sample_rate / frame_samples
        wire_bytes = self.ETH_HEADER + self.DIRETTA_HEADER + payload_bytes + self.FCS

        params = {}
        params['sample_rate'] = sample_rate
        params['sample_rate_mhz'] = sample_rate / 1_000_000
        params['samples_per_frame'] = frame_samples
        params['audio_bytes'] = payload_bytes
        params['total_frame_size'] = wire_bytes
        params['cycle_time_us'] = frame_duration_us
        params['cycle_time_ms'] = frame_duration_us / 1000
        params['packet_rate_hz'] = frames_per_second
        params['bytes_per_sample'] = bytes_per_sample
        params['is_dsd'] = is_dsd
        return params

    def calculate_cycle_time_for_48k(self, is_dsd: bool = True):
        """Return (CycleTime in μs, frame params) for the 48 kHz rate family.

        The rate used is 48000 x 256 for DSD and 48000 x 8 for PCM; the cycle
        time is rounded to the nearest multiple of 5 μs.
        """
        if is_dsd:
            rate_multiplier = 256
        else:
            rate_multiplier = 8
        params = self.calculate_frame_params(48000 * rate_multiplier, is_dsd)

        # Snap to a 5 μs grid for cleaner values
        five_us_steps = round(params['cycle_time_us'] / 5)
        return int(five_us_steps * 5), params


class DirettaMTUDetector:
    """Detect actual MTU from live Diretta traffic using tcpdump.

    Instances hold the capture interface name, the EtherType used as the
    tcpdump filter, and the handle of the most recent tcpdump child process
    (``None`` until a capture has been started).
    """

    def __init__(self, interface: str = 'enp5s0'):
        self.interface = interface
        self.diretta_ethertype = '0x88b6'
        self.process = None

    @staticmethod
    def _start_line_reader(stream):
        """Read ``stream`` line by line on a daemon thread.

        Returns a ``queue.Queue`` that receives every line as it is read,
        followed by a single ``None`` once the stream ends or fails. This lets
        the caller wait for output with a timeout instead of blocking forever
        in ``readline()`` when the stream is silent.
        """
        import queue
        import threading

        lines = queue.Queue()

        def pump():
            try:
                for line in stream:
                    lines.put(line)
            except (OSError, ValueError):
                # Stream was closed or became unreadable; treat it as EOF.
                pass
            finally:
                lines.put(None)

        threading.Thread(target=pump, daemon=True).start()
        return lines

    def detect_mtu(self, capture_duration: int = 30, min_packets: int = 5, quick_mode: bool = False) -> dict:
        """
        Capture Diretta packets and analyze frame sizes to determine MTU

        Args:
            capture_duration: How long to capture packets (seconds)
            min_packets: Minimum packets needed for reliable detection
            quick_mode: If True, stop after getting stable packet size (faster)

        Returns:
            dict with detection results. ``'success'`` is False (with an
            ``'error'`` message) when tcpdump is unavailable, cannot be
            started, or no packet was captured; otherwise the dict is the
            output of ``_analyze_frame_sizes`` plus ``'success'``,
            ``'captured_packets'`` and ``'capture_duration'``.
        """
        import queue
        import shutil
        from collections import Counter

        print(f"\n{'='*100}")
        print("DETECTING MTU FROM LIVE DIRETTA TRAFFIC")
        print(f"{'='*100}")
        print(f"Interface: {self.interface}")
        print(f"EtherType: {self.diretta_ethertype} (Diretta DDS)")
        print(f"Capture duration: {capture_duration} seconds (max)")
        print(f"Minimum packets: {min_packets}")
        if quick_mode:
            print("Quick mode: Will stop after detecting consistent frame size")
        print("")
        print("⚠ NOTE: Diretta must be actively streaming audio for detection to work!")
        print("")

        # Check if tcpdump is available (no dependency on an external `which`)
        if shutil.which('tcpdump') is None:
            return {
                'success': False,
                'error': 'tcpdump not found. Install with: sudo apt-get install tcpdump'
            }

        # Build tcpdump command with snaplen to capture full frames
        cmd = [
            'sudo', 'tcpdump',
            '-i', self.interface,
            '-e',
            '-n',
            '-q',  # Quieter output
            '-l',  # Line-buffered stdout so lines reach the pipe immediately
            '-s', '0',  # Full packet capture
            'ether', 'proto', self.diretta_ethertype
        ]

        print(f"Running: {' '.join(cmd)}")
        print("Capturing packets... Press Ctrl+C to stop early")
        print("")

        frame_sizes = []
        size_counts = Counter()  # running histogram, updated per packet
        start_time = time.time()
        packet_count = 0

        try:
            self.process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                errors='replace',  # undecodable bytes must not abort the capture
                bufsize=1  # Line buffered
            )
        except OSError as exc:
            self.process = None
            return {
                'success': False,
                'error': f'Could not start tcpdump: {exc}',
                'captured_packets': 0
            }

        try:
            lines = self._start_line_reader(self.process.stdout)

            while True:
                # Check if we've exceeded capture duration
                elapsed = time.time() - start_time
                if elapsed > capture_duration:
                    print(f"\n⏱ Timeout reached ({capture_duration}s)")
                    break

                # Wait briefly for the next line so the deadline above is
                # re-checked even when no traffic arrives at all.
                try:
                    line = lines.get(timeout=0.2)
                except queue.Empty:
                    continue
                if line is None:  # tcpdump closed its output
                    break

                # Parse frame size from this line
                size = self._parse_tcpdump_line(line)
                if size is None or not 0 < size < 65536:  # Sanity check: ignore crazy values
                    continue

                frame_sizes.append(size)
                size_counts[size] += 1
                packet_count += 1

                # Show progress
                if packet_count <= 10 or packet_count % 10 == 0:
                    if len(frame_sizes) >= 3:
                        most_common = size_counts.most_common(1)[0]
                        print(f"✓ Packet {packet_count}: {size} bytes (most common: {most_common[0]} bytes, {most_common[1]} times)", end='\r')
                    else:
                        print(f"✓ Packet {packet_count}: {size} bytes", end='\r')

                # In quick mode, check for stability
                if quick_mode and packet_count >= max(min_packets, 10):
                    most_common_size, most_common_count = size_counts.most_common(1)[0]

                    # If >80% of packets are the same size, we're done
                    if most_common_count / len(frame_sizes) > 0.8:
                        print(f"\n✓ Quick mode: Detected stable frame size ({most_common_size} bytes in {most_common_count}/{len(frame_sizes)} packets)")
                        break

                # Stop if we have enough packets (non-quick mode)
                if not quick_mode and packet_count >= 100:
                    print(f"\n✓ Captured enough packets ({packet_count})")
                    break

        except KeyboardInterrupt:
            print("\n\n⚠ Capture interrupted by user")

        finally:
            # Clean up process
            if self.process:
                self.process.terminate()
                try:
                    self.process.wait(timeout=2)
                except subprocess.TimeoutExpired:
                    self.process.kill()

        print("\n")

        if not frame_sizes:
            return {
                'success': False,
                'error': 'No Diretta packets captured. Is Diretta streaming audio?',
                'captured_packets': 0,
                'diagnostics': self._get_diagnostics()
            }

        if len(frame_sizes) < min_packets:
            print(f"⚠ Only captured {len(frame_sizes)} packets (wanted {min_packets}), but will analyze anyway...")

        # Analyze frame sizes
        analysis = self._analyze_frame_sizes(frame_sizes)
        analysis['success'] = True
        analysis['captured_packets'] = len(frame_sizes)
        analysis['capture_duration'] = time.time() - start_time

        return analysis

    def _parse_tcpdump_line(self, line: str) -> int:
        """
        Parse a single tcpdump output line to extract frame size

        Returns the integer following the first "length" token, or None when
        the line contains no such token.
        """
        # Match "length XXXX" in tcpdump output
        match = re.search(r'length\s+(\d+)', line)
        if match:
            return int(match.group(1))
        return None

    def _analyze_frame_sizes(self, frame_sizes: list) -> dict:
        """Analyze captured frame sizes to determine MTU - USING MOST COMMON SIZE

        Args:
            frame_sizes: Non-empty list of captured frame lengths in bytes.

        Returns:
            dict with the detected MTU, a confidence label, size statistics,
            outlier information and the ten most frequent sizes.
        """
        from collections import Counter

        # Count size distribution
        size_dist = Counter(frame_sizes)

        # Find the most common size (this is the real MTU indicator!)
        most_common_size, most_common_count = size_dist.most_common(1)[0]
        most_common_pct = (most_common_count / len(frame_sizes)) * 100

        # Get statistics (for reference)
        max_size = max(frame_sizes)
        min_size = min(frame_sizes)
        avg_size = int(np.mean(frame_sizes))
        std_size = int(np.std(frame_sizes))

        # Determine MTU based on MOST COMMON size (not maximum!)
        # This ignores outliers and malformed packets

        # tcpdump "length" includes Ethernet header but not FCS
        # So actual frame on wire = length + 4 (FCS)
        estimated_mtu = most_common_size + 4  # Add FCS

        # Round to common MTU values
        common_mtus = [1500, 1522, 9000, 9024, 9216]
        closest_mtu = min(common_mtus, key=lambda x: abs(x - estimated_mtu))

        # Confidence assessment based on how consistent the sizes are
        if most_common_pct >= 90:
            confidence = "Very High"
        elif most_common_pct >= 70:
            confidence = "High"
        elif most_common_pct >= 50:
            confidence = "Medium"
        else:
            confidence = "Low"

        # If we're within 50 bytes of a common MTU, use that
        distance = abs(closest_mtu - estimated_mtu)
        if distance < 50:
            detected_mtu = closest_mtu
            if distance < 10:
                mtu_confidence = f"{confidence} (matches standard MTU)"
            else:
                mtu_confidence = f"{confidence} (rounded to standard)"
        else:
            detected_mtu = estimated_mtu
            mtu_confidence = f"{confidence} (non-standard MTU)"

        # Detect outliers (packets much larger than the common size)
        outliers = [s for s in frame_sizes if s > most_common_size * 2]

        return {
            'detected_mtu': detected_mtu,
            'mtu_confidence': mtu_confidence,
            'detection_method': 'most_common_size',  # Important: we use most common, not max!
            'most_common_size': most_common_size,
            'most_common_count': most_common_count,
            'most_common_pct': most_common_pct,
            'max_frame_size': max_size,
            'min_frame_size': min_size,
            'avg_frame_size': avg_size,
            'std_frame_size': std_size,
            'outlier_count': len(outliers),
            'outlier_sizes': sorted(set(outliers)) if outliers else [],
            'size_distribution': dict(size_dist.most_common(10))
        }

    def _get_diagnostics(self) -> dict:
        """Get diagnostic info about the network interface

        Runs ``ip link show <interface>`` and a short unfiltered tcpdump, and
        records what they reveal (interface existence, UP flag, MTU, whether
        any traffic was seen). Failures are recorded in the dict, not raised.
        """
        diagnostics = {}

        # Check if interface exists and is up
        try:
            result = subprocess.run(
                ['ip', 'link', 'show', self.interface],
                capture_output=True,
                text=True,
                timeout=5
            )
            diagnostics['interface_exists'] = result.returncode == 0
            if result.returncode == 0:
                diagnostics['interface_up'] = 'UP' in result.stdout
                # Extract MTU from ip link output
                mtu_match = re.search(r'mtu\s+(\d+)', result.stdout)
                if mtu_match:
                    diagnostics['interface_mtu'] = int(mtu_match.group(1))
        except Exception as e:
            diagnostics['error'] = str(e)

        # Check if we can see any traffic on the interface
        try:
            result = subprocess.run(
                ['sudo', 'tcpdump', '-i', self.interface, '-c', '5', '-n'],
                capture_output=True,
                text=True,
                timeout=5
            )
            diagnostics['general_traffic'] = 'packets captured' in result.stderr
        except Exception:
            # Includes the 5 s timeout expiring before 5 packets were seen
            diagnostics['general_traffic'] = False

        return diagnostics

    def print_detection_report(self, result: dict):
        """Print formatted MTU detection report

        Args:
            result: The dict returned by ``detect_mtu``. A failed result
                prints the error, any diagnostics and troubleshooting tips;
                a successful one prints the MTU and frame-size statistics.
        """
        print(f"\n{'='*100}")
        print("MTU DETECTION RESULTS")
        print(f"{'='*100}")

        if not result.get('success', False):
            print(f"✗ Detection failed: {result.get('error', 'Unknown error')}")
            if 'captured_packets' in result:
                print(f"  Packets captured: {result['captured_packets']}")

            # Print diagnostics if available
            if 'diagnostics' in result:
                print(f"\n{'='*100}")
                print("DIAGNOSTICS")
                print(f"{'='*100}")
                diag = result['diagnostics']

                if 'interface_exists' in diag:
                    if diag['interface_exists']:
                        print(f"✓ Interface {self.interface} exists")
                        if 'interface_up' in diag:
                            if diag['interface_up']:
                                print("✓ Interface is UP")
                            else:
                                print(f"✗ Interface is DOWN - bring it up with: sudo ip link set {self.interface} up")
                        if 'interface_mtu' in diag:
                            print(f"  Current MTU setting: {diag['interface_mtu']} bytes")
                    else:
                        print(f"✗ Interface {self.interface} not found")
                        print("  Available interfaces:")
                        try:
                            # Separate name so the `result` argument is not shadowed
                            links = subprocess.run(['ip', 'link', 'show'], capture_output=True, text=True)
                            for line in links.stdout.split('\n'):
                                if ': ' in line and not line.startswith(' '):
                                    print(f"    - {line.split(':')[1].strip().split('@')[0]}")
                        except (OSError, subprocess.SubprocessError, IndexError):
                            # Listing interfaces is best-effort only
                            pass

                if 'general_traffic' in diag:
                    if diag['general_traffic']:
                        print("✓ General network traffic detected on interface")
                    else:
                        print("✗ No general traffic detected - interface may not be receiving packets")

                print("\nTROUBLESHOOTING TIPS:")
                print("  1. Make sure Diretta is actively streaming audio")
                print("  2. Check interface name with: ip link show")
                print(f"  3. Test general traffic with: sudo tcpdump -i {self.interface} -c 10")
                print("  4. Increase capture duration: --detect-mtu --duration 60")
                print("  5. Try quick mode: --detect-mtu --quick")

            return

        print(f"✓ Successfully analyzed {result['captured_packets']} Diretta packets")
        print(f"  Capture duration: {result['capture_duration']:.1f} seconds")
        print("  Detection method: Using MOST COMMON frame size (ignores outliers)")
        print("")
        print(f"DETECTED MTU: {result['detected_mtu']} bytes (confidence: {result['mtu_confidence']})")
        print("")
        print("Frame Size Analysis:")
        print(f"  Most Common Size:  {result['most_common_size']} bytes ({result['most_common_pct']:.1f}% of packets) ← MTU indicator")
        print(f"  Maximum frame:    {result['max_frame_size']} bytes", end='')
        if result['outlier_count'] > 0:
            print(f" ⚠ OUTLIER - {result['outlier_count']} suspicious packets detected")
            if result['outlier_sizes']:
                print(f"    Outlier sizes: {result['outlier_sizes']}")
        else:
            print()
        print(f"  Minimum frame:    {result['min_frame_size']} bytes")
        print(f"  Average frame:    {result['avg_frame_size']} bytes")
        print(f"  Std deviation:    {result['std_frame_size']} bytes")
        print("")
        print("Size Distribution:")
        for size, count in sorted(result['size_distribution'].items(), key=lambda x: x[1], reverse=True):
            pct = (count / result['captured_packets']) * 100
            bar = '█' * min(50, int(pct / 2))
            marker = " ← MOST COMMON (MTU basis)" if size == result['most_common_size'] else ""
            outlier_mark = " ⚠ OUTLIER" if size in result.get('outlier_sizes', []) else ""
            print(f"  {size:5d} bytes: {count:4d} packets ({pct:5.1f}%) {bar}{marker}{outlier_mark}")

        print(f"\n{'='*100}")


class DirettaMonitor:
    """Analyze Diretta operation for jitter and stability using historical logs."""

    def __init__(self, service_name: str = "diretta_sync_host"):
        self.service_name = service_name

    def analyze_recent_logs(self, num_lines: int) -> str:
        """Analyzes the last N log entries from journalctl for stability.

        Args:
            num_lines: Number of most recent journal entries to request.

        Returns:
            A multi-line text report. If journalctl fails, is missing, or
            returns nothing, the report only states that no logs were found.
        """
        from datetime import datetime

        report = []
        report.append("=" * 100)
        report.append(f"DIRETTA STABILITY ANALYSIS (LAST {num_lines:,} LOG ENTRIES)")
        report.append("=" * 100)
        report.append(f"Service: {self.service_name}")
        report.append(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report.append("Analyzing logs...")

        # Argument list (no shell): the service name is passed verbatim and
        # cannot be interpreted as shell syntax.
        cmd = ['journalctl', '-u', self.service_name, '-n', str(num_lines), '--no-pager']
        try:
            logs = subprocess.check_output(cmd, text=True, stderr=subprocess.DEVNULL)
        except (subprocess.CalledProcessError, OSError):
            # Non-zero exit, or journalctl not installed / not executable
            logs = ""

        if not logs:
            report.append(f"✗ No logs found for service '{self.service_name}'.")
            return "\n".join(report)

        report.append("✓ Logs captured. Performing analysis.")
        report.append("")

        # 1. Analyze Cycle Time (Jitter)
        cycle_times = self.extract_cycle_times(logs)
        if cycle_times:
            cycle_stats = self.analyze_stability(cycle_times, "Cycle Time")

            # Use IQR for a more robust assessment
            iqr_jitter = cycle_stats['iqr']
            if iqr_jitter < 1.0:
                quality_assessment = "✓ Excellent (IQR < 1μs)"
            elif iqr_jitter < 5.0:
                quality_assessment = "✓ Good (IQR < 5μs)"
            elif iqr_jitter < 20.0:
                quality_assessment = "○ Fair (IQR < 20μs)"
            else:
                quality_assessment = "✗ Poor (IQR >= 20μs)"

            report.append("1. NETWORK TIMING JITTER (from 'cy=...' values)")
            report.append("-" * 100)
            report.append(f"  Assessment:        {quality_assessment}")
            report.append(f"  Samples found:      {cycle_stats['count']:,}")
            report.append(f"  Average Cycle:      {cycle_stats['mean']:.3f} μs")
            report.append(f"  Standard Deviation: {cycle_stats['stdev']:.3f} μs  <- (primary indicator of stability)")
            report.append(f"  Interquartile Range (IQR): {cycle_stats['iqr']:.3f} μs  <- (robust jitter measurement)")
            report.append(f"  Min / Max Cycle:    {cycle_stats['min']:.3f} μs / {cycle_stats['max']:.3f} μs")
            report.append(f"  Peak-to-Peak Range: {cycle_stats['range']:.3f} μs")
            report.append("-" * 100)
        else:
            report.append(f"1. NETWORK TIMING JITTER: No 'cy=' data found in the last {num_lines:,} log entries.")

        report.append("")

        # 2. Analyze Buffer Level (Drift)
        buffer_levels = self.extract_buffer_levels(logs)
        if buffer_levels:
            buffer_stats = self.analyze_stability(buffer_levels, "Buffer Level")
            drift_range = buffer_stats['range']
            if drift_range <= 1:
                quality = "✓ Optimal"
            elif drift_range <= 3:
                quality = "○ Good"
            else:
                quality = "✗ High"

            report.append("2. CLOCK DRIFT (from 'period=...' values)")
            report.append("-" * 100)
            report.append(f"  Assessment:        {quality} ({drift_range} period drift)")
            report.append(f"  Samples found:      {buffer_stats['count']:,}")
            report.append(f"  Min / Max Level:    {int(buffer_stats['min'])} / {int(buffer_stats['max'])}")
            report.append("-" * 100)
        else:
            report.append(f"2. CLOCK DRIFT: No 'period=' data found in the last {num_lines:,} log entries.")

        report.append("\n" + "=" * 100)
        return "\n".join(report)

    def extract_cycle_times(self, logs: str):
        """Extract cycle time values (in microseconds) from logs.

        Two forms are recognised: ``start cycle=<n>usec`` (kept as an int)
        and ``cy=<n>``, which is divided by 1,000,000 to give a float.
        """
        cycle_times = []
        # \b keeps 'cy=' from matching the tail of longer tokens such as
        # 'latency='.
        for match in re.finditer(r'start cycle=(\d+)usec|\bcy=(\d+)', logs):
            usec_text, cy_text = match.group(1), match.group(2)
            if cy_text:
                # 'cy' values are in picoseconds, convert to microseconds
                cycle_times.append(int(cy_text) / 1_000_000)
            elif usec_text:
                cycle_times.append(int(usec_text))
        return cycle_times

    def extract_buffer_levels(self, logs: str):
        """Extract buffer level values (the integers after ``period=``) from logs"""
        return [int(match.group(1)) for match in re.finditer(r'period=(\d+)', logs)]

    def analyze_stability(self, values: list, data_type: str) -> dict:
        """Analyzes a list of numbers for stability, including IQR.

        Args:
            values: Numeric samples.
            data_type: Label for the data; accepted for interface
                compatibility and not used in the computation.

        Returns:
            dict with count, mean, stdev, min, max, range, q1, q3 and iqr.
            An empty input yields all zeros; a single sample reports that
            sample as mean/min/max with zero spread.
        """
        count = len(values)
        if count == 0:
            return {
                'count': 0, 'mean': 0, 'stdev': 0,
                'min': 0, 'max': 0, 'range': 0,
                'q1': 0, 'q3': 0, 'iqr': 0
            }

        min_val = np.min(values)
        max_val = np.max(values)

        # Calculate Interquartile Range (IQR) to ignore outliers
        q1 = np.percentile(values, 25)
        q3 = np.percentile(values, 75)

        return {
            'count': count,
            'mean': np.mean(values),
            'stdev': np.std(values),
            'min': min_val,
            'max': max_val,
            'range': max_val - min_val,
            'q1': q1,
            'q3': q3,
            'iqr': q3 - q1
        }


def validate_settings(period_min: int, period_max: int,
                    period_size_min: int, period_size_max: int,
                    sync_buffer: int):
    """Validate settings and return warnings

    Args:
        period_min: periodMin value.
        period_max: periodMax value.
        period_size_min: periodSizeMin value.
        period_size_max: periodSizeMax value.
        sync_buffer: syncBufferCount value.

    Returns:
        List of human-readable warning/info strings (possibly empty). A
        non-positive ``period_min`` produces a warning instead of raising
        ``ZeroDivisionError`` in the ratio check.
    """
    warnings = []

    if period_min > 0:
        ratio = period_max / period_min
        if ratio > 3:
            warnings.append(f"⚠ Large periodMin/Max ratio ({period_min}:{period_max} = {ratio:.1f}:1) "
                          f"may cause inefficient buffering. Recommended: ≤2.5:1")
    else:
        # The ratio is undefined for zero (and meaningless for negatives)
        warnings.append(f"⚠ periodMin ({period_min}) must be a positive integer; "
                      f"periodMin/Max ratio check skipped.")

    if sync_buffer < period_min:
        warnings.append(f"⚠ syncBufferCount ({sync_buffer}) < periodMin ({period_min}) "
                      f"may cause sync issues. Recommended: syncBuffer ≥ periodMin")

    if sync_buffer > period_max * 1.5:
        warnings.append(f"ℹ syncBufferCount ({sync_buffer}) >> periodMax ({period_max}) "
                      f"adds extra latency. Consider reducing if stability is good.")

    if period_size_min == period_size_max:
        warnings.append(f"ℹ periodSizeMin = periodSizeMax ({period_size_min}) "
                      f"forces fixed ALSA buffer. Usually auto-selection works better.")

    if period_size_min > 4096:
        warnings.append(f"⚠ periodSizeMin ({period_size_min}) is high. "
                      f"May cause compatibility issues with some applications.")

    if period_size_max < 2048:
        warnings.append(f"⚠ periodSizeMax ({period_size_max}) is low. "
                      f"May limit buffer flexibility.")

    total_periods = period_max + sync_buffer
    if total_periods < 10:
        warnings.append(f"⚠ Total buffering ({total_periods} periods) is aggressive. "
                      f"Monitor closely for underruns on loaded systems.")
    elif total_periods > 20:
        warnings.append(f"ℹ Total buffering ({total_periods} periods) is very conservative. "
                      f"Higher latency (~{total_periods * 3:.0f}ms) but maximum stability.")

    if period_min == 4 and period_max == 8 and sync_buffer == 6:
        warnings.append("✓ Using recommended optimized settings (4/8/6)")

    return warnings


def generate_config(cycle_time: int, period_min: int = 4, period_max: int = 8,
                  period_size_min: int = 2048, period_size_max: int = 8192,
                  sync_buffer: int = 6, latency_buffer: int = 0,
                  thred_mode: int = 257, interface: str = 'enp5s0',
                  cpu_send: int = 1, cpu_other: int = 2) -> str:
    """Render the text of a Diretta configuration file.

    The output is a ``[global]`` section with one ``Key=value`` line per
    setting, each terminated by a newline. ``CycleMinTime`` is derived from
    ``cycle_time`` (0.5% lower, truncated to an integer).
    """
    # Ordered exactly as the settings must appear in the file
    settings = (
        ("Interface", interface),
        ("TargetProfileLimitTime", 0),
        ("ThredMode", thred_mode),
        ("InfoCycle", 100000),
        ("FlexCycle", "max"),
        ("CycleTime", cycle_time),
        ("CycleMinTime", int(cycle_time * 0.995)),
        ("Debug", "stdout"),
        ("periodMax", period_max),
        ("periodMin", period_min),
        ("periodSizeMax", period_size_max),
        ("periodSizeMin", period_size_min),
        ("syncBufferCount", sync_buffer),
        ("alsaUnderrun", "enable"),
        ("unInitMemDet", "disable"),
        ("CpuSend", cpu_send),
        ("CpuOther", cpu_other),
        ("LatencyBuffer", latency_buffer),
    )
    body = "".join(f"{key}={value}\n" for key, value in settings)
    return "[global]\n" + body


def print_header():
    """Write the boxed program banner to stdout."""
    banner_rows = (
        "",
        "╔═══════════════════════════════════════════════════════════════════════════════╗",
        "║              DIRETTA DDS CONFIGURATION & LOG ANALYSIS                        ║",
        "║                    Optimized for High-Quality Reference Clocks                ║",
        "║                                                                              ║",
        "║  Focus: Clock STABILITY (phase noise/jitter), not just accuracy              ║",
        "╚═══════════════════════════════════════════════════════════════════════════════╝",
        "",
    )
    # The empty first/last rows give the blank line before and after the box
    print("\n".join(banner_rows))


def print_clock_philosophy():
    """Print clock quality philosophy"""
    print(f"\n{'='*100}")
    print("CLOCK QUALITY FOR AUDIO: Stability vs Accuracy")
    print(f"{'='*100}")
    print("""
CRITICAL DISTINCTION:

❌ Clock ACCURACY (frequency error):
  • How close to exactly 10.000000 MHz
  • Important for: GPS, telecommunications, timing standards
  • Less important for: Audio quality

✓ Clock STABILITY (phase noise/jitter):
  • Short-term frequency variations
  • Measured in: dBc/Hz at offset frequencies (1Hz, 10Hz, 100Hz, etc.)
  • CRITICAL for: Audio quality, imaging, soundstage

WHY STABILITY MATTERS FOR AUDIO:

Phase noise at audio frequencies (1Hz - 20kHz) creates:
  • Timing jitter in sample reconstruction
  • Modulation noise that masks low-level details
  • Loss of soundstage depth and imaging precision
  • Subtle harshness in high frequencies

THE DIRETTA CHALLENGE:

Your Mutec provides ultra-low jitter at the DAC (receiver end).
But what about jitter at the source (server end)?

  [Server Clock]  → DDS Stream → [Mutec Clock] → [DAC]
  (±100-500ps)                    (±1ps)
        ↑                            ↑
    Potential jitter            Excellent stability
    source

This analysis script helps you understand if server-side jitter
is affecting your system, and whether PTP synchronization would help.
""")


def _build_parser():
    """Build the command-line parser for the tool.

    Options are organised in three groups: 'DDS Configuration' (values fed
    into the generated config and MTU detection), 'Log Analysis' (optional
    entry counts for the two services) and 'Output Control'.

    Returns:
        argparse.ArgumentParser: the fully configured parser.
    """
    parser = argparse.ArgumentParser(
        description='Diretta DDS Configuration Calculator with Log Analysis',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s                                      # Default config generation
  %(prog)s --detect-mtu                        # Auto-detect MTU from live traffic
  %(prog)s --detect-mtu --quick                # Quick MTU detection
  %(prog)s --detect-mtu --duration 60          # Longer capture duration
  %(prog)s --analyze-sync                      # Analyze last 1,000 sync log entries
  %(prog)s --analyze-sync 10000                # Analyze last 10,000 sync log entries
  %(prog)s --analyze-memory                    # Analyze last 1,000 memory log entries
  %(prog)s --config-only                        # Just show the config file text

""")

    # Configuration Parameters
    calc_group = parser.add_argument_group('DDS Configuration')
    calc_group.add_argument('--mtu', type=int, default=9024, help='MTU size in bytes (default: 9024)')
    calc_group.add_argument('--detect-mtu', action='store_true', help='Auto-detect MTU from live Diretta traffic')
    calc_group.add_argument('--quick', action='store_true', help='Quick MTU detection mode (stops after finding stable size)')
    calc_group.add_argument('--duration', type=int, default=30, help='Capture duration in seconds for MTU detection (default: 30)')
    calc_group.add_argument('--pcm', action='store_true', help='Calculate for PCM instead of DSD')
    calc_group.add_argument('--period-min', type=int, default=4, help='periodMin: minimum network buffer depth (default: 4)')
    calc_group.add_argument('--period-max', type=int, default=8, help='periodMax: maximum network buffer depth (default: 8)')
    calc_group.add_argument('--sync-buffer', type=int, default=6, help='syncBufferCount: clock sync buffer (default: 6)')
    calc_group.add_argument('--latency-buffer', type=int, default=0, help='LatencyBuffer: additional latency (default: 0)')
    calc_group.add_argument('--period-size-min', type=int, default=2048, help='periodSizeMin: ALSA buffer minimum (default: 2048)')
    calc_group.add_argument('--period-size-max', type=int, default=8192, help='periodSizeMax: ALSA buffer maximum (default: 8192)')
    calc_group.add_argument('--thred-mode', type=int, default=257, help='ThredMode value (default: 257)')
    calc_group.add_argument('--interface', type=str, default='enp5s0', help='Network interface name (default: enp5s0)')
    calc_group.add_argument('--cpu-send', type=int, default=1, help='CPU core for send thread (default: 1)')
    calc_group.add_argument('--cpu-other', type=int, default=2, help='CPU core for other threads (default: 2)')

    # Analysis Parameters: nargs='?' with const=1000 means the bare flag
    # analyzes 1000 entries, while an absent flag stays None (mode not selected).
    analysis_group = parser.add_argument_group('Log Analysis')
    analysis_group.add_argument('--analyze-sync', type=int, nargs='?', const=1000, default=None, metavar='N',
                                help='Analyze the last N log entries from diretta_sync_host service (default: 1000)')
    analysis_group.add_argument('--analyze-memory', type=int, nargs='?', const=1000, default=None, metavar='N',
                                help='Analyze the last N log entries from diretta_memoryplay_host service (default: 1000)')

    # Output Control
    output_group = parser.add_argument_group('Output Control')
    output_group.add_argument('--config-only', action='store_true',
                              help='Only generate and show the config file text')
    output_group.add_argument('--show-clock-info', action='store_true',
                              help='Show detailed clock quality information')

    return parser


def main():
    """Parse the command line and run exactly one mode of the tool.

    Modes are tried in this order and the first one selected wins:
    MTU detection (--detect-mtu), sync log analysis (--analyze-sync),
    memory log analysis (--analyze-memory), and otherwise configuration
    generation. With --config-only, configuration generation prints the
    config text and nothing else.
    """
    args = _build_parser().parse_args()

    # MTU Detection Mode
    if args.detect_mtu:
        detector = DirettaMTUDetector(interface=args.interface)
        result = detector.detect_mtu(
            capture_duration=args.duration,
            min_packets=5,
            quick_mode=args.quick,
        )
        detector.print_detection_report(result)

        if result.get('success'):
            print(f"\nTIP: Use detected MTU with: python3 {sys.argv[0]} --mtu {result['detected_mtu']}")

        return

    # Analysis Modes: sync is checked before memory, so it takes precedence
    # when both options are supplied.
    analysis_requests = (
        (args.analyze_sync, "diretta_sync_host"),
        (args.analyze_memory, "diretta_memoryplay_host"),
    )
    for num_lines, service_name in analysis_requests:
        if num_lines is not None:
            monitor = DirettaMonitor(service_name=service_name)
            print(monitor.analyze_recent_logs(num_lines=num_lines))
            return

    # Configuration Mode
    calculator = DirettaCalculator(mtu=args.mtu)
    is_dsd = not args.pcm
    format_type = "DSD" if is_dsd else "PCM"
    rule = '=' * 100

    # Calculate cycle time based on 48kHz
    target_cycle_us, params_48k = calculator.calculate_cycle_time_for_48k(is_dsd)

    # Calculate CycleMinTime (0.5% less than CycleTime)
    cycle_min_us = int(target_cycle_us * 0.995)

    warnings = validate_settings(args.period_min, args.period_max,
                                 args.period_size_min, args.period_size_max, args.sync_buffer)

    if not args.config_only:
        print_header()
        if args.show_clock_info:
            print_clock_philosophy()
        base_label = "DSD256" if is_dsd else "PCM"
        print(f"\n{rule}\nCONFIGURATION SUMMARY\n{rule}")
        print(f"Format: {format_type}")
        print(f"MTU: {args.mtu} bytes")
        print(f"Base calculation: 48kHz ({base_label}: {params_48k['sample_rate']:,} Hz)")
        print(f"Samples per frame: {params_48k['samples_per_frame']:,}")
        print(f"Target CycleTime: {target_cycle_us} μs (optimized for 48kHz)")
        print(f"CycleMinTime: {cycle_min_us} μs (0.5% lower margin)")
        print(f"Network Buffering: periodMin={args.period_min}, periodMax={args.period_max}")
        print(f"Clock Sync Buffer: syncBufferCount={args.sync_buffer}")
        print(f"Total buffering: {args.period_max + args.sync_buffer} periods")
        if warnings:
            print(f"\n{rule}\nCONFIGURATION NOTES\n{rule}\n")
            for warning in warnings:
                print(f"  {warning}\n")

    config = generate_config(cycle_time=target_cycle_us, period_min=args.period_min,
                             period_max=args.period_max, period_size_min=args.period_size_min,
                             period_size_max=args.period_size_max, sync_buffer=args.sync_buffer,
                             latency_buffer=args.latency_buffer, thred_mode=args.thred_mode,
                             interface=args.interface, cpu_send=args.cpu_send, cpu_other=args.cpu_other)

    if args.config_only:
        # --config-only is documented as showing only the config file text,
        # so skip the banner; the output can then be redirected into a file.
        print(config)
        return

    print(f"\n{rule}\nDIRETTA CONFIGURATION FILE\n{rule}\n")
    print(config)

    print(f"\n{rule}\nNEXT STEPS\n{rule}")
    print(f"""
1. SAVE CONFIGURATION:
  Copy the above to: ~/DirettaAlsaHost/setting.inf

2. RESTART DIRETTA:
  sudo systemctl restart diretta_sync_host.service
  # or for memory mode:
  sudo systemctl restart diretta_memoryplay_host.service

3. ANALYZE RECENT PERFORMANCE:
  python3 {__file__} --analyze-sync 10000
  # or for memory mode:
  python3 {__file__} --analyze-memory 10000
 
4. INTERPRET RESULTS:
  • Jitter IQR < 1.0 μs: Excellent
  • Drift ≤ 1 period: Optimal for high-quality clocks
 
5. AUTO-DETECT MTU (optional):
  python3 {__file__} --detect-mtu --quick
  python3 {__file__} --detect-mtu --duration 60
""")


# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()


Sujets apparemment similaires…
Sujet Auteur Réponses Affichages Dernier message
  Carte Mère SOtM - sMB-Q370 pour TARGET DIRETTA jean-luc 122 64,601 12-04-2024, 12:20 PM
Dernier message: bbill
  Soucis Windows 7 sous Mac OS Euterpe 26 8,616 10-23-2024, 09:11 AM
Dernier message: Le Moine Bleu
  Diretta Lucia Development kit Olivier 131 79,959 07-15-2024, 10:12 PM
Dernier message: Le dom
  ENGINEERED fermé ou service client sous l'eau ? Moxa 15 8,114 01-27-2024, 02:26 PM
Dernier message: thomasv
Music Room Shaper sous Win avec Roon et HQPlayer zaurux 4 4,255 11-10-2023, 12:43 AM
Dernier message: ds21

Atteindre :


Utilisateur(s) parcourant ce sujet : 3 visiteur(s)