|
@@ -4,7 +4,12 @@ import knex from "knex";
|
|
|
import PQueue from "p-queue";
|
|
import PQueue from "p-queue";
|
|
|
import { startOfHour } from "date-fns";
|
|
import { startOfHour } from "date-fns";
|
|
|
|
|
|
|
|
-const queue = new PQueue({ concurrency: 1 });
|
|
|
|
|
|
|
+let count = 0;
|
|
|
|
|
+const queue = new PQueue({ concurrency: 10 });
|
|
|
|
|
+
|
|
|
|
|
+queue.on("active", () =>
|
|
|
|
|
+ Math.random() > 0.5 ? console.log(count++) : count++
|
|
|
|
|
+);
|
|
|
|
|
|
|
|
// 1. Connect to Neo4j database
|
|
// 1. Connect to Neo4j database
|
|
|
const neo4j = NEO4J.driver(
|
|
const neo4j = NEO4J.driver(
|
|
@@ -27,155 +32,193 @@ const postgres = knex({
|
|
|
|
|
|
|
|
// 3. [NEO4J] Get all links
|
|
// 3. [NEO4J] Get all links
|
|
|
const session = neo4j.session();
|
|
const session = neo4j.session();
|
|
|
- session
|
|
|
|
|
- .run(
|
|
|
|
|
- "MATCH (l:URL) " +
|
|
|
|
|
- "OPTIONAL MATCH (l)-[:USES]->(d) " +
|
|
|
|
|
- "OPTIONAL MATCH (l)<-[:CREATED]-(u) " +
|
|
|
|
|
- "OPTIONAL MATCH (v)-[:VISITED]->(l) " +
|
|
|
|
|
- "OPTIONAL MATCH (v)-[:BROWSED_BY]->(b) " +
|
|
|
|
|
- "OPTIONAL MATCH (v)-[:OS]->(o) " +
|
|
|
|
|
- "OPTIONAL MATCH (v)-[:LOCATED_IN]->(c) " +
|
|
|
|
|
- "OPTIONAL MATCH (v)-[:REFERRED_BY]->(r) " +
|
|
|
|
|
- "OPTIONAL MATCH (v)-[:VISITED_IN]->(dd) " +
|
|
|
|
|
- "WITH l, u, d, COLLECT([b.browser, o.os, c.country, r.referrer, dd.date]) as stats " +
|
|
|
|
|
- "LIMIT 100 RETURN l, u.email as email, d.name as domain, stats"
|
|
|
|
|
- )
|
|
|
|
|
- .subscribe({
|
|
|
|
|
- onNext(record) {
|
|
|
|
|
- queue.add(async () => {
|
|
|
|
|
- const link = record.get("l").properties;
|
|
|
|
|
- const email = record.get("email");
|
|
|
|
|
- const address = record.get("domain");
|
|
|
|
|
- const stats = record.get("stats");
|
|
|
|
|
|
|
+ const { records } = await session.run(
|
|
|
|
|
+ "MATCH (l:URL) WITH COUNT(l) as count RETURN count"
|
|
|
|
|
+ );
|
|
|
|
|
+ const total = records[0].get("count").toNumber();
|
|
|
|
|
+ const limit = 20000;
|
|
|
|
|
|
|
|
- // 4. Merge and normalize stats based on hour
|
|
|
|
|
- const visits: Record<
|
|
|
|
|
- string,
|
|
|
|
|
- Record<string, number | Record<string, number>>
|
|
|
|
|
- > = {} as any;
|
|
|
|
|
|
|
+ function main(index = 0) {
|
|
|
|
|
+ queue.add(
|
|
|
|
|
+ () =>
|
|
|
|
|
+ new Promise((resolve, reject) => {
|
|
|
|
|
+ session
|
|
|
|
|
+ .run(
|
|
|
|
|
+ "MATCH (l:URL) WITH l LIMIT $limit " +
|
|
|
|
|
+ "OPTIONAL MATCH (l)-[:USES]->(d) " +
|
|
|
|
|
+ "OPTIONAL MATCH (l)<-[:CREATED]-(u) " +
|
|
|
|
|
+ "OPTIONAL MATCH (v)-[:VISITED]->(l) " +
|
|
|
|
|
+ "OPTIONAL MATCH (v)-[:BROWSED_BY]->(b) " +
|
|
|
|
|
+ "OPTIONAL MATCH (v)-[:OS]->(o) " +
|
|
|
|
|
+ "OPTIONAL MATCH (v)-[:LOCATED_IN]->(c) " +
|
|
|
|
|
+ "OPTIONAL MATCH (v)-[:REFERRED_BY]->(r) " +
|
|
|
|
|
+ "OPTIONAL MATCH (v)-[:VISITED_IN]->(dd) " +
|
|
|
|
|
+ "WITH l, u, d, COLLECT([b.browser, o.os, c.country, r.referrer, dd.date]) as stats " +
|
|
|
|
|
+ "RETURN l, u.email as email, d.name as domain, stats",
|
|
|
|
|
+ { limit: (index + 1) * limit }
|
|
|
|
|
+ )
|
|
|
|
|
+ .subscribe({
|
|
|
|
|
+ onNext(record) {
|
|
|
|
|
+ const endTime = Date.now();
|
|
|
|
|
+ console.log(
|
|
|
|
|
+ `✅ Done! It took ${(endTime - startTime) / 1000} seconds.`
|
|
|
|
|
+ );
|
|
|
|
|
+ queue.add(async () => {
|
|
|
|
|
+ const link = record.get("l").properties;
|
|
|
|
|
+ const email = record.get("email");
|
|
|
|
|
+ const address = record.get("domain");
|
|
|
|
|
+ const stats = record.get("stats");
|
|
|
|
|
|
|
|
- stats.forEach(([b, o, country, referrer, date]) => {
|
|
|
|
|
- const dateHour = startOfHour(date).toISOString();
|
|
|
|
|
- const browser = b.toLowerCase();
|
|
|
|
|
- const os = o === "Mac Os X" ? "macos" : o.toLowerCase();
|
|
|
|
|
- visits[dateHour] = {
|
|
|
|
|
- ...visits[dateHour],
|
|
|
|
|
- total:
|
|
|
|
|
- (((visits[dateHour] && visits[dateHour].total) as number) ||
|
|
|
|
|
- 0) + 1,
|
|
|
|
|
- [`br_${browser}`]:
|
|
|
|
|
- (((visits[dateHour] &&
|
|
|
|
|
- visits[dateHour][`br_${browser}`]) as number) || 0) + 1,
|
|
|
|
|
- [`os_${os}`]:
|
|
|
|
|
- (((visits[dateHour] &&
|
|
|
|
|
- visits[dateHour][`os_${os}`]) as number) || 0) + 1,
|
|
|
|
|
- countries: {
|
|
|
|
|
- ...((visits[dateHour] || {}).countries as {}),
|
|
|
|
|
- [country.toLowerCase()]:
|
|
|
|
|
- ((visits[dateHour] &&
|
|
|
|
|
- visits[dateHour].countries[country.toLowerCase()]) ||
|
|
|
|
|
- 0) + 1
|
|
|
|
|
- },
|
|
|
|
|
- referrers: {
|
|
|
|
|
- ...((visits[dateHour] || {}).referrers as {}),
|
|
|
|
|
- [referrer.toLowerCase()]:
|
|
|
|
|
- ((visits[dateHour] &&
|
|
|
|
|
- visits[dateHour].countries[referrer.toLowerCase()]) ||
|
|
|
|
|
- 0) + 1
|
|
|
|
|
- }
|
|
|
|
|
- };
|
|
|
|
|
- });
|
|
|
|
|
|
|
+ // 4. Merge and normalize stats based on hour
|
|
|
|
|
+ const visits: Record<
|
|
|
|
|
+ string,
|
|
|
|
|
+ Record<string, number | Record<string, number>>
|
|
|
|
|
+ > = {} as any;
|
|
|
|
|
+
|
|
|
|
|
+ stats.forEach(([b, o, country, referrer, date]) => {
|
|
|
|
|
+ if (b && o && country && referrer && date) {
|
|
|
|
|
+ const dateHour = startOfHour(date).toISOString();
|
|
|
|
|
+ const browser = b.toLowerCase();
|
|
|
|
|
+ const os = o === "Mac Os X" ? "macos" : o.toLowerCase();
|
|
|
|
|
+ visits[dateHour] = {
|
|
|
|
|
+ ...visits[dateHour],
|
|
|
|
|
+ total:
|
|
|
|
|
+ (((visits[dateHour] &&
|
|
|
|
|
+ visits[dateHour].total) as number) || 0) + 1,
|
|
|
|
|
+ [`br_${browser}`]:
|
|
|
|
|
+ (((visits[dateHour] &&
|
|
|
|
|
+ visits[dateHour][`br_${browser}`]) as number) ||
|
|
|
|
|
+ 0) + 1,
|
|
|
|
|
+ [`os_${os}`]:
|
|
|
|
|
+ (((visits[dateHour] &&
|
|
|
|
|
+ visits[dateHour][`os_${os}`]) as number) || 0) + 1,
|
|
|
|
|
+ countries: {
|
|
|
|
|
+ ...((visits[dateHour] || {}).countries as {}),
|
|
|
|
|
+ [country.toLowerCase()]:
|
|
|
|
|
+ ((visits[dateHour] &&
|
|
|
|
|
+ visits[dateHour].countries[
|
|
|
|
|
+ country.toLowerCase()
|
|
|
|
|
+ ]) ||
|
|
|
|
|
+ 0) + 1
|
|
|
|
|
+ },
|
|
|
|
|
+ referrers: {
|
|
|
|
|
+ ...((visits[dateHour] || {}).referrers as {}),
|
|
|
|
|
+ [referrer.toLowerCase()]:
|
|
|
|
|
+ ((visits[dateHour] &&
|
|
|
|
|
+ visits[dateHour].countries[
|
|
|
|
|
+ referrer.toLowerCase()
|
|
|
|
|
+ ]) ||
|
|
|
|
|
+ 0) + 1
|
|
|
|
|
+ }
|
|
|
|
|
+ };
|
|
|
|
|
+ }
|
|
|
|
|
+ });
|
|
|
|
|
|
|
|
- // 5. [Postgres] Find matching user and or domain
|
|
|
|
|
- const [user, domain] = await Promise.all([
|
|
|
|
|
- email &&
|
|
|
|
|
- postgres<User>("users")
|
|
|
|
|
- .where({ email })
|
|
|
|
|
- .first(),
|
|
|
|
|
- address &&
|
|
|
|
|
- postgres<Domain>("domains")
|
|
|
|
|
- .where({ address })
|
|
|
|
|
- .first()
|
|
|
|
|
- ]);
|
|
|
|
|
|
|
+ // 5. [Postgres] Find matching user and or domain
|
|
|
|
|
+ const [user, domain] = await Promise.all([
|
|
|
|
|
+ email &&
|
|
|
|
|
+ postgres<User>("users")
|
|
|
|
|
+ .where({ email })
|
|
|
|
|
+ .first(),
|
|
|
|
|
+ address &&
|
|
|
|
|
+ postgres<Domain>("domains")
|
|
|
|
|
+ .where({ address })
|
|
|
|
|
+ .first()
|
|
|
|
|
+ ]);
|
|
|
|
|
|
|
|
- // 6. [Postgres] Create link
|
|
|
|
|
- const data = {
|
|
|
|
|
- address: link.id,
|
|
|
|
|
- banned: !!link.banned,
|
|
|
|
|
- domain_id: domain ? domain.id : null,
|
|
|
|
|
- password: link.password,
|
|
|
|
|
- target: link.target,
|
|
|
|
|
- user_id: user ? user.id : null,
|
|
|
|
|
- ...(link.count && { visit_count: link.count.toNumber() }),
|
|
|
|
|
- ...(link.createdAt && { created_at: link.createdAt })
|
|
|
|
|
- };
|
|
|
|
|
|
|
+ // 6. [Postgres] Create link
|
|
|
|
|
+ const data = {
|
|
|
|
|
+ address: link.id,
|
|
|
|
|
+ banned: !!link.banned,
|
|
|
|
|
+ domain_id: domain ? domain.id : null,
|
|
|
|
|
+ password: link.password,
|
|
|
|
|
+ target: link.target,
|
|
|
|
|
+ user_id: user ? user.id : null,
|
|
|
|
|
+ ...(link.count && { visit_count: link.count.toNumber() }),
|
|
|
|
|
+ ...(link.createdAt && { created_at: link.createdAt })
|
|
|
|
|
+ };
|
|
|
|
|
|
|
|
- const exists = await postgres<Link>("links")
|
|
|
|
|
- .where({ address: link.id })
|
|
|
|
|
- .first();
|
|
|
|
|
|
|
+ const exists = await postgres<Link>("links")
|
|
|
|
|
+ .where({ address: link.id })
|
|
|
|
|
+ .first();
|
|
|
|
|
|
|
|
- let link_id: number;
|
|
|
|
|
- if (exists) {
|
|
|
|
|
- const res = await postgres<Link>("links")
|
|
|
|
|
- .where("id", exists.id)
|
|
|
|
|
- .update(data, "id");
|
|
|
|
|
- link_id = res[0];
|
|
|
|
|
- } else {
|
|
|
|
|
- const res = await postgres<Link>("links").insert(data, "id");
|
|
|
|
|
- link_id = res[0];
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ let link_id: number;
|
|
|
|
|
+ if (exists) {
|
|
|
|
|
+ const res = await postgres<Link>("links")
|
|
|
|
|
+ .where("id", exists.id)
|
|
|
|
|
+ .update(data, "id");
|
|
|
|
|
+ link_id = res[0];
|
|
|
|
|
+ } else {
|
|
|
|
|
+ const res = await postgres<Link>("links").insert(
|
|
|
|
|
+ data,
|
|
|
|
|
+ "id"
|
|
|
|
|
+ );
|
|
|
|
|
+ link_id = res[0];
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- // 7. [Postgres] Create visits
|
|
|
|
|
- for await (const [date, details] of Object.entries(visits)) {
|
|
|
|
|
- const data = {
|
|
|
|
|
- link_id,
|
|
|
|
|
- created_at: date,
|
|
|
|
|
- countries: details.countries as Record<string, number>,
|
|
|
|
|
- referrers: details.referrers as Record<string, number>,
|
|
|
|
|
- total: details.total as number,
|
|
|
|
|
- br_chrome: details.br_chrome as number,
|
|
|
|
|
- br_edge: details.br_edge as number,
|
|
|
|
|
- br_firefox: details.br_firefox as number,
|
|
|
|
|
- br_ie: details.br_ie as number,
|
|
|
|
|
- br_opera: details.br_opera as number,
|
|
|
|
|
- br_other: details.br_other as number,
|
|
|
|
|
- br_safari: details.br_safari as number,
|
|
|
|
|
- os_android: details.os_android as number,
|
|
|
|
|
- os_ios: details.os_ios as number,
|
|
|
|
|
- os_linux: details.os_linux as number,
|
|
|
|
|
- os_macos: details.os_macos as number,
|
|
|
|
|
- os_other: details.os_other as number,
|
|
|
|
|
- os_windows: details.os_windows as number
|
|
|
|
|
- };
|
|
|
|
|
|
|
+ // 7. [Postgres] Create visits
|
|
|
|
|
+ for await (const [date, details] of Object.entries(visits)) {
|
|
|
|
|
+ const data = {
|
|
|
|
|
+ link_id,
|
|
|
|
|
+ created_at: date,
|
|
|
|
|
+ countries: details.countries as Record<string, number>,
|
|
|
|
|
+ referrers: details.referrers as Record<string, number>,
|
|
|
|
|
+ total: details.total as number,
|
|
|
|
|
+ br_chrome: details.br_chrome as number,
|
|
|
|
|
+ br_edge: details.br_edge as number,
|
|
|
|
|
+ br_firefox: details.br_firefox as number,
|
|
|
|
|
+ br_ie: details.br_ie as number,
|
|
|
|
|
+ br_opera: details.br_opera as number,
|
|
|
|
|
+ br_other: details.br_other as number,
|
|
|
|
|
+ br_safari: details.br_safari as number,
|
|
|
|
|
+ os_android: details.os_android as number,
|
|
|
|
|
+ os_ios: details.os_ios as number,
|
|
|
|
|
+ os_linux: details.os_linux as number,
|
|
|
|
|
+ os_macos: details.os_macos as number,
|
|
|
|
|
+ os_other: details.os_other as number,
|
|
|
|
|
+ os_windows: details.os_windows as number
|
|
|
|
|
+ };
|
|
|
|
|
|
|
|
- const visitExists = await postgres<Visit>("visits")
|
|
|
|
|
- .where({ link_id, created_at: data.created_at })
|
|
|
|
|
- .first();
|
|
|
|
|
|
|
+ const visitExists = await postgres<Visit>("visits")
|
|
|
|
|
+ .where({ link_id, created_at: data.created_at })
|
|
|
|
|
+ .first();
|
|
|
|
|
|
|
|
- if (visitExists) {
|
|
|
|
|
- await postgres<Visit>("visits")
|
|
|
|
|
- .where("id", visitExists.id)
|
|
|
|
|
- .update(data);
|
|
|
|
|
- } else {
|
|
|
|
|
- await postgres<Visit>("visits").insert(data);
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- });
|
|
|
|
|
- },
|
|
|
|
|
- onCompleted() {
|
|
|
|
|
- session.close();
|
|
|
|
|
- queue.add(() => {
|
|
|
|
|
- const endTime = Date.now();
|
|
|
|
|
- console.log(
|
|
|
|
|
- `✅ Done! It took ${(endTime - startTime) / 1000} seconds.`
|
|
|
|
|
- );
|
|
|
|
|
- });
|
|
|
|
|
- },
|
|
|
|
|
- onError(error) {
|
|
|
|
|
- console.log(error);
|
|
|
|
|
- session.close();
|
|
|
|
|
- throw error;
|
|
|
|
|
- }
|
|
|
|
|
- });
|
|
|
|
|
|
|
+ if (visitExists) {
|
|
|
|
|
+ await postgres<Visit>("visits")
|
|
|
|
|
+ .where("id", visitExists.id)
|
|
|
|
|
+ .update(data);
|
|
|
|
|
+ } else {
|
|
|
|
|
+ await postgres<Visit>("visits").insert(data);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ });
|
|
|
|
|
+ },
|
|
|
|
|
+ onCompleted() {
|
|
|
|
|
+ session.close();
|
|
|
|
|
+ if ((index + 1) * limit < total) {
|
|
|
|
|
+ queue.add(() => main(index + 1));
|
|
|
|
|
+ } else {
|
|
|
|
|
+ queue.add(() => {
|
|
|
|
|
+ const endTime = Date.now();
|
|
|
|
|
+ console.log(
|
|
|
|
|
+ `✅ Done! It took ${(endTime - startTime) /
|
|
|
|
|
+ 1000} seconds.`
|
|
|
|
|
+ );
|
|
|
|
|
+ });
|
|
|
|
|
+ }
|
|
|
|
|
+ resolve();
|
|
|
|
|
+ },
|
|
|
|
|
+ onError(error) {
|
|
|
|
|
+ console.log(error);
|
|
|
|
|
+ session.close();
|
|
|
|
|
+ if ((index + 1) * limit < total) {
|
|
|
|
|
+ queue.add(() => main(index + 1));
|
|
|
|
|
+ }
|
|
|
|
|
+ reject(error);
|
|
|
|
|
+ }
|
|
|
|
|
+ });
|
|
|
|
|
+ })
|
|
|
|
|
+ );
|
|
|
|
|
+ }
|
|
|
|
|
+ main();
|
|
|
})();
|
|
})();
|