中文分词词性和序列标注之HMM

之前的分词算法都需要附带词表,最大概率法需要计算词频,对于词表中不存在的新词,算法效果并不是很好。HMM是基于字统计的分词算法。无需词表,无需统计词频。对新词识别友好。

HMM用于分词的原理。

基于字标注词语切分

共同创造美好的新世纪
共同 创造 美好 的 新世纪

用字母B表示词语开始,M表示词语中间,E表示词语结束,S表示独立成词。那么上面的切分以字为单位重新标注
\[ \begin{array}{cccccccccccccc}
B & E & \quad & B & E & \quad & B & E & \quad & S & \quad & B & M & E \\
共 & 同 & \quad & 创 & 造 & \quad & 美 & 好 & \quad & 的 & \quad & 新 & 世 & 纪
\end{array} \]

状态序列和观测序列

假设切分标注只有从左到右的前后依赖关系,且每一个标注只与上一个标注有关,但和上一次之前的标注无关。那么从上面的字标注可以看出。其适合HMM模型,此时待切分语句就是观测序列,切分结果就是隐藏的状态序列。于是分词任务就变成HMM算法中给定模型和观测序列找到最大化隐藏状态序列任务。

建造HMM模型

  1. \(N\):模型隐藏状态个数,在分词切分标注时,隐藏状态就4个,即B,M,E,S
    \[ S = \{ B,M,E,S \} \]
  2. \(N\):模型观测状态个数,在分词时,观测状态就是语料文本中的所有字符。
    但是这里要注意,难免有些字符在语料中并不存在,所以,需要如同之前最大概率法分词那样设置一个未知字符观测状态,并且在观测概率中给其指定一个平滑概率.
    \[ V = \{ \upsilon_1,\upsilon_2,\cdots,\upsilon_M \} \]
  3. 隐藏状态转移概率矩阵,这里可以看到部分转移概率是0,这是很显然的事。
    \[ A = \begin{array}{c|cccc}
    \text{} & B & M & E & S \\
    \hline
    B & 0.0 & p(M|B) & p(E|B) & 0.0 \\
    M & 0.0 & p(M|M) & p(E|M) & 0.0 \\
    E & p(B|E) & 0.0 & 0.0 & p(S|E) \\
    S & p(B|S) & 0.0 & 0.0 & p(S|S)
    \end{array} \]
  4. 观测矩阵, 这里需注意预留一个未知观测状态/字符列。
    \[ B = \begin{bmatrix}
    p(\upsilon_1|B) & p(\upsilon_2|B) & \cdots & p(\upsilon_M|B) \\
    p(\upsilon_1|M) & p(\upsilon_2|M) & \cdots & p(\upsilon_M|M) \\
    p(\upsilon_1|E) & p(\upsilon_2|E) & \cdots & p(\upsilon_M|E) \\
    p(\upsilon_1|S) & p(\upsilon_2|S) & \cdots & p(\upsilon_M|S)
    \end{bmatrix} \]
  5. 初始状态概率,很显然P(M)=P(E)=0.0。
    \[ \Pi = \begin{bmatrix}
    p(B) \\
    p(M) \\
    p(E) \\
    p(S) \\
    \end{bmatrix} \]

分词效果和代码

分词效果看起来比最大概率法差很多,因字标记只考虑了上个字标记,而词语可能会存在更长的上下文依赖。不过可以看出新词召回率比最大概率法更高,以字为单位分词标记提供了新词识别的可能。

=== SUMMARY:
=== TOTAL INSERTIONS:   4546
=== TOTAL DELETIONS:    6535
=== TOTAL SUBSTITUTIONS:    15505
=== TOTAL NCHANGE:  26586
=== TOTAL TRUE WORD COUNT:  104372
=== TOTAL TEST WORD COUNT:  102383
=== TOTAL TRUE WORDS RECALL:    0.789
=== TOTAL TEST WORDS PRECISION: 0.804
=== F MEASURE:  0.796
=== OOV Rate:   0.058
=== OOV Recall Rate:    0.364
=== IV Recall Rate: 0.815

