NTUT Electronic Engineering, Junior Fall Machine Learning: Taiwanese (Taigi) Speech Recognition

About These Notes

These notes document a machine learning assignment from the junior fall semester of Electronic Engineering at National Taipei University of Technology (NTUT).
They are not meant for everyone; material that was relatively easy for me is not written up here.
These are notes of what I learned after the fact, so they may not be very useful to future students.

Since I have never studied the underlying theory, these are the notes of a junior in Information and Finance Management, written from online resources and my own understanding; their educational value is limited and they may contain many errors.

Assignment Description

The training data is speech from a single female speaker (Kaohsiung accent), transcribed in Taiwanese Romanization (all lowercase, no tone marks).
The test data is speech from a single female speaker (Kaohsiung accent), also transcribed in Taiwanese Romanization (all lowercase, no tone marks).
My main learning source was 脉动; many thanks to him.

First, unzip the data

```python
!unzip ./data/machine-learningntut-2021-autumn-asr.zip
```

Collect the paths of the training data

```python
import os

def get_wav_files(wav_path):
    wav_files = []
    # os.walk yields (current directory, subdirectories, files in that directory)
    for (dirpath, dirnames, filenames) in os.walk(wav_path):
        for filename in filenames:
            if filename.endswith(".wav") or filename.endswith('.WAV'):
                filename_path = os.path.join(dirpath, filename)
                print(filename_path)
                wav_files.append(filename_path)
    return wav_files

wav_files = get_wav_files("/content/drive/MyDrive/NTUT/大三上-深度學習/ASR/data/train")
```

Read each wav file into Colab

  • Senior Guo-Hong's slides point out that the CSV file provided by the instructor must be split into one txt file per wav file. As he described it, formatting issues in the CSV cause errors during training and make the validation score terrible, no better than submitting a blank file.

Split the instructor's CSV into txt files

```python
import csv

with open("./data/train-toneless_update.csv", newline='', errors='ignore') as csvfile:
    rows = csv.reader(csvfile)
    for row in rows:
        a = row[0]  # file id
        b = row[1]  # transcription text
        f = open('./data/train/' + a + '.txt', 'w')
        f.write(b)
        f.close()
```

Load the split txt files into Colab

```python
import os

def get_tran_files(wav_path, tran_path):
    tran_texts = []
    for wav_file in wav_files:
        # wav_file is already an absolute path, so x + '.txt' points at the matching transcript
        x = os.path.splitext(wav_file)[0]
        tran_file = os.path.join(tran_path, x + '.txt')
        if os.path.exists(tran_file) is False:
            return None
        fd = open(tran_file, 'r')
        text = fd.readline().lower()
        tran_texts.append(text.split('\n')[0])  # drop the trailing newline
        fd.close()

    return tran_texts

tran_texts = get_tran_files("/content/drive/MyDrive/NTUT/大三上-深度學習/ASR/data/train", "/content/drive/MyDrive/NTUT/大三上-深度學習/ASR/data/train")
print(tran_texts)
```

Convert each wav file into numeric features (MFCC) and load them into Colab

  • from python_speech_features import mfcc lets us express the audio's characteristics numerically
  • so we convert each recording with MFCC (the conversion code also relies on a load_and_trim helper and an mfcc_dim constant, sketched below)
    • numcep: the number of cepstral coefficients to return
    • nfft: changed to 551; with the default of 512 the conversion does not work (an "nfft 512" error is raised)
    • highfreq: discard frequencies above 8000 Hz
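load_and_trim and mfcc_dim are used by the extraction code but are not defined anywhere in this post. A minimal sketch of what they might look like, assuming librosa is available in Colab and that 13 cepstral coefficients are used; the original course code may differ:

```python
import librosa

# Assumed value: the number of MFCC coefficients used throughout this post.
mfcc_dim = 13

def load_and_trim(path):
    # Hypothetical helper (not from the original post): load the wav at its
    # native sample rate and trim leading/trailing silence.
    audio, sr = librosa.load(path, sr=None)
    audio, _ = librosa.effects.trim(audio)
    return audio, sr
```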
```python
!pip install python_speech_features
from tqdm import tqdm
from python_speech_features import mfcc

features = []
for i in tqdm(range(len(wav_files))):
    path = wav_files[i]
    audio, sr = load_and_trim(path)
    features.append(mfcc(audio, sr, numcep=mfcc_dim, nfft=551, highfreq=8000))

print(len(features), features[0].shape)
```

Visualizing the audio

```python
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import Audio

def visualize(index):
    path = wav_files[index]
    text = tran_texts[index]
    print('Audio Text:', text)

    # plot the raw waveform
    audio, sr = load_and_trim(path)
    plt.figure(figsize=(12, 3))
    plt.plot(np.arange(len(audio)), audio)
    plt.title('Raw Audio Signal')
    plt.xlabel('Time')
    plt.ylabel('Audio Amplitude')
    plt.show()

    # plot the MFCC features
    feature = mfcc(audio, sr, numcep=mfcc_dim, nfft=551)
    print('Shape of MFCC:', feature.shape)

    fig = plt.figure(figsize=(12, 5))
    ax = fig.add_subplot(111)
    im = ax.imshow(feature, cmap=plt.cm.jet, aspect='auto')
    plt.title('Normalized MFCC')
    plt.ylabel('Time')
    plt.xlabel('MFCC Coefficient')
    ax.set_xticks(np.arange(0, 13, 2), minor=False)
    plt.show()

    return path

Audio(visualize(2))
```

Feature normalization

  • Normalize the features to reduce the influence of outliers
  • This makes the data more reliable
  • Randomly sample 100 feature matrices to estimate the mean and standard deviation
```python
import random

samples = random.sample(features, 100)
samples = np.vstack(samples)

mfcc_mean = np.mean(samples, axis=0)
mfcc_std = np.std(samples, axis=0)
print(mfcc_mean)
print(mfcc_std)

features = [(feature - mfcc_mean) / (mfcc_std + 1e-14) for feature in features]
```

Create two dictionaries: char2id and id2char

  • The model cannot classify or operate on characters, but it can operate on numbers
  • So we first map each letter to a number
  • We count how often each character occurs and assign the smallest ids to the most frequent characters
```python
chars = {}
for text in tran_texts:
    for e in text:
        chars[e] = chars.get(e, 0) + 1

chars = sorted(chars.items(), key=lambda x: x[1], reverse=True)
chars = [char[0] for char in chars]
print(len(chars), chars[:100])

char2id = {c: i for i, c in enumerate(chars)}
id2char = {i: c for i, c in enumerate(chars)}
```

Generating batches for the model

  • model.fit_generator lets the model read the training data gradually instead of all at once
  • batch_size controls how many samples are read in each step
```python
total = len(wav_files)
print(total)
data_index = np.arange(total)
np.random.shuffle(data_index)
train_size = int(0.9 * total)  # 90% of the data for training
test_size = total - train_size
train_index = data_index[:train_size]
test_index = data_index[train_size:]

X_train = [features[i] for i in train_index]
Y_train = [tran_texts[i] for i in train_index]
X_test = [features[i] for i in test_index]
Y_test = [tran_texts[i] for i in test_index]

batch_size = 16


def batch_generator(x, y, batch_size=batch_size):
    offset = 0
    while True:
        offset += batch_size

        if offset == batch_size or offset >= len(x):
            data_index = np.arange(len(x))
            np.random.shuffle(data_index)
            x = [x[i] for i in data_index]
            y = [y[i] for i in data_index]
            offset = batch_size

        X_data = x[offset - batch_size: offset]
        Y_data = y[offset - batch_size: offset]

        X_maxlen = max([X_data[i].shape[0] for i in range(batch_size)])
        Y_maxlen = max([len(Y_data[i]) for i in range(batch_size)])

        X_batch = np.zeros([batch_size, X_maxlen, mfcc_dim])
        Y_batch = np.ones([batch_size, Y_maxlen]) * len(char2id)
        X_length = np.zeros([batch_size, 1], dtype='int32')
        Y_length = np.zeros([batch_size, 1], dtype='int32')

        for i in range(batch_size):
            X_length[i, 0] = X_data[i].shape[0]
            X_batch[i, :X_length[i, 0], :] = X_data[i]

            Y_length[i, 0] = len(Y_data[i])
            Y_batch[i, :Y_length[i, 0]] = [char2id[c] for c in Y_data[i]]

        inputs = {'X': X_batch, 'Y': Y_batch, 'X_length': X_length, 'Y_length': Y_length}
        outputs = {'ctc': np.zeros([batch_size])}

        yield (inputs, outputs)
```

Defining the layers

  • Conv1D: our audio signal is a one-dimensional sequence, so we use 1D convolutions
    • filters: the output dimensionality
    • kernel_size: the length of the convolution kernel
    • dilation_rate: the dilation rate of the convolution (a rough receptive-field calculation follows this list)
  • batchnorm: normalizes the activations
  • activation: the activation function
  • res_block: a residual block built from the layers above
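As a rough sanity check of my own (not from the assignment), the stacked causal dilated convolutions defined below give each output frame a fairly wide context window: every convolution with kernel size k and dilation d adds (k - 1) * d frames of context, and the 1x1 convolutions add none.

```python
# Back-of-the-envelope receptive field of the network built later
# (3 blocks of dilations 1, 2, 4, 8, 16 with kernel size 7).
kernel_size = 7
dilations = [1, 2, 4, 8, 16]
num_blocks = 3
receptive_field = 1 + num_blocks * sum((kernel_size - 1) * d for d in dilations)
print(receptive_field)  # 559 MFCC frames of context per output frame
```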
```python
from keras.models import Model
from keras.layers import Input, Activation, Conv1D, Lambda, Add, Multiply, BatchNormalization
from tensorflow.keras.optimizers import Adam, SGD
from keras import backend as K
from keras.callbacks import ModelCheckpoint, ReduceLROnPlateau

def conv1d(inputs, filters, kernel_size, dilation_rate):
    return Conv1D(filters=filters, kernel_size=kernel_size, strides=1, padding='causal', activation=None,
                  dilation_rate=dilation_rate)(inputs)

def batchnorm(inputs):
    return BatchNormalization()(inputs)

def activation(inputs, activation):
    return Activation(activation)(inputs)

def res_block(inputs, filters, kernel_size, dilation_rate):
    hf = activation(batchnorm(conv1d(inputs, filters, kernel_size, dilation_rate)), 'tanh')
    hg = activation(batchnorm(conv1d(inputs, filters, kernel_size, dilation_rate)), 'sigmoid')
    h0 = Multiply()([hf, hg])  # Multiply: element-wise product of the two gated branches

    ha = activation(batchnorm(conv1d(h0, filters, 1, 1)), 'tanh')
    hs = activation(batchnorm(conv1d(h0, filters, 1, 1)), 'tanh')
    return Add()([ha, inputs]), hs  # Add: merge layer, sums the residual branch and the block input
```

Assembling the model

  • Here the Input layers define the model's inputs, and Y_pred defines the stack of layers inside the model
```python
epochs = 100
num_blocks = 3
filters = 128

X = Input(shape=(None, mfcc_dim,), dtype='float32', name='X')
Y = Input(shape=(None,), dtype='float32', name='Y')
X_length = Input(shape=(1,), dtype='int32', name='X_length')
Y_length = Input(shape=(1,), dtype='int32', name='Y_length')

h0 = activation(batchnorm(conv1d(X, filters, 1, 1)), 'tanh')
shortcut = []
for i in range(num_blocks):
    for r in [1, 2, 4, 8, 16]:
        h0, s = res_block(h0, filters, 7, r)
        shortcut.append(s)

h1 = activation(Add()(shortcut), 'relu')
h1 = activation(batchnorm(conv1d(h1, filters, 1, 1)), 'relu')
Y_pred = activation(batchnorm(conv1d(h1, len(char2id) + 1, 1, 1)), 'softmax')
sub_model = Model(inputs=X, outputs=Y_pred)
```

Training the model

  • calc_ctc_loss: CTC loss is commonly used for OCR and speech recognition
  • In OCR and speech recognition a symbol may be stretched over a long stretch of input, and that is perfectly fine
  • Example: a drawn-out "Da~vid" should still be recognized as "David"; because the first syllable is stretched, an ordinary loss function cannot handle the alignment, so we use CTC (a small illustration of how CTC collapses such outputs follows this list)
  • We use the SGD (stochastic gradient descent) optimizer, which updates on randomly drawn samples; because of this the loss tends to oscillate across epochs and may only reach a local optimum
  • model.fit_generator feeds the model batch_size samples at a time, which keeps memory usage low
  • ReduceLROnPlateau: when an epoch passes without the loss improving, the learning rate is lowered to lr * factor
  • I recommend setting epochs to 100; the Kaggle loss can then reach about 7
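To make the CTC idea concrete, here is a toy sketch of my own (not part of the assignment code) of the collapse rule CTC decoding applies: repeated symbols are merged first, then blank symbols are removed, so a stretched-out pronunciation still yields the same text.

```python
def ctc_collapse(frame_labels, blank='-'):
    # Merge consecutive repeats, then drop blanks.
    out = []
    prev = None
    for c in frame_labels:
        if c != prev and c != blank:
            out.append(c)
        prev = c
    return ''.join(out)

# A drawn-out "david" still collapses to the same transcription:
print(ctc_collapse("dddaa--vvvii-d"))  # -> "david"
```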
```python
def calc_ctc_loss(args):
    y, yp, ypl, yl = args
    return K.ctc_batch_cost(y, yp, ypl, yl)

ctc_loss = Lambda(calc_ctc_loss, output_shape=(1,), name='ctc')([Y, Y_pred, X_length, Y_length])
model = Model(inputs=[X, Y, X_length, Y_length], outputs=ctc_loss)
optimizer = SGD(lr=0.02, momentum=0.9, nesterov=True, clipnorm=5)
model.compile(loss={'ctc': lambda ctc_true, ctc_pred: ctc_pred}, optimizer=optimizer)

checkpointer = ModelCheckpoint(filepath='asr.h5', verbose=0)
lr_decay = ReduceLROnPlateau(monitor='loss', factor=0.2, patience=1, min_lr=0.000)


history = model.fit_generator(
    generator=batch_generator(X_train, Y_train),
    steps_per_epoch=len(X_train) // batch_size,
    epochs=epochs,
    validation_data=batch_generator(X_test, Y_test),
    validation_steps=len(X_test) // batch_size,
    callbacks=[checkpointer, lr_decay])
```

The training process is shown below.

Save the trained model

```python
import pickle

sub_model.save('asr2.h5')
with open('dictionary2.pkl', 'wb') as fw:
    pickle.dump([char2id, id2char, mfcc_mean, mfcc_std], fw)
```

Plot the training and validation loss per epoch

```python
train_loss = history.history['loss']
valid_loss = history.history['val_loss']
plt.plot(np.linspace(1, epochs, epochs), train_loss, label='train')
plt.plot(np.linspace(1, epochs, epochs), valid_loss, label='valid')
plt.legend(loc='upper right')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.show()
```

The resulting plot is shown below.

Predicting on the training data

  • The decoded output always has the maximum preset length, so once the model decides that all of the audio's text has been produced, the remaining positions are -1. We therefore skip any -1 values when building the output string, to avoid emitting invalid characters.
```python
from keras.models import load_model
import pickle

# load the saved dictionaries and model
with open('dictionary2.pkl', 'rb') as fr:
    [char2id, id2char, mfcc_mean, mfcc_std] = pickle.load(fr)

sub_model = load_model('asr2.h5')

def random_predict(x, y):  # predict on a randomly chosen training sample
    index = np.random.randint(len(x))
    feature = x[index]
    text = y[index]

    pred = sub_model.predict(np.expand_dims(feature, axis=0))
    pred_ids = K.eval(K.ctc_decode(pred, [feature.shape[0]], greedy=False, beam_width=10, top_paths=1)[0][0])
    pred_ids = pred_ids.flatten().tolist()
    print(pred_ids)

    ans = list()
    for i in pred_ids:
        if i != -1:  # -1 marks padding after the transcription ends
            ans.append(id2char[i])
    print('True transcription:\n-- ', text, '\n')
    print('Predicted transcription:\n-- ' + ''.join(ans), '\n')

random_predict(X_train, Y_train)
#random_predict(X_test, Y_test)
```

A sample prediction is shown below.

Renaming the test data

  • On Linux, files are listed in string order, but the instructor's Kaggle ordering is not read that way. We therefore rename the test files by zero-padding the names, e.g. 1.wav becomes 0001.wav, so that string order matches numeric order.
```python
import os

cnt = 4  # pad file names to 4 digits
os.chdir("/content/drive/MyDrive/NTUT/大三上-深度學習/ASR/data/test")

for filename in os.listdir():
    print(filename.split('.')[0].zfill(cnt) + '.wav')
    os.rename(filename, filename.split('.')[0].zfill(cnt) + '.wav')
```

Predicting on the test data

  • Finally, run prediction on the test data and write the results to predict2.csv
```python
import os

test_wavs = "/content/drive/MyDrive/NTUT/大三上-深度學習/ASR/data/test/"

features = []
for path in sorted(os.listdir(test_wavs)):
    path = test_wavs + path
    print(path)
    audio, sr = load_and_trim(path)
    features.append(mfcc(audio, sr, numcep=mfcc_dim, nfft=551, highfreq=8000))

# must reuse the mfcc_mean / mfcc_std computed earlier, otherwise the predictions will be off
features = [(feature - mfcc_mean) / (mfcc_std + 1e-14) for feature in features]

with open("predict2.csv", "w") as f:
    f.write("id,text\n")
    for i in range(len(features)):
        feature = features[i]
        pred = sub_model.predict(np.expand_dims(feature, axis=0))
        pred_ids = K.eval(K.ctc_decode(pred, [feature.shape[0]], greedy=False, beam_width=10, top_paths=1)[0][0])
        pred_ids = pred_ids.flatten().tolist()

        list_ans = list()
        for j in pred_ids:
            if j != -1:
                list_ans.append(id2char[j])

        print('Predicted transcription:\n-- ' + ''.join(list_ans), '\n')

        f.write(str(i + 1) + "," + ''.join(list_ans) + '\n')
```

Summary

My prediction score ended up around 7, but when I set epochs to 50 the ctc_loss only got down to 49, so I suspect that with more epochs and a smaller ReduceLROnPlateau factor the ctc_loss could go even lower.

Thanks to the professor and the senior students for sharing the slides and designing the assignment, which let me put so much knowledge into practice.
Thanks also to Colab for letting me use their servers for free to do this learning; I am very grateful.

References

【python_speech_features】MFCC parameter tuning and parameter descriptions (CSDN)
The meaning of the batch_size parameter in neural networks and how to set it, by ycheng_sjtu (CSDN)
