中文分词词性和序列标注之二阶HMM

回顾一阶HMM

在一阶HMM中,下一个隐藏状态是当前隐藏状态的条件概率:
\[ P(q_{t+1} = S_j | q_t = S_i, q_{t-1} = S_k, \cdots ) \approx P(q_{t+1} = S_j | q_t = S_i) \]
即转移矩阵:
\[ A = [a_{ij}] \quad \text{where} \quad a_{ij} \equiv P(q_{t+1} = S_j | q_t = S_i) \]
且特定时刻观察状态只和当前隐藏状态有关。
\[ b_j(m) \equiv P(O_t = \nu_m | q_t = S_j) \]
即观测矩阵:
\[ B = [b_j(m)] \quad \text{where} \quad b_j(m) \equiv P(O_t = \nu_m | q_t = S_j) \]

二阶HMM

对于字序列标记分词,当前分词标记和上一个字的分词标记相关,这是二元组 Bigram,但分析样本发现切分位置并不一定只和上一个字相关,可能会有更长远的关系,比如假设当前字标记和之前两个字标记有关,那么就成为了三元组。即 \( Trigram \):
\[ P(q_{t+1} = S_j | q_t = S_i, q_{t-1} = S_k, \cdots ) \approx P(q_{t+1} = S_j | q_t = S_i, q_{t-1} = S_k) \]
同时还假设当前观测到的字符除和当前分词标记(隐藏状态)相关外,也与上一个隐藏状态相关:
\[ b_{ij}(m) \equiv P(O_t = \nu_m | q_t = S_j, q_{t-1} = S_i) \]
对应转移矩阵和观测矩阵也改动为:
\[ \begin{align}
A &= [a_{ijk}] \quad \text{where} \quad a_{ijk} \equiv P(q_{t+1} = S_k | q_t = S_j , q_{t-1} = S_i ) \\
B &= [b_{ij}(m)] \quad \text{where} \quad b_{ij}(m) \equiv P(O_t = \nu_m | q_t = S_j , q_{t-1} = S_i)
\end{align} \]

二阶Viterbi

在每次的计算在中,要考虑先后两个状态,因此viterbi对应更改:
\[ \begin{align}
\delta_t(i, j) & \equiv \max_{{q_1}{q_2}{\cdots}{q_{t-2}}} P( {q_1}{q_2}{\cdots}{q_{t-2},q_{t-1}=S_i, q_t = S_j, O_1 \cdots O_t | \lambda } ) \\
\psi_t(i, j) & \equiv \arg\max_{{q_1}{q_2}{\cdots}{q_{t-2}}} P( {q_1}{q_2}{\cdots}{q_{t-2},q_{t-1}=S_i, q_t = S_j, O_1 \cdots O_t | \lambda } )
\end{align} \]

  1. 初始化
    此时并不满足二阶条件。 继续使用一阶HMM的初始/转移矩阵,但要第一步时需忽略参数 \( i \),且不考虑改进后的观测矩阵。当然其实也可以认为\( i \)事件发生概率在这里永远是1:
    \[ \begin{align}
    \text{when} \quad t=1 \text{:} \qquad & \\
    \delta_1(i, j) & = \pi_j b_j(O_1) \\
    \psi_1(i,j) & = 0 \\
    \text{when} \quad t=2 \text{:} \qquad & \\
    \delta_2(i, j) & = \delta_1( , i)a_{ij} \cdot b_{ij}(O_2) \\
    \psi_2(i,j) & = 0 \\
    \end{align} \]
    恩,似乎有点麻烦,直接从t=2作为初始化,合并到一起好了。从t=3开始计算\( \psi \)。
    \[ \begin{align}
    \delta_2(i, j) & = \pi_i b_i(O_1) \cdot a_{ij} \cdot b_{ij}(O_2) \\
    \psi_2(i,j) & = 0
    \end{align} \]

  2. 递归
    \[ \begin{align}
    \delta_t(j, k) & = \max_i \delta_{t-1}(i, j) \cdot a_{ijk} \cdot b_{jk}(O_t) \\
    \psi_t(j, k) & = \arg\max_i \delta_{t-1}(i, j) \cdot a_{ijk}
    \end{align} \]

  3. 终止
    \[ \begin{align}
    p^* & = \max_{i,j} \delta_T(i, j) \\
    q^*_{T-1} & = \arg_i\max_{i,j} \delta_T(i, j) \\
    q^*_{T} & = \arg_j\max_{i,j} \delta_T(i, j) \\
    \end{align} \]

  4. 回朔状态序列路径
    \[ q^*_t = \psi_{t+2}(q^*_{t+1}, q^*_{t+2}), t = T-2, T-3, \cdots, 1 \]

