494 lines
17 KiB
Python
Executable File
494 lines
17 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# author: kuwoyuki <kuwoyuki.eee@cock.li>
|
|
# Rigol DG[89]00 Pro AWG setup.stp crypter
|
|
import argparse
|
|
import binascii
|
|
import csv
|
|
import json
|
|
import io
|
|
import sys
|
|
|
|
# Standard DES tables (FIPS 46-3). All permutation tables hold 1-based
# positions; _permute() subtracts one when indexing.
# fmt: off

# Initial permutation, applied to each 64-bit block before the rounds.
IP_1based = [
    58, 50, 42, 34, 26, 18, 10, 2,
    60, 52, 44, 36, 28, 20, 12, 4,
    62, 54, 46, 38, 30, 22, 14, 6,
    64, 56, 48, 40, 32, 24, 16, 8,
    57, 49, 41, 33, 25, 17, 9, 1,
    59, 51, 43, 35, 27, 19, 11, 3,
    61, 53, 45, 37, 29, 21, 13, 5,
    63, 55, 47, 39, 31, 23, 15, 7,
]

# Final permutation, applied after the rounds.
FP_1based = [
    40, 8, 48, 16, 56, 24, 64, 32,
    39, 7, 47, 15, 55, 23, 63, 31,
    38, 6, 46, 14, 54, 22, 62, 30,
    37, 5, 45, 13, 53, 21, 61, 29,
    36, 4, 44, 12, 52, 20, 60, 28,
    35, 3, 43, 11, 51, 19, 59, 27,
    34, 2, 42, 10, 50, 18, 58, 26,
    33, 1, 41, 9, 49, 17, 57, 25,
]

# Expansion table: 32-bit half block -> 48 bits.
E_1based = [
    32, 1, 2, 3, 4, 5,
    4, 5, 6, 7, 8, 9,
    8, 9, 10, 11, 12, 13,
    12, 13, 14, 15, 16, 17,
    16, 17, 18, 19, 20, 21,
    20, 21, 22, 23, 24, 25,
    24, 25, 26, 27, 28, 29,
    28, 29, 30, 31, 32, 1,
]

# Permutation applied to the 32-bit S-box output.
P_1based = [
    16, 7, 20, 21, 29, 12, 28, 17,
    1, 15, 23, 26, 5, 18, 31, 10,
    2, 8, 24, 14, 32, 27, 3, 9,
    19, 13, 30, 6, 22, 11, 4, 25,
]

# Permuted choice 1: 64-bit key material -> 56 bits.
PC1_1based = [
    57, 49, 41, 33, 25, 17, 9,
    1, 58, 50, 42, 34, 26, 18,
    10, 2, 59, 51, 43, 35, 27,
    19, 11, 3, 60, 52, 44, 36,
    63, 55, 47, 39, 31, 23, 15,
    7, 62, 54, 46, 38, 30, 22,
    14, 6, 61, 53, 45, 37, 29,
    21, 13, 5, 28, 20, 12, 4,
]

# Permuted choice 2: 56-bit C||D state -> 48-bit round key.
PC2_1based = [
    14, 17, 11, 24, 1, 5,
    3, 28, 15, 6, 21, 10,
    23, 19, 12, 4, 26, 8,
    16, 7, 27, 20, 13, 2,
    41, 52, 31, 37, 47, 55,
    30, 40, 51, 45, 33, 48,
    44, 49, 39, 56, 34, 53,
    46, 42, 50, 36, 29, 32,
]

