# %% [markdown]
# Update `VISA_GEN`/`VISA_OSC` with your resource strings and `GEN_CH`/`OSC_CH` with the channel numbers before running.

# %%
import os
import time

import matplotlib.pyplot as plt
import numpy as np
import pyvisa

GEN_CH = 1  # generator chan to cal
OSC_CH = 1  # scope chan

# VISA resource strs
VISA_GEN = "USB0::6833::1606::DG8P1234567890::0::INSTR"
VISA_OSC = "USB0::62700::4119::SDS01234567890::0::INSTR"

# full bw freq points
FREQ_FULL = np.array(
    [
        200e3,
        300e3,
        400e3,
        500e3,
        600e3,
        700e3,
        800e3,
        900e3,
        1e6,
        2e6,
        4e6,
        6e6,
        8e6,
        10e6,
        20e6,
        30e6,
        40e6,
        50e6,
        60e6,
        70e6,
        80e6,
        90e6,
        100e6,
        120e6,
        150e6,
        175e6,
        200e6,
    ]
)

# high voltage range is limited to 100mhz apparently
FREQ_HIGH_VOLT = FREQ_FULL[:-4]

print(f"Full Freq Points: {len(FREQ_FULL)} (Max: {FREQ_FULL[-1] / 1e6} MHz)")
print(f"High Volt Points: {len(FREQ_HIGH_VOLT)} (Max: {FREQ_HIGH_VOLT[-1] / 1e6} MHz)")


class InstrumentController:
    """Thin wrapper around the generator and scope VISA sessions used by the sweep."""

    def __init__(self, gen_addr, osc_addr):
        """Open both instruments and print their `*IDN?` replies.

        Raises whatever the VISA layer raised if either instrument cannot be
        opened or queried; any session that did open is closed first so the
        cell can simply be re-run.
        """
        self.rm = pyvisa.ResourceManager()
        self.gen = None
        self.osc = None
        try:
            self.gen = self.rm.open_resource(gen_addr)
            self.osc = self.rm.open_resource(osc_addr)
            print(f"Connected to GEN: {self.gen.query('*IDN?').strip()}")
            print(f"Connected to OSC: {self.osc.query('*IDN?').strip()}")
        except Exception as e:
            print(f"Connection Failed: {e}")
            # Do not continue with half-initialised sessions: every later call
            # would fail with an unrelated AttributeError instead.
            for res in (self.osc, self.gen):
                if res is not None:
                    res.close()
            raise

    def prepare_gen(self, ch):
        """Put generator channel `ch` into sine / 0 V offset / 50 ohm load and enable it."""
        print(f"Resetting Generator CH{ch} state...")
        self.gen.write(f":SOUR{ch}:FUNC SIN")  # sine
        self.gen.write(f":SOUR{ch}:VOLT:OFFS 0")  # 0V offset
        self.gen.write(f":OUTP{ch}:LOAD 50")  # match 50R terminator
        self.gen.write(f":OUTP{ch} ON")  # enable output

    def prepare_scope(self, ch):
        """Configure scope channel `ch` and switch acquisition to 16x averaging."""
        print(f"Configuring Scope CH{ch}...")
        self.osc.write(f"C{ch}:TRA ON")  # trace On
        self.osc.write(f"C{ch}:ATT 1")  # probe 1X
        self.osc.write(f"C{ch}:BWL FULL")  # full bw
        self.osc.write(f"C{ch}:TRIG_LEVEL 0")  # reset it just in case
        self.osc.write(":ACQuire:TYPE NORMal")
        time.sleep(0.1)
        self.osc.write(":ACQuire:TYPE AVERage,16")

    def measure_rms(self, ch, samples=5):
        """Return the mean of `samples` RMS readings from scope channel `ch`.

        Replies that cannot be parsed are skipped. If none parse, a warning is
        printed and 0.0 is returned.
        """
        time.sleep(0.5)
        vals = []
        for _ in range(samples):
            resp = self.osc.query(f"C{ch}:PAVA? RMS")
            try:
                # parse "C2:PAVA RMS,1.234V"
                vals.append(float(resp.split(",")[1].split("V")[0]))
            except (IndexError, ValueError):
                pass
            time.sleep(0.1)
        if not vals:
            print(f"Warning: no valid RMS reading on C{ch}; returning 0.0")
            return 0.0
        return np.mean(vals)

    def set_gen(self, ch, freq, vpp):
        """Set frequency and amplitude on generator channel `ch`."""
        self.gen.write(f":SOUR{ch}:FREQ {freq}")
        self.gen.write(f":SOUR{ch}:VOLT {vpp}")