平滑

在二阶HMM算法中,可能无法从样本中统计到所有的 \( a_{jk}, a_{ijk},b_{ij}(m),b_j{m} \) 参数。因此和HMM一样可能存在零概率状态序列,为解决这个问题需要对转移矩阵和观测矩阵做平滑,在计算时估计新样本的概率。

平滑转移矩阵

Brants, Thorsten. “TnT: a statistical part-of-speech tagger. 提出的平滑方法。
\[ p(t_3 | t_2, t_1) = \lambda_1 \hat{p}(t_3) + \lambda_2 \hat{p}(t_3 | t_2) + \lambda_3 \hat{p}(t_3 | t_2, t_1) \]
系数 \( \lambda_1 + \lambda_2 + \lambda_3 = 1 \) ,于是\( p \)表达为它们各自概率的线性组合。以下是系数计算方法
\[ \begin{align}
& \text{set} \quad \lambda_1 + \lambda_2 + \lambda_3 = 0 \\
& \text{foreach trigram} \quad t_1, t_2, t_3 \quad \text{with} \quad f(t_1,t_2,t_3) > 0 \\
& \qquad \text{depending on the maximum of the following three values:} \\
& \qquad \qquad \text{case} \quad \frac{f(t_1,t_2,t_3)-1}{f(t_1,t_2)-1} \text{:} \qquad \text{increment} \quad \lambda_3 \quad \text{by} \quad f(t_1,t_2,t_3) \\
& \qquad \qquad \text{case} \quad \frac{f(t_2,t_3)-1}{f(t_2)-1} \text{:} \qquad \text{increment} \quad \lambda_2 \quad \text{by} \quad f(t_1,t_2,t_3) \\
& \qquad \qquad \text{case} \quad \frac{f(t_3)-1}{N-1} \text{:} \qquad \text{increment} \quad \lambda_1 \quad \text{by} \quad f(t_1,t_2,t_3) \\
& \qquad \text{end} \\
& \text{end} \\
& \text{normalize} \quad \lambda_1,\lambda_2,\lambda_3
\end{align} \]
计算时如果其中某个式子分母为0,则让计算结果为0。
其实算法就是在累计三元,二元,一元在每个三元组下的数量,最后用这个数量归一得到比例。这就将三元组概率表达为线性插值算法。分子分母减1是考虑到可能有些样本没有观察到。

其实在这里用梯度下降训练系数可能效果会更好些,或改为总数累计比例, 实际项目中需要尝试多种系数计算方法对比平滑效果。考虑到字序列分词只有BMES三种标签,实际上样本稍微多点就不需要平滑。

平滑观察概率矩阵

零概率问题不只是在转移概率中存在,因为我们在观察概率中也考虑了连续两个隐藏状态,这里也可能存在两个隐藏序列标注观察到某字符是零概率(考虑到汉字可有8万个之多,难免有样本中未统计到的状况)。在这里的平滑方式模仿转移概率平滑,但是考虑到观测字符在样本中并未出现过的情况,此时退回到其他的平滑方式,例如最简单的+1平滑。

代码实现和效果

并未优化代码结构和性能,只验证算法。因转移矩阵并无采样缺失,所以未做转移矩阵平滑,只对观察矩阵做了\( TnT \)平滑, \( +1 \)平滑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# -*- coding: utf-8 -*-
import sys, re, math
import numpy as np
 
sys.argv.append('./gold/pku_training_words.utf8')
sys.argv.append('./training/pku_training.utf8')
sys.argv.append('./testing/pku_test.utf8')
 
assert len(sys.argv) == 4
 
training_words_filename = sys.argv[1]
training_filename = sys.argv[2]
test_filename = sys.argv[3]
 
with open(training_words_filename, 'rt', encoding='utf8') as f:
    training_words = f.readlines()
 
with open(training_filename, 'rt', encoding='utf8') as f:
    training = f.readlines()
 
with open(test_filename, 'rt', encoding='utf8') as f:
    test = f.readlines()
 