# Eight S-boxes, each flattened to 64 entries and indexed as row * 16 + col.
S = [
    [  # S1
        14, 4, 13, 1, 2, 15, 11, 8, 3, 10, 6, 12, 5, 9, 0, 7,
        0, 15, 7, 4, 14, 2, 13, 1, 10, 6, 12, 11, 9, 5, 3, 8,
        4, 1, 14, 8, 13, 6, 2, 11, 15, 12, 9, 7, 3, 10, 5, 0,
        15, 12, 8, 2, 4, 9, 1, 7, 5, 11, 3, 14, 10, 0, 6, 13,
    ],
    [  # S2
        15, 1, 8, 14, 6, 11, 3, 4, 9, 7, 2, 13, 12, 0, 5, 10,
        3, 13, 4, 7, 15, 2, 8, 14, 12, 0, 1, 10, 6, 9, 11, 5,
        0, 14, 7, 11, 10, 4, 13, 1, 5, 8, 12, 6, 9, 3, 2, 15,
        13, 8, 10, 1, 3, 15, 4, 2, 11, 6, 7, 12, 0, 5, 14, 9,
    ],
    [  # S3
        10, 0, 9, 14, 6, 3, 15, 5, 1, 13, 12, 7, 11, 4, 2, 8,
        13, 7, 0, 9, 3, 4, 6, 10, 2, 8, 5, 14, 12, 11, 15, 1,
        13, 6, 4, 9, 8, 15, 3, 0, 11, 1, 2, 12, 5, 10, 14, 7,
        1, 10, 13, 0, 6, 9, 8, 7, 4, 15, 14, 3, 11, 5, 2, 12,
    ],
    [  # S4
        7, 13, 14, 3, 0, 6, 9, 10, 1, 2, 8, 5, 11, 12, 4, 15,
        13, 8, 11, 5, 6, 15, 0, 3, 4, 7, 2, 12, 1, 10, 14, 9,
        10, 6, 9, 0, 12, 11, 7, 13, 15, 1, 3, 14, 5, 2, 8, 4,
        3, 15, 0, 6, 10, 1, 13, 8, 9, 4, 5, 11, 12, 7, 2, 14,
    ],
    [  # S5
        2, 12, 4, 1, 7, 10, 11, 6, 8, 5, 3, 15, 13, 0, 14, 9,
        14, 11, 2, 12, 4, 7, 13, 1, 5, 0, 15, 10, 3, 9, 8, 6,
        4, 2, 1, 11, 10, 13, 7, 8, 15, 9, 12, 5, 6, 3, 0, 14,
        11, 8, 12, 7, 1, 14, 2, 13, 6, 15, 0, 9, 10, 4, 5, 3,
    ],
    [  # S6
        12, 1, 10, 15, 9, 2, 6, 8, 0, 13, 3, 4, 14, 7, 5, 11,
        10, 15, 4, 2, 7, 12, 9, 5, 6, 1, 13, 14, 0, 11, 3, 8,
        9, 14, 15, 5, 2, 8, 12, 3, 7, 0, 4, 10, 1, 13, 11, 6,
        4, 3, 2, 12, 9, 5, 15, 10, 11, 14, 1, 7, 6, 0, 8, 13,
    ],
    [  # S7
        4, 11, 2, 14, 15, 0, 8, 13, 3, 12, 9, 7, 5, 10, 6, 1,
        13, 0, 11, 7, 4, 9, 1, 10, 14, 3, 5, 12, 2, 15, 8, 6,
        1, 4, 11, 13, 12, 3, 7, 14, 10, 15, 6, 8, 0, 5, 9, 2,
        6, 11, 13, 8, 1, 4, 10, 7, 9, 5, 0, 15, 14, 2, 3, 12,
    ],
    [  # S8
        13, 2, 8, 4, 6, 15, 11, 1, 10, 9, 3, 14, 5, 0, 12, 7,
        1, 15, 13, 8, 10, 3, 7, 4, 12, 5, 6, 11, 0, 14, 9, 2,
        7, 11, 4, 1, 9, 12, 14, 2, 0, 6, 10, 13, 15, 3, 5, 8,
        2, 1, 14, 7, 4, 10, 8, 13, 15, 12, 9, 0, 3, 5, 6, 11,
    ],
]

# Per-round left-rotation amounts for the two 28-bit key halves.
LEFT_SHIFTS = [1, 1, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 1]

