import { sql } from "drizzle-orm";
import { requireAsin, normalizeAsin } from "../asin.ts";
import type {
  AnalysisResult,
  ProductRecord,
  SupplierAnalysisResult,
} from "../types.ts";
import { db } from "./index.ts";
import {
  analysisRevisions,
  analysisRunStats,
  categoryRunDetails,
  productIdentifiers,
  productObservations,
  products,
  runItems,
  runs,
  sourcingInputs,
  supplierScores,
  upcResolutionCandidates,
  upcResolutions,
} from "./schema.ts";

/**
 * Object the insert helpers run their statements against. Defaults to `db`;
 * presumably callers may pass a transaction handle instead (verify against
 * callers). Left untyped because the concrete executor type is not visible
 * from this module.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type Executor = any;

/** Where a product's descriptive metadata (name/brand/category) came from. */
type MetadataSource = "input" | "catalog";

/** Input accepted by {@link upsertProduct}. */
type ProductSeed = {
  asin: string;
  name?: string | null;
  brand?: string | null;
  category?: string | null;
  metadataSource?: MetadataSource;
  fetchedAt?: Date;
};

export type CategoryRunSummaryInput = {
  categoryId: number;
  categoryLabel: string;
  topAsinsChecked: number;
  availableAsins: number;
  fba: number;
  fbm: number;
  skip: number;
  status: "running" | "ok" | "empty" | "failed";
  error: string;
};

export type RunCounts = {
  totalProducts: number;
  fbaCount: number;
  fbmCount: number;
  skipCount: number;
};

/**
 * Trims `value` and returns it, or `null` when it is `null`, `undefined`,
 * empty, or whitespace-only.
 */
function emptyToNull(value: string | undefined | null): string | null {
  const trimmed = value?.trim();
  return trimmed ? trimmed : null;
}

/**
 * Picks the category for a product: the record's own category when it has
 * non-blank text, otherwise the Keepa category tree joined with " > ".
 *
 * Each source is normalised before falling back, so a blank or
 * whitespace-only record category no longer hides the Keepa value.
 */
function productCategory(record: ProductRecord, result: AnalysisResult): string | null {
  return (
    emptyToNull(record.category) ??
    emptyToNull(result.product.keepa?.categoryTree?.join(" > "))
  );
}

/**
 * Inserts a product row, or updates the existing row with the same ASIN.
 *
 * On conflict:
 * - `last_seen_at` becomes the greater of the stored and incoming values.
 * - For `metadataSource === "catalog"`, name/brand/category are overwritten
 *   when the incoming fetch time is not older than the stored
 *   `metadata_fetched_at` (a NULL stored value counts as `-infinity`) and the
 *   incoming value is non-empty; `metadata_fetched_at` advances to the
 *   greater of the two.
 * - For any other source, name/brand/category only fill columns that are
 *   currently NULL, and `metadata_fetched_at` is left as stored.
 *
 * @param seed - Product fields; `fetchedAt` defaults to the current time.
 * @param executor - Where to run the statement; defaults to `db`.
 * @returns The ASIN as returned by `requireAsin`.
 */
