Writing an EnricherPlugin for Recall
Recall's plugin system has one constraint that surprises most people when they first read about it: all plugins are compiled into the binary. There is no dynamic loading, no plugin marketplace, no WASM sandbox, no FFI bridge. If you want to extend Recall's behavior, you implement a Rust trait, register it in PluginRegistry at startup, and ship a new binary.
This is deliberate. Every running Recall instance is fully auditable — you can inspect the binary, check its dependency tree, and know with certainty what code is executing in your memory pipeline. There's no runtime surprise from a dynamically loaded plugin that wasn't reviewed, no version drift between plugin and host, no sandbox escape surface. The tradeoff is that adding a plugin requires a compile step. For a system that handles memory — which is to say, stores and returns information about your users — we decided that auditability was worth the deployment friction.
The two plugin types
Recall defines two extension traits, each with a different role:
EnricherPlugin — runs inside the write pipeline, after entity resolution and before deduplication. Receives a mutable slice of candidate memories for the current turn batch. Can add tags, adjust confidence, filter candidates, or attach external metadata. This is the one that ships today.
ConnectorPlugin — pulls external data into the write pipeline on a schedule. The trait is defined and the interface is stable; the built-in implementations (Google Drive, GitHub, Notion, Slack) are not yet shipped. The trait exists so you can implement it for your internal systems now.
Where in the pipeline enrichers run
pre_filter → extract → resolve_refs → [ENRICHERS HERE] → dedupe → conflict → persist

The position is specific and intentional. After resolve_refs, every candidate memory has a stable EntityId for its subject — not a pronoun, not an alias, not "my manager," but the canonical ent_priya_XYZ that persists across sessions. You know exactly who each memory is about.
Before dedupe, the candidate set hasn't been collapsed yet. Near-duplicate candidates are still separate items. This means you can:
- Add tags that affect how deduplication scores candidates against existing memories
- Boost or reduce confidence on candidates based on external signals, changing which ones survive the conflict-detection phase
- Filter candidates outright if your domain rules say they shouldn't be stored
If enrichers ran before entity resolution, you'd receive unresolved subjects. If they ran after dedup, you'd see fewer candidates (the interesting borderline cases would already be merged or dropped). The position gives you the richest input at the moment when enrichment is most useful.
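The three mutation kinds above can be sketched against a simplified stand-in for CandidateMemory — the real struct has more fields, and the content strings and tag names here are invented for illustration:

```rust
// Simplified stand-in for recall_core::CandidateMemory (illustration only).
#[derive(Debug, Clone)]
pub struct CandidateMemory {
    pub content: String,
    pub tags: Vec<String>,
    pub confidence_adjustment: f32,
}

/// Sketch of the three mutation kinds an enricher can apply:
/// filtering, tagging, and confidence adjustment.
pub fn enrich_sketch(candidates: &mut Vec<CandidateMemory>) {
    // 1. Filter: drop candidates your domain rules forbid storing.
    candidates.retain(|c| !c.content.contains("do-not-store"));

    for c in candidates.iter_mut() {
        // 2. Tag: influence downstream dedup scoring and retrieval filters.
        if c.content.contains("deploy") {
            c.tags.push("domain:ops".to_string());
        }
        // 3. Adjust confidence, keeping the value in a valid range.
        c.confidence_adjustment = (c.confidence_adjustment + 0.02).clamp(-1.0, 1.0);
    }
}
```

Because the slice is mutated in place, whatever this pass does is exactly what the dedup stage sees next.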
The trait hierarchy
Every plugin implements RecallPlugin as a base:
pub trait RecallPlugin: Send + Sync {
fn plugin_id(&self) -> &'static str;
fn plugin_name(&self) -> &'static str;
fn initialize(&self, config: &serde_json::Value) -> PluginResult<()> {
Ok(()) // default: no-op initialization
}
fn on_write(&self, outcome: &WriteOutcome) {} // fire-and-forget after persist
fn shutdown(&self) {}
}
pub type PluginResult<T> = Result<T, Box<dyn Error + Send + Sync>>;

EnricherPlugin extends it:
pub trait EnricherPlugin: RecallPlugin {
fn enrich(
&self,
candidates: &mut Vec<CandidateMemory>,
scope: &Scope,
) -> PluginResult<()>;
}

The enrich method receives candidates by mutable reference. You iterate, modify in place, and return. All mutations are visible to the next enricher in the chain (enrichers run in registration order).
PluginResult<T> uses boxed errors, not anyhow — this is intentional. Plugins are third-party code; using anyhow would force a specific error handling strategy on plugin authors. Boxed errors are more flexible and work with any error type that implements std::error::Error.
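A minimal sketch of what this flexibility looks like for a plugin author — the alias is repeated so the example is self-contained, and OntologyLoadError is a hypothetical name, not a type Recall ships:

```rust
use std::error::Error;
use std::fmt;

// The alias from the trait definition above, repeated for self-containment.
pub type PluginResult<T> = Result<T, Box<dyn Error + Send + Sync>>;

// A plugin-specific error type; any std::error::Error impl works.
#[derive(Debug)]
pub struct OntologyLoadError {
    pub path: String,
}

impl fmt::Display for OntologyLoadError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "failed to load ontology from {}", self.path)
    }
}

impl Error for OntologyLoadError {}

// Hypothetical loader showing how a custom error boxes into PluginResult.
fn load_ontology(path: &str) -> PluginResult<usize> {
    if path.is_empty() {
        return Err(Box::new(OntologyLoadError { path: path.to_string() }));
    }
    Ok(0) // placeholder: number of entries loaded
}
```

The same PluginResult also accepts std::io::Error, reqwest errors, or anything else via the blanket From impl, so `?` works without wrapper types.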
A complete working example: domain ontology tagger
Suppose your users are software engineers, and your codebase has a well-defined ontology: certain entity IDs are products, certain ones are teams, certain ones are external vendors. You want this classification added to memory candidates before they're stored, both for retrieval filtering and for downstream analytics.
Here's a complete implementation:
use std::collections::HashMap;
use std::sync::Arc;
use recall_core::plugins::{EnricherPlugin, RecallPlugin, PluginResult};
use recall_core::{CandidateMemory, Scope};
pub struct OntologyEnricher {
entity_classes: Arc<HashMap<String, EntityClass>>,
}
#[derive(Clone)]
pub enum EntityClass {
Product,
InternalTeam,
ExternalVendor,
Person,
}
impl EntityClass {
fn as_tag(&self) -> &'static str {
match self {
EntityClass::Product => "domain:product",
EntityClass::InternalTeam => "domain:team",
EntityClass::ExternalVendor => "domain:vendor",
EntityClass::Person => "domain:person",
}
}
fn confidence_boost(&self) -> f32 {
match self {
EntityClass::Product => 0.05, // products are stable, boost confidence
EntityClass::Person => 0.0, // no adjustment
_ => 0.02,
}
}
}
impl RecallPlugin for OntologyEnricher {
fn plugin_id(&self) -> &'static str { "domain-ontology" }
fn plugin_name(&self) -> &'static str { "Domain Ontology Tagger" }
fn initialize(&self, config: &serde_json::Value) -> PluginResult<()> {
// config["ontology_path"] → load entity class map from file or DB
// This example assumes entity_classes is pre-populated at construction
Ok(())
}
}
impl EnricherPlugin for OntologyEnricher {
fn enrich(
&self,
candidates: &mut Vec<CandidateMemory>,
_scope: &Scope,
) -> PluginResult<()> {
for candidate in candidates.iter_mut() {
if let Some(subject) = &candidate.subject {
if let Some(class) = self.entity_classes.get(subject.as_str()) {
candidate.tags.push(class.as_tag().to_string());
// Apply the boost, keeping the adjustment in the valid range
candidate.confidence_adjustment =
    (candidate.confidence_adjustment + class.confidence_boost())
        .clamp(-1.0, 1.0);
}
}
}
Ok(())
}
}

Register it at startup:
let mut registry = PluginRegistry::new();
let ontology = Arc::new(OntologyEnricher {
entity_classes: Arc::new(load_ontology_from_config(&config)?),
});
registry.register_enricher(ontology); // already an Arc; wrapping again would double-box it
// Pass registry to write pipeline
let write_pipeline = WritePipeline::new(config, registry, storage, llm);

Enable in configuration:
plugins:
enrichers:
- id: domain-ontology
config:
ontology_path: /etc/recall/ontology.json

After this enricher runs, every candidate memory about a product entity will have tags: ["domain:product"] and a confidence 0.05 higher than the extractor assigned. In retrieval, you can filter by these tags:
const results = await recall.search({
query: "what decisions did we make about the API?",
scope,
filter: { tags: ["domain:product"] }
});

The fail-open contract
If enrich() returns an Err, the write pipeline does not abort. The error is logged at warn level, the candidate vector is unchanged (or partially changed if the enricher modified it before failing), and the pipeline continues to deduplication.
This is the fail-open contract: a failing enricher does not block memory writes.
The implication is that enrichers must not have side effects that require rollback. Don't make external writes inside enrich() and expect them to be reversed if the enricher fails later. Don't assume that if enrich() returns Ok(()), the memory will definitely be stored — downstream stages (dedup, conflict, persist) can still fail. The enricher is a best-effort decoration pass, not a transactional participant.
If an enricher fails consistently, the warn logs will accumulate. There's no built-in circuit breaker for enrichers — the assumption is that enricher failure is rare and represents a bug, not an expected operational state. Monitor plugin errors in your log aggregation.
Running I/O inside an enricher
All plugin methods are synchronous. If your enricher needs to make HTTP calls (for example, looking up entity metadata from an external API), you need to block the current thread inside enrich(). In Tokio, that means using tokio::task::block_in_place:
fn enrich(&self, candidates: &mut Vec<CandidateMemory>, scope: &Scope) -> PluginResult<()> {
let entity_ids: Vec<_> = candidates.iter()
.filter_map(|c| c.subject.as_deref())
.collect();
// Block the current thread to make an async HTTP call
let classifications = tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(
self.external_api.classify_entities(&entity_ids)
)
})?;
for (candidate, classification) in candidates.iter_mut().zip(classifications) {
candidate.tags.push(classification.tag);
}
Ok(())
}

This works but adds latency to every write pipeline execution. For high-throughput namespaces, consider caching external lookups in the enricher's state. The enricher is Arc<dyn EnricherPlugin> — it's shared across write pipeline threads, so internal state must be Send + Sync. Arc<Mutex<HashMap<...>>> or DashMap are good choices for a thread-safe cache.
For heavy I/O that would genuinely slow down the write path, consider doing the enrichment asynchronously in the background worker instead. The on_write hook fires after persist completes and is fire-and-forget — you can use it to trigger a background enrichment job that updates the memory's tags after the fact.
ConnectorPlugin: external data ingestion
The ConnectorPlugin trait defines how external data sources feed into the write pipeline:
pub trait ConnectorPlugin: RecallPlugin {
fn fetch(
&self,
scope: &Scope,
since: Option<DateTime<Utc>>,
) -> PluginResult<Vec<ConnectorItem>>;
}
pub struct ConnectorItem {
pub content: String,
pub source_uri: Option<String>,
pub title: Option<String>,
}

The background worker polls registered connectors on a schedule, passes the fetched items through the write pipeline (extract → resolve → dedupe → conflict → persist), and records the source_uri in the memory's provenance.
Built-in implementations for Google Drive, GitHub, Notion, and Slack are on the roadmap. If you need any of these today, implementing ConnectorPlugin for your internal data source is straightforward: fetch() returns a list of items with content and optional metadata.
Example: a Notion connector for your company wiki:
pub struct NotionConnector {
client: Arc<NotionApiClient>,
page_ids: Vec<String>,
}
impl ConnectorPlugin for NotionConnector {
fn fetch(&self, scope: &Scope, since: Option<DateTime<Utc>>) -> PluginResult<Vec<ConnectorItem>> {
let pages = tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(
self.client.fetch_pages_modified_since(&self.page_ids, since)
)
})?;
Ok(pages.into_iter().map(|p| ConnectorItem {
content: p.plain_text,
source_uri: Some(format!("notion://pages/{}", p.id)),
title: Some(p.title),
}).collect())
}
}

The worker calls fetch() on each connector, packages the items as Turn structs (with the source_uri as the turn's origin), and sends them through the standard write pipeline. From the pipeline's perspective, a Notion page is just another turn to extract from.
When not to use a plugin
Plugins solve a specific problem: extending Recall's behavior from inside the write pipeline with domain-specific logic. They're the wrong tool for several related problems:
Document ingestion (PDFs, HTML, Word) — extract text upstream using Docling, LlamaParse, or similar tools, then pass the extracted plain text to remember(). Recall's write pipeline expects conversational turn text, not raw document formats. Trying to handle document parsing inside a plugin puts the wrong complexity in the wrong place.
Custom memory types — the five types (fact, preference, event, entity, relation) are fixed in the data model and stored schema. Plugins cannot add new types. If your domain needs a sixth type, the right path is to use tags to subdivide one of the existing types, or to open an issue discussing schema extension.
Backpressure-sensitive I/O — if your enricher calls an external API that can be slow or unavailable, and you need proper backpressure handling (rate limiting, circuit breaking, retry with exponential backoff), the synchronous plugin interface is awkward. Consider fetching external data in a separate service and caching the results in the enricher's in-process state, refreshed on a schedule.
Complex read-time logic — there is no on_read plugin hook today. Read-time plugins (for re-ranking, post-processing, or formatting) are deferred until enricher use cases are validated. If you need read-time customization, use the explain: true retrieval API to understand what scores are being assigned, and adjust retrieval configuration rather than trying to hook into the retrieval path.
What's coming
The roadmap includes:
- Built-in ConnectorPlugin implementations (Google Drive, GitHub, Notion, Slack)
- An on_read plugin hook for read-time post-processing
- A plugin configuration validator that catches mismatches between plugin requirements and runtime config at startup, rather than at first call
The compile-time constraint is not going away. If you need a use case that genuinely requires dynamic plugin loading — for example, multiple teams shipping plugins independently without coordinating binary builds — the right answer is to run separate Recall instances with different build configurations, not to add dynamic loading to a single instance.
Plugin feature flags: the pdf-extract and scraper crates for document preprocessing are behind Cargo feature flags (off by default) to avoid pulling heavy dependencies into every binary. Enable them in your Cargo.toml only if your enricher needs them.
Chaining multiple enrichers
Multiple enrichers can be registered and run sequentially. Each enricher in the chain sees the mutations of all previous enrichers — a tag added by enricher A is present in candidate.tags when enricher B runs. This makes composition natural: a domain classifier runs first, and a confidence adjuster runs second with access to the domain classification. The two concerns stay separate in code while cooperating at runtime.
let mut registry = PluginRegistry::new();
// Order matters: ontology tagger runs first
registry.register_enricher(Arc::new(OntologyEnricher {
entity_classes: Arc::new(load_ontology()?),
}));
// Confidence adjuster runs second, sees tags from ontology tagger
registry.register_enricher(Arc::new(ConfidenceAdjuster {
rules: vec![
// Boost confidence for product-domain entities
AdjustmentRule {
requires_tag: "domain:product",
confidence_delta: 0.08,
},
// Reduce confidence for vendor-domain memories (vendors change faster)
AdjustmentRule {
requires_tag: "domain:vendor",
confidence_delta: -0.05,
},
],
}));

The ConfidenceAdjuster implementation reads candidate.tags to discover what earlier enrichers produced:
impl EnricherPlugin for ConfidenceAdjuster {
fn enrich(&self, candidates: &mut Vec<CandidateMemory>, _scope: &Scope) -> PluginResult<()> {
for candidate in candidates.iter_mut() {
for rule in &self.rules {
if candidate.tags.contains(&rule.requires_tag.to_string()) {
candidate.confidence_adjustment += rule.confidence_delta;
}
}
// Clamp to valid range after all adjustments
candidate.confidence_adjustment = candidate.confidence_adjustment.clamp(-1.0, 1.0);
}
Ok(())
}
}

The clamping step inside ConfidenceAdjuster is essential when multiple enrichers apply adjustments independently. Without it, two enrichers each adding 0.08 could push a starting adjustment of 0.9 to 1.06 — a value that exceeds the valid range and gets silently truncated at the persist stage rather than at the enricher that caused the overflow. Clamping at the end of every enricher that modifies confidence_adjustment keeps the invariant visible in the code that breaks it.
Registration order is the execution order. There is no dependency declaration system — if enricher B depends on enricher A's output, register A first. The absence of a formal dependency graph is intentional: enricher chains that require complex ordering are a signal that the logic belongs somewhere else (a single enricher with richer internal state, or a pre-processing step outside the plugin system). Document inter-enricher dependencies explicitly as a comment in the registration block, not in the enricher implementation itself, so the dependency is visible at the call site.
One subtlety: if an enricher returns an Err, the chain continues from the next enricher, not the beginning. The failed enricher's partial mutations (any modifications it made before the error) remain on the candidate vector. If your enricher makes partial modifications and then encounters an error, prefer reverting your changes before returning the error rather than leaving the vector in an inconsistent state. Reversions are straightforward when each enricher processes candidates independently; they're harder when an enricher's logic reads the modifications it's already made.
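One way to implement the revert is a snapshot-and-truncate pass, sketched here against a stand-in candidate type. The fail_at parameter is a test hook simulating a mid-batch failure, not part of the real trait:

```rust
// Simplified stand-in for the real candidate type (illustration only).
pub struct CandidateMemory {
    pub tags: Vec<String>,
}

/// Record each candidate's tag count before mutating; if a later step
/// fails, truncate every candidate's tags back to the snapshot so the
/// vector is left exactly as this enricher found it.
pub fn enrich_with_revert(
    candidates: &mut Vec<CandidateMemory>,
    fail_at: Option<usize>, // test hook: simulate failure at candidate i
) -> Result<(), String> {
    // Snapshot: one original tag count per candidate.
    let snapshot: Vec<usize> = candidates.iter().map(|c| c.tags.len()).collect();

    let mut failed = false;
    for (i, c) in candidates.iter_mut().enumerate() {
        if fail_at == Some(i) {
            failed = true;
            break;
        }
        c.tags.push("enriched".to_string());
    }

    if failed {
        // Revert partial mutations before surfacing the error.
        for (c, &len) in candidates.iter_mut().zip(snapshot.iter()) {
            c.tags.truncate(len);
        }
        return Err("simulated failure".to_string());
    }
    Ok(())
}
```

This works because the enricher only appends; an enricher that rewrites existing fields would need to snapshot the full values, not just lengths.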
Testing enrichers
Enricher unit tests do not require a running Recall instance or a real database. The enrich() method takes a Vec<CandidateMemory> by mutable reference — you can construct candidate memories directly in test code, call enrich(), and assert on the mutations. This makes enrichers among the most straightforward components to test in the entire write pipeline.
#[cfg(test)]
mod tests {
use super::*;
use recall_core::{CandidateMemory, EntityId, MemoryType};
use std::str::FromStr; // for EntityId::from_str below
fn make_candidate(subject: &str) -> CandidateMemory {
CandidateMemory {
subject: Some(EntityId::from_str(subject).unwrap()),
memory_type: MemoryType::Fact,
content: "test content".to_string(),
confidence_adjustment: 0.0,
tags: vec![],
// ... other fields with defaults
..Default::default()
}
}
#[test]
fn test_ontology_tagger_adds_correct_tag() {
let mut entity_classes = HashMap::new();
entity_classes.insert("ent_recall_api".to_string(), EntityClass::Product);
let enricher = OntologyEnricher {
entity_classes: Arc::new(entity_classes),
};
let mut candidates = vec![
make_candidate("ent_recall_api"),
make_candidate("ent_unknown_entity"),
];
enricher.enrich(&mut candidates, &Scope::default()).unwrap();
assert!(candidates[0].tags.contains(&"domain:product".to_string()));
assert_eq!(candidates[0].confidence_adjustment, 0.05);
assert!(candidates[1].tags.is_empty());
assert_eq!(candidates[1].confidence_adjustment, 0.0);
}
#[test]
fn test_enricher_returns_ok_on_empty_candidates() {
let enricher = OntologyEnricher { entity_classes: Arc::new(HashMap::new()) };
let mut candidates = vec![];
let result = enricher.enrich(&mut candidates, &Scope::default());
assert!(result.is_ok());
}
}

Three test cases every enricher should have: (1) the happy path, where a candidate matching your enricher's criteria receives the expected mutation; (2) a non-matching candidate, which should be untouched; (3) an empty candidate vector, which should return Ok(()) without panicking. The empty-vector case is surprisingly easy to miss and surprisingly common in production — deduplication upstream occasionally produces zero candidates for a given turn.
For edge cases involving the Scope parameter: most enrichers today ignore scope and should be tested with Scope::default(). If your enricher uses scope.user_id or scope.agent_id to look up user-specific configuration, write parameterized tests that vary the scope and verify the enricher behavior changes accordingly. Scope-dependent enrichers are harder to reason about in production; make the dependency explicit in the test coverage.
Integration tests need a database. recall_core::testing::TestNamespace spins up an in-memory SQLite instance with a fresh schema and returns a fully configured RecallClient that you can use exactly like a production client:
#[tokio::test]
async fn test_enricher_in_full_pipeline() {
let (recall, _guard) = TestNamespace::setup().await;
recall.register_enricher(Arc::new(OntologyEnricher {
entity_classes: Arc::new(test_ontology()),
}));
let result = recall.remember(vec![
Turn::user("I'm working on the Recall API product today"),
], test_scope()).await.unwrap();
let memories = recall.list_memories(test_scope()).await.unwrap();
let product_memories: Vec<_> = memories.iter()
.filter(|m| m.tags.contains(&"domain:product".to_string()))
.collect();
assert!(!product_memories.is_empty());
});

The _guard variable holds the TestNamespace cleanup handle — drop it and the in-memory database is torn down. Assign it to _ (a bare underscore) and the Rust compiler will drop it immediately at the end of the binding statement, before your assertions run. Always use an underscore-prefixed named binding like _guard to keep it live for the test's duration.
One common integration test mistake: asserting on the exact number of memories extracted rather than the presence of a particular tag. Extraction counts vary with model behavior and test conversation content. Assert on structure (the tag is present, the confidence adjustment is in the expected range) rather than counts.
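As a sketch of the structural-assertion style, with a hypothetical Memory shape (the real stored-memory struct has more fields):

```rust
// Hypothetical shape of a stored memory, for illustration only.
pub struct Memory {
    pub tags: Vec<String>,
    pub confidence: f32,
}

/// Robust assertions: the tag is present and confidence is in range.
/// Contrast with `assert_eq!(memories.len(), 3)`, which breaks whenever
/// the extractor yields a different count for the same conversation.
pub fn assert_enrichment(memories: &[Memory]) {
    let product: Vec<&Memory> = memories
        .iter()
        .filter(|m| m.tags.contains(&"domain:product".to_string()))
        .collect();
    assert!(!product.is_empty(), "expected at least one product-tagged memory");
    for m in &product {
        assert!((0.0..=1.0).contains(&m.confidence), "confidence out of range");
    }
}
```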
Monitoring enricher performance
Enrichers add latency to the synchronous write pipeline. Unlike the LLM extraction call — which is the dominant latency contributor at 200–800ms — a well-written enricher should be nearly invisible: under 2ms for pure in-process logic, under 10ms when it makes a single cached lookup. The pipeline traces each enricher's execution time in its distributed trace, accessible via the trace API:
const trace = await recall.getTrace(traceId);
const enricherSpans = trace.spans.filter(s => s.name.startsWith("enricher."));
// [
// { name: "enricher.domain-ontology", duration_ms: 1.2, error: null },
// { name: "enricher.confidence-adjuster", duration_ms: 0.3, error: null }
// ]

If an enricher consistently takes more than 5ms on production-scale candidate batches, profile the enrich() method. The most common performance issue is an O(N×M) algorithm where N is the number of candidates per turn and M is the size of the enricher's lookup table. The domain ontology example above does a HashMap::get() per candidate — O(1) per lookup, O(N) total. An implementation that instead does a linear scan over the entity_classes map for each candidate is O(N×M), which becomes noticeable when M is in the thousands. The fix is always to restructure the lookup as a HashMap keyed on entity ID.
The second most common performance issue is unnecessary cloning. If your enricher constructs a new String or Vec for every candidate, even when most candidates don't match any enrichment rule, you're allocating on the hot path. Use .contains() before allocating; push into the existing tags vec rather than constructing a new one.
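The contrast can be sketched with a stand-in candidate type — tag_wasteful allocates for every candidate, tag_frugal only for the ones that match:

```rust
// Simplified stand-in candidate type (illustration only).
pub struct CandidateMemory {
    pub tags: Vec<String>,
}

// Allocation-heavy: builds a String per candidate, matching or not.
pub fn tag_wasteful(c: &mut CandidateMemory, matches: bool) {
    let tag = String::from("domain:product"); // allocates unconditionally
    if matches {
        c.tags.push(tag);
    }
}

// Hot-path friendly: bail out first, allocate only on a match,
// and skip the push when the tag is already present.
pub fn tag_frugal(c: &mut CandidateMemory, matches: bool) {
    if !matches {
        return; // zero allocations for non-matching candidates
    }
    let tag = "domain:product";
    if !c.tags.iter().any(|t| t == tag) {
        c.tags.push(tag.to_string());
    }
}
```

In a batch where most candidates match no rule — the common case — the frugal version does no heap work at all.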
Enricher error rates are surfaced in Prometheus metrics:
recall_plugin_errors_total{plugin_id="domain-ontology", error_type="timeout"} 0
recall_plugin_errors_total{plugin_id="domain-ontology", error_type="external_api"} 12

A steady stream of external_api errors means the enricher's external dependency is flaky. The fail-open contract means these errors don't block writes, but they do mean the affected memories are stored without your enricher's tags — a silent quality degradation that accumulates over time. Implement a local in-process cache with a short TTL to absorb transient failures:
pub struct CachingOntologyEnricher {
entity_classes: Arc<HashMap<String, EntityClass>>,
external_api: Arc<OntologyApiClient>,
cache: Arc<DashMap<String, EntityClass>>,
cache_ttl: Duration,
}
impl EnricherPlugin for CachingOntologyEnricher {
fn enrich(&self, candidates: &mut Vec<CandidateMemory>, _: &Scope) -> PluginResult<()> {
for candidate in candidates.iter_mut() {
if let Some(subject) = &candidate.subject {
let class = self.cache.get(subject.as_str())
.map(|e| e.value().clone())
.or_else(|| {
// Fallback to static config when cache misses
self.entity_classes.get(subject.as_str()).cloned()
});
if let Some(c) = class {
candidate.tags.push(c.as_tag().to_string());
}
}
}
Ok(())
}
}

Use a DashMap rather than a Mutex<HashMap> for the concurrent access pattern that enrichers see — multiple write pipeline threads may call enrich() simultaneously, since the Arc<dyn EnricherPlugin> is shared. DashMap is sharded and avoids the lock contention that Mutex<HashMap> introduces under concurrent reads. A cache TTL of 5–10 minutes is appropriate for most domain ontologies: short enough that a newly classified entity shows up within the hour, long enough to absorb external API blips that last tens of seconds.
Alert on recall_plugin_errors_total by plugin_id. If any plugin's error rate exceeds 1% over a 5-minute window, investigate before the next deployment. A 1% error rate on an enricher processing 10,000 memories per day means 100 memories per day are missing their enrichment tags — small in absolute terms but compounding as the namespace grows.
Using the on_write hook for async enrichment
The on_write hook on RecallPlugin fires after persist completes. It has no return value and no error propagation — errors from on_write are logged at warn level and discarded. This makes it the right place for enrichment work that's too expensive for the synchronous write path: network calls to slow external APIs, batch classification requests, heavy computation over the full stored memory rather than just the candidate.
impl RecallPlugin for SlowEnricher {
fn plugin_id(&self) -> &'static str { "slow-enricher" }
fn plugin_name(&self) -> &'static str { "Slow Background Enricher" }
fn on_write(&self, outcome: &WriteOutcome) {
// This runs after persist; memory is already in the store
let memory_ids = outcome.stored_memory_ids.clone();
let client = self.slow_api_client.clone();
let storage = self.storage.clone();
// Spawn a background task — do not block the write pipeline
tokio::spawn(async move {
for memory_id in memory_ids {
if let Ok(classification) = client.classify(memory_id.as_str()).await {
// Update the memory's tags after the fact
let _ = storage.update_tags(
&memory_id,
&[classification.tag],
"slow-enricher"
).await;
}
}
});
}
}

The WriteOutcome struct contains the IDs of memories that were actually persisted (not deduplicated away or filtered). This is an important distinction from the candidate vector that enrich() receives: on_write only fires for memories that made it all the way through dedup, conflict resolution, and persist. You don't process candidates that were merged into existing memories. This means on_write sees a smaller, more final set of memory IDs than enrich() sees of candidates.
The critical limitation of on_write enrichment: memories are retrievable before the background enrichment completes. From the moment persist returns, the new memories are in the index and searchable. If another request arrives within the first few seconds after persist, it may retrieve those memories without the tags your on_write enricher is adding in the background. For retrieval queries that filter on tags (filter: { tags: ["domain:classified"] }), this creates a temporary blind spot: the new memory exists but doesn't yet have the tag that would include it in the query.
Design retrieval to tolerate this window. The recommended pattern for tag-filtered retrieval where on_write enrichment is involved: use a union query that returns memories matching the tag OR memories created recently (within the last 60 seconds) that don't yet have the tag. The recent-memory fallback gives the background enricher time to complete while still returning new memories to users who query immediately after writing.
const results = await recall.search({
query: "recent product decisions",
scope,
filter: {
operator: "OR",
conditions: [
{ tags: ["domain:product"] },
{ created_after: Date.now() - 60_000, tags_absent: ["domain:product"] }
]
}
});

The 60-second window is a conservative upper bound for background enrichment latency on a healthy external API. If your slow API occasionally takes longer, extend the window. Monitor the actual p99 of your on_write task completion time and set the window to 2× that value.
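The window calculation itself is simple arithmetic — a nearest-rank p99 over sampled completion times, doubled. How you collect the samples depends on your metrics stack; this sketch just shows the math:

```rust
/// Given sampled on_write task completion times (milliseconds),
/// compute nearest-rank p99 and derive the retrieval fallback
/// window as 2x that value.
pub fn fallback_window_ms(samples: &mut Vec<u64>) -> u64 {
    assert!(!samples.is_empty(), "need at least one sample");
    samples.sort_unstable();
    // Nearest-rank percentile: index = ceil(0.99 * n) - 1.
    let n = samples.len();
    let idx = ((n as f64 * 0.99).ceil() as usize).saturating_sub(1);
    let p99 = samples[idx];
    p99 * 2
}
```

Feed it a rolling window of recent samples so the computed value tracks the external API's current behavior rather than its historical best.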
A second limitation worth noting: on_write runs in the Tokio runtime's task executor. Spawning a task with tokio::spawn is cheap but not free — at very high write throughput (thousands of memories per second), a large number of background tasks can accumulate in the executor queue. If your on_write enricher's external API starts backing up, the number of queued tasks grows without bound. Implement a semaphore to cap concurrent background tasks:
fn on_write(&self, outcome: &WriteOutcome) {
let memory_ids = outcome.stored_memory_ids.clone();
let client = self.slow_api_client.clone();
let storage = self.storage.clone();
let sem = Arc::clone(&self.concurrency_limit); // Arc<Semaphore>
tokio::spawn(async move {
let _permit = sem.acquire().await.unwrap();
for memory_id in memory_ids {
if let Ok(classification) = client.classify(memory_id.as_str()).await {
let _ = storage.update_tags(&memory_id, &[classification.tag], "slow-enricher").await;
}
}
// _permit drops here, releasing the semaphore slot
});
}

A semaphore with 20–50 permits is appropriate for most production workloads. Size it based on your external API's concurrency limit, not your write throughput — the semaphore protects the external dependency, not the Tokio executor.