Review of the First-Order HMM
In a first-order HMM, the next hidden state depends only on the current hidden state:
\[ P(q_{t+1} = S_j | q_t = S_i, q_{t-1} = S_k, \cdots ) \approx P(q_{t+1} = S_j | q_t = S_i) \]
That is, the transition matrix:
\[ A = [a_{ij}] \quad \text{where} \quad a_{ij} \equiv P(q_{t+1} = S_j | q_t = S_i) \]
And the observation at any given time depends only on the current hidden state:
\[ b_j(m) \equiv P(O_t = \nu_m | q_t = S_j) \]
That is, the observation matrix:
\[ B = [b_j(m)] \quad \text{where} \quad b_j(m) \equiv P(O_t = \nu_m | q_t = S_j) \]
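For concreteness, a toy sketch of what these parameters look like for the BMES segmentation tags used later in this post, stored as nested dicts the way the implementation below does (the numbers are illustrative only, not estimated from data):

```python
# Toy first-order HMM parameters for the BMES tag set (illustrative numbers only).
pi = {'B': 0.6, 'S': 0.4, 'M': 0.0, 'E': 0.0}   # pi_i: a word can only start with B or S
A = {'B': {'M': 0.3, 'E': 0.7},                 # a_ij: only BMES-legal transitions
     'M': {'M': 0.4, 'E': 0.6},
     'E': {'B': 0.5, 'S': 0.5},
     'S': {'B': 0.5, 'S': 0.5}}
B = {'B': {'迈': 0.01}, 'E': {'向': 0.02}}       # b_j(m): sparse per-tag character probabilities
```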
Second-Order HMM
For character-sequence segmentation tagging, the current character's tag is modeled as depending on the previous character's tag, which is a bigram model. But inspecting samples shows that a split point is not necessarily determined by the previous character alone; there can be longer-range dependencies. Suppose instead that the current tag depends on the previous two tags; this gives a trigram model:
\[ P(q_{t+1} = S_j | q_t = S_i, q_{t-1} = S_k, \cdots ) \approx P(q_{t+1} = S_j | q_t = S_i, q_{t-1} = S_k) \]
We further assume that the observed character depends not only on the current segmentation tag (the hidden state) but also on the previous hidden state:
\[ b_{ij}(m) \equiv P(O_t = \nu_m | q_t = S_j, q_{t-1} = S_i) \]
The transition and observation matrices change accordingly:
\[ \begin{align}
A &= [a_{ijk}] \quad \text{where} \quad a_{ijk} \equiv P(q_{t+1} = S_k | q_t = S_j , q_{t-1} = S_i ) \\
B &= [b_{ij}(m)] \quad \text{where} \quad b_{ij}(m) \equiv P(O_t = \nu_m | q_t = S_j , q_{t-1} = S_i)
\end{align} \]
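As a concrete sketch of how these second-order parameters can be estimated by maximum-likelihood counting (toy input and hypothetical variable names; the full implementation appears at the end of the post):

```python
from collections import defaultdict

# A minimal counting sketch of the maximum-likelihood estimates
#   a_ijk  = C(S_i, S_j, S_k) / C(S_i, S_j)
#   b_ij(m) = C(S_i, S_j, v_m) / C(S_i, S_j)
tagged = [[('迈', 'B'), ('向', 'E'), ('新', 'S')]]  # [(char, tag), ...] per sentence

c3, c2e, c2 = defaultdict(int), defaultdict(int), defaultdict(int)
for sent in tagged:
    for n, (ch, t) in enumerate(sent):
        if n >= 1:
            c2[(sent[n - 1][1], t)] += 1          # pair count C(S_i, S_j)
            c2e[(sent[n - 1][1], t, ch)] += 1     # pair-emission count C(S_i, S_j, v_m)
        if n >= 2:
            c3[(sent[n - 2][1], sent[n - 1][1], t)] += 1  # trigram count

# normalize (ignoring edge effects at sentence boundaries)
a_ijk = {key: cnt / c2[key[:2]] for key, cnt in c3.items()}
b_ij = {key: cnt / c2[key[:2]] for key, cnt in c2e.items()}
print(a_ijk, b_ij)
```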
Second-Order Viterbi
Each step of the computation now has to consider two consecutive states, so the Viterbi variables change accordingly (a runnable sketch follows the backtracking step below):
\[ \begin{align}
\delta_t(i, j) & \equiv \max_{q_1 q_2 \cdots q_{t-2}} P( q_1 q_2 \cdots q_{t-2}, q_{t-1}=S_i, q_t = S_j, O_1 \cdots O_t \mid \lambda ) \\
\psi_t(i, j) & \equiv \arg\max_{q_1 q_2 \cdots q_{t-2}} P( q_1 q_2 \cdots q_{t-2}, q_{t-1}=S_i, q_t = S_j, O_1 \cdots O_t \mid \lambda )
\end{align} \]
- Initialization
At t=1 and t=2 the second-order conditions are not yet satisfied. We keep using the first-order initial and transition parameters, except that the parameter \( i \) must be ignored at the first step, and the improved observation matrix is not used yet. Equivalently, one can regard the event \( i \) as always having probability 1 here:
\[ \begin{align}
\text{when} \quad t=1 \text{:} \qquad & \\
\delta_1(i, j) & = \pi_j b_j(O_1) \\
\psi_1(i,j) & = 0 \\
\text{when} \quad t=2 \text{:} \qquad & \\
\delta_2(i, j) & = \delta_1(\cdot, i) \cdot a_{ij} \cdot b_{ij}(O_2) \\
\psi_2(i,j) & = 0 \\
\end{align} \]
Hmm, this is a bit cumbersome. Let's merge the two steps and treat t=2 as the initialization directly, computing \( \psi \) from t=3 onward.
\[ \begin{align}
\delta_2(i, j) & = \pi_i b_i(O_1) \cdot a_{ij} \cdot b_{ij}(O_2) \\
\psi_2(i,j) & = 0
\end{align} \]
- Recursion
\[ \begin{align}
\delta_t(j, k) & = \max_i \delta_{t-1}(i, j) \cdot a_{ijk} \cdot b_{jk}(O_t) \\
\psi_t(j, k) & = \arg\max_i \delta_{t-1}(i, j) \cdot a_{ijk}
\end{align} \]
- Termination
\[ \begin{align}
p^* & = \max_{i,j} \delta_T(i, j) \\
(q^*_{T-1}, q^*_{T}) & = \arg\max_{i,j} \delta_T(i, j)
\end{align} \]
- Backtracking the state sequence
\[ q^*_t = \psi_{t+2}(q^*_{t+1}, q^*_{t+2}), t = T-2, T-3, \cdots, 1 \]
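Putting the merged initialization, recursion, termination, and backtracking together, here is a minimal runnable sketch, using two hypothetical states and made-up log-probabilities rather than the segmentation model (the lambdas stand in for trained \( a_{ijk} \) and \( b_{ij} \) tables):

```python
import math

# Minimal second-order Viterbi sketch: two hypothetical states, made-up parameters.
states = ['X', 'Y']
T = 4  # number of observations (needs T >= 3 for the recursion to run)
logpi = {'X': math.log(0.6), 'Y': math.log(0.4)}                    # pi_i
logb1 = lambda i, t: math.log(0.5)                                  # b_i(O_t), toy: uniform
loga1 = {('X', 'X'): math.log(0.7), ('X', 'Y'): math.log(0.3),
         ('Y', 'X'): math.log(0.4), ('Y', 'Y'): math.log(0.6)}      # a_ij
loga2 = lambda i, j, k: loga1[(j, k)]                               # a_ijk, toy: ignores i
logb2 = lambda i, j, t: math.log(0.5)                               # b_ij(O_t), toy: uniform

# merged initialization at t=2: delta_2(i,j) = pi_i b_i(O_1) a_ij b_ij(O_2), in logs
delta = {2: {(i, j): logpi[i] + logb1(i, 1) + loga1[(i, j)] + logb2(i, j, 2)
             for i in states for j in states}}
psi = {}

# recursion for t = 3..T: best predecessor i for every state pair (j, k)
for t in range(3, T + 1):
    delta[t], psi[t] = {}, {}
    for j in states:
        for k in states:
            score, best = max((delta[t - 1][(i, j)] + loga2(i, j, k) + logb2(j, k, t), i)
                              for i in states)
            delta[t][(j, k)], psi[t][(j, k)] = score, best

# termination: the best final pair gives the last two states
p_star, (q_Tm1, q_T) = max((delta[T][(i, j)], (i, j)) for i in states for j in states)

# backtracking: q_t = psi_{t+2}(q_{t+1}, q_{t+2})
q = {T: q_T, T - 1: q_Tm1}
for t in range(T - 2, 0, -1):
    q[t] = psi[t + 2][(q[t + 1], q[t + 2])]
print(p_star, [q[t] for t in range(1, T + 1)])
```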
Smoothing
In the second-order HMM, the training sample may not contain counts for every one of the \( a_{ij}, a_{ijk}, b_{ij}(m), b_j(m) \) parameters. As in the first-order HMM, zero-probability state sequences can therefore occur. To solve this we smooth the transition and observation matrices, so that probabilities for unseen events can be estimated at decoding time.
Smoothing the transition matrix
We use the smoothing method proposed by Brants in "TnT: a statistical part-of-speech tagger" [1]:
\[ p(t_3 | t_2, t_1) = \lambda_1 \hat{p}(t_3) + \lambda_2 \hat{p}(t_3 | t_2) + \lambda_3 \hat{p}(t_3 | t_2, t_1) \]
The coefficients satisfy \( \lambda_1 + \lambda_2 + \lambda_3 = 1 \), so \( p \) is a linear combination of the three estimates. The coefficients are computed as follows:
\[ \begin{align}
& \text{set} \quad \lambda_1 = \lambda_2 = \lambda_3 = 0 \\
& \text{foreach trigram} \quad t_1, t_2, t_3 \quad \text{with} \quad f(t_1,t_2,t_3) > 0 \\
& \qquad \text{depending on the maximum of the following three values:} \\
& \qquad \qquad \text{case} \quad \frac{f(t_1,t_2,t_3)-1}{f(t_1,t_2)-1} \text{:} \qquad \text{increment} \quad \lambda_3 \quad \text{by} \quad f(t_1,t_2,t_3) \\
& \qquad \qquad \text{case} \quad \frac{f(t_2,t_3)-1}{f(t_2)-1} \text{:} \qquad \text{increment} \quad \lambda_2 \quad \text{by} \quad f(t_1,t_2,t_3) \\
& \qquad \qquad \text{case} \quad \frac{f(t_3)-1}{N-1} \text{:} \qquad \text{increment} \quad \lambda_1 \quad \text{by} \quad f(t_1,t_2,t_3) \\
& \qquad \text{end} \\
& \text{end} \\
& \text{normalize} \quad \lambda_1,\lambda_2,\lambda_3
\end{align} \]
If the denominator of any of the three expressions is 0, the value of that expression is defined to be 0.
The algorithm simply tallies, for each trigram, its count toward whichever of the trigram, bigram, or unigram estimates is most reliable for it, and finally normalizes the three totals into proportions; this expresses the trigram probability as a linear interpolation. Subtracting 1 from the numerators and denominators accounts for samples that may not have been observed.
Training the coefficients by gradient descent might actually work better here, or one could accumulate proportions of the totals instead; in a real project, several coefficient-estimation schemes should be tried and their smoothing effects compared. Given that character-based segmentation uses only the four tags B, M, E, S, the transition probabilities hardly need smoothing once the sample is even moderately large.
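A sketch of the coefficient estimation above in Python (hypothetical function name; the input is a list of tag sequences):

```python
from collections import Counter

# Deleted-interpolation coefficients following the TnT pseudocode above (Brants, 2000).
def tnt_lambdas(sentences):
    f1, f2, f3 = Counter(), Counter(), Counter()
    N = 0
    for tags in sentences:
        N += len(tags)
        f1.update(tags)
        f2.update(zip(tags, tags[1:]))
        f3.update(zip(tags, tags[1:], tags[2:]))
    lam = [0.0, 0.0, 0.0]
    div = lambda a, b: a / b if b > 0 else 0.0  # result is 0 when the denominator is 0
    for (t1, t2, t3), f in f3.items():
        cases = [div(f1[t3] - 1, N - 1),                 # unigram estimate
                 div(f2[(t2, t3)] - 1, f1[t2] - 1),      # bigram estimate
                 div(f - 1, f2[(t1, t2)] - 1)]           # trigram estimate
        lam[max(range(3), key=cases.__getitem__)] += f   # credit the winning order
    s = sum(lam)
    return [x / s for x in lam]  # (lambda_1, lambda_2, lambda_3)
```

The returned triple plugs directly into the interpolation formula above.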
Smoothing the observation matrix
The zero-probability problem does not exist only in the transition probabilities. Because the observation probabilities now condition on two consecutive hidden states, a pair of hidden states may have zero probability of emitting some character (with as many as 80,000 Chinese characters, some are bound to be absent from the sample). Here the smoothing imitates the transition-probability smoothing; but for characters that never occur in the training data at all, we fall back to yet another smoothing method, e.g. the simplest +1 (add-one) smoothing.
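By analogy with the transition smoothing, the interpolated observation probability used in the code below is, with the +1-smoothed character frequency \( \hat{p}(\nu_m) \) as the last-resort fallback for unseen characters:
\[ b_{ij}(m) \approx \lambda_1 \hat{p}(\nu_m) + \lambda_2 \hat{p}(\nu_m | S_j) + \lambda_3 \hat{p}(\nu_m | S_i, S_j) \]
Note that the implementation combines the three estimates with these weights on log-probabilities, which amounts to a weighted geometric rather than arithmetic mean.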
Implementation and results
The code is not optimized for structure or performance; it only validates the algorithm. Since the transition counts had no gaps in the sample, no transition smoothing was applied; only the observation matrix gets TnT smoothing and +1 smoothing.
```python
# -*- coding: utf-8 -*-
import sys, re, math
import numpy as np

# default corpus paths (SIGHAN Bakeoff PKU data)
sys.argv.append('./gold/pku_training_words.utf8')
sys.argv.append('./training/pku_training.utf8')
sys.argv.append('./testing/pku_test.utf8')
assert len(sys.argv) == 4
training_words_filename = sys.argv[1]
training_filename = sys.argv[2]
test_filename = sys.argv[3]

with open(training_words_filename, 'rt', encoding='utf8') as f:
    training_words = f.readlines()
with open(training_filename, 'rt', encoding='utf8') as f:
    training = f.readlines()
with open(test_filename, 'rt', encoding='utf8') as f:
    test = f.readlines()
# training += training_words

# tag every character of every word with B/M/E/S and collect first-order counts
hidden_state = ['B', 'M', 'E', 'S']
A, B, P = {}, {}, {}        # transition / observation / initial-state counts
_N = 0                      # total number of characters seen
_O = {}                     # per-character (unigram) counts
for line in training:
    prev_a = None
    for w, word in enumerate(re.split(r'\s{2}', line)):  # words are separated by two spaces
        I = len(word)
        _N += I
        for i, c in enumerate(word):
            _O[c] = _O.get(c, 0) + 1
            if I == 1:
                a = 'S'
            else:
                if i == 0:
                    a = 'B'
                elif i == I - 1:
                    a = 'E'
                else:
                    a = 'M'
            if prev_a is None:  # count initial states
                if a not in P: P[a] = 0
                P[a] += 1
            else:               # count state transitions
                if prev_a not in A: A[prev_a] = {}
                if a not in A[prev_a]: A[prev_a][a] = 0
                A[prev_a][a] += 1
            prev_a = a
            # count observations
            if a not in B: B[a] = {}
            if c not in B[a]: B[a][c] = 0
            B[a][c] += 1

# keep the raw observation counts: the shallow copy still points at the
# original inner dicts, which are rebound (not mutated) below
_B = B.copy()
# convert counts to log probabilities
for k, v in A.items():
    total = sum(v.values())
    A[k] = dict([(x, math.log(y / total)) for x, y in v.items()])
for k, v in B.items():
    # plus-1 smoothing
    total = sum(v.values())
    V = len(v.values())
    B[k] = dict([(x, math.log((y + 1.0) / (total + V))) for x, y in v.items()])
    # plus-1 smoothing: probability mass for unseen characters
    B[k][''] = math.log(1.0 / (total + V))
minlog = math.log(sys.float_info.min)
total = sum(P.values())
for k, v in P.items():
    P[k] = math.log(v / total)

# second pass: collect second-order counts (trigram transitions, two-state observations)
A2, B2 = {}, {}
for line in training:
    temptags = []
    tempsent = []
    for w, word in enumerate(re.split(r'\s{2}', line)):
        I = len(word)
        for i, c in enumerate(word):
            if I == 1:
                a = 'S'
            else:
                if i == 0:
                    a = 'B'
                elif i == I - 1:
                    a = 'E'
                else:
                    a = 'M'
            temptags.append(a)
            tempsent.append(c)
            if len(temptags) >= 3:
                if temptags[-3] not in A2: A2[temptags[-3]] = {}
                if temptags[-2] not in A2[temptags[-3]]: A2[temptags[-3]][temptags[-2]] = {}
                if temptags[-1] not in A2[temptags[-3]][temptags[-2]]: A2[temptags[-3]][temptags[-2]][temptags[-1]] = 0
                A2[temptags[-3]][temptags[-2]][temptags[-1]] += 1
            if len(temptags) >= 2:
                if temptags[-2] not in B2: B2[temptags[-2]] = {}
                if temptags[-1] not in B2[temptags[-2]]: B2[temptags[-2]][temptags[-1]] = {}
                if tempsent[-1] not in B2[temptags[-2]][temptags[-1]]: B2[temptags[-2]][temptags[-1]][tempsent[-1]] = 0
                B2[temptags[-2]][temptags[-1]][tempsent[-1]] += 1

# calculate A2 log probabilities
for i in A2:
    for j in A2[i]:
        total = sum([A2[i][j][k] for k in A2[i][j]])
        for k in A2[i][j]:
            A2[i][j][k] = math.log(A2[i][j][k] / total)

_A = np.array([0, 0, 0])  # accumulators for the TnT interpolation coefficients
for i in B2:
    for j in B2[i]:
        total = sum([B2[i][j][k] for k in B2[i][j]])
        V = len(B2[i][j])
        for k in B2[i][j]:
            # TnT smoothing: credit this count to whichever estimate is most reliable
            a3 = ((B2[i][j][k] - 1.) / (total - 1.)) if total > 1 else 0
            a2 = (_B[j][k] - 1.) / (sum([_B[j][n] for n in _B[j]]) - 1.)
            a1 = (_O[k] - 1.) / (_N - 1.)
            _A[np.argmax([a1, a2, a3])] += B2[i][j][k]
            # second-order observation log probability (+1 smoothing)
            B2[i][j][k] = math.log((B2[i][j][k] + 1) / (total + V))
        B2[i][j][''] = math.log(1.0 / (total + V))
# normalize the TnT smoothing coefficients
_A = _A / _A.sum()
# +1-smoothed character unigram log frequencies
for o in _O:
    _O[o] = math.log((_O[o] + 1.0) / (_N + len(_O)))
_O[''] = math.log(1.0 / (_N + len(_O)))

def viterbi2(observation):
    def _A2(i, j, k):
        # transitions impossible under the BMES scheme get the smallest log value
        key = ''.join([i, j, k])
        if key.find('BB') >= 0: return minlog
        if key.find('MB') >= 0: return minlog
        if key.find('SM') >= 0: return minlog
        if key.find('EM') >= 0: return minlog
        if key.find('EE') >= 0: return minlog
        if key.find('SE') >= 0: return minlog
        if key.find('BS') >= 0: return minlog
        if key.find('MS') >= 0: return minlog
        try:
            return A2[i][j][k]
        except Exception as e:
            print(i, j, k)
            raise e

    def _B2(i, j, o):
        #return B[j].get(o, B[j][''])
        key = ''.join([i, j])
        if key == 'BB': return minlog
        if key == 'MB': return minlog
        if key == 'SM': return minlog
        if key == 'EM': return minlog
        if key == 'EE': return minlog
        if key == 'SE': return minlog
        if key == 'BS': return minlog
        if key == 'MS': return minlog
        #if o not in B2[i][j]:
        #    return B[j].get(o, B[j][''])
        #return B2[i][j].get(o, B2[i][j][''])
        # interpolate the unigram / first-order / second-order (log) estimates
        # with the TnT coefficients
        return _A[0] * _O.get(o, _O['']) + _A[1] * B[j].get(o, B[j]['']) + _A[2] * B2[i][j].get(o, B2[i][j][''])

    state = ['B', 'M', 'E', 'S']
    T = len(observation)
    delta = [None] * (T + 1)
    psi = [None] * (T + 2)
    # initialization merged at t=2: delta_2(i,j) = pi_i b_i(O_1) a_ij b_ij(O_2)
    for i in state:
        if delta[1] is None: delta[1] = {}
        if i not in delta[1]: delta[1][i] = {}
        for j in state:
            delta[1][i][j] = P.get(i, minlog) + B[i].get(observation[0], B[i]['']) + A[i].get(j, minlog) + _B2(i, j, observation[1])
    # recursion
    for t in range(2, T):
        Ot = observation[t]
        if delta[t] is None: delta[t] = {}
        if psi[t] is None: psi[t] = {}
        for j in state:
            if j not in delta[t]: delta[t][j] = {}
            if j not in psi[t]: psi[t][j] = {}
            for k in state:
                delta[t][j][k], psi[t][j][k] = max([(delta[t-1][i][j] + _A2(i, j, k) + _B2(j, k, Ot), i) for i in state])
    # termination: the best final state pair
    delta[T], (psi[T], psi[T+1]) = max([(delta[T-1][i][j], (i, j)) for i in state for j in state])
    # backtracking
    q = [None] * (T + 2)
    q[T+1] = psi[T+1]
    q[T] = psi[T]
    for t in range(T - 1, 1, -1):
        q[t] = psi[t][q[t+1]][q[t+2]]
    return q[2:]

# decode the test set and print space-separated segmentations
for sent in test:
    if len(sent) < 2:
        print(sent, sep='', end='')
        continue
    sequence = viterbi2(list(sent))
    segment = []
    for char, tag in zip(sent, sequence):
        if tag == 'B':
            segment.append(char)
        elif tag == 'M':
            segment[-1] += char
        elif tag == 'E':
            segment[-1] += char
        elif tag == 'S':
            segment.append(char)
        else:
            raise Exception()
    print(' '.join(segment), sep='', end='')
```
Training and evaluation use the PKU corpus:
=== SUMMARY:
=== TOTAL INSERTIONS: 3921
=== TOTAL DELETIONS: 7163
=== TOTAL SUBSTITUTIONS: 13864
=== TOTAL NCHANGE: 24948
=== TOTAL TRUE WORD COUNT: 104372
=== TOTAL TEST WORD COUNT: 101130
=== TOTAL TRUE WORDS RECALL: 0.799
=== TOTAL TEST WORDS PRECISION: 0.824
=== F MEASURE: 0.811
=== OOV Rate: 0.058
=== OOV Recall Rate: 0.360
=== IV Recall Rate: 0.825
Segmentation performance of the algorithms on the PKU test set:
algorithm | P | R | F | OOV | OOV Recall | IV Recall |
---|---|---|---|---|---|---|
maximum matching | 0.836 | 0.904 | 0.869 | 0.058 | 0.059 | 0.956 |
maximum probability | 0.859 | 0.919 | 0.888 | 0.058 | 0.085 | 0.970 |
HMM | 0.804 | 0.789 | 0.796 | 0.058 | 0.364 | 0.815 |
Full Second Order HMM | 0.824 | 0.799 | 0.811 | 0.058 | 0.360 | 0.825 |
References
[1] Brants, Thorsten. "TnT: a statistical part-of-speech tagger." Proceedings of the sixth conference on Applied natural language processing. Association for Computational Linguistics, 2000.
[2] Thede, Scott M., and Mary P. Harper. "A second-order hidden Markov model for part-of-speech tagging." Proceedings of the 37th Annual Meeting of the Association for Computational Linguistics, 1999.