HMM-5-连续观测-高斯混合模型(HMM-GMM)

5. 连续观测

有两个方法将连续观测序列和HMM结合.

  • 将连续观测聚类为离散类别,这就对应到HMM离散观测序列. 然后按照HMM离散观测序列处理.

  • 更改观测概率函数为密度函数,例如高斯概率密度, 这需要替换观测状态序列生成函数.

5.1 替换观测概率为高斯密度

在离散观测序列中,发射函数\( b_j(m) \)定义为:

\[ B = [b_j(m)] \quad where \quad b_j(m) \equiv P(O_t = v_m | q_t = S_j) \]

将其替换为高斯密度函数(也可以是其他分布概率密度函数, 但参数更新部分需要同步替换),可以让HMM实现连续观测.

\[\begin{align}
B = [b_j(O_t)] & \equiv P(O_t | q_t = S_j, \lambda) \sim \mathcal{N}(\mu_j, \sigma_j^2) \\
&= \frac{1}{\sqrt{2 \pi \sigma_j^2}} \exp \left[ \frac{-(O_t – \mu_j)^2}{2 \sigma_j^2} \right]
\end{align}\]

参数更新:

\[\begin{align}
\bar{\mu}_j &= \frac{\sum_{k=1}^K \sum_{t=1}^{T_{O^k}} \gamma_t^k(i) O_t^k }{\sum_{k=1}^K \sum_{t=1}^{T_{O^k}} \gamma_t^k(i)} \\
\bar{\sigma}_j^2 &= \frac{\sum_{k=1}^K \sum_{t=1}^{T_{O^k}} \gamma_t^k(i) (O_t^k – \bar{\mu}_j)^2 }{\sum_{k=1}^K \sum_{t=1}^{T_{O^k}} \gamma_t^k(i)}
\end{align}\]

5.2 高斯混合分布

考虑观测序列为\( L \)个高斯分布按某比例混合.

\[\begin{align}
B = [b_j(O_t)] & \equiv P(O_t | q_t = S_j, \lambda) = \sum_{l=1}^L C_{jl} \mathcal{N}(O_t, \mu_{jl}, \sigma_{jl}^2) \\
\text{s.t.} \\
& \sum_{l=1}^L C_{jl} = 1.0
\end{align}\]

参数更新需要考虑混合系数,通过混合密度期望比例更新.

\[\begin{align}
\gamma_t^k(j, l) &= \gamma_t^k(j) \frac{C_{jl} \mathcal{N}(O_t^k, \mu_{jl}, \sigma_{jl}^2)}{\sum_{l=1}^L C_{jl} \mathcal{N}(O_t^k, \mu_{jl}, \sigma_{jl}^2)} \\
\bar{C}_{jl} &= \frac{\sum_{k=1}^K \sum_{t=1}^{T_{O^k}} \gamma_t^k(j,l)}{\sum_{k=1}^K \sum_{t=1}^{T_{O^k}} \sum_{l=1}^L \gamma_t^k(j, l)} \\
\bar{\mu}_{jl} &= \frac{\sum_{k=1}^K \sum_{t=1}^{T_{O^k}} \gamma_t^k(j,l) O_t^k }{\sum_{k=1}^K \sum_{t=1}^{T_{O^k}} \gamma_t^k(j,l)} \\
\bar{\sigma}_{jl}^2 &= \frac{\sum_{k=1}^K \sum_{t=1}^{T_{O^k}} \gamma_t^k(j,l) (O_t^k – \bar{\mu}_{jl})^2 }{\sum_{k=1}^K \sum_{t=1}^{T_{O^k}} \gamma_t^k(j,l)}
\end{align}\]

5.3 多维高斯混合分布

多维相对单维只是替换了高斯函数,其余没有变化, \( d \)表示维度.

\[\begin{align}
\mathcal{N}(x, \mu, \Sigma) &= \frac{1}{ (2 \pi)^{\frac{d}{2}} |\Sigma|^{\frac{1}{2}} } \exp \left[ -\frac{1}{2} (x – \mu)^T \Sigma^{-1} (x – \mu) \right] \\
\gamma_t^k(j, l) &= \gamma_t^k(j) \frac{C_{jl} \mathcal{N}(O_t^k, \mu_{jl}, \Sigma_{jl})}{\sum_{l=1}^L C_{jl} \mathcal{N}(O_t^k, \mu_{jl}, \Sigma_{jl})} \\
\bar{\Sigma}_{jl} &= \frac{\sum_{k=1}^K \sum_{t=1}^{T_{O^k}} \gamma_t^k(j,l) (O_t^k – \bar{\mu}_{jl}) (O_t^k – \bar{\mu}_{jl})^T }{\sum_{k=1}^K \sum_{t=1}^{T_{O^k}} \gamma_t^k(j,l)}
\end{align}\]

