Files
dg800pro_scripts/rigol_setup_crypter.py
2025-04-14 17:31:49 +06:00

494 lines
17 KiB
Python
Executable File

#!/usr/bin/env python3
# author: kuwoyuki <kuwoyuki.eee@cock.li>
# Rigol DG[89]00 Pro AWG setup.stp crypter
import argparse
import binascii
import csv
import json
import io
import sys
# Standard DES Tables (FIPS 46-3, 1-based indexing)
# fmt: off
IP_1based = [58,50,42,34,26,18,10,2,60,52,44,36,28,20,12,4,62,54,46,38,30,22,14,6,64,56,48,40,32,24,16,8,57,49,41,33,25,17,9,1,59,51,43,35,27,19,11,3,61,53,45,37,29,21,13,5,63,55,47,39,31,23,15,7]
FP_1based = [40,8,48,16,56,24,64,32,39,7,47,15,55,23,63,31,38,6,46,14,54,22,62,30,37,5,45,13,53,21,61,29,36,4,44,12,52,20,60,28,35,3,43,11,51,19,59,27,34,2,42,10,50,18,58,26,33,1,41,9,49,17,57,25]
E_1based = [32,1,2,3,4,5,4,5,6,7,8,9,8,9,10,11,12,13,12,13,14,15,16,17,16,17,18,19,20,21,20,21,22,23,24,25,24,25,26,27,28,29,28,29,30,31,32,1]
P_1based = [16,7,20,21,29,12,28,17,1,15,23,26,5,18,31,10,2,8,24,14,32,27,3,9,19,13,30,6,22,11,4,25]
PC1_1based = [57,49,41,33,25,17,9,1,58,50,42,34,26,18,10,2,59,51,43,35,27,19,11,3,60,52,44,36,63,55,47,39,31,23,15,7,62,54,46,38,30,22,14,6,61,53,45,37,29,21,13,5,28,20,12,4]
PC2_1based = [14,17,11,24,1,5,3,28,15,6,21,10,23,19,12,4,26,8,16,7,27,20,13,2,41,52,31,37,47,55,30,40,51,45,33,48,44,49,39,56,34,53,46,42,50,36,29,32]
S = [[14,4,13,1,2,15,11,8,3,10,6,12,5,9,0,7,0,15,7,4,14,2,13,1,10,6,12,11,9,5,3,8,4,1,14,8,13,6,2,11,15,12,9,7,3,10,5,0,15,12,8,2,4,9,1,7,5,11,3,14,10,0,6,13],[15,1,8,14,6,11,3,4,9,7,2,13,12,0,5,10,3,13,4,7,15,2,8,14,12,0,1,10,6,9,11,5,0,14,7,11,10,4,13,1,5,8,12,6,9,3,2,15,13,8,10,1,3,15,4,2,11,6,7,12,0,5,14,9],[10,0,9,14,6,3,15,5,1,13,12,7,11,4,2,8,13,7,0,9,3,4,6,10,2,8,5,14,12,11,15,1,13,6,4,9,8,15,3,0,11,1,2,12,5,10,14,7,1,10,13,0,6,9,8,7,4,15,14,3,11,5,2,12],[7,13,14,3,0,6,9,10,1,2,8,5,11,12,4,15,13,8,11,5,6,15,0,3,4,7,2,12,1,10,14,9,10,6,9,0,12,11,7,13,15,1,3,14,5,2,8,4,3,15,0,6,10,1,13,8,9,4,5,11,12,7,2,14],[2,12,4,1,7,10,11,6,8,5,3,15,13,0,14,9,14,11,2,12,4,7,13,1,5,0,15,10,3,9,8,6,4,2,1,11,10,13,7,8,15,9,12,5,6,3,0,14,11,8,12,7,1,14,2,13,6,15,0,9,10,4,5,3],[12,1,10,15,9,2,6,8,0,13,3,4,14,7,5,11,10,15,4,2,7,12,9,5,6,1,13,14,0,11,3,8,9,14,15,5,2,8,12,3,7,0,4,10,1,13,11,6,4,3,2,12,9,5,15,10,11,14,1,7,6,0,8,13],[4,11,2,14,15,0,8,13,3,12,9,7,5,10,6,1,13,0,11,7,4,9,1,10,14,3,5,12,2,15,8,6,1,4,11,13,12,3,7,14,10,15,6,8,0,5,9,2,6,11,13,8,1,4,10,7,9,5,0,15,14,2,3,12],[13,2,8,4,6,15,11,1,10,9,3,14,5,0,12,7,1,15,13,8,10,3,7,4,12,5,6,11,0,14,9,2,7,11,4,1,9,12,14,2,0,6,10,13,15,3,5,8,2,1,14,7,4,10,8,13,15,12,9,0,3,5,6,11]]
LEFT_SHIFTS = [1, 1, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 1]
# fmt: on
def _bytes_to_bits(byte_data):
return [(byte >> i) & 1 for byte in byte_data for i in range(7, -1, -1)]
def _bits_to_bytes(bits):
byte_list = bytearray()
padded_len = (len(bits) + 7) // 8 * 8
bits = bits + [0] * (padded_len - len(bits))
for i in range(0, padded_len, 8):
byte_val = sum(bits[i + j] << (7 - j) for j in range(8))
byte_list.append(byte_val)
return bytes(byte_list)
def _permute(block, table):
return [block[i - 1] for i in table]
def _xor(bits_a, bits_b):
return [a ^ b for a, b in zip(bits_a, bits_b)]
def _rotate_left(bits, n):
n = n % len(bits)
return bits[n:] + bits[:n]
# key & IV derivation
def _generate_iv_bits():
"""Generate the 64-bit IV from 'Rigoler'"""
iv_str = "Rigoler" # 7 chars
iv_bits_49 = []
for char in iv_str:
char_val = ord(char)
char_7_bits_lsb = [(char_val >> i) & 1 for i in range(7)]
iv_bits_49.extend(char_7_bits_lsb[::-1]) # reverse for MSB-of-7 first
return iv_bits_49 + [0] * 15 # pad with 15 zeros to 64 bits
def _generate_round_keys(cpu_serial):
# key material: 48 zeros | 8 bits of char 0 | 8 zeros
effective_64_bits = [0] * 64
effective_64_bits[48:56] = [(ord(cpu_serial[0]) >> i) & 1 for i in range(7, -1, -1)]
key_56 = _permute(effective_64_bits, PC1_1based)
c, d = key_56[:28], key_56[28:]
round_keys = []
for i in range(16):
c, d = _rotate_left(c, LEFT_SHIFTS[i]), _rotate_left(d, LEFT_SHIFTS[i])
round_keys.append(_permute(c + d, PC2_1based))
return round_keys
# DES stuff
def _des_f_function(r_32, k_48):
"""DES Feistel function F(R, K)."""
xored = _xor(_permute(r_32, E_1based), k_48)
s_out_32 = []
for i in range(8):
sub = xored[i * 6 : (i + 1) * 6]
row = (sub[0] << 1) | sub[5]
col = (sub[1] << 3) | (sub[2] << 2) | (sub[3] << 1) | sub[4]
val = S[i][row * 16 + col]
s_out_32.extend([(val >> k) & 1 for k in range(3, -1, -1)])
return _permute(s_out_32, P_1based)
def _des_crypt_block(block_64bits, round_keys, is_encrypt):
"""DES encrypt/decrypt single 64-bit block"""
l, r = (
_permute(block_64bits, IP_1based)[:32],
_permute(block_64bits, IP_1based)[32:],
)
keys = round_keys if is_encrypt else round_keys[::-1]
for k in keys:
l, r = r, _xor(l, _des_f_function(r, k))
return _permute(r + l, FP_1based) # final swap included
# CBC mode and padding
def _pad(data_bytes):
"""Pad data with space chars (0x20) to multiple of 8 bytes"""
rem = len(data_bytes) % 8
if rem == 0:
# len already a multiple of 8, NO padding
return data_bytes
else:
# add padding to reach the next multiple of 8
pad_len = 8 - rem
return data_bytes + bytes([0x20] * pad_len)
def _unpad(padded_bytes):
"""Remove trailing space padding marked by first ' ' ???"""
marker_index = padded_bytes.find(b" ")
return padded_bytes[:marker_index] if marker_index != -1 else padded_bytes
def _des_cbc_crypt(data_bytes, iv_64bits, round_keys, is_encrypt):
"""DES-CBC encrypt/decrypt with padding"""
if is_encrypt:
data_bytes = _pad(data_bytes)
result_bytes = bytearray()
prev_block_bits = iv_64bits
for i in range(0, len(data_bytes), 8):
current_block_bits = _bytes_to_bits(data_bytes[i : i + 8])
if is_encrypt:
processed_bits = _des_crypt_block(
_xor(current_block_bits, prev_block_bits), round_keys, True
)
result_bytes.extend(_bits_to_bytes(processed_bits))
prev_block_bits = processed_bits
else: # decrypt
decrypted_bits = _des_crypt_block(current_block_bits, round_keys, False)
result_bytes.extend(_bits_to_bytes(_xor(decrypted_bits, prev_block_bits)))
prev_block_bits = current_block_bits
return _unpad(bytes(result_bytes)) if not is_encrypt else bytes(result_bytes)
def DataDecryption(strEncryptedDataHex, cpu_serial):
"""Decrypts hex string using DES-CBC"""
c_hex_str = strEncryptedDataHex.strip()
try:
encrypted_bytes = binascii.unhexlify(c_hex_str)
except binascii.Error as e:
raise ValueError(f"Invalid hex string provided (length {len(c_hex_str)} after stripping): {e}")
round_keys, iv_bits = _generate_round_keys(cpu_serial), _generate_iv_bits()
decrypted_bytes = _des_cbc_crypt(encrypted_bytes, iv_bits, round_keys, False)
try:
return decrypted_bytes.decode("utf-8")
except UnicodeDecodeError:
return decrypted_bytes # rawdog bytes
def DataEncryption(strPlainData, cpu_serial):
"""Encrypts string using DES-CBC. Returns uppercase hex"""
plaintext_bytes = strPlainData.encode("utf-8")
round_keys, iv_bits = _generate_round_keys(cpu_serial), _generate_iv_bits()
encrypted_bytes = _des_cbc_crypt(plaintext_bytes, iv_bits, round_keys, True)
return binascii.hexlify(encrypted_bytes).decode("ascii").upper()
def write_output(data, output_file=None, is_binary=False):
# w to file
if output_file:
mode = "wb" if is_binary else "w"
encoding = None if is_binary else "utf-8"
with open(output_file, mode, encoding=encoding) as f:
f.write(data)
print(f"Output successfully written to: {output_file}")
return
# w to stdout
if is_binary:
print(data.decode("utf-8"))
else:
print(data)
def parse_rigol_csv(csv_string):
FIELD_NAMES = [
"Manufacturer",
"InstrModel",
"InstrSN",
"CalibrationDate",
"SineMaxFreq",
"SquareMaxFreq",
"RampMaxFreq",
"PulseMaxFreq",
"ArbMaxFreq",
"HarmonicMaxFreq",
"MinFreq",
"HarmonicMinFreq",
"ARMSerial",
"MaxChannels",
"ArbWaveLenLicense",
"ArbWaveLenValidTime",
"DuoChanChannelValidTime",
]
FREQ_FIELDS = [
"SineMaxFreq",
"SquareMaxFreq",
"RampMaxFreq",
"PulseMaxFreq",
"ArbMaxFreq",
"HarmonicMaxFreq",
"MinFreq",
"HarmonicMinFreq",
]
UHZ_TO_MHZ_FACTOR = 1e-12 # 1 / 1_000_000_000_000
ARB_LICENSE_MAP = {
"DEF": "Default",
"MEM": "Memory Depth License",
"CHD": "Dual Channel License",
}
TIME_STATUS_MAP = {"Forever": "Active (No Expiration)", "Inactive": "Not Active"}
TIME_FIELDS = ["ArbWaveLenValidTime", "DuoChanChannelValidTime"]
if not csv_string.strip():
return None
print("\nCSV header:")
print(",".join(FIELD_NAMES))
print(csv_string)
# parse CSV row
reader = csv.reader(io.StringIO(csv_string))
row = next(reader, None)
if not row:
return None
data = dict(zip(FIELD_NAMES, row))
# convert frequency fields
for field in FREQ_FIELDS:
if field in data and data[field].isdigit():
uhz_value = int(data[field])
data[f"{field}_MHz"] = uhz_value * UHZ_TO_MHZ_FACTOR # pyright: ignore [reportArgumentType]
# map license code
if "ArbWaveLenLicense" in data and data["ArbWaveLenLicense"] in ARB_LICENSE_MAP:
data["ArbWaveLenLicense_Description"] = ARB_LICENSE_MAP[
data["ArbWaveLenLicense"]
]
# map time fields
for field in TIME_FIELDS:
if field in data and data[field] in TIME_STATUS_MAP:
data[f"{field}_Status"] = TIME_STATUS_MAP[data[field]]
return data
def run_parse_mode(args, input_data):
parsed_result = parse_rigol_csv(input_data.strip())
if not parsed_result:
print("Error: Failed to parse CSV data.")
sys.exit(1)
if args.output_file:
json_output = json.dumps(parsed_result, indent=2)
with open(args.output_file, "w") as f:
f.write(json_output)
print(f"Output written to: {args.output_file}")
return
sections = [
(
"Instrument Information",
[
("Manufacturer", "Manufacturer"),
("Model", "InstrModel"),
("Serial Number", "InstrSN"),
("Calibration Date", "CalibrationDate"),
("ARM CPU Serial", "ARMSerial"),
("Max Channels", "MaxChannels"),
],
),
(
"Frequency Specifications",
[
("Sine Max Frequency", "SineMaxFreq"),
("Square Max Frequency", "SquareMaxFreq"),
("Ramp Max Frequency", "RampMaxFreq"),
("Pulse Max Frequency", "PulseMaxFreq"),
("Arb Max Frequency", "ArbMaxFreq"),
("Harmonic Max Frequency", "HarmonicMaxFreq"),
("Min Frequency", "MinFreq"),
("Harmonic Min Frequency", "HarmonicMinFreq"),
],
),
(
"Licensing Information",
[
("Arb Wavelength License", "ArbWaveLenLicense"),
("Arb Wavelength Valid Time", "ArbWaveLenValidTime"),
("Duo Channel Valid Time", "DuoChanChannelValidTime"),
],
),
]
for section_title, fields in sections:
print(f"\n{section_title}:")
section_has_data = False
for label, key in fields:
if key not in parsed_result:
continue
section_has_data = True
value = parsed_result[key]
output_line = f" {label}: {value}"
# MHz conv if available
mhz_key = f"{key}_MHz"
if mhz_key in parsed_result:
mhz_value = parsed_result[mhz_key]
if isinstance(mhz_value, (int, float)):
mhz_str = (
f"{mhz_value:.6f}"
if abs(mhz_value) > 1e-3 or mhz_value == 0
else f"{mhz_value:.6e}"
)
output_line += f" ({mhz_str} MHz)"
desc_key = f"{key}_Description"
status_key = f"{key}_Status"
if desc_key in parsed_result:
output_line += f" ({parsed_result[desc_key]})"
elif status_key in parsed_result:
output_line += f" ({parsed_result[status_key]})"
print(output_line)
if not section_has_data:
print(" (No data)")
def write_output(data, output_file=None, is_binary=False):
# w to file
if output_file:
mode = "wb" if is_binary else "w"
encoding = None if is_binary else "utf-8"
with open(output_file, mode, encoding=encoding) as f:
f.write(data)
print(f"Output successfully written to: {output_file}")
return
# w to stdout
if is_binary:
print(data.decode("utf-8"))
else:
print(data)
def main():
parser = argparse.ArgumentParser(
description="Encrypt or Decrypt Rigol DG[89]00 Pro setup.stp using the CPU SN.",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""Examples:
Encrypt text to console:
python %(prog)s -e -t "RIGOL TECHNOLOGIES,DG821,..." "d95ebe35672e20c2fc08"
Decrypt text to console:
python %(prog)s -d -t "FBC6BDDB0E..." "d95ebe35672e20c2fc08"
Parse CSV (decrypted setup.stp) to console:
python %(prog)s -p -t "RIGOL TECHNOLOGIES,DG821,..."
Encrypt from file to output file:
python %(prog)s -e -f input.txt "d95ebe35672e20c2fc08" -o setup.stp
Decrypt from file to output file:
python %(prog)s -d -f encrypted.hex "d95ebe35672e20c2fc08" -o setup.stp.hex
""",
)
mode_group = parser.add_mutually_exclusive_group(required=True)
mode_group.add_argument(
"-e",
"--encrypt",
action="store_const",
dest="mode",
const="e",
help="Encrypt plain text data",
)
mode_group.add_argument(
"-d",
"--decrypt",
action="store_const",
dest="mode",
const="d",
help="Decrypt hex string data",
)
mode_group.add_argument(
"-p",
"--parse",
action="store_const",
dest="mode",
const="p",
help="Parse Rigol CSV data string",
)
input_group = parser.add_mutually_exclusive_group(required=True)
input_group.add_argument(
"-t",
"--text",
dest="input_text",
help="Input data as text (Plain for Encrypt/Parse, Hex for Decrypt)",
)
input_group.add_argument(
"-f", "--file", dest="input_file", help="Path to read input data from file"
)
parser.add_argument(
"-k",
"--key",
dest="cpu_serial",
help="CPU Serial Number (Required for -e/-d modes)",
)
parser.add_argument(
"-o",
"--output",
metavar="FILEPATH",
dest="output_file",
help="Optional path to save output. Console if omitted",
)
args = parser.parse_args()
if args.mode in ["e", "d"] and not args.cpu_serial:
parser.error(
"-k/--key (CPU serial) is required for encryption and decryption modes"
)
# read input data from arg or file
if args.input_file:
with open(args.input_file, "r", encoding="utf-8") as f:
input_data = f.read()
else:
input_data = args.input_text
if args.mode == "e":
# encrypt
print("Mode: Encrypt")
if args.input_file:
print(f"Reading input from file: {args.input_file}")
print(f"Input Data: '{input_data[:50]}{'...' if len(input_data) > 50 else ''}'")
print(f"CPU Serial: {args.cpu_serial}")
print("-" * 20)
encrypted_result_hex = DataEncryption(input_data, args.cpu_serial)
print("Encryption Complete.")
if not args.output_file:
print("\nEncrypted Hex Output:")
# always a hex str
write_output(encrypted_result_hex, args.output_file, is_binary=False)
elif args.mode == "d":
# decrypt
print("Mode: Decrypt")
if args.input_file:
print(f"Reading input from file: {args.input_file}")
print(
f"Input Hex Data: '{input_data[:50]}{'...' if len(input_data) > 50 else ''}'"
)
print(f"CPU Serial: {args.cpu_serial}")
print("-" * 20)
decrypted_result = DataDecryption(input_data, args.cpu_serial)
print("Decryption Complete.")
if not args.output_file:
print("\nDecrypted Data Output:")
is_binary_output = isinstance(decrypted_result, bytes)
write_output(decrypted_result, args.output_file, is_binary=is_binary_output)
elif args.mode == "p":
run_parse_mode(args, input_data)
if __name__ == "__main__":
main()