export async function upsertProduct(
  seed: ProductSeed,
  executor: Executor = db,
): Promise<string> {
  const asin = requireAsin(seed.asin);
  const now = seed.fetchedAt ?? new Date();
  const isCatalog = seed.metadataSource === "catalog";

  await executor
    .insert(products)
    .values({
      asin,
      name: emptyToNull(seed.name),
      brand: emptyToNull(seed.brand),
      category: emptyToNull(seed.category),
      metadataFetchedAt: isCatalog ? now : null,
      firstSeenAt: now,
      lastSeenAt: now,
    })
    .onConflictDoUpdate({
      target: products.asin,
      set: {
        lastSeenAt: sql`GREATEST(${products.lastSeenAt}, EXCLUDED.last_seen_at)`,
        name: isCatalog
          ? sql`CASE WHEN EXCLUDED.metadata_fetched_at >= COALESCE(${products.metadataFetchedAt}, '-infinity'::timestamptz) AND NULLIF(EXCLUDED.name, '') IS NOT NULL THEN EXCLUDED.name ELSE ${products.name} END`
          : sql`COALESCE(${products.name}, NULLIF(EXCLUDED.name, ''))`,
        brand: isCatalog
          ? sql`CASE WHEN EXCLUDED.metadata_fetched_at >= COALESCE(${products.metadataFetchedAt}, '-infinity'::timestamptz) AND NULLIF(EXCLUDED.brand, '') IS NOT NULL THEN EXCLUDED.brand ELSE ${products.brand} END`
          : sql`COALESCE(${products.brand}, NULLIF(EXCLUDED.brand, ''))`,
        category: isCatalog
          ? sql`CASE WHEN EXCLUDED.metadata_fetched_at >= COALESCE(${products.metadataFetchedAt}, '-infinity'::timestamptz) AND NULLIF(EXCLUDED.category, '') IS NOT NULL THEN EXCLUDED.category ELSE ${products.category} END`
          : sql`COALESCE(${products.category}, NULLIF(EXCLUDED.category, ''))`,
        // The non-catalog branch re-assigns the stored value. It is wrapped in
        // `sql` like the sibling expressions so it is always rendered as a
        // column reference rather than bound as a parameter.
        metadataFetchedAt: isCatalog
          ? sql`GREATEST(COALESCE(${products.metadataFetchedAt}, '-infinity'::timestamptz), EXCLUDED.metadata_fetched_at)`
          : sql`${products.metadataFetchedAt}`,
      },
    });

  return asin;
}

/**
 * Inserts one `productObservations` row built from an analysis result.
 *
 * Field precedence: current price is Keepa's current price, then the sheet's
 * selling price, then the SP-API estimated sale price; sales rank is Keepa's
 * rank, then the record's `amazonRank`. Missing values are stored as `null`.
 *
 * @param runId - Run the observation belongs to.
 * @param result - Analysis result to snapshot.
 * @param source - Value stored in the row's `source` column.
 * @param executor - Where to run the statement; defaults to `db`.
 * @returns The id of the inserted observation.
 * @throws Error when the insert returns no row.
 */
export async function insertObservation(
  runId: number,
  result: AnalysisResult,
  source: string,
  executor: Executor = db,
): Promise<number> {
  const fetchedAt = new Date(result.product.fetchedAt);
  const record = result.product.record;
  const keepa = result.product.keepa;
  const spApi = result.product.spApi;
  const asin = requireAsin(record.asin);

  const [observation] = await executor
    .insert(productObservations)
    .values({
      productAsin: asin,
      runId,
      source,
      currentPrice:
        keepa?.currentPrice ??
        record.sellingPriceFromSheet ??
        spApi.estimatedSalePrice ??
        null,
      avgPrice90d: keepa?.avgPrice90 ?? null,
      salesRank: keepa?.salesRank ?? record.amazonRank ?? null,
      salesRankAvg90d: keepa?.salesRankAvg90 ?? null,
      monthlySold: keepa?.monthlySold ?? null,
      rankDrops30d: keepa?.salesRankDrops30 ?? null,
      rankDrops90d: keepa?.salesRankDrops90 ?? null,
      sellerCount: keepa?.sellerCount ?? null,
      amazonIsSeller: keepa?.amazonIsSeller ?? null,
      amazonBuyboxSharePct90d: keepa?.amazonBuyboxSharePct90d ?? null,
      fbaFee: spApi.fbaFee ?? null,
      fbmFee: spApi.fbmFee ?? null,
      referralPercent: spApi.referralFeePercent ?? null,
      canSell: spApi.canSell,
      sellabilityStatus: spApi.sellabilityStatus,
      sellabilityReason: spApi.sellabilityReason ?? null,
      fetchedAt,
    })
    .returning({ id: productObservations.id });

  if (!observation) throw new Error(`Failed to insert observation for ${asin}`);
  return observation.id;
}

/**
 * Maps a product record onto the column values of a `sourcingInputs` row.
 * Text fields are trimmed with blank values stored as `null`; absent numeric
 * fields are stored as `null`.
 */
function sourcingInputValues(runItemId: number, record: ProductRecord) {
  return {
    runItemId,
    suppliedName: emptyToNull(record.name),
    suppliedBrand: emptyToNull(record.brand),
    suppliedCategory: emptyToNull(record.category),
    unitCost: record.unitCost ?? null,
    avgPrice90dSheet: record.avgPrice90FromSheet ?? null,
    sellingPriceSheet: record.sellingPriceFromSheet ?? null,
    fbaNetSheet: record.fbaNet ?? null,
    grossProfitDollar: record.grossProfit ?? null,
    grossProfitPct: record.grossProfitPct ?? null,
    netProfitSheet: record.netProfitFromSheet ?? null,
    roiSheet: record.roiFromSheet ?? null,
    moq: record.moq ?? null,
    moqCost: record.moqCost ?? null,
    qtyAvailable: record.totalQtyAvail ?? null,
    supplier: emptyToNull(record.supplier),
    sourceUrl: emptyToNull(record.sourceUrl),
    asinLink: emptyToNull(record.asinLink),
    promoCouponCode: emptyToNull(record.promoCouponCode),
    notes: emptyToNull(record.notes),
    leadDate: emptyToNull(record.leadDate),
  };
}

/**
 * Persists LLM analysis results for a run, one result at a time and in order:
 * upsert the product, insert a run item, optionally insert the sourcing
 * input, insert an observation, then insert an analysis revision with method
 * `"llm"`.
 *
 * Statements are issued directly against `db`, one after another.
 *
 * @param runId - Run the results belong to.
 * @param results - Analysis results to store.
 * @param options - `source` is stored on each observation; `metadataSource`
 *   defaults to `"input"`; `preserveSourcingInput` also stores the record's
 *   sheet values; `sourceInventoryIds` maps an ASIN to the inventory item id
 *   recorded on the run item.
 * @throws Error when a run item or observation insert returns no row.
 */
export async function persistLlmResults(
  runId: number,
  results: AnalysisResult[],
  options: {
    source: string;
    metadataSource?: MetadataSource;
    preserveSourcingInput?: boolean;
    // NOTE(review): value type assumed numeric, like the other ids in this
    // module — confirm against the run_items schema.
    sourceInventoryIds?: Map<string, number>;
  },
): Promise<void> {
  for (const result of results) {
    const record = result.product.record;
    const fetchedAt = new Date(result.product.fetchedAt);

    const asin = await upsertProduct({
      asin: record.asin,
      name: record.name,
      brand: record.brand,
      category: productCategory(record, result),
      metadataSource: options.metadataSource ?? "input",
      fetchedAt,
    });

    const [item] = await db
      .insert(runItems)
      .values({
        runId,
        productAsin: asin,
        sourceInventoryItemId: options.sourceInventoryIds?.get(asin) ?? null,
      })
      .returning({ id: runItems.id });
    if (!item) throw new Error(`Failed to insert run item for ${asin}`);

    if (options.preserveSourcingInput) {
      await db.insert(sourcingInputs).values(sourcingInputValues(item.id, record));
    }

    const observationId = await insertObservation(runId, result, options.source);

    await db.insert(analysisRevisions).values({
      runItemId: item.id,
      observationId,
      method: "llm",
      decision: result.verdict.verdict,
      confidence: result.verdict.confidence,
      reasoning: result.verdict.reasoning ?? null,
      analyzedAt: fetchedAt,
    });
  }
}

/**
 * Maps a supplier result onto the subset of `sourcingInputs` columns that a
 * supplier record provides (name, brand, category, unit cost).
 */
function supplierSourcingValues(runItemId: number, result: SupplierAnalysisResult) {
  return {
    runItemId,
    suppliedName: emptyToNull(result.record.name),
    suppliedBrand: emptyToNull(result.record.brand),
    suppliedCategory: emptyToNull(result.record.category),
    unitCost: result.record.unitCost ?? null,
  };
}

/**
 * Inserts an observation with source `"supplier_upc"` for a resolved ASIN.
 *
 * @returns The new observation id, or `null` when the result carries neither
 *   Keepa nor SP-API data (nothing is inserted) or the insert returns no row.
 */
