import { fetchKeepaDataBatch } from "./integrations/keepa.ts";
import { fetchSellabilityBatch, fetchSpApiPricingAndFees } from "./integrations/sp-api.ts";
import { getCache, setCache } from "./integrations/cache.ts";
import { analyzeProducts } from "./integrations/llm.ts";
import type {
  AnalysisResult,
  EnrichedProduct,
  KeepaData,
  ProductRecord,
  SellabilityInfo,
  SpApiData,
} from "./types.ts";

/** Number of products sent to the LLM per request when the caller gives none. */
export const DEFAULT_LLM_BATCH_SIZE = 5;

/** Number of concurrent pricing workers when the caller gives none. */
export const DEFAULT_PRICING_CONCURRENCY = 5;

/**
 * `"available"` keeps only products whose sellability status is `"available"`;
 * `"all"` disables that gate.
 */
export type SellabilityFilter = "available" | "all";

/** Tuning knobs for {@link processProductChunk}. Every field is optional. */
export type AnalysisPipelineOptions = {
  /** Products per LLM request. Defaults to {@link DEFAULT_LLM_BATCH_SIZE}; values below 1 are raised to 1. */
  llmBatchSize?: number;
  /** Concurrent pricing workers. Defaults to {@link DEFAULT_PRICING_CONCURRENCY}; values below 1 are raised to 1. */
  pricingConcurrency?: number;
  /** Pause before every LLM batch except the first, in ms. Defaults to 5000. */
  llmBatchDelayMs?: number;
  /** Pause before the single retry of a failed LLM batch, in ms. Defaults to 10000. */
  llmRetryDelayMs?: number;
  /** Sellability gate. Defaults to `"available"`. */
  sellability?: SellabilityFilter;
  /** Forwarded to `analyzeProducts`; treated as enabled only when strictly `true`. */
  useClaude?: boolean;
};

/**
 * Splits `items` into consecutive slices of at most `chunkSize` elements.
 *
 * @param items Source array; it is not mutated.
 * @param chunkSize Maximum slice length. Must be greater than zero.
 * @returns The slices in original order; `[]` when `items` is empty.
 * @throws RangeError when `items` is non-empty and `chunkSize` is zero,
 *   negative or `NaN` (such a step would otherwise never advance through the
 *   array, or would yield a bogus empty chunk).
 */
export function chunkArray<T>(items: T[], chunkSize: number): T[][] {
  // Empty input never enters the loop, so it stays valid for any chunkSize.
  if (items.length === 0) {
    return [];
  }
  if (!(chunkSize > 0)) {
    throw new RangeError(
      `chunkSize must be a positive number, got ${chunkSize}`,
    );
  }
  const chunks: T[][] = [];
  for (let start = 0; start < items.length; start += chunkSize) {
    chunks.push(items.slice(start, start + chunkSize));
  }
  return chunks;
}

