Dataset links for downloading:
From original source:
From Google Drive:
The digits and letters datasets each include only a subset of the AVICAR recordings: isolated single digits and single letters, respectively.
import os
import pathlib
import glob
import string
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers.experimental import preprocessing
from tensorflow.keras import layers
from tensorflow.keras import models
from IPython import display
# Use seed 66 for consistent experiments
seed = 66
tf.random.set_seed(seed)
np.random.seed(seed)
Choose a dataset. Choices: digits, letters, mixed
choices = ['digits', 'letters', 'mixed']
dataset_type = choices[0]
Download the data manually from Google Drive if the original source's website is down, and place the downloaded folder under the 'data' directory.
download_from = 'origin'
data_dir = pathlib.Path('data/avicar_some'+dataset_type)
def download_data(data_dir, dataset_type, download_from):
if download_from == 'origin':
file_name = 'avicar_some' + dataset_type + '.zip'
download_link = "http://www.isle.illinois.edu/speech_web_lg/data/avicar_some" + dataset_type + ".zip"
# elif download_from == 'drive':
# file_name = 'avicar_some' + dataset_type + '.zip'
# if dataset_type == 'digits':
# download_link = 'https://drive.google.com/file/d/1SJlrT6kZrhtAmABcO63nVPYrACWMnthD/view?usp=sharing'
# elif dataset_type == 'letters':
# download_link = 'https://drive.google.com/file/d/1TyEMlvPGVyR0xzsdmloMJF8XhAhCbiVe/view?usp=sharing'
if not data_dir.exists():
tf.keras.utils.get_file(
file_name,
origin= download_link,
extract=True,
cache_dir='.', cache_subdir='data')
download_data(data_dir, dataset_type, download_from)
Get label names.
if dataset_type == 'digits':
label_strings = np.array([str(num) for num in range(0,10)])
elif dataset_type == 'letters':
label_strings = np.array(list(string.ascii_lowercase))
else:
label_strings = np.array([str(num) for num in range(0,10)] + list(string.ascii_lowercase))
print(dataset_type + ":", label_strings)
Extract filenames into a list.
Some files may have been placed in the folder by mistake, so we filter out files with incorrect labels. We check a fixed index in the file path; the character at that index indicates the label of the file.
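For reference, here is how that fixed index lines up with a hypothetical path (the exact AVICAR filename layout is an assumption here; adjust the index if your directory prefix differs):
# Hypothetical path, for illustration only: with the 'data/avicar_somedigits/'
# prefix, index 32 of the full path falls on the label character.
example_path = 'data/avicar_somedigits/AF1_35D_D5_C1_M3.wav'  # hypothetical
print(example_path[32])  # -> '5' under this assumed layout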
filenames = tf.io.gfile.glob(str(data_dir)+"*/*.wav")
print("Example filename:", filenames[99])
print()
if dataset_type == "digits":
# Filter out non-digit files
filenames = [fname for fname in filenames if fname[32].isdigit()]
# Count # of examples for each label
for i in range(len(label_strings)):
num_examples = len([fname for fname in filenames if fname[32]==label_strings[i]])
print(f"""# examples for "{label_strings[i]}": {num_examples}""")
elif dataset_type == 'letters':
# Filter out non-letter files
filenames = [fname for fname in filenames if not(fname[32].isdigit())]
# Count # of examples for each label
for i in range(len(label_strings)):
num_examples = len([fname for fname in filenames if fname[33]==label_strings[i].upper()])
print(f"""# examples for "{label_strings[i]}": {num_examples}""")
num_samples = len(filenames)
print('# total examples:', num_samples)
print()
Shuffle files
filenames = tf.random.shuffle(filenames)
print('Example file tensor:', filenames[0])
Train/validation/test split using an 80:10:10 ratio (for example, 1,000 files would split into 800/100/100).
TRAIN_PORTION = 0.8
VAL_PORTION = 0.1
TEST_PORTION = 0.1
train_end = int(num_samples*TRAIN_PORTION)
val_end = train_end + int(num_samples*VAL_PORTION)
train_files = filenames[:train_end]
val_files = filenames[train_end: val_end]
test_files = filenames[val_end:]
print('Training set size:', len(train_files))
print('Validation set size:', len(val_files))
print('Test set size:', len(test_files))
def decode_audio(audio_binary):
audio, _ = tf.audio.decode_wav(audio_binary)
return tf.squeeze(audio, axis=-1)
def get_label(file_path):
parts = tf.strings.split(file_path, os.path.sep)
# be careful with data type here
# this function must return a tensor
label_tensor = tf.strings.substr(parts[-1], pos=9, len=1)
return label_tensor
# print(get_label(train_files[1]))
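As a quick sanity check of the label extraction (the path below is hypothetical; under the assumed naming scheme, position 9 of the basename holds the label character):
# Hypothetical filename, for illustration only.
print(get_label(tf.constant('data/avicar_somedigits/AF1_35D_D5_C1_M3.wav')))
# -> tf.Tensor(b'5', shape=(), dtype=string) under this assumed layout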
def get_waveform_and_label(file_path):
label = get_label(file_path)
audio_binary = tf.io.read_file(file_path)
waveform = decode_audio(audio_binary)
return waveform, label
AUTOTUNE = tf.data.experimental.AUTOTUNE
files_ds = tf.data.Dataset.from_tensor_slices(train_files)
waveform_ds = files_ds.map(get_waveform_and_label, num_parallel_calls=AUTOTUNE)
Display some waveforms picked from our shuffled training set.
rows = 3
cols = 3
n = rows*cols
fig, axes = plt.subplots(rows, cols, figsize=(10, 12))
for i, (audio, label) in enumerate(waveform_ds.take(n)):
r = i // cols
c = i % cols
ax = axes[r][c]
ax.plot(audio.numpy())
ax.set_yticks(np.arange(-1.2, 1.2, 0.2))
label = label.numpy().decode('utf-8')
ax.set_title(label)
plt.show()
Look at the waveform, the spectrogram, and the audio of one example.
Note: Before converting a waveform to a spectrogram, we must pad or cut it to exactly 1 second (16,000 samples at 16 kHz).
def get_spectrogram(waveform):
    waveform = tf.cast(waveform, tf.float32)
    # Scalar difference (in samples) between the 1-second target and the clip length
    diff = 16000 - tf.shape(waveform)[0]
    if diff >= 0:
# Padding for files with less than 16000 samples
zero_padding = tf.zeros([16000] - tf.shape(waveform), dtype=tf.float32)
# Concatenate audio with padding so that all audio clips will be of the same length
equal_length = tf.concat([waveform, zero_padding], 0)
else:
# Cut the tail if audio > 1 second
equal_length = tf.slice(waveform, [0], [16000])
spectrogram = tf.signal.stft(
equal_length, frame_length=255, frame_step=128)
spectrogram = tf.abs(spectrogram)
return spectrogram
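As a quick check with synthetic waveforms (not real recordings), clips shorter and longer than 1 second should both yield the same spectrogram shape:
# Synthetic 16 kHz clips: 0.5 s (padded) and 1.25 s (truncated).
short_wave = tf.random.uniform([8000], minval=-1.0, maxval=1.0)
long_wave = tf.random.uniform([20000], minval=-1.0, maxval=1.0)
print(get_spectrogram(short_wave).shape)  # (124, 129)
print(get_spectrogram(long_wave).shape)   # also (124, 129)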
for waveform, label in waveform_ds.take(1):
label = label.numpy().decode('utf-8')
spectrogram = get_spectrogram(waveform)
print('Label:', label)
print('Waveform shape:', waveform.shape)
print('Spectrogram shape:', spectrogram.shape)
print('Audio playback')
display.display(display.Audio(waveform, rate=16000))
Plot the waveform and spectrogram
def plot_spectrogram(spectrogram, ax):
    # Convert frequencies to log scale and transpose so that time is
    # represented on the x-axis (columns). Add a small epsilon to avoid log(0).
    log_spec = np.log(spectrogram.T + np.finfo(float).eps)
height = log_spec.shape[0]
X = np.arange(16000, step=height + 1)
Y = range(height)
ax.pcolormesh(X, Y, log_spec)
fig, axes = plt.subplots(2, figsize=(12, 8))
timescale = np.arange(waveform.shape[0])
axes[0].plot(timescale, waveform.numpy())
axes[0].set_title('Waveform')
axes[0].set_xlim([0, 16000])
plot_spectrogram(spectrogram.numpy(), axes[1])
axes[1].set_title('Spectrogram')
plt.show()
def get_spectrogram_and_label_id(audio, label):
spectrogram = get_spectrogram(audio)
spectrogram = tf.expand_dims(spectrogram, -1)
    # Boolean match against label_strings; argmax returns the index of the match
    label_id = tf.argmax(label == label_strings)
return spectrogram, label_id
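A minimal illustration of the boolean-match trick (assuming dataset_type is 'digits', so label_strings holds '0' through '9'):
# '3' matches label_strings only at index 3, so argmax of the boolean vector is 3.
print(tf.argmax(tf.constant('3') == label_strings).numpy())  # -> 3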
spectrogram_ds = waveform_ds.map(
get_spectrogram_and_label_id, num_parallel_calls=AUTOTUNE)
rows = 3
cols = 3
n = rows*cols
fig, axes = plt.subplots(rows, cols, figsize=(10, 10))
for i, (spectrogram, label_id) in enumerate(spectrogram_ds.take(n)):
r = i // cols
c = i % cols
ax = axes[r][c]
plot_spectrogram(np.squeeze(spectrogram.numpy()), ax)
ax.set_title(label_strings[label_id.numpy()])
ax.axis('off')
plt.show()
Extract MFCC features with python_speech_features.
from python_speech_features import mfcc
import scipy.io.wavfile as wav
mfccs = {}
for wave in filenames:
wave = wave.numpy().decode('utf-8')
(rate, sig) = wav.read(wave)
mfccs[wave] = mfcc(sig, rate, nfft=2000)
example_mfcc = list(mfccs.values())[0]
print('Total MFCCs:', len(mfccs))
print()
print('Example_MFCC')
print('Number of windows =', example_mfcc.shape[0])
print('Length of each feature =', example_mfcc.shape[1])
example_mfcc = example_mfcc.T
plt.matshow(example_mfcc)
plt.title('Example_MFCC')
plt.show()
We have finished preprocessing the training set. Now we repeat the same steps for the validation and test sets.
def preprocess_dataset(files):
files_ds = tf.data.Dataset.from_tensor_slices(files)
output_ds = files_ds.map(get_waveform_and_label, num_parallel_calls=AUTOTUNE)
output_ds = output_ds.map(
get_spectrogram_and_label_id, num_parallel_calls=AUTOTUNE)
return output_ds
train_ds = spectrogram_ds
val_ds = preprocess_dataset(val_files)
test_ds = preprocess_dataset(test_files)
Split data into batches
batch_size = 64
train_ds = train_ds.batch(batch_size)
val_ds = val_ds.batch(batch_size)
Cache and prefetch to reduce read latency during training.
train_ds = train_ds.cache().prefetch(AUTOTUNE)
val_ds = val_ds.cache().prefetch(AUTOTUNE)
Model
for spectrogram, _ in spectrogram_ds.take(1):
input_shape = spectrogram.shape
print('Input shape:', input_shape)
num_labels = len(label_strings)
norm_layer = preprocessing.Normalization()
norm_layer.adapt(spectrogram_ds.map(lambda x, _: x))
model = models.Sequential([
layers.Input(shape=input_shape),
preprocessing.Resizing(32, 32),
norm_layer,
layers.Conv2D(32, 3, activation='relu'),
layers.Conv2D(64, 3, activation='relu'),
layers.MaxPooling2D(),
layers.Dropout(0.25),
layers.Flatten(),
layers.Dense(128, activation='relu'),
layers.Dropout(0.5),
layers.Dense(num_labels),
])
model.summary()
model.compile(
optimizer=tf.keras.optimizers.Adam(),
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=['accuracy'],
)
Train
EPOCHS = 25
history = model.fit(
train_ds,
validation_data=val_ds,
epochs=EPOCHS,
callbacks=tf.keras.callbacks.EarlyStopping(verbose=1, patience=2),
)
Save model
model_dir = 'models'
# Create a folder and save the model
model_name = 'model_single_digit' #Make sure you change this name. DO NOT OVERWRITE TRAINED MODELS.
# model.save(os.path.join(model_dir, model_name))
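To reload a saved model later, a minimal sketch (assuming the save line above was uncommented and run):
# reloaded_model = tf.keras.models.load_model(os.path.join(model_dir, model_name))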
Plot loss
metrics = history.history
plt.plot(history.epoch, metrics['loss'], metrics['val_loss'])
plt.legend(['loss', 'val_loss'])
plt.show()
Define a fully connected (FFN) baseline. The helper below is kept for reference but not used; the FFN that is actually trained is built right after.
def create_multi_classification_model(input_shape):
    model = tf.keras.Sequential()
    model.add(tf.keras.layers.Dense(256, input_shape=input_shape, activation='relu'))
    model.add(tf.keras.layers.Dense(128, activation='relu'))
    model.add(tf.keras.layers.Dense(64))
    model.add(tf.keras.layers.Dense(32))
    # Flatten before the head so the output is one score per label
    model.add(tf.keras.layers.Flatten())
    model.add(tf.keras.layers.Dense(num_labels, activation='softmax'))
    return model
FFN_model = models.Sequential([
layers.Input(shape=input_shape),
preprocessing.Resizing(64, 64),
norm_layer,
layers.Dense(64, activation='relu'),
layers.Dense(32, ),
layers.Dense(16, ),
layers.Dense(8, ),
layers.Dropout(0.25),
layers.Flatten(),
layers.Dense(num_labels, activation='softmax'),
])
# The final layer already applies softmax, so the loss must not expect logits.
FFN_model.compile(
    optimizer=tf.keras.optimizers.Adam(),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
    metrics=['accuracy'],
)
FFN_model.summary()
EPOCHS = 25
FFN_history = FFN_model.fit(
train_ds,
validation_data=val_ds,
epochs=EPOCHS,
callbacks=tf.keras.callbacks.EarlyStopping(verbose=1, patience=2),
)
metrics = FFN_history.history
plt.plot(FFN_history.epoch, metrics['loss'], metrics['val_loss'])
plt.legend(['loss', 'val_loss'])
plt.show()
Now repeat the whole pipeline for the letters dataset.
dataset_type = choices[1]
download_from = 'origin'
data_dir = pathlib.Path('data/avicar_some'+dataset_type)
download_data(data_dir, dataset_type, download_from)
if dataset_type == 'digits':
label_strings = np.array([str(num) for num in range(0,10)])
elif dataset_type == 'letters':
label_strings = np.array(list(string.ascii_lowercase))
else:
label_strings = np.array([str(num) for num in range(0,10)] + list(string.ascii_lowercase))
print(dataset_type + ":", label_strings)
filenames = tf.io.gfile.glob(str(data_dir)+"*/*.wav")
print("Example filename:", filenames[99])
print()
if dataset_type == "digits":
# Filter out non-digit files
filenames = [fname for fname in filenames if fname[32].isdigit()]
# Count # of examples for each label
for i in range(len(label_strings)):
num_examples = len([fname for fname in filenames if fname[32]==label_strings[i]])
print(f"""# examples for "{label_strings[i]}": {num_examples}""")
elif dataset_type == 'letters':
# Filter out non-letter files
filenames = [fname for fname in filenames if not(fname[32].isdigit())]
# Count # of examples for each label
for i in range(len(label_strings)):
num_examples = len([fname for fname in filenames if fname[33]==label_strings[i].upper()])
print(f"""# examples for "{label_strings[i]}": {num_examples}""")
num_samples = len(filenames)
print('# total examples:', num_samples)
print()
filenames = tf.random.shuffle(filenames)
print('Example file tensor:', filenames[0])
TRAIN_PORTION = 0.8
VAL_PORTION = 0.1
TEST_PORTION = 0.1
train_end = int(num_samples*TRAIN_PORTION)
val_end = train_end + int(num_samples*VAL_PORTION)
train_files = filenames[:train_end]
val_files = filenames[train_end: val_end]
test_files = filenames[val_end:]
print('Training set size:', len(train_files))
print('Validation set size:', len(val_files))
print('Test set size:', len(test_files))
files_ds = tf.data.Dataset.from_tensor_slices(train_files)
waveform_ds = files_ds.map(get_waveform_and_label, num_parallel_calls=AUTOTUNE)
Display a few waveforms from the letters training set.
rows = 3
cols = 3
n = rows*cols
fig, axes = plt.subplots(rows, cols, figsize=(10, 12))
for i, (audio, label) in enumerate(waveform_ds.take(n)):
r = i // cols
c = i % cols
ax = axes[r][c]
ax.plot(audio.numpy())
ax.set_yticks(np.arange(-1.2, 1.2, 0.2))
label = label.numpy().decode('utf-8')
ax.set_title(label)
plt.show()
for waveform, label in waveform_ds.take(1):
label = label.numpy().decode('utf-8')
spectrogram = get_spectrogram(waveform)
print('Label:', label)
print('Waveform shape:', waveform.shape)
print('Spectrogram shape:', spectrogram.shape)
print('Audio playback')
display.display(display.Audio(waveform, rate=16000))
fig, axes = plt.subplots(2, figsize=(12, 8))
timescale = np.arange(waveform.shape[0])
axes[0].plot(timescale, waveform.numpy())
axes[0].set_title('Waveform')
axes[0].set_xlim([0, 16000])
plot_spectrogram(spectrogram.numpy(), axes[1])
axes[1].set_title('Spectrogram')
plt.show()
spectrogram_ds = waveform_ds.map(
get_spectrogram_and_label_id, num_parallel_calls=AUTOTUNE)
rows = 3
cols = 3
n = rows*cols
fig, axes = plt.subplots(rows, cols, figsize=(10, 10))
for i, (spectrogram, label_id) in enumerate(spectrogram_ds.take(n)):
r = i // cols
c = i % cols
ax = axes[r][c]
plot_spectrogram(np.squeeze(spectrogram.numpy()), ax)
ax.set_title(label_strings[label_id.numpy()])
ax.axis('off')
plt.show()
train_ds = spectrogram_ds
val_ds = preprocess_dataset(val_files)
test_ds = preprocess_dataset(test_files)
batch_size = 64
train_ds = train_ds.batch(batch_size)
val_ds = val_ds.batch(batch_size)
train_ds = train_ds.cache().prefetch(AUTOTUNE)
val_ds = val_ds.cache().prefetch(AUTOTUNE)
for spectrogram, _ in spectrogram_ds.take(1):
input_shape = spectrogram.shape
print('Input shape:', input_shape)
num_labels = len(label_strings)
norm_layer = preprocessing.Normalization()
norm_layer.adapt(spectrogram_ds.map(lambda x, _: x))
letter_cnn_model = models.Sequential([
layers.Input(shape=input_shape),
preprocessing.Resizing(32, 32),
norm_layer,
layers.Conv2D(32, 3, activation='relu'),
layers.Conv2D(64, 3, activation='relu'),
layers.MaxPooling2D(),
layers.Dropout(0.25),
layers.Flatten(),
layers.Dense(128, activation='relu'),
layers.Dropout(0.5),
layers.Dense(num_labels),
])
letter_cnn_model.summary()
letter_cnn_model.compile(
optimizer=tf.keras.optimizers.Adam(),
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=['accuracy'],
)
EPOCHS = 25
letter_cnn_history =letter_cnn_model.fit(
train_ds,
validation_data=val_ds,
epochs=EPOCHS,
callbacks=tf.keras.callbacks.EarlyStopping(verbose=1, patience=2),
)
Evaluate the trained letters model on the test set.
test_audio = []
test_labels = []
for audio, label in test_ds:
test_audio.append(audio.numpy())
test_labels.append(label.numpy())
test_audio = np.array(test_audio)
test_labels = np.array(test_labels)
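The stacked arrays should have shapes like (num_test_examples, 124, 129, 1) and (num_test_examples,); the exact count depends on the split:
print('Test audio shape:', test_audio.shape)
print('Test labels shape:', test_labels.shape)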
# Evaluate the letters CNN (not the digits FFN) on the held-out examples
y_pred = np.argmax(letter_cnn_model.predict(test_audio), axis=1)
y_true = test_labels
test_acc = sum(y_pred == y_true) / len(y_true)
print(f'Test set accuracy: {test_acc:.0%}')
Plot confusion matrix
import seaborn as sns
confusion_mtx = tf.math.confusion_matrix(y_true, y_pred)
plt.figure(figsize=(10, 8))
sns.heatmap(confusion_mtx, xticklabels=label_strings, yticklabels=label_strings,
annot=True, fmt='g')
plt.xlabel('Prediction')
plt.ylabel('Label')
plt.show()