async function insertSupplierObservation(
  runId: number,
  productAsin: string,
  result: SupplierAnalysisResult,
): Promise<number | null> {
  const keepa = result.keepa;
  const spApi = result.spApi;
  if (!spApi && !keepa) return null;

  const [row] = await db
    .insert(productObservations)
    .values({
      productAsin,
      runId,
      source: "supplier_upc",
      currentPrice: result.score.salePrice,
      avgPrice90d: keepa?.avgPrice90 ?? null,
      salesRank: keepa?.salesRank ?? null,
      salesRankAvg90d: keepa?.salesRankAvg90 ?? null,
      monthlySold: keepa?.monthlySold ?? null,
      rankDrops30d: keepa?.salesRankDrops30 ?? null,
      rankDrops90d: keepa?.salesRankDrops90 ?? null,
      sellerCount: keepa?.sellerCount ?? null,
      amazonIsSeller: keepa?.amazonIsSeller ?? null,
      amazonBuyboxSharePct90d: keepa?.amazonBuyboxSharePct90d ?? null,
      fbaFee: spApi?.fbaFee ?? null,
      fbmFee: spApi?.fbmFee ?? null,
      referralPercent: spApi?.referralFeePercent ?? null,
      canSell: spApi?.canSell ?? null,
      sellabilityStatus: spApi?.sellabilityStatus ?? null,
      sellabilityReason: spApi?.sellabilityReason ?? null,
      fetchedAt: new Date(result.fetchedAt),
    })
    .returning({ id: productObservations.id });

  return row?.id ?? null;
}

/** Classifies an identifier by digit count: 12 is "upc", 13 is "ean", anything else "gtin". */
function identifierTypeForLength(length: number): "upc" | "ean" | "gtin" {
  if (length === 12) return "upc";
  if (length === 13) return "ean";
  return "gtin";
}

/**
 * Persists supplier (UPC-based) analysis results for a run, one result at a
 * time and in order. For each result it:
 *
 * 1. upserts the resolved product (when the lookup produced an ASIN), first
 *    with the supplier's own metadata as `"input"`, then with the Keepa
 *    category tree as `"catalog"` when one is present;
 * 2. inserts the run item, its sourcing input and the UPC resolution;
 * 3. upserts every candidate ASIN and links it to the run item;
 * 4. upserts the identifier-to-ASIN mapping when an ASIN was resolved;
 * 5. inserts an observation (when resolved), an analysis revision with
 *    method `"supplier_scoring"`, and the supplier score.
 *
 * Statements are issued directly against `db`, one after another.
 *
 * @throws Error when the run item or analysis revision insert returns no row.
 */