完整实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# -*- coding: utf-8 -*-
import sys, re, math
 
sys.argv.append('./gold/pku_training_words.utf8')
sys.argv.append('./training/pku_training.utf8')
sys.argv.append('./testing/pku_test.utf8')
 
assert len(sys.argv) == 4
 
training_words_filename = sys.argv[1]
training_filename = sys.argv[2]
test_filename = sys.argv[3]
 
with open(training_words_filename, 'rt', encoding='utf8') as f:
    training_words = f.readlines()
 
with open(training_filename, 'rt', encoding='utf8') as f:
    training = f.readlines()
 
with open(test_filename, 'rt', encoding='utf8') as f:
    test = f.readlines()
 
# training += training_words
# word tag by char
A, B, P = {}, {}, {}
for line in training:
    #print( line )
    prev_a = None
    for w, word in enumerate(re.split(r'\s{2}', line)):
        I = len(word)
        for i, c in enumerate(word):
            if I == 1:
                a = 'S'
            else:
                if i == 0:
                    a = 'B'
                elif i == I-1:
                    a = 'E'
                else:
                    a = 'M'
            # print(w, i, c, a)
            if prev_a is None: # calculate Initial state Number
                if a not in P: P[a] = 0
                P[a] += 1
            else: # calculate State transition Number
                if prev_a not in A: A[prev_a] = {}
                if a not in A[prev_a]: A[prev_a][a] = 0
                A[prev_a][a] += 1
            prev_a = a
            # calculate Observation Number
            if a not in B: B[a] = {}
            if c not in B[a]: B[a][c] = 0
            B[a][c] += 1
 
# calculate probability
for k, v in A.items():
    total = sum(v.values())
    A[k] = dict([(x, math.log(y / total)) for x, y in v.items()])
for k, v in B.items():
    # plus 1 smooth
    total = sum(v.values())
    V = len(v.values())
    B[k] = dict([(x, math.log((y+1.0) / (total+V))) for x, y in v.items()])
    # plus 1 smooth
    B[k]['<UNK>'] = math.log(1.0 / (total+V))
minlog = math.log( sys.float_info.min )
total = sum(P.values())
for k, v in P.items():
    P[k] = math.log( v / total )
 
def viterbi(observation):
    state = ['B','M','E','S']
    T = len(observation)
    delta = [None] * (T + 1)
    psi = [None] * (T + 1)
    delta[0] = dict([(i, P.get(i, minlog) + B[i].get(observation[0], B[i]['<UNK>'])) for i in state])
    psi[0] = dict( [ (i, None) for i in state ] )
    for t in range(1, len(observation)):
        Ot = observation[t]
        delta[t] = dict([(j, max([delta[t-1][i] + A[i].get(j, minlog) + B[j].get(Ot, B[j]['<UNK>']) for i in state])) for j in state])
        psi[t] = dict([(j, max([(delta[t-1][i] + A[i].get(j, minlog), i) for i in state])[1]) for j in state ])
    delta[T] = max( [ delta[T-1][i] for i in state ] )
    psi[T] = max( [(delta[T-1][i], i) for i in state  ] )[1]
    q = [None] * (T+1)
    q[T] = psi[T]
    for t in range(T-1, -1, -1):
        q[t] = psi[t][q[t+1]]
    return q[1:]
 
for sent in test:
    sequence = viterbi( list(sent) )
    segment = []
    for char, tag in zip(sent, sequence):
        if tag == 'B':
            segment.append(char)
        elif tag == 'M':
            segment[-1] += char
        elif tag == 'E':
            segment[-1] += char
        elif tag == 'S':
            segment.append(char)
        else:
            raise Exception()
    print('  '.join(segment), sep='', end='')
    #break

Leave a Reply

Your email address will not be published. Required fields are marked *

Time limit is exhausted. Please reload the CAPTCHA.

Proudly powered by WordPress   Premium Style Theme by www.gopiplus.com