feat: add product distributor research table and integrate distributor analysis in Stalker product workflow

This commit is contained in:
Victor Noguera
2026-05-25 14:51:57 -04:00
parent 5dbff33032
commit 35087a5b2f
8 changed files with 481 additions and 20 deletions

View File

@@ -2,14 +2,22 @@ import index from "./web/index.html";
import * as XLSX from "xlsx";
import { normalizeAsin } from "./asin.ts";
import { db, client } from "./db/index.ts";
import { analysisRevisions } from "./db/schema.ts";
import {
analysisRevisions,
productDistributorResearch,
} from "./db/schema.ts";
import { insertObservation, refreshRunStats } from "./db/persistence.ts";
import { config } from "./config.ts";
import {
fetchKeepaDataBatch,
lookupKeepaUpcs,
mapUpcsToAsins,
} from "./integrations/keepa.ts";
import { analyzeProducts } from "./integrations/llm.ts";
import {
searchAsinOffers,
type SearxngOfferSearchResult,
} from "./integrations/searxng.ts";
import {
fetchSellabilityBatch,
fetchSpApiPricingAndFees,
@@ -492,10 +500,37 @@ async function getProduct(asin: string) {
ORDER BY revision.analyzed_at DESC`,
[asin],
);
return { product, observations, analyses };
const distributorResearchRows = await pgAll<Record<string, unknown>>(
`SELECT id, run_item_id, inventory_item_id, provider, model, status, distributors_json, raw_response, created_at
FROM product_distributor_research
WHERE product_asin = ?
ORDER BY created_at DESC, id DESC`,
[asin],
);
const distributorResearch = distributorResearchRows.map((row) => {
const distributors = (() => {
try {
return normalizeDistributorCandidates(JSON.parse(String(row.distributors_json ?? "[]")));
} catch {
return [];
}
})();
return {
id: Number(row.id),
run_item_id: row.run_item_id == null ? null : Number(row.run_item_id),
inventory_item_id: row.inventory_item_id == null ? null : Number(row.inventory_item_id),
provider: String(row.provider ?? ""),
model: String(row.model ?? ""),
status: String(row.status ?? ""),
created_at: String(row.created_at ?? ""),
distributors,
raw_response: row.raw_response == null ? null : String(row.raw_response),
};
});
return { product, observations, analyses, distributorResearch };
}
async function reanalyzeRunItem(itemId: number) {
async function reanalyzeRunItem(itemId: number, useClaude = USE_CLAUDE) {
const row = await pgGet<Record<string, any>>(
`SELECT ri.id, ri.run_id, ri.product_asin AS asin, r.type,
COALESCE(p.name, si.supplied_name, ri.product_asin) AS product_name,
@@ -505,7 +540,8 @@ async function reanalyzeRunItem(itemId: number) {
si.fba_net_sheet, si.gross_profit_dollar, si.gross_profit_pct,
si.net_profit_sheet, si.roi_sheet, si.moq, si.moq_cost,
si.qty_available, si.supplier, si.source_url, si.asin_link,
si.promo_coupon_code, si.notes, si.lead_date
si.promo_coupon_code, si.notes, si.lead_date,
ri.source_inventory_item_id
FROM run_items ri JOIN runs r ON r.id = ri.run_id
LEFT JOIN products p ON p.asin = ri.product_asin
LEFT JOIN sourcing_inputs si ON si.run_item_id = ri.id
@@ -556,7 +592,7 @@ async function reanalyzeRunItem(itemId: number) {
fetchedAt: new Date().toISOString(),
};
const verdict =
(await analyzeProducts([enriched], { useClaude: USE_CLAUDE }))[0] ?? {
(await analyzeProducts([enriched], { useClaude }))[0] ?? {
asin: row.asin,
verdict: "SKIP" as const,
confidence: 0,
@@ -577,6 +613,192 @@ async function reanalyzeRunItem(itemId: number) {
return { itemId, runId: row.run_id, asin: row.asin, fetchedAt: enriched.fetchedAt };
}
type DistributorCandidate = {
name: string;
website: string;
rationale: string;
confidence: number;
};
function clampDistributorConfidence(value: unknown): number {
const parsed = Number(value);
if (!Number.isFinite(parsed)) return 0;
return Math.max(0, Math.min(100, Math.round(parsed)));
}
function normalizeDistributorCandidates(payload: unknown): DistributorCandidate[] {
if (!Array.isArray(payload)) return [];
return payload
.filter((item): item is Record<string, unknown> => item != null && typeof item === "object")
.map((item) => ({
name: String(item.name ?? "").trim(),
website: String(item.website ?? "").trim(),
rationale: String(item.rationale ?? "").trim(),
confidence: clampDistributorConfidence(item.confidence),
}))
.filter((item) => item.name.length > 0 && item.website.length > 0)
.slice(0, 10);
}
function extractJsonArrayFromText(text: string): string {
const trimmed = text.trim();
const fence = trimmed.match(/```(?:json)?\s*([\s\S]*?)```/i);
const candidate = fence ? fence[1]?.trim() ?? "" : trimmed;
const start = candidate.indexOf("[");
const end = candidate.lastIndexOf("]");
if (start >= 0 && end > start) {
return candidate.slice(start, end + 1);
}
return candidate;
}
async function requestClaudeDistributorCandidates(context: Record<string, unknown>) {
if (!config.anthropicApiKey) {
throw new Error("Missing required env var: ANTHROPIC_API_KEY");
}
const model = (config.anthropicModel ?? "claude-sonnet-4-6").trim() || "claude-sonnet-4-6";
const system = "You identify authorized wholesale distributors for products. Return only JSON.";
const prompt = [
"Given this Amazon product context, identify up to 5 likely authorized U.S. wholesale distributors.",
"Prioritize official brand channels and reputable distributors.",
"Return only a raw JSON array with objects:",
'[{"name":"...","website":"https://...","rationale":"...","confidence":0-100}]',
JSON.stringify(context, null, 2),
].join("\n\n");
const response = await fetch("https://api.anthropic.com/v1/messages", {
method: "POST",
headers: {
"Content-Type": "application/json",
"x-api-key": config.anthropicApiKey,
"anthropic-version": "2023-06-01",
},
body: JSON.stringify({
model,
system,
messages: [{ role: "user", content: prompt }],
temperature: 0.2,
max_tokens: 1800,
}),
});
const raw = await response.text();
if (!response.ok) {
throw new Error(`Claude API error ${response.status}: ${raw.slice(0, 300)}`);
}
let contentText = "";
try {
const parsed = JSON.parse(raw) as { content?: Array<{ type?: string; text?: string }> };
contentText = (parsed.content ?? [])
.filter((block) => block?.type === "text" && typeof block.text === "string")
.map((block) => block.text ?? "")
.join("\n");
} catch {
contentText = raw;
}
const arrayText = extractJsonArrayFromText(contentText);
let candidates: DistributorCandidate[] = [];
try {
candidates = normalizeDistributorCandidates(JSON.parse(arrayText));
} catch {
candidates = [];
}
return { model, rawResponse: contentText, candidates };
}
async function findDistributorsForStalkerProduct(runItemId: number) {
const row = await pgGet<Record<string, any>>(
`SELECT ri.id AS run_item_id, ri.product_asin AS asin, ri.source_inventory_item_id,
p.name AS product_title, p.brand, p.category,
observation.current_price, observation.avg_price_90d, observation.sales_rank,
observation.monthly_sold, observation.seller_count, observation.amazon_is_seller,
observation.can_sell, observation.sellability_status, observation.sellability_reason,
latest_analysis.decision AS verdict, latest_analysis.confidence, latest_analysis.reasoning,
seller.seller_id, seller.seller_name, seller.rating, seller.rating_count
FROM run_items ri
JOIN products p ON p.asin = ri.product_asin
LEFT JOIN product_observations observation ON observation.id = (
SELECT obs.id
FROM product_observations obs
WHERE obs.product_asin = ri.product_asin
ORDER BY obs.fetched_at DESC, obs.id DESC
LIMIT 1
)
LEFT JOIN LATERAL (
SELECT revision.decision, revision.confidence, revision.reasoning
FROM analysis_revisions revision
WHERE revision.run_item_id = ri.id
ORDER BY revision.analyzed_at DESC, revision.id DESC
LIMIT 1
) latest_analysis ON TRUE
LEFT JOIN stalker_inventory_items si ON si.id = ri.source_inventory_item_id
LEFT JOIN sellers seller ON seller.seller_id = si.seller_id
WHERE ri.id = ?`,
[runItemId],
);
if (!row?.asin) {
throw new Error("Stalker product item not found");
}
const offerResults = await searchAsinOffers(row.asin, {
maxResults: 12,
includeUnmatchedAsinResults: true,
}).catch(() => [] as SearxngOfferSearchResult[]);
const promptContext = {
asin: row.asin,
productTitle: row.product_title ?? null,
brand: row.brand ?? null,
category: row.category ?? null,
metrics: {
currentPrice: row.current_price ?? null,
avgPrice90d: row.avg_price_90d ?? null,
salesRank: row.sales_rank ?? null,
monthlySold: row.monthly_sold ?? null,
sellerCount: row.seller_count ?? null,
amazonIsSeller: row.amazon_is_seller ?? null,
canSell: row.can_sell ?? null,
sellabilityStatus: row.sellability_status ?? null,
sellabilityReason: row.sellability_reason ?? null,
verdict: row.verdict ?? null,
confidence: row.confidence ?? null,
reasoning: row.reasoning ?? null,
},
seller: {
sellerId: row.seller_id ?? null,
sellerName: row.seller_name ?? null,
rating: row.rating ?? null,
ratingCount: row.rating_count ?? null,
},
offerResearch: offerResults.map((result) => ({
title: result.title,
url: result.url,
domain: result.domain,
snippet: result.snippet,
score: result.score,
rank: result.rank,
})),
};
const claude = await requestClaudeDistributorCandidates(promptContext);
const [saved] = await db
.insert(productDistributorResearch)
.values({
productAsin: row.asin,
runItemId: runItemId,
inventoryItemId: row.source_inventory_item_id ?? null,
provider: "claude",
model: claude.model,
status: claude.candidates.length ? "completed" : "empty",
queryContextJson: JSON.stringify(promptContext),
distributorsJson: JSON.stringify(claude.candidates),
rawResponse: claude.rawResponse,
})
.returning({ id: productDistributorResearch.id, createdAt: productDistributorResearch.createdAt });
return {
asin: row.asin,
runItemId: runItemId,
researchId: saved?.id ?? null,
createdAt: saved?.createdAt ?? null,
distributors: claude.candidates,
};
}
function stalkerBaseWhere(filters: URLSearchParams, product = false) {
const conditions = ["r.type = 'stalker'"];
const params: unknown[] = [];
@@ -710,8 +932,9 @@ async function getStalkerResults(filters: URLSearchParams) {
function stalkerProductSql(where: string) {
return `SELECT r.id AS "runId", r.started_at, seller.seller_id, seller.seller_name,
seller.rating, seller.rating_count, inventory.product_asin AS asin,
seller.rating, seller.rating_count, inventory.id AS inventory_item_id, inventory.product_asin AS asin,
observation.can_sell, observation.sellability_status, observation.sellability_reason,
analysis.run_item_id,
product.name AS product_title, product.brand,
CASE WHEN product.category IS NULL THEN NULL ELSE json_build_array(product.category)::text END AS category_tree,
observation.current_price, observation.avg_price_90d, observation.sales_rank,
@@ -724,7 +947,7 @@ function stalkerProductSql(where: string) {
JOIN products product ON product.asin = inventory.product_asin
JOIN product_observations observation ON observation.id = inventory.observation_id
LEFT JOIN LATERAL (
SELECT revision.decision, revision.confidence, revision.reasoning
SELECT item.id AS run_item_id, revision.decision, revision.confidence, revision.reasoning
FROM run_items item
JOIN analysis_revisions revision ON revision.run_item_id = item.id
WHERE item.source_inventory_item_id = inventory.id
@@ -884,6 +1107,30 @@ const server = Bun.serve({
"/api/stalker/results": async (req) => json(await getStalkerResults(new URL(req.url).searchParams)),
"/api/stalker/products": async (req) => json(await stalkerProducts(new URL(req.url).searchParams)),
"/api/stalker/products/export.xlsx": async (req) => exportStalkerProducts(new URL(req.url).searchParams),
"/api/stalker/products/:runItemId/reanalyze": async (req) => {
if (req.method !== "POST") return json({ error: "Method not allowed" }, 405);
const runItemId = Number(req.params.runItemId);
if (!Number.isInteger(runItemId)) return json({ error: "Invalid run item identifier" }, 400);
const provider = new URL(req.url).searchParams.get("provider")?.trim().toLowerCase();
const useClaude = provider === "claude";
try {
return json(await reanalyzeRunItem(runItemId, useClaude || USE_CLAUDE));
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
return json({ error: message }, message === "Run item not found" ? 404 : 500);
}
},
"/api/stalker/products/:runItemId/distributors": async (req) => {
if (req.method !== "POST") return json({ error: "Method not allowed" }, 405);
const runItemId = Number(req.params.runItemId);
if (!Number.isInteger(runItemId)) return json({ error: "Invalid run item identifier" }, 400);
try {
return json(await findDistributorsForStalkerProduct(runItemId));
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
return json({ error: message }, message === "Stalker product item not found" ? 404 : 500);
}
},
"/api/stalker/purge": async (req) =>
req.method === "DELETE" || req.method === "POST"
? json(await purgeStalkerData())