Help me run donto embedding worker(s) on this machine. I may run this exact same prompt on SEVERAL machines, and that is fine and expected: the coordinator hands every worker a disjoint slice of work (Postgres FOR UPDATE SKIP LOCKED), machines never overlap, and if a machine stops mid-batch its work auto-returns to the queue in ~15 minutes. Nothing is ever lost or embedded twice.

WHAT THIS IS: donto (donto.org) needs millions of short texts turned into 384-dim vectors with the small model BAAI/bge-small-en-v1.5. About 1.2M conversation chunks/claims + ~2.8M entities are queued behind one 4-core server, and a live benchmark run (BEAM-10M) is literally waiting on these vectors. A worker just loops: lease texts over HTTPS -> embed locally -> submit vectors back. It never touches a database or my files, needs no inbound ports and no account; the only secret is one bearer token; I can stop it at any time.

COORDINATOR: https://donto.org/embed
TOKEN: 80f8ffdbee931ae49b10b500263044f4674e94310ae4fefe

STEP 1 - reachability (no token needed):
  curl -A curl/8 https://donto.org/embed/health
Expect: {"ok": true, "targets": ["memory_chunk", "predicate", "entity"]}
NOTE: Cloudflare blocks python-urllib's default User-Agent, so everything that talks to the coordinator must send a curl-style UA. The worker below already does.

STEP 2 - detect hardware, choose ONE path:
Run nvidia-smi. If it lists a GPU -> GPU PATH (one GPU ~ 50-100 CPU cores here). Otherwise -> CPU PATH.
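Steps 1 and 2 can also be scripted. The sketch below is one possible way to do that in Python and is not part of worker.py: the function names health_request, check_health and choose_path are made up for illustration, and it assumes that `nvidia-smi -L` prints one "GPU n: ..." line per device.

```python
import json
import shutil
import subprocess
import urllib.request

COORDINATOR = "https://donto.org/embed"  # coordinator URL from the text above


def health_request(base=COORDINATOR):
    """Build the STEP 1 request; the curl-style User-Agent avoids the Cloudflare block."""
    return urllib.request.Request(
        base.rstrip("/") + "/health", headers={"User-Agent": "curl/8"}
    )


def check_health(base=COORDINATOR, timeout=20):
    """Return True if the coordinator answers with {"ok": true, ...}."""
    with urllib.request.urlopen(health_request(base), timeout=timeout) as r:
        return json.loads(r.read()).get("ok") is True


def choose_path(run=subprocess.run, which=shutil.which):
    """Return "gpu" if nvidia-smi lists at least one GPU, otherwise "cpu"."""
    if which("nvidia-smi") is None:
        return "cpu"  # no NVIDIA driver tools on PATH
    try:
        out = run(["nvidia-smi", "-L"], capture_output=True, text=True, timeout=30)
    except (OSError, subprocess.SubprocessError):
        return "cpu"  # tool present but not usable
    return "gpu" if out.returncode == 0 and "GPU " in out.stdout else "cpu"
```

Calling check_health() and then choose_path() reproduces the two manual checks; the run and which parameters exist only so the decision logic can be exercised without a GPU.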
STEP 3 - save this as worker.py (same file works for both paths):

import json, os, socket, time, urllib.request

URL    = os.environ.get("DONTO_EMBED_URL", "https://donto.org/embed").rstrip("/")
TOKEN  = os.environ["DONTO_EMBED_TOKEN"]
WID    = os.environ.get("EMBED_WORKER_ID") or f"worker-{socket.gethostname()}-{os.getpid()}"
N      = int(os.environ.get("EMBED_N", "256"))      # items per lease (server caps at 1024)
BS     = int(os.environ.get("EMBED_BATCH", "256"))  # model batch size
TARGET = os.environ.get("EMBED_TARGET") or None     # memory_chunk|predicate|entity|unset=any
CUDA   = os.environ.get("EMBED_CUDA") == "1"
IDLE   = float(os.environ.get("EMBED_IDLE_SLEEP", "5"))

from fastembed import TextEmbedding
if CUDA:
    import onnxruntime as ort
    assert "CUDAExecutionProvider" in ort.get_available_providers(), (
        "CUDA provider missing - install fastembed-gpu (+ nvidia-cudnn-cu12), or unset EMBED_CUDA")

def post(path, body, timeout=300):
    req = urllib.request.Request(
        URL + path, data=json.dumps(body).encode(),
        headers={"Authorization": f"Bearer {TOKEN}", "Content-Type": "application/json",
                 "User-Agent": "curl/8.5.0"},  # Cloudflare blocks python-urllib
        method="POST")
    with urllib.request.urlopen(req, timeout=timeout) as r:
        return json.loads(r.read())

_m = {}
def model_for(name):
    if name not in _m:
        kw = {"model_name": name}
        if CUDA:
            kw["providers"] = ["CUDAExecutionProvider", "CPUExecutionProvider"]
        _m[name] = TextEmbedding(**kw)
    return _m[name]

_lr = 0.0
def report_err(kind, msg):  # best-effort error telemetry, max 1/min
    global _lr
    if time.time() - _lr < 60:
        return
    _lr = time.time()
    try:
        post("/report", {"worker_id": WID, "kind": kind, "message": str(msg)[:1500]}, timeout=20)
    except Exception:
        pass

total, win, errs = 0, [], 0
print(f"[{WID}] -> {URL} target={TARGET or 'any'} n={N} batch={BS} cuda={CUDA}", flush=True)
while True:
    try:
        batch = post("/lease", {"worker_id": WID, "n": N, "target": TARGET}).get("batch", [])
        errs = 0
    except Exception as e:
        errs += 1
        print(f"[{WID}] lease error: {e}", flush=True)
        report_err("lease", e)
        time.sleep(min(IDLE * errs, 60))
        continue
    if not batch:
        time.sleep(IDLE)
        continue
    by = {}
    for b in batch:
        by.setdefault(b["model"], []).append(b)
    items = []
    for mn, g in by.items():
        for x, v in zip(g, model_for(mn).embed([t["text"] for t in g], batch_size=BS)):
            items.append({"target": x["target"], "item_id": x["item_id"],
                          "vector": [float(z) for z in v]})
    try:
        r = post("/submit", {"worker_id": WID, "items": items})
        n = int(r.get("upserted", 0))
        total += n
        now = time.time()
        win.append((now, n))
        win = [(t, c) for t, c in win if now - t < 60]
        print(f"[{WID}] +{n} (total {total}, ~{sum(c for _, c in win)}/min)", flush=True)
        errs = 0
    except Exception as e:
        errs += 1
        print(f"[{WID}] submit error: {e}", flush=True)
        report_err("submit", e)
        time.sleep(min(IDLE * errs, 60))

STEP 4 - GPU PATH (skip if no GPU):
  python3 -m venv ~/.donto-embed && source ~/.donto-embed/bin/activate
  pip uninstall -y fastembed 2>/dev/null; pip install -U fastembed-gpu
  python -c "import onnxruntime as o; print(o.get_available_providers())"
-> must include CUDAExecutionProvider. If it does NOT:
  pip install nvidia-cudnn-cu12 nvidia-cublas-cu12
and re-check; if still missing, tell me what you found and use the CPU PATH meanwhile.

Run TWO processes (they share the GPU and overlap network I/O):
  export DONTO_EMBED_TOKEN=80f8ffdbee931ae49b10b500263044f4674e94310ae4fefe
  EMBED_CUDA=1 EMBED_N=1024 EMBED_BATCH=512 EMBED_WORKER_ID=gpu1-$(hostname) \
    nohup python worker.py > ~/donto-w1.log 2>&1 &
  EMBED_CUDA=1 EMBED_N=1024 EMBED_BATCH=512 EMBED_WORKER_ID=gpu2-$(hostname) \
    nohup python worker.py > ~/donto-w2.log 2>&1 &
If the GPU runs out of memory, halve EMBED_BATCH (the model itself is ~130 MB).
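The provider check in the GPU path can be turned into a small preflight that decides which environment variables to launch with. This is only a sketch under assumptions: available_providers, gpu_settings and halve_batch are hypothetical helper names, and the returned values simply mirror the variables used in the GPU commands above (with worker.py's default of 256 as the fallback batch size).

```python
def available_providers():
    """Return onnxruntime's provider list, or [] if onnxruntime is not installed."""
    try:
        import onnxruntime as ort
    except ImportError:
        return []
    return list(ort.get_available_providers())


def gpu_settings(providers):
    """Map a provider list to the GPU-path env vars; {} means use the CPU path defaults."""
    if "CUDAExecutionProvider" in providers:
        return {"EMBED_CUDA": "1", "EMBED_N": "1024", "EMBED_BATCH": "512"}
    return {}


def halve_batch(settings):
    """Out-of-memory advice from the text: same settings with EMBED_BATCH halved (minimum 1)."""
    out = dict(settings)
    out["EMBED_BATCH"] = str(max(1, int(out.get("EMBED_BATCH", "256")) // 2))
    return out
```

For example, gpu_settings(available_providers()) returns an empty dict when the CUDA provider is missing, which is the signal to fix the install or fall back to the CPU path.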
STEP 5 - CPU PATH (no GPU):
  pip install fastembed
  export DONTO_EMBED_TOKEN=80f8ffdbee931ae49b10b500263044f4674e94310ae4fefe
  for i in $(seq $(nproc)); do
    EMBED_WORKER_ID=w$i-$(hostname) nohup python worker.py > ~/donto-w$i.log 2>&1 &
  done
(~0.3-0.5 GB RAM per process; if the machine swaps, run fewer.)

Zero-setup alternative (CPU only, one worker/core):
  docker run --rm -e DONTO_EMBED_TOKEN=80f8ffdbee931ae49b10b500263044f4674e94310ae4fefe \
    ghcr.io/thomasdavis/donto-embed-worker

STEP 6 - VERIFY it is really working (do not skip: "started" is not "working"):
1. Within ~2 min the logs print lines like "+1024 (total 4096, ~3100/min)". Tell me the steady ~N/min figure. (Repeated lease/submit errors are auto-reported to the coordinator, throttled to 1/min, so the operator can see them. The worker also backs off between retries: the delay grows linearly with consecutive errors, IDLE x errors, capped at 60 s. Still tell me the error line if it persists.)
2. GPU path only: nvidia-smi must show the python processes holding GPU memory. If it doesn't, the model silently fell back to CPU; re-check providers (step 4).
3. curl -A curl/8 -H "Authorization: Bearer 80f8ffdbee931ae49b10b500263044f4674e94310ae4fefe" https://donto.org/embed/stats
   -> queue.done climbing / queue.pending falling. The machine also appears within seconds as a contributor at https://admin.donto.org/embed (vectors-done rising).
Ballparks: ~60-150 vectors/min per CPU core; ~2,000-6,000/min per GPU.

STEP 7 - keep it running:
Leave the workers running in the background and make them survive this terminal closing (nohup is already used above; tmux or a systemd --user unit is even better, your choice). If a lease returns an empty batch the worker naps 5 s and retries: the server refills its queue every 2 minutes from a multi-million backlog, so empty moments are normal, NOT an error.
Stop anytime with: pkill -f worker.py

MORE MACHINES: run this same prompt on every machine, same token everywhere. Worker IDs are unique automatically (the commands above use an index + hostname; with EMBED_WORKER_ID unset, worker.py falls back to hostname + pid); the coordinator splits work safely across any number of machines.
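Two numbers from the verification step are easy to sanity-check by hand. The sketch below reproduces the retry delay that worker.py computes after consecutive lease errors, min(IDLE * errs, 60), and gives a rough time-to-drain estimate from the ballpark rates. The helper names retry_delay and hours_to_drain are illustrative, and the backlog figure is just the sum of the approximate counts quoted at the top, so treat the result as an order-of-magnitude guide only.

```python
def retry_delay(errs, idle=5.0, cap=60.0):
    """Seconds worker.py sleeps after `errs` consecutive errors: linear growth, capped."""
    return min(idle * errs, cap)


def hours_to_drain(pending, per_min):
    """Rough hours needed to embed `pending` items at a combined rate of `per_min`."""
    if per_min <= 0:
        raise ValueError("per_min must be positive")
    return pending / per_min / 60.0


# Approximate backlog from the text: ~1.2M chunks/claims + ~2.8M entities.
BACKLOG = 1_200_000 + 2_800_000

print([retry_delay(n) for n in (1, 2, 3, 12, 13)])  # [5.0, 10.0, 15.0, 60.0, 60.0]
print(round(hours_to_drain(BACKLOG, 3000), 1))      # 22.2 (one GPU at ~3,000/min)
```

With the default EMBED_IDLE_SLEEP of 5, the delay reaches its 60 s cap after 12 consecutive errors. The drain estimate scales inversely with the combined rate, which is why adding machines shortens the wait roughly in proportion.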