144 lines
3.8 KiB
Python
Executable File
144 lines
3.8 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
'''
|
||
Convert itm.txt dumps to pcm (wave) files
|
||
|
||
usage: itmdump -f /tmp/itm.txt > dump.txt
|
||
itm-to-pcm.py dump.txt dump.wav
|
||
'''
|
||
|
||
import wave
|
||
import sys
|
||
from math import pi
|
||
|
||
class Fifo(object):
|
||
def __init__(self, size):
|
||
self._size = size
|
||
self._buffer = []
|
||
|
||
def append(self, v):
|
||
self._buffer.append(v)
|
||
if len(self._buffer) > self._size:
|
||
del self._buffer[0]
|
||
|
||
def list(self):
|
||
return self._buffer
|
||
|
||
def stream_itm(file_in):
|
||
#f = open(file_in, 'rb')
|
||
f = open(file_in, 'rb')
|
||
while True:
|
||
samples = f.read(2)
|
||
if len(samples) < 2: break
|
||
#if len(set(samples)) != 1: continue # corrupted
|
||
#yield samples[0]
|
||
yield samples[0] + 0x100*samples[1]
|
||
#for line in f:
|
||
# try:
|
||
# if line.startswith(b'ADC:'):
|
||
# sample = int(line[len('ADC: '):].strip(), 16)
|
||
# if sample < 0x1000: yield sample
|
||
# except:
|
||
# pass
|
||
|
||
def med_filt_stream(stream, kernel=5):
|
||
# n.b. introduces a delay of kernel/2
|
||
accum = Fifo(kernel)
|
||
for f in stream:
|
||
accum.append(f)
|
||
yield median(accum.list())
|
||
|
||
def dc_filt_stream(stream, kernel=5000):
|
||
accum = Fifo(kernel)
|
||
for f in stream:
|
||
accum.append(f)
|
||
dc = sum(accum.list()) / len(accum.list())
|
||
yield f - dc
|
||
|
||
def scale_stream(stream, gamma=1.2, makeup=1000):
|
||
# map x -> scale*x^gamma, but maintain:
|
||
# 0xffff = scale*0xffff^gamma
|
||
scale = makeup * 0xffff / 0xffff**gamma
|
||
for f in stream:
|
||
if f > 0:
|
||
yield scale * f**gamma
|
||
else:
|
||
yield -scale * (-f)**gamma
|
||
|
||
def hpf(stream, f_c=0.2):
|
||
# y[i] := α * (y[i-1] + x[i] - x[i-1])
|
||
alpha = 1 / (2*pi*f_c + 1)
|
||
y_prev, x_prev = 0, 0
|
||
for x in stream:
|
||
y = alpha * (y_prev + x - x_prev)
|
||
yield y
|
||
y_prev = y
|
||
|
||
def frames_from_samples(itm):
|
||
b = bytearray()
|
||
for sample in itm:
|
||
sample = round(sample)
|
||
sample = (sample + 0x10000) % 0x10000 # counteract Python's signed mod badness
|
||
lsb0 = sample % 0x100
|
||
lsb1 = (sample // 0x100) % 0x100
|
||
b.append(lsb0)
|
||
b.append(lsb1)
|
||
|
||
return b
|
||
|
||
def median(samples):
|
||
x = list(sorted(samples))
|
||
return x[len(x) // 2]
|
||
|
||
def stream_to_wav(file_in, file_out):
|
||
# broken
|
||
itm_stream = stream_itm(file_in)
|
||
#clean_stream = dc_filt_stream(med_filt_stream(itm_stream))
|
||
#clean_stream = dc_filt_stream(itm_stream)
|
||
clean_stream = dc_filt_stream(itm_stream)
|
||
#clean_stream = scale_stream(itm_stream)
|
||
wave_out = wave.open(file_out, 'wb')
|
||
wave_out.setnchannels(1)
|
||
wave_out.setsampwidth(2)
|
||
wave_out.setframerate(2300)
|
||
wave_out.writeframes(frames_from_samples(list(clean_stream)))
|
||
|
||
def stream_to_term(file_in):
|
||
#stream = dc_filt_stream(med_filt_stream(stream_itm(file_in)))
|
||
#stream = scale_stream(dc_filt_stream(stream_itm(file_in)))
|
||
stream = scale_stream(hpf(dc_filt_stream(stream_itm(file_in))))
|
||
#stream = stream_itm(file_in)
|
||
for f in stream:
|
||
print(ansi_rgb(color_for_sample(f), '█'), end='')
|
||
|
||
def ansi_rgb(color, text):
|
||
r, g, b = color
|
||
#print(color)
|
||
return '\033[38;2;{r};{g};{b}m{text}\033[0m'.format(**locals())
|
||
|
||
def color_for_sample(sample):
|
||
r, g, b = 0, 0, 0
|
||
if sample < 0:
|
||
b = -sample / 0x100
|
||
else:
|
||
r = sample / 0x100
|
||
if b > 0xff:
|
||
g = min(0xff, b / 16)
|
||
b = 0xff
|
||
if r > 0xff:
|
||
g = min(0xff, r / 16)
|
||
r = 0xff
|
||
assert r < 0x100, sample
|
||
assert g < 0x100, sample
|
||
assert b < 0x100, sample
|
||
return int(r), int(g), int(b)
|
||
|
||
def main():
|
||
if len(sys.argv) == 3:
|
||
_self, file_in, file_out = sys.argv
|
||
stream_to_wav(file_in, file_out)
|
||
if len(sys.argv) == 1:
|
||
stream_to_term('/dev/stdin')
|
||
|
||
if __name__ == "__main__":
|
||
main()
|