fix: strip input str

This commit is contained in:
2025-04-14 17:31:49 +06:00
parent ad4d406a2d
commit d984ed8423
3 changed files with 1590 additions and 2 deletions

849
dg800p_cal_zrq.ipynb Normal file

File diff suppressed because one or more lines are too long

737
dg800pro_cal.ipynb Normal file
View File

@@ -0,0 +1,737 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Overview\n",
"This notebook calibrates the high-frequency output flatness of a signal generator. It works by:\n",
"1. Connecting to the signal generator and an oscilloscope via VISA (TCPIP).\n",
"2. Measuring the generator's actual RMS output voltage across specified frequency points for different amplitude ranges using the oscilloscope.\n",
"3. Reading the generator's existing high-frequency flatness calibration data (`cal_hfflat*.hex` files).\n",
"4. Calculating correction factors based on the difference between the measured response and the ideal flat response.\n",
"5. Applying these corrections to the existing calibration data, focusing on higher frequency points within each range.\n",
"6. Saving the updated calibration data into new `.hex` files.\n",
"7. Providing commands to upload the new calibration files to the generator using ADB (Android Debug Bridge).\n",
"\n",
"## Hardware Setup\n",
"* Connect the Signal Generator and Oscilloscope to the network.\n",
"* Connect the Signal Generator's output channel (CH1 or CH2) to the Oscilloscope's input channel (specified in `OSC_MEAS_CHAN`).\n",
"* Use a high-quality 50 Ohm coaxial cable.\n",
"* **Important:** Ensure the Oscilloscope's input channel impedance is set to **50 Ohms**.\n",
"* Enable ADB debugging on the Signal Generator (likely via network)."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 1. Configuration\n",
"Adjust the parameters below to match your setup."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"import matplotlib.pyplot as plt\n",
"import pyvisa\n",
"import time\n",
"import os\n",
"import pathlib # Optional, can use os.path instead\n",
"\n",
"# --- Instrument & Connection Settings ---\n",
"GEN_IP = \"192.168.138.178\" # IP Address of the Signal Generator\n",
"OSC_IP = \"192.168.138.244\" # IP Address of the Oscilloscope\n",
"ADB_DEVICE_ID = (\n",
" \"192.168.138.178:55555\" # ADB identifier for the generator (IP:Port or Serial)\n",
")\n",
"\n",
"# --- File Paths ---\n",
"# Directory where original .hex files are located and where measurement/modified files will be saved.\n",
"# Use raw string (r'...') or forward slashes ('/') for paths.\n",
"DATA_DIR = r\"C:\\Users\\xxx\\dgpro_calibration_data\" # CHANGE THIS PATH\n",
"\n",
"# Ensure the data directory exists\n",
"pathlib.Path(DATA_DIR).mkdir(\n",
" parents=True, exist_ok=True\n",
") # Create dir if it doesn't exist\n",
"\n",
"# Original and modified calibration filenames (don't change filename structure unless necessary)\n",
"ORIGINAL_HEX_FILES = {1: \"cal_hfflat1.hex\", 2: \"cal_hfflat2.hex\"}\n",
"MODIFIED_HEX_FILES = {1: \"cal_hfflat1_mod.hex\", 2: \"cal_hfflat2_mod.hex\"}\n",
"MEASUREMENT_FILE_TEMPLATE = (\n",
" \"ch{channel}-rng{range_idx}-measurement.npz\" # Template for saving measurements\n",
")\n",
"\n",
"# --- SCPI Commands ---\n",
"# Adjust these if your instruments use different commands\n",
"GEN_IDN_CMD = \"*IDN?\"\n",
"OSC_IDN_CMD = \"*IDN?\"\n",
"GEN_SET_VOLT_CMD = \":SOURce{ch}:VOLTage:AMPLitude {volt:.7e} VPP\" # Set Vpp\n",
"GEN_SET_FREQ_CMD = \":SOURce{ch}:FREQuency {freq:.4e}\" # Set Frequency in Hz\n",
"# *** IMPORTANT: Change this for your specific oscilloscope model! ***\n",
"# Example for many Siglent scopes: Query RMS voltage on Channel 1 -> 'C1:PAVA? RMS'\n",
"# Example from original code (might be Rigol): ':MEAS:ITEM? VRMS,CHAN1'\n",
"OSC_MEAS_CMD = \"C1:PAVA? RMS\" # Placeholder - REPLACE WITH YOUR SCOPE'S COMMAND\n",
"OSC_MEAS_CHAN = \"1\" # Oscilloscope channel connected to the generator output (used if needed in OSC_MEAS_CMD)\n",
"\n",
"# --- Calibration Settings ---\n",
"CHANNELS_TO_CALIBRATE = [\n",
" 1,\n",
" 2,\n",
"] # List of generator channels to calibrate (e.g., [1], [2], or [1, 2])\n",
"SETTLE_TIME_VOLT = 3.0 # Seconds to wait after setting voltage\n",
"SETTLE_TIME_FREQ = 0.7 # Seconds to wait after setting frequency\n",
"\n",
"# Define the calibration ranges, frequencies, and corresponding indices in the .hex file.\n",
"# Each range dictionary needs:\n",
"# 'voltage': Target Vpp for measurements in this range.\n",
"# 'frequencies_uHz': List/array of frequencies in microHertz (uHz).\n",
"# 'hex_start_idx': Starting index (1-based) in the hex file for this range's data.\n",
"# 'hex_end_idx': Ending index (inclusive) in the hex file for this range's data.\n",
"# 'points_to_modify': Number of *highest frequency points* within this range to apply correction to.\n",
"\n",
"# Frequencies are defined once and reused (converted from uHz to Hz later)\n",
"# Note: The original code reversed the lists, meaning measurements went from high to low freq. Preserving that.\n",
"# Also, the number of points varied slightly per range.\n",
"FREQ_LIST_RANGE_1_3_uHz = np.array(\n",
" list(\n",
" reversed(\n",
" [\n",
" 200000000000000,\n",
" 175000000000000,\n",
" 150000000000000,\n",
" 120000000000000,\n",
" 100000000000000,\n",
" 90000000000000,\n",
" 80000000000000,\n",
" 70000000000000,\n",
" 60000000000000,\n",
" 50000000000000,\n",
" 40000000000000,\n",
" 30000000000000,\n",
" 20000000000000,\n",
" 10000000000000,\n",
" 8000000000000,\n",
" 6000000000000,\n",
" 4000000000000,\n",
" 2000000000000,\n",
" 1000000000000,\n",
" 900000000000,\n",
" 800000000000,\n",
" 700000000000,\n",
" 600000000000,\n",
" 500000000000,\n",
" 400000000000,\n",
" 300000000000,\n",
" 200000000000,\n",
" ]\n",
" )\n",
" )\n",
") # 27 points: 200kHz to 200MHz\n",
"\n",
"FREQ_LIST_RANGE_2_uHz = np.array(\n",
" list(\n",
" reversed(\n",
" [\n",
" 100000000000000,\n",
" 90000000000000,\n",
" 80000000000000,\n",
" 70000000000000,\n",
" 60000000000000,\n",
" 50000000000000,\n",
" 40000000000000,\n",
" 30000000000000,\n",
" 20000000000000,\n",
" 10000000000000,\n",
" 8000000000000,\n",
" 6000000000000,\n",
" 4000000000000,\n",
" 2000000000000,\n",
" 1000000000000,\n",
" 900000000000,\n",
" 800000000000,\n",
" 700000000000,\n",
" 600000000000,\n",
" 500000000000,\n",
" 400000000000,\n",
" 300000000000,\n",
" 200000000000,\n",
" ]\n",
" )\n",
" )\n",
") # 23 points: 200kHz to 100MHz\n",
"\n",
"# Structure defining the ranges for calibration\n",
"# Indices match boundaries observed in original code (1-27, 28-50, 51-77)\n",
"# Note: Vpp values from original code comments/calculations are used.\n",
"CAL_RANGES = [\n",
" { # Range 1 (corresponds to original `rng1`)\n",
" \"voltage\": 1.2649111, # Vpp\n",
" \"frequencies_uHz\": FREQ_LIST_RANGE_1_3_uHz,\n",
" \"hex_start_idx\": 1,\n",
" \"hex_end_idx\": 27,\n",
" \"points_to_modify\": 12,\n",
" },\n",
" { # Range 2 (corresponds to original `rng2`)\n",
" \"voltage\": 5.0, # Vpp\n",
" \"frequencies_uHz\": FREQ_LIST_RANGE_2_uHz,\n",
" \"hex_start_idx\": 28,\n",
" \"hex_end_idx\": 50, # 23 points total\n",
" \"points_to_modify\": 12,\n",
" },\n",
" { # Range 3 (corresponds to original `rng3` - 0dBm into 50 Ohm)\n",
" \"voltage\": 0.6324556, # Vpp (approx 0 dBm)\n",
" \"frequencies_uHz\": FREQ_LIST_RANGE_1_3_uHz,\n",
" \"hex_start_idx\": 51,\n",
" \"hex_end_idx\": 77,\n",
" \"points_to_modify\": 12,\n",
" },\n",
"]\n",
"\n",
"# Hex file properties\n",
"HEX_DATA_TYPE = \"<f8\" # Little-endian 64-bit float\n",
"HEX_POINTS_PER_CHANNEL = 78 # Total points = 624 bytes / 8 bytes/point\n",
"\n",
"# --- Plotting Defaults ---\n",
"plt.rcParams[\"figure.figsize\"] = (10, 6) # Default figure size\n",
"\n",
"print(\"Configuration loaded.\")\n",
"print(f\"Data Directory: {DATA_DIR}\")\n",
"print(f\"Generator IP: {GEN_IP}, Oscilloscope IP: {OSC_IP}\")\n",
"print(f\"Calibrating Channels: {CHANNELS_TO_CALIBRATE}\")\n",
"print(f\"Oscilloscope Measurement Command: {OSC_MEAS_CMD}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 2. Helper Functions"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def connect_instruments(gen_ip, osc_ip):\n",
" \"\"\"Connects to Generator and Oscilloscope using pyvisa.\"\"\"\n",
" gen = None\n",
" osc = None\n",
" try:\n",
" print(\"Connecting to instruments...\")\n",
" rm = pyvisa.ResourceManager()\n",
" gen_addr = f\"TCPIP0::{gen_ip}::INSTR\"\n",
" osc_addr = f\"TCPIP0::{osc_ip}::INSTR\"\n",
"\n",
" gen = rm.open_resource(gen_addr)\n",
" gen.timeout = 10000 # ms\n",
" gen.read_termination = \"\\n\"\n",
" gen.write_termination = \"\\n\"\n",
" print(f\"Generator Connected: {gen.query(GEN_IDN_CMD).strip()}\")\n",
"\n",
" osc = rm.open_resource(osc_addr)\n",
" osc.timeout = 10000 # ms\n",
" osc.read_termination = \"\\n\"\n",
" osc.write_termination = \"\\n\"\n",
" print(f\"Oscilloscope Connected: {osc.query(OSC_IDN_CMD).strip()}\")\n",
" print(\"-\" * 30)\n",
" return gen, osc\n",
" except pyvisa.errors.VisaIOError as e:\n",
" print(f\"VISA Error connecting to instruments: {e}\")\n",
" if gen:\n",
" gen.close()\n",
" if osc:\n",
" osc.close()\n",
" return None, None\n",
" except Exception as e:\n",
" print(f\"An unexpected error occurred during connection: {e}\")\n",
" if gen:\n",
" gen.close()\n",
" if osc:\n",
" osc.close()\n",
" return None, None\n",
"\n",
"\n",
"def measure_flatness(gen, osc, channel, voltage_vpp, frequencies_hz, osc_meas_cmd):\n",
" \"\"\"Sets generator and measures RMS voltage using the oscilloscope.\"\"\"\n",
" measured_amps_rms = np.zeros_like(frequencies_hz, dtype=float)\n",
"\n",
" print(f\"Starting measurement: CH{channel}, Voltage={voltage_vpp:.4f} Vpp\")\n",
"\n",
" # Set initial voltage\n",
" cmd = GEN_SET_VOLT_CMD.format(ch=channel, volt=voltage_vpp)\n",
" # print(f\"Sending to Gen: {cmd}\") # Uncomment for debugging\n",
" gen.write(cmd)\n",
" time.sleep(SETTLE_TIME_VOLT)\n",
"\n",
" # Set initial frequency (first in the list)\n",
" cmd = GEN_SET_FREQ_CMD.format(ch=channel, freq=frequencies_hz[0])\n",
" # print(f\"Sending to Gen: {cmd}\") # Uncomment for debugging\n",
" gen.write(cmd)\n",
" time.sleep(SETTLE_TIME_FREQ) # Allow settling before first measurement\n",
"\n",
" # Loop through frequencies\n",
" for i, freq_hz in enumerate(frequencies_hz):\n",
" cmd = GEN_SET_FREQ_CMD.format(ch=channel, freq=freq_hz)\n",
" # print(f\"Sending to Gen: {cmd}\") # Uncomment for debugging\n",
" gen.write(cmd)\n",
" time.sleep(SETTLE_TIME_FREQ)\n",
"\n",
" # Measure RMS Voltage\n",
" try:\n",
" # Adapt query based on expected Siglent format (value first, then unit maybe)\n",
" # Or based on original format (just the value)\n",
" query_cmd = osc_meas_cmd.replace(\n",
" \"C1\", f\"C{OSC_MEAS_CHAN}\"\n",
" ) # Replace channel if needed\n",
" # print(f\"Querying Scope: {query_cmd}\") # Uncomment for debugging\n",
" response = osc.query(query_cmd).strip()\n",
"\n",
" # Attempt to extract float - adjust parsing if needed based on scope response\n",
" # e.g., if it returns \"1.234 VRMS\", split and take first part\n",
" try:\n",
" measured_vrms = float(response.split()[0]) # Example parsing\n",
" except ValueError:\n",
" measured_vrms = float(\n",
" response\n",
" ) # Assume direct float response if split fails\n",
"\n",
" measured_amps_rms[i] = measured_vrms\n",
" print(\n",
" f\" Freq: {freq_hz / 1e6:.3f} MHz, Measured VRMS: {measured_vrms:.6f} V\"\n",
" )\n",
" except pyvisa.errors.VisaIOError as e:\n",
" print(f\"VISA Error during measurement at {freq_hz / 1e6:.3f} MHz: {e}\")\n",
" measured_amps_rms[i] = np.nan # Mark as invalid\n",
" except Exception as e:\n",
" print(\n",
" f\"Unexpected error during measurement at {freq_hz / 1e6:.3f} MHz: {e}\"\n",
" )\n",
" measured_amps_rms[i] = np.nan # Mark as invalid\n",
"\n",
" print(f\"Measurement finished for CH{channel}, Voltage={voltage_vpp:.4f} Vpp.\")\n",
" print(\"-\" * 30)\n",
" return frequencies_hz, measured_amps_rms\n",
"\n",
"\n",
"def calculate_and_update_cal(\n",
" original_hex_path, all_measurements, cal_ranges, hex_data_type, output_hex_path\n",
"):\n",
" \"\"\"Calculates new calibration factors and writes the modified .hex file.\"\"\"\n",
" print(f\"Processing calibration for: {output_hex_path}\")\n",
"\n",
" # 1. Read original calibration data\n",
" try:\n",
" with open(original_hex_path, \"rb\") as fh:\n",
" original_cal_data = np.fromfile(fh, dtype=hex_data_type)\n",
" print(f\"Read {len(original_cal_data)} points from {original_hex_path}\")\n",
" if len(original_cal_data) != HEX_POINTS_PER_CHANNEL:\n",
" print(\n",
" f\"Warning: Expected {HEX_POINTS_PER_CHANNEL} points, found {len(original_cal_data)}.\"\n",
" )\n",
" # Decide how to handle: error out, proceed with caution?\n",
" # For now, proceed cautiously. Add error handling if needed.\n",
" # return False\n",
" except FileNotFoundError:\n",
" print(f\"Error: Original calibration file not found: {original_hex_path}\")\n",
" return False\n",
" except Exception as e:\n",
" print(f\"Error reading {original_hex_path}: {e}\")\n",
" return False\n",
"\n",
" modified_cal_data = original_cal_data.copy()\n",
" plt.figure() # Create a figure for plotting results for this channel\n",
"\n",
" # 2. Process each calibration range\n",
" for i, cal_range in enumerate(cal_ranges):\n",
" range_idx = i + 1 # 1-based index for user feedback/filenames\n",
" print(f\" Applying corrections for Range {range_idx}...\")\n",
"\n",
" # Find the measurement data for this range\n",
" measurement_data = all_measurements.get(range_idx)\n",
" if measurement_data is None:\n",
" print(f\" Error: Measurement data for range {range_idx} not found.\")\n",
" continue # Skip this range\n",
"\n",
" measured_freqs = measurement_data[\"frequencies_hz\"]\n",
" measured_amps = measurement_data[\"measured_amps_rms\"]\n",
"\n",
" # Basic check for NaNs or zeros which would break calculation\n",
" if np.isnan(measured_amps).any() or measured_amps[0] == 0:\n",
" print(\n",
" f\" Warning: Invalid measurement data (NaN or zero at start) for range {range_idx}. Skipping correction.\"\n",
" )\n",
" continue\n",
"\n",
" # Get section boundaries (using 0-based Python indexing)\n",
" start_idx = cal_range[\"hex_start_idx\"] # Config uses 1-based for clarity\n",
" end_idx = cal_range[\"hex_end_idx\"] + 1 # Python slice excludes end\n",
" num_points_in_range = end_idx - start_idx\n",
" points_to_modify = cal_range[\"points_to_modify\"]\n",
"\n",
" if len(measured_amps) != num_points_in_range:\n",
" print(\n",
" f\" Warning: Mismatch between measurement points ({len(measured_amps)}) and expected hex range points ({num_points_in_range}) for range {range_idx}. Skipping.\"\n",
" )\n",
" continue\n",
"\n",
" # Extract the relevant section from the *original* calibration data\n",
" original_section = original_cal_data[start_idx:end_idx]\n",
"\n",
" # --- Calculate Correction Factors ---\n",
" # Normalize measured response relative to the first point in the range\n",
" relative_response = measured_amps / measured_amps[0]\n",
"\n",
" # Avoid division by zero if relative_response has zeros (shouldn't happen if checked above)\n",
" relative_response[relative_response == 0] = (\n",
" 1e-9 # Replace zeros with small number\n",
" )\n",
"\n",
" # New calibration factor = Original Factor / Measured Relative Response\n",
" # If measured output dropped (relative_response < 1), new factor > original factor.\n",
" # If measured output peaked (relative_response > 1), new factor < original factor.\n",
" new_cal_section = original_section / relative_response\n",
" # --- Apply Correction to the specified points ---\n",
" # Determine indices *within the section* to modify (last 'points_to_modify')\n",
" modify_start_in_section = num_points_in_range - points_to_modify\n",
" modify_end_in_section = num_points_in_range\n",
"\n",
" # Determine indices *within the full modified_cal_data array*\n",
" modify_start_global = start_idx + modify_start_in_section\n",
" modify_end_global = start_idx + modify_end_in_section\n",
"\n",
" print(\n",
" f\" Modifying indices {modify_start_global} to {modify_end_global - 1} (total {points_to_modify} points)\"\n",
" )\n",
"\n",
" # Update the modified data array only for the selected points\n",
" modified_cal_data[modify_start_global:modify_end_global] = new_cal_section[\n",
" modify_start_in_section:modify_end_in_section\n",
" ]\n",
"\n",
" # --- Plotting for verification ---\n",
" plot_indices = np.arange(start_idx, end_idx)\n",
" plt.plot(\n",
" plot_indices,\n",
" original_section,\n",
" \"--\",\n",
" label=f\"Orig Range {range_idx} (Idx {start_idx}-{end_idx - 1})\",\n",
" )\n",
" plt.plot(\n",
" plot_indices, new_cal_section, \"-\", label=f\"New Calc Range {range_idx}\"\n",
" )\n",
" # Highlight modified points\n",
" plt.scatter(\n",
" plot_indices[modify_start_in_section:modify_end_in_section],\n",
" new_cal_section[modify_start_in_section:modify_end_in_section],\n",
" color=\"red\",\n",
" s=20,\n",
" zorder=5,\n",
" label=f\"Modified Pts Rng {range_idx}\",\n",
" )\n",
"\n",
" # Add overall plot elements\n",
" plt.title(f\"Calibration Factors - {os.path.basename(output_hex_path)}\")\n",
" plt.xlabel(\"Index in .hex file\")\n",
" plt.ylabel(\"Calibration Factor\")\n",
" plt.legend(fontsize=\"small\")\n",
" plt.grid(True)\n",
" plt.tight_layout()\n",
" plot_filename = os.path.splitext(output_hex_path)[0] + \"_comparison.png\"\n",
" plt.savefig(plot_filename)\n",
" print(f\"Saved comparison plot to {plot_filename}\")\n",
" plt.show()\n",
"\n",
" # 3. Write the modified calibration data\n",
" try:\n",
" with open(output_hex_path, \"wb\") as fh:\n",
" modified_cal_data.tofile(fh)\n",
" print(\n",
" f\"Successfully wrote {len(modified_cal_data)} points to {output_hex_path}\"\n",
" )\n",
" return True\n",
" except Exception as e:\n",
" print(f\"Error writing modified file {output_hex_path}: {e}\")\n",
" return False"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 3. Connect to Instruments"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"gen, osc = connect_instruments(GEN_IP, OSC_IP)\n",
"\n",
"# Proceed only if connection is successful\n",
"if gen is None or osc is None:\n",
" print(\n",
" \"\\nERROR: Instrument connection failed. Please check IPs, connections, and VISA setup.\"\n",
" )\n",
" # Optionally raise an error: raise ConnectionError(\"Failed to connect to instruments\")\n",
"else:\n",
" print(\"\\nInstruments connected successfully.\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 4. Perform Measurements\n",
"This section iterates through the specified channels and calibration ranges, performing measurements for each."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"all_channel_measurements = {} # Dictionary to store results {channel: {range_idx: data}}\n",
"\n",
"if gen and osc: # Only run if instruments are connected\n",
" for channel in CHANNELS_TO_CALIBRATE:\n",
" print(f\"\\n--- Starting Measurements for Channel {channel} ---\")\n",
" channel_measurements = {} # Store results for this channel {range_idx: data}\n",
"\n",
" for i, cal_range in enumerate(CAL_RANGES):\n",
" range_idx = i + 1 # 1-based index\n",
" voltage_vpp = cal_range[\"voltage\"]\n",
" # Convert frequencies from uHz to Hz for measurement\n",
" frequencies_hz = cal_range[\"frequencies_uHz\"] / 1e6\n",
"\n",
" # Perform the measurement\n",
" measured_freqs_hz, measured_amps_rms = measure_flatness(\n",
" gen, osc, channel, voltage_vpp, frequencies_hz, OSC_MEAS_CMD\n",
" )\n",
"\n",
" # Store results\n",
" measurement_data = {\n",
" \"frequencies_hz\": measured_freqs_hz,\n",
" \"measured_amps_rms\": measured_amps_rms,\n",
" \"voltage_vpp\": voltage_vpp,\n",
" \"channel\": channel,\n",
" \"range_idx\": range_idx,\n",
" }\n",
" channel_measurements[range_idx] = measurement_data\n",
"\n",
" # Save intermediate results for this range\n",
" filename = MEASUREMENT_FILE_TEMPLATE.format(\n",
" channel=channel, range_idx=range_idx\n",
" )\n",
" filepath = os.path.join(DATA_DIR, filename)\n",
" try:\n",
" np.savez(filepath, **measurement_data)\n",
" print(f\"Saved measurement data to: {filepath}\")\n",
" except Exception as e:\n",
" print(f\"Error saving measurement data to {filepath}: {e}\")\n",
"\n",
" print(\"-\" * 20) # Separator between ranges\n",
"\n",
" all_channel_measurements[channel] = channel_measurements\n",
" print(f\"--- Finished Measurements for Channel {channel} ---\")\n",
"\n",
"else:\n",
" print(\"\\nSkipping measurements due to connection failure.\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 5. Calculate New Calibration and Generate .hex Files\n",
"This section processes the measurement data saved in step 4 and generates the modified `.hex` files."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"if all_channel_measurements: # Proceed only if measurements were taken\n",
" for channel in CHANNELS_TO_CALIBRATE:\n",
" print(f\"\\n--- Calculating Calibration for Channel {channel} ---\")\n",
"\n",
" original_hex_path = os.path.join(DATA_DIR, ORIGINAL_HEX_FILES[channel])\n",
" output_hex_path = os.path.join(DATA_DIR, MODIFIED_HEX_FILES[channel])\n",
" measurements_for_channel = all_channel_measurements.get(channel)\n",
"\n",
" if not measurements_for_channel:\n",
" print(\n",
" f\"Error: No measurement data found for Channel {channel}. Skipping calculation.\"\n",
" )\n",
" continue\n",
"\n",
" success = calculate_and_update_cal(\n",
" original_hex_path,\n",
" measurements_for_channel,\n",
" CAL_RANGES,\n",
" HEX_DATA_TYPE,\n",
" output_hex_path,\n",
" )\n",
"\n",
" if success:\n",
" print(\n",
" f\"Successfully generated modified calibration file for Channel {channel}.\"\n",
" )\n",
" else:\n",
" print(\n",
" f\"Failed to generate modified calibration file for Channel {channel}.\"\n",
" )\n",
"\n",
"else:\n",
" print(\n",
" \"\\nSkipping calibration calculation because no measurements were performed or loaded.\"\n",
" )"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 6. Visualize Original vs. Modified Calibration Data (Optional)\n",
"Plot the original and newly generated `_mod.hex` files together for comparison."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(\"\\n--- Plotting Final Comparison ---\")\n",
"plt.figure(figsize=(12, 7))\n",
"\n",
"file_labels = []\n",
"\n",
"# Plot original files\n",
"for ch in CHANNELS_TO_CALIBRATE:\n",
" fname = os.path.join(DATA_DIR, ORIGINAL_HEX_FILES[ch])\n",
" try:\n",
" with open(fname, \"rb\") as fh:\n",
" data = np.fromfile(fh, dtype=HEX_DATA_TYPE)\n",
" plt.plot(data, \"--\", label=f\"Original CH{ch}\")\n",
" file_labels.append(f\"Orig CH{ch}\")\n",
" except FileNotFoundError:\n",
" print(f\"Warning: Original file not found for plotting: {fname}\")\n",
" except Exception as e:\n",
" print(f\"Error reading {fname} for plotting: {e}\")\n",
"\n",
"# Plot modified files\n",
"for ch in CHANNELS_TO_CALIBRATE:\n",
" fname = os.path.join(DATA_DIR, MODIFIED_HEX_FILES[ch])\n",
" try:\n",
" with open(fname, \"rb\") as fh:\n",
" data = np.fromfile(fh, dtype=HEX_DATA_TYPE)\n",
" plt.plot(data, \"-\", linewidth=2, label=f\"Modified CH{ch}\")\n",
" file_labels.append(f\"Mod CH{ch}\")\n",
" except FileNotFoundError:\n",
" print(f\"Warning: Modified file not found for plotting: {fname}\")\n",
" except Exception as e:\n",
" print(f\"Error reading {fname} for plotting: {e}\")\n",
"\n",
"# Add vertical lines indicating range boundaries (using 0-based index)\n",
"# Boundaries are *after* the end index of a range\n",
"boundaries = sorted(list(set([r[\"hex_end_idx\"] for r in CAL_RANGES])))\n",
"if boundaries:\n",
" plt.vlines(\n",
" boundaries[:-1],\n",
" ymin=plt.ylim()[0],\n",
" ymax=plt.ylim()[1],\n",
" colors=\"r\",\n",
" linestyles=\":\",\n",
" label=\"Range Boundary\",\n",
" )\n",
" # file_labels.append('Boundary') # Optional: add to legend\n",
"\n",
"plt.title(\"Comparison of Original and Modified HF Flatness Calibration Data\")\n",
"plt.xlabel(\"Index in .hex file\")\n",
"plt.ylabel(\"Calibration Factor\")\n",
"plt.legend()\n",
"plt.grid(True)\n",
"plt.tight_layout()\n",
"plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 7. Upload Calibration Files to Generator using ADB"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(\"\\n--- ADB Upload Commands ---\")\n",
"print(\"Run these commands in your system terminal (cmd, bash, etc.):\")\n",
"print(\"# Connect to the device (if not already connected):\")\n",
"print(f\"adb connect {ADB_DEVICE_ID}\")\n",
"print(\"-\" * 20)\n",
"\n",
"for ch in CHANNELS_TO_CALIBRATE:\n",
" mod_file_local = os.path.join(DATA_DIR, MODIFIED_HEX_FILES[ch])\n",
" original_filename_remote = ORIGINAL_HEX_FILES[\n",
" ch\n",
" ] # Overwrite the original name on device\n",
" # Assuming the standard Rigol path, adjust if necessary\n",
" remote_path = f\"/rigol/data/{original_filename_remote}\"\n",
"\n",
" # Important: Ensure paths are correctly quoted if they contain spaces\n",
" # Using pathlib helps create OS-agnostic paths for the local file\n",
" mod_file_local_str = str(pathlib.Path(mod_file_local))\n",
"\n",
" print(f\"# Upload Channel {ch}:\")\n",
" # Use quotes around paths, especially the local one if it might have spaces\n",
" print(f'adb -s {ADB_DEVICE_ID} push \"{mod_file_local_str}\" \"{remote_path}\"')\n",
" print(\"\") # Add a newline for clarity\n",
"\n",
"print(\"--- End of ADB Commands ---\")\n",
"print(\n",
" \"\\nAfter uploading, it is recommended to REBOOT the signal generator for the new calibration data to take effect.\"\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"if gen:\n",
" gen.close()\n",
" print(\"Generator connection closed.\")\n",
"if osc:\n",
" osc.close()\n",
" print(\"Oscilloscope connection closed.\")\n",
"\n",
"print(\"\\nCalibration process finished.\")"
]
}
],
"metadata": {
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

6
rigol_setup_crypter.py Normal file → Executable file
View File

@@ -143,10 +143,12 @@ def _des_cbc_crypt(data_bytes, iv_64bits, round_keys, is_encrypt):
def DataDecryption(strEncryptedDataHex, cpu_serial):
"""Decrypts hex string using DES-CBC"""
c_hex_str = strEncryptedDataHex.strip()
try:
encrypted_bytes = binascii.unhexlify(strEncryptedDataHex)
encrypted_bytes = binascii.unhexlify(c_hex_str)
except binascii.Error as e:
raise ValueError(f"Invalid hex string provided: {e}")
raise ValueError(f"Invalid hex string provided (length {len(c_hex_str)} after stripping): {e}")
round_keys, iv_bits = _generate_round_keys(cpu_serial), _generate_iv_bits()
decrypted_bytes = _des_cbc_crypt(encrypted_bytes, iv_bits, round_keys, False)
try: