时间:2020-11-08 17:59
作为一名新晋奶爸和程序员,我在新身份中最常思考的问题就是“照料婴儿的工作真的无法自动化吗?”当然,这也许能够实现,就算有给孩子换尿布的机器人(假设有足够多的父母同意在自己蹒跚学步的孩子身上测试这样的设备),愿意自动化照料婴儿的父母还真为数不多。作为父亲,我首先意识到的事情是:婴儿很多时候都会在哭,即...
作为一名新晋奶爸和程序员,我在新身份中最常思考的问题就是 “照料婴儿的工作真的无法自动化吗?”
当然,这也许能够实现,就算有给孩子换尿布的机器人(假设有足够多的父母同意在自己蹒跚学步的孩子身上测试这样的设备),愿意自动化照料婴儿的父母还真为数不多。
作为父亲,我首先意识到的事情是:婴儿很多时候都会在哭,即使我在家,也不可能总是能听到孩子的哭声。
通常,商用婴儿监视器可以填补这一空白,它们充当对讲机,让你在另一个房间也能听到婴儿的哭声。
但我很快意识到:商用婴儿监视器没有我想象中的理想设备智能:
它们只能充当一个传声筒:把声音从源头带到扬声器,却无法发现孩子哭声的含义;
当家长要去到另一个房间里时,相应要把扬声器带到另一个房间,无法在任何其他现有的音频设备上播放声音;
扬声器通常是低功率扬声器,无法连接到外部扬声器 - 这意味着,如果我在另一个房间播放音乐,我可能会听不到孩子的哭声,即便监控器和我在同一个房间也无法听到;
大多数扬声器都是在低功率无线电波上工作的,这意味着如果婴儿在他 / 她的房间里,而你必须走到楼下,它们才能工作。
因此,我萌生了自制一个更好用的 “智能婴儿监视器”的想法。
说干就干,我先给这个 “智能婴儿监视器”定义了一些需要的功能。
它可以运行于价廉物美的树莓派(RaspberryPI)与 USB 麦克风。
当孩子开始 / 停止哭泣时,它应该检测到孩子的哭声,并通知我(理想情况下是在我的手机上),或者跟踪我仪表板上的数据点,或者运行相应的任务。它不应该是一个单纯的对讲器,简单地将声音从一个源传递到另一个兼容的设备。
它能够在扬声器,智能手机,电脑等设备上传输音频。
它不受源和扬声器之间距离的影响,无需在整个房子里将扬声器移来移去。
它还应该有一个摄像头,可以利用摄像头对孩子实时监控,当他一开始哭,我便可以抓拍到图片或婴儿床的短视频,以检查有什么不对劲。
来看看一个新晋奶爸如何使用工程师的大脑和开源工具来完成这项任务吧。
采集音频样本首先,购买一块树莓派(RaspberryPi),在 SD 卡上烧录好 Linux 操作系统(建议使用 RaspberryPI3 或更高版本),运行 Tensorflow 模型。还可以购买一个与树莓派兼容的 USB 麦克风。
然后安装需要的相关项:
[sudo] apt-get install ffmpeg lame libatlas-base-dev alsa-utils[sudo] pip3 install tensorflow第一步,必须记录足够的音频样本,婴儿在什么时候哭,在什么时候不哭。稍后将利用这些样本来训练音频检测模型。
注意:在这个例子中,我将展示如何利用声音检测来识别婴儿的哭声,同样的精准程序可以用来检测任何其它类型的声音 - 只要它们足够长 (例如:警报或邻居家的钻孔声)。
首先,查看音频输入设备:
arecord -l在树莓派(RaspberryPI)上,得到以下输出 (注意,有两个 USB 麦克风):
**** List of CAPTURE Hardware Devices ****card 1: Device [USB PnP Sound Device], device 0: USB Audio [USB Audio] Subdevices: 0/1 Subdevice #0: subdevice #0card 2: Device_1 [USB PnP Sound Device], device 0: USB Audio [USB Audio] Subdevices: 0/1 Subdevice #0: subdevice #0我利用第二个麦克风来记录声音 - 即卡 2,设备 0。识别它的 ALSA 方法要么是 hw:2,0(直接访问硬件设备),要么是 plughw:2,0(如果需要的话,它会输入采样率和格式转换插件)。确保 SD 卡上有足够的空间,然后开始录制一些音频:
arecord -D plughw:2,0 -c 1 -f cd | lame - audio.mp3和孩子在同一个房间里,记录几分钟或几个小时的音频 - 最好是长时间的沉默、婴儿哭声和其他与之无关的声音 -,录音完成后按 Ctrl-C。尽可能多的重复这个过程多次,在一天中的不同时刻或不同的日子里获取不同的音频样本。
标注音频示例一旦有了足够的音频样本,就可以把它们复制到电脑上来训练模型了 - 可以使用 SCP 复制文件,也可以直接从 SD 卡上复制。
把它们都存储在相同目录下,例如:~/datasets/sound-detect/audio。另外,为每个示例音频文件创建一个新文件夹,它包含一个音频文件 (名为 audio.mp3)和一个标注文件 (名为 labels.json),利用它来标记音频文件中的负 / 正音频段,原始数据集的结构如下:
~/datasets/sound-detect/audio下面:标注录制的音频文件 - 如果它包含了孩子几个小时的哭声,可能会特别受虐。在你最喜欢的音频播放器或 Audacity 中打开每个数据集音频文件,并在每个示例目录中创建一个新的 label.json 文件。确定哭泣开始的确切时间和结束时间,并在 labels.json 中标注为 time_string -> label 的关键值结构。例:
{ "00:00": "negative", "02:13": "positive", "04:57": "negative", "15:41": "positive", "18:24": "negative" }在上面的例子中,00:00 到 02:12 之间的所有音频段将被标记为负,02:13 到 04:56 之间的所有音频段将被标记为正,以此类推。
对所有的音频示例标注完成之后,接下来是生成数据集,最后将它输入到 Tensorflow 模型中去。首先,创建了一个名为 micmon 的通用库和一组用于声音监视的实用工具。然后,开始安装:
git clone git@github.com:/BlackLight/micmon.gitcd micmon[sudo] pip3 install -r requirements.txt[sudo] python3 setup.py build install本模型设计基于音频的频率样本而非原始音频,因为,在这里我们想检测到一个特定的声音,这个声音有着特定的 “频谱”标签,即:基频(或基频下降的窄带范围)和一组特定的谐波。这些谐波频率与基波之间的比率既不受振幅的影响(频率比恒定,与输入幅度无关),也不受相位的影响 (无论何时开始记录,连续的声音都会有相同的频谱特征)。
这种与振幅和相位无关的特性使得这种方法更有可能训练出一个鲁棒的声音检测模型,而不是简单地将原始音频样本馈送到模型中。此外,该模型可以更简单(可以在不影响性能的情况下将多个频率分为一组,从而可以有效地实现降维),无论样本持续时间多长,该模型将 50~ 100 个频带作为输入值,一秒钟的原始音频通常包含 44100 个数据点,并且输入的长度随着样本的持续时间而增加,并且不太容易发生过拟合。
micmon 能计算音频样本某些段的 FFT(快速傅里叶变换),将结果频谱分为低通和高通滤波器的频带,并将结果保存到一组 numpy 压缩 (.npz)文件中。可以通过在命令行上执行 micmon-datagen 命令来实现:
micmon-datagen \ --low 250 --high 2500 --bins 100 \ --sample-duration 2 --channels 1 \ ~/datasets/sound-detect/audio ~/datasets/sound-detect/data在上面的示例中,我们从存储在~/dataset/sound-detect/audio 下的原始音频样本生成一个数据集,并将生成的频谱数据存储到~/datasets/sound-detect/data. –low 和~/datasets/sound-detect/data. --high 中,low 和 high 分别表示最低和最高频率,最低频率的默认值为 20Hz(人耳可闻的最低频率),最高频率的默认值为 20kHz(健康的年轻人耳可闻的最高频率)。
通过对此范围做出限定,尽可能多地捕获希望检测到的其他类型的音频背景和无关谐波的声音。在本案例中,250-2500 赫兹的范围足以检测婴儿的哭声。
婴儿的哭声通常是高频的(歌剧女高音能达到的最高音符在 1000 赫兹左右),在这里设置了至少双倍的最高频率,以确保能获得足够高的谐波 (谐波是更高的频率),但也不要将最高频率设得太高,以防止其他背景声音的谐波。我剪切掉了频率低于 250 赫兹的音频信号 - 婴儿的哭声不太可能发生在低频段,例如,可以打开一些 positive 音频样本,利用均衡器 / 频谱分析仪,检查哪些频率在 positive 样本中占主导地位,并将数据集集中在这些频率上。--bins 指定了频率空间的组数(默认值:100),更大的数值意味着更高的频率分辨率 / 粒度,但如果太高,可能会使模型容易发生过度拟合。
脚本将原始音频分割成较小的段,并计算每个段的频谱标签。示例持续时间指定每个音频段有多长时间(默认:2 秒)。对于持续时间较长的声音,取更大的值会起到更好的作用,但它同时会减少检测的时间,而且可能会在短音上失效。对于持续时间较短的声音,可以取较低的值,但捕获的片段可能没有足够的信息量来可靠地识别声音。
除了 micmon-datagen 脚本之外,也可以利用 micmonAPI,编写脚本来生成数据集。例:
import osfrom micmon.audio import AudioDirectory, AudioPlayer, AudioFilefrom micmon.dataset import DatasetWriterbasedir = os.path.expanduser('~/datasets/sound-detect')audio_dir = os.path.join(basedir, 'audio')datasets_dir = os.path.join(basedir, 'data')cutoff_frequencies = [250, 2500]# Scan the base audio_dir for labelled audio samplesaudio_dirs = AudioDirectory.scan(audio_dir)# Save the spectrum information and labels of the samples to a# different compressed file for each audio file.for audio_dir in audio_dirs: dataset_file = os.path.join(datasets_dir, os.path.basename(audio_dir.path) + '.npz') print(f'Processing audio sample {audio_dir.path}') with AudioFile(audio_dir) as reader, \ DatasetWriter(dataset_file, low_freq=cutoff_frequencies[0], high_freq=cutoff_frequencies[1]) as writer: for sample in reader: writer += sample无论是使用 micmon-datagen 还是使用 micmon Python API 生成数据集,在过程结束时,应该在~/datasets/sound-detect/data 目录下找到一堆 . npz 文件,每个标注后的音频原始文件对应一个数据集。之后,便可以利用这个数据集来训练神经网络进行声音检测。
训练模型micmon 利用 Tensorflow+Keras 来定义和训练模型,有了 PythonAPI,可以很容易地实现。例如:
import osfrom tensorflow.keras import layersfrom micmon.dataset import Datasetfrom micmon.model import Model# This is a directory that contains the saved .npz dataset filesdatasets_dir = os.path.expanduser('~/datasets/sound-detect/data')# This is the output directory where the model will be savedmodel_dir = os.path.expanduser('~/models/sound-detect')# This is the number of training epochs for each dataset sampleepochs = 2# Load the datasets from the compressed files.# 70% of the data points will be included in the training set,# 30% of the data points will be included in the evaluation set# and used to evaluate the performance of the model.datasets =Dataset.scan(datasets_dir, validation_split=0.3)labels = ['negative', 'positive']freq_bins = len(datasets[0].samples[0])# Create a network with 4 layers (one input layer, two intermediate layers and one output layer).# The first intermediate layer in this example will have twice the number of units as the number# of input units, while the second intermediate layer will have 75% of the number of# input units. We also specify the names for the labels and the low and high frequency range# used when sampling.model = Model( [ layers.Input(shape=(freq_bins,)), layers.Dense(int(2 * freq_bins), activation='relu'), layers.Dense(int(0.75 * freq_bins), activation='relu'), layers.Dense(len(labels), activation='softmax'), ], labels=labels, low_freq=datasets[0].low_freq, high_freq=datasets[0].high_freq)# Train the modelfor epoch in range(epochs): for i, dataset in enumerate(datasets): print(f'[epoch {epoch+1}/{epochs}] [audio sample {i+1}/{len(datasets)}]') model.fit(dataset) evaluation = model.evaluate(dataset) print(f'Validation set loss and accuracy: {evaluation}') # Save the modelmodel.save(model_dir, overwrite=True)运行此脚本后(在对模型的准确性感到满意后),可以在~/models/sound-detect 目录下找保存的新模型。在我的这个例子中,我采集~ 5 小时的声音就足够用了,通过定义一个较优的频率范围来训练模型,准确率大于 98%。如果是在计算机上训练模型,只需将其复制到 RaspberryPI,便可以准备进入下一步了。
利用模型进行预测这时候,制作一个脚本:利用以前训练过的模型,当孩子开始哭的时候,通知我们:
import osfrom micmon.audio import AudioDevicefrom micmon.model import Modelmodel_dir = os.path.expanduser('~/models/sound-detect')model = Model.load(model_dir)audio_system = 'alsa' # Supported: alsa and pulseaudio_device = 'plughw:2,0' # Get list of recognized input devices with arecord -lwith AudioDevice(audio_system, device=audio_device) as source: for sample in source: source.pause() # Pause recording while we process the frame prediction = model.predict(sample) print(prediction) source.resume() # Resume recording在 RaspberryPI 上运行脚本,并让它运行一段时间 - 如果在过去 2 秒内没有检测到哭声,它将在标准输出中打印 negative,如果在过去 2 秒内检测到哭声否,则在标准输出中打印 positive。
然而,如果孩子哭了,简单地将消息打印到标准输出中并没有太大作用 - 我们希望得到明确实时通知!
可以利用 Platypush 来实现这个功能。在本例中,我们将使用 pushbullet 集成在检测到 cry 时向我们的手机发送消息。接下来安装 Redis(Platypush 用于接收消息)和 Platypush,利用 HTTP 和 Pushbullet 来集成:
[sudo] apt-get install redis-server[sudo] systemctl start redis-server.service[sudo] systemctl enable redis-server.service[sudo] pip3 install 'platypush[http,pushbullet]'将 Pushbullet 应用程序安装在智能手机上,到 pushbullet.com 上以获取 API token。然后创建一个~/.config/platypush/config.yaml 文件,该文件启用 HTTP 和 Pushbullet 集成:
backend.http: enabled: Truepushbullet: token: YOUR_TOKEN接下来,对前面的脚本进行修改,不让它将消息打印到标准输出,而是触发一个可以被 Platypush hook 捕获的自定义事件 CustomEvent:
#!/usr/bin/python3import argparseimport loggingimport osimport sysfrom platypushimport RedisBusfrom platypush.message.event.custom import CustomEventfrom micmon.audio import AudioDevicefrom micmon.model import Modellogger = logging.getLogger('micmon')def get_args(): parser = argparse.ArgumentParser() parser.add_argument('model_path', help='Path to the file/directory containing the saved Tensorflow model') parser.add_argument('-i', help='Input sound device (e.g. hw:0,1 or default)', required=True, dest='sound_device') parser.add_argument('-e', help='Name of the event that should be raised when a positive event occurs', required=True, dest='event_type') parser.add_argument('-s', '--sound-server', help='Sound server to be used (available: alsa, pulse)', required=False, default='alsa', dest='sound_server') parser.add_argument('-P', '--positive-label', help='Model output label name/index to indicate a positive sample (default: positive)', required=False, default='positive', dest='positive_label') parser.add_argument('-N', '--negative-label', help='Model output label name/index to indicate a negative sample (default: negative)', required=False, default='negative', dest='negative_label') parser.add_argument('-l', '--sample-duration', help='Length of the FFT audio samples (default: 2 seconds)', required=False, type=float, default=2., dest='sample_duration') parser.add_argument('-r', '--sample-rate', help='Sample rate (default: 44100 Hz)', required=False, type=int, default=44100, dest='sample_rate') parser.add_argument('-c', '--channels', help='Number of audio recording channels (default: 1)', required=False, type=int, default=1, dest='channels') parser.add_argument('-f', '--ffmpeg-bin', help='FFmpeg executable path (default: ffmpeg)', required=False, default='ffmpeg', dest='ffmpeg_bin') parser.add_argument('-v', '--verbose', help='Verbose/debug mode', required=False, action='store_true', dest='debug') parser.add_argument('-w', '--window-duration', help='Duration of the look-back window (default: 10 seconds)', required=False, type=float, default=10., dest='window_length') parser.add_argument('-n', '--positive-samples', help='Number of positive samples detected over the window duration to trigger the event (default: 1)', required=False, type=int, default=1, dest='positive_samples') opts, args = parser.parse_known_args(sys.argv[1:]) return opts def main(): args = get_args() if args.debug: logger.setLevel(logging.DEBUG)model_dir = os.path.abspath(os.path.expanduser(args.model_path))model = Model.load(model_dir)window = []cur_prediction = args.negative_labelbus = RedisBus()with AudioDevice(system=args.sound_server, device=args.sound_device, sample_duration=args.sample_duration, sample_rate=args.sample_rate, channels=args.channels, ffmpeg_bin=args.ffmpeg_bin, debug=args.debug) as source: for sample in source: source.pause() # Pause recording while we process the frame prediction = model.predict(sample) logger.debug(f'Sample prediction: {prediction}') has_change = False if len(window) < args.window_length: window += [prediction] else: window = window[1:] + [prediction]positive_samples = len([pred for pred in window if pred == args.positive_label])if args.positive_samples >= positive_samples and \ prediction == args.positive_label and \ cur_prediction != args.positive_label: cur_prediction = args.positive_label has_change = True logging.info(f'Positive sample threshold detected ({positive_samples}/{len(window)})') elif args.positive_samples > positive_samples and \ prediction == args.negative_label and \ cur_prediction != args.negative_label: cur_prediction = args.negative_label has_change = True logging.info(f'Negative sample threshold detected ({len(window)-positive_samples}/{len(window)})')if has_change: evt = CustomEvent(subtype=args.event_type, state=prediction) bus.post(evt) source.resume() # Resume recordingif __name__ == '__main__': main()将上面的脚本保存为~/bin/micmon_detect.py。如果在滑动窗口时间内上检测到 positive_samples 样本(为了减少预测错误或临时故障引起的噪声),则脚本触发事件,并且它只会在当前预测从 negative 到 positive 的情况下触发事件。然后,它被分派给 Platypush。对于其它不同的声音模型(不一定是哭泣婴儿),该脚本也是通用的,对应其它正 / 负标签、其它频率范围和其它类型的输出事件,这个脚本也能工作。
创建一个 Platypush hook 来对事件作出响应,并向设备发送通知。首先,创建 Platypush 脚本目录:
mkdir -p ~/.config/platypush/scriptscd ~/.config/platypush/scripts# Define the directory as a moduletouch __init__.py# Create a script for the baby-cry eventsvi babymonitor.pybabymonitor.py 的内容为:
from platypush.context import get_pluginfrom platypush.event.hook import hookfrom platypush.message.event.custom import CustomEvent@hook(CustomEvent, subtype='baby-cry', state='positive')def on_baby_cry_start(event, **_): pb = get_plugin('pushbullet') pb.send_note(title='Baby cry status', body='The baby is crying!') @hook(CustomEvent, subtype='baby-cry', state='negative')def on_baby_cry_stop(event, **_): pb = get_plugin('pushbullet') pb.send_note(title='Baby cry status', body='The baby stopped crying - good job!')为 Platypush 创建一个服务文件,并启动 / 启用服务,这样它就会在终端上启动:
mkdir -p ~/.config/systemd/userwget -O ~/.config/systemd/user/platypush.service \ https://raw.githubusercontent.com/BlackLight/platypush/master/examples/systemd/platypush.service systemctl --user start platypush.service systemctl --user enable platypush.service为婴儿监视器创建一个服务文件 - 如:
~/.config/systemd/user/babymonitor.service:
[Unit]Description=Monitor to detect my baby's criesAfter=network.target sound.target[Service]ExecStart=/home/pi/bin/micmon_detect.py -i plughw:2,0 -e baby-cry -w 10 -n 2 ~/models/sound-detectRestart=alwaysRestartSec=10[Install]WantedBy=default.target该服务将启动 ALSA 设备 plughw:2,0 上的麦克风监视器,如果在过去 10 秒内检测到至少 2 个 positive 2 秒样本,并且先前的状态为 negative,则会触发 state=positive 事件;如果在过去 10 秒内检测到少于 2 个 positive 样本,并且先前的状态为 positive,则 state=negative。然后可以启动 / 启用服务:
systemctl --user start babymonitor.servicesystemctl --user enable babymonitor.service确认一旦婴儿开始哭泣,就会在手机上收到通知。如果没有收到通知,可以检查一下音频示例的标签、神经网络的架构和参数,或样本长度 / 窗口 / 频带等参数是否正确。
此外,这是一个相对基本的自动化例子 - 可以为它添加更多的自动化任务。例如,可以向另一个 Platypush 设备发送请求 (例如:在卧室或客厅),用 TTS 插件大声提示婴儿在哭。还可以扩展 micmon_detect.py 脚本,以便捕获的音频样本也可以通过 HTTP 流 - 例如使用 Flask 包装器和 ffmpeg 进行音频转换。另一个有趣的用例是,当婴儿开始 / 停止哭泣时,将数据点发送到本地数据库 (可以参考我先前关于 “如何使用 Platypush+PostgreSQL+Mosquitto+Grafana 创建灵活和自我管理的仪表板”的文章 https://towardsdatascience.com/how-to-build-your-home-infrastructure-for-data-collection-and-visualization-and-be-the-real-owner-af9b33723b0c):这是一组相当有用的数据,可以用来跟踪婴儿睡觉、醒着或需要喂食时的情况。虽然监测宝宝一直是我开发 micmon 的初衷,但是同样的程序也可以用来训练和检测其它类型声音的模型。最后,可以考虑使用一组良好的电源或锂电池组,这样监视器便可以便携化了。
安装宝贝摄像头有了一个好的音频馈送和检测方法之后,还可以添加一个视频馈送,以保持对孩子的监控。一开始,我 在 RaspberryPI3 上安装了一个 PiCamera 用于音频检测,后来,我发现这个配置相当不切实际。想想看:一个 RaspberryPi 3、一个附加的电池包和一个摄像头,组合在一起会相当笨拙;如果你找到一个轻型相机,可以很容易地安装在支架或灵活的手臂上,而且可以四处移动,这样,无论他 / 她在哪里,都可以密切关注孩子。最终,我选择了体积较小的 RaspberryPi Zero,它与 PiCamera 兼容,再配一个小电池。
婴儿监视器摄像头模块的第一个原型
同样,先插入一个烧录了与 RaspberryPI 兼容的操作系统的 SD 卡。然后在其插槽中插入一个与 RaspberryPI 兼容的摄像头,确保摄像头模块在 raspi-config 中启用,安装集成有 PiCamera 的 Platypush:
[sudo] pip3 install 'platypush[http,camera,picamera]'然后在~/.config/platypush/config.yaml: 中添加相机配置:
camera.pi: listen_port: 5001在 Platypush 重新启动时检查此配置,并通过 HTTP 从摄像头获取快照:
wget :8008/camera/pi/photo.jpg或在浏览器中打开视频:
:8008/camera/pi/video.mjpg同样,当应用程序启动时,可以创建一个 hook,该 hook 通过 TCP/H264 启动摄像头馈送:
mkdir -p ~/.config/platypush/scriptscd ~/.config/platypush/scriptstouch __init__.pyvi camera.py也可以通过 VLC:播放视频。
vlc tcp/h264://raspberry-pi:5001在手机上通过 VLC 应用程序或利用 RPi Camera Viewer 应用程序观看视频。
从想法到最后实现效果还不错,这也算是一个新晋奶爸从护理琐事中脱身的自我救赎吧。
2024 中国 — 东盟数字教育与健康促进学术交流活动成功举办
2024-08-29长沙南方职业学院筑梦青春乡村振兴服务团2024年暑期“三下乡”文化艺术
2024-07-11拥抱人工智能助推教育变革 人工智能赋能中小学教育发展学术研讨会在洛
2024-05-23高中生走技术路线,学什么专业好?往汽车新能源方向发展怎么样?
2024-05-13中考焦虑退!退!退!成绩不理想,如何在择校黄金期快速选择适合自己发
2024-05-13【答疑解惑】马上就要中考了,孩子成绩还是很差,不想学习怎么办?读职
2024-05-13