Files
asin-check/src/server.ts
Victor Noguera 923ebbaec5 Refactor supplier analysis and product handling
- Updated `SupplierAnalysisResult` to include a `product` field and modified related tests.
- Refactored `addRowsSheet` to accommodate changes in the product structure.
- Enhanced UPC file analysis to utilize a new `toSupplierInputRecord` function for cleaner record creation.
- Introduced new types for supplier input records and product observations.
- Updated frontend components to handle new product details and analysis history.
- Improved database writing functions to streamline run completion and error handling.
- Added new API endpoints for product details and adjusted routing in the frontend.
2026-05-25 12:27:41 -04:00

928 lines
36 KiB
TypeScript

import index from "./web/index.html";
import * as XLSX from "xlsx";
import { normalizeAsin } from "./asin.ts";
import { db, client } from "./db/index.ts";
import { analysisRevisions } from "./db/schema.ts";
import { insertObservation, refreshRunStats } from "./db/persistence.ts";
import {
fetchKeepaDataBatch,
lookupKeepaUpcs,
mapUpcsToAsins,
} from "./integrations/keepa.ts";
import { analyzeProducts } from "./integrations/llm.ts";
import {
fetchSellabilityBatch,
fetchSpApiPricingAndFees,
} from "./integrations/sp-api.ts";
import { runUpcFileAnalysis } from "./supplier/upc-file-analysis.ts";
import type {
AnalysisResult,
EnrichedProduct,
KeepaUpcLookupDetail,
ProductRecord,
SpApiData,
} from "./types.ts";
/** Page size `pageInput` falls back to when the request has no valid `pageSize`. */
const DEFAULT_PAGE_SIZE = 25;
/** Upper bound `pageInput` clamps the requested `pageSize` to. */
const MAX_PAGE_SIZE = 200;
/** Largest UPC list `validateUpcs` accepts for a single request. */
const MAX_UPCS_PER_REQUEST = 1000;
/** True when the process was started with `--claude`; passed to `analyzeProducts` as `useClaude`. */
const USE_CLAUDE = process.argv.includes("--claude");
/**
 * Converts `?` placeholders into PostgreSQL positional parameters.
 *
 * Every `?` character, left to right, becomes `$1`, `$2`, ... The rewrite is
 * purely textual, so a `?` inside a quoted SQL literal is numbered as well.
 *
 * @param query SQL text using `?` placeholders.
 * @returns The same SQL with `$n` placeholders.
 */
function toPostgresSql(query: string): string {
  const segments = query.split("?");
  let sql = segments[0] ?? "";
  for (let position = 1; position < segments.length; position += 1) {
    sql += "$" + String(position) + (segments[position] ?? "");
  }
  return sql;
}
/**
 * Runs a `?`-parameterised query and returns only its first row.
 *
 * @param query SQL text with `?` placeholders (rewritten by `toPostgresSql`).
 * @param params Values bound to the placeholders, in order.
 * @returns The first row, or `null` when the query produced no rows.
 */
async function pgGet<T extends Record<string, unknown>>(
  query: string,
  params: unknown[] = [],
): Promise<T | null> {
  const sql = toPostgresSql(query);
  const rows = await client.unsafe<T[]>(sql, params as never[]);
  const first = rows[0] as T | undefined;
  return first ?? null;
}
/**
 * Runs a `?`-parameterised query and returns every row.
 *
 * @param query SQL text with `?` placeholders (rewritten by `toPostgresSql`).
 * @param params Values bound to the placeholders, in order.
 * @returns The rows produced by the query.
 */
async function pgAll<T extends Record<string, unknown>>(
  query: string,
  params: unknown[] = [],
): Promise<T[]> {
  const sql = toPostgresSql(query);
  const rows = await client.unsafe<T[]>(sql, params as never[]);
  return rows as unknown as T[];
}
/**
 * Executes a `?`-parameterised statement and reports the driver's `count`.
 *
 * @param query SQL text with `?` placeholders (rewritten by `toPostgresSql`).
 * @param params Values bound to the placeholders, in order.
 * @returns The `count` property of the statement result.
 */
async function pgRun(query: string, params: unknown[] = []): Promise<number> {
  const { count } = await client.unsafe(toPostgresSql(query), params as never[]);
  return count;
}
/**
 * Wraps a value in a JSON HTTP response.
 *
 * @param data Value serialised with `JSON.stringify`.
 * @param status HTTP status code; defaults to 200.
 * @returns A `Response` with a UTF-8 JSON content type.
 */
function json(data: unknown, status = 200): Response {
  const headers = { "content-type": "application/json; charset=utf-8" };
  const body = JSON.stringify(data);
  return new Response(body, { status, headers });
}
/**
 * Wraps CSV text in a download response.
 *
 * @param text Complete CSV document.
 * @param filename Name placed verbatim in the `content-disposition` header.
 * @returns A `Response` with a UTF-8 CSV content type and attachment disposition.
 */
function csv(text: string, filename: string): Response {
  const headers = {
    "content-type": "text/csv; charset=utf-8",
    "content-disposition": `attachment; filename="${filename}"`,
  };
  return new Response(text, { headers });
}
/**
 * Wraps workbook bytes in a download response.
 *
 * @param buffer Serialised `.xlsx` workbook.
 * @param filename Name placed verbatim in the `content-disposition` header.
 * @returns A `Response` with the spreadsheet content type and attachment disposition.
 */
function xlsx(buffer: ArrayBuffer, filename: string): Response {
  const headers = {
    "content-type":
      "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    "content-disposition": `attachment; filename="${filename}"`,
  };
  return new Response(buffer, { headers });
}
/**
 * Parses a query-string value as a strictly positive base-10 integer.
 *
 * @param value Raw parameter value, or `null` when absent.
 * @param fallback Returned when the value does not parse or is not above zero.
 * @returns The parsed integer, or `fallback`.
 */
function parseIntParam(value: string | null, fallback: number): number {
  const candidate = Number.parseInt(String(value), 10);
  if (!Number.isFinite(candidate)) return fallback;
  return candidate > 0 ? candidate : fallback;
}
/**
 * Reads pagination settings from the query string.
 *
 * `page` defaults to 1; `pageSize` defaults to `DEFAULT_PAGE_SIZE` and is
 * capped at `MAX_PAGE_SIZE`.
 *
 * @param filters Request query parameters.
 * @returns The page number, page size, and the row offset of that page.
 */
function pageInput(filters: URLSearchParams) {
  const requestedSize = parseIntParam(filters.get("pageSize"), DEFAULT_PAGE_SIZE);
  const pageSize = requestedSize > MAX_PAGE_SIZE ? MAX_PAGE_SIZE : requestedSize;
  const page = parseIntParam(filters.get("page"), 1);
  const offset = (page - 1) * pageSize;
  return { page, pageSize, offset };
}
/**
 * Formats one value as a CSV field.
 *
 * `null` and `undefined` become an empty field. Any other value is converted
 * with `String(...)`. The field is wrapped in double quotes, with embedded
 * quotes doubled, when it contains a quote, a comma, or a line break. A bare
 * carriage return counts as a line break, because an unquoted `\r` splits the
 * row in CSV readers.
 *
 * @param value Cell value.
 * @returns Text safe to place between commas in a CSV row.
 */