instr = InstrumentController(VISA_GEN, VISA_OSC)

# %% [markdown]
# Runs the physical sweep. The generator uses three internal amplifier ranges (Mid, High, Low). We sweep each distinct range to characterize the rolloff.
#
# Data is saved to `cal_measurements_final.npz` after the sweep.

# %%
def run_calibration_sweeps(controller, gen_ch, osc_ch):
    """Sweep all three amplifier ranges and return the measured RMS per range.

    Returns a dict mapping "rng1"/"rng2"/"rng3" to a numpy array holding one
    RMS reading per frequency point of that range.
    """
    # Vpp used in factory calibration are 1: 12649111e-7 V, 2: 50000000e-7 V, 3: 6324556e-7 V
    tasks = [
        # range 1: mid voltage (~1.26V), full bw
        {"name": "rng1", "vpp": 1.2649111, "freqs": FREQ_FULL},
        # range 2: high voltage (~5.0V), 100MHz
        {"name": "rng2", "vpp": 5.0000000, "freqs": FREQ_HIGH_VOLT},
        # range 3: low voltage (~0.63V), full bw
        {"name": "rng3", "vpp": 0.6324556, "freqs": FREQ_FULL},
    ]

    data_store = {}

    controller.prepare_gen(gen_ch)
    controller.prepare_scope(osc_ch)

    for task in tasks:
        vpp = task["vpp"]
        freqs = task["freqs"]
        name = task["name"]

        print(f"\n--- Sweeping {name}: {vpp} Vpp (Max Freq: {freqs[-1] / 1e6} MHz) ---")

        controller.set_gen(gen_ch, freqs[0], vpp)
        controller.osc.write(f"C{osc_ch}:VOLT_DIV {vpp / 6}")
        time.sleep(2)

        meas_vals = []
        for f in freqs:
            controller.set_gen(gen_ch, f, vpp)

            # Adjust timebase dynamically
            if f > 100e6:
                tdiv = 2e-9
            elif f > 10e6:
                tdiv = 10e-9
            elif f > 1e6:
                tdiv = 100e-9
            else:
                tdiv = 5e-6
            controller.osc.write(f"TIME_DIV {tdiv}")

            time.sleep(0.8)  # wait a bit for it to settle
            rms = controller.measure_rms(osc_ch, samples=5)
            meas_vals.append(rms)
            print(f"Freq: {f / 1e6:5.1f} MHz | RMS: {rms:.4f}")

        data_store[name] = np.array(meas_vals)

    return data_store


# sweep and save
meas_data = run_calibration_sweeps(instr, GEN_CH, OSC_CH)
np.savez("cal_measurements_final.npz", **meas_data)
print("Sweep complete. Data saved to .npz file.")

# %% [markdown]
# Correction logic is:
# $$ \text{factor}_{new} = \text{factor}_{old} \times \frac{V_{ref}}{V_{meas}} $$
#
# Where $V_{ref}$ is the amplitude measured at the lowest frequency (200 kHz). Only the last 12 points (>25 MHz) are corrected here; the low range should be correctly factory calibrated, but feel free to increase the number of correction points.