export async function persistSupplierResults(
  runId: number,
  results: SupplierAnalysisResult[],
): Promise<void> {
  for (const result of results) {
    // One timestamp per result, shared by every row written below.
    const fetchedAt = new Date(result.fetchedAt);
    const resolvedAsin = normalizeAsin(result.lookup.asin);

    if (resolvedAsin) {
      await upsertProduct({
        asin: resolvedAsin,
        name: result.record.name,
        brand: result.record.brand,
        category: result.record.category,
        metadataSource: "input",
        fetchedAt,
      });
      if (result.keepa?.categoryTree?.length) {
        await upsertProduct({
          asin: resolvedAsin,
          category: result.keepa.categoryTree.join(" > "),
          metadataSource: "catalog",
          fetchedAt,
        });
      }
    }

    const [item] = await db
      .insert(runItems)
      .values({
        runId,
        productAsin: resolvedAsin,
        sourceRow: result.rowNumber ?? null,
      })
      .returning({ id: runItems.id });
    if (!item) throw new Error("Failed to insert supplier run item");

    await db.insert(sourcingInputs).values(supplierSourcingValues(item.id, result));

    await db.insert(upcResolutions).values({
      runItemId: item.id,
      requestedUpc: result.upc,
      normalizedUpc: result.lookup.normalizedUpc,
      provider: result.lookup.provider ?? "unknown",
      status: result.lookup.status,
      reason: result.lookup.reason ?? null,
      resolvedProductAsin: resolvedAsin,
      resolvedAt: fetchedAt,
    });

    for (const candidate of result.lookup.candidateAsins) {
      const candidateAsin = normalizeAsin(candidate);
      if (!candidateAsin) continue;

      await upsertProduct({ asin: candidateAsin, fetchedAt });
      await db
        .insert(upcResolutionCandidates)
        .values({ runItemId: item.id, productAsin: candidateAsin })
        .onConflictDoUpdate({
          target: [
            upcResolutionCandidates.runItemId,
            upcResolutionCandidates.productAsin,
          ],
          // Re-assigns the same value, so a repeated candidate does not fail.
          set: { productAsin: sql`EXCLUDED.product_asin` },
        });
    }

    if (resolvedAsin) {
      await db
        .insert(productIdentifiers)
        .values({
          productAsin: resolvedAsin,
          identifierType: identifierTypeForLength(result.lookup.normalizedUpc.length),
          identifierValue: result.lookup.normalizedUpc,
          source: "supplier_upc",
          confirmedAt: fetchedAt,
        })
        .onConflictDoUpdate({
          target: [
            productIdentifiers.identifierType,
            productIdentifiers.identifierValue,
          ],
          set: {
            productAsin: resolvedAsin,
            source: "supplier_upc",
            confirmedAt: fetchedAt,
          },
        });
    }

    const observationId = resolvedAsin
      ? await insertSupplierObservation(runId, resolvedAsin, result)
      : null;

    const [revision] = await db
      .insert(analysisRevisions)
      .values({
        runItemId: item.id,
        observationId,
        method: "supplier_scoring",
        decision: result.score.verdict,
        confidence: result.score.score,
        reasoning: result.score.reason,
        analyzedAt: fetchedAt,
      })
      .returning({ id: analysisRevisions.id });
    if (!revision) throw new Error("Failed to insert supplier analysis revision");

    await db.insert(supplierScores).values({
      revisionId: revision.id,
      score: result.score.score,
      salePrice: result.score.salePrice,
      fbaFee: result.score.fbaFee,
      profit: result.score.profit,
      margin: result.score.margin,
      roi: result.score.roi,
      reason: result.score.reason,
    });
  }
}

/**
 * Builds the `analysisRunStats` counter columns for a category run summary.
 * `analyzedCount` is the sum of the FBA, FBM and skip counts.
 */
function categoryStatValues(
  summary: Pick<
    CategoryRunSummaryInput,
    "topAsinsChecked" | "availableAsins" | "fba" | "fbm" | "skip"
  >,
) {
  return {
    processedCount: summary.topAsinsChecked,
    availableCount: summary.availableAsins,
    analyzedCount: summary.fba + summary.fbm + summary.skip,
    fbaCount: summary.fba,
    fbmCount: summary.fbm,
    skipCount: summary.skip,
  };
}

/**
 * Creates a `category_analysis` run together with its category details and
 * initial stats rows.
 *
 * @param summary - Category identity, counters, status and error text; an
 *   empty error string is stored as `null`.
 * @param runTimestamp - Parsed with `new Date(...)` and stored as `startedAt`.
 * @returns The id of the new run.
 * @throws Error when the run insert returns no row.
 */
export async function createCategoryRun(
  summary: CategoryRunSummaryInput,
  runTimestamp: string,
): Promise<number> {
  const [row] = await db
    .insert(runs)
    .values({
      type: "category_analysis",
      status: summary.status,
      errorMessage: summary.error || null,
      startedAt: new Date(runTimestamp),
    })
    .returning({ id: runs.id });
  if (!row) throw new Error("Failed to insert category run.");

  await db.insert(categoryRunDetails).values({
    runId: row.id,
    categoryId: summary.categoryId,
    categoryLabel: summary.categoryLabel,
    checkedAsinCount: summary.topAsinsChecked,
  });

  await db
    .insert(analysisRunStats)
    .values({ runId: row.id, ...categoryStatValues(summary) });

  return row.id;
}

