diff --git a/misc/gliner2_modal_demo.py b/misc/gliner2_modal_demo.py
new file mode 100644
index 000000000..60b88e43b
--- /dev/null
+++ b/misc/gliner2_modal_demo.py
@@ -0,0 +1,69 @@
+# # GLiNER2 on Modal
+#
+# [GLiNER2](https://github.com/fastino-ai/GLiNER2) structured extraction (`extract_json`) and
+# classification (`classify_text`) on one string. From the repo root:
+# `modal run misc/gliner2_modal_demo.py`
+
+import json
+
+import modal
+
+APP_NAME = "example-gliner2-modal"
+MODEL_ID = "fastino/gliner2-base-v1"
+DEFAULT_TEXT = (
+    "Tim Cook unveiled iPhone 15 Pro for $999 in Cupertino; "
+    "reviewers praised the titanium design but criticized battery life."
+)
+SCHEMA = {
+    "announcement": [
+        "company::str",
+        "person::str",
+        "product::str",
+        "price::str",
+        "location::str",
+    ]
+}
+CLS = {"sentiment": ["positive", "negative", "neutral", "mixed"]}
+
+app = modal.App(APP_NAME)
+image = modal.Image.debian_slim(python_version="3.11").pip_install(
+    "gliner2",
+    "torch>=2.0.0",
+    "transformers>=4.51.3,<5.2.0",
+    "huggingface_hub>=0.21.4",
+    "tqdm",
+    "sentencepiece",
+    "onnxruntime",
+    "requests",
+    "urllib3",
+    "certifi",
+    "charset-normalizer",
+    "idna",
+    "safetensors",
+    "tokenizers",
+    "filelock",
+    "packaging",
+)
+
+
+@app.cls(image=image, cpu=2.0, memory=2048)
+class GLiNERService:
+    @modal.enter()
+    def load(self):
+        from gliner2 import GLiNER2
+
+        self.model = GLiNER2.from_pretrained(MODEL_ID)
+
+    @modal.method()
+    def analyze(self, text: str) -> dict:
+        m = self.model
+        return {
+            "structured": m.extract_json(text, SCHEMA),
+            "classification": m.classify_text(text, CLS, include_confidence=True),
+        }
+
+
+@app.local_entrypoint()
+def main(text: str = DEFAULT_TEXT):
+    print("Input:", text)
+    print(json.dumps(GLiNERService().analyze.remote(text), indent=2))
diff --git a/misc/kafka_microbatch_etl.py b/misc/kafka_microbatch_etl.py
new file mode 100644
index 000000000..f597c811d
--- /dev/null
+++ b/misc/kafka_microbatch_etl.py
@@ -0,0 +1,72 @@
+# ---
+# cmd: ["python", "misc/kafka_microbatch_etl.py", "--batch=25", "--timeout-s=5", "--local=true"]
+# runtimes: ["runc", "gvisor"]
+# ---
+#
+# # Kafka micro-batch ETL (bounded)
+#
+# Polls up to N Kafka messages (or time limit), applies a tiny transform,
+# POSTs the batch to a REST sink, then exits.
+#
+# Intended for ETL/backfills/periodic jobs — NOT continuous stream processing
+# (e.g. Flink / Kafka Streams). This example is intentionally single-worker.

+import json
+import os
+import time
+
+import modal
+import requests
+from confluent_kafka import Consumer
+
+image = modal.Image.debian_slim().pip_install("confluent-kafka", "requests")
+app = modal.App(
+    "kafka-microbatch-etl",
+    image=image,
+    secrets=[modal.Secret.from_name("kafka-etl-remote-v2")],
+)
+
+
+@app.function()
+def etl(batch: int = 100, timeout_s: int = 5):
+    """Poll up to ``batch`` messages (or until ``timeout_s`` elapses), POST
+    the collected rows to the REST sink, and return ``{"sent": <count>}``.
+
+    Broker/auth/topic configuration comes from environment variables
+    (KAFKA_BOOTSTRAP, KAFKA_API_KEY, KAFKA_API_SECRET, KAFKA_TOPIC, and
+    optionally KAFKA_GROUP / SINK_URL) supplied by the Modal secret.
+    """
+    c = Consumer(
+        {
+            "bootstrap.servers": os.environ["KAFKA_BOOTSTRAP"],
+            "group.id": os.getenv("KAFKA_GROUP", "modal-microbatch"),
+            "auto.offset.reset": "earliest",
+            "enable.auto.commit": False,  # keep example safe/simple
+            # Confluent Cloud auth:
+            "security.protocol": "SASL_SSL",
+            "sasl.mechanism": "PLAIN",
+            "sasl.username": os.environ["KAFKA_API_KEY"],
+            "sasl.password": os.environ["KAFKA_API_SECRET"],
+        }
+    )
+
+    rows = []
+    try:
+        c.subscribe([os.environ["KAFKA_TOPIC"]])
+        end = time.time() + timeout_s
+        while len(rows) < batch and time.time() < end:
+            m = c.poll(0.5)
+            # Skip empty polls, broker errors, and tombstone records
+            # (m.value() is None for tombstones; decoding would raise).
+            if not m or m.error() or m.value() is None:
+                continue
+            rows.append(
+                {
+                    "ts": m.timestamp()[1],
+                    "payload": json.loads(m.value().decode("utf-8")),
+                }
+            )
+    finally:
+        # Always release the consumer, even if a payload fails to decode.
+        c.close()
+
+    if rows:
+        url = os.getenv("SINK_URL", "https://httpbin.org/post")
+        requests.post(
+            url, json={"count": len(rows), "rows": rows}, timeout=10
+        ).raise_for_status()
+    return {"sent": len(rows)}
+
+
+@app.local_entrypoint()
+def main(batch: int = 100, timeout_s: int = 5, local: bool = False):
+    fn = etl.local if local else etl.remote
+    print(fn(batch=batch, timeout_s=timeout_s))