在多维分布计算中,连续观测序列的数据样本过少,样本各维度之间存在线形组合,会造成协方差矩阵奇异(共线/共面), 然后高斯函数计算失败. 因此样本数量需要远多于维度,且通过预处理消除线形组合维,如必要,可以有意添加噪声.

5.4 数值稳定

工程中使用\( \hat{\gamma}_t^k(j) \) 替换 \( \gamma_t^k(j) \), 否则可能因为序列过长导致计算结果超出机器精度容许范围.

5.5 代码试验

import numpy as np
from matplotlib import pyplot as plt

# multivaiance normal probability density function
def Multivar_Norm_PDF(x, mean, cov):
    dev = x - mean
    y = np.dot(dev, np.linalg.pinv(cov))  
    y = np.sum(y * dev, axis=0)  
    y = np.exp(  -0.5 * y )    
    y = (1.0 / ((2*np.pi)**(len(x) / 2) * np.linalg.det(cov) ** 0.5)) * y
    return y

# Gaussian mixture model
def GMM_PDF(x, coef, mean, cov):
    from scipy import stats
    v1 = sum([coef[l] * stats.multivariate_normal(mean[l], cov[l], True).pdf(x) for l in range(len(coef))])
    # v2 = sum([coef[l] * Multivar_Norm_PDF(x, mean[l], cov[l]) for l in range(len(coef))])
    # np.testing.assert_allclose(v1, v2, equal_nan=False, err_msg='cov compare')
    return v1

# the probability density of the model given observation sequence and state sequence
def PDFOQ(Q, O, Lambda):
  ''' p(Q,O | \lambda)=\prod^T_{t=1}p(q_t|q_{t-1}, \lambda)p(O_t|q_t, \lambda) '''
  A, B_coef, B_mean, B_cov, Pi = Lambda
  T = O.shape[0]
  assert( len(Q) == len(O) )
  assert( A.shape[0] == A.shape[1] == B_coef.shape[0] == Pi.shape[0] )
  assert( Q.min() >= 0 and Q.max() <  A.shape[0] )
  return np.prod([(A[Q[t-1], Q[t]] if t>0 else Pi[Q[t]]) * GMM_PDF(O[t], B_coef[Q[t]],B_mean[Q[t]],B_cov[Q[t]]) for t in range(T)])
 
# the log probability density of the model given observation sequence and state sequence
def logPDFOQ(Q, O, Lambda):
  ''' \log P(Q,O | \lambda)=\sum^T_{t=1} \log p(q_t|q_{t-1}, \lambda) + \log p(O_t|q_t, \lambda) '''
  A, B_coef, B_mean, B_cov, Pi = Lambda
  T = O.shape[0]
  assert( len(Q) == len(O) )
  assert( A.shape[0] == A.shape[1] == B_coef.shape[0] == Pi.shape[0] )
  assert( Q.min() >= 0 and Q.max() <  A.shape[0] )
  logPi, logA = np.log(Pi), np.log(A)
  return np.sum([(logA[Q[t-1], Q[t]] if t>0 else logPi[Q[t]]) + np.log(GMM_PDF(O[t], B_coef[Q[t]],B_mean[Q[t]],B_cov[Q[t]])) for t in range(T)])

# normal forward algorithm
class Alpha_GMM:
  def __init__(self, O, Lambda):
    self.O = O
    self.A, self.B_coef, self.B_mean, self.B_cov, self.Pi = Lambda
    self.T = len(O)
    self.N = self.A.shape[0]
    assert( self.A.shape[0] == self.A.shape[1] == self.B_coef.shape[0] == self.B_mean.shape[0] == self.Pi.shape[0] )
    assert( self.B_coef.shape[1] == self.B_mean.shape[1] == self.B_cov.shape[1] )
    assert( self.B_mean.shape[2] == self.B_cov.shape[2] == self.B_cov.shape[3] == len(self.O[0]) )
    '''
    \alpha_t(i) \equiv P(O_1 \cdots O_t, q_t = S_i | \lambda)
    notice: param t: 0=>t1, 1=>t2, ..., T-1=>T
    '''
    self.alpha = np.zeros(shape=(self.T, self.N))
    for t in range(self.T):
      if t == 0:
        self.alpha[t] = [self.Pi[i] * GMM_PDF(self.O[t], self.B_coef[i], self.B_mean[i], self.B_cov[i]) for i in range(self.N)]
      else:
        self.alpha[t] = [sum([self.alpha[t-1, h] * self.A[h, i] for h in range(self.N)]) * GMM_PDF(self.O[t], self.B_coef[i],self.B_mean[i],self.B_cov[i]) for i in range(self.N)]
 
  def __call__(self, t, i):
    return self.alpha[t, i]

