Refactor supplier analysis and product handling
- Updated `SupplierAnalysisResult` to include a `product` field and modified related tests. - Refactored `addRowsSheet` to accommodate changes in the product structure. - Enhanced UPC file analysis to utilize a new `toSupplierInputRecord` function for cleaner record creation. - Introduced new types for supplier input records and product observations. - Updated frontend components to handle new product details and analysis history. - Improved database writing functions to streamline run completion and error handling. - Added new API endpoints for product details and adjusted routing in the frontend.
This commit is contained in:
541
src/db/persistence.ts
Normal file
541
src/db/persistence.ts
Normal file
@@ -0,0 +1,541 @@
|
||||
import { sql } from "drizzle-orm";
|
||||
import { requireAsin, normalizeAsin } from "../asin.ts";
|
||||
import type {
|
||||
AnalysisResult,
|
||||
ProductRecord,
|
||||
SupplierAnalysisResult,
|
||||
} from "../types.ts";
|
||||
import { db } from "./index.ts";
|
||||
import {
|
||||
analysisRevisions,
|
||||
analysisRunStats,
|
||||
categoryRunDetails,
|
||||
productIdentifiers,
|
||||
productObservations,
|
||||
products,
|
||||
runItems,
|
||||
runs,
|
||||
sourcingInputs,
|
||||
supplierScores,
|
||||
upcResolutionCandidates,
|
||||
upcResolutions,
|
||||
} from "./schema.ts";
|
||||
|
||||
type Executor = any;
|
||||
type MetadataSource = "input" | "catalog";
|
||||
|
||||
type ProductSeed = {
|
||||
asin: string;
|
||||
name?: string | null;
|
||||
brand?: string | null;
|
||||
category?: string | null;
|
||||
metadataSource?: MetadataSource;
|
||||
fetchedAt?: Date;
|
||||
};
|
||||
|
||||
export type CategoryRunSummaryInput = {
|
||||
categoryId: number;
|
||||
categoryLabel: string;
|
||||
topAsinsChecked: number;
|
||||
availableAsins: number;
|
||||
fba: number;
|
||||
fbm: number;
|
||||
skip: number;
|
||||
status: "running" | "ok" | "empty" | "failed";
|
||||
error: string;
|
||||
};
|
||||
|
||||
export type RunCounts = {
|
||||
totalProducts: number;
|
||||
fbaCount: number;
|
||||
fbmCount: number;
|
||||
skipCount: number;
|
||||
};
|
||||
|
||||
function emptyToNull(value: string | undefined | null): string | null {
|
||||
const trimmed = value?.trim();
|
||||
return trimmed ? trimmed : null;
|
||||
}
|
||||
|
||||
function productCategory(record: ProductRecord, result: AnalysisResult): string | null {
|
||||
return emptyToNull(
|
||||
record.category ?? result.product.keepa?.categoryTree?.join(" > "),
|
||||
);
|
||||
}
|
||||
|
||||
export async function upsertProduct(
|
||||
seed: ProductSeed,
|
||||
executor: Executor = db,
|
||||
): Promise<string> {
|
||||
const asin = requireAsin(seed.asin);
|
||||
const now = seed.fetchedAt ?? new Date();
|
||||
const isCatalog = seed.metadataSource === "catalog";
|
||||
|
||||
await executor
|
||||
.insert(products)
|
||||
.values({
|
||||
asin,
|
||||
name: emptyToNull(seed.name),
|
||||
brand: emptyToNull(seed.brand),
|
||||
category: emptyToNull(seed.category),
|
||||
metadataFetchedAt: isCatalog ? now : null,
|
||||
firstSeenAt: now,
|
||||
lastSeenAt: now,
|
||||
})
|
||||
.onConflictDoUpdate({
|
||||
target: products.asin,
|
||||
set: {
|
||||
lastSeenAt: sql`GREATEST(${products.lastSeenAt}, EXCLUDED.last_seen_at)`,
|
||||
name: isCatalog
|
||||
? sql`CASE WHEN EXCLUDED.metadata_fetched_at >= COALESCE(${products.metadataFetchedAt}, '-infinity'::timestamptz) AND NULLIF(EXCLUDED.name, '') IS NOT NULL THEN EXCLUDED.name ELSE ${products.name} END`
|
||||
: sql`COALESCE(${products.name}, NULLIF(EXCLUDED.name, ''))`,
|
||||
brand: isCatalog
|
||||
? sql`CASE WHEN EXCLUDED.metadata_fetched_at >= COALESCE(${products.metadataFetchedAt}, '-infinity'::timestamptz) AND NULLIF(EXCLUDED.brand, '') IS NOT NULL THEN EXCLUDED.brand ELSE ${products.brand} END`
|
||||
: sql`COALESCE(${products.brand}, NULLIF(EXCLUDED.brand, ''))`,
|
||||
category: isCatalog
|
||||
? sql`CASE WHEN EXCLUDED.metadata_fetched_at >= COALESCE(${products.metadataFetchedAt}, '-infinity'::timestamptz) AND NULLIF(EXCLUDED.category, '') IS NOT NULL THEN EXCLUDED.category ELSE ${products.category} END`
|
||||
: sql`COALESCE(${products.category}, NULLIF(EXCLUDED.category, ''))`,
|
||||
metadataFetchedAt: isCatalog
|
||||
? sql`GREATEST(COALESCE(${products.metadataFetchedAt}, '-infinity'::timestamptz), EXCLUDED.metadata_fetched_at)`
|
||||
: products.metadataFetchedAt,
|
||||
},
|
||||
});
|
||||
|
||||
return asin;
|
||||
}
|
||||
|
||||
export async function insertObservation(
|
||||
runId: number,
|
||||
result: AnalysisResult,
|
||||
source: string,
|
||||
executor: Executor = db,
|
||||
): Promise<number> {
|
||||
const fetchedAt = new Date(result.product.fetchedAt);
|
||||
const record = result.product.record;
|
||||
const keepa = result.product.keepa;
|
||||
const spApi = result.product.spApi;
|
||||
const asin = requireAsin(record.asin);
|
||||
const [observation] = await executor
|
||||
.insert(productObservations)
|
||||
.values({
|
||||
productAsin: asin,
|
||||
runId,
|
||||
source,
|
||||
currentPrice:
|
||||
keepa?.currentPrice ??
|
||||
record.sellingPriceFromSheet ??
|
||||
spApi.estimatedSalePrice ??
|
||||
null,
|
||||
avgPrice90d: keepa?.avgPrice90 ?? null,
|
||||
salesRank: keepa?.salesRank ?? record.amazonRank ?? null,
|
||||
salesRankAvg90d: keepa?.salesRankAvg90 ?? null,
|
||||
monthlySold: keepa?.monthlySold ?? null,
|
||||
rankDrops30d: keepa?.salesRankDrops30 ?? null,
|
||||
rankDrops90d: keepa?.salesRankDrops90 ?? null,
|
||||
sellerCount: keepa?.sellerCount ?? null,
|
||||
amazonIsSeller: keepa?.amazonIsSeller ?? null,
|
||||
amazonBuyboxSharePct90d: keepa?.amazonBuyboxSharePct90d ?? null,
|
||||
fbaFee: spApi.fbaFee ?? null,
|
||||
fbmFee: spApi.fbmFee ?? null,
|
||||
referralPercent: spApi.referralFeePercent ?? null,
|
||||
canSell: spApi.canSell,
|
||||
sellabilityStatus: spApi.sellabilityStatus,
|
||||
sellabilityReason: spApi.sellabilityReason ?? null,
|
||||
fetchedAt,
|
||||
})
|
||||
.returning({ id: productObservations.id });
|
||||
|
||||
if (!observation) throw new Error(`Failed to insert observation for ${asin}`);
|
||||
return observation.id;
|
||||
}
|
||||
|
||||
function sourcingInputValues(runItemId: number, record: ProductRecord) {
|
||||
return {
|
||||
runItemId,
|
||||
suppliedName: emptyToNull(record.name),
|
||||
suppliedBrand: emptyToNull(record.brand),
|
||||
suppliedCategory: emptyToNull(record.category),
|
||||
unitCost: record.unitCost ?? null,
|
||||
avgPrice90dSheet: record.avgPrice90FromSheet ?? null,
|
||||
sellingPriceSheet: record.sellingPriceFromSheet ?? null,
|
||||
fbaNetSheet: record.fbaNet ?? null,
|
||||
grossProfitDollar: record.grossProfit ?? null,
|
||||
grossProfitPct: record.grossProfitPct ?? null,
|
||||
netProfitSheet: record.netProfitFromSheet ?? null,
|
||||
roiSheet: record.roiFromSheet ?? null,
|
||||
moq: record.moq ?? null,
|
||||
moqCost: record.moqCost ?? null,
|
||||
qtyAvailable: record.totalQtyAvail ?? null,
|
||||
supplier: emptyToNull(record.supplier),
|
||||
sourceUrl: emptyToNull(record.sourceUrl),
|
||||
asinLink: emptyToNull(record.asinLink),
|
||||
promoCouponCode: emptyToNull(record.promoCouponCode),
|
||||
notes: emptyToNull(record.notes),
|
||||
leadDate: emptyToNull(record.leadDate),
|
||||
};
|
||||
}
|
||||
|
||||
export async function persistLlmResults(
|
||||
runId: number,
|
||||
results: AnalysisResult[],
|
||||
options: {
|
||||
source: string;
|
||||
metadataSource?: MetadataSource;
|
||||
preserveSourcingInput?: boolean;
|
||||
sourceInventoryIds?: Map<string, number>;
|
||||
},
|
||||
): Promise<void> {
|
||||
for (const result of results) {
|
||||
const record = result.product.record;
|
||||
const fetchedAt = new Date(result.product.fetchedAt);
|
||||
const asin = await upsertProduct({
|
||||
asin: record.asin,
|
||||
name: record.name,
|
||||
brand: record.brand,
|
||||
category: productCategory(record, result),
|
||||
metadataSource: options.metadataSource ?? "input",
|
||||
fetchedAt,
|
||||
});
|
||||
const [item] = await db
|
||||
.insert(runItems)
|
||||
.values({
|
||||
runId,
|
||||
productAsin: asin,
|
||||
sourceInventoryItemId: options.sourceInventoryIds?.get(asin) ?? null,
|
||||
})
|
||||
.returning({ id: runItems.id });
|
||||
if (!item) throw new Error(`Failed to insert run item for ${asin}`);
|
||||
|
||||
if (options.preserveSourcingInput) {
|
||||
await db.insert(sourcingInputs).values(sourcingInputValues(item.id, record));
|
||||
}
|
||||
|
||||
const observationId = await insertObservation(runId, result, options.source);
|
||||
await db.insert(analysisRevisions).values({
|
||||
runItemId: item.id,
|
||||
observationId,
|
||||
method: "llm",
|
||||
decision: result.verdict.verdict,
|
||||
confidence: result.verdict.confidence,
|
||||
reasoning: result.verdict.reasoning ?? null,
|
||||
analyzedAt: fetchedAt,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
function supplierSourcingValues(runItemId: number, result: SupplierAnalysisResult) {
|
||||
return {
|
||||
runItemId,
|
||||
suppliedName: emptyToNull(result.record.name),
|
||||
suppliedBrand: emptyToNull(result.record.brand),
|
||||
suppliedCategory: emptyToNull(result.record.category),
|
||||
unitCost: result.record.unitCost ?? null,
|
||||
};
|
||||
}
|
||||
|
||||
async function insertSupplierObservation(
|
||||
runId: number,
|
||||
productAsin: string,
|
||||
result: SupplierAnalysisResult,
|
||||
): Promise<number | null> {
|
||||
const keepa = result.keepa;
|
||||
const spApi = result.spApi;
|
||||
if (!spApi && !keepa) return null;
|
||||
|
||||
const [row] = await db
|
||||
.insert(productObservations)
|
||||
.values({
|
||||
productAsin,
|
||||
runId,
|
||||
source: "supplier_upc",
|
||||
currentPrice: result.score.salePrice,
|
||||
avgPrice90d: keepa?.avgPrice90 ?? null,
|
||||
salesRank: keepa?.salesRank ?? null,
|
||||
salesRankAvg90d: keepa?.salesRankAvg90 ?? null,
|
||||
monthlySold: keepa?.monthlySold ?? null,
|
||||
rankDrops30d: keepa?.salesRankDrops30 ?? null,
|
||||
rankDrops90d: keepa?.salesRankDrops90 ?? null,
|
||||
sellerCount: keepa?.sellerCount ?? null,
|
||||
amazonIsSeller: keepa?.amazonIsSeller ?? null,
|
||||
amazonBuyboxSharePct90d: keepa?.amazonBuyboxSharePct90d ?? null,
|
||||
fbaFee: spApi?.fbaFee ?? null,
|
||||
fbmFee: spApi?.fbmFee ?? null,
|
||||
referralPercent: spApi?.referralFeePercent ?? null,
|
||||
canSell: spApi?.canSell ?? null,
|
||||
sellabilityStatus: spApi?.sellabilityStatus ?? null,
|
||||
sellabilityReason: spApi?.sellabilityReason ?? null,
|
||||
fetchedAt: new Date(result.fetchedAt),
|
||||
})
|
||||
.returning({ id: productObservations.id });
|
||||
return row?.id ?? null;
|
||||
}
|
||||
|
||||
export async function persistSupplierResults(
|
||||
runId: number,
|
||||
results: SupplierAnalysisResult[],
|
||||
): Promise<void> {
|
||||
for (const result of results) {
|
||||
const resolvedAsin = normalizeAsin(result.lookup.asin);
|
||||
if (resolvedAsin) {
|
||||
await upsertProduct({
|
||||
asin: resolvedAsin,
|
||||
name: result.record.name,
|
||||
brand: result.record.brand,
|
||||
category: result.record.category,
|
||||
metadataSource: "input",
|
||||
fetchedAt: new Date(result.fetchedAt),
|
||||
});
|
||||
if (result.keepa?.categoryTree?.length) {
|
||||
await upsertProduct({
|
||||
asin: resolvedAsin,
|
||||
category: result.keepa.categoryTree.join(" > "),
|
||||
metadataSource: "catalog",
|
||||
fetchedAt: new Date(result.fetchedAt),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
const [item] = await db
|
||||
.insert(runItems)
|
||||
.values({
|
||||
runId,
|
||||
productAsin: resolvedAsin,
|
||||
sourceRow: result.rowNumber ?? null,
|
||||
})
|
||||
.returning({ id: runItems.id });
|
||||
if (!item) throw new Error("Failed to insert supplier run item");
|
||||
|
||||
await db.insert(sourcingInputs).values(supplierSourcingValues(item.id, result));
|
||||
await db.insert(upcResolutions).values({
|
||||
runItemId: item.id,
|
||||
requestedUpc: result.upc,
|
||||
normalizedUpc: result.lookup.normalizedUpc,
|
||||
provider: result.lookup.provider ?? "unknown",
|
||||
status: result.lookup.status,
|
||||
reason: result.lookup.reason ?? null,
|
||||
resolvedProductAsin: resolvedAsin,
|
||||
resolvedAt: new Date(result.fetchedAt),
|
||||
});
|
||||
|
||||
for (const candidate of result.lookup.candidateAsins) {
|
||||
const candidateAsin = normalizeAsin(candidate);
|
||||
if (!candidateAsin) continue;
|
||||
await upsertProduct({ asin: candidateAsin, fetchedAt: new Date(result.fetchedAt) });
|
||||
await db
|
||||
.insert(upcResolutionCandidates)
|
||||
.values({ runItemId: item.id, productAsin: candidateAsin })
|
||||
.onConflictDoUpdate({
|
||||
target: [
|
||||
upcResolutionCandidates.runItemId,
|
||||
upcResolutionCandidates.productAsin,
|
||||
],
|
||||
set: { productAsin: sql`EXCLUDED.product_asin` },
|
||||
});
|
||||
}
|
||||
|
||||
if (resolvedAsin) {
|
||||
await db
|
||||
.insert(productIdentifiers)
|
||||
.values({
|
||||
productAsin: resolvedAsin,
|
||||
identifierType:
|
||||
result.lookup.normalizedUpc.length === 12
|
||||
? "upc"
|
||||
: result.lookup.normalizedUpc.length === 13
|
||||
? "ean"
|
||||
: "gtin",
|
||||
identifierValue: result.lookup.normalizedUpc,
|
||||
source: "supplier_upc",
|
||||
confirmedAt: new Date(result.fetchedAt),
|
||||
})
|
||||
.onConflictDoUpdate({
|
||||
target: [
|
||||
productIdentifiers.identifierType,
|
||||
productIdentifiers.identifierValue,
|
||||
],
|
||||
set: {
|
||||
productAsin: resolvedAsin,
|
||||
source: "supplier_upc",
|
||||
confirmedAt: new Date(result.fetchedAt),
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
const observationId = resolvedAsin
|
||||
? await insertSupplierObservation(runId, resolvedAsin, result)
|
||||
: null;
|
||||
const [revision] = await db
|
||||
.insert(analysisRevisions)
|
||||
.values({
|
||||
runItemId: item.id,
|
||||
observationId,
|
||||
method: "supplier_scoring",
|
||||
decision: result.score.verdict,
|
||||
confidence: result.score.score,
|
||||
reasoning: result.score.reason,
|
||||
analyzedAt: new Date(result.fetchedAt),
|
||||
})
|
||||
.returning({ id: analysisRevisions.id });
|
||||
if (!revision) throw new Error("Failed to insert supplier analysis revision");
|
||||
|
||||
await db.insert(supplierScores).values({
|
||||
revisionId: revision.id,
|
||||
score: result.score.score,
|
||||
salePrice: result.score.salePrice,
|
||||
fbaFee: result.score.fbaFee,
|
||||
profit: result.score.profit,
|
||||
margin: result.score.margin,
|
||||
roi: result.score.roi,
|
||||
reason: result.score.reason,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
export async function createCategoryRun(
|
||||
summary: CategoryRunSummaryInput,
|
||||
runTimestamp: string,
|
||||
): Promise<number> {
|
||||
const [row] = await db
|
||||
.insert(runs)
|
||||
.values({
|
||||
type: "category_analysis",
|
||||
status: summary.status,
|
||||
errorMessage: summary.error || null,
|
||||
startedAt: new Date(runTimestamp),
|
||||
})
|
||||
.returning({ id: runs.id });
|
||||
if (!row) throw new Error("Failed to insert category run.");
|
||||
|
||||
await db.insert(categoryRunDetails).values({
|
||||
runId: row.id,
|
||||
categoryId: summary.categoryId,
|
||||
categoryLabel: summary.categoryLabel,
|
||||
checkedAsinCount: summary.topAsinsChecked,
|
||||
});
|
||||
await db.insert(analysisRunStats).values({
|
||||
runId: row.id,
|
||||
processedCount: summary.topAsinsChecked,
|
||||
availableCount: summary.availableAsins,
|
||||
analyzedCount: summary.fba + summary.fbm + summary.skip,
|
||||
fbaCount: summary.fba,
|
||||
fbmCount: summary.fbm,
|
||||
skipCount: summary.skip,
|
||||
});
|
||||
return row.id;
|
||||
}
|
||||
|
||||
export async function updateCategoryRun(
|
||||
runId: number,
|
||||
summary: Pick<
|
||||
CategoryRunSummaryInput,
|
||||
| "topAsinsChecked"
|
||||
| "availableAsins"
|
||||
| "fba"
|
||||
| "fbm"
|
||||
| "skip"
|
||||
| "status"
|
||||
| "error"
|
||||
>,
|
||||
): Promise<void> {
|
||||
await db
|
||||
.update(runs)
|
||||
.set({
|
||||
status: summary.status,
|
||||
errorMessage: summary.error || null,
|
||||
...(summary.status !== "running" ? { completedAt: new Date() } : {}),
|
||||
})
|
||||
.where(sql`${runs.id} = ${runId}`);
|
||||
await db
|
||||
.insert(categoryRunDetails)
|
||||
.values({
|
||||
runId,
|
||||
categoryId: 0,
|
||||
categoryLabel: "",
|
||||
checkedAsinCount: summary.topAsinsChecked,
|
||||
})
|
||||
.onConflictDoUpdate({
|
||||
target: categoryRunDetails.runId,
|
||||
set: { checkedAsinCount: summary.topAsinsChecked },
|
||||
});
|
||||
await db
|
||||
.insert(analysisRunStats)
|
||||
.values({
|
||||
runId,
|
||||
processedCount: summary.topAsinsChecked,
|
||||
availableCount: summary.availableAsins,
|
||||
analyzedCount: summary.fba + summary.fbm + summary.skip,
|
||||
fbaCount: summary.fba,
|
||||
fbmCount: summary.fbm,
|
||||
skipCount: summary.skip,
|
||||
})
|
||||
.onConflictDoUpdate({
|
||||
target: analysisRunStats.runId,
|
||||
set: {
|
||||
processedCount: summary.topAsinsChecked,
|
||||
availableCount: summary.availableAsins,
|
||||
analyzedCount: summary.fba + summary.fbm + summary.skip,
|
||||
fbaCount: summary.fba,
|
||||
fbmCount: summary.fbm,
|
||||
skipCount: summary.skip,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
export async function refreshRunStats(runId: number): Promise<RunCounts> {
|
||||
const [stats] = await db.execute(
|
||||
sql<{
|
||||
total: string;
|
||||
fba: string | null;
|
||||
fbm: string | null;
|
||||
buy: string | null;
|
||||
watch: string | null;
|
||||
skip: string | null;
|
||||
}>`WITH latest AS (
|
||||
SELECT DISTINCT ON (ri.id) ar.decision
|
||||
FROM run_items ri
|
||||
JOIN analysis_revisions ar ON ar.run_item_id = ri.id
|
||||
WHERE ri.run_id = ${runId}
|
||||
ORDER BY ri.id, ar.analyzed_at DESC, ar.id DESC
|
||||
)
|
||||
SELECT
|
||||
COUNT(*) AS total,
|
||||
SUM(CASE WHEN decision = 'FBA' THEN 1 ELSE 0 END) AS fba,
|
||||
SUM(CASE WHEN decision = 'FBM' THEN 1 ELSE 0 END) AS fbm,
|
||||
SUM(CASE WHEN decision = 'BUY' THEN 1 ELSE 0 END) AS buy,
|
||||
SUM(CASE WHEN decision = 'WATCH' THEN 1 ELSE 0 END) AS watch,
|
||||
SUM(CASE WHEN decision = 'SKIP' THEN 1 ELSE 0 END) AS skip
|
||||
FROM latest`,
|
||||
);
|
||||
|
||||
const counts = {
|
||||
totalProducts: Number(stats?.total ?? 0),
|
||||
fbaCount: Number(stats?.fba ?? 0),
|
||||
fbmCount: Number(stats?.fbm ?? 0),
|
||||
skipCount: Number(stats?.skip ?? 0),
|
||||
};
|
||||
await db
|
||||
.insert(analysisRunStats)
|
||||
.values({
|
||||
runId,
|
||||
processedCount: counts.totalProducts,
|
||||
analyzedCount: counts.totalProducts,
|
||||
fbaCount: counts.fbaCount,
|
||||
fbmCount: counts.fbmCount,
|
||||
buyCount: Number(stats?.buy ?? 0),
|
||||
watchCount: Number(stats?.watch ?? 0),
|
||||
skipCount: counts.skipCount,
|
||||
})
|
||||
.onConflictDoUpdate({
|
||||
target: analysisRunStats.runId,
|
||||
set: {
|
||||
processedCount: counts.totalProducts,
|
||||
analyzedCount: counts.totalProducts,
|
||||
fbaCount: counts.fbaCount,
|
||||
fbmCount: counts.fbmCount,
|
||||
buyCount: Number(stats?.buy ?? 0),
|
||||
watchCount: Number(stats?.watch ?? 0),
|
||||
skipCount: counts.skipCount,
|
||||
},
|
||||
});
|
||||
return counts;
|
||||
}
|
||||
Reference in New Issue
Block a user