# training += training_words
# word tag by char
hidden_state = ['B','M','E','S']
A, B, P = {}, {}, {}
_N = 0
_O = {}
for line in training:
    #print( line )
    prev_a = None
    for w, word in enumerate(re.split(r'\s{2}', line)):
        I = len(word)
        _N += I
        for i, c in enumerate(word):
            _O[c] = _O.get(c, 0) + 1
            if I == 1:
                a = 'S'
            else:
                if i == 0:
                    a = 'B'
                elif i == I-1:
                    a = 'E'
                else:
                    a = 'M'
            # print(w, i, c, a)
            if prev_a is None: # calculate Initial state Number
                if a not in P: P[a] = 0
                P[a] += 1
            else: # calculate State transition Number
                if prev_a not in A: A[prev_a] = {}
                if a not in A[prev_a]: A[prev_a][a] = 0
                A[prev_a][a] += 1
            prev_a = a
            # calculate Observation Number
            if a not in B: B[a] = {}
            if c not in B[a]: B[a][c] = 0
            B[a][c] += 1
_B = B.copy()            
# calculate probability
for k, v in A.items():
    total = sum(v.values())
    A[k] = dict([(x, math.log(y / total)) for x, y in v.items()])
for k, v in B.items():
    # plus 1 smooth
    total = sum(v.values())
    V = len(v.values())
    B[k] = dict([(x, math.log((y+1.0) / (total+V))) for x, y in v.items()])
    # plus 1 smooth
    B[k]['<UNK>'] = math.log(1.0 / (total+V))
minlog = math.log( sys.float_info.min )
total = sum(P.values())
for k, v in P.items():
    P[k] = math.log( v / total )
 
A2,B2 = {}, {}
for line in training:
    temptags = []
    tempsent = []
    for w, word in enumerate(re.split(r'\s{2}', line)):
        I = len(word)
        for i, c in enumerate(word):
            if I == 1:
                a = 'S'
            else:
                if i == 0:
                    a = 'B'
                elif i == I-1:
                    a = 'E'
                else:
                    a = 'M'
            temptags.append(a)
            tempsent.append(c)
            if len(temptags) >= 3:
                if temptags[-3] not in A2: A2[temptags[-3]] = {}
                if temptags[-2] not in A2[temptags[-3]]: A2[temptags[-3]][temptags[-2]] = {}
                if temptags[-1] not in A2[temptags[-3]][temptags[-2]]: A2[temptags[-3]][temptags[-2]][temptags[-1]] = 0
                A2[temptags[-3]][temptags[-2]][temptags[-1]] += 1
            if len(temptags) >= 2:
                if temptags[-2] not in B2: B2[temptags[-2]] = {}
                if temptags[-1] not in B2[temptags[-2]]: B2[temptags[-2]][temptags[-1]] = {}
                if tempsent[-1] not in B2[temptags[-2]][temptags[-1]]: B2[temptags[-2]][temptags[-1]][tempsent[-1]] = 0
                B2[temptags[-2]][temptags[-1]][tempsent[-1]] += 1
    #print( temptags, tempsent )
    #break
 
# calculate A2 log probabilitis
for i in A2:
    for j in A2[i]:
        total = sum([A2[i][j][k] for k in A2[i][j]])
        for k in A2[i][j]:
            A2[i][j][k] = math.log( A2[i][j][k] / total )
_A = np.array( [0,0,0] )
for i in B2:
    for j in B2[i]:
        total = sum([B2[i][j][k] for k in B2[i][j]])
        V = len( B2[i][j] )
        for k in B2[i][j]:
            # TnT smooth
                        # 计算 TnT 平滑系数
            a3 = ((B2[i][j][k] - 1.) / (total - 1.)) if total > 1 else 0
            a2 = (_B[j][k] - 1.) / ( sum([_B[j][n] for n in _B[j]]) - 1. )
            a1 = (_O[k] - 1.) / (_N - 1.)
            _A[np.argmax([a1,a2,a3])] += B2[i][j][k]
                        # 计算二阶观察矩阵概率(+1平滑)
            B2[i][j][k] = math.log( ( B2[i][j][k] + 1 ) / (total + V) )
        B2[i][j]['<UNK>'] = math.log( 1.0 / (total + V) )
 
# 计算 TnT 平滑系数
_A = _A / _A.sum()
 
# 计算+1平滑字频
for o in _O:
    _O[o] = math.log( (_O[o] + 1.0) / (_N + len(_O)) )
_O['<UNK>'] = math.log( 1.0 / (_N + len(_O)) )
 
