【狩猟の効率化】pythonでFM電波のモニタリングと波形のツイート

父は実家の山で害獣駆除目的の狩猟をしています。
その罠に何かしらがかかると、罠に付属した特定小電力トランシーバーが
電池が切れるまでのおよそ一日の間FMで「かっこう」を流し続けます。
かっこうが流れているかどうかは、定期的に無線で確認する必要があります。
トランシーバーからの電波は距離200mくらいまでしか届かないらしく、
遠くにいるときには確認ができません。

移動や確認の手間を省くために、父は5000円程度で購入できるLinuxマシン、ラズベリーパイとお値段不詳のSDRチューナーを購入しました。

購入されたまま長らく放置されていたラズパイとSDRチューナーを使って、
電波を確認して波形のプロットをツイートし、
獲物がかかっていると判断された場合には父のツイッターアカウントに@ツイートで連絡するプログラムを私が作りました。
(罠や無線の設置と使用は、必要な免許を取得し、許可を受けた父が行っています。)

ラズパイとSDRチューナー(リンクになっています。)


誰かの役に立てればと思い、作ったコードを公開します。
ただし、このブログを参考にして生じた損害等については一切責任を持ちません。
自己責任で行ってください。

質問等ありましたら遠慮せずコメントください。


* 手順 *
まず、開発用のtwitterアカウントを取得します。
これに一番時間がかかりました。
twitter社から返事が来るまでに1カ月くらい。

次にコード内のパラメータを調整します。

最後に、動作確認をしてcronで定期実行の設定をします。
cronの設定は簡単なのでググってください。


日本語説明付きのコードを載せた後に、日本語説明なしのコードを掲載します。

トランシーバーが「かっこう」を垂れ流しているときの波形

トランシーバーがオフのときの波形
今回受信したい特定小電力トランシーバーの周波数は422.0MHzと422.1MHzの間です。

青字のところだけ書き換えれば別の環境でも使えるはず…。
13個所あります。

* 説明付きコード *
### parameter ###
iteration=3 #電波の受信を行う回数
intervals=3 #受信の間隔(秒)
#トランシーバーからの電波は数秒の無音時間があるので、3回の観測のうちどこかで必ずかっこうを観測できるようにずらす。

freq_ll=421.700 #トランシーバーがオフの時の波形を見て、隣のピークをひっかけないように適当にベースラインの下限値を設定
freq_lh=422.000 #受信したい電波を挟む
freq_hl=422.100 #受信したい電波を挟む
freq_hh=422.400 #トランシーバーがオフの時の波形を見て、隣のピークをひっかけないように適当にベースラインの上限値を設定


#twitter
取得した開発用ツイッターアカウントでconsumer_key~access_secretを確認し、
XXX...を適切に書き換える。
consumer_key='XXXXXXXXXXXXXXXXXXXXXXXXX'
consumer_secret='XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
access_token='XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
access_secret='XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
alert_user='@xxxxxxxxxxxxxx' #<-- your user name
alert_userは@ツイートをするユーザーネームです。
普段使っているアカウントが良いと思います。
################### ライブラリのインポート
import matplotlib 
matplotlib.use('Agg')
from rtlsdr import *
from pylab import *
import time
import threading
import numpy as np
import sys
import tweepy
import os 

################################
# tweet settings #
################################
CONSUMER_KEY = consumer_key
CONSUMER_SECRET = consumer_secret
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
ACCESS_TOKEN = access_token
ACCESS_SECRET = access_secret
auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)
api = tweepy.API(auth)


################################
# observing radio #
################################

sdr = RtlSdr()

#1st################
# configure device
sdr.sample_rate = 2.048e6  # Hz よくわからん、たぶん何Hzごとにデータをとるかだと思う。
sdr.center_freq = 422.05e6     # Hz 観測中心周波数
sdr.gain = 'auto'

#データを入れる箱を作るために一回観測する。
#(このデータは使わない。たぶんここは省略できるんだけど私にはその技術はなかった。)
samples = sdr.read_samples(256*1024)
hoge=psd(samples, NFFT=1024, Fs=sdr.sample_rate/1e6, Fc=sdr.center_freq/1e6,return_line=True)
plt.close()
#setting
xx=np.empty([iteration,np.shape(hoge[0])[0]]) #強度
yy=np.empty([iteration,np.shape(hoge[1])[0]]) #周波数

#####################


#i=0

i=0
kk=0
#本観測(iteration回繰り返す)
while i <iteration:
    samples = sdr.read_samples(256*1024)
    xx[i],yy[i]=psd(samples, NFFT=1024, Fs=sdr.sample_rate/1e6, Fc=sdr.center_freq/1e6)
    xx[i]=xx[i]-np.mean(xx[i]) #オフセットを差し引く
    print(datetime.datetime.now())
    time.sleep(intervals) #intervals秒だけ休む
    i=i+1


