1255 lines
50 KiB
TypeScript
1255 lines
50 KiB
TypeScript
import index from "./web/index.html";
|
||
import * as XLSX from "xlsx";
|
||
import { normalizeAsin } from "./asin.ts";
|
||
import { db, client } from "./db/index.ts";
|
||
import { eq } from "drizzle-orm";
|
||
import {
|
||
analysisRevisions,
|
||
productDistributorResearch,
|
||
} from "./db/schema.ts";
|
||
import { insertObservation, refreshRunStats } from "./db/persistence.ts";
|
||
import { config } from "./config.ts";
|
||
import {
|
||
fetchKeepaDataBatch,
|
||
lookupKeepaUpcs,
|
||
mapUpcsToAsins,
|
||
} from "./integrations/keepa.ts";
|
||
import { analyzeProducts } from "./integrations/llm.ts";
|
||
import {
|
||
searchAsinOffers,
|
||
type SearxngOfferSearchResult,
|
||
} from "./integrations/searxng.ts";
|
||
import {
|
||
fetchSellabilityBatch,
|
||
fetchSpApiPricingAndFees,
|
||
} from "./integrations/sp-api.ts";
|
||
import { runUpcFileAnalysis } from "./supplier/upc-file-analysis.ts";
|
||
import type {
|
||
AnalysisResult,
|
||
EnrichedProduct,
|
||
KeepaUpcLookupDetail,
|
||
ProductRecord,
|
||
SpApiData,
|
||
} from "./types.ts";
|
||
|
||
/** Page size used when a request does not supply a usable `pageSize`. */
const DEFAULT_PAGE_SIZE = 25;
/** Upper bound applied to any requested `pageSize`. */
const MAX_PAGE_SIZE = 200;
/** Maximum number of distinct UPCs accepted by `validateUpcs`. */
const MAX_UPCS_PER_REQUEST = 1000;
/** True when the process was started with the `--claude` command-line flag. */
const USE_CLAUDE = process.argv.includes("--claude");
|
||
|
||
/**
 * Rewrites positional `?` placeholders as numbered PostgreSQL parameters
 * (`$1`, `$2`, ...), counting from left to right.
 *
 * Question marks that appear inside a single-quoted SQL string literal
 * (including literals containing doubled `''` quotes) are copied through
 * unchanged so they neither become placeholders nor shift the numbering.
 *
 * @param query SQL text using `?` placeholders.
 * @returns The same SQL with `$n` placeholders.
 */
function toPostgresSql(query: string): string {
  let n = 0;
  // First alternative consumes a complete '...' literal; second is a bare `?`.
  return query.replace(/'(?:[^']|'')*'|\?/g, (match) =>
    match === "?" ? `$${++n}` : match,
  );
}
|
||
|
||
/**
 * Runs a `?`-parameterised query through `client.unsafe` and returns the
 * first row.
 *
 * @param query SQL text with `?` placeholders (converted by `toPostgresSql`).
 * @param params Values bound to the placeholders, in order.
 * @returns The first row, or `null` when the result has no rows.
 */
async function pgGet<T extends Record<string, unknown>>(
  query: string,
  params: unknown[] = [],
): Promise<T | null> {
  const rows = await client.unsafe<T[]>(toPostgresSql(query), params as never[]);
  const first = rows[0] as T | undefined;
  return first ?? null;
}
|
||
|
||
/**
 * Runs a `?`-parameterised query through `client.unsafe` and returns every row.
 *
 * The query is awaited explicitly so the value typed as `T[]` really is the
 * resolved result rather than a pending query object cast to an array.
 *
 * @param query SQL text with `?` placeholders (converted by `toPostgresSql`).
 * @param params Values bound to the placeholders, in order.
 * @returns All rows produced by the query.
 */
async function pgAll<T extends Record<string, unknown>>(
  query: string,
  params: unknown[] = [],
): Promise<T[]> {
  const rows = await client.unsafe<T[]>(toPostgresSql(query), params as never[]);
  return rows as unknown as T[];
}
|
||
|
||
/**
 * Executes a `?`-parameterised statement through `client.unsafe`.
 *
 * @param query SQL text with `?` placeholders (converted by `toPostgresSql`).
 * @param params Values bound to the placeholders, in order.
 * @returns The `count` reported on the driver result.
 */
async function pgRun(query: string, params: unknown[] = []): Promise<number> {
  const result = await client.unsafe(toPostgresSql(query), params as never[]);
  return result.count;
}
|
||
|
||
/**
 * Builds a JSON HTTP response.
 *
 * @param data Value serialised with `JSON.stringify`.
 * @param status HTTP status code; defaults to 200.
 * @returns A `Response` with a UTF-8 JSON content type.
 */
function json(data: unknown, status = 200): Response {
  return new Response(JSON.stringify(data), {
    status,
    headers: { "content-type": "application/json; charset=utf-8" },
  });
}
|
||
|
||
/**
 * Builds a CSV download response.
 *
 * @param text CSV document used as the response body.
 * @param filename Suggested download name. Double quotes, backslashes and
 *   CR/LF are replaced with `_` so the value cannot terminate the quoted
 *   `filename` parameter or produce an invalid header.
 * @returns A `Response` with CSV content type and attachment disposition.
 */
function csv(text: string, filename: string): Response {
  const safeName = filename.replace(/["\\\r\n]/g, "_");
  return new Response(text, {
    headers: {
      "content-type": "text/csv; charset=utf-8",
      "content-disposition": `attachment; filename="${safeName}"`,
    },
  });
}
|
||
|
||
/**
 * Builds an Excel (`.xlsx`) download response.
 *
 * @param buffer Workbook bytes used as the response body.
 * @param filename Suggested download name. Double quotes, backslashes and
 *   CR/LF are replaced with `_` so the value cannot terminate the quoted
 *   `filename` parameter or produce an invalid header.
 * @returns A `Response` with the spreadsheet content type and attachment
 *   disposition.
 */
function xlsx(buffer: ArrayBuffer, filename: string): Response {
  const safeName = filename.replace(/["\\\r\n]/g, "_");
  return new Response(buffer, {
    headers: {
      "content-type":
        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
      "content-disposition": `attachment; filename="${safeName}"`,
    },
  });
}
|
||
|
||
/**
 * Parses a query-string value as a base-10 positive integer.
 *
 * @param value Raw parameter value, or `null` when absent.
 * @param fallback Returned when the value does not parse to a number > 0.
 * @returns The parsed integer, or `fallback`.
 */
function parseIntParam(value: string | null, fallback: number): number {
  const parsed = Number.parseInt(String(value), 10);
  return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback;
}
|
||
|
||
/**
 * Reads pagination settings from query parameters.
 *
 * `page` defaults to 1; `pageSize` defaults to `DEFAULT_PAGE_SIZE` and is
 * capped at `MAX_PAGE_SIZE`.
 *
 * @param filters Request query parameters.
 * @returns The page number, page size and the matching row offset.
 */
function pageInput(filters: URLSearchParams) {
  const page = parseIntParam(filters.get("page"), 1);
  const requestedSize = parseIntParam(filters.get("pageSize"), DEFAULT_PAGE_SIZE);
  const pageSize = Math.min(requestedSize, MAX_PAGE_SIZE);
  return { page, pageSize, offset: (page - 1) * pageSize };
}
|
||
|
||
/**
 * Formats one value as a CSV field.
 *
 * `null`/`undefined` become an empty field. Embedded double quotes are
 * doubled, and the field is wrapped in quotes when it contains a quote,
 * comma, line feed or carriage return.
 *
 * @param value Value to serialise; converted with `String()`.
 * @returns The escaped field text.
 */
function escapeCsvValue(value: unknown): string {
  if (value == null) return "";
  const escaped = String(value).replace(/"/g, '""');
  // `\r` is included so fields with CR or CRLF line breaks are quoted too.
  return /[",\r\n]/.test(escaped) ? `"${escaped}"` : escaped;
}
|
||
|
||
/**
 * Converts a user-supplied sort specification into an ORDER BY fragment
 * using only allow-listed column expressions.
 *
 * The specification is a comma-separated list of `key` or `key:direction`
 * entries. Entries whose key is not an own property of `columns` are dropped;
 * any direction other than `desc` (case-insensitive) means ascending.
 *
 * Only own properties are honoured, so keys such as `constructor` or
 * `toString` cannot resolve to inherited `Object.prototype` members and end
 * up interpolated into SQL.
 *
 * @param input Raw sort parameter, or `null` when absent.
 * @param columns Map from public sort key to SQL expression.
 * @param fallback ORDER BY fragment used when nothing valid was requested.
 * @returns A comma-separated ORDER BY fragment, or `fallback`.
 */
function safeSort(
  input: string | null,
  columns: Record<string, string>,
  fallback: string,
): string {
  if (!input) return fallback;
  const clauses: string[] = [];
  for (const part of input.split(",")) {
    const [rawKey, rawDirection] = part.split(":");
    const key = rawKey?.trim();
    if (!key || !Object.prototype.hasOwnProperty.call(columns, key)) continue;
    const column = columns[key];
    if (typeof column !== "string" || column.length === 0) continue;
    const direction =
      rawDirection?.trim().toUpperCase() === "DESC" ? "DESC" : "ASC";
    clauses.push(`${column} ${direction}`);
  }
  return clauses.length > 0 ? clauses.join(", ") : fallback;
}
|
||
|
||
/**
 * Splits free-form text into UPC tokens.
 *
 * Tokens are separated by any run of whitespace, commas, semicolons or pipe
 * characters; empty tokens are discarded.
 *
 * @param input Raw text containing zero or more UPCs.
 * @returns The non-empty tokens in input order.
 */
function splitRawUpcValues(input: string): string[] {
  return input
    .split(/[\s,;|]+/)
    .map((token) => token.trim())
    .filter(Boolean);
}
|
||
|
||
/**
 * Recursively gathers UPC tokens from a loosely typed value into `target`.
 *
 * Strings are tokenised with `splitRawUpcValues`, finite numbers are
 * truncated and stringified, arrays are walked element by element, and every
 * other kind of value is ignored.
 *
 * @param value Value to inspect.
 * @param target Array that receives the collected tokens (mutated in place).
 */
function collectUpcs(value: unknown, target: string[]): void {
  if (typeof value === "string") {
    target.push(...splitRawUpcValues(value));
    return;
  }
  if (typeof value === "number" && Number.isFinite(value)) {
    target.push(String(Math.trunc(value)));
    return;
  }
  if (Array.isArray(value)) {
    for (const entry of value) collectUpcs(entry, target);
  }
}
|
||
|
||
/**
 * Extracts the de-duplicated list of UPCs from a lookup request.
 *
 * - GET: reads every `upc` query parameter plus the `upcs` parameter.
 * - POST: reads a JSON body; uses its `upcs` property when the body is an
 *   object containing one, otherwise treats the body itself as the UPC value.
 *
 * @param req Incoming request.
 * @returns Unique UPC tokens in first-seen order.
 * @throws Error "Invalid JSON body" when a POST body is not valid JSON.
 * @throws Error "Method not allowed" for any other HTTP method.
 */
async function parseUpcsFromRequest(req: Request): Promise<string[]> {
  const parsed: string[] = [];
  if (req.method === "GET") {
    const params = new URL(req.url).searchParams;
    for (const value of params.getAll("upc")) collectUpcs(value, parsed);
    collectUpcs(params.get("upcs"), parsed);
  } else if (req.method === "POST") {
    let body: unknown;
    try {
      body = await req.json();
    } catch {
      throw new Error("Invalid JSON body");
    }
    if (body && typeof body === "object" && "upcs" in body) {
      collectUpcs((body as { upcs?: unknown }).upcs, parsed);
    } else {
      collectUpcs(body, parsed);
    }
  } else {
    throw new Error("Method not allowed");
  }
  return [...new Set(parsed)];
}
|
||
|
||
/**
 * Checks that a UPC list is non-empty and within the per-request limit.
 *
 * @param upcs UPC tokens to validate.
 * @returns An error message, or `null` when the list is acceptable.
 */
function validateUpcs(upcs: string[]): string | null {
  if (upcs.length === 0) return "Provide at least one UPC.";
  if (upcs.length > MAX_UPCS_PER_REQUEST) {
    return `Too many UPCs. Maximum allowed per request is ${MAX_UPCS_PER_REQUEST}.`;
  }
  return null;
}
|
||
|
||
/**
 * Counts lookup results per `status` value.
 *
 * @param items Lookup details to tally.
 * @returns A map from status to the number of items with that status.
 */
function summarizeLookupStatuses(items: KeepaUpcLookupDetail[]) {
  const counts: Record<string, number> = {};
  for (const item of items) {
    counts[item.status] = (counts[item.status] ?? 0) + 1;
  }
  return counts;
}
|
||
|
||
/** Validated body of a UPC-file request, as produced by `parseUpcFileRequest`. */
type UpcFileRequest = {
  /** Required, non-blank input file value. */
  inputFile: string;
  /** Optional output file value. */
  outputFile?: string;
  /** Optional positive integer. */
  inputBatchSize?: number;
  /** Optional positive integer. */
  upcLookupBatchSize?: number;
  /** Optional positive integer. */
  maxRows?: number;
};
|
||
|
||
/**
 * Validates an optional positive-integer field from a JSON body.
 *
 * Only numbers and non-blank numeric strings are accepted. Booleans, arrays
 * and other values that `Number()` would silently coerce (for example
 * `true` -> 1 or `[3]` -> 3) are rejected, as are values outside the safe
 * integer range.
 *
 * @param value Raw field value.
 * @param name Field name used in the error message.
 * @returns The integer, or `undefined` when the field is `null`/`undefined`.
 * @throws Error `${name} must be a positive integer` for any invalid value.
 */
function positiveField(value: unknown, name: string): number | undefined {
  if (value == null) return undefined;
  let parsed = Number.NaN;
  if (typeof value === "number") {
    parsed = value;
  } else if (typeof value === "string" && value.trim() !== "") {
    parsed = Number(value);
  }
  if (!Number.isSafeInteger(parsed) || parsed < 1) {
    throw new Error(`${name} must be a positive integer`);
  }
  return parsed;
}
|
||
|
||
/**
 * Parses and validates the JSON body of a UPC-file request.
 *
 * A body that is valid JSON but not an object (for example `null`) is
 * reported as a missing `inputFile` instead of failing with a TypeError on
 * property access.
 *
 * @param req Incoming request; must be a POST with a JSON body.
 * @returns The validated request fields.
 * @throws Error "Method not allowed" when the method is not POST.
 * @throws Error "Invalid JSON body" when the body is not valid JSON.
 * @throws Error "inputFile is required" when `inputFile` is missing or blank.
 * @throws Error from `positiveField` when a numeric field is invalid.
 */
async function parseUpcFileRequest(req: Request): Promise<UpcFileRequest> {
  if (req.method !== "POST") throw new Error("Method not allowed");
  let raw: unknown;
  try {
    raw = await req.json();
  } catch {
    throw new Error("Invalid JSON body");
  }
  const body: Record<string, unknown> =
    raw != null && typeof raw === "object" ? (raw as Record<string, unknown>) : {};
  const { inputFile, outputFile } = body;
  if (typeof inputFile !== "string" || !inputFile.trim()) {
    throw new Error("inputFile is required");
  }
  return {
    inputFile,
    outputFile: typeof outputFile === "string" ? outputFile : undefined,
    inputBatchSize: positiveField(body.inputBatchSize, "inputBatchSize"),
    upcLookupBatchSize: positiveField(
      body.upcLookupBatchSize,
      "upcLookupBatchSize",
    ),
    maxRows: positiveField(body.maxRows, "maxRows"),
  };
}
|
||
|
||
/**
 * Returns one page of runs with their aggregated statistics.
 *
 * Supported filters: `processType`, `status`, `startDate`, `endDate`
 * (compared against `r.started_at`) and `q` (case-insensitive substring
 * match on input file, category label or run id). `sort` is resolved through
 * `safeSort`, so only the allow-listed expressions below reach the ORDER BY.
 *
 * @param filters Request query parameters.
 * @returns The page of rows plus pagination metadata; `totalPages` is at
 *   least 1.
 */
async function getRuns(filters: URLSearchParams) {
  const { page, pageSize, offset } = pageInput(filters);
  // "TRUE" keeps the WHERE clause valid when no filter is supplied.
  const conditions: string[] = ["TRUE"];
  const params: unknown[] = [];

  const type = filters.get("processType")?.trim();
  const status = filters.get("status")?.trim();
  const startDate = filters.get("startDate");
  const endDate = filters.get("endDate");
  const q = filters.get("q")?.trim();

  if (type) {
    conditions.push("r.type::text = ?");
    params.push(type);
  }
  if (status) {
    conditions.push("r.status::text = ?");
    params.push(status);
  }
  if (startDate) {
    conditions.push("r.started_at >= ?");
    params.push(startDate);
  }
  if (endDate) {
    conditions.push("r.started_at <= ?");
    params.push(endDate);
  }
  if (q) {
    conditions.push(
      "(COALESCE(r.input_file, '') ILIKE ? OR COALESCE(cd.category_label, '') ILIKE ? OR CAST(r.id AS text) ILIKE ?)",
    );
    params.push(`%${q}%`, `%${q}%`, `%${q}%`);
  }

  const where = `WHERE ${conditions.join(" AND ")}`;
  const sort = safeSort(
    filters.get("sort"),
    {
      runId: "r.id",
      processType: "r.type",
      timestamp: "r.started_at",
      status: "r.status",
      jobType: "COALESCE(cd.category_label, r.input_file, r.type::text)",
      totalProducts: "COALESCE(stats.processed_count, 0)",
      fbaCount: "COALESCE(stats.fba_count, 0)",
      fbmCount: "COALESCE(stats.fbm_count, 0)",
      skipCount: "COALESCE(stats.skip_count, 0)",
    },
    "r.started_at DESC, r.id DESC",
  );
  const join = `
    FROM runs r
    LEFT JOIN analysis_run_stats stats ON stats.run_id = r.id
    LEFT JOIN category_run_details cd ON cd.run_id = r.id`;

  const totalRow = await pgGet<{ total: string }>(
    `SELECT COUNT(*) AS total ${join} ${where}`,
    params,
  );
  const items = await pgAll(
    `SELECT r.type AS "processType", r.id AS "runId",
       r.started_at AS timestamp, r.status, r.input_file AS source,
       r.output_file AS output,
       COALESCE(cd.category_label, r.input_file, r.type::text) AS "jobType",
       COALESCE(stats.processed_count, 0) AS "totalProducts",
       COALESCE(stats.fba_count, 0) AS "fbaCount",
       COALESCE(stats.fbm_count, 0) AS "fbmCount",
       COALESCE(stats.buy_count, 0) AS "buyCount",
       COALESCE(stats.watch_count, 0) AS "watchCount",
       COALESCE(stats.skip_count, 0) AS "skipCount"
     ${join} ${where} ORDER BY ${sort} LIMIT ? OFFSET ?`,
    [...params, pageSize, offset],
  );

  const total = Number(totalRow?.total ?? 0);
  return { items, page, pageSize, total, totalPages: Math.max(1, Math.ceil(total / pageSize)) };
}
|
||
|
||
/**
 * Loads a single run together with its category details and aggregated
 * statistics (missing statistics are reported as 0).
 *
 * @param runId Identifier of the run.
 * @returns The run row, or `null` when no run has that id.
 */
async function getRun(runId: number) {
  return pgGet(
    `SELECT r.type AS "processType", r.id AS "runId", r.parent_run_id AS "parentRunId",
       r.started_at AS timestamp, r.completed_at AS "completedAt", r.status,
       r.input_file AS source, r.output_file AS output, r.error_message AS "errorMessage",
       COALESCE(cd.category_label, r.input_file, r.type::text) AS "jobType",
       cd.category_id AS "categoryId", cd.checked_asin_count AS "checkedAsins",
       COALESCE(stats.processed_count, 0) AS "totalProducts",
       COALESCE(stats.available_count, 0) AS "availableAsins",
       COALESCE(stats.fba_count, 0) AS "fbaCount",
       COALESCE(stats.fbm_count, 0) AS "fbmCount",
       COALESCE(stats.buy_count, 0) AS "buyCount",
       COALESCE(stats.watch_count, 0) AS "watchCount",
       COALESCE(stats.skip_count, 0) AS "skipCount"
     FROM runs r
     LEFT JOIN analysis_run_stats stats ON stats.run_id = r.id
     LEFT JOIN category_run_details cd ON cd.run_id = r.id
     WHERE r.id = ?`,
    [runId],
  );
}
|
||
|
||
/**
 * Base SELECT that flattens a run item into one row: product and supplier
 * attributes, the newest analysis revision for the item (by `analyzed_at`,
 * then `id`), the observation that revision points at, and UPC resolution
 * data. It is embedded as a subquery by the item and product listings.
 */
const ITEM_ROWS = `
  WITH latest_revision AS (
    SELECT DISTINCT ON (ar.run_item_id)
      ar.run_item_id, ar.id AS revision_id, ar.decision, ar.confidence,
      ar.reasoning, ar.analyzed_at, ar.observation_id
    FROM analysis_revisions ar
    ORDER BY ar.run_item_id, ar.analyzed_at DESC, ar.id DESC
  )
  SELECT ri.id AS item_id, ri.run_id, r.type AS process_type,
    ri.product_asin, COALESCE(ri.product_asin, ur.normalized_upc) AS asin,
    COALESCE(p.name, si.supplied_name, ur.normalized_upc) AS product_name,
    COALESCE(p.brand, si.supplied_brand) AS brand,
    COALESCE(p.category, si.supplied_category) AS category,
    si.unit_cost, si.avg_price_90d_sheet, si.selling_price_sheet,
    observation.current_price, observation.avg_price_90d,
    observation.sales_rank, observation.sales_rank_avg_90d,
    observation.seller_count, observation.amazon_is_seller,
    observation.amazon_buybox_share_pct_90d, observation.monthly_sold,
    observation.rank_drops_30d, observation.rank_drops_90d,
    observation.fba_fee, observation.fbm_fee, observation.referral_percent,
    observation.can_sell, observation.sellability_status,
    observation.sellability_reason, revision.decision AS verdict,
    revision.confidence, revision.reasoning,
    revision.analyzed_at AS fetched_at, ur.normalized_upc AS upc,
    ur.status AS upc_lookup_status
  FROM run_items ri
  JOIN runs r ON r.id = ri.run_id
  LEFT JOIN products p ON p.asin = ri.product_asin
  LEFT JOIN sourcing_inputs si ON si.run_item_id = ri.id
  LEFT JOIN upc_resolutions ur ON ur.run_item_id = ri.id
  LEFT JOIN latest_revision revision ON revision.run_item_id = ri.id
  LEFT JOIN product_observations observation ON observation.id = revision.observation_id
`;
|
||
|
||
/**
 * Builds the WHERE clause and bound parameters for item/product listings.
 *
 * Recognised filters: `verdict`, `sellabilityStatus`, `minConfidence`,
 * `maxConfidence` (ignored unless numeric), `amazonIsSeller` (`yes`/`no`) and
 * `q` (case-insensitive substring match on asin, product name, brand,
 * category or reasoning).
 *
 * @param filters Request query parameters.
 * @param runId When given, restricts rows to that run.
 * @returns `where` (empty string when there are no conditions) and the
 *   parameters for its `?` placeholders, in order.
 */
function itemFilters(filters: URLSearchParams, runId?: number) {
  const conditions: string[] = [];
  const params: unknown[] = [];

  if (runId != null) {
    conditions.push("run_id = ?");
    params.push(runId);
  }

  const verdict = filters.get("verdict")?.trim();
  if (verdict) {
    conditions.push("verdict::text = ?");
    params.push(verdict);
  }

  const sellabilityStatus = filters.get("sellabilityStatus");
  if (sellabilityStatus) {
    conditions.push("sellability_status = ?");
    params.push(sellabilityStatus);
  }

  const rawMin = filters.get("minConfidence");
  const rawMax = filters.get("maxConfidence");
  const minConfidence = Number(rawMin);
  const maxConfidence = Number(rawMax);
  if (rawMin && Number.isFinite(minConfidence)) {
    conditions.push("confidence >= ?");
    params.push(minConfidence);
  }
  if (rawMax && Number.isFinite(maxConfidence)) {
    conditions.push("confidence <= ?");
    params.push(maxConfidence);
  }

  const amazonIsSeller = filters.get("amazonIsSeller");
  if (amazonIsSeller === "yes") {
    conditions.push("amazon_is_seller = true");
  } else if (amazonIsSeller === "no") {
    conditions.push("amazon_is_seller = false");
  }

  const q = filters.get("q")?.trim();
  if (q) {
    const wildcard = `%${q}%`;
    conditions.push(
      "(COALESCE(asin, '') ILIKE ? OR COALESCE(product_name, '') ILIKE ? OR COALESCE(brand, '') ILIKE ? OR COALESCE(category, '') ILIKE ? OR COALESCE(reasoning, '') ILIKE ?)",
    );
    params.push(wildcard, wildcard, wildcard, wildcard, wildcard);
  }

  return {
    where: conditions.length ? `WHERE ${conditions.join(" AND ")}` : "",
    params,
  };
}
|
||
|
||
/** Allow-list of sort keys for item/product listings, mapped to column names. */
const ITEM_SORTS: Record<string, string> = {
  asin: "asin",
  product_name: "product_name",
  brand: "brand",
  category: "category",
  current_price: "current_price",
  avg_price_90d: "avg_price_90d",
  sales_rank: "sales_rank",
  seller_count: "seller_count",
  amazon_is_seller: "amazon_is_seller",
  amazon_buybox_share_pct_90d: "amazon_buybox_share_pct_90d",
  monthly_sold: "monthly_sold",
  verdict: "verdict",
  confidence: "confidence",
  fetched_at: "fetched_at",
};
|
||
|
||
/**
 * Returns one page of items belonging to a run.
 *
 * Filtering comes from `itemFilters`, ordering from `safeSort` over
 * `ITEM_SORTS` (default: `monthly_sold DESC NULLS LAST, asin ASC`).
 *
 * @param runId Run whose items are listed.
 * @param filters Request query parameters.
 * @returns The page of rows plus pagination metadata; `totalPages` is at
 *   least 1.
 */
async function getRunItems(runId: number, filters: URLSearchParams) {
  const { page, pageSize, offset } = pageInput(filters);
  const { where, params } = itemFilters(filters, runId);
  const orderBy = safeSort(
    filters.get("sort"),
    ITEM_SORTS,
    "monthly_sold DESC NULLS LAST, asin ASC",
  );
  const source = `FROM (${ITEM_ROWS}) item_rows ${where}`;

  const totalRow = await pgGet<{ total: string }>(
    `SELECT COUNT(*) AS total ${source}`,
    params,
  );
  const items = await pgAll(
    `SELECT * ${source} ORDER BY ${orderBy} LIMIT ? OFFSET ?`,
    [...params, pageSize, offset],
  );

  const total = Number(totalRow?.total ?? 0);
  return { items, page, pageSize, total, totalPages: Math.max(1, Math.ceil(total / pageSize)) };
}
|
||
|
||
/**
 * Renders every item of a run that matches the filters as CSV text.
 *
 * Uses the same filtering and ordering as `getRunItems` but without
 * pagination. The first line is the header row; lines are joined with `\n`.
 *
 * @param runId Run whose items are exported.
 * @param filters Request query parameters.
 * @returns The CSV document.
 */
async function exportRunItems(runId: number, filters: URLSearchParams) {
  const { where, params } = itemFilters(filters, runId);
  const orderBy = safeSort(
    filters.get("sort"),
    ITEM_SORTS,
    "monthly_sold DESC NULLS LAST, asin ASC",
  );
  const rows = await pgAll<Record<string, unknown>>(
    `SELECT * FROM (${ITEM_ROWS}) item_rows ${where} ORDER BY ${orderBy}`,
    params,
  );
  const headers = [
    "run_id", "asin", "product_name", "brand", "category", "unit_cost",
    "current_price", "avg_price_90d", "sales_rank_avg_90d", "seller_count",
    "amazon_is_seller", "amazon_buybox_share_pct_90d", "monthly_sold",
    "sellability_status", "verdict", "confidence", "reasoning", "fetched_at",
  ];
  const lines = [headers.join(",")];
  for (const row of rows) {
    lines.push(headers.map((header) => escapeCsvValue(row[header])).join(","));
  }
  return lines.join("\n");
}
|
||
|
||
/**
 * Returns one page of products, each joined with its most recent item row
 * (newest `fetched_at`, then highest `item_id`) from `ITEM_ROWS`.
 *
 * Filtering comes from `itemFilters` (without a run restriction), ordering
 * from `safeSort` over `ITEM_SORTS` (default:
 * `fetched_at DESC NULLS LAST, asin ASC`).
 *
 * @param filters Request query parameters.
 * @returns The page of rows plus pagination metadata; `totalPages` is at
 *   least 1.
 */
async function getProducts(filters: URLSearchParams) {
  const { page, pageSize, offset } = pageInput(filters);
  const { where, params } = itemFilters(filters);
  const orderBy = safeSort(
    filters.get("sort"),
    ITEM_SORTS,
    "fetched_at DESC NULLS LAST, asin ASC",
  );
  const base = `
    SELECT product.asin, product.asin AS product_asin,
      latest.item_id, latest.run_id AS "runId", latest.process_type AS "processType",
      COALESCE(product.name, latest.product_name) AS product_name,
      COALESCE(product.brand, latest.brand) AS brand,
      COALESCE(product.category, latest.category) AS category,
      latest.unit_cost, latest.current_price, latest.avg_price_90d,
      latest.sales_rank, latest.sales_rank_avg_90d, latest.seller_count,
      latest.amazon_is_seller, latest.amazon_buybox_share_pct_90d,
      latest.monthly_sold, latest.rank_drops_30d, latest.rank_drops_90d,
      latest.sellability_status, latest.verdict, latest.confidence,
      latest.reasoning, latest.fetched_at
    FROM products product
    LEFT JOIN LATERAL (
      SELECT *
      FROM (${ITEM_ROWS}) item_history
      WHERE item_history.product_asin = product.asin
      ORDER BY item_history.fetched_at DESC NULLS LAST, item_history.item_id DESC
      LIMIT 1
    ) latest ON TRUE`;

  const totalRow = await pgGet<{ total: string }>(
    `SELECT COUNT(*) AS total FROM (${base}) products ${where}`,
    params,
  );
  const total = Number(totalRow?.total ?? 0);
  const items = await pgAll(
    `SELECT * FROM (${base}) products ${where} ORDER BY ${orderBy} LIMIT ? OFFSET ?`,
    [...params, pageSize, offset],
  );
  return { items, page, pageSize, total, totalPages: Math.max(1, Math.ceil(total / pageSize)) };
}
|
||
|
||
/**
 * Loads a product with its observation history, analysis history and
 * distributor research, each newest first.
 *
 * The three history queries are independent, so they run concurrently once
 * the product is known to exist.
 *
 * @param asin Product ASIN.
 * @returns The product and its history, or `null` when the ASIN is unknown.
 */
async function getProduct(asin: string) {
  const product = await pgGet(`SELECT * FROM products WHERE asin = ?`, [asin]);
  if (!product) return null;

  const [observations, analyses, distributorResearchRows] = await Promise.all([
    pgAll(
      `SELECT observation.*, run.type AS run_type
       FROM product_observations observation
       JOIN runs run ON run.id = observation.run_id
       WHERE observation.product_asin = ?
       ORDER BY observation.fetched_at DESC`,
      [asin],
    ),
    pgAll(
      `SELECT revision.*, item.run_id, run.type AS run_type
       FROM analysis_revisions revision
       JOIN run_items item ON item.id = revision.run_item_id
       JOIN runs run ON run.id = item.run_id
       WHERE item.product_asin = ?
       ORDER BY revision.analyzed_at DESC`,
      [asin],
    ),
    pgAll<Record<string, unknown>>(
      `SELECT id, run_item_id, inventory_item_id, provider, model, status, distributors_json, raw_response, created_at
       FROM product_distributor_research
       WHERE product_asin = ?
       ORDER BY created_at DESC, id DESC`,
      [asin],
    ),
  ]);

  // Stored candidates may arrive as JSON text or, depending on the column
  // type and driver, as an already-decoded value; both forms are accepted.
  // Anything unparseable yields an empty list rather than failing the request.
  const readDistributors = (stored: unknown): DistributorCandidate[] => {
    try {
      const payload: unknown =
        typeof stored === "string" ? JSON.parse(stored) : stored ?? [];
      return normalizeDistributorCandidates(payload);
    } catch {
      return [];
    }
  };

  const distributorResearch = distributorResearchRows.map((row) => ({
    id: Number(row.id),
    run_item_id: row.run_item_id == null ? null : Number(row.run_item_id),
    inventory_item_id: row.inventory_item_id == null ? null : Number(row.inventory_item_id),
    provider: String(row.provider ?? ""),
    model: String(row.model ?? ""),
    status: String(row.status ?? ""),
    created_at: String(row.created_at ?? ""),
    distributors: readDistributors(row.distributors_json),
    raw_response: row.raw_response == null ? null : String(row.raw_response),
  }));

  return { product, observations, analyses, distributorResearch };
}
|
||
|
||
/**
 * Finds the run item with the highest id for a product.
 *
 * @param asin Product ASIN.
 * @returns The run item id, or `null` when the product has no run items.
 */
async function findLatestRunItemIdByAsin(asin: string): Promise<number | null> {
  const row = await pgGet<{ id: number }>(
    `SELECT ri.id
     FROM run_items ri
     WHERE ri.product_asin = ?
     ORDER BY ri.id DESC
     LIMIT 1`,
    [asin],
  );
  return row?.id == null ? null : Number(row.id);
}
|
||
|
||
/**
 * Re-runs analysis for the most recent run item of a product.
 *
 * @param asin Product ASIN.
 * @param useClaude Forwarded to `reanalyzeRunItem`; defaults to `USE_CLAUDE`.
 * @returns The result of `reanalyzeRunItem`.
 * @throws Error "Stalker product item not found" when the ASIN has no run item.
 */
async function reanalyzeStalkerProductByAsin(asin: string, useClaude = USE_CLAUDE) {
  const runItemId = await findLatestRunItemIdByAsin(asin);
  if (runItemId == null) {
    throw new Error("Stalker product item not found");
  }
  return reanalyzeRunItem(runItemId, useClaude);
}
|
||
|
||
/**
 * Runs distributor research for the most recent run item of a product.
 *
 * @param asin Product ASIN.
 * @returns The result of `findDistributorsForStalkerProduct`.
 * @throws Error "Stalker product item not found" when the ASIN has no run item.
 */
async function findDistributorsForStalkerProductByAsin(asin: string) {
  const runItemId = await findLatestRunItemIdByAsin(asin);
  if (runItemId == null) {
    throw new Error("Stalker product item not found");
  }
  return findDistributorsForStalkerProduct(runItemId);
}
|
||
|
||
/**
 * Re-fetches market data for a run item, re-runs the LLM analysis and stores
 * the outcome as a new observation plus analysis revision.
 *
 * Steps, in order: load the item and its sourcing inputs, fetch Keepa data
 * (best-effort), fetch sellability, fetch SP-API pricing and fees, analyse,
 * insert the observation and revision, then refresh the run statistics.
 *
 * @param itemId Run item to re-analyse; it must have a product ASIN.
 * @param useClaude Passed to `analyzeProducts`; defaults to `USE_CLAUDE`.
 * @returns Identifiers of the item, run and ASIN plus the fetch timestamp.
 * @throws Error "Run item not found" when no matching item with an ASIN exists.
 * @throws Error when the run type is `supplier_upc`.
 */
async function reanalyzeRunItem(itemId: number, useClaude = USE_CLAUDE) {
  const row = await pgGet<Record<string, any>>(
    `SELECT ri.id, ri.run_id, ri.product_asin AS asin, r.type,
       COALESCE(p.name, si.supplied_name, ri.product_asin) AS product_name,
       COALESCE(p.brand, si.supplied_brand) AS brand,
       COALESCE(p.category, si.supplied_category) AS category,
       si.unit_cost, si.avg_price_90d_sheet, si.selling_price_sheet,
       si.fba_net_sheet, si.gross_profit_dollar, si.gross_profit_pct,
       si.net_profit_sheet, si.roi_sheet, si.moq, si.moq_cost,
       si.qty_available, si.supplier, si.source_url, si.asin_link,
       si.promo_coupon_code, si.notes, si.lead_date,
       ri.source_inventory_item_id
     FROM run_items ri JOIN runs r ON r.id = ri.run_id
     LEFT JOIN products p ON p.asin = ri.product_asin
     LEFT JOIN sourcing_inputs si ON si.run_item_id = ri.id
     WHERE ri.id = ? AND ri.product_asin IS NOT NULL`,
    [itemId],
  );
  if (!row) throw new Error("Run item not found");
  if (row.type === "supplier_upc") {
    throw new Error("Supplier scoring revisions are produced by the supplier pipeline");
  }

  // Database NULLs are mapped to `undefined` for the optional record fields.
  const record: ProductRecord = {
    asin: row.asin,
    name: row.product_name ?? row.asin,
    unitCost: row.unit_cost ?? 0,
    brand: row.brand ?? undefined,
    category: row.category ?? undefined,
    amazonRank: undefined,
    avgPrice90FromSheet: row.avg_price_90d_sheet ?? undefined,
    sellingPriceFromSheet: row.selling_price_sheet ?? undefined,
    fbaNet: row.fba_net_sheet ?? undefined,
    grossProfit: row.gross_profit_dollar ?? undefined,
    grossProfitPct: row.gross_profit_pct ?? undefined,
    netProfitFromSheet: row.net_profit_sheet ?? undefined,
    roiFromSheet: row.roi_sheet ?? undefined,
    moq: row.moq ?? undefined,
    moqCost: row.moq_cost ?? undefined,
    totalQtyAvail: row.qty_available ?? undefined,
    supplier: row.supplier ?? undefined,
    sourceUrl: row.source_url ?? undefined,
    asinLink: row.asin_link ?? undefined,
    promoCouponCode: row.promo_coupon_code ?? undefined,
    notes: row.notes ?? undefined,
    leadDate: row.lead_date ?? undefined,
  };

  // Keepa data is best-effort: a failure is logged and analysis continues
  // with `keepa` set to null.
  const keepaMap = await fetchKeepaDataBatch([row.asin]).catch((error: unknown) => {
    console.warn(`Keepa fetch failed for ${row.asin}:`, error);
    return new Map();
  });
  const keepa = keepaMap.get(row.asin) ?? null;

  const sellabilityMap = await fetchSellabilityBatch([row.asin]);
  const sellability = sellabilityMap.get(row.asin) ?? {
    canSell: null,
    sellabilityStatus: "unknown" as const,
    sellabilityReason: "Sellability check returned no result",
  };
  const spApi = await fetchSpApiPricingAndFees(row.asin, sellability);

  const enriched: EnrichedProduct = {
    record,
    keepa,
    spApi: spApi as SpApiData,
    fetchedAt: new Date().toISOString(),
  };

  // A missing verdict is recorded as a zero-confidence SKIP.
  const verdicts = await analyzeProducts([enriched], { useClaude });
  const verdict = verdicts[0] ?? {
    asin: row.asin,
    verdict: "SKIP" as const,
    confidence: 0,
    reasoning: "LLM analysis returned no verdict",
  };

  const result: AnalysisResult = { product: enriched, verdict };
  const observationId = await insertObservation(row.run_id, result, "reanalysis");
  await db.insert(analysisRevisions).values({
    runItemId: itemId,
    observationId,
    method: "llm",
    decision: verdict.verdict,
    confidence: verdict.confidence,
    reasoning: verdict.reasoning,
    analyzedAt: new Date(enriched.fetchedAt),
  });
  await refreshRunStats(row.run_id);

  return { itemId, runId: row.run_id, asin: row.asin, fetchedAt: enriched.fetchedAt };
}
|
||
|
||
/** A distributor suggestion after normalisation by `normalizeDistributorCandidates`. */
type DistributorCandidate = {
  /** Distributor company name (non-empty after normalisation). */
  name: string;
  /** Distributor website (non-empty after normalisation). */
  website: string;
  /** Why the distributor is considered a candidate. */
  rationale: string;
  /** Integer score clamped to 0-100. */
  confidence: number;
  /** Summary of reputation findings. */
  reputation: string;
  /** Contact details as a single string. */
  contactInfo: string;
  /** Draft outreach message. */
  outreachDraft: string;
};
|
||
|
||
/**
 * Coerces an arbitrary value to an integer confidence score in 0-100.
 *
 * @param value Raw confidence value.
 * @returns The rounded value clamped to [0, 100], or 0 when it is not a
 *   finite number.
 */
function clampDistributorConfidence(value: unknown): number {
  const parsed = Number(value);
  if (!Number.isFinite(parsed)) return 0;
  const rounded = Math.round(parsed);
  return Math.max(0, Math.min(100, rounded));
}
|
||
|
||
/**
 * Converts an untrusted payload into a clean list of distributor candidates.
 *
 * Non-array payloads yield an empty list. Non-object entries are skipped,
 * text fields are stringified and trimmed, `confidence` is clamped, and both
 * snake_case and camelCase spellings of `contact_info` / `outreach_draft`
 * are accepted. Entries lacking a name or website are dropped, and at most
 * 10 candidates are returned.
 *
 * @param payload Decoded JSON of unknown shape.
 * @returns Up to 10 normalised candidates.
 */
function normalizeDistributorCandidates(payload: unknown): DistributorCandidate[] {
  if (!Array.isArray(payload)) return [];
  const text = (value: unknown): string => String(value ?? "").trim();
  return payload
    .filter((item): item is Record<string, unknown> => item != null && typeof item === "object")
    .map((item) => ({
      name: text(item.name),
      website: text(item.website),
      rationale: text(item.rationale),
      confidence: clampDistributorConfidence(item.confidence),
      reputation: text(item.reputation),
      contactInfo: text(item.contact_info ?? item.contactInfo),
      outreachDraft: text(item.outreach_draft ?? item.outreachDraft),
    }))
    .filter((item) => item.name.length > 0 && item.website.length > 0)
    .slice(0, 10);
}
|
||
|
||
/**
 * Pulls the JSON array portion out of model output.
 *
 * When the text contains a Markdown code fence, only the first fence's
 * content is considered. Within the candidate text, the span from the first
 * `[` to the last `]` is returned; if no such span exists the candidate text
 * is returned unchanged.
 *
 * @param text Raw model output.
 * @returns Text expected to be a JSON array (not validated here).
 */
function extractJsonArrayFromText(text: string): string {
  const trimmed = text.trim();
  const fence = trimmed.match(/```(?:json)?\s*([\s\S]*?)```/i);
  const candidate = fence ? fence[1]?.trim() ?? "" : trimmed;
  const start = candidate.indexOf("[");
  const end = candidate.lastIndexOf("]");
  return start >= 0 && end > start ? candidate.slice(start, end + 1) : candidate;
}
|
||
|
||
/**
 * Asks the Anthropic Messages API for likely authorized U.S. distributors of
 * a product and parses the reply into normalised candidates.
 *
 * The reply's text blocks are concatenated, a JSON array is extracted with
 * `extractJsonArrayFromText`, and the result is normalised. Output that
 * cannot be parsed yields an empty candidate list rather than an error.
 *
 * @param context Product context embedded in the prompt as pretty-printed JSON.
 * @returns The model used, the raw text of the reply and the parsed candidates.
 * @throws Error when `config.anthropicApiKey` is not set.
 * @throws Error `Claude API error <status>: ...` on a non-2xx response.
 */
async function requestClaudeDistributorCandidates(context: Record<string, unknown>) {
  if (!config.anthropicApiKey) {
    throw new Error("Missing required env var: ANTHROPIC_API_KEY");
  }
  // A missing or blank configured model falls back to the default.
  const model = (config.anthropicModel ?? "claude-sonnet-4-6").trim() || "claude-sonnet-4-6";
  const system = [
    "You are a wholesale sourcing researcher who identifies authorized U.S. distributors for Amazon products.",
    "For each candidate you research their reputation, locate real point-of-contact details, and draft a concise cold-outreach message.",
    "Return only raw JSON — no prose, no markdown fences.",
  ].join(" ");
  const prompt = [
    "Analyze the Amazon product context below and identify up to 5 likely authorized U.S. wholesale distributors.",
    "",
    "For each distributor:",
    "1. Identify whether they are an official brand distributor, authorized reseller, or national wholesaler.",
    "2. Investigate their reputation: check for BBB accreditation, industry tenure, any known complaints or red flags, and whether they appear on the brand's own authorized-distributor list.",
    "3. Find the most direct point-of-contact for opening a new wholesale account. Search the distributor's website for a dedicated wholesale, reseller, or new-account page. Return AS MANY of these as you can find: full name and title of the wholesale/vendor relations contact, direct email address (e.g. wholesale@..., newaccounts@..., sales@...), direct phone number, and the URL of the wholesale application or inquiry page. If a named contact is not publicly listed, return the best department email and phone. Do NOT return a generic contact form URL as the only answer.",
    "4. Draft a short, professional cold-outreach message (3–5 sentences) I can copy-paste and send. Tone: warm, genuine, and business-oriented — the goal is to start a relationship, not close a deal. Rules: (a) Praise the brand's reputation, quality, or market position sincerely — make it specific to what this brand is known for. (b) Frame the inquiry as a mutual growth opportunity; express eagerness to carry their line and help it reach more customers. (c) Do NOT mention Amazon, FBA, or online marketplaces anywhere in the message — present yourself simply as a retailer / reseller interested in carrying their products. (d) Ask about wholesale account requirements and invite them to share terms or an application. (e) Keep it concise and human — avoid corporate filler phrases.",
    "",
    "Return a raw JSON array. Each object must have exactly these keys:",
    ' "name" — distributor company name',
    ' "website" — full URL (https://...)',
    ' "rationale" — why this distributor is a strong candidate (1–2 sentences)',
    ' "confidence" — integer 0–100 reflecting how confident you are this is a real authorized source',
    ' "reputation" — summary of reputation findings (BBB status, years in business, any red flags)',
    ' "contact_info" — structured string with all contact details found: "Name: ..., Title: ..., Email: ..., Phone: ..., Wholesale page: ..."',
    ' "outreach_draft"— complete ready-to-send message addressed to the specific contact',
    "",
    "Product context:",
    JSON.stringify(context, null, 2),
  ].join("\n");

  const response = await fetch("https://api.anthropic.com/v1/messages", {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      "x-api-key": config.anthropicApiKey,
      "anthropic-version": "2023-06-01",
    },
    body: JSON.stringify({
      model,
      system,
      messages: [{ role: "user", content: prompt }],
      temperature: 0.2,
      max_tokens: 4096,
    }),
  });
  const raw = await response.text();
  if (!response.ok) {
    throw new Error(`Claude API error ${response.status}: ${raw.slice(0, 300)}`);
  }

  // If the envelope is not valid JSON, the raw body is used as the reply text.
  let contentText = "";
  try {
    const parsed = JSON.parse(raw) as { content?: Array<{ type?: string; text?: string }> };
    contentText = (parsed.content ?? [])
      .filter((block) => block?.type === "text" && typeof block.text === "string")
      .map((block) => block.text ?? "")
      .join("\n");
  } catch {
    contentText = raw;
  }

  const arrayText = extractJsonArrayFromText(contentText);
  let candidates: DistributorCandidate[] = [];
  try {
    candidates = normalizeDistributorCandidates(JSON.parse(arrayText));
  } catch {
    candidates = [];
  }
  return { model, rawResponse: contentText, candidates };
}
|
||
|
||
/**
 * Researches candidate distributors for one stalker run item and stores the result.
 *
 * Loads the run item together with its product, the most recently fetched
 * observation for the ASIN, the latest analysis revision and the originating
 * seller, gathers web offer results for the ASIN, asks Claude for distributor
 * candidates, then replaces any research previously saved for this run item.
 *
 * @param runItemId - Primary key of the `run_items` row to research.
 * @returns The ASIN, run item id, saved research id/timestamp and the candidates.
 * @throws Error with message "Stalker product item not found" when the lookup
 *   yields no row with an ASIN.
 */
async function findDistributorsForStalkerProduct(runItemId: number) {
  const row = await pgGet<Record<string, any>>(
    `SELECT ri.id AS run_item_id, ri.product_asin AS asin, ri.source_inventory_item_id,
            p.name AS product_title, p.brand, p.category,
            observation.current_price, observation.avg_price_90d, observation.sales_rank,
            observation.monthly_sold, observation.seller_count, observation.amazon_is_seller,
            observation.can_sell, observation.sellability_status, observation.sellability_reason,
            latest_analysis.decision AS verdict, latest_analysis.confidence, latest_analysis.reasoning,
            seller.seller_id, seller.seller_name, seller.rating, seller.rating_count
     FROM run_items ri
     JOIN products p ON p.asin = ri.product_asin
     LEFT JOIN product_observations observation ON observation.id = (
       SELECT obs.id
       FROM product_observations obs
       WHERE obs.product_asin = ri.product_asin
       ORDER BY obs.fetched_at DESC, obs.id DESC
       LIMIT 1
     )
     LEFT JOIN LATERAL (
       SELECT revision.decision, revision.confidence, revision.reasoning
       FROM analysis_revisions revision
       WHERE revision.run_item_id = ri.id
       ORDER BY revision.analyzed_at DESC, revision.id DESC
       LIMIT 1
     ) latest_analysis ON TRUE
     LEFT JOIN stalker_inventory_items si ON si.id = ri.source_inventory_item_id
     LEFT JOIN sellers seller ON seller.seller_id = si.seller_id
     WHERE ri.id = ?`,
    [runItemId],
  );
  if (!row?.asin) {
    throw new Error("Stalker product item not found");
  }

  // Offer search is best-effort: a failed search degrades to "no offers"
  // instead of aborting the whole research request.
  const offerResults = await searchAsinOffers(row.asin, {
    maxResults: 12,
    includeUnmatchedAsinResults: true,
  }).catch(() => [] as SearxngOfferSearchResult[]);

  const promptContext = {
    asin: row.asin,
    productTitle: row.product_title ?? null,
    brand: row.brand ?? null,
    category: row.category ?? null,
    metrics: {
      currentPrice: row.current_price ?? null,
      avgPrice90d: row.avg_price_90d ?? null,
      salesRank: row.sales_rank ?? null,
      monthlySold: row.monthly_sold ?? null,
      sellerCount: row.seller_count ?? null,
      amazonIsSeller: row.amazon_is_seller ?? null,
      canSell: row.can_sell ?? null,
      sellabilityStatus: row.sellability_status ?? null,
      sellabilityReason: row.sellability_reason ?? null,
      verdict: row.verdict ?? null,
      confidence: row.confidence ?? null,
      reasoning: row.reasoning ?? null,
    },
    seller: {
      sellerId: row.seller_id ?? null,
      sellerName: row.seller_name ?? null,
      rating: row.rating ?? null,
      ratingCount: row.rating_count ?? null,
    },
    offerResearch: offerResults.map((result) => ({
      title: result.title,
      url: result.url,
      domain: result.domain,
      snippet: result.snippet,
      score: result.score,
      rank: result.rank,
    })),
  };

  const claude = await requestClaudeDistributorCandidates(promptContext);

  // Replace the previous research atomically: if the insert fails, the delete
  // is rolled back and the earlier research row survives.
  const [saved] = await db.transaction(async (tx) => {
    await tx
      .delete(productDistributorResearch)
      .where(eq(productDistributorResearch.runItemId, runItemId));
    return await tx
      .insert(productDistributorResearch)
      .values({
        productAsin: row.asin,
        runItemId: runItemId,
        inventoryItemId: row.source_inventory_item_id ?? null,
        provider: "claude",
        model: claude.model,
        status: claude.candidates.length ? "completed" : "empty",
        queryContextJson: JSON.stringify(promptContext),
        distributorsJson: JSON.stringify(claude.candidates),
        rawResponse: claude.rawResponse,
      })
      .returning({ id: productDistributorResearch.id, createdAt: productDistributorResearch.createdAt });
  });

  return {
    asin: row.asin,
    runItemId: runItemId,
    researchId: saved?.id ?? null,
    createdAt: saved?.createdAt ?? null,
    distributors: claude.candidates,
  };
}
|
||
|
||
/**
 * Builds the shared WHERE clause and positional parameters for stalker queries.
 *
 * The clause uses `?` placeholders and refers to the table aliases `r`,
 * `seller`, and (in product mode) `inventory`, `product`, `observation` and
 * `analysis`; the calling query must define them.
 *
 * @param filters - Query-string filters (`runId`, `sellerId`, `q`, rating-count
 *   bounds and, in product mode, verdict / Amazon-seller / numeric range filters).
 * @param product - When true, restricts rows to sellable observations and
 *   enables the product-level filters.
 * @returns `where` (always starting with `WHERE`) and `params` in placeholder order.
 */
function stalkerBaseWhere(filters: URLSearchParams, product = false) {
  const conditions = ["r.type = 'stalker'"];
  const params: unknown[] = [];

  // Adds `expression` for each parameter holding a finite number; blank or
  // non-numeric values are ignored rather than being coerced to 0 / NaN.
  const addNumericFilters = (specs: ReadonlyArray<readonly [string, string]>): void => {
    for (const [parameter, expression] of specs) {
      const rawValue = filters.get(parameter)?.trim();
      if (!rawValue) continue;
      const value = Number(rawValue);
      if (!Number.isFinite(value)) continue;
      conditions.push(expression);
      params.push(value);
    }
  };

  const runId = filters.get("runId")?.trim();
  if (runId) {
    const parsedRunId = Number(runId);
    if (Number.isSafeInteger(parsedRunId)) {
      conditions.push("r.id = ?");
      params.push(parsedRunId);
    } else {
      // No run can have a non-integer id: match nothing instead of binding NaN,
      // which the database would reject.
      conditions.push("FALSE");
    }
  }

  const sellerId = filters.get("sellerId")?.trim();
  if (sellerId) {
    conditions.push("seller.seller_id = ?");
    params.push(sellerId.toUpperCase());
  }

  const q = filters.get("q")?.trim();
  if (q) {
    // Escape LIKE metacharacters so user text is matched literally
    // (backslash is PostgreSQL's default LIKE/ILIKE escape character).
    const wildcard = `%${q.replace(/[\\%_]/g, "\\$&")}%`;
    if (product) {
      conditions.push(
        "(inventory.product_asin ILIKE ? OR COALESCE(product.name, '') ILIKE ? OR COALESCE(product.brand, '') ILIKE ? OR COALESCE(product.category, '') ILIKE ? OR seller.seller_id ILIKE ? OR COALESCE(seller.seller_name, '') ILIKE ?)",
      );
      params.push(wildcard, wildcard, wildcard, wildcard, wildcard, wildcard);
    } else {
      conditions.push(
        "(seller.seller_id ILIKE ? OR COALESCE(seller.seller_name, '') ILIKE ? OR EXISTS (SELECT 1 FROM stalker_inventory_items query_inventory WHERE query_inventory.run_id = r.id AND query_inventory.seller_id = seller.seller_id AND query_inventory.product_asin ILIKE ?))",
      );
      params.push(wildcard, wildcard, wildcard);
    }
  }

  addNumericFilters([
    ["minRatingCount", "seller.rating_count >= ?"],
    ["maxRatingCount", "seller.rating_count <= ?"],
  ]);

  if (product) {
    conditions.push("observation.can_sell = true", "observation.sellability_status = 'available'");

    const verdict = filters.get("verdict")?.toUpperCase();
    if (verdict === "FBA" || verdict === "FBM" || verdict === "SKIP") {
      conditions.push("analysis.decision::text = ?");
      params.push(verdict);
    } else if (verdict === "UNANALYZED") {
      conditions.push("analysis.decision IS NULL");
    }

    const amazonIsSeller = filters.get("amazonIsSeller");
    if (amazonIsSeller === "yes") {
      conditions.push("observation.amazon_is_seller = true");
    } else if (amazonIsSeller === "no") {
      conditions.push("observation.amazon_is_seller = false");
    } else if (amazonIsSeller === "unknown") {
      conditions.push("observation.amazon_is_seller IS NULL");
    }

    addNumericFilters([
      ["minPrice", "observation.current_price >= ?"],
      ["maxPrice", "observation.current_price <= ?"],
      ["minMonthlySold", "observation.monthly_sold >= ?"],
      ["maxMonthlySold", "observation.monthly_sold <= ?"],
      ["minSalesRank", "observation.sales_rank >= ?"],
      ["maxSalesRank", "observation.sales_rank <= ?"],
      ["minSellerCount", "observation.seller_count >= ?"],
      ["maxSellerCount", "observation.seller_count <= ?"],
      ["minConfidence", "analysis.confidence >= ?"],
      ["maxConfidence", "analysis.confidence <= ?"],
    ]);
  }

  return { where: `WHERE ${conditions.join(" AND ")}`, params };
}
|
||
|
||
/**
 * Returns one page of stalker results, one row per (run, seller) pair, plus
 * summary counts computed over every row matching the filters.
 *
 * @param filters - Query-string filters, paging and `sort` parameters.
 * @returns The page of `items`, a `summary` (distinct runs, distinct sellers,
 *   summed persisted inventory ASIN counts) and paging metadata.
 */
async function getStalkerResults(filters: URLSearchParams) {
  const { page, pageSize, offset } = pageInput(filters);
  const { where, params } = stalkerBaseWhere(filters);
  const base = `SELECT r.id AS "runId", r.started_at, r.status, r.input_file,
      seller.seller_id, seller.seller_name, seller.rating, seller.rating_count,
      seller.storefront_asin_total, seller.persisted_inventory_sample_count,
      COUNT(DISTINCT scan.source_product_asin) AS discovered_from_count,
      MIN(scan.fetched_at) AS first_seen_at, MAX(scan.fetched_at) AS last_seen_at,
      COUNT(DISTINCT inventory.product_asin) AS persisted_inventory_asin_count,
      STRING_AGG(DISTINCT inventory.product_asin, ',') AS inventory_sample_asins
    FROM stalker_scan_sellers scan_seller
    JOIN stalker_scans scan ON scan.id = scan_seller.scan_id
    JOIN runs r ON r.id = scan.run_id
    JOIN sellers seller ON seller.seller_id = scan_seller.seller_id
    LEFT JOIN stalker_inventory_items inventory
      ON inventory.run_id = r.id AND inventory.seller_id = seller.seller_id
    ${where} GROUP BY r.id, seller.seller_id`;
  const order = safeSort(
    filters.get("sort"),
    {
      runId: '"runId"',
      started_at: "started_at",
      seller_id: "seller_id",
      seller_name: "seller_name",
      rating: "rating",
      rating_count: "rating_count",
      discovered_from_count: "discovered_from_count",
      persisted_inventory_asin_count: "persisted_inventory_asin_count",
      storefront_asin_total: "storefront_asin_total",
      last_seen_at: "last_seen_at",
    },
    "persisted_inventory_asin_count DESC, last_seen_at DESC, seller_id ASC",
  );

  // The row total and the summary aggregate the same subquery, so they are
  // computed in one pass; that query is independent of the page query, so the
  // two run concurrently.
  const [totals, items] = await Promise.all([
    pgGet<Record<string, string>>(
      `SELECT COUNT(*) AS total,
              COUNT(DISTINCT "runId") AS runs, COUNT(DISTINCT seller_id) AS sellers,
              COALESCE(SUM(persisted_inventory_asin_count), 0) AS "persistedInventoryAsins"
       FROM (${base}) rows`,
      params,
    ),
    pgAll(
      `SELECT * FROM (${base}) rows ORDER BY ${order} LIMIT ? OFFSET ?`,
      [...params, pageSize, offset],
    ),
  ]);
  const total = Number(totals?.total ?? 0);

  return {
    items,
    summary: {
      runs: Number(totals?.runs ?? 0),
      sellers: Number(totals?.sellers ?? 0),
      persistedInventoryAsins: Number(totals?.persistedInventoryAsins ?? 0),
    },
    page,
    pageSize,
    total,
    totalPages: Math.max(1, Math.ceil(total / pageSize)),
  };
}
|
||
|
||
/**
 * Builds the SELECT listing stalker inventory products with their run, seller,
 * product details, linked observation and latest analysis revision.
 *
 * The analysis columns come from a LEFT JOIN LATERAL, so they are NULL for
 * inventory items that have no analysis revision.
 *
 * @param where - Complete WHERE clause appended verbatim; it may reference the
 *   aliases `r`, `seller`, `inventory`, `product`, `observation` and `analysis`.
 * @returns The SQL text, using whatever placeholders `where` contains.
 */
function stalkerProductSql(where: string): string {
  return `SELECT r.id AS "runId", r.started_at, seller.seller_id, seller.seller_name,
      seller.rating, seller.rating_count, inventory.id AS inventory_item_id, inventory.product_asin AS asin,
      observation.can_sell, observation.sellability_status, observation.sellability_reason,
      analysis.run_item_id,
      product.name AS product_title, product.brand,
      CASE WHEN product.category IS NULL THEN NULL ELSE json_build_array(product.category)::text END AS category_tree,
      observation.current_price, observation.avg_price_90d, observation.sales_rank,
      observation.monthly_sold, observation.seller_count, observation.amazon_is_seller,
      analysis.decision AS verdict, analysis.confidence, analysis.reasoning,
      inventory.last_seen_at
    FROM stalker_inventory_items inventory
    JOIN runs r ON r.id = inventory.run_id
    JOIN sellers seller ON seller.seller_id = inventory.seller_id
    JOIN products product ON product.asin = inventory.product_asin
    JOIN product_observations observation ON observation.id = inventory.observation_id
    LEFT JOIN LATERAL (
      SELECT item.id AS run_item_id, revision.decision, revision.confidence, revision.reasoning
      FROM run_items item
      JOIN analysis_revisions revision ON revision.run_item_id = item.id
      WHERE item.source_inventory_item_id = inventory.id
      ORDER BY revision.analyzed_at DESC, revision.id DESC LIMIT 1
    ) analysis ON true
    ${where}`;
}
|
||
|
||
/**
 * Lists sellable stalker products, either as one page with summary counts or,
 * for exports, as the full sorted row set.
 *
 * @param filters - Query-string filters, paging and `sort` parameters.
 * @param exportOnly - When true, returns every matching row (no paging, no
 *   summary) in the requested order.
 * @returns The raw row array when `exportOnly` is true; otherwise `items`, a
 *   `summary` (distinct runs, sellers and ASINs) and paging metadata.
 */
async function stalkerProducts(filters: URLSearchParams, exportOnly = false) {
  const { page, pageSize, offset } = pageInput(filters);
  const { where, params } = stalkerBaseWhere(filters, true);
  const base = stalkerProductSql(where);
  const order = safeSort(
    filters.get("sort"),
    {
      runId: '"runId"',
      started_at: "started_at",
      seller_id: "seller_id",
      seller_name: "seller_name",
      rating: "rating",
      rating_count: "rating_count",
      asin: "asin",
      product_title: "product_title",
      brand: "brand",
      current_price: "current_price",
      avg_price_90d: "avg_price_90d",
      sales_rank: "sales_rank",
      monthly_sold: "monthly_sold",
      seller_count: "seller_count",
      amazon_is_seller: "amazon_is_seller",
      verdict: "verdict",
      confidence: "confidence",
      last_seen_at: "last_seen_at",
    },
    "monthly_sold DESC NULLS LAST, last_seen_at DESC, asin ASC",
  );

  if (exportOnly) return pgAll(`SELECT * FROM (${base}) products ORDER BY ${order}`, params);

  // The row total and the summary aggregate the same subquery, so they are
  // computed in one pass; that query is independent of the page query, so the
  // two run concurrently.
  const [totals, items] = await Promise.all([
    pgGet<Record<string, string>>(
      `SELECT COUNT(*) AS total,
              COUNT(DISTINCT "runId") AS runs, COUNT(DISTINCT seller_id) AS sellers,
              COUNT(DISTINCT asin) AS products
       FROM (${base}) products`,
      params,
    ),
    pgAll(
      `SELECT * FROM (${base}) products ORDER BY ${order} LIMIT ? OFFSET ?`,
      [...params, pageSize, offset],
    ),
  ]);
  const total = Number(totals?.total ?? 0);

  return {
    items,
    summary: {
      runs: Number(totals?.runs ?? 0),
      sellers: Number(totals?.sellers ?? 0),
      products: Number(totals?.products ?? 0),
    },
    page,
    pageSize,
    total,
    totalPages: Math.max(1, Math.ceil(total / pageSize)),
  };
}
|
||
|
||
/**
 * Exports every stalker product matching the filters as an XLSX download.
 *
 * Rows come from `stalkerProducts` in export mode and are written to a single
 * worksheet named "Sellable Products".
 *
 * @param filters - Query-string filters and `sort` parameter.
 * @returns The response produced by the `xlsx` helper for
 *   `stalker-sellable-products.xlsx`.
 */
async function exportStalkerProducts(filters: URLSearchParams): Promise<Response> {
  const rows = (await stalkerProducts(filters, true)) as Array<Record<string, any>>;

  // Column headers are the object keys; missing text becomes "" and missing
  // numbers stay null so the cell is left empty.
  const data = rows.map((row) => ({
    ASIN: row.asin,
    "Amazon URL": `https://amazon.com/dp/${row.asin}`,
    Product: row.product_title ?? "",
    Brand: row.brand ?? "",
    Category: row.category_tree ?? "",
    "Monthly Sold": row.monthly_sold ?? null,
    Sellers: row.seller_count ?? null,
    "Sales Rank": row.sales_rank ?? null,
    "Current Price": row.current_price ?? null,
    Verdict: row.verdict ?? "",
    Confidence: row.confidence ?? null,
    "Seller ID": row.seller_id,
    Seller: row.seller_name ?? "",
    "Run ID": row.runId,
  }));

  const workbook = XLSX.utils.book_new();
  XLSX.utils.book_append_sheet(workbook, XLSX.utils.json_to_sheet(data), "Sellable Products");
  return xlsx(
    XLSX.write(workbook, { type: "array", bookType: "xlsx" }) as ArrayBuffer,
    "stalker-sellable-products.xlsx",
  );
}
|
||
|
||
/**
 * Deletes every run of type `stalker`, then removes sellers that are no longer
 * referenced by any `stalker_scan_sellers` row. Both deletes run in one
 * transaction.
 *
 * @returns `{ ok: true, deleted: { runs } }` where `runs` is the number of run
 *   rows removed by the DELETE itself.
 */
async function purgeStalkerData() {
  const deletedRuns = await client.begin(async (sql) => {
    // Take the count from the DELETE's affected-row count rather than a
    // separate earlier SELECT, so it cannot drift from what was removed.
    const removedRuns = await sql`DELETE FROM runs WHERE type = 'stalker'`;
    await sql`DELETE FROM sellers WHERE NOT EXISTS (
      SELECT 1 FROM stalker_scan_sellers x WHERE x.seller_id = sellers.seller_id
    )`;
    return removedRuns.count;
  });
  return { ok: true, deleted: { runs: Number(deletedRuns ?? 0) } };
}
|
||
|
||
/**
 * HTTP server for the results viewer.
 *
 * Page routes serve the bundled `index` HTML; `/api/*` routes return JSON (or a
 * CSV/XLSX download). Requests matching no route fall through to `fetch`, which
 * answers with a JSON 404.
 */
const server = Bun.serve({
  port: Number(process.env.PORT || "3000"),
  routes: {
    // --- Page routes: every one serves the same bundled HTML entry point ---
    "/": index,
    "/products": index,
    "/products/:asin": index,
    "/stalker": index,
    "/stalker/products": index,
    "/runs/:runId": index,

    // --- Runs ---
    "/api/runs": async (req) => json(await getRuns(new URL(req.url).searchParams)),
    "/api/runs/:runId": async (req) => {
      const runId = Number(req.params.runId);
      if (!Number.isInteger(runId)) return json({ error: "Invalid run identifier" }, 400);
      if (req.method === "DELETE") {
        const deleted = await pgRun("DELETE FROM runs WHERE id = ?", [runId]);
        return deleted ? json({ deletedRun: true }) : json({ error: "Run not found" }, 404);
      }
      const run = await getRun(runId);
      if (!run) return json({ error: "Run not found" }, 404);
      // The counters are repeated under `summary` in addition to the run's own fields.
      return json({
        ...run,
        summary: {
          totalProducts: run.totalProducts,
          fbaCount: run.fbaCount,
          fbmCount: run.fbmCount,
          buyCount: run.buyCount,
          watchCount: run.watchCount,
          skipCount: run.skipCount,
        },
      });
    },
    "/api/runs/:runId/items": async (req) => {
      const runId = Number(req.params.runId);
      if (!Number.isInteger(runId)) return json({ error: "Invalid run identifier" }, 400);
      return json(await getRunItems(runId, new URL(req.url).searchParams));
    },
    "/api/runs/:runId/export.csv": async (req) => {
      const runId = Number(req.params.runId);
      if (!Number.isInteger(runId)) return json({ error: "Invalid run identifier" }, 400);
      return csv(await exportRunItems(runId, new URL(req.url).searchParams), `run-${runId}.csv`);
    },
    "/api/run-items/:itemId/reanalyze": async (req) => {
      if (req.method !== "POST") return json({ error: "Method not allowed" }, 405);
      const itemId = Number(req.params.itemId);
      if (!Number.isInteger(itemId)) return json({ error: "Invalid run item identifier" }, 400);
      try {
        return json(await reanalyzeRunItem(itemId));
      } catch (error) {
        const message = error instanceof Error ? error.message : String(error);
        // The "not found" error is identified by its exact message text.
        return json({ error: message }, message === "Run item not found" ? 404 : 500);
      }
    },

    // --- Products ---
    "/api/products": async (req) => json(await getProducts(new URL(req.url).searchParams)),
    "/api/products/:asin": async (req) => {
      const asin = normalizeAsin(req.params.asin);
      if (!asin) return json({ error: "Invalid ASIN" }, 400);
      const result = await getProduct(asin);
      return result ? json(result) : json({ error: "Product not found" }, 404);
    },

    // --- Stalker ---
    "/api/stalker/results": async (req) => json(await getStalkerResults(new URL(req.url).searchParams)),
    "/api/stalker/products": async (req) => json(await stalkerProducts(new URL(req.url).searchParams)),
    "/api/stalker/products/export.xlsx": async (req) => exportStalkerProducts(new URL(req.url).searchParams),
    "/api/stalker/products/:runItemId/reanalyze": async (req) => {
      if (req.method !== "POST") return json({ error: "Method not allowed" }, 405);
      const runItemId = Number(req.params.runItemId);
      if (!Number.isInteger(runItemId)) return json({ error: "Invalid run item identifier" }, 400);
      // `?provider=claude` opts in per request; the `--claude` CLI flag opts in globally.
      const provider = new URL(req.url).searchParams.get("provider")?.trim().toLowerCase();
      const useClaude = provider === "claude";
      try {
        return json(await reanalyzeRunItem(runItemId, useClaude || USE_CLAUDE));
      } catch (error) {
        const message = error instanceof Error ? error.message : String(error);
        return json({ error: message }, message === "Run item not found" ? 404 : 500);
      }
    },
    "/api/stalker/products/:runItemId/distributors": async (req) => {
      if (req.method !== "POST") return json({ error: "Method not allowed" }, 405);
      const runItemId = Number(req.params.runItemId);
      if (!Number.isInteger(runItemId)) return json({ error: "Invalid run item identifier" }, 400);
      try {
        return json(await findDistributorsForStalkerProduct(runItemId));
      } catch (error) {
        const message = error instanceof Error ? error.message : String(error);
        return json({ error: message }, message === "Stalker product item not found" ? 404 : 500);
      }
    },
    "/api/stalker/products/by-asin/:asin/reanalyze": async (req) => {
      if (req.method !== "POST") return json({ error: "Method not allowed" }, 405);
      const asin = normalizeAsin(req.params.asin);
      if (!asin) return json({ error: "Invalid ASIN" }, 400);
      const provider = new URL(req.url).searchParams.get("provider")?.trim().toLowerCase();
      const useClaude = provider === "claude";
      try {
        return json(await reanalyzeStalkerProductByAsin(asin, useClaude || USE_CLAUDE));
      } catch (error) {
        const message = error instanceof Error ? error.message : String(error);
        return json({ error: message }, message === "Stalker product item not found" ? 404 : 500);
      }
    },
    "/api/stalker/products/by-asin/:asin/distributors": async (req) => {
      if (req.method !== "POST") return json({ error: "Method not allowed" }, 405);
      const asin = normalizeAsin(req.params.asin);
      if (!asin) return json({ error: "Invalid ASIN" }, 400);
      try {
        return json(await findDistributorsForStalkerProductByAsin(asin));
      } catch (error) {
        const message = error instanceof Error ? error.message : String(error);
        return json({ error: message }, message === "Stalker product item not found" ? 404 : 500);
      }
    },
    "/api/stalker/purge": async (req) =>
      req.method === "DELETE" || req.method === "POST"
        ? json(await purgeStalkerData())
        : json({ error: "Method not allowed" }, 405),

    // --- UPC tools: any failure in these handlers is reported as HTTP 400 ---
    "/api/upc/map": async (req) => {
      try {
        const upcs = await parseUpcsFromRequest(req);
        const error = validateUpcs(upcs);
        if (error) return json({ error }, 400);
        const items = [...(await mapUpcsToAsins(upcs)).entries()].map(([upc, asin]) => ({ upc, asin }));
        return json({ requested: upcs.length, matched: items.length, items });
      } catch (error) {
        return json({ error: error instanceof Error ? error.message : String(error) }, 400);
      }
    },
    "/api/upc/lookup": async (req) => {
      try {
        const upcs = await parseUpcsFromRequest(req);
        const error = validateUpcs(upcs);
        if (error) return json({ error }, 400);
        const items = [...(await lookupKeepaUpcs(upcs)).values()];
        return json({ requested: upcs.length, statusCounts: summarizeLookupStatuses(items), items });
      } catch (error) {
        return json({ error: error instanceof Error ? error.message : String(error) }, 400);
      }
    },
    "/api/process/upc-file": async (req) => {
      try {
        return json(await runUpcFileAnalysis({ ...(await parseUpcFileRequest(req)), manageResources: false }));
      } catch (error) {
        return json({ error: error instanceof Error ? error.message : String(error) }, 400);
      }
    },
  },
  // Fallback for requests that match none of the routes above.
  fetch() {
    return json({ error: "Not found" }, 404);
  },
  development: { hmr: true, console: true },
});

console.log(`Results viewer running on http://localhost:${server.port}`);
|