import csv
import random
import pickle
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
import matplotlib.pyplot as plt
from scipy.stats import linregress
EMBEDDING_DIM: Dimension of the dense embedding, used in the embedding layer of the model. Defaults to 100.
MAXLEN: Maximum length of all sequences. Defaults to 16.
TRUNCATING: Truncating strategy (truncate either before or after each sequence). Defaults to 'post'.
PADDING: Padding strategy (pad either before or after each sequence). Defaults to 'post'.
OOV_TOKEN: Token to replace out-of-vocabulary words during texts_to_sequences calls. Defaults to "<OOV>".
MAX_EXAMPLES: Maximum number of examples to use. Defaults to 160000 (10% of the original number of examples).
TRAINING_SPLIT: Proportion of data used for training. Defaults to 0.9.
EMBEDDING_DIM = 100
MAXLEN = 16
TRUNCATING = 'post'
PADDING = 'post'
OOV_TOKEN = "<OOV>"
MAX_EXAMPLES = 160000
TRAINING_SPLIT = 0.9
The dataset is provided in a CSV file.
Each row of this file contains the following values, separated by commas (see the sketch after this list for the column indices used later):
target: the polarity of the tweet (0 = negative, 4 = positive)
ids: the id of the tweet
date: the date of the tweet
flag: the query. If there is no query, this value is NO_QUERY.
user: the user that tweeted
text: the text of the tweet
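For reference, the parsing code below only needs two of these columns: the label at index 0 and the tweet text at index 5. Here is a small sketch of the column order (the FIELDS name is just for illustration and is not used elsewhere):
FIELDS = ["target", "ids", "date", "flag", "user", "text"]
# The label lives at row[0] and the tweet text at row[5], which is what the parsing code relies on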
Take a look at the first two examples:
SENTIMENT_CSV = "./data/training_cleaned.csv"
with open(SENTIMENT_CSV, 'r') as csvfile:
    print(f"First data point looks like this:\n\n{csvfile.readline()}")
    print(f"Second data point looks like this:\n\n{csvfile.readline()}")
sentences = []
labels = []

with open(SENTIMENT_CSV, 'r') as csvfile:
    reader = csv.reader(csvfile, delimiter=',')
    for row in reader:
        # Map the original polarity (0 = negative, 4 = positive) to a binary label (0 or 1)
        label = int(int(row[0]) * 1/4)
        labels.append(label)
        # The tweet text is in the last column
        sentence = row[5]
        sentences.append(sentence)
print(f"dataset contains {len(sentences)} examples\n")
print(f"Text of second example should look like this:\n{sentences[1]}\n")
print(f"Text of fourth example should look like this:\n{sentences[3]}")
print(f"\nLabels of last 5 examples should look like this:\n{labels[-5:]}")
You might have noticed that this dataset contains a lot of examples. To keep the execution time low, we will use only 10% of the original data. The next cell does this while also randomizing the data points that will be used:
# Bundle the two lists into a single one
sentences_and_labels = list(zip(sentences, labels))
# Perform random sampling
random.seed(42)
sentences_and_labels = random.sample(sentences_and_labels, MAX_EXAMPLES)
# Unpack back into separate lists
sentences, labels = zip(*sentences_and_labels)
print(f"There are {len(sentences)} sentences and {len(labels)} labels after random sampling\n")
# Compute the number of sentences that will be used for training (should be an integer)
train_size = int(len(sentences) * TRAINING_SPLIT)

# Split the sentences and labels into train/validation sets
train_sentences = sentences[0:train_size]
train_labels = labels[0:train_size]
val_sentences = sentences[train_size:]
val_labels = labels[train_size:]

print(f"There are {len(train_sentences)} sentences for training.\n")
print(f"There are {len(train_labels)} labels for training.\n")
print(f"There are {len(val_sentences)} sentences for validation.\n")
print(f"There are {len(val_labels)} labels for validation.")
# Instantiate the Tokenizer class, passing in the correct value for oov_token
tokenizer = Tokenizer(oov_token=OOV_TOKEN)
# Fit the tokenizer to the training sentences
tokenizer.fit_on_texts(train_sentences)
word_index = tokenizer.word_index
VOCAB_SIZE = len(word_index)
print(f"Vocabulary contains {VOCAB_SIZE} words\n")
print("<OOV> token included in vocabulary" if "<OOV>" in word_index else "<OOV> token NOT included in vocabulary")
print(f"\nindex of word 'i' should be {word_index['i']}")
# Convert the train and validation sentences to sequences
train_sequences = tokenizer.texts_to_sequences(train_sentences)
val_sequences = tokenizer.texts_to_sequences(val_sentences)

# Pad the sequences using the correct padding, truncating and maxlen
train_pad_trunc_seq = pad_sequences(train_sequences, maxlen=MAXLEN, padding=PADDING, truncating=TRUNCATING)
val_pad_trunc_seq = pad_sequences(val_sequences, maxlen=MAXLEN, padding=PADDING, truncating=TRUNCATING)

print(f"Padded and truncated training sequences have shape: {train_pad_trunc_seq.shape}\n")
print(f"Padded and truncated validation sequences have shape: {val_pad_trunc_seq.shape}")
Remember that the pad_sequences function returns numpy arrays, so your training and validation sequences are already in this format.
However, the labels are still Python lists. Before going forward, we should convert them to numpy arrays as well.
train_labels = np.array(train_labels)
val_labels = np.array(val_labels)
# Define path to file containing the embeddings
GLOVE_FILE = './data/glove.6B.100d.txt'
# Initialize an empty embeddings index dictionary
GLOVE_EMBEDDINGS = {}
# Read file and fill GLOVE_EMBEDDINGS with its contents
with open(GLOVE_FILE) as f:
    for line in f:
        values = line.split()
        word = values[0]
        coefs = np.asarray(values[1:], dtype='float32')
        GLOVE_EMBEDDINGS[word] = coefs
Now you have access to GloVe's pre-trained word vectors. Isn't that cool?
Let's take a look at the vector for the word dog:
test_word = 'dog'
test_vector = GLOVE_EMBEDDINGS[test_word]
print(f"Vector representation of word {test_word} looks like this:\n\n{test_vector}")
print(f"Each word vector has shape: {test_vector.shape}")
# Initialize an empty numpy array with the appropriate size
EMBEDDINGS_MATRIX = np.zeros((VOCAB_SIZE+1, EMBEDDING_DIM))
# Iterate all of the words in the vocabulary and if the vector representation for
# each word exists within GloVe's representations, save it in the EMBEDDINGS_MATRIX array
for word, i in word_index.items():
    embedding_vector = GLOVE_EMBEDDINGS.get(word)
    if embedding_vector is not None:
        EMBEDDINGS_MATRIX[i] = embedding_vector
Now you have the pre-trained embeddings ready to use!
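Before plugging the matrix into the model, you can run an optional sanity check (a minimal sketch, assuming 'dog' also appears in the tweet vocabulary): the matrix row for a word found in GloVe should match its pre-trained vector.
# Reuses test_word ('dog') from the GloVe inspection above
if test_word in word_index:
    assert np.allclose(EMBEDDINGS_MATRIX[word_index[test_word]], GLOVE_EMBEDDINGS[test_word])
    print(f"Row for '{test_word}' in EMBEDDINGS_MATRIX matches its GloVe vector")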
model = tf.keras.Sequential([
    # This is how you need to set the Embedding layer when using pre-trained embeddings
    tf.keras.layers.Embedding(VOCAB_SIZE+1, EMBEDDING_DIM, input_length=MAXLEN, weights=[EMBEDDINGS_MATRIX], trainable=False),
    # tf.keras.layers.Conv1D(filters=128, kernel_size=5, activation='relu'),
    tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(20)),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Dense(64, activation='relu'),
    tf.keras.layers.Dense(1, activation='sigmoid')
])

model.compile(loss='binary_crossentropy',
              optimizer='adam',
              metrics=['accuracy'])

model.summary()
# Train the model and save the training history
history = model.fit(train_pad_trunc_seq, train_labels, epochs=20, validation_data=(val_pad_trunc_seq, val_labels))
Run the following cell to check your loss curves:
#-----------------------------------------------------------
# Retrieve the loss results on the training and
# validation data sets for each training epoch
#-----------------------------------------------------------
loss = history.history['loss']
val_loss = history.history['val_loss']
# One x-axis value per training epoch
epochs = range(len(loss))
#------------------------------------------------
# Plot training and validation loss per epoch
#------------------------------------------------
plt.plot(epochs, loss, 'r')
plt.plot(epochs, val_loss, 'b')
plt.title('Training and validation loss')
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend(["Loss", "Validation Loss"])
plt.show()
If you wish, you can also check the training and validation accuracy of your model:
acc = history.history['accuracy']
val_acc = history.history['val_accuracy']
#------------------------------------------------
# Plot training and validation accuracy per epoch
#------------------------------------------------
plt.plot(epochs, acc, 'r')
plt.plot(epochs, val_acc, 'b')
plt.title('Training and validation accuracy')
plt.xlabel("Epochs")
plt.ylabel("Accuracy")
plt.legend(["Accuracy", "Validation Accuracy"])
plt.show()
# Test the slope of your val_loss curve
slope, *_ = linregress(epochs, val_loss)
print(f"The slope of your validation loss curve is {slope:.5f}")