require("dotenv").config();
import { v1 as NEO4J } from "neo4j-driver";
import knex from "knex";
import PQueue from "p-queue";
import { startOfHour } from "date-fns";

// One-off migration: copies every `URL` node in Neo4j (with its creator's
// email, its domain and its visit statistics) into the Postgres `links` and
// `visits` tables. Progress is logged every 1000 started queue tasks.

let count = 0;
const queue = new PQueue({ concurrency: 5 });
queue.on("active", () => (count % 1000 === 0 ? console.log(count++) : count++));

// 1. Connect to Neo4j database
const neo4j = NEO4J.driver(
  process.env.NEO4J_DB_URI,
  NEO4J.auth.basic(process.env.NEO4J_DB_USERNAME, process.env.NEO4J_DB_PASSWORD)
);

// 2. Connect to Postgres database
const postgres = knex({
  client: "postgres",
  connection: {
    host: process.env.DB_HOST,
    database: process.env.DB_NAME,
    user: process.env.DB_USER,
    password: process.env.DB_PASSWORD
  }
});

/** Fetches one page of links with their creator, domain and raw visit tuples. */
const PAGE_QUERY =
  "MATCH (l:URL) WITH l SKIP $skip LIMIT $limit " +
  "OPTIONAL MATCH (l)-[:USES]->(d) " +
  "OPTIONAL MATCH (l)<-[:CREATED]-(u) " +
  "OPTIONAL MATCH (v)-[:VISITED]->(l) " +
  "OPTIONAL MATCH (v)-[:BROWSED_BY]->(b) " +
  "OPTIONAL MATCH (v)-[:OS]->(o) " +
  "OPTIONAL MATCH (v)-[:LOCATED_IN]->(c) " +
  "OPTIONAL MATCH (v)-[:REFERRED_BY]->(r) " +
  "OPTIONAL MATCH (v)-[:VISITED_IN]->(dd) " +
  "WITH l, u, d, COLLECT([b.browser, o.os, c.country, r.referrer, dd.date]) as stats " +
  "RETURN l, u.email as email, d.name as domain, stats";

/**
 * `br_*` / `os_*` counter columns written to the `visits` table. Counters for
 * any other browser or OS are aggregated but not persisted.
 */
const VISIT_COUNTER_COLUMNS = [
  "br_chrome",
  "br_edge",
  "br_firefox",
  "br_ie",
  "br_opera",
  "br_other",
  "br_safari",
  "os_android",
  "os_ios",
  "os_linux",
  "os_macos",
  "os_other",
  "os_windows"
];

/**
 * One element of the Cypher `COLLECT([...])`: browser, os, country, referrer, date.
 * Any element may be null when the matching OPTIONAL MATCH found nothing.
 * NOTE(review): the runtime type of `date` is not visible here; it is only
 * passed to `new Date(...)` — confirm against the Neo4j data.
 */
type RawVisit = [
  string | null,
  string | null,
  string | null,
  string | null,
  string | number | null
];

/** Visit statistics of one link aggregated into a single hour. */
interface HourlyVisits {
  total: number;
  /** `br_<browser>` and `os_<os>` counters keyed by column name. */
  counters: Record<string, number>;
  countries: Record<string, number>;
  referrers: Record<string, number>;
}

/** Minimal shape of the neo4j-driver result record consumed by `migrateLink`. */
interface LinkRecord {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- untyped Neo4j values
  get(key: string): any;
}

/**
 * Creates an empty counter map. It has no prototype, so user-supplied keys
 * such as "constructor" or "__proto__" are counted like any other key.
 */
function emptyCounts(): Record<string, number> {
  return Object.create(null);
}

/** Adds one to `counts[key]`, starting from zero. */
function increment(counts: Record<string, number>, key: string): void {
  counts[key] = (counts[key] || 0) + 1;
}

/**
 * 4. Merges raw visit tuples into per-hour buckets.
 * Tuples with any missing field are ignored.
 *
 * @param stats - the `stats` column returned by PAGE_QUERY
 * @returns buckets keyed by the ISO string of the start of the hour
 */