/** Resolves after `ms` milliseconds. */
function wait(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

/**
 * Builds the placeholder SP-API record used when no pricing data is available
 * for a product: fixed fee values, a zero sale price and an "unknown"
 * sellability status carrying `reason`.
 */
function unknownSpApiData(reason: string): SpApiData {
  return {
    fbaFee: 5.0,
    fbmFee: 1.5,
    referralFeePercent: 15,
    estimatedSalePrice: 0,
    canSell: null,
    sellabilityStatus: "unknown",
    sellabilityReason: reason,
  };
}

/** Sellability record substituted when the batch check has no entry for an ASIN. */
function missingSellability() {
  return {
    canSell: null,
    sellabilityStatus: "unknown" as const,
    sellabilityReason: "Sellability check returned no result",
  };
}

/**
 * Normalises a caller-supplied count to an integer >= 1.
 * `NaN` falls back to `fallback`; `Infinity` is passed through unchanged.
 */
function atLeastOne(value: number, fallback: number): number {
  return Number.isNaN(value) ? fallback : Math.max(1, Math.floor(value));
}

/** Resolved value of `analyzeProducts`. */
type LlmVerdicts = Awaited<ReturnType<typeof analyzeProducts>>;

/**
 * Runs `analyzeProducts` on one batch, retrying exactly once after
 * `retryDelayMs` when the first attempt throws.
 *
 * @returns The verdicts, or `null` when both attempts failed.
 */
async function analyzeWithRetry(
  batch: EnrichedProduct[],
  llmOptions: { ignoreSellability: boolean; useClaude: boolean },
  retryDelayMs: number,
  label: string,
): Promise<LlmVerdicts | null> {
  try {
    return await analyzeProducts(batch, llmOptions);
  } catch (err) {
    console.warn(` LLM batch ${label} failed, retrying: ${err}`);
  }
  if (retryDelayMs > 0) {
    await wait(retryDelayMs);
  }
  try {
    return await analyzeProducts(batch, llmOptions);
  } catch (err) {
    console.warn(` LLM batch ${label} failed again, giving up: ${err}`);
    return null;
  }
}

/**
 * Runs the analysis pipeline for one chunk of products:
 * cache lookup -> sellability gate -> Keepa fetch -> pricing/fees fetch ->
 * enrichment (written back to the cache) -> batched LLM analysis.
 *
 * Products whose cached or freshly fetched sellability status is not
 * `"available"` are dropped unless `options.sellability` is `"all"`.
 * A failed Keepa batch is logged and treated as "no Keepa data". A batch whose
 * LLM analysis fails twice yields a `"SKIP"` verdict with confidence 0 for
 * each of its products.
 *
 * @param products Products to analyse.
 * @param options Pipeline tuning; see {@link AnalysisPipelineOptions}.
 * @returns One result per product that passed the sellability gate, cached
 *   and fresh alike, in the order of `products`.
 */
export async function processProductChunk(
  products: ProductRecord[],
  options: AnalysisPipelineOptions = {},
): Promise<AnalysisResult[]> {
  // A batch size of 0 (or less) would never advance through the enriched list,
  // so it is clamped the same way as the pricing concurrency.
  const llmBatchSize = atLeastOne(
    options.llmBatchSize ?? DEFAULT_LLM_BATCH_SIZE,
    DEFAULT_LLM_BATCH_SIZE,
  );
  const pricingConcurrency = atLeastOne(
    options.pricingConcurrency ?? DEFAULT_PRICING_CONCURRENCY,
    DEFAULT_PRICING_CONCURRENCY,
  );
  const llmBatchDelayMs = Math.max(0, options.llmBatchDelayMs ?? 5_000);
  const llmRetryDelayMs = Math.max(0, options.llmRetryDelayMs ?? 10_000);
  const sellabilityFilter = options.sellability ?? "available";
  const useClaude = options.useClaude === true;
  const gateDisabled = sellabilityFilter === "all";

  // ---- 1. Cache lookup -----------------------------------------------------
  console.log(`\nChecking cache for ${products.length} products...`);
  const cached = new Map<string, EnrichedProduct>();
  const excludedCachedAsins = new Set<string>();
  const uncachedProducts: ProductRecord[] = [];
  for (const p of products) {
    const hit = await getCache(p.asin);
    if (!hit) {
      uncachedProducts.push(p);
      continue;
    }
    if (gateDisabled || hit.spApi.sellabilityStatus === "available") {
      console.log(` [cache hit] ${p.asin}`);
      cached.set(p.asin, hit);
    } else {
      excludedCachedAsins.add(p.asin);
      console.log(
        ` [exclude cached] ${p.asin} - status=${hit.spApi.sellabilityStatus}`,
      );
    }
  }
  console.log(
    `${cached.size} cached available, ${excludedCachedAsins.size} cached excluded, ${uncachedProducts.length} to fetch`,
  );

  // ---- 2. Sellability gate (uncached products only) ------------------------
  const sellabilityMap = new Map<string, SellabilityInfo>();
  const availableProducts: ProductRecord[] = [];
  const unavailableProducts: ProductRecord[] = [];
  if (uncachedProducts.length > 0) {
    console.log(
      `\nChecking sellability for ${uncachedProducts.length} ASINs...`,
    );
    const sellResults = await fetchSellabilityBatch(
      uncachedProducts.map((p) => p.asin),
    );
    for (const p of uncachedProducts) {
      const info = sellResults.get(p.asin) ?? missingSellability();
      sellabilityMap.set(p.asin, info);
      if (gateDisabled || info.sellabilityStatus === "available") {
        availableProducts.push(p);
        console.log(
          ` [available] ${p.asin} - status=${info.sellabilityStatus}`,
        );
      } else {
        unavailableProducts.push(p);
        console.log(
          ` [exclude] ${p.asin} - status=${info.sellabilityStatus}, reason=${info.sellabilityReason ?? "n/a"}`,
        );
      }
    }
    if (gateDisabled) {
      console.log(
        `\nSellability gate disabled: including all ${availableProducts.length} products`,
      );
    } else {
      console.log(
        `\nSellability gate: ${availableProducts.length} available, ${unavailableProducts.length} excluded`,
      );
    }
  }

  // ---- 3. Keepa (best effort: a failed batch leaves the map empty) ---------
  let keepaResults = new Map<string, KeepaData>();
  if (availableProducts.length > 0) {
    console.log(`\nFetching ${availableProducts.length} ASINs from Keepa...`);
    try {
      keepaResults = await fetchKeepaDataBatch(
        availableProducts.map((p) => p.asin),
      );
    } catch (err) {
      console.warn(`Keepa batch fetch failed: ${err}`);
    }
  }

  // ---- 4. Pricing & fees, fetched by a small pool of workers ---------------
  console.log(
    `\nFetching pricing & fees for ${availableProducts.length} ASINs...`,
  );
  const spApiResults = new Map<string, SpApiData>();
  const pricingQueue = [...availableProducts];
  let pricingDone = 0;

  // Each worker drains the shared queue until it is empty.
  // NOTE(review): a rejection from fetchSpApiPricingAndFees propagates through
  // Promise.all below and aborts the whole chunk - confirm that is intended,
  // given that the Keepa step above is best-effort.
  async function fetchNextPricing(): Promise<void> {
    for (
      let p = pricingQueue.shift();
      p !== undefined;
      p = pricingQueue.shift()
    ) {
      const sellability = sellabilityMap.get(p.asin) ?? missingSellability();
      const spApi = await fetchSpApiPricingAndFees(p.asin, sellability);
      // Fall back to Keepa's current price when SP-API reported no sale price.
      const keepa = keepaResults.get(p.asin);
      if (keepa?.currentPrice && spApi.estimatedSalePrice === 0) {
        spApi.estimatedSalePrice = keepa.currentPrice;
      }
      spApiResults.set(p.asin, spApi);
      pricingDone++;
      if (pricingDone % 10 === 0 || pricingDone === availableProducts.length) {
        console.log(
          ` [pricing] ${pricingDone}/${availableProducts.length} fetched`,
        );
      }
    }
  }

  const pricingWorkers = Array.from(
    { length: Math.min(pricingConcurrency, availableProducts.length || 1) },
    () => fetchNextPricing(),
  );
  await Promise.all(pricingWorkers);

  // ---- 5. Enrichment, preserving the order of `products` -------------------
  console.log(`\nEnriching products...`);
  const enriched: EnrichedProduct[] = [];
  const availableAsins = new Set(availableProducts.map((ap) => ap.asin));
  for (const p of products) {
    if (excludedCachedAsins.has(p.asin)) {
      continue;
    }
    const cachedProduct = cached.get(p.asin);
    if (cachedProduct) {
      enriched.push(cachedProduct);
      continue;
    }
    if (!availableAsins.has(p.asin)) {
      continue;
    }
    const product: EnrichedProduct = {
      record: p,
      keepa: keepaResults.get(p.asin) ?? null,
      spApi:
        spApiResults.get(p.asin) ?? unknownSpApiData("SP-API data missing"),
      fetchedAt: new Date().toISOString(),
    };
    await setCache(p.asin, product);
    enriched.push(product);
  }

  // ---- 6. LLM analysis in batches ------------------------------------------
  console.log(
    `\nAnalyzing ${enriched.length} products via LLM (batch size: ${llmBatchSize})...\n`,
  );
  const results: AnalysisResult[] = [];
  const batches = chunkArray(enriched, llmBatchSize);
  const llmOptions = { ignoreSellability: gateDisabled, useClaude };
  for (const [index, batch] of batches.entries()) {
    const label = `${index + 1}/${batches.length}`;
    console.log(` LLM batch ${label}...`);
    if (index > 0 && llmBatchDelayMs > 0) {
      await wait(llmBatchDelayMs);
    }
    const verdicts = await analyzeWithRetry(
      batch,
      llmOptions,
      llmRetryDelayMs,
      label,
    );
    // Verdicts are matched to products by position within the batch; a missing
    // verdict (or a failed batch) becomes a SKIP.
    batch.forEach((enrichedProduct, j) => {
      results.push({
        product: enrichedProduct,
        verdict: verdicts?.[j] ?? {
          asin: enrichedProduct.record.asin,
          verdict: "SKIP",
          confidence: 0,
          reasoning: "LLM analysis failed",
        },
      });
    });
  }
  return results;
}