feat: add UPC to ASIN mapping and large file UPC analysis

Introduces the capability to resolve UPCs to ASINs using the Keepa API. This includes a new `upc-file` command for processing large Excel files of UPCs, a `upc` CLI tool for quick lookups, and API endpoints for web-based integration. The analysis pipeline was refactored into a reusable module to support both standard ASIN leads and new UPC-driven workflows.
This commit is contained in:
Victor Noguera
2026-04-16 23:06:55 -04:00
parent d25cf5d5ec
commit 32e7b0c485
14 changed files with 2278 additions and 250 deletions

256
src/analysis-pipeline.ts Normal file
View File

@@ -0,0 +1,256 @@
import { fetchKeepaDataBatch } from "./keepa.ts";
import { fetchSellabilityBatch, fetchSpApiPricingAndFees } from "./sp-api.ts";
import { getCache, setCache } from "./cache.ts";
import { analyzeProducts } from "./llm.ts";
import type {
AnalysisResult,
EnrichedProduct,
KeepaData,
ProductRecord,
SellabilityInfo,
SpApiData,
} from "./types.ts";
export const DEFAULT_LLM_BATCH_SIZE = 5;
export const DEFAULT_PRICING_CONCURRENCY = 5;
export type AnalysisPipelineOptions = {
llmBatchSize?: number;
pricingConcurrency?: number;
llmBatchDelayMs?: number;
llmRetryDelayMs?: number;
};
export function chunkArray<T>(items: T[], chunkSize: number): T[][] {
const chunks: T[][] = [];
for (let i = 0; i < items.length; i += chunkSize) {
chunks.push(items.slice(i, i + chunkSize));
}
return chunks;
}
function wait(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
function unknownSpApiData(reason: string): SpApiData {
return {
fbaFee: 5.0,
fbmFee: 1.5,
referralFeePercent: 15,
estimatedSalePrice: 0,
canSell: null,
sellabilityStatus: "unknown",
sellabilityReason: reason,
};
}
export async function processProductChunk(
products: ProductRecord[],
options: AnalysisPipelineOptions = {},
): Promise<AnalysisResult[]> {
const llmBatchSize = options.llmBatchSize ?? DEFAULT_LLM_BATCH_SIZE;
const pricingConcurrency = Math.max(
1,
options.pricingConcurrency ?? DEFAULT_PRICING_CONCURRENCY,
);
const llmBatchDelayMs = Math.max(0, options.llmBatchDelayMs ?? 5_000);
const llmRetryDelayMs = Math.max(0, options.llmRetryDelayMs ?? 10_000);
console.log(`\nChecking cache for ${products.length} products...`);
const cached = new Map<string, EnrichedProduct>();
const excludedCachedAsins = new Set<string>();
const uncachedProducts: ProductRecord[] = [];
for (const p of products) {
const hit = await getCache(p.asin);
if (hit) {
if (hit.spApi.sellabilityStatus === "available") {
console.log(` [cache hit] ${p.asin}`);
cached.set(p.asin, hit);
} else {
excludedCachedAsins.add(p.asin);
console.log(
` [exclude cached] ${p.asin} - status=${hit.spApi.sellabilityStatus}`,
);
}
} else {
uncachedProducts.push(p);
}
}
console.log(
`${cached.size} cached available, ${excludedCachedAsins.size} cached excluded, ${uncachedProducts.length} to fetch`,
);
const sellabilityMap = new Map<string, SellabilityInfo>();
const availableProducts: ProductRecord[] = [];
const unavailableProducts: ProductRecord[] = [];
if (uncachedProducts.length > 0) {
console.log(
`\nChecking sellability for ${uncachedProducts.length} ASINs...`,
);
const sellResults = await fetchSellabilityBatch(
uncachedProducts.map((p) => p.asin),
);
for (const p of uncachedProducts) {
const info = sellResults.get(p.asin) ?? {
canSell: null,
sellabilityStatus: "unknown" as const,
sellabilityReason: "Sellability check returned no result",
};
sellabilityMap.set(p.asin, info);
if (info.sellabilityStatus === "available") {
availableProducts.push(p);
console.log(
` [available] ${p.asin} - status=${info.sellabilityStatus}`,
);
} else {
unavailableProducts.push(p);
console.log(
` [exclude] ${p.asin} - status=${info.sellabilityStatus}, reason=${info.sellabilityReason ?? "n/a"}`,
);
}
}
console.log(
`\nSellability gate: ${availableProducts.length} available, ${unavailableProducts.length} excluded`,
);
}
let keepaResults = new Map<string, KeepaData>();
if (availableProducts.length > 0) {
console.log(`\nFetching ${availableProducts.length} ASINs from Keepa...`);
try {
keepaResults = await fetchKeepaDataBatch(
availableProducts.map((p) => p.asin),
);
} catch (err) {
console.warn(`Keepa batch fetch failed: ${err}`);
}
}
console.log(
`\nFetching pricing & fees for ${availableProducts.length} ASINs...`,
);
const spApiResults = new Map<string, SpApiData>();
const pricingQueue = [...availableProducts];
let pricingDone = 0;
async function fetchNextPricing(): Promise<void> {
while (pricingQueue.length > 0) {
const p = pricingQueue.shift();
if (!p) return;
const sellability = sellabilityMap.get(p.asin) ?? {
canSell: null,
sellabilityStatus: "unknown" as const,
sellabilityReason: "Sellability check returned no result",
};
const spApi = await fetchSpApiPricingAndFees(p.asin, sellability);
const keepa = keepaResults.get(p.asin);
if (keepa?.currentPrice && spApi.estimatedSalePrice === 0) {
spApi.estimatedSalePrice = keepa.currentPrice;
}
spApiResults.set(p.asin, spApi);
pricingDone++;
if (pricingDone % 10 === 0 || pricingDone === availableProducts.length) {
console.log(
` [pricing] ${pricingDone}/${availableProducts.length} fetched`,
);
}
}
}
const pricingWorkers = Array.from(
{ length: Math.min(pricingConcurrency, availableProducts.length || 1) },
() => fetchNextPricing(),
);
await Promise.all(pricingWorkers);
console.log(`\nEnriching products...`);
const enriched: EnrichedProduct[] = [];
const availableAsins = new Set(availableProducts.map((ap) => ap.asin));
for (const p of products) {
if (excludedCachedAsins.has(p.asin)) {
continue;
}
const cachedProduct = cached.get(p.asin);
if (cachedProduct) {
enriched.push(cachedProduct);
continue;
}
if (!availableAsins.has(p.asin)) {
continue;
}
const keepa = keepaResults.get(p.asin) ?? null;
const spApi =
spApiResults.get(p.asin) ?? unknownSpApiData("SP-API data missing");
const product: EnrichedProduct = {
record: p,
keepa,
spApi,
fetchedAt: new Date().toISOString(),
};
await setCache(p.asin, product);
enriched.push(product);
}
console.log(
`\nAnalyzing ${enriched.length} products via LLM (batch size: ${llmBatchSize})...\n`,
);
const results: AnalysisResult[] = [];
for (let i = 0; i < enriched.length; i += llmBatchSize) {
const batch = enriched.slice(i, i + llmBatchSize);
const batchNum = Math.floor(i / llmBatchSize) + 1;
const totalBatches = Math.ceil(enriched.length / llmBatchSize);
console.log(` LLM batch ${batchNum}/${totalBatches}...`);
if (i > 0 && llmBatchDelayMs > 0) {
await wait(llmBatchDelayMs);
}
let verdicts;
try {
verdicts = await analyzeProducts(batch);
} catch {
if (llmRetryDelayMs > 0) {
await wait(llmRetryDelayMs);
}
try {
verdicts = await analyzeProducts(batch);
} catch {
verdicts = null;
}
}
for (let j = 0; j < batch.length; j++) {
const enrichedProduct = batch[j];
if (!enrichedProduct) continue;
results.push({
product: enrichedProduct,
verdict: verdicts?.[j] ?? {
asin: enrichedProduct.record.asin,
verdict: "SKIP",
confidence: 0,
reasoning: "LLM analysis failed",
},
});
}
}
return results;
}

View File

@@ -1,22 +1,12 @@
import { readProducts } from "./reader.ts";
import { fetchKeepaDataBatch } from "./keepa.ts";
import { fetchSellabilityBatch, fetchSpApiPricingAndFees } from "./sp-api.ts";
import { connectCache, getCache, setCache, disconnectCache } from "./cache.ts";
import { analyzeProducts } from "./llm.ts";
import { connectCache, disconnectCache } from "./cache.ts";
import { printResults, writeResultsToDb } from "./writer.ts";
import { initDb, closeDb } from "./database.ts";
import { chunkArray, processProductChunk } from "./analysis-pipeline.ts";
import path from "node:path";
import type {
EnrichedProduct,
AnalysisResult,
KeepaData,
ProductRecord,
SellabilityInfo,
SpApiData,
} from "./types.ts";
import type { AnalysisResult } from "./types.ts";
const DB_PATH = "./results.db";
const LLM_BATCH_SIZE = 5;
const INPUT_BATCH_SIZE = 50;
function parseArgs(): { inputFile: string; outputFile?: string } {
@@ -35,14 +25,6 @@ function parseArgs(): { inputFile: string; outputFile?: string } {
return { inputFile, outputFile };
}
function chunkArray<T>(items: T[], chunkSize: number): T[][] {
const chunks: T[][] = [];
for (let i = 0; i < items.length; i += chunkSize) {
chunks.push(items.slice(i, i + chunkSize));
}
return chunks;
}
function resolveBaseOutputPath(inputFile: string, outputFile?: string): string {
if (outputFile) return outputFile;
@@ -50,201 +32,6 @@ function resolveBaseOutputPath(inputFile: string, outputFile?: string): string {
return path.join(parsedInput.dir, `${parsedInput.name}_results.xlsx`);
}
async function processProductChunk(
products: ProductRecord[],
): Promise<AnalysisResult[]> {
console.log(`\nChecking cache for ${products.length} products...`);
const cached = new Map<string, EnrichedProduct>();
const excludedCachedAsins = new Set<string>();
const uncachedProducts: ProductRecord[] = [];
for (const p of products) {
const hit = await getCache(p.asin);
if (hit) {
if (hit.spApi.sellabilityStatus === "available") {
console.log(` [cache hit] ${p.asin}`);
cached.set(p.asin, hit);
} else {
excludedCachedAsins.add(p.asin);
console.log(
` [exclude cached] ${p.asin} — status=${hit.spApi.sellabilityStatus}`,
);
}
} else {
uncachedProducts.push(p);
}
}
console.log(
`${cached.size} cached available, ${excludedCachedAsins.size} cached excluded, ${uncachedProducts.length} to fetch`,
);
const sellabilityMap = new Map<string, SellabilityInfo>();
const availableProducts: ProductRecord[] = [];
const unavailableProducts: ProductRecord[] = [];
if (uncachedProducts.length > 0) {
console.log(
`\nChecking sellability for ${uncachedProducts.length} ASINs...`,
);
const sellResults = await fetchSellabilityBatch(
uncachedProducts.map((p) => p.asin),
);
for (const p of uncachedProducts) {
const info = sellResults.get(p.asin) ?? {
canSell: null,
sellabilityStatus: "unknown" as const,
sellabilityReason: "Sellability check returned no result",
};
sellabilityMap.set(p.asin, info);
if (info.sellabilityStatus === "available") {
availableProducts.push(p);
console.log(` [available] ${p.asin} — status=${info.sellabilityStatus}`);
} else {
unavailableProducts.push(p);
console.log(
` [exclude] ${p.asin} — status=${info.sellabilityStatus}, reason=${info.sellabilityReason ?? "n/a"}`,
);
}
}
console.log(
`\nSellability gate: ${availableProducts.length} available, ${unavailableProducts.length} excluded`,
);
}
let keepaResults = new Map<string, KeepaData>();
if (availableProducts.length > 0) {
console.log(`\nFetching ${availableProducts.length} ASINs from Keepa...`);
try {
keepaResults = await fetchKeepaDataBatch(
availableProducts.map((p) => p.asin),
);
} catch (err) {
console.warn(`Keepa batch fetch failed: ${err}`);
}
}
console.log(
`\nFetching pricing & fees for ${availableProducts.length} ASINs...`,
);
const spApiResults = new Map<string, SpApiData>();
const pricingQueue = [...availableProducts];
let pricingDone = 0;
async function fetchNextPricing(): Promise<void> {
while (pricingQueue.length > 0) {
const p = pricingQueue.shift()!;
const sellability = sellabilityMap.get(p.asin)!;
const spApi = await fetchSpApiPricingAndFees(p.asin, sellability);
const keepa = keepaResults.get(p.asin);
if (keepa?.currentPrice && spApi.estimatedSalePrice === 0) {
spApi.estimatedSalePrice = keepa.currentPrice;
}
spApiResults.set(p.asin, spApi);
pricingDone++;
if (pricingDone % 10 === 0 || pricingDone === availableProducts.length) {
console.log(
` [pricing] ${pricingDone}/${availableProducts.length} fetched`,
);
}
}
}
const pricingWorkers = Array.from(
{ length: Math.min(5, availableProducts.length || 1) },
() => fetchNextPricing(),
);
await Promise.all(pricingWorkers);
console.log(`\nEnriching products...`);
const enriched: EnrichedProduct[] = [];
const availableAsins = new Set(availableProducts.map((ap) => ap.asin));
for (const p of products) {
if (excludedCachedAsins.has(p.asin)) {
continue;
}
const cachedProduct = cached.get(p.asin);
if (cachedProduct) {
enriched.push(cachedProduct);
continue;
}
if (!availableAsins.has(p.asin)) {
continue;
}
const keepa = keepaResults.get(p.asin) ?? null;
const spApi = spApiResults.get(p.asin) ?? {
fbaFee: 5.0,
fbmFee: 1.5,
referralFeePercent: 15,
estimatedSalePrice: 0,
canSell: null,
sellabilityStatus: "unknown" as const,
sellabilityReason: "SP-API data missing",
};
const product: EnrichedProduct = {
record: p,
keepa,
spApi,
fetchedAt: new Date().toISOString(),
};
await setCache(p.asin, product);
enriched.push(product);
}
console.log(
`\nAnalyzing ${enriched.length} products via LLM (batch size: ${LLM_BATCH_SIZE})...\n`,
);
const results: AnalysisResult[] = [];
for (let i = 0; i < enriched.length; i += LLM_BATCH_SIZE) {
const batch = enriched.slice(i, i + LLM_BATCH_SIZE);
const batchNum = Math.floor(i / LLM_BATCH_SIZE) + 1;
const totalBatches = Math.ceil(enriched.length / LLM_BATCH_SIZE);
console.log(` LLM batch ${batchNum}/${totalBatches}...`);
if (i > 0) {
await new Promise((r) => setTimeout(r, 5000));
}
let verdicts;
try {
verdicts = await analyzeProducts(batch);
} catch {
await new Promise((r) => setTimeout(r, 10_000));
try {
verdicts = await analyzeProducts(batch);
} catch {
verdicts = null;
}
}
for (let j = 0; j < batch.length; j++) {
results.push({
product: batch[j]!,
verdict: verdicts?.[j] ?? {
asin: batch[j]!.record.asin,
verdict: "SKIP",
confidence: 0,
reasoning: "LLM analysis failed",
},
});
}
}
return results;
}
async function main() {
const { inputFile, outputFile } = parseArgs();

200
src/keepa.test.ts Normal file
View File

@@ -0,0 +1,200 @@
import { afterAll, beforeEach, expect, mock, test } from "bun:test";
import { lookupKeepaUpcs, mapUpcsToAsins } from "./keepa.ts";
const originalFetch = globalThis.fetch;
function makeUpc(index: number): string {
return String(index).padStart(12, "0");
}
beforeEach(() => {
globalThis.fetch = originalFetch;
});
afterAll(() => {
globalThis.fetch = originalFetch;
});
test("lookupKeepaUpcs marks invalid UPCs and skips API calls", async () => {
const fetchMock = mock(async () => {
return new Response("should not be called", { status: 500 });
});
globalThis.fetch = fetchMock as unknown as typeof globalThis.fetch;
const details = await lookupKeepaUpcs([
"",
"abc",
"12345678901",
"123456789012345",
]);
expect(fetchMock.mock.calls.length).toBe(0);
expect(details.size).toBe(4);
expect(details.get("")?.status).toBe("invalid_upc");
expect(details.get("abc")?.status).toBe("invalid_upc");
expect(details.get("12345678901")?.status).toBe("invalid_upc");
expect(details.get("123456789012345")?.status).toBe("invalid_upc");
});
test("lookupKeepaUpcs returns found, not_found, and multiple_asins outcomes", async () => {
globalThis.fetch = mock(async () => {
return new Response(
JSON.stringify({
products: [
{
asin: "B000FOUND01",
upcList: ["012345678901"],
stats: {
current: [null, null, null, 1234],
avg: [2500, null, null, 1400],
},
csv: [[1, 2999]],
},
{
asin: "B000MULTI01",
upcList: ["098765432109"],
stats: {
current: [null, null, null, 2000],
avg: [1800, null, null, 2200],
},
csv: [[1, 1999]],
},
{
asin: "B000MULTI02",
upcList: ["098765432109"],
stats: {
current: [null, null, null, 2100],
avg: [1850, null, null, 2250],
},
csv: [[1, 2099]],
},
],
tokensLeft: 10,
refillRate: 1,
}),
{ status: 200 },
);
}) as unknown as typeof globalThis.fetch;
const details = await lookupKeepaUpcs([
"012345678901",
"098765432109",
"111111111111",
]);
expect(details.get("012345678901")?.status).toBe("found");
expect(details.get("012345678901")?.asin).toBe("B000FOUND01");
expect(details.get("012345678901")?.keepaData?.currentPrice).toBe(29.99);
expect(details.get("098765432109")?.status).toBe("multiple_asins");
expect(details.get("098765432109")?.candidateAsins).toEqual([
"B000MULTI01",
"B000MULTI02",
]);
expect(details.get("111111111111")?.status).toBe("not_found");
const simpleMap = await mapUpcsToAsins([
"012345678901",
"098765432109",
"111111111111",
]);
expect(simpleMap.get("012345678901")).toBe("B000FOUND01");
expect(simpleMap.has("098765432109")).toBe(false);
expect(simpleMap.has("111111111111")).toBe(false);
});
test("lookupKeepaUpcs keeps partial success when one chunk fails", async () => {
const upcs = Array.from({ length: 101 }, (_, i) => makeUpc(700000000000 + i));
const firstChunkFirstUpc = upcs[0]!;
const secondChunkUpc = upcs[100]!;
globalThis.fetch = mock(async (input: string | URL | Request) => {
const rawUrl =
typeof input === "string"
? input
: input instanceof URL
? input.toString()
: input.url;
const url = new URL(rawUrl);
const codes = (url.searchParams.get("code") ?? "").split(",");
if (codes.includes(firstChunkFirstUpc)) {
return new Response("first chunk failed", { status: 500 });
}
return new Response(
JSON.stringify({
products: [
{
asin: "B000LAST001",
upcList: [secondChunkUpc],
stats: {
current: [null, null, null, 1000],
avg: [1500, null, null, 1200],
},
csv: [[1, 1599]],
},
],
tokensLeft: 10,
refillRate: 1,
}),
{ status: 200 },
);
}) as unknown as typeof globalThis.fetch;
const details = await lookupKeepaUpcs(upcs);
expect(details.get(firstChunkFirstUpc)?.status).toBe("request_failed");
expect(details.get(secondChunkUpc)?.status).toBe("found");
expect(details.get(secondChunkUpc)?.asin).toBe("B000LAST001");
const simpleMap = await mapUpcsToAsins(upcs);
expect(simpleMap.has(firstChunkFirstUpc)).toBe(false);
expect(simpleMap.get(secondChunkUpc)).toBe("B000LAST001");
});
test("lookupKeepaUpcs retries on 429 and succeeds after refill wait", async () => {
const targetUpc = "123456789012";
const fetchMock = mock(async () => {
const callNumber = fetchMock.mock.calls.length;
if (callNumber === 1) {
return new Response(
JSON.stringify({
refillIn: 0,
refillRate: 21,
tokensLeft: -1,
}),
{ status: 429 },
);
}
return new Response(
JSON.stringify({
products: [
{
asin: "B000RETRY01",
upcList: [targetUpc],
stats: {
current: [null, null, null, 1111],
avg: [1299, null, null, 1234],
},
csv: [[1, 1399]],
},
],
tokensLeft: 10,
refillRate: 21,
}),
{ status: 200 },
);
});
globalThis.fetch = fetchMock as unknown as typeof globalThis.fetch;
const details = await lookupKeepaUpcs([targetUpc]);
expect(fetchMock.mock.calls.length).toBe(2);
expect(details.get(targetUpc)?.status).toBe("found");
expect(details.get(targetUpc)?.asin).toBe("B000RETRY01");
});

View File

@@ -1,10 +1,21 @@
import { config } from "./config.ts";
import type { KeepaData } from "./types.ts";
import type { KeepaData, KeepaUpcLookupDetail } from "./types.ts";
const KEEPA_BASE = "https://api.keepa.com";
const MAX_ASINS_PER_REQUEST = 100;
const MAX_CODES_PER_REQUEST = MAX_ASINS_PER_REQUEST;
const MAX_KEEPA_RETRIES = 4;
const KEEP_RETRY_BUFFER_MS = 250;
const AMAZON_US_SELLER_ID = "ATVPDKIKX0DER";
const KEEPA_MINUTES_OFFSET = 21_564_000;
const UPC_PATTERN = /^\d{12,14}$/;
type KeepaApiResponse = {
products?: Record<string, any>[];
tokensLeft?: number;
refillRate?: number;
refillIn?: number;
};
// Token-based rate limiting: Keepa Pro = 1 token/min regeneration.
// Each product request costs 1 token regardless of ASIN count (up to 100).
@@ -35,6 +46,168 @@ async function waitForToken(): Promise<void> {
tokensLeft = 1;
}
function wait(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
function buildProductUrl(
queryParam: "asin" | "code",
values: string[],
): string {
const params = new URLSearchParams({
key: config.keepaApiKey,
domain: "1",
stats: "90",
buybox: "1",
days: "90",
});
params.set(queryParam, values.join(","));
return `${KEEPA_BASE}/product?${params.toString()}`;
}
function updateTokenState(data: KeepaApiResponse): void {
if (data.tokensLeft != null) tokensLeft = data.tokensLeft;
if (data.refillRate != null) refillRate = data.refillRate;
}
function computeWaitMsFromRefill(refillIn?: number): number {
if (
typeof refillIn === "number" &&
Number.isFinite(refillIn) &&
refillIn >= 0
) {
return Math.max(
Math.ceil(refillIn) + KEEP_RETRY_BUFFER_MS,
KEEP_RETRY_BUFFER_MS,
);
}
const safeRefillRate = Math.max(1, refillRate);
return Math.ceil((1 / safeRefillRate) * 60_000) + KEEP_RETRY_BUFFER_MS;
}
function parseErrorPayload(text: string): KeepaApiResponse | null {
try {
const parsed = JSON.parse(text) as KeepaApiResponse;
return parsed && typeof parsed === "object" ? parsed : null;
} catch {
return null;
}
}
async function fetchKeepaWithRetries(
url: string,
operationLabel: string,
): Promise<KeepaApiResponse> {
let lastErrorMessage = "Unknown Keepa error";
for (let attempt = 1; attempt <= MAX_KEEPA_RETRIES; attempt++) {
await waitForToken();
const res = await fetch(url);
lastRequestTime = Date.now();
if (res.ok) {
const data = (await res.json()) as KeepaApiResponse;
updateTokenState(data);
return data;
}
const text = await res.text();
const payload = parseErrorPayload(text);
if (payload) {
updateTokenState(payload);
}
lastErrorMessage = `Keepa API error ${res.status}: ${text}`;
if (res.status !== 429 || attempt === MAX_KEEPA_RETRIES) {
break;
}
const waitMs = computeWaitMsFromRefill(payload?.refillIn);
tokensLeft = Math.min(tokensLeft, 0);
console.warn(
`Keepa throttled during ${operationLabel} (attempt ${attempt}/${MAX_KEEPA_RETRIES}). Waiting ${Math.ceil(waitMs / 1000)}s before retry...`,
);
await wait(waitMs);
}
throw new Error(lastErrorMessage);
}
function normalizeUpc(input: string): string {
return input.trim();
}
function isValidUpc(value: string): boolean {
return UPC_PATTERN.test(value);
}
function normalizeCodeFromKeepa(value: string): string {
return value.replace(/\D/g, "");
}
function collectCodes(value: unknown, target: Set<string>): void {
if (Array.isArray(value)) {
for (const item of value) {
collectCodes(item, target);
}
return;
}
if (typeof value === "number" && Number.isFinite(value)) {
const normalized = normalizeCodeFromKeepa(String(Math.trunc(value)));
if (isValidUpc(normalized)) target.add(normalized);
return;
}
if (typeof value !== "string") {
return;
}
for (const rawPart of value.split(/[\s,;|]+/)) {
if (!rawPart) continue;
const normalized = normalizeCodeFromKeepa(rawPart);
if (isValidUpc(normalized)) target.add(normalized);
}
}
function extractUpcsFromProduct(product: Record<string, any>): string[] {
const codes = new Set<string>();
const candidates: unknown[] = [
product.upcList,
product.upc,
product.eanList,
product.ean,
product.gtinList,
product.gtin,
];
for (const candidate of candidates) {
collectCodes(candidate, codes);
}
return Array.from(codes);
}
function buildFailureDetail(
upc: string,
status: "invalid_upc" | "not_found" | "multiple_asins" | "request_failed",
reason: string,
candidateAsins: string[] = [],
): KeepaUpcLookupDetail {
return {
requestedUpc: upc,
normalizedUpc: upc,
status,
asin: null,
candidateAsins,
keepaData: null,
reason,
};
}
export async function fetchKeepaDataBatch(
asins: string[],
): Promise<Map<string, KeepaData>> {
@@ -43,32 +216,13 @@ export async function fetchKeepaDataBatch(
// Split into chunks of MAX_ASINS_PER_REQUEST
for (let i = 0; i < asins.length; i += MAX_ASINS_PER_REQUEST) {
const chunk = asins.slice(i, i + MAX_ASINS_PER_REQUEST);
await waitForToken();
const asinParam = chunk.join(",");
const url = `${KEEPA_BASE}/product?key=${config.keepaApiKey}&domain=1&asin=${asinParam}&stats=90&buybox=1&days=90`;
const url = buildProductUrl("asin", chunk);
console.log(
`Keepa: fetching ${chunk.length} ASINs (tokens left: ${tokensLeft})...`,
);
const res = await fetch(url);
lastRequestTime = Date.now();
if (!res.ok) {
const text = await res.text();
throw new Error(`Keepa API error ${res.status}: ${text}`);
}
const data = (await res.json()) as {
products?: Record<string, any>[];
tokensLeft?: number;
refillRate?: number;
};
// Update token state from API response
if (data.tokensLeft != null) tokensLeft = data.tokensLeft;
if (data.refillRate != null) refillRate = data.refillRate;
const data = await fetchKeepaWithRetries(url, "ASIN batch fetch");
console.log(
`Keepa: ${data.products?.length ?? 0} products returned, ${tokensLeft} tokens remaining (refill: ${refillRate}/min)`,
@@ -86,6 +240,133 @@ export async function fetchKeepaDataBatch(
return results;
}
export async function lookupKeepaUpcs(
upcs: string[],
): Promise<Map<string, KeepaUpcLookupDetail>> {
const details = new Map<string, KeepaUpcLookupDetail>();
const validUpcs: string[] = [];
const seenValid = new Set<string>();
for (const rawUpc of upcs) {
const normalized = normalizeUpc(rawUpc);
if (!isValidUpc(normalized)) {
if (!details.has(normalized)) {
details.set(
normalized,
buildFailureDetail(
normalized,
"invalid_upc",
"UPC must be 12, 13, or 14 digits",
),
);
}
continue;
}
if (seenValid.has(normalized)) continue;
seenValid.add(normalized);
validUpcs.push(normalized);
}
for (let i = 0; i < validUpcs.length; i += MAX_CODES_PER_REQUEST) {
const chunk = validUpcs.slice(i, i + MAX_CODES_PER_REQUEST);
const chunkSet = new Set(chunk);
const url = buildProductUrl("code", chunk);
console.log(
`Keepa: mapping ${chunk.length} UPCs to ASINs (tokens left: ${tokensLeft})...`,
);
try {
const data = await fetchKeepaWithRetries(url, "UPC code lookup");
console.log(
`Keepa: ${data.products?.length ?? 0} products returned for UPC query, ${tokensLeft} tokens remaining (refill: ${refillRate}/min)`,
);
const byUpc = new Map<string, Map<string, KeepaData>>();
for (const product of data.products ?? []) {
const asin = String(product.asin ?? "").trim();
if (!asin) continue;
const keepaData = parseKeepaProduct(product);
const productUpcs = extractUpcsFromProduct(product);
for (const upc of productUpcs) {
if (!chunkSet.has(upc)) continue;
if (!byUpc.has(upc)) byUpc.set(upc, new Map());
byUpc.get(upc)!.set(asin, keepaData);
}
}
for (const upc of chunk) {
const asinMap = byUpc.get(upc);
if (!asinMap || asinMap.size === 0) {
details.set(
upc,
buildFailureDetail(
upc,
"not_found",
"No Keepa product matched this UPC",
),
);
continue;
}
const candidateAsins = Array.from(asinMap.keys());
if (candidateAsins.length > 1) {
details.set(
upc,
buildFailureDetail(
upc,
"multiple_asins",
`UPC matched multiple ASINs (${candidateAsins.length})`,
candidateAsins,
),
);
continue;
}
const asin = candidateAsins[0]!;
details.set(upc, {
requestedUpc: upc,
normalizedUpc: upc,
status: "found",
asin,
candidateAsins: [asin],
keepaData: asinMap.get(asin) ?? null,
});
}
} catch (error) {
const reason = error instanceof Error ? error.message : String(error);
console.warn(
`Keepa UPC chunk failed (offset ${i}, size ${chunk.length}): ${reason}`,
);
for (const upc of chunk) {
details.set(upc, buildFailureDetail(upc, "request_failed", reason));
}
}
}
return details;
}
export async function mapUpcsToAsins(
upcs: string[],
): Promise<Map<string, string>> {
const details = await lookupKeepaUpcs(upcs);
const mapping = new Map<string, string>();
for (const [upc, detail] of details.entries()) {
if (detail.status === "found" && detail.asin) {
mapping.set(upc, detail.asin);
}
}
return mapping;
}
function parseKeepaProduct(product: Record<string, any>): KeepaData {
const stats = product.stats;
const csv = product.csv;

View File

@@ -1,9 +1,19 @@
import index from "./web/index.html";
import { getDb, initDb } from "./database.ts";
import { fetchKeepaDataBatch } from "./keepa.ts";
import {
fetchKeepaDataBatch,
lookupKeepaUpcs,
mapUpcsToAsins,
} from "./keepa.ts";
import { runUpcFileAnalysis } from "./upc-file-analysis.ts";
import { fetchSellabilityBatch, fetchSpApiPricingAndFees } from "./sp-api.ts";
import { analyzeProducts } from "./llm.ts";
import type { EnrichedProduct, ProductRecord, SpApiData } from "./types.ts";
import type {
EnrichedProduct,
KeepaUpcLookupDetail,
ProductRecord,
SpApiData,
} from "./types.ts";
type ProcessType = "lead_analysis" | "category_analysis";
@@ -46,6 +56,7 @@ const DB_PATH = process.env.RESULTS_DB_PATH || "./results.db";
const DEFAULT_PAGE_SIZE = 25;
const MAX_PAGE_SIZE = 200;
const ASIN_PATTERN = /^[A-Z0-9]{10}$/;
const MAX_UPCS_PER_REQUEST = 1000;
initDb(DB_PATH);
const db = getDb(DB_PATH);
@@ -82,6 +93,188 @@ function isValidAsin(value: string): boolean {
return ASIN_PATTERN.test(value);
}
function splitRawUpcValues(input: string): string[] {
return input
.split(/[\s,;|]+/)
.map((chunk) => chunk.trim())
.filter(Boolean);
}
function collectUpcsFromUnknown(value: unknown, target: string[]): void {
if (typeof value === "string") {
target.push(...splitRawUpcValues(value));
return;
}
if (typeof value === "number" && Number.isFinite(value)) {
target.push(String(Math.trunc(value)));
return;
}
if (Array.isArray(value)) {
for (const item of value) {
collectUpcsFromUnknown(item, target);
}
}
}
function normalizeAndDedupeUpcs(values: string[]): string[] {
const seen = new Set<string>();
const normalized: string[] = [];
for (const value of values) {
const upc = value.trim();
if (!upc || seen.has(upc)) continue;
seen.add(upc);
normalized.push(upc);
}
return normalized;
}
function parseUpcsFromSearchParams(params: URLSearchParams): string[] {
const parsed: string[] = [];
for (const value of params.getAll("upc")) {
collectUpcsFromUnknown(value, parsed);
}
const upcsValue = params.get("upcs");
if (upcsValue) {
collectUpcsFromUnknown(upcsValue, parsed);
}
return normalizeAndDedupeUpcs(parsed);
}
async function parseUpcsFromRequest(req: Request): Promise<string[]> {
if (req.method === "GET") {
const url = new URL(req.url);
return parseUpcsFromSearchParams(url.searchParams);
}
if (req.method !== "POST") {
throw new Error("Method not allowed");
}
let body: unknown;
try {
body = await req.json();
} catch {
throw new Error("Invalid JSON body");
}
const parsed: string[] = [];
if (body && typeof body === "object" && "upcs" in body) {
collectUpcsFromUnknown((body as { upcs?: unknown }).upcs, parsed);
} else {
collectUpcsFromUnknown(body, parsed);
}
return normalizeAndDedupeUpcs(parsed);
}
function validateUpcRequest(upcs: string[]): string | null {
if (upcs.length === 0) {
return "Provide at least one UPC via query (?upc=...) or JSON body { upcs: [...] }";
}
if (upcs.length > MAX_UPCS_PER_REQUEST) {
return `Too many UPCs. Maximum allowed per request is ${MAX_UPCS_PER_REQUEST}.`;
}
return null;
}
function summarizeLookupStatuses(
details: KeepaUpcLookupDetail[],
): Record<string, number> {
const counts: Record<string, number> = {};
for (const detail of details) {
counts[detail.status] = (counts[detail.status] ?? 0) + 1;
}
return counts;
}
function parsePositiveIntField(
value: unknown,
fieldName: string,
): number | undefined {
if (value == null) return undefined;
if (typeof value === "number") {
if (!Number.isInteger(value) || value < 1) {
throw new Error(`${fieldName} must be a positive integer`);
}
return value;
}
if (typeof value === "string" && value.trim().length > 0) {
const parsed = Number.parseInt(value, 10);
if (!Number.isFinite(parsed) || parsed < 1) {
throw new Error(`${fieldName} must be a positive integer`);
}
return parsed;
}
throw new Error(`${fieldName} must be a positive integer`);
}
type UpcFileProcessRequest = {
inputFile: string;
outputFile?: string;
inputBatchSize?: number;
upcLookupBatchSize?: number;
maxRows?: number;
};
async function parseUpcFileProcessRequest(
req: Request,
): Promise<UpcFileProcessRequest> {
if (req.method !== "POST") {
throw new Error("Method not allowed");
}
let body: unknown;
try {
body = await req.json();
} catch {
throw new Error("Invalid JSON body");
}
if (!body || typeof body !== "object") {
throw new Error("Request body must be an object");
}
const parsedBody = body as Record<string, unknown>;
const inputFileValue = parsedBody.inputFile;
if (
typeof inputFileValue !== "string" ||
inputFileValue.trim().length === 0
) {
throw new Error("inputFile is required and must be a non-empty string");
}
const outputFileValue = parsedBody.outputFile;
if (
outputFileValue != null &&
(typeof outputFileValue !== "string" || outputFileValue.trim().length === 0)
) {
throw new Error("outputFile must be a non-empty string when provided");
}
return {
inputFile: inputFileValue.trim(),
outputFile:
typeof outputFileValue === "string" ? outputFileValue.trim() : undefined,
inputBatchSize: parsePositiveIntField(
parsedBody.inputBatchSize,
"inputBatchSize",
),
upcLookupBatchSize: parsePositiveIntField(
parsedBody.upcLookupBatchSize,
"upcLookupBatchSize",
),
maxRows: parsePositiveIntField(parsedBody.maxRows, "maxRows"),
};
}
function parseSort(
sortParam: string | null,
allowed: Set<string>,
@@ -1074,6 +1267,97 @@ const server = Bun.serve({
const url = new URL(req.url);
return json(getProductList(url.searchParams));
},
"/api/upc/map": async (req) => {
let upcs: string[];
try {
upcs = await parseUpcsFromRequest(req);
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
const status = message === "Method not allowed" ? 405 : 400;
return json({ error: message }, status);
}
const validationError = validateUpcRequest(upcs);
if (validationError) {
return json({ error: validationError }, 400);
}
try {
const mapping = await mapUpcsToAsins(upcs);
const items = Array.from(mapping.entries()).map(([upc, asin]) => ({
upc,
asin,
}));
return json({
requested: upcs.length,
matched: items.length,
items,
});
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
return json({ error: message }, 500);
}
},
"/api/upc/lookup": async (req) => {
let upcs: string[];
try {
upcs = await parseUpcsFromRequest(req);
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
const status = message === "Method not allowed" ? 405 : 400;
return json({ error: message }, status);
}
const validationError = validateUpcRequest(upcs);
if (validationError) {
return json({ error: validationError }, 400);
}
try {
const detailMap = await lookupKeepaUpcs(upcs);
const items = Array.from(detailMap.values());
return json({
requested: upcs.length,
statusCounts: summarizeLookupStatuses(items),
items,
});
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
return json({ error: message }, 500);
}
},
"/api/process/upc-file": async (req) => {
let parsed: UpcFileProcessRequest;
try {
parsed = await parseUpcFileProcessRequest(req);
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
const status =
message === "Method not allowed"
? 405
: message === "Invalid JSON body"
? 400
: 400;
return json({ error: message }, status);
}
try {
const summary = await runUpcFileAnalysis({
inputFile: parsed.inputFile,
outputFile: parsed.outputFile,
inputBatchSize: parsed.inputBatchSize,
upcLookupBatchSize: parsed.upcLookupBatchSize,
maxRows: parsed.maxRows,
dbPath: DB_PATH,
manageResources: false,
});
return json(summary);
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
return json({ error: message }, 500);
}
},
"/api/runs/:processType/:runId": (req) => {
const processType = req.params.processType as ProcessType;
const runId = Number(req.params.runId);

View File

@@ -44,6 +44,23 @@ export interface KeepaData {
categoryTree: string[];
}
export type KeepaUpcLookupStatus =
| "found"
| "invalid_upc"
| "not_found"
| "multiple_asins"
| "request_failed";
export interface KeepaUpcLookupDetail {
requestedUpc: string;
normalizedUpc: string;
status: KeepaUpcLookupStatus;
asin: string | null;
candidateAsins: string[];
keepaData: KeepaData | null;
reason?: string;
}
export type SellabilityInfo = {
canSell: boolean | null;
sellabilityStatus: "available" | "restricted" | "not_available" | "unknown";

331
src/upc-file-analysis.ts Normal file
View File

@@ -0,0 +1,331 @@
import path from "node:path";
import { lookupKeepaUpcs } from "./keepa.ts";
import { processProductChunk, chunkArray } from "./analysis-pipeline.ts";
import {
processUpcFileInBatches,
type UpcInputRow,
} from "./upc-file-reader.ts";
import {
appendResultsToRun,
printResults,
refreshRunCountsInDb,
startRunInDb,
type RunCounts,
} from "./writer.ts";
import { initDb, closeDb } from "./database.ts";
import { connectCache, disconnectCache } from "./cache.ts";
import type {
KeepaUpcLookupDetail,
KeepaUpcLookupStatus,
ProductRecord,
} from "./types.ts";
const DB_PATH = process.env.RESULTS_DB_PATH || "./results.db";
const DEFAULT_INPUT_BATCH_SIZE = 200;
const DEFAULT_UPC_LOOKUP_BATCH_SIZE = 100;
export type UpcFileAnalysisOptions = {
inputFile: string;
outputFile?: string;
inputBatchSize?: number;
upcLookupBatchSize?: number;
maxRows?: number;
manageResources?: boolean;
dbPath?: string;
};
export type UpcFileAnalysisSummary = {
runId: number;
dbPath: string;
inputFile: string;
outputFile?: string;
processedRows: number;
matchedRows: number;
unresolvedByStatus: Record<KeepaUpcLookupStatus, number>;
runCounts: RunCounts;
reader: {
mode: "xlsx_stream" | "xlsx_fallback" | "xls_fallback";
totalRowsSeen: number;
emittedRows: number;
skippedMissingUpc: number;
skippedInvalidUpc: number;
};
};
function printUsage(): void {
console.log("Usage:");
console.log(
" bun run src/upc-file-analysis.ts --input <file.xls|file.xlsx> [--out output.xlsx] [--input-batch-size 200] [--upc-lookup-batch-size 100] [--max-rows 1000]",
);
}
function parsePositiveInt(value: string | undefined, flagName: string): number {
const parsed = Number.parseInt(String(value), 10);
if (!Number.isFinite(parsed) || parsed < 1) {
throw new Error(`Invalid value for ${flagName}: ${value}`);
}
return parsed;
}
function parseArgs(argv: string[]): UpcFileAnalysisOptions {
let inputFile: string | undefined;
let outputFile: string | undefined;
let inputBatchSize: number | undefined;
let upcLookupBatchSize: number | undefined;
let maxRows: number | undefined;
for (let i = 0; i < argv.length; i++) {
const arg = argv[i]!;
if (arg === "--help" || arg === "-h") {
printUsage();
process.exit(0);
}
if (arg === "--input") {
const next = argv[i + 1];
if (!next) throw new Error("Missing value after --input");
inputFile = next;
i++;
continue;
}
if (arg === "--out") {
const next = argv[i + 1];
if (!next) throw new Error("Missing value after --out");
outputFile = next;
i++;
continue;
}
if (arg === "--input-batch-size") {
inputBatchSize = parsePositiveInt(argv[i + 1], "--input-batch-size");
i++;
continue;
}
if (arg === "--upc-lookup-batch-size") {
upcLookupBatchSize = parsePositiveInt(
argv[i + 1],
"--upc-lookup-batch-size",
);
i++;
continue;
}
if (arg === "--max-rows") {
maxRows = parsePositiveInt(argv[i + 1], "--max-rows");
i++;
continue;
}
if (arg.startsWith("--")) {
throw new Error(`Unknown flag: ${arg}`);
}
if (!inputFile) {
inputFile = arg;
continue;
}
throw new Error(`Unexpected positional argument: ${arg}`);
}
if (!inputFile) {
throw new Error("Missing --input <file.xls|file.xlsx>");
}
return {
inputFile,
outputFile,
inputBatchSize,
upcLookupBatchSize,
maxRows,
};
}
function resolveDefaultOutputPath(inputFile: string): string {
const parsedInput = path.parse(inputFile);
return path.join(parsedInput.dir, `${parsedInput.name}_upc_results.xlsx`);
}
function createStatusCounter(): Record<KeepaUpcLookupStatus, number> {
return {
found: 0,
invalid_upc: 0,
not_found: 0,
multiple_asins: 0,
request_failed: 0,
};
}
async function lookupUpcsWithChunking(
rows: UpcInputRow[],
lookupBatchSize: number,
): Promise<Map<string, KeepaUpcLookupDetail>> {
const uniqueUpcs = Array.from(new Set(rows.map((row) => row.upc)));
const chunks = chunkArray(uniqueUpcs, lookupBatchSize);
const details = new Map<string, KeepaUpcLookupDetail>();
for (let i = 0; i < chunks.length; i++) {
const chunk = chunks[i]!;
console.log(
` Keepa UPC lookup chunk ${i + 1}/${chunks.length} (${chunk.length} UPCs)...`,
);
const chunkDetails = await lookupKeepaUpcs(chunk);
for (const [upc, detail] of chunkDetails.entries()) {
details.set(upc, detail);
}
}
return details;
}
function toProductRecord(
row: UpcInputRow,
detail: KeepaUpcLookupDetail,
): ProductRecord {
const keepaCategory = detail.keepaData?.categoryTree?.[0];
return {
asin: detail.asin ?? row.upc,
name: row.name ?? detail.asin ?? row.upc,
unitCost: row.unitCost ?? 0,
brand: row.brand,
category: row.category ?? keepaCategory,
};
}
export async function runUpcFileAnalysis(
options: UpcFileAnalysisOptions,
): Promise<UpcFileAnalysisSummary> {
const dbPath = options.dbPath ?? DB_PATH;
const inputBatchSize = Math.max(
1,
options.inputBatchSize ?? DEFAULT_INPUT_BATCH_SIZE,
);
const lookupBatchSize = Math.max(
1,
options.upcLookupBatchSize ?? DEFAULT_UPC_LOOKUP_BATCH_SIZE,
);
const outputFile =
options.outputFile ?? resolveDefaultOutputPath(options.inputFile);
const manageResources = options.manageResources ?? true;
if (manageResources) {
console.log("Connecting to Redis...");
await connectCache();
console.log("Initializing SQLite database...");
initDb(dbPath);
}
const unresolvedByStatus = createStatusCounter();
const printableSample = [];
let processedRows = 0;
let matchedRows = 0;
const runId = startRunInDb(dbPath, options.inputFile, outputFile);
try {
const readerSummary = await processUpcFileInBatches(
options.inputFile,
async ({ batchNumber, rows }) => {
console.log(
`\n=== UPC input batch ${batchNumber} (${rows.length} rows) ===`,
);
processedRows += rows.length;
const detailMap = await lookupUpcsWithChunking(rows, lookupBatchSize);
const matchedProducts: ProductRecord[] = [];
for (const row of rows) {
const detail = detailMap.get(row.upc);
if (!detail) {
unresolvedByStatus.request_failed += 1;
continue;
}
unresolvedByStatus[detail.status] += 1;
if (detail.status === "found" && detail.asin) {
matchedRows += 1;
matchedProducts.push(toProductRecord(row, detail));
}
}
console.log(
`Batch ${batchNumber}: ${matchedProducts.length}/${rows.length} rows resolved to single ASINs`,
);
if (matchedProducts.length === 0) {
return;
}
const analyzed = await processProductChunk(matchedProducts);
appendResultsToRun(dbPath, runId, analyzed);
if (printableSample.length < 200) {
const remaining = 200 - printableSample.length;
printableSample.push(...analyzed.slice(0, remaining));
}
},
{
batchSize: inputBatchSize,
maxRows: options.maxRows,
},
);
const runCounts = refreshRunCountsInDb(dbPath, runId);
if (printableSample.length > 0) {
printResults(printableSample);
if (runCounts.totalProducts > printableSample.length) {
console.log(
`Printed ${printableSample.length} sampled results out of ${runCounts.totalProducts} analyzed products.`,
);
}
} else {
console.log("No products were eligible for analysis after UPC mapping.");
}
return {
runId,
dbPath,
inputFile: options.inputFile,
outputFile,
processedRows,
matchedRows,
unresolvedByStatus,
runCounts,
reader: {
mode: readerSummary.mode,
totalRowsSeen: readerSummary.totalRowsSeen,
emittedRows: readerSummary.emittedRows,
skippedMissingUpc: readerSummary.skippedMissingUpc,
skippedInvalidUpc: readerSummary.skippedInvalidUpc,
},
};
} finally {
if (manageResources) {
await disconnectCache();
closeDb();
}
}
}
async function main(): Promise<void> {
const parsed = parseArgs(process.argv.slice(2));
const summary = await runUpcFileAnalysis(parsed);
console.log("\n=== UPC file analysis summary ===");
console.log(JSON.stringify(summary, null, 2));
}
if (import.meta.main) {
main().catch((err) => {
const message = err instanceof Error ? err.message : String(err);
console.error(`UPC file analysis failed: ${message}`);
process.exit(1);
});
}

360
src/upc-file-reader.ts Normal file
View File

@@ -0,0 +1,360 @@
import ExcelJS from "exceljs";
import * as XLSX from "xlsx";
import path from "node:path";
const UPC_PATTERN = /^\d{12,14}$/;
const COLUMN_CANDIDATES = {
upc: ["upc", "upc code", "upc/ean", "ean", "gtin", "barcode", "product code"],
name: ["name", "product name", "title", "product title"],
unitCost: ["unit cost", "cost", "price", "buy cost", "unit_cost", "unitcost"],
brand: ["brand"],
category: ["category"],
} as const;
type ColumnKey = keyof typeof COLUMN_CANDIDATES;
type ColumnMap = Record<ColumnKey, number | undefined>;
export type UpcInputRow = {
rowNumber: number;
upc: string;
name?: string;
unitCost?: number;
brand?: string;
category?: string;
};
export type UpcInputBatch = {
batchNumber: number;
rows: UpcInputRow[];
};
export type UpcReaderSummary = {
filePath: string;
mode: "xlsx_stream" | "xlsx_fallback" | "xls_fallback";
totalRowsSeen: number;
emittedRows: number;
skippedMissingUpc: number;
skippedInvalidUpc: number;
};
export type UpcReaderOptions = {
batchSize?: number;
maxRows?: number;
};
export async function processUpcFileInBatches(
filePath: string,
onBatch: (batch: UpcInputBatch) => Promise<void>,
options: UpcReaderOptions = {},
): Promise<UpcReaderSummary> {
const ext = path.extname(filePath).toLowerCase();
if (ext === ".xlsx") {
try {
return await processXlsxStreaming(filePath, onBatch, options);
} catch (err) {
console.warn(
`XLSX streaming reader failed, falling back to in-memory parser: ${err}`,
);
return processXlsLikeFallback(
filePath,
onBatch,
options,
"xlsx_fallback",
);
}
}
if (ext === ".xls") {
return processXlsLikeFallback(filePath, onBatch, options, "xls_fallback");
}
throw new Error(`Unsupported file extension: ${ext}. Expected .xls or .xlsx`);
}
async function processXlsxStreaming(
filePath: string,
onBatch: (batch: UpcInputBatch) => Promise<void>,
options: UpcReaderOptions,
): Promise<UpcReaderSummary> {
const batchSize = Math.max(1, options.batchSize ?? 200);
const maxRows =
options.maxRows && options.maxRows > 0 ? options.maxRows : null;
let headerDetected = false;
let columns: ColumnMap | null = null;
let seenRows = 0;
let emittedRows = 0;
let skippedMissingUpc = 0;
let skippedInvalidUpc = 0;
let batchNumber = 1;
let currentBatch: UpcInputRow[] = [];
let stop = false;
const flush = async () => {
if (currentBatch.length === 0) return;
await onBatch({ batchNumber, rows: currentBatch });
batchNumber += 1;
currentBatch = [];
};
const workbookReader = new ExcelJS.stream.xlsx.WorkbookReader(filePath, {
worksheets: "emit",
sharedStrings: "cache",
hyperlinks: "ignore",
styles: "ignore",
});
for await (const worksheet of workbookReader) {
if (stop) break;
for await (const row of worksheet) {
const values = normalizeExcelJsRow(row.values as unknown[]);
if (!headerDetected) {
columns = detectColumns(values);
if (columns.upc == null) {
throw new Error(
`No UPC column found in header row. Header row values: ${values.join(", ")}`,
);
}
headerDetected = true;
continue;
}
seenRows += 1;
const parsed = parseUpcInputRow(values, columns, row.number);
if (!parsed) {
skippedMissingUpc += 1;
continue;
}
if (!isValidUpc(parsed.upc)) {
skippedInvalidUpc += 1;
continue;
}
currentBatch.push(parsed);
emittedRows += 1;
if (currentBatch.length >= batchSize) {
await flush();
}
if (maxRows != null && emittedRows >= maxRows) {
stop = true;
break;
}
}
// Process only the first worksheet.
break;
}
await flush();
if (!headerDetected) {
throw new Error("No rows found in the first worksheet.");
}
return {
filePath,
mode: "xlsx_stream",
totalRowsSeen: seenRows,
emittedRows,
skippedMissingUpc,
skippedInvalidUpc,
};
}
function processXlsLikeFallback(
filePath: string,
onBatch: (batch: UpcInputBatch) => Promise<void>,
options: UpcReaderOptions,
mode: "xlsx_fallback" | "xls_fallback",
): Promise<UpcReaderSummary> {
return new Promise<UpcReaderSummary>(async (resolve, reject) => {
try {
const batchSize = Math.max(1, options.batchSize ?? 200);
const maxRows =
options.maxRows && options.maxRows > 0 ? options.maxRows : null;
const workbook = XLSX.readFile(filePath, { raw: true });
const sheetName = workbook.SheetNames[0];
if (!sheetName) throw new Error("No sheets found in file");
const sheet = workbook.Sheets[sheetName];
if (!sheet || !sheet["!ref"]) throw new Error("Sheet has no data");
const range = XLSX.utils.decode_range(sheet["!ref"]);
const headerValues: string[] = [];
for (let c = range.s.c; c <= range.e.c; c++) {
const cellAddress = XLSX.utils.encode_cell({ r: range.s.r, c });
const value = sheet[cellAddress]?.v;
headerValues.push(normalizeOptionalString(value) ?? "");
}
const columns = detectColumns(headerValues);
if (columns.upc == null) {
throw new Error(
`No UPC column found in header row. Header row values: ${headerValues.join(", ")}`,
);
}
let seenRows = 0;
let emittedRows = 0;
let skippedMissingUpc = 0;
let skippedInvalidUpc = 0;
let batchNumber = 1;
let currentBatch: UpcInputRow[] = [];
const flush = async () => {
if (currentBatch.length === 0) return;
await onBatch({ batchNumber, rows: currentBatch });
batchNumber += 1;
currentBatch = [];
};
for (let r = range.s.r + 1; r <= range.e.r; r++) {
seenRows += 1;
const rowValues: string[] = [];
for (let c = range.s.c; c <= range.e.c; c++) {
const cellAddress = XLSX.utils.encode_cell({ r, c });
rowValues.push(normalizeOptionalString(sheet[cellAddress]?.v) ?? "");
}
const parsed = parseUpcInputRow(rowValues, columns, r + 1);
if (!parsed) {
skippedMissingUpc += 1;
continue;
}
if (!isValidUpc(parsed.upc)) {
skippedInvalidUpc += 1;
continue;
}
currentBatch.push(parsed);
emittedRows += 1;
if (currentBatch.length >= batchSize) {
await flush();
}
if (maxRows != null && emittedRows >= maxRows) {
break;
}
}
await flush();
resolve({
filePath,
mode,
totalRowsSeen: seenRows,
emittedRows,
skippedMissingUpc,
skippedInvalidUpc,
});
} catch (err) {
reject(err);
}
});
}
function detectColumns(headers: string[]): ColumnMap {
const columns = {} as ColumnMap;
for (const key of Object.keys(COLUMN_CANDIDATES) as ColumnKey[]) {
columns[key] = findColumnIndex(headers, [...COLUMN_CANDIDATES[key]]);
}
return columns;
}
function findColumnIndex(
headers: string[],
candidates: string[],
): number | undefined {
const normalizedCandidates = new Set(candidates.map(normalizeHeader));
for (let i = 0; i < headers.length; i++) {
if (normalizedCandidates.has(normalizeHeader(headers[i] ?? ""))) {
return i;
}
}
return undefined;
}
function parseUpcInputRow(
rowValues: string[],
columns: ColumnMap,
rowNumber: number,
): UpcInputRow | null {
if (columns.upc == null) return null;
const rawUpc = rowValues[columns.upc] ?? "";
const upc = rawUpc.replace(/\D/g, "").trim();
if (!upc) {
return null;
}
return {
rowNumber,
upc,
name: getRowString(rowValues, columns.name),
unitCost: parseOptionalNumber(rowValues[columns.unitCost ?? -1]),
brand: getRowString(rowValues, columns.brand),
category: getRowString(rowValues, columns.category),
};
}
function normalizeExcelJsRow(values: unknown[]): string[] {
// ExcelJS row.values is 1-indexed with values[0] intentionally empty.
const normalized: string[] = [];
for (let i = 1; i < values.length; i++) {
normalized.push(normalizeOptionalString(values[i]) ?? "");
}
return normalized;
}
function getRowString(
values: string[],
index: number | undefined,
): string | undefined {
if (index == null || index < 0) return undefined;
const value = values[index];
return value?.trim() ? value.trim() : undefined;
}
function normalizeHeader(value: string): string {
return value
.toLowerCase()
.trim()
.replace(/%/g, " pct ")
.replace(/\$/g, " usd ")
.replace(/[^a-z0-9]/g, "");
}
function normalizeOptionalString(value: unknown): string | undefined {
if (value == null) return undefined;
if (typeof value === "object") {
if ("text" in (value as Record<string, unknown>)) {
return normalizeOptionalString((value as { text?: unknown }).text);
}
if ("result" in (value as Record<string, unknown>)) {
return normalizeOptionalString((value as { result?: unknown }).result);
}
}
const text = String(value).trim();
return text.length > 0 ? text : undefined;
}
function parseOptionalNumber(value: unknown): number | undefined {
if (value == null || value === "") return undefined;
const cleaned = String(value).trim().replace(/[$,%]/g, "").replace(/,/g, "");
const parsed = Number(cleaned);
return Number.isFinite(parsed) ? parsed : undefined;
}
function isValidUpc(value: string): boolean {
return UPC_PATTERN.test(value);
}

147
src/upc-lookup.ts Normal file
View File

@@ -0,0 +1,147 @@
import { lookupKeepaUpcs, mapUpcsToAsins } from "./keepa.ts";
function printUsage(): void {
console.log("Usage:");
console.log(
" bun run src/upc-lookup.ts <upc...> [--detailed] [--json] [--file path]",
);
console.log("");
console.log("Examples:");
console.log(" bun run src/upc-lookup.ts 012345678901 098765432109");
console.log(
" bun run src/upc-lookup.ts 012345678901,098765432109 --detailed",
);
console.log(" bun run src/upc-lookup.ts --file upcs.txt --detailed --json");
}
function splitRawUpcValues(input: string): string[] {
return input
.split(/[\s,;|]+/)
.map((chunk) => chunk.trim())
.filter(Boolean);
}
async function readUpcsFromFile(path: string): Promise<string[]> {
const file = Bun.file(path);
if (!(await file.exists())) {
throw new Error(`UPC file not found: ${path}`);
}
return splitRawUpcValues(await file.text());
}
function parseArgs(args: string[]): {
upcs: string[];
filePaths: string[];
detailed: boolean;
asJson: boolean;
} {
let detailed = false;
let asJson = false;
const collected: string[] = [];
const filePaths: string[] = [];
for (let i = 0; i < args.length; i++) {
const arg = args[i]!;
if (arg === "--help" || arg === "-h") {
printUsage();
process.exit(0);
}
if (arg === "--detailed") {
detailed = true;
continue;
}
if (arg === "--json") {
asJson = true;
continue;
}
if (arg === "--file") {
const next = args[i + 1];
if (!next) {
throw new Error("Missing file path after --file");
}
filePaths.push(next);
i++;
continue;
}
if (arg.startsWith("--")) {
throw new Error(`Unknown flag: ${arg}`);
}
collected.push(...splitRawUpcValues(arg));
}
return {
upcs: collected,
filePaths,
detailed,
asJson,
};
}
function dedupeUpcs(upcs: string[]): string[] {
return Array.from(new Set(upcs.map((upc) => upc.trim()).filter(Boolean)));
}
async function main(): Promise<void> {
const args = process.argv.slice(2);
const parsed = parseArgs(args);
const fileUpcs: string[] = [];
for (const path of parsed.filePaths) {
fileUpcs.push(...(await readUpcsFromFile(path)));
}
const upcs = dedupeUpcs([...parsed.upcs, ...fileUpcs]);
if (upcs.length === 0) {
printUsage();
process.exit(1);
}
if (parsed.detailed) {
const details = await lookupKeepaUpcs(upcs);
const items = Array.from(details.values());
if (parsed.asJson) {
console.log(JSON.stringify(items, null, 2));
return;
}
console.table(
items.map((item) => ({
upc: item.normalizedUpc,
status: item.status,
asin: item.asin ?? "",
candidates: item.candidateAsins.join("|"),
reason: item.reason ?? "",
})),
);
return;
}
const mapping = await mapUpcsToAsins(upcs);
const items = Array.from(mapping.entries()).map(([upc, asin]) => ({
upc,
asin,
}));
if (parsed.asJson) {
console.log(JSON.stringify(items, null, 2));
return;
}
if (items.length === 0) {
console.log(
"No one-to-one UPC to ASIN matches found. Run with --detailed for per-UPC status.",
);
return;
}
console.table(items);
}
main().catch((err) => {
const message = err instanceof Error ? err.message : String(err);
console.error(`UPC lookup failed: ${message}`);
process.exit(1);
});

View File

@@ -1,6 +1,25 @@
import { getDb } from "./database.ts";
import type { AnalysisResult } from "./types.ts";
export type RunCounts = {
totalProducts: number;
fbaCount: number;
fbmCount: number;
skipCount: number;
};
function computeRunCountsFromResults(results: AnalysisResult[]): RunCounts {
const fbaCount = results.filter((r) => r.verdict.verdict === "FBA").length;
const fbmCount = results.filter((r) => r.verdict.verdict === "FBM").length;
const skipCount = results.filter((r) => r.verdict.verdict === "SKIP").length;
return {
totalProducts: results.length,
fbaCount,
fbmCount,
skipCount,
};
}
function buildRow(r: AnalysisResult) {
const price =
r.product.keepa?.currentPrice ??
@@ -68,12 +87,25 @@ export function writeResultsToDb(
inputFile: string,
outputFile: string | undefined,
): void {
const database = getDb(dbPath);
const runCounts = computeRunCountsFromResults(results);
const runId = startRunInDb(dbPath, inputFile, outputFile, runCounts);
appendResultsToRun(dbPath, runId, results);
console.log(`Results written to SQLite database for run_id: ${runId}`);
}
export function startRunInDb(
dbPath: string,
inputFile: string,
outputFile: string | undefined,
counts: RunCounts = {
totalProducts: 0,
fbaCount: 0,
fbmCount: 0,
skipCount: 0,
},
): number {
const database = getDb(dbPath);
const timestamp = new Date().toISOString();
const fbaCount = results.filter((r) => r.verdict.verdict === "FBA").length;
const fbmCount = results.filter((r) => r.verdict.verdict === "FBM").length;
const skipCount = results.filter((r) => r.verdict.verdict === "SKIP").length;
const insertRun = database.prepare(
`INSERT INTO runs (
@@ -86,25 +118,39 @@ export function writeResultsToDb(
skip_count
) VALUES (?, ?, ?, ?, ?, ?, ?)`,
);
const runInfo = insertRun.run(
timestamp,
inputFile,
outputFile ?? null,
results.length,
fbaCount,
fbmCount,
skipCount,
counts.totalProducts,
counts.fbaCount,
counts.fbmCount,
counts.skipCount,
);
const runId =
(runInfo.changes as number) > 0
? (runInfo.lastInsertRowid as number)
: null;
if (runId === null) {
console.error("Failed to insert run record into SQLite.");
throw new Error("Failed to insert run record into SQLite.");
}
return runId;
}
export function appendResultsToRun(
dbPath: string,
runId: number,
results: AnalysisResult[],
): void {
if (results.length === 0) {
return;
}
const database = getDb(dbPath);
const insertResult = database.prepare(
`INSERT INTO results (
run_id, asin, product_name, brand, category, unit_cost, current_price,
@@ -174,7 +220,49 @@ export function writeResultsToDb(
);
}
})();
console.log(`Results written to SQLite database for run_id: ${runId}`);
}
export function refreshRunCountsInDb(dbPath: string, runId: number): RunCounts {
const database = getDb(dbPath);
const stats = database
.query(
`SELECT
COUNT(*) AS total,
SUM(CASE WHEN verdict = 'FBA' THEN 1 ELSE 0 END) AS fba,
SUM(CASE WHEN verdict = 'FBM' THEN 1 ELSE 0 END) AS fbm,
SUM(CASE WHEN verdict = 'SKIP' THEN 1 ELSE 0 END) AS skip
FROM results
WHERE run_id = ?`,
)
.get(runId) as {
total: number;
fba: number | null;
fbm: number | null;
skip: number | null;
};
const counts: RunCounts = {
totalProducts: stats.total ?? 0,
fbaCount: stats.fba ?? 0,
fbmCount: stats.fbm ?? 0,
skipCount: stats.skip ?? 0,
};
database
.query(
`UPDATE runs
SET total_products = ?, fba_count = ?, fbm_count = ?, skip_count = ?
WHERE id = ?`,
)
.run(
counts.totalProducts,
counts.fbaCount,
counts.fbmCount,
counts.skipCount,
runId,
);
return counts;
}
export function printResults(results: AnalysisResult[]): void {
const rows = results