Refactor database interactions to use Drizzle ORM

- Replaced direct SQLite database calls with Drizzle ORM methods in `top-monthly-sold-by-category.ts`, `writer.ts`, and `upc-file-analysis.ts`.
- Updated test cases in `top-monthly-sold-by-category.test.ts` to mock the new database interactions.
- Removed unnecessary database initialization and cleanup code.
- Improved code readability and maintainability by using ORM features for inserting and updating records.
This commit is contained in:
Victor Noguera
2026-05-25 00:08:30 -04:00
parent 70e0e8a535
commit b982edd160
22 changed files with 2456 additions and 2766 deletions

View File

@@ -1,6 +1,8 @@
import { existsSync, mkdirSync, readFileSync } from "node:fs";
import path from "node:path";
import { type Database, getDb, initDb } from "./database.ts";
import { db } from "./db/index.ts";
import { runs, categoryProductResults } from "./db/schema.ts";
import { eq, sql } from "drizzle-orm";
import { config } from "./config.ts";
import { analyzeProducts } from "./llm.ts";
import { fetchSellabilityBatch, fetchSpApiPricingAndFees } from "./sp-api.ts";
@@ -171,36 +173,32 @@ function printUsageAndExit(message: string): never {
}
export async function insertCategoryRunSummary(
db: Database,
summary: CategoryRunSummary,
runTimestamp: string,
): Promise<number> {
const query = `
INSERT INTO category_analysis_runs (
category_id, category_label, run_timestamp,
top_asins_checked, available_asins,
fba_count, fbm_count, skip_count,
status, error_message
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
`;
const result = db.run(query, [
summary.categoryId,
summary.categoryLabel,
runTimestamp,
summary.topAsinsChecked,
summary.availableAsins,
summary.fba,
summary.fbm,
summary.skip,
summary.status,
summary.error,
]);
// Bun's SQLite client returns { changes: number, lastInsertRowid: number | bigint }
return Number(result.lastInsertRowid);
const [row] = await db
.insert(runs)
.values({
type: "category_analysis",
status: (summary.status as typeof runs.$inferInsert.status) ?? "running",
categoryId: summary.categoryId,
categoryLabel: summary.categoryLabel,
topAsinsChecked: summary.topAsinsChecked,
availableAsins: summary.availableAsins,
totalProducts: summary.topAsinsChecked,
fbaCount: summary.fba,
fbmCount: summary.fbm,
skipCount: summary.skip,
errorMessage: summary.error || null,
startedAt: new Date(runTimestamp),
})
.returning({ id: runs.id });
if (!row) throw new Error("Failed to insert category run.");
return row.id;
}
export async function updateCategoryRunSummary(
db: Database,
runId: number,
summary: Pick<
CategoryRunSummary,
@@ -213,136 +211,110 @@ export async function updateCategoryRunSummary(
| "error"
>,
): Promise<void> {
db.run(
`
UPDATE category_analysis_runs
SET
top_asins_checked = ?,
available_asins = ?,
fba_count = ?,
fbm_count = ?,
skip_count = ?,
status = ?,
error_message = ?
WHERE id = ?
`,
[
summary.topAsinsChecked,
summary.availableAsins,
summary.fba,
summary.fbm,
summary.skip,
summary.status,
summary.error,
runId,
],
);
await db
.update(runs)
.set({
topAsinsChecked: summary.topAsinsChecked,
availableAsins: summary.availableAsins,
totalProducts: summary.topAsinsChecked,
fbaCount: summary.fba,
fbmCount: summary.fbm,
skipCount: summary.skip,
status: summary.status as typeof runs.$inferInsert.status,
errorMessage: summary.error || null,
...(summary.status !== "running" ? { completedAt: new Date() } : {}),
})
.where(eq(runs.id, runId));
}
export async function insertProductAnalysisResults(
db: Database,
runId: number,
results: AnalysisResult[],
): Promise<void> {
if (results.length === 0) {
return;
}
if (results.length === 0) return;
const insertStmt = db.prepare(`
INSERT INTO product_analysis_results (
asin, run_id, name, brand, category, unit_cost,
current_price, avg_price_90d, avg_price_90d_sheet,
selling_price_sheet, sales_rank, sales_rank_avg_90d,
seller_count, amazon_is_seller, amazon_buybox_share_pct_90d,
monthly_sold, rank_drops_30d, rank_drops_90d,
fba_fee, fbm_fee, referral_percent, can_sell,
sellability_status, sellability_reason,
verdict, confidence, reasoning, fetched_at
) VALUES (
?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?
)
ON CONFLICT(asin) DO UPDATE SET
run_id = excluded.run_id,
name = excluded.name,
brand = excluded.brand,
category = excluded.category,
unit_cost = excluded.unit_cost,
current_price = excluded.current_price,
avg_price_90d = excluded.avg_price_90d,
avg_price_90d_sheet = excluded.avg_price_90d_sheet,
selling_price_sheet = excluded.selling_price_sheet,
sales_rank = excluded.sales_rank,
sales_rank_avg_90d = excluded.sales_rank_avg_90d,
seller_count = excluded.seller_count,
amazon_is_seller = excluded.amazon_is_seller,
amazon_buybox_share_pct_90d = excluded.amazon_buybox_share_pct_90d,
monthly_sold = excluded.monthly_sold,
rank_drops_30d = excluded.rank_drops_30d,
rank_drops_90d = excluded.rank_drops_90d,
fba_fee = excluded.fba_fee,
fbm_fee = excluded.fbm_fee,
referral_percent = excluded.referral_percent,
can_sell = excluded.can_sell,
sellability_status = excluded.sellability_status,
sellability_reason = excluded.sellability_reason,
verdict = excluded.verdict,
confidence = excluded.confidence,
reasoning = excluded.reasoning,
fetched_at = excluded.fetched_at;
`);
const rows = results.map((r) => {
const price =
r.product.keepa?.currentPrice ??
r.product.record.sellingPriceFromSheet ??
r.product.spApi.estimatedSalePrice;
const rank = r.product.keepa?.salesRank ?? r.product.record.amazonRank;
db.transaction((resultsBatch: AnalysisResult[]) => {
for (const r of resultsBatch) {
const price =
r.product.keepa?.currentPrice ??
r.product.record.sellingPriceFromSheet ??
r.product.spApi.estimatedSalePrice;
const rank = r.product.keepa?.salesRank ?? r.product.record.amazonRank;
insertStmt.run(
r.product.record.asin,
runId,
r.product.record.name,
r.product.record.brand ?? null,
return {
asin: r.product.record.asin,
runId,
name: r.product.record.name,
brand: r.product.record.brand ?? null,
category:
r.product.record.category ??
r.product.keepa?.categoryTree?.join(" > ") ??
null,
r.product.record.unitCost ?? null,
price ?? null,
r.product.keepa?.avgPrice90 ?? null,
r.product.record.avgPrice90FromSheet ?? null,
r.product.record.sellingPriceFromSheet ?? null,
rank ?? null,
r.product.keepa?.salesRankAvg90 ?? null,
r.product.keepa?.sellerCount ?? null,
r.product.keepa?.amazonIsSeller == null
? null
: r.product.keepa.amazonIsSeller
? 1
: 0,
r.product.keepa?.amazonBuyboxSharePct90d ?? null,
r.product.keepa?.monthlySold ?? null,
r.product.keepa?.salesRankDrops30 ?? null,
r.product.keepa?.salesRankDrops90 ?? null,
r.product.spApi.fbaFee ?? null,
r.product.spApi.fbmFee ?? null,
r.product.spApi.referralFeePercent ?? null,
r.product.keepa?.categoryTree?.join(" > ") ??
null,
unitCost: r.product.record.unitCost ?? null,
currentPrice: price ?? null,
avgPrice90d: r.product.keepa?.avgPrice90 ?? null,
avgPrice90dSheet: r.product.record.avgPrice90FromSheet ?? null,
sellingPriceSheet: r.product.record.sellingPriceFromSheet ?? null,
salesRank: rank ?? null,
salesRankAvg90d: r.product.keepa?.salesRankAvg90 ?? null,
sellerCount: r.product.keepa?.sellerCount ?? null,
amazonIsSeller: r.product.keepa?.amazonIsSeller ?? null,
amazonBuyboxSharePct90d: r.product.keepa?.amazonBuyboxSharePct90d ?? null,
monthlySold: r.product.keepa?.monthlySold ?? null,
rankDrops30d: r.product.keepa?.salesRankDrops30 ?? null,
rankDrops90d: r.product.keepa?.salesRankDrops90 ?? null,
fbaFee: r.product.spApi.fbaFee ?? null,
fbmFee: r.product.spApi.fbmFee ?? null,
referralPercent: r.product.spApi.referralFeePercent ?? null,
canSell:
r.product.spApi.canSell == null
? "unknown"
: r.product.spApi.canSell
? "yes"
: "no",
r.product.spApi.sellabilityStatus ?? null,
r.product.spApi.sellabilityReason ?? null,
r.verdict.verdict,
r.verdict.confidence,
r.verdict.reasoning ?? null,
r.product.fetchedAt,
);
}
})(results); // Execute the transaction with the results batch
sellabilityStatus: r.product.spApi.sellabilityStatus ?? null,
sellabilityReason: r.product.spApi.sellabilityReason ?? null,
verdict: r.verdict.verdict,
confidence: r.verdict.confidence,
reasoning: r.verdict.reasoning ?? null,
fetchedAt: new Date(r.product.fetchedAt),
};
});
await db
.insert(categoryProductResults)
.values(rows)
.onConflictDoUpdate({
target: categoryProductResults.asin,
set: {
runId: sql`EXCLUDED.run_id`,
name: sql`EXCLUDED.name`,
brand: sql`EXCLUDED.brand`,
category: sql`EXCLUDED.category`,
unitCost: sql`EXCLUDED.unit_cost`,
currentPrice: sql`EXCLUDED.current_price`,
avgPrice90d: sql`EXCLUDED.avg_price_90d`,
avgPrice90dSheet: sql`EXCLUDED.avg_price_90d_sheet`,
sellingPriceSheet: sql`EXCLUDED.selling_price_sheet`,
salesRank: sql`EXCLUDED.sales_rank`,
salesRankAvg90d: sql`EXCLUDED.sales_rank_avg_90d`,
sellerCount: sql`EXCLUDED.seller_count`,
amazonIsSeller: sql`EXCLUDED.amazon_is_seller`,
amazonBuyboxSharePct90d: sql`EXCLUDED.amazon_buybox_share_pct_90d`,
monthlySold: sql`EXCLUDED.monthly_sold`,
rankDrops30d: sql`EXCLUDED.rank_drops_30d`,
rankDrops90d: sql`EXCLUDED.rank_drops_90d`,
fbaFee: sql`EXCLUDED.fba_fee`,
fbmFee: sql`EXCLUDED.fbm_fee`,
referralPercent: sql`EXCLUDED.referral_percent`,
canSell: sql`EXCLUDED.can_sell`,
sellabilityStatus: sql`EXCLUDED.sellability_status`,
sellabilityReason: sql`EXCLUDED.sellability_reason`,
verdict: sql`EXCLUDED.verdict`,
confidence: sql`EXCLUDED.confidence`,
reasoning: sql`EXCLUDED.reasoning`,
fetchedAt: sql`EXCLUDED.fetched_at`,
},
});
}
function loadCategoryBlacklist(filePath: string): Set<number> {
@@ -1067,7 +1039,6 @@ function buildEnrichedProducts(
}
export async function processCategory(
db: Database,
runId: number,
category: CategoryInfo,
perCategoryTop: number,
@@ -1083,7 +1054,7 @@ export async function processCategory(
);
if (topAsins.length === 0) {
log("info", " Keepa returned no ASINs for this category.");
await updateCategoryRunSummary(db, runId, {
await updateCategoryRunSummary(runId, {
topAsinsChecked: 0,
availableAsins: 0,
fba: 0,
@@ -1127,7 +1098,7 @@ export async function processCategory(
` Sellable ASINs: ${availableAsins.length}/${uniqueTopAsins.length}`,
);
if (availableAsins.length === 0) {
await updateCategoryRunSummary(db, runId, {
await updateCategoryRunSummary(runId, {
topAsinsChecked: uniqueTopAsins.length,
availableAsins: 0,
fba: 0,
@@ -1164,7 +1135,7 @@ export async function processCategory(
);
if (selectedAsins.length === 0) {
await updateCategoryRunSummary(db, runId, {
await updateCategoryRunSummary(runId, {
topAsinsChecked: uniqueTopAsins.length,
availableAsins: 0,
fba: 0,
@@ -1231,7 +1202,7 @@ export async function processCategory(
},
}));
await insertProductAnalysisResults(db, runId, batchResults);
await insertProductAnalysisResults(runId, batchResults);
for (const result of batchResults) {
results.push(result);
@@ -1244,7 +1215,7 @@ export async function processCategory(
}
}
await updateCategoryRunSummary(db, runId, {
await updateCategoryRunSummary(runId, {
topAsinsChecked: uniqueTopAsins.length,
availableAsins: selectedAsins.length,
fba,
@@ -1264,7 +1235,7 @@ export async function processCategory(
}
}
await updateCategoryRunSummary(db, runId, {
await updateCategoryRunSummary(runId, {
topAsinsChecked: uniqueTopAsins.length,
availableAsins: selectedAsins.length,
fba,
@@ -1293,10 +1264,6 @@ export async function main(): Promise<void> {
assertSpApiPrerequisites();
mkdirSync(args.outputDir, { recursive: true });
const DB_PATH =
process.env.RESULTS_DB_PATH || path.join(process.cwd(), "db", "results.db");
initDb(DB_PATH);
const db = getDb(DB_PATH);
log("info", "Starting per-category monthly-sold pipeline");
log("info", `Marketplace: ${config.spApiMarketplaceId}`);
@@ -1333,7 +1300,6 @@ export async function main(): Promise<void> {
let runId: number | undefined;
try {
runId = await insertCategoryRunSummary(
db,
{
categoryId: category.id,
categoryLabel: category.label,
@@ -1350,7 +1316,6 @@ export async function main(): Promise<void> {
);
categorySummary = await processCategory(
db,
runId,
category,
args.perCategoryTop,
@@ -1382,7 +1347,7 @@ export async function main(): Promise<void> {
results: [],
};
if (runId) {
await updateCategoryRunSummary(db, runId, {
await updateCategoryRunSummary(runId, {
topAsinsChecked: 0,
availableAsins: 0,
fba: 0,