# normal backword algorithm
class Beta_GMM:
  def __init__(self, O, Lambda):
    self.O = O
    self.A, self.B_coef, self.B_mean, self.B_cov, self.Pi = Lambda
    self.T = len(O)
    self.N = self.A.shape[0]
    assert( self.A.shape[0] == self.A.shape[1] == self.B_coef.shape[0] == self.B_mean.shape[0] == self.Pi.shape[0] )
    assert( self.B_coef.shape[1] == self.B_mean.shape[1] == self.B_cov.shape[1] )
    assert( self.B_mean.shape[2] == self.B_cov.shape[2] == self.B_cov.shape[3] == len(self.O[0]) )
    '''
    \beta_t(i) \equiv P(O_{t+1} \cdots O_T | q_t = S_i, \lambda)
    notice: param t: T=>T,T-1=>T-1,...,1=>t1,0=>t0
    '''
    self.beta = np.ones(shape=(self.T+1, self.N))
    for t in range(self.T-1, -1, -1):
      if t == 0:
        self.beta[t] = np.array([ self.Pi[j] * GMM_PDF(self.O[t], self.B_coef[j],self.B_mean[j],self.B_cov[j]) * self.beta[t+1, j] for j in range(self.N)]).sum()
      else:
        self.beta[t] = np.array([ self.A[:,j] * GMM_PDF(self.O[t], self.B_coef[j],self.B_mean[j],self.B_cov[j]) * self.beta[t+1, j] for j in range(self.N)]).sum(axis=0)
 
  def __call__(self, t, i):
    return self.beta[t, i]

# scaling forward algorithm
class Alpha_Hat_GMM:
  def __init__(self, O, Lambda):
    self.O = O
    self.A, self.B_coef, self.B_mean, self.B_cov, self.Pi = Lambda
    self.T = len(O)
    self.N = self.A.shape[0]
    assert( self.A.shape[0] == self.A.shape[1] == self.B_coef.shape[0] == self.B_mean.shape[0] == self.Pi.shape[0] )
    assert( self.B_coef.shape[1] == self.B_mean.shape[1] == self.B_cov.shape[1] )
    assert( self.B_mean.shape[2] == self.B_cov.shape[2] == self.B_cov.shape[3] == len(self.O[0]) )
    '''
    \hat{\alpha}_t(i) & \equiv P(q_t = S_i | O_1 \cdots O_t, \lambda)
    notice: param t: 0=>t1, 1=>t2, ..., T-1=>T
    '''
    self.C = np.zeros(shape=(self.T,))
    self.alpha_hat = np.zeros(shape=(self.T, self.N))
    for t in range(self.T):
      if t==0:
        self.alpha_hat[t] = [self.Pi[i] * GMM_PDF(self.O[t], self.B_coef[i],self.B_mean[i],self.B_cov[i]) for i in range(self.N)]
      else:
        self.alpha_hat[t] = [sum([self.alpha_hat[t-1, h] * self.A[h, i] for h in range(self.N)]) * GMM_PDF(self.O[t], self.B_coef[i],self.B_mean[i],self.B_cov[i]) for i in range(self.N)]
      self.C[t] = self.alpha_hat[t].sum()
      self.alpha_hat[t] /= self.C[t]
 
  def __call__(self, t, i):
    return self.alpha_hat[t,i]

# scaling backward algorithm
class Beta_Hat_GMM:
  def __init__(self, O, Lambda, C):
    self.O = O
    self.A, self.B_coef, self.B_mean, self.B_cov, self.Pi = Lambda
    self.T = len(O)
    self.N = self.A.shape[0]
    self.C = C
    assert( len(self.O) == len(self.C) )
    assert( self.A.shape[0] == self.A.shape[1] == self.B_coef.shape[0] == self.B_mean.shape[0] == self.Pi.shape[0] )
    assert( self.B_coef.shape[1] == self.B_mean.shape[1] == self.B_cov.shape[1] )
    assert( self.B_mean.shape[2] == self.B_cov.shape[2] == self.B_cov.shape[3] == len(self.O[0]) )
    '''
    \hat{\beta}_t(i) = \frac{ \beta_t(i) }{ \prod_{u=t+1}^T C_u } 
    notice: param t: T=>T,T-1=>T-1,...,1=>t1,0=>t0
    '''
    self.beta_hat = np.ones(shape=(self.T+1, self.N))
    for t in range(self.T-1, -1, -1):
      if t == 0:
        self.beta_hat[t] = np.array([ self.Pi[j] * GMM_PDF(self.O[t], self.B_coef[j],self.B_mean[j],self.B_cov[j]) * self.beta_hat[t+1, j] for j in range(self.N)]).sum()
      else:
        self.beta_hat[t] = np.array([ self.A[:,j] * GMM_PDF(self.O[t], self.B_coef[j],self.B_mean[j],self.B_cov[j]) * self.beta_hat[t+1, j] for j in range(self.N)]).sum(axis=0)
      self.beta_hat[t] /= self.C[t]
      
  def __call__(self, t, i):
    return self.beta_hat[t,i]

def Xi_GMM(t, i, j, O, Lambda, alpha, beta):
  # \xi_t(i, j) = \frac{ \alpha_t(i) a_{ij} b_j(O_{t+1}) \beta_{t+1}(j) }{ \sum_k^N \sum_l^N \alpha_t(k) a_{kl} b_l(O_{t+1}) \beta_{t+1}(l) }
  A, B_coef, B_mean, B_cov, Pi = Lambda
  N = Pi.shape[0]
  numerator = alpha(t, i) * A[i, j] * GMM_PDF(O[t+1], B_coef[j],B_mean[j],B_cov[j]) * beta(t+2, j)
  denominator = sum( [alpha(t, i) * A[i, j] * GMM_PDF(O[t+1], B_coef[j],B_mean[j],B_cov[j]) * beta(t+2, j) for i in range(N) for j in range(N)] )
  return numerator/denominator
 
def Xi_Hat_GMM(t, i, j, O, Lambda, alpha_hat, beta_hat):
  # \hat{\xi}_t(i, j) &= \frac{1}{C_{t+1}} \hat{\alpha}_t(i) a_{ij} b_j(O_{t+1}) \hat{\beta}_{t+1}(j)
  A, B_coef, B_mean, B_cov, Pi = Lambda
  numerator = alpha_hat(t, i) * A[i, j] * GMM_PDF(O[t+1], B_coef[j],B_mean[j],B_cov[j]) * beta_hat(t+2, j)
  denominator = alpha_hat.C[t+1]
  return numerator/denominator

def Gamma_GMM(t, i, O, Lambda, alpha, beta):
  # \gamma_t(i) = \frac{ \alpha_t(i) \beta_t(i) }{ \sum_l^N \alpha_t(l) \beta_t(l) }
  A, B_coef, B_mean, B_cov, Pi = Lambda
  N, L = B_coef.shape
  numerator = alpha(t, i) * beta(t+1, i)
  denominator = sum( [alpha(t, i) * beta(t+1, i) for i in range(N)] )
  return numerator / denominator

def Gamma_Hat_GMM(t, i, O, Lambda, alpha_hat, beta_hat):
  # \hat{\gamma}_t(i) &= \hat{\alpha}_t(i) \hat{\beta}_t(i)
  return alpha_hat(t, i) * beta_hat(t+1, i)

def Gamma_L_GMM(t, i, l, O, Lambda, alpha, beta):
    A, B_coef, B_mean, B_cov, Pi = Lambda
    num = B_coef[i,l] * GMM_PDF(O[t], [1.0], [B_mean[i,l]], [B_cov[i,l]])
    den = GMM_PDF(O[t], B_coef[i], B_mean[i], B_cov[i])
    p1 = Gamma_GMM(t, i, O, Lambda, alpha, beta)
    p2 = num / den
    return p1 * p2

def Gamma_Hat_L_GMM(t, i, l, O, Lambda, alpha_hat, beta_hat):
    A, B_coef, B_mean, B_cov, Pi = Lambda
    num = B_coef[i,l] * GMM_PDF(O[t], [1.0], [B_mean[i,l]], [B_cov[i,l]])
    den = GMM_PDF(O[t], B_coef[i], B_mean[i], B_cov[i])
    p1 = Gamma_Hat_GMM(t, i, O, Lambda, alpha_hat, beta_hat)
    p2 = num / den
    return p1 * p2