# %%
def generate_final_hex(input_file, output_file, measurements):
    """Write a corrected copy of the HF-flatness calibration file and plot the result.

    input_file:   existing calibration blob (doubles: header + three ranges).
    output_file:  path of the corrected blob to write.
    measurements: dict with "rng1"/"rng2"/"rng3" arrays, one RMS value per
                  coefficient of the matching range.

    On any problem (missing or short input file, unusable measurements) an
    error is printed and nothing is written.
    """
    if not os.path.exists(input_file):
        print(f"Error: Input file '{input_file}' not found.")
        return

    with open(input_file, "rb") as f:
        # 78 doubles (64-bit float)
        # [Header(1)] + [Rng1(27)] + [Rng2(23)] + [Rng3(27)]
        # NOTE(review): the little-endian float64 dtype and the three section
        # slices below were reconstructed from the layout comment above and
        # the rebuild indices further down - confirm against a known-good file.
        original_blob = np.fromfile(f, dtype="<f8")

    expected_len = 78
    if original_blob.size < expected_len:
        print(
            f"Error: '{input_file}' holds {original_blob.size} doubles, "
            f"expected at least {expected_len}."
        )
        return

    sec_1 = original_blob[1:28]
    sec_2 = original_blob[28:51]
    sec_3 = original_blob[51:78]

    # only correct > 25MHz tail, DG821 should be cal'ed up to 25MHz
    # NOTE(review): a fixed count of 12 starts at 30 MHz on the 27-point
    # ranges but at 6 MHz on the 23-point high-voltage range - confirm that
    # correcting range 2 below 25 MHz is intended.
    POINTS_TO_CORRECT = 12

    def apply_mod(original_arr, meas_arr):
        """Calculates new coefficients based on measured falloff."""
        meas_arr = np.asarray(meas_arr, dtype=float)
        if meas_arr.shape != original_arr.shape:
            raise ValueError(
                f"got {meas_arr.size} measurements for {original_arr.size} coefficients"
            )
        if not np.all(np.isfinite(meas_arr)):
            raise ValueError("measurements contain NaN/inf")

        ref_amp = meas_arr[0]  # reference amp is the lowest freq measurement
        if ref_amp <= 0:
            # A zero reference would turn every factor into NaN/inf.
            raise ValueError(f"reference amplitude is {ref_amp}, expected > 0")

        # Readings below 5% of the reference are treated as failed
        # measurements; substituting the reference leaves their factor as-is.
        safe_meas = np.where(meas_arr < (ref_amp * 0.05), ref_amp, meas_arr)
        correction_curve = ref_amp / safe_meas

        out_arr = original_arr.copy()
        start_idx = len(original_arr) - min(POINTS_TO_CORRECT, len(original_arr))

        # apply correction factor to the tail
        out_arr[start_idx:] = original_arr[start_idx:] * correction_curve[start_idx:]
        return out_arr

    # process all ranges
    try:
        new_sec_1 = apply_mod(sec_1, measurements["rng1"])
        new_sec_2 = apply_mod(sec_2, measurements["rng2"])
        new_sec_3 = apply_mod(sec_3, measurements["rng3"])
    except ValueError as err:
        print(f"Error: measurements unusable, no file written: {err}")
        return

    # rebuild blob; start from a copy so anything outside the three ranges
    # (header, any trailing data) is carried over untouched
    final_blob = original_blob.copy()
    final_blob[1:28] = new_sec_1
    final_blob[28:51] = new_sec_2
    final_blob[51:78] = new_sec_3

    with open(output_file, "wb") as f:
        final_blob.tofile(f)

    print(f"Generated corrected calibration file: {output_file}")

    # plot
    plot_results(sec_1, new_sec_1, sec_2, new_sec_2, sec_3, new_sec_3, measurements)


