piezo-ddr-controller/scripts/itm-to-pcm.py

144 lines
3.8 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
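'''
Convert ITM dumps (as captured with itmdump) to PCM (WAV) files.

Usage:
    itmdump -f /tmp/itm.txt > dump.txt
    itm-to-pcm.py dump.txt dump.wav

When run without arguments, samples are read from /dev/stdin and rendered
to the terminal as ANSI colour codes instead.
'''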
import sys
import wave
from collections import deque
from math import pi
class Fifo(object):
    """Bounded first-in, first-out buffer keeping only the newest items.

    At most ``size`` items are retained; appending beyond that discards the
    oldest item.
    """

    def __init__(self, size):
        self._buffer = []
        self._size = size

    def append(self, v):
        """Add ``v`` at the tail, dropping the oldest item on overflow."""
        self._buffer += [v]
        if len(self._buffer) <= self._size:
            return
        self._buffer.pop(0)

    def list(self):
        """Return the backing list (oldest item first); this is not a copy."""
        return self._buffer
def stream_itm(file_in):
    """Yield unsigned 16-bit samples read from a binary dump file.

    The file is consumed two bytes at a time and each pair is decoded as a
    little-endian value (the first byte is the low byte).  A trailing odd
    byte at the end of the file is ignored.

    Args:
        file_in: Path of the file to read; it is opened in binary mode.

    Yields:
        int: One sample per byte pair, in the range 0..0xffff.
    """
    # The context manager guarantees the handle is released once the stream
    # is exhausted or the generator is closed; previously it was never closed.
    with open(file_in, 'rb') as f:
        while True:
            samples = f.read(2)
            if len(samples) < 2:
                break
            yield int.from_bytes(samples, 'little')
def med_filt_stream(stream, kernel=5):
    """Yield the running median of ``stream`` over the last ``kernel`` samples.

    One output is produced per input.  Until ``kernel`` samples have been
    seen, the median is taken over the samples received so far.

    Note: as a sliding-window filter this introduces a delay of about
    ``kernel / 2`` samples.
    """
    window = Fifo(kernel)
    for sample in stream:
        window.append(sample)
        recent = window.list()
        yield median(recent)
def dc_filt_stream(stream, kernel=5000):
    """Remove the DC offset from ``stream`` by subtracting a moving average.

    For every input sample, the mean of the most recent ``kernel`` samples
    (including the current one) is subtracted from it.  Until ``kernel``
    samples have been seen, the mean covers the samples received so far.

    The window sum is maintained incrementally instead of being recomputed
    for each sample, so the cost per sample does not depend on ``kernel``.
    For integer samples the result is identical to re-summing the window;
    for float samples it may differ by accumulated rounding error.

    Args:
        stream: Iterable of numeric samples.
        kernel: Number of samples in the averaging window.

    Yields:
        float: The sample minus the current window mean.
    """
    window = deque()
    total = 0
    for sample in stream:
        window.append(sample)
        total += sample
        if len(window) > kernel:
            # Evict the oldest sample and take it out of the running sum.
            total -= window.popleft()
        yield sample - total / len(window)
def scale_stream(stream, gamma=1.2, makeup=1000):
    """Apply a sign-preserving power curve to every sample of ``stream``.

    Each sample ``x`` is mapped to ``sign(x) * scale * |x|**gamma``, where
    ``scale`` is chosen so that an input of 0xffff maps to
    ``makeup * 0xffff``.
    """
    scale = makeup * 0xffff / 0xffff**gamma
    for sample in stream:
        # Non-positive samples are mirrored so the power is taken of a
        # non-negative magnitude and the sign is restored afterwards.
        yield scale * sample**gamma if sample > 0 else -scale * (-sample)**gamma
def hpf(stream, f_c=0.2):
    """Apply a first-order high-pass filter to ``stream``.

    Implements the recurrence::

        y[i] = alpha * (y[i-1] + x[i] - x[i-1])

    with ``alpha = 1 / (2*pi*f_c + 1)`` and both histories starting at 0.

    Args:
        stream: Iterable of numeric samples.
        f_c: Cutoff parameter used to derive ``alpha``; presumably a
            frequency normalised to the sample rate -- TODO confirm.

    Yields:
        float: One filtered sample per input sample.
    """
    alpha = 1 / (2*pi*f_c + 1)
    y_prev, x_prev = 0, 0
    for x in stream:
        y = alpha * (y_prev + x - x_prev)
        yield y
        y_prev = y
        # Remember the input as well; without this the x[i-1] term stays 0
        # forever and a constant input is accumulated instead of removed.
        x_prev = x
def frames_from_samples(itm):
    """Pack samples into little-endian 16-bit frame data.

    Each sample is rounded to the nearest integer and reduced modulo
    0x10000, so negative values come out in two's-complement form and
    out-of-range values wrap around.  The low byte is emitted first.

    Returns:
        bytearray: Two bytes per input sample.
    """
    frames = bytearray()
    for value in itm:
        # Python's % already yields a non-negative result for a positive
        # modulus, so this maps e.g. -1 to 0xffff.
        word = round(value) % 0x10000
        high, low = divmod(word, 0x100)
        frames.extend((low, high))
    return frames
def median(samples):
    """Return the middle element of ``samples`` once sorted.

    For an even number of items the upper of the two middle elements is
    returned (no averaging).  An empty input raises ``IndexError``.
    """
    ordered = sorted(samples)
    middle = len(ordered) // 2
    return ordered[middle]
def stream_to_wav(file_in, file_out, framerate=2300):
    """Convert a binary sample dump into a mono 16-bit WAV file.

    Samples are read with ``stream_itm``, DC-filtered with
    ``dc_filt_stream`` and packed with ``frames_from_samples`` before being
    written out.

    NOTE(review): the original author marked this function "broken" without
    saying why -- verify the output before relying on it.

    Args:
        file_in: Path of the input dump.
        file_out: Path of the WAV file to create.
        framerate: Sample rate written to the WAV header, in Hz.
    """
    clean_stream = dc_filt_stream(stream_itm(file_in))
    frames = frames_from_samples(clean_stream)
    # The context manager closes the writer, which finalises the WAV header;
    # previously the writer was never closed explicitly.
    with wave.open(file_out, 'wb') as wave_out:
        wave_out.setnchannels(1)
        wave_out.setsampwidth(2)
        wave_out.setframerate(framerate)
        wave_out.writeframes(frames)
def stream_to_term(file_in):
    """Render the samples of ``file_in`` to stdout as ANSI colour codes.

    The samples pass through ``dc_filt_stream``, ``hpf`` and
    ``scale_stream`` and each result is turned into a colour with
    ``color_for_sample``.

    NOTE(review): the text passed to ``ansi_rgb`` is an empty string, so
    only escape sequences are written -- confirm that a visible glyph was
    not lost from the source.
    """
    raw = stream_itm(file_in)
    filtered = hpf(dc_filt_stream(raw))
    for sample in scale_stream(filtered):
        cell = ansi_rgb(color_for_sample(sample), '')
        print(cell, end='')
def ansi_rgb(color, text):
    """Wrap ``text`` in a 24-bit ANSI foreground colour escape sequence.

    Args:
        color: A 3-item iterable of red, green and blue components.
        text: The text to colour.

    Returns:
        str: ``text`` preceded by the colour code and followed by a reset.
    """
    red, green, blue = color
    return '\033[38;2;{r};{g};{b}m{text}\033[0m'.format(
        r=red, g=green, b=blue, text=text)
def color_for_sample(sample):
    """Map a signed sample to an ``(r, g, b)`` tuple of ints.

    The magnitude divided by 0x100 drives the blue channel for negative
    samples and the red channel otherwise.  When that value exceeds 0xff
    the channel is pinned at 0xff and the overflow, divided by 16 and
    capped at 0xff, is shown on the green channel.
    """
    negative = sample < 0
    level = (-sample if negative else sample) / 0x100
    green = 0
    if level > 0xff:
        green = min(0xff, level / 16)
        level = 0xff
    # Sanity checks: every channel must fit in a byte.
    assert level < 0x100, sample
    assert green < 0x100, sample
    if negative:
        return 0, int(green), int(level)
    return int(level), int(green), 0
def main():
    """Command-line entry point.

    With two arguments (input dump, output WAV) the dump is converted to a
    WAV file.  With no arguments, samples are read from ``/dev/stdin`` and
    rendered to the terminal.  Any other argument count exits with a usage
    message instead of silently doing nothing.
    """
    argc = len(sys.argv)
    if argc == 3:
        _self, file_in, file_out = sys.argv
        stream_to_wav(file_in, file_out)
    elif argc == 1:
        stream_to_term('/dev/stdin')
    else:
        # sys.exit with a string prints it to stderr and exits non-zero.
        sys.exit('usage: {} [dump.txt dump.wav]'.format(sys.argv[0]))
# Run the converter only when executed as a script, not when imported.
if __name__ == "__main__":
    main()