# find best state sequnce in the model given the parameters of the model and observation sequence
def Viterbi(O, Lambda):
  A, B_coef, B_mean, B_cov, Pi = Lambda
  T = O.shape[0]
  N = A.shape[0]
  assert( A.shape[0] == A.shape[1] == B_coef.shape[0] == B_mean.shape[0] == Pi.shape[0] )
  assert( B_coef.shape[1] == B_mean.shape[1] == B_cov.shape[1] )
  assert( B_mean.shape[2] == B_cov.shape[2] == B_cov.shape[3] == len(O[0]) )
  
  delta = np.zeros(shape=(T, N))
  psi = np.zeros(shape=(T, N), dtype=np.int)
  for t in range(T):
    ''' 0=>t1, 1=>t2, ...,T-1=>T '''
    if t == 0:
      # \delta_1(i) &= \pi_i b_i(O_1)
      # delta[t] = Pi * B[:, O[t]]
      for i in range(N):
          delta[t,i] = Pi[i] * GMM_PDF(O[t], B_coef[i],B_mean[i],B_cov[i])
      # \psi_1(i) &= null
      psi[t] = np.nan
    else:
      # \delta_t(j) &= \underset{i}{\max} [\delta_{t-1}(i) a_{ij}] b_j(O_t)
      # \psi_t(j) &= \underset{i}{\arg \max} [\delta_{t-1}(i) a_{ij}] 
      for j in range(N):
        max_delta = max([(delta[t-1, i] * A[i, j] * GMM_PDF(O[t], B_coef[j],B_mean[j],B_cov[j]), i) for i in range(N)], key=lambda x:x[0])
        delta[t, j] = max_delta[0]
        psi[t, j] = max_delta[1]
        
  Q = np.zeros(shape=T, dtype=np.int)
  # \bar{P} &= \underset{i}{\max} \delta_T(i) 
  # \bar{q}_T &= \underset{i}{\arg \max} \delta_T(i) 
  P = np.max(delta[T-1])
  Q[T-1] = np.argmax(delta[T-1])
  # \bar{q}_t = \psi_{t+1}( \bar{q}_{t+1} ) \qquad \text{where} \qquad t=T-1,T-2,\cdots,2,1
  for t in range(T-2, -1, -1):
    Q[t] = psi[t+1, Q[t+1]]
    
  return Q, P

# find best state sequnce using log in the model given the parameters of the model and observation sequence
def logViterbi(O, Lambda):
  logA, B_coef, B_mean, B_cov, logPi = np.log(Lambda[0]), Lambda[1], Lambda[2], Lambda[3], np.log(Lambda[4])
  T = O.shape[0]
  N = logA.shape[0]
  assert( logA.shape[0] == logA.shape[1] == B_coef.shape[0] == B_mean.shape[0] == logPi.shape[0] )
  assert( B_coef.shape[1] == B_mean.shape[1] == B_cov.shape[1] )
  assert( B_mean.shape[2] == B_cov.shape[2] == B_cov.shape[3] == len(O[0]) )
  
  logdelta = np.zeros(shape=(T, N))
  logpsi = np.zeros(shape=(T, N), dtype=np.int)
  for t in range(T):
    ''' 0=>t1, 1=>t2, ...,T-1=>T '''
    if t == 0:
      # \delta_1(i) &= \pi_i b_i(O_1)
      # logdelta[t] = logPi + logB[:, O[t]]
      for i in range(N):
         logdelta[t,i] = logPi[i] + np.log(GMM_PDF(O[t], B_coef[i],B_mean[i],B_cov[i]))
      # \psi_1(i) &= null
      logpsi[t] = np.nan
    else:
      # \delta_{t+1}(j) = \underset{i}{\max} \delta_t(i) + \log a_{ij} + \log b_j(O_{t+1})
      # \psi_t(j) &= \underset{i}{\arg \max} [\delta_{t-1}(i) a_{ij}]
      for j in range(N):
        max_logdelta = max([(logdelta[t-1, i] + logA[i, j] + np.log(GMM_PDF(O[t], B_coef[j],B_mean[j],B_cov[j])), i) for i in range(N)], key=lambda x:x[0])
        logdelta[t, j] = max_logdelta[0]
        logpsi[t, j] = max_logdelta[1]
        
  logQ = np.zeros(shape=T, dtype=np.int)
  # \bar{P} &= \underset{i}{\max} \delta_T(i) 
  # \bar{q}_T &= \underset{i}{\arg \max} \delta_T(i) 
  logP = np.max(logdelta[T-1])
  logQ[T-1] = np.argmax(logdelta[T-1])
  # \bar{q}_t = \psi_{t+1}( \bar{q}_{t+1} ) \qquad \text{where} \qquad t=T-1,T-2,\cdots,2,1
  for t in range(T-2, -1, -1):
    logQ[t] = logpsi[t+1, logQ[t+1]]
 
  return logQ, logP

def Prepare_X_GMM(X, Lambda):
  from collections import namedtuple
  PreX = namedtuple('PreX', 'O T alpha beta alpha_hat beta_hat')
  ret = []
  for k in range(len(X)):
    O = X[k]
    T = len(O)
    alpha = Alpha_GMM(O, Lambda)
    beta = Beta_GMM(O, Lambda)
    alpha_hat = Alpha_Hat_GMM(O, Lambda)
    beta_hat = Beta_Hat_GMM(O, Lambda, alpha_hat.C)
    ret.append(PreX(O=O,T=T,alpha=alpha,beta=beta,alpha_hat=alpha_hat,beta_hat=beta_hat))
  return ret

