Steps in feature extraction
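
The functions on this page assume the following imports, inferred from the calls in the code (load_fnames, used in the example at the end, is defined elsewhere in the project):

import os
import string

import numpy as np
import tensorflow as tf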

Shuffle data

shuffle_data[source]

shuffle_data(filenames)

def shuffle_data(filenames):
    # return a randomly permuted copy of the file name tensor
    return tf.random.shuffle(filenames)
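
tf.random.shuffle returns a randomly permuted copy of its input, so repeated runs produce different splits. If the split needs to be reproducible, the global seed can be fixed before shuffling; the seed value below is arbitrary:

tf.random.set_seed(42)  # any fixed value makes the shuffle repeatable across runs
filenames = shuffle_data(filenames)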

Train/Validation/Test Split

using an 80:10:10 ratio

train_test_split[source]

train_test_split(filenames)

def train_test_split(filenames):
    TRAIN_PORTION = 0.8
    VAL_PORTION = 0.1
    TEST_PORTION = 0.1

    num_samples = len(filenames)

    train_end = int(num_samples * TRAIN_PORTION)
    val_end = train_end + int(num_samples * VAL_PORTION)

    train_files = filenames[:train_end]
    val_files = filenames[train_end:val_end]
    # the test set takes whatever remains, so integer rounding never drops samples
    test_files = filenames[val_end:]

    print('Training set size:', len(train_files))
    print('Validation set size:', len(val_files))
    print('Test set size:', len(test_files))

    return [train_files, val_files, test_files]

Get waveforms

decode_audio[source]

decode_audio(audio_binary)

get_label[source]

get_label(file_path)

get_waveform_and_label[source]

get_waveform_and_label(file_path)

def decode_audio(audio_binary):
    # decode the WAV bytes into a float tensor and drop the channel axis
    audio, _ = tf.audio.decode_wav(audio_binary)
    return tf.squeeze(audio, axis=-1)

def get_label(file_path):
    parts = tf.strings.split(file_path, os.path.sep)
    # the label is the single character at index 9 of the file name
    # be careful with the data type here: this function must return a tensor
    label_tensor = tf.strings.substr(parts[-1], pos=9, len=1)
    return label_tensor

def get_waveform_and_label(file_path):
    label = get_label(file_path)
    audio_binary = tf.io.read_file(file_path)
    waveform = decode_audio(audio_binary)
    return waveform, label
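
A quick way to sanity-check these helpers on one clip in eager mode; the path below is a placeholder, and get_label expects the label character at index 9 of the file name:

waveform, label = get_waveform_and_label('path/to/some_file.wav')
print(waveform.shape)  # (num_samples,): the channel axis has been squeezed away
print(label)           # scalar tf.string tensor holding the label character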

Get spectrograms

get_spectrogram[source]

get_spectrogram(waveform)

get_spectrogram_and_label_id_digits[source]

get_spectrogram_and_label_id_digits(audio, label)

get_spectrogram_and_label_id_letters[source]

get_spectrogram_and_label_id_letters(audio, label)

get_spectrogram_and_label_id_mixed[source]

get_spectrogram_and_label_id_mixed(audio, label)

def get_spectrogram(waveform):
    # normalize every clip to exactly 16000 samples (1 second at 16 kHz)
    diff = 16000 - tf.shape(waveform)[0]

    waveform = tf.cast(waveform, tf.float32)

    if diff >= 0:
        # pad files with fewer than 16000 samples with trailing zeros
        zero_padding = tf.zeros([diff], dtype=tf.float32)
        # concatenate audio with padding so that all clips have the same length
        equal_length = tf.concat([waveform, zero_padding], 0)
    else:
        # cut the tail if the audio is longer than 1 second
        equal_length = tf.slice(waveform, [0], [16000])

    spectrogram = tf.signal.stft(
        equal_length, frame_length=255, frame_step=128)

    spectrogram = tf.abs(spectrogram)

    return spectrogram
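
With these STFT settings (frame_length=255, frame_step=128, and tf.signal.stft's default FFT length of 256, the next power of two above the frame length), every one-second clip yields 124 frames of 129 frequency bins, which is easy to verify:

spec = get_spectrogram(tf.zeros([16000]))
print(spec.shape)  # (124, 129): 1 + (16000 - 255) // 128 frames, 256 // 2 + 1 bins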

def get_spectrogram_and_label_id_digits(audio, label):
    spectrogram = get_spectrogram(audio)
    spectrogram = tf.expand_dims(spectrogram, -1)
    label_strings = np.array([str(num) for num in range(10)])
    # comparing against the vocabulary broadcasts to a boolean vector;
    # argmax of its integer cast gives the index of the matching label
    label_id = tf.argmax(tf.cast(label == label_strings, tf.int32))
    return spectrogram, label_id

def get_spectrogram_and_label_id_letters(audio, label):
    spectrogram = get_spectrogram(audio)
    spectrogram = tf.expand_dims(spectrogram, -1)
    label_strings = np.array(list(string.ascii_uppercase))
    label_id = tf.argmax(tf.cast(label == label_strings, tf.int32))
    return spectrogram, label_id

def get_spectrogram_and_label_id_mixed(audio, label):
    spectrogram = get_spectrogram(audio)
    spectrogram = tf.expand_dims(spectrogram, -1)
    label_strings = np.array([str(num) for num in range(10)] + list(string.ascii_uppercase))
    label_id = tf.argmax(tf.cast(label == label_strings, tf.int32))
    return spectrogram, label_id
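
All three variants map the string label to an integer id the same way: comparing the scalar label against the vocabulary array broadcasts to a boolean vector with a single True, and argmax returns its index. A worked example with the digit vocabulary:

vocab = np.array([str(num) for num in range(10)])
label_id = tf.argmax(tf.cast(tf.constant('7') == vocab, tf.int32))
print(label_id.numpy())  # 7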

Combined pipeline

preprocess_dataset[source]

preprocess_dataset(files, dataset_type)

def preprocess_dataset(files, dataset_type):
    AUTOTUNE = tf.data.experimental.AUTOTUNE
    files_ds = tf.data.Dataset.from_tensor_slices(files)
    waveform_ds = files_ds.map(get_waveform_and_label, num_parallel_calls=AUTOTUNE)
    if dataset_type == 'digits':
        spectrogram_ds = waveform_ds.map(
            get_spectrogram_and_label_id_digits, num_parallel_calls=AUTOTUNE)
    elif dataset_type == 'letters':
        spectrogram_ds = waveform_ds.map(
            get_spectrogram_and_label_id_letters, num_parallel_calls=AUTOTUNE)
    elif dataset_type == 'mixed':
        spectrogram_ds = waveform_ds.map(
            get_spectrogram_and_label_id_mixed, num_parallel_calls=AUTOTUNE)
    else:
        # fail fast instead of leaving spectrogram_ds undefined
        raise ValueError(f'unknown dataset_type: {dataset_type!r}')
    return spectrogram_ds
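
The dataset returned by preprocess_dataset is unbatched. A typical next step before feeding it to a model (the batch size here is only an example) is to batch, cache, and prefetch:

AUTOTUNE = tf.data.experimental.AUTOTUNE
train_ds = preprocess_dataset(train_files, 'digits')
train_ds = train_ds.batch(64).cache().prefetch(AUTOTUNE)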

Example of using the pipeline on digits data

DATASET_TYPE = 'digits'

# load classified filenames
filenames = load_fnames('noise_levels/digit_noise_levels/35U.data')
print('number of files:', len(filenames))

# shuffle
filenames = shuffle_data(filenames)

# Train/Validation/Test Split
train_files, val_files, test_files = train_test_split(filenames)

# Process data using the combined pipeline
train_ds = preprocess_dataset(train_files, DATASET_TYPE)
val_ds = preprocess_dataset(val_files, DATASET_TYPE)
test_ds = preprocess_dataset(test_files, DATASET_TYPE)

print("Completed")
number of files: 1590
Training set size: 1272
Validation set size: 159
Test set size: 159
Completed