def viterbi2(observation):
    def _A2(i,j,k):
        key = ''.join([i,j,k])
        if key.find('BB') >=0 : return minlog
        if key.find('MB') >=0 : return minlog
        if key.find('SM') >=0 : return minlog
        if key.find('EM') >=0 : return minlog
        if key.find('EE') >=0 : return minlog
        if key.find('SE') >=0 : return minlog
        if key.find('BS') >=0 : return minlog
        if key.find('MS') >=0 : return minlog
        try:
            return A2[i][j][k]
        except Exception as e:
            print( i, j, k)
            raise e
 
    def _B2(i,j,o):
        #return B[j].get(o, B[j]['<UNK>'])
        key = ''.join([i,j])
        if key == 'BB': return minlog
        if key == 'MB': return minlog
        if key == 'SM': return minlog
        if key == 'EM': return minlog
        if key == 'EE': return minlog
        if key == 'SE': return minlog
        if key == 'BS': return minlog
        if key == 'MS': return minlog
        #if o not in B2[i][j]:
        #    return B[j].get(o, B[j]['<UNK>'])
        #return B2[i][j].get(o, B2[i][j]['<UNK>'])
        return _A[0] * _O.get(o, _O['<UNK>']) + _A[1] * B[j].get(o, B[j]['<UNK>']) + _A[2] * B2[i][j].get(o, B2[i][j]['<UNK>'])
 
    state = ['B','M','E','S']
    T = len(observation)
    delta = [None] * (T + 1)
    psi = [None] * (T + 2)
    for i in state:
        if delta[1] is None: delta[1] = {}
        if i not in delta[1]: delta[1][i] = {}
        for j in state:
            delta[1][i][j] = P.get(i, minlog) + B[i].get(observation[0], B[i]['<UNK>']) + A[i].get(j, minlog) + _B2(i, j, observation[1])
    for t in range(2, T):
        Ot = observation[t]
        if delta[t] is None: delta[t] = {}
        if psi[t] is None: psi[t] = {}
        for j in state:
            if j not in delta[t]: delta[t][j] = {}
            if j not in psi[t]: psi[t][j] = {}
            for k in state:
                delta[t][j][k], psi[t][j][k] = max( [ (delta[t-1][i][j] + _A2(i,j,k) + _B2(j,k,Ot), i) for i in state] )
    delta[T], (psi[T], psi[T+1]) = max( [ (delta[T-1][i][j], (i, j)) for i in state for j in state] )
    q = [None] * (T+2)
    q[T+1] = psi[T+1]
    q[T] = psi[T]
    for t in range(T-1, 1, -1):
        q[t] = psi[t][q[t+1]][q[t+2]]
    return q[2:]
 
for sent in test:
    if len(sent) < 2:
        print(sent, sep='', end='')
        continue
    sequence = viterbi2( list(sent) )
    segment = []
    for char, tag in zip(sent, sequence):
        if tag == 'B':
            segment.append(char)
        elif tag == 'M':
            segment[-1] += char
        elif tag == 'E':
            segment[-1] += char
        elif tag == 'S':
            segment.append(char)
        else:
            raise Exception()
    print('  '.join(segment), sep='', end='')
    #break

训练和评估使用pku语料:

=== SUMMARY:
=== TOTAL INSERTIONS:   3921
=== TOTAL DELETIONS:    7163
=== TOTAL SUBSTITUTIONS:    13864
=== TOTAL NCHANGE:  24948
=== TOTAL TRUE WORD COUNT:  104372
=== TOTAL TEST WORD COUNT:  101130
=== TOTAL TRUE WORDS RECALL:    0.799
=== TOTAL TEST WORDS PRECISION: 0.824
=== F MEASURE:  0.811
=== OOV Rate:   0.058
=== OOV Recall Rate:    0.360
=== IV Recall Rate: 0.825

pku测试算法分词性能对比

algorithm P R F OOV OOV Recall IV Recall
maximum matching 0.836 0.904 0.869 0.058 0.059 0.956
maximum probability 0.859 0.919 0.888 0.058 0.085 0.970
HMM 0.804 0.789 0.796 0.058 0.364 0.815
Full Second Order HMM 0.824 0.799 0.811 0.058 0.360 0.825

参考资料

[1] Brants, Thorsten. “TnT: a statistical part-of-speech tagger.” Proceedings of the sixth conference on Applied natural language processing. Association for Computational Linguistics, 2000.
[2] Scott M. Thede and Mary P. Harper, A Second-Order Hidden Markov Model for Part-of-Speech
Tagging

One Response to“中文分词词性和序列标注之二阶HMM”

  1. inso
    2018年11月28日 at pm2:30 #

    谢谢分享!

Leave a Reply

Your email address will not be published. Required fields are marked *

Time limit is exhausted. Please reload the CAPTCHA.

Proudly powered by WordPress   Premium Style Theme by www.gopiplus.com