#!/usr/bin/env python3 ''' Convert itm.txt dumps to pcm (wave) files usage: itmdump -f /tmp/itm.txt > dump.txt itm-to-pcm.py dump.txt dump.wav ''' import wave import sys from math import pi class Fifo(object): def __init__(self, size): self._size = size self._buffer = [] def append(self, v): self._buffer.append(v) if len(self._buffer) > self._size: del self._buffer[0] def list(self): return self._buffer def stream_itm(file_in): #f = open(file_in, 'rb') f = open(file_in, 'rb') while True: samples = f.read(2) if len(samples) < 2: break #if len(set(samples)) != 1: continue # corrupted #yield samples[0] yield samples[0] + 0x100*samples[1] #for line in f: # try: # if line.startswith(b'ADC:'): # sample = int(line[len('ADC: '):].strip(), 16) # if sample < 0x1000: yield sample # except: # pass def med_filt_stream(stream, kernel=5): # n.b. introduces a delay of kernel/2 accum = Fifo(kernel) for f in stream: accum.append(f) yield median(accum.list()) def dc_filt_stream(stream, kernel=5000): accum = Fifo(kernel) for f in stream: accum.append(f) dc = sum(accum.list()) / len(accum.list()) yield f - dc def scale_stream(stream, gamma=1.2, makeup=1000): # map x -> scale*x^gamma, but maintain: # 0xffff = scale*0xffff^gamma scale = makeup * 0xffff / 0xffff**gamma for f in stream: if f > 0: yield scale * f**gamma else: yield -scale * (-f)**gamma def hpf(stream, f_c=0.2): # y[i] := α * (y[i-1] + x[i] - x[i-1]) alpha = 1 / (2*pi*f_c + 1) y_prev, x_prev = 0, 0 for x in stream: y = alpha * (y_prev + x - x_prev) yield y y_prev = y def frames_from_samples(itm): b = bytearray() for sample in itm: sample = round(sample) sample = (sample + 0x10000) % 0x10000 # counteract Python's signed mod badness lsb0 = sample % 0x100 lsb1 = (sample // 0x100) % 0x100 b.append(lsb0) b.append(lsb1) return b def median(samples): x = list(sorted(samples)) return x[len(x) // 2] def stream_to_wav(file_in, file_out): # broken itm_stream = stream_itm(file_in) #clean_stream = dc_filt_stream(med_filt_stream(itm_stream)) #clean_stream = dc_filt_stream(itm_stream) clean_stream = dc_filt_stream(itm_stream) #clean_stream = scale_stream(itm_stream) wave_out = wave.open(file_out, 'wb') wave_out.setnchannels(1) wave_out.setsampwidth(2) wave_out.setframerate(2300) wave_out.writeframes(frames_from_samples(list(clean_stream))) def stream_to_term(file_in): #stream = dc_filt_stream(med_filt_stream(stream_itm(file_in))) #stream = scale_stream(dc_filt_stream(stream_itm(file_in))) stream = scale_stream(hpf(dc_filt_stream(stream_itm(file_in)))) #stream = stream_itm(file_in) for f in stream: print(ansi_rgb(color_for_sample(f), '█'), end='') def ansi_rgb(color, text): r, g, b = color #print(color) return '\033[38;2;{r};{g};{b}m{text}\033[0m'.format(**locals()) def color_for_sample(sample): r, g, b = 0, 0, 0 if sample < 0: b = -sample / 0x100 else: r = sample / 0x100 if b > 0xff: g = min(0xff, b / 16) b = 0xff if r > 0xff: g = min(0xff, r / 16) r = 0xff assert r < 0x100, sample assert g < 0x100, sample assert b < 0x100, sample return int(r), int(g), int(b) def main(): if len(sys.argv) == 3: _self, file_in, file_out = sys.argv stream_to_wav(file_in, file_out) if len(sys.argv) == 1: stream_to_term('/dev/stdin') if __name__ == "__main__": main()