def Baum_Welch_GMM(X, N, L, max_iter=30, epsilon=1e-8):
    # initial state probabilities
    Pi = np.random.dirichlet(np.ones(N), size=1).flatten() # \Pi = [\pi_i]
    # state transition probabilities
    A = np.random.dirichlet(np.ones(N), size=N) # A = [a_{ij}]
    
    # observation emission probabilitity densities
    # dementional
    X_d = len(X[0][0])
    # probability of sepecify gaussian case
    B_coef = np.random.dirichlet(np.ones(L), size=N)
    # gaussian mean
    B_mean = np.random.random(size=[N, L, X_d])
    # gaussian covariance matrix
    B_cov = np.random.random(size=[N, L, X_d, X_d])
    B_cov = np.matmul(B_cov, B_cov.transpose([0,1,3,2]))
    B_cov = B_cov + B_cov.transpose([0,1,3,2])
    
    # the parameters of the model
    Lambda = (A, B_coef, B_mean, B_cov, Pi) # \lambda = (A,B,\Pi)
    
    # total of observations
    X_K = len(X)
    
    pltx, plty, pltz = [], [], []
    
    for it in range(max_iter):
        pre_X = Prepare_X_GMM(X, Lambda)
        
        # probability density for X
        # PDFX = np.prod( [sum([ pre_X[k].alpha(pre_X[k].T-1, i) for i in range(N)]) for k in range(X_K)] )
        # logPDFX = sum( [sum([ np.log( pre_X[k].alpha_hat.C[t] ).sum() for t in range(pre_X[k].T)]) for k in range(X_K)] )
        # np.testing.assert_allclose(PDFX, np.exp(logPDFX), equal_nan=False, err_msg='pdf X')
        
        # PDFXQ = np.prod( [Viterbi(pre_X[k].O, Lambda)[1] for k in range(X_K)] )
        logPDFXQ = np.sum( [logViterbi(pre_X[k].O, Lambda)[1] for k in range(X_K)] )
        # np.testing.assert_allclose(PDFXQ, np.exp(logPDFXQ), equal_nan=False, err_msg='pdf XQ')
        
        pltx.append(it)
        plty.append(logPDFXQ)
        
        # calc initial probabilities
        # Pi_bar = np.zeros(shape=Pi.shape)
        Pi_bar_hat = np.zeros(shape=Pi.shape)
        for i in range(N):
            # Pi_bar[i] = sum([Gamma_GMM(0, i, pre_X[k].O, Lambda, pre_X[k].alpha, pre_X[k].beta) for k in range(X_K)]) / X_K
            Pi_bar_hat[i] = sum([Gamma_Hat_GMM(0, i, pre_X[k].O, Lambda, pre_X[k].alpha_hat, pre_X[k].beta_hat) for k in range(X_K)]) / X_K
        # np.testing.assert_allclose(Pi_bar, Pi_bar_hat, equal_nan=False, err_msg='Pi checking')
        np.testing.assert_allclose(Pi_bar_hat.sum(), 1.0, equal_nan=False, err_msg='s.t. sum of \Pi to 1.0')
        
        # calc transition matrix
        # A_bar = np.zeros(shape=A.shape)
        A_bar_hat = np.zeros(shape=A.shape)
        for i in range(N):
            for j in range(N):
                # A_bar[i, j] = sum([Xi_GMM(t, i, j, pre_X[k].O, Lambda, pre_X[k].alpha, pre_X[k].beta) for k in range(X_K) for t in range(pre_X[k].T-1)]) / sum([Gamma_GMM(t, i, pre_X[k].O, Lambda, pre_X[k].alpha, pre_X[k].beta) for k in range(X_K) for t in range(pre_X[k].T-1)])
                A_bar_hat[i, j] = sum([Xi_Hat_GMM(t, i, j, pre_X[k].O, Lambda, pre_X[k].alpha_hat, pre_X[k].beta_hat) for k in range(X_K) for t in range(pre_X[k].T-1)]) / sum([Gamma_Hat_GMM(t, i, pre_X[k].O, Lambda, pre_X[k].alpha_hat, pre_X[k].beta_hat) for k in range(X_K) for t in range(pre_X[k].T-1)])
        # np.testing.assert_allclose(A_bar, A_bar_hat, equal_nan=False, err_msg='A checking')
        np.testing.assert_allclose(A_bar_hat.sum(axis=1), np.ones(N), equal_nan=False, err_msg='s.t. \sum_j a_ij = 1.0')
        
        # calc B matrix
        # B_bar_coef, B_bar_mean, B_bar_cov = np.zeros(shape=B_coef.shape), np.zeros(shape=B_mean.shape), np.zeros(shape=B_cov.shape)
        B_bar_hat_coef, B_bar_hat_mean, B_bar_hat_cov = np.zeros(shape=B_coef.shape), np.zeros(shape=B_mean.shape), np.zeros(shape=B_cov.shape)
        for i in range(N):
            for l in range(L):
                # B_bar_coef[i, l]  = sum([Gamma_L_GMM(t, i, l, pre_X[k].O, Lambda, pre_X[k].alpha, pre_X[k].beta) for k in range(X_K) for t in range(pre_X[k].T)])
                # B_bar_coef[i, l] /= sum([Gamma_L_GMM(t, i, q, pre_X[k].O, Lambda, pre_X[k].alpha, pre_X[k].beta) for k in range(X_K) for t in range(pre_X[k].T) for q in range(L)])
                
                # B_bar_mean[i, l]  = np.sum([Gamma_L_GMM(t, i, l, pre_X[k].O, Lambda, pre_X[k].alpha, pre_X[k].beta) * pre_X[k].O[t] for k in range(X_K) for t in range(pre_X[k].T)], axis=0)
                # B_bar_mean[i, l] /= sum([Gamma_L_GMM(t, i, l, pre_X[k].O, Lambda, pre_X[k].alpha, pre_X[k].beta) for k in range(X_K) for t in range(pre_X[k].T)])
                
                # B_bar_cov[i, l]  = np.sum([Gamma_L_GMM(t, i, l, pre_X[k].O, Lambda, pre_X[k].alpha, pre_X[k].beta) * np.dot( np.reshape((pre_X[k].O[t] - B_bar_mean[i, l]), [X_d, 1]), np.reshape((pre_X[k].O[t] - B_bar_mean[i, l]), [1, X_d]) ) for k in range(X_K) for t in range(pre_X[k].T)], axis=0)
                # B_bar_cov[i, l] /= sum([Gamma_L_GMM(t, i, l, pre_X[k].O, Lambda, pre_X[k].alpha, pre_X[k].beta) for k in range(X_K) for t in range(pre_X[k].T)])
                
                # for hat
                gamma_i_l = [[Gamma_Hat_L_GMM(t, i, l, pre_X[k].O, Lambda, pre_X[k].alpha_hat, pre_X[k].beta_hat) for t in range(pre_X[k].T)] for k in range(X_K)]
                gamma_i_l_sum = sum([sum(gamma_i_l[k]) for k in range(X_K)])
                
                # coef
                B_bar_hat_coef[i, l]  = gamma_i_l_sum
                
                # mean
                B_bar_hat_mean[i, l] = np.sum([gamma_i_l[k][t] * pre_X[k].O[t] for k in range(X_K) for t in range(pre_X[k].T)], axis=0)
                B_bar_hat_mean[i, l] /= gamma_i_l_sum
                
                # cov
                B_bar_hat_cov[i, l] = np.sum([gamma_i_l[k][t] * np.matmul( np.reshape(pre_X[k].O[t] - B_bar_hat_mean[i, l], [X_d, 1]), np.reshape(pre_X[k].O[t] - B_bar_hat_mean[i, l], [1, X_d]) ) for k in range(X_K) for t in range(pre_X[k].T)], axis=0)
                B_bar_hat_cov[i, l] /= gamma_i_l_sum
                
        B_bar_hat_coef /= np.sum(B_bar_hat_coef, axis=1, keepdims=True)
        
        # np.testing.assert_allclose(B_bar_coef, B_bar_hat_coef, equal_nan=False, err_msg='coef')
        # np.testing.assert_allclose(B_bar_mean, B_bar_hat_mean, equal_nan=False, err_msg='mean')
        # np.testing.assert_allclose(B_bar_cov, B_bar_hat_cov, equal_nan=False, err_msg='cov')
        
        Lambda_bar = (A_bar_hat, B_bar_hat_coef, B_bar_hat_mean, B_bar_hat_cov, Pi_bar_hat)
        diff = np.sqrt(sum([np.square(Lambda_bar[i] - Lambda[i]).sum() for i in range(len(Lambda))]))
        pltz.append(diff)
        
        Lambda = Lambda_bar
        
        print(it, logPDFXQ, diff)
        
        if diff < epsilon:
            break
        
    return Lambda, (pltx, plty, pltz)

