はじめに
今回の記事は、前回に引き続き、私が受講しているE資格 JDLA認定プログラムの「ラビットチャレンジ」のレポート記事です。
今回のテーマは深層学習です。
講座ではDay1〜Day4まで分かれており、今回はDay4(後半)に取り組みます。
Day4は主に、強化学習について扱います。
確認テスト、実装演習も併せて載せていきます。
応用技術
MobileNet
Depthwise Separable Convolution(Depthwise ConvolutionとPointwise Convolution)という仕組みを用いて画像認識において軽量化・高速化・高精度化したモデルです。
全体の計算量は、Depthwise Convolutionの計算量+Pointwise Convolutionの計算量となります。
一般的な畳込みの計算量
高さ\(H\)、幅\(W\)、カーネルのサイズ\(K\)、チャンネル\(C\)、フィルタ数\(M\)とすると以下のようになります。
- 入力マップ:\(H×W×C\)
- カーネル:\(K×K×C\)
- 出力マップ:\(H×W×M\)
ストライド1でパディングありの場合の計算量は以下のようになります。
- ある1点の計算量:\(K×K×C×MK×K×C×M\)
- 全出力マップの計算量:\(H×W×K×K×C×MH×W×K×K×C×M\)
Depthwise Convolution
入力マップのチャネルごとに畳み込みを実施し、出力マップをそれらと結合します。(入力マップのチャネル数と同じ)
チャンネルごとに畳み込みをするため、チャンネル間の関連性は考慮されません。
- 入力マップ:H×W×C
- カーネル:K×K
- 出力マップ:H×W×C
- 全出力マップの計算量:H×W×K×K×C
Pointwise Convolution
Depthwise Convolutionの逆の立ち位置にあります。
各チャンネルごとに畳み込んだDCとは異なり、全チャンネルに対して1点ずつ畳み込みます。
カーネルのサイズは\(1×1\)に固定されます。
DCではチャンネル間の関連性は考慮されなかったがこのPCによって1点ずつチャンネル間の関連が考慮されるようになる。
入力マップ:H×W×C
カーネル:1×1×C
出力マップ:H×W×M
全出力マップの計算量:H×W×C×M
DenseNet(Dense Convolutional Network)
DenseNetは、CNNアーキテクチャの一種です。
ニューラルネットワークでは層が深くなるにつれて、学習が難しくなるという問題がありましたが、ResNetなどのCNNアーキテクチャでは、前方の層から後方の層へアイデンティティ接続を介してパスを作ることで問題を対処しました。
DenseBlockと呼ばれるモジュールを用いたDenseNetもそのようなアーキテクチャの1つです。
Dense Block
Dense Blockは複数の層で構成されており、各層を通り抜けるとチャンネル(前の層で処理された内容)が増えていきます。
ResNetのように前の層からのスキップコネクションがあります。
ResNetでは前1層の入力のみ使われていましたが、Dense Blockでは前の各層全てが使われます。
特徴マップの入力に対してバッチ正規化、ReLU関数による変換、3×3による畳み込みを行います。
成長率(Growth Rate)
ハイパーパラメータです。
成長率\(k\)が大きくなるとネットワークが大きくなるため、小さい値とすることが多いです。
1つ目の入力の場合。\(k0\)のチャンネル数がそのまま出力のチャンネル数\(k\)となりますが、次の入力は\(k0+k\)、さらに次は\(k0+2k\)…と増えていきます。最終的には\(k0+4k\)となります。
Transition Layer
Dense Blockをつなぐ層のことです。
Dense Blockでチャンネル数が増えていくため、ダウンサンプリングしていきます。
これによって特徴量を抽出しつつ、元のチャンネル数に戻します。
正規化
Batch Normalization
ミニバッチ単位で平均が0、分散(標準偏差)が1になるように正規化します。
標準化との違いは要検討です。
学習時間の短縮や初期値への依存低減、過学習の抑制効果があります。
\(N\)個のデータの同一チャンネルが正規化の単位となり、特徴マップごとに正規化された結果を出力します。
しかし、バッチサイズに影響を受けるため、バッチサイズが小さいと学習が収束しないこともあります。
Layer Normalization
各データ1つの全チャンネルに対して正規化します。
そして、全てのチャンネルで正規化して特徴マップごとに出力します。
特徴
- 入力データのスケール(値を倍にしたり半分にしたり)に関してロバスト
- 重み行列のスケールやシフト(ずらしたり)に関してロバスト
Instance Normalization
各データ1つの各チャンネルに対して正規化します。
コントラストの正規化等で使用されます。
WaveNet
時系列データである音声に畳込みニューラルネットワーク(Dilated Convolution)を用いた音声生成モデルです。
次元間でのつながりがある場合、畳込みができます。
また、畳込みは2次元(画像)だけでなく1次元や3次元等も可能です。
確認テスト
- 深層学習を用いて結合確率を学習する際に、効率的に学習が行えるアーキテクチャを提案したことがWaveNetの大きな貢献の1つである。提案された新しいConvolution型アーキテクチャは(あ)と呼ばれ、結合確率を効率的に学習できるようになっている。
⇨(あ):Dilated Causal Convolution
- (あ)を用いた際の大きな利点は、単純なConvolution Layerと比べて(い)ことである。
⇨(い):パラメータ数に対する受容野が広い
Transformer
RNNを使用せず、Attentionのみ使用しているモデルです。
機械翻訳のみならず、様々な自然言語処理で使用されています。
Decoderは次の単語を予測しますが、RNNを使っていないため、系列全ての単語が一度に与えられます。
そのため、未来の単語が見えないようになり、Decoderでは未来の単語をマスクします。
Source Target-Attentionに加えてSelf-Attentionも用いられ、正規化としてLayer Normalizationが用いられます。
Source Target-Attention(ソースターゲット注意機構)
Queryに正解となる系列を与え、Key、Valueに入力となる系列を与えるAttentionです。
Self-Attention(自己注意機構)
Source Target Attentionと違い、Self-AttentionではQuery、Key、Value全て入力となる系列を与えます。
そうすることによって周辺単語から内部情報を得ることができます。
Scaled dot product attention
全単語に関するAttentionをまとめて計算します。
$$
Attention(Q,K,V)=softmax\left(\frac{QK^{T}}{\sqrt{d_{k}}}\right)V
$$
Multi-Head Attention
8個のScaled dot product attentionを並べ、出力を合わせて線形変換して出力とします。
Scaled dot product attentionを8個用意することで、それぞれ異なる注意(Attention)の掛け方が得られます。
Residual Connection
入出力の差分を与えることで学習の効率化を図る手法です。
Position Encoding
TransfomerではRNNを使っていないため、単語の語順情報を与えます。
$$
PE_{(pos,2i)}=sin\left(\frac{pos}{10000^{2i/512}}\right)
$$
$$
PE_{(pos,2i+1)}=cos\left(\frac{pos}{10000^{2i/512}}\right)
$$
物体検知・セグメンテーション
物体認識の種類
分類
画像に対して単一または複数のクラスラベルです。
画像のどこかにある物体を検知するだけで、位置に意味はありません。
物体検知
Bounding Box(どこに物体があるか、どんな物体か)を検知します。
セマンティックセグメンテーション
各ピクセルに対して単一のクラスラベルです。
例えば、画像の中の風船のピクセルを検知します。
インスタンスセグメンテーション
個々の物体の各ピクセルに対して単一のクラスラベルです。
例えば、画像の中にある各風船それぞれのピクセルを検知します。
物体個々に対して着目します。
代表的なデータセット
データセットを見る上で、クラス数、データ数も大事だが、1枚の画像あたりにいくつ物体があるかという点も大切です。
実際の日常生活で出てくる画像は、画像の真ん中に目的のものが1つだけあるようなことはありません。
1枚の画像あたりにいくつ物体があるかが低いと物体検知には向かず、高いほど、他の物体との重なりがあるなど日常生活に近づきます。
VOC12
Instance Annotation(物体個々にラベリング)が与えられています。
日常生活で代表的な20クラスです。
クラス数:20
Train+Valデータ数:11,540
Box/画像(1枚の画像あたりにいくつ物体があるか):2.4
ILSVRC12
ImageNetのサブセットです。
クラス数:200
Train+Valデータ数:476,668
Box/画像(1枚の画像あたりにいくつ物体があるか):1.1
MS COCO18
Instance Annotation(物体個々にラベリング)が与えられています。
また、物体位置推定に対する評価指標を提案しています。
クラス数:80
Train+Valデータ数:123,287
Box/画像(1枚の画像あたりにいくつ物体があるか):7.3
OICOD18
Instance Annotation(物体個々にラベリング)が与えられています。
画像サイズがバラバラです。
Open Images V4のサブセットになります。
クラス数:500
Train+Valデータ数:1,743,042
Box/画像(1枚の画像あたりにいくつ物体があるか):7.0
評価指標
分類の場合、閾値を変えても混同行列に含まれる件数は変わりません。
物体検出の場合は分類と異なり、件数が変わります。
IOU(Intersection over Union:Jaccard係数)
物体位置の予測精度を評価する指標です。
予測したBounding Boxに対する専有面積を表します。
$$
IoU=\frac{Area\ of\ Overlap}{Area\ of\ Union}
$$
混同行列を用いて表現すると下記のようになります。
$$
IoU=\frac{TP}{TP+FP+FN}
$$
Precision/Recallの計算
画像1枚
ある画像に対する検出結果を閾値0.5で抽出した結果を以下とします。
IoUの閾値を0.5とした場合、判定列のように判定されます。
conf | pred | IoU | 判定 | |
P1 | 0.92 | 人 | 0.88 | IoU>0.5=TP |
P2 | 0.85 | 車 | 0.46 | IoU<0.5=FP |
P3 | 0.81 | 車 | 0.92 | IoU>0.5=TP |
P4 | 0.70 | 犬 | 0.83 | IoU>0.5=TP |
P5 | 0.69 | 人 | 0.76 | IoU>0.5=TPだが人は検出済みのためFP |
P6 | 0.54 | 車 | 0.20 | IoU<0.5=FP |
$$
Precision=\frac{TP}{TP+FP}=\frac{3}{3+3}=0.5
$$
$$
Recall=\frac{TP}{TP+FN}=\frac{3}{3+0}=1.0
$$
クラス単位
クラス「人」に対して複数画像の検出結果を閾値0.5で抽出した結果を以下とします。
「人」が含まれている画像は4枚あるとします。
画像 | conf | pred | IoU | 判定 | |
---|---|---|---|---|---|
P1 | 1 | 0.92 | 人 | 0.88 | TP |
P2 | 2 | 0.85 | 人 | 0.46 | FP |
P3 | 2 | 0.81 | 人 | 0.92 | TP |
P4 | 3 | 0.70 | 人 | 0.83 | TP |
P5 | 1 | 0.69 | 人 | 0.76 | FP |
P6 | 3 | 0.54 | 人 | 0.20 | FP |
$$
Precision=\frac{TP}{TP+FP}=\frac{3}{3+3}=0.5
$$
$$
Recall=\frac{TP}{TP+FN}=\frac{3}{3+1}=0.75
$$
Average Precision(AP)
confを0.05から変化させることで、PR curve(Precision-Recall curve)を描くことができ、PR curveの下側面積としてAPが得られます。
confの閾値を\(\beta\)とすると、\(Recall=R(\beta)\)、\(Precision=P(\beta)\)、PR curveが\(P=f(R)\)となります。
Apはクラスごとに計算します。
$$
\int_0^1 P(R) dR
$$
Mean Average Precision(mAP)
APの平均を表します。
$$
mAP=\frac{1}{C}\sum_{i=1}^{C}AP_{i}
$$
mAP COCO
IoU閾値を固定ではなく、0.5~0.95まで0.05刻みでAPとmAPを計算し、平均をとります。
$$
mAP_{COCO}=\frac{mAP_{0.5}+mAP_{0.55}+…+mAP_{0.95}}{10}
$$
FPS(Flames per Second)
物体検出はリアルタイムで検出したい等といった速度を求められる場面も多いため、検出速度も重要となります。
FPSは1秒間に何枚処理できるかを表します。
Inference time
FPSと同様に検出速度を表す指標です。
1枚の画像の予測にどれだけ時間がかかったかを表します・
物体検出
物体検出の種類は下記のとおりです。
1段階検出器(One-stage detector)
候補領域の検出とクラス推定を同時に行います。
2段階検出器より精度が低い傾向にありますが、2段階検出器より計算量が小さいため予測も早くなりやすいです。
2段階検出器(Two-stage detector)
候補領域の検出とクラス推定を別々に行います。
1段階検出器より精度が高い傾向にありますが、1段階検出器より計算量が大きいため予測も遅くなりやすいです。
SSD(Single Shot Detector)
1段階検出器のモデルの1つで、VGG16をベースとしたアーキテクチャです。
入力のサイズによってSSD300、SSD512とも表記されます。
Default Box
最初に複数個のBounding Boxを適当な位置に適当なサイズで配置し、Default Boxを変形して物体を捉えられるようにしていきます。
特徴マップからの出力
マップ中の1つの特徴量における1つのDefault Boxの出力サイズは「クラス数+4」となります。
+4はオフセット項(\(\Delta x,\Delta y,\Delta w,\Delta h\))を表します。
Default Boxが物体を正しく検出できているとは限らないため、オフセット項で調整します。
また、\(k\)個のDefault Boxをおいた場合の出力サイズは「\(k\)(クラス数+4)」となります。
さらに特徴マップのサイズが\(m×n\)とすると、出力サイズは「\(k\)(クラス数+4)\(mn\)」となります。
特徴マップごとに用意するDefault Boxの数は\(k×mn\)です。
Non-Maximum Suppression
Default Boxを複数用意したことで、1つの物体に対して複数のBounding Boxが予測されてしまう問題があります。
対策として、IoUの閾値で複数の予測があるのであれば最もconf(確率)が高いもののみを残すというものが挙げられます。
Hard Negative Mining
VOCの21クラス目である背景のようなクラスがある場合、背景とそれ以外の物体で検出数が不均衡になる可能性が高いです。
それに対し、最大でも1:3となるように制約をかける対策があります。
損失関数
$$
L(x,c,l,g)=\frac{1}{N}(L_{conf}(x,c)+\alpha L_{loc}(x,l,g))
$$
\(L_{conf}\):confidenceに対する損失
\(L_{loc}\):検出位置に対する損失
セマンティックセグメンテーション(Semantic Segmentation)
Up-sampling
畳込みやプーリングで画像の解像度が落ちていきますが、セマンティックセグメンテーションでは各ピクセルに対してクラス分類することとなるため、解像度を元に戻す必要があります。
この解像度をもとに戻すことをUp-samplingと呼びます。
Deconvolution/Transposed convolution
Up-samplingの手法の1つです。
通常の畳込みと同様にカーネルサイズ、パディング、ストライドを指定します。
- 特徴マップのピクセル感覚をストライド分空ける
- 特徴マップの周囲に(カーネルサイズ – 1)パディング分の余白を作成
- 通常通り畳込みを行う
プーリングで失われた情報が復元されるわけではありません。
輪郭情報の補完
低プーリング層の出力をelement-wise additionすることで補完を行うことができます。
Unpooling
プーリングした時、例えば最大値プーリングであればどこが最大値を持っていたかの位置情報を持ちます。
戻す時は位置情報をもとにunpoolingを行います。
Dilated Convolution
Dilated Convolutionとは、プーリングをするのは受容野を広げるためであるため、代わりに畳み込みの段階で受容野を広げる工夫のことです。
カーネルの各ピクせルの間を空けることで受容野を広げています。
実装演習
Seq2Seq
英語を日本語に機械翻訳します。
データはTanaka Corpus、フレームワークはPytorchを使用します。
データの読み込み
# テキストファイルからデータを読み込むメソッド
def load_data(file_path):
data = []
for line in open(file_path, encoding='utf-8'):
# スペースで単語を分割
words = line.strip().split()
data.append(words)
return data
# 英語文章の読み込み
train_X = load_data('./data/train.en')
# 日本語文章の読み込み
train_Y = load_data('./data/train.ja')
# 訓練データと検証データに分割(8:2)
train_X, valid_X, train_Y, valid_Y = train_test_split(train_X, train_Y, test_size=0.2, random_state=random_state)
print('train data', train_X[0])
print('valid data', valid_X[0])
print('train data', train_Y[0])
print('valid data', valid_Y[0])
# 出力
train data ['where', 'shall', 'we', 'eat', 'tonight', '?']
valid data ['you', 'may', 'extend', 'your', 'stay', 'in', 'tokyo', '.']
train data ['今夜', 'は', 'どこ', 'で', '食事', 'を', 'し', 'よ', 'う', 'か', '。']
valid data ['東京', '滞在', 'を', '延ば', 'し', 'て', 'も', 'い', 'い', 'で', 'す', 'よ', '。']
辞書の作成
# 特殊トークンを定義
PAD_TOKEN = '<PAD>' # バッチ処理の際に、短い系列の末尾を埋めるために使う(RNNは固定長にする必要があるため) (Padding)
BOS_TOKEN = '<S>' # 系列(文章)の始まりを表す (Beggining of sentence)
EOS_TOKEN = '</S>' # 系列(文章)の終わりを表す (End of sentence)
UNK_TOKEN = '<UNK>' # 語彙に存在しない単語(未知語)を表す (Unknown)
PAD = 0
BOS = 1
EOS = 2
UNK = 3
# 語彙に含める単語の最低出現回数 最低出現回数に満たない単語はUNKに置き換えられる
# 今回は1回しか出現しない単語は辞書に含めない(無視する)
MIN_COUNT = 2
# 単語をIDに変換する辞書の初期値を設定
word2id = {
PAD_TOKEN: PAD,
BOS_TOKEN: BOS,
EOS_TOKEN: EOS,
UNK_TOKEN: UNK,
}
# 単語辞書を作成
vocab_X = Vocab(word2id=word2id)
vocab_Y = Vocab(word2id=word2id)
vocab_X.build_vocab(train_X, min_count=MIN_COUNT)
vocab_Y.build_vocab(train_Y, min_count=MIN_COUNT)
vocab_size_X = len(vocab_X.id2word)
vocab_size_Y = len(vocab_Y.id2word)
print('入力言語の語彙数:', vocab_size_X)
print('出力言語の語彙数:', vocab_size_Y)
{print(vocab_X.id2word[v]) for v in list(vocab_X.id2word)[:10]}
print()
{print(vocab_Y.id2word[v]) for v in list(vocab_Y.id2word)[:10]}]
# 出力
入力言語の語彙数: 3725
出力言語の語彙数: 4405
<PAD>
<S>
</S>
<UNK>
.
the
i
to
you
is
<PAD>
<S>
</S>
<UNK>
。
は
い
に
た
を
{None}
IDへの変換
# 単語(str)のリストをID(int)のリストに変換する関数
def sentence_to_ids(vocab, sentence):
ids = [vocab.word2id.get(word, UNK) for word in sentence]
ids += [EOS] # EOSを加える
return ids
train_X = [sentence_to_ids(vocab_X, sentence) for sentence in train_X]
train_Y = [sentence_to_ids(vocab_Y, sentence) for sentence in train_Y]
valid_X = [sentence_to_ids(vocab_X, sentence) for sentence in valid_X]
valid_Y = [sentence_to_ids(vocab_Y, sentence) for sentence in valid_Y]
print('train data', train_X[0])
print('valid data', valid_X[0])
{print(vocab_X.id2word[k]) for k in list(train_X[0])}
# 出力
train data [132, 321, 28, 290, 367, 12, 2]
valid data [8, 93, 3532, 36, 236, 13, 284, 4, 2]
where
shall
we
eat
tonight
?
</S>
{None}
DataLoaderの定義
def pad_seq(seq, max_length):
# 系列(seq)が指定の文長(max_length)になるように末尾をパディングする
res = seq + [PAD for i in range(max_length - len(seq))]
return res
class DataLoader(object):
def __init__(self, X, Y, batch_size, shuffle=False):
"""
:param X: list, 入力言語の文章(単語IDのリスト)のリスト
:param Y: list, 出力言語の文章(単語IDのリスト)のリスト
:param batch_size: int, バッチサイズ
:param shuffle: bool, サンプルの順番をシャッフルするか否か
"""
self.data = list(zip(X, Y))
self.batch_size = batch_size
self.shuffle = shuffle
self.start_index = 0
self.reset()
def reset(self):
if self.shuffle: # サンプルの順番をシャッフルする
self.data = shuffle(self.data, random_state=random_state)
self.start_index = 0 # ポインタの位置を初期化する
def __iter__(self):
return self
# for文の反復で呼び出される。
def __next__(self):
# ポインタが最後まで到達したら初期化する
if self.start_index >= len(self.data):
self.reset()
raise StopIteration()
# バッチを取得
seqs_X, seqs_Y = zip(*self.data[self.start_index:self.start_index+self.batch_size])
# 入力系列seqs_Xの文章の長さ順(降順)に系列ペアをソートする
seq_pairs = sorted(zip(seqs_X, seqs_Y), key=lambda p: len(p[0]), reverse=True)
seqs_X, seqs_Y = zip(*seq_pairs)
# 短い系列の末尾をパディングする
lengths_X = [len(s) for s in seqs_X] # 後述のEncoderのpack_padded_sequenceでも用いる
lengths_Y = [len(s) for s in seqs_Y]
max_length_X = max(lengths_X)
max_length_Y = max(lengths_Y)
padded_X = [pad_seq(s, max_length_X) for s in seqs_X]
padded_Y = [pad_seq(s, max_length_Y) for s in seqs_Y]
# tensorに変換し、転置する
batch_X = torch.tensor(padded_X, dtype=torch.long, device=device).transpose(0, 1)
batch_Y = torch.tensor(padded_Y, dtype=torch.long, device=device).transpose(0, 1)
# ポインタを更新する
self.start_index += self.batch_size
return batch_X, batch_Y, lengths_X
モデルの構築
PackedSequence
# 系列長がそれぞれ4,3,2の3つのサンプルからなるバッチを作成
batch = [[1,2,3,4], [5,6,7], [8,9]]
lengths = [len(sample) for sample in batch]
print('各サンプルの系列長:', lengths)
print()
# 最大系列長に合うように各サンプルをpadding
_max_length = max(lengths)
padded = torch.tensor([pad_seq(sample, _max_length) for sample in batch])
print('paddingされたテンソル:\n', padded)
padded = padded.transpose(0,1) # (max_length, batch_size)に転置
print('padding & 転置されたテンソル:\n', padded)
print('padding & 転置されたテンソルのサイズ:\n', padded.size())
print()
# PackedSequenceに変換(テンソルをRNNに入力する前に適用する)
packed = pack_padded_sequence(padded, lengths=lengths) # 各サンプルの系列長も与える
print('PackedSequenceのインスタンス:\n', packed) # テンソルのPAD以外の値(data)と各時刻で計算が必要な(=PADに到達していない)バッチの数(batch_sizes)を有するインスタンス
print()
# PackedSequenceのインスタンスをRNNに入力する(ここでは省略)
output = packed
# RNNの出力からはPackedSequenceのインスタンスで結果が得られているのでテンソルに戻す(RNNの出力に対して適用する)
output, _length = pad_packed_sequence(output) # PADを含む元のテンソルと各サンプルの系列長を返す
print('PADを含む元のテンソル:\n', output)
print('各サンプルの系列長:', _length)
# 出力
各サンプルの系列長: [4, 3, 2]
paddingされたテンソル:
tensor([[1, 2, 3, 4],
[5, 6, 7, 0],
[8, 9, 0, 0]])
padding & 転置されたテンソル:
tensor([[1, 5, 8],
[2, 6, 9],
[3, 7, 0],
[4, 0, 0]])
padding & 転置されたテンソルのサイズ:
torch.Size([4, 3])
PackedSequenceのインスタンス:
PackedSequence(data=tensor([1, 5, 8, 2, 6, 9, 3, 7, 4]), batch_sizes=tensor([3, 3, 2, 1]), sorted_indices=None, unsorted_indices=None)
PADを含む元のテンソル:
tensor([[1, 5, 8],
[2, 6, 9],
[3, 7, 0],
[4, 0, 0]])
各サンプルの系列長: tensor([4, 3, 2])
Encoder
class Encoder(nn.Module):
def __init__(self, input_size, hidden_size):
"""
:param input_size: int, 入力言語の語彙数
:param hidden_size: int, 隠れ層のユニット数
"""
super(Encoder, self).__init__()
self.hidden_size = hidden_size
# 分散表現ベクトルとする。
self.embedding = nn.Embedding(input_size, hidden_size, padding_idx=PAD)
# GRUを使用する。
self.gru = nn.GRU(hidden_size, hidden_size)
def forward(self, seqs, input_lengths, hidden=None):
"""
:param seqs: tensor, 入力のバッチ, size=(max_length, batch_size)
:param input_lengths: 入力のバッチの各サンプルの文長
:param hidden: tensor, 隠れ状態の初期値, Noneの場合は0で初期化される
:return output: tensor, Encoderの出力, size=(max_length, batch_size, hidden_size)
:return hidden: tensor, Encoderの隠れ状態, size=(1, batch_size, hidden_size)
"""
emb = self.embedding(seqs) # seqsはパディング済み
packed = pack_padded_sequence(emb, input_lengths) # PackedSequenceオブジェクトに変換して入力とする。
output, hidden = self.gru(packed, hidden)
output, _ = pad_packed_sequence(output)
return output, hidden
Decoder
class Decoder(nn.Module):
def __init__(self, hidden_size, output_size):
"""
:param hidden_size: int, 隠れ層のユニット数
:param output_size: int, 出力言語の語彙数
:param dropout: float, ドロップアウト率
"""
super(Decoder, self).__init__()
self.hidden_size = hidden_size
self.output_size = output_size
# 分散表現ベクトルとする。
self.embedding = nn.Embedding(output_size, hidden_size, padding_idx=PAD)
# こちらもGRU
self.gru = nn.GRU(hidden_size, hidden_size)
self.out = nn.Linear(hidden_size, output_size)
def forward(self, seqs, hidden):
"""
:param seqs: tensor, 入力のバッチ, size=(1, batch_size)
:param hidden: tensor, 隠れ状態の初期値, Noneの場合は0で初期化される
:return output: tensor, Decoderの出力, size=(1, batch_size, output_size)
:return hidden: tensor, Decoderの隠れ状態, size=(1, batch_size, hidden_size)
"""
emb = self.embedding(seqs)
# Decoderではパディングしないのでpack_padded_sequenceしない。
output, hidden = self.gru(emb, hidden)
output = self.out(output)
return output, hidden
EncoderDecoder
class EncoderDecoder(nn.Module):
"""EncoderとDecoderの処理をまとめる"""
def __init__(self, input_size, output_size, hidden_size):
"""
:param input_size: int, 入力言語の語彙数
:param output_size: int, 出力言語の語彙数
:param hidden_size: int, 隠れ層のユニット数
"""
super(EncoderDecoder, self).__init__()
self.encoder = Encoder(input_size, hidden_size)
self.decoder = Decoder(hidden_size, output_size)
def forward(self, batch_X, lengths_X, max_length, batch_Y=None, use_teacher_forcing=False):
"""
:param batch_X: tensor, 入力系列のバッチ, size=(max_length, batch_size)
:param lengths_X: list, 入力系列のバッチ内の各サンプルの文長
:param max_length: int, Decoderの最大文長
:param batch_Y: tensor, Decoderで用いるターゲット系列
:param use_teacher_forcing: Decoderでターゲット系列を入力とするフラグ
:return decoder_outputs: tensor, Decoderの出力,
size=(max_length, batch_size, self.decoder.output_size)
"""
# encoderに系列を入力(複数時刻をまとめて処理)
_, encoder_hidden = self.encoder(batch_X, lengths_X)
_batch_size = batch_X.size(1)
# decoderの入力と隠れ層の初期状態を定義
decoder_input = torch.tensor([BOS] * _batch_size, dtype=torch.long, device=device) # 最初の入力にはBOSを使用する
decoder_input = decoder_input.unsqueeze(0) # (1, batch_size)
decoder_hidden = encoder_hidden # Encoderの最終隠れ状態を取得
# decoderの出力のホルダーを定義
decoder_outputs = torch.zeros(max_length, _batch_size, self.decoder.output_size, device=device) # max_length分の固定長
# 各時刻ごとに処理
for t in range(max_length):
decoder_output, decoder_hidden = self.decoder(decoder_input, decoder_hidden)
# 時刻tの出力が次の入力となる。
decoder_outputs[t] = decoder_output
# 次の時刻のdecoderの入力を決定
if use_teacher_forcing and batch_Y is not None: # teacher forceの場合、ターゲット系列を用いる
decoder_input = batch_Y[t].unsqueeze(0)
else: # teacher forceでない場合、自身の出力を用いる
decoder_input = decoder_output.max(-1)[1]
return decoder_outputs
学習
誤差関数の定義
mce = nn.CrossEntropyLoss(size_average=False, ignore_index=PAD) # PADを無視する
def masked_cross_entropy(logits, target):
logits_flat = logits.view(-1, logits.size(-1)) # (max_seq_len * batch_size, output_size)
target_flat = target.view(-1) # (max_seq_len * batch_size, 1)
return mce(logits_flat, target_flat)
ハイパーパラメータ
# ハイパーパラメータの設定
# エポック数
num_epochs = 10
# バッチサイズ
batch_size = 64
# 学習率
lr = 1e-3
# Teacher Forcingを行う確率
teacher_forcing_rate = 0.2
ckpt_path = 'model.pth' # 学習済みのモデルを保存するパス
model_args = {
'input_size': vocab_size_X,
'output_size': vocab_size_Y,
'hidden_size': 256,
}
データローダとモデル、最適化関数の定義
# 学習用、検証用データローダを定義
train_dataloader = DataLoader(train_X, train_Y, batch_size=batch_size, shuffle=True)
valid_dataloader = DataLoader(valid_X, valid_Y, batch_size=batch_size, shuffle=False)
# モデルとOptimizerを定義
# to(device)がないとCUDAではなくCPUで実行されてしまうため注意。
model = EncoderDecoder(**model_args).to(device)
# 最適化関数としてAdamを使用
optimizer = optim.Adam(model.parameters(), lr=lr)
誤差を計算する関数
def compute_loss(batch_X, batch_Y, lengths_X, model, optimizer=None, is_train=True):
# 損失を計算する関数
model.train(is_train) # train/evalモードの切替え
# 一定確率でTeacher Forcingを行う(乱数の値がハイパーパラメータで定義した0.2未満なら教師矯正をする)
use_teacher_forcing = is_train and (random.random() < teacher_forcing_rate)
max_length = batch_Y.size(0)
# 推論結果を得る
pred_Y = model(batch_X, lengths_X, max_length, batch_Y, use_teacher_forcing)
# 損失関数を計算
loss = masked_cross_entropy(pred_Y.contiguous(), batch_Y.contiguous())
if is_train: # 訓練時はパラメータを更新
optimizer.zero_grad()
loss.backward()
optimizer.step() # モデル更新
batch_Y = batch_Y.transpose(0, 1).contiguous().data.cpu().tolist()
pred = pred_Y.max(dim=-1)[1].data.cpu().numpy().T.tolist()
return loss.item(), batch_Y, pred
BLEU
def calc_bleu(refs, hyps):
"""
BLEUスコアを計算する関数
:param refs: list, 参照訳。単語のリストのリスト (例: [['I', 'have', 'a', 'pen'], ...])
:param hyps: list, モデルの生成した訳。単語のリストのリスト (例: ['I', 'have', 'a', 'pen'])
:return: float, BLEUスコア(0~100)
"""
refs = [[ref[:ref.index(EOS)]] for ref in refs] # EOSは評価しないで良いので切り捨てる, refsのほうは複数なのでlistが一個多くかかっている
hyps = [hyp[:hyp.index(EOS)] if EOS in hyp else hyp for hyp in hyps]
return 100 * bleu_score.corpus_bleu(refs, hyps)
学習実行
# 訓練
best_valid_bleu = 0.
for epoch in range(1, num_epochs+1):
train_loss = 0.
train_refs = []
train_hyps = []
valid_loss = 0.
valid_refs = []
valid_hyps = []
# train
for batch in train_dataloader:
batch_X, batch_Y, lengths_X = batch
loss, gold, pred = compute_loss(
batch_X, batch_Y, lengths_X, model, optimizer,
is_train=True
)
train_loss += loss
train_refs += gold
train_hyps += pred
# valid
for batch in valid_dataloader:
batch_X, batch_Y, lengths_X = batch
loss, gold, pred = compute_loss(
batch_X, batch_Y, lengths_X, model,
is_train=False
)
valid_loss += loss
valid_refs += gold
valid_hyps += pred
# 損失をサンプル数で割って正規化
train_loss = np.sum(train_loss) / len(train_dataloader.data)
valid_loss = np.sum(valid_loss) / len(valid_dataloader.data)
# BLEUを計算
train_bleu = calc_bleu(train_refs, train_hyps)
valid_bleu = calc_bleu(valid_refs, valid_hyps)
# validationデータでBLEUが改善した場合にはモデルを保存
if valid_bleu > best_valid_bleu:
ckpt = model.state_dict()
torch.save(ckpt, ckpt_path)
best_valid_bleu = valid_bleu
print('Epoch {}: train_loss: {:5.2f} train_bleu: {:2.2f} valid_loss: {:5.2f} valid_bleu: {:2.2f}'.format(
epoch, train_loss, train_bleu, valid_loss, valid_bleu))
print('-'*80)
# 出力(一部省略)
Epoch 10: train_loss: 28.19 train_bleu: 25.92 valid_loss: 40.96 valid_bleu: 17.60
評価
保存したモデルを使って生成
# 学習済みモデルの読み込み
ckpt = torch.load(ckpt_path) # cpuで処理する場合はmap_locationで指定する必要があります。
model.load_state_dict(ckpt)
model.eval()
def ids_to_sentence(vocab, ids):
# IDのリストを単語のリストに変換する
return [vocab.id2word[_id] for _id in ids]
def trim_eos(ids):
# IDのリストからEOS以降の単語を除外する
if EOS in ids:
return ids[:ids.index(EOS)]
else:
return ids
# テストデータの読み込み
test_X = load_data('./data/dev.en')
test_Y = load_data('./data/dev.ja')
test_X = [sentence_to_ids(vocab_X, sentence) for sentence in test_X]
test_Y = [sentence_to_ids(vocab_Y, sentence) for sentence in test_Y]
# テストデータのデータローダ
test_dataloader = DataLoader(test_X, test_Y, batch_size=1, shuffle=False)
# 生成
batch_X, batch_Y, lengths_X = next(test_dataloader)
sentence_X = ' '.join(ids_to_sentence(vocab_X, batch_X.data.cpu().numpy()[:-1, 0]))
sentence_Y = ' '.join(ids_to_sentence(vocab_Y, batch_Y.data.cpu().numpy()[:-1, 0]))
print('src: {}'.format(sentence_X))
print('tgt: {}'.format(sentence_Y))
output = model(batch_X, lengths_X, max_length=20)
output = output.max(dim=-1)[1].view(-1).data.cpu().tolist()
output_sentence = ' '.join(ids_to_sentence(vocab_Y, trim_eos(output)))
output_sentence_without_trim = ' '.join(ids_to_sentence(vocab_Y, output))
print('out: {}'.format(output_sentence))
print('without trim: {}'.format(output_sentence_without_trim))
# 出力
src: show your own business .
tgt: 自分 の 事 を しろ 。
out: よけい の よけい よけい し て 。 。
without trim: よけい の よけい よけい し て 。 。 </S> </S> </S> </S> </S> </S> </S> </S> </S> </S> </S> </S>
src: he lived a hard life .
tgt: 彼 は つら い 人生 を 送 っ た 。
out: 彼 は 辛 な 人生 を 送 っ た 。
without trim: 彼 は 辛 な 人生 を 送 っ た 。 </S> </S> </S> </S> </S> </S> </S> </S> </S> </S>
BLEUの計算
# BLEUの計算
test_dataloader = DataLoader(test_X, test_Y, batch_size=1, shuffle=False)
refs_list = []
hyp_list = []
for batch in test_dataloader:
batch_X, batch_Y, lengths_X = batch
pred_Y = model(batch_X, lengths_X, max_length=20)
pred = pred_Y.max(dim=-1)[1].view(-1).data.cpu().tolist()
refs = batch_Y.view(-1).data.cpu().tolist()
refs_list.append(refs)
hyp_list.append(pred)
bleu = calc_bleu(refs_list, hyp_list)
print(bleu)
# 出力
17.97547118082943
Transformer
Seq2Seqと同様のデータや辞書などを使用します。
Position Encoding
def position_encoding_init(n_position, d_pos_vec):
"""
Positional Encodingのための行列の初期化を行う
:param n_position: int, 系列長
:param d_pos_vec: int, 隠れ層の次元数
:return torch.tensor, size=(n_position, d_pos_vec)
"""
# PADがある単語の位置はpos=0にしておき、position_encも0にする
position_enc = np.array([
[pos / np.power(10000, 2 * (j // 2) / d_pos_vec) for j in range(d_pos_vec)]
if pos != 0 else np.zeros(d_pos_vec) for pos in range(n_position)])
position_enc[1:, 0::2] = np.sin(position_enc[1:, 0::2]) # dim 2i
position_enc[1:, 1::2] = np.cos(position_enc[1:, 1::2]) # dim 2i+1
return torch.tensor(position_enc, dtype=torch.float)
Scaled Dot-Product Attention
class ScaledDotProductAttention(nn.Module):
def __init__(self, d_model, attn_dropout=0.1):
"""
:param d_model: int, 隠れ層の次元数
:param attn_dropout: float, ドロップアウト率
"""
super(ScaledDotProductAttention, self).__init__()
self.temper = np.power(d_model, 0.5) # スケーリング因子
self.dropout = nn.Dropout(attn_dropout)
self.softmax = nn.Softmax(dim=-1)
def forward(self, q, k, v, attn_mask):
"""
:param q: torch.tensor, queryベクトル,
size=(n_head*batch_size, len_q, d_model/n_head)
:param k: torch.tensor, key,
size=(n_head*batch_size, len_k, d_model/n_head)
:param v: torch.tensor, valueベクトル,
size=(n_head*batch_size, len_v, d_model/n_head)
:param attn_mask: torch.tensor, Attentionに適用するマスク,
size=(n_head*batch_size, len_q, len_k)
:return output: 出力ベクトル,
size=(n_head*batch_size, len_q, d_model/n_head)
:return attn: Attention
size=(n_head*batch_size, len_q, len_k)
"""
# QとKの内積でAttentionの重みを求め、スケーリングする
attn = torch.bmm(q, k.transpose(1, 2)) / self.temper # (n_head*batch_size, len_q, len_k)
# Attentionをかけたくない部分がある場合は、その部分を負の無限大に飛ばしてSoftmaxの値が0になるようにする
attn.data.masked_fill_(attn_mask, -float('inf'))
attn = self.softmax(attn)
attn = self.dropout(attn)
output = torch.bmm(attn, v)
return output, attn
Multi-Head Attention
class MultiHeadAttention(nn.Module):
def __init__(self, n_head, d_model, d_k, d_v, dropout=0.1):
"""
:param n_head: int, ヘッド数
:param d_model: int, 隠れ層の次元数
:param d_k: int, keyベクトルの次元数
:param d_v: int, valueベクトルの次元数
:param dropout: float, ドロップアウト率
"""
super(MultiHeadAttention, self).__init__()
# Scaled Dot-Product Attentionの数を定義
self.n_head = n_head
self.d_k = d_k
self.d_v = d_v
# 各ヘッドごとに異なる重みで線形変換を行うための重み
# nn.Parameterを使うことで、Moduleのパラメータとして登録できる. TFでは更新が必要な変数はtf.Variableでラップするのでわかりやすい
self.w_qs = nn.Parameter(torch.empty([n_head, d_model, d_k], dtype=torch.float))
self.w_ks = nn.Parameter(torch.empty([n_head, d_model, d_k], dtype=torch.float))
self.w_vs = nn.Parameter(torch.empty([n_head, d_model, d_v], dtype=torch.float))
# nn.init.xavier_normal_で重みの値を初期化(Xavier)
nn.init.xavier_normal_(self.w_qs)
nn.init.xavier_normal_(self.w_ks)
nn.init.xavier_normal_(self.w_vs)
self.attention = ScaledDotProductAttention(d_model)
self.layer_norm = nn.LayerNorm(d_model) # 各層においてバイアスを除く活性化関数への入力を平均0、分散1に正則化
self.proj = nn.Linear(n_head*d_v, d_model) # 複数ヘッド分のAttentionの結果を元のサイズに写像するための線形層
# nn.init.xavier_normal_で重みの値を初期化
nn.init.xavier_normal_(self.proj.weight)
self.dropout = nn.Dropout(dropout)
def forward(self, q, k, v, attn_mask=None):
"""
:param q: torch.tensor, queryベクトル,
size=(batch_size, len_q, d_model)
:param k: torch.tensor, key,
size=(batch_size, len_k, d_model)
:param v: torch.tensor, valueベクトル,
size=(batch_size, len_v, d_model)
:param attn_mask: torch.tensor, Attentionに適用するマスク,
size=(batch_size, len_q, len_k)
:return outputs: 出力ベクトル,
size=(batch_size, len_q, d_model)
:return attns: Attention
size=(n_head*batch_size, len_q, len_k)
"""
d_k, d_v = self.d_k, self.d_v
n_head = self.n_head
# residual connectionのための入力 出力に入力をそのまま加算する
residual = q
batch_size, len_q, d_model = q.size()
batch_size, len_k, d_model = k.size()
batch_size, len_v, d_model = v.size()
# 複数ヘッド化
# torch.repeat または .repeatで指定したdimに沿って同じテンソルを作成
q_s = q.repeat(n_head, 1, 1) # (n_head*batch_size, len_q, d_model)
k_s = k.repeat(n_head, 1, 1) # (n_head*batch_size, len_k, d_model)
v_s = v.repeat(n_head, 1, 1) # (n_head*batch_size, len_v, d_model)
# ヘッドごとに並列計算させるために、n_headをdim=0に、batch_sizeをdim=1に寄せる
q_s = q_s.view(n_head, -1, d_model) # (n_head, batch_size*len_q, d_model)
k_s = k_s.view(n_head, -1, d_model) # (n_head, batch_size*len_k, d_model)
v_s = v_s.view(n_head, -1, d_model) # (n_head, batch_size*len_v, d_model)
# 各ヘッドで線形変換を並列計算(p16左側`Linear`)
q_s = torch.bmm(q_s, self.w_qs) # (n_head, batch_size*len_q, d_k)
k_s = torch.bmm(k_s, self.w_ks) # (n_head, batch_size*len_k, d_k)
v_s = torch.bmm(v_s, self.w_vs) # (n_head, batch_size*len_v, d_v)
# Attentionは各バッチ各ヘッドごとに計算させるためにbatch_sizeをdim=0に寄せる
q_s = q_s.view(-1, len_q, d_k) # (n_head*batch_size, len_q, d_k)
k_s = k_s.view(-1, len_k, d_k) # (n_head*batch_size, len_k, d_k)
v_s = v_s.view(-1, len_v, d_v) # (n_head*batch_size, len_v, d_v)
# Attentionを計算(p16.左側`Scaled Dot-Product Attention * h`)
outputs, attns = self.attention(q_s, k_s, v_s, attn_mask=attn_mask.repeat(n_head, 1, 1))
# 各ヘッドの結果を連結(p16左側`Concat`)
# torch.splitでbatch_sizeごとのn_head個のテンソルに分割
outputs = torch.split(outputs, batch_size, dim=0) # (batch_size, len_q, d_model) * n_head
# dim=-1で連結
outputs = torch.cat(outputs, dim=-1) # (batch_size, len_q, d_model*n_head)
# residual connectionのために元の大きさに写像(p16左側`Linear`)
outputs = self.proj(outputs) # (batch_size, len_q, d_model)
outputs = self.dropout(outputs)
outputs = self.layer_norm(outputs + residual)
return outputs, attns
Position-Wise Feed Forward Network
class PositionwiseFeedForward(nn.Module):
"""
:param d_hid: int, 隠れ層1層目の次元数
:param d_inner_hid: int, 隠れ層2層目の次元数
:param dropout: float, ドロップアウト率
"""
def __init__(self, d_hid, d_inner_hid, dropout=0.1):
super(PositionwiseFeedForward, self).__init__()
# window size 1のconv層を定義することでPosition wiseな全結合層を実現する.
self.w_1 = nn.Conv1d(d_hid, d_inner_hid, 1)
self.w_2 = nn.Conv1d(d_inner_hid, d_hid, 1)
self.layer_norm = nn.LayerNorm(d_hid)
self.dropout = nn.Dropout(dropout)
self.relu = nn.ReLU()
def forward(self, x):
"""
:param x: torch.tensor,
size=(batch_size, max_length, d_hid)
:return: torch.tensor,
size=(batch_size, max_length, d_hid)
"""
residual = x
output = self.relu(self.w_1(x.transpose(1, 2)))
output = self.w_2(output).transpose(2, 1)
output = self.dropout(output)
return self.layer_norm(output + residual)
Masking
Encoder側
def get_attn_padding_mask(seq_q, seq_k):
"""
keyのPADに対するattentionを0にするためのマスクを作成する
:param seq_q: tensor, queryの系列, size=(batch_size, len_q)
:param seq_k: tensor, keyの系列, size=(batch_size, len_k)
:return pad_attn_mask: tensor, size=(batch_size, len_q, len_k)
"""
batch_size, len_q = seq_q.size()
batch_size, len_k = seq_k.size()
pad_attn_mask = seq_k.data.eq(PAD).unsqueeze(1) # (N, 1, len_k) PAD以外のidを全て0にする
pad_attn_mask = pad_attn_mask.expand(batch_size, len_q, len_k) # (N, len_q, len_k)
return pad_attn_mask
_seq_q = torch.tensor([[1, 2, 3]])
_seq_k = torch.tensor([[4, 5, 6, 7, PAD]])
_mask = get_attn_padding_mask(_seq_q, _seq_k) # 行がquery、列がkeyに対応し、key側がPAD(=0)の時刻だけ1で他が0の行列ができる
print('query:\n', _seq_q)
print('key:\n', _seq_k)
print('mask:\n', _mask)
# 出力
query:
tensor([[1, 2, 3]])
key:
tensor([[4, 5, 6, 7, 0]])
mask:
tensor([[[False, False, False, False, True],
[False, False, False, False, True],
[False, False, False, False, True]]])
Decoder側
def get_attn_subsequent_mask(seq):
"""
未来の情報に対するattentionを0にするためのマスクを作成する
:param seq: tensor, size=(batch_size, length)
:return subsequent_mask: tensor, size=(batch_size, length, length)
"""
attn_shape = (seq.size(1), seq.size(1))
# 上三角行列(diagonal=1: 対角線より上が1で下が0)
subsequent_mask = torch.triu(torch.ones(attn_shape, dtype=torch.uint8, device=device), diagonal=1)
subsequent_mask = subsequent_mask.repeat(seq.size(0), 1, 1)
return subsequent_mask
_seq = torch.tensor([[1,2,3,4]])
_mask = get_attn_subsequent_mask(_seq) # 行がquery、列がkeyに対応し、queryより未来のkeyの値が1で他は0の行列ができいる
print('seq:\n', _seq)
print('mask:\n', _mask)
# 出力
seq:
tensor([[1, 2, 3, 4]])
mask:
tensor([[[0, 1, 1, 1],
[0, 0, 1, 1],
[0, 0, 0, 1],
[0, 0, 0, 0]]], device='cuda:0', dtype=torch.uint8)
モデルの構築
EncoderLayer
class EncoderLayer(nn.Module):
"""Encoderのブロックのクラス"""
def __init__(self, d_model, d_inner_hid, n_head, d_k, d_v, dropout=0.1):
"""
:param d_model: int, 隠れ層の次元数
:param d_inner_hid: int, Position Wise Feed Forward Networkの隠れ層2層目の次元数
:param n_head: int, ヘッド数
:param d_k: int, keyベクトルの次元数
:param d_v: int, valueベクトルの次元数
:param dropout: float, ドロップアウト率
"""
super(EncoderLayer, self).__init__()
# Encoder内のSelf-Attention
self.slf_attn = MultiHeadAttention(
n_head, d_model, d_k, d_v, dropout=dropout)
# Postionwise FFN
self.pos_ffn = PositionwiseFeedForward(d_model, d_inner_hid, dropout=dropout)
def forward(self, enc_input, slf_attn_mask=None):
"""
:param enc_input: tensor, Encoderの入力,
size=(batch_size, max_length, d_model)
:param slf_attn_mask: tensor, Self Attentionの行列にかけるマスク,
size=(batch_size, len_q, len_k)
:return enc_output: tensor, Encoderの出力,
size=(batch_size, max_length, d_model)
:return enc_slf_attn: tensor, EncoderのSelf Attentionの行列,
size=(n_head*batch_size, len_q, len_k)
"""
# Self-Attentionのquery, key, valueにはすべてEncoderの入力(enc_input)が入る
enc_output, enc_slf_attn = self.slf_attn(
enc_input, enc_input, enc_input, attn_mask=slf_attn_mask)
enc_output = self.pos_ffn(enc_output)
return enc_output, enc_slf_attn
Encoder
class Encoder(nn.Module):
"""EncoderLayerブロックからなるEncoderのクラス"""
def __init__(
self, n_src_vocab, max_length, n_layers=6, n_head=8, d_k=64, d_v=64,
d_word_vec=512, d_model=512, d_inner_hid=1024, dropout=0.1):
"""
:param n_src_vocab: int, 入力言語の語彙数
:param max_length: int, 最大系列長
:param n_layers: int, レイヤー数
:param n_head: int, ヘッド数
:param d_k: int, keyベクトルの次元数
:param d_v: int, valueベクトルの次元数
:param d_word_vec: int, 単語の埋め込みの次元数
:param d_model: int, 隠れ層の次元数
:param d_inner_hid: int, Position Wise Feed Forward Networkの隠れ層2層目の次元数
:param dropout: float, ドロップアウト率
"""
super(Encoder, self).__init__()
n_position = max_length + 1
self.max_length = max_length
self.d_model = d_model
# Positional Encodingを用いたEmbedding
self.position_enc = nn.Embedding(n_position, d_word_vec, padding_idx=PAD)
self.position_enc.weight.data = position_encoding_init(n_position, d_word_vec)
# 一般的なEmbedding
self.src_word_emb = nn.Embedding(n_src_vocab, d_word_vec, padding_idx=PAD)
# EncoderLayerをn_layers個積み重ねる
self.layer_stack = nn.ModuleList([
EncoderLayer(d_model, d_inner_hid, n_head, d_k, d_v, dropout=dropout)
for _ in range(n_layers)])
def forward(self, src_seq, src_pos):
"""
:param src_seq: tensor, 入力系列,
size=(batch_size, max_length)
:param src_pos: tensor, 入力系列の各単語の位置情報,
size=(batch_size, max_length)
:return enc_output: tensor, Encoderの最終出力,
size=(batch_size, max_length, d_model)
:return enc_slf_attns: list, EncoderのSelf Attentionの行列のリスト
"""
# 一般的な単語のEmbeddingを行う
enc_input = self.src_word_emb(src_seq)
# Positional EncodingのEmbeddingを加算する
enc_input += self.position_enc(src_pos)
enc_slf_attns = []
enc_output = enc_input
# key(=enc_input)のPADに対応する部分のみ1のマスクを作成
enc_slf_attn_mask = get_attn_padding_mask(src_seq, src_seq)
# n_layers個のEncoderLayerに入力を通す
for enc_layer in self.layer_stack:
enc_output, enc_slf_attn = enc_layer(
enc_output, slf_attn_mask=enc_slf_attn_mask)
enc_slf_attns += [enc_slf_attn]
return enc_output, enc_slf_attns
DecoderLayer
class DecoderLayer(nn.Module):
"""Decoderのブロックのクラス"""
def __init__(self, d_model, d_inner_hid, n_head, d_k, d_v, dropout=0.1):
"""
:param d_model: int, 隠れ層の次元数
:param d_inner_hid: int, Position Wise Feed Forward Networkの隠れ層2層目の次元数
:param n_head: int, ヘッド数
:param d_k: int, keyベクトルの次元数
:param d_v: int, valueベクトルの次元数
:param dropout: float, ドロップアウト率
"""
super(DecoderLayer, self).__init__()
# Decoder内のSelf-Attention
self.slf_attn = MultiHeadAttention(n_head, d_model, d_k, d_v, dropout=dropout)
# Encoder-Decoder間のSource-Target Attention
self.enc_attn = MultiHeadAttention(n_head, d_model, d_k, d_v, dropout=dropout)
# Positionwise FFN
self.pos_ffn = PositionwiseFeedForward(d_model, d_inner_hid, dropout=dropout)
def forward(self, dec_input, enc_output, slf_attn_mask=None, dec_enc_attn_mask=None):
"""
:param dec_input: tensor, Decoderの入力,
size=(batch_size, max_length, d_model)
:param enc_output: tensor, Encoderの出力,
size=(batch_size, max_length, d_model)
:param slf_attn_mask: tensor, Self Attentionの行列にかけるマスク,
size=(batch_size, len_q, len_k)
:param dec_enc_attn_mask: tensor, Soutce-Target Attentionの行列にかけるマスク,
size=(batch_size, len_q, len_k)
:return dec_output: tensor, Decoderの出力,
size=(batch_size, max_length, d_model)
:return dec_slf_attn: tensor, DecoderのSelf Attentionの行列,
size=(n_head*batch_size, len_q, len_k)
:return dec_enc_attn: tensor, DecoderのSoutce-Target Attentionの行列,
size=(n_head*batch_size, len_q, len_k)
"""
# Self-Attentionのquery, key, valueにはすべてDecoderの入力(dec_input)が入る
dec_output, dec_slf_attn = self.slf_attn(
dec_input, dec_input, dec_input, attn_mask=slf_attn_mask)
# Source-Target-AttentionのqueryにはDecoderの出力(dec_output), key, valueにはEncoderの出力(enc_output)が入る
dec_output, dec_enc_attn = self.enc_attn(
dec_output, enc_output, enc_output, attn_mask=dec_enc_attn_mask)
dec_output = self.pos_ffn(dec_output)
return dec_output, dec_slf_attn, dec_enc_attn
Decoder
class Decoder(nn.Module):
"""DecoderLayerブロックからなるDecoderのクラス"""
def __init__(
self, n_tgt_vocab, max_length, n_layers=6, n_head=8, d_k=64, d_v=64,
d_word_vec=512, d_model=512, d_inner_hid=1024, dropout=0.1):
"""
:param n_tgt_vocab: int, 出力言語の語彙数
:param max_length: int, 最大系列長
:param n_layers: int, レイヤー数
:param n_head: int, ヘッド数
:param d_k: int, keyベクトルの次元数
:param d_v: int, valueベクトルの次元数
:param d_word_vec: int, 単語の埋め込みの次元数
:param d_model: int, 隠れ層の次元数
:param d_inner_hid: int, Position Wise Feed Forward Networkの隠れ層2層目の次元数
:param dropout: float, ドロップアウト率
"""
super(Decoder, self).__init__()
n_position = max_length + 1
self.max_length = max_length
self.d_model = d_model
# Positional Encodingを用いたEmbedding
self.position_enc = nn.Embedding(
n_position, d_word_vec, padding_idx=PAD)
self.position_enc.weight.data = position_encoding_init(n_position, d_word_vec)
# 一般的なEmbedding
self.tgt_word_emb = nn.Embedding(
n_tgt_vocab, d_word_vec, padding_idx=PAD)
self.dropout = nn.Dropout(dropout)
# DecoderLayerをn_layers個積み重ねる
self.layer_stack = nn.ModuleList([
DecoderLayer(d_model, d_inner_hid, n_head, d_k, d_v, dropout=dropout)
for _ in range(n_layers)])
def forward(self, tgt_seq, tgt_pos, src_seq, enc_output):
"""
:param tgt_seq: tensor, 出力系列,
size=(batch_size, max_length)
:param tgt_pos: tensor, 出力系列の各単語の位置情報,
size=(batch_size, max_length)
:param src_seq: tensor, 入力系列,
size=(batch_size, n_src_vocab)
:param enc_output: tensor, Encoderの出力,
size=(batch_size, max_length, d_model)
:return dec_output: tensor, Decoderの最終出力,
size=(batch_size, max_length, d_model)
:return dec_slf_attns: list, DecoderのSelf Attentionの行列のリスト
:return dec_slf_attns: list, DecoderのSelf Attentionの行列のリスト
"""
# 一般的な単語のEmbeddingを行う
dec_input = self.tgt_word_emb(tgt_seq)
# Positional EncodingのEmbeddingを加算する
dec_input += self.position_enc(tgt_pos)
# Self-Attention用のマスクを作成
# key(=dec_input)のPADに対応する部分が1のマスクと、queryから見たkeyの未来の情報に対応する部分が1のマスクのORをとる
dec_slf_attn_pad_mask = get_attn_padding_mask(tgt_seq, tgt_seq) # (N, max_length, max_length)
dec_slf_attn_sub_mask = get_attn_subsequent_mask(tgt_seq) # (N, max_length, max_length)
dec_slf_attn_mask = torch.gt(dec_slf_attn_pad_mask + dec_slf_attn_sub_mask, 0) # ORをとる
# key(=dec_input)のPADに対応する部分のみ1のマスクを作成
dec_enc_attn_pad_mask = get_attn_padding_mask(tgt_seq, src_seq) # (N, max_length, max_length)
dec_slf_attns, dec_enc_attns = [], []
dec_output = dec_input
# n_layers個のDecoderLayerに入力を通す
for dec_layer in self.layer_stack:
dec_output, dec_slf_attn, dec_enc_attn = dec_layer(
dec_output, enc_output,
slf_attn_mask=dec_slf_attn_mask,
dec_enc_attn_mask=dec_enc_attn_pad_mask)
dec_slf_attns += [dec_slf_attn]
dec_enc_attns += [dec_enc_attn]
return dec_output, dec_slf_attns, dec_enc_attns
Transformer
class Transformer(nn.Module):
"""Transformerのモデル全体のクラス"""
def __init__(
self, n_src_vocab, n_tgt_vocab, max_length, n_layers=6, n_head=8,
d_word_vec=512, d_model=512, d_inner_hid=1024, d_k=64, d_v=64,
dropout=0.1, proj_share_weight=True):
"""
:param n_src_vocab: int, 入力言語の語彙数
:param n_tgt_vocab: int, 出力言語の語彙数
:param max_length: int, 最大系列長
:param n_layers: int, レイヤー数
:param n_head: int, ヘッド数
:param d_k: int, keyベクトルの次元数
:param d_v: int, valueベクトルの次元数
:param d_word_vec: int, 単語の埋め込みの次元数
:param d_model: int, 隠れ層の次元数
:param d_inner_hid: int, Position Wise Feed Forward Networkの隠れ層2層目の次元数
:param dropout: float, ドロップアウト率
:param proj_share_weight: bool, 出力言語の単語のEmbeddingと出力の写像で重みを共有する
"""
super(Transformer, self).__init__()
self.encoder = Encoder(
n_src_vocab, max_length, n_layers=n_layers, n_head=n_head,
d_word_vec=d_word_vec, d_model=d_model,
d_inner_hid=d_inner_hid, dropout=dropout)
self.decoder = Decoder(
n_tgt_vocab, max_length, n_layers=n_layers, n_head=n_head,
d_word_vec=d_word_vec, d_model=d_model,
d_inner_hid=d_inner_hid, dropout=dropout)
self.tgt_word_proj = nn.Linear(d_model, n_tgt_vocab, bias=False)
nn.init.xavier_normal_(self.tgt_word_proj.weight)
self.dropout = nn.Dropout(dropout)
assert d_model == d_word_vec # 各モジュールの出力のサイズは揃える
if proj_share_weight:
# 出力言語の単語のEmbeddingと出力の写像で重みを共有する
assert d_model == d_word_vec
self.tgt_word_proj.weight = self.decoder.tgt_word_emb.weight
def get_trainable_parameters(self):
# Positional Encoding以外のパラメータを更新する
enc_freezed_param_ids = set(map(id, self.encoder.position_enc.parameters()))
dec_freezed_param_ids = set(map(id, self.decoder.position_enc.parameters()))
freezed_param_ids = enc_freezed_param_ids | dec_freezed_param_ids
return (p for p in self.parameters() if id(p) not in freezed_param_ids)
def forward(self, src, tgt):
src_seq, src_pos = src
tgt_seq, tgt_pos = tgt
src_seq = src_seq[:, 1:]
src_pos = src_pos[:, 1:]
tgt_seq = tgt_seq[:, :-1]
tgt_pos = tgt_pos[:, :-1]
enc_output, *_ = self.encoder(src_seq, src_pos)
dec_output, *_ = self.decoder(tgt_seq, tgt_pos, src_seq, enc_output)
seq_logit = self.tgt_word_proj(dec_output)
return seq_logit
学習
ハイパーパラメータ
MAX_LENGTH = 20
batch_size = 64
num_epochs = 15
lr = 0.001
ckpt_path = 'transformer.pth'
max_length = MAX_LENGTH + 2
model_args = {
'n_src_vocab': vocab_size_X,
'n_tgt_vocab': vocab_size_Y,
'max_length': max_length,
'proj_share_weight': True,
'd_k': 32,
'd_v': 32,
'd_model': 128,
'd_word_vec': 128,
'd_inner_hid': 256,
'n_layers': 3,
'n_head': 6,
'dropout': 0.1,
}
学習実行
誤差関数や最適化関数はSeq2Seqでやったことと同様なので結果のみを掲載します。
Epoch 15 [0.5min]: train_loss: 16.16 train_bleu: 37.75 valid_loss: 19.13 valid_bleu: 35.45
評価
BLEUのスコアは、
26.112543785483595となりました。