# fmt: on
|
|
|
|
|
|
def _bytes_to_bits(byte_data):
|
|
return [(byte >> i) & 1 for byte in byte_data for i in range(7, -1, -1)]
|
|
|
|
|
|
def _bits_to_bytes(bits):
|
|
byte_list = bytearray()
|
|
padded_len = (len(bits) + 7) // 8 * 8
|
|
bits = bits + [0] * (padded_len - len(bits))
|
|
for i in range(0, padded_len, 8):
|
|
byte_val = sum(bits[i + j] << (7 - j) for j in range(8))
|
|
byte_list.append(byte_val)
|
|
return bytes(byte_list)
|
|
|
|
|
|
def _permute(block, table):
|
|
return [block[i - 1] for i in table]
|
|
|
|
|
|
def _xor(bits_a, bits_b):
|
|
return [a ^ b for a, b in zip(bits_a, bits_b)]
|
|
|
|
|
|
def _rotate_left(bits, n):
|
|
n = n % len(bits)
|
|
return bits[n:] + bits[:n]
|
|
|
|
|
|
# key & IV derivation
|
|
def _generate_iv_bits():
|
|
"""Generate the 64-bit IV from 'Rigoler'"""
|
|
iv_str = "Rigoler" # 7 chars
|
|
iv_bits_49 = []
|
|
for char in iv_str:
|
|
char_val = ord(char)
|
|
char_7_bits_lsb = [(char_val >> i) & 1 for i in range(7)]
|
|
iv_bits_49.extend(char_7_bits_lsb[::-1]) # reverse for MSB-of-7 first
|
|
return iv_bits_49 + [0] * 15 # pad with 15 zeros to 64 bits
|
|
|
|
|
|
def _generate_round_keys(cpu_serial):
    """Derive the 16 48-bit DES round keys from ``cpu_serial``.

    Only the first character of the serial is used. The 64-bit key material
    is laid out as: 48 zero bits | low 8 bits of that character's code point
    (MSB first) | 8 zero bits.
    """
    first_code = ord(cpu_serial[0])
    key_material = [0] * 48
    key_material += [(first_code >> shift) & 1 for shift in range(7, -1, -1)]
    key_material += [0] * 8

    permuted = _permute(key_material, PC1_1based)
    left_half = permuted[:28]
    right_half = permuted[28:]

    schedule = []
    for shift in LEFT_SHIFTS:
        left_half = _rotate_left(left_half, shift)
        right_half = _rotate_left(right_half, shift)
        schedule.append(_permute(left_half + right_half, PC2_1based))
    return schedule
|
|
|
|
|
|
# DES stuff
|
|
def _des_f_function(r_32, k_48):
    """DES Feistel function F(R, K).

    Expands the 32-bit half block with E, XORs it with the 48-bit round
    key, substitutes each 6-bit group through its S-box and permutes the
    resulting 32 bits with P.
    """
    mixed = _xor(_permute(r_32, E_1based), k_48)
    substituted = []
    for index, box in enumerate(S):
        group = mixed[index * 6:index * 6 + 6]
        # Outer bits select the row, the four inner bits select the column.
        row = (group[0] << 1) | group[5]
        col = (group[1] << 3) | (group[2] << 2) | (group[3] << 1) | group[4]
        nibble = box[row * 16 + col]
        substituted.extend((nibble >> shift) & 1 for shift in (3, 2, 1, 0))
    return _permute(substituted, P_1based)
|
|
|
|
|
|
def _des_crypt_block(block_64bits, round_keys, is_encrypt):
    """DES-encrypt or DES-decrypt a single 64-bit block.

    Args:
        block_64bits: List of 64 bits (0/1 ints).
        round_keys: The round keys in encryption order.
        is_encrypt: True to encrypt; False applies the keys in reverse
            order, which decrypts.

    Returns:
        List of 64 output bits.
    """
    # Apply the initial permutation once and split it into halves.
    permuted = _permute(block_64bits, IP_1based)
    left, right = permuted[:32], permuted[32:]

    subkeys = round_keys if is_encrypt else round_keys[::-1]
    for subkey in subkeys:
        left, right = right, _xor(left, _des_f_function(right, subkey))

    # The halves are swapped (right + left) before the final permutation.
    return _permute(right + left, FP_1based)
