Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1import pyaudio 

2import logging 

3 

4from askbob.speech.listener.listener import UtteranceService 

5 

6 

class MicUtteranceService(UtteranceService):
    """An utterance service that records audio from a microphone using PyAudio.

    Audio blocks delivered by PyAudio's callback thread are pushed onto
    ``self.buffer_queue``. That queue, along with ``self.channels`` and
    ``self.blocks_per_second``, is not defined in this class; presumably it
    is provided by ``UtteranceService`` (TODO confirm against the base class).
    """

    device_index: int
    pa: pyaudio.PyAudio
    stream: pyaudio.Stream

    def __init__(self, aggressiveness: int = 1, device_index: int = None, input_rate: int = 16000,
                 lowpass_frequency: int = 65, highpass_frequency: int = 4000):
        """Opens the microphone stream and logs the available input devices.

        Args:
            aggressiveness: Forwarded unchanged to ``UtteranceService``.
            device_index: Index of the input device to record from, or
                ``None`` to let PyAudio pick the default input device.
                ``0`` is a valid index and is honoured as such.
            input_rate: Sample rate, in Hz, requested from the input device.
            lowpass_frequency: Forwarded unchanged to ``UtteranceService``.
            highpass_frequency: Forwarded unchanged to ``UtteranceService``.

        Raises:
            Exception: Whatever PyAudio raises when the stream cannot be
                opened; the PyAudio instance is terminated before re-raising.
        """
        super().__init__(aggressiveness=aggressiveness,
                         lowpass_frequency=lowpass_frequency,
                         highpass_frequency=highpass_frequency)
        self.device_index = device_index
        self.input_rate = input_rate
        self.pa = pyaudio.PyAudio()
        try:
            self.stream = self._create_stream()
        except Exception:
            # Do not leak the PortAudio session when the stream cannot be
            # opened (e.g. an invalid device index or unsupported rate).
            self.pa.terminate()
            raise

        # Query each device once and keep only those able to capture audio.
        device_count = self.pa.get_host_api_info_by_index(0)['deviceCount']
        device_infos = [
            self.pa.get_device_info_by_host_api_device_index(0, i)
            for i in range(device_count)
        ]
        logging.info("Found input sound devices: " + '; '.join([
            f"({i}) {info.get('name')}"
            for i, info in enumerate(device_infos)
            if info.get('maxInputChannels') > 0
        ]))

        if device_index is not None:
            logging.info(f"Using input sound device index: {device_index}")
        else:
            logging.info(
                f"Using default input sound device index: {self.pa.get_host_api_info_by_index(0)['defaultInputDevice']}")

    def _create_stream(self):
        """Creates and starts a new audio input stream from the microphone.

        Returns:
            The started PyAudio stream, opened in callback mode.
        """

        def callback(in_data, frame_count, time_info, status):
            # Invoked by PyAudio for every captured block: hand the raw
            # bytes to the consumer and ask PyAudio to keep recording.
            self.buffer_queue.put(in_data)
            return (None, pyaudio.paContinue)

        stream = self.pa.open(
            format=pyaudio.paInt16,
            channels=self.channels,
            rate=self.input_rate,
            input=True,
            frames_per_buffer=self.input_rate // self.blocks_per_second,
            stream_callback=callback,
            # Pass the index through untouched: None already selects the
            # default device, and a truthiness test would wrongly discard
            # the valid device index 0.
            input_device_index=self.device_index
        )
        stream.start_stream()

        return stream

    def _destroy(self) -> None:
        """Destroys the stream and terminates recording with PyAudio."""
        try:
            self.stream.stop_stream()
            self.stream.close()
        finally:
            # Always release PortAudio, even if stopping/closing the stream fails.
            self.pa.terminate()