{ "cells": [ { "cell_type": "markdown", "id": "42249cb4", "metadata": {}, "source": [ "Update `VISA_GEN`/`VISA_OSC` with your resource strings and `GEN_CH`/`OSC_CH` w/ channel numbers before running." ] }, { "cell_type": "code", "execution_count": null, "id": "2d2baeb2", "metadata": {}, "outputs": [], "source": [ "import pyvisa\n", "import time\n", "import numpy as np\n", "import matplotlib.pyplot as plt\n", "import os\n", "\n", "GEN_CH = 1 # generator chan to cal\n", "OSC_CH = 1 # scope chan\n", "\n", "# VISA resource strs\n", "VISA_GEN = \"USB0::6833::1606::DG8P1234567890::0::INSTR\"\n", "VISA_OSC = \"USB0::62700::4119::SDS01234567890::0::INSTR\"\n", "\n", "# full bw freq points\n", "FREQ_FULL = np.array(\n", " [\n", " 200e3,\n", " 300e3,\n", " 400e3,\n", " 500e3,\n", " 600e3,\n", " 700e3,\n", " 800e3,\n", " 900e3,\n", " 1e6,\n", " 2e6,\n", " 4e6,\n", " 6e6,\n", " 8e6,\n", " 10e6,\n", " 20e6,\n", " 30e6,\n", " 40e6,\n", " 50e6,\n", " 60e6,\n", " 70e6,\n", " 80e6,\n", " 90e6,\n", " 100e6,\n", " 120e6,\n", " 150e6,\n", " 175e6,\n", " 200e6,\n", " ]\n", ")\n", "\n", "# high voltage range is limited to 100mhz apparently\n", "FREQ_HIGH_VOLT = FREQ_FULL[:-4]\n", "\n", "print(f\"Full Freq Points: {len(FREQ_FULL)} (Max: {FREQ_FULL[-1] / 1e6} MHz)\")\n", "print(f\"High Volt Points: {len(FREQ_HIGH_VOLT)} (Max: {FREQ_HIGH_VOLT[-1] / 1e6} MHz)\")\n", "\n", "\n", "class InstrumentController:\n", " def __init__(self, gen_addr, osc_addr):\n", " self.rm = pyvisa.ResourceManager()\n", " try:\n", " self.gen = self.rm.open_resource(gen_addr)\n", " self.osc = self.rm.open_resource(osc_addr)\n", " print(f\"Connected to GEN: {self.gen.query('*IDN?').strip()}\")\n", " print(f\"Connected to OSC: {self.osc.query('*IDN?').strip()}\")\n", " except Exception as e:\n", " print(f\"Connection Failed: {e}\")\n", "\n", " def prepare_gen(self, ch):\n", " print(f\"Resetting Generator CH{ch} state...\")\n", " self.gen.write(f\":SOUR{ch}:FUNC SIN\") # sine\n", " self.gen.write(f\":SOUR{ch}:VOLT:OFFS 0\") # 0V offset\n", " self.gen.write(f\":OUTP{ch}:LOAD 50\") # match 50R terminator\n", " self.gen.write(f\":OUTP{ch} ON\") # enable output\n", "\n", " def prepare_scope(self, ch):\n", " print(f\"Configuring Scope CH{ch}...\")\n", " self.osc.write(f\"C{ch}:TRA ON\") # trace On\n", " self.osc.write(f\"C{ch}:ATT 1\") # probe 1X\n", " self.osc.write(f\"C{ch}:BWL FULL\") # full bw\n", " self.osc.write(f\"C{ch}:TRIG_LEVEL 0\")\n", "\n", " # reset it just in case\n", " self.osc.write(\":ACQuire:TYPE NORMal\")\n", " time.sleep(0.1)\n", " self.osc.write(\":ACQuire:TYPE AVERage,16\")\n", "\n", " def measure_rms(self, ch, samples=5):\n", " time.sleep(0.5)\n", " vals = []\n", " for _ in range(samples):\n", " try:\n", " resp = self.osc.query(f\"C{ch}:PAVA? RMS\")\n", " # parse \"C2:PAVA RMS,1.234V\"\n", " v = float(resp.split(\",\")[1].split(\"V\")[0])\n", " vals.append(v)\n", " except (IndexError, ValueError):\n", " pass\n", " time.sleep(0.1)\n", "\n", " return np.mean(vals) if vals else 0.0\n", "\n", " def set_gen(self, ch, freq, vpp):\n", " self.gen.write(f\":SOUR{ch}:FREQ {freq}\")\n", " self.gen.write(f\":SOUR{ch}:VOLT {vpp}\")\n", "\n", "\n", "instr = InstrumentController(VISA_GEN, VISA_OSC)" ] }, { "cell_type": "markdown", "id": "440bfc6d", "metadata": {}, "source": [ "Runs the physical sweep. The generator uses three internal amplifier ranges (Mid, High, Low). We sweep each distinct range to characterize the rolloff.\n", "\n", "Data is saved to `cal_measurements_final.npz` after the sweep." ] }, { "cell_type": "code", "execution_count": null, "id": "a8b47a9b", "metadata": {}, "outputs": [], "source": [ "def run_calibration_sweeps(controller, gen_ch, osc_ch):\n", " # Vpp used in factory calibration are 1: 12649111e-7 V, 2: 50000000e-7 V, 3: 6324556e-7 V\n", " tasks = [\n", " # range 1: mid voltage (~1.26V), full bw\n", " {\"name\": \"rng1\", \"vpp\": 1.2649111, \"freqs\": FREQ_FULL},\n", " # range 2: high voltage (~5.0V), 100MHz\n", " {\"name\": \"rng2\", \"vpp\": 5.0000000, \"freqs\": FREQ_HIGH_VOLT},\n", " # range 3: low voltage (~0.63V), full bw\n", " {\"name\": \"rng3\", \"vpp\": 0.6324556, \"freqs\": FREQ_FULL},\n", " ]\n", "\n", " data_store = {}\n", " controller.prepare_gen(gen_ch)\n", " controller.prepare_scope(osc_ch)\n", "\n", " for task in tasks:\n", " vpp = task[\"vpp\"]\n", " freqs = task[\"freqs\"]\n", " name = task[\"name\"]\n", "\n", " print(f\"\\n--- Sweeping {name}: {vpp} Vpp (Max Freq: {freqs[-1] / 1e6} MHz) ---\")\n", "\n", " controller.set_gen(gen_ch, freqs[0], vpp)\n", " controller.osc.write(f\"C{osc_ch}:VOLT_DIV {vpp / 6}\")\n", " time.sleep(2)\n", "\n", " meas_vals = []\n", " for f in freqs:\n", " controller.set_gen(gen_ch, f, vpp)\n", "\n", " # Adjust timebase dynamically\n", " if f > 100e6:\n", " tdiv = 2e-9\n", " elif f > 10e6:\n", " tdiv = 10e-9\n", " elif f > 1e6:\n", " tdiv = 100e-9\n", " else:\n", " tdiv = 5e-6\n", "\n", " controller.osc.write(f\"TIME_DIV {tdiv}\")\n", " time.sleep(0.8) # wait a bit for it to settle\n", "\n", " rms = controller.measure_rms(osc_ch, samples=5)\n", " meas_vals.append(rms)\n", "\n", " print(f\"Freq: {f / 1e6:5.1f} MHz | RMS: {rms:.4f}\")\n", "\n", " data_store[name] = np.array(meas_vals)\n", "\n", " return data_store\n", "\n", "\n", "# sweep and save\n", "meas_data = run_calibration_sweeps(instr, GEN_CH, OSC_CH)\n", "np.savez(\"cal_measurements_final.npz\", **meas_data)\n", "print(\"Sweep complete. Data saved to .npz file.\")" ] }, { "cell_type": "markdown", "id": "a95e8926", "metadata": {}, "source": [ "Correction logic is:\n", "$$ \\text{factor}_{new} = \\text{factor}_{old} \\times \\frac{V_{ref}}{V_{meas}} $$\n", "\n", "Where $V_{ref}$ is the amplitude measured at the lowest frequency (200 kHz). Only the last 12 points (>25 MHz) are corrected here, low range should be correctly factory calibrated but feel free to increase the correction points." ] }, { "cell_type": "code", "execution_count": null, "id": "299a0c65", "metadata": {}, "outputs": [], "source": [ "def generate_final_hex(input_file, output_file, measurements):\n", " if not os.path.exists(input_file):\n", " print(f\"Error: Input file '{input_file}' not found.\")\n", " return\n", "\n", " with open(input_file, \"rb\") as f:\n", " # 78 doubles (64-bit float)\n", " # [Header(1)] + [Rng1(27)] + [Rng2(23)] + [Rng3(27)]\n", " original_blob = np.fromfile(f, dtype=\" 25MHz tail, DG821 should be cal'ed up to 25MHz\n", " POINTS_TO_CORRECT = 12\n", "\n", " def apply_mod(original_arr, meas_arr):\n", " \"\"\"Calculates new coefficients based on measured falloff.\"\"\"\n", " ref_amp = meas_arr[0] # reference amp is the lowest freq measurement\n", " safe_meas = np.where(meas_arr < (ref_amp * 0.05), ref_amp, meas_arr)\n", " correction_curve = ref_amp / safe_meas\n", "\n", " out_arr = original_arr.copy()\n", " start_idx = len(original_arr) - POINTS_TO_CORRECT\n", "\n", " # apply correction factor to the tail\n", " out_arr[start_idx:] = original_arr[start_idx:] * correction_curve[start_idx:]\n", " return out_arr\n", "\n", " # process all ranges\n", " new_sec_1 = apply_mod(sec_1, measurements[\"rng1\"])\n", " new_sec_2 = apply_mod(sec_2, measurements[\"rng2\"])\n", " new_sec_3 = apply_mod(sec_3, measurements[\"rng3\"])\n", "\n", " # rebuild blob\n", " final_blob = np.zeros_like(original_blob)\n", " final_blob[0] = original_blob[0] # header\n", " final_blob[1:28] = new_sec_1\n", " final_blob[28:51] = new_sec_2\n", " final_blob[51:78] = new_sec_3\n", "\n", " with open(output_file, \"wb\") as f:\n", " final_blob.tofile(f)\n", "\n", " print(f\"Generated corrected calibration file: {output_file}\")\n", " # plot\n", " plot_results(sec_1, new_sec_1, sec_2, new_sec_2, sec_3, new_sec_3, measurements)\n", "\n", "\n", "def plot_results(s1, n1, s2, n2, s3, n3, meas):\n", " _fig, axes = plt.subplots(nrows=3, ncols=2, figsize=(16, 12))\n", "\n", " def plot_row(ax_row, freq, old_coeffs, new_coeffs, meas_data, title_prefix, vpp):\n", " # coefficients\n", " ax_row[0].plot(freq / 1e6, old_coeffs, label=\"Original\")\n", " ax_row[0].plot(freq / 1e6, new_coeffs, \"--\", label=\"Corrected\")\n", " ax_row[0].set_title(f\"{title_prefix} Cal Factors\")\n", " ax_row[0].set_ylabel(\"Factor\")\n", " ax_row[0].legend()\n", " ax_row[0].grid(True, alpha=0.3)\n", "\n", " # measurements\n", " ax_row[1].plot(freq / 1e6, meas_data, \"r.-\", label=\"Measured\")\n", " ax_row[1].set_title(f\"{title_prefix} Raw Output ($V_{{pp}}={vpp}$)\")\n", " ax_row[1].set_ylabel(\"RMS (V)\")\n", " ax_row[1].grid(True, alpha=0.3)\n", "\n", " plot_row(axes[0], FREQ_FULL, s1, n1, meas[\"rng1\"], \"Range 1 (Mid)\", 1.26)\n", " plot_row(axes[1], FREQ_HIGH_VOLT, s2, n2, meas[\"rng2\"], \"Range 2 (High)\", 5.0)\n", " plot_row(axes[2], FREQ_FULL, s3, n3, meas[\"rng3\"], \"Range 3 (Low)\", 0.63)\n", "\n", " axes[2, 0].set_xlabel(\"Frequency (MHz)\")\n", " axes[2, 1].set_xlabel(\"Frequency (MHz)\")\n", "\n", " plt.tight_layout()\n", " plt.show()\n", "\n", "\n", "# gen\n", "input_hex = f\"cal_hfflat{GEN_CH}.hex\"\n", "output_hex = f\"cal_hfflat{GEN_CH}_mod.hex\"\n", "generate_final_hex(input_hex, output_hex, meas_data)" ] }, { "cell_type": "markdown", "id": "f32494b3", "metadata": {}, "source": [ "Push the generated `cal_hfflat{gen_ch}_mod.hex` to the device.\n", "\n", "I.e. `adb push cal_hfflat1_mod.hex /rigol/data/cal_hfflat1.hex`, reboot, then do the verification sweep if needed." ] }, { "cell_type": "code", "execution_count": null, "id": "a7069a1e", "metadata": {}, "outputs": [], "source": [ "def check_final_flatness(controller, gen_ch, osc_ch):\n", " print(\"=\" * 32)\n", " print(f\"Verification sweep: Channel {gen_ch}\")\n", " print(\"=\" * 32)\n", "\n", " new_meas_data = run_calibration_sweeps(controller, gen_ch, osc_ch)\n", " _fig, axes = plt.subplots(nrows=3, ncols=1, figsize=(10, 14))\n", "\n", " plot_cfg = [\n", " {\"key\": \"rng1\", \"title\": \"Range 1 (Mid: 1.26V)\", \"freq\": FREQ_FULL},\n", " {\"key\": \"rng2\", \"title\": \"Range 2 (High: 5.0V)\", \"freq\": FREQ_HIGH_VOLT},\n", " {\"key\": \"rng3\", \"title\": \"Range 3 (Low: 0.63V)\", \"freq\": FREQ_FULL},\n", " ]\n", "\n", " for i, cfg in enumerate(plot_cfg):\n", " key = cfg[\"key\"]\n", " freqs_mhz = cfg[\"freq\"] / 1e6\n", " vals = new_meas_data[key]\n", "\n", " # flatness ratio = V_meas / V_ref (at 200kHz)\n", " ref_val = vals[0]\n", " normalized = vals / ref_val\n", "\n", " # peak deviation\n", " max_dev = np.max(np.abs(normalized - 1.0)) * 100\n", "\n", " ax = axes[i]\n", " ax.plot(freqs_mhz, normalized, \"b.-\", linewidth=2, label=\"Measured Response\")\n", "\n", " ax.axhline(\n", " 1.0, color=\"k\", linestyle=\"-\", alpha=0.8, linewidth=1, label=\"Target (1.0)\"\n", " )\n", " ax.axhline(\n", " 1.01, color=\"g\", linestyle=\"--\", alpha=0.5, label=r\"$\\pm 1\\%$ Tolerance\"\n", " )\n", " ax.axhline(0.99, color=\"g\", linestyle=\"--\", alpha=0.5)\n", "\n", " ax.set_title(f\"{cfg['title']} - Max Deviation: {max_dev:.2f}%\")\n", " ax.set_ylabel(\"Normalized Gain ($V_{out} / V_{ref}$)\")\n", " ax.set_xlabel(\"Frequency (MHz)\")\n", "\n", " if max_dev < 5.0:\n", " ax.set_ylim(0.95, 1.05)\n", "\n", " ax.legend(loc=\"upper right\")\n", " ax.grid(True, which=\"both\", alpha=0.3)\n", "\n", " plt.tight_layout()\n", " plt.show()\n", "\n", "\n", "check_final_flatness(instr, GEN_CH, OSC_CH)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.13.7" } }, "nbformat": 4, "nbformat_minor": 5 }