The segmentation algorithms discussed so far all require a dictionary, and the maximum-probability method additionally needs word frequencies; for new words missing from the dictionary they perform poorly. The HMM segmenter is a character-based statistical algorithm: it needs no dictionary and no word-frequency counts, and it is friendly to new-word recognition.
This post explains how an HMM is applied to word segmentation.
Segmenting words by character tagging
共同创造美好的新世纪 (unsegmented: "jointly create a beautiful new century")
共同 创造 美好 的 新世纪 (segmented)
Let B mark the beginning of a word, M the middle, E the end, and S a single character that forms a word on its own. The segmentation above, re-tagged character by character, becomes:
\[ \begin{array}{cccccccccccccc}
B & E & \quad & B & E & \quad & B & E & \quad & S & \quad & B & M & E \\
共 & 同 & \quad & 创 & 造 & \quad & 美 & 好 & \quad & 的 & \quad & 新 & 世 & 纪
\end{array} \]
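The tagging rule can be sketched as a small helper (the function name `bmes_tags` is mine, not from any library):

```python
def bmes_tags(words):
    """Tag each word's characters: S for a singleton, otherwise
    B...E with M filling the interior."""
    tags = []
    for word in words:
        if len(word) == 1:
            tags.append('S')
        else:
            tags.append('B' + 'M' * (len(word) - 2) + 'E')
    return ' '.join(tags)

print(bmes_tags(['共同', '创造', '美好', '的', '新世纪']))  # → BE BE BE S BME
```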
The state sequence and the observation sequence
Assume the tag sequence has only left-to-right dependencies, and that each tag depends only on the immediately preceding tag, not on anything earlier. Under this assumption the character tagging above fits an HMM: the sentence to be segmented is the observation sequence, and its segmentation tags form the hidden state sequence. Segmentation then reduces to the standard HMM decoding task: given the model and an observation sequence, find the most probable hidden state sequence.
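In HMM notation: let the observation sequence \(O\) be the characters of the sentence and the hidden sequence \(Q\) their B/M/E/S tags; with model \(\lambda = (A, B, \Pi)\), segmentation is the decoding problem

\[ \hat{Q} = \mathop{\arg\max}_{Q} P(Q \mid O, \lambda) \]

which the Viterbi algorithm solves exactly.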
Building the HMM
- \(N\): the number of hidden states. For segmentation tagging there are exactly four, B, M, E, S:
\[ S = \{ B,M,E,S \} \]
- \(M\): the number of observation symbols. For segmentation these are all the characters in the corpus. Note that the text to be segmented may contain characters never seen in training, so, as in the maximum-probability segmenter, reserve an extra unknown-character symbol and give it a smoothed probability in the observation matrix:
\[ V = \{ \upsilon_1,\upsilon_2,\cdots,\upsilon_M \} \]
- The hidden-state transition probability matrix. Several transitions are necessarily 0, which follows directly from the tag definitions: B or M can only be followed by M or E (we are still inside a word), while E or S can only be followed by B or S (a new word must start).
\[ A = \begin{array}{c|cccc}
\text{} & B & M & E & S \\
\hline
B & 0.0 & p(M|B) & p(E|B) & 0.0 \\
M & 0.0 & p(M|M) & p(E|M) & 0.0 \\
E & p(B|E) & 0.0 & 0.0 & p(S|E) \\
S & p(B|S) & 0.0 & 0.0 & p(S|S)
\end{array} \]
- The observation (emission) probability matrix. Remember to reserve one column for the unknown observation symbol/character.
\[ B = \begin{bmatrix}
p(\upsilon_1|B) & p(\upsilon_2|B) & \cdots & p(\upsilon_M|B) \\
p(\upsilon_1|M) & p(\upsilon_2|M) & \cdots & p(\upsilon_M|M) \\
p(\upsilon_1|E) & p(\upsilon_2|E) & \cdots & p(\upsilon_M|E) \\
p(\upsilon_1|S) & p(\upsilon_2|S) & \cdots & p(\upsilon_M|S)
\end{bmatrix} \]
- The initial state probabilities; clearly \(p(M) = p(E) = 0\), since a sentence can only begin with B or S.
\[ \Pi = \begin{bmatrix}
p(B) \\
p(M) \\
p(E) \\
p(S) \\
\end{bmatrix} \]
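All four parameter sets are nothing more than normalized counts over a tagged training corpus. A minimal sketch, assuming plain (non-log) probabilities and add-1 smoothing only on emissions; all names here are mine:

```python
from collections import defaultdict

def train_hmm(corpus):
    """Estimate Pi, A and B by counting over pre-segmented sentences.
    Emissions get add-1 smoothing, reserving mass for an '<UNK>' symbol."""
    init = defaultdict(int)                        # initial tag counts
    trans = defaultdict(lambda: defaultdict(int))  # tag -> next-tag counts
    emit = defaultdict(lambda: defaultdict(int))   # tag -> character counts
    for words in corpus:
        prev = None
        for word in words:
            n = len(word)
            for i, ch in enumerate(word):
                tag = 'S' if n == 1 else 'B' if i == 0 else 'E' if i == n - 1 else 'M'
                if prev is None:
                    init[tag] += 1
                else:
                    trans[prev][tag] += 1
                emit[tag][ch] += 1
                prev = tag
    total = sum(init.values())
    Pi = {t: c / total for t, c in init.items()}
    A = {}
    for t, row in trans.items():
        total = sum(row.values())
        A[t] = {u: c / total for u, c in row.items()}
    B = {}
    for t, row in emit.items():
        total, V = sum(row.values()), len(row)
        B[t] = {ch: (c + 1) / (total + V) for ch, c in row.items()}
        B[t]['<UNK>'] = 1 / (total + V)  # smoothed mass for unseen characters
    return Pi, A, B

Pi, A, B = train_hmm([['共同', '创造'], ['的']])
```

The full implementation below follows the same counting scheme, but stores log probabilities to avoid underflow on long sentences.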
Results and code
The segmentation quality looks considerably worse than the maximum-probability method, because each character's tag conditions only on the previous tag, while words can depend on much longer context. On the other hand, the OOV (new-word) recall is higher than the maximum-probability method's: tagging at the character level is what makes new-word recognition possible at all.
```
=== SUMMARY:
=== TOTAL INSERTIONS: 4546
=== TOTAL DELETIONS: 6535
=== TOTAL SUBSTITUTIONS: 15505
=== TOTAL NCHANGE: 26586
=== TOTAL TRUE WORD COUNT: 104372
=== TOTAL TEST WORD COUNT: 102383
=== TOTAL TRUE WORDS RECALL: 0.789
=== TOTAL TEST WORDS PRECISION: 0.804
=== F MEASURE: 0.796
=== OOV Rate: 0.058
=== OOV Recall Rate: 0.364
=== IV Recall Rate: 0.815
```
Complete implementation
```python
# -*- coding: utf-8 -*-
import sys, re, math

sys.argv.append('./gold/pku_training_words.utf8')
sys.argv.append('./training/pku_training.utf8')
sys.argv.append('./testing/pku_test.utf8')
assert len(sys.argv) == 4
training_words_filename = sys.argv[1]
training_filename = sys.argv[2]
test_filename = sys.argv[3]

with open(training_words_filename, 'rt', encoding='utf8') as f:
    training_words = f.readlines()
with open(training_filename, 'rt', encoding='utf8') as f:
    training = f.readlines()
with open(test_filename, 'rt', encoding='utf8') as f:
    test = f.readlines()
# training += training_words

# Tag words character by character and count initial states (P),
# state transitions (A) and observations (B).
A, B, P = {}, {}, {}
for line in training:
    prev_a = None
    for w, word in enumerate(re.split(r'\s{2}', line)):
        I = len(word)
        for i, c in enumerate(word):
            if I == 1:
                a = 'S'
            elif i == 0:
                a = 'B'
            elif i == I - 1:
                a = 'E'
            else:
                a = 'M'
            if prev_a is None:
                # count initial states
                P[a] = P.get(a, 0) + 1
            else:
                # count state transitions
                A.setdefault(prev_a, {})
                A[prev_a][a] = A[prev_a].get(a, 0) + 1
            prev_a = a
            # count observations
            B.setdefault(a, {})
            B[a][c] = B[a].get(c, 0) + 1

# Turn counts into log probabilities.
for k, v in A.items():
    total = sum(v.values())
    A[k] = {x: math.log(y / total) for x, y in v.items()}
for k, v in B.items():
    # add-1 smoothing; '<UNK>' absorbs characters unseen in training
    total, V = sum(v.values()), len(v)
    B[k] = {x: math.log((y + 1.0) / (total + V)) for x, y in v.items()}
    B[k]['<UNK>'] = math.log(1.0 / (total + V))

minlog = math.log(sys.float_info.min)
total = sum(P.values())
for k, v in P.items():
    P[k] = math.log(v / total)

def viterbi(observation):
    state = ['B', 'M', 'E', 'S']
    T = len(observation)
    delta = [None] * (T + 1)
    psi = [None] * (T + 1)
    delta[0] = {i: P.get(i, minlog) + B[i].get(observation[0], B[i]['<UNK>'])
                for i in state}
    psi[0] = {i: None for i in state}
    for t in range(1, T):
        Ot = observation[t]
        delta[t] = {j: max(delta[t-1][i] + A[i].get(j, minlog)
                           + B[j].get(Ot, B[j]['<UNK>']) for i in state)
                    for j in state}
        psi[t] = {j: max((delta[t-1][i] + A[i].get(j, minlog), i)
                         for i in state)[1]
                  for j in state}
    delta[T] = max(delta[T-1][i] for i in state)
    psi[T] = max((delta[T-1][i], i) for i in state)[1]
    # backtrack: q[t] holds the state for observation t-1
    q = [None] * (T + 1)
    q[T] = psi[T]
    for t in range(T - 1, -1, -1):
        q[t] = psi[t][q[t + 1]]
    return q[1:]

for sent in test:
    sequence = viterbi(list(sent))
    segment = []
    for char, tag in zip(sent, sequence):
        if tag in ('B', 'S'):
            segment.append(char)   # start a new word
        elif tag in ('M', 'E'):
            segment[-1] += char    # extend the current word
        else:
            raise Exception()
    print(' '.join(segment), end='')
```
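As a quick sanity check, the same Viterbi recursion can be run on a hand-set toy model. The probabilities below are invented purely for illustration (a real model would be estimated from a tagged corpus as above), chosen so that 中国人 decodes as one three-character word:

```python
import math

STATES = ['B', 'M', 'E', 'S']

# Hand-set toy parameters, not estimated from any corpus.
Pi = {'B': 0.6, 'M': 0.0, 'E': 0.0, 'S': 0.4}
A = {'B': {'M': 0.4, 'E': 0.6},
     'M': {'M': 0.3, 'E': 0.7},
     'E': {'B': 0.6, 'S': 0.4},
     'S': {'B': 0.5, 'S': 0.5}}
Em = {'B': {'中': 0.7, '国': 0.2, '人': 0.1},
      'M': {'中': 0.1, '国': 0.8, '人': 0.1},
      'E': {'中': 0.1, '国': 0.2, '人': 0.7},
      'S': {'中': 0.3, '国': 0.3, '人': 0.4}}

def logp(p):
    # log probability; impossible events map to -inf
    return math.log(p) if p > 0 else float('-inf')

def viterbi(obs):
    # delta[t][s]: best log-probability of any tag path ending in s at time t
    delta = [{s: logp(Pi[s]) + logp(Em[s].get(obs[0], 0)) for s in STATES}]
    psi = [{}]  # psi[t][s]: best predecessor of s at time t
    for ch in obs[1:]:
        d, back = {}, {}
        for j in STATES:
            score, arg = max((delta[-1][i] + logp(A[i].get(j, 0)), i)
                             for i in STATES)
            d[j] = score + logp(Em[j].get(ch, 0))
            back[j] = arg
        delta.append(d)
        psi.append(back)
    # backtrack from the best final state
    tag = max(STATES, key=lambda s: delta[-1][s])
    path = [tag]
    for back in reversed(psi[1:]):
        path.append(back[path[-1]])
    return path[::-1]

print(viterbi('中国人'))  # → ['B', 'M', 'E']
```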