# =============================================================================
np.random.seed(42)
# observation sequence
# O = { O_1,O_2,\cdots,O_T } 
# X = \{O^k\}_{k=1}^K
g_X_D = 4
g_X_len_low = 20
g_X_len_high = 30
g_X_K = 35
g_X = [np.random.multivariate_normal(np.random.random(size=g_X_D), np.eye(g_X_D), size=np.random.randint(g_X_len_low, g_X_len_high)) for _ in range(g_X_K)]

# N: number of statesin the model
g_Model_N = 2
# the number of gaussian mixing
g_Model_L = 3

# Baum Welch
Lambda_bar, pltxyz = Baum_Welch_GMM(g_X, g_Model_N, g_Model_L, max_iter=30, epsilon=1e-8)

# testing 
g_O = g_X[0]

g_Q, g_PDFOQ = Viterbi(g_O, Lambda_bar)
g_logQ, g_logPDFOQ = logViterbi(g_O, Lambda_bar)
np.testing.assert_allclose( g_PDFOQ, np.exp( g_logPDFOQ ) ) 

g_PDFOQ = PDFOQ(g_Q, g_O, Lambda_bar)
g_logPDFOQ = logPDFOQ(g_Q, g_O, Lambda_bar)
np.testing.assert_allclose( g_PDFOQ, np.exp( g_logPDFOQ ) )

