feat: transition bestseller analysis storage to SQLite and add category blacklist

- Replaces Excel output with structured database tables for tracking category analysis runs and product results.
- Implements a blacklist to exclude specific category IDs from the bestseller pipeline.
- Adds unit tests for category processing and enhances logging with levels and timestamps.
- Introduces foreign key enforcement and updated schema definitions in the database module.
This commit is contained in:
Victor Noguera
2026-04-13 00:28:23 -04:00
parent 7ba6397578
commit a906f5ede3
7 changed files with 434 additions and 242 deletions

View File

@@ -1,6 +1,6 @@
import { existsSync, mkdirSync, readFileSync } from "node:fs";
import path from "node:path";
import * as XLSX from "xlsx";
import { type Database, getDb, initDb } from "./database.ts";
import { config } from "./config.ts";
import { analyzeProducts } from "./llm.ts";
import { fetchSellabilityBatch, fetchSpApiPricingAndFees } from "./sp-api.ts";
@@ -36,9 +36,10 @@ type CategoryRunSummary = {
fba: number;
fbm: number;
skip: number;
outputFile: string;
status: "ok" | "empty" | "failed";
error: string;
runId?: number;
results?: AnalysisResult[];
};
const KEEPA_BASE = "https://api.keepa.com";
@@ -58,6 +59,15 @@ let keepaTokensLeft = 1;
let keepaRefillRate = 1;
let keepaLastRequestMs = 0;
function log(
level: "info" | "warn" | "error",
message: string,
...args: any[]
) {
const timestamp = new Date().toISOString();
console.log(`[${timestamp}] [${level.toUpperCase()}] ${message}`, ...args);
}
function parseArgs(): ParsedArgs {
const args = process.argv.slice(2);
const outputDir =
@@ -99,10 +109,11 @@ function readFlagValue(args: string[], flag: string): string | undefined {
function printUsageAndExit(message: string): never {
if (message) {
console.error(message);
log("error", message);
}
console.error(
log(
"error",
[
"Usage:",
" bun run src/bestsellers-by-category.ts [--category-limit 32] [--per-category-top 100] [--out-dir output] [--blacklist-file category-blacklist.csv]",
@@ -118,11 +129,112 @@ function printUsageAndExit(message: string): never {
process.exit(1);
}
export async function insertCategoryRunSummary(
db: Database,
summary: CategoryRunSummary,
runTimestamp: string,
): Promise<number> {
const query = `
INSERT INTO category_analysis_runs (
category_id, category_label, run_timestamp,
top_asins_checked, available_asins,
fba_count, fbm_count, skip_count,
status, error_message
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
`;
const result = db.run(query, [
summary.categoryId,
summary.categoryLabel,
runTimestamp,
summary.topAsinsChecked,
summary.availableAsins,
summary.fba,
summary.fbm,
summary.skip,
summary.status,
summary.error,
]);
// Bun's SQLite client returns { changes: number, lastInsertRowid: number | bigint }
return Number(result.lastInsertRowid);
}
export async function insertProductAnalysisResults(
db: Database,
runId: number,
results: AnalysisResult[],
): Promise<void> {
if (results.length === 0) {
return;
}
const insertStmt = db.prepare(`
INSERT INTO product_analysis_results (
asin, run_id, name, brand, category, unit_cost,
current_price, avg_price_90d, avg_price_90d_sheet,
selling_price_sheet, sales_rank, sales_rank_avg_90d,
seller_count, monthly_sold, rank_drops_30d, rank_drops_90d,
fba_fee, fbm_fee, referral_percent, can_sell,
sellability_status, sellability_reason,
verdict, confidence, reasoning, fetched_at
) VALUES (
?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?
);
`);
db.transaction((resultsBatch: AnalysisResult[]) => {
for (const r of resultsBatch) {
const price =
r.product.keepa?.currentPrice ??
r.product.record.sellingPriceFromSheet ??
r.product.spApi.estimatedSalePrice;
const rank = r.product.keepa?.salesRank ?? r.product.record.amazonRank;
insertStmt.run(
r.product.record.asin,
runId,
r.product.record.name,
r.product.record.brand ?? null,
r.product.record.category ??
r.product.keepa?.categoryTree?.join(" > ") ??
null,
r.product.record.unitCost ?? null,
price ?? null,
r.product.keepa?.avgPrice90 ?? null,
r.product.record.avgPrice90FromSheet ?? null,
r.product.record.sellingPriceFromSheet ?? null,
rank ?? null,
r.product.keepa?.salesRankAvg90 ?? null,
r.product.keepa?.sellerCount ?? null,
r.product.keepa?.monthlySold ?? null,
r.product.keepa?.salesRankDrops30 ?? null,
r.product.keepa?.salesRankDrops90 ?? null,
r.product.spApi.fbaFee ?? null,
r.product.spApi.fbmFee ?? null,
r.product.spApi.referralFeePercent ?? null,
r.product.spApi.canSell == null
? "unknown"
: r.product.spApi.canSell
? "yes"
: "no",
r.product.spApi.sellabilityStatus ?? null,
r.product.spApi.sellabilityReason ?? null,
r.verdict.verdict,
r.verdict.confidence,
r.verdict.reasoning ?? null,
r.product.fetchedAt,
);
}
})(results); // Execute the transaction with the results batch
}
function loadCategoryBlacklist(filePath: string): Set<number> {
const blacklist = new Set<number>();
if (!existsSync(filePath)) {
console.warn(
log(
"warn",
`Blacklist file not found at ${filePath}; continuing with no excluded categories.`,
);
return blacklist;
@@ -147,7 +259,8 @@ function loadCategoryBlacklist(filePath: string): Set<number> {
}
if (!idToken) {
console.warn(
log(
"warn",
`Blacklist CSV line ${lineNumber}: missing id, row ignored (${trimmed}).`,
);
continue;
@@ -155,20 +268,23 @@ function loadCategoryBlacklist(filePath: string): Set<number> {
const id = Number(idToken);
if (!Number.isInteger(id) || id <= 0) {
console.warn(
log(
"warn",
`Blacklist CSV line ${lineNumber}: invalid id '${idToken}', row ignored (${trimmed}).`,
);
continue;
}
if (!nameToken) {
console.warn(
log(
"warn",
`Blacklist CSV line ${lineNumber}: missing name for id ${id}; accepted but please add name.`,
);
}
if (blacklist.has(id)) {
console.warn(
log(
"warn",
`Blacklist CSV line ${lineNumber}: duplicate id ${id}, keeping first occurrence.`,
);
continue;
@@ -252,7 +368,8 @@ async function waitForKeepaToken(): Promise<void> {
(Date.now() - keepaLastRequestMs);
if (waitMs > 0) {
console.log(
log(
"info",
`Keepa tokens depleted; waiting ${Math.ceil(waitMs / 1000)}s...`,
);
await sleep(waitMs);
@@ -294,7 +411,8 @@ async function keepaGetJson(pathAndQuery: string): Promise<any> {
rateLimitHits++;
const waitMs = computeBackoffMs(rateLimitHits, rate.refillInMs);
console.warn(
log(
"warn",
`Keepa rate limited (429). Retry ${rateLimitHits} in ${Math.ceil(waitMs / 1000)}s...`,
);
await sleep(waitMs);
@@ -516,7 +634,8 @@ async function fetchSellabilityMap(
sellability.set(asin, info);
}
console.log(
log(
"info",
` Sellability progress: ${Math.min(i + chunk.length, asins.length)}/${asins.length}`,
);
}
@@ -548,7 +667,7 @@ async function fetchSpApiMap(
done++;
if (done % 10 === 0 || done === asins.length) {
console.log(` Pricing progress: ${done}/${asins.length}`);
log("info", ` Pricing progress: ${done}/${asins.length}`);
}
}
}
@@ -640,7 +759,8 @@ async function fetchKeepaEnrichmentMap(
});
}
console.log(
log(
"info",
` Keepa enrichment progress: ${Math.min(i + chunk.length, asins.length)}/${asins.length}`,
);
}
@@ -652,10 +772,9 @@ function buildEnrichedProducts(
asins: string[],
sellabilityMap: Map<string, SellabilityInfo>,
spApiMap: Map<string, SpApiData>,
titleByAsin: Map<string, string>,
keepaEnrichmentMap: Map<string, { keepa: KeepaData; title: string }>,
): EnrichedProduct[] {
return asins.map((asin) => {
const keepa = null;
const sellability = sellabilityMap.get(asin) ?? {
canSell: null,
sellabilityStatus: "unknown" as const,
@@ -672,15 +791,23 @@ function buildEnrichedProducts(
sellabilityReason: sellability.sellabilityReason,
};
const enrichedKeepa = keepaEnrichmentMap.get(asin);
const keepa = enrichedKeepa?.keepa ?? null;
const title = enrichedKeepa?.title ?? asin;
const record: ProductRecord = {
asin,
name: titleByAsin.get(asin) ?? asin,
name: title,
unitCost: 0,
category: undefined,
brand: undefined,
supplier: undefined,
};
if (keepa?.currentPrice && spApi.estimatedSalePrice === 0) {
spApi.estimatedSalePrice = keepa.currentPrice;
}
return {
record,
keepa,
@@ -700,14 +827,14 @@ async function runLlmInBatches(
const batchNum = Math.floor(i / LLM_BATCH_SIZE) + 1;
const totalBatches = Math.ceil(products.length / LLM_BATCH_SIZE);
console.log(` LLM batch ${batchNum}/${totalBatches}...`);
log("info", ` LLM batch ${batchNum}/${totalBatches}...`);
let batchVerdicts: LlmVerdict[];
try {
batchVerdicts = await analyzeProducts(batch);
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
console.warn(` LLM batch failed: ${message}`);
log("warn", ` LLM batch failed: ${message}`);
batchVerdicts = batch.map((p) => ({
asin: p.record.asin,
verdict: "SKIP",
@@ -726,16 +853,16 @@ async function runLlmInBatches(
return verdicts;
}
async function processCategory(
export async function processCategory(
db: Database,
category: CategoryInfo,
perCategoryTop: number,
outputDir: string,
): Promise<CategoryRunSummary> {
console.log(`\nCategory ${category.label} (${category.id})`);
log("info", `\nCategory ${category.label} (${category.id})`);
const topAsins = await fetchCategoryBestSellerAsins(category, perCategoryTop);
if (topAsins.length === 0) {
console.log(" Keepa returned no ASINs for this category.");
log("info", " Keepa returned no ASINs for this category.");
return {
categoryId: category.id,
categoryLabel: category.label,
@@ -744,13 +871,13 @@ async function processCategory(
fba: 0,
fbm: 0,
skip: 0,
outputFile: "",
status: "empty",
error: "No ASINs returned by Keepa",
results: [],
};
}
console.log(` Top ASINs fetched: ${topAsins.length}`);
log("info", ` Top ASINs fetched: ${topAsins.length}`);
const sellabilityMap = await fetchSellabilityMap(topAsins);
const availableAsins = topAsins.filter((asin) => {
@@ -758,7 +885,7 @@ async function processCategory(
return info?.canSell === true && info.sellabilityStatus === "available";
});
console.log(` Sellable ASINs: ${availableAsins.length}/${topAsins.length}`);
log("info", ` Sellable ASINs: ${availableAsins.length}/${topAsins.length}`);
if (availableAsins.length === 0) {
return {
categoryId: category.id,
@@ -768,9 +895,9 @@ async function processCategory(
fba: 0,
fbm: 0,
skip: 0,
outputFile: "",
status: "empty",
error: "No sellable ASINs",
results: [],
};
}
@@ -793,21 +920,8 @@ async function processCategory(
availableAsins,
sellabilityMap,
spApiMap,
titleByAsin,
).map((product) => {
const keepa = keepaMap.get(product.record.asin) ?? null;
const spApi = product.spApi;
if (keepa?.currentPrice && spApi.estimatedSalePrice === 0) {
spApi.estimatedSalePrice = keepa.currentPrice;
}
return {
...product,
keepa,
spApi,
};
});
keepaEnrichment,
);
const verdicts = await runLlmInBatches(enrichedProducts);
const verdictByAsin = new Map(verdicts.map((v) => [v.asin, v]));
@@ -822,9 +936,14 @@ async function processCategory(
},
}));
const outputName = `${sanitizeFileSegment(category.label)}_${category.id}.xlsx`;
const outputPath = path.join(outputDir, outputName);
writeCategoryResultsWorkbook(results, outputPath);
// No longer writing to XLSX, directly insert into DB
// const outputName = `${sanitizeFileSegment(category.label)}_${category.id}.xlsx`;
// const outputPath = path.join(outputDir, outputName);
// writeCategoryResultsWorkbook(results, outputPath);
// The categoryRunId will be provided by the main function after inserting the summary
// We need to pass it here or get it after inserting the summary in main.
// For now, let's assume it's handled in main.
const fba = results.filter((r) => r.verdict.verdict === "FBA").length;
const fbm = results.filter((r) => r.verdict.verdict === "FBM").length;
@@ -838,204 +957,77 @@ async function processCategory(
fba,
fbm,
skip,
outputFile: path.basename(outputPath),
status: "ok",
error: "",
results,
};
}
function buildCategoryOutputRow(r: AnalysisResult) {
const price =
r.product.keepa?.currentPrice ??
r.product.record.sellingPriceFromSheet ??
r.product.spApi.estimatedSalePrice;
const rank = r.product.keepa?.salesRank ?? r.product.record.amazonRank;
return {
ASIN: r.product.record.asin,
Name: r.product.record.name,
Brand: r.product.record.brand ?? "",
Category:
r.product.record.category ??
r.product.keepa?.categoryTree?.join(" > ") ??
"",
"Unit Cost": r.product.record.unitCost,
"Current Price": price ?? "",
"Avg Price 90d": r.product.keepa?.avgPrice90 ?? "",
"Avg Price 90d (sheet)": r.product.record.avgPrice90FromSheet ?? "",
"Selling Price (sheet)": r.product.record.sellingPriceFromSheet ?? "",
"Sales Rank": rank ?? "",
"Rank Avg 90d": r.product.keepa?.salesRankAvg90 ?? "",
Sellers: r.product.keepa?.sellerCount ?? "",
"Monthly Sold": r.product.keepa?.monthlySold ?? "",
"Rank Drops 30d": r.product.keepa?.salesRankDrops30 ?? "",
"Rank Drops 90d": r.product.keepa?.salesRankDrops90 ?? "",
"FBA Fee": r.product.spApi.fbaFee,
"FBM Fee": r.product.spApi.fbmFee,
"Referral %": r.product.spApi.referralFeePercent,
"Can Sell":
r.product.spApi.canSell == null
? "unknown"
: r.product.spApi.canSell
? "yes"
: "no",
Sellability: r.product.spApi.sellabilityStatus,
"Sellability Reason": r.product.spApi.sellabilityReason ?? "",
Verdict: r.verdict.verdict,
Confidence: r.verdict.confidence,
Reasoning: r.verdict.reasoning,
};
}
function writeCategoryResultsWorkbook(
results: AnalysisResult[],
outputPath: string,
): void {
const rows = results.map(buildCategoryOutputRow);
const ws = XLSX.utils.json_to_sheet(rows);
const wb = XLSX.utils.book_new();
XLSX.utils.book_append_sheet(wb, ws, "Results");
XLSX.writeFile(wb, outputPath);
console.log(`Results written to ${outputPath}`);
}
function writeConsolidatedWorkbook(
summaries: CategoryRunSummary[],
outputDir: string,
): string {
const workbook = XLSX.utils.book_new();
const summaryRows = summaries.map((row) => ({
"Category ID": row.categoryId,
"Category Label": row.categoryLabel,
"Top ASINs Checked": row.topAsinsChecked,
"Sellable ASINs": row.availableAsins,
FBA: row.fba,
FBM: row.fbm,
SKIP: row.skip,
Status: row.status,
"Output File": row.outputFile,
Error: row.error,
}));
const totals = summaries.reduce(
(acc, row) => {
acc.topAsinsChecked += row.topAsinsChecked;
acc.availableAsins += row.availableAsins;
acc.fba += row.fba;
acc.fbm += row.fbm;
acc.skip += row.skip;
if (row.status === "ok") acc.ok += 1;
if (row.status === "empty") acc.empty += 1;
if (row.status === "failed") acc.failed += 1;
return acc;
},
{
topAsinsChecked: 0,
availableAsins: 0,
fba: 0,
fbm: 0,
skip: 0,
ok: 0,
empty: 0,
failed: 0,
},
);
const overviewRows = [
{ Metric: "Categories total", Value: summaries.length },
{ Metric: "Categories with output", Value: totals.ok },
{ Metric: "Categories empty", Value: totals.empty },
{ Metric: "Categories failed", Value: totals.failed },
{ Metric: "Top ASINs checked", Value: totals.topAsinsChecked },
{ Metric: "Sellable ASINs", Value: totals.availableAsins },
{ Metric: "Total FBA verdicts", Value: totals.fba },
{ Metric: "Total FBM verdicts", Value: totals.fbm },
{ Metric: "Total SKIP verdicts", Value: totals.skip },
];
const summarySheet = XLSX.utils.json_to_sheet(summaryRows);
const overviewSheet = XLSX.utils.json_to_sheet(overviewRows);
XLSX.utils.book_append_sheet(workbook, overviewSheet, "Overview");
XLSX.utils.book_append_sheet(workbook, summarySheet, "ByCategory");
const outputPath = path.join(
outputDir,
"consolidated_bestsellers_summary.xlsx",
);
XLSX.writeFile(workbook, outputPath);
return outputPath;
}
function printSummary(
categories: CategoryInfo[],
processed: number,
generatedFiles: number,
totalTopAsins: number,
totalAvailableAsins: number,
): void {
console.log("\nRun summary");
console.log(`Categories discovered/selected: ${categories.length}`);
console.log(`Categories processed: ${processed}`);
console.log(`Category files written: ${generatedFiles}`);
console.log(`Top ASINs checked: ${totalTopAsins}`);
console.log(`Sellable ASINs enriched: ${totalAvailableAsins}`);
}
async function main(): Promise<void> {
export async function main(): Promise<void> {
const args = parseArgs();
assertSpApiPrerequisites();
mkdirSync(args.outputDir, { recursive: true });
const DB_PATH = path.join(args.outputDir, "analysis.sqlite");
initDb(DB_PATH);
const db = getDb(DB_PATH);
console.log("Starting per-category bestseller pipeline");
console.log(`Marketplace: ${config.spApiMarketplaceId}`);
console.log(`SP-API region: ${config.spApiRegion}`);
console.log(`Category limit: ${args.categoryLimit}`);
console.log(`Top ASINs per category: ${args.perCategoryTop}`);
console.log(`Output directory: ${args.outputDir}`);
console.log(`Blacklist file: ${args.blacklistFile}`);
log("info", "Starting per-category bestseller pipeline");
log("info", `Marketplace: ${config.spApiMarketplaceId}`);
log("info", `SP-API region: ${config.spApiRegion}`);
log("info", `Category limit: ${args.categoryLimit}`);
log("info", `Top ASINs per category: ${args.perCategoryTop}`);
// Removed outputDir logging as it's not directly used for XLSX anymore
// console.log(`Output directory: ${args.outputDir}`);
log("info", `Blacklist file: ${args.blacklistFile}`);
const categoryBlacklist = loadCategoryBlacklist(args.blacklistFile);
console.log(`Loaded ${categoryBlacklist.size} blacklisted category IDs.`);
log("info", `Loaded ${categoryBlacklist.size} blacklisted category IDs.`);
const categories = await discoverCategories(args.categoryLimit);
const allowedCategories = categories.filter(
(c) => !categoryBlacklist.has(c.id),
);
const blacklistedCount = categories.length - allowedCategories.length;
console.log(
log(
"info",
`Discovered ${categories.length} categories (${blacklistedCount} blacklisted, ${allowedCategories.length} to process).`,
);
let processed = 0;
let generatedFiles = 0;
let totalTopAsins = 0;
let totalAvailableAsins = 0;
const categorySummaries: CategoryRunSummary[] = [];
const runTimestamp = new Date().toISOString();
let processedCategories = 0;
let totalInsertedAsins = 0;
const allCategorySummaries: CategoryRunSummary[] = [];
for (const category of allowedCategories) {
let categorySummary: CategoryRunSummary;
try {
const outcome = await processCategory(
categorySummary = await processCategory(
db,
category,
args.perCategoryTop,
args.outputDir,
);
processed++;
totalTopAsins += outcome.topAsinsChecked;
totalAvailableAsins += outcome.availableAsins;
if (outcome.status === "ok") {
generatedFiles++;
const runId = await insertCategoryRunSummary(
db,
categorySummary,
runTimestamp,
);
if (categorySummary.results) {
await insertProductAnalysisResults(db, runId, categorySummary.results);
totalInsertedAsins += categorySummary.results.length;
}
categorySummaries.push(outcome);
processedCategories++;
allCategorySummaries.push({ ...categorySummary, runId });
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
console.warn(
log(
"warn",
`Skipping category ${category.label} (${category.id}) due to error: ${message}`,
);
processed++;
categorySummaries.push({
categorySummary = {
categoryId: category.id,
categoryLabel: category.label,
topAsinsChecked: 0,
@@ -1043,26 +1035,24 @@ async function main(): Promise<void> {
fba: 0,
fbm: 0,
skip: 0,
outputFile: "",
status: "failed",
error: message,
});
results: [],
};
processedCategories++;
allCategorySummaries.push(categorySummary);
}
}
const consolidatedPath = writeConsolidatedWorkbook(
categorySummaries,
args.outputDir,
);
console.log(`Consolidated workbook written: ${consolidatedPath}`);
printSummary(
allowedCategories,
processed,
generatedFiles,
totalTopAsins,
totalAvailableAsins,
);
log("info", "\nRun summary");
log("info", `Categories discovered/selected: ${categories.length}`);
log("info", `Categories processed: ${processedCategories}`);
log("info", `Total ASINs inserted into DB: ${totalInsertedAsins}`);
}
await main();
if (import.meta.main) {
main().catch((err) => {
log("error", `Bestsellers process crashed: ${String(err)}`);
process.exit(1);
});
}