Internals

dmf

Deterministic Memory Framework package.

AnalysisReport dataclass

NLP feature envelope for a single conversation interaction.

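Produced by the NLP Engine and consumed by the Scoring Engine. The first
five fields (`info_density`, `sentiment_abs`, `entity_count`,
`is_system_prompt`, `latency_ms`) are required; every other field has a
default. Mutable defaults such as `raw_metadata` are built with
`field(default_factory=...)`, as PEP 557 recommends, so instances never
share state.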

Attributes
----------
info_density : float
    Information Density ($ID$) — ratio of semantic lemmas
    (NOUN, VERB, ADJ, PROPN) to total token count. Range: [0.0, 1.0].
sentiment_abs : float
    Absolute VADER compound sentiment score ($|S|$). Range: [0.0, 1.0].
entity_count : int
    Count of named entities ($E$) recognised by spaCy's standard NER
    (PERSON, ORG, GPE, …). Non-negative integer.
is_system_prompt : bool
    True when this report describes the system prompt interaction.
    Drives the `analyze_system_prompt` branch in NLPConfig.
latency_ms : float
    Wall-clock time taken by the NLP pipeline to produce this report,
    in milliseconds. Populated from ExecutionLatencyTimer.
semantic_divergence : float
    Geometric divergence ($D$) of this interaction's embedding from the
    sliding-window centroid: D = 1 − cosine_similarity(vector, centroid).
    Range: [0.0, 2.0]. 0.0 = perfectly aligned with context; 2.0 =
    diametrically opposed. Defaults to 0.0 for the first interaction
    (no prior context) and for skipped system-prompt interactions.
    Populated by InteractionPipeline; NLPEngine alone leaves
    it at the default.
survival_score : float | None
    Continuous Survival Score Ω ∈ (0, 1) computed by ScoringEngine.
    None until ``ScoringEngine.calculate_score`` has been called on
    this report. The engine stamps the value in place immediately after
    rounding, so callers always find the authoritative value on the
    report itself.
status : SurvivalStatus | None
    Categorical tier derived from Ω after scoring:
    HEALTHY (Ω > 0.6), UNSTABLE (0.3 < Ω ≤ 0.6), CRITICAL (Ω ≤ 0.3).
    None until ``ScoringEngine.calculate_score`` stamps it.  None is
    the correct sentinel for "not yet scored" — defaulting to CRITICAL
    would conflate unscored reports with genuinely low-value ones.
raw_metadata : dict[str, Any]
    Audit-level detail: POS tag counts, raw token list, intermediate
    scores, and any other data needed for post-execution inspection.
    Defaults to an empty dict when not supplied.
provenance : InteractionProvenance
    Structured provenance metadata kept separate from raw NLP audit
    detail. Defaults to an empty ``InteractionProvenance``.
signals : InteractionSignals
    Structured conversational/pragmatic cues kept separate from the raw
    audit trail so downstream policy can consume a stable schema.
topic_identity : str | None
    Stable topic field extracted from the interaction when enough
    structure is available. Used for memory-level conflict policies.
topic_value : str | None
    Value expressed for ``topic_identity`` in this interaction, when a
    stable value can be extracted.
is_query_like : bool
    True when the interaction is primarily a query/request rather than
    memory content. This is derived in the language adapter so memory
    policy can stay language-agnostic.
is_ack_like : bool
    True when the interaction is primarily an acknowledgement or light
    reformulation rather than a new memory-bearing statement.
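
The divergence formula and the status tiers described above can be sketched in a few lines. This is a minimal illustration, not the DMF implementation: the names `semantic_divergence` and `tier_for` are hypothetical, tiers are returned as plain strings rather than `SurvivalStatus` members, and returning 0.0 for a zero-length vector is an assumption.

```python
import math
from typing import Optional, Sequence


def semantic_divergence(vector: Sequence[float], centroid: Sequence[float]) -> float:
    """Return D = 1 - cosine_similarity(vector, centroid), in [0.0, 2.0]."""
    dot = sum(a * b for a, b in zip(vector, centroid))
    norm = math.sqrt(sum(a * a for a in vector)) * math.sqrt(sum(b * b for b in centroid))
    if norm == 0.0:
        # Assumption: a zero vector is treated like "no prior context".
        return 0.0
    # Clamp to absorb floating-point drift just outside [-1, 1].
    cosine = max(-1.0, min(1.0, dot / norm))
    return 1.0 - cosine


def tier_for(omega: Optional[float]) -> Optional[str]:
    """Map a survival score to its tier name; None means "not yet scored"."""
    if omega is None:
        return None
    if omega > 0.6:
        return "HEALTHY"
    if omega > 0.3:
        return "UNSTABLE"
    return "CRITICAL"
```

Note that the boundaries are inclusive on the lower tier: a score of exactly 0.6 is UNSTABLE and exactly 0.3 is CRITICAL.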

Parameters:

Name                 Type                   Default                  Description
info_density         float                  required                 Information Density (ID): semantic lemmas over total tokens, in [0.0, 1.0].
sentiment_abs        float                  required                 Absolute VADER compound sentiment score (|S|), in [0.0, 1.0].
entity_count         int                    required                 Number of named entities (E) recognised by spaCy NER.
is_system_prompt     bool                   required                 True when the report describes the system prompt interaction.
latency_ms           float                  required                 Wall-clock NLP pipeline time, in milliseconds.
semantic_divergence  float                  0.0                      Divergence (D) from the sliding-window centroid, in [0.0, 2.0].
survival_score       float | None           None                     Survival Score Ω; None until the report has been scored.
status               SurvivalStatus | None  None                     Tier derived from Ω; None until the report has been scored.
raw_metadata         dict[str, Any]         dict()                   Audit-level NLP detail for post-execution inspection.
provenance           InteractionProvenance  InteractionProvenance()  Structured provenance metadata.
signals              InteractionSignals     InteractionSignals()     Structured conversational/pragmatic cues.
topic_identity       str | None             None                     Stable topic field, when one can be extracted.
topic_value          str | None             None                     Value expressed for topic_identity, when one can be extracted.
is_query_like        bool                   False                    True when the interaction is primarily a query/request.
is_ack_like          bool                   False                    True when the interaction is primarily an acknowledgement or light reformulation.

Returns:

Type Description

Instance of this class.

Source code in dmf/models/analysis.py
@dataclass
class AnalysisReport:
    """NLP feature envelope for a single conversation interaction.

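        Produced by the NLP Engine and consumed by the Scoring Engine. The
        first five fields (`info_density`, `sentiment_abs`, `entity_count`,
        `is_system_prompt`, `latency_ms`) are required; every other field
        has a default. Mutable defaults such as `raw_metadata` are built
        with `field(default_factory=...)`, as PEP 557 recommends, so
        instances never share state.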

        Attributes
        ----------
        info_density : float
            Information Density ($ID$) — ratio of semantic lemmas
            (NOUN, VERB, ADJ, PROPN) to total token count. Range: [0.0, 1.0].
        sentiment_abs : float
            Absolute VADER compound sentiment score ($|S|$). Range: [0.0, 1.0].
        entity_count : int
            Count of named entities ($E$) recognised by spaCy's standard NER
            (PERSON, ORG, GPE, …). Non-negative integer.
        is_system_prompt : bool
            True when this report describes the system prompt interaction.
            Drives the `analyze_system_prompt` branch in NLPConfig.
        latency_ms : float
            Wall-clock time taken by the NLP pipeline to produce this report,
            in milliseconds. Populated from ExecutionLatencyTimer.
        semantic_divergence : float
            Geometric divergence ($D$) of this interaction's embedding from the
            sliding-window centroid: D = 1 − cosine_similarity(vector, centroid).
            Range: [0.0, 2.0]. 0.0 = perfectly aligned with context; 2.0 =
            diametrically opposed. Defaults to 0.0 for the first interaction
            (no prior context) and for skipped system-prompt interactions.
            Populated by InteractionPipeline; NLPEngine alone leaves
            it at the default.
        survival_score : float | None
            Continuous Survival Score Ω ∈ (0, 1) computed by ScoringEngine.
            None until ``ScoringEngine.calculate_score`` has been called on
            this report. The engine stamps the value in place immediately
            after rounding, so callers always find the authoritative value
            on the report itself.
        status : SurvivalStatus | None
            Categorical tier derived from Ω after scoring:
            HEALTHY (Ω > 0.6), UNSTABLE (0.3 < Ω ≤ 0.6), CRITICAL (Ω ≤ 0.3).
            None until ``ScoringEngine.calculate_score`` stamps it.  None is
            the correct sentinel for "not yet scored" — defaulting to CRITICAL
            would conflate unscored reports with genuinely low-value ones.
        raw_metadata : dict[str, Any]
            Audit-level detail: POS tag counts, raw token list, intermediate
            scores, and any other data needed for post-execution inspection.
            Defaults to an empty dict when not supplied.
        provenance : InteractionProvenance
            Structured provenance metadata kept separate from raw NLP audit
            detail. Defaults to an empty ``InteractionProvenance``.
        signals : InteractionSignals
            Structured conversational/pragmatic cues kept separate from the raw
            audit trail so downstream policy can consume a stable schema.
        topic_identity : str | None
            Stable topic field extracted from the interaction when enough
            structure is available. Used for memory-level conflict policies.
        topic_value : str | None
            Value expressed for ``topic_identity`` in this interaction, when a
            stable value can be extracted.
        is_query_like : bool
            True when the interaction is primarily a query/request rather than
            memory content. This is derived in the language adapter so memory
            policy can stay language-agnostic.
        is_ack_like : bool
            True when the interaction is primarily an acknowledgement or light
            reformulation rather than a new memory-bearing statement.

    Args:
        info_density: See the function signature and surrounding type hints.
        sentiment_abs: See the function signature and surrounding type hints.
        entity_count: See the function signature and surrounding type hints.
        is_system_prompt: See the function signature and surrounding type hints.
        latency_ms: See the function signature and surrounding type hints.
        semantic_divergence: See the function signature and surrounding type hints.
        survival_score: See the function signature and surrounding type hints.
        status: See the function signature and surrounding type hints.
        raw_metadata: See the function signature and surrounding type hints.
        provenance: See the function signature and surrounding type hints.
        signals: See the function signature and surrounding type hints.
        topic_identity: See the function signature and surrounding type hints.
        topic_value: See the function signature and surrounding type hints.
        is_query_like: See the function signature and surrounding type hints.
        is_ack_like: See the function signature and surrounding type hints.

    Returns:
        Instance of this class.

    Raises:
        None.
    """

    info_density: float
    sentiment_abs: float
    entity_count: int
    is_system_prompt: bool
    latency_ms: float
    semantic_divergence: float = 0.0
    survival_score: float | None = None
    status: SurvivalStatus | None = None
    raw_metadata: dict[str, Any] = field(default_factory=dict)
    provenance: InteractionProvenance = field(default_factory=InteractionProvenance)
    signals: InteractionSignals = field(default_factory=InteractionSignals)
    topic_identity: str | None = None
    topic_value: str | None = None
    is_query_like: bool = False
    is_ack_like: bool = False


    def to_dict(self) -> dict[str, Any]:
        """Return a plain dictionary representation of this report.

        Uses dataclasses.asdict() for a deep copy, so the result is safe to
        mutate without affecting the original report. Top-level Enum
        fields (such as ``status``) are converted to their ``.value`` so
        they need no custom JSON encoder.

        Returns:
            A deep-copied dictionary keyed by field name.

        Raises:
            None.
        """
        d = dataclasses.asdict(self)
        # asdict() copies values but leaves Enum members as Enum members,
        # so convert top-level Enum values to plain values for json.dumps.
        for key, value in d.items():
            if isinstance(value, Enum):
                d[key] = value.value
        return d

    def to_json(self) -> str:
        """Return a JSON string representation of this report.

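        Produces human-readable output (indent=2) suitable for structured
        logging and metadata persistence. Whenever serialisation succeeds,
        the output can be parsed back with json.loads().

        Returns:
            JSON text with two-space indentation.

        Raises:
            TypeError: If a nested value (for example inside
                ``raw_metadata``) is not JSON-serialisable.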
        """
        return json.dumps(self.to_dict(), indent=2)

to_dict()

Return a plain dictionary representation of this report.

    Uses dataclasses.asdict() for a deep copy, so the result is safe to
    mutate without affecting the original report. Top-level Enum fields
    (such as ``status``) are converted to their ``.value`` so they need
    no custom JSON encoder.

Returns:

Type Description
dict[str, Any]

A deep-copied dictionary keyed by field name.
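
The copy semantics can be checked on a small stand-in. The sketch below does not use the real `AnalysisReport`: `Tier` and `MiniReport` are hypothetical names that mirror only the `asdict()` plus Enum-conversion pattern shown in the source.

```python
import dataclasses
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Optional


class Tier(Enum):
    """Hypothetical stand-in for SurvivalStatus."""
    HEALTHY = "healthy"
    CRITICAL = "critical"


@dataclass
class MiniReport:
    """Hypothetical two-field stand-in for AnalysisReport."""
    status: Optional[Tier] = None
    raw_metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        d = dataclasses.asdict(self)  # recursively copies dicts and lists
        for key, value in d.items():
            if isinstance(value, Enum):
                d[key] = value.value  # top-level Enum becomes a plain value
        return d


report = MiniReport(status=Tier.HEALTHY, raw_metadata={"tokens": ["a", "b"]})
snapshot = report.to_dict()
snapshot["raw_metadata"]["tokens"].append("c")  # mutates the copy only
```

After the append, `report.raw_metadata["tokens"]` still holds two items, which is the property the description relies on.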

Source code in dmf/models/analysis.py
def to_dict(self) -> dict[str, Any]:
    """Return a plain dictionary representation of this report.

    Uses dataclasses.asdict() for a deep copy, so the result is safe to
    mutate without affecting the original report. Top-level Enum
    fields (such as ``status``) are converted to their ``.value`` so
    they need no custom JSON encoder.

    Returns:
        A deep-copied dictionary keyed by field name.

    Raises:
        None.
    """
    d = dataclasses.asdict(self)
    # asdict() copies values but leaves Enum members as Enum members,
    # so convert top-level Enum values to plain values for json.dumps.
    for key, value in d.items():
        if isinstance(value, Enum):
            d[key] = value.value
    return d

to_json()

Return a JSON string representation of this report.

    Produces human-readable output (indent=2) suitable for structured
    logging and metadata persistence. Whenever serialisation succeeds,
    the output can be parsed back with json.loads().

Returns:

Type Description
str

JSON text with two-space indentation.
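
A round trip shows what the parseability claim covers and where it stops. The snippet works on a hand-written dictionary shaped like `to_dict()` output (the values are made up for illustration), and the failing case assumes that a value the `json` module cannot encode has ended up in `raw_metadata`.

```python
import json

# Hand-written stand-in for AnalysisReport.to_dict() output (illustrative values).
payload = {
    "info_density": 0.5,
    "status": "healthy",
    "raw_metadata": {"pos": {"NOUN": 2}},
}

text = json.dumps(payload, indent=2)
restored = json.loads(text)  # equal to payload: the round trip is lossless here

# A value json cannot encode (here a set) makes json.dumps raise TypeError.
try:
    json.dumps({"raw_metadata": {"tokens": {"a", "b"}}}, indent=2)
    failed = False
except TypeError:
    failed = True
```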

Source code in dmf/models/analysis.py
def to_json(self) -> str:
    """Return a JSON string representation of this report.

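    Produces human-readable output (indent=2) suitable for structured
    logging and metadata persistence. Whenever serialisation succeeds,
    the output can be parsed back with json.loads().

    Returns:
        JSON text with two-space indentation.

    Raises:
        TypeError: If a nested value (for example inside
            ``raw_metadata``) is not JSON-serialisable.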
    """
    return json.dumps(self.to_dict(), indent=2)

ChromaLTMHook

Persistent vector LTM store with raw-record retrieval via ChromaDB.

Parameters:

Name                   Type                             Default                            Description
collection_name        str                              DEFAULT_LTM_COLLECTION_NAME        Chroma collection used for raw records.
persist_directory      Path | str                       DEFAULT_LTM_CHROMA_PATH            Directory where Chroma persists local state.
distance_threshold     float                            DEFAULT_LTM_DISTANCE_THRESHOLD     Maximum cosine distance accepted for recall hits.
vector_config          VectorConfig | None              None                               Optional embedding configuration for lazy indexing.
embed_text             Callable[[str], ndarray] | None  None                               Optional embedding function override.
cards_enabled          bool                             False                              Whether to index projected auxiliary cards.
cards_path             Path | str | None                None                               Optional JSONL audit path for projected cards.
card_store             JsonlMemoryCardStore | None      None                               Optional prebuilt JSONL card store.
cards_collection_name  str                              DEFAULT_LTM_CARDS_COLLECTION_NAME  Chroma collection used for projected cards.

Returns:

Type Description

Chroma-backed LTM hook instance.

Raises:

Type Description
OSError

If the persistence directory cannot be created.

Warning

Chroma cosine distance is lower-is-better. The public similarity_score on each hit is derived as 1.0 - distance and is intended only for deterministic ordering diagnostics.
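
The relationship between distance, threshold, and `similarity_score` can be illustrated without a Chroma client. This is a sketch under assumptions: distances are taken to arrive best-first, and `filter_hits` and the threshold values are hypothetical, not DMF defaults.

```python
def filter_hits(distances: list[float], threshold: float) -> list[tuple[int, float, float]]:
    """Return (rank_hint, distance, similarity_score) for accepted hits."""
    hits = []
    for rank, dist in enumerate(distances):
        if dist > threshold:
            continue  # farther than the threshold: rejected
        # similarity_score is a diagnostic view of the same number.
        hits.append((rank, dist, 1.0 - dist))
    return hits


accepted = filter_hits([0.125, 0.25, 0.5], threshold=0.25)
# Cosine distance spans [0, 2], so the derived score can be negative.
distant = filter_hits([1.5], threshold=2.0)
```

A distance exactly equal to the threshold is accepted, and `rank_hint` keeps the position from the original result list even when earlier candidates are dropped.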

Source code in dmf/memory/chroma_ltm.py
class ChromaLTMHook:
    """Persistent vector LTM store with raw-record retrieval via ChromaDB.

    Args:
        collection_name: Chroma collection used for raw records.
        persist_directory: Directory where Chroma persists local state.
        distance_threshold: Maximum cosine distance accepted for recall hits.
        vector_config: Optional embedding configuration for lazy indexing.
        embed_text: Optional embedding function override.
        cards_enabled: Whether to index projected auxiliary cards.
        cards_path: Optional JSONL audit path for projected cards.
        card_store: Optional prebuilt JSONL card store.
        cards_collection_name: Chroma collection used for projected cards.

    Returns:
        Chroma-backed LTM hook instance.

    Raises:
        OSError: If the persistence directory cannot be created.
        ChromaDB exceptions may surface during client or collection creation.

    Warning:
        Chroma cosine distance is lower-is-better. The public
        ``similarity_score`` exposed by hits is derived as ``1.0 - distance``
        only for deterministic ordering diagnostics.
    """

    def __init__(
        self,
        collection_name: str = DEFAULT_LTM_COLLECTION_NAME,
        persist_directory: Path | str = DEFAULT_LTM_CHROMA_PATH,
        distance_threshold: float = DEFAULT_LTM_DISTANCE_THRESHOLD,
        vector_config: VectorConfig | None = None,
        embed_text: Callable[[str], np.ndarray] | None = None,
        cards_enabled: bool = False,
        cards_path: Path | str | None = None,
        card_store: JsonlMemoryCardStore | None = None,
        cards_collection_name: str = DEFAULT_LTM_CARDS_COLLECTION_NAME,
    ) -> None:
        self._distance_threshold = distance_threshold
        self._lock = threading.Lock()
        self._vector_config = vector_config or VectorConfig()
        self._embed_text = embed_text
        self._embedding_engine = None
        self._cards_enabled = cards_enabled
        self._card_store = card_store
        if self._card_store is None and cards_enabled:
            default_cards_path = Path(persist_directory) / "ltm_cards.jsonl"
            self._card_store = JsonlMemoryCardStore(cards_path or default_cards_path)

        persist_path = Path(persist_directory)
        persist_path.mkdir(parents=True, exist_ok=True)
        self._persist_directory: Path = persist_path

        self._client = chromadb.PersistentClient(
            path=str(persist_path),
            settings=Settings(anonymized_telemetry=False),
        )
        self._collection = self._client.get_or_create_collection(
            name=collection_name,
            metadata={"hnsw:space": "cosine"},
        )

        self._cards_collection = None
        if cards_enabled:
            self._cards_collection = self._client.get_or_create_collection(
                name=cards_collection_name,
                metadata={"hnsw:space": "cosine"},
            )
        self._card_projector = MemoryCardProjector()

    def archive(self, entry: MemoryEntry) -> None:
        """Index one evicted raw interaction record into ChromaDB.

        Args:
            entry: Working-memory entry selected for archival.

        Returns:
            None.

        Raises:
            ChromaDB exceptions may surface during upsert.
            TypeError: If raw-record or card metadata cannot be serialised.
        """
        raw_record = entry.to_raw_ltm_record()
        metadata = {
            "raw_record": json.dumps(raw_record.to_dict(), ensure_ascii=False),
            "record_id": raw_record.record_id,
            "raw_interaction_id": raw_record.interaction_id,
            "raw_role": raw_record.role,
            "raw_created_at": raw_record.created_at,
        }

        with self._lock:
            self._collection.upsert(
                ids=[raw_record.record_id],
                embeddings=[self._embed_text_payload(raw_record.text)],
                documents=[raw_record.text],
                metadatas=[metadata],
            )
            if getattr(self, "_cards_collection", None) is not None:
                cards = self._card_projector.project(entry)
                source_vector = entry.vector.tolist()
                for card in cards:
                    card_text = " ".join(
                        piece for piece in [card.kind, card.subject, card.predicate, card.object]
                        if piece
                    )
                    card_metadata = {
                        "card": json.dumps(card.to_dict(), ensure_ascii=False),
                        "card_id": card.card_id,
                        "source_record_id": card.provenance.source_record_id,
                        "kind": card.kind,
                    }
                    self._cards_collection.upsert(
                        ids=[card.card_id],
                        embeddings=[source_vector],
                        documents=[card_text],
                        metadatas=[card_metadata],
                    )
        if self._card_store is not None:
            self._card_store.archive(entry)

    def search_raw(
        self,
        query_vector: list[float],
        k: int = 5,
    ) -> list[RawRecallHit]:
        """Retrieve the top-k most relevant raw records by vector similarity.

        Args:
            query_vector: Query embedding in the same vector space as indexed
                raw records.
            k: Maximum number of raw hits requested.

        Returns:
            Raw hits under the configured distance threshold.

        Raises:
            ChromaDB exceptions may surface during query execution.
        """
        if k <= 0:
            return []

        results = self._collection.query(
            query_embeddings=[query_vector],
            n_results=k,
            include=["metadatas", "distances"],
        )

        metadatas: list[dict[str, object]] = results["metadatas"][0]
        distances: list[float] = results["distances"][0]

        hits: list[RawRecallHit] = []
        for idx, (meta, dist) in enumerate(zip(metadatas, distances)):
            if dist > self._distance_threshold:
                continue
            try:
                record = self._deserialize_raw_record(meta)
            except (KeyError, TypeError, ValueError, json.JSONDecodeError):
                continue
            hits.append(
                RawRecallHit(
                    record=record,
                    distance=float(dist),
                    similarity_score=1.0 - float(dist),
                    rank_hint=idx,
                )
            )
        return hits

    def search_cards(
        self,
        query_vector: list[float],
        k: int = 5,
    ) -> list[RawRecallHit]:
        """Retrieve the top-k most relevant card hits by vector similarity.

        Each returned :class:`RawRecallHit` points to the *raw source record*
        that the matching card was projected from.  If the source record is no
        longer present in the main collection the card hit is silently skipped.

        Args:
            query_vector: Query embedding in the same vector space as indexed
                cards.
            k: Maximum number of card hits requested.

        Returns:
            Raw hits corresponding to the source records of matching cards.

        Raises:
            ChromaDB exceptions may surface during the initial card query.
        """
        if self._cards_collection is None:
            return []
        if k <= 0:
            return []

        results = self._cards_collection.query(
            query_embeddings=[query_vector],
            n_results=k,
            include=["metadatas", "distances"],
        )

        metadatas: list[dict[str, object]] = results["metadatas"][0]
        distances: list[float] = results["distances"][0]

        hits: list[RawRecallHit] = []
        for idx, (meta, dist) in enumerate(zip(metadatas, distances)):
            if dist > self._distance_threshold:
                continue
            source_record_id = meta.get("source_record_id")
            if not source_record_id:
                continue
            try:
                raw_results = self._collection.get(
                    ids=[str(source_record_id)],
                    include=["metadatas"],
                )
            except Exception:  # noqa: BLE001
                continue
            raw_metas = raw_results.get("metadatas") or []
            if not raw_metas:
                continue
            try:
                record = self._deserialize_raw_record(raw_metas[0])
            except (KeyError, TypeError, ValueError, json.JSONDecodeError):
                continue
            hits.append(
                RawRecallHit(
                    record=record,
                    distance=float(dist),
                    similarity_score=1.0 - float(dist),
                    rank_hint=idx,
                )
            )
        return hits

    def count_cards(self) -> int:
        """Return the number of indexed card records.

        Returns:
            Indexed card count, or 0 when cards are disabled.

        Raises:
            ChromaDB exceptions may surface when counting an enabled collection.
        """
        if self._cards_collection is None:
            return 0
        return self._cards_collection.count()

    def read_all(self) -> list[RawLTMRecord]:
        """Return all archived raw records ordered by interaction_id ascending.

        Fetches every document from the main collection using
        ``include=["metadatas"]`` and deserialises each entry via
        ``_deserialize_raw_record``.  Records that fail deserialisation are
        silently skipped.

        Returns:
            Deserialisable raw records sorted by interaction id.

        Raises:
            ChromaDB exceptions may surface during collection reads.

        Warning:
            Malformed records are skipped to keep recall resilient to partial
            writes or older metadata schemas.
        """
        results = self._collection.get(include=["metadatas"])
        metadatas: list[dict[str, object]] = results.get("metadatas") or []
        records: list[RawLTMRecord] = []
        for meta in metadatas:
            try:
                records.append(self._deserialize_raw_record(meta))
            except (KeyError, TypeError, ValueError, json.JSONDecodeError):
                continue
        records.sort(key=lambda r: r.interaction_id)
        return records

    def count(self) -> int:
        """Return the number of indexed raw records in the collection.

        Returns:
            Indexed raw-record count.

        Raises:
            ChromaDB exceptions may surface during count.
        """
        return self._collection.count()

    def clear(self) -> None:
        """Delete all indexed records from the raw-record collection.

        Returns:
            None.

        Raises:
            ChromaDB exceptions may surface during deletion.
        """
        ids = self._collection.get(include=[])["ids"]
        if ids:
            self._collection.delete(ids=ids)

    @property
    def card_store(self) -> JsonlMemoryCardStore | None:
        """Auxiliary JSONL card audit store, when configured.

        Returns:
            Configured card store or ``None``.

        Raises:
            None.
        """
        return self._card_store

    def _deserialize_raw_record(self, meta: dict[str, object]) -> RawLTMRecord:
        """Hydrate a raw-LTM record from Chroma metadata."""
        raw_payload = meta["raw_record"]
        if not isinstance(raw_payload, str) or not raw_payload:
            raise ValueError("Missing raw_record metadata")
        return RawLTMRecord.from_dict(json.loads(raw_payload))

    def _embed_text_payload(self, text: str) -> list[float]:
        """Return the vector used to index one raw record."""
        vector = self._get_embedder()(text)
        return vector.tolist()

    def _get_embedder(self) -> Callable[[str], np.ndarray]:
        """Return the text embedder, lazily initialising the default engine."""
        if self._embed_text is not None:
            return self._embed_text

        if self._embedding_engine is None:
            from dmf.analysis.embedding_engine import EmbeddingEngine  # noqa: PLC0415

            self._embedding_engine = EmbeddingEngine(self._vector_config)

        return self._embedding_engine.get_embedding

card_store property

Auxiliary JSONL card audit store, when configured.

Returns:

Type Description
JsonlMemoryCardStore | None

Configured card store or None.

archive(entry)

Index one evicted raw interaction record into ChromaDB.

Parameters:

Name Type Description Default
entry MemoryEntry

Working-memory entry selected for archival.

required

Returns:

Type Description
None

None.

Raises:

Type Description
TypeError

If raw-record or card metadata cannot be serialised.
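
How a projected card turns into an indexed document can be sketched in isolation. `Card` below is a hypothetical stand-in that carries only the four fields `archive` joins; the real card type and its projector are not shown in this section.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Card:
    """Hypothetical stand-in with the fields joined into the card document."""
    kind: str
    subject: Optional[str] = None
    predicate: Optional[str] = None
    object: Optional[str] = None


def card_text(card: Card) -> str:
    """Join the non-empty card fields with single spaces, skipping gaps."""
    pieces = [card.kind, card.subject, card.predicate, card.object]
    return " ".join(piece for piece in pieces if piece)


full = card_text(Card("fact", "user", "likes", "tea"))
partial = card_text(Card("fact", subject="user", object="tea"))
```

Empty or missing fields are dropped rather than leaving double spaces, so the document text stays compact for partially filled cards.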

Source code in dmf/memory/chroma_ltm.py
def archive(self, entry: MemoryEntry) -> None:
    """Index one evicted raw interaction record into ChromaDB.

    Args:
        entry: Working-memory entry selected for archival.

    Returns:
        None.

    Raises:
        ChromaDB exceptions may surface during upsert.
        TypeError: If raw-record or card metadata cannot be serialised.
    """
    raw_record = entry.to_raw_ltm_record()
    metadata = {
        "raw_record": json.dumps(raw_record.to_dict(), ensure_ascii=False),
        "record_id": raw_record.record_id,
        "raw_interaction_id": raw_record.interaction_id,
        "raw_role": raw_record.role,
        "raw_created_at": raw_record.created_at,
    }

    with self._lock:
        self._collection.upsert(
            ids=[raw_record.record_id],
            embeddings=[self._embed_text_payload(raw_record.text)],
            documents=[raw_record.text],
            metadatas=[metadata],
        )
        if getattr(self, "_cards_collection", None) is not None:
            cards = self._card_projector.project(entry)
            source_vector = entry.vector.tolist()
            for card in cards:
                card_text = " ".join(
                    piece for piece in [card.kind, card.subject, card.predicate, card.object]
                    if piece
                )
                card_metadata = {
                    "card": json.dumps(card.to_dict(), ensure_ascii=False),
                    "card_id": card.card_id,
                    "source_record_id": card.provenance.source_record_id,
                    "kind": card.kind,
                }
                self._cards_collection.upsert(
                    ids=[card.card_id],
                    embeddings=[source_vector],
                    documents=[card_text],
                    metadatas=[card_metadata],
                )
    if self._card_store is not None:
        self._card_store.archive(entry)
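
The card branch of `archive()` builds a searchable text and a flat metadata dict per card. A minimal sketch of that step follows, assuming a hypothetical `DemoCard` dataclass in place of the real card type, whose full definition is not shown in this section. The structured card is stored as a JSON string because ChromaDB metadata values must be scalars.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class DemoCard:
    """Hypothetical stand-in for the projected card type."""
    card_id: str
    kind: str
    subject: str
    predicate: str
    object: str


def build_card_payload(card: DemoCard) -> tuple[str, dict[str, str]]:
    # Join only the non-empty pieces, mirroring how archive() builds card_text.
    card_text = " ".join(
        piece
        for piece in [card.kind, card.subject, card.predicate, card.object]
        if piece
    )
    # Nested structures are serialised to a JSON string; the remaining
    # fields stay as plain scalars so they can be filtered on directly.
    metadata = {
        "card": json.dumps(asdict(card), ensure_ascii=False),
        "card_id": card.card_id,
        "kind": card.kind,
    }
    return card_text, metadata


demo_text, demo_meta = build_card_payload(
    DemoCard("c1", "preference", "user", "likes", "")
)
```

Here the empty `object` field is dropped from the joined text, so `demo_text` contains only the three populated pieces.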

clear()

Delete all indexed records from the raw-record collection.

Returns:

Type Description
None

None.

Source code in dmf/memory/chroma_ltm.py
def clear(self) -> None:
    """Delete all indexed records from the raw-record collection.

    Returns:
        None.

    Raises:
        ChromaDB exceptions may surface during deletion.
    """
    ids = self._collection.get(include=[])["ids"]
    if ids:
        self._collection.delete(ids=ids)

count()

Return the number of indexed raw records in the collection.

Returns:

Type Description
int

Indexed raw-record count.

Source code in dmf/memory/chroma_ltm.py
def count(self) -> int:
    """Return the number of indexed raw records in the collection.

    Returns:
        Indexed raw-record count.

    Raises:
        ChromaDB exceptions may surface during count.
    """
    return self._collection.count()

count_cards()

Return the number of indexed card records.

Returns:

Type Description
int

Indexed card count, or 0 when cards are disabled.

Source code in dmf/memory/chroma_ltm.py
def count_cards(self) -> int:
    """Return the number of indexed card records.

    Returns:
        Indexed card count, or 0 when cards are disabled.

    Raises:
        ChromaDB exceptions may surface when counting an enabled collection.
    """
    if self._cards_collection is None:
        return 0
    return self._cards_collection.count()

read_all()

Return all archived raw records ordered by interaction_id ascending.

Fetches every document from the main collection using include=["metadatas"] and deserialises each entry via _deserialize_raw_record. Records that fail deserialisation are silently skipped.

Returns:

Type Description
list[RawLTMRecord]

Deserialisable raw records sorted by interaction id.

Warning

Malformed records are skipped to keep recall resilient to partial writes or older metadata schemas.

Source code in dmf/memory/chroma_ltm.py
def read_all(self) -> list[RawLTMRecord]:
    """Return all archived raw records ordered by interaction_id ascending.

    Fetches every document from the main collection using
    ``include=["metadatas"]`` and deserialises each entry via
    ``_deserialize_raw_record``.  Records that fail deserialisation are
    silently skipped.

    Returns:
        Deserialisable raw records sorted by interaction id.

    Raises:
        ChromaDB exceptions may surface during collection reads.

    Warning:
        Malformed records are skipped to keep recall resilient to partial
        writes or older metadata schemas.
    """
    results = self._collection.get(include=["metadatas"])
    metadatas: list[dict[str, object]] = results.get("metadatas") or []
    records: list[RawLTMRecord] = []
    for meta in metadatas:
        try:
            records.append(self._deserialize_raw_record(meta))
        except (KeyError, TypeError, ValueError, json.JSONDecodeError):
            continue
    records.sort(key=lambda r: r.interaction_id)
    return records
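
The skip-then-sort behaviour of `read_all()` can be sketched without ChromaDB. The example assumes each metadata dict carries the record as a JSON string under `raw_record` (as written by `archive()`); the helper `load_sorted_records` is hypothetical and stands in for `_deserialize_raw_record`, whose body is not shown here.

```python
import json


def load_sorted_records(metadatas: list[dict[str, object]]) -> list[dict[str, object]]:
    records = []
    for meta in metadatas:
        try:
            record = json.loads(meta["raw_record"])
            record["interaction_id"] = int(record["interaction_id"])
        except (KeyError, TypeError, ValueError):
            # json.JSONDecodeError subclasses ValueError, so malformed JSON
            # lands here as well; the entry is skipped, not raised.
            continue
        records.append(record)
    records.sort(key=lambda r: r["interaction_id"])
    return records


sample = [
    {"raw_record": json.dumps({"interaction_id": 2, "text": "second"})},
    {"raw_record": "{not json"},
    {"unrelated": "no raw_record key"},
    {"raw_record": json.dumps({"interaction_id": 1, "text": "first"})},
]
recovered = load_sorted_records(sample)
```

Two of the four sample entries are malformed, so only two records survive, ordered by `interaction_id`. A caller that needs to detect data loss would have to compare the result length against `count()`.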

search_cards(query_vector, k=5)

Retrieve the top-k most relevant card hits by vector similarity.

Each returned RawRecallHit points to the raw source record that the matching card was projected from. If the source record is no longer present in the main collection, the card hit is silently skipped.

Parameters:

Name Type Description Default
query_vector list[float]

Query embedding in the same vector space as indexed cards.

required
k int

Maximum number of card hits requested.

5

Returns:

Type Description
list[RawRecallHit]

Raw hits corresponding to the source records of matching cards.

Source code in dmf/memory/chroma_ltm.py
def search_cards(
    self,
    query_vector: list[float],
    k: int = 5,
) -> list[RawRecallHit]:
    """Retrieve the top-k most relevant card hits by vector similarity.

    Each returned :class:`RawRecallHit` points to the *raw source record*
    that the matching card was projected from.  If the source record is no
    longer present in the main collection the card hit is silently skipped.

    Args:
        query_vector: Query embedding in the same vector space as indexed
            cards.
        k: Maximum number of card hits requested.

    Returns:
        Raw hits corresponding to the source records of matching cards.

    Raises:
        ChromaDB exceptions may surface during the initial card query.
    """
    if self._cards_collection is None:
        return []
    if k <= 0:
        return []

    results = self._cards_collection.query(
        query_embeddings=[query_vector],
        n_results=k,
        include=["metadatas", "distances"],
    )

    metadatas: list[dict[str, object]] = results["metadatas"][0]
    distances: list[float] = results["distances"][0]

    hits: list[RawRecallHit] = []
    for idx, (meta, dist) in enumerate(zip(metadatas, distances)):
        if dist > self._distance_threshold:
            continue
        source_record_id = meta.get("source_record_id")
        if not source_record_id:
            continue
        try:
            raw_results = self._collection.get(
                ids=[str(source_record_id)],
                include=["metadatas"],
            )
        except Exception:  # noqa: BLE001
            continue
        raw_metas = raw_results.get("metadatas") or []
        if not raw_metas:
            continue
        try:
            record = self._deserialize_raw_record(raw_metas[0])
        except (KeyError, TypeError, ValueError, json.JSONDecodeError):
            continue
        hits.append(
            RawRecallHit(
                record=record,
                distance=float(dist),
                similarity_score=1.0 - float(dist),
                rank_hint=idx,
            )
        )
    return hits
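
The two-stage lookup in `search_cards()` (card hit first, then the raw source record) can be sketched with plain dictionaries. Everything here is illustrative: `resolve_card_hits`, the in-memory `raw_store`, and the threshold value are assumptions standing in for the ChromaDB collections and `_distance_threshold`.

```python
def resolve_card_hits(
    card_hits: list[tuple[str, float]],
    raw_store: dict[str, dict],
    threshold: float,
) -> list[dict]:
    resolved = []
    for rank, (source_record_id, dist) in enumerate(card_hits):
        if dist > threshold:
            continue  # too far from the query
        record = raw_store.get(source_record_id)
        if record is None:
            continue  # source record no longer present: skip silently
        resolved.append({"record": record, "distance": dist, "rank_hint": rank})
    return resolved


raw_store = {"rec-1": {"text": "first"}, "rec-3": {"text": "third"}}
card_hits = [("rec-1", 0.10), ("rec-2", 0.15), ("rec-3", 0.20), ("rec-1", 0.90)]
resolved = resolve_card_hits(card_hits, raw_store, threshold=0.5)
```

Because skipped hits are not backfilled, the result can hold fewer than `k` entries, and `rank_hint` keeps the card's original position in the query result. Like the source listing, the sketch does not deduplicate several cards that point at the same raw record.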

search_raw(query_vector, k=5)

Retrieve the top-k most relevant raw records by vector similarity.

Parameters:

Name Type Description Default
query_vector list[float]

Query embedding in the same vector space as indexed raw records.

required
k int

Maximum number of raw hits requested.

5

Returns:

Type Description
list[RawRecallHit]

Raw hits under the configured distance threshold.

Source code in dmf/memory/chroma_ltm.py
def search_raw(
    self,
    query_vector: list[float],
    k: int = 5,
) -> list[RawRecallHit]:
    """Retrieve the top-k most relevant raw records by vector similarity.

    Args:
        query_vector: Query embedding in the same vector space as indexed
            raw records.
        k: Maximum number of raw hits requested.

    Returns:
        Raw hits under the configured distance threshold.

    Raises:
        ChromaDB exceptions may surface during query execution.
    """
    if k <= 0:
        return []

    results = self._collection.query(
        query_embeddings=[query_vector],
        n_results=k,
        include=["metadatas", "distances"],
    )

    metadatas: list[dict[str, object]] = results["metadatas"][0]
    distances: list[float] = results["distances"][0]

    hits: list[RawRecallHit] = []
    for idx, (meta, dist) in enumerate(zip(metadatas, distances)):
        if dist > self._distance_threshold:
            continue
        try:
            record = self._deserialize_raw_record(meta)
        except (KeyError, TypeError, ValueError, json.JSONDecodeError):
            continue
        hits.append(
            RawRecallHit(
                record=record,
                distance=float(dist),
                similarity_score=1.0 - float(dist),
                rank_hint=idx,
            )
        )
    return hits

EmbeddingEngine

Converts text to L2-normalised dense vectors via FastEmbed.

The underlying TextEmbedding model is loaded lazily on the first call
to get_embedding(), not at construction time. Only configuration is
stored initially so heavy I/O is deferred until the engine is used.

Attributes
----------
_config : VectorConfig
    Immutable configuration (model name, expected dimension, cache path).
_model : TextEmbedding | None
    FastEmbed model instance. None until the first get_embedding() call.

Parameters:

Name Type Description Default
config VectorConfig

Immutable configuration for this engine instance.

required

Returns:

Type Description

Instance of this class.

Source code in dmf/analysis/embedding_engine.py
class EmbeddingEngine:
    """Converts text to L2-normalised dense vectors via FastEmbed.

    The underlying TextEmbedding model is loaded lazily on the first call
    to get_embedding(), not at construction time. Only configuration is
    stored initially so heavy I/O is deferred until the engine is used.

    Attributes
    ----------
    _config : VectorConfig
        Immutable configuration (model name, expected dimension, cache path).
    _model : TextEmbedding | None
        FastEmbed model instance. None until the first get_embedding() call.

    Args:
        config: Immutable configuration for this engine instance.

    Returns:
        Instance of this class.

    Raises:
        None.
    """

    def __init__(self, config: VectorConfig) -> None:
        """Store configuration; do NOT load the embedding model yet.

        Parameters
        ----------
        config : VectorConfig
            Immutable configuration for this engine instance.
        """
        self._config: VectorConfig = config
        self._model: TextEmbedding | None = None


    def get_embedding(self, text: str) -> np.ndarray:
        """Return the L2-normalised embedding vector for *text*.

        Triggers model loading on the first invocation; subsequent calls
        reuse the already-loaded model with no additional I/O.

        Parameters
        ----------
        text : str
            Raw text to embed.

        Returns
        -------
        np.ndarray
            1-D array of shape (vector_dim,) with unit L2 norm.
        """
        self._load_model_if_needed()
        raw_vector = self._generate_raw_vector(text)
        return self._apply_l2_normalization(raw_vector)


    def _load_model_if_needed(self) -> None:
        """Instantiate TextEmbedding on the first call; no-op thereafter.

        FastEmbed downloads model weights to config.cache_dir on the very
        first instantiation (~24 MB for bge-small-en-v1.5) and reads from
        the local cache on all subsequent runs.
        """
        if self._model is None:
            self._model = TextEmbedding(
                model_name=self._config.model_name,
                cache_dir=self._config.cache_dir,
            )


    def _generate_raw_vector(self, text: str) -> np.ndarray:
        """Run FastEmbed inference and return a guaranteed 1-D raw vector.

        FastEmbed's embed() returns a generator that yields one embedding
        per input document. We pass a single-element list, retrieve the
        first (and only) result with next(iter(...)), then call .flatten()
        to collapse any extra dimensions, ensuring the 1-D contract
        regardless of FastEmbed's internal output shape.

        Parameters
        ----------
        text : str
            Raw text to embed.

        Returns
        -------
        np.ndarray
            1-D array of shape (vector_dim,) before normalisation.
        """
        raw = next(iter(self._model.embed([text])))  # type: ignore[union-attr]
        return np.array(raw).flatten()

    def _apply_l2_normalization(self, vector: np.ndarray) -> np.ndarray:
        """Divide *vector* by its L2 norm to produce a unit vector.

        A zero-vector guard returns the zero vector unchanged to avoid
        producing NaN values — mathematically there is no well-defined
        unit direction for the zero vector.

        Parameters
        ----------
        vector : np.ndarray
            Raw embedding to normalise.

        Returns
        -------
        np.ndarray
            Unit-length vector (norm ≈ 1.0), or the zero vector unchanged.
        """
        norm = np.linalg.norm(vector)
        if norm == 0.0:
            return vector
        return vector / norm
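
The zero-vector guard in `_apply_l2_normalization` is easy to check by hand. The sketch below mirrors the same logic on plain Python lists instead of NumPy arrays so it runs with the standard library only; `l2_normalize` is a hypothetical name.

```python
import math


def l2_normalize(vector: list[float]) -> list[float]:
    norm = math.sqrt(sum(x * x for x in vector))
    if norm == 0.0:
        # No unit direction exists for the zero vector; dividing would
        # produce NaN, so the input is returned unchanged.
        return list(vector)
    return [x / norm for x in vector]


unit = l2_normalize([3.0, 4.0])   # norm is 5, so the result is about [0.6, 0.8]
zero = l2_normalize([0.0, 0.0])   # stays [0.0, 0.0]
```

One consequence of the guard is that callers cannot assume every returned vector has unit norm: a zero vector passes through as-is.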

get_embedding(text)

Return the L2-normalised embedding vector for text.

Triggers model loading on the first invocation; subsequent calls reuse the already-loaded model with no additional I/O.

Parameters:

Name Type Description Default
text str

Raw text to embed.

required

Returns:

Type Description
ndarray

1-D array of shape (vector_dim,) with unit L2 norm.

Source code in dmf/analysis/embedding_engine.py
def get_embedding(self, text: str) -> np.ndarray:
    """Return the L2-normalised embedding vector for *text*.

    Triggers model loading on the first invocation; subsequent calls
    reuse the already-loaded model with no additional I/O.

    Parameters
    ----------
    text : str
        Raw text to embed.

    Returns
    -------
    np.ndarray
        1-D array of shape (vector_dim,) with unit L2 norm.
    """
    self._load_model_if_needed()
    raw_vector = self._generate_raw_vector(text)
    return self._apply_l2_normalization(raw_vector)
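
The lazy-loading pattern behind `get_embedding()` can be illustrated in isolation. This is a sketch only: `LazyResource` and its `factory` argument are hypothetical, and a cheap dict stands in for the FastEmbed model.

```python
from typing import Any, Callable


class LazyResource:
    """Defer an expensive construction until the first get() call."""

    def __init__(self, factory: Callable[[], Any]) -> None:
        self._factory = factory
        self._instance: Any = None
        self.load_count = 0  # instrumentation for the example only

    def get(self) -> Any:
        if self._instance is None:
            # First use: pay the construction cost exactly once.
            self._instance = self._factory()
            self.load_count += 1
        return self._instance


lazy = LazyResource(lambda: {"model": "stub"})
first = lazy.get()
second = lazy.get()
```

As in the listing above, the check-then-assign is not protected by a lock, so two threads racing on the first call could each construct an instance. Whether that matters depends on how the engine is shared.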

FileLTMHook

Append-only JSONL archive for raw long-term-memory records.

Parameters:

Name Type Description Default
storage_path Path | str

Filesystem path for the raw-record JSONL archive.

required
cards_enabled bool

Whether to project auxiliary memory cards on archive.

False
cards_path Path | str | None

Optional JSONL path for auxiliary cards.

None
card_store JsonlMemoryCardStore | None

Optional prebuilt card store. Takes precedence over cards_enabled and cards_path.

None

Returns:

Type Description

File-backed LTM hook instance.

Raises:

Type Description
OSError

If the archive parent directory cannot be created.

Warning

search_raw intentionally returns an empty list. This backend is archival-only; semantic retrieval requires a vector-backed hook.
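
When `cards_enabled` is true and no `cards_path` or `card_store` is given, the constructor shown in the source listing for this class derives the card archive path from `storage_path` with `Path.with_suffix(".cards.jsonl")`. The sketch below shows what that derivation yields; `default_cards_path` is a hypothetical helper, and `PurePosixPath` is used only so the output is the same on every platform.

```python
from pathlib import PurePosixPath


def default_cards_path(storage_path: str) -> PurePosixPath:
    # with_suffix replaces only the final suffix of the file name,
    # or appends the new suffix when the name has none.
    return PurePosixPath(storage_path).with_suffix(".cards.jsonl")


derived = default_cards_path("data/ltm.jsonl")      # data/ltm.cards.jsonl
no_suffix = default_cards_path("data/ltm")          # data/ltm.cards.jsonl
multi_dot = default_cards_path("data/ltm.v2.jsonl") # data/ltm.v2.cards.jsonl
```

Two archives whose names differ only in the final suffix (for example `ltm.jsonl` and `ltm.json`) would therefore map to the same default card file.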

Source code in dmf/memory/ltm_engine.py
class FileLTMHook:
    """Append-only JSONL archive for raw long-term-memory records.

    Args:
        storage_path: Filesystem path for the raw-record JSONL archive.
        cards_enabled: Whether to project auxiliary memory cards on archive.
        cards_path: Optional JSONL path for auxiliary cards.
        card_store: Optional prebuilt card store. Takes precedence over
            ``cards_enabled`` and ``cards_path``.

    Returns:
        File-backed LTM hook instance.

    Raises:
        OSError: If the archive parent directory cannot be created.

    Warning:
        ``search_raw`` intentionally returns an empty list. This backend is
        archival-only; semantic retrieval requires a vector-backed hook.
    """

    def __init__(
        self,
        storage_path: Path | str,
        *,
        cards_enabled: bool = False,
        cards_path: Path | str | None = None,
        card_store: JsonlMemoryCardStore | None = None,
    ) -> None:
        self._path: Path = Path(storage_path)
        self._path.parent.mkdir(parents=True, exist_ok=True)
        self._lock = threading.Lock()
        self._card_store = card_store
        if self._card_store is None and cards_enabled:
            self._card_store = JsonlMemoryCardStore(
                cards_path or self._path.with_suffix(".cards.jsonl")
            )

    def archive(self, entry: MemoryEntry) -> None:
        """Append one raw record and optional auxiliary cards.

        Args:
            entry: Working-memory entry selected for archival.

        Returns:
            None.

        Raises:
            OSError: If the archive cannot be written.
            TypeError: If the raw record cannot be JSON-serialised.
        """
        line = json.dumps(entry.to_raw_ltm_record().to_dict(), ensure_ascii=False)
        with self._lock:
            with open(self._path, "a", encoding=DEFAULT_TEXT_ENCODING) as fh:
                fh.write(line + "\n")
        if self._card_store is not None:
            self._card_store.archive(entry)

    def search_raw(self, query_vector: list[float], k: int = DEFAULT_LTM_RECALL_LIMIT) -> list[RawRecallHit]:
        """Return no raw search hits for this archival-only backend.

        Args:
            query_vector: Ignored query embedding.
            k: Ignored hit limit.

        Returns:
            Empty list.

        Raises:
            None.
        """
        return []

    def read_all(self) -> list[RawLTMRecord]:
        """Return all archived raw records in insertion order.

        Reads the JSONL archive line by line and deserialises each record.
        Returns an empty list when the archive file does not yet exist.

        Returns:
            Raw records in file order.

        Raises:
            OSError: If the archive exists but cannot be read.
            json.JSONDecodeError: If a non-empty line is not valid JSON.
            KeyError: If a raw-record payload is missing required fields.
        """
        if not self._path.exists():
            return []
        records: list[RawLTMRecord] = []
        with self._lock:
            with open(self._path, encoding=DEFAULT_TEXT_ENCODING) as fh:
                for line in fh:
                    line = line.strip()
                    if not line:
                        continue
                    records.append(RawLTMRecord.from_dict(json.loads(line)))
        return records

    @property
    def path(self) -> Path:
        """Filesystem path of the JSONL archive.

        Returns:
            Configured raw-record archive path.

        Raises:
            None.
        """
        return self._path

    @property
    def card_store(self) -> JsonlMemoryCardStore | None:
        """Auxiliary JSONL card audit store, when configured.

        Returns:
            Configured card store or ``None``.

        Raises:
            None.
        """
        return self._card_store

card_store property

Auxiliary JSONL card audit store, when configured.

Returns:

Type Description
JsonlMemoryCardStore | None

Configured card store or None.

path property

Filesystem path of the JSONL archive.

Returns:

Type Description
Path

Configured raw-record archive path.

archive(entry)

Append one raw record and optional auxiliary cards.

Parameters:

Name Type Description Default
entry MemoryEntry

Working-memory entry selected for archival.

required

Returns:

Type Description
None

None.

Raises:

Type Description
OSError

If the archive cannot be written.

TypeError

If the raw record cannot be JSON-serialised.

Source code in dmf/memory/ltm_engine.py
def archive(self, entry: MemoryEntry) -> None:
    """Append one raw record and optional auxiliary cards.

    Args:
        entry: Working-memory entry selected for archival.

    Returns:
        None.

    Raises:
        OSError: If the archive cannot be written.
        TypeError: If the raw record cannot be JSON-serialised.
    """
    line = json.dumps(entry.to_raw_ltm_record().to_dict(), ensure_ascii=False)
    with self._lock:
        with open(self._path, "a", encoding=DEFAULT_TEXT_ENCODING) as fh:
            fh.write(line + "\n")
    if self._card_store is not None:
        self._card_store.archive(entry)

read_all()

Return all archived raw records in insertion order.

Reads the JSONL archive line by line and deserialises each record. Returns an empty list when the archive file does not yet exist.

Returns:

Type Description
list[RawLTMRecord]

Raw records in file order.

Raises:

Type Description
OSError

If the archive exists but cannot be read.

JSONDecodeError

If a non-empty line is not valid JSON.

KeyError

If a raw-record payload is missing required fields.

Source code in dmf/memory/ltm_engine.py
def read_all(self) -> list[RawLTMRecord]:
    """Return all archived raw records in insertion order.

    Reads the JSONL archive line by line and deserialises each record.
    Returns an empty list when the archive file does not yet exist.

    Returns:
        Raw records in file order.

    Raises:
        OSError: If the archive exists but cannot be read.
        json.JSONDecodeError: If a non-empty line is not valid JSON.
        KeyError: If a raw-record payload is missing required fields.
    """
    if not self._path.exists():
        return []
    records: list[RawLTMRecord] = []
    with self._lock:
        with open(self._path, encoding=DEFAULT_TEXT_ENCODING) as fh:
            for line in fh:
                line = line.strip()
                if not line:
                    continue
                records.append(RawLTMRecord.from_dict(json.loads(line)))
    return records
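
The append-only JSONL round trip used by `archive()` and `read_all()` can be sketched with the standard library. The helpers `append_record` and `read_records` are hypothetical, records are plain dicts rather than `RawLTMRecord` objects, and UTF-8 is assumed for `DEFAULT_TEXT_ENCODING`, whose value is not shown here.

```python
import json
import tempfile
import threading
from pathlib import Path

_lock = threading.Lock()


def append_record(path: Path, record: dict) -> None:
    # One JSON document per line; ensure_ascii=False keeps non-ASCII readable.
    line = json.dumps(record, ensure_ascii=False)
    with _lock:
        with open(path, "a", encoding="utf-8") as fh:
            fh.write(line + "\n")


def read_records(path: Path) -> list[dict]:
    if not path.exists():
        return []  # nothing archived yet
    records = []
    with _lock:
        with open(path, encoding="utf-8") as fh:
            for line in fh:
                line = line.strip()
                if not line:
                    continue  # tolerate blank lines
                records.append(json.loads(line))
    return records


with tempfile.TemporaryDirectory() as tmp:
    archive_path = Path(tmp) / "ltm.jsonl"
    missing = read_records(archive_path)
    append_record(archive_path, {"interaction_id": 1, "text": "héllo"})
    append_record(archive_path, {"interaction_id": 2, "text": "world"})
    loaded = read_records(archive_path)
```

Unlike the ChromaDB-backed `read_all()`, this file-backed variant raises on a malformed line instead of skipping it, which matches the `Raises` list documented above.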

search_raw(query_vector, k=DEFAULT_LTM_RECALL_LIMIT)

Return no raw search hits for this archival-only backend.

Parameters:

Name Type Description Default
query_vector list[float]

Ignored query embedding.

required
k int

Ignored hit limit.

DEFAULT_LTM_RECALL_LIMIT

Returns:

Type Description
list[RawRecallHit]

Empty list.

Source code in dmf/memory/ltm_engine.py
def search_raw(self, query_vector: list[float], k: int = DEFAULT_LTM_RECALL_LIMIT) -> list[RawRecallHit]:
    """Return no raw search hits for this archival-only backend.

    Args:
        query_vector: Ignored query embedding.
        k: Ignored hit limit.

    Returns:
        Empty list.

    Raises:
        None.
    """
    return []

InteractionMatrix

Sliding-window container for recent interaction embedding vectors.

Backed by a ``collections.deque`` with ``maxlen=config.window_size``.
When the window is full, appending a new vector automatically evicts
the oldest entry (FIFO) — no manual bookkeeping required.

The centroid is maintained as a **stateful, L2-normalised** attribute
(``_current_centroid``) that is recomputed only when the window changes
(in ``add_vector`` and ``remove_vector``). ``get_centroid()`` is therefore
an O(1) read with no recomputation. L2 normalisation projects the mean
vector back onto the unit hypersphere, ensuring that divergence comparisons
remain pure angular measurements.

Attributes
----------
_config : VectorConfig
    Immutable configuration (only ``window_size`` is used here).
_vectors : deque[np.ndarray]
    Bounded deque of the most recent embedding vectors.
_current_centroid : np.ndarray | None
    Cached, L2-normalised centroid of the current window. None when
    the window is empty.
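
The window behaviour described above can be sketched in pure Python. This is an illustration under stated assumptions, not the class itself: `SlidingWindow` and `normalize` are hypothetical names, vectors are plain lists, and divergence is computed as 1 minus the dot product, which equals 1 minus cosine similarity only for unit-length inputs.

```python
import math
from collections import deque


def normalize(vector: list[float]) -> list[float]:
    norm = math.sqrt(sum(x * x for x in vector))
    return list(vector) if norm == 0.0 else [x / norm for x in vector]


class SlidingWindow:
    def __init__(self, window_size: int) -> None:
        self._vectors: deque[list[float]] = deque(maxlen=window_size)
        self.centroid: list[float] | None = None

    def add_vector(self, vector: list[float]) -> float:
        # 1. Measure against the previous centroid (0.0 when there is none).
        if self.centroid is None:
            divergence = 0.0
        else:
            divergence = 1.0 - sum(a * b for a, b in zip(vector, self.centroid))
        # 2. Append; a full deque evicts its oldest entry automatically.
        self._vectors.append(vector)
        # 3. Recompute the mean and project it back to unit length.
        count = len(self._vectors)
        mean = [sum(v[i] for v in self._vectors) / count for i in range(len(vector))]
        self.centroid = normalize(mean)
        return divergence

    def __len__(self) -> int:
        return len(self._vectors)


window = SlidingWindow(window_size=2)
d1 = window.add_vector([1.0, 0.0])  # first vector: no prior context
d2 = window.add_vector([0.0, 1.0])  # orthogonal to the centroid [1, 0]
d3 = window.add_vector([0.0, 1.0])  # evicts [1, 0]; centroid becomes [0, 1]
```

The third call is measured against the centroid of the first two vectors, so its divergence is about 0.29 rather than 0, even though the window afterwards contains only copies of `[0, 1]`.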

Parameters:

Name Type Description Default
config VectorConfig

Immutable configuration. window_size controls the maximum number of vectors retained.

required

Returns:

Type Description

Instance of this class.

Source code in dmf/analysis/geometry.py
class InteractionMatrix:
    """Sliding-window container for recent interaction embedding vectors.

    Backed by a ``collections.deque`` with ``maxlen=config.window_size``.
    When the window is full, appending a new vector automatically evicts
    the oldest entry (FIFO), so no manual bookkeeping is required.

    The centroid is maintained as a **stateful, L2-normalised** attribute
    (``_current_centroid``) that is recomputed only when the window changes
    (in ``add_vector`` and ``remove_vector``). ``get_centroid()`` is
    therefore an O(1) read with no recomputation. L2 normalisation projects
    the mean vector back onto the unit hypersphere, ensuring that divergence
    comparisons remain pure angular measurements.

    Attributes
    ----------
    _config : VectorConfig
        Immutable configuration (only ``window_size`` is used here).
    _vectors : deque[np.ndarray]
        Bounded deque of the most recent embedding vectors.
    _current_centroid : np.ndarray | None
        Cached, L2-normalised centroid of the current window. None when
        the window is empty.

    Args:
        config: Immutable configuration. ``window_size`` controls the
            maximum number of vectors retained.

    Returns:
        Instance of this class.

    Raises:
        None.
    """

    def __init__(self, config: VectorConfig) -> None:
        """Initialise the matrix with an empty sliding window.

        Parameters
        ----------
        config : VectorConfig
            Immutable configuration. ``window_size`` controls the maximum
            number of vectors retained.
        """
        self._config: VectorConfig = config
        self._vectors: deque[np.ndarray] = deque(maxlen=config.window_size)
        self._current_centroid: np.ndarray | None = None


    @classmethod
    def from_dmf_config(cls, config: DMFConfig) -> InteractionMatrix:
        """Construct an ``InteractionMatrix`` from a universal ``DMFConfig``.

        Uses ``nlp.vector_dim`` and ``capacity.window_size`` from the TOML.

        Translation map
        ---------------
        ==================== =========================== ==================
        TOML key             DMFConfig path              VectorConfig field
        ==================== =========================== ==================
        nlp.vector_dim       config.nlp.vector_dim       vector_dim
        capacity.window_size config.capacity.window_size window_size
        ==================== =========================== ==================

        Parameters
        ----------
        config : DMFConfig
            Fully populated config object returned by ``load_dmf_config()``.

        Returns
        -------
        InteractionMatrix
            Fully initialised instance with an empty sliding window.
        """
        vector_cfg = VectorConfig(
            vector_dim=config.nlp.vector_dim,
            window_size=config.capacity.window_size,
        )
        return cls(config=vector_cfg)


    def add_vector(self, vector: np.ndarray) -> float:
        """Append *vector* to the window and return its divergence score.

        Execution order (critical for correctness):

        1. Compute divergence D against the **previous** centroid. This
           measures how far the new interaction departs from the existing
           context *before* it becomes part of that context.
        2. Append the new vector to the deque (may evict the oldest).
        3. Recompute and L2-normalise the centroid from the updated window.

        On the very first call (empty window) divergence is defined as
        0.0 because there is no prior context to diverge from.

        Parameters
        ----------
        vector : np.ndarray
            L2-normalised embedding vector for the new interaction.

        Returns
        -------
        float
            Semantic divergence D ∈ [0.0, 2.0].
        """
        if len(self._vectors) == 0:
            self._vectors.append(vector)
            self._recalculate_centroid()
            return 0.0

        # Divergence must be measured before the window absorbs the new vector;
        # otherwise every turn partially compares against itself.
        divergence = calculate_divergence(vector, self._current_centroid)

        self._vectors.append(vector)

        self._recalculate_centroid()

        return divergence

    def get_centroid(self) -> np.ndarray | None:
        """Return the cached, L2-normalised centroid (O(1) read).

        Returns
        -------
        np.ndarray | None
            Unit-length centroid of the current window, or None when the
            window is empty.
        """
        return self._current_centroid


    def _recalculate_centroid(self) -> None:
        """Recompute the centroid from the current deque and L2-normalise it.

        The raw element-wise mean of the window vectors is divided by its
        L2 norm to project it back onto the unit hypersphere. This ensures
        that ``calculate_divergence`` always compares unit vectors, making
        D a pure angular measurement with no magnitude bias.

        A zero-norm guard preserves the zero vector unchanged to avoid NaN.
        """
        mean = calculate_centroid(list(self._vectors))
        norm = float(np.linalg.norm(mean))
        if norm > 0.0:
            self._current_centroid = mean / norm
        else:
            self._current_centroid = mean


    @property
    def size(self) -> int:
        """Number of vectors currently in the sliding window.

        Returns:
            Current number of stored vectors, between 0 and ``window_size``.

        Raises:
            None.
        """
        return len(self._vectors)

    @property
    def is_empty(self) -> bool:
        """True when the window contains no vectors.

        Returns:
            ``True`` if ``size`` is 0, otherwise ``False``.

        Raises:
            None.
        """
        return len(self._vectors) == 0

    @property
    def window_size(self) -> int:
        """Maximum capacity of the sliding window (from config).

        Returns:
            The ``window_size`` value of the config supplied at construction.

        Raises:
            None.
        """
        return self._config.window_size

    def remove_vector(self, vector: np.ndarray) -> bool:
        """Remove a specific vector from the sliding window by object identity.

                Comparison uses Python's ``is`` operator (object identity), not
                element-wise numpy equality.  This is safe because the same
                ``np.ndarray`` object is stored on both the ``MemoryEntry`` and
                this deque — ``add_vector`` receives the reference and appends it
                directly without copying.

                If the vector is no longer in the window (evicted by FIFO overflow
                because more than ``window_size`` interactions arrived since it was
                added), this is a no-op and returns ``False``.

                After removal the centroid is recomputed from the remaining vectors.
                If the window becomes empty, the centroid is reset to ``None``.

                Parameters
                ----------
                vector : np.ndarray
                    The exact array object to remove (identity comparison).

                Returns
                -------
                bool
                    ``True`` if the vector was found and removed; ``False`` if it
                    was not in the current window.

        Args:
            vector: The exact array object to remove (identity comparison).

        Raises:
            None.
        """
        # Identity matching avoids removing a different ndarray with equal
        # values when duplicate vectors are present in the sliding window.
        found = False
        new_vectors: deque[np.ndarray] = deque(maxlen=self._config.window_size)
        for v in self._vectors:
            if not found and v is vector:
                found = True
            else:
                new_vectors.append(v)

        if not found:
            return False

        self._vectors = new_vectors
        if self._vectors:
            self._recalculate_centroid()
        else:
            self._current_centroid = None
        return True

is_empty property

True when the window contains no vectors.

Returns:

Type Description
bool

True if the window holds no vectors, otherwise False.

size property

Number of vectors currently in the sliding window.

Returns:

Type Description
int

Current number of stored vectors, between 0 and window_size.

window_size property

Maximum capacity of the sliding window (from config).

Returns:

Type Description
int

The window_size value of the config supplied at construction.

add_vector(vector)

Append vector to the window and return its divergence score.

    Execution order (critical for correctness):

    1. Compute divergence D against the **previous** centroid — this
       measures how far the new interaction departs from the existing
       context *before* it becomes part of that context.
    2. Append the new vector to the deque (may evict the oldest).
    3. Recompute and L2-normalise the centroid from the updated window.

    On the very first call (empty window) divergence is defined as
    0.0 because there is no prior context to diverge from.

    Parameters
    ----------
    vector : np.ndarray
        L2-normalised embedding vector for the new interaction.

    Returns
    -------
    float
        Semantic divergence D ∈ [0.0, 2.0].

Parameters:

Name Type Description Default
vector ndarray

L2-normalised embedding vector for the new interaction.

required
Source code in dmf/analysis/geometry.py
def add_vector(self, vector: np.ndarray) -> float:
    """Append *vector* to the window and return its divergence score.

            Execution order (critical for correctness):

            1. Compute divergence D against the **previous** centroid — this
               measures how far the new interaction departs from the existing
               context *before* it becomes part of that context.
            2. Append the new vector to the deque (may evict the oldest).
            3. Recompute and L2-normalise the centroid from the updated window.

            On the very first call (empty window) divergence is defined as
            0.0 because there is no prior context to diverge from.

            Parameters
            ----------
            vector : np.ndarray
                L2-normalised embedding vector for the new interaction.

            Returns
            -------
            float
                Semantic divergence D ∈ [0.0, 2.0].

    Args:
        vector: L2-normalised embedding vector for the new interaction.

    Raises:
        None.
    """
    if len(self._vectors) == 0:
        self._vectors.append(vector)
        self._recalculate_centroid()
        return 0.0

    # Divergence must be measured before the window absorbs the new vector;
    # otherwise every turn partially compares against itself.
    divergence = calculate_divergence(vector, self._current_centroid)

    self._vectors.append(vector)

    self._recalculate_centroid()

    return divergence
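
The three-step execution order above can be sketched with a small stand-alone class. This is only an illustration under stated assumptions, not the library's implementation: `TinyWindow` and `_cosine` are hypothetical names, plain Python lists stand in for `np.ndarray`, and the zero-norm behaviour of `_cosine` is an assumption because the real `calculate_divergence` is not shown here.

```python
import math
from collections import deque


def _cosine(a, b):
    """Cosine similarity of two equal-length sequences.

    Returning 0.0 for a zero-norm input is an assumption of this sketch.
    """
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    return dot / (na * nb) if na > 0.0 and nb > 0.0 else 0.0


class TinyWindow:
    """Hypothetical stand-in for InteractionMatrix, using lists instead of arrays."""

    def __init__(self, window_size):
        self._vectors = deque(maxlen=window_size)
        self._centroid = None

    def _recalculate_centroid(self):
        n = len(self._vectors)
        mean = [sum(col) / n for col in zip(*self._vectors)]
        norm = math.sqrt(sum(x * x for x in mean))
        self._centroid = [x / norm for x in mean] if norm > 0.0 else mean

    def add_vector(self, vector):
        if not self._vectors:
            # First interaction: there is no prior context to diverge from.
            self._vectors.append(vector)
            self._recalculate_centroid()
            return 0.0
        # 1. Measure against the previous centroid.
        divergence = 1.0 - _cosine(vector, self._centroid)
        # 2. Append; the deque evicts the oldest entry when it is full.
        self._vectors.append(vector)
        # 3. Refresh the cached centroid from the updated window.
        self._recalculate_centroid()
        return divergence


window = TinyWindow(window_size=2)
d_first = window.add_vector([1.0, 0.0])       # empty window
d_same = window.add_vector([1.0, 0.0])        # aligned with the centroid
d_orthogonal = window.add_vector([0.0, 1.0])  # at right angles to the centroid
```

If step 1 ran after step 2, the orthogonal vector would already have pulled the centroid towards itself and its divergence would be understated.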

from_dmf_config(config) classmethod

Construct an InteractionMatrix from a universal DMFConfig.

    Uses ``nlp.vector_dim`` and ``capacity.window_size`` from the TOML.

    Translation map
    ---------------
    ==================== =========================== ==================
    TOML key             DMFConfig path              VectorConfig field
    ==================== =========================== ==================
    nlp.vector_dim       config.nlp.vector_dim       vector_dim
    capacity.window_size config.capacity.window_size window_size
    ==================== =========================== ==================

    Parameters
    ----------
    config : DMFConfig
        Fully populated config object returned by ``load_dmf_config()``.

    Returns
    -------
    InteractionMatrix
        Fully initialised instance with an empty sliding window.

Parameters:

Name Type Description Default
config DMFConfig

Fully populated config object returned by load_dmf_config().

required
Source code in dmf/analysis/geometry.py
@classmethod
def from_dmf_config(cls, config: DMFConfig) -> InteractionMatrix:
    """Construct an ``InteractionMatrix`` from a universal ``DMFConfig``.

            Uses ``nlp.vector_dim`` and ``capacity.window_size`` from the TOML.

            Translation map
            ---------------
            ==================== =========================== ==================
            TOML key             DMFConfig path              VectorConfig field
            ==================== =========================== ==================
            nlp.vector_dim       config.nlp.vector_dim       vector_dim
            capacity.window_size config.capacity.window_size window_size
            ==================== =========================== ==================

            Parameters
            ----------
            config : DMFConfig
                Fully populated config object returned by ``load_dmf_config()``.

            Returns
            -------
            InteractionMatrix
                Fully initialised instance with an empty sliding window.

    Args:
        config: Fully populated config object returned by ``load_dmf_config()``.

    Raises:
        None.
    """
    vector_cfg = VectorConfig(
        vector_dim=config.nlp.vector_dim,
        window_size=config.capacity.window_size,
    )
    return cls(config=vector_cfg)
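
The translation map can be read as a plain field-to-field copy. The sketch below assumes nothing about the real classes beyond the two attribute paths in the table: every `Demo*` and `*Section` dataclass is a hypothetical stand-in, and the values 384 and 8 are arbitrary illustration values.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class NLPSection:  # hypothetical stand-in for config.nlp
    vector_dim: int


@dataclass(frozen=True)
class CapacitySection:  # hypothetical stand-in for config.capacity
    window_size: int


@dataclass(frozen=True)
class DemoDMFConfig:  # hypothetical stand-in for DMFConfig
    nlp: NLPSection
    capacity: CapacitySection


@dataclass(frozen=True)
class DemoVectorConfig:  # hypothetical stand-in for VectorConfig
    vector_dim: int
    window_size: int


def to_vector_config(config):
    """Apply the two-row translation map from the docstring."""
    return DemoVectorConfig(
        vector_dim=config.nlp.vector_dim,
        window_size=config.capacity.window_size,
    )


demo = DemoDMFConfig(
    nlp=NLPSection(vector_dim=384),
    capacity=CapacitySection(window_size=8),
)
vector_cfg = to_vector_config(demo)
```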

get_centroid()

Return the cached, L2-normalised centroid (O(1) read).

    Returns
    -------
    np.ndarray | None
        Unit-length centroid of the current window, or None when the
        window is empty.
Source code in dmf/analysis/geometry.py
def get_centroid(self) -> np.ndarray | None:
    """Return the cached, L2-normalised centroid (O(1) read).

            Returns
            -------
            np.ndarray | None
                Unit-length centroid of the current window, or None when the
                window is empty.

    Raises:
        None.
    """
    return self._current_centroid
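
To see why the cached centroid is re-normalised, and what the zero-norm guard described for `_recalculate_centroid` protects against, consider this stdlib-only sketch. `normalised_centroid` is a hypothetical helper, and lists stand in for `np.ndarray`.

```python
import math


def normalised_centroid(vectors):
    """Element-wise mean of the vectors, rescaled to unit length when possible."""
    n = len(vectors)
    mean = [sum(col) / n for col in zip(*vectors)]
    norm = math.sqrt(sum(x * x for x in mean))
    # Zero-norm guard: leave the zero vector untouched instead of dividing by 0.
    return [x / norm for x in mean] if norm > 0.0 else mean


# The raw mean of two orthogonal unit vectors is shorter than 1;
# rescaling projects it back onto the unit circle.
unit = normalised_centroid([[1.0, 0.0], [0.0, 1.0]])
unit_norm = math.sqrt(sum(x * x for x in unit))

# Two opposite unit vectors cancel out, so the guard returns the zero vector.
cancelled = normalised_centroid([[1.0, 0.0], [-1.0, 0.0]])
```

In the cancelling case the result is the zero vector rather than a unit vector, so a caller that relies on unit length may want to check for it.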

remove_vector(vector)

Remove a specific vector from the sliding window by object identity.

    Comparison uses Python's ``is`` operator (object identity), not
    element-wise numpy equality.  This is safe because the same
    ``np.ndarray`` object is stored on both the ``MemoryEntry`` and
    this deque — ``add_vector`` receives the reference and appends it
    directly without copying.

    If the vector is no longer in the window (evicted by FIFO overflow
    because more than ``window_size`` interactions arrived since it was
    added), this is a no-op and returns ``False``.

    After removal the centroid is recomputed from the remaining vectors.
    If the window becomes empty, the centroid is reset to ``None``.

    Parameters
    ----------
    vector : np.ndarray
        The exact array object to remove (identity comparison).

    Returns
    -------
    bool
        ``True`` if the vector was found and removed; ``False`` if it
        was not in the current window.

Parameters:

Name Type Description Default
vector ndarray

The exact array object to remove (identity comparison).

required
Source code in dmf/analysis/geometry.py
def remove_vector(self, vector: np.ndarray) -> bool:
    """Remove a specific vector from the sliding window by object identity.

            Comparison uses Python's ``is`` operator (object identity), not
            element-wise numpy equality.  This is safe because the same
            ``np.ndarray`` object is stored on both the ``MemoryEntry`` and
            this deque — ``add_vector`` receives the reference and appends it
            directly without copying.

            If the vector is no longer in the window (evicted by FIFO overflow
            because more than ``window_size`` interactions arrived since it was
            added), this is a no-op and returns ``False``.

            After removal the centroid is recomputed from the remaining vectors.
            If the window becomes empty, the centroid is reset to ``None``.

            Parameters
            ----------
            vector : np.ndarray
                The exact array object to remove (identity comparison).

            Returns
            -------
            bool
                ``True`` if the vector was found and removed; ``False`` if it
                was not in the current window.

    Args:
        vector: The exact array object to remove (identity comparison).

    Raises:
        None.
    """
    # Identity matching avoids removing a different ndarray with equal
    # values when duplicate vectors are present in the sliding window.
    found = False
    new_vectors: deque[np.ndarray] = deque(maxlen=self._config.window_size)
    for v in self._vectors:
        if not found and v is vector:
            found = True
        else:
            new_vectors.append(v)

    if not found:
        return False

    self._vectors = new_vectors
    if self._vectors:
        self._recalculate_centroid()
    else:
        self._current_centroid = None
    return True
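
The difference between identity and value matching is easiest to see with two equal-valued entries. The sketch below reuses the loop shape of the listing above in a free function. `remove_by_identity` is a hypothetical helper, and plain lists stand in for `np.ndarray` so the example runs with the standard library only.

```python
from collections import deque


def remove_by_identity(vectors, target):
    """Return (new_deque, found), dropping the first element that *is* target."""
    found = False
    kept = deque(maxlen=vectors.maxlen)
    for v in vectors:
        if not found and v is target:
            found = True
        else:
            kept.append(v)
    return kept, found


a = [1.0, 0.0]
b = [1.0, 0.0]  # equal values, but a different object
window = deque([a, b], maxlen=4)

# Removing b leaves a in place even though a == b.
window_after, found = remove_by_identity(window, b)

# A copy of b is equal by value but is not the stored object, so nothing matches.
_, found_copy = remove_by_identity(window, list(b))
```

Value-based removal would have dropped `a` in the first call, because it comes first and compares equal to `b`.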

InteractionPipeline

Full-pipeline coordinator for a single conversation session.

Owns one instance each of NLPEngine, EmbeddingEngine, and
InteractionMatrix for the lifetime of the session. Callers interact
only through analyze_interaction(); the internal routing is opaque.

Attributes
----------
_nlp_config : NLPConfig
    Immutable NLP configuration forwarded to NLPEngine.
_vector_config : VectorConfig
    Immutable vector configuration forwarded to EmbeddingEngine and
    InteractionMatrix.
_nlp_engine : NLPEngine
    Extracts ID, |S|, E from raw text via spaCy + VADER.
_embedding_engine : EmbeddingEngine
    Converts text to an L2-normalised dense vector.
    FastEmbed model is loaded lazily on the first get_embedding() call.
_interaction_matrix : InteractionMatrix
    Maintains the sliding-window centroid and computes D.

Parameters:

Name Type Description Default
nlp_config NLPConfig

Configuration for the NLP Engine (spaCy model, system-prompt gating flag).

required
vector_config VectorConfig

Configuration for the Embedding Engine and InteractionMatrix (FastEmbed model, cache directory, window size).

required

Returns:

Type Description

Instance of this class.

Source code in dmf/runtime/pipeline.py
class InteractionPipeline:
    """Full-pipeline coordinator for a single conversation session.

        Owns one instance each of NLPEngine, EmbeddingEngine, and
        InteractionMatrix for the lifetime of the session. Callers interact
        only through analyze_interaction(); the internal routing is opaque.

        Attributes
        ----------
        _nlp_config : NLPConfig
            Immutable NLP configuration forwarded to NLPEngine.
        _vector_config : VectorConfig
            Immutable vector configuration forwarded to EmbeddingEngine and
            InteractionMatrix.
        _nlp_engine : NLPEngine
            Extracts ID, |S|, E from raw text via spaCy + VADER.
        _embedding_engine : EmbeddingEngine
            Converts text to an L2-normalised dense vector.
            FastEmbed model is loaded lazily on the first get_embedding() call.
        _interaction_matrix : InteractionMatrix
            Maintains the sliding-window centroid and computes D.

    Args:
        nlp_config: Configuration for the NLP Engine (spaCy model,
            system-prompt gating flag).
        vector_config: Configuration for the Embedding Engine and
            InteractionMatrix (FastEmbed model, cache directory, window size).

    Returns:
        Instance of this class.

    Raises:
        None.
    """

    def __init__(
        self,
        nlp_config: NLPConfig,
        vector_config: VectorConfig,
    ) -> None:
        """Initialise the pipeline by constructing all component engines.

        All three components are constructed here. Only NLPEngine triggers I/O
        at construction time (loading the spaCy model). EmbeddingEngine defers
        FastEmbed loading to the first get_embedding() call.

        Parameters
        ----------
        nlp_config : NLPConfig
            Configuration for the NLP Engine (spaCy model, system-prompt
            gating flag).
        vector_config : VectorConfig
            Configuration for the Embedding Engine and InteractionMatrix
            (FastEmbed model, cache directory, window size).
        """
        self._nlp_config: NLPConfig = nlp_config
        self._vector_config: VectorConfig = vector_config

        self._nlp_engine: NLPEngine = NLPEngine(config=nlp_config)
        self._embedding_engine: EmbeddingEngine = EmbeddingEngine(config=vector_config)
        self._interaction_matrix: InteractionMatrix = InteractionMatrix(config=vector_config)

    @classmethod
    def from_dmf_config(
        cls,
        config: DMFConfig,
        *,
        analyze_system_prompt: bool | None = None,
    ) -> InteractionPipeline:
        """Construct an ``InteractionPipeline`` from the universal ``DMFConfig``.

                Parameters
                ----------
                config : DMFConfig
                    Fully populated config object returned by ``load_dmf_config()``.
                analyze_system_prompt : bool | None
                    Optional override for ``NLPConfig.analyze_system_prompt``.
                    When ``None``, the ``NLPConfig`` default is preserved.

        Args:
            config: Fully populated config object returned by ``load_dmf_config()``.
            analyze_system_prompt: Optional override for
                ``NLPConfig.analyze_system_prompt``; ``None`` keeps the default.

        Returns:
            A new ``InteractionPipeline`` built from ``config``.

        Raises:
            None.
        """
        nlp_kwargs: dict[str, object] = {"spacy_model": config.nlp.spacy_model}
        if analyze_system_prompt is not None:
            nlp_kwargs["analyze_system_prompt"] = analyze_system_prompt

        nlp_cfg = NLPConfig(**nlp_kwargs)
        vector_cfg = VectorConfig(
            model_name=config.nlp.model_name,
            vector_dim=config.nlp.vector_dim,
            window_size=config.capacity.window_size,
        )
        return cls(
            nlp_config=nlp_cfg,
            vector_config=vector_cfg,
        )


    def analyze_interaction(
        self,
        text: str,
        is_system: bool = False,
        provenance: InteractionProvenance | None = None,
    ) -> AnalysisReport:
        """Process *text* through the full pipeline and return an enriched report.

                Steps
                -----
                1. NLP extraction (always runs, except for gated system prompts).
                2. Early return if the interaction was skipped by NLPEngine gating:
                   ``semantic_divergence`` remains at its default of 0.0.
                3. Embedding: convert text to a unit vector via FastEmbed.
                4. Geometry: compute divergence D and update the sliding window.
                5. Stamp ``semantic_divergence`` onto the report and return.

                Parameters
                ----------
                text : str
                    Raw interaction text (user message, assistant turn, etc.).
                is_system : bool
                    True when processing the system prompt. Combined with
                    ``NLPConfig.analyze_system_prompt``, controls whether the
                    system prompt is gated or fully analysed.
                provenance : InteractionProvenance | None
                    Optional structured provenance metadata supplied by the caller.
                    The pipeline copies it onto the returned report without adding
                    adapter-specific heuristics.

                Returns
                -------
                AnalysisReport
                    Fully enriched report with ID, |S|, E, D, latency_ms, and
                    raw_metadata populated.

        Args:
            text: Raw interaction text (user message, assistant turn, etc.).
            is_system: True when processing the system prompt.
            provenance: Optional structured provenance metadata supplied by
                the caller.

        Raises:
            None.
        """
        report, _ = self.analyze_interaction_with_vector(
            text,
            is_system=is_system,
            provenance=provenance,
        )
        return report

    def analyze_interaction_with_vector(
        self,
        text: str,
        is_system: bool = False,
        provenance: InteractionProvenance | None = None,
    ) -> tuple[AnalysisReport, np.ndarray | None]:
        """Process *text* and also return the canonical embedding used.

                This is the public boundary for callers that need both the enriched
                ``AnalysisReport`` and the exact vector stamped into the session
                geometry. It avoids re-embedding the same text out-of-band.

                Returns
                -------
                tuple[AnalysisReport, np.ndarray | None]
                    ``(report, vector)`` for normal turns; ``vector`` is ``None``
                    when the interaction was skipped by system-prompt gating.

        Args:
            text: Raw interaction text (user message, assistant turn, etc.).
            is_system: True when processing the system prompt.
            provenance: Optional structured provenance metadata supplied by
                the caller.

        Raises:
            None.
        """
        report = self._nlp_engine.analyze_interaction(text, is_system=is_system)

        if report.raw_metadata.get("skipped"):
            return (
                dataclasses.replace(
                    report,
                    provenance=provenance or InteractionProvenance(),
                ),
                None,
            )

        vector = self._embedding_engine.get_embedding(text)
        divergence = self._interaction_matrix.add_vector(vector)

        return (
            dataclasses.replace(
                report,
                semantic_divergence=divergence,
                provenance=provenance or InteractionProvenance(),
            ),
            vector,
        )

analyze_interaction(text, is_system=False, provenance=None)

Process text through the full pipeline and return an enriched report.

    Steps
    -----
    1. NLP extraction (always runs, except for gated system prompts).
    2. Early return if the interaction was skipped by NLPEngine gating:
       ``semantic_divergence`` remains at its default of 0.0.
    3. Embedding: convert text to a unit vector via FastEmbed.
    4. Geometry: compute divergence D and update the sliding window.
    5. Stamp ``semantic_divergence`` onto the report and return.

    Parameters
    ----------
    text : str
        Raw interaction text (user message, assistant turn, etc.).
    is_system : bool
        True when processing the system prompt. Combined with
        ``NLPConfig.analyze_system_prompt``, controls whether the
        system prompt is gated or fully analysed.
    provenance : InteractionProvenance | None
        Optional structured provenance metadata supplied by the caller.
        The pipeline copies it onto the returned report without adding
        adapter-specific heuristics.

    Returns
    -------
    AnalysisReport
        Fully enriched report with ID, |S|, E, D, latency_ms, and
        raw_metadata populated.

Parameters:

Name Type Description Default
text str

Raw interaction text (user message, assistant turn, etc.).

required
is_system bool

True when processing the system prompt.

False
provenance InteractionProvenance | None

Optional structured provenance metadata supplied by the caller.

None
Source code in dmf/runtime/pipeline.py
def analyze_interaction(
    self,
    text: str,
    is_system: bool = False,
    provenance: InteractionProvenance | None = None,
) -> AnalysisReport:
    """Process *text* through the full pipeline and return an enriched report.

            Steps
            -----
            1. NLP extraction (always runs, except for gated system prompts).
            2. Early return if the interaction was skipped by NLPEngine gating:
               ``semantic_divergence`` remains at its default of 0.0.
            3. Embedding: convert text to a unit vector via FastEmbed.
            4. Geometry: compute divergence D and update the sliding window.
            5. Stamp ``semantic_divergence`` onto the report and return.

            Parameters
            ----------
            text : str
                Raw interaction text (user message, assistant turn, etc.).
            is_system : bool
                True when processing the system prompt. Combined with
                ``NLPConfig.analyze_system_prompt``, controls whether the
                system prompt is gated or fully analysed.
            provenance : InteractionProvenance | None
                Optional structured provenance metadata supplied by the caller.
                The pipeline copies it onto the returned report without adding
                adapter-specific heuristics.

            Returns
            -------
            AnalysisReport
                Fully enriched report with ID, |S|, E, D, latency_ms, and
                raw_metadata populated.

    Args:
        text: Raw interaction text (user message, assistant turn, etc.).
        is_system: True when processing the system prompt.
        provenance: Optional structured provenance metadata supplied by
            the caller.

    Raises:
        None.
    """
    report, _ = self.analyze_interaction_with_vector(
        text,
        is_system=is_system,
        provenance=provenance,
    )
    return report

analyze_interaction_with_vector(text, is_system=False, provenance=None)

Process text and also return the canonical embedding used.

    This is the public boundary for callers that need both the enriched
    ``AnalysisReport`` and the exact vector stamped into the session
    geometry. It avoids re-embedding the same text out-of-band.

    Returns
    -------
    tuple[AnalysisReport, np.ndarray | None]
        ``(report, vector)`` for normal turns; ``vector`` is ``None``
        when the interaction was skipped by system-prompt gating.

Parameters:

Name Type Description Default
text str

Raw interaction text (user message, assistant turn, etc.).

required
is_system bool

True when processing the system prompt.

False
provenance InteractionProvenance | None

Optional structured provenance metadata supplied by the caller.

None
Source code in dmf/runtime/pipeline.py
def analyze_interaction_with_vector(
    self,
    text: str,
    is_system: bool = False,
    provenance: InteractionProvenance | None = None,
) -> tuple[AnalysisReport, np.ndarray | None]:
    """Process *text* and also return the canonical embedding used.

            This is the public boundary for callers that need both the enriched
            ``AnalysisReport`` and the exact vector stamped into the session
            geometry. It avoids re-embedding the same text out-of-band.

            Returns
            -------
            tuple[AnalysisReport, np.ndarray | None]
                ``(report, vector)`` for normal turns; ``vector`` is ``None``
                when the interaction was skipped by system-prompt gating.

    Args:
        text: Raw interaction text (user message, assistant turn, etc.).
        is_system: True when processing the system prompt.
        provenance: Optional structured provenance metadata supplied by
            the caller.

    Raises:
        None.
    """
    report = self._nlp_engine.analyze_interaction(text, is_system=is_system)

    if report.raw_metadata.get("skipped"):
        return (
            dataclasses.replace(
                report,
                provenance=provenance or InteractionProvenance(),
            ),
            None,
        )

    vector = self._embedding_engine.get_embedding(text)
    divergence = self._interaction_matrix.add_vector(vector)

    return (
        dataclasses.replace(
            report,
            semantic_divergence=divergence,
            provenance=provenance or InteractionProvenance(),
        ),
        vector,
    )
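
The control flow of the listing above (gate, embed, measure, stamp) can be traced with stubs in place of the real engines. This is a sketch under stated assumptions: `DemoReport`, `fake_nlp`, `fake_embed`, `fake_add_vector`, and `analyse_with_vector` are hypothetical names, the feature values are placeholders, and provenance handling is left out. Only the `"skipped"` metadata key and the use of `dataclasses.replace` are taken from the listing.

```python
import dataclasses
from dataclasses import dataclass, field


@dataclass(frozen=True)
class DemoReport:
    """Hypothetical, trimmed-down stand-in for AnalysisReport."""
    info_density: float = 0.0
    semantic_divergence: float = 0.0
    raw_metadata: dict = field(default_factory=dict)


def fake_nlp(text, is_system, analyze_system_prompt=False):
    """Stub NLP stage: gated system prompts yield a neutral, skipped report."""
    if is_system and not analyze_system_prompt:
        return DemoReport(raw_metadata={"skipped": True})
    return DemoReport(info_density=0.5)  # placeholder feature value


def fake_embed(text):
    """Stub for the embedding step; always returns the same unit vector."""
    return [1.0, 0.0]


def fake_add_vector(vector):
    """Stub for the geometry step; always reports the same divergence."""
    return 0.25


def analyse_with_vector(text, is_system=False):
    report = fake_nlp(text, is_system)
    if report.raw_metadata.get("skipped"):
        # Early return: no embedding is computed and the window is untouched.
        return report, None
    vector = fake_embed(text)
    divergence = fake_add_vector(vector)
    # replace() builds a new report; the NLP-stage report is left unchanged.
    return dataclasses.replace(report, semantic_divergence=divergence), vector


skipped_report, skipped_vector = analyse_with_vector("be concise", is_system=True)
report, vector = analyse_with_vector("hello")
```

Returning the vector alongside the report lets a caller store exactly the object that entered the window, which is what identity-based removal later depends on.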

from_dmf_config(config, *, analyze_system_prompt=None) classmethod

Construct an InteractionPipeline from the universal DMFConfig.

    Parameters
    ----------
    config : DMFConfig
        Fully populated config object returned by ``load_dmf_config()``.
    analyze_system_prompt : bool | None
        Optional override for ``NLPConfig.analyze_system_prompt``.
        When ``None``, the ``NLPConfig`` default is preserved.

Parameters:

Name Type Description Default
config DMFConfig

Fully populated config object returned by load_dmf_config().

required
analyze_system_prompt bool | None

Optional override for NLPConfig.analyze_system_prompt. When None, the NLPConfig default is preserved.

None

Returns:

Type Description
InteractionPipeline

A new InteractionPipeline built from config.

Source code in dmf/runtime/pipeline.py
@classmethod
def from_dmf_config(
    cls,
    config: DMFConfig,
    *,
    analyze_system_prompt: bool | None = None,
) -> InteractionPipeline:
    """Construct an ``InteractionPipeline`` from the universal ``DMFConfig``.

            Parameters
            ----------
            config : DMFConfig
                Fully populated config object returned by ``load_dmf_config()``.
            analyze_system_prompt : bool | None
                Optional override for ``NLPConfig.analyze_system_prompt``.
                When ``None``, the ``NLPConfig`` default is preserved.

    Args:
        config: Fully populated config object returned by ``load_dmf_config()``.
        analyze_system_prompt: Optional override for
            ``NLPConfig.analyze_system_prompt``; ``None`` keeps the default.

    Returns:
        A new ``InteractionPipeline`` built from ``config``.

    Raises:
        None.
    """
    nlp_kwargs: dict[str, object] = {"spacy_model": config.nlp.spacy_model}
    if analyze_system_prompt is not None:
        nlp_kwargs["analyze_system_prompt"] = analyze_system_prompt

    nlp_cfg = NLPConfig(**nlp_kwargs)
    vector_cfg = VectorConfig(
        model_name=config.nlp.model_name,
        vector_dim=config.nlp.vector_dim,
        window_size=config.capacity.window_size,
    )
    return cls(
        nlp_config=nlp_cfg,
        vector_config=vector_cfg,
    )
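
The `None`-means-keep-the-default behaviour comes from passing the keyword only when an override is given. A minimal sketch of that pattern follows. `DemoNLPConfig` and `build_nlp_config` are hypothetical, the `False` default is an assumption (the real `NLPConfig` default is not shown here), and `"en_core_web_sm"` is only an example model name.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DemoNLPConfig:
    """Hypothetical stand-in for NLPConfig."""
    spacy_model: str
    analyze_system_prompt: bool = False  # assumed default for this sketch


def build_nlp_config(spacy_model, analyze_system_prompt=None):
    kwargs = {"spacy_model": spacy_model}
    if analyze_system_prompt is not None:
        # Forward the override only when the caller supplied one, so the
        # dataclass default stays the single source of truth otherwise.
        kwargs["analyze_system_prompt"] = analyze_system_prompt
    return DemoNLPConfig(**kwargs)


default_cfg = build_nlp_config("en_core_web_sm")
forced_on = build_nlp_config("en_core_web_sm", analyze_system_prompt=True)
forced_off = build_nlp_config("en_core_web_sm", analyze_system_prompt=False)
```

Testing against `None` rather than truthiness matters here: an explicit `False` is still a deliberate override and is forwarded.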

NLPEngine

Deterministic NLP feature extractor for DMF interactions.

Loads a spaCy model and VADER once at construction time and reuses
them across all calls, avoiding per-interaction model loading.

Attributes are prefixed with _ to signal they are implementation
details; callers interact only through analyze_interaction().

Parameters:

Name Type Description Default
config NLPConfig

Immutable configuration object; spacy_model and analyze_system_prompt are read once at construction and never changed.

required

Returns:

Type Description

Instance of this class.

Source code in dmf/analysis/nlp_engine.py
class NLPEngine:
    """Deterministic NLP feature extractor for DMF interactions.

    Loads a spaCy model and VADER once at construction time and reuses
    them across all calls, avoiding per-interaction model loading.

    Attributes are prefixed with _ to signal they are implementation
    details; callers interact only through analyze_interaction().

    Args:
        config: Immutable NLPConfig. spacy_model and
            analyze_system_prompt are read once at construction.

    Returns:
        Instance of this class.

    Raises:
        OSError: If the spaCy model named in config.spacy_model is not
            installed.
    """

    def __init__(self, config: NLPConfig) -> None:
        """Initialise the NLP Engine with the provided configuration.

        Parameters
        ----------
        config : NLPConfig
            Immutable configuration object. spacy_model and
            analyze_system_prompt are read once here and never changed.

        Raises
        ------
        OSError
            If the spaCy model named in config.spacy_model is not installed.
            Run: python -m spacy download <model_name>
        """
        self._config: NLPConfig = config
        self._nlp_model: Language = spacy.load(config.spacy_model)
        self._sentiment_analyzer: SentimentIntensityAnalyzer = SentimentIntensityAnalyzer()
        self._latency_timer: ExecutionLatencyTimer = ExecutionLatencyTimer()
        self._signal_adapter = EnglishSignalAdapter(self._nlp_model)


    def analyze_interaction(
        self,
        text: str,
        is_system: bool = False,
    ) -> AnalysisReport:
        """Extract NLP features from a single interaction text.

        When is_system=True and config.analyze_system_prompt=False, the
        system prompt is intentionally skipped and neutral weights (0.0)
        are returned. This prevents the system prompt's style (e.g.
        "be assertive") from polluting content-based scoring.

        Parameters
        ----------
        text : str
            Raw interaction content to analyse.
        is_system : bool
            True if this text comes from the system prompt role.

        Returns
        -------
        AnalysisReport
            Populated with ID, |S|, E, latency_ms, topic and signal
            fields, and raw_metadata.
        """
        if is_system and not self._config.analyze_system_prompt:
            return self._build_skipped_system_prompt_report()

        with self._latency_timer:
            doc = self._nlp_model(text)
            info_density = self._calculate_information_density_score(doc)
            sentiment_abs = self._calculate_absolute_sentiment_score(text)
            entity_count = self._calculate_entity_count(doc)
            pos_counts = self._extract_pos_counts(doc)
            entity_details = self._extract_entity_details(doc)
            extraction = self._signal_adapter.extract(text, doc)
            extraction_metadata = extraction.metadata_fields()

        return AnalysisReport(
            info_density=info_density,
            sentiment_abs=sentiment_abs,
            entity_count=entity_count,
            is_system_prompt=is_system,
            latency_ms=self._latency_timer.elapsed_ms,
            topic_identity=extraction_metadata["topic_identity"],
            topic_value=extraction_metadata["topic_value"],
            is_query_like=extraction_metadata["is_query_like"],
            is_ack_like=extraction_metadata["is_ack_like"],
            raw_metadata={
                "pos_counts": pos_counts,
                "token_count": len(doc),
                "entities": entity_details,
                "signals": self._signals_to_metadata(extraction.signals),
                **extraction_metadata,
            },
            signals=extraction.signals,
        )


    def _calculate_information_density_score(
        self,
        doc: spacy.tokens.Doc,
    ) -> float:
        """Compute ID as the ratio of semantic tokens to total tokens.

        Returns 0.0 for empty documents to avoid division-by-zero; this
        neutral weight is safe to pass downstream to the Scoring Engine.
        """
        total_token_count = len(doc)
        if total_token_count == 0:
            return NEUTRAL_INFO_DENSITY

        semantic_token_count = sum(
            1 for token in doc if token.pos_ in SEMANTIC_POS_TAGS
        )
        return semantic_token_count / total_token_count

    def _calculate_absolute_sentiment_score(self, text: str) -> float:
        """Compute |S| as the absolute value of VADER's compound score.

        Returns 0.0 for blank text — VADER returns 0.0 compound for empty
        strings anyway, but the explicit guard documents intent clearly.
        """
        if not text.strip():
            return NEUTRAL_SENTIMENT_ABS

        vader_scores = self._sentiment_analyzer.polarity_scores(text)
        return abs(vader_scores["compound"])

    def _calculate_entity_count(self, doc: spacy.tokens.Doc) -> int:
        """Count the total number of named entities found by spaCy NER."""
        return len(doc.ents)


    def _extract_pos_counts(
        self,
        doc: spacy.tokens.Doc,
    ) -> dict[str, int]:
        """Build a frequency map of universal POS tags for the document.

        Stored in raw_metadata for audit and offline analysis — allows
        reconstruction of the ID calculation without re-running spaCy.
        """
        pos_counts: dict[str, int] = {}
        for token in doc:
            pos_counts[token.pos_] = pos_counts.get(token.pos_, 0) + 1
        return pos_counts

    def _extract_entity_details(
        self,
        doc: spacy.tokens.Doc,
    ) -> list[dict[str, str]]:
        """Extract entity text and label for every NER span in the document.

        Each entry is {"text": <surface form>, "label": <NER type>}.
        Stored in raw_metadata so the entity list is auditable without
        re-running the spaCy pipeline.
        """
        return [
            {"text": entity.text, "label": entity.label_}
            for entity in doc.ents
        ]

    def _signals_to_metadata(self, signals: InteractionSignals) -> dict[str, object]:
        """Return a JSON-friendly metadata snapshot of the interaction signals."""
        return {
            "is_current_state": signals.is_current_state,
            "is_past_state": signals.is_past_state,
            "is_preference": signals.is_preference,
            "is_constraint": signals.is_constraint,
            "is_correction": signals.is_correction,
            "has_negation": signals.has_negation,
            "has_replacement": signals.has_replacement,
            "operational_weight": signals.operational_weight,
            "personal_relevance": signals.personal_relevance,
            "quantitative_relevance": signals.quantitative_relevance,
            "task_relevance": signals.task_relevance,
            "temporal_markers": signals.temporal_markers,
            "cue_phrases": signals.cue_phrases,
        }


    def _build_skipped_system_prompt_report(self) -> AnalysisReport:
        """Return a zero-weight report for system prompts that are not analysed.

        Using explicit neutral constants keeps the Scoring Engine's formula
        numerically stable — multiplying by 0.0 ID/sentiment won't produce
        NaN or unexpectedly high scores.
        """
        empty_signals = InteractionSignals()
        return AnalysisReport(
            info_density=NEUTRAL_INFO_DENSITY,
            sentiment_abs=NEUTRAL_SENTIMENT_ABS,
            entity_count=NEUTRAL_ENTITY_COUNT,
            is_system_prompt=True,
            latency_ms=0.0,
            signals=empty_signals,
            topic_identity=None,
            topic_value=None,
            is_query_like=False,
            is_ack_like=False,
            raw_metadata={
                "skipped": True,
                "reason": "analyze_system_prompt=False in NLPConfig",
                "signals": self._signals_to_metadata(empty_signals),
                "signal_evidence": [],
                "topic_identity": None,
                "topic_value": None,
                "is_query_like": False,
                "is_ack_like": False,
            },
        )
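
The `_extract_pos_counts` docstring states that the stored POS counts allow the ID calculation to be reconstructed without re-running spaCy. A minimal sketch of that reconstruction follows, assuming `SEMANTIC_POS_TAGS` holds the four tags named in the `AnalysisReport` description (NOUN, VERB, ADJ, PROPN); the helper name `info_density_from_metadata` and the sample metadata are hypothetical.

```python
# Assumed contents of SEMANTIC_POS_TAGS, taken from the AnalysisReport docs.
SEMANTIC_POS_TAGS = frozenset({"NOUN", "VERB", "ADJ", "PROPN"})


def info_density_from_metadata(raw_metadata: dict) -> float:
    """Rebuild ID from the audit fields stored in raw_metadata."""
    token_count = raw_metadata["token_count"]
    if token_count == 0:
        # Mirrors the empty-document guard: neutral density, no division.
        return 0.0
    semantic_count = sum(
        count
        for tag, count in raw_metadata["pos_counts"].items()
        if tag in SEMANTIC_POS_TAGS
    )
    return semantic_count / token_count


# Hypothetical metadata for an 8-token interaction.
example_metadata = {
    "pos_counts": {"NOUN": 2, "VERB": 1, "DET": 2, "ADP": 2, "PUNCT": 1},
    "token_count": 8,
}
rebuilt_density = info_density_from_metadata(example_metadata)  # 3 / 8
```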

analyze_interaction(text, is_system=False)

Extract NLP features from a single interaction text.

    When is_system=True and config.analyze_system_prompt=False, the
    system prompt is intentionally skipped and neutral weights (0.0)
    are returned. This prevents the system prompt's style (e.g.
    "be assertive") from polluting content-based scoring.

    Parameters
    ----------
    text : str
        Raw interaction content to analyse.
    is_system : bool
        True if this text comes from the system prompt role.

    Returns
    -------
    AnalysisReport
        Populated with ID, |S|, E, latency_ms, and raw_metadata.

Parameters:

Name       Type  Description                                           Default
text       str   Raw interaction content to analyse.                   required
is_system  bool  True if this text comes from the system prompt role.  False
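
The skip rule described above can be sketched without spaCy by feeding pre-tagged tokens. This is an illustration under stated assumptions, not the engine itself: `SketchReport`, `sketch_analyze`, and the `(text, pos)` token tuples are hypothetical, and only the density feature is modelled.

```python
from dataclasses import dataclass, field

SEMANTIC_POS = frozenset({"NOUN", "VERB", "ADJ", "PROPN"})  # assumed tag set


@dataclass
class SketchReport:
    """Hypothetical, reduced stand-in for AnalysisReport."""

    info_density: float
    is_system_prompt: bool
    raw_metadata: dict = field(default_factory=dict)


def sketch_analyze(tagged_tokens, is_system=False, analyze_system_prompt=False):
    """Return a neutral report for skipped system prompts, else compute ID."""
    if is_system and not analyze_system_prompt:
        # Skipped: neutral weight so the prompt's style cannot affect scoring.
        return SketchReport(0.0, True, {"skipped": True})
    total = len(tagged_tokens)
    semantic = sum(1 for _, pos in tagged_tokens if pos in SEMANTIC_POS)
    density = semantic / total if total else 0.0
    return SketchReport(density, is_system, {"token_count": total})


tokens = [("Deploy", "VERB"), ("the", "DET"), ("service", "NOUN"), (".", "PUNCT")]
user_report = sketch_analyze(tokens)
skipped_report = sketch_analyze(tokens, is_system=True)
analysed_prompt = sketch_analyze(tokens, is_system=True, analyze_system_prompt=True)
```

Note that a system prompt is still analysed normally when the configuration flag is enabled; only the combination of `is_system=True` and a disabled flag takes the neutral branch.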
Source code in dmf/analysis/nlp_engine.py
def analyze_interaction(
    self,
    text: str,
    is_system: bool = False,
) -> AnalysisReport:
    """Extract NLP features from a single interaction text.

    When is_system=True and config.analyze_system_prompt=False, the
    system prompt is intentionally skipped and neutral weights (0.0)
    are returned. This prevents the system prompt's style (e.g.
    "be assertive") from polluting content-based scoring.

    Parameters
    ----------
    text : str
        Raw interaction content to analyse.
    is_system : bool
        True if this text comes from the system prompt role.

    Returns
    -------
    AnalysisReport
        Populated with ID, |S|, E, latency_ms, topic and signal
        fields, and raw_metadata.
    """
    if is_system and not self._config.analyze_system_prompt:
        return self._build_skipped_system_prompt_report()

    with self._latency_timer:
        doc = self._nlp_model(text)
        info_density = self._calculate_information_density_score(doc)
        sentiment_abs = self._calculate_absolute_sentiment_score(text)
        entity_count = self._calculate_entity_count(doc)
        pos_counts = self._extract_pos_counts(doc)
        entity_details = self._extract_entity_details(doc)
        extraction = self._signal_adapter.extract(text, doc)
        extraction_metadata = extraction.metadata_fields()

    return AnalysisReport(
        info_density=info_density,
        sentiment_abs=sentiment_abs,
        entity_count=entity_count,
        is_system_prompt=is_system,
        latency_ms=self._latency_timer.elapsed_ms,
        topic_identity=extraction_metadata["topic_identity"],
        topic_value=extraction_metadata["topic_value"],
        is_query_like=extraction_metadata["is_query_like"],
        is_ack_like=extraction_metadata["is_ack_like"],
        raw_metadata={
            "pos_counts": pos_counts,
            "token_count": len(doc),
            "entities": entity_details,
            "signals": self._signals_to_metadata(extraction.signals),
            **extraction_metadata,
        },
        signals=extraction.signals,
    )
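
The source above wraps the pipeline in `with self._latency_timer:` and then reads `elapsed_ms`. The implementation of `ExecutionLatencyTimer` is not shown in this section, so the following is only a sketch of how such a context-manager timer could work; `SketchLatencyTimer` is a hypothetical name.

```python
import time


class SketchLatencyTimer:
    """Hypothetical reusable timer exposing elapsed_ms after each with-block."""

    def __init__(self) -> None:
        self.elapsed_ms: float = 0.0
        self._start: float = 0.0

    def __enter__(self) -> "SketchLatencyTimer":
        # perf_counter is monotonic, so clock adjustments cannot skew the result.
        self._start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        self.elapsed_ms = (time.perf_counter() - self._start) * 1000.0
        return False  # never swallow exceptions raised inside the block


timer = SketchLatencyTimer()
with timer:
    total = sum(range(1000))
latency_ms = timer.elapsed_ms
```

Because one timer instance is reused, each `with` block overwrites the previous reading, which is why the value is read immediately after the block.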

ScoringEngine

Computes the Survival Score Ω for a single AnalysisReport.

Constructed once with an immutable ScoringConfig and reused across
all interactions in a session.

Attributes
----------
_config : ScoringConfig
    Immutable weight and sigmoid parameters. Read once at construction
    time and never mutated.

Parameters:

Name    Type           Description                               Default
config  ScoringConfig  Immutable weight and sigmoid parameters.  required

Returns:

Type           Description
ScoringEngine  Instance of this class.

Source code in dmf/analysis/scoring_engine.py
class ScoringEngine:
    """Computes the Survival Score Ω for a single AnalysisReport.

    Constructed once with an immutable ScoringConfig and reused across
    all interactions in a session.

    Attributes
    ----------
    _config : ScoringConfig
        Immutable weight and sigmoid parameters. Read once at construction
        time and never mutated.

    Args:
        config: Immutable ScoringConfig carrying the weights and sigmoid
            parameters.

    Returns:
        Instance of this class.

    Raises:
        None.
    """

    def __init__(self, config: ScoringConfig) -> None:
        """Initialise the Scoring Engine with the given configuration.

        Parameters
        ----------
        config : ScoringConfig
            Frozen dataclass carrying α, β, γ, δ, x₀, E_cap, and
            social floor parameters.
        """
        self._config: ScoringConfig = config


    @classmethod
    def from_dmf_config(cls, config: DMFConfig) -> ScoringEngine:
        """Construct a ``ScoringEngine`` from a universal ``DMFConfig``.

        Translates the human-readable TOML field names (``alpha_density``,
        ``beta_entities``, …) to the internal ``ScoringConfig`` Greek-letter
        fields, preserving the existing constructor signature and all tests.

        Translation map
        ---------------
        ================================ ======================== ===================
        TOML key                         DMFConfig path           ScoringConfig field
        ================================ ======================== ===================
        scoring_weights.alpha_density    scoring.alpha_density    alpha
        scoring_weights.beta_entities    scoring.beta_entities    gamma
        scoring_weights.gamma_sentiment  scoring.gamma_sentiment  beta
        scoring_weights.delta_technical  scoring.delta_technical  delta
        scoring_weights.sigmoid_midpoint scoring.sigmoid_midpoint x0
        scoring_weights.entity_cap       scoring.entity_cap       e_cap
        scoring_weights.social_threshold scoring.social_threshold social_threshold
        scoring_weights.min_social_score scoring.min_social_score min_social_score
        ================================ ======================== ===================

        Note: ``beta_entities`` maps to ``ScoringConfig.gamma`` (entity
        weight) and ``gamma_sentiment`` maps to ``ScoringConfig.beta``
        (sentiment weight).  The TOML uses signal-descriptive names;
        the internal config uses Greek letters matching the formula.

        Tier thresholds are read from ``config.tiers`` (``critical_max``,
        ``healthy_min``). The ``lambda_operational``, ``eta_*``, and
        boost/penalty fields are copied from ``config.scoring`` under
        unchanged names.

        Parameters
        ----------
        config : DMFConfig
            Fully populated config object returned by ``load_dmf_config()``.

        Returns
        -------
        ScoringEngine
            Fully initialised instance.
        """
        scoring_cfg = ScoringConfig(
            alpha=config.scoring.alpha_density,
            beta=config.scoring.gamma_sentiment,
            gamma=config.scoring.beta_entities,
            delta=config.scoring.delta_technical,
            x0=config.scoring.sigmoid_midpoint,
            e_cap=config.scoring.entity_cap,
            social_threshold=config.scoring.social_threshold,
            min_social_score=config.scoring.min_social_score,
            critical_threshold=config.tiers.critical_max,
            healthy_threshold=config.tiers.healthy_min,
            lambda_operational=config.scoring.lambda_operational,
            eta_constraint=config.scoring.eta_constraint,
            eta_preference=config.scoring.eta_preference,
            eta_current_state=config.scoring.eta_current_state,
            eta_correction=config.scoring.eta_correction,
            eta_replacement=config.scoring.eta_replacement,
            eta_past_state=config.scoring.eta_past_state,
            user_correction_boost=config.scoring.user_correction_boost,
            preference_update_boost=config.scoring.preference_update_boost,
            constraint_boost=config.scoring.constraint_boost,
            corrected_by_user_penalty=config.scoring.corrected_by_user_penalty,
        )
        return cls(config=scoring_cfg)


    def calculate_score(
        self,
        report: AnalysisReport,
        text: str = "",
    ) -> float:
        """Compute the Survival Score Ω for *report*.

        Steps
        -----
        1. Normalise entity count: E_norm = min(E, E_cap) / E_cap.
        2. Compute weighted content logit.
        3. Compute weighted operational logit from conversational signals.
        4. Compute provenance logit from structured provenance metadata.
        5. Apply a single shifted sigmoid to the total logit.
        6. Social Floor: if Ω < social_threshold and *text* is a social
           cue, boost Ω to at least min_social_score.
        7. Round to 4 decimal places.
        8. Stamp *report* in-place: set ``report.survival_score = Ω`` and
           classify ``report.status`` using runtime thresholds so the report
           always carries its own authoritative score and tier.

        Parameters
        ----------
        report : AnalysisReport
            Fully populated report carrying ID, |S|, E, and D.
            Mutated in place (see step 8).
        text : str
            Original interaction text. Required for Social Floor
            detection (keyword matching). When empty the Social Floor
            is skipped, preserving backward compatibility.

        Returns
        -------
        float
            Survival Score Ω ∈ (0, 1), rounded to 4 decimal places.
        """
        cfg = self._config

        entity_norm = min(report.entity_count, cfg.e_cap) / cfg.e_cap

        z_content = _calculate_content_logit(report, entity_norm, cfg)

        z_operational = _calculate_operational_logit(report, cfg)

        z_provenance = _calculate_provenance_logit(report, cfg)

        omega = _sigmoid((z_content + z_operational + z_provenance) - cfg.x0)

        if text and omega < cfg.social_threshold and _is_social_cue(text):
            omega = max(omega, cfg.min_social_score)

        omega = round(omega, 4)

        report.survival_score = omega
        report.status = classify_survival_status(
            omega=omega,
            critical_threshold=cfg.critical_threshold,
            healthy_threshold=cfg.healthy_threshold,
        )

        return omega

calculate_score(report, text='')

Compute the Survival Score Ω for report.

    Steps
    -----
    1. Normalise entity count: E_norm = min(E, E_cap) / E_cap.
    2. Compute weighted content logit.
    3. Compute weighted operational logit from conversational signals.
    4. Compute provenance logit from structured provenance metadata.
    5. Apply a single shifted sigmoid to the total logit.
    6. Social Floor: if Ω < social_threshold and *text* is a social
       cue, boost Ω to at least min_social_score.
    7. Round to 4 decimal places.
    8. Stamp *report* in-place: set ``report.survival_score = Ω`` and
       classify ``report.status`` using runtime thresholds so the report
       always carries its own authoritative score and tier.

    Parameters
    ----------
    report : AnalysisReport
        Fully populated report carrying ID, |S|, E, and D.
    text : str
        Original interaction text. Required for Social Floor
        detection (keyword matching). When empty the Social Floor
        is skipped, preserving backward compatibility.

    Returns
    -------
    float
        Survival Score Ω ∈ (0, 1), rounded to 4 decimal places.

Parameters:

Name    Type            Description                                                  Default
report  AnalysisReport  Fully populated report carrying ID, |S|, E, and D.           required
text    str             Original interaction text, used for Social Floor detection.  ''
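
Steps 1, 2, and 5 to 7 above can be sketched numerically. The helper functions that build the logits are not shown in this section, so this sketch makes loud assumptions: the content logit is taken to be a plain weighted sum α·ID + β·|S| + γ·E_norm, the operational and provenance logits are omitted, and the social-cue check is a toy keyword lookup. `SketchScoringConfig`, `sketch_score`, and `SOCIAL_CUES` are hypothetical names with illustrative values.

```python
import math
from dataclasses import dataclass


@dataclass(frozen=True)
class SketchScoringConfig:
    """Hypothetical subset of ScoringConfig; all values are illustrative."""

    alpha: float = 1.0  # weight on information density
    beta: float = 0.0   # weight on absolute sentiment
    gamma: float = 1.0  # weight on normalised entity count
    x0: float = 1.0     # sigmoid midpoint (shift)
    e_cap: int = 4
    social_threshold: float = 0.3
    min_social_score: float = 0.35


SOCIAL_CUES = frozenset({"thanks", "thank you", "hello"})  # assumed keyword list


def sketch_score(info_density, sentiment_abs, entity_count, cfg, text=""):
    # Step 1: cap the entity count, then scale it into [0, 1].
    entity_norm = min(entity_count, cfg.e_cap) / cfg.e_cap
    # Step 2 (assumed form): linear content logit.
    z_content = (
        cfg.alpha * info_density
        + cfg.beta * sentiment_abs
        + cfg.gamma * entity_norm
    )
    # Step 5: shifted sigmoid of the total logit.
    omega = 1.0 / (1.0 + math.exp(-(z_content - cfg.x0)))
    # Step 6: Social Floor, applied only when text is supplied.
    if text and omega < cfg.social_threshold and text.strip().lower() in SOCIAL_CUES:
        omega = max(omega, cfg.min_social_score)
    # Step 7: round to 4 decimal places.
    return round(omega, 4)


cfg = SketchScoringConfig()
balanced = sketch_score(0.5, 0.0, 2, cfg)      # logit equals x0, so sigmoid(0)
capped = sketch_score(0.5, 0.0, 40, cfg)       # entity count saturates at e_cap
low_cfg = SketchScoringConfig(x0=5.0)
floored = sketch_score(0.0, 0.0, 0, low_cfg, text="Thanks")
unfloored = sketch_score(0.0, 0.0, 0, low_cfg)  # no text, floor is skipped
```

In this sketch a logit equal to the midpoint yields exactly 0.5, entity counts above `e_cap` contribute no more than `e_cap` itself, and the floor only applies when the original text is passed in.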
Source code in dmf/analysis/scoring_engine.py
def calculate_score(
    self,
    report: AnalysisReport,
    text: str = "",
) -> float:
    """Compute the Survival Score Ω for *report*.

    Steps
    -----
    1. Normalise entity count: E_norm = min(E, E_cap) / E_cap.
    2. Compute weighted content logit.
    3. Compute weighted operational logit from conversational signals.
    4. Compute provenance logit from structured provenance metadata.
    5. Apply a single shifted sigmoid to the total logit.
    6. Social Floor: if Ω < social_threshold and *text* is a social
       cue, boost Ω to at least min_social_score.
    7. Round to 4 decimal places.
    8. Stamp *report* in-place: set ``report.survival_score = Ω`` and
       classify ``report.status`` using runtime thresholds so the report
       always carries its own authoritative score and tier.

    Parameters
    ----------
    report : AnalysisReport
        Fully populated report carrying ID, |S|, E, and D.
        Mutated in place (see step 8).
    text : str
        Original interaction text. Required for Social Floor
        detection (keyword matching). When empty the Social Floor
        is skipped, preserving backward compatibility.

    Returns
    -------
    float
        Survival Score Ω ∈ (0, 1), rounded to 4 decimal places.
    """
    cfg = self._config

    entity_norm = min(report.entity_count, cfg.e_cap) / cfg.e_cap

    z_content = _calculate_content_logit(report, entity_norm, cfg)

    z_operational = _calculate_operational_logit(report, cfg)

    z_provenance = _calculate_provenance_logit(report, cfg)

    omega = _sigmoid((z_content + z_operational + z_provenance) - cfg.x0)

    if text and omega < cfg.social_threshold and _is_social_cue(text):
        omega = max(omega, cfg.min_social_score)

    omega = round(omega, 4)

    report.survival_score = omega
    report.status = classify_survival_status(
        omega=omega,
        critical_threshold=cfg.critical_threshold,
        healthy_threshold=cfg.healthy_threshold,
    )

    return omega
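
The final step stamps a categorical tier via `classify_survival_status`, whose body is not shown here. A minimal sketch of the tiering follows, assuming the boundary semantics given in the `AnalysisReport` description (HEALTHY for Ω above the healthy threshold, CRITICAL for Ω at or below the critical threshold, UNSTABLE in between). `SketchStatus` and `sketch_classify` are hypothetical names, and the default thresholds are the illustrative 0.3 and 0.6 from that description.

```python
from enum import Enum


class SketchStatus(Enum):
    """Hypothetical stand-in for SurvivalStatus."""

    HEALTHY = "healthy"
    UNSTABLE = "unstable"
    CRITICAL = "critical"


def sketch_classify(
    omega: float,
    critical_threshold: float = 0.3,
    healthy_threshold: float = 0.6,
) -> SketchStatus:
    """Map a score to a tier; both boundaries fall into the lower tier."""
    if omega > healthy_threshold:
        return SketchStatus.HEALTHY
    if omega > critical_threshold:
        return SketchStatus.UNSTABLE
    return SketchStatus.CRITICAL


at_healthy_edge = sketch_classify(0.6)
at_critical_edge = sketch_classify(0.3)
clearly_healthy = sketch_classify(0.75)
```

Passing the thresholds as arguments, rather than hard-coding them, matches the way the source forwards `cfg.critical_threshold` and `cfg.healthy_threshold` at call time.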

from_dmf_config(config) classmethod

Construct a ScoringEngine from a universal DMFConfig.

    Translates the human-readable TOML field names (``alpha_density``,
    ``beta_entities``, …) to the internal ``ScoringConfig`` Greek-letter
    fields, preserving the existing constructor signature and all tests.

    Translation map
    ---------------
    ================================ ======================== ===================
    TOML key                         DMFConfig path           ScoringConfig field
    ================================ ======================== ===================
    scoring_weights.alpha_density    scoring.alpha_density    alpha
    scoring_weights.beta_entities    scoring.beta_entities    gamma
    scoring_weights.gamma_sentiment  scoring.gamma_sentiment  beta
    scoring_weights.delta_technical  scoring.delta_technical  delta
    scoring_weights.sigmoid_midpoint scoring.sigmoid_midpoint x0
    scoring_weights.entity_cap       scoring.entity_cap       e_cap
    scoring_weights.social_threshold scoring.social_threshold social_threshold
    scoring_weights.min_social_score scoring.min_social_score min_social_score
    ================================ ======================== ===================

    Note: ``beta_entities`` maps to ``ScoringConfig.gamma`` (entity
    weight) and ``gamma_sentiment`` maps to ``ScoringConfig.beta``
    (sentiment weight).  The TOML uses signal-descriptive names;
    the internal config uses Greek letters matching the formula.

    Parameters
    ----------
    config : DMFConfig
        Fully populated config object returned by ``load_dmf_config()``.

    Returns
    -------
    ScoringEngine
        Fully initialised instance.

Parameters:

Name    Type       Description                                                   Default
config  DMFConfig  Fully populated config object returned by load_dmf_config().  required
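
The beta/gamma crossover in the translation map is easy to misread, so a reduced sketch may help. It uses two hypothetical dataclasses (`SketchTomlScoring` and `SketchScoringWeights`) covering only the three weights involved; the numeric values are illustrative.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SketchTomlScoring:
    """Hypothetical stand-in for the signal-named scoring section."""

    alpha_density: float
    beta_entities: float
    gamma_sentiment: float


@dataclass(frozen=True)
class SketchScoringWeights:
    """Hypothetical stand-in for the Greek-letter ScoringConfig fields."""

    alpha: float  # density weight
    beta: float   # sentiment weight
    gamma: float  # entity weight


def translate(toml: SketchTomlScoring) -> SketchScoringWeights:
    # The names cross over: beta_entities feeds gamma, gamma_sentiment feeds beta.
    return SketchScoringWeights(
        alpha=toml.alpha_density,
        beta=toml.gamma_sentiment,
        gamma=toml.beta_entities,
    )


weights = translate(
    SketchTomlScoring(alpha_density=1.0, beta_entities=0.5, gamma_sentiment=0.25)
)
```

Keyword arguments are used throughout so that the crossover is visible at the call site rather than hidden in positional order.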
Source code in dmf/analysis/scoring_engine.py
@classmethod
def from_dmf_config(cls, config: DMFConfig) -> ScoringEngine:
    """Construct a ``ScoringEngine`` from a universal ``DMFConfig``.

    Translates the human-readable TOML field names (``alpha_density``,
    ``beta_entities``, …) to the internal ``ScoringConfig`` Greek-letter
    fields, preserving the existing constructor signature and all tests.

    Translation map
    ---------------
    ================================ ======================== ===================
    TOML key                         DMFConfig path           ScoringConfig field
    ================================ ======================== ===================
    scoring_weights.alpha_density    scoring.alpha_density    alpha
    scoring_weights.beta_entities    scoring.beta_entities    gamma
    scoring_weights.gamma_sentiment  scoring.gamma_sentiment  beta
    scoring_weights.delta_technical  scoring.delta_technical  delta
    scoring_weights.sigmoid_midpoint scoring.sigmoid_midpoint x0
    scoring_weights.entity_cap       scoring.entity_cap       e_cap
    scoring_weights.social_threshold scoring.social_threshold social_threshold
    scoring_weights.min_social_score scoring.min_social_score min_social_score
    ================================ ======================== ===================

    Note: ``beta_entities`` maps to ``ScoringConfig.gamma`` (entity
    weight) and ``gamma_sentiment`` maps to ``ScoringConfig.beta``
    (sentiment weight).  The TOML uses signal-descriptive names;
    the internal config uses Greek letters matching the formula.

    Tier thresholds are read from ``config.tiers`` (``critical_max``,
    ``healthy_min``). The ``lambda_operational``, ``eta_*``, and
    boost/penalty fields are copied from ``config.scoring`` under
    unchanged names.

    Parameters
    ----------
    config : DMFConfig
        Fully populated config object returned by ``load_dmf_config()``.

    Returns
    -------
    ScoringEngine
        Fully initialised instance.
    """
    scoring_cfg = ScoringConfig(
        alpha=config.scoring.alpha_density,
        beta=config.scoring.gamma_sentiment,
        gamma=config.scoring.beta_entities,
        delta=config.scoring.delta_technical,
        x0=config.scoring.sigmoid_midpoint,
        e_cap=config.scoring.entity_cap,
        social_threshold=config.scoring.social_threshold,
        min_social_score=config.scoring.min_social_score,
        critical_threshold=config.tiers.critical_max,
        healthy_threshold=config.tiers.healthy_min,
        lambda_operational=config.scoring.lambda_operational,
        eta_constraint=config.scoring.eta_constraint,
        eta_preference=config.scoring.eta_preference,
        eta_current_state=config.scoring.eta_current_state,
        eta_correction=config.scoring.eta_correction,
        eta_replacement=config.scoring.eta_replacement,
        eta_past_state=config.scoring.eta_past_state,
        user_correction_boost=config.scoring.user_correction_boost,
        preference_update_boost=config.scoring.preference_update_boost,
        constraint_boost=config.scoring.constraint_boost,
        corrected_by_user_penalty=config.scoring.corrected_by_user_penalty,
    )
    return cls(config=scoring_cfg)

TemporalMemory

Active context window manager with temporal decay and eviction.

Parameters
----------
decay_config : DecayConfig | None
    Immutable decay/budget parameters.  Defaults to ``DecayConfig()``.
vector_config : VectorConfig | None
    Immutable geometry parameters.  Defaults to ``VectorConfig()``.
ltm_hook : LTMHook | None
    Archive sink for evicted entries.  Defaults to ``NullLTMHook()``
    (silent discard).  Pass a recording or concrete LTM hook for
    testing or production use.

Attributes
----------
queue : deque[MemoryEntry]
    Working-memory entries ordered oldest → newest.  Length is
    unbounded; ``token_budget`` is enforced by ``prune_to_budget``.
matrix : InteractionMatrix
    Sliding-window centroid tracker.  Receives the same vectors as
    the queue so divergence calculations reflect recent context.
config : DecayConfig
    Immutable decay and budget configuration.

No local summary is maintained. Historical context is recovered only
through raw-LTM recall.

Parameters:

Name                     Type                          Description                                                           Default
decay_config             DecayConfig | None            Immutable decay/budget parameters; None falls back to DecayConfig().  None
pruning_priority_config  PruningPriorityConfig | None  See the function signature and surrounding type hints.                None
vector_config            VectorConfig | None           Immutable geometry parameters; None falls back to VectorConfig().     None
ltm_hook                 LTMHook | None                Archive sink for evicted entries; None falls back to NullLTMHook().   None
nlp_engine               NLPEngine | None              See the function signature and surrounding type hints.                None

Returns:

Type            Description
TemporalMemory  Instance of this class.

Raises:

Type        Description
TypeError   Raised by class validation or initialization.
ValueError  Raised by class validation or initialization.

Source code in dmf/memory/temporal_memory.py
class TemporalMemory:
    """Active context window manager with temporal decay and eviction.

    Parameters
    ----------
    decay_config : DecayConfig | None
        Immutable decay/budget parameters.  Defaults to ``DecayConfig()``.
    pruning_priority_config : PruningPriorityConfig | None
        Pruning-priority weights (``rho_*``) and the superseded-past
        penalty.  Defaults to ``PruningPriorityConfig()``.
    vector_config : VectorConfig | None
        Immutable geometry parameters.  Defaults to ``VectorConfig()``.
    ltm_hook : LTMHook | None
        Archive sink for evicted entries.  Defaults to ``NullLTMHook()``
        (silent discard).  Pass a recording or concrete LTM hook for
        testing or production use.
    nlp_engine : NLPEngine | None
        Optional NLP engine used for recall-time contextualization of raw
        LTM hits.  Without one, ``contextualize_raw_recall_hits`` returns
        an empty list.

    Attributes
    ----------
    queue : deque[MemoryEntry]
        Working-memory entries ordered oldest → newest.  Length is
        unbounded; ``token_budget`` is enforced by ``prune_to_budget``.
    matrix : InteractionMatrix
        Sliding-window centroid tracker.  Receives the same vectors as
        the queue so divergence calculations reflect recent context.
    config : DecayConfig
        Immutable decay and budget configuration.

    Notes
    -----
    No local summary is maintained. Historical context is recovered only
    through raw-LTM recall.

    Raises
    ------
    TypeError
        Raised by class validation or initialization.
    ValueError
        Raised by class validation or initialization.
    """

    def __init__(
        self,
        decay_config: DecayConfig | None = None,
        pruning_priority_config: PruningPriorityConfig | None = None,
        vector_config: VectorConfig | None = None,
        ltm_hook: LTMHook | None = None,
        nlp_engine: NLPEngine | None = None,
    ) -> None:
        self.config: DecayConfig = decay_config or DecayConfig()
        self._pruning_priority_config: PruningPriorityConfig = (
            pruning_priority_config or PruningPriorityConfig()
        )
        self._vector_config: VectorConfig = vector_config or VectorConfig()
        self._ltm_hook: LTMHook = ltm_hook or NullLTMHook()
        self._nlp_engine = nlp_engine

        self.queue: deque[MemoryEntry] = deque()

        self.matrix: InteractionMatrix = InteractionMatrix(self._vector_config)

        self._next_id: int = 0

        self._turn_counter: int = 0

        self._context_metrics: dict[str, int] = {
            "correction_miss_count": 0,
            "conflict_override_count": 0,
            "model_derived_memory_count": 0,
            "context_supported_memory_count": 0,
            "invalidated_memory_count": 0,
            "suppressed_conflict_count": 0,
            "superseded_memory_count": 0,
        }
        self._recall_diagnostics: dict[str, Any] = self._empty_recall_diagnostics()

        self._tokenizer: tiktoken.Encoding = _TOKENIZER


    @classmethod
    def from_dmf_config(
        cls,
        config: DMFConfig,
        ltm_hook: LTMHook | None = None,
        nlp_engine: NLPEngine | None = None,
    ) -> TemporalMemory:
        """Construct a ``TemporalMemory`` from a universal ``DMFConfig``.

        This factory bridges the TOML-loaded ``DMFConfig`` to the typed
        internal dataclasses (``DecayConfig``, ``PruningPriorityConfig``,
        ``VectorConfig``) without changing the existing constructor
        signature or breaking any existing test.

        Translation map
        ---------------
        =================================== ============================ ============================
        TOML key                            DMFConfig path               Target field or hook
        =================================== ============================ ============================
        temporal_decay.lambda_base          decay.lambda_base            lambda_decay
        temporal_decay.inertia_strength     decay.inertia_strength       inertia_strength
        temporal_decay.hard_kill_threshold  decay.hard_kill_threshold    hard_kill_threshold
        memory_tiers.critical_max           tiers.critical_max           critical_threshold
        memory_tiers.healthy_min            tiers.healthy_min            healthy_threshold
        capacity.token_budget               capacity.token_budget        token_budget
        capacity.pruning_frequency_x        capacity.pruning_frequency_x pruning_frequency
        pruning_priority.rho_*              pruning_priority.*           PruningPriorityConfig
        nlp.vector_dim                      nlp.vector_dim               VectorConfig.vector_dim
        capacity.window_size                capacity.window_size         VectorConfig.window_size
        ltm.enabled + storage_type="file"   ltm.*                        → FileLTMHook(storage_path)
        ltm.enabled + storage_type="chroma" ltm.*                        → ChromaLTMHook(...)
        ltm.enabled + storage_type="null"   ltm.*                        → NullLTMHook
        ltm.enabled=false                   ltm.*                        → NullLTMHook
        =================================== ============================ ============================

        Parameters
        ----------
        config : DMFConfig
            Fully populated config object returned by ``load_dmf_config()``.
        ltm_hook : LTMHook | None
            Explicitly injected archive sink for evicted entries.  When
            given, it always takes precedence; when ``None``, the hook is
            resolved from ``config.ltm`` as listed in the translation map.
        nlp_engine : NLPEngine | None
            Optional NLP engine reused for recall-time contextualization of
            raw LTM hits.

        Returns
        -------
        TemporalMemory
            Fully initialised instance.

        Raises
        ------
        ValueError
            If ``config.ltm.storage_type`` is not a supported backend
            (only checked when LTM is enabled and no ``ltm_hook`` is
            injected).
        """
        decay_cfg = DecayConfig(
            lambda_decay=config.decay.lambda_base,
            inertia_strength=config.decay.inertia_strength,
            hard_kill_threshold=config.decay.hard_kill_threshold,
            token_budget=config.capacity.token_budget,
            pruning_frequency=config.capacity.pruning_frequency_x,
            critical_threshold=config.tiers.critical_max,
            healthy_threshold=config.tiers.healthy_min,
            ltm_recall_limit=config.ltm.recall_limit,
            ltm_threshold=config.ltm.distance_threshold,
        )
        pruning_cfg = PruningPriorityConfig(
            rho_constraint=config.pruning_priority.rho_constraint,
            rho_preference=config.pruning_priority.rho_preference,
            rho_current_state=config.pruning_priority.rho_current_state,
            rho_correction=config.pruning_priority.rho_correction,
            rho_replacement=config.pruning_priority.rho_replacement,
            superseded_past_penalty=config.pruning_priority.superseded_past_penalty,
        )
        vector_cfg = VectorConfig(
            model_name=config.nlp.model_name,
            vector_dim=config.nlp.vector_dim,
            window_size=config.capacity.window_size,
        )

        # Resolve LTM hook:
        #   1. Explicit injection always wins (testing / custom backends).
        #   2. ltm.enabled=false                    → NullLTMHook.
        #   3. ltm.enabled + storage_type="file"   → FileLTMHook.
        #   4. ltm.enabled + storage_type="chroma" → ChromaLTMHook.
        #   5. ltm.enabled + storage_type="null"   → NullLTMHook.
        # The import is deferred to this call site to avoid any future
        # circular-import risk if config_loader grows additional imports.
        resolved_hook: LTMHook
        if ltm_hook is not None:
            resolved_hook = ltm_hook
        elif not config.ltm.enabled:
            resolved_hook = NullLTMHook()
        elif config.ltm.storage_type == LTM_BACKEND_FILE:
            from dmf.memory.ltm_engine import FileLTMHook  # deferred import
            resolved_hook = FileLTMHook(
                config.ltm.storage_path,
                cards_enabled=config.ltm.cards_enabled,
                cards_path=config.ltm.cards_path,
            )
        elif config.ltm.storage_type == LTM_BACKEND_CHROMA:
            from dmf.memory.chroma_ltm import ChromaLTMHook  # deferred import
            resolved_hook = ChromaLTMHook(
                collection_name=config.ltm.collection_name,
                persist_directory=config.ltm.chroma_path,
                distance_threshold=config.ltm.distance_threshold,
                cards_enabled=config.ltm.cards_enabled,
                cards_path=config.ltm.cards_path,
                cards_collection_name=config.ltm.cards_collection_name,
            )
        elif config.ltm.storage_type == LTM_BACKEND_NULL:
            resolved_hook = NullLTMHook()
        else:
            raise ValueError(
                "Unsupported ltm.storage_type at runtime: "
                f"{config.ltm.storage_type!r}"
            )

        return cls(
            decay_config=decay_cfg,
            pruning_priority_config=pruning_cfg,
            vector_config=vector_cfg,
            ltm_hook=resolved_hook,
            nlp_engine=nlp_engine,
        )


    def add_interaction(
        self,
        text: str,
        report: AnalysisReport,
        vector: np.ndarray,
    ) -> MemoryEntry:
        """Insert a new interaction and enforce cleanup policies.

                Execution order
                ---------------
                1. Tokenise ``text`` via tiktoken (cached on entry).
                2. Assign monotonic ``interaction_id``, increment ``_next_id``.
                3. Update ``InteractionMatrix`` with the new vector.
                4. Construct ``MemoryEntry``, append to queue.
                5. Increment turn counter.
                6. If turn counter is a multiple of ``pruning_frequency``:
                   run ``periodic_cleanup`` (hard-kill sweep).
                7. If ``get_total_tokens() > token_budget``:
                   run ``prune_to_budget`` (pressure eviction).

                The returned entry is normally still in the queue at the time
                of return.  In extreme cases, where the entry's own Ω is very
                low and a cascading eviction has exhausted all other
                candidates, the entry itself may be evicted within the same
                call.  This is intended behaviour and is covered by the test
                suite.

                Returns
                -------
                MemoryEntry
                    The newly created entry.

        Args:
            text: Raw interaction text.  Tokenised once; the token count is
                cached on the entry.
            report: NLP analysis report for this interaction, stored on the
                entry.
            vector: Embedding vector.  Added to the sliding-window matrix and
                stored on the entry.

        Raises:
            None.
        """
        token_count: int = len(self._tokenizer.encode(text))

        interaction_id = self._next_id
        self._next_id += 1

        self.matrix.add_vector(vector)

        entry = MemoryEntry(
            interaction_id=interaction_id,
            text=text,
            report=report,
            vector=vector,
            token_count=token_count,
            timestamp=time.time(),
        )
        self.queue.append(entry)

        self._turn_counter += 1

        if (
            self.config.pruning_frequency > 0
            and self._turn_counter % self.config.pruning_frequency == 0
        ):
            self.periodic_cleanup()

        if self.get_total_tokens() > self.config.token_budget:
            self.prune_to_budget()

        return entry

    def get_effective_state(self) -> list[dict[str, Any]]:
        """Return the decay-adjusted state of every entry in the queue.

                For each entry computes:

                    Δn      = (_next_id − 1) − entry.interaction_id
                    Ω_eff   = calculate_effective_score(Ω, Δn, λ, η)

                The most recently added entry always has Δn = 0 → Ω_eff = Ω.

                Returns
                -------
                list[dict[str, Any]]
                    One dict per entry, ordered oldest → newest.  Keys:
                    ``interaction_id``, ``text``, ``omega``, ``omega_eff``,
                    ``delta_n``, ``token_count``, ``status_effective``,
                    ``timestamp``.

        Raises:
            None.
        """
        if not self.queue:
            return []

        most_recent_id = self._next_id - 1
        result: list[dict[str, Any]] = []
        for entry in self.queue:
            delta_n = most_recent_id - entry.interaction_id
            omega_eff = calculate_effective_score(
                omega=entry.omega,
                delta_n=delta_n,
                lambda_decay=self.config.lambda_decay,
                inertia_strength=self.config.inertia_strength,
            )
            result.append({
                "interaction_id": entry.interaction_id,
                "text": entry.text,
                "omega": entry.omega,
                "omega_eff": omega_eff,
                "delta_n": delta_n,
                "token_count": entry.token_count,
                "status_effective": classify_survival_status(
                    omega=omega_eff,
                    critical_threshold=self.config.critical_threshold,
                    healthy_threshold=self.config.healthy_threshold,
                ).value,
                "timestamp": entry.timestamp,
            })
        return result

    def get_total_tokens(self) -> int:
        """Return the total token count across all entries in the queue.

                O(N) sum over pre-calculated integers — no tokenisation on hot path.

        Returns:
            See the return type annotation.

        Raises:
            None.
        """
        return sum(entry.token_count for entry in self.queue)

    def get_full_context(
        self,
        query_vector: np.ndarray | None = None,
    ) -> str:
        """Assemble the complete context string for LLM prompt injection.

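        When ``query_vector`` is given, raw LTM hits are fetched, run through
        recall-time NLP, and reranked; any surviving candidates are rendered
        first under the recall header.  The active-context header always
        follows, with a precedence note after it when recalled items are
        present, and then the visible active entries.  Context metrics and
        recall diagnostics are reset at the start of every call.

        Args:
            query_vector: Optional query embedding used for LTM recall.
                ``None`` skips recall entirely.

        Returns:
            The assembled context, with parts joined by newlines.

        Raises:
            TypeError: If the LTM hook's ``search_raw()`` does not return
                ``list[RawRecallHit]``.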
        """
        self._reset_context_metrics()
        self._reset_recall_diagnostics()
        parts: list[str] = []
        active_guard = self._build_active_context_guard(record_metrics=True)
        recalled: list[ContextualizedRecallCandidate] = []

        if query_vector is not None:
            recalled = self.rerank_contextualized_recall_candidates(
                self.contextualize_raw_recall_hits(
                    self.get_raw_recall_hits(
                        query_vector,
                        active_guard=active_guard,
                    ),
                    active_guard=active_guard,
                )
            )
            if recalled:
                parts.append(_RECALL_HEADER)
                parts.extend(self._render_recalled_context_items(recalled))
                parts.append("")

        parts.append(_ACTIVE_HEADER)
        if recalled:
            parts.append(_ACTIVE_PRECEDENCE_NOTE)
        active_lines = self._render_active_context_items(active_guard.active_entries)
        if active_lines:
            if recalled:
                parts.append("")
            parts.extend(active_lines)

        return "\n".join(parts)

    def _render_recalled_context_items(
        self,
        candidates: list[ContextualizedRecallCandidate],
    ) -> list[str]:
        lines: list[str] = []
        for index, candidate in enumerate(candidates, start=1):
            metadata = _context_metadata_parts(
                role=candidate.record.role or candidate.record.provenance.role,
                time_value=candidate.record.created_at,
                turn_value=(
                    candidate.record.provenance.source_turn
                    if candidate.record.provenance.source_turn is not None
                    else candidate.record.interaction_id
                ),
                state=_state_from_report(candidate.report),
                prefer_time_over_turn=True,
            )
            _append_context_item(
                lines,
                _render_context_item_block(
                    label=f"R{index}",
                    metadata=metadata,
                    text=candidate.record.text,
                ),
            )
        return lines

    def _render_active_context_items(
        self,
        entries: list[MemoryEntry],
    ) -> list[str]:
        lines: list[str] = []
        for index, entry in enumerate(entries, start=1):
            metadata = _context_metadata_parts(
                role=entry.provenance.role,
                time_value=entry.timestamp,
                turn_value=(
                    entry.provenance.source_turn
                    if entry.provenance.source_turn is not None
                    else entry.interaction_id
                ),
                state=_state_from_report(entry.report),
            )
            active_time = _format_context_time(entry.timestamp)
            if active_time is not None and not any(
                part.startswith("time=") for part in metadata
            ):
                metadata.append(f"time={active_time}")
            _append_context_item(
                lines,
                _render_context_item_block(
                    label=f"A{index}",
                    metadata=metadata,
                    text=entry.text,
                ),
            )
        return lines

    def _entries_for_active_context(self) -> list[MemoryEntry]:
        """Return active entries that should remain visible in the prompt."""
        return self._build_active_context_guard(record_metrics=True).active_entries

    def get_active_raw_records(self) -> list[RawLTMRecord]:
        """Return visible active entries as raw records for retrieval.

                Structured retrieval can use these records alongside archived LTM
                records. This keeps still-active memories searchable without changing
                the raw-LTM archival contract.

        Returns:
            See the return type annotation.

        Raises:
            None.
        """
        return [
            entry.to_raw_ltm_record()
            for entry in self._build_active_context_guard(record_metrics=False).active_entries
        ]

    def _build_active_context_guard(
        self,
        *,
        record_metrics: bool,
    ) -> _ActiveContextGuard:
        """Resolve active visibility, suppression, and topic winners once."""
        suppression_stats: dict[str, int] = {}
        suppressed_record_ids = self._suppressed_record_ids_from_entries(
            self.queue,
            stats=suppression_stats,
        )
        suppressed_record_ids.update(
            self._topic_superseded_record_ids_from_entries(
                self.queue,
                stats=suppression_stats,
            )
        )
        if record_metrics:
            self._context_metrics["invalidated_memory_count"] += suppression_stats.get("invalidated", 0)
            self._context_metrics["suppressed_conflict_count"] += suppression_stats.get("conflict", 0)
            self._context_metrics["superseded_memory_count"] += suppression_stats.get("superseded", 0)
        active_entries: list[MemoryEntry] = []
        for entry in self.queue:
            if entry.provenance.corrected_by_user:
                if record_metrics:
                    self._context_metrics["correction_miss_count"] += 1
                continue
            if entry.record_id in suppressed_record_ids:
                continue
            if record_metrics:
                if entry.provenance.derived_from_model:
                    self._context_metrics["model_derived_memory_count"] += 1
                else:
                    self._context_metrics["context_supported_memory_count"] += 1
            active_entries.append(entry)

        topic_winners: dict[str, MemoryEntry] = {}
        for entry in active_entries:
            identity = entry.report.topic_identity
            if not identity:
                continue
            current = topic_winners.get(identity)
            if current is None:
                topic_winners[identity] = entry
                continue
            if self._active_context_topic_rank_key(entry) > self._active_context_topic_rank_key(current):
                topic_winners[identity] = entry

        return _ActiveContextGuard(
            active_entries=active_entries,
            active_record_ids={entry.record_id for entry in active_entries},
            suppressed_record_ids=suppressed_record_ids,
            topic_winners=topic_winners,
        )

    def get_raw_recall_hits(
        self,
        query_vector: np.ndarray,
        k: int | None = None,
        *,
        active_guard: _ActiveContextGuard | None = None,
    ) -> list[RawRecallHit]:
        """Fetch raw recall hits from the configured LTM hook.

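        Args:
            query_vector: Query embedding; converted to a plain list before
                being passed to ``search_raw()``.
            k: Maximum number of hits to request.  Defaults to
                ``config.ltm_recall_limit`` when ``None``.
            active_guard: Precomputed active-context guard.  Built on demand
                (without recording metrics) when ``None``.

        Returns:
            Validated hits, filtered against the active context.

        Raises:
            TypeError: If ``search_raw()`` returns a non-list, or a list
                containing anything other than ``RawRecallHit``.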
        """
        raw_hits = self._ltm_hook.search_raw(
            query_vector.tolist(),
            k=k if k is not None else self.config.ltm_recall_limit,
        )
        hits = self._validate_raw_recall_hits(raw_hits)
        self._recall_diagnostics["raw_candidates"] = [
            self._serialise_raw_recall_hit(hit)
            for hit in hits
        ]
        active_guard = active_guard or self._build_active_context_guard(record_metrics=False)
        return self._filter_raw_recall_hits_against_active_context(hits, active_guard)

    def contextualize_raw_recall_hits(
        self,
        hits: list[RawRecallHit],
        *,
        active_guard: _ActiveContextGuard | None = None,
    ) -> list[ContextualizedRecallCandidate]:
        """Run recall-time NLP on raw recall hits.

                Storage returns raw records only. Conversational interpretation
                belongs to the active-memory orchestration layer so the current
                NLP engine can be applied consistently to recalled content.

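        Args:
            hits: Raw recall hits, typically from ``get_raw_recall_hits``.
            active_guard: Precomputed active-context guard.  Built on demand
                (without recording metrics) when ``None``.

        Returns:
            Contextualized candidates filtered against the active context,
            or an empty list when no NLP engine is configured.

        Raises:
            None.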
        """
        if self._nlp_engine is None:
            return []

        contextualized: list[ContextualizedRecallCandidate] = []
        for hit in hits:
            report = self._nlp_engine.analyze_interaction(hit.record.text)
            contextualized.append(
                ContextualizedRecallCandidate(
                    record=hit.record,
                    report=report,
                    similarity_score=hit.similarity_score,
                    distance=hit.distance,
                    rank_hint=hit.rank_hint,
                    source=hit.source,
                )
            )
        active_guard = active_guard or self._build_active_context_guard(record_metrics=False)
        return self._filter_contextualized_recall_candidates_against_active_context(
            contextualized,
            active_guard,
        )

    def rerank_contextualized_recall_candidates(
        self,
        candidates: list[ContextualizedRecallCandidate],
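    ) -> list[ContextualizedRecallCandidate]:
        """Deterministically select, rank, and suppress contextualized recall candidates.

        Stages run in order: query-like filter, selection, ranking,
        topic-supersession filter (followed by a re-rank), same-topic-value
        dedupe, memory-signal filter, and a per-topic limit.  The candidate
        lists before and after the per-topic limit are recorded in the
        recall diagnostics.

        Args:
            candidates: Contextualized candidates, typically from
                ``contextualize_raw_recall_hits``.

        Returns:
            The final candidate list after all filters and the per-topic
            limit.

        Raises:
            None.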
        """
        filtered = self._filter_contextualized_recall_candidates(candidates)
        selected = self._select_contextualized_recall_candidates(filtered)
        ranked = self._rank_contextualized_recall_candidates(selected)
        topic_filtered = self._rank_contextualized_recall_candidates(
            self._filter_contextualized_candidates_by_topic_supersession(ranked)
        )
        deduped = self._dedupe_contextualized_candidates_by_same_topic_value(topic_filtered)
        memory_filtered = self._filter_contextualized_candidates_without_memory_signal(deduped)
        self._recall_diagnostics["ranked_candidates"] = [
            self._serialise_contextualized_recall_candidate(candidate)
            for candidate in memory_filtered
        ]
        final_candidates = self._limit_contextualized_recall_per_topic(memory_filtered)
        self._recall_diagnostics["final_candidates"] = [
            self._serialise_contextualized_recall_candidate(candidate)
            for candidate in final_candidates
        ]
        return final_candidates

    def _filter_contextualized_recall_candidates(
        self,
        candidates: list[ContextualizedRecallCandidate],
    ) -> list[ContextualizedRecallCandidate]:
        """Drop recalled candidates that are clearly request-like, not memory-like."""
        filtered: list[ContextualizedRecallCandidate] = []
        for candidate in candidates:
            if candidate.report.is_query_like and not self._is_personal_fact_candidate(candidate):
                self._recall_diagnostics["suppressed"].append(
                    {
                        "reason": "query_like_filter",
                        "candidate": self._serialise_contextualized_recall_candidate(candidate),
                    }
                )
                continue
            filtered.append(candidate)
        return filtered

    def _validate_raw_recall_hits(
        self,
        hits: object,
    ) -> list[RawRecallHit]:
        """Validate that the LTM backend returned canonical raw recall hits."""
        if not isinstance(hits, list):
            raise TypeError(
                "LTMHook.search_raw() must return list[RawRecallHit]; "
                f"got non-list result of type: {type(hits).__name__}"
            )
        invalid = [
            type(hit).__name__
            for hit in hits
            if not isinstance(hit, RawRecallHit)
        ]
        if invalid:
            joined = ", ".join(invalid)
            raise TypeError(
                "LTMHook.search_raw() must return list[RawRecallHit]; "
                f"got non-canonical hit types: {joined}"
            )
        return hits

    def _serialise_raw_recall_hit(self, hit: RawRecallHit) -> dict[str, Any]:
        """Return a JSON-serialisable representation of one raw recall hit."""
        return hit.to_dict()

    def _serialise_contextualized_recall_candidate(
        self,
        candidate: ContextualizedRecallCandidate,
    ) -> dict[str, Any]:
        """Return a JSON-serialisable representation of one contextualized candidate."""
        return candidate.to_dict()

    def _rank_contextualized_recall_candidates(
        self,
        candidates: list[ContextualizedRecallCandidate],
    ) -> list[ContextualizedRecallCandidate]:
        """Return contextualized recall candidates ordered by deterministic policy."""
        return sorted(candidates, key=self._contextualized_recall_rank_key, reverse=True)

    def _filter_raw_recall_hits_against_active_context(
        self,
        hits: list[RawRecallHit],
        active_guard: _ActiveContextGuard,
    ) -> list[RawRecallHit]:
        """Drop raw recalls already covered or invalidated by active memory."""
        filtered: list[RawRecallHit] = []
        for hit in hits:
            record_id = hit.record.record_id
            if record_id in active_guard.active_record_ids:
                self._recall_diagnostics["suppressed"].append(
                    {
                        "reason": "active_record_duplicate",
                        "record_id": record_id,
                        "candidate": self._serialise_raw_recall_hit(hit),
                    }
                )
                continue
            if record_id in active_guard.suppressed_record_ids:
                self._recall_diagnostics["suppressed"].append(
                    {
                        "reason": "active_lineage_suppression",
                        "record_id": record_id,
                        "candidate": self._serialise_raw_recall_hit(hit),
                    }
                )
                continue
            filtered.append(hit)
        return filtered

    def _filter_contextualized_recall_candidates_against_active_context(
        self,
        candidates: list[ContextualizedRecallCandidate],
        active_guard: _ActiveContextGuard,
    ) -> list[ContextualizedRecallCandidate]:
        """Drop recalled candidates made redundant by newer active winners."""
        filtered: list[ContextualizedRecallCandidate] = []
        for candidate in candidates:
            identity = candidate.report.topic_identity
            if not identity:
                filtered.append(candidate)
                continue

            winner = active_guard.topic_winners.get(identity)
            if winner is None:
                filtered.append(candidate)
                continue

            candidate_value = candidate.report.topic_value
            winner_value = winner.report.topic_value
            if candidate_value and winner_value and candidate_value == winner_value:
                self._recall_diagnostics["suppressed"].append(
                    {
                        "reason": "active_topic_duplicate",
                        "topic_identity": identity,
                        "topic_value": candidate_value,
                        "active_record_id": winner.record_id,
                        "candidate": self._serialise_contextualized_recall_candidate(candidate),
                    }
                )
                continue

            if self._is_contextualized_candidate_superseded_by_active_entry(candidate, winner):
                self._recall_diagnostics["suppressed"].append(
                    {
                        "reason": "active_topic_supersession",
                        "topic_identity": identity,
                        "active_record_id": winner.record_id,
                        "candidate": self._serialise_contextualized_recall_candidate(candidate),
                    }
                )
                continue

            filtered.append(candidate)
        return filtered

    def _select_contextualized_recall_candidates(
        self,
        candidates: list[ContextualizedRecallCandidate],
    ) -> list[ContextualizedRecallCandidate]:
        """Keep one candidate per recalled raw record before topic suppression.

        Topic-level conflicts are resolved in the dedicated supersession
        stage. This pass only avoids retaining duplicate records.
        """
        selected: dict[str, ContextualizedRecallCandidate] = {}
        for candidate in candidates:
            key = candidate.record.record_id
            current = selected.get(key)
            if current is None:
                selected[key] = candidate
                continue
            if self._contextualized_recall_rank_key(candidate) > self._contextualized_recall_rank_key(current):
                self._recall_diagnostics["suppressed"].append(
                    {
                        "reason": "record_id_dedupe",
                        "record_id": key,
                        "candidate": self._serialise_contextualized_recall_candidate(current),
                    }
                )
                selected[key] = candidate
            else:
                self._recall_diagnostics["suppressed"].append(
                    {
                        "reason": "record_id_dedupe",
                        "record_id": key,
                        "candidate": self._serialise_contextualized_recall_candidate(candidate),
                    }
                )
        return list(selected.values())

    def _filter_contextualized_candidates_by_topic_supersession(
        self,
        candidates: list[ContextualizedRecallCandidate],
    ) -> list[ContextualizedRecallCandidate]:
        """Suppress contextualized recall candidates superseded on the same topic."""
        grouped: dict[str, list[ContextualizedRecallCandidate]] = {}
        passthrough: list[ContextualizedRecallCandidate] = []

        for candidate in candidates:
            identity = candidate.report.topic_identity
            value = candidate.report.topic_value
            if not identity or not value:
                passthrough.append(candidate)
                continue
            grouped.setdefault(identity, []).append(candidate)

        kept: list[ContextualizedRecallCandidate] = list(passthrough)
        for identity, group in grouped.items():
            values = {
                candidate.report.topic_value
                for candidate in group
                if candidate.report.topic_value
            }
            if len(values) <= 1:
                kept.extend(group)
                continue

            winner = max(group, key=self._contextualized_topic_supersession_rank_key)
            kept.append(winner)
            self._context_metrics["superseded_memory_count"] += max(len(group) - 1, 0)

            for candidate in group:
                if candidate is winner:
                    continue
                self._recall_diagnostics["suppressed"].append(
                    {
                        "reason": "topic_supersession",
                        "topic_identity": identity,
                        "candidate": self._serialise_contextualized_recall_candidate(candidate),
                    }
                )

        return kept

    def _limit_contextualized_recall_per_topic(
        self,
        candidates: list[ContextualizedRecallCandidate],
    ) -> list[ContextualizedRecallCandidate]:
        """Keep at most one contextualized candidate per resolved topic identity."""
        selected: list[ContextualizedRecallCandidate] = []
        seen_topics: set[str] = set()

        for candidate in candidates:
            topic = candidate.report.topic_identity or candidate.record.record_id
            if topic in seen_topics:
                self._recall_diagnostics["suppressed"].append(
                    {
                        "reason": "topic_identity_limit",
                        "topic_identity": topic,
                        "candidate": self._serialise_contextualized_recall_candidate(candidate),
                    }
                )
                continue
            seen_topics.add(topic)
            selected.append(candidate)
        return selected

    def _dedupe_contextualized_candidates_by_same_topic_value(
        self,
        candidates: list[ContextualizedRecallCandidate],
    ) -> list[ContextualizedRecallCandidate]:
        """Collapse duplicate recalls that resolve to the same topic/value pair."""
        selected: list[ContextualizedRecallCandidate] = []
        seen_pairs: set[tuple[str, str]] = set()

        for candidate in candidates:
            identity = candidate.report.topic_identity
            value = candidate.report.topic_value
            if not identity or not value:
                selected.append(candidate)
                continue

            key = (identity, value)
            if key in seen_pairs:
                self._recall_diagnostics["suppressed"].append(
                    {
                        "reason": "same_topic_value_dedupe",
                        "topic_identity": identity,
                        "topic_value": value,
                        "candidate": self._serialise_contextualized_recall_candidate(candidate),
                    }
                )
                continue

            seen_pairs.add(key)
            selected.append(candidate)

        return selected

    def _filter_contextualized_candidates_without_memory_signal(
        self,
        candidates: list[ContextualizedRecallCandidate],
    ) -> list[ContextualizedRecallCandidate]:
        """Drop topicless/signal-less recalls when stronger memory candidates exist."""
        if not candidates:
            return []

        has_memory_bearing = any(self._is_memory_bearing_candidate(candidate) for candidate in candidates)
        if not has_memory_bearing:
            return candidates

        selected: list[ContextualizedRecallCandidate] = []
        for candidate in candidates:
            if self._is_memory_bearing_candidate(candidate):
                selected.append(candidate)
                continue
            self._recall_diagnostics["suppressed"].append(
                {
                    "reason": "non_memory_bearing_filter",
                    "candidate": self._serialise_contextualized_recall_candidate(candidate),
                }
            )
        return selected

    def _is_memory_bearing_candidate(
        self,
        candidate: ContextualizedRecallCandidate,
    ) -> bool:
        """Return whether a recalled candidate carries stable memory structure."""
        if candidate.report.topic_identity:
            return True

        signals = candidate.report.signals
        return any(
            (
                signals.is_constraint,
                signals.is_preference,
                signals.is_current_state,
                signals.is_correction,
                self._is_personal_fact_candidate(candidate),
            )
        )

    def _is_personal_fact_candidate(
        self,
        candidate: ContextualizedRecallCandidate,
    ) -> bool:
        """Return whether a candidate expresses a user autobiographical fact."""
        return (
            candidate.record.role == "user"
            and float(candidate.report.signals.personal_relevance) > 0.0
        )

    def _contextualized_recall_rank_key(
        self,
        candidate: ContextualizedRecallCandidate,
    ) -> tuple[float, int, float, int, float, int, int]:
        """Return the deterministic rank tuple for contextualized raw recalls."""
        provenance = candidate.record.provenance
        signals = candidate.report.signals
        operational_bonus = (
            (0.30 if signals.is_constraint else 0.0)
            + (0.20 if signals.is_correction else 0.0)
            + (0.15 if signals.is_current_state else 0.0)
            + (0.10 if signals.is_preference else 0.0)
        )
        personal_fact_bonus = (
            0.10 * max(float(signals.personal_relevance), 0.0)
            if candidate.record.role == "user"
            else 0.0
        )
        quantitative_bonus = 0.1 * max(float(signals.quantitative_relevance), 0.0)
        ack_penalty = (
            0.35
            if candidate.record.role == "assistant" and candidate.report.is_ack_like
            else 0.0
        )
        effective_similarity = (
            float(candidate.similarity_score)
            if candidate.similarity_score is not None
            else float("-inf")
        )
        effective_similarity += operational_bonus
        effective_similarity += personal_fact_bonus
        effective_similarity += quantitative_bonus
        effective_similarity -= ack_penalty
        return (
            effective_similarity,
            0 if provenance.corrected_by_user else 1,
            float(candidate.report.info_density),
            1 if candidate.record.role == "user" else 0,
            float(candidate.record.created_at),
            int(candidate.record.interaction_id),
            -int(candidate.rank_hint if candidate.rank_hint is not None else 1_000_000),
        )

    def _contextualized_topic_supersession_rank_key(
        self,
        candidate: ContextualizedRecallCandidate,
    ) -> tuple[int, int, int, float, int, float, int]:
        """Return the winner key for conflicting contextualized raw recalls."""
        signals = candidate.report.signals
        return (
            1 if signals.is_current_state else 0,
            0 if signals.is_past_state else 1,
            1 if candidate.record.role == "user" else 0,
            float(candidate.record.created_at),
            int(candidate.record.interaction_id),
            float(candidate.similarity_score if candidate.similarity_score is not None else float("-inf")),
            -int(candidate.rank_hint if candidate.rank_hint is not None else 1_000_000),
        )

    def _active_context_topic_rank_key(
        self,
        entry: MemoryEntry,
    ) -> tuple[int, int, int, float, int]:
        """Return the active-memory winner key for one topic."""
        signals = entry.report.signals
        return (
            1 if signals.is_current_state else 0,
            0 if signals.is_past_state else 1,
            1 if entry.provenance.role == "user" else 0,
            float(entry.timestamp),
            int(entry.interaction_id),
        )

    def _is_contextualized_candidate_superseded_by_active_entry(
        self,
        candidate: ContextualizedRecallCandidate,
        active_entry: MemoryEntry,
    ) -> bool:
        """Return whether a newer active entry supersedes one recalled candidate."""
        if active_entry.interaction_id <= candidate.record.interaction_id:
            return False

        candidate_identity = candidate.report.topic_identity
        active_identity = active_entry.report.topic_identity
        if (
            not candidate_identity
            or not active_identity
            or candidate_identity != active_identity
        ):
            return False

        candidate_value = candidate.report.topic_value
        active_value = active_entry.report.topic_value
        if (
            not candidate_value
            or not active_value
            or candidate_value == active_value
        ):
            return False

        older_signals = candidate.report.signals
        newer_signals = active_entry.report.signals

        if older_signals.is_past_state and newer_signals.is_current_state:
            return True
        if older_signals.is_preference and newer_signals.is_preference:
            return True
        if older_signals.is_current_state and newer_signals.is_current_state:
            return True
        return False

    def _suppressed_record_ids_from_entries(
        self,
        entries: deque[MemoryEntry],
        stats: dict[str, int] | None = None,
    ) -> set[str]:
        """Return source-record ids explicitly superseded or invalidated."""
        suppressed: set[str] = set()
        for entry in entries:
            suppressed.update(self._lineage_targets(entry.lineage, stats=stats))
        return suppressed

    def _lineage_targets(
        self,
        lineage: MemoryLineage | None,
        stats: dict[str, int] | None = None,
    ) -> set[str]:
        """Return ids that should be suppressed by explicit lineage edges."""
        if lineage is None:
            return set()

        targets: set[str] = set()
        for field_name in ("supersedes", "corrects", "invalidates"):
            for value in getattr(lineage, field_name):
                if value:
                    targets.add(str(value))
                    if stats is not None:
                        if field_name == "supersedes":
                            stats["superseded"] = stats.get("superseded", 0) + 1
                        elif field_name == "invalidates":
                            stats["invalidated"] = stats.get("invalidated", 0) + 1
                        elif field_name == "corrects":
                            stats["conflict"] = stats.get("conflict", 0) + 1
        return targets

    def _topic_superseded_record_ids_from_entries(
        self,
        entries: deque[MemoryEntry],
        stats: dict[str, int] | None = None,
    ) -> set[str]:
        """Return older entries superseded by newer topic updates in the queue."""
        suppressed: set[str] = set()
        ordered = list(entries)
        for older_idx, older in enumerate(ordered):
            for newer in ordered[older_idx + 1 :]:
                if self._is_topic_superseded(older, newer):
                    suppressed.add(older.record_id)
                    if stats is not None:
                        stats["superseded"] = stats.get("superseded", 0) + 1
                    break
        return suppressed

    def _is_topic_superseded(self, older: MemoryEntry, newer: MemoryEntry) -> bool:
        """Return whether a newer entry supersedes an older one on the same topic."""
        if newer.interaction_id <= older.interaction_id:
            return False

        older_identity = older.report.topic_identity
        newer_identity = newer.report.topic_identity
        if not older_identity or not newer_identity or older_identity != newer_identity:
            return False

        older_value = older.report.topic_value
        newer_value = newer.report.topic_value
        if not older_value or not newer_value or older_value == newer_value:
            return False

        older_signals = older.report.signals
        newer_signals = newer.report.signals

        if older_signals.is_past_state and newer_signals.is_current_state:
            return True
        if older_signals.is_preference and newer_signals.is_preference:
            return True
        if older_signals.is_current_state and newer_signals.is_current_state:
            return True
        return False

    def _reset_context_metrics(self) -> None:
        """Reset lightweight context metrics before rebuilding prompt context."""
        for key in self._context_metrics:
            self._context_metrics[key] = 0

    def _empty_recall_diagnostics(self) -> dict[str, Any]:
        """Return the empty recall-diagnostics structure."""
        return {
            "raw_candidates": [],
            "ranked_candidates": [],
            "final_candidates": [],
            "suppressed": [],
        }

    def _reset_recall_diagnostics(self) -> None:
        """Reset recall diagnostics before rebuilding prompt context."""
        self._recall_diagnostics = self._empty_recall_diagnostics()

    def get_context_metrics(self) -> dict[str, int]:
        """Return a snapshot of the latest context reconstruction metrics.

        Returns:
            A shallow copy of the metrics dict; mutating it does not
            affect the internal counters.

        Raises:
            None.
        """
        return dict(self._context_metrics)

    def get_recall_diagnostics(self) -> dict[str, Any]:
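        """Return a snapshot of the latest recall diagnostics.

        Returns:
            A dict with the keys ``raw_candidates``, ``ranked_candidates``,
            ``final_candidates`` and ``suppressed``. Each list is copied,
            but the dicts inside the lists are shared with internal state.

        Raises:
            None.
        """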
        return {
            "raw_candidates": list(self._recall_diagnostics["raw_candidates"]),
            "ranked_candidates": list(self._recall_diagnostics["ranked_candidates"]),
            "final_candidates": list(self._recall_diagnostics["final_candidates"]),
            "suppressed": list(self._recall_diagnostics["suppressed"]),
        }


    def _omega_eff_for(self, entry: MemoryEntry) -> float:
        """Compute the current Ω_eff for a single entry.

        Centralises the Δn arithmetic so all pruning methods use the
        same calculation without duplicating it.
        """
        if self._next_id == 0:
            return entry.omega
        most_recent_id = self._next_id - 1
        delta_n = most_recent_id - entry.interaction_id
        return calculate_effective_score(
            omega=entry.omega,
            delta_n=delta_n,
            lambda_decay=self.config.lambda_decay,
            inertia_strength=self.config.inertia_strength,
        )

    def _retention_bonus_for(self, entry: MemoryEntry) -> float:
        """Return the pruning-only operational retention bonus for an entry."""
        signals = entry.report.signals
        cfg = self._pruning_priority_config
        return (
            (cfg.rho_constraint if signals.is_constraint else 0.0)
            + (cfg.rho_preference if signals.is_preference else 0.0)
            + (cfg.rho_current_state if signals.is_current_state else 0.0)
            + (cfg.rho_correction if signals.is_correction else 0.0)
            + (cfg.rho_replacement if signals.has_replacement else 0.0)
        )

    def _topic_superseded_penalty_for(self, entry: MemoryEntry) -> float:
        """Return the pruning penalty for an entry superseded on the same topic."""
        if entry.record_id not in self._topic_superseded_record_ids_from_entries(self.queue):
            return 0.0
        return self._pruning_priority_config.superseded_past_penalty

    def _topic_superseded_penalty_from_cached_ids(
        self,
        entry: MemoryEntry,
        superseded_record_ids: set[str],
    ) -> float:
        """Return the pruning penalty using one precomputed superseded-id set."""
        if entry.record_id not in superseded_record_ids:
            return 0.0
        return self._pruning_priority_config.superseded_past_penalty

    def _effective_pruning_score_for(self, entry: MemoryEntry) -> float:
        """Return the pressure-pruning score used for eviction ordering."""
        return (
            self._omega_eff_for(entry)
            + self._retention_bonus_for(entry)
            - self._topic_superseded_penalty_for(entry)
        )

    def _get_pruning_candidates(self) -> list[MemoryEntry]:
        """Return entries sorted by eviction priority (highest risk first).

        Only CRITICAL and UNSTABLE entries are pressure candidates.
        HEALTHY entries are protected from budget-pressure eviction.

        Candidate ordering uses ``effective_pruning_score`` ascending, with
        ``interaction_id`` ascending as the tie-breaker. Tier membership
        still depends on raw ``Ω_eff`` only, so pruning bonuses cannot
        promote an entry to HEALTHY or exempt it from decay.

        HEALTHY (Ω_eff > 0.6) messages are **not** in this list.
        They are only removed by ``periodic_cleanup`` once their
        Ω_eff decays below ``hard_kill_threshold`` (≈ 141 turns for
        Ω = 0.85 at default calibration).

        Returns
        -------
        list[MemoryEntry]
            Eviction-priority ordered references into the live queue.
            The queue itself is not modified.
        """
        candidates: list[MemoryEntry] = []
        superseded_record_ids = self._topic_superseded_record_ids_from_entries(self.queue)

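        for entry in self.queue:
            omega_eff = self._omega_eff_for(entry)
            # CRITICAL and UNSTABLE entries are both pressure candidates;
            # only HEALTHY entries (above healthy_threshold) are skipped.
            if omega_eff <= self.config.critical_threshold:
                candidates.append(entry)
            elif omega_eff <= self.config.healthy_threshold:
                candidates.append(entry)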

        candidates.sort(
            key=lambda e: (
                self._omega_eff_for(e)
                + self._retention_bonus_for(e)
                - self._topic_superseded_penalty_from_cached_ids(
                    e,
                    superseded_record_ids,
                ),
                e.interaction_id,
            )
        )
        return candidates


    def prune_to_budget(self) -> list[MemoryEntry]:
        """Evict lowest-priority entries until total tokens ≤ token_budget.

        Eviction policy
        ---------------
        1. Compute the full priority list via ``_get_pruning_candidates``.
        2. Remove the head of the list (the CRITICAL or UNSTABLE entry
           with the lowest effective pruning score; ties go to the lowest
           ``interaction_id``) from the queue.
        3. Sync ``InteractionMatrix`` by removing the same vector object.
        4. Archive the evicted entry via ``LTMHook.archive``.
        5. Repeat until ``get_total_tokens() ≤ config.token_budget`` or
           no candidates remain.

        If only HEALTHY messages remain and the budget is still exceeded,
        the loop exits without evicting them.  This is intentional:
        HEALTHY messages are protected by design.  No deficit is reported
        explicitly; the caller can detect one by comparing
        ``get_total_tokens()`` with ``config.token_budget`` after the call.

        Returns
        -------
        list[MemoryEntry]
            All entries evicted in this call, in eviction order.

        Raises:
            None.
        """
        evicted: list[MemoryEntry] = []
        while self.get_total_tokens() > self.config.token_budget:
            candidates = self._get_pruning_candidates()
            if not candidates:
                # Only HEALTHY entries remain; cannot evict by budget pressure.
                break
            victim = candidates[0]
            self._evict(victim)
            evicted.append(victim)
        return evicted


    def periodic_cleanup(self) -> list[MemoryEntry]:
        """Hard-kill sweep: evict any entry with Ω_eff < hard_kill_threshold.

        Unlike ``prune_to_budget``, this sweep applies to **all** entries
        including HEALTHY messages that have organically decayed to near-zero.
        It runs every ``config.pruning_frequency`` turns (triggered by
        ``add_interaction``).

        A HEALTHY message (Ω = 0.85) reaches the default hard-kill floor
        (0.05) at Δn ≈ 141 turns. The periodic sweep removes these stale
        entries from active memory before they accumulate and inflate the
        token budget.

        Returns
        -------
        list[MemoryEntry]
            All entries evicted in this sweep, in queue order.

        Raises:
            None.
        """
        threshold = self.config.hard_kill_threshold
        to_evict = [
            entry for entry in self.queue
            if self._omega_eff_for(entry) < threshold
        ]
        for entry in to_evict:
            self._evict(entry)
        return to_evict


    def _evict(self, entry: MemoryEntry) -> None:
        """Remove entry from queue and matrix, then archive it to LTM.

        This is the single eviction primitive — all removal paths call
        through here to guarantee consistent teardown.

        Steps
        -----
        1. Remove from ``queue`` (O(N) deque scan — acceptable because
           the queue is bounded by the token budget in practice).
        2. Attempt to remove the same vector object from ``InteractionMatrix``
           (O(W), where W = window_size ≤ 10).  No-op if the vector has
           already been scrolled out of the sliding window.
        3. Call ``ltm_hook.archive(entry)`` — fire-and-forget handoff.
        """
        self.queue.remove(entry)
        self.matrix.remove_vector(entry.vector)
        self._ltm_hook.archive(entry)


    @property
    def size(self) -> int:
        """Number of entries currently in the working memory queue.

        Returns:
            See the return type annotation.

        Raises:
            None.
        """
        return len(self.queue)

    @property
    def is_empty(self) -> bool:
        """True when the working memory queue contains no entries.

        Returns:
            See the return type annotation.

        Raises:
            None.
        """
        return len(self.queue) == 0

    @property
    def next_id(self) -> int:
        """The interaction_id that will be assigned to the next insertion.

        Returns:
            See the return type annotation.

        Raises:
            None.
        """
        return self._next_id

    @property
    def ltm_hook(self) -> LTMHook:
        """The configured LTM archive hook.

        Returns:
            See the return type annotation.

        Raises:
            None.
        """
        return self._ltm_hook
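
The ordering used by `_get_pruning_candidates` in the listing above can be illustrated in isolation. The following is a minimal sketch, not the library's implementation: `Entry` and `pruning_candidates` are hypothetical stand-ins, `omega_eff` is supplied directly instead of being computed by `calculate_effective_score`, and the bonus and penalty values are illustrative only. It shows that tier membership is decided on raw Ω_eff, while the sort key uses the adjusted score with `interaction_id` as the tie-breaker.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Entry:
    """Hypothetical stand-in for MemoryEntry with precomputed scores."""

    interaction_id: int
    omega_eff: float                 # decayed score, taken as given here
    retention_bonus: float = 0.0     # illustrative value, not a real default
    superseded_penalty: float = 0.0  # illustrative value, not a real default


def pruning_candidates(entries, healthy_threshold=0.6):
    """Return non-HEALTHY entries, highest eviction risk first."""
    # The tier gate uses raw omega_eff only, so a bonus cannot make an
    # entry HEALTHY; it only changes the order among the candidates.
    eligible = [e for e in entries if e.omega_eff <= healthy_threshold]
    return sorted(
        eligible,
        key=lambda e: (
            e.omega_eff + e.retention_bonus - e.superseded_penalty,
            e.interaction_id,
        ),
    )


entries = [
    Entry(interaction_id=0, omega_eff=0.70),                           # HEALTHY, excluded
    Entry(interaction_id=1, omega_eff=0.25, retention_bonus=0.30),     # adjusted 0.55
    Entry(interaction_id=2, omega_eff=0.40),                           # adjusted 0.40
    Entry(interaction_id=3, omega_eff=0.50, superseded_penalty=0.20),  # adjusted 0.30
]
order = [e.interaction_id for e in pruning_candidates(entries)]
print(order)  # [3, 2, 1]
```

In this example the entry with the lowest raw Ω_eff (id 1) is evicted last among the candidates because its retention bonus lifts its adjusted score, while the superseded entry (id 3) moves to the front.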

is_empty property

True when the working memory queue contains no entries.

Returns:

Type    Description
bool    True if the queue has no entries, False otherwise.

ltm_hook property

The configured LTM archive hook.

Returns:

Type       Description
LTMHook    The hook that evicted entries are archived to.

next_id property

The interaction_id that will be assigned to the next insertion.

Returns:

Type    Description
int     The next interaction_id to be assigned.

size property

Number of entries currently in the working memory queue.

Returns:

Type    Description
int     Number of entries in the queue.

add_interaction(text, report, vector)

Insert a new interaction and enforce cleanup policies.

    Execution order
    ---------------
    1. Tokenise ``text`` via tiktoken (cached on entry).
    2. Assign monotonic ``interaction_id``, increment ``_next_id``.
    3. Update ``InteractionMatrix`` with the new vector.
    4. Construct ``MemoryEntry``, append to queue.
    5. Increment turn counter.
    6. If turn counter is a multiple of ``pruning_frequency``:
       run ``periodic_cleanup`` (hard-kill sweep).
    7. If ``get_total_tokens() > token_budget``:
       run ``prune_to_budget`` (pressure eviction).

    The returned entry is normally still in the queue when the call
    returns.  In extreme cases where the entry's own Ω is very low and a
    cascading eviction exhausts all other candidates, the entry itself
    may be evicted in the same call; this is correct behaviour and is
    accounted for in the test suite.

    Returns
    -------
    MemoryEntry
        The newly created entry.

Parameters:

Name Type Description Default
text str

Raw interaction text. Tokenised once on insertion; the token count is cached on the entry.

required
report AnalysisReport

NLP analysis report for this interaction, stored on the new entry.

required
vector ndarray

Embedding vector for this interaction; added to the InteractionMatrix and stored on the entry.

required
Source code in dmf/memory/temporal_memory.py
def add_interaction(
    self,
    text: str,
    report: AnalysisReport,
    vector: np.ndarray,
) -> MemoryEntry:
    """Insert a new interaction and enforce cleanup policies.

            Execution order
            ---------------
            1. Tokenise ``text`` via tiktoken (cached on entry).
            2. Assign monotonic ``interaction_id``, increment ``_next_id``.
            3. Update ``InteractionMatrix`` with the new vector.
            4. Construct ``MemoryEntry``, append to queue.
            5. Increment turn counter.
            6. If turn counter is a multiple of ``pruning_frequency``:
               run ``periodic_cleanup`` (hard-kill sweep).
            7. If ``get_total_tokens() > token_budget``:
               run ``prune_to_budget`` (pressure eviction).

            The returned entry is normally still in the queue when the call
            returns.  In extreme cases where the entry's own Ω is very low and a
            cascading eviction exhausts all other candidates, the entry itself
            may be evicted in the same call; this is correct behaviour and is
            accounted for in the test suite.

            Returns
            -------
            MemoryEntry
                The newly created entry.

    Args:
        text: See the function signature and surrounding type hints.
        report: See the function signature and surrounding type hints.
        vector: See the function signature and surrounding type hints.

    Raises:
        None.
    """
    token_count: int = len(self._tokenizer.encode(text))

    interaction_id = self._next_id
    self._next_id += 1

    self.matrix.add_vector(vector)

    entry = MemoryEntry(
        interaction_id=interaction_id,
        text=text,
        report=report,
        vector=vector,
        token_count=token_count,
        timestamp=time.time(),
    )
    self.queue.append(entry)

    self._turn_counter += 1

    if (
        self.config.pruning_frequency > 0
        and self._turn_counter % self.config.pruning_frequency == 0
    ):
        self.periodic_cleanup()

    if self.get_total_tokens() > self.config.token_budget:
        self.prune_to_budget()

    return entry
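
The execution order above can be sketched with a toy in-memory queue. This is a minimal illustration under stated assumptions, not the DMF implementation: `ToyMemory`, `ToyEntry`, the whitespace token counter (standing in for tiktoken), the fixed `omega` values, and the "evict lowest Ω first" pruning rule are all hypothetical, and the `InteractionMatrix` update (step 3) is omitted.

```python
from dataclasses import dataclass, field


@dataclass
class ToyEntry:
    interaction_id: int
    text: str
    omega: float
    token_count: int


@dataclass
class ToyMemory:
    token_budget: int
    pruning_frequency: int
    hard_kill_threshold: float = 0.05
    queue: list = field(default_factory=list)
    _next_id: int = 0
    _turn_counter: int = 0

    def get_total_tokens(self) -> int:
        # Sum of cached counts; nothing is re-tokenised here.
        return sum(e.token_count for e in self.queue)

    def periodic_cleanup(self) -> None:
        # Hard-kill sweep (decay is ignored in this toy, so omega is used as-is).
        self.queue = [e for e in self.queue if e.omega >= self.hard_kill_threshold]

    def prune_to_budget(self) -> None:
        # Simplified pressure eviction: drop the lowest-omega entry until within budget.
        while self.queue and self.get_total_tokens() > self.token_budget:
            self.queue.remove(min(self.queue, key=lambda e: e.omega))

    def add_interaction(self, text: str, omega: float) -> ToyEntry:
        token_count = len(text.split())            # step 1: tokenise once
        interaction_id = self._next_id             # step 2: monotonic id
        self._next_id += 1
        entry = ToyEntry(interaction_id, text, omega, token_count)
        self.queue.append(entry)                   # step 4: append to queue
        self._turn_counter += 1                    # step 5: count the turn
        if self.pruning_frequency > 0 and self._turn_counter % self.pruning_frequency == 0:
            self.periodic_cleanup()                # step 6: periodic sweep
        if self.get_total_tokens() > self.token_budget:
            self.prune_to_budget()                 # step 7: budget pressure
        return entry


memory = ToyMemory(token_budget=6, pruning_frequency=2)
first = memory.add_interaction("alpha beta gamma", omega=0.9)
second = memory.add_interaction("noise", omega=0.01)
third = memory.add_interaction("delta epsilon zeta eta", omega=0.4)
print([e.interaction_id for e in memory.queue])
```

In this run the second entry is removed by the sweep on turn 2 and the third is removed by budget pressure in the same call that created it, so only entry 0 remains. Callers that need the entry to be live should therefore check the queue rather than rely on the return value alone.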

contextualize_raw_recall_hits(hits, *, active_guard=None)

Run recall-time NLP on raw recall hits.

    Storage returns raw records only. Conversational interpretation
    belongs to the active-memory orchestration layer so the current
    NLP engine can be applied consistently to recalled content.

Parameters:

Name Type Description Default
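hits list[RawRecallHit]

Raw recall hits to contextualize, typically the output of get_raw_recall_hits.

required
active_guard _ActiveContextGuard | None

Active-context guard used to filter candidates against the active queue. When None, a fresh guard is built without recording metrics.

None

Returns:

Type Description
list[ContextualizedRecallCandidate]

Contextualized candidates that pass the active-context filter. Empty when no NLP engine is configured.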

Source code in dmf/memory/temporal_memory.py
def contextualize_raw_recall_hits(
    self,
    hits: list[RawRecallHit],
    *,
    active_guard: _ActiveContextGuard | None = None,
) -> list[ContextualizedRecallCandidate]:
    """Run recall-time NLP on raw recall hits.

            Storage returns raw records only. Conversational interpretation
            belongs to the active-memory orchestration layer so the current
            NLP engine can be applied consistently to recalled content.

    Args:
        hits: See the function signature and surrounding type hints.
        active_guard: See the function signature and surrounding type hints.

    Returns:
        See the return type annotation.

    Raises:
        None.
    """
    if self._nlp_engine is None:
        return []

    contextualized: list[ContextualizedRecallCandidate] = []
    for hit in hits:
        report = self._nlp_engine.analyze_interaction(hit.record.text)
        contextualized.append(
            ContextualizedRecallCandidate(
                record=hit.record,
                report=report,
                similarity_score=hit.similarity_score,
                distance=hit.distance,
                rank_hint=hit.rank_hint,
                source=hit.source,
            )
        )
    active_guard = active_guard or self._build_active_context_guard(record_metrics=False)
    return self._filter_contextualized_recall_candidates_against_active_context(
        contextualized,
        active_guard,
    )

from_dmf_config(config, ltm_hook=None, nlp_engine=None) classmethod

Construct a TemporalMemory from a universal DMFConfig.

    This factory bridges the TOML-loaded ``DMFConfig`` to
    the typed internal dataclasses (``DecayConfig``, ``VectorConfig``)
    without changing the existing constructor signature or breaking any
    existing test.

    Translation map
    ---------------
    ==================================== ============================ ================================
    TOML key                             DMFConfig path               DecayConfig / VectorConfig field
    ==================================== ============================ ================================
    temporal_decay.lambda_base           decay.lambda_base            lambda_decay
    temporal_decay.inertia_strength      decay.inertia_strength       inertia_strength
    temporal_decay.hard_kill_threshold   decay.hard_kill_threshold    hard_kill_threshold
    memory_tiers.critical_max            tiers.critical_max           critical_threshold
    memory_tiers.healthy_min             tiers.healthy_min            healthy_threshold
    capacity.token_budget                capacity.token_budget        token_budget
    capacity.pruning_frequency_x         capacity.pruning_frequency_x pruning_frequency
    pruning_priority.rho_*               pruning_priority.*           PruningPriorityConfig
    nlp.vector_dim                       nlp.vector_dim               VectorConfig.vector_dim
    capacity.window_size                 capacity.window_size         VectorConfig.window_size
    ltm.enabled + storage_type="file"    ltm.*                        → FileLTMHook(storage_path)
    ltm.enabled + storage_type="chroma"  ltm.*                        → ChromaLTMHook(...)
    ltm.enabled + storage_type="null"    ltm.*                        → NullLTMHook
    ltm.enabled=false                    ltm.*                        → NullLTMHook
    ==================================== ============================ ================================

    Parameters
    ----------
    config : DMFConfig
        Fully populated config object returned by ``load_dmf_config()``.
    ltm_hook : LTMHook | None
        Archive sink for evicted entries. An explicitly supplied hook
        always wins; when ``None``, the hook is resolved from
        ``config.ltm`` as listed in the translation map.
    nlp_engine : NLPEngine | None
        Optional NLP engine reused for recall-time contextualization of
        raw LTM hits.

    Returns
    -------
    TemporalMemory
        Fully initialised instance.

Parameters:

Name Type Description Default
config DMFConfig

See the function signature and surrounding type hints.

required
ltm_hook LTMHook | None

See the function signature and surrounding type hints.

None
nlp_engine NLPEngine | None

See the function signature and surrounding type hints.

None

Raises:

Type Description
ValueError

Raised when LTM is enabled, no explicit ltm_hook is supplied, and config.ltm.storage_type is not a supported backend.

Source code in dmf/memory/temporal_memory.py
@classmethod
def from_dmf_config(
    cls,
    config: DMFConfig,
    ltm_hook: LTMHook | None = None,
    nlp_engine: NLPEngine | None = None,
) -> TemporalMemory:
    """Construct a ``TemporalMemory`` from a universal ``DMFConfig``.

            This factory bridges the TOML-loaded ``DMFConfig`` to
            the typed internal dataclasses (``DecayConfig``, ``VectorConfig``)
            without changing the existing constructor signature or breaking any
            existing test.

            Translation map
            ---------------
            ==================================== ============================ ================================
            TOML key                             DMFConfig path               DecayConfig / VectorConfig field
            ==================================== ============================ ================================
            temporal_decay.lambda_base           decay.lambda_base            lambda_decay
            temporal_decay.inertia_strength      decay.inertia_strength       inertia_strength
            temporal_decay.hard_kill_threshold   decay.hard_kill_threshold    hard_kill_threshold
            memory_tiers.critical_max            tiers.critical_max           critical_threshold
            memory_tiers.healthy_min             tiers.healthy_min            healthy_threshold
            capacity.token_budget                capacity.token_budget        token_budget
            capacity.pruning_frequency_x         capacity.pruning_frequency_x pruning_frequency
            pruning_priority.rho_*               pruning_priority.*           PruningPriorityConfig
            nlp.vector_dim                       nlp.vector_dim               VectorConfig.vector_dim
            capacity.window_size                 capacity.window_size         VectorConfig.window_size
            ltm.enabled + storage_type="file"    ltm.*                        → FileLTMHook(storage_path)
            ltm.enabled + storage_type="chroma"  ltm.*                        → ChromaLTMHook(...)
            ltm.enabled + storage_type="null"    ltm.*                        → NullLTMHook
            ltm.enabled=false                    ltm.*                        → NullLTMHook
            ==================================== ============================ ================================

            Parameters
            ----------
            config : DMFConfig
                Fully populated config object returned by ``load_dmf_config()``.
            ltm_hook : LTMHook | None
                Archive sink for evicted entries. An explicitly supplied hook
                always wins; when ``None``, the hook is resolved from
                ``config.ltm`` as listed in the translation map.
            nlp_engine : NLPEngine | None
                Optional NLP engine reused for recall-time contextualization of
                raw LTM hits.

            Returns
            -------
            TemporalMemory
                Fully initialised instance.

    Args:
        config: See the function signature and surrounding type hints.
        ltm_hook: See the function signature and surrounding type hints.
        nlp_engine: See the function signature and surrounding type hints.

    Raises:
        ValueError: Raised when LTM is enabled, no explicit ``ltm_hook`` is
            supplied, and ``config.ltm.storage_type`` is not a supported backend.
    """
    decay_cfg = DecayConfig(
        lambda_decay=config.decay.lambda_base,
        inertia_strength=config.decay.inertia_strength,
        hard_kill_threshold=config.decay.hard_kill_threshold,
        token_budget=config.capacity.token_budget,
        pruning_frequency=config.capacity.pruning_frequency_x,
        critical_threshold=config.tiers.critical_max,
        healthy_threshold=config.tiers.healthy_min,
        ltm_recall_limit=config.ltm.recall_limit,
        ltm_threshold=config.ltm.distance_threshold,
    )
    pruning_cfg = PruningPriorityConfig(
        rho_constraint=config.pruning_priority.rho_constraint,
        rho_preference=config.pruning_priority.rho_preference,
        rho_current_state=config.pruning_priority.rho_current_state,
        rho_correction=config.pruning_priority.rho_correction,
        rho_replacement=config.pruning_priority.rho_replacement,
        superseded_past_penalty=config.pruning_priority.superseded_past_penalty,
    )
    vector_cfg = VectorConfig(
        model_name=config.nlp.model_name,
        vector_dim=config.nlp.vector_dim,
        window_size=config.capacity.window_size,
    )

    # Resolve LTM hook:
    #   1. Explicit injection always wins (testing / custom backends).
    #   2. ltm.enabled=false                    → NullLTMHook.
    #   3. ltm.enabled + storage_type="file"   → FileLTMHook.
    #   4. ltm.enabled + storage_type="chroma" → ChromaLTMHook.
    #   5. ltm.enabled + storage_type="null"   → NullLTMHook.
    # The import is deferred to this call site to avoid any future
    # circular-import risk if config_loader grows additional imports.
    resolved_hook: LTMHook
    if ltm_hook is not None:
        resolved_hook = ltm_hook
    elif not config.ltm.enabled:
        resolved_hook = NullLTMHook()
    elif config.ltm.storage_type == LTM_BACKEND_FILE:
        from dmf.memory.ltm_engine import FileLTMHook  # deferred import
        resolved_hook = FileLTMHook(
            config.ltm.storage_path,
            cards_enabled=config.ltm.cards_enabled,
            cards_path=config.ltm.cards_path,
        )
    elif config.ltm.storage_type == LTM_BACKEND_CHROMA:
        from dmf.memory.chroma_ltm import ChromaLTMHook  # deferred import
        resolved_hook = ChromaLTMHook(
            collection_name=config.ltm.collection_name,
            persist_directory=config.ltm.chroma_path,
            distance_threshold=config.ltm.distance_threshold,
            cards_enabled=config.ltm.cards_enabled,
            cards_path=config.ltm.cards_path,
            cards_collection_name=config.ltm.cards_collection_name,
        )
    elif config.ltm.storage_type == LTM_BACKEND_NULL:
        resolved_hook = NullLTMHook()
    else:
        raise ValueError(
            "Unsupported ltm.storage_type at runtime: "
            f"{config.ltm.storage_type!r}"
        )

    return cls(
        decay_config=decay_cfg,
        pruning_priority_config=pruning_cfg,
        vector_config=vector_cfg,
        ltm_hook=resolved_hook,
        nlp_engine=nlp_engine,
    )
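
The hook resolution order in the listing above can be summarised as a small decision function. This is a sketch only: `resolve_hook_name` is a hypothetical helper that returns class names as strings instead of constructing hooks, and it assumes the `LTM_BACKEND_*` constants equal the literal values `"file"`, `"chroma"`, and `"null"` shown in the translation map.

```python
def resolve_hook_name(explicit_hook, enabled: bool, storage_type: str) -> str:
    """Return the name of the hook that the resolution order would select."""
    # 1. An explicitly injected hook always wins.
    if explicit_hook is not None:
        return "explicit"
    # 2. LTM disabled: archiving is a no-op.
    if not enabled:
        return "NullLTMHook"
    # 3-5. LTM enabled: dispatch on the configured backend.
    backends = {
        "file": "FileLTMHook",
        "chroma": "ChromaLTMHook",
        "null": "NullLTMHook",
    }
    if storage_type not in backends:
        raise ValueError(
            f"Unsupported ltm.storage_type at runtime: {storage_type!r}"
        )
    return backends[storage_type]


print(resolve_hook_name(None, enabled=True, storage_type="chroma"))
print(resolve_hook_name(None, enabled=False, storage_type="chroma"))
print(resolve_hook_name(object(), enabled=True, storage_type="bogus"))
```

Note that an unsupported `storage_type` only raises when the later branches are reached; an injected hook or `enabled=False` short-circuits before the backend name is inspected.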

get_active_raw_records()

Return visible active entries as raw records for retrieval.

    Structured retrieval can use these records alongside archived LTM
    records. This keeps still-active memories searchable without changing
    the raw-LTM archival contract.

Returns:

Type Description
list[RawLTMRecord]

See the return type annotation.

Source code in dmf/memory/temporal_memory.py
def get_active_raw_records(self) -> list[RawLTMRecord]:
    """Return visible active entries as raw records for retrieval.

            Structured retrieval can use these records alongside archived LTM
            records. This keeps still-active memories searchable without changing
            the raw-LTM archival contract.

    Returns:
        See the return type annotation.

    Raises:
        None.
    """
    return [
        entry.to_raw_ltm_record()
        for entry in self._build_active_context_guard(record_metrics=False).active_entries
    ]

get_context_metrics()

Return a snapshot of the latest context reconstruction metrics.

Returns:

Type Description
dict[str, int]

See the return type annotation.

Source code in dmf/memory/temporal_memory.py
def get_context_metrics(self) -> dict[str, int]:
    """Return a snapshot of the latest context reconstruction metrics.

    Returns:
        See the return type annotation.

    Raises:
        None.
    """
    return dict(self._context_metrics)
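
Because the method returns `dict(self._context_metrics)`, callers receive a shallow copy rather than the internal mapping. The sketch below illustrates that snapshot behaviour with a hypothetical `MetricsHolder` class and a made-up metric key (`active_entries`); neither name comes from the DMF source.

```python
class MetricsHolder:
    """Toy stand-in that mirrors the copy-on-read pattern."""

    def __init__(self) -> None:
        # Hypothetical metric name, used only for illustration.
        self._context_metrics: dict[str, int] = {"active_entries": 2}

    def get_context_metrics(self) -> dict[str, int]:
        # dict(...) builds a new mapping, so callers cannot mutate internal state.
        return dict(self._context_metrics)


holder = MetricsHolder()
snapshot = holder.get_context_metrics()
snapshot["active_entries"] = 99  # modifies the copy only

print(holder.get_context_metrics()["active_entries"])
```

Since the values are plain integers, a shallow copy is sufficient to isolate the caller from later updates.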

get_effective_state()

Return the decay-adjusted state of every entry in the queue.

    For each entry computes:

        Δn      = (_next_id − 1) − entry.interaction_id
        Ω_eff   = calculate_effective_score(Ω, Δn, λ, η)

    The most recently added entry always has Δn = 0 → Ω_eff = Ω.

    Returns
    -------
    list[dict[str, Any]]
        One dict per entry, ordered oldest → newest.  Keys:
        ``interaction_id``, ``text``, ``omega``, ``omega_eff``,
        ``delta_n``, ``token_count``, ``status_effective``,
        ``timestamp``.
Source code in dmf/memory/temporal_memory.py
def get_effective_state(self) -> list[dict[str, Any]]:
    """Return the decay-adjusted state of every entry in the queue.

            For each entry computes:

                Δn      = (_next_id − 1) − entry.interaction_id
                Ω_eff   = calculate_effective_score(Ω, Δn, λ, η)

            The most recently added entry always has Δn = 0 → Ω_eff = Ω.

            Returns
            -------
            list[dict[str, Any]]
                One dict per entry, ordered oldest → newest.  Keys:
                ``interaction_id``, ``text``, ``omega``, ``omega_eff``,
                ``delta_n``, ``token_count``, ``status_effective``,
                ``timestamp``.

    Raises:
        None.
    """
    if not self.queue:
        return []

    most_recent_id = self._next_id - 1
    result: list[dict[str, Any]] = []
    for entry in self.queue:
        delta_n = most_recent_id - entry.interaction_id
        omega_eff = calculate_effective_score(
            omega=entry.omega,
            delta_n=delta_n,
            lambda_decay=self.config.lambda_decay,
            inertia_strength=self.config.inertia_strength,
        )
        result.append({
            "interaction_id": entry.interaction_id,
            "text": entry.text,
            "omega": entry.omega,
            "omega_eff": omega_eff,
            "delta_n": delta_n,
            "token_count": entry.token_count,
            "status_effective": classify_survival_status(
                omega=omega_eff,
                critical_threshold=self.config.critical_threshold,
                healthy_threshold=self.config.healthy_threshold,
            ).value,
            "timestamp": entry.timestamp,
        })
    return result
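
The Δn bookkeeping can be illustrated in isolation. The sketch below is an assumption-laden example: `toy_effective_score` is a hypothetical plain exponential decay used as a stand-in, because the real `calculate_effective_score` (which also takes an inertia term) is not shown in this section, and the `lambda_decay` value is arbitrary.

```python
import math


def toy_effective_score(omega: float, delta_n: int, lambda_decay: float) -> float:
    # Hypothetical stand-in: plain exponential decay with no inertia term.
    return omega * math.exp(-lambda_decay * delta_n)


# (interaction_id, omega) pairs, ordered oldest to newest.
entries = [(0, 0.85), (1, 0.40), (2, 0.70)]
next_id = 3
most_recent_id = next_id - 1

state = []
for interaction_id, omega in entries:
    delta_n = most_recent_id - interaction_id  # age in turns
    state.append({
        "interaction_id": interaction_id,
        "omega": omega,
        "delta_n": delta_n,
        "omega_eff": toy_effective_score(omega, delta_n, lambda_decay=0.02),
    })

for row in state:
    print(row)
```

The newest entry has `delta_n == 0`, so its effective score equals its stored Ω, while older entries are discounted according to their age in turns.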

get_full_context(query_vector=None)

Assemble the complete context string for LLM prompt injection.

Parameters:

Name Type Description Default
query_vector ndarray | None

Query embedding used to recall LTM entries. When None, recall is skipped and only the active context is rendered.

None

Returns:

Type Description
str

See the return type annotation.

Source code in dmf/memory/temporal_memory.py
def get_full_context(
    self,
    query_vector: np.ndarray | None = None,
) -> str:
    """Assemble the complete context string for LLM prompt injection.

    Args:
        query_vector: See the function signature and surrounding type hints.

    Returns:
        See the return type annotation.

    Raises:
        None.
    """
    self._reset_context_metrics()
    self._reset_recall_diagnostics()
    parts: list[str] = []
    active_guard = self._build_active_context_guard(record_metrics=True)
    recalled: list[ContextualizedRecallCandidate] = []

    if query_vector is not None:
        recalled = self.rerank_contextualized_recall_candidates(
            self.contextualize_raw_recall_hits(
                self.get_raw_recall_hits(
                    query_vector,
                    active_guard=active_guard,
                ),
                active_guard=active_guard,
            )
        )
        if recalled:
            parts.append(_RECALL_HEADER)
            parts.extend(self._render_recalled_context_items(recalled))
            parts.append("")

    parts.append(_ACTIVE_HEADER)
    if recalled:
        parts.append(_ACTIVE_PRECEDENCE_NOTE)
    active_lines = self._render_active_context_items(active_guard.active_entries)
    if active_lines:
        if recalled:
            parts.append("")
        parts.extend(active_lines)

    return "\n".join(parts)
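
The layout rules in the listing above (recall block first, then the active block, with a precedence note only when something was recalled) can be reproduced with plain strings. In this sketch the header and note texts are hypothetical placeholders, since the values of `_RECALL_HEADER`, `_ACTIVE_HEADER`, and `_ACTIVE_PRECEDENCE_NOTE` are not shown in this section, and `assemble` is an illustrative helper rather than a DMF function.

```python
# Placeholder strings; the real constants are defined elsewhere in the module.
RECALL_HEADER = "[recalled]"
ACTIVE_HEADER = "[active]"
PRECEDENCE_NOTE = "(active entries take precedence)"


def assemble(recalled_lines: list[str], active_lines: list[str]) -> str:
    parts: list[str] = []
    if recalled_lines:
        parts.append(RECALL_HEADER)
        parts.extend(recalled_lines)
        parts.append("")              # blank line after the recall block
    parts.append(ACTIVE_HEADER)       # the active header is always emitted
    if recalled_lines:
        parts.append(PRECEDENCE_NOTE)
    if active_lines:
        if recalled_lines:
            parts.append("")          # separate the note from the entries
        parts.extend(active_lines)
    return "\n".join(parts)


print(assemble([], ["user likes tea"]))
print("---")
print(assemble(["user lived in Oslo"], ["user lives in Rome"]))
```

With no recalled items the output is just the active header followed by the active lines; the precedence note and the extra blank lines appear only when at least one recalled item is rendered.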

get_raw_recall_hits(query_vector, k=None, *, active_guard=None)

Fetch raw recall hits from the configured LTM hook.

Parameters:

Name Type Description Default
query_vector ndarray

Query embedding; converted to a list and passed to the LTM hook's search_raw.

required
k int | None

Maximum number of hits to request. When None, config.ltm_recall_limit is used.

None
active_guard _ActiveContextGuard | None

Active-context guard used to filter hits against the active queue. When None, a fresh guard is built without recording metrics.

None

Returns:

Type Description
list[RawRecallHit]

See the return type annotation.

Source code in dmf/memory/temporal_memory.py
def get_raw_recall_hits(
    self,
    query_vector: np.ndarray,
    k: int | None = None,
    *,
    active_guard: _ActiveContextGuard | None = None,
) -> list[RawRecallHit]:
    """Fetch raw recall hits from the configured LTM hook.

    Args:
        query_vector: See the function signature and surrounding type hints.
        k: See the function signature and surrounding type hints.
        active_guard: See the function signature and surrounding type hints.

    Returns:
        See the return type annotation.

    Raises:
        None.
    """
    raw_hits = self._ltm_hook.search_raw(
        query_vector.tolist(),
        k=k if k is not None else self.config.ltm_recall_limit,
    )
    hits = self._validate_raw_recall_hits(raw_hits)
    self._recall_diagnostics["raw_candidates"] = [
        self._serialise_raw_recall_hit(hit)
        for hit in hits
    ]
    active_guard = active_guard or self._build_active_context_guard(record_metrics=False)
    return self._filter_raw_recall_hits_against_active_context(hits, active_guard)

get_recall_diagnostics()

Return a snapshot of the latest recall diagnostics.

Returns:

Type Description
dict[str, Any]

See the return type annotation.

Source code in dmf/memory/temporal_memory.py
def get_recall_diagnostics(self) -> dict[str, Any]:
    """Return a snapshot of the latest recall diagnostics.

    Returns:
        See the return type annotation.

    Raises:
        None.
    """
    return {
        "raw_candidates": list(self._recall_diagnostics["raw_candidates"]),
        "ranked_candidates": list(self._recall_diagnostics["ranked_candidates"]),
        "final_candidates": list(self._recall_diagnostics["final_candidates"]),
        "suppressed": list(self._recall_diagnostics["suppressed"]),
    }

get_total_tokens()

Return the total token count across all entries in the queue.

    O(N) sum over pre-calculated integers — no tokenisation on hot path.

Returns:

Type Description
int

See the return type annotation.

Source code in dmf/memory/temporal_memory.py
def get_total_tokens(self) -> int:
    """Return the total token count across all entries in the queue.

            O(N) sum over pre-calculated integers — no tokenisation on hot path.

    Returns:
        See the return type annotation.

    Raises:
        None.
    """
    return sum(entry.token_count for entry in self.queue)

periodic_cleanup()

Hard-kill sweep: evict any entry with Ω_eff < hard_kill_threshold.

    Unlike ``prune_to_budget``, this sweep applies to **all** entries,
    including HEALTHY messages that have organically decayed to near-zero.
    It runs every ``config.pruning_frequency`` turns (triggered by
    ``add_interaction``); the automatic trigger is skipped when
    ``pruning_frequency`` is not positive.

    A HEALTHY message (Ω = 0.85) reaches the default hard-kill floor
    (0.05) at Δn ≈ 141 turns. The periodic sweep removes these stale
    entries from active memory before they accumulate and consume the
    token budget.

    Returns
    -------
    list[MemoryEntry]
        All entries evicted in this sweep, in queue order.
Source code in dmf/memory/temporal_memory.py
def periodic_cleanup(self) -> list[MemoryEntry]:
    """Hard-kill sweep: evict any entry with Ω_eff < hard_kill_threshold.

            Unlike ``prune_to_budget``, this sweep applies to **all** entries,
            including HEALTHY messages that have organically decayed to near-zero.
            It runs every ``config.pruning_frequency`` turns (triggered by
            ``add_interaction``); the automatic trigger is skipped when
            ``pruning_frequency`` is not positive.

            A HEALTHY message (Ω = 0.85) reaches the default hard-kill floor
            (0.05) at Δn ≈ 141 turns. The periodic sweep removes these stale
            entries from active memory before they accumulate and consume the
            token budget.

            Returns
            -------
            list[MemoryEntry]
                All entries evicted in this sweep, in queue order.

    Raises:
        None.
    """
    threshold = self.config.hard_kill_threshold
    to_evict = [
        entry for entry in self.queue
        if self._omega_eff_for(entry) < threshold
    ]
    for entry in to_evict:
        self._evict(entry)
    return to_evict
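
The sweep and the "Δn ≈ 141" figure can be explored with a small worked example. Everything here is an assumption for illustration: the decay is modelled as a plain exponential with no inertia term, and `lambda_decay = 0.02` is a value chosen because it roughly reproduces the quoted figure under that model, not a default taken from the DMF source. `turns_until_floor` and `sweep` are hypothetical helpers.

```python
import math


def turns_until_floor(omega: float, floor: float, lambda_decay: float) -> float:
    # Solve omega * exp(-lambda * n) = floor for n.
    return math.log(omega / floor) / lambda_decay


def sweep(entries, most_recent_id: int, threshold: float, lambda_decay: float):
    """Split (interaction_id, omega) pairs into kept and evicted lists."""
    kept, evicted = [], []
    for interaction_id, omega in entries:
        delta_n = most_recent_id - interaction_id
        omega_eff = omega * math.exp(-lambda_decay * delta_n)
        # Evict when the decayed score falls below the hard-kill floor.
        (evicted if omega_eff < threshold else kept).append((interaction_id, omega))
    return kept, evicted


print(round(turns_until_floor(0.85, 0.05, 0.02), 1))

entries = [(0, 0.85), (100, 0.85), (149, 0.03)]
kept, evicted = sweep(entries, most_recent_id=150, threshold=0.05, lambda_decay=0.02)
print("kept:", kept)
print("evicted:", evicted)
```

Under these assumptions the old high-Ω entry (150 turns old) and the recent low-Ω entry are both swept, while the 50-turn-old entry survives, which shows that the sweep depends on the decayed score rather than on the original tier.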

prune_to_budget()

Evict lowest-priority entries until total tokens ≤ token_budget.

    Eviction policy
    ---------------
    1. Compute the full priority list via ``_get_pruning_candidates``.
    2. Remove the head of the list (oldest CRITICAL, else oldest
       UNSTABLE) from the queue.
    3. Sync ``InteractionMatrix`` by removing the same vector object.
    4. Archive the evicted entry via ``LTMHook.archive``.
    5. Repeat until ``get_total_tokens() ≤ config.token_budget`` or
       no candidates remain.

    If only HEALTHY messages remain and the budget is still exceeded,
    the loop exits without evicting them.  This is intentional:
    HEALTHY messages are protected by design.  No exception is raised;
    the caller can detect a remaining deficit by comparing
    ``get_total_tokens()`` with ``config.token_budget`` after the call.

    Returns
    -------
    list[MemoryEntry]
        All entries evicted in this call, in eviction order.
Source code in dmf/memory/temporal_memory.py
def prune_to_budget(self) -> list[MemoryEntry]:
    """Evict lowest-priority entries until total tokens ≤ token_budget.

            Eviction policy
            ---------------
            1. Compute the full priority list via ``_get_pruning_candidates``.
            2. Remove the head of the list (oldest CRITICAL, else oldest
               UNSTABLE) from the queue.
            3. Sync ``InteractionMatrix`` by removing the same vector object.
            4. Archive the evicted entry via ``LTMHook.archive``.
            5. Repeat until ``get_total_tokens() ≤ config.token_budget`` or
               no candidates remain.

            If only HEALTHY messages remain and the budget is still exceeded,
            the loop exits without evicting them.  This is intentional:
            HEALTHY messages are protected by design.  No exception is raised;
            the caller can detect a remaining deficit by comparing
            ``get_total_tokens()`` with ``config.token_budget`` after the call.

            Returns
            -------
            list[MemoryEntry]
                All entries evicted in this call, in eviction order.

    Raises:
        None.
    """
    evicted: list[MemoryEntry] = []
    while self.get_total_tokens() > self.config.token_budget:
        candidates = self._get_pruning_candidates()
        if not candidates:
            # Only HEALTHY entries remain; cannot evict by budget pressure.
            break
        victim = candidates[0]
        self._evict(victim)
        evicted.append(victim)
    return evicted
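
The eviction loop can be sketched on plain dictionaries. This is a simplified illustration, not the DMF implementation: the candidate ordering follows only the docstring's "oldest CRITICAL, else oldest UNSTABLE" rule, whereas the real `_get_pruning_candidates` is not shown here and may also apply the `PruningPriorityConfig` weights. The function name `toy_prune_to_budget` and the entry fields are hypothetical.

```python
def toy_prune_to_budget(entries: list[dict], token_budget: int) -> list[dict]:
    """Evict CRITICAL, then UNSTABLE entries (oldest first) until within budget.

    `entries` is ordered oldest to newest and is modified in place.
    """
    evicted: list[dict] = []

    def candidates() -> list[dict]:
        critical = [e for e in entries if e["status"] == "CRITICAL"]
        unstable = [e for e in entries if e["status"] == "UNSTABLE"]
        return critical + unstable  # HEALTHY entries are never candidates

    while sum(e["tokens"] for e in entries) > token_budget:
        pool = candidates()
        if not pool:
            break  # only HEALTHY entries remain; the deficit stays
        victim = pool[0]
        entries.remove(victim)
        evicted.append(victim)
    return evicted


queue = [
    {"id": 0, "status": "HEALTHY", "tokens": 50},
    {"id": 1, "status": "UNSTABLE", "tokens": 30},
    {"id": 2, "status": "CRITICAL", "tokens": 30},
    {"id": 3, "status": "HEALTHY", "tokens": 40},
]
evicted = toy_prune_to_budget(queue, token_budget=60)
remaining_tokens = sum(e["tokens"] for e in queue)
print([e["id"] for e in evicted], remaining_tokens)
```

Here both non-HEALTHY entries are evicted, yet 90 tokens remain against a budget of 60, so a caller that must stay within budget needs to compare the remaining total with the budget after the call.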

rerank_contextualized_recall_candidates(candidates)

Apply deterministic selection and suppression to contextualized recall candidates.

Parameters:

Name Type Description Default
candidates list[ContextualizedRecallCandidate]

See the function signature and surrounding type hints.

required

Returns:

Type Description
list[ContextualizedRecallCandidate]

See the return type annotation.

Source code in dmf/memory/temporal_memory.py
def rerank_contextualized_recall_candidates(
    self,
    candidates: list[ContextualizedRecallCandidate],
) -> list[ContextualizedRecallCandidate]:
    """Apply deterministic selection and suppression to contextualized recall candidates.

    Args:
        candidates: See the function signature and surrounding type hints.

    Returns:
        See the return type annotation.

    Raises:
        None.
    """
    filtered = self._filter_contextualized_recall_candidates(candidates)
    selected = self._select_contextualized_recall_candidates(filtered)
    ranked = self._rank_contextualized_recall_candidates(selected)
    topic_filtered = self._rank_contextualized_recall_candidates(
        self._filter_contextualized_candidates_by_topic_supersession(ranked)
    )
    deduped = self._dedupe_contextualized_candidates_by_same_topic_value(topic_filtered)
    memory_filtered = self._filter_contextualized_candidates_without_memory_signal(deduped)
    self._recall_diagnostics["ranked_candidates"] = [
        self._serialise_contextualized_recall_candidate(candidate)
        for candidate in memory_filtered
    ]
    final_candidates = self._limit_contextualized_recall_per_topic(memory_filtered)
    self._recall_diagnostics["final_candidates"] = [
        self._serialise_contextualized_recall_candidate(candidate)
        for candidate in final_candidates
    ]
    return final_candidates