Add a waveform streamer
This commit is contained in:
@@ -9,11 +9,25 @@ itm-to-pcm.py dump.txt dump.wav
|
|||||||
import wave
|
import wave
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
def read_itm(file_in):
|
class Fifo(object):
|
||||||
|
def __init__(self, size):
|
||||||
|
self._size = size
|
||||||
|
self._buffer = []
|
||||||
|
|
||||||
|
def append(self, v):
|
||||||
|
self._buffer.append(v)
|
||||||
|
if len(self._buffer) > self._size:
|
||||||
|
del self._buffer[0]
|
||||||
|
|
||||||
|
def list(self):
|
||||||
|
return self._buffer
|
||||||
|
|
||||||
|
def stream_itm(file_in):
|
||||||
|
#f = open(file_in, 'rb')
|
||||||
f = open(file_in, 'rb')
|
f = open(file_in, 'rb')
|
||||||
while True:
|
while True:
|
||||||
samples = f.read(2)
|
samples = f.read(2)
|
||||||
if not samples: break
|
if len(samples) < 2: break
|
||||||
#if len(set(samples)) != 1: continue # corrupted
|
#if len(set(samples)) != 1: continue # corrupted
|
||||||
#yield samples[0]
|
#yield samples[0]
|
||||||
yield samples[0] + 0x100*samples[1]
|
yield samples[0] + 0x100*samples[1]
|
||||||
@@ -25,6 +39,20 @@ def read_itm(file_in):
|
|||||||
# except:
|
# except:
|
||||||
# pass
|
# pass
|
||||||
|
|
||||||
|
def med_filt_stream(stream, kernel=5):
|
||||||
|
# n.b. introduces a delay of kernel/2
|
||||||
|
accum = Fifo(kernel)
|
||||||
|
for f in stream:
|
||||||
|
accum.append(f)
|
||||||
|
yield median(accum.list())
|
||||||
|
|
||||||
|
def dc_filt_stream(stream, kernel=2):
|
||||||
|
accum = Fifo(kernel)
|
||||||
|
for f in stream:
|
||||||
|
accum.append(f)
|
||||||
|
dc = sum(accum.list()) / len(accum.list())
|
||||||
|
yield f - dc
|
||||||
|
|
||||||
def frames_from_samples(itm):
|
def frames_from_samples(itm):
|
||||||
b = bytearray()
|
b = bytearray()
|
||||||
for sample in itm:
|
for sample in itm:
|
||||||
@@ -36,35 +64,44 @@ def frames_from_samples(itm):
|
|||||||
|
|
||||||
return b
|
return b
|
||||||
|
|
||||||
def clean_frames(frames):
|
|
||||||
# Run a median filter over the frames to clean outliers:
|
|
||||||
post_med = [0]*len(frames)
|
|
||||||
for i, _ in enumerate(frames):
|
|
||||||
left = max(0, i-2)
|
|
||||||
right = min(len(frames), i+3)
|
|
||||||
post_med[i] = median(frames[left:right])
|
|
||||||
# Remove DC component
|
|
||||||
dc = sum(post_med) / len(post_med)
|
|
||||||
post_dc = [f - dc for f in post_med]
|
|
||||||
# Amplify to [-0x7fff, 0x7fff]
|
|
||||||
prev_max = max(abs(f) for f in post_dc)
|
|
||||||
amp_factor = 0x7fff / prev_max
|
|
||||||
post_amp = [f*amp_factor for f in post_dc]
|
|
||||||
# Round to int
|
|
||||||
return [int(round(f)) for f in post_amp]
|
|
||||||
|
|
||||||
def median(samples):
|
def median(samples):
|
||||||
x = list(sorted(samples))
|
x = list(sorted(samples))
|
||||||
return x[len(x) // 2]
|
return x[len(x) // 2]
|
||||||
|
|
||||||
def main():
|
def stream_to_wav(file_in, file_out):
|
||||||
_self, file_in, file_out = sys.argv
|
# broken
|
||||||
itm = clean_frames(list(read_itm(file_in)))
|
itm_stream = stream_itm(file_in)
|
||||||
|
clean_stream = dc_filt_stream(med_filt_stream(itm_stream))
|
||||||
wave_out = wave.open(file_out, 'wb')
|
wave_out = wave.open(file_out, 'wb')
|
||||||
wave_out.setnchannels(1)
|
wave_out.setnchannels(1)
|
||||||
wave_out.setsampwidth(2)
|
wave_out.setsampwidth(2)
|
||||||
wave_out.setframerate(1000)
|
wave_out.setframerate(1000)
|
||||||
wave_out.writeframes(frames_from_samples(itm))
|
wave_out.writeframes(frames_from_samples(list(clean_stream)))
|
||||||
|
|
||||||
|
def stream_to_term(file_in):
|
||||||
|
stream = dc_filt_stream(med_filt_stream(stream_itm(file_in)))
|
||||||
|
for f in stream:
|
||||||
|
print(ansi_rgb(color_for_sample(f), '█'), end='')
|
||||||
|
|
||||||
|
def ansi_rgb(color, text):
|
||||||
|
r, g, b = color
|
||||||
|
return '\033[38;2;{r};{g};{b}m{text}\033[0m'.format(**locals())
|
||||||
|
|
||||||
|
def color_for_sample(sample):
|
||||||
|
r, g, b = 0, 0, 0
|
||||||
|
if sample < 0:
|
||||||
|
b = int(-sample)
|
||||||
|
else:
|
||||||
|
r = int(sample)
|
||||||
|
#return (128, 0, 0)
|
||||||
|
return r, g, b
|
||||||
|
|
||||||
|
def main():
|
||||||
|
if len(sys.argv) == 3:
|
||||||
|
_self, file_in, file_out = sys.argv
|
||||||
|
stream_to_wav(file_in, file_out)
|
||||||
|
if len(sys.argv) == 1:
|
||||||
|
stream_to_term('/dev/stdin')
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
||||||
|
Reference in New Issue
Block a user