.. _sec_utils: Utility Functions and Classes ============================= This section contains the implementations of utility functions and classes used in this book. .. raw:: html
pytorchmxnetjaxtensorflow
.. raw:: html
.. raw:: latex \diilbookstyleinputcell .. code:: python import collections import inspect from IPython import display from torch import nn from d2l import torch as d2l .. raw:: html
.. raw:: html
.. raw:: latex \diilbookstyleinputcell .. code:: python import collections import inspect import random from IPython import display from mxnet import autograd, gluon, np, npx from mxnet.gluon import nn from d2l import mxnet as d2l npx.set_np() .. raw:: html
.. raw:: html
.. raw:: latex \diilbookstyleinputcell .. code:: python import collections import inspect import jax from IPython import display from d2l import jax as d2l .. raw:: latex \diilbookstyleoutputcell .. parsed-literal:: :class: output No GPU/TPU found, falling back to CPU. (Set TF_CPP_MIN_LOG_LEVEL=0 and rerun for more info.) .. raw:: html
.. raw:: html
.. raw:: latex \diilbookstyleinputcell .. code:: python import collections import inspect import tensorflow as tf from IPython import display from d2l import tensorflow as d2l .. raw:: html
.. raw:: html
# Hyperparameters.
@d2l.add_to_class(d2l.HyperParameters)  #@save
def save_hyperparameters(self, ignore=None):
    """Save function arguments into class attributes.

    Reads the local variables of the calling frame, stores the retained
    ones in ``self.hparams`` and sets each of them as an attribute on
    ``self``.

    Args:
        ignore: Optional iterable of variable names to leave out. ``self``
            and names starting with an underscore are always left out.
    """
    # f_back is the immediate caller, so this must be invoked directly from
    # the function whose arguments should be recorded.
    frame = inspect.currentframe().f_back
    _, _, _, local_vars = inspect.getargvalues(frame)
    # Build the exclusion set once instead of once per inspected variable.
    excluded = {'self'}.union(ignore or ())
    self.hparams = {k: v for k, v in local_vars.items()
                    if k not in excluded and not k.startswith('_')}
    for k, v in self.hparams.items():
        setattr(self, k, v)


# Progress bar.
@d2l.add_to_class(d2l.ProgressBoard)  #@save
def draw(self, x, y, label, every_n=1):
    """Buffer the point (x, y) for `label` and redraw the board when due.

    Points are collected per label. Once at least `every_n` points are
    buffered, their mean is appended to the line plotted for `label`, the
    buffer is emptied and, if ``self.display`` is truthy, the figure is
    redrawn.

    Args:
        x: Horizontal coordinate of the new point.
        y: Vertical coordinate of the new point.
        label: Key identifying the line the point belongs to.
        every_n: Number of buffered points averaged into one plotted point.
    """
    Point = collections.namedtuple('Point', ['x', 'y'])
    if not hasattr(self, 'raw_points'):
        self.raw_points = collections.OrderedDict()
        self.data = collections.OrderedDict()
    if label not in self.raw_points:
        self.raw_points[label] = []
        self.data[label] = []
    points = self.raw_points[label]
    line = self.data[label]
    points.append(Point(x, y))
    # `<` rather than `!=`: with `!=` the buffer is never flushed again once
    # it holds more than `every_n` points (or when `every_n` is 0).
    if len(points) < every_n:
        return
    mean_x = sum(p.x for p in points) / len(points)
    mean_y = sum(p.y for p in points) / len(points)
    line.append(Point(mean_x, mean_y))
    points.clear()
    if not self.display:
        return
    d2l.use_svg_display()
    if self.fig is None:
        self.fig = d2l.plt.figure(figsize=self.figsize)
    plt_lines, labels = [], []
    # zip stops at the shortest input, so only as many lines are drawn as
    # there are entries in self.ls and self.colors.
    for (k, v), ls, color in zip(self.data.items(), self.ls, self.colors):
        plt_lines.append(d2l.plt.plot([p.x for p in v], [p.y for p in v],
                                      linestyle=ls, color=color)[0])
        labels.append(k)
    axes = self.axes if self.axes else d2l.plt.gca()
    if self.xlim:
        axes.set_xlim(self.xlim)
    if self.ylim:
        axes.set_ylim(self.ylim)
    if not self.xlabel:
        self.xlabel = self.x
    axes.set_xlabel(self.xlabel)
    axes.set_ylabel(self.ylabel)
    axes.set_xscale(self.xscale)
    axes.set_yscale(self.yscale)
    axes.legend(plt_lines, labels)
    display.display(self.fig)
    display.clear_output(wait=True)

# Add FrozenLake environment
pytorch
.. raw:: html
def frozen_lake(seed):  #@save
    """Create a seeded, non-slippery FrozenLake-v1 env and describe it.

    Args:
        seed: Seed applied to the environment and to its action space.

    Returns:
        A dict with the grid description (`desc`), the state and action
        counts, the positions of the fields inside a transition tuple
        (`*_idx`), the transition table `mdp` keyed by (state, action),
        and the environment object itself (`env`).
    """
    # See https://www.gymlibrary.dev/environments/toy_text/frozen_lake/ to learn more about this env
    # How to process env.P.items is adapted from https://sites.google.com/view/deep-rl-bootcamp/labs
    import gym

    env = gym.make('FrozenLake-v1', is_slippery=False)
    env.seed(seed)
    env.action_space.np_random.seed(seed)
    env.action_space.seed(seed)

    return {
        'desc': env.desc,  # 2D array specifying what each grid item means
        'num_states': env.nS,  # Number of observations/states or obs/state dim
        'num_actions': env.nA,  # Number of actions or action dim
        # Indices into a (transition probability, nextstate, reward, done) tuple
        'trans_prob_idx': 0,
        'nextstate_idx': 1,
        'reward_idx': 2,
        'done_idx': 3,
        # env.P maps s -> {a: [(p(s'|s,a), s', reward, done), ...], ...};
        # flatten it so each (s, a) pair maps straight to its outcome list,
        # e.g. [(0.3, 0, 0, False), (0.3, 0, 0, False), (0.3, 4, 1, False)]
        'mdp': {(state, action): outcomes
                for state, per_action in env.P.items()
                for action, outcomes in per_action.items()},
        'env': env,
    }
.. raw:: html
Create environment .. raw:: html
pytorch
.. raw:: html
def make_env(name='', seed=0):  #@save
    """Create the environment description for a supported gym environment.

    Args:
        name: Name of the gym environment. For value iteration, only
            'FrozenLake-v1' is supported.
        seed: Seed forwarded to the environment constructor.

    Returns:
        Whatever `frozen_lake(seed)` returns for 'FrozenLake-v1'.

    Raises:
        ValueError: If `name` is not a supported environment.
    """
    if name == 'FrozenLake-v1':
        return frozen_lake(seed)
    # The original message contained a bare "%s" that was never filled in.
    raise ValueError("%s env is not supported in this Notebook" % name)