|
|
|
|
|
|
# CBC mode and padding
|
|
def _pad(data_bytes):
|
|
"""Pad data with space chars (0x20) to multiple of 8 bytes"""
|
|
rem = len(data_bytes) % 8
|
|
if rem == 0:
|
|
# len already a multiple of 8, NO padding
|
|
return data_bytes
|
|
else:
|
|
# add padding to reach the next multiple of 8
|
|
pad_len = 8 - rem
|
|
return data_bytes + bytes([0x20] * pad_len)
|
|
|
|
|
|
def _unpad(padded_bytes):
|
|
"""Remove trailing space padding marked by first ' ' ???"""
|
|
marker_index = padded_bytes.find(b" ")
|
|
return padded_bytes[:marker_index] if marker_index != -1 else padded_bytes
|
|
|
|
|
|
def _des_cbc_crypt(data_bytes, iv_64bits, round_keys, is_encrypt):
|
|
"""DES-CBC encrypt/decrypt with padding"""
|
|
if is_encrypt:
|
|
data_bytes = _pad(data_bytes)
|
|
result_bytes = bytearray()
|
|
prev_block_bits = iv_64bits
|
|
|
|
for i in range(0, len(data_bytes), 8):
|
|
current_block_bits = _bytes_to_bits(data_bytes[i : i + 8])
|
|
if is_encrypt:
|
|
processed_bits = _des_crypt_block(
|
|
_xor(current_block_bits, prev_block_bits), round_keys, True
|
|
)
|
|
result_bytes.extend(_bits_to_bytes(processed_bits))
|
|
prev_block_bits = processed_bits
|
|
else: # decrypt
|
|
decrypted_bits = _des_crypt_block(current_block_bits, round_keys, False)
|
|
result_bytes.extend(_bits_to_bytes(_xor(decrypted_bits, prev_block_bits)))
|
|
prev_block_bits = current_block_bits
|
|
return _unpad(bytes(result_bytes)) if not is_encrypt else bytes(result_bytes)
|
|
|
|
|
|
def DataDecryption(strEncryptedDataHex, cpu_serial):
    """Decrypt a hex string using DES-CBC.

    Args:
        strEncryptedDataHex: Ciphertext as hexadecimal text; leading and
            trailing whitespace is ignored.
        cpu_serial: CPU serial string the round keys are derived from.

    Returns:
        The decrypted text as ``str`` when it is valid UTF-8, otherwise the
        raw decrypted ``bytes``.

    Raises:
        ValueError: If the input is not valid hexadecimal. The underlying
            ``binascii.Error`` is attached as the cause.
    """
    c_hex_str = strEncryptedDataHex.strip()

    try:
        encrypted_bytes = binascii.unhexlify(c_hex_str)
    except binascii.Error as e:
        raise ValueError(
            f"Invalid hex string provided (length {len(c_hex_str)} after stripping): {e}"
        ) from e

    round_keys = _generate_round_keys(cpu_serial)
    iv_bits = _generate_iv_bits()
    decrypted_bytes = _des_cbc_crypt(encrypted_bytes, iv_bits, round_keys, False)

    try:
        return decrypted_bytes.decode("utf-8")
    except UnicodeDecodeError:
        # Not valid UTF-8: hand the raw bytes back to the caller.
        return decrypted_bytes
|
|
|
|
|
|
def DataEncryption(strPlainData, cpu_serial):
    """Encrypt a string with DES-CBC and return the result as uppercase hex.

    The text is UTF-8 encoded before encryption; the round keys are derived
    from ``cpu_serial`` and the IV is the fixed one from _generate_iv_bits().
    """
    payload = strPlainData.encode("utf-8")
    subkeys = _generate_round_keys(cpu_serial)
    iv = _generate_iv_bits()
    ciphertext = _des_cbc_crypt(payload, iv, subkeys, True)
    return ciphertext.hex().upper()