def plot_results(s1, n1, s2, n2, s3, n3, meas):
    """Plot original vs corrected factors (left) and raw measurements (right) per range."""
    _fig, axes = plt.subplots(nrows=3, ncols=2, figsize=(16, 12))

    def plot_row(ax_row, freq, old_coeffs, new_coeffs, meas_data, title_prefix, vpp):
        # coefficients
        ax_row[0].plot(freq / 1e6, old_coeffs, label="Original")
        ax_row[0].plot(freq / 1e6, new_coeffs, "--", label="Corrected")
        ax_row[0].set_title(f"{title_prefix} Cal Factors")
        ax_row[0].set_ylabel("Factor")
        ax_row[0].legend()
        ax_row[0].grid(True, alpha=0.3)

        # measurements
        ax_row[1].plot(freq / 1e6, meas_data, "r.-", label="Measured")
        ax_row[1].set_title(f"{title_prefix} Raw Output ($V_{{pp}}={vpp}$)")
        ax_row[1].set_ylabel("RMS (V)")
        ax_row[1].grid(True, alpha=0.3)

    plot_row(axes[0], FREQ_FULL, s1, n1, meas["rng1"], "Range 1 (Mid)", 1.26)
    plot_row(axes[1], FREQ_HIGH_VOLT, s2, n2, meas["rng2"], "Range 2 (High)", 5.0)
    plot_row(axes[2], FREQ_FULL, s3, n3, meas["rng3"], "Range 3 (Low)", 0.63)

    axes[2, 0].set_xlabel("Frequency (MHz)")
    axes[2, 1].set_xlabel("Frequency (MHz)")

    plt.tight_layout()
    plt.show()


# gen
input_hex = f"cal_hfflat{GEN_CH}.hex"
output_hex = f"cal_hfflat{GEN_CH}_mod.hex"
generate_final_hex(input_hex, output_hex, meas_data)

# %% [markdown]
# Push the generated `cal_hfflat{gen_ch}_mod.hex` to the device.
#
# E.g. `adb push cal_hfflat1_mod.hex /rigol/data/cal_hfflat1.hex`, reboot, then do the verification sweep if needed.

# %%
def check_final_flatness(controller, gen_ch, osc_ch):
    """Re-run the sweep and plot each range's response normalized to its first point.

    One subplot per range shows measured / reference amplitude (reference =
    first frequency point), a 1.0 target line, +/-1 % guide lines, and the
    peak deviation in the title.
    """
    banner = "=" * 32
    print(banner)
    print(f"Verification sweep: Channel {gen_ch}")
    print(banner)

    new_meas_data = run_calibration_sweeps(controller, gen_ch, osc_ch)

    _fig, axes = plt.subplots(nrows=3, ncols=1, figsize=(10, 14))

    plot_cfg = [
        {"key": "rng1", "title": "Range 1 (Mid: 1.26V)", "freq": FREQ_FULL},
        {"key": "rng2", "title": "Range 2 (High: 5.0V)", "freq": FREQ_HIGH_VOLT},
        {"key": "rng3", "title": "Range 3 (Low: 0.63V)", "freq": FREQ_FULL},
    ]

    for ax, cfg in zip(axes, plot_cfg):
        readings = new_meas_data[cfg["key"]]

        # flatness ratio = V_meas / V_ref (at 200kHz)
        normalized = readings / readings[0]

        # peak deviation from unity gain, in percent
        max_dev = np.max(np.abs(normalized - 1.0)) * 100

        ax.plot(
            cfg["freq"] / 1e6,
            normalized,
            "b.-",
            linewidth=2,
            label="Measured Response",
        )
        ax.axhline(
            1.0, color="k", linestyle="-", alpha=0.8, linewidth=1, label="Target (1.0)"
        )

        # +/-1 % guide lines; only the upper one carries the legend entry
        guide_style = {"color": "g", "linestyle": "--", "alpha": 0.5}
        ax.axhline(1.01, label=r"$\pm 1\%$ Tolerance", **guide_style)
        ax.axhline(0.99, **guide_style)

        ax.set_title(f"{cfg['title']} - Max Deviation: {max_dev:.2f}%")
        ax.set_ylabel("Normalized Gain ($V_{out} / V_{ref}$)")
        ax.set_xlabel("Frequency (MHz)")

        # zoom in when the response is already within 5 %
        if max_dev < 5.0:
            ax.set_ylim(0.95, 1.05)

        ax.legend(loc="upper right")
        ax.grid(True, which="both", alpha=0.3)

    plt.tight_layout()
    plt.show()


check_final_flatness(instr, GEN_CH, OSC_CH)