326 lines
10 KiB
Python
326 lines
10 KiB
Python
# %% [markdown]
|
|
# Before running, set `VISA_GEN`/`VISA_OSC` to the VISA resource strings of your generator and scope, and `GEN_CH`/`OSC_CH` to the generator and scope channel numbers you are using.
|
|
|
|
# %%
|
|
import pyvisa
|
|
import time
|
|
import numpy as np
|
|
import matplotlib.pyplot as plt
|
|
import os
|
|
|
|
# Channels used for the calibration run.
GEN_CH = 1  # generator chan to cal
OSC_CH = 1  # scope chan

# VISA resource strs
VISA_GEN = "USB0::6833::1606::DG8P1234567890::0::INSTR"
VISA_OSC = "USB0::62700::4119::SDS01234567890::0::INSTR"

# full bw freq points, listed in kHz and scaled to Hz below.
# Integer kHz values times 1e3 are exactly representable, so the resulting
# array holds the same float64 values as writing 200e3, 300e3, ... directly.
FREQ_FULL = 1e3 * np.array(
    [
        # 200 kHz .. 900 kHz
        200, 300, 400, 500, 600, 700, 800, 900,
        # 1 MHz .. 8 MHz
        1_000, 2_000, 4_000, 6_000, 8_000,
        # 10 MHz .. 90 MHz
        10_000, 20_000, 30_000, 40_000, 50_000, 60_000, 70_000, 80_000, 90_000,
        # 100 MHz .. 200 MHz
        100_000, 120_000, 150_000, 175_000, 200_000,
    ],
    dtype=float,
)

# high voltage range is limited to 100 MHz apparently: drop the four points
# above 100 MHz (120, 150, 175 and 200 MHz) from the end of the full table.
FREQ_HIGH_VOLT = FREQ_FULL[: len(FREQ_FULL) - 4]

print(f"Full Freq Points: {len(FREQ_FULL)} (Max: {FREQ_FULL[-1] / 1e6} MHz)")
print(f"High Volt Points: {len(FREQ_HIGH_VOLT)} (Max: {FREQ_HIGH_VOLT[-1] / 1e6} MHz)")
|
|
|
|
|
|
class InstrumentController:
    """Owns the VISA sessions for the signal generator and the oscilloscope.

    Attributes:
        rm: The ``pyvisa.ResourceManager`` both sessions are opened from.
        gen: Open session to the generator, or ``None`` when not connected.
        osc: Open session to the scope, or ``None`` when not connected.
    """

    def __init__(self, gen_addr, osc_addr):
        """Open both instruments and print their ``*IDN?`` replies.

        Args:
            gen_addr: VISA resource string of the generator.
            osc_addr: VISA resource string of the scope.

        Raises:
            Exception: Whatever was raised while opening or identifying an
                instrument. The failure is printed, any session that was
                already opened is closed, and the error is re-raised so the
                caller never ends up with a half-initialised controller.
        """
        self.rm = pyvisa.ResourceManager()
        # Defined up front so close() works even if opening fails part-way.
        self.gen = None
        self.osc = None
        try:
            self.gen = self.rm.open_resource(gen_addr)
            self.osc = self.rm.open_resource(osc_addr)
            print(f"Connected to GEN: {self.gen.query('*IDN?').strip()}")
            print(f"Connected to OSC: {self.osc.query('*IDN?').strip()}")
        except Exception as e:
            print(f"Connection Failed: {e}")
            self.close()
            raise

    def close(self):
        """Close any open instrument session and the resource manager."""
        for attr in ("gen", "osc"):
            session = getattr(self, attr, None)
            if session is not None:
                session.close()
                setattr(self, attr, None)
        self.rm.close()

    def prepare_gen(self, ch):
        """Put generator channel ``ch`` into the state used for the sweep."""
        print(f"Resetting Generator CH{ch} state...")
        self.gen.write(f":SOUR{ch}:FUNC SIN")  # sine
        self.gen.write(f":SOUR{ch}:VOLT:OFFS 0")  # 0V offset
        self.gen.write(f":OUTP{ch}:LOAD 50")  # match 50R terminator
        self.gen.write(f":OUTP{ch} ON")  # enable output

    def prepare_scope(self, ch):
        """Configure scope channel ``ch`` and enable 16x averaging."""
        print(f"Configuring Scope CH{ch}...")
        self.osc.write(f"C{ch}:TRA ON")  # trace On
        self.osc.write(f"C{ch}:ATT 1")  # probe 1X
        self.osc.write(f"C{ch}:BWL FULL")  # full bw
        self.osc.write(f"C{ch}:TRIG_LEVEL 0")

        # reset it just in case: go back to normal acquisition first, then
        # switch to averaging after a short pause.
        self.osc.write(":ACQuire:TYPE NORMal")
        time.sleep(0.1)
        self.osc.write(":ACQuire:TYPE AVERage,16")

    def measure_rms(self, ch, samples=5):
        """Return the mean of several RMS readings from scope channel ``ch``.

        Waits 0.5 s, then sends ``C{ch}:PAVA? RMS`` ``samples`` times with a
        0.1 s pause after each query. Replies that cannot be parsed are
        skipped.

        Args:
            ch: Scope channel number.
            samples: Number of queries to issue.

        Returns:
            The mean of the successfully parsed readings, or ``0.0`` when no
            reading could be parsed (a warning is printed in that case).
        """
        time.sleep(0.5)
        vals = []
        for _ in range(samples):
            try:
                resp = self.osc.query(f"C{ch}:PAVA? RMS")
                # parse "C2:PAVA RMS,1.234V"
                v = float(resp.split(",")[1].split("V")[0])
                vals.append(v)
            except (IndexError, ValueError):
                # Deliberate best effort: an unparsable reply is dropped and
                # the remaining samples still contribute to the mean.
                pass
            time.sleep(0.1)

        if not vals:
            print(f"Warning: no valid RMS reading from scope CH{ch}; using 0.0")
            return 0.0
        return np.mean(vals)

    def set_gen(self, ch, freq, vpp):
        """Set frequency ``freq`` and amplitude ``vpp`` on generator channel ``ch``."""
        self.gen.write(f":SOUR{ch}:FREQ {freq}")
        self.gen.write(f":SOUR{ch}:VOLT {vpp}")
|
|
|
|
|
|
# Connect to both instruments once; the cells below share this controller.
instr = InstrumentController(gen_addr=VISA_GEN, osc_addr=VISA_OSC)
|
|
|
|
# %% [markdown]
|
|
# Runs the physical sweep. The generator uses three internal amplifier ranges (Mid, High, Low). We sweep each distinct range to characterize the rolloff.
|
|
#
|
|
# Data is saved to `cal_measurements_final.npz` after the sweep.
|
|
|
|
# %%
|
|
def run_calibration_sweeps(controller, gen_ch, osc_ch):
    """Sweep the generator over each amplifier range and record scope RMS.

    For every range the generator amplitude is fixed and the frequency is
    stepped through that range's table; at each step the scope timebase is
    adjusted and an averaged RMS reading is taken.

    Args:
        controller: Object providing ``prepare_gen``, ``prepare_scope``,
            ``set_gen``, ``measure_rms`` and an ``osc`` session with ``write``.
        gen_ch: Generator channel to drive.
        osc_ch: Scope channel to measure on.

    Returns:
        Dict mapping ``"rng1"``, ``"rng2"`` and ``"rng3"`` to a NumPy array of
        RMS readings, one per frequency point of that range.
    """
    # Vpp used in factory calibration are
    # 1: 12649111e-7 V, 2: 50000000e-7 V, 3: 6324556e-7 V
    sweep_plan = (
        ("rng1", 1.2649111, FREQ_FULL),  # range 1: mid voltage (~1.26V), full bw
        ("rng2", 5.0000000, FREQ_HIGH_VOLT),  # range 2: high voltage (~5.0V), 100MHz
        ("rng3", 0.6324556, FREQ_FULL),  # range 3: low voltage (~0.63V), full bw
    )

    # (frequency the signal must exceed in Hz, seconds per division), checked
    # from the highest band down; anything at or below 1 MHz uses the fallback.
    timebase_bands = ((100e6, 2e-9), (10e6, 10e-9), (1e6, 100e-9))
    slowest_tdiv = 5e-6

    controller.prepare_gen(gen_ch)
    controller.prepare_scope(osc_ch)

    results = {}
    for name, vpp, freqs in sweep_plan:
        print(f"\n--- Sweeping {name}: {vpp} Vpp (Max Freq: {freqs[-1] / 1e6} MHz) ---")

        # Park on the first point and scale the vertical axis to the
        # amplitude before the sweep starts.
        controller.set_gen(gen_ch, freqs[0], vpp)
        controller.osc.write(f"C{osc_ch}:VOLT_DIV {vpp / 6}")
        time.sleep(2)

        readings = []
        for f in freqs:
            controller.set_gen(gen_ch, f, vpp)

            # Adjust timebase dynamically
            tdiv = next(
                (div for lower_hz, div in timebase_bands if f > lower_hz),
                slowest_tdiv,
            )
            controller.osc.write(f"TIME_DIV {tdiv}")
            time.sleep(0.8)  # wait a bit for it to settle

            rms = controller.measure_rms(osc_ch, samples=5)
            readings.append(rms)

            print(f"Freq: {f / 1e6:5.1f} MHz | RMS: {rms:.4f}")

        results[name] = np.array(readings)

    return results
|
|
|
|
|
|
# sweep and save: run all three range sweeps on the configured channels, then
# store one array per range (keys rng1/rng2/rng3) in the .npz archive.
meas_data = run_calibration_sweeps(controller=instr, gen_ch=GEN_CH, osc_ch=OSC_CH)
np.savez("cal_measurements_final.npz", **meas_data)
print("Sweep complete. Data saved to .npz file.")
|
|
|
|
# %% [markdown]
|
|
# Correction logic is:
|
|
# $$ \text{factor}_{new} = \text{factor}_{old} \times \frac{V_{ref}}{V_{meas}} $$
|
|
#
|
|
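# Where $V_{ref}$ is the amplitude measured at the lowest frequency (200 kHz). Only the last 12 points of each range are corrected here: for the two full-bandwidth ranges that is everything above 25 MHz, while for the 100 MHz-limited high-voltage range the last 12 points start at 6 MHz. The lower frequencies should already be correctly factory calibrated, but feel free to increase the number of corrected points.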
|
|
|
|
# %%
|
|
def _apply_tail_correction(original_arr, meas_arr, points_to_correct):
    """Return a copy of ``original_arr`` with its tail scaled by V_ref / V_meas."""
    meas_arr = np.asarray(meas_arr, dtype=float)
    if meas_arr.shape != original_arr.shape:
        raise ValueError(
            f"expected {original_arr.shape[0]} measurements, got shape {meas_arr.shape}"
        )

    ref_amp = meas_arr[0]  # reference amp is the lowest freq measurement
    if not np.isfinite(ref_amp) or ref_amp <= 0:
        # A zero/invalid reference would turn every factor into NaN or 0.
        raise ValueError(f"invalid reference amplitude {ref_amp!r}")

    # Readings below 5% of the reference (or NaN) are treated as unusable and
    # replaced by the reference itself, i.e. those points get a factor of 1.
    unusable = np.isnan(meas_arr) | (meas_arr < (ref_amp * 0.05))
    safe_meas = np.where(unusable, ref_amp, meas_arr)
    correction_curve = ref_amp / safe_meas

    out_arr = original_arr.copy()
    # Clamp so asking for more points than exist corrects the whole section.
    start_idx = max(0, len(original_arr) - points_to_correct)

    # apply correction factor to the tail
    out_arr[start_idx:] = original_arr[start_idx:] * correction_curve[start_idx:]
    return out_arr


def generate_final_hex(input_file, output_file, measurements, *, points_to_correct=12):
    """Write a corrected copy of a flatness calibration file and plot the result.

    The input is read as little-endian 64-bit floats laid out as
    ``[Header(1)] + [Rng1(27)] + [Rng2(23)] + [Rng3(27)]`` (78 values). For
    each range the last ``points_to_correct`` factors are multiplied by
    ``V_ref / V_meas``, where ``V_ref`` is the first measurement of that range.
    Everything outside those tails, including any data after the 78 known
    values, is copied through unchanged.

    Args:
        input_file: Path of the original calibration file.
        output_file: Path the corrected file is written to.
        measurements: Mapping with keys ``"rng1"``, ``"rng2"``, ``"rng3"``
            holding 27, 23 and 27 amplitude readings respectively.
        points_to_correct: Number of trailing points corrected in every range.

    Returns:
        None. If ``input_file`` does not exist an error is printed and nothing
        is written.

    Raises:
        ValueError: If ``points_to_correct`` is negative, the input holds
            fewer than 78 values, a measurement array has the wrong length, or
            a range's reference reading is not a positive finite number. No
            output file is written in any of these cases.
        KeyError: If ``measurements`` lacks one of the range keys.
    """
    if points_to_correct < 0:
        raise ValueError("points_to_correct must be non-negative")

    if not os.path.exists(input_file):
        print(f"Error: Input file '{input_file}' not found.")
        return

    # 78 doubles (64-bit float)
    # [Header(1)] + [Rng1(27)] + [Rng2(23)] + [Rng3(27)]
    original_blob = np.fromfile(input_file, dtype="<f8")
    expected_count = 78
    if original_blob.size < expected_count:
        raise ValueError(
            f"'{input_file}' holds {original_blob.size} doubles, "
            f"expected at least {expected_count}"
        )

    # (measurement key, first index, one-past-last index) of each range
    layout = (
        ("rng1", 1, 28),  # Mid Voltage
        ("rng2", 28, 51),  # High Voltage
        ("rng3", 51, 78),  # Low Voltage
    )

    # only modify the tail, DG821 should be cal'ed up to 25MHz
    # NOTE(review): with the default of 12 points the tail starts above 25 MHz
    # only for the 27-point ranges; for the 23-point high range the last 12
    # points begin at 6 MHz. Confirm this is intended.
    # Start from a copy so the header and anything outside the three sections
    # survive untouched; all sections are validated before anything is written.
    final_blob = original_blob.copy()
    old_sections = []
    new_sections = []
    for key, lo, hi in layout:
        section = original_blob[lo:hi].copy()
        try:
            corrected = _apply_tail_correction(section, measurements[key], points_to_correct)
        except ValueError as err:
            raise ValueError(f"{key}: {err}") from err
        final_blob[lo:hi] = corrected
        old_sections.append(section)
        new_sections.append(corrected)

    final_blob.tofile(output_file)

    print(f"Generated corrected calibration file: {output_file}")
    # plot
    plot_results(
        old_sections[0],
        new_sections[0],
        old_sections[1],
        new_sections[1],
        old_sections[2],
        new_sections[2],
        measurements,
    )
|
|
|
|
|
|
def plot_results(s1, n1, s2, n2, s3, n3, meas):
    """Plot calibration factors and raw measurements for the three ranges.

    Draws a 3x2 grid: each row is one range, the left column shows the
    original and corrected factors, the right column the measured RMS values.

    Args:
        s1, s2, s3: Original factors of range 1, 2 and 3.
        n1, n2, n3: Corrected factors of range 1, 2 and 3.
        meas: Mapping with keys ``"rng1"``, ``"rng2"``, ``"rng3"`` holding the
            measured values plotted in the right column.
    """
    _fig, axes = plt.subplots(nrows=3, ncols=2, figsize=(16, 12))

    # One descriptor per grid row:
    # (frequency axis, original, corrected, measurement key, title, nominal Vpp)
    rows = (
        (FREQ_FULL, s1, n1, "rng1", "Range 1 (Mid)", 1.26),
        (FREQ_HIGH_VOLT, s2, n2, "rng2", "Range 2 (High)", 5.0),
        (FREQ_FULL, s3, n3, "rng3", "Range 3 (Low)", 0.63),
    )

    for (coeff_ax, meas_ax), row in zip(axes, rows):
        freq, old_coeffs, new_coeffs, key, title_prefix, vpp = row
        freq_mhz = freq / 1e6
        meas_data = meas[key]

        # coefficients
        coeff_ax.plot(freq_mhz, old_coeffs, label="Original")
        coeff_ax.plot(freq_mhz, new_coeffs, "--", label="Corrected")
        coeff_ax.set_title(f"{title_prefix} Cal Factors")
        coeff_ax.set_ylabel("Factor")
        coeff_ax.legend()
        coeff_ax.grid(True, alpha=0.3)

        # measurements
        meas_ax.plot(freq_mhz, meas_data, "r.-", label="Measured")
        meas_ax.set_title(f"{title_prefix} Raw Output ($V_{{pp}}={vpp}$)")
        meas_ax.set_ylabel("RMS (V)")
        meas_ax.grid(True, alpha=0.3)

    # Only the bottom row carries the x-axis label.
    for bottom_ax in (axes[2, 0], axes[2, 1]):
        bottom_ax.set_xlabel("Frequency (MHz)")

    plt.tight_layout()
    plt.show()
|
|
|
|
|
|
# gen: read the original per-channel calibration file and write the corrected
# copy next to it with a "_mod" suffix.
input_hex = f"cal_hfflat{GEN_CH}.hex"
output_hex = f"cal_hfflat{GEN_CH}_mod.hex"
generate_final_hex(
    input_file=input_hex,
    output_file=output_hex,
    measurements=meas_data,
)
|
|
|
|
# %% [markdown]
|
|
# Push the generated `cal_hfflat{gen_ch}_mod.hex` to the device.
|
|
#
|
|
# For example, for channel 1: `adb push cal_hfflat1_mod.hex /rigol/data/cal_hfflat1.hex`. Then reboot the generator and run the verification sweep below if needed.
|
|
|
|
# %%
|
|
def check_final_flatness(controller, gen_ch, osc_ch):
    """Re-run the sweeps and plot each range normalised to its first point.

    Every range gets its own subplot showing ``V_meas / V_ref`` against
    frequency, with the 1.0 target and +/-1% guide lines, and the peak
    deviation in percent in the title.

    Args:
        controller: Instrument controller passed to ``run_calibration_sweeps``.
        gen_ch: Generator channel to verify.
        osc_ch: Scope channel to measure on.
    """
    banner = "=" * 32
    print(banner)
    print(f"Verification sweep: Channel {gen_ch}")
    print(banner)

    new_meas_data = run_calibration_sweeps(controller, gen_ch, osc_ch)
    _fig, axes = plt.subplots(nrows=3, ncols=1, figsize=(10, 14))

    # (measurement key, subplot title, frequency axis in Hz)
    panels = (
        ("rng1", "Range 1 (Mid: 1.26V)", FREQ_FULL),
        ("rng2", "Range 2 (High: 5.0V)", FREQ_HIGH_VOLT),
        ("rng3", "Range 3 (Low: 0.63V)", FREQ_FULL),
    )

    for ax, (key, title, freq_hz) in zip(axes, panels):
        freqs_mhz = freq_hz / 1e6
        readings = new_meas_data[key]

        # flatness ratio = V_meas / V_ref (at 200kHz)
        normalized = readings / readings[0]

        # peak deviation, in percent
        max_dev = 100 * np.max(np.abs(normalized - 1.0))

        ax.plot(freqs_mhz, normalized, "b.-", linewidth=2, label="Measured Response")

        # Target line plus the +/-1% guides; only the upper guide is labelled
        # so the legend lists the tolerance once.
        ax.axhline(
            1.0, color="k", linestyle="-", alpha=0.8, linewidth=1, label="Target (1.0)"
        )
        ax.axhline(
            1.01, color="g", linestyle="--", alpha=0.5, label=r"$\pm 1\%$ Tolerance"
        )
        ax.axhline(0.99, color="g", linestyle="--", alpha=0.5)

        ax.set_title(f"{title} - Max Deviation: {max_dev:.2f}%")
        ax.set_ylabel("Normalized Gain ($V_{out} / V_{ref}$)")
        ax.set_xlabel("Frequency (MHz)")

        # Zoom in to +/-5% only when the response actually fits that window.
        if max_dev < 5.0:
            ax.set_ylim(0.95, 1.05)

        ax.legend(loc="upper right")
        ax.grid(True, which="both", alpha=0.3)

    plt.tight_layout()
    plt.show()
|
|
|
|
|
|
# Verification sweep on the same channels, run after the corrected file has
# been pushed to the device.
check_final_flatness(controller=instr, gen_ch=GEN_CH, osc_ch=OSC_CH)
|
|
|
|
|