之前的分词算法都需要附带词表,最大概率法需要计算词频,对于词表中不存在的新词,算法效果并不是很好。HMM是基于字统计的分词算法。无需词表,无需统计词频。对新词识别友好。
HMM用于分词的原理。
基于字标注词语切分
共同创造美好的新世纪
共同 创造 美好 的 新世纪
用字母B表示词语开始,M表示词语中间,E表示词语结束,S表示独立成词。那么上面的切分以字为单位重新标注
\[ \begin{array}{cccccccccccccc}
B & E & \quad & B & E & \quad & B & E & \quad & S & \quad & B & M & E \\
共 & 同 & \quad & 创 & 造 & \quad & 美 & 好 & \quad & 的 & \quad & 新 & 世 & 纪
\end{array} \]
状态序列和观测序列
假设切分标注只有从左到右的前后依赖关系,且每一个标注只与上一个标注有关,但和上一次之前的标注无关。那么从上面的字标注可以看出。其适合HMM模型,此时待切分语句就是观测序列,切分结果就是隐藏的状态序列。于是分词任务就变成HMM算法中给定模型和观测序列找到最大化隐藏状态序列任务。
建造HMM模型
- \(N\):模型隐藏状态个数,在分词切分标注时,隐藏状态就4个,即B,M,E,S
\[ S = \{ B,M,E,S \} \] - \(N\):模型观测状态个数,在分词时,观测状态就是语料文本中的所有字符。
但是这里要注意,难免有些字符在语料中并不存在,所以,需要如同之前最大概率法分词那样设置一个未知字符观测状态,并且在观测概率中给其指定一个平滑概率.
\[ V = \{ \upsilon_1,\upsilon_2,\cdots,\upsilon_M \} \] - 隐藏状态转移概率矩阵,这里可以看到部分转移概率是0,这是很显然的事。
\[ A = \begin{array}{c|cccc}
\text{} & B & M & E & S \\
\hline
B & 0.0 & p(M|B) & p(E|B) & 0.0 \\
M & 0.0 & p(M|M) & p(E|M) & 0.0 \\
E & p(B|E) & 0.0 & 0.0 & p(S|E) \\
S & p(B|S) & 0.0 & 0.0 & p(S|S)
\end{array} \] - 观测矩阵, 这里需注意预留一个未知观测状态/字符列。
\[ B = \begin{bmatrix}
p(\upsilon_1|B) & p(\upsilon_2|B) & \cdots & p(\upsilon_M|B) \\
p(\upsilon_1|M) & p(\upsilon_2|M) & \cdots & p(\upsilon_M|M) \\
p(\upsilon_1|E) & p(\upsilon_2|E) & \cdots & p(\upsilon_M|E) \\
p(\upsilon_1|S) & p(\upsilon_2|S) & \cdots & p(\upsilon_M|S)
\end{bmatrix} \] - 初始状态概率,很显然P(M)=P(E)=0.0。
\[ \Pi = \begin{bmatrix}
p(B) \\
p(M) \\
p(E) \\
p(S) \\
\end{bmatrix} \]
分词效果和代码
分词效果看起来比最大概率法差很多,因字标记只考虑了上个字标记,而词语可能会存在更长的上下文依赖。不过可以看出新词召回率比最大概率法更高,以字为单位分词标记提供了新词识别的可能。
=== SUMMARY:
=== TOTAL INSERTIONS: 4546
=== TOTAL DELETIONS: 6535
=== TOTAL SUBSTITUTIONS: 15505
=== TOTAL NCHANGE: 26586
=== TOTAL TRUE WORD COUNT: 104372
=== TOTAL TEST WORD COUNT: 102383
=== TOTAL TRUE WORDS RECALL: 0.789
=== TOTAL TEST WORDS PRECISION: 0.804
=== F MEASURE: 0.796
=== OOV Rate: 0.058
=== OOV Recall Rate: 0.364
=== IV Recall Rate: 0.815
完整实现代码
# -*- coding: utf-8 -*-
import sys, re, math
sys.argv.append('./gold/pku_training_words.utf8')
sys.argv.append('./training/pku_training.utf8')
sys.argv.append('./testing/pku_test.utf8')
assert len(sys.argv) == 4
training_words_filename = sys.argv[1]
training_filename = sys.argv[2]
test_filename = sys.argv[3]
with open(training_words_filename, 'rt', encoding='utf8') as f:
training_words = f.readlines()
with open(training_filename, 'rt', encoding='utf8') as f:
training = f.readlines()
with open(test_filename, 'rt', encoding='utf8') as f:
test = f.readlines()
# training += training_words
# word tag by char
A, B, P = {}, {}, {}
for line in training:
#print( line )
prev_a = None
for w, word in enumerate(re.split(r'\s{2}', line)):
I = len(word)
for i, c in enumerate(word):
if I == 1:
a = 'S'
else:
if i == 0:
a = 'B'
elif i == I-1:
a = 'E'
else:
a = 'M'
# print(w, i, c, a)
if prev_a is None: # calculate Initial state Number
if a not in P: P[a] = 0
P[a] += 1
else: # calculate State transition Number
if prev_a not in A: A[prev_a] = {}
if a not in A[prev_a]: A[prev_a][a] = 0
A[prev_a][a] += 1
prev_a = a
# calculate Observation Number
if a not in B: B[a] = {}
if c not in B[a]: B[a][c] = 0
B[a][c] += 1
# calculate probability
for k, v in A.items():
total = sum(v.values())
A[k] = dict([(x, math.log(y / total)) for x, y in v.items()])
for k, v in B.items():
# plus 1 smooth
total = sum(v.values())
V = len(v.values())
B[k] = dict([(x, math.log((y+1.0) / (total+V))) for x, y in v.items()])
# plus 1 smooth
B[k][''] = math.log(1.0 / (total+V))
minlog = math.log( sys.float_info.min )
total = sum(P.values())
for k, v in P.items():
P[k] = math.log( v / total )
def viterbi(observation):
state = ['B','M','E','S']
T = len(observation)
delta = [None] * (T + 1)
psi = [None] * (T + 1)
delta[0] = dict([(i, P.get(i, minlog) + B[i].get(observation[0], B[i][''])) for i in state])
psi[0] = dict( [ (i, None) for i in state ] )
for t in range(1, len(observation)):
Ot = observation[t]
delta[t] = dict([(j, max([delta[t-1][i] + A[i].get(j, minlog) + B[j].get(Ot, B[j]['']) for i in state])) for j in state])
psi[t] = dict([(j, max([(delta[t-1][i] + A[i].get(j, minlog), i) for i in state])[1]) for j in state ])
delta[T] = max( [ delta[T-1][i] for i in state ] )
psi[T] = max( [(delta[T-1][i], i) for i in state ] )[1]
q = [None] * (T+1)
q[T] = psi[T]
for t in range(T-1, -1, -1):
q[t] = psi[t][q[t+1]]
return q[1:]
for sent in test:
sequence = viterbi( list(sent) )
segment = []
for char, tag in zip(sent, sequence):
if tag == 'B':
segment.append(char)
elif tag == 'M':
segment[-1] += char
elif tag == 'E':
segment[-1] += char
elif tag == 'S':
segment.append(char)
else:
raise Exception()
print(' '.join(segment), sep='', end='')
#break
Leave a Reply