最大概率法分词是在最大匹配分词算法上的改进。在某些语句切分时,按最大长度切分词语可能并不是最优切分。而不按最优长度切分词语,则同一语句会出现多种切分结果。计算每种切分结果的概率,选取概率最高的切分作为最优分词切分。
一、计算切分概率
- \(W\):所有可能的切分组合
- \(S\):待切分语句
最佳切分:
\[ {\arg\max}_w{P(W|S=s)} \]
切分概率:
\[ p(w|s) = \frac{p(w)p(s|w)}{p(s)} \]
因为始终是同一语句,\( p(s) \) 未曾改变, \( p(s|w) \) 始终为1. 仅需计算 \( p(w) \).
在词表中需记录每个词语的词频,计算每个独立词语出现的联合概率. 使用一元语法(不考虑词语上下文关系)则.
\[ \begin{align}
p(w^n_1) &= p(w_1,w_2, \cdots, w_n) \\
&= p(w_1)p(w_2){\cdots}p(w_n) \\
&= {\prod}^{n}_{1}p(w_n) \\
\end{align} \]
在实践中一般用对数求和替代求积,原因是词语概率一般会很小,多次连乘后可能造成溢出.
\[ \log{p(w)} = {\sum}^{n}_{1}\log{p(w_n)} \]
只需要比较大小,所以最后计算结果无需再做一次 \( \log \)
二、平滑
词表中不可能记录所有可能存在的词语和其词频,对于词表中没有的词如果认为其概率为0,则会存在0概率切分,为了解决零切分问题,给新词指定一个比较低的概率. 例如指定为词表中出现频数最低的词概率,或将其看待为只出现过一次,在这里 \(N\) 表示词表词条总数。\( p(w_i) \) 是词表中的词语概率。
- 将新词指定为词表中的最低词频率。
\[ p(w_{new}) = \min_i{p(w_i)} \] - 加\(1\)平滑。
假设在建造词表时所有词语都被漏计1次。所以刚好在词表中查询不到在待切分样本中碰到的新词(因为在切分特定位置时,新词只出现了1次)。因此在计算词表中的词语词频和新词词频时,给所有的词语词频\(+1\)。新的词语概率计算如下:
\[ p(w)=\frac{c(w)+1}{N+V} \qquad c(w)=\begin{cases}
0, & \text{} \\
count(w), & \text{ \( {w} \in {dict} \) } \end{cases} \]
\( N \) 词表中所有词语总计词频.
\( V \) 词语总数,新词也算一个词语。
\( c (w)\) 词表中对应词语频数,新词词频为\(0\)。
三、切分词语
切分词语时从语句中每个字作为词语前缀字符查询词表,查询结果构造为DAG图,如此表中无此词汇,则单字成词。以下代码摘录自结巴分词。
def get_DAG(sentence):
global pfdict, FREQ
DAG = {}
N = len(sentence)
for k in range(N):
tmplist = []
i = k
frag = sentence[k]
while i < N and frag in pfdict:
if frag in FREQ:
tmplist.append(i)
i += 1
frag = sentence[k:i+1]
if not tmplist:
tmplist.append(k)
DAG[k] = tmplist
return DAG
四、选择最优切分
有两种方法计算
1. 对DAG图做DFS搜索,得到所有切分组合,分别计算各组合概率,取最大概率组合输出。
segment_sent = []
def dfs(words, x):
for y in DAG[x]:
if y < N:
new = words.copy()
new.append(sentence[x:y])
dfs(new, y)
else:
new = words.copy()
new.append(sentence[x:y])
segment_sent.append( new )
dfs([], 0)
bestseg = max([(sum(fdist.get(w, math.log(1.0/total)) for w in seg), seg) for seg in segment_sent])
很显然,这个计算复杂度非常高 \( O(\overline{word-length}^{(N-1)}) \) 以语句长度呈指数增长。
2. 对DAG图做动态规划DP算法, 从深度搜索这里可以看到, 其实很多局部计算过程是重复的。因此这里从图后面向前面反向计算, 每次累计最大概率结果。计算每个字为前缀的不同切分词语概率和后续最大切分概率的乘积,取最大结果为最佳切分,依次向语句前方递推。最后得到语句第一个字开始的最佳切分词语位置,并由第一个字开始的最佳切分词语位置向后依次查询切分词语,最终得到最佳切分。以下代码摘录自结巴分词。
def calc(sentence, DAG, idx, route):
N = len(sentence)
route[N] = (0.0, '')
for idx in range(N-1, -1, -1):
route[idx] = max((FREQ.g
def __cut_DAG_NO_HMM(sentence):
re_eng = re.compile(r'[a-zA-Z0-9]',re.U)
DAG = get_DAG(sentence)
route = {}
calc(sentence, DAG, 0, route)
x = 0
N = len(sentence)
buf = ''
while x < N:
y = route[x][1] + 1
l_word = sentence[x:y]
if re_eng.match(l_word) and len(l_word) == 1:
buf += l_word
x = y
else:
if buf:
yield buf
buf = ''
yield l_word
x = y
if buf:
yield buf
buf = ''
五、评估效果
使用icwb2-data,pku语料, 词频也来自pku语料统计,封闭测试。最大概率法分词效果如下:
=== SUMMARY:
=== TOTAL INSERTIONS: 8644
=== TOTAL DELETIONS: 1325
=== TOTAL SUBSTITUTIONS: 7114
=== TOTAL NCHANGE: 17083
=== TOTAL TRUE WORD COUNT: 104372
=== TOTAL TEST WORD COUNT: 111691
=== TOTAL TRUE WORDS RECALL: 0.919
=== TOTAL TEST WORDS PRECISION: 0.859
=== F MEASURE: 0.888
=== OOV Rate: 0.058
=== OOV Recall Rate: 0.085
=== IV Recall Rate: 0.970
完整代码
# -*- coding: utf-8 -*-
import sys
import os.path
import math
import nltk
from nltk.corpus import PlaintextCorpusReader
sys.argv.append('./gold/pku_training_words.utf8')
sys.argv.append('./training/pku_training.utf8')
sys.argv.append('./testing/pku_test.utf8')
assert len(sys.argv) == 4
with open(sys.argv[1], 'rt', encoding='utf8') as f:
training_words = [w.strip() for w in f.readlines()]
training = PlaintextCorpusReader( *os.path.split(sys.argv[2]) )
training_words += list(training.words())
#training_words = list(training.words())
N = len(training_words)
V = len( set(training_words) )
fdist = nltk.FreqDist(training_words)
fdist = dict([(w, math.log((c+1.0)/(N+V))) for w, c in fdist.items()])
defprob = math.log(1.0/(N+V))
with open(sys.argv[3], 'rt', encoding='utf8') as f:
test = f.readlines()
def get_DAG(sentence):
DAG = {}
T = len(sentence)
for x in range(T):
ys = []
for y in range(x+1, T+1):
if sentence[x:y] in fdist:
ys.append(y)
if not ys:
ys.append(x+1)
DAG[x] = ys
return DAG
def dfs(DAG, sentence):
segments = []
T = len(sentence)
def _dfs(words, x):
for y in DAG[x]:
if y < T:
new = words.copy()
new.append(sentence[x:y])
_dfs(new, y)
else:
new = words.copy()
new.append(sentence[x:y])
segments.append( new )
_dfs([], 0)
bestseg = max([(sum(fdist.get(w, defprob) for w in seg), seg) for seg in segments])
return bestseg[1]
def dp(DAG, sentence):
T = len(sentence)
prob = {T:(0.0,)}
for x in range(T-1, -1, -1):
prob[x] = max([(fdist.get(sentence[x:y], defprob) + prob[y][0], y) for y in DAG[x]])
x = 0
bestseg = []
while x < T:
y = prob[x][1]
bestseg.append( sentence[x:y] )
x = y
return bestseg
for sent in test:
DAG = get_DAG(sent)
#seg1 = dfs(DAG, sent)
seg2 = dp(DAG, sent)
#print(' '.join(seg1), sep='', end='')
print(' '.join(seg2), sep='', end='')
#break
Leave a Reply