alpha = Alpha_GMM(g_O, Lambda_bar)
beta = Beta_GMM(g_O, Lambda_bar)
np.testing.assert_allclose( beta.beta[0][0], alpha.alpha[-1].sum() )
np.testing.assert_allclose( alpha.alpha[-1].sum(), sum([alpha(2, i) * beta(3, i) for i in range(g_Model_N)]) )

alpha_hat = Alpha_Hat_GMM(g_O, Lambda_bar)
beta_hat = Beta_Hat_GMM(g_O, Lambda_bar, alpha_hat.C )
np.testing.assert_allclose( alpha.alpha[-1].sum(), np.prod( alpha_hat.C ) )
np.testing.assert_allclose( alpha.alpha[-1].sum(), sum([alpha_hat(1, i) * beta_hat(2, i) * np.prod( alpha_hat.C ) for i in range(g_Model_N)]) )

# show
shifting = 0
plt.subplot(1,2,1)
plt.plot( pltxyz[0][shifting:], pltxyz[1][shifting:] )
plt.title('log viterbi(X)')
plt.xlabel('iter')
plt.ylabel('log viterbi(X)')
plt.subplot(1,2,2)
plt.plot( pltxyz[0][shifting:], pltxyz[2][shifting:] )
plt.title('diff')
plt.xlabel('iter')
plt.ylabel('diff')
plt.show()
# =============================================================================
0 -7087.354061171159 18.581745152990617
1 -5204.931912976656 0.8548144843843375
2 -5153.54691178641 0.5830950886848522
3 -5126.047050948708 0.5030507653593529
4 -5105.274011193578 0.4436046367611551
5 -5090.158096582353 0.3600060911149254
6 -5079.018933037544 0.28021077183681226
7 -5070.732359450655 0.22988248156099672
8 -5064.437962119374 0.21363280592437056
9 -5059.18254551011 0.2272585115388629
10 -5053.853454296295 0.25076407878942814
11 -5049.180538215929 0.2454115673831218
12 -5044.58614092646 0.1980486669137678
13 -5040.718908658764 0.15030996650190787
14 -5037.029475173185 0.13462583153995095
15 -5032.993484112017 0.14188905693730416
16 -5028.943112234142 0.15918200243303968
17 -5025.016058393451 0.18317863464239617
18 -5021.254030631921 0.22357609275866736
19 -5017.857780075199 0.2986755847375107
20 -5013.397911528508 0.32835242533003234
21 -5009.566010268076 0.19204835060031583
22 -5006.5764218462 0.09665568837033864
23 -5003.874996032923 0.07049132848082566
24 -5001.442887752664 0.061711199974353564
25 -4999.214561662449 0.05751775772948252
26 -4997.066795052535 0.05534578069355525
27 -4995.069743936771 0.054576498257133385
28 -4993.2136027096985 0.05536390565866719
29 -4991.492109116793 0.0587813763199914

HMM-GMM learning output

ref:

[1] Rabiner, L. R. 1989. “A Tutorial on Hidden Markov Models and Selected Applications in Speech Recognition.” Proceedings of the IEEE 77:257–286.
[2] Steve Renals and Peter Bell, Automatic Speech Recognition— ASR Lectures 4&5 28/31 January 2013
[3] Ethem Alpaydın,Introduction to Machine Learning Third Edition p418-432

Leave a Reply

Your email address will not be published. Required fields are marked *

Time limit is exhausted. Please reload the CAPTCHA.

Proudly powered by WordPress   Premium Style Theme by www.gopiplus.com