diff --git a/.abacusai/config.json b/.abacusai/config.json index 95fcb87..e784e4a 100644 --- a/.abacusai/config.json +++ b/.abacusai/config.json @@ -16,6 +16,15 @@ "Bash(bun run build:web 2>&1 || true)", "Bash(bun run build:web 2>&1 || true)", "Bash(bun run build:web 2>&1 || true)", + "Bash(bun run build:web 2>&1 || true)", + "Bash(bun run build:web 2>&1 || true)", + "Bash(bun run build:web 2>&1 || true)", + "Bash(bun run build:web 2>&1 || true)", + "Bash(bun run build:web 2>&1 || true)", + "Bash(bun run db:migrate 2>&1 || true)", + "Bash(git --no-pager diff -- src/web/frontend.tsx src/web/styles.css 2>&1 || true)", + "Bash(bun run build:web 2>&1 || true)", + "Bash(bun run build:web 2>&1 || true)", "Bash(bun run build:web 2>&1 || true)" ] }, diff --git a/drizzle/0001_product_distributor_research.sql b/drizzle/0001_product_distributor_research.sql new file mode 100644 index 0000000..326d852 --- /dev/null +++ b/drizzle/0001_product_distributor_research.sql @@ -0,0 +1,23 @@ +CREATE TABLE "product_distributor_research" ( + "id" serial PRIMARY KEY NOT NULL, + "product_asin" text NOT NULL, + "run_item_id" integer, + "inventory_item_id" integer, + "provider" text DEFAULT 'claude' NOT NULL, + "model" text NOT NULL, + "status" text DEFAULT 'completed' NOT NULL, + "query_context_json" text, + "distributors_json" text, + "raw_response" text, + "created_at" timestamp with time zone DEFAULT now() NOT NULL +); +--> statement-breakpoint +ALTER TABLE "product_distributor_research" ADD CONSTRAINT "product_distributor_research_product_asin_products_asin_fk" FOREIGN KEY ("product_asin") REFERENCES "public"."products"("asin") ON DELETE cascade ON UPDATE no action; +--> statement-breakpoint +ALTER TABLE "product_distributor_research" ADD CONSTRAINT "product_distributor_research_run_item_id_run_items_id_fk" FOREIGN KEY ("run_item_id") REFERENCES "public"."run_items"("id") ON DELETE set null ON UPDATE no action; +--> statement-breakpoint +ALTER TABLE "product_distributor_research" ADD CONSTRAINT "product_distributor_research_inventory_item_id_stalker_inventory_items_id_fk" FOREIGN KEY ("inventory_item_id") REFERENCES "public"."stalker_inventory_items"("id") ON DELETE set null ON UPDATE no action; +--> statement-breakpoint +CREATE INDEX "idx_distributor_research_asin_time" ON "product_distributor_research" USING btree ("product_asin","created_at"); +--> statement-breakpoint +CREATE INDEX "idx_distributor_research_run_item" ON "product_distributor_research" USING btree ("run_item_id"); diff --git a/drizzle/meta/_journal.json b/drizzle/meta/_journal.json index 96ebeac..fe43425 100644 --- a/drizzle/meta/_journal.json +++ b/drizzle/meta/_journal.json @@ -8,6 +8,13 @@ "when": 1779726518779, "tag": "0000_adorable_shiver_man", "breakpoints": true + }, + { + "idx": 1, + "version": "7", + "when": 1780000000000, + "tag": "0001_product_distributor_research", + "breakpoints": true } ] } \ No newline at end of file diff --git a/src/db/schema.ts b/src/db/schema.ts index 6b67dfd..beef3a2 100644 --- a/src/db/schema.ts +++ b/src/db/schema.ts @@ -439,3 +439,34 @@ export const stalkerInventoryItems = pgTable( index("idx_stalker_inventory_product_asin").on(t.productAsin), ], ); + +export const productDistributorResearch = pgTable( + "product_distributor_research", + { + id: serial("id").primaryKey(), + productAsin: text("product_asin") + .notNull() + .references(() => products.asin, { onDelete: "cascade" }), + runItemId: integer("run_item_id").references( + (): AnyPgColumn => runItems.id, + { onDelete: "set null" }, + ), + inventoryItemId: integer("inventory_item_id").references( + (): AnyPgColumn => stalkerInventoryItems.id, + { onDelete: "set null" }, + ), + provider: text("provider").notNull().default("claude"), + model: text("model").notNull(), + status: text("status").notNull().default("completed"), + queryContextJson: text("query_context_json"), + distributorsJson: text("distributors_json"), + rawResponse: text("raw_response"), + createdAt: timestamp("created_at", { withTimezone: true }) + .notNull() + .defaultNow(), + }, + (t) => [ + index("idx_distributor_research_asin_time").on(t.productAsin, t.createdAt), + index("idx_distributor_research_run_item").on(t.runItemId), + ], +); diff --git a/src/server.ts b/src/server.ts index 201a566..8326b25 100644 --- a/src/server.ts +++ b/src/server.ts @@ -2,14 +2,22 @@ import index from "./web/index.html"; import * as XLSX from "xlsx"; import { normalizeAsin } from "./asin.ts"; import { db, client } from "./db/index.ts"; -import { analysisRevisions } from "./db/schema.ts"; +import { + analysisRevisions, + productDistributorResearch, +} from "./db/schema.ts"; import { insertObservation, refreshRunStats } from "./db/persistence.ts"; +import { config } from "./config.ts"; import { fetchKeepaDataBatch, lookupKeepaUpcs, mapUpcsToAsins, } from "./integrations/keepa.ts"; import { analyzeProducts } from "./integrations/llm.ts"; +import { + searchAsinOffers, + type SearxngOfferSearchResult, +} from "./integrations/searxng.ts"; import { fetchSellabilityBatch, fetchSpApiPricingAndFees, @@ -492,10 +500,37 @@ async function getProduct(asin: string) { ORDER BY revision.analyzed_at DESC`, [asin], ); - return { product, observations, analyses }; + const distributorResearchRows = await pgAll>( + `SELECT id, run_item_id, inventory_item_id, provider, model, status, distributors_json, raw_response, created_at + FROM product_distributor_research + WHERE product_asin = ? + ORDER BY created_at DESC, id DESC`, + [asin], + ); + const distributorResearch = distributorResearchRows.map((row) => { + const distributors = (() => { + try { + return normalizeDistributorCandidates(JSON.parse(String(row.distributors_json ?? "[]"))); + } catch { + return []; + } + })(); + return { + id: Number(row.id), + run_item_id: row.run_item_id == null ? null : Number(row.run_item_id), + inventory_item_id: row.inventory_item_id == null ? null : Number(row.inventory_item_id), + provider: String(row.provider ?? ""), + model: String(row.model ?? ""), + status: String(row.status ?? ""), + created_at: String(row.created_at ?? ""), + distributors, + raw_response: row.raw_response == null ? null : String(row.raw_response), + }; + }); + return { product, observations, analyses, distributorResearch }; } -async function reanalyzeRunItem(itemId: number) { +async function reanalyzeRunItem(itemId: number, useClaude = USE_CLAUDE) { const row = await pgGet>( `SELECT ri.id, ri.run_id, ri.product_asin AS asin, r.type, COALESCE(p.name, si.supplied_name, ri.product_asin) AS product_name, @@ -505,7 +540,8 @@ async function reanalyzeRunItem(itemId: number) { si.fba_net_sheet, si.gross_profit_dollar, si.gross_profit_pct, si.net_profit_sheet, si.roi_sheet, si.moq, si.moq_cost, si.qty_available, si.supplier, si.source_url, si.asin_link, - si.promo_coupon_code, si.notes, si.lead_date + si.promo_coupon_code, si.notes, si.lead_date, + ri.source_inventory_item_id FROM run_items ri JOIN runs r ON r.id = ri.run_id LEFT JOIN products p ON p.asin = ri.product_asin LEFT JOIN sourcing_inputs si ON si.run_item_id = ri.id @@ -556,7 +592,7 @@ async function reanalyzeRunItem(itemId: number) { fetchedAt: new Date().toISOString(), }; const verdict = - (await analyzeProducts([enriched], { useClaude: USE_CLAUDE }))[0] ?? { + (await analyzeProducts([enriched], { useClaude }))[0] ?? { asin: row.asin, verdict: "SKIP" as const, confidence: 0, @@ -577,6 +613,192 @@ async function reanalyzeRunItem(itemId: number) { return { itemId, runId: row.run_id, asin: row.asin, fetchedAt: enriched.fetchedAt }; } +type DistributorCandidate = { + name: string; + website: string; + rationale: string; + confidence: number; +}; + +function clampDistributorConfidence(value: unknown): number { + const parsed = Number(value); + if (!Number.isFinite(parsed)) return 0; + return Math.max(0, Math.min(100, Math.round(parsed))); +} + +function normalizeDistributorCandidates(payload: unknown): DistributorCandidate[] { + if (!Array.isArray(payload)) return []; + return payload + .filter((item): item is Record => item != null && typeof item === "object") + .map((item) => ({ + name: String(item.name ?? "").trim(), + website: String(item.website ?? "").trim(), + rationale: String(item.rationale ?? "").trim(), + confidence: clampDistributorConfidence(item.confidence), + })) + .filter((item) => item.name.length > 0 && item.website.length > 0) + .slice(0, 10); +} + +function extractJsonArrayFromText(text: string): string { + const trimmed = text.trim(); + const fence = trimmed.match(/```(?:json)?\s*([\s\S]*?)```/i); + const candidate = fence ? fence[1]?.trim() ?? "" : trimmed; + const start = candidate.indexOf("["); + const end = candidate.lastIndexOf("]"); + if (start >= 0 && end > start) { + return candidate.slice(start, end + 1); + } + return candidate; +} + +async function requestClaudeDistributorCandidates(context: Record) { + if (!config.anthropicApiKey) { + throw new Error("Missing required env var: ANTHROPIC_API_KEY"); + } + const model = (config.anthropicModel ?? "claude-sonnet-4-6").trim() || "claude-sonnet-4-6"; + const system = "You identify authorized wholesale distributors for products. Return only JSON."; + const prompt = [ + "Given this Amazon product context, identify up to 5 likely authorized U.S. wholesale distributors.", + "Prioritize official brand channels and reputable distributors.", + "Return only a raw JSON array with objects:", + '[{"name":"...","website":"https://...","rationale":"...","confidence":0-100}]', + JSON.stringify(context, null, 2), + ].join("\n\n"); + const response = await fetch("https://api.anthropic.com/v1/messages", { + method: "POST", + headers: { + "Content-Type": "application/json", + "x-api-key": config.anthropicApiKey, + "anthropic-version": "2023-06-01", + }, + body: JSON.stringify({ + model, + system, + messages: [{ role: "user", content: prompt }], + temperature: 0.2, + max_tokens: 1800, + }), + }); + const raw = await response.text(); + if (!response.ok) { + throw new Error(`Claude API error ${response.status}: ${raw.slice(0, 300)}`); + } + let contentText = ""; + try { + const parsed = JSON.parse(raw) as { content?: Array<{ type?: string; text?: string }> }; + contentText = (parsed.content ?? []) + .filter((block) => block?.type === "text" && typeof block.text === "string") + .map((block) => block.text ?? "") + .join("\n"); + } catch { + contentText = raw; + } + const arrayText = extractJsonArrayFromText(contentText); + let candidates: DistributorCandidate[] = []; + try { + candidates = normalizeDistributorCandidates(JSON.parse(arrayText)); + } catch { + candidates = []; + } + return { model, rawResponse: contentText, candidates }; +} + +async function findDistributorsForStalkerProduct(runItemId: number) { + const row = await pgGet>( + `SELECT ri.id AS run_item_id, ri.product_asin AS asin, ri.source_inventory_item_id, + p.name AS product_title, p.brand, p.category, + observation.current_price, observation.avg_price_90d, observation.sales_rank, + observation.monthly_sold, observation.seller_count, observation.amazon_is_seller, + observation.can_sell, observation.sellability_status, observation.sellability_reason, + latest_analysis.decision AS verdict, latest_analysis.confidence, latest_analysis.reasoning, + seller.seller_id, seller.seller_name, seller.rating, seller.rating_count + FROM run_items ri + JOIN products p ON p.asin = ri.product_asin + LEFT JOIN product_observations observation ON observation.id = ( + SELECT obs.id + FROM product_observations obs + WHERE obs.product_asin = ri.product_asin + ORDER BY obs.fetched_at DESC, obs.id DESC + LIMIT 1 + ) + LEFT JOIN LATERAL ( + SELECT revision.decision, revision.confidence, revision.reasoning + FROM analysis_revisions revision + WHERE revision.run_item_id = ri.id + ORDER BY revision.analyzed_at DESC, revision.id DESC + LIMIT 1 + ) latest_analysis ON TRUE + LEFT JOIN stalker_inventory_items si ON si.id = ri.source_inventory_item_id + LEFT JOIN sellers seller ON seller.seller_id = si.seller_id + WHERE ri.id = ?`, + [runItemId], + ); + if (!row?.asin) { + throw new Error("Stalker product item not found"); + } + const offerResults = await searchAsinOffers(row.asin, { + maxResults: 12, + includeUnmatchedAsinResults: true, + }).catch(() => [] as SearxngOfferSearchResult[]); + const promptContext = { + asin: row.asin, + productTitle: row.product_title ?? null, + brand: row.brand ?? null, + category: row.category ?? null, + metrics: { + currentPrice: row.current_price ?? null, + avgPrice90d: row.avg_price_90d ?? null, + salesRank: row.sales_rank ?? null, + monthlySold: row.monthly_sold ?? null, + sellerCount: row.seller_count ?? null, + amazonIsSeller: row.amazon_is_seller ?? null, + canSell: row.can_sell ?? null, + sellabilityStatus: row.sellability_status ?? null, + sellabilityReason: row.sellability_reason ?? null, + verdict: row.verdict ?? null, + confidence: row.confidence ?? null, + reasoning: row.reasoning ?? null, + }, + seller: { + sellerId: row.seller_id ?? null, + sellerName: row.seller_name ?? null, + rating: row.rating ?? null, + ratingCount: row.rating_count ?? null, + }, + offerResearch: offerResults.map((result) => ({ + title: result.title, + url: result.url, + domain: result.domain, + snippet: result.snippet, + score: result.score, + rank: result.rank, + })), + }; + const claude = await requestClaudeDistributorCandidates(promptContext); + const [saved] = await db + .insert(productDistributorResearch) + .values({ + productAsin: row.asin, + runItemId: runItemId, + inventoryItemId: row.source_inventory_item_id ?? null, + provider: "claude", + model: claude.model, + status: claude.candidates.length ? "completed" : "empty", + queryContextJson: JSON.stringify(promptContext), + distributorsJson: JSON.stringify(claude.candidates), + rawResponse: claude.rawResponse, + }) + .returning({ id: productDistributorResearch.id, createdAt: productDistributorResearch.createdAt }); + return { + asin: row.asin, + runItemId: runItemId, + researchId: saved?.id ?? null, + createdAt: saved?.createdAt ?? null, + distributors: claude.candidates, + }; +} + function stalkerBaseWhere(filters: URLSearchParams, product = false) { const conditions = ["r.type = 'stalker'"]; const params: unknown[] = []; @@ -710,8 +932,9 @@ async function getStalkerResults(filters: URLSearchParams) { function stalkerProductSql(where: string) { return `SELECT r.id AS "runId", r.started_at, seller.seller_id, seller.seller_name, - seller.rating, seller.rating_count, inventory.product_asin AS asin, + seller.rating, seller.rating_count, inventory.id AS inventory_item_id, inventory.product_asin AS asin, observation.can_sell, observation.sellability_status, observation.sellability_reason, + analysis.run_item_id, product.name AS product_title, product.brand, CASE WHEN product.category IS NULL THEN NULL ELSE json_build_array(product.category)::text END AS category_tree, observation.current_price, observation.avg_price_90d, observation.sales_rank, @@ -724,7 +947,7 @@ function stalkerProductSql(where: string) { JOIN products product ON product.asin = inventory.product_asin JOIN product_observations observation ON observation.id = inventory.observation_id LEFT JOIN LATERAL ( - SELECT revision.decision, revision.confidence, revision.reasoning + SELECT item.id AS run_item_id, revision.decision, revision.confidence, revision.reasoning FROM run_items item JOIN analysis_revisions revision ON revision.run_item_id = item.id WHERE item.source_inventory_item_id = inventory.id @@ -884,6 +1107,30 @@ const server = Bun.serve({ "/api/stalker/results": async (req) => json(await getStalkerResults(new URL(req.url).searchParams)), "/api/stalker/products": async (req) => json(await stalkerProducts(new URL(req.url).searchParams)), "/api/stalker/products/export.xlsx": async (req) => exportStalkerProducts(new URL(req.url).searchParams), + "/api/stalker/products/:runItemId/reanalyze": async (req) => { + if (req.method !== "POST") return json({ error: "Method not allowed" }, 405); + const runItemId = Number(req.params.runItemId); + if (!Number.isInteger(runItemId)) return json({ error: "Invalid run item identifier" }, 400); + const provider = new URL(req.url).searchParams.get("provider")?.trim().toLowerCase(); + const useClaude = provider === "claude"; + try { + return json(await reanalyzeRunItem(runItemId, useClaude || USE_CLAUDE)); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + return json({ error: message }, message === "Run item not found" ? 404 : 500); + } + }, + "/api/stalker/products/:runItemId/distributors": async (req) => { + if (req.method !== "POST") return json({ error: "Method not allowed" }, 405); + const runItemId = Number(req.params.runItemId); + if (!Number.isInteger(runItemId)) return json({ error: "Invalid run item identifier" }, 400); + try { + return json(await findDistributorsForStalkerProduct(runItemId)); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + return json({ error: message }, message === "Stalker product item not found" ? 404 : 500); + } + }, "/api/stalker/purge": async (req) => req.method === "DELETE" || req.method === "POST" ? json(await purgeStalkerData()) diff --git a/src/stalker/stalker-analyze.ts b/src/stalker/stalker-analyze.ts index 6d55e71..df2e3e1 100644 --- a/src/stalker/stalker-analyze.ts +++ b/src/stalker/stalker-analyze.ts @@ -1,4 +1,4 @@ -import { db } from "../db/index.ts"; +import { client, db } from "../db/index.ts"; import { persistLlmResults, refreshRunStats } from "../db/persistence.ts"; import { sql } from "drizzle-orm"; import { normalizeAsin } from "../asin.ts"; @@ -261,8 +261,15 @@ async function main(): Promise { } if (import.meta.main) { - main().catch((error) => { - console.error(error instanceof Error ? error.message : String(error)); - process.exit(1); - }); + main() + .catch((error) => { + console.error(error instanceof Error ? error.message : String(error)); + process.exitCode = 1; + }) + .finally(async () => { + try { + await client.end({ timeout: 5 }); + } catch { + } + }); } diff --git a/src/web/frontend.tsx b/src/web/frontend.tsx index 46debfd..15fbbf3 100644 --- a/src/web/frontend.tsx +++ b/src/web/frontend.tsx @@ -157,6 +157,8 @@ type StalkerProductItem = { seller_name: string | null; rating: number | null; rating_count: number | null; + inventory_item_id: number; + run_item_id: number | null; asin: string; can_sell: number; sellability_status: string; @@ -216,6 +218,22 @@ type ProductHistoryResponse = { reasoning: string | null; analyzed_at: string; }>; + distributorResearch: Array<{ + id: number; + run_item_id: number | null; + inventory_item_id: number | null; + provider: string; + model: string; + status: string; + created_at: string; + distributors: Array<{ + name: string; + website: string; + rationale: string; + confidence: number; + }>; + raw_response: string | null; + }>; }; type SortState = { @@ -1204,6 +1222,11 @@ function StalkerProductsExplorer({ const [page, setPage] = useState(1); const [pageSize, setPageSize] = useState(50); const [sort, setSort] = useState({ field: "monthly_sold", direction: "DESC" }); + const [showSellerIdColumn, setShowSellerIdColumn] = useState(false); + const [showSellerColumn, setShowSellerColumn] = useState(false); + const [showCategoryColumn, setShowCategoryColumn] = useState(false); + const [reanalyzing, setReanalyzing] = useState>({}); + const [findingDistributors, setFindingDistributors] = useState>({}); function buildStalkerProductParams(includePaging: boolean): URLSearchParams { const params = new URLSearchParams({ @@ -1295,6 +1318,64 @@ function StalkerProductsExplorer({ setPage(1); } + async function reanalyzeStalkerItem(item: StalkerProductItem) { + if (item.run_item_id == null) { + window.alert("No analysis item available for this row yet."); + return; + } + const key = String(item.run_item_id); + if (reanalyzing[key]) return; + setReanalyzing((prev) => ({ ...prev, [key]: true })); + try { + const response = await fetch(`/api/stalker/products/${item.run_item_id}/reanalyze?provider=claude`, { + method: "POST", + }); + if (!response.ok) { + const payload = (await response.json().catch(() => null)) as { error?: string } | null; + window.alert(payload?.error ?? "Failed to re-run analysis"); + return; + } + const params = buildStalkerProductParams(true); + const res = await fetch(`/api/stalker/products?${params.toString()}`); + const payload = (await res.json()) as StalkerProductsResponse; + setResults(payload); + } finally { + setReanalyzing((prev) => { + const next = { ...prev }; + delete next[key]; + return next; + }); + } + } + + async function discoverDistributors(item: StalkerProductItem) { + if (item.run_item_id == null) { + window.alert("No analysis item available for this row yet."); + return; + } + const key = String(item.run_item_id); + if (findingDistributors[key]) return; + setFindingDistributors((prev) => ({ ...prev, [key]: true })); + try { + const response = await fetch(`/api/stalker/products/${item.run_item_id}/distributors`, { + method: "POST", + }); + const payload = (await response.json().catch(() => null)) as { + error?: string; + } | null; + if (!response.ok) { + window.alert(payload?.error ?? "Failed to find distributors"); + return; + } + } finally { + setFindingDistributors((prev) => { + const next = { ...prev }; + delete next[key]; + return next; + }); + } + } + const exportHref = `/api/stalker/products/export.xlsx?${buildStalkerProductParams(false).toString()}`; return ( @@ -1361,6 +1442,11 @@ function StalkerProductsExplorer({ Export XLSX +
+ + + +
@@ -1371,7 +1457,6 @@ function StalkerProductsExplorer({ - Category @@ -1380,17 +1465,19 @@ function StalkerProductsExplorer({ - - + {showSellerIdColumn ? : null} + {showSellerColumn ? : null} + {showCategoryColumn ? Category : null} Status + Actions {loading ? ( - Loading... + Loading... ) : results?.items.length ? ( results.items.map((item) => { const categories = parseStringArrayJson(item.category_tree); @@ -1399,7 +1486,6 @@ function StalkerProductsExplorer({ {item.asin} {item.product_title || "-"} {item.brand || "-"} - {categories.at(-1) || "-"} {formatNumber(item.monthly_sold)} {formatNumber(item.seller_count)} {formatAmazonSeller(item.amazon_is_seller)} @@ -1408,17 +1494,34 @@ function StalkerProductsExplorer({ {formatCurrency(item.avg_price_90d)} {item.verdict ? {item.verdict} : "-"} {formatNumber(item.confidence)} - {item.seller_id} - {item.seller_name || "-"} + {showSellerIdColumn ? {item.seller_id} : null} + {showSellerColumn ? {item.seller_name || "-"} : null} + {showCategoryColumn ? {categories.at(-1) || "-"} : null} {formatNumber(item.rating_count)} {item.sellability_status} {item.runId} {formatDate(item.last_seen_at)} + +
+ + +
+ ); }) ) : ( - No sellable Stalker products found + No sellable Stalker products found )} @@ -1488,6 +1591,34 @@ function ProductDetails({
+
+

Distributor Research

+
+ + + + {data?.distributorResearch.length ? data.distributorResearch.map((entry) => ( + + + + + + + + + )) : } + +
CreatedProviderModelStatusDistributorsRun Item
{formatDate(entry.created_at)}{entry.provider}{entry.model}{entry.status} + {entry.distributors.length ? entry.distributors.map((distributor, idx) => ( +
+
{distributor.name} ({formatNumber(distributor.confidence)}%)
+ +
{distributor.rationale || "-"}
+
+ )) : "-"} +
{entry.run_item_id ?? "-"}
No distributor research yet
+
+

Observations

diff --git a/src/web/styles.css b/src/web/styles.css index fc36c0b..38d36a5 100644 --- a/src/web/styles.css +++ b/src/web/styles.css @@ -142,6 +142,12 @@ td { min-width: 1320px; } +.stalker-actions { + display: flex; + gap: 6px; + align-items: center; +} + th { background: #fafafb; font-weight: 600;