# API Reference

Complete API documentation for Gweta.
## Core Types

### Chunk

The universal chunk representation.

```python
from gweta import Chunk

chunk = Chunk(
    id="chunk-001",          # Optional unique ID
    text="Content here...",  # Required text content
    source="document.pdf",   # Required source identifier
    metadata={"page": 1},    # Optional metadata dict
    quality_score=0.85,      # Optional quality score (0-1)
)
```

Attributes:

| Attribute | Type | Description |
|---|---|---|
| `id` | `str \| None` | Unique identifier |
| `text` | `str` | Chunk content |
| `source` | `str` | Source identifier |
| `metadata` | `dict[str, Any]` | Arbitrary metadata |
| `quality_score` | `float \| None` | Quality score (0.0 - 1.0) |
### QualityIssue

Represents a single quality problem.

```python
from gweta import QualityIssue

issue = QualityIssue(
    code="LOW_DENSITY",
    severity="warning",
    message="Information density below threshold",
    location="paragraph 2",
)
```

Attributes:

| Attribute | Type | Description |
|---|---|---|
| `code` | `str` | Issue code (e.g., `"LOW_DENSITY"`) |
| `severity` | `Literal["error", "warning", "info"]` | Severity level |
| `message` | `str` | Human-readable message |
| `location` | `str \| None` | Location in chunk |
## Intelligence Layer

### SystemIntent

Defines what your RAG system is meant to do.

```python
from gweta.intelligence import SystemIntent

# Create programmatically
intent = SystemIntent(
    name="My Knowledge Base",
    description="Answers questions about Zimbabwe business",
    target_users=["graduates", "entrepreneurs"],
    core_questions=[
        "How do I register a business in Zimbabwe?",
        "What are ZIMRA tax requirements?",
    ],
    relevant_topics=["Zimbabwe business", "ZIMRA", "EcoCash"],
    irrelevant_topics=["US regulations", "cryptocurrency"],
)

# Load from YAML
intent = SystemIntent.from_yaml("intents/my_system.yaml")

# Check if a topic is irrelevant
intent.is_irrelevant_topic("Invest in Bitcoin now!")  # True
```

Attributes:

| Attribute | Type | Description |
|---|---|---|
| `name` | `str` | System name |
| `description` | `str` | What the system does |
| `target_users` | `list[str]` | Who uses the system |
| `core_questions` | `list[str]` | Questions it should answer well |
| `relevant_topics` | `list[str]` | Topics to include |
| `irrelevant_topics` | `list[str]` | Topics to reject |
| `min_relevance_score` | `float` | Accept threshold (default: 0.6) |
| `review_threshold` | `float` | Review threshold (default: 0.4) |
Methods:

| Method | Returns | Description |
|---|---|---|
| `from_yaml(path)` | `SystemIntent` | Load from YAML file |
| `from_dict(data)` | `SystemIntent` | Load from dictionary |
| `to_yaml()` | `str` | Export as YAML |
| `to_dict()` | `dict` | Export as dictionary |
| `is_irrelevant_topic(text)` | `bool` | Check if text contains irrelevant topics |
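
The loaders and exporters are symmetric, so an intent can be round-tripped for storage or version control. A minimal sketch, assuming `to_dict()` and `from_dict()` mirror each other as the table above describes:

```python
# Round-trip sketch (assumes to_dict()/from_dict() are symmetric,
# as the Methods table describes).
data = intent.to_dict()                  # plain dict, e.g. for JSON storage
restored = SystemIntent.from_dict(data)  # reconstruct the same intent

yaml_text = intent.to_yaml()             # YAML string, e.g. to commit to git
```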
### RelevanceFilter

Scores and filters chunks by semantic similarity to the intent.

```python
from gweta.intelligence import RelevanceFilter

filter = RelevanceFilter(intent=intent)

# Filter a single chunk
result = filter.filter(chunk)
print(f"Score: {result.relevance_score:.2f}")
print(f"Decision: {result.decision}")  # ACCEPT, REVIEW, or REJECT

# Filter a batch
report = filter.filter_batch(chunks)
print(f"Accepted: {report.accepted_count}")
print(f"Rejected: {report.rejected_count}")

# Get accepted chunks with metadata
accepted = report.accepted()
```

Methods:

| Method | Returns | Description |
|---|---|---|
| `filter(chunk)` | `RelevanceResult` | Filter a single chunk |
| `filter_batch(chunks)` | `RelevanceReport` | Filter multiple chunks |
| `score(chunk)` | `float` | Get the relevance score only |
### RelevanceResult

Result of filtering a single chunk.

| Attribute | Type | Description |
|---|---|---|
| `chunk` | `Chunk` | The chunk that was filtered |
| `relevance_score` | `float` | Relevance score (0.0 - 1.0) |
| `decision` | `RelevanceDecision` | ACCEPT, REVIEW, or REJECT |
| `matched_topics` | `list[str]` | Topics found in the chunk |
| `rejection_reason` | `str \| None` | Why it was rejected |
| `accepted` | `bool` | True if decision is ACCEPT |
| `needs_review` | `bool` | True if decision is REVIEW |
| `rejected` | `bool` | True if decision is REJECT |
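
The convenience booleans (`accepted`, `needs_review`, `rejected`) make branching on a result straightforward. A short sketch using only the attributes listed above:

```python
result = filter.filter(chunk)

if result.accepted:
    print(f"Keep (matched topics: {result.matched_topics})")
elif result.needs_review:
    print(f"Flag for review (score: {result.relevance_score:.2f})")
else:
    print(f"Drop: {result.rejection_reason}")
```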
### RelevanceReport

Report from filtering multiple chunks.

| Attribute | Type | Description |
|---|---|---|
| `results` | `list[RelevanceResult]` | All results |
| `total_chunks` | `int` | Total processed |
| `accepted_count` | `int` | Number accepted |
| `review_count` | `int` | Number needing review |
| `rejected_count` | `int` | Number rejected |
| `acceptance_rate` | `float` | Accepted / total |
| `rejection_rate` | `float` | Rejected / total |
| `avg_relevance_score` | `float` | Average score |

Methods:

| Method | Returns | Description |
|---|---|---|
| `accepted()` | `list[Chunk]` | Get accepted chunks with metadata |
| `for_review()` | `list[Chunk]` | Get chunks needing review |
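
A common pattern is to store the accepted chunks and set the borderline ones aside for a human pass. A sketch using only the attributes and methods above:

```python
report = filter.filter_batch(chunks)

to_ingest = report.accepted()    # safe to store
to_review = report.for_review()  # borderline - route to a human

print(f"{report.acceptance_rate:.0%} accepted "
      f"(avg score {report.avg_relevance_score:.2f}), "
      f"{len(to_review)} queued for review")
```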
### EmbeddingEngine

Wrapper for sentence-transformers.

```python
from gweta.intelligence import EmbeddingEngine

# Default model: all-MiniLM-L6-v2
engine = EmbeddingEngine()

# Custom model
engine = EmbeddingEngine(model_name="all-mpnet-base-v2")

# Embed text
vector = engine.embed("Zimbabwe business registration")

# Batch embed
vectors = engine.embed_batch(["text1", "text2"])

# Compute similarity
similarity = engine.similarity(vector1, vector2)
```

Methods:

| Method | Returns | Description |
|---|---|---|
| `embed(text)` | `np.ndarray` | Embed a single text |
| `embed_batch(texts)` | `np.ndarray` | Embed multiple texts |
| `similarity(v1, v2)` | `float` | Cosine similarity |
| `similarity_to_reference(vectors, ref)` | `np.ndarray` | Batch similarity |
### Pipeline

Unified API for intent-aware ingestion.

```python
from gweta.intelligence import Pipeline

# With a store - full pipeline
pipeline = Pipeline(intent=intent, store=store)
result = await pipeline.ingest(chunks)

# Without a store - filter only
pipeline = Pipeline(intent=intent, store=None)
report = pipeline.filter_only(chunks)

# Score a single chunk
scores = pipeline.score_chunk(chunk)
```

Methods:

| Method | Returns | Description |
|---|---|---|
| `ingest(chunks)` | `PipelineResult` | Filter and ingest to the store |
| `filter_only(chunks)` | `RelevanceReport` | Filter without storing |
| `score_chunk(chunk)` | `dict` | Get quality and relevance scores |
### PipelineResult

Result from pipeline ingestion.

| Attribute | Type | Description |
|---|---|---|
| `ingested` | `int` | Chunks successfully stored |
| `rejected_count` | `int` | Chunks rejected |
| `review_count` | `int` | Chunks needing review |
| `acceptance_rate` | `float` | Ingested / total |
| `relevance_report` | `RelevanceReport` | Full relevance report |
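
A quick way to summarize an ingest run and drill into rejections, using only the attributes above:

```python
result = await pipeline.ingest(chunks)

print(f"Ingested: {result.ingested}, rejected: {result.rejected_count}, "
      f"review: {result.review_count} ({result.acceptance_rate:.0%} accepted)")

# The embedded report exposes per-chunk decisions
for r in result.relevance_report.results:
    if r.rejected:
        print(f"- {r.chunk.source}: {r.rejection_reason}")
```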
## Validation

### ChunkValidator

Validates chunks for quality.

```python
from gweta import ChunkValidator

validator = ChunkValidator(
    min_length=50,
    required_metadata=["source", "date"],
)

# Single chunk
result = validator.validate(chunk)

# Batch validation
report = validator.validate_batch(chunks)
```

Methods:

| Method | Returns | Description |
|---|---|---|
| `validate(chunk)` | `ChunkResult` | Validate a single chunk |
| `validate_batch(chunks)` | `QualityReport` | Validate multiple chunks |
### DomainRuleEngine

YAML-based domain validation rules.

```python
from gweta.validate.rules import DomainRuleEngine

# Load from YAML
engine = DomainRuleEngine.from_yaml("rules/domain.yaml")

# Or create programmatically
engine = DomainRuleEngine(rules=[...], known_facts=[...])

# Validate a chunk
result = engine.validate_chunk(chunk)

# Validate an AI response against known facts
result = engine.validate_response(response_text)
```

Methods:

| Method | Returns | Description |
|---|---|---|
| `from_yaml(path)` | `DomainRuleEngine` | Load from YAML file |
| `from_dict(data)` | `DomainRuleEngine` | Load from dictionary |
| `validate_chunk(chunk)` | `RuleValidationResult` | Validate a chunk |
| `validate_response(text)` | `RuleValidationResult` | Validate an AI response |
| `add_rule(rule)` | `None` | Add a rule dynamically |
| `add_fact(fact)` | `None` | Add a known fact |
| `to_yaml()` | `str` | Export as YAML |
### GoldenDatasetRunner

Test retrieval quality with golden Q&A pairs.

```python
from gweta.validate.golden import GoldenDatasetRunner

runner = GoldenDatasetRunner(
    store=my_store,
    dataset_path="golden/test.json",
)

# Run tests
report = await runner.run(k=5)

# Export results
junit_xml = runner.to_junit_xml(report)
json_output = runner.to_json(report)
```

Methods:

| Method | Returns | Description |
|---|---|---|
| `load_dataset(path)` | `list[GoldenPair]` | Load from JSON |
| `run(k, threshold)` | `GoldenTestReport` | Run all tests |
| `to_junit_xml(report)` | `str` | Export as JUnit XML |
| `to_json(report)` | `str` | Export as JSON |
## Acquisition

### GwetaCrawler

Web crawling with quality validation.

```python
from gweta.acquire import GwetaCrawler

crawler = GwetaCrawler()
result = await crawler.crawl(
    url="https://example.com",
    depth=2,
    follow_pdfs=True,
    allowed_domains=["example.com"],
)
```

Methods:

| Method | Returns | Description |
|---|---|---|
| `crawl(url, **kwargs)` | `CrawlResult` | Async crawl |
| `crawl_sync(url, **kwargs)` | `CrawlResult` | Sync wrapper |
### PDFExtractor

PDF text and table extraction.

```python
from gweta.acquire import PDFExtractor

extractor = PDFExtractor()
result = await extractor.extract(
    source="document.pdf",
    extract_tables=True,
)
```

Methods:

| Method | Returns | Description |
|---|---|---|
| `extract(source, **kwargs)` | `PDFExtractionResult` | Async extract |
| `extract_sync(source, **kwargs)` | `PDFExtractionResult` | Sync wrapper |
### DatabaseSource

SQL database connector.

```python
from gweta.acquire import DatabaseSource

async with DatabaseSource(dsn="postgresql://...") as db:
    result = await db.query("SELECT * FROM docs")
    chunks = await db.extract_and_validate(
        query="SELECT content FROM articles",
        text_column="content",
    )
```

Methods:

| Method | Returns | Description |
|---|---|---|
| `connect()` | `None` | Open connection |
| `disconnect()` | `None` | Close connection |
| `query(sql, params)` | `QueryResult` | Execute query |
| `extract_and_validate(...)` | `list[Chunk]` | Extract chunks |
## Vector Stores

All stores implement `BaseStore`:

```python
class BaseStore(ABC):
    @property
    def collection_name(self) -> str: ...

    async def add(self, chunks: list[Chunk]) -> AddResult: ...
    async def query(self, query: str, n_results: int) -> list[Chunk]: ...
    async def delete(self, chunk_ids: list[str]) -> int: ...
    async def get_all(self) -> list[Chunk]: ...
    def get_stats(self) -> StoreStats: ...
```
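
Because every backend shares this interface, pipeline code can be written against `BaseStore` and any concrete store swapped in. A minimal sketch; the `refresh_collection` helper is illustrative, not part of Gweta:

```python
from gweta import Chunk

async def refresh_collection(store, chunks: list[Chunk]) -> None:
    """Clear and repopulate a collection - works with any BaseStore backend."""
    existing = await store.get_all()
    ids = [c.id for c in existing if c.id]
    if ids:
        await store.delete(ids)
    result = await store.add(chunks)
    print(f"{store.collection_name}: {result.added} chunks stored")
```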
### ChromaStore

```python
from gweta import ChromaStore, Chunk

# Default: uses SentenceTransformer "all-MiniLM-L6-v2" embeddings
store = ChromaStore(collection_name="my_docs")

# With persistence
store = ChromaStore(
    collection_name="my_docs",
    persist_directory="./chroma_data",
)

# Add chunks (async)
chunks = [Chunk(text="Content...", source="doc.pdf", metadata={"page": 1})]
result = await store.add(chunks)
print(f"Added: {result.added}")

# Query (async) - IMPORTANT: use 'filter', not 'where'
results = await store.query(
    query="search text",  # Required
    n_results=10,         # Optional, default 10
    filter={"page": 1},   # Optional metadata filter
)
# Returns: list[Chunk]

# Get stats (SYNC - no await!)
stats = store.get_stats()
print(f"Chunks: {stats.chunk_count}")

# Get all chunks (async)
all_chunks = await store.get_all()

# Delete by IDs (async)
deleted = await store.delete(["chunk-id-1", "chunk-id-2"])
```
Constructor Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `collection_name` | `str` | required | Name of the collection |
| `client` | `chromadb.Client` | `None` | Existing ChromaDB client |
| `embedding_function` | `EmbeddingFunction` | SentenceTransformer | Custom embedding function |
| `persist_directory` | `str` | `None` | Directory for persistence |
| `use_default_embeddings` | `bool` | `True` | Use default embeddings if none provided |
Methods:

| Method | Signature | Returns | Async |
|---|---|---|---|
| `add` | `(chunks: list[Chunk])` | `AddResult` | Yes |
| `query` | `(query: str, n_results: int = 10, filter: dict = None)` | `list[Chunk]` | Yes |
| `delete` | `(chunk_ids: list[str])` | `int` | Yes |
| `get_all` | `()` | `list[Chunk]` | Yes |
| `get_stats` | `()` | `StoreStats` | No |
| `update` | `(chunk: Chunk)` | `bool` | Yes |
| `search_by_metadata` | `(filter: dict, limit: int = 100)` | `list[Chunk]` | Yes |
Common Mistakes:

```python
# WRONG - 'where' doesn't exist
results = await store.query(query="text", where={"key": "value"})

# CORRECT - use 'filter'
results = await store.query(query="text", filter={"key": "value"})

# WRONG - get_stats is sync
stats = await store.get_stats()

# CORRECT - no await
stats = store.get_stats()
```
### QdrantStore

```python
from gweta.ingest.stores import QdrantStore

store = QdrantStore(
    collection_name="my_docs",
    url="http://localhost:6333",
    api_key="optional-key",
)
```

### PineconeStore

```python
from gweta.ingest.stores import PineconeStore

store = PineconeStore(
    index_name="my-index",
    api_key="your-api-key",
    namespace="optional-namespace",
)
```

### WeaviateStore

```python
from gweta.ingest.stores import WeaviateStore

store = WeaviateStore(
    class_name="Document",
    url="http://localhost:8080",
)
```
## MCP Server

### Starting the Server

```python
from gweta.mcp import create_server, run_stdio, run_http

# stdio transport (for Claude Desktop)
run_stdio()

# HTTP transport
run_http(port=8080)
```
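
For Claude Desktop, the stdio server can also be launched through the CLI entry point (`gweta serve`, see CLI Commands below). An illustrative `claude_desktop_config.json` entry, assuming `gweta` is on your PATH:

```json
{
  "mcpServers": {
    "gweta": {
      "command": "gweta",
      "args": ["serve", "--transport", "stdio"]
    }
  }
}
```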
### Available Tools

| Tool | Description |
|---|---|
| `crawl_and_ingest` | Crawl a website and load it into a vector store |
| `validate_chunks` | Validate chunks without loading |
| `check_health` | Get a KB health report |
| `crawl_site` | Crawl without loading |
| `ingest_from_database` | DB to vector store |
| `query_database` | Execute a read-only query |
| `extract_pdf` | Extract PDF content |
| `fetch_api` | Fetch from a REST API |
| `fetch_sitemap` | Parse a sitemap |
| `fetch_rss` | Parse an RSS/Atom feed |

### Available Resources

| URI | Description |
|---|---|
| `gweta://sources` | List registered sources |
| `gweta://quality/{collection}` | Quality report |
| `gweta://rules/{domain}` | Domain rules |
| `gweta://config` | Current config |
## CLI Commands

```bash
# Validate chunks
gweta validate <path> [--threshold 0.6] [--output report.json]

# Crawl a website
gweta crawl <url> [--depth 2] [--output chunks.json]

# Check KB health
gweta health <collection> [--store chroma] [--golden golden.json]

# Ingest data
gweta ingest <source> <target> [--collection default]

# Start the MCP server
gweta serve [--transport stdio|http] [--port 8080]
```
## Configuration

### Environment Variables

| Variable | Default | Description |
|---|---|---|
| `GWETA_MIN_QUALITY_SCORE` | `0.6` | Minimum quality threshold |
| `GWETA_MIN_DENSITY_SCORE` | `0.3` | Minimum density threshold |
| `GWETA_LOG_LEVEL` | `INFO` | Logging level |
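
For example, to tighten the quality gate and enable verbose logging for a single run (the threshold values here are illustrative):

```bash
export GWETA_MIN_QUALITY_SCORE=0.75
export GWETA_MIN_DENSITY_SCORE=0.4
export GWETA_LOG_LEVEL=DEBUG

gweta validate ./chunks --output report.json
```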