diff --git a/docs/Topeax.md b/docs/Topeax.md
new file mode 100644
index 0000000..f7c8a4d
--- /dev/null
+++ b/docs/Topeax.md
@@ -0,0 +1,18 @@
+# Topeax
+
+Topeax is a probabilistic topic model built on the Peax clustering algorithm, which discovers topics as peaks in point density in the embedding space.
+It can recover the number of topics automatically.
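+
+A minimal usage sketch (Topeax follows the same `fit()`/`print_topics()` interface as the other models in Turftopic):
+
+```python
+from turftopic import Topeax
+
+corpus: list[str] = ["some text", "more text", ...]
+
+model = Topeax().fit(corpus)
+model.print_topics()
+```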
+
+<figure>
+  <img src="../images/peax.png">
+  <figcaption>Schematic overview of the steps of the Peax clustering algorithm</figcaption>
+</figure>
+
+
+:warning: **This part of the documentation is still under construction, as more details and a paper are on their way.** :warning:
+
+## API Reference
+
+::: turftopic.models.topeax.Topeax
+
+::: turftopic.models.topeax.Peax
diff --git a/docs/images/peax.png b/docs/images/peax.png
new file mode 100644
index 0000000..fa2a7bc
Binary files /dev/null and b/docs/images/peax.png differ
diff --git a/pyproject.toml b/pyproject.toml
index f284a70..c91abef 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -9,7 +9,7 @@ profile = "black"
[project]
name = "turftopic"
-version = "0.19.1"
+version = "0.20.0"
description = "Topic modeling with contextual representations from sentence transformers."
authors = [
{ name = "Márton Kardos ", email = "martonkardos@cas.au.dk" }
diff --git a/turftopic/__init__.py b/turftopic/__init__.py
index cc3ab74..99250de 100644
--- a/turftopic/__init__.py
+++ b/turftopic/__init__.py
@@ -7,6 +7,7 @@
from turftopic.models.fastopic import FASTopic
from turftopic.models.gmm import GMM
from turftopic.models.keynmf import KeyNMF
+from turftopic.models.topeax import Topeax
from turftopic.serialization import load_model
try:
@@ -20,6 +21,7 @@
"ClusteringTopicModel",
"SemanticSignalSeparation",
"GMM",
+ "Topeax",
"KeyNMF",
"AutoEncodingTopicModel",
"ContextualModel",
diff --git a/turftopic/encoders/utils.py b/turftopic/encoders/utils.py
index 4a527f4..ba3a749 100644
--- a/turftopic/encoders/utils.py
+++ b/turftopic/encoders/utils.py
@@ -1,6 +1,10 @@
import itertools
from typing import Iterable, List
+import numpy as np
+import torch
+from tqdm import trange
+
def batched(iterable, n: int) -> Iterable[List[str]]:
"Batch data into tuples of length n. The last batch may be shorter."
@@ -10,3 +14,47 @@ def batched(iterable, n: int) -> Iterable[List[str]]:
it = iter(iterable)
while batch := list(itertools.islice(it, n)):
yield batch
+
+
+def encode_chunks(
+ encoder,
+ sentences,
+ batch_size=64,
+ window_size=50,
+ step_size=40,
+ return_chunks=False,
+ show_progress_bar=False,
+):
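+    """Embed documents as overlapping token-window chunks.
+
+    Each batch of `sentences` is tokenized and passed through the encoder
+    (a SentenceTransformer-style model), and token embeddings are
+    mean-pooled over sliding windows of `window_size` tokens taken every
+    `step_size` tokens. Returns the stacked chunk embeddings and, when
+    `return_chunks=True`, the decoded chunk texts (otherwise None).
+    """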
+ chunks = []
+ chunk_embeddings = []
+ for start_index in trange(
+ 0,
+ len(sentences),
+ batch_size,
+ desc="Encoding batches...",
+ disable=not show_progress_bar,
+ ):
+ batch = sentences[start_index : start_index + batch_size]
+ features = encoder.tokenize(batch)
+ with torch.no_grad():
+ output_features = encoder.forward(features)
+ n_tokens = output_features["attention_mask"].sum(axis=1)
+ for i_doc in range(len(batch)):
+ for chunk_start in range(0, n_tokens[i_doc], step_size):
+ chunk_end = min(chunk_start + window_size, n_tokens[i_doc])
+ _emb = output_features["token_embeddings"][
+ i_doc, chunk_start:chunk_end, :
+ ].mean(axis=0)
+ chunk_embeddings.append(_emb)
+ if return_chunks:
+ chunks.append(
+ encoder.tokenizer.decode(
+ features["input_ids"][i_doc, chunk_start:chunk_end]
+ )
+ .replace("[CLS]", "")
+ .replace("[SEP]", "")
+ )
+ if not return_chunks:
+ chunks = None
+ chunk_embeddings = np.stack(chunk_embeddings)
+ return chunk_embeddings, chunks
diff --git a/turftopic/models/_keynmf.py b/turftopic/models/_keynmf.py
index 3a27628..5e135e8 100644
--- a/turftopic/models/_keynmf.py
+++ b/turftopic/models/_keynmf.py
@@ -2,7 +2,8 @@
import warnings
from collections import defaultdict
from datetime import datetime
-from typing import Iterable, Literal, Optional
+from functools import partial
+from typing import Iterable, Literal, Optional, Union
import igraph as ig
import numpy as np
@@ -21,6 +22,10 @@
from sklearn.utils.validation import check_non_negative
from turftopic.base import Encoder
+from turftopic.optimization import (
+ decomposition_gaussian_bic,
+ optimize_n_components,
+)
NOT_MATCHING_ERROR = (
"Document embedding dimensionality ({n_dims}) doesn't match term embedding dimensionality ({n_word_dims}). "
@@ -242,7 +247,7 @@ def batch_extract_keywords(
class KeywordNMF:
def __init__(
self,
- n_components: int,
+ n_components: Union[int, Literal["auto"]],
seed: Optional[int] = None,
top_n: Optional[int] = None,
):
@@ -318,6 +323,15 @@ def vectorize(
def fit_transform(self, keywords: list[dict[str, float]]) -> np.ndarray:
X = self.vectorize(keywords, fitting=True)
+ if self.n_components == "auto":
+ # Finding N components with BIC
+ bic_fn = partial(
+ decomposition_gaussian_bic,
+ decomp_class=NMF,
+ X=X,
+ )
+ n_components = optimize_n_components(bic_fn, min_n=1, verbose=True)
+ self.n_components = n_components
check_non_negative(X, "NMF (input X)")
W, H = _initialize_nmf(X, self.n_components, random_state=self.seed)
W, H, self.n_iter = NMF(
@@ -339,6 +353,10 @@ def transform(self, keywords: list[dict[str, float]]):
return W.astype(X.dtype)
def partial_fit(self, keyword_batch: list[dict[str, float]]):
+ if self.n_components == "auto":
+ raise ValueError(
+                "Cannot infer the number of components with BIC when fitting the model online."
+ )
X = self.vectorize(keyword_batch, fitting=True)
try:
check_non_negative(X, "NMF (input X)")
@@ -365,6 +383,15 @@ def fit_transform_dynamic(
n_bins = len(time_bin_edges) - 1
document_term_matrix = self.vectorize(keywords, fitting=True)
check_non_negative(document_term_matrix, "NMF (input X)")
+ if self.n_components == "auto":
+ # Finding N components with BIC
+ bic_fn = partial(
+ decomposition_gaussian_bic,
+ decomp_class=NMF,
+                X=document_term_matrix,
+ )
+ n_components = optimize_n_components(bic_fn, verbose=True)
+ self.n_components = n_components
document_topic_matrix, H = _initialize_nmf(
document_term_matrix,
self.n_components,
diff --git a/turftopic/models/gmm.py b/turftopic/models/gmm.py
index 9760ed4..69ebad0 100644
--- a/turftopic/models/gmm.py
+++ b/turftopic/models/gmm.py
@@ -1,4 +1,6 @@
+import warnings
from datetime import datetime
+from functools import partial
from typing import Literal, Optional, Union
import numpy as np
@@ -7,77 +9,103 @@
from sklearn.base import TransformerMixin
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.mixture import BayesianGaussianMixture, GaussianMixture
-from sklearn.pipeline import Pipeline, make_pipeline
+from sklearn.pipeline import Pipeline
+from turftopic._datamapplot import build_datamapplot
from turftopic.base import ContextualModel, Encoder
from turftopic.dynamic import DynamicTopicModel
from turftopic.encoders.multimodal import MultimodalEncoder
-from turftopic.feature_importance import soft_ctf_idf
+from turftopic.feature_importance import (
+ ctf_idf,
+ fighting_words,
+ npmi,
+ soft_ctf_idf,
+)
from turftopic.multimodal import (
Image,
ImageRepr,
MultimodalEmbeddings,
MultimodalModel,
)
+from turftopic.optimization import optimize_n_components
from turftopic.vectorizers.default import default_vectorizer
+FEATURE_IMPORTANCE_METHODS = {
+ "soft-c-tf-idf": soft_ctf_idf,
+ "c-tf-idf": ctf_idf,
+ "fighting-words": fighting_words,
+ "npmi": partial(npmi, smoothing=2),
+}
+LexicalWordImportance = Literal[
+ "soft-c-tf-idf",
+ "c-tf-idf",
+ "npmi",
+ "fighting-words",
+]
+
class GMM(ContextualModel, DynamicTopicModel, MultimodalModel):
"""Multivariate Gaussian Mixture Model over document embeddings.
- Models topics as mixture components.
-
- ```python
- from turftopic import GMM
+ Models topics as mixture components.
+ ```python
+ from turftopic import GMM
corpus: list[str] = ["some text", "more text", ...]
- model = GMM(10, weight_prior="dirichlet_process").fit(corpus)
- model.print_topics()
- ```
+ model = GMM(10, weight_prior="dirichlet_process").fit(corpus)
+ model.print_topics()
+ ```
- Parameters
- ----------
- n_components: int
- Number of topics. If you're using priors on the weight,
- feel free to overshoot with this value.
- encoder: str or SentenceTransformer
- Model to encode documents/terms, all-MiniLM-L6-v2 is the default.
- vectorizer: CountVectorizer, default None
- Vectorizer used for term extraction.
- Can be used to prune or filter the vocabulary.
- weight_prior: 'dirichlet', 'dirichlet_process' or None, default 'dirichlet'
- Prior to impose on component weights, if None,
- maximum likelihood is optimized with expectation maximization,
- otherwise variational inference is used.
- gamma: float, default None
- Concentration parameter of the symmetric prior.
- By default 1/n_components is used.
- Ignored when weight_prior is None.
- dimensionality_reduction: TransformerMixin, default None
- Optional dimensionality reduction step before GMM is run.
- This is recommended for very large datasets with high dimensionality,
- as the number of parameters grows vast in the model otherwise.
- We recommend using PCA, as it is a linear solution, and will likely
- result in Gaussian components.
- For even larger datasets you can use IncrementalPCA to reduce
- memory load.
- random_state: int, default None
- Random state to use so that results are exactly reproducible.
+ Parameters
+ ----------
+ n_components: int or "auto"
+ Number of topics.
+        If "auto", the Bayesian Information Criterion
+        will be used to estimate this quantity.
+        *Note that "auto" can only be used when no prior is specified.*
+ encoder: str or SentenceTransformer
+ Model to encode documents/terms, all-MiniLM-L6-v2 is the default.
+ vectorizer: CountVectorizer, default None
+ Vectorizer used for term extraction.
+ Can be used to prune or filter the vocabulary.
+ weight_prior: 'dirichlet', 'dirichlet_process' or None, default 'dirichlet'
+ Prior to impose on component weights, if None,
+ maximum likelihood is optimized with expectation maximization,
+ otherwise variational inference is used.
+ gamma: float, default None
+ Concentration parameter of the symmetric prior.
+ By default 1/n_components is used.
+ Ignored when weight_prior is None.
+ dimensionality_reduction: TransformerMixin, default None
+ Optional dimensionality reduction step before GMM is run.
+ This is recommended for very large datasets with high dimensionality,
+ as the number of parameters grows vast in the model otherwise.
+ We recommend using PCA, as it is a linear solution, and will likely
+ result in Gaussian components.
+ For even larger datasets you can use IncrementalPCA to reduce
+ memory load.
+ feature_importance: LexicalWordImportance, default 'soft-c-tf-idf'
+ Feature importance method to use.
+ *Note that only lexical methods can be used with GMM,
+ not embedding-based ones.*
+ random_state: int, default None
+ Random state to use so that results are exactly reproducible.
- Attributes
- ----------
- weights_: ndarray of shape (n_components)
- Weights of the different mixture components.
+ Attributes
+ ----------
+ weights_: ndarray of shape (n_components)
+ Weights of the different mixture components.
"""
def __init__(
self,
- n_components: int,
+ n_components: Union[int, Literal["auto"]],
encoder: Union[
Encoder, str, MultimodalEncoder
] = "sentence-transformers/all-MiniLM-L6-v2",
vectorizer: Optional[CountVectorizer] = None,
dimensionality_reduction: Optional[TransformerMixin] = None,
+ feature_importance: LexicalWordImportance = "soft-c-tf-idf",
weight_prior: Literal["dirichlet", "dirichlet_process", None] = None,
gamma: Optional[float] = None,
random_state: Optional[int] = None,
@@ -96,7 +124,63 @@ def __init__(
self.vectorizer = default_vectorizer()
else:
self.vectorizer = vectorizer
+ if feature_importance not in FEATURE_IMPORTANCE_METHODS:
+ valid = list(FEATURE_IMPORTANCE_METHODS.keys())
+ raise ValueError(
+ f"{feature_importance} not in list of valid feature importance methods: {valid}"
+ )
+ self.feature_importance = feature_importance
self.dimensionality_reduction = dimensionality_reduction
+ if (self.n_components == "auto") and (self.weight_prior is not None):
+ raise ValueError(
+ "You cannot use N='auto' with a prior. Try setting weight_prior=None."
+ )
+
+ def estimate_components(
+ self,
+ feature_importance: Optional[LexicalWordImportance] = None,
+ doc_topic_matrix=None,
+ doc_term_matrix=None,
+ ) -> np.ndarray:
+ feature_importance = feature_importance or self.feature_importance
+ imp_fn = FEATURE_IMPORTANCE_METHODS[feature_importance]
+ doc_topic_matrix = (
+ doc_topic_matrix
+ if doc_topic_matrix is not None
+ else self.doc_topic_matrix
+ )
+ doc_term_matrix = (
+ doc_term_matrix
+ if doc_term_matrix is not None
+ else self.doc_term_matrix
+ )
+ self.components_ = imp_fn(doc_topic_matrix, doc_term_matrix)
+ return self.components_
+
+ def _create_bic(self, embeddings: np.ndarray):
+ def f_bic(n_components: int):
+            random_state = 42 if self.random_state is None else self.random_state
+            success = False
+            n_tries = 1
+            while not success and (n_tries <= 5):
+                try:
+                    # Fitting can occasionally fail, especially during
+                    # covariance estimation, so we retry with a new random state.
+                    model = GaussianMixture(
+                        n_components, random_state=random_state
+                    )
+ model.fit(embeddings)
+ success = True
+ except Exception:
+ random_state += 1
+ n_tries += 1
+ if n_tries > 5:
+ return 0
+ return model.bic(embeddings)
+
+ return f_bic
+
+ def _init_model(self, n_components: int):
if self.weight_prior is not None:
mixture = BayesianGaussianMixture(
n_components=n_components,
@@ -105,17 +189,14 @@ def __init__(
if self.weight_prior == "dirichlet"
else "dirichlet_process"
),
- weight_concentration_prior=gamma,
+ weight_concentration_prior=self.gamma,
random_state=self.random_state,
)
else:
mixture = GaussianMixture(
n_components, random_state=self.random_state
)
- if dimensionality_reduction is not None:
- self.gmm_ = make_pipeline(dimensionality_reduction, mixture)
- else:
- self.gmm_ = mixture
+ return mixture
def fit_transform(
self, raw_documents, y=None, embeddings: Optional[np.ndarray] = None
@@ -126,22 +207,34 @@ def fit_transform(
status.update("Encoding documents")
embeddings = self.encoder_.encode(raw_documents)
console.log("Documents encoded.")
+ self.embeddings = embeddings
status.update("Extracting terms.")
- document_term_matrix = self.vectorizer.fit_transform(raw_documents)
+ self.doc_term_matrix = self.vectorizer.fit_transform(raw_documents)
console.log("Term extraction done.")
+ X = embeddings
+ if self.dimensionality_reduction is not None:
+ status.update("Reducing embedding dimensionality.")
+ X = self.dimensionality_reduction.fit_transform(embeddings)
+ console.log("Dimensionality reduction complete.")
+ self.reduced_embeddings = X
+ n_components = self.n_components
+ if self.n_components == "auto":
+ status.update("Finding optimal value of N")
+ f_bic = self._create_bic(X)
+ n_components = optimize_n_components(f_bic, verbose=True)
+ console.log(f"Found optimal N={n_components}.")
status.update("Fitting mixture model.")
- self.gmm_.fit(embeddings)
+ self.gmm_ = self._init_model(n_components)
+ self.gmm_.fit(X)
console.log("Mixture model fitted.")
status.update("Estimating term importances.")
- document_topic_matrix = self.gmm_.predict_proba(embeddings)
- self.components_ = soft_ctf_idf(
- document_topic_matrix, document_term_matrix
- )
+ self.doc_topic_matrix = self.gmm_.predict_proba(X)
+ self.components_ = self.estimate_components()
console.log("Model fitting done.")
self.top_documents = self.get_top_documents(
- raw_documents, document_topic_matrix=document_topic_matrix
+ raw_documents, document_topic_matrix=self.doc_topic_matrix
)
- return document_topic_matrix
+ return self.doc_topic_matrix
def fit_transform_multimodal(
self,
@@ -161,31 +254,49 @@ def fit_transform_multimodal(
)
console.log("Documents encoded.")
status.update("Extracting terms.")
- document_term_matrix = self.vectorizer.fit_transform(raw_documents)
+ self.doc_term_matrix = self.vectorizer.fit_transform(raw_documents)
console.log("Term extraction done.")
+ X = self.multimodal_embeddings["document_embeddings"]
+ if self.dimensionality_reduction is not None:
+ status.update("Reducing embedding dimensionality.")
+                X = self.dimensionality_reduction.fit_transform(X)
+ console.log("Dimensionality reduction complete.")
+ n_components = self.n_components
+ if self.n_components == "auto":
+ status.update("Finding optimal value of N")
+ f_bic = self._create_bic(X)
+ n_components = optimize_n_components(f_bic, verbose=True)
+ console.log(f"Found optimal N={n_components}.")
status.update("Fitting mixture model.")
- self.gmm_.fit(self.multimodal_embeddings["document_embeddings"])
+ self.gmm_ = self._init_model(n_components)
+ self.gmm_.fit(X)
console.log("Mixture model fitted.")
status.update("Estimating term importances.")
- document_topic_matrix = self.gmm_.predict_proba(
- self.multimodal_embeddings["document_embeddings"]
- )
- self.components_ = soft_ctf_idf(
- document_topic_matrix, document_term_matrix
- )
+ self.doc_topic_matrix = self.gmm_.predict_proba(X)
+ self.components_ = self.estimate_components()
console.log("Model fitting done.")
- self.image_topic_matrix = self.transform(
- raw_documents,
- embeddings=self.multimodal_embeddings["document_embeddings"],
- )
+ try:
+ self.image_topic_matrix = self.transform(
+ raw_documents,
+ embeddings=self.multimodal_embeddings["image_embeddings"],
+ )
+ except Exception as e:
+ warnings.warn(
+ f"Couldn't produce image topic matrix due to exception: {e}, using doc-topic matrix."
+ )
+ self.image_topic_matrix = self.doc_topic_matrix
self.top_images: list[list[Image.Image]] = self.collect_top_images(
images, self.image_topic_matrix
)
self.top_documents = self.get_top_documents(
- raw_documents, document_topic_matrix=document_topic_matrix
+ raw_documents, document_topic_matrix=self.doc_topic_matrix
)
console.log("Transformation done.")
- return document_topic_matrix
+ return self.doc_topic_matrix
+
+ @property
+ def labels_(self):
+ return np.argmax(self.doc_topic_matrix, axis=1)
@property
def weights_(self) -> np.ndarray:
@@ -214,6 +325,8 @@ def transform(
"""
if embeddings is None:
embeddings = self.encoder_.encode(raw_documents)
+ if self.dimensionality_reduction is not None:
+ embeddings = self.dimensionality_reduction.transform(embeddings)
return self.gmm_.predict_proba(embeddings)
def fit_transform_dynamic(
@@ -234,11 +347,11 @@ def fit_transform_dynamic(
doc_topic_matrix = self.fit_transform(
raw_documents, embeddings=embeddings
)
- document_term_matrix = self.vectorizer.transform(raw_documents)
+ self.doc_term_matrix = self.vectorizer.transform(raw_documents)
n_comp, n_vocab = self.components_.shape
n_bins = len(self.time_bin_edges) - 1
self.temporal_components_ = np.zeros(
- (n_bins, n_comp, n_vocab), dtype=document_term_matrix.dtype
+ (n_bins, n_comp, n_vocab), dtype=self.doc_term_matrix.dtype
)
self.temporal_importance_ = np.zeros((n_bins, n_comp))
for i_timebin in np.unique(time_labels):
@@ -247,10 +360,153 @@ def fit_transform_dynamic(
)
# Normalizing
topic_importances = topic_importances / topic_importances.sum()
- components = soft_ctf_idf(
- doc_topic_matrix[time_labels == i_timebin],
- document_term_matrix[time_labels == i_timebin], # type: ignore
+ components = self.estimate_components(
+ doc_topic_matrix=doc_topic_matrix[time_labels == i_timebin],
+ doc_term_matrix=self.doc_term_matrix[time_labels == i_timebin], # type: ignore
)
self.temporal_components_[i_timebin] = components
self.temporal_importance_[i_timebin] = topic_importances
return doc_topic_matrix
+
+ def plot_components_datamapplot(
+ self,
+ coordinates: Optional[np.ndarray] = None,
+ hover_text: Optional[list[str]] = None,
+ **kwargs,
+ ):
+ """Creates an interactive browser plot of the topics in your data using datamapplot.
+
+ Parameters
+ ----------
+ coordinates: np.ndarray, default None
+ Lower dimensional projection of the embeddings.
+ If None, will try to use the projections from the
+ dimensionality_reduction method of the model.
+ hover_text: list of str, optional
+ Text to show when hovering over a document.
+
+ Returns
+ -------
+ plot
+            Interactive datamap plot. Call its `.show()` method to display it
+            in your default browser, or save it as static HTML with `.write_html()`.
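+
+        A usage sketch (assuming `model` is a fitted GMM with a 2D
+        `dimensionality_reduction` step, so reduced embeddings are available):
+
+        ```python
+        fig = model.plot_components_datamapplot()
+        fig.show()
+        ```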
+ """
+ if coordinates is None:
+ if not hasattr(self, "reduced_embeddings"):
+ raise ValueError(
+ "Coordinates not specified, but the model does not contain reduced embeddings."
+ )
+ coordinates = self.reduced_embeddings[:, (0, 1)]
+ labels = np.argmax(self.doc_topic_matrix, axis=1)
+ plot = build_datamapplot(
+ coordinates=coordinates,
+ topic_names=self.topic_names,
+ labels=labels,
+ classes=np.arange(self.gmm_.n_components),
+ top_words=self.get_top_words(),
+ hover_text=hover_text,
+ topic_descriptions=getattr(self, "topic_descriptions", None),
+ **kwargs,
+ )
+ return plot
+
+ def plot_density(
+ self, hover_text: list[str] = None, show_points=False, light_mode=False
+ ):
+ try:
+ import plotly.graph_objects as go
+ except (ImportError, ModuleNotFoundError) as e:
+ raise ModuleNotFoundError(
+ "Please install plotly if you intend to use plots in Turftopic."
+ ) from e
+
+ if not hasattr(self, "reduced_embeddings"):
+ raise ValueError(
+ "No reduced embeddings found, can't display in 2d space."
+ )
+ if self.reduced_embeddings.shape[1] != 2:
+ warnings.warn(
+ "Embeddings are not in 2d space, only using first 2 dimensions"
+ )
+
+ coord_min, coord_max = np.min(self.reduced_embeddings), np.max(
+ self.reduced_embeddings
+ )
+ coord_spread = coord_max - coord_min
+ coord_min = coord_min - coord_spread * 0.05
+ coord_max = coord_max + coord_spread * 0.05
+ coord = np.linspace(coord_min, coord_max, num=100)
+ z = []
+ for yval in coord:
+ points = np.stack([coord, np.full(coord.shape, yval)]).T
+ prob = np.exp(self.gmm_.score_samples(points))
+ z.append(prob)
+ z = np.stack(z)
+ color_grid = [0.0, 0.25, 0.5, 0.75, 1.0]
+ colorscale = [
+ "#01014B",
+ "#000080",
+ "#5D5DEF",
+ "#B7B7FF",
+ "#ffffff",
+ ]
+ if light_mode:
+ colorscale = colorscale[::-1]
+ traces = [
+ go.Contour(
+ z=z,
+ colorscale=list(zip(color_grid, colorscale)),
+ showscale=False,
+ x=coord,
+ y=coord,
+ hoverinfo="skip",
+ ),
+ ]
+ if show_points:
+ scatter = go.Scatter(
+ x=self.reduced_embeddings[:, 0],
+ y=self.reduced_embeddings[:, 1],
+ mode="markers",
+ showlegend=False,
+ text=hover_text,
+ marker=dict(
+ symbol="circle",
+ opacity=0.5,
+ color="white",
+ size=8,
+ line=dict(width=1),
+ ),
+ )
+ traces.append(scatter)
+ fig = go.Figure(data=traces)
+ fig = fig.update_layout(
+ showlegend=False, margin=dict(r=0, l=0, t=0, b=0)
+ )
+ fig = fig.update_xaxes(showticklabels=False)
+ fig = fig.update_yaxes(showticklabels=False)
+ for mean, name, keywords in zip(
+ self.gmm_.means_, self.topic_names, self.get_top_words()
+ ):
+ _keys = ""
+ for i, key in enumerate(keywords):
+ if (i % 5) == 0:
+                    _keys += "<br>"
+ _keys += key
+ if i < (len(keywords) - 1):
+ _keys += ","
+ _keys += " "
+ text = f"{name} {_keys} "
+ fig.add_annotation(
+ text=text,
+ x=mean[0],
+ y=mean[1],
+ align="left",
+ showarrow=False,
+ xshift=0,
+ yshift=50,
+ font=dict(family="Roboto Mono", size=18, color="black"),
+ bgcolor="rgba(255,255,255,0.9)",
+ bordercolor="black",
+ borderwidth=2,
+ )
+ return fig
diff --git a/turftopic/models/keynmf.py b/turftopic/models/keynmf.py
index 3d26b9b..c9618a7 100644
--- a/turftopic/models/keynmf.py
+++ b/turftopic/models/keynmf.py
@@ -43,8 +43,10 @@ class KeyNMF(ContextualModel, DynamicTopicModel, MultimodalModel):
Parameters
----------
- n_components: int
+ n_components: int or "auto"
Number of topics.
+        If "auto", the number of topics is selected
+        using the Bayesian Information Criterion.
encoder: str or SentenceTransformer
Model to encode documents/terms, all-MiniLM-L6-v2 is the default.
vectorizer: CountVectorizer, default None
@@ -71,7 +73,7 @@ class KeyNMF(ContextualModel, DynamicTopicModel, MultimodalModel):
def __init__(
self,
- n_components: int,
+ n_components: Union[int, Literal["auto"]],
encoder: Union[
Encoder, str, MultimodalEncoder
] = "sentence-transformers/all-MiniLM-L6-v2",
diff --git a/turftopic/models/topeax.py b/turftopic/models/topeax.py
new file mode 100644
index 0000000..7f17608
--- /dev/null
+++ b/turftopic/models/topeax.py
@@ -0,0 +1,211 @@
+from typing import Optional, Union
+
+import numpy as np
+from scipy.ndimage import (
+ binary_erosion,
+ generate_binary_structure,
+ maximum_filter,
+)
+from scipy.stats import gaussian_kde
+from sklearn.base import BaseEstimator, ClusterMixin
+from sklearn.feature_extraction.text import CountVectorizer
+from sklearn.manifold import TSNE
+from sklearn.metrics.pairwise import cosine_similarity
+from sklearn.mixture import GaussianMixture
+from sklearn.mixture._gaussian_mixture import (
+ _compute_precision_cholesky,
+ _estimate_gaussian_parameters,
+)
+
+from turftopic.base import Encoder
+from turftopic.encoders.multimodal import MultimodalEncoder
+from turftopic.models.gmm import GMM, LexicalWordImportance
+
+
+def detect_peaks(image):
+    # define an 8-connected neighborhood
+    neighborhood = generate_binary_structure(2, 2)
+ local_max = maximum_filter(image, footprint=neighborhood) == image
+ # local_max is a mask that contains the peaks we are
+ # looking for, but also the background.
+ # In order to isolate the peaks we must remove the background from the mask.
+ # we create the mask of the background
+ background = image == 0
+    # a little technicality: we must erode the background in order to
+    # successfully subtract it from local_max, otherwise a line will
+    # appear along the background border (artifact of the local maximum filter)
+ eroded_background = binary_erosion(
+ background, structure=neighborhood, border_value=1
+ )
+ # we obtain the final mask, containing only peaks,
+ # by removing the background from the local_max mask (xor operation)
+ detected_peaks = local_max ^ eroded_background
+ return detected_peaks
+
+
+class FixedMeanGaussianMixture(GaussianMixture):
+ def _m_step(self, X, log_resp):
+ # Skipping mean update
+ self.weights_, _, self.covariances_ = _estimate_gaussian_parameters(
+ X, np.exp(log_resp), self.reg_covar, self.covariance_type
+ )
+ self.weights_ /= self.weights_.sum()
+ self.precisions_cholesky_ = _compute_precision_cholesky(
+ self.covariances_, self.covariance_type
+ )
+
+
+class Peax(ClusterMixin, BaseEstimator):
+ """Clustering model based on density peaks.
+
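+    A minimal sketch of direct use (Peax expects 2D coordinates, e.g. a
+    t-SNE projection, since density peaks are detected on a 2D grid):
+
+    ```python
+    import numpy as np
+
+    rng = np.random.default_rng(0)
+    # two well-separated blobs of points
+    coords = np.concatenate(
+        [rng.normal(-2, 0.5, size=(300, 2)), rng.normal(2, 0.5, size=(300, 2))]
+    )
+    labels = Peax(random_state=42).fit_predict(coords)
+    ```
+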
+ Parameters
+ ----------
+ random_state: int, default None
+        Random seed to use when fitting the Gaussian mixture to the detected peaks.
+ """
+
+ def __init__(self, random_state: Optional[int] = None):
+ self.random_state = random_state
+
+ def fit(self, X, y=None):
+ self.X_range = np.min(X), np.max(X)
+ self.density = gaussian_kde(X.T, "scott")
+ coord = np.linspace(*self.X_range, num=100)
+ z = []
+ for yval in coord:
+ points = np.stack([coord, np.full(coord.shape, yval)]).T
+ prob = np.exp(self.density.logpdf(points.T))
+ z.append(prob)
+ z = np.stack(z)
+ peaks = detect_peaks(z.T)
+ peak_ind = np.nonzero(peaks)
+ peak_pos = np.stack([coord[peak_ind[0]], coord[peak_ind[1]]]).T
+ weights = self.density.pdf(peak_pos.T)
+ weights = weights / weights.sum()
+ self.gmm_ = FixedMeanGaussianMixture(
+ peak_pos.shape[0],
+ means_init=peak_pos,
+ weights_init=weights,
+ random_state=self.random_state,
+ )
+ self.labels_ = self.gmm_.fit_predict(X)
+ # Checking whether there are close to zero components
+ is_zero = np.isclose(self.gmm_.weights_, 0)
+ n_zero = np.sum(is_zero)
+ if n_zero > 0:
+ print(
+ f"{n_zero} components have zero weight, removing them and refitting."
+ )
+ peak_pos = peak_pos[~is_zero]
+ weights = self.gmm_.weights_[~is_zero]
+ weights = weights / weights.sum()
+ self.gmm_ = FixedMeanGaussianMixture(
+ peak_pos.shape[0],
+ means_init=peak_pos,
+ weights_init=weights,
+ random_state=self.random_state,
+ )
+ self.labels_ = self.gmm_.fit_predict(X)
+ self.classes_ = np.sort(np.unique(self.labels_))
+ self.means_ = self.gmm_.means_
+ self.weights_ = self.gmm_.weights_
+ return self.labels_
+
+ @property
+ def n_components(self) -> int:
+ return self.gmm_.n_components
+
+ def predict_proba(self, X):
+ return self.gmm_.predict_proba(X)
+
+ def score_samples(self, X):
+ return self.density.logpdf(X.T)
+
+ def score(self, X):
+ return np.mean(self.score_samples(X))
+
+
+class Topeax(GMM):
+ """Topic model based on the Peax clustering algorithm.
+ The algorithm discovers the number of topics automatically, and is based on GMM.
+
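+    A minimal usage sketch (the standard Turftopic `fit()`/`print_topics()`
+    interface, inherited from GMM, is assumed):
+
+    ```python
+    from turftopic import Topeax
+
+    corpus: list[str] = ["some text", "more text", ...]
+
+    model = Topeax(random_state=42).fit(corpus)
+    model.print_topics()
+    ```
+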
+ Parameters
+ ----------
+ encoder: str or SentenceTransformer
+ Model to encode documents/terms, all-MiniLM-L6-v2 is the default.
+ vectorizer: CountVectorizer, default None
+ Vectorizer used for term extraction.
+ Can be used to prune or filter the vocabulary.
+ perplexity: int, default 50
+ Number of neighbours to take into account when running TSNE.
+ random_state: int, default None
+ Random state to use so that results are exactly reproducible.
+
+ """
+
+ def __init__(
+ self,
+ encoder: Union[
+ Encoder, str, MultimodalEncoder
+ ] = "sentence-transformers/all-MiniLM-L6-v2",
+ vectorizer: Optional[CountVectorizer] = None,
+ perplexity: int = 50,
+ random_state: Optional[int] = None,
+ ):
+ dimensionality_reduction = TSNE(
+ 2,
+ metric="cosine",
+ perplexity=perplexity,
+ random_state=random_state,
+ )
+ super().__init__(
+ n_components=0,
+ encoder=encoder,
+ vectorizer=vectorizer,
+ dimensionality_reduction=dimensionality_reduction,
+ random_state=random_state,
+ )
+
+ def estimate_components(
+ self,
+ feature_importance: Optional[LexicalWordImportance] = None,
+ doc_topic_matrix=None,
+ doc_term_matrix=None,
+ ) -> np.ndarray:
+ doc_topic_matrix = (
+ doc_topic_matrix
+ if doc_topic_matrix is not None
+ else self.doc_topic_matrix
+ )
+ doc_term_matrix = (
+ doc_term_matrix
+ if doc_term_matrix is not None
+ else self.doc_term_matrix
+ )
+ lexical_components = super().estimate_components(
+ "npmi", doc_topic_matrix, doc_term_matrix
+ )
+ vocab = self.get_vocab()
+ if getattr(self, "vocab_embeddings", None) is None or (
+ self.vocab_embeddings.shape[0] != vocab.shape[0]
+ ):
+ self.vocab_embeddings = self.encode_documents(vocab)
+ topic_embeddings = []
+ for weight in doc_topic_matrix.T:
+ topic_embeddings.append(
+ np.average(self.embeddings, axis=0, weights=weight)
+ )
+ self.topic_embeddings = np.stack(topic_embeddings)
+ semantic_components = cosine_similarity(
+ self.topic_embeddings, self.vocab_embeddings
+ )
+        # Rescaling both matrices from [-1, 1] to [0, 1],
+        # then taking their geometric mean: sqrt(((1 + npmi) / 2) * ((1 + cosine) / 2))
+ self.components_ = np.sqrt(
+ ((1 + lexical_components) / 2) * ((1 + semantic_components) / 2)
+ )
+ return self.components_
+
+ def _init_model(self, n_components: int):
+        mixture = Peax(random_state=self.random_state)
+ return mixture
diff --git a/turftopic/optimization.py b/turftopic/optimization.py
new file mode 100644
index 0000000..76763a0
--- /dev/null
+++ b/turftopic/optimization.py
@@ -0,0 +1,114 @@
+from functools import cache
+from typing import Callable
+
+import numpy as np
+from scipy import optimize
+from scipy.sparse import issparse
+
+
+def decomposition_gaussian_bic(
+ n_components: int, decomp_class, X, random_state: int = 42
+) -> float:
+ """Computes Bayesian information criterion for
+ a decomposition model with assumed Gaussian noise.
+
+ Parameters
+ ----------
+ n_components: int
+ Number of components in the model.
+ decomp_class
+ Class of the decomposition model, e.g. `decomp_class=FastICA`.
+ X: array
+ Training data to decompose.
+ random_state: int, default 42
+ Seed to pass to the decomposition algorithm.
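+
+    A usage sketch mirroring how KeyNMF uses this function (NMF as the
+    decomposition, over a small synthetic non-negative matrix `X`):
+
+    ```python
+    from functools import partial
+
+    import numpy as np
+    from sklearn.decomposition import NMF
+
+    from turftopic.optimization import (
+        decomposition_gaussian_bic,
+        optimize_n_components,
+    )
+
+    X = np.abs(np.random.default_rng(0).normal(size=(100, 20)))
+    bic_fn = partial(decomposition_gaussian_bic, decomp_class=NMF, X=X)
+    n_components = optimize_n_components(bic_fn, min_n=1)
+    ```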
+ """
+    decomp = decomp_class(n_components=n_components, random_state=random_state)
+ doc_topic = decomp.fit_transform(X)
+ if not hasattr(decomp, "reconstruction_err_"):
+ X_reconstruction = decomp.inverse_transform(doc_topic)
+ # Computing residual sum of squares
+ if issparse(X):
+ X = np.asarray(X.todense())
+ rss = np.sum(np.square(X - X_reconstruction))
+ else:
+ rss = np.square(decomp.reconstruction_err_)
+ n_params = decomp.components_.size + np.prod(doc_topic.shape)
+ n_datapoints = np.prod(X.shape)
+ # Computing Bayesian information criterion assuming IID
+ bic = n_params * np.log(n_datapoints) + n_datapoints * np.log(
+ rss / n_datapoints
+ )
+ return bic
+
+
+def optimize_n_components(
+    f_ic: Callable[[int], float], min_n: int = 2, max_n: int = 250, verbose=False
+) -> int:
+    """Optimizes the number of components using the Brent minimum-finding
+ algorithm given an information criterion.
+ The bracket is found by exponentially incrementing n_components.
+
+ Parameters
+ ----------
+    f_ic: Callable[[int], float]
+        Function returning the information criterion for a given number of components.
+ min_n: int
+ Minimum value of N.
+ max_n: int
+ Maximum value of N, in case the algorithm doesn't converge.
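+
+    A toy sketch: with a quadratic "information criterion" whose minimum is
+    at N=20, the search should settle near that value.
+
+    ```python
+    n = optimize_n_components(lambda n: float((n - 20) ** 2))
+    ```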
+ """
+ if verbose:
+ print("Optimizing N based on an information criterion...")
+
+ # Caching and adding debugging statements
+ @cache
+ def _f_ic(n_components) -> float:
+        # Making sure n_components is an integer,
+        # since the optimization routine will pass floats.
+        n_components = int(n_components)
+ val = f_ic(n_components)
+ if verbose:
+ print(f" - IC(N={n_components})={val:.2f}")
+ return val
+
+ # Finding bracket
+ if verbose:
+ print(" - Finding bracket...")
+    low = min_n
+    n_comp = min_n + 1
+    while _f_ic(n_comp) >= _f_ic(min_n):
+        n_comp += 1
+        if n_comp >= 10:
+            if verbose:
+                print(
+                    f" - Couldn't find a lower IC than at n={min_n} up to n=10, stopping."
+                )
+            return min_n
+    middle = n_comp
+ current = _f_ic(middle)
+ inc = 5
+ while not current > _f_ic(middle):
+ n_comp += inc
+ if n_comp >= max_n:
+ if verbose:
+ print(f" - Bracket didn't converge, returning max N: {max_n}")
+ return max_n
+ current = _f_ic(n_comp)
+ if current < _f_ic(middle):
+ low = n_comp - inc
+ middle = n_comp
+ inc *= 2
+ bracket = low, middle, n_comp
+ if verbose:
+ print(f" - Running optimization with bracket: {bracket}")
+ # Optimizing
+ res = optimize.minimize_scalar(
+ _f_ic,
+ method="brent",
+ bracket=(low, middle, n_comp),
+ options=dict(xtol=0.2),
+ )
+ if verbose:
+ print(f" - Converged after {res.nit} iterations.")
+ return int(res.x)