Review of the First-Order HMM
In a first-order HMM, the next hidden state is conditioned only on the current hidden state:
\[ P(q_{t+1} = S_j | q_t = S_i, q_{t-1} = S_k, \cdots ) \approx P(q_{t+1} = S_j | q_t = S_i) \]
which gives the transition matrix:
\[ A = [a_{ij}] \quad \text{where} \quad a_{ij} \equiv P(q_{t+1} = S_j | q_t = S_i) \]
Also, the observation at a given time depends only on the current hidden state:
\[ b_j(m) \equiv P(O_t = \nu_m | q_t = S_j) \]
which gives the observation matrix:
\[ B = [b_j(m)] \quad \text{where} \quad b_j(m) \equiv P(O_t = \nu_m | q_t = S_j) \]
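As a quick illustration (a minimal sketch; the toy probabilities and the helper `path_logprob` are my own, not estimated from any corpus), these first-order parameters can be held in plain dictionaries:

```python
import math

# Toy first-order parameters for the B/M/E/S tag set
# (all numbers are made up for illustration).
states = ['B', 'M', 'E', 'S']

# pi[i] = P(q_1 = S_i)
pi = {'B': 0.6, 'M': 0.0, 'E': 0.0, 'S': 0.4}

# a[i][j] = P(q_{t+1} = S_j | q_t = S_i); impossible transitions omitted
a = {'B': {'M': 0.3, 'E': 0.7},
     'M': {'M': 0.4, 'E': 0.6},
     'E': {'B': 0.5, 'S': 0.5},
     'S': {'B': 0.5, 'S': 0.5}}

# b[j][m] = P(O_t = v_m | q_t = S_j)
b = {'B': {'中': 0.01}, 'M': {'华': 0.008},
     'E': {'国': 0.012}, 'S': {'的': 0.05}}

def path_logprob(tags, chars):
    """Joint log-probability of a (tag, char) path under these parameters."""
    lp = math.log(pi[tags[0]]) + math.log(b[tags[0]][chars[0]])
    for t in range(1, len(tags)):
        lp += math.log(a[tags[t-1]][tags[t]]) + math.log(b[tags[t]][chars[t]])
    return lp

print(path_logprob(['B', 'E'], ['中', '国']))  # tag 中国 as one two-char word
```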
Second-Order HMM
In character-based sequence tagging for word segmentation, the current tag depends on the previous character's tag, which is a bigram model. But inspecting samples shows that a split position is not necessarily determined by the previous character alone; longer-range dependencies can exist. If we instead assume the current tag depends on the previous two tags, we get a trigram model, i.e. a \( Trigram \):
\[ P(q_{t+1} = S_j | q_t = S_i, q_{t-1} = S_k, \cdots ) \approx P(q_{t+1} = S_j | q_t = S_i, q_{t-1} = S_k) \]
We further assume that the currently observed character depends not only on the current tag (hidden state) but also on the previous hidden state:
\[ b_{ij}(m) \equiv P(O_t = \nu_m | q_t = S_j, q_{t-1} = S_i) \]
The transition and observation matrices change accordingly:
\[ \begin{align}
A &= [a_{ijk}] \quad \text{where} \quad a_{ijk} \equiv P(q_{t+1} = S_k | q_t = S_j , q_{t-1} = S_i ) \\
B &= [b_{ij}(m)] \quad \text{where} \quad b_{ij}(m) \equiv P(O_t = \nu_m | q_t = S_j , q_{t-1} = S_i)
\end{align} \]
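A minimal counting sketch for estimating these second-order tables (nested dictionaries, mirroring what the full implementation later in this post does; the function name is mine, and `corpus` is assumed to be a list of `(tags, chars)` pairs produced by the B/M/E/S labeling described above):

```python
from collections import defaultdict

def estimate_second_order(corpus):
    """Count trigram transitions a_ijk and second-order emissions b_ij(m),
    then normalize the counts into conditional probabilities."""
    A2 = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
    B2 = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
    for tags, chars in corpus:
        for t in range(len(tags)):
            if t >= 2:
                A2[tags[t-2]][tags[t-1]][tags[t]] += 1   # a_ijk counts
            if t >= 1:
                B2[tags[t-1]][tags[t]][chars[t]] += 1    # b_ij(m) counts

    def normalize(table):
        for i in table:
            for j in table[i]:
                total = sum(table[i][j].values())
                for k in table[i][j]:
                    table[i][j][k] /= total

    normalize(A2)
    normalize(B2)
    return A2, B2
```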
Second-Order Viterbi
Each step of the computation now involves two consecutive states, so the Viterbi recursion changes accordingly (a runnable sketch of the whole procedure follows this list):
\[ \begin{align}
\delta_t(i, j) & \equiv \max_{q_1 q_2 \cdots q_{t-2}} P( q_1 q_2 \cdots q_{t-2}, q_{t-1}=S_i, q_t = S_j, O_1 \cdots O_t \mid \lambda ) \\
\psi_t(i, j) & \equiv \arg\max_{q_1 q_2 \cdots q_{t-2}} P( q_1 q_2 \cdots q_{t-2}, q_{t-1}=S_i, q_t = S_j, O_1 \cdots O_t \mid \lambda )
\end{align} \]
- Initialization
The second-order assumptions do not hold at the first step. We keep using the first-order HMM's initial and transition parameters, ignore the parameter \( i \) at the first step, and do not use the extended observation matrix. Equivalently, the event \( i \) can be regarded as always having probability 1 here:
\[ \begin{align}
\text{when} \quad t=1 \text{:} \qquad & \\
\delta_1(i, j) & = \pi_j b_j(O_1) \\
\psi_1(i,j) & = 0 \\
\text{when} \quad t=2 \text{:} \qquad & \\
\delta_2(i, j) & = \delta_1(\cdot, i) \, a_{ij} \cdot b_{ij}(O_2) \\
\psi_2(i,j) & = 0 \\
\end{align} \]
Hmm, that looks a bit cumbersome. Let us simply take t=2 as the initialization step and merge the two formulas, computing \( \psi \) only from t=3 onward.
\[ \begin{align}
\delta_2(i, j) & = \pi_i b_i(O_1) \cdot a_{ij} \cdot b_{ij}(O_2) \\
\psi_2(i,j) & = 0
\end{align} \]
- Recursion
\[ \begin{align}
\delta_t(j, k) & = \max_i \delta_{t-1}(i, j) \cdot a_{ijk} \cdot b_{jk}(O_t) \\
\psi_t(j, k) & = \arg\max_i \delta_{t-1}(i, j) \cdot a_{ijk}
\end{align} \]
- Termination
\[ \begin{align}
p^* & = \max_{i,j} \delta_T(i, j) \\
(q^*_{T-1}, q^*_{T}) & = \arg\max_{(i,j)} \delta_T(i, j)
\end{align} \]
- Backtracking the state sequence
\[ q^*_t = \psi_{t+2}(q^*_{t+1}, q^*_{t+2}), t = T-2, T-3, \cdots, 1 \]
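Putting the merged initialization, recursion, termination, and backtracking together, here is a compact sketch in log space (assumed inputs: `logpi`, `loga`, `logb1` are the first-order log parameters used only for the merged t=2 step, and `logA2[i][j][k]`, `logB2[i][j][m]` are the second-order tables; all names are mine, every lookup is assumed defined, and `len(obs) >= 2`):

```python
def viterbi2_sketch(obs, states, logpi, loga, logb1, logA2, logB2):
    """Second-order Viterbi over state pairs (q_{t-1}, q_t), in log space."""
    T = len(obs)
    # merged initialization at t=2: delta_2(i, j), psi_2 = 0
    delta = {(i, j): logpi[i] + logb1[i][obs[0]]
                     + loga[i][j] + logB2[i][j][obs[1]]
             for i in states for j in states}
    backptrs = []  # backptrs[t-3][(j, k)] = argmax over i, for t = 3..T
    # recursion: delta_t(j, k) = max_i delta_{t-1}(i, j) + a_ijk + b_jk(O_t)
    for t in range(2, T):
        newdelta, bp = {}, {}
        for j in states:
            for k in states:
                score, best_i = max(
                    (delta[(i, j)] + logA2[i][j][k] + logB2[j][k][obs[t]], i)
                    for i in states)
                newdelta[(j, k)] = score
                bp[(j, k)] = best_i
        delta = newdelta
        backptrs.append(bp)
    # termination: best final state pair (q*_{T-1}, q*_T)
    qa, qb = max(delta, key=delta.get)
    path = [qa, qb]
    # backtracking: q*_t = psi_{t+2}(q*_{t+1}, q*_{t+2})
    for bp in reversed(backptrs):
        path.insert(0, bp[(path[0], path[1])])
    return path
```

Working in log space avoids underflow on long sentences, which is also what the full implementation below does.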
Smoothing
In the second-order HMM we may not be able to estimate every parameter \( a_{jk}, a_{ijk}, b_{ij}(m), b_j(m) \) from the training sample. As with the first-order HMM, zero-probability state sequences can therefore occur. To solve this, the transition and observation matrices are smoothed so that unseen events still receive a probability estimate at decoding time.
Smoothing the transition matrix
We use the smoothing method proposed in Brants, Thorsten, "TnT: a statistical part-of-speech tagger" [1]:
\[ p(t_3 | t_2, t_1) = \lambda_1 \hat{p}(t_3) + \lambda_2 \hat{p}(t_3 | t_2) + \lambda_3 \hat{p}(t_3 | t_2, t_1) \]
The coefficients satisfy \( \lambda_1 + \lambda_2 + \lambda_3 = 1 \), so \( p \) is a linear combination of the three probabilities. The coefficients are computed as follows:
\[ \begin{align}
& \text{set} \quad \lambda_1 = \lambda_2 = \lambda_3 = 0 \\
& \text{foreach trigram} \quad t_1, t_2, t_3 \quad \text{with} \quad f(t_1,t_2,t_3) > 0 \\
& \qquad \text{depending on the maximum of the following three values:} \\
& \qquad \qquad \text{case} \quad \frac{f(t_1,t_2,t_3)-1}{f(t_1,t_2)-1} \text{:} \qquad \text{increment} \quad \lambda_3 \quad \text{by} \quad f(t_1,t_2,t_3) \\
& \qquad \qquad \text{case} \quad \frac{f(t_2,t_3)-1}{f(t_2)-1} \text{:} \qquad \text{increment} \quad \lambda_2 \quad \text{by} \quad f(t_1,t_2,t_3) \\
& \qquad \qquad \text{case} \quad \frac{f(t_3)-1}{N-1} \text{:} \qquad \text{increment} \quad \lambda_1 \quad \text{by} \quad f(t_1,t_2,t_3) \\
& \qquad \text{end} \\
& \text{end} \\
& \text{normalize} \quad \lambda_1,\lambda_2,\lambda_3
\end{align} \]
If any of the denominators above is 0, the corresponding expression is treated as 0.
In essence, the algorithm accumulates, over all observed trigrams, the counts credited to the trigram, bigram, and unigram estimates, then normalizes those totals into proportions, expressing the trigram probability as a linear interpolation. Subtracting 1 from the numerators and denominators removes the current trigram from its own counts, in the spirit of deleted interpolation, to account for events that might not have been observed.
Training the coefficients by gradient descent might actually work better here, or one could accumulate ratios of total counts instead; in a real project, several coefficient estimation methods should be tried and their smoothing quality compared. That said, character-based segmentation uses only the four tags B, M, E, and S, so with even a moderately sized sample the transition matrix hardly needs smoothing at all.
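For reference, a minimal sketch of this coefficient estimation (deleted interpolation, following the pseudocode above; the function and argument names are my own):

```python
from collections import Counter

def tnt_lambdas(trigrams, bigrams, unigrams, N):
    """Deleted-interpolation weights per the pseudocode above.

    trigrams: Counter over (t1, t2, t3) tag triples; bigrams and unigrams
    are analogous Counters; N is the total number of tags in the corpus.
    """
    lam = [0.0, 0.0, 0.0]  # lambda_1, lambda_2, lambda_3

    def ratio(num, den):   # a zero denominator yields 0, as stated above
        return num / den if den > 0 else 0.0

    for (t1, t2, t3), f in trigrams.items():
        c3 = ratio(f - 1, bigrams[(t1, t2)] - 1)
        c2 = ratio(bigrams[(t2, t3)] - 1, unigrams[t2] - 1)
        c1 = ratio(unigrams[t3] - 1, N - 1)
        # credit the full trigram count to whichever estimate wins
        lam[max(range(3), key=lambda n: [c1, c2, c3][n])] += f
    total = sum(lam)
    return [x / total for x in lam]

# toy usage with made-up counts
tri = Counter({('B', 'E', 'B'): 3, ('B', 'E', 'S'): 1})
bi = Counter({('B', 'E'): 4, ('E', 'B'): 3, ('E', 'S'): 1})
uni = Counter({'B': 5, 'E': 4, 'S': 1})
print(tnt_lambdas(tri, bi, uni, N=10))
```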
Smoothing the observation probability matrix
The zero-probability problem is not confined to the transition probabilities. Because the observation probabilities now also condition on two consecutive hidden states, some (state pair, character) combinations may have zero probability as well (with upwards of 80,000 Chinese characters in existence, some are bound to be missing from the training sample). Here we smooth by mimicking the transition smoothing, but when the observed character never appeared in the sample at all we fall back to another smoothing method, for example the simplest add-one smoothing.
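A sketch of the resulting emission lookup: interpolate the unigram, first-order, and second-order estimates with the TnT weights, falling back to an add-one `<UNK>` entry for unseen characters (names are illustrative; note that the actual implementation below applies the weights to log probabilities instead, which amounts to a weighted geometric rather than arithmetic mean):

```python
def smoothed_emission(i, j, o, lam, O1, B1, B2):
    """P(O_t = o | q_{t-1} = i, q_t = j) by linear interpolation.

    lam: TnT weights (lambda_1, lambda_2, lambda_3);
    O1: unigram char probabilities, B1: first-order emissions,
    B2: second-order emissions; each table carries an '<UNK>' entry
    obtained from add-one smoothing for characters it never saw.
    """
    p1 = O1.get(o, O1['<UNK>'])
    p2 = B1[j].get(o, B1[j]['<UNK>'])
    p3 = B2[i][j].get(o, B2[i][j]['<UNK>'])
    return lam[0] * p1 + lam[1] * p2 + lam[2] * p3
```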
Implementation and results
The code below is not optimized for structure or performance; it only serves to validate the algorithm. Since the transition counts had no gaps in this corpus, no transition smoothing was applied; only the observation matrix uses TnT smoothing and add-one smoothing.
```python
# -*- coding: utf-8 -*-
import sys, re, math
import numpy as np

sys.argv.append('./gold/pku_training_words.utf8')
sys.argv.append('./training/pku_training.utf8')
sys.argv.append('./testing/pku_test.utf8')
assert len(sys.argv) == 4

training_words_filename = sys.argv[1]
training_filename = sys.argv[2]
test_filename = sys.argv[3]

with open(training_words_filename, 'rt', encoding='utf8') as f:
    training_words = f.readlines()
with open(training_filename, 'rt', encoding='utf8') as f:
    training = f.readlines()
with open(test_filename, 'rt', encoding='utf8') as f:
    test = f.readlines()

# training += training_words

# Tag every character with B/M/E/S and collect first-order counts.
hidden_state = ['B', 'M', 'E', 'S']
A, B, P = {}, {}, {}   # transition, emission, initial-state counts
_N = 0                 # total number of characters
_O = {}                # unigram character counts
for line in training:
    prev_a = None
    for w, word in enumerate(re.split(r'\s{2}', line)):
        I = len(word)
        _N += I
        for i, c in enumerate(word):
            _O[c] = _O.get(c, 0) + 1
            if I == 1:
                a = 'S'
            elif i == 0:
                a = 'B'
            elif i == I - 1:
                a = 'E'
            else:
                a = 'M'
            if prev_a is None:
                # count initial states
                P[a] = P.get(a, 0) + 1
            else:
                # count state transitions
                A.setdefault(prev_a, {})
                A[prev_a][a] = A[prev_a].get(a, 0) + 1
            prev_a = a
            # count observations
            B.setdefault(a, {})
            B[a][c] = B[a].get(c, 0) + 1

# Shallow copy: B[k] is rebound (not mutated) below, so _B keeps raw counts.
_B = B.copy()

# convert counts to log probabilities
for k, v in A.items():
    total = sum(v.values())
    A[k] = dict([(x, math.log(y / total)) for x, y in v.items()])

for k, v in B.items():
    # add-one smoothing
    total = sum(v.values())
    V = len(v.values())
    B[k] = dict([(x, math.log((y + 1.0) / (total + V))) for x, y in v.items()])
    B[k]['<UNK>'] = math.log(1.0 / (total + V))

minlog = math.log(sys.float_info.min)

total = sum(P.values())
for k, v in P.items():
    P[k] = math.log(v / total)

# Second pass: trigram transition counts A2 and second-order emission counts B2.
A2, B2 = {}, {}
for line in training:
    temptags = []
    tempsent = []
    for w, word in enumerate(re.split(r'\s{2}', line)):
        I = len(word)
        for i, c in enumerate(word):
            if I == 1:
                a = 'S'
            elif i == 0:
                a = 'B'
            elif i == I - 1:
                a = 'E'
            else:
                a = 'M'
            temptags.append(a)
            tempsent.append(c)
            if len(temptags) >= 3:
                A2.setdefault(temptags[-3], {})
                A2[temptags[-3]].setdefault(temptags[-2], {})
                A2[temptags[-3]][temptags[-2]].setdefault(temptags[-1], 0)
                A2[temptags[-3]][temptags[-2]][temptags[-1]] += 1
            if len(temptags) >= 2:
                B2.setdefault(temptags[-2], {})
                B2[temptags[-2]].setdefault(temptags[-1], {})
                B2[temptags[-2]][temptags[-1]].setdefault(tempsent[-1], 0)
                B2[temptags[-2]][temptags[-1]][tempsent[-1]] += 1

# convert A2 counts to log probabilities
for i in A2:
    for j in A2[i]:
        total = sum([A2[i][j][k] for k in A2[i][j]])
        for k in A2[i][j]:
            A2[i][j][k] = math.log(A2[i][j][k] / total)

_A = np.array([0, 0, 0])
for i in B2:
    for j in B2[i]:
        total = sum([B2[i][j][k] for k in B2[i][j]])
        V = len(B2[i][j])
        for k in B2[i][j]:
            # accumulate TnT deleted-interpolation coefficients
            a3 = ((B2[i][j][k] - 1.) / (total - 1.)) if total > 1 else 0
            a2 = (_B[j][k] - 1.) / (sum([_B[j][n] for n in _B[j]]) - 1.)
            a1 = (_O[k] - 1.) / (_N - 1.)
            _A[np.argmax([a1, a2, a3])] += B2[i][j][k]
            # second-order emission log probabilities (add-one smoothing)
            B2[i][j][k] = math.log((B2[i][j][k] + 1) / (total + V))
        B2[i][j]['<UNK>'] = math.log(1.0 / (total + V))

# normalize the TnT coefficients
_A = _A / _A.sum()

# unigram character log probabilities (add-one smoothing)
for o in _O:
    _O[o] = math.log((_O[o] + 1.0) / (_N + len(_O)))
_O['<UNK>'] = math.log(1.0 / (_N + len(_O)))


def viterbi2(observation):
    def _A2(i, j, k):
        # impossible tag sequences get the smallest representable log prob
        key = ''.join([i, j, k])
        for bad in ('BB', 'MB', 'SM', 'EM', 'EE', 'SE', 'BS', 'MS'):
            if key.find(bad) >= 0:
                return minlog
        return A2[i][j][k]

    def _B2(i, j, o):
        key = ''.join([i, j])
        if key in ('BB', 'MB', 'SM', 'EM', 'EE', 'SE', 'BS', 'MS'):
            return minlog
        # Note: the TnT weights are applied to log probabilities here,
        # i.e. a weighted geometric rather than arithmetic mean.
        return (_A[0] * _O.get(o, _O['<UNK>'])
                + _A[1] * B[j].get(o, B[j]['<UNK>'])
                + _A[2] * B2[i][j].get(o, B2[i][j]['<UNK>']))

    state = ['B', 'M', 'E', 'S']
    T = len(observation)
    delta = [None] * (T + 1)
    psi = [None] * (T + 2)

    # merged initialization at t=2
    delta[1] = {}
    for i in state:
        delta[1][i] = {}
        for j in state:
            delta[1][i][j] = (P.get(i, minlog)
                              + B[i].get(observation[0], B[i]['<UNK>'])
                              + A[i].get(j, minlog)
                              + _B2(i, j, observation[1]))

    # recursion
    for t in range(2, T):
        Ot = observation[t]
        delta[t], psi[t] = {}, {}
        for j in state:
            delta[t][j], psi[t][j] = {}, {}
            for k in state:
                delta[t][j][k], psi[t][j][k] = max(
                    [(delta[t-1][i][j] + _A2(i, j, k) + _B2(j, k, Ot), i)
                     for i in state])

    # termination: best final state pair
    delta[T], (psi[T], psi[T+1]) = max(
        [(delta[T-1][i][j], (i, j)) for i in state for j in state])

    # backtracking
    q = [None] * (T + 2)
    q[T+1] = psi[T+1]
    q[T] = psi[T]
    for t in range(T-1, 1, -1):
        q[t] = psi[t][q[t+1]][q[t+2]]
    return q[2:]


for sent in test:
    if len(sent) < 2:
        print(sent, sep='', end='')
        continue
    sequence = viterbi2(list(sent))
    segment = []
    for char, tag in zip(sent, sequence):
        if tag in ('B', 'S'):
            segment.append(char)    # B or S starts a new word
        elif tag in ('M', 'E'):
            segment[-1] += char     # M or E continues the current word
        else:
            raise Exception()
    print(' '.join(segment), sep='', end='')
```
Training and evaluation use the PKU corpus:
```
=== SUMMARY:
=== TOTAL INSERTIONS:          3921
=== TOTAL DELETIONS:           7163
=== TOTAL SUBSTITUTIONS:       13864
=== TOTAL NCHANGE:             24948
=== TOTAL TRUE WORD COUNT:     104372
=== TOTAL TEST WORD COUNT:     101130
=== TOTAL TRUE WORDS RECALL:   0.799
=== TOTAL TEST WORDS PRECISION: 0.824
=== F MEASURE:                 0.811
=== OOV Rate:                  0.058
=== OOV Recall Rate:           0.360
=== IV Recall Rate:            0.825
```
Segmentation performance comparison of algorithms on the PKU test set:
algorithm | Precision | Recall | F-measure | OOV Rate | OOV Recall | IV Recall
---|---|---|---|---|---|---
maximum matching | 0.836 | 0.904 | 0.869 | 0.058 | 0.059 | 0.956 |
maximum probability | 0.859 | 0.919 | 0.888 | 0.058 | 0.085 | 0.970 |
HMM | 0.804 | 0.789 | 0.796 | 0.058 | 0.364 | 0.815 |
Full Second Order HMM | 0.824 | 0.799 | 0.811 | 0.058 | 0.360 | 0.825 |
References
[1] Brants, Thorsten. "TnT: a statistical part-of-speech tagger." Proceedings of the Sixth Conference on Applied Natural Language Processing. Association for Computational Linguistics, 2000.
[2] Thede, Scott M., and Mary P. Harper. "A second-order hidden Markov model for part-of-speech tagging." Proceedings of the 37th Annual Meeting of the Association for Computational Linguistics, 1999.