// 04_links.ts (7.5 KB)

  1. require("dotenv").config();
  2. import { v1 as NEO4J } from "neo4j-driver";
  3. import knex from "knex";
  4. import PQueue from "p-queue";
  5. import { startOfHour } from "date-fns";
  6. let count = 0;
  7. const queue = new PQueue({ concurrency: 5 });
  8. queue.on("active", () => (count % 1000 === 0 ? console.log(count++) : count++));
  9. // 1. Connect to Neo4j database
  10. const neo4j = NEO4J.driver(
  11. process.env.NEO4J_DB_URI,
  12. NEO4J.auth.basic(process.env.NEO4J_DB_USERNAME, process.env.NEO4J_DB_PASSWORD)
  13. );
  14. // 2. Connect to Postgres database
  15. const postgres = knex({
  16. client: "postgres",
  17. connection: {
  18. host: process.env.DB_HOST,
  19. database: process.env.DB_NAME,
  20. user: process.env.DB_USER,
  21. password: process.env.DB_PASSWORD
  22. }
  23. });
  24. (async function() {
  25. const startTime = Date.now();
  26. // 3. [NEO4J] Get all links
  27. const session = neo4j.session();
  28. const { records } = await session.run(
  29. "MATCH (l:URL) WITH COUNT(l) as count RETURN count"
  30. );
  31. const total = records[0].get("count").toNumber();
  32. const limit = 20000;
  33. function main(index = 0) {
  34. queue.add(
  35. () =>
  36. new Promise((resolve, reject) => {
  37. session
  38. .run(
  39. "MATCH (l:URL) WITH l SKIP $skip LIMIT $limit " +
  40. "OPTIONAL MATCH (l)-[:USES]->(d) " +
  41. "OPTIONAL MATCH (l)<-[:CREATED]-(u) " +
  42. "OPTIONAL MATCH (v)-[:VISITED]->(l) " +
  43. "OPTIONAL MATCH (v)-[:BROWSED_BY]->(b) " +
  44. "OPTIONAL MATCH (v)-[:OS]->(o) " +
  45. "OPTIONAL MATCH (v)-[:LOCATED_IN]->(c) " +
  46. "OPTIONAL MATCH (v)-[:REFERRED_BY]->(r) " +
  47. "OPTIONAL MATCH (v)-[:VISITED_IN]->(dd) " +
  48. "WITH l, u, d, COLLECT([b.browser, o.os, c.country, r.referrer, dd.date]) as stats " +
  49. "RETURN l, u.email as email, d.name as domain, stats",
  50. { limit: limit, skip: index * limit }
  51. )
  52. .subscribe({
  53. onNext(record) {
  54. queue.add(async () => {
  55. const link = record.get("l").properties;
  56. const email = record.get("email");
  57. const address = record.get("domain");
  58. const stats = record.get("stats");
  59. // 4. Merge and normalize stats based on hour
  60. const visits: Record<
  61. string,
  62. Record<string, number | Record<string, number>>
  63. > = {} as any;
  64. stats.forEach(([b, o, country, referrer, date]) => {
  65. if (b && o && country && referrer && date) {
  66. const dateHour = startOfHour(
  67. new Date(date)
  68. ).toISOString();
  69. const browser = b.toLowerCase();
  70. const os = o === "Mac Os X" ? "macos" : o.toLowerCase();
  71. visits[dateHour] = {
  72. ...visits[dateHour],
  73. total:
  74. (((visits[dateHour] &&
  75. visits[dateHour].total) as number) || 0) + 1,
  76. [`br_${browser}`]:
  77. (((visits[dateHour] &&
  78. visits[dateHour][`br_${browser}`]) as number) ||
  79. 0) + 1,
  80. [`os_${os}`]:
  81. (((visits[dateHour] &&
  82. visits[dateHour][`os_${os}`]) as number) || 0) + 1,
  83. countries: {
  84. ...((visits[dateHour] || {}).countries as {}),
  85. [country.toLowerCase()]:
  86. ((visits[dateHour] &&
  87. visits[dateHour].countries[
  88. country.toLowerCase()
  89. ]) ||
  90. 0) + 1
  91. },
  92. referrers: {
  93. ...((visits[dateHour] || {}).referrers as {}),
  94. [referrer.toLowerCase()]:
  95. ((visits[dateHour] &&
  96. visits[dateHour].referrers[
  97. referrer.toLowerCase()
  98. ]) ||
  99. 0) + 1
  100. }
  101. };
  102. }
  103. });
  104. // 5. [Postgres] Find matching user and or domain
  105. const [user, domain] = await Promise.all([
  106. email &&
  107. postgres<User>("users")
  108. .where({ email })
  109. .first(),
  110. address &&
  111. postgres<Domain>("domains")
  112. .where({ address })
  113. .first()
  114. ]);
  115. // 6. [Postgres] Create link
  116. const data = {
  117. address: link.id,
  118. banned: !!link.banned,
  119. domain_id: domain ? domain.id : null,
  120. password: link.password,
  121. target: link.target,
  122. user_id: user ? user.id : null,
  123. ...(link.count && { visit_count: link.count.toNumber() }),
  124. ...(link.createdAt && { created_at: link.createdAt })
  125. };
  126. const res = await postgres<Link>("links").insert(data, "id");
  127. const link_id = res[0];
  128. // 7. [Postgres] Create visits
  129. const newVisits = Object.entries(visits).map(
  130. ([date, details]) => ({
  131. link_id,
  132. created_at: date,
  133. countries: details.countries as Record<string, number>,
  134. referrers: details.referrers as Record<string, number>,
  135. total: details.total as number,
  136. br_chrome: details.br_chrome as number,
  137. br_edge: details.br_edge as number,
  138. br_firefox: details.br_firefox as number,
  139. br_ie: details.br_ie as number,
  140. br_opera: details.br_opera as number,
  141. br_other: details.br_other as number,
  142. br_safari: details.br_safari as number,
  143. os_android: details.os_android as number,
  144. os_ios: details.os_ios as number,
  145. os_linux: details.os_linux as number,
  146. os_macos: details.os_macos as number,
  147. os_other: details.os_other as number,
  148. os_windows: details.os_windows as number
  149. })
  150. );
  151. await postgres<Visit>("visits").insert(newVisits);
  152. });
  153. },
  154. onCompleted() {
  155. session.close();
  156. if ((index + 1) * limit < total) {
  157. queue.add(() => main(index + 1));
  158. } else {
  159. queue.add(() => {
  160. const endTime = Date.now();
  161. console.log(
  162. `✅ Done! It took ${(endTime - startTime) /
  163. 1000} seconds.`
  164. );
  165. });
  166. }
  167. resolve();
  168. },
  169. onError(error) {
  170. session.close();
  171. if ((index + 1) * limit < total) {
  172. queue.add(() => main(index + 1));
  173. }
  174. reject(error);
  175. }
  176. });
  177. })
  178. );
  179. }
  180. main();
  181. })();