.. raw:: html
Show value function .. raw:: html
pytorch
.. raw:: html
def show_value_function_progress(env_desc, V, pi):  #@save
    """Visualize how the value function and policy change over iterations.

    One panel is drawn per iteration: the values as a 4x4 image, the cell
    label from `env_desc` as text, and an arrow for the policy's action in
    every cell that is neither a hole ('H') nor the goal ('G').

    Args:
        env_desc: 2D array of byte strings labelling each grid cell.
        V: Values, shape [num_iters, num_states].
        pi: Policy actions, shape [num_iters, num_states].
    """
    # How to visualize value function is adapted (but changed) from:
    # https://sites.google.com/view/deep-rl-bootcamp/labs
    num_iters = V.shape[0]
    # Keep the 4x4 panel layout, adding rows only when more than 16
    # iterations would otherwise overflow it.
    num_rows = max(4, -(-num_iters // 4))
    # LEFT action: 0, DOWN action: 1
    # RIGHT action: 2, UP action: 3
    # imshow puts row 0 at the top, so DOWN is +y and UP is -y. UP used to
    # be (-.25, 0), i.e. it was drawn as a LEFT arrow.
    action2dxdy = {0: (-.25, 0), 1: (0, .25), 2: (0.25, 0), 3: (0, -.25)}
    # (color, size) of the text label for the special cells
    label_style = {'H': ('y', 20), 'G': ('w', 20)}

    fig, ax = plt.subplots(figsize=(15, 15))
    for k in range(num_iters):
        plt.subplot(num_rows, 4, k + 1)
        plt.imshow(V[k].reshape(4, 4), cmap="bone")
        ax = plt.gca()
        # Minor ticks on the cell borders draw the white grid lines
        ax.set_xticks(np.arange(0, 5) - .5, minor=True)
        ax.set_yticks(np.arange(0, 5) - .5, minor=True)
        ax.grid(which="minor", color="w", linestyle='-', linewidth=3)
        ax.tick_params(which="minor", bottom=False, left=False)
        ax.set_xticks([])
        ax.set_yticks([])

        policy = pi[k].reshape(4, 4)
        for y in range(4):
            for x in range(4):
                dx, dy = action2dxdy[policy[y, x]]
                cell = env_desc[y, x].decode()
                color, size = label_style.get(cell, ('g', 15))
                ax.text(x, y, str(cell), ha="center", va="center",
                        color=color, size=size, fontweight='bold')
                # No arrow for cells with G and H labels
                if cell not in label_style:
                    ax.arrow(x, y, dx, dy, color='r',
                             head_width=0.2, head_length=0.15)

        ax.set_title("Step = " + str(k + 1), fontsize=20)

    fig.tight_layout()
    plt.show()
.. raw:: html
Show Q function .. raw:: html
pytorch
.. raw:: html
def show_Q_function_progress(env_desc, V_all, pi_all):  #@save
    """Visualize a subsample of the value/policy history over iterations.

    Roughly every tenth iteration plus the final one is shown. Each panel
    draws the values as a 4x4 image, the cell label from `env_desc` as
    text, and an arrow for the policy's action in every cell that is
    neither a hole ('H') nor the goal ('G').

    Args:
        env_desc: 2D array of byte strings labelling each grid cell.
        V_all: Values, shape [num_iters, num_states].
        pi_all: Policy actions, shape [num_iters, num_states].
    """
    # We only want to show a few of the iterations
    num_iters_all = V_all.shape[0]
    # A step of 0 (fewer than 10 iterations) would make np.arange fail.
    stride = max(1, num_iters_all // 10)
    vis_indx = np.arange(0, num_iters_all, stride).tolist()
    # Always include the final iteration, but do not plot it twice.
    if vis_indx and vis_indx[-1] != num_iters_all - 1:
        vis_indx.append(num_iters_all - 1)

    V = np.zeros((len(vis_indx), V_all.shape[1]))
    pi = np.zeros((len(vis_indx), V_all.shape[1]))
    for c, i in enumerate(vis_indx):
        V[c] = V_all[i]
        pi[c] = pi_all[i]

    num_panels = V.shape[0]
    # Keep the 4x4 panel layout, adding rows only when more than 16 panels
    # would otherwise overflow it.
    num_rows = max(4, -(-num_panels // 4))
    # LEFT action: 0, DOWN action: 1
    # RIGHT action: 2, UP action: 3
    # imshow puts row 0 at the top, so DOWN is +y and UP is -y. UP used to
    # be (-.25, 0), i.e. it was drawn as a LEFT arrow.
    action2dxdy = {0: (-.25, 0), 1: (0, .25), 2: (0.25, 0), 3: (0, -.25)}
    # (color, size) of the text label for the special cells
    label_style = {'H': ('y', 20), 'G': ('w', 20)}

    fig, ax = plt.subplots(figsize=(15, 15))
    for k in range(num_panels):
        plt.subplot(num_rows, 4, k + 1)
        plt.imshow(V[k].reshape(4, 4), cmap="bone")
        ax = plt.gca()
        # Minor ticks on the cell borders draw the white grid lines
        ax.set_xticks(np.arange(0, 5) - .5, minor=True)
        ax.set_yticks(np.arange(0, 5) - .5, minor=True)
        ax.grid(which="minor", color="w", linestyle='-', linewidth=3)
        ax.tick_params(which="minor", bottom=False, left=False)
        ax.set_xticks([])
        ax.set_yticks([])

        policy = pi[k].reshape(4, 4)
        for y in range(4):
            for x in range(4):
                dx, dy = action2dxdy[policy[y, x]]
                cell = env_desc[y, x].decode()
                color, size = label_style.get(cell, ('g', 15))
                ax.text(x, y, str(cell), ha="center", va="center",
                        color=color, size=size, fontweight='bold')
                # No arrow for cells with G and H labels
                if cell not in label_style:
                    ax.arrow(x, y, dx, dy, color='r',
                             head_width=0.2, head_length=0.15)

        ax.set_title("Step = " + str(vis_indx[k] + 1), fontsize=20)

    fig.tight_layout()
    plt.show()
.. raw:: html
Trainer A bunch of functions that will be deprecated: .. raw:: html
pytorchmxnetjaxtensorflow
.. raw:: html
def load_array(data_arrays, batch_size, is_train=True):  #@save
    """Construct a PyTorch data iterator.

    Args:
        data_arrays: Tensors unpacked into a `TensorDataset`.
        batch_size: Number of examples per minibatch.
        is_train: Whether to shuffle the examples.
    """
    dataset = torch.utils.data.TensorDataset(*data_arrays)
    return torch.utils.data.DataLoader(dataset, batch_size, shuffle=is_train)


def synthetic_data(w, b, num_examples):  #@save
    """Generate y = Xw + b + noise.

    Returns:
        The features `X` of shape (num_examples, len(w)) and the labels
        reshaped to a column of shape (num_examples, 1).
    """
    X = torch.normal(0, 1, (num_examples, len(w)))
    y = torch.matmul(X, w) + b
    y += torch.normal(0, 0.01, y.shape)
    return X, y.reshape((-1, 1))


def sgd(params, lr, batch_size):  #@save
    """Minibatch stochastic gradient descent.

    Updates every parameter in place and zeroes its gradient afterwards.
    """
    with torch.no_grad():
        for param in params:
            param -= lr * param.grad / batch_size
            param.grad.zero_()


def get_dataloader_workers():  #@save
    """Use 4 processes to read the data."""
    return 4


def load_data_fashion_mnist(batch_size, resize=None):  #@save
    """Download the Fashion-MNIST dataset and then load it into memory.

    Args:
        batch_size: Number of examples per minibatch.
        resize: Optional size passed to `transforms.Resize`, applied before
            the conversion to a tensor.

    Returns:
        A (train, test) pair of data loaders; only the training loader
        shuffles.
    """
    trans = [transforms.ToTensor()]
    if resize:
        trans.insert(0, transforms.Resize(resize))
    trans = transforms.Compose(trans)
    mnist_train = torchvision.datasets.FashionMNIST(
        root="../data", train=True, transform=trans, download=True)
    mnist_test = torchvision.datasets.FashionMNIST(
        root="../data", train=False, transform=trans, download=True)
    return (torch.utils.data.DataLoader(mnist_train, batch_size, shuffle=True,
                                        num_workers=get_dataloader_workers()),
            torch.utils.data.DataLoader(mnist_test, batch_size, shuffle=False,
                                        num_workers=get_dataloader_workers()))


def evaluate_accuracy_gpu(net, data_iter, device=None):  #@save
    """Compute the accuracy for a model on a dataset using a GPU.

    If `net` is an `nn.Module` it is switched to evaluation mode and, when
    no device is given, the device of its first parameter is used.
    """
    if isinstance(net, nn.Module):
        net.eval()  # Set the model to evaluation mode
        if not device:
            device = next(iter(net.parameters())).device
    # No. of correct predictions, no. of predictions
    metric = d2l.Accumulator(2)

    with torch.no_grad():
        for X, y in data_iter:
            if isinstance(X, list):
                # Required for BERT Fine-tuning (to be covered later)
                X = [x.to(device) for x in X]
            else:
                X = X.to(device)
            y = y.to(device)
            metric.add(d2l.accuracy(net(X), y), y.numel())
    return metric[0] / metric[1]


#@save
def train_ch6(net, train_iter, test_iter, num_epochs, lr, device):
    """Train a model with a GPU (defined in Chapter 6).

    Linear and Conv2d layers are re-initialized with Xavier-uniform
    weights, the model is trained with SGD on cross-entropy loss, and the
    training loss/accuracy and test accuracy are plotted as it goes.
    """
    def init_weights(m):
        if type(m) == nn.Linear or type(m) == nn.Conv2d:
            nn.init.xavier_uniform_(m.weight)
    net.apply(init_weights)
    print('training on', device)
    net.to(device)
    optimizer = torch.optim.SGD(net.parameters(), lr=lr)
    loss = nn.CrossEntropyLoss()
    animator = d2l.Animator(xlabel='epoch', xlim=[1, num_epochs],
                            legend=['train loss', 'train acc', 'test acc'])
    timer, num_batches = d2l.Timer(), len(train_iter)
    # Plot about five times per epoch. Clamped to 1 because
    # `num_batches // 5` is 0 for fewer than five batches, which used to
    # raise ZeroDivisionError in the modulo below.
    plot_every = max(1, num_batches // 5)
    for epoch in range(num_epochs):
        # Sum of training loss, sum of training accuracy, no. of examples
        metric = d2l.Accumulator(3)
        net.train()
        for i, (X, y) in enumerate(train_iter):
            timer.start()
            optimizer.zero_grad()
            X, y = X.to(device), y.to(device)
            y_hat = net(X)
            l = loss(y_hat, y)
            l.backward()
            optimizer.step()
            with torch.no_grad():
                metric.add(l * X.shape[0], d2l.accuracy(y_hat, y), X.shape[0])
            timer.stop()
            train_l = metric[0] / metric[2]
            train_acc = metric[1] / metric[2]
            if (i + 1) % plot_every == 0 or i == num_batches - 1:
                animator.add(epoch + (i + 1) / num_batches,
                             (train_l, train_acc, None))
        test_acc = evaluate_accuracy_gpu(net, test_iter)
        animator.add(epoch + 1, (None, None, test_acc))
    print(f'loss {train_l:.3f}, train acc {train_acc:.3f}, '
          f'test acc {test_acc:.3f}')
    print(f'{metric[2] * num_epochs / timer.sum():.1f} examples/sec '
          f'on {str(device)}')


def show_images(imgs, num_rows, num_cols, titles=None, scale=1.5):  #@save
    """Plot a list of images.

    Returns:
        The flattened array of axes the images were drawn on.
    """
    figsize = (num_cols * scale, num_rows * scale)
    _, axes = d2l.plt.subplots(num_rows, num_cols, figsize=figsize)
    axes = axes.flatten()
    for i, (ax, img) in enumerate(zip(axes, imgs)):
        try:
            img = img.detach().numpy()
        except Exception:
            # Best effort: anything that is not a convertible tensor (e.g. a
            # PIL image) is handed to imshow unchanged. `Exception` instead
            # of a bare `except` so Ctrl-C is not swallowed.
            pass
        ax.imshow(img)
        ax.axes.get_xaxis().set_visible(False)
        ax.axes.get_yaxis().set_visible(False)
        if titles:
            ax.set_title(titles[i])
    return axes


def linreg(X, w, b):  #@save
    """The linear regression model."""
    return torch.matmul(X, w) + b


def squared_loss(y_hat, y):  #@save
    """Squared loss."""
    return (y_hat - y.reshape(y_hat.shape)) ** 2 / 2


def get_fashion_mnist_labels(labels):  #@save
    """Return text labels for the Fashion-MNIST dataset."""
    text_labels = ['t-shirt', 'trouser', 'pullover', 'dress', 'coat',
                   'sandal', 'shirt', 'sneaker', 'bag', 'ankle boot']
    return [text_labels[int(i)] for i in labels]


class Animator:  #@save
    """For plotting data in animation."""
    def __init__(self, xlabel=None, ylabel=None, legend=None, xlim=None,
                 ylim=None, xscale='linear', yscale='linear',
                 fmts=('-', 'm--', 'g-.', 'r:'), nrows=1, ncols=1,
                 figsize=(3.5, 2.5)):
        """Create the figure and remember how its first axes is configured."""
        # Incrementally plot multiple lines
        if legend is None:
            legend = []
        d2l.use_svg_display()
        self.fig, self.axes = d2l.plt.subplots(nrows, ncols, figsize=figsize)
        if nrows * ncols == 1:
            # A single subplot is returned bare; wrap it so axes[0] works
            self.axes = [self.axes, ]
        # Use a lambda function to capture arguments
        self.config_axes = lambda: d2l.set_axes(
            self.axes[0], xlabel, ylabel, xlim, ylim, xscale, yscale, legend)
        self.X, self.Y, self.fmts = None, None, fmts

    def add(self, x, y):
        """Add one point per line and redraw; `None` entries are skipped."""
        # Add multiple data points into the figure
        if not hasattr(y, "__len__"):
            y = [y]
        n = len(y)
        if not hasattr(x, "__len__"):
            x = [x] * n
        if not self.X:
            self.X = [[] for _ in range(n)]
        if not self.Y:
            self.Y = [[] for _ in range(n)]
        for i, (a, b) in enumerate(zip(x, y)):
            if a is not None and b is not None:
                self.X[i].append(a)
                self.Y[i].append(b)
        self.axes[0].cla()
        for x, y, fmt in zip(self.X, self.Y, self.fmts):
            self.axes[0].plot(x, y, fmt)
        self.config_axes()
        display.display(self.fig)
        display.clear_output(wait=True)


class Accumulator:  #@save
    """For accumulating sums over `n` variables."""
    def __init__(self, n):
        self.data = [0.0] * n

    def add(self, *args):
        """Add each argument (converted to float) to its running sum."""
        self.data = [a + float(b) for a, b in zip(self.data, args)]

    def reset(self):
        """Set every running sum back to zero."""
        self.data = [0.0] * len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]


def accuracy(y_hat, y):  #@save
    """Compute the number of correct predictions."""
    if len(y_hat.shape) > 1 and y_hat.shape[1] > 1:
        # Reduce per-class scores to the predicted class index
        y_hat = y_hat.argmax(axis=1)
    cmp = y_hat.type(y.dtype) == y
    return float(cmp.type(y.dtype).sum())


import hashlib
import os
import tarfile
import zipfile
import requests


def download(url, folder='../data', sha1_hash=None):  #@save
    """Download a file to folder and return the local filepath.

    Args:
        url: An http(s) URL, or (for backward compatibility) a key of
            `DATA_HUB` that maps to a (url, sha1_hash) pair.
        folder: Directory the file is stored in; created if missing.
        sha1_hash: Expected SHA-1 hex digest. When given and a matching
            file already exists, the download is skipped.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    if not url.startswith('http'):
        # For backward compatibility
        url, sha1_hash = DATA_HUB[url]
    os.makedirs(folder, exist_ok=True)
    fname = os.path.join(folder, url.split('/')[-1])
    # Check if hit cache
    if os.path.exists(fname) and sha1_hash:
        sha1 = hashlib.sha1()
        with open(fname, 'rb') as f:
            while True:
                data = f.read(1048576)
                if not data:
                    break
                sha1.update(data)
        if sha1.hexdigest() == sha1_hash:
            return fname
    # Download
    print(f'Downloading {fname} from {url}...')
    with requests.get(url, stream=True, verify=True) as r:
        # Fail loudly instead of saving an HTTP error page as the dataset.
        r.raise_for_status()
        with open(fname, 'wb') as f:
            # Write chunk by chunk so the whole file is never held in memory.
            for chunk in r.iter_content(chunk_size=1048576):
                f.write(chunk)
    return fname


def extract(filename, folder=None):  #@save
    """Extract a zip/tar file into folder.

    Args:
        filename: Path of a '.zip', '.tar' or '.gz' archive.
        folder: Destination directory; defaults to the archive's directory.
    """
    base_dir = os.path.dirname(filename)
    _, ext = os.path.splitext(filename)
    assert ext in ('.zip', '.tar', '.gz'), 'Only support zip/tar files.'
    if folder is None:
        folder = base_dir
    opener = zipfile.ZipFile if ext == '.zip' else tarfile.open
    # The context manager closes the archive handle after extraction.
    with opener(filename, 'r') as fp:
        fp.extractall(folder)


def download_extract(name, folder=None):  #@save
    """Download and extract a zip/tar file.

    Returns:
        `folder` joined onto the archive's directory when `folder` is
        given, otherwise the archive path without its last extension.
    """
    fname = download(name)
    base_dir = os.path.dirname(fname)
    data_dir, ext = os.path.splitext(fname)
    if ext == '.zip':
        opener = zipfile.ZipFile
    elif ext in ('.tar', '.gz'):
        opener = tarfile.open
    else:
        assert False, 'Only zip/tar files can be extracted.'
    # The context manager closes the archive handle after extraction.
    with opener(fname, 'r') as fp:
        fp.extractall(base_dir)
    return os.path.join(base_dir, folder) if folder else data_dir


def tokenize(lines, token='word'):  #@save
    """Split text lines into word or character tokens."""
    assert token in ('word', 'char'), 'Unknown token type: ' + token
    return [line.split() if token == 'word' else list(line) for line in lines]


def evaluate_loss(net, data_iter, loss):  #@save
    """Evaluate the loss of a model on the given dataset."""
    metric = d2l.Accumulator(2)  # Sum of losses, no. of examples
    for X, y in data_iter:
        out = net(X)
        y = y.reshape(out.shape)
        l = loss(out, y)
        metric.add(l.sum(), l.numel())
    return metric[0] / metric[1]


def grad_clipping(net, theta):  #@save
    """Clip the gradient.

    Rescales all gradients in place so that their joint L2 norm does not
    exceed `theta`.
    """
    if isinstance(net, nn.Module):
        params = [p for p in net.parameters() if p.requires_grad]
    else:
        params = net.params
    norm = torch.sqrt(sum(torch.sum((p.grad ** 2)) for p in params))
    if norm > theta:
        for param in params:
            param.grad[:] *= theta / norm
.. raw:: html
def load_array(data_arrays, batch_size, is_train=True):  #@save
    """Construct a Gluon data iterator.

    Args:
        data_arrays: Arrays unpacked into an `ArrayDataset`.
        batch_size: Number of examples per minibatch.
        is_train: Whether to shuffle the examples.
    """
    dataset = gluon.data.ArrayDataset(*data_arrays)
    return gluon.data.DataLoader(dataset, batch_size, shuffle=is_train)


def synthetic_data(w, b, num_examples):  #@save
    """Generate y = Xw + b + noise.

    Returns:
        The features `X` of shape (num_examples, len(w)) and the labels
        reshaped to a column of shape (num_examples, 1).
    """
    X = np.random.normal(0, 1, (num_examples, len(w)))
    y = np.dot(X, w) + b
    y += np.random.normal(0, 0.01, y.shape)
    return X, y.reshape((-1, 1))


def sgd(params, lr, batch_size):  #@save
    """Minibatch stochastic gradient descent.

    Updates every parameter in place.
    """
    for param in params:
        param[:] = param - lr * param.grad / batch_size


def get_dataloader_workers():  #@save
    """Use 4 processes to read the data except for Windows."""
    return 0 if sys.platform.startswith('win') else 4


def load_data_fashion_mnist(batch_size, resize=None):  #@save
    """Download the Fashion-MNIST dataset and then load it into memory.

    Args:
        batch_size: Number of examples per minibatch.
        resize: Optional size passed to the `Resize` transform, applied
            before the conversion to a tensor.

    Returns:
        A (train, test) pair of data loaders; only the training loader
        shuffles.
    """
    dataset = gluon.data.vision
    trans = [dataset.transforms.ToTensor()]
    if resize:
        trans.insert(0, dataset.transforms.Resize(resize))
    trans = dataset.transforms.Compose(trans)
    mnist_train = dataset.FashionMNIST(train=True).transform_first(trans)
    mnist_test = dataset.FashionMNIST(train=False).transform_first(trans)
    return (gluon.data.DataLoader(mnist_train, batch_size, shuffle=True,
                                  num_workers=get_dataloader_workers()),
            gluon.data.DataLoader(mnist_test, batch_size, shuffle=False,
                                  num_workers=get_dataloader_workers()))


def evaluate_accuracy_gpu(net, data_iter, device=None):  #@save
    """Compute the accuracy for a model on a dataset using a GPU."""
    if not device:
        # Query the first device where the first parameter is on
        device = list(net.collect_params().values())[0].list_ctx()[0]
    # No. of correct predictions, no. of predictions
    metric = d2l.Accumulator(2)
    for X, y in data_iter:
        X, y = X.as_in_ctx(device), y.as_in_ctx(device)
        metric.add(d2l.accuracy(net(X), y), d2l.size(y))
    return metric[0] / metric[1]


#@save
def train_ch6(net, train_iter, test_iter, num_epochs, lr, device):
    """Train a model with a GPU (defined in Chapter 6).

    The network is force-reinitialized with Xavier weights on `device`,
    trained with SGD on softmax cross-entropy loss, and the training
    loss/accuracy and test accuracy are plotted as it goes.
    """
    net.initialize(force_reinit=True, ctx=device, init=init.Xavier())
    loss = gluon.loss.SoftmaxCrossEntropyLoss()
    trainer = gluon.Trainer(net.collect_params(),
                            'sgd', {'learning_rate': lr})
    animator = d2l.Animator(xlabel='epoch', xlim=[1, num_epochs],
                            legend=['train loss', 'train acc', 'test acc'])
    timer, num_batches = d2l.Timer(), len(train_iter)
    # Plot about five times per epoch. Clamped to 1 because
    # `num_batches // 5` is 0 for fewer than five batches, which used to
    # raise ZeroDivisionError in the modulo below.
    plot_every = max(1, num_batches // 5)
    for epoch in range(num_epochs):
        # Sum of training loss, sum of training accuracy, no. of examples
        metric = d2l.Accumulator(3)
        for i, (X, y) in enumerate(train_iter):
            timer.start()
            # Here is the major difference from `d2l.train_epoch_ch3`
            X, y = X.as_in_ctx(device), y.as_in_ctx(device)
            with autograd.record():
                y_hat = net(X)
                l = loss(y_hat, y)
            l.backward()
            trainer.step(X.shape[0])
            metric.add(l.sum(), d2l.accuracy(y_hat, y), X.shape[0])
            timer.stop()
            train_l = metric[0] / metric[2]
            train_acc = metric[1] / metric[2]
            if (i + 1) % plot_every == 0 or i == num_batches - 1:
                animator.add(epoch + (i + 1) / num_batches,
                             (train_l, train_acc, None))
        test_acc = evaluate_accuracy_gpu(net, test_iter)
        animator.add(epoch + 1, (None, None, test_acc))
    print(f'loss {train_l:.3f}, train acc {train_acc:.3f}, '
          f'test acc {test_acc:.3f}')
    print(f'{metric[2] * num_epochs / timer.sum():.1f} examples/sec '
          f'on {str(device)}')


def grad_clipping(net, theta):  #@save
    """Clip the gradient.

    Rescales all gradients in place so that their joint L2 norm does not
    exceed `theta`.
    """
    if isinstance(net, gluon.Block):
        params = [p.data() for p in net.collect_params().values()]
    else:
        params = net.params
    norm = math.sqrt(sum((p.grad ** 2).sum() for p in params))
    if norm > theta:
        for param in params:
            param.grad[:] *= theta / norm


def evaluate_accuracy(net, data_iter):  #@save
    """Compute the accuracy for a model on a dataset."""
    metric = Accumulator(2)  # No. of correct predictions, no. of predictions
    for X, y in data_iter:
        metric.add(accuracy(net(X), y), d2l.size(y))
    return metric[0] / metric[1]


def show_images(imgs, num_rows, num_cols, titles=None, scale=1.5):  #@save
    """Plot a list of images.

    Returns:
        The flattened array of axes the images were drawn on.
    """
    figsize = (num_cols * scale, num_rows * scale)
    _, axes = d2l.plt.subplots(num_rows, num_cols, figsize=figsize)
    axes = axes.flatten()
    for i, (ax, img) in enumerate(zip(axes, imgs)):
        try:
            img = img.asnumpy()
        except Exception:
            # Best effort: anything without a working `asnumpy` is handed
            # to imshow unchanged. `Exception` instead of a bare `except`
            # so Ctrl-C is not swallowed.
            pass
        ax.imshow(img)
        ax.axes.get_xaxis().set_visible(False)
        ax.axes.get_yaxis().set_visible(False)
        if titles:
            ax.set_title(titles[i])
    return axes


def linreg(X, w, b):  #@save
    """The linear regression model."""
    return np.dot(X, w) + b


def squared_loss(y_hat, y):  #@save
    """Squared loss."""
    return (y_hat - y.reshape(y_hat.shape)) ** 2 / 2


def get_fashion_mnist_labels(labels):  #@save
    """Return text labels for the Fashion-MNIST dataset."""
    text_labels = ['t-shirt', 'trouser', 'pullover', 'dress', 'coat',
                   'sandal', 'shirt', 'sneaker', 'bag', 'ankle boot']
    return [text_labels[int(i)] for i in labels]


class Animator:  #@save
    """For plotting data in animation."""
    def __init__(self, xlabel=None, ylabel=None, legend=None, xlim=None,
                 ylim=None, xscale='linear', yscale='linear',
                 fmts=('-', 'm--', 'g-.', 'r:'), nrows=1, ncols=1,
                 figsize=(3.5, 2.5)):
        """Create the figure and remember how its first axes is configured."""
        # Incrementally plot multiple lines
        if legend is None:
            legend = []
        d2l.use_svg_display()
        self.fig, self.axes = d2l.plt.subplots(nrows, ncols, figsize=figsize)
        if nrows * ncols == 1:
            # A single subplot is returned bare; wrap it so axes[0] works
            self.axes = [self.axes, ]
        # Use a lambda function to capture arguments
        self.config_axes = lambda: d2l.set_axes(
            self.axes[0], xlabel, ylabel, xlim, ylim, xscale, yscale, legend)
        self.X, self.Y, self.fmts = None, None, fmts

    def add(self, x, y):
        """Add one point per line and redraw; `None` entries are skipped."""
        # Add multiple data points into the figure
        if not hasattr(y, "__len__"):
            y = [y]
        n = len(y)
        if not hasattr(x, "__len__"):
            x = [x] * n
        if not self.X:
            self.X = [[] for _ in range(n)]
        if not self.Y:
            self.Y = [[] for _ in range(n)]
        for i, (a, b) in enumerate(zip(x, y)):
            if a is not None and b is not None:
                self.X[i].append(a)
                self.Y[i].append(b)
        self.axes[0].cla()
        for x, y, fmt in zip(self.X, self.Y, self.fmts):
            self.axes[0].plot(x, y, fmt)
        self.config_axes()
        display.display(self.fig)
        display.clear_output(wait=True)


class Accumulator:  #@save
    """For accumulating sums over `n` variables."""
    def __init__(self, n):
        self.data = [0.0] * n

    def add(self, *args):
        """Add each argument (converted to float) to its running sum."""
        self.data = [a + float(b) for a, b in zip(self.data, args)]

    def reset(self):
        """Set every running sum back to zero."""
        self.data = [0.0] * len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]


def accuracy(y_hat, y):  #@save
    """Compute the number of correct predictions."""
    if len(y_hat.shape) > 1 and y_hat.shape[1] > 1:
        # Reduce per-class scores to the predicted class index
        y_hat = y_hat.argmax(axis=1)
    cmp = y_hat.astype(y.dtype) == y
    return float(cmp.astype(y.dtype).sum())


import hashlib
import os
import tarfile
import zipfile
import requests


def download(url, folder='../data', sha1_hash=None):  #@save
    """Download a file to folder and return the local filepath.

    Args:
        url: An http(s) URL, or (for backward compatibility) a key of
            `DATA_HUB` that maps to a (url, sha1_hash) pair.
        folder: Directory the file is stored in; created if missing.
        sha1_hash: Expected SHA-1 hex digest. When given and a matching
            file already exists, the download is skipped.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    if not url.startswith('http'):
        # For backward compatibility
        url, sha1_hash = DATA_HUB[url]
    os.makedirs(folder, exist_ok=True)
    fname = os.path.join(folder, url.split('/')[-1])
    # Check if hit cache
    if os.path.exists(fname) and sha1_hash:
        sha1 = hashlib.sha1()
        with open(fname, 'rb') as f:
            while True:
                data = f.read(1048576)
                if not data:
                    break
                sha1.update(data)
        if sha1.hexdigest() == sha1_hash:
            return fname
    # Download
    print(f'Downloading {fname} from {url}...')
    with requests.get(url, stream=True, verify=True) as r:
        # Fail loudly instead of saving an HTTP error page as the dataset.
        r.raise_for_status()
        with open(fname, 'wb') as f:
            # Write chunk by chunk so the whole file is never held in memory.
            for chunk in r.iter_content(chunk_size=1048576):
                f.write(chunk)
    return fname


def extract(filename, folder=None):  #@save
    """Extract a zip/tar file into folder.

    Args:
        filename: Path of a '.zip', '.tar' or '.gz' archive.
        folder: Destination directory; defaults to the archive's directory.
    """
    base_dir = os.path.dirname(filename)
    _, ext = os.path.splitext(filename)
    assert ext in ('.zip', '.tar', '.gz'), 'Only support zip/tar files.'
    if folder is None:
        folder = base_dir
    opener = zipfile.ZipFile if ext == '.zip' else tarfile.open
    # The context manager closes the archive handle after extraction.
    with opener(filename, 'r') as fp:
        fp.extractall(folder)


def download_extract(name, folder=None):  #@save
    """Download and extract a zip/tar file.

    Returns:
        `folder` joined onto the archive's directory when `folder` is
        given, otherwise the archive path without its last extension.
    """
    fname = download(name)
    base_dir = os.path.dirname(fname)
    data_dir, ext = os.path.splitext(fname)
    if ext == '.zip':
        opener = zipfile.ZipFile
    elif ext in ('.tar', '.gz'):
        opener = tarfile.open
    else:
        assert False, 'Only zip/tar files can be extracted.'
    # The context manager closes the archive handle after extraction.
    with opener(fname, 'r') as fp:
        fp.extractall(base_dir)
    return os.path.join(base_dir, folder) if folder else data_dir


def tokenize(lines, token='word'):  #@save
    """Split text lines into word or character tokens."""
    assert token in ('word', 'char'), 'Unknown token type: ' + token
    return [line.split() if token == 'word' else list(line) for line in lines]


def evaluate_loss(net, data_iter, loss):  #@save
    """Evaluate the loss of a model on the given dataset."""
    metric = d2l.Accumulator(2)  # Sum of losses, no. of examples
    for X, y in data_iter:
        l = loss(net(X), y)
        metric.add(l.sum(), d2l.size(l))
    return metric[0] / metric[1]
.. raw:: html
def show_images(imgs, num_rows, num_cols, titles=None, scale=1.5):  #@save
    """Plot a list of images.

    Returns:
        The flattened array of axes the images were drawn on.
    """
    figsize = (num_cols * scale, num_rows * scale)
    _, axes = d2l.plt.subplots(num_rows, num_cols, figsize=figsize)
    axes = axes.flatten()
    for i, (ax, img) in enumerate(zip(axes, imgs)):
        try:
            img = np.asarray(img)
        except Exception:
            # Best effort: if the conversion fails the image is handed to
            # imshow unchanged. `Exception` instead of a bare `except` so
            # Ctrl-C is not swallowed.
            pass
        ax.imshow(img)
        ax.axes.get_xaxis().set_visible(False)
        ax.axes.get_yaxis().set_visible(False)
        if titles:
            ax.set_title(titles[i])
    return axes


import hashlib
import os
import tarfile
import zipfile
import requests


def download(url, folder='../data', sha1_hash=None):  #@save
    """Download a file to folder and return the local filepath.

    Args:
        url: An http(s) URL, or (for backward compatibility) a key of
            `DATA_HUB` that maps to a (url, sha1_hash) pair.
        folder: Directory the file is stored in; created if missing.
        sha1_hash: Expected SHA-1 hex digest. When given and a matching
            file already exists, the download is skipped.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    if not url.startswith('http'):
        # For backward compatibility
        url, sha1_hash = DATA_HUB[url]
    os.makedirs(folder, exist_ok=True)
    fname = os.path.join(folder, url.split('/')[-1])
    # Check if hit cache
    if os.path.exists(fname) and sha1_hash:
        sha1 = hashlib.sha1()
        with open(fname, 'rb') as f:
            while True:
                data = f.read(1048576)
                if not data:
                    break
                sha1.update(data)
        if sha1.hexdigest() == sha1_hash:
            return fname
    # Download
    print(f'Downloading {fname} from {url}...')
    with requests.get(url, stream=True, verify=True) as r:
        # Fail loudly instead of saving an HTTP error page as the dataset.
        r.raise_for_status()
        with open(fname, 'wb') as f:
            # Write chunk by chunk so the whole file is never held in memory.
            for chunk in r.iter_content(chunk_size=1048576):
                f.write(chunk)
    return fname


def extract(filename, folder=None):  #@save
    """Extract a zip/tar file into folder.

    Args:
        filename: Path of a '.zip', '.tar' or '.gz' archive.
        folder: Destination directory; defaults to the archive's directory.
    """
    base_dir = os.path.dirname(filename)
    _, ext = os.path.splitext(filename)
    assert ext in ('.zip', '.tar', '.gz'), 'Only support zip/tar files.'
    if folder is None:
        folder = base_dir
    opener = zipfile.ZipFile if ext == '.zip' else tarfile.open
    # The context manager closes the archive handle after extraction.
    with opener(filename, 'r') as fp:
        fp.extractall(folder)
.. raw:: html
def load_array(data_arrays, batch_size, is_train=True):  #@save
    """Construct a TensorFlow data iterator.

    Args:
        data_arrays: Tensors sliced along their first dimension.
        batch_size: Number of examples per minibatch.
        is_train: Whether to shuffle (with a buffer of 1000 examples).
    """
    dataset = tf.data.Dataset.from_tensor_slices(data_arrays)
    if is_train:
        dataset = dataset.shuffle(buffer_size=1000)
    dataset = dataset.batch(batch_size)
    return dataset


def synthetic_data(w, b, num_examples):  #@save
    """Generate y = Xw + b + noise.

    Returns:
        The features `X` of shape (num_examples, w.shape[0]) and the labels
        reshaped to a column of shape (num_examples, 1).
    """
    X = tf.zeros((num_examples, w.shape[0]))
    X += tf.random.normal(shape=X.shape)
    y = tf.matmul(X, tf.reshape(w, (-1, 1))) + b
    y += tf.random.normal(shape=y.shape, stddev=0.01)
    y = tf.reshape(y, (-1, 1))
    return X, y


def sgd(params, grads, lr, batch_size):  #@save
    """Minibatch stochastic gradient descent.

    Updates every parameter in place from its matching gradient.
    """
    for param, grad in zip(params, grads):
        param.assign_sub(lr * grad / batch_size)


def load_data_fashion_mnist(batch_size, resize=None):  #@save
    """Download the Fashion-MNIST dataset and then load it into memory.

    Args:
        batch_size: Number of examples per minibatch.
        resize: Optional side length; when set, images are resized with
            padding to (resize, resize).

    Returns:
        A (train, test) pair of batched datasets.
    """
    mnist_train, mnist_test = tf.keras.datasets.fashion_mnist.load_data()
    # Divide all numbers by 255 so that all pixel values are between
    # 0 and 1, add a trailing channel dimension (axis 3), and cast the
    # labels to int32
    process = lambda X, y: (tf.expand_dims(X, axis=3) / 255,
                            tf.cast(y, dtype='int32'))
    resize_fn = lambda X, y: (
        tf.image.resize_with_pad(X, resize, resize) if resize else X, y)
    # NOTE(review): shuffle runs after batch, so whole batches are
    # reordered rather than individual examples - confirm this is intended.
    return (
        tf.data.Dataset.from_tensor_slices(process(*mnist_train)).batch(
            batch_size).shuffle(len(mnist_train[0])).map(resize_fn),
        tf.data.Dataset.from_tensor_slices(process(*mnist_test)).batch(
            batch_size).map(resize_fn))


class TrainCallback(tf.keras.callbacks.Callback):  #@save
    """A callback to visualize the training progress."""
    def __init__(self, net, train_iter, test_iter, num_epochs, device_name):
        self.timer = d2l.Timer()
        self.animator = d2l.Animator(
            xlabel='epoch', xlim=[1, num_epochs], legend=[
                'train loss', 'train acc', 'test acc'])
        self.net = net
        self.train_iter = train_iter
        self.test_iter = test_iter
        self.num_epochs = num_epochs
        self.device_name = device_name

    def on_epoch_begin(self, epoch, logs=None):
        self.timer.start()

    def on_epoch_end(self, epoch, logs):
        """Plot this epoch's metrics; print a summary after the last one."""
        self.timer.stop()
        test_acc = self.net.evaluate(
            self.test_iter, verbose=0, return_dict=True)['accuracy']
        metrics = (logs['loss'], logs['accuracy'], test_acc)
        self.animator.add(epoch + 1, metrics)
        if epoch == self.num_epochs - 1:
            # Examples per epoch, taken as first batch size x no. of batches
            batch_size = next(iter(self.train_iter))[0].shape[0]
            num_examples = batch_size * tf.data.experimental.cardinality(
                self.train_iter).numpy()
            print(f'loss {metrics[0]:.3f}, train acc {metrics[1]:.3f}, '
                  f'test acc {metrics[2]:.3f}')
            print(f'{num_examples / self.timer.avg():.1f} examples/sec on '
                  f'{str(self.device_name)}')


#@save
def train_ch6(net_fn, train_iter, test_iter, num_epochs, lr, device):
    """Train a model with a GPU (defined in Chapter 6).

    Args:
        net_fn: Zero-argument callable that builds the model; it is called
            inside the device strategy scope.

    Returns:
        The trained model.
    """
    device_name = device._device_name
    strategy = tf.distribute.OneDeviceStrategy(device_name)
    with strategy.scope():
        optimizer = tf.keras.optimizers.SGD(learning_rate=lr)
        loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
        net = net_fn()
        net.compile(optimizer=optimizer, loss=loss, metrics=['accuracy'])
    callback = TrainCallback(net, train_iter, test_iter, num_epochs,
                             device_name)
    net.fit(train_iter, epochs=num_epochs, verbose=0, callbacks=[callback])
    return net


def evaluate_accuracy(net, data_iter):  #@save
    """Compute the accuracy for a model on a dataset."""
    metric = Accumulator(2)  # No. of correct predictions, no. of predictions
    for X, y in data_iter:
        metric.add(accuracy(net(X), y), d2l.size(y))
    return metric[0] / metric[1]


def show_images(imgs, num_rows, num_cols, titles=None, scale=1.5):  #@save
    """Plot a list of images.

    Returns:
        The flattened array of axes the images were drawn on.
    """
    figsize = (num_cols * scale, num_rows * scale)
    _, axes = d2l.plt.subplots(num_rows, num_cols, figsize=figsize)
    axes = axes.flatten()
    for i, (ax, img) in enumerate(zip(axes, imgs)):
        try:
            img = img.numpy()
        except Exception:
            # Best effort: anything without a working `numpy` method is
            # handed to imshow unchanged. `Exception` instead of a bare
            # `except` so Ctrl-C is not swallowed.
            pass
        ax.imshow(img)
        ax.axes.get_xaxis().set_visible(False)
        ax.axes.get_yaxis().set_visible(False)
        if titles:
            ax.set_title(titles[i])
    return axes


def linreg(X, w, b):  #@save
    """The linear regression model."""
    return tf.matmul(X, w) + b


def squared_loss(y_hat, y):  #@save
    """Squared loss."""
    return (y_hat - tf.reshape(y, y_hat.shape)) ** 2 / 2


def get_fashion_mnist_labels(labels):  #@save
    """Return text labels for the Fashion-MNIST dataset."""
    text_labels = ['t-shirt', 'trouser', 'pullover', 'dress', 'coat',
                   'sandal', 'shirt', 'sneaker', 'bag', 'ankle boot']
    return [text_labels[int(i)] for i in labels]


class Animator:  #@save
    """For plotting data in animation."""
    def __init__(self, xlabel=None, ylabel=None, legend=None, xlim=None,
                 ylim=None, xscale='linear', yscale='linear',
                 fmts=('-', 'm--', 'g-.', 'r:'), nrows=1, ncols=1,
                 figsize=(3.5, 2.5)):
        """Create the figure and remember how its first axes is configured."""
        # Incrementally plot multiple lines
        if legend is None:
            legend = []
        d2l.use_svg_display()
        self.fig, self.axes = d2l.plt.subplots(nrows, ncols, figsize=figsize)
        if nrows * ncols == 1:
            # A single subplot is returned bare; wrap it so axes[0] works
            self.axes = [self.axes, ]
        # Use a lambda function to capture arguments
        self.config_axes = lambda: d2l.set_axes(
            self.axes[0], xlabel, ylabel, xlim, ylim, xscale, yscale, legend)
        self.X, self.Y, self.fmts = None, None, fmts

    def add(self, x, y):
        """Add one point per line and redraw; `None` entries are skipped."""
        # Add multiple data points into the figure
        if not hasattr(y, "__len__"):
            y = [y]
        n = len(y)
        if not hasattr(x, "__len__"):
            x = [x] * n
        if not self.X:
            self.X = [[] for _ in range(n)]
        if not self.Y:
            self.Y = [[] for _ in range(n)]
        for i, (a, b) in enumerate(zip(x, y)):
            if a is not None and b is not None:
                self.X[i].append(a)
                self.Y[i].append(b)
        self.axes[0].cla()
        for x, y, fmt in zip(self.X, self.Y, self.fmts):
            self.axes[0].plot(x, y, fmt)
        self.config_axes()
        display.display(self.fig)
        display.clear_output(wait=True)


class Accumulator:  #@save
    """For accumulating sums over `n` variables."""
    def __init__(self, n):
        self.data = [0.0] * n

    def add(self, *args):
        """Add each argument (converted to float) to its running sum."""
        self.data = [a + float(b) for a, b in zip(self.data, args)]

    def reset(self):
        """Set every running sum back to zero."""
        self.data = [0.0] * len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]


def accuracy(y_hat, y):  #@save
    """Compute the number of correct predictions."""
    if len(y_hat.shape) > 1 and y_hat.shape[1] > 1:
        # Reduce per-class scores to the predicted class index
        y_hat = tf.argmax(y_hat, axis=1)
    cmp = tf.cast(y_hat, y.dtype) == y
    return float(tf.reduce_sum(tf.cast(cmp, y.dtype)))


import hashlib
import os
import tarfile
import zipfile
import requests


def download(url, folder='../data', sha1_hash=None):  #@save
    """Download a file to folder and return the local filepath.

    Args:
        url: An http(s) URL, or (for backward compatibility) a key of
            `DATA_HUB` that maps to a (url, sha1_hash) pair.
        folder: Directory the file is stored in; created if missing.
        sha1_hash: Expected SHA-1 hex digest. When given and a matching
            file already exists, the download is skipped.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    if not url.startswith('http'):
        # For backward compatibility
        url, sha1_hash = DATA_HUB[url]
    os.makedirs(folder, exist_ok=True)
    fname = os.path.join(folder, url.split('/')[-1])
    # Check if hit cache
    if os.path.exists(fname) and sha1_hash:
        sha1 = hashlib.sha1()
        with open(fname, 'rb') as f:
            while True:
                data = f.read(1048576)
                if not data:
                    break
                sha1.update(data)
        if sha1.hexdigest() == sha1_hash:
            return fname
    # Download
    print(f'Downloading {fname} from {url}...')
    with requests.get(url, stream=True, verify=True) as r:
        # Fail loudly instead of saving an HTTP error page as the dataset.
        r.raise_for_status()
        with open(fname, 'wb') as f:
            # Write chunk by chunk so the whole file is never held in memory.
            for chunk in r.iter_content(chunk_size=1048576):
                f.write(chunk)
    return fname


def extract(filename, folder=None):  #@save
    """Extract a zip/tar file into folder.

    Args:
        filename: Path of a '.zip', '.tar' or '.gz' archive.
        folder: Destination directory; defaults to the archive's directory.
    """
    base_dir = os.path.dirname(filename)
    _, ext = os.path.splitext(filename)
    assert ext in ('.zip', '.tar', '.gz'), 'Only support zip/tar files.'
    if folder is None:
        folder = base_dir
    opener = zipfile.ZipFile if ext == '.zip' else tarfile.open
    # The context manager closes the archive handle after extraction.
    with opener(filename, 'r') as fp:
        fp.extractall(folder)


def download_extract(name, folder=None):  #@save
    """Download and extract a zip/tar file.

    Returns:
        `folder` joined onto the archive's directory when `folder` is
        given, otherwise the archive path without its last extension.
    """
    fname = download(name)
    base_dir = os.path.dirname(fname)
    data_dir, ext = os.path.splitext(fname)
    if ext == '.zip':
        opener = zipfile.ZipFile
    elif ext in ('.tar', '.gz'):
        opener = tarfile.open
    else:
        assert False, 'Only zip/tar files can be extracted.'
    # The context manager closes the archive handle after extraction.
    with opener(fname, 'r') as fp:
        fp.extractall(base_dir)
    return os.path.join(base_dir, folder) if folder else data_dir


def tokenize(lines, token='word'):  #@save
    """Split text lines into word or character tokens."""
    assert token in ('word', 'char'), 'Unknown token type: ' + token
    return [line.split() if token == 'word' else list(line) for line in lines]


def evaluate_loss(net, data_iter, loss):  #@save
    """Evaluate the loss of a model on the given dataset."""
    metric = d2l.Accumulator(2)  # Sum of losses, no. of examples
    for X, y in data_iter:
        l = loss(net(X), y)
        metric.add(tf.reduce_sum(l), d2l.size(l))
    return metric[0] / metric[1]


def grad_clipping(grads, theta):  #@save
    """Clip the gradient.

    Args:
        grads: Gradients; `tf.IndexedSlices` entries are converted to
            dense tensors first.
        theta: Upper bound for the joint L2 norm of all gradients.

    Returns:
        A new list of gradients, rescaled by `theta / norm` when the joint
        norm exceeds `theta` and otherwise left as they are.
    """
    theta = tf.constant(theta, dtype=tf.float32)
    new_grad = [tf.convert_to_tensor(grad)
                if isinstance(grad, tf.IndexedSlices) else grad
                for grad in grads]
    norm = tf.math.sqrt(sum((tf.reduce_sum(grad ** 2)).numpy()
                            for grad in new_grad))
    norm = tf.cast(norm, tf.float32)
    if tf.greater(norm, theta):
        for i, grad in enumerate(new_grad):
            new_grad[i] = grad * theta / norm
    return new_grad
.. raw:: html
More utility functions and classes for the attention chapter. .. raw:: html
pytorchmxnettensorflow
.. raw:: html
.. raw:: latex

   \diilbookstyleinputcell

.. code:: python

    #@save
    d2l.DATA_HUB['fra-eng'] = (d2l.DATA_URL + 'fra-eng.zip',
                               '94646ad1522d915e7b0f9296181140edcf86a4f5')

    #@save
    def read_data_nmt():
        """Load the English-French dataset.

        Downloads and extracts the 'fra-eng' archive via
        `d2l.download_extract` and returns the whole content of `fra.txt`
        as one UTF-8 decoded string.
        """
        data_dir = d2l.download_extract('fra-eng')
        with open(os.path.join(data_dir, 'fra.txt'), 'r',
                  encoding='utf-8') as f:
            return f.read()

    #@save
    def preprocess_nmt(text):
        """Preprocess the English-French dataset.

        Replaces the non-breaking spaces U+202F and U+00A0 with regular
        spaces, lowercases the text, and inserts a space in front of each of
        `,.!?` that is not already preceded by a space.

        Args:
            text: Raw dataset text.

        Returns:
            The normalized text as a single string.
        """
        # Built once instead of once per character
        punctuation = set(',.!?')

        def no_space(char, prev_char):
            return char in punctuation and prev_char != ' '

        # Replace non-breaking space with space, and convert uppercase letters
        # to lowercase ones
        text = text.replace('\u202f', ' ').replace('\xa0', ' ').lower()
        # Insert space between words and punctuation marks. The first
        # character has no predecessor, so it is never prefixed
        out = [' ' + char if i > 0 and no_space(char, text[i - 1]) else char
               for i, char in enumerate(text)]
        return ''.join(out)

    #@save
    def tokenize_nmt(text, num_examples=None):
        """Tokenize the English-French dataset.

        Args:
            text: Preprocessed text with one example per line; source and
                target are separated by a tab.
            num_examples: Maximum number of lines to read. A falsy value
                (`None` or 0) reads every line.

        Returns:
            A pair `(source, target)` of lists of token lists. Lines that do
            not split into exactly two tab-separated parts are skipped.
        """
        source, target = [], []
        for i, line in enumerate(text.split('\n')):
            # `i` is zero-based: stop as soon as `num_examples` lines have
            # been read (`>` would read one line too many)
            if num_examples and i >= num_examples:
                break
            parts = line.split('\t')
            if len(parts) == 2:
                source.append(parts[0].split(' '))
                target.append(parts[1].split(' '))
        return source, target

    #@save
    def truncate_pad(line, num_steps, padding_token):
        """Truncate or pad sequences.

        Args:
            line: List of tokens (or token indices).
            num_steps: Length of the returned list.
            padding_token: Value appended when `line` is too short.

        Returns:
            A new list of exactly `num_steps` items.
        """
        if len(line) > num_steps:
            return line[:num_steps]  # Truncate
        return line + [padding_token] * (num_steps - len(line))  # Pad

    #@save
    def build_array_nmt(lines, vocab, num_steps):
        """Transform text sequences of machine translation into minibatches.

        Each line is mapped to indices, terminated with '<eos>', and then
        truncated or padded with '<pad>' to `num_steps`.

        Returns:
            A pair `(array, valid_len)`: the index tensor built from all
            lines, and for each line the number of entries that differ from
            the '<pad>' index.
        """
        lines = [vocab[l] for l in lines]
        lines = [l + [vocab['<eos>']] for l in lines]
        array = torch.tensor([truncate_pad(
            l, num_steps, vocab['<pad>']) for l in lines])
        valid_len = (array != vocab['<pad>']).type(torch.int32).sum(1)
        return array, valid_len

    #@save
    def load_data_nmt(batch_size, num_steps, num_examples=600):
        """Return the iterator and the vocabularies of the translation dataset.

        Args:
            batch_size: Minibatch size passed to `d2l.load_array`.
            num_steps: Fixed sequence length after truncation/padding.
            num_examples: Maximum number of dataset lines to read.

        Returns:
            A triple `(data_iter, src_vocab, tgt_vocab)`. Each minibatch of
            `data_iter` holds `(src_array, src_valid_len, tgt_array,
            tgt_valid_len)`.
        """
        text = preprocess_nmt(read_data_nmt())
        source, target = tokenize_nmt(text, num_examples)
        src_vocab = d2l.Vocab(source, min_freq=2,
                              reserved_tokens=['<pad>', '<bos>', '<eos>'])
        tgt_vocab = d2l.Vocab(target, min_freq=2,
                              reserved_tokens=['<pad>', '<bos>', '<eos>'])
        src_array, src_valid_len = build_array_nmt(source, src_vocab, num_steps)
        tgt_array, tgt_valid_len = build_array_nmt(target, tgt_vocab, num_steps)
        data_arrays = (src_array, src_valid_len, tgt_array, tgt_valid_len)
        data_iter = d2l.load_array(data_arrays, batch_size)
        return data_iter, src_vocab, tgt_vocab

    #@save
    def sequence_mask(X, valid_len, value=0):
        """Mask irrelevant entries in sequences.

        Every position along axis 1 whose index is not smaller than the
        corresponding entry of `valid_len` is overwritten with `value`.
        `X` is modified in place and also returned.
        """
        maxlen = X.size(1)
        mask = torch.arange((maxlen), dtype=torch.float32,
                            device=X.device)[None, :] < valid_len[:, None]
        X[~mask] = value
        return X

    #@save
    class MaskedSoftmaxCELoss(nn.CrossEntropyLoss):
        """The softmax cross-entropy loss with masks."""
        # `pred` shape: (`batch_size`, `num_steps`, `vocab_size`)
        # `label` shape: (`batch_size`, `num_steps`)
        # `valid_len` shape: (`batch_size`,)
        def forward(self, pred, label, valid_len):
            """Return the per-sequence loss with masked positions zeroed.

            The result is the mean over axis 1 of the weighted per-token
            loss, so masked positions still count in the denominator.
            """
            weights = torch.ones_like(label)
            weights = sequence_mask(weights, valid_len)
            # Per-token losses are needed so that the mask can be applied
            self.reduction = 'none'
            # The parent loss expects the class axis at position 1
            unweighted_loss = super().forward(pred.permute(0, 2, 1), label)
            weighted_loss = (unweighted_loss * weights).mean(dim=1)
            return weighted_loss

    #@save
    def train_seq2seq(net, data_iter, lr, num_epochs, tgt_vocab, device):
        """Train a model for sequence to sequence.

        Args:
            net: Model called as `net(X, dec_input, X_valid_len)`.
            data_iter: Iterable of `(X, X_valid_len, Y, Y_valid_len)` batches.
            lr: Learning rate for the Adam optimizer.
            num_epochs: Number of passes over `data_iter`.
            tgt_vocab: Target vocabulary; supplies the '<bos>' index.
            device: Device that the model and the batches are moved to.
        """
        def xavier_init_weights(m):
            if type(m) is nn.Linear:
                nn.init.xavier_uniform_(m.weight)
            if type(m) is nn.GRU:
                for param in m._flat_weights_names:
                    if "weight" in param:
                        nn.init.xavier_uniform_(m._parameters[param])

        net.apply(xavier_init_weights)
        net.to(device)
        optimizer = torch.optim.Adam(net.parameters(), lr=lr)
        loss = MaskedSoftmaxCELoss()
        net.train()
        animator = d2l.Animator(xlabel='epoch', ylabel='loss',
                                xlim=[10, num_epochs])
        for epoch in range(num_epochs):
            timer = d2l.Timer()
            metric = d2l.Accumulator(2)  # Sum of training loss, no. of tokens
            for batch in data_iter:
                optimizer.zero_grad()
                X, X_valid_len, Y, Y_valid_len = [x.to(device) for x in batch]
                bos = torch.tensor([tgt_vocab['<bos>']] * Y.shape[0],
                                   device=device).reshape(-1, 1)
                dec_input = torch.cat([bos, Y[:, :-1]], 1)  # Teacher forcing
                Y_hat, _ = net(X, dec_input, X_valid_len)
                l = loss(Y_hat, Y, Y_valid_len)
                l.sum().backward()  # Make the loss scalar for `backward`
                d2l.grad_clipping(net, 1)
                num_tokens = Y_valid_len.sum()
                optimizer.step()
                with torch.no_grad():
                    metric.add(l.sum(), num_tokens)
            if (epoch + 1) % 10 == 0:
                animator.add(epoch + 1, (metric[0] / metric[1],))
        print(f'loss {metric[0] / metric[1]:.3f}, {metric[1] / timer.stop():.1f} '
              f'tokens/sec on {str(device)}')

    #@save
    def predict_seq2seq(net, src_sentence, src_vocab, tgt_vocab, num_steps,
                        device, save_attention_weights=False):
        """Predict for sequence to sequence.

        Decodes greedily for at most `num_steps` steps and stops early when
        '<eos>' is predicted.

        Returns:
            A pair `(translation, attention_weight_seq)`: the predicted
            tokens joined by spaces, and the list of attention weights
            collected per step (empty unless `save_attention_weights`).
        """
        # Set `net` to eval mode for inference
        net.eval()
        src_tokens = src_vocab[src_sentence.lower().split(' ')] + [
            src_vocab['<eos>']]
        enc_valid_len = torch.tensor([len(src_tokens)], device=device)
        src_tokens = d2l.truncate_pad(src_tokens, num_steps, src_vocab['<pad>'])
        # Add the batch axis
        enc_X = torch.unsqueeze(
            torch.tensor(src_tokens, dtype=torch.long, device=device), dim=0)
        enc_outputs = net.encoder(enc_X, enc_valid_len)
        dec_state = net.decoder.init_state(enc_outputs, enc_valid_len)
        # Add the batch axis
        dec_X = torch.unsqueeze(torch.tensor(
            [tgt_vocab['<bos>']], dtype=torch.long, device=device), dim=0)
        output_seq, attention_weight_seq = [], []
        for _ in range(num_steps):
            Y, dec_state = net.decoder(dec_X, dec_state)
            # We use the token with the highest prediction likelihood as input
            # of the decoder at the next time step
            dec_X = Y.argmax(dim=2)
            pred = dec_X.squeeze(dim=0).type(torch.int32).item()
            # Save attention weights (to be covered later)
            if save_attention_weights:
                attention_weight_seq.append(net.decoder.attention_weights)
            # Once the end-of-sequence token is predicted, the generation of
            # the output sequence is complete
            if pred == tgt_vocab['<eos>']:
                break
            output_seq.append(pred)
        return ' '.join(tgt_vocab.to_tokens(output_seq)), attention_weight_seq

.. raw:: html
.. raw:: html
.. raw:: latex

   \diilbookstyleinputcell

.. code:: python

    #@save
    d2l.DATA_HUB['fra-eng'] = (d2l.DATA_URL + 'fra-eng.zip',
                               '94646ad1522d915e7b0f9296181140edcf86a4f5')

    #@save
    def read_data_nmt():
        """Load the English-French dataset.

        Downloads and extracts the 'fra-eng' archive via
        `d2l.download_extract` and returns the whole content of `fra.txt`
        as one UTF-8 decoded string.
        """
        data_dir = d2l.download_extract('fra-eng')
        with open(os.path.join(data_dir, 'fra.txt'), 'r',
                  encoding='utf-8') as f:
            return f.read()

    #@save
    def preprocess_nmt(text):
        """Preprocess the English-French dataset.

        Replaces the non-breaking spaces U+202F and U+00A0 with regular
        spaces, lowercases the text, and inserts a space in front of each of
        `,.!?` that is not already preceded by a space.

        Args:
            text: Raw dataset text.

        Returns:
            The normalized text as a single string.
        """
        # Built once instead of once per character
        punctuation = set(',.!?')

        def no_space(char, prev_char):
            return char in punctuation and prev_char != ' '

        # Replace non-breaking space with space, and convert uppercase letters
        # to lowercase ones
        text = text.replace('\u202f', ' ').replace('\xa0', ' ').lower()
        # Insert space between words and punctuation marks. The first
        # character has no predecessor, so it is never prefixed
        out = [' ' + char if i > 0 and no_space(char, text[i - 1]) else char
               for i, char in enumerate(text)]
        return ''.join(out)

    #@save
    def tokenize_nmt(text, num_examples=None):
        """Tokenize the English-French dataset.

        Args:
            text: Preprocessed text with one example per line; source and
                target are separated by a tab.
            num_examples: Maximum number of lines to read. A falsy value
                (`None` or 0) reads every line.

        Returns:
            A pair `(source, target)` of lists of token lists. Lines that do
            not split into exactly two tab-separated parts are skipped.
        """
        source, target = [], []
        for i, line in enumerate(text.split('\n')):
            # `i` is zero-based: stop as soon as `num_examples` lines have
            # been read (`>` would read one line too many)
            if num_examples and i >= num_examples:
                break
            parts = line.split('\t')
            if len(parts) == 2:
                source.append(parts[0].split(' '))
                target.append(parts[1].split(' '))
        return source, target

    #@save
    def truncate_pad(line, num_steps, padding_token):
        """Truncate or pad sequences.

        Args:
            line: List of tokens (or token indices).
            num_steps: Length of the returned list.
            padding_token: Value appended when `line` is too short.

        Returns:
            A new list of exactly `num_steps` items.
        """
        if len(line) > num_steps:
            return line[:num_steps]  # Truncate
        return line + [padding_token] * (num_steps - len(line))  # Pad

    #@save
    def build_array_nmt(lines, vocab, num_steps):
        """Transform text sequences of machine translation into minibatches.

        Each line is mapped to indices, terminated with '<eos>', and then
        truncated or padded with '<pad>' to `num_steps`.

        Returns:
            A pair `(array, valid_len)`: the index array built from all
            lines, and for each line the number of entries that differ from
            the '<pad>' index.
        """
        lines = [vocab[l] for l in lines]
        lines = [l + [vocab['<eos>']] for l in lines]
        array = np.array([truncate_pad(
            l, num_steps, vocab['<pad>']) for l in lines])
        valid_len = (array != vocab['<pad>']).astype(np.int32).sum(1)
        return array, valid_len

    #@save
    def load_data_nmt(batch_size, num_steps, num_examples=600):
        """Return the iterator and the vocabularies of the translation dataset.

        Args:
            batch_size: Minibatch size passed to `d2l.load_array`.
            num_steps: Fixed sequence length after truncation/padding.
            num_examples: Maximum number of dataset lines to read.

        Returns:
            A triple `(data_iter, src_vocab, tgt_vocab)`. Each minibatch of
            `data_iter` holds `(src_array, src_valid_len, tgt_array,
            tgt_valid_len)`.
        """
        text = preprocess_nmt(read_data_nmt())
        source, target = tokenize_nmt(text, num_examples)
        src_vocab = d2l.Vocab(source, min_freq=2,
                              reserved_tokens=['<pad>', '<bos>', '<eos>'])
        tgt_vocab = d2l.Vocab(target, min_freq=2,
                              reserved_tokens=['<pad>', '<bos>', '<eos>'])
        src_array, src_valid_len = build_array_nmt(source, src_vocab, num_steps)
        tgt_array, tgt_valid_len = build_array_nmt(target, tgt_vocab, num_steps)
        data_arrays = (src_array, src_valid_len, tgt_array, tgt_valid_len)
        data_iter = d2l.load_array(data_arrays, batch_size)
        return data_iter, src_vocab, tgt_vocab

    #@save
    class MaskedSoftmaxCELoss(gluon.loss.SoftmaxCELoss):
        """The softmax cross-entropy loss with masks."""
        # `pred` shape: (`batch_size`, `num_steps`, `vocab_size`)
        # `label` shape: (`batch_size`, `num_steps`)
        # `valid_len` shape: (`batch_size`,)
        def forward(self, pred, label, valid_len):
            """Return the parent loss computed with sequence-mask weights."""
            # `weights` shape: (`batch_size`, `num_steps`, 1)
            weights = np.expand_dims(np.ones_like(label), axis=-1)
            weights = npx.sequence_mask(weights, valid_len, True, axis=1)
            return super().forward(pred, label, weights)

    #@save
    def train_seq2seq(net, data_iter, lr, num_epochs, tgt_vocab, device):
        """Train a model for sequence to sequence.

        Args:
            net: Model called as `net(X, dec_input, X_valid_len)`; it is
                (re)initialized with Xavier initialization on `device`.
            data_iter: Iterable of `(X, X_valid_len, Y, Y_valid_len)` batches.
            lr: Learning rate for the Adam trainer.
            num_epochs: Number of passes over `data_iter`.
            tgt_vocab: Target vocabulary; supplies the '<bos>' index.
            device: Context that the parameters and the batches are placed on.
        """
        net.initialize(init.Xavier(), force_reinit=True, ctx=device)
        trainer = gluon.Trainer(net.collect_params(), 'adam',
                                {'learning_rate': lr})
        loss = MaskedSoftmaxCELoss()
        animator = d2l.Animator(xlabel='epoch', ylabel='loss',
                                xlim=[10, num_epochs])
        for epoch in range(num_epochs):
            timer = d2l.Timer()
            metric = d2l.Accumulator(2)  # Sum of training loss, no. of tokens
            for batch in data_iter:
                X, X_valid_len, Y, Y_valid_len = [
                    x.as_in_ctx(device) for x in batch]
                bos = np.array(
                    [tgt_vocab['<bos>']] * Y.shape[0], ctx=device).reshape(-1, 1)
                dec_input = np.concatenate([bos, Y[:, :-1]], 1)  # Teacher forcing
                with autograd.record():
                    Y_hat, _ = net(X, dec_input, X_valid_len)
                    l = loss(Y_hat, Y, Y_valid_len)
                l.backward()
                d2l.grad_clipping(net, 1)
                num_tokens = Y_valid_len.sum()
                trainer.step(num_tokens)
                metric.add(l.sum(), num_tokens)
            if (epoch + 1) % 10 == 0:
                animator.add(epoch + 1, (metric[0] / metric[1],))
        print(f'loss {metric[0] / metric[1]:.3f}, {metric[1] / timer.stop():.1f} '
              f'tokens/sec on {str(device)}')

    #@save
    def predict_seq2seq(net, src_sentence, src_vocab, tgt_vocab, num_steps,
                        device, save_attention_weights=False):
        """Predict for sequence to sequence.

        Decodes greedily for at most `num_steps` steps and stops early when
        '<eos>' is predicted.

        Returns:
            A pair `(translation, attention_weight_seq)`: the predicted
            tokens joined by spaces, and the list of attention weights
            collected per step (empty unless `save_attention_weights`).
        """
        src_tokens = src_vocab[src_sentence.lower().split(' ')] + [
            src_vocab['<eos>']]
        enc_valid_len = np.array([len(src_tokens)], ctx=device)
        src_tokens = d2l.truncate_pad(src_tokens, num_steps, src_vocab['<pad>'])
        # Add the batch axis
        enc_X = np.expand_dims(np.array(src_tokens, ctx=device), axis=0)
        enc_outputs = net.encoder(enc_X, enc_valid_len)
        dec_state = net.decoder.init_state(enc_outputs, enc_valid_len)
        # Add the batch axis
        dec_X = np.expand_dims(np.array([tgt_vocab['<bos>']], ctx=device),
                               axis=0)
        output_seq, attention_weight_seq = [], []
        for _ in range(num_steps):
            Y, dec_state = net.decoder(dec_X, dec_state)
            # We use the token with the highest prediction likelihood as input
            # of the decoder at the next time step
            dec_X = Y.argmax(axis=2)
            pred = dec_X.squeeze(axis=0).astype('int32').item()
            # Save attention weights (to be covered later)
            if save_attention_weights:
                attention_weight_seq.append(net.decoder.attention_weights)
            # Once the end-of-sequence token is predicted, the generation of
            # the output sequence is complete
            if pred == tgt_vocab['<eos>']:
                break
            output_seq.append(pred)
        return ' '.join(tgt_vocab.to_tokens(output_seq)), attention_weight_seq

.. raw:: html
.. raw:: html
.. raw:: latex

   \diilbookstyleinputcell

.. code:: python

    #@save
    d2l.DATA_HUB['fra-eng'] = (d2l.DATA_URL + 'fra-eng.zip',
                               '94646ad1522d915e7b0f9296181140edcf86a4f5')

    #@save
    def read_data_nmt():
        """Load the English-French dataset.

        Downloads and extracts the 'fra-eng' archive via
        `d2l.download_extract` and returns the whole content of `fra.txt`
        as one UTF-8 decoded string.
        """
        data_dir = d2l.download_extract('fra-eng')
        with open(os.path.join(data_dir, 'fra.txt'), 'r',
                  encoding='utf-8') as f:
            return f.read()

    #@save
    def preprocess_nmt(text):
        """Preprocess the English-French dataset.

        Replaces the non-breaking spaces U+202F and U+00A0 with regular
        spaces, lowercases the text, and inserts a space in front of each of
        `,.!?` that is not already preceded by a space.

        Args:
            text: Raw dataset text.

        Returns:
            The normalized text as a single string.
        """
        # Built once instead of once per character
        punctuation = set(',.!?')

        def no_space(char, prev_char):
            return char in punctuation and prev_char != ' '

        # Replace non-breaking space with space, and convert uppercase letters
        # to lowercase ones
        text = text.replace('\u202f', ' ').replace('\xa0', ' ').lower()
        # Insert space between words and punctuation marks. The first
        # character has no predecessor, so it is never prefixed
        out = [' ' + char if i > 0 and no_space(char, text[i - 1]) else char
               for i, char in enumerate(text)]
        return ''.join(out)

    #@save
    def tokenize_nmt(text, num_examples=None):
        """Tokenize the English-French dataset.

        Args:
            text: Preprocessed text with one example per line; source and
                target are separated by a tab.
            num_examples: Maximum number of lines to read. A falsy value
                (`None` or 0) reads every line.

        Returns:
            A pair `(source, target)` of lists of token lists. Lines that do
            not split into exactly two tab-separated parts are skipped.
        """
        source, target = [], []
        for i, line in enumerate(text.split('\n')):
            # `i` is zero-based: stop as soon as `num_examples` lines have
            # been read (`>` would read one line too many)
            if num_examples and i >= num_examples:
                break
            parts = line.split('\t')
            if len(parts) == 2:
                source.append(parts[0].split(' '))
                target.append(parts[1].split(' '))
        return source, target

    #@save
    def truncate_pad(line, num_steps, padding_token):
        """Truncate or pad sequences.

        Args:
            line: List of tokens (or token indices).
            num_steps: Length of the returned list.
            padding_token: Value appended when `line` is too short.

        Returns:
            A new list of exactly `num_steps` items.
        """
        if len(line) > num_steps:
            return line[:num_steps]  # Truncate
        return line + [padding_token] * (num_steps - len(line))  # Pad

    #@save
    def build_array_nmt(lines, vocab, num_steps):
        """Transform text sequences of machine translation into minibatches.

        Each line is mapped to indices, terminated with '<eos>', and then
        truncated or padded with '<pad>' to `num_steps`.

        Returns:
            A pair `(array, valid_len)`: the index tensor built from all
            lines, and for each line the number of entries that differ from
            the '<pad>' index.
        """
        lines = [vocab[l] for l in lines]
        lines = [l + [vocab['<eos>']] for l in lines]
        array = tf.constant([truncate_pad(
            l, num_steps, vocab['<pad>']) for l in lines])
        valid_len = tf.reduce_sum(
            tf.cast(array != vocab['<pad>'], tf.int32), 1)
        return array, valid_len

    #@save
    def load_data_nmt(batch_size, num_steps, num_examples=600):
        """Return the iterator and the vocabularies of the translation dataset.

        Args:
            batch_size: Minibatch size passed to `d2l.load_array`.
            num_steps: Fixed sequence length after truncation/padding.
            num_examples: Maximum number of dataset lines to read.

        Returns:
            A triple `(data_iter, src_vocab, tgt_vocab)`. Each minibatch of
            `data_iter` holds `(src_array, src_valid_len, tgt_array,
            tgt_valid_len)`.
        """
        text = preprocess_nmt(read_data_nmt())
        source, target = tokenize_nmt(text, num_examples)
        src_vocab = d2l.Vocab(source, min_freq=2,
                              reserved_tokens=['<pad>', '<bos>', '<eos>'])
        tgt_vocab = d2l.Vocab(target, min_freq=2,
                              reserved_tokens=['<pad>', '<bos>', '<eos>'])
        src_array, src_valid_len = build_array_nmt(source, src_vocab, num_steps)
        tgt_array, tgt_valid_len = build_array_nmt(target, tgt_vocab, num_steps)
        data_arrays = (src_array, src_valid_len, tgt_array, tgt_valid_len)
        data_iter = d2l.load_array(data_arrays, batch_size)
        return data_iter, src_vocab, tgt_vocab

    #@save
    def sequence_mask(X, valid_len, value=0):
        """Mask irrelevant entries in sequences.

        Every position along axis 1 whose index is not smaller than the
        corresponding entry of `valid_len` is replaced with `value`. A new
        tensor is returned; a rank-3 `X` is masked across its last axis.
        """
        maxlen = X.shape[1]
        mask = tf.range(start=0, limit=maxlen, dtype=tf.float32)[
            None, :] < tf.cast(valid_len[:, None], dtype=tf.float32)

        if len(X.shape) == 3:
            return tf.where(tf.expand_dims(mask, axis=-1), X, value)
        else:
            return tf.where(mask, X, value)

    #@save
    class MaskedSoftmaxCELoss(tf.keras.losses.Loss):
        """The softmax cross-entropy loss with masks."""
        def __init__(self, valid_len):
            super().__init__(reduction='none')
            self.valid_len = valid_len

        # `pred` shape: (`batch_size`, `num_steps`, `vocab_size`)
        # `label` shape: (`batch_size`, `num_steps`)
        # `valid_len` shape: (`batch_size`,)
        def call(self, label, pred):
            """Return the per-sequence loss with masked positions zeroed.

            The result is the mean over axis 1 of the weighted per-token
            loss, so masked positions still count in the denominator.
            """
            weights = tf.ones_like(label, dtype=tf.float32)
            weights = sequence_mask(weights, self.valid_len)
            label_one_hot = tf.one_hot(label, depth=pred.shape[-1])
            unweighted_loss = tf.keras.losses.CategoricalCrossentropy(
                from_logits=True, reduction='none')(label_one_hot, pred)
            weighted_loss = tf.reduce_mean((unweighted_loss*weights), axis=1)
            return weighted_loss

    #@save
    def train_seq2seq(net, data_iter, lr, num_epochs, tgt_vocab, device):
        """Train a model for sequence to sequence.

        Args:
            net: Model called as
                `net(X, dec_input, X_valid_len, training=True)`.
            data_iter: Iterable of `(X, X_valid_len, Y, Y_valid_len)` batches.
            lr: Learning rate for the Adam optimizer.
            num_epochs: Number of passes over `data_iter`.
            tgt_vocab: Target vocabulary; supplies the '<bos>' index.
            device: Only its `_device_name` is read, for the final report.
        """
        optimizer = tf.keras.optimizers.Adam(learning_rate=lr)
        animator = d2l.Animator(xlabel="epoch", ylabel="loss",
                                xlim=[10, num_epochs])
        for epoch in range(num_epochs):
            timer = d2l.Timer()
            metric = d2l.Accumulator(2)  # Sum of training loss, no. of tokens
            for batch in data_iter:
                X, X_valid_len, Y, Y_valid_len = batch
                bos = tf.reshape(tf.constant([tgt_vocab['<bos>']] * Y.shape[0]),
                                 shape=(-1, 1))
                dec_input = tf.concat([bos, Y[:, :-1]], 1)  # Teacher forcing
                with tf.GradientTape() as tape:
                    Y_hat, _ = net(X, dec_input, X_valid_len, training=True)
                    l = MaskedSoftmaxCELoss(Y_valid_len)(Y, Y_hat)
                gradients = tape.gradient(l, net.trainable_variables)
                gradients = d2l.grad_clipping(gradients, 1)
                optimizer.apply_gradients(zip(gradients, net.trainable_variables))
                num_tokens = tf.reduce_sum(Y_valid_len).numpy()
                metric.add(tf.reduce_sum(l), num_tokens)
            if (epoch + 1) % 10 == 0:
                animator.add(epoch + 1, (metric[0] / metric[1],))
        print(f'loss {metric[0] / metric[1]:.3f}, {metric[1] / timer.stop():.1f} '
              f'tokens/sec on {str(device._device_name)}')

    #@save
    def predict_seq2seq(net, src_sentence, src_vocab, tgt_vocab, num_steps,
                        save_attention_weights=False):
        """Predict for sequence to sequence.

        Decodes greedily for at most `num_steps` steps and stops early when
        '<eos>' is predicted.

        Returns:
            A pair `(translation, attention_weight_seq)`: the predicted
            tokens joined by spaces, and the list of attention weights
            collected per step (empty unless `save_attention_weights`).
        """
        src_tokens = src_vocab[src_sentence.lower().split(' ')] + [
            src_vocab['<eos>']]
        enc_valid_len = tf.constant([len(src_tokens)])
        src_tokens = d2l.truncate_pad(src_tokens, num_steps, src_vocab['<pad>'])
        # Add the batch axis
        enc_X = tf.expand_dims(src_tokens, axis=0)
        enc_outputs = net.encoder(enc_X, enc_valid_len, training=False)
        dec_state = net.decoder.init_state(enc_outputs, enc_valid_len)
        # Add the batch axis
        dec_X = tf.expand_dims(tf.constant([tgt_vocab['<bos>']]), axis=0)
        output_seq, attention_weight_seq = [], []
        for _ in range(num_steps):
            Y, dec_state = net.decoder(dec_X, dec_state, training=False)
            # We use the token with the highest prediction likelihood as input
            # of the decoder at the next time step
            dec_X = tf.argmax(Y, axis=2)
            pred = tf.squeeze(dec_X, axis=0)
            # Save attention weights
            if save_attention_weights:
                attention_weight_seq.append(net.decoder.attention_weights)
            # Once the end-of-sequence token is predicted, the generation of
            # the output sequence is complete
            if pred == tgt_vocab['<eos>']:
                break
            output_seq.append(pred.numpy())
        return ' '.join(tgt_vocab.to_tokens(tf.reshape(
            output_seq, shape=-1).numpy().tolist())), attention_weight_seq

.. raw:: html
.. raw:: html