function escapeCsvValue(value: unknown): string {
  if (value == null) return "";
  const text = String(value);
  if (!/[",\r\n]/.test(text)) return text;
  return `"${text.replace(/"/g, '""')}"`;
}
/**
 * Translates a client `sort` parameter into a SQL ORDER BY list using a
 * whitelist of allowed columns.
 *
 * `input` is a comma-separated list of `key` or `key:direction` entries.
 * Entries whose key is not an own property of `columns` are dropped. The
 * direction is `DESC` only when it equals "desc" case-insensitively, and
 * `ASC` otherwise.
 *
 * Only own keys are accepted. A plain property read would also resolve
 * inherited names such as `constructor`, `toString` or `__proto__`, and their
 * stringified values would be interpolated into the SQL text.
 *
 * @param input Raw `sort` query value, or `null` when absent.
 * @param columns Map from public sort key to SQL expression.
 * @param fallback ORDER BY list used when nothing valid was requested.
 * @returns A comma-separated ORDER BY list, without the `ORDER BY` keyword.
 */
function safeSort(
  input: string | null,
  columns: Record<string, string>,
  fallback: string,
): string {
  if (!input) return fallback;
  const clauses: string[] = [];
  for (const part of input.split(",")) {
    const [key, direction] = part.trim().split(":");
    if (!key || !Object.prototype.hasOwnProperty.call(columns, key)) continue;
    const column = columns[key];
    if (!column) continue;
    const order = direction?.toUpperCase() === "DESC" ? "DESC" : "ASC";
    clauses.push(`${column} ${order}`);
  }
  return clauses.length > 0 ? clauses.join(", ") : fallback;
}
/**
 * Splits free-form text into individual UPC tokens.
 *
 * Tokens are separated by any run of whitespace, commas, semicolons or pipes;
 * empty tokens are discarded.
 *
 * @param input Raw text containing zero or more UPCs.
 * @returns The non-empty tokens, in input order.
 */
function splitRawUpcValues(input: string): string[] {
  const tokens: string[] = [];
  for (const piece of input.split(/[\s,;|]+/)) {
    const token = piece.trim();
    if (token) tokens.push(token);
  }
  return tokens;
}
/**
 * Appends every UPC found in `value` to `target`.
 *
 * Strings are tokenised with `splitRawUpcValues`, finite numbers are truncated
 * to an integer and stringified, and arrays are walked recursively. Any other
 * value is ignored.
 *
 * @param value Untrusted input from a query string or JSON body.
 * @param target Array that receives the collected UPC strings (mutated).
 */
function collectUpcs(value: unknown, target: string[]): void {
  if (Array.isArray(value)) {
    for (const entry of value) collectUpcs(entry, target);
    return;
  }
  switch (typeof value) {
    case "string":
      target.push(...splitRawUpcValues(value));
      break;
    case "number":
      if (Number.isFinite(value)) target.push(String(Math.trunc(value)));
      break;
    default:
      break;
  }
}
/**
 * Extracts the de-duplicated UPC list from a request.
 *
 * GET requests contribute every `upc` query parameter plus the `upcs`
 * parameter. POST requests contribute the `upcs` property of a JSON object
 * body, or the body itself when it has no such property.
 *
 * @param req Incoming request.
 * @returns Unique UPC strings in first-seen order.
 * @throws Error "Invalid JSON body" when a POST body is not valid JSON.
 * @throws Error "Method not allowed" for any method other than GET or POST.
 */
async function parseUpcsFromRequest(req: Request): Promise<string[]> {
  const collected: string[] = [];
  switch (req.method) {
    case "GET": {
      const query = new URL(req.url).searchParams;
      for (const single of query.getAll("upc")) collectUpcs(single, collected);
      collectUpcs(query.get("upcs"), collected);
      break;
    }
    case "POST": {
      let payload: unknown;
      try {
        payload = await req.json();
      } catch {
        throw new Error("Invalid JSON body");
      }
      const hasUpcsField =
        typeof payload === "object" && payload !== null && "upcs" in payload;
      const source = hasUpcsField
        ? (payload as { upcs?: unknown }).upcs
        : payload;
      collectUpcs(source, collected);
      break;
    }
    default:
      throw new Error("Method not allowed");
  }
  return Array.from(new Set(collected));
}
/**
 * Checks that a UPC list is non-empty and within the per-request limit.
 *
 * @param upcs Parsed UPC list.
 * @returns An error message for the client, or `null` when the list is acceptable.
 */
function validateUpcs(upcs: string[]): string | null {
  const count = upcs.length;
  if (count === 0) return "Provide at least one UPC.";
  return count > MAX_UPCS_PER_REQUEST
    ? `Too many UPCs. Maximum allowed per request is ${MAX_UPCS_PER_REQUEST}.`
    : null;
}
/**
 * Counts lookup results per `status` value.
 *
 * @param items Lookup details to tally.
 * @returns An object mapping each status seen to its number of occurrences.
 */
function summarizeLookupStatuses(items: KeepaUpcLookupDetail[]) {
  const counts: Record<string, number> = {};
  for (const { status } of items) {
    counts[status] = (counts[status] ?? 0) + 1;
  }
  return counts;
}
/** Validated request body returned by `parseUpcFileRequest` and spread into `runUpcFileAnalysis`. */
type UpcFileRequest = {
/** Required; `parseUpcFileRequest` rejects a missing, non-string or blank value. */
inputFile: string;
/** Kept only when the body supplies a string; otherwise `undefined`. */
outputFile?: string;
/** Optional positive integer, validated by `positiveField`. */
inputBatchSize?: number;
/** Optional positive integer, validated by `positiveField`. */
upcLookupBatchSize?: number;
/** Optional positive integer, validated by `positiveField`. */
maxRows?: number;
};
/**
 * Validates an optional positive-integer field from a request body.
 *
 * @param value Raw field value; `null` and `undefined` mean "not supplied".
 * @param name Field name used in the error message.
 * @returns The value converted with `Number(...)`, or `undefined` when not supplied.
 * @throws Error when the converted value is not an integer of at least 1.
 */
function positiveField(value: unknown, name: string): number | undefined {
  if (value === null || value === undefined) return undefined;
  const numeric = Number(value);
  if (Number.isInteger(numeric) && numeric >= 1) return numeric;
  throw new Error(`${name} must be a positive integer`);
}
/**
 * Parses and validates the JSON body of a UPC-file processing request.
 *
 * The body must be a JSON object. A `null`, array or primitive body is
 * rejected explicitly here, because reading `inputFile` from `null` would
 * otherwise surface a raw TypeError to the client.
 *
 * @param req Incoming request; must be a POST with a JSON body.
 * @returns The validated request fields.
 * @throws Error "Method not allowed" for non-POST requests.
 * @throws Error "Invalid JSON body" when the body is not valid JSON.
 * @throws Error "Request body must be a JSON object" for non-object bodies.
 * @throws Error "inputFile is required" when `inputFile` is missing or blank.
 * @throws Error from `positiveField` when a numeric option is not a positive integer.
 */
async function parseUpcFileRequest(req: Request): Promise<UpcFileRequest> {
  if (req.method !== "POST") throw new Error("Method not allowed");
  let parsed: unknown;
  try {
    parsed = await req.json();
  } catch {
    throw new Error("Invalid JSON body");
  }
  if (parsed === null || typeof parsed !== "object" || Array.isArray(parsed)) {
    throw new Error("Request body must be a JSON object");
  }
  const body = parsed as Record<string, unknown>;
  if (typeof body.inputFile !== "string" || !body.inputFile.trim()) {
    throw new Error("inputFile is required");
  }
  return {
    inputFile: body.inputFile,
    outputFile:
      typeof body.outputFile === "string" ? body.outputFile : undefined,
    inputBatchSize: positiveField(body.inputBatchSize, "inputBatchSize"),
    upcLookupBatchSize: positiveField(
      body.upcLookupBatchSize,
      "upcLookupBatchSize",
    ),
    maxRows: positiveField(body.maxRows, "maxRows"),
  };
}
/**
 * Lists runs with filtering, sorting and pagination.
 *
 * Recognised filters: `processType`, `status`, `startDate`, `endDate` and a
 * free-text `q` matched against the input file, category label and run id.
 * `sort` is resolved through `safeSort` against a fixed column whitelist.
 *
 * @param filters Request query parameters.
 * @returns One page of runs plus paging metadata (`totalPages` is at least 1).
 */
async function getRuns(filters: URLSearchParams) {
  const { page, pageSize, offset } = pageInput(filters);
  const clauses: string[] = ["TRUE"];
  const bindings: unknown[] = [];
  const addClause = (clause: string, ...values: unknown[]): void => {
    clauses.push(clause);
    bindings.push(...values);
  };

  const processType = filters.get("processType")?.trim();
  if (processType) addClause("r.type::text = ?", processType);

  const status = filters.get("status")?.trim();
  if (status) addClause("r.status::text = ?", status);

  const startDate = filters.get("startDate");
  if (startDate) addClause("r.started_at >= ?", startDate);

  const endDate = filters.get("endDate");
  if (endDate) addClause("r.started_at <= ?", endDate);

  const search = filters.get("q")?.trim();
  if (search) {
    const pattern = `%${search}%`;
    addClause(
      "(COALESCE(r.input_file, '') ILIKE ? OR COALESCE(cd.category_label, '') ILIKE ? OR CAST(r.id AS text) ILIKE ?)",
      pattern,
      pattern,
      pattern,
    );
  }

  const sortable: Record<string, string> = {
    runId: "r.id",
    processType: "r.type",
    timestamp: "r.started_at",
    status: "r.status",
    jobType: "COALESCE(cd.category_label, r.input_file, r.type::text)",
    totalProducts: "COALESCE(stats.processed_count, 0)",
    fbaCount: "COALESCE(stats.fba_count, 0)",
    fbmCount: "COALESCE(stats.fbm_count, 0)",
    skipCount: "COALESCE(stats.skip_count, 0)",
  };
  const sort = safeSort(
    filters.get("sort"),
    sortable,
    "r.started_at DESC, r.id DESC",
  );

  const where = `WHERE ${clauses.join(" AND ")}`;
  const join = `
FROM runs r
LEFT JOIN analysis_run_stats stats ON stats.run_id = r.id
LEFT JOIN category_run_details cd ON cd.run_id = r.id`;

  // The count query runs first, then the page query, each with the same filter bindings.
  const totalRow = await pgGet<{ total: string }>(
    `SELECT COUNT(*) AS total ${join} ${where}`,
    bindings,
  );
  const items = await pgAll(
    `SELECT r.type AS "processType", r.id AS "runId",
r.started_at AS timestamp, r.status, r.input_file AS source,
r.output_file AS output,
COALESCE(cd.category_label, r.input_file, r.type::text) AS "jobType",
COALESCE(stats.processed_count, 0) AS "totalProducts",
COALESCE(stats.fba_count, 0) AS "fbaCount",
COALESCE(stats.fbm_count, 0) AS "fbmCount",
COALESCE(stats.buy_count, 0) AS "buyCount",
COALESCE(stats.watch_count, 0) AS "watchCount",
COALESCE(stats.skip_count, 0) AS "skipCount"
${join} ${where} ORDER BY ${sort} LIMIT ? OFFSET ?`,
    [...bindings, pageSize, offset],
  );

  const total = Number(totalRow?.total ?? 0);
  const totalPages = Math.max(1, Math.ceil(total / pageSize));
  return { items, page, pageSize, total, totalPages };
}
/**
 * Loads a single run with its category details and aggregate statistics.
 *
 * @param runId Primary key of the run.
 * @returns The run row, or `null` when no run has that id.
 */
async function getRun(runId: number) {
  const runSql = `SELECT r.type AS "processType", r.id AS "runId", r.parent_run_id AS "parentRunId",
r.started_at AS timestamp, r.completed_at AS "completedAt", r.status,
r.input_file AS source, r.output_file AS output, r.error_message AS "errorMessage",
COALESCE(cd.category_label, r.input_file, r.type::text) AS "jobType",
cd.category_id AS "categoryId", cd.checked_asin_count AS "checkedAsins",
COALESCE(stats.processed_count, 0) AS "totalProducts",
COALESCE(stats.available_count, 0) AS "availableAsins",
COALESCE(stats.fba_count, 0) AS "fbaCount",
COALESCE(stats.fbm_count, 0) AS "fbmCount",
COALESCE(stats.buy_count, 0) AS "buyCount",
COALESCE(stats.watch_count, 0) AS "watchCount",
COALESCE(stats.skip_count, 0) AS "skipCount"
FROM runs r
LEFT JOIN analysis_run_stats stats ON stats.run_id = r.id
LEFT JOIN category_run_details cd ON cd.run_id = r.id
WHERE r.id = ?`;
  const bindings = [runId];
  return pgGet(runSql, bindings);
}
/**
 * SELECT producing one row per `run_items` record, used as a derived table by
 * `getRunItems`, `exportRunItems` and `getProducts`.
 *
 * The `latest_revision` CTE keeps a single `analysis_revisions` row per run
 * item (`DISTINCT ON`, newest `analyzed_at`, ties broken by highest id). The
 * observation columns come from the observation that revision points at, so
 * they are NULL for items that have no revision.
 *
 * The text contains no `?` placeholders and no trailing semicolon, so it can
 * be embedded as `(${ITEM_ROWS}) alias`.
 */
const ITEM_ROWS = `
WITH latest_revision AS (
SELECT DISTINCT ON (ar.run_item_id)
ar.run_item_id, ar.id AS revision_id, ar.decision, ar.confidence,
ar.reasoning, ar.analyzed_at, ar.observation_id
FROM analysis_revisions ar
ORDER BY ar.run_item_id, ar.analyzed_at DESC, ar.id DESC
)
SELECT ri.id AS item_id, ri.run_id, r.type AS process_type,
ri.product_asin, COALESCE(ri.product_asin, ur.normalized_upc) AS asin,
COALESCE(p.name, si.supplied_name, ur.normalized_upc) AS product_name,
COALESCE(p.brand, si.supplied_brand) AS brand,
COALESCE(p.category, si.supplied_category) AS category,
si.unit_cost, si.avg_price_90d_sheet, si.selling_price_sheet,
observation.current_price, observation.avg_price_90d,
observation.sales_rank, observation.sales_rank_avg_90d,
observation.seller_count, observation.amazon_is_seller,
observation.amazon_buybox_share_pct_90d, observation.monthly_sold,
observation.rank_drops_30d, observation.rank_drops_90d,
observation.fba_fee, observation.fbm_fee, observation.referral_percent,
observation.can_sell, observation.sellability_status,
observation.sellability_reason, revision.decision AS verdict,
revision.confidence, revision.reasoning,
revision.analyzed_at AS fetched_at, ur.normalized_upc AS upc,
ur.status AS upc_lookup_status
FROM run_items ri
JOIN runs r ON r.id = ri.run_id
LEFT JOIN products p ON p.asin = ri.product_asin
LEFT JOIN sourcing_inputs si ON si.run_item_id = ri.id
LEFT JOIN upc_resolutions ur ON ur.run_item_id = ri.id
LEFT JOIN latest_revision revision ON revision.run_item_id = ri.id
LEFT JOIN product_observations observation ON observation.id = revision.observation_id
`;
/**
 * Builds the WHERE clause and bindings for queries over item rows.
 *
 * Supported filters: an optional `runId`, `verdict`, `sellabilityStatus`,
 * `minConfidence` / `maxConfidence` (ignored unless finite numbers),
 * `amazonIsSeller` (`yes` / `no`) and a free-text `q` matched against ASIN,
 * product name, brand, category and reasoning.
 *
 * @param filters Request query parameters.
 * @param runId When given, restricts rows to that run.
 * @returns `where` (empty string when no filter applies) and the `?` bindings in order.
 */
function itemFilters(filters: URLSearchParams, runId?: number) {
  const clauses: string[] = [];
  const params: unknown[] = [];
  const add = (clause: string, ...values: unknown[]): void => {
    clauses.push(clause);
    params.push(...values);
  };

  if (runId != null) add("run_id = ?", runId);

  const verdict = filters.get("verdict")?.trim();
  if (verdict) add("verdict::text = ?", verdict);

  const sellabilityStatus = filters.get("sellabilityStatus");
  if (sellabilityStatus) add("sellability_status = ?", sellabilityStatus);

  const confidenceBounds = [
    ["minConfidence", "confidence >= ?"],
    ["maxConfidence", "confidence <= ?"],
  ] as const;
  for (const [name, clause] of confidenceBounds) {
    const raw = filters.get(name);
    const bound = Number(raw);
    if (raw && Number.isFinite(bound)) add(clause, bound);
  }

  switch (filters.get("amazonIsSeller")) {
    case "yes":
      add("amazon_is_seller = true");
      break;
    case "no":
      add("amazon_is_seller = false");
      break;
    default:
      break;
  }

  const search = filters.get("q")?.trim();
  if (search) {
    const pattern = `%${search}%`;
    add(
      "(COALESCE(asin, '') ILIKE ? OR COALESCE(product_name, '') ILIKE ? OR COALESCE(brand, '') ILIKE ? OR COALESCE(category, '') ILIKE ? OR COALESCE(reasoning, '') ILIKE ?)",
      pattern,
      pattern,
      pattern,
      pattern,
      pattern,
    );
  }

  const where = clauses.length > 0 ? `WHERE ${clauses.join(" AND ")}` : "";
  return { where, params };
}
/**
 * Whitelist of sort keys accepted in the `sort` query parameter for item and
 * product listings, mapped to the column each key orders by. Passed to
 * `safeSort` by `getRunItems`, `exportRunItems` and `getProducts`.
 */
const ITEM_SORTS: Record<string, string> = {
asin: "asin",
product_name: "product_name",
brand: "brand",
category: "category",
current_price: "current_price",
avg_price_90d: "avg_price_90d",
sales_rank: "sales_rank",
seller_count: "seller_count",
amazon_is_seller: "amazon_is_seller",
amazon_buybox_share_pct_90d: "amazon_buybox_share_pct_90d",
monthly_sold: "monthly_sold",
verdict: "verdict",
confidence: "confidence",
fetched_at: "fetched_at",
};
/**
 * Returns one page of items belonging to a run.
 *
 * @param runId Run whose items are listed.
 * @param filters Request query parameters (filters, `sort`, `page`, `pageSize`).
 * @returns The page of item rows plus paging metadata (`totalPages` is at least 1).
 */
async function getRunItems(runId: number, filters: URLSearchParams) {
  const { page, pageSize, offset } = pageInput(filters);
  const { where, params } = itemFilters(filters, runId);
  const orderBy = safeSort(
    filters.get("sort"),
    ITEM_SORTS,
    "monthly_sold DESC NULLS LAST, asin ASC",
  );
  const source = `(${ITEM_ROWS}) item_rows ${where}`;

  const countRow = await pgGet<{ total: string }>(
    `SELECT COUNT(*) AS total FROM ${source}`,
    params,
  );
  const items = await pgAll(
    `SELECT * FROM ${source} ORDER BY ${orderBy} LIMIT ? OFFSET ?`,
    [...params, pageSize, offset],
  );

  const total = Number(countRow?.total ?? 0);
  const totalPages = Math.max(1, Math.ceil(total / pageSize));
  return { items, page, pageSize, total, totalPages };
}
/**
 * Renders every item of a run that matches the filters as CSV text.
 *
 * The first line is the header row; lines are joined with `\n` and there is
 * no trailing newline. Unlike `getRunItems`, the result is not paginated.
 *
 * @param runId Run whose items are exported.
 * @param filters Request query parameters (filters and `sort`).
 * @returns The CSV document.
 */
async function exportRunItems(runId: number, filters: URLSearchParams) {
  const columns = [
    "run_id", "asin", "product_name", "brand", "category", "unit_cost",
    "current_price", "avg_price_90d", "sales_rank_avg_90d", "seller_count",
    "amazon_is_seller", "amazon_buybox_share_pct_90d", "monthly_sold",
    "sellability_status", "verdict", "confidence", "reasoning", "fetched_at",
  ];
  const { where, params } = itemFilters(filters, runId);
  const orderBy = safeSort(
    filters.get("sort"),
    ITEM_SORTS,
    "monthly_sold DESC NULLS LAST, asin ASC",
  );
  const rows = await pgAll<Record<string, unknown>>(
    `SELECT * FROM (${ITEM_ROWS}) item_rows ${where} ORDER BY ${orderBy}`,
    params,
  );

  const lines = [columns.join(",")];
  for (const row of rows) {
    const cells = columns.map((column) => escapeCsvValue(row[column]));
    lines.push(cells.join(","));
  }
  return lines.join("\n");
}
/**
 * Returns one page of products, each joined with its most recent item row.
 *
 * For every product, the lateral sub-query picks the item row with the newest
 * `fetched_at` (ties broken by highest `item_id`). Products with no item rows
 * are still listed, with NULLs in the `latest` columns.
 *
 * @param filters Request query parameters (filters, `sort`, `page`, `pageSize`).
 * @returns The page of product rows plus paging metadata (`totalPages` is at least 1).
 */
async function getProducts(filters: URLSearchParams) {
  const { page, pageSize, offset } = pageInput(filters);
  const { where, params } = itemFilters(filters);
  const orderBy = safeSort(
    filters.get("sort"),
    ITEM_SORTS,
    "fetched_at DESC NULLS LAST, asin ASC",
  );
  const base = `
SELECT product.asin, product.asin AS product_asin,
latest.item_id, latest.run_id AS "runId", latest.process_type AS "processType",
COALESCE(product.name, latest.product_name) AS product_name,
COALESCE(product.brand, latest.brand) AS brand,
COALESCE(product.category, latest.category) AS category,
latest.unit_cost, latest.current_price, latest.avg_price_90d,
latest.sales_rank, latest.sales_rank_avg_90d, latest.seller_count,
latest.amazon_is_seller, latest.amazon_buybox_share_pct_90d,
latest.monthly_sold, latest.rank_drops_30d, latest.rank_drops_90d,
latest.sellability_status, latest.verdict, latest.confidence,
latest.reasoning, latest.fetched_at
FROM products product
LEFT JOIN LATERAL (
SELECT *
FROM (${ITEM_ROWS}) item_history
WHERE item_history.product_asin = product.asin
ORDER BY item_history.fetched_at DESC NULLS LAST, item_history.item_id DESC
LIMIT 1
) latest ON TRUE`;

  const countRow = await pgGet<{ total: string }>(
    `SELECT COUNT(*) AS total FROM (${base}) products ${where}`,
    params,
  );
  const total = Number(countRow?.total ?? 0);
  const items = await pgAll(
    `SELECT * FROM (${base}) products ${where} ORDER BY ${orderBy} LIMIT ? OFFSET ?`,
    [...params, pageSize, offset],
  );

  const totalPages = Math.max(1, Math.ceil(total / pageSize));
  return { items, page, pageSize, total, totalPages };
}
/**
 * Loads a product together with its observation and analysis history.
 *
 * @param asin Product identifier to look up.
 * @returns `null` when the product does not exist; otherwise the product row,
 *   its observations (newest `fetched_at` first) and its analysis revisions
 *   (newest `analyzed_at` first).
 */
async function getProduct(asin: string) {
  const bindings = [asin];

  const product = await pgGet(`SELECT * FROM products WHERE asin = ?`, bindings);
  if (product === null) return null;

  const observationSql = `SELECT observation.*, run.type AS run_type
FROM product_observations observation
JOIN runs run ON run.id = observation.run_id
WHERE observation.product_asin = ?
ORDER BY observation.fetched_at DESC`;
  const observations = await pgAll(observationSql, bindings);

  const analysisSql = `SELECT revision.*, item.run_id, run.type AS run_type
FROM analysis_revisions revision
JOIN run_items item ON item.id = revision.run_item_id
JOIN runs run ON run.id = item.run_id
WHERE item.product_asin = ?
ORDER BY revision.analyzed_at DESC`;
  const analyses = await pgAll(analysisSql, bindings);

  return { product, observations, analyses };
}
/**
 * Re-fetches market data for one run item, runs the LLM analysis again and
 * stores the outcome as a new observation plus a new analysis revision.
 *
 * @param itemId Primary key of the run item to re-analyse.
 * @returns The item id, run id, ASIN and the timestamp recorded for the new data.
 * @throws Error "Run item not found" when no item with that id and a non-null
 *   `product_asin` exists.
 * @throws Error when the item belongs to a `supplier_upc` run.
 */
async function reanalyzeRunItem(itemId: number) {
// Load the item with its product and supplier-sheet fields; items without an ASIN are excluded.
const row = await pgGet<Record<string, any>>(
`SELECT ri.id, ri.run_id, ri.product_asin AS asin, r.type,
COALESCE(p.name, si.supplied_name, ri.product_asin) AS product_name,
COALESCE(p.brand, si.supplied_brand) AS brand,
COALESCE(p.category, si.supplied_category) AS category,
si.unit_cost, si.avg_price_90d_sheet, si.selling_price_sheet,
si.fba_net_sheet, si.gross_profit_dollar, si.gross_profit_pct,
si.net_profit_sheet, si.roi_sheet, si.moq, si.moq_cost,
si.qty_available, si.supplier, si.source_url, si.asin_link,
si.promo_coupon_code, si.notes, si.lead_date
FROM run_items ri JOIN runs r ON r.id = ri.run_id
LEFT JOIN products p ON p.asin = ri.product_asin
LEFT JOIN sourcing_inputs si ON si.run_item_id = ri.id
WHERE ri.id = ? AND ri.product_asin IS NOT NULL`,
[itemId],
);
if (!row) throw new Error("Run item not found");
if (row.type === "supplier_upc") {
throw new Error("Supplier scoring revisions are produced by the supplier pipeline");
}
// Rebuild the input record from the stored row; SQL NULLs become `undefined`,
// except `unitCost`, which falls back to 0.
const record: ProductRecord = {
asin: row.asin,
name: row.product_name ?? row.asin,
unitCost: row.unit_cost ?? 0,
brand: row.brand ?? undefined,
category: row.category ?? undefined,
amazonRank: undefined,
avgPrice90FromSheet: row.avg_price_90d_sheet ?? undefined,
sellingPriceFromSheet: row.selling_price_sheet ?? undefined,
fbaNet: row.fba_net_sheet ?? undefined,
grossProfit: row.gross_profit_dollar ?? undefined,
grossProfitPct: row.gross_profit_pct ?? undefined,
netProfitFromSheet: row.net_profit_sheet ?? undefined,
roiFromSheet: row.roi_sheet ?? undefined,
moq: row.moq ?? undefined,
moqCost: row.moq_cost ?? undefined,
totalQtyAvail: row.qty_available ?? undefined,
supplier: row.supplier ?? undefined,
sourceUrl: row.source_url ?? undefined,
asinLink: row.asin_link ?? undefined,
promoCouponCode: row.promo_coupon_code ?? undefined,
notes: row.notes ?? undefined,
leadDate: row.lead_date ?? undefined,
};
// Best effort: a failed Keepa fetch is swallowed and the analysis continues with `keepa = null`.
const keepaMap = await fetchKeepaDataBatch([row.asin]).catch(() => new Map());
const keepa = keepaMap.get(row.asin) ?? null;
// Unlike the Keepa call, sellability and pricing failures propagate to the caller.
const sellabilityMap = await fetchSellabilityBatch([row.asin]);
const sellability = sellabilityMap.get(row.asin) ?? {
canSell: null,
sellabilityStatus: "unknown" as const,
sellabilityReason: "Sellability check returned no result",
};
const spApi = await fetchSpApiPricingAndFees(row.asin, sellability);
const enriched: EnrichedProduct = {
record,
keepa,
spApi: spApi as SpApiData,
fetchedAt: new Date().toISOString(),
};
// Falls back to a zero-confidence SKIP verdict when the analysis returns no first entry.
const verdict =
(await analyzeProducts([enriched], { useClaude: USE_CLAUDE }))[0] ?? {
asin: row.asin,
verdict: "SKIP" as const,
confidence: 0,
reasoning: "LLM analysis returned no verdict",
};
const result: AnalysisResult = { product: enriched, verdict };
// Persist in order: observation, then the revision that references it, then the run's stats.
// NOTE(review): these three writes are separate calls here; whether they share a transaction is not visible in this file.
const observationId = await insertObservation(row.run_id, result, "reanalysis");
await db.insert(analysisRevisions).values({
runItemId: itemId,
observationId,
method: "llm",
decision: verdict.verdict,
confidence: verdict.confidence,
reasoning: verdict.reasoning,
analyzedAt: new Date(enriched.fetchedAt),
});
await refreshRunStats(row.run_id);
return { itemId, runId: row.run_id, asin: row.asin, fetchedAt: enriched.fetchedAt };
}
/**
 * Builds the WHERE clause and bindings shared by the stalker seller and
 * stalker product queries.
 *
 * Always restricts to `r.type = 'stalker'`. Optional filters, in binding
 * order: `runId`, `sellerId` (upper-cased), free-text `q`, `minRatingCount` /
 * `maxRatingCount`, and, when `product` is true, the sellable-only
 * restriction plus `verdict`, `amazonIsSeller` and the numeric range filters.
 * Numeric range filters are ignored unless they parse to a finite number.
 *
 * A `runId` that is not an integer adds a `FALSE` condition rather than
 * binding `NaN`: the query then matches nothing instead of failing in the
 * database or silently widening to every run.
 *
 * @param filters Request query parameters.
 * @param product True for the product listing (aliases `inventory`, `product`,
 *   `observation`, `analysis`), false for the seller listing.
 * @returns `where` (always starts with `WHERE`) and the `?` bindings in order.
 */
function stalkerBaseWhere(filters: URLSearchParams, product = false) {
  const conditions = ["r.type = 'stalker'"];
  const params: unknown[] = [];

  // Adds `expression` for each named parameter that holds a finite number.
  const addNumericFilters = (
    pairs: ReadonlyArray<readonly [string, string]>,
  ): void => {
    for (const [parameter, expression] of pairs) {
      const raw = filters.get(parameter);
      const value = Number(raw);
      if (raw && Number.isFinite(value)) {
        conditions.push(expression);
        params.push(value);
      }
    }
  };

  const runIdRaw = filters.get("runId")?.trim();
  if (runIdRaw) {
    const runId = Number(runIdRaw);
    if (Number.isInteger(runId)) {
      conditions.push("r.id = ?");
      params.push(runId);
    } else {
      // An unparseable id cannot match any run.
      conditions.push("FALSE");
    }
  }

  const sellerId = filters.get("sellerId")?.trim();
  if (sellerId) {
    conditions.push("seller.seller_id = ?");
    params.push(sellerId.toUpperCase());
  }

  const q = filters.get("q")?.trim();
  if (q) {
    const wildcard = `%${q}%`;
    if (product) {
      conditions.push(
        "(inventory.product_asin ILIKE ? OR COALESCE(product.name, '') ILIKE ? OR COALESCE(product.brand, '') ILIKE ? OR COALESCE(product.category, '') ILIKE ? OR seller.seller_id ILIKE ? OR COALESCE(seller.seller_name, '') ILIKE ?)",
      );
      params.push(wildcard, wildcard, wildcard, wildcard, wildcard, wildcard);
    } else {
      conditions.push(
        "(seller.seller_id ILIKE ? OR COALESCE(seller.seller_name, '') ILIKE ? OR EXISTS (SELECT 1 FROM stalker_inventory_items query_inventory WHERE query_inventory.run_id = r.id AND query_inventory.seller_id = seller.seller_id AND query_inventory.product_asin ILIKE ?))",
      );
      params.push(wildcard, wildcard, wildcard);
    }
  }

  addNumericFilters([
    ["minRatingCount", "seller.rating_count >= ?"],
    ["maxRatingCount", "seller.rating_count <= ?"],
  ]);

  if (product) {
    // The product listing only ever shows items confirmed as sellable.
    conditions.push(
      "observation.can_sell = true",
      "observation.sellability_status = 'available'",
    );

    const verdict = filters.get("verdict")?.toUpperCase();
    if (verdict === "FBA" || verdict === "FBM" || verdict === "SKIP") {
      conditions.push("analysis.decision::text = ?");
      params.push(verdict);
    } else if (verdict === "UNANALYZED") {
      conditions.push("analysis.decision IS NULL");
    }

    const amazonIsSeller = filters.get("amazonIsSeller");
    if (amazonIsSeller === "yes") {
      conditions.push("observation.amazon_is_seller = true");
    } else if (amazonIsSeller === "no") {
      conditions.push("observation.amazon_is_seller = false");
    } else if (amazonIsSeller === "unknown") {
      conditions.push("observation.amazon_is_seller IS NULL");
    }

    addNumericFilters([
      ["minPrice", "observation.current_price >= ?"],
      ["maxPrice", "observation.current_price <= ?"],
      ["minMonthlySold", "observation.monthly_sold >= ?"],
      ["maxMonthlySold", "observation.monthly_sold <= ?"],
      ["minSalesRank", "observation.sales_rank >= ?"],
      ["maxSalesRank", "observation.sales_rank <= ?"],
      ["minSellerCount", "observation.seller_count >= ?"],
      ["maxSellerCount", "observation.seller_count <= ?"],
      ["minConfidence", "analysis.confidence >= ?"],
      ["maxConfidence", "analysis.confidence <= ?"],
    ]);
  }

  return { where: `WHERE ${conditions.join(" AND ")}`, params };
}
/**
 * Returns one page of stalker sellers, one row per (run, seller), with
 * aggregate discovery and inventory counts and an overall summary.
 *
 * @param filters Request query parameters (filters, `sort`, `page`, `pageSize`).
 * @returns The page of seller rows, summary totals over the whole filtered set,
 *   and paging metadata (`totalPages` is at least 1).
 */
async function getStalkerResults(filters: URLSearchParams) {
  const { page, pageSize, offset } = pageInput(filters);
  const { where, params } = stalkerBaseWhere(filters);

  const sortable: Record<string, string> = {
    runId: '"runId"',
    started_at: "started_at",
    seller_id: "seller_id",
    seller_name: "seller_name",
    rating: "rating",
    rating_count: "rating_count",
    discovered_from_count: "discovered_from_count",
    persisted_inventory_asin_count: "persisted_inventory_asin_count",
    storefront_asin_total: "storefront_asin_total",
    last_seen_at: "last_seen_at",
  };
  const order = safeSort(
    filters.get("sort"),
    sortable,
    "persisted_inventory_asin_count DESC, last_seen_at DESC, seller_id ASC",
  );

  const base = `SELECT r.id AS "runId", r.started_at, r.status, r.input_file,
seller.seller_id, seller.seller_name, seller.rating, seller.rating_count,
seller.storefront_asin_total, seller.persisted_inventory_sample_count,
COUNT(DISTINCT scan.source_product_asin) AS discovered_from_count,
MIN(scan.fetched_at) AS first_seen_at, MAX(scan.fetched_at) AS last_seen_at,
COUNT(DISTINCT inventory.product_asin) AS persisted_inventory_asin_count,
STRING_AGG(DISTINCT inventory.product_asin, ',') AS inventory_sample_asins
FROM stalker_scan_sellers scan_seller
JOIN stalker_scans scan ON scan.id = scan_seller.scan_id
JOIN runs r ON r.id = scan.run_id
JOIN sellers seller ON seller.seller_id = scan_seller.seller_id
LEFT JOIN stalker_inventory_items inventory
ON inventory.run_id = r.id AND inventory.seller_id = seller.seller_id
${where} GROUP BY r.id, seller.seller_id`;

  // Three passes over the same derived table: row count, requested page, summary totals.
  const countRow = await pgGet<{ total: string }>(
    `SELECT COUNT(*) AS total FROM (${base}) rows`,
    params,
  );
  const total = Number(countRow?.total ?? 0);
  const items = await pgAll(
    `SELECT * FROM (${base}) rows ORDER BY ${order} LIMIT ? OFFSET ?`,
    [...params, pageSize, offset],
  );
  const totals = await pgGet<Record<string, string>>(
    `SELECT COUNT(DISTINCT "runId") AS runs, COUNT(DISTINCT seller_id) AS sellers,
COALESCE(SUM(persisted_inventory_asin_count), 0) AS "persistedInventoryAsins"
FROM (${base}) rows`,
    params,
  );

  const summary = {
    runs: Number(totals?.runs ?? 0),
    sellers: Number(totals?.sellers ?? 0),
    persistedInventoryAsins: Number(totals?.persistedInventoryAsins ?? 0),
  };
  const totalPages = Math.max(1, Math.ceil(total / pageSize));
  return { items, summary, page, pageSize, total, totalPages };
}
/**
 * Builds the SELECT listing stalker inventory items with their seller,
 * product, observation and most recent analysis.
 *
 * The lateral join keeps only the newest analysis revision (by `analyzed_at`,
 * then id) of run items linked to the inventory item; items without one keep
 * NULL analysis columns.
 *
 * @param where Complete WHERE clause appended on its own line after the joins.
 * @returns SQL text with no trailing semicolon.
 */
function stalkerProductSql(where: string) {
  const selectAndJoins = `SELECT r.id AS "runId", r.started_at, seller.seller_id, seller.seller_name,
seller.rating, seller.rating_count, inventory.product_asin AS asin,
observation.can_sell, observation.sellability_status, observation.sellability_reason,
product.name AS product_title, product.brand,
CASE WHEN product.category IS NULL THEN NULL ELSE json_build_array(product.category)::text END AS category_tree,
observation.current_price, observation.avg_price_90d, observation.sales_rank,
observation.monthly_sold, observation.seller_count, observation.amazon_is_seller,
analysis.decision AS verdict, analysis.confidence, analysis.reasoning,
inventory.last_seen_at
FROM stalker_inventory_items inventory
JOIN runs r ON r.id = inventory.run_id
JOIN sellers seller ON seller.seller_id = inventory.seller_id
JOIN products product ON product.asin = inventory.product_asin
JOIN product_observations observation ON observation.id = inventory.observation_id
LEFT JOIN LATERAL (
SELECT revision.decision, revision.confidence, revision.reasoning
FROM run_items item
JOIN analysis_revisions revision ON revision.run_item_id = item.id
WHERE item.source_inventory_item_id = inventory.id
ORDER BY revision.analyzed_at DESC, revision.id DESC LIMIT 1
) analysis ON true`;
  return selectAndJoins + "\n" + where;
}
/**
 * Lists sellable stalker products.
 *
 * @param filters Request query parameters (filters, `sort`, `page`, `pageSize`).
 * @param exportOnly When true, returns every matching row as a plain array,
 *   sorted but unpaginated, and skips the count and summary queries.
 * @returns Either the full row array (`exportOnly`) or one page of rows with
 *   summary totals and paging metadata (`totalPages` is at least 1).
 */
async function stalkerProducts(filters: URLSearchParams, exportOnly = false) {
  const { page, pageSize, offset } = pageInput(filters);
  const { where, params } = stalkerBaseWhere(filters, true);
  const base = stalkerProductSql(where);

  const sortable: Record<string, string> = {
    runId: '"runId"',
    started_at: "started_at",
    seller_id: "seller_id",
    seller_name: "seller_name",
    rating: "rating",
    rating_count: "rating_count",
    asin: "asin",
    product_title: "product_title",
    brand: "brand",
    current_price: "current_price",
    avg_price_90d: "avg_price_90d",
    sales_rank: "sales_rank",
    monthly_sold: "monthly_sold",
    seller_count: "seller_count",
    amazon_is_seller: "amazon_is_seller",
    verdict: "verdict",
    confidence: "confidence",
    last_seen_at: "last_seen_at",
  };
  const order = safeSort(
    filters.get("sort"),
    sortable,
    "monthly_sold DESC NULLS LAST, last_seen_at DESC, asin ASC",
  );

  if (exportOnly) {
    return pgAll(`SELECT * FROM (${base}) products ORDER BY ${order}`, params);
  }

  const countRow = await pgGet<{ total: string }>(
    `SELECT COUNT(*) AS total FROM (${base}) products`,
    params,
  );
  const total = Number(countRow?.total ?? 0);
  const items = await pgAll(
    `SELECT * FROM (${base}) products ORDER BY ${order} LIMIT ? OFFSET ?`,
    [...params, pageSize, offset],
  );
  const totals = await pgGet<Record<string, string>>(
    `SELECT COUNT(DISTINCT "runId") AS runs, COUNT(DISTINCT seller_id) AS sellers,
COUNT(DISTINCT asin) AS products FROM (${base}) products`,
    params,
  );

  const summary = {
    runs: Number(totals?.runs ?? 0),
    sellers: Number(totals?.sellers ?? 0),
    products: Number(totals?.products ?? 0),
  };
  const totalPages = Math.max(1, Math.ceil(total / pageSize));
  return { items, summary, page, pageSize, total, totalPages };
}
/**
 * Exports the filtered sellable stalker products as an `.xlsx` download.
 *
 * The workbook has a single sheet named "Sellable Products"; its columns
 * follow the key order of the objects built below.
 *
 * @param filters Request query parameters (filters and `sort`).
 * @returns A spreadsheet download response.
 */
async function exportStalkerProducts(filters: URLSearchParams): Promise<Response> {
  const rows = (await stalkerProducts(filters, true)) as Array<Record<string, any>>;

  const sheetRows: Array<Record<string, unknown>> = [];
  for (const row of rows) {
    sheetRows.push({
      ASIN: row.asin,
      "Amazon URL": `https://amazon.com/dp/${row.asin}`,
      Product: row.product_title ?? "",
      Brand: row.brand ?? "",
      Category: row.category_tree ?? "",
      "Monthly Sold": row.monthly_sold ?? null,
      Sellers: row.seller_count ?? null,
      "Sales Rank": row.sales_rank ?? null,
      "Current Price": row.current_price ?? null,
      Verdict: row.verdict ?? "",
      Confidence: row.confidence ?? null,
      "Seller ID": row.seller_id,
      Seller: row.seller_name ?? "",
      "Run ID": row.runId,
    });
  }

  const workbook = XLSX.utils.book_new();
  const sheet = XLSX.utils.json_to_sheet(sheetRows);
  XLSX.utils.book_append_sheet(workbook, sheet, "Sellable Products");
  const bytes = XLSX.write(workbook, { type: "array", bookType: "xlsx" }) as ArrayBuffer;
  return xlsx(bytes, "stalker-sellable-products.xlsx");
}
/**
 * Deletes every stalker run, then removes sellers that no scan references any
 * more, in a single transaction.
 *
 * The reported number comes from the DELETE statement's own result. A separate
 * COUNT taken before the transaction could disagree with what was actually
 * removed if runs were added or deleted in between.
 *
 * NOTE(review): the seller clean-up only has an effect if deleting a run also
 * removes its `stalker_scan_sellers` rows (e.g. via cascading foreign keys) —
 * confirm against the schema.
 *
 * @returns `{ ok: true, deleted: { runs } }` with the number of runs deleted.
 */
async function purgeStalkerData() {
  let deletedRuns = 0;
  await client.begin(async (sql) => {
    const removedRuns = await sql`DELETE FROM runs WHERE type = 'stalker'`;
    deletedRuns = removedRuns.count;
    await sql`DELETE FROM sellers WHERE NOT EXISTS (
      SELECT 1 FROM stalker_scan_sellers x WHERE x.seller_id = sellers.seller_id
    )`;
  });
  return { ok: true, deleted: { runs: deletedRuns } };
}
/**
 * Converts an error thrown while handling a UPC request into a JSON response.
 *
 * The request parsers signal an unsupported HTTP method by throwing
 * `Error("Method not allowed")`; that case is answered with 405. Every other
 * failure keeps the 400 status these routes have always returned.
 */
function upcErrorResponse(error: unknown): Response {
  const message = error instanceof Error ? error.message : String(error);
  return json({ error: message }, message === "Method not allowed" ? 405 : 400);
}

/**
 * HTTP server for the results viewer.
 *
 * Page routes serve the bundled `index.html`; the `/api/...` routes expose
 * runs, run items, products, stalker data and UPC tooling as JSON, CSV or
 * XLSX. Requests matching no route receive a JSON 404.
 */
const server = Bun.serve({
  port: Number(process.env.PORT || "3000"),
  routes: {
    // Pages: each path serves the same HTML entry point.
    "/": index,
    "/products": index,
    "/products/:asin": index,
    "/stalker": index,
    "/stalker/products": index,
    "/runs/:runId": index,

    // Runs.
    "/api/runs": async (req) => json(await getRuns(new URL(req.url).searchParams)),
    "/api/runs/:runId": async (req) => {
      const runId = Number(req.params.runId);
      if (!Number.isInteger(runId)) return json({ error: "Invalid run identifier" }, 400);
      if (req.method === "DELETE") {
        const deleted = await pgRun("DELETE FROM runs WHERE id = ?", [runId]);
        return deleted ? json({ deletedRun: true }) : json({ error: "Run not found" }, 404);
      }
      const run = await getRun(runId);
      if (!run) return json({ error: "Run not found" }, 404);
      return json({
        ...run,
        summary: {
          totalProducts: run.totalProducts,
          fbaCount: run.fbaCount,
          fbmCount: run.fbmCount,
          buyCount: run.buyCount,
          watchCount: run.watchCount,
          skipCount: run.skipCount,
        },
      });
    },
    "/api/runs/:runId/items": async (req) => {
      const runId = Number(req.params.runId);
      if (!Number.isInteger(runId)) return json({ error: "Invalid run identifier" }, 400);
      return json(await getRunItems(runId, new URL(req.url).searchParams));
    },
    "/api/runs/:runId/export.csv": async (req) => {
      const runId = Number(req.params.runId);
      if (!Number.isInteger(runId)) return json({ error: "Invalid run identifier" }, 400);
      return csv(await exportRunItems(runId, new URL(req.url).searchParams), `run-${runId}.csv`);
    },
    "/api/run-items/:itemId/reanalyze": async (req) => {
      if (req.method !== "POST") return json({ error: "Method not allowed" }, 405);
      const itemId = Number(req.params.itemId);
      if (!Number.isInteger(itemId)) return json({ error: "Invalid run item identifier" }, 400);
      try {
        return json(await reanalyzeRunItem(itemId));
      } catch (error) {
        const message = error instanceof Error ? error.message : String(error);
        return json({ error: message }, message === "Run item not found" ? 404 : 500);
      }
    },

    // Products.
    "/api/products": async (req) => json(await getProducts(new URL(req.url).searchParams)),
    "/api/products/:asin": async (req) => {
      const asin = normalizeAsin(req.params.asin);
      if (!asin) return json({ error: "Invalid ASIN" }, 400);
      const result = await getProduct(asin);
      return result ? json(result) : json({ error: "Product not found" }, 404);
    },

    // Stalker.
    "/api/stalker/results": async (req) => json(await getStalkerResults(new URL(req.url).searchParams)),
    "/api/stalker/products": async (req) => json(await stalkerProducts(new URL(req.url).searchParams)),
    "/api/stalker/products/export.xlsx": async (req) => exportStalkerProducts(new URL(req.url).searchParams),
    "/api/stalker/purge": async (req) =>
      req.method === "DELETE" || req.method === "POST"
        ? json(await purgeStalkerData())
        : json({ error: "Method not allowed" }, 405),

    // UPC tooling: failures go through upcErrorResponse (405 for a wrong method, otherwise 400).
    "/api/upc/map": async (req) => {
      try {
        const upcs = await parseUpcsFromRequest(req);
        const error = validateUpcs(upcs);
        if (error) return json({ error }, 400);
        const items = [...(await mapUpcsToAsins(upcs)).entries()].map(([upc, asin]) => ({ upc, asin }));
        return json({ requested: upcs.length, matched: items.length, items });
      } catch (error) {
        return upcErrorResponse(error);
      }
    },
    "/api/upc/lookup": async (req) => {
      try {
        const upcs = await parseUpcsFromRequest(req);
        const error = validateUpcs(upcs);
        if (error) return json({ error }, 400);
        const items = [...(await lookupKeepaUpcs(upcs)).values()];
        return json({ requested: upcs.length, statusCounts: summarizeLookupStatuses(items), items });
      } catch (error) {
        return upcErrorResponse(error);
      }
    },
    "/api/process/upc-file": async (req) => {
      try {
        return json(await runUpcFileAnalysis({ ...(await parseUpcFileRequest(req)), manageResources: false }));
      } catch (error) {
        return upcErrorResponse(error);
      }
    },
  },
  // Fallback for requests that match none of the routes above.
  fetch() {
    return json({ error: "Not found" }, 404);
  },
  development: { hmr: true, console: true },
});
console.log(`Results viewer running on http://localhost:${server.port}`);