|
|
|
|
|
|
def write_output(data, output_file=None, is_binary=False):
    """Write ``data`` to ``output_file``, or print it when no file is given.

    NOTE: write_output is defined a second time further down in this file;
    at import time that later definition replaces this one.

    Args:
        data: ``str``, or ``bytes`` when ``is_binary`` is true.
        output_file: Destination path; when falsy the data goes to stdout.
        is_binary: True when ``data`` is bytes. Files are then opened in
            binary mode; text files are written as UTF-8.
    """
    # w to file
    if output_file:
        mode = "wb" if is_binary else "w"
        encoding = None if is_binary else "utf-8"
        with open(output_file, mode, encoding=encoding) as f:
            f.write(data)
        print(f"Output successfully written to: {output_file}")
        return

    # w to stdout
    if is_binary:
        # Bytes reaching this point may not be valid UTF-8, so a strict
        # decode would raise; show undecodable bytes as \xNN escapes.
        print(data.decode("utf-8", errors="backslashreplace"))
    else:
        print(data)
|
|
|
|
|
|
def parse_rigol_csv(csv_string):
    """Parse the first CSV row of ``csv_string`` into a dict of named fields.

    The column names and the raw input are echoed to stdout first. Values
    are matched to column names by position (extra or missing columns are
    ignored). Derived keys are then added:

    * ``<field>_MHz`` for every frequency field whose value is all digits
      (the raw value multiplied by 1e-12),
    * ``ArbWaveLenLicense_Description`` for known license codes,
    * ``<field>_Status`` for known validity-time values.

    Returns:
        The dict, or None when the input is blank or contains no row.
    """
    columns = (
        "Manufacturer",
        "InstrModel",
        "InstrSN",
        "CalibrationDate",
        "SineMaxFreq",
        "SquareMaxFreq",
        "RampMaxFreq",
        "PulseMaxFreq",
        "ArbMaxFreq",
        "HarmonicMaxFreq",
        "MinFreq",
        "HarmonicMinFreq",
        "ARMSerial",
        "MaxChannels",
        "ArbWaveLenLicense",
        "ArbWaveLenValidTime",
        "DuoChanChannelValidTime",
    )
    freq_columns = (
        "SineMaxFreq",
        "SquareMaxFreq",
        "RampMaxFreq",
        "PulseMaxFreq",
        "ArbMaxFreq",
        "HarmonicMaxFreq",
        "MinFreq",
        "HarmonicMinFreq",
    )
    time_columns = ("ArbWaveLenValidTime", "DuoChanChannelValidTime")
    uhz_to_mhz = 1e-12  # 1 / 1_000_000_000_000
    license_names = {
        "DEF": "Default",
        "MEM": "Memory Depth License",
        "CHD": "Dual Channel License",
    }
    time_status = {
        "Forever": "Active (No Expiration)",
        "Inactive": "Not Active",
    }

    if not csv_string.strip():
        return None

    print("\nCSV header:")
    print(",".join(columns))
    print(csv_string)

    # Only the first CSV row is used.
    first_row = next(csv.reader(io.StringIO(csv_string)), None)
    if not first_row:
        return None

    record = dict(zip(columns, first_row))

    # Frequency conversion (derived keys are appended in column order).
    for name in freq_columns:
        raw = record.get(name)
        if raw is not None and raw.isdigit():
            record[f"{name}_MHz"] = int(raw) * uhz_to_mhz

    # License code description.
    license_code = record.get("ArbWaveLenLicense")
    if license_code in license_names:
        record["ArbWaveLenLicense_Description"] = license_names[license_code]

    # Validity-time status.
    for name in time_columns:
        status = time_status.get(record.get(name))
        if status is not None:
            record[f"{name}_Status"] = status

    return record
