// 04_links.ts
  1. require("dotenv").config();
  2. import { v1 as NEO4J } from "neo4j-driver";
  3. import knex from "knex";
  4. import PQueue from "p-queue";
  5. import { startOfHour } from "date-fns";
  6. let count = 0;
  7. const queue = new PQueue({ concurrency: 5 });
  8. queue.on("active", () => (count % 1000 === 0 ? console.log(count++) : count++));
  9. // 1. Connect to Neo4j database
  10. const neo4j = NEO4J.driver(
  11. process.env.NEO4J_DB_URI,
  12. NEO4J.auth.basic(process.env.NEO4J_DB_USERNAME, process.env.NEO4J_DB_PASSWORD)
  13. );
  14. // 2. Connect to Postgres database
  15. const postgres = knex({
  16. client: "postgres",
  17. connection: {
  18. host: process.env.DB_HOST,
  19. database: process.env.DB_NAME,
  20. user: process.env.DB_USER,
  21. password: process.env.DB_PASSWORD
  22. }
  23. });
  24. (async function() {
  25. const startTime = Date.now();
  26. // 3. [NEO4J] Get all links
  27. const session = neo4j.session();
  28. const { records } = await session.run(
  29. "MATCH (l:URL) WITH COUNT(l) as count RETURN count"
  30. );
  31. const total = records[0].get("count").toNumber();
  32. const limit = 20000;
  33. function main(index = 0) {
  34. queue.add(
  35. () =>
  36. new Promise((resolve, reject) => {
  37. session
  38. .run(
  39. "MATCH (l:URL) WITH l SKIP $skip LIMIT $limit " +
  40. "OPTIONAL MATCH (l)-[:USES]->(d) " +
  41. "OPTIONAL MATCH (l)<-[:CREATED]-(u) " +
  42. "OPTIONAL MATCH (v)-[:VISITED]->(l) " +
  43. "OPTIONAL MATCH (v)-[:BROWSED_BY]->(b) " +
  44. "OPTIONAL MATCH (v)-[:OS]->(o) " +
  45. "OPTIONAL MATCH (v)-[:LOCATED_IN]->(c) " +
  46. "OPTIONAL MATCH (v)-[:REFERRED_BY]->(r) " +
  47. "OPTIONAL MATCH (v)-[:VISITED_IN]->(dd) " +
  48. "WITH l, u, d, COLLECT([b.browser, o.os, c.country, r.referrer, dd.date]) as stats " +
  49. "RETURN l, u.email as email, d.name as domain, stats",
  50. { limit: limit, skip: index * limit }
  51. )
  52. .subscribe({
  53. onNext(record) {
  54. queue.add(async () => {
  55. const link = record.get("l").properties;
  56. const email = record.get("email");
  57. const address = record.get("domain");
  58. const stats = record.get("stats");
  59. // 4. Merge and normalize stats based on hour
  60. const visits: Record<
  61. string,
  62. Record<string, number | Record<string, number>>
  63. > = {} as any;
  64. stats.forEach(([b, o, country, referrer, date]) => {
  65. if (b && o && country && referrer && date) {
  66. const dateHour = startOfHour(date).toISOString();
  67. const browser = b.toLowerCase();
  68. const os = o === "Mac Os X" ? "macos" : o.toLowerCase();
  69. visits[dateHour] = {
  70. ...visits[dateHour],
  71. total:
  72. (((visits[dateHour] &&
  73. visits[dateHour].total) as number) || 0) + 1,
  74. [`br_${browser}`]:
  75. (((visits[dateHour] &&
  76. visits[dateHour][`br_${browser}`]) as number) ||
  77. 0) + 1,
  78. [`os_${os}`]:
  79. (((visits[dateHour] &&
  80. visits[dateHour][`os_${os}`]) as number) || 0) + 1,
  81. countries: {
  82. ...((visits[dateHour] || {}).countries as {}),
  83. [country.toLowerCase()]:
  84. ((visits[dateHour] &&
  85. visits[dateHour].countries[
  86. country.toLowerCase()
  87. ]) ||
  88. 0) + 1
  89. },
  90. referrers: {
  91. ...((visits[dateHour] || {}).referrers as {}),
  92. [referrer.toLowerCase()]:
  93. ((visits[dateHour] &&
  94. visits[dateHour].referrers[
  95. referrer.toLowerCase()
  96. ]) ||
  97. 0) + 1
  98. }
  99. };
  100. }
  101. });
  102. // 5. [Postgres] Find matching user and or domain
  103. const [user, domain] = await Promise.all([
  104. email &&
  105. postgres<User>("users")
  106. .where({ email })
  107. .first(),
  108. address &&
  109. postgres<Domain>("domains")
  110. .where({ address })
  111. .first()
  112. ]);
  113. // 6. [Postgres] Create link
  114. const data = {
  115. address: link.id,
  116. banned: !!link.banned,
  117. domain_id: domain ? domain.id : null,
  118. password: link.password,
  119. target: link.target,
  120. user_id: user ? user.id : null,
  121. ...(link.count && { visit_count: link.count.toNumber() }),
  122. ...(link.createdAt && { created_at: link.createdAt })
  123. };
  124. const res = await postgres<Link>("links").insert(data, "id");
  125. const link_id = res[0];
  126. // 7. [Postgres] Create visits
  127. const newVisits = Object.entries(visits).map(
  128. ([date, details]) => ({
  129. link_id,
  130. created_at: date,
  131. countries: details.countries as Record<string, number>,
  132. referrers: details.referrers as Record<string, number>,
  133. total: details.total as number,
  134. br_chrome: details.br_chrome as number,
  135. br_edge: details.br_edge as number,
  136. br_firefox: details.br_firefox as number,
  137. br_ie: details.br_ie as number,
  138. br_opera: details.br_opera as number,
  139. br_other: details.br_other as number,
  140. br_safari: details.br_safari as number,
  141. os_android: details.os_android as number,
  142. os_ios: details.os_ios as number,
  143. os_linux: details.os_linux as number,
  144. os_macos: details.os_macos as number,
  145. os_other: details.os_other as number,
  146. os_windows: details.os_windows as number
  147. })
  148. );
  149. await postgres<Visit>("visits").insert(newVisits);
  150. });
  151. },
  152. onCompleted() {
  153. session.close();
  154. if ((index + 1) * limit < total) {
  155. queue.add(() => main(index + 1));
  156. } else {
  157. queue.add(() => {
  158. const endTime = Date.now();
  159. console.log(
  160. `✅ Done! It took ${(endTime - startTime) /
  161. 1000} seconds.`
  162. );
  163. });
  164. }
  165. resolve();
  166. },
  167. onError(error) {
  168. session.close();
  169. if ((index + 1) * limit < total) {
  170. queue.add(() => main(index + 1));
  171. }
  172. reject(error);
  173. }
  174. });
  175. })
  176. );
  177. }
  178. main();
  179. })();