function aggregateByHour(stats: RawVisit[]): Record<string, HourlyVisits> {
  const visits: Record<string, HourlyVisits> = {};
  for (const [b, o, country, referrer, date] of stats) {
    if (!b || !o || !country || !referrer || !date) continue;

    const hour = startOfHour(new Date(date)).toISOString();
    const browser = b.toLowerCase();
    const os = o === "Mac Os X" ? "macos" : o.toLowerCase();

    let entry = visits[hour];
    if (!entry) {
      entry = {
        total: 0,
        counters: emptyCounts(),
        countries: emptyCounts(),
        referrers: emptyCounts()
      };
      visits[hour] = entry;
    }

    entry.total += 1;
    increment(entry.counters, `br_${browser}`);
    increment(entry.counters, `os_${os}`);
    increment(entry.countries, country.toLowerCase());
    increment(entry.referrers, referrer.toLowerCase());
  }
  return visits;
}

/**
 * Builds one `visits` row. Every counter column is present; it is
 * `undefined` when that browser/OS never occurred in the hour.
 */
function toVisitRow(
  linkId: unknown,
  hour: string,
  details: HourlyVisits
): Record<string, unknown> {
  const row: Record<string, unknown> = {
    link_id: linkId,
    created_at: hour,
    countries: details.countries,
    referrers: details.referrers,
    total: details.total
  };
  for (const column of VISIT_COUNTER_COLUMNS) {
    row[column] = details.counters[column];
  }
  return row;
}

/**
 * Inserts one Neo4j link record, and its aggregated visits, into Postgres.
 *
 * @param record - a row produced by PAGE_QUERY
 */
async function migrateLink(record: LinkRecord): Promise<void> {
  const link = record.get("l").properties;
  const email: string | null = record.get("email");
  const address: string | null = record.get("domain");
  const visits = aggregateByHour(record.get("stats"));

  // 5. [Postgres] Find matching user and/or domain
  const [user, domain] = await Promise.all([
    email && postgres("users").where({ email }).first(),
    address && postgres("domains").where({ address }).first()
  ]);

  // 6. [Postgres] Create link
  const data = {
    address: link.id,
    banned: !!link.banned,
    domain_id: domain ? domain.id : null,
    password: link.password,
    target: link.target,
    user_id: user ? user.id : null,
    ...(link.count && { visit_count: link.count.toNumber() }),
    ...(link.createdAt && { created_at: link.createdAt })
  };
  const res = await postgres("links").insert(data, "id");
  // NOTE(review): knex < 1.0 returns bare ids here, while knex >= 1.0 returns
  // `{ id }` objects — confirm against the installed knex version.
  const linkId = res[0];

  // 7. [Postgres] Create visits (skip the round trip when there are none)
  const rows = Object.entries(visits).map(([hour, details]) =>
    toVisitRow(linkId, hour, details)
  );
  if (rows.length > 0) {
    await postgres("visits").insert(rows);
  }
}

(async function() {
  const startTime = Date.now();
  try {
    // 3. [NEO4J] Count all links to know how many pages to fetch
    const countSession = neo4j.session();
    const { records } = await countSession.run(
      "MATCH (l:URL) WITH COUNT(l) as count RETURN count"
    );
    countSession.close();
    const total = records[0].get("count").toNumber();
    const limit = 20000;

    /**
     * Queues page `index` and streams its records into per-link tasks.
     * The next page is queued before this page's task settles, so the queue
     * never drains between pages and `onIdle()` below cannot resolve early.
     */
    function main(index = 0): void {
      const queueNextPage = () => {
        if ((index + 1) * limit < total) main(index + 1);
      };

      queue
        .add(
          () =>
            new Promise<void>((resolve, reject) => {
              // A fresh session per page: a session must not be reused after close().
              const session = neo4j.session();
              session
                .run(PAGE_QUERY, { limit: limit, skip: index * limit })
                .subscribe({
                  onNext(record) {
                    queue
                      .add(() => migrateLink(record))
                      .catch(error => console.error("❌ Failed to migrate link:", error));
                  },
                  onCompleted() {
                    session.close();
                    queueNextPage();
                    resolve();
                  },
                  onError(error) {
                    session.close();
                    // Keep going with the remaining pages, as before.
                    queueNextPage();
                    reject(error);
                  }
                });
            })
        )
        .catch(error => console.error(`❌ Failed to read page ${index}:`, error));
    }

    main();
    // Wait for every page AND every queued link/visit insert to finish.
    await queue.onIdle();

    const endTime = Date.now();
    console.log(`✅ Done! It took ${(endTime - startTime) / 1000} seconds.`);
  } finally {
    // Release both connection pools so the process can exit.
    neo4j.close();
    await postgres.destroy();
  }
})().catch(error => {
  console.error("❌ Migration failed:", error);
  process.exitCode = 1;
});