|
|
|
|
|
|
def run_parse_mode(args, input_data):
    """Parse CSV input and either save it as JSON or print a summary.

    Exits with status 1 when parse_rigol_csv() yields nothing. When
    ``args.output_file`` is set, the parsed dict is written there as
    indented JSON; otherwise a sectioned, human-readable report is printed.
    """
    record = parse_rigol_csv(input_data.strip())
    if not record:
        print("Error: Failed to parse CSV data.")
        sys.exit(1)

    if args.output_file:
        serialized = json.dumps(record, indent=2)
        with open(args.output_file, "w") as handle:
            handle.write(serialized)
        print(f"Output written to: {args.output_file}")
        return

    # Report layout: section title -> (display label, record key) pairs.
    layout = {
        "Instrument Information": (
            ("Manufacturer", "Manufacturer"),
            ("Model", "InstrModel"),
            ("Serial Number", "InstrSN"),
            ("Calibration Date", "CalibrationDate"),
            ("ARM CPU Serial", "ARMSerial"),
            ("Max Channels", "MaxChannels"),
        ),
        "Frequency Specifications": (
            ("Sine Max Frequency", "SineMaxFreq"),
            ("Square Max Frequency", "SquareMaxFreq"),
            ("Ramp Max Frequency", "RampMaxFreq"),
            ("Pulse Max Frequency", "PulseMaxFreq"),
            ("Arb Max Frequency", "ArbMaxFreq"),
            ("Harmonic Max Frequency", "HarmonicMaxFreq"),
            ("Min Frequency", "MinFreq"),
            ("Harmonic Min Frequency", "HarmonicMinFreq"),
        ),
        "Licensing Information": (
            ("Arb Wavelength License", "ArbWaveLenLicense"),
            ("Arb Wavelength Valid Time", "ArbWaveLenValidTime"),
            ("Duo Channel Valid Time", "DuoChanChannelValidTime"),
        ),
    }

    for title, rows in layout.items():
        print(f"\n{title}:")
        printed = 0

        for label, key in rows:
            if key not in record:
                continue
            printed += 1
            pieces = [f" {label}: {record[key]}"]

            # MHz conv if available
            mhz = record.get(f"{key}_MHz")
            if isinstance(mhz, (int, float)):
                if abs(mhz) > 1e-3 or mhz == 0:
                    mhz_text = f"{mhz:.6f}"
                else:
                    # Very small values read better in scientific notation.
                    mhz_text = f"{mhz:.6e}"
                pieces.append(f" ({mhz_text} MHz)")

            # A description takes precedence over a status annotation.
            if f"{key}_Description" in record:
                annotation = record[f"{key}_Description"]
                pieces.append(f" ({annotation})")
            elif f"{key}_Status" in record:
                annotation = record[f"{key}_Status"]
                pieces.append(f" ({annotation})")

            print("".join(pieces))

        if not printed:
            print(" (No data)")
|
|
|
|
|
|
def write_output(data, output_file=None, is_binary=False):
    """Write ``data`` to ``output_file``, or print it when no file is given.

    NOTE: this redefines the write_output declared earlier in this file and
    is the definition that is in effect at run time.

    Args:
        data: ``str``, or ``bytes`` when ``is_binary`` is true.
        output_file: Destination path; when falsy the data goes to stdout.
        is_binary: True when ``data`` is bytes. Files are then opened in
            binary mode; text files are written as UTF-8.
    """
    # w to file
    if output_file:
        mode = "wb" if is_binary else "w"
        encoding = None if is_binary else "utf-8"
        with open(output_file, mode, encoding=encoding) as f:
            f.write(data)
        print(f"Output successfully written to: {output_file}")
        return

    # w to stdout
    if is_binary:
        # Bytes reaching this point may not be valid UTF-8, so a strict
        # decode would raise; show undecodable bytes as \xNN escapes.
        print(data.decode("utf-8", errors="backslashreplace"))
    else:
        print(data)