print("average: ",np.mean(xx))
print("sigma: ",np.std(xx))

average=np.mean(xx)
sigma=np.std(xx) #ノイズレベルの計算
#シグナルが出るであろう周波数だけ抜き出す
signal=xx[yy>freq_lh]  signal_freq=yy[yy>freq_lh] signal=signal[signal_freq<freq_hl] signal_freq=signal_freq[signal_freq<freq_hl] plt.close()  plt.plot(yy[0],xx[0]) #1回目の観測結果をプロット(青色) plt.plot(yy[1],xx[1]) #2回目の観測結果をプロット(黄色) plt.plot(yy[2],xx[2]) #3回目の観測結果をプロット(緑色) plt.savefig("monitor.png")  #波形を保存 plt.close() emono=0 if max(signal)>10*sigma: #ノイズレベルの10倍のシグナルが検出されると捕まえた判定 print("!!!!! catched !!!!!") emono=1 else: print("zan nen") emono=0 ################################ # tweet # ################################ #normal #画像ツイート text=datetime.datetime.now().strftime('%Y%m%d') api.update_with_media(filename='monitor.png',status=text) os.system('rm monitor.png') #ツイートした画像はすぐに削除 #chatched #捕まえた判定したら@ツイート if emono==1: api.update_status(status=alert_user+' !!! chatched !!!' + text)





コード全文

### parameter ###
iteration=3
intervals=3

freq_ll=421.700
freq_lh=422.000
freq_hl=422.100
freq_hh=422.400

#twitter
consumer_key='XXXXXXXXXXXXXXXXXXXXXXXXX'
consumer_secret='XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
access_token='XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
access_secret='XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
alert_user='@xxxxxxxxxxxxxx' #<-- your user name



###################
import matplotlib 
matplotlib.use('Agg')
from rtlsdr import *
from pylab import *
import time
import threading
import numpy as np
import sys
import tweepy
import os 

################################
# tweet settings #
################################
CONSUMER_KEY = consumer_key
CONSUMER_SECRET = consumer_secret
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
ACCESS_TOKEN = access_token
ACCESS_SECRET = access_secret
auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)
api = tweepy.API(auth)


################################
# observing radio #
################################

sdr = RtlSdr()

#1st################
# configure device
sdr.sample_rate = 2.048e6  # Hz
sdr.center_freq = 422.05e6     # Hz
sdr.gain = 'auto'

samples = sdr.read_samples(256*1024)
hoge=psd(samples, NFFT=1024, Fs=sdr.sample_rate/1e6, Fc=sdr.center_freq/1e6,return_line=True)
plt.close()
#setting
xx=np.empty([iteration,np.shape(hoge[0])[0]])
yy=np.empty([iteration,np.shape(hoge[1])[0]])


#####################


#i=0

i=0
kk=0

while i <iteration:
    samples = sdr.read_samples(256*1024)
    xx[i],yy[i]=psd(samples, NFFT=1024, Fs=sdr.sample_rate/1e6, Fc=sdr.center_freq/1e6)
    xx[i]=xx[i]-np.mean(xx[i])
    print(datetime.datetime.now())
    time.sleep(intervals)
    i=i+1


print("average: ",np.mean(xx))
print("sigma: ",np.std(xx))

average=np.mean(xx)
sigma=np.std(xx)

signal=xx[yy>freq_lh]
signal_freq=yy[yy>freq_lh]
signal=signal[signal_freq<freq_hl]
signal_freq=signal_freq[signal_freq<freq_hl]

plt.close()
plt.plot(yy[0],xx[0])
plt.plot(yy[1],xx[1])
plt.plot(yy[2],xx[2])
plt.savefig("monitor.png")
plt.close()

emono=0
if max(signal)>10*sigma:
    print("!!!!! catched !!!!!")
    emono=1
else:
    print("zan nen")
    emono=0

################################
# tweet #
################################
#normal
text=datetime.datetime.now().strftime('%Y%m%d')
api.update_with_media(filename='monitor.png',status=text)
os.system('rm monitor.png')

#chatched
if emono==1:
    api.update_status(status=alert_user+' !!! chatched !!!' + text)

コメント

このブログの人気の投稿

TypeError: ufunc 'bitwise_xor' not supported for the input types, and the inputs could not be safely coerced to any supported types according to the casting rule ''safe''

overleafで画像が表示されない!

エラー: ""C:\U" で始まる文字列の中で 8 進文字なしに '\U' が使われています