/**
 * Updates an existing category run: status and error on the run row (plus
 * `completedAt` set to the current time once the status is no longer
 * `"running"`), the checked-ASIN count on its details row, and all counters
 * on its stats row.
 *
 * The details and stats rows are upserted. If no details row exists yet, one
 * is inserted with `categoryId` 0 and an empty label; on conflict only
 * `checkedAsinCount` is changed.
 */
export async function updateCategoryRun(
  runId: number,
  summary: Pick<
    CategoryRunSummaryInput,
    | "topAsinsChecked"
    | "availableAsins"
    | "fba"
    | "fbm"
    | "skip"
    | "status"
    | "error"
  >,
): Promise<void> {
  await db
    .update(runs)
    .set({
      status: summary.status,
      errorMessage: summary.error || null,
      ...(summary.status !== "running" ? { completedAt: new Date() } : {}),
    })
    .where(sql`${runs.id} = ${runId}`);

  await db
    .insert(categoryRunDetails)
    .values({
      runId,
      categoryId: 0,
      categoryLabel: "",
      checkedAsinCount: summary.topAsinsChecked,
    })
    .onConflictDoUpdate({
      target: categoryRunDetails.runId,
      set: { checkedAsinCount: summary.topAsinsChecked },
    });

  const statValues = categoryStatValues(summary);
  await db
    .insert(analysisRunStats)
    .values({ runId, ...statValues })
    .onConflictDoUpdate({
      target: analysisRunStats.runId,
      set: statValues,
    });
}

/**
 * Recomputes a run's stats from its analysis revisions and upserts them.
 *
 * For every run item of the run, only the latest revision counts (highest
 * `analyzed_at`, ties broken by highest id). Decisions are tallied for FBA,
 * FBM, BUY, WATCH and SKIP; processed and analyzed counts are both set to the
 * number of run items that have at least one revision.
 *
 * @returns Total, FBA, FBM and skip counts (all 0 when the run has no
 *   revisions).
 */
export async function refreshRunStats(runId: number): Promise<RunCounts> {
  const [stats] = await db.execute(
    sql<{
      total: string;
      fba: string | null;
      fbm: string | null;
      buy: string | null;
      watch: string | null;
      skip: string | null;
    }>`WITH latest AS (
      SELECT DISTINCT ON (ri.id) ar.decision
      FROM run_items ri
      JOIN analysis_revisions ar ON ar.run_item_id = ri.id
      WHERE ri.run_id = ${runId}
      ORDER BY ri.id, ar.analyzed_at DESC, ar.id DESC
    )
    SELECT
      COUNT(*) AS total,
      SUM(CASE WHEN decision = 'FBA' THEN 1 ELSE 0 END) AS fba,
      SUM(CASE WHEN decision = 'FBM' THEN 1 ELSE 0 END) AS fbm,
      SUM(CASE WHEN decision = 'BUY' THEN 1 ELSE 0 END) AS buy,
      SUM(CASE WHEN decision = 'WATCH' THEN 1 ELSE 0 END) AS watch,
      SUM(CASE WHEN decision = 'SKIP' THEN 1 ELSE 0 END) AS skip
    FROM latest`,
  );

  // SUM over zero rows yields NULL and no row at all yields undefined; both
  // collapse to 0 here.
  const counts: RunCounts = {
    totalProducts: Number(stats?.total ?? 0),
    fbaCount: Number(stats?.fba ?? 0),
    fbmCount: Number(stats?.fbm ?? 0),
    skipCount: Number(stats?.skip ?? 0),
  };

  const statValues = {
    processedCount: counts.totalProducts,
    analyzedCount: counts.totalProducts,
    fbaCount: counts.fbaCount,
    fbmCount: counts.fbmCount,
    buyCount: Number(stats?.buy ?? 0),
    watchCount: Number(stats?.watch ?? 0),
    skipCount: counts.skipCount,
  };

  await db
    .insert(analysisRunStats)
    .values({ runId, ...statValues })
    .onConflictDoUpdate({
      target: analysisRunStats.runId,
      set: statValues,
    });

  return counts;
}