|
|
|
|
|
|
def main():
    """Command-line entry point: encrypt, decrypt, or parse Rigol setup data.

    Exactly one mode (-e / -d / -p) and exactly one input source (-t text or
    -f file) are required. The CPU serial is supplied with -k/--key and is
    mandatory for -e and -d. Results go to -o FILEPATH when given, otherwise
    to the console.
    """
    parser = argparse.ArgumentParser(
        description="Encrypt or Decrypt Rigol DG[89]00 Pro setup.stp using the CPU SN.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        # The serial must be passed via -k/--key: the parser defines no
        # positional arguments, so a bare serial would be rejected.
        epilog="""Examples:
  Encrypt text to console:
    python %(prog)s -e -t "RIGOL TECHNOLOGIES,DG821,..." -k "d95ebe35672e20c2fc08"

  Decrypt text to console:
    python %(prog)s -d -t "FBC6BDDB0E..." -k "d95ebe35672e20c2fc08"

  Parse CSV (decrypted setup.stp) to console:
    python %(prog)s -p -t "RIGOL TECHNOLOGIES,DG821,..."

  Encrypt from file to output file:
    python %(prog)s -e -f input.txt -k "d95ebe35672e20c2fc08" -o setup.stp

  Decrypt from file to output file:
    python %(prog)s -d -f encrypted.hex -k "d95ebe35672e20c2fc08" -o setup.stp.hex
""",
    )

    mode_group = parser.add_mutually_exclusive_group(required=True)
    mode_group.add_argument(
        "-e",
        "--encrypt",
        action="store_const",
        dest="mode",
        const="e",
        help="Encrypt plain text data",
    )
    mode_group.add_argument(
        "-d",
        "--decrypt",
        action="store_const",
        dest="mode",
        const="d",
        help="Decrypt hex string data",
    )
    mode_group.add_argument(
        "-p",
        "--parse",
        action="store_const",
        dest="mode",
        const="p",
        help="Parse Rigol CSV data string",
    )

    input_group = parser.add_mutually_exclusive_group(required=True)
    input_group.add_argument(
        "-t",
        "--text",
        dest="input_text",
        help="Input data as text (Plain for Encrypt/Parse, Hex for Decrypt)",
    )
    input_group.add_argument(
        "-f", "--file", dest="input_file", help="Path to read input data from file"
    )

    parser.add_argument(
        "-k",
        "--key",
        dest="cpu_serial",
        help="CPU Serial Number (Required for -e/-d modes)",
    )
    parser.add_argument(
        "-o",
        "--output",
        metavar="FILEPATH",
        dest="output_file",
        help="Optional path to save output. Console if omitted",
    )

    args = parser.parse_args()

    # argparse cannot express "required only for some modes", so check here.
    if args.mode in ["e", "d"] and not args.cpu_serial:
        parser.error(
            "-k/--key (CPU serial) is required for encryption and decryption modes"
        )

    # read input data from arg or file
    if args.input_file:
        with open(args.input_file, "r", encoding="utf-8") as f:
            input_data = f.read()
    else:
        input_data = args.input_text

    if args.mode == "e":
        # encrypt
        print("Mode: Encrypt")

        if args.input_file:
            print(f"Reading input from file: {args.input_file}")

        print(f"Input Data: '{input_data[:50]}{'...' if len(input_data) > 50 else ''}'")
        print(f"CPU Serial: {args.cpu_serial}")
        print("-" * 20)

        encrypted_result_hex = DataEncryption(input_data, args.cpu_serial)

        print("Encryption Complete.")
        if not args.output_file:
            print("\nEncrypted Hex Output:")
        # always a hex str
        write_output(encrypted_result_hex, args.output_file, is_binary=False)

    elif args.mode == "d":
        # decrypt
        print("Mode: Decrypt")

        if args.input_file:
            print(f"Reading input from file: {args.input_file}")

        print(
            f"Input Hex Data: '{input_data[:50]}{'...' if len(input_data) > 50 else ''}'"
        )
        print(f"CPU Serial: {args.cpu_serial}")
        print("-" * 20)

        decrypted_result = DataDecryption(input_data, args.cpu_serial)

        print("Decryption Complete.")
        if not args.output_file:
            print("\nDecrypted Data Output:")

        # DataDecryption returns bytes only when the plaintext is not UTF-8.
        is_binary_output = isinstance(decrypted_result, bytes)
        write_output(decrypted_result, args.output_file, is_binary=is_binary_output)
    elif args.mode == "p":
        run_parse_mode(args, input_data)
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|