04_links.ts 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. require("dotenv").config();
  2. import { v1 as NEO4J } from "neo4j-driver";
  3. import knex from "knex";
  4. import PQueue from "p-queue";
  5. import { startOfHour } from "date-fns";
  // Work queue limited to a single running job at a time.
  const queue = new PQueue({ concurrency: 1 });

  // 1. Connect to Neo4j database
  // Credentials and URI are read from the environment (loaded by dotenv above).
  const neo4jCredentials = NEO4J.auth.basic(
    process.env.NEO4J_DB_USERNAME,
    process.env.NEO4J_DB_PASSWORD
  );
  const neo4j = NEO4J.driver(process.env.NEO4J_DB_URI, neo4jCredentials);

  // 2. Connect to Postgres database
  // Connection settings are likewise taken from the environment.
  const postgresConnection = {
    host: process.env.DB_HOST,
    database: process.env.DB_NAME,
    user: process.env.DB_USER,
    password: process.env.DB_PASSWORD
  };
  const postgres = knex({
    client: "postgres",
    connection: postgresConnection
  });
  /** Occurrence counts keyed by a lower-cased label (browser, OS, country or referrer). */
  type Tally = Record<string, number>;

  /** Counters accumulated for one link within a single hour bucket. */
  interface HourlyVisits {
    total: number;
    browsers: Tally;
    systems: Tally;
    countries: Tally;
    referrers: Tally;
  }

  /**
   * One `[browser, os, country, referrer, date]` tuple from the `stats` column
   * of the Cypher query below. Every part is produced by an OPTIONAL MATCH, so
   * each one is treated as possibly null.
   */
  type VisitRow = [
    string | null,
    string | null,
    string | null,
    string | null,
    Parameters<typeof startOfHour>[0] | null
  ];

  /** Adds one to `tally[key]`, starting from zero when the key is new. */
  function bump(tally: Tally, key: string): void {
    tally[key] = (tally[key] || 0) + 1;
  }

  /**
   * 4. Merge and normalize stats based on hour.
   *
   * Groups the visit tuples of one link by the start of the hour they fall in
   * and counts totals, browsers, operating systems, countries and referrers.
   *
   * @param rows Visit tuples collected for a single link.
   * @returns Counters keyed by the ISO string of each hour's start.
   */
  function groupVisitsByHour(rows: VisitRow[]): Record<string, HourlyVisits> {
    const byHour: Record<string, HourlyVisits> = {};

    for (const [browser, os, country, referrer, date] of rows) {
      // A link without visits still yields one tuple made of nulls; there is
      // nothing to count for it, and calling string/date methods on null throws.
      if (date == null) continue;

      const hour = startOfHour(date).toISOString();
      const bucket =
        byHour[hour] ||
        (byHour[hour] = {
          total: 0,
          browsers: {},
          systems: {},
          countries: {},
          referrers: {}
        });

      bucket.total += 1;
      // Parts that are missing on an otherwise valid visit are simply not counted.
      if (browser != null) bump(bucket.browsers, browser.toLowerCase());
      if (os != null) {
        bump(bucket.systems, os === "Mac Os X" ? "macos" : os.toLowerCase());
      }
      if (country != null) bump(bucket.countries, country.toLowerCase());
      // Referrers are counted in their own tally (previously the old value was
      // looked up in the countries tally, so the count never went above one).
      if (referrer != null) bump(bucket.referrers, referrer.toLowerCase());
    }

    return byHour;
  }

  /**
   * Entry point: streams URL nodes with their visit stats out of Neo4j and, for
   * each one, upserts a row in the Postgres `links` table plus one row per hour
   * in the `visits` table. Each record is handled as a job on `queue`.
   *
   * NOTE(review): the query ends with `LIMIT 100`, so at most 100 rows are
   * returned — confirm this cap is intended.
   */
  (async function() {
    const startTime = Date.now();

    // 3. [NEO4J] Get all links
    const session = neo4j.session();
    session
      .run(
        "MATCH (l:URL) " +
          "OPTIONAL MATCH (l)-[:USES]->(d) " +
          "OPTIONAL MATCH (l)<-[:CREATED]-(u) " +
          "OPTIONAL MATCH (v)-[:VISITED]->(l) " +
          "OPTIONAL MATCH (v)-[:BROWSED_BY]->(b) " +
          "OPTIONAL MATCH (v)-[:OS]->(o) " +
          "OPTIONAL MATCH (v)-[:LOCATED_IN]->(c) " +
          "OPTIONAL MATCH (v)-[:REFERRED_BY]->(r) " +
          "OPTIONAL MATCH (v)-[:VISITED_IN]->(dd) " +
          "WITH l, u, d, COLLECT([b.browser, o.os, c.country, r.referrer, dd.date]) as stats " +
          "LIMIT 100 RETURN l, u.email as email, d.name as domain, stats"
      )
      .subscribe({
        onNext(record) {
          queue.add(async () => {
            const link = record.get("l").properties;
            const email = record.get("email");
            const address = record.get("domain");
            const stats: VisitRow[] = record.get("stats");

            // 4. Merge and normalize stats based on hour
            const visits = groupVisitsByHour(stats);

            // 5. [Postgres] Find matching user and or domain
            const [user, domain] = await Promise.all([
              email &&
                postgres<User>("users")
                  .where({ email })
                  .first(),
              address &&
                postgres<Domain>("domains")
                  .where({ address })
                  .first()
            ]);

            // 6. [Postgres] Create link (or update it when the address already exists)
            const data = {
              address: link.id,
              banned: !!link.banned,
              domain_id: domain ? domain.id : null,
              password: link.password,
              target: link.target,
              user_id: user ? user.id : null,
              ...(link.count && { visit_count: link.count.toNumber() }),
              ...(link.createdAt && { created_at: link.createdAt })
            };
            const exists = await postgres<Link>("links")
              .where({ address: link.id })
              .first();
            let linkId: number;
            if (exists) {
              const res = await postgres<Link>("links")
                .where("id", exists.id)
                .update(data, "id");
              linkId = res[0];
            } else {
              const res = await postgres<Link>("links").insert(data, "id");
              linkId = res[0];
            }

            // 7. [Postgres] Create visits, one row per hour bucket.
            // Object.entries returns a plain array, so a regular for...of is enough.
            for (const [date, details] of Object.entries(visits)) {
              const data = {
                link_id: linkId,
                created_at: date,
                countries: details.countries,
                referrers: details.referrers,
                total: details.total,
                // Only these browser/OS labels have a column; a label that never
                // occurred in this hour is left undefined.
                br_chrome: details.browsers.chrome,
                br_edge: details.browsers.edge,
                br_firefox: details.browsers.firefox,
                br_ie: details.browsers.ie,
                br_opera: details.browsers.opera,
                br_other: details.browsers.other,
                br_safari: details.browsers.safari,
                os_android: details.systems.android,
                os_ios: details.systems.ios,
                os_linux: details.systems.linux,
                os_macos: details.systems.macos,
                os_other: details.systems.other,
                os_windows: details.systems.windows
              };
              const visitExists = await postgres<Visit>("visits")
                .where({ link_id: linkId, created_at: data.created_at })
                .first();
              if (visitExists) {
                await postgres<Visit>("visits")
                  .where("id", visitExists.id)
                  .update(data);
              } else {
                await postgres<Visit>("visits").insert(data);
              }
            }
          });
        },
        onCompleted() {
          session.close();
          // Queued rather than logged directly so the message is produced by the
          // queue itself, after the link jobs that were added before it.
          queue.add(() => {
            const endTime = Date.now();
            console.log(
              `✅ Done! It took ${(endTime - startTime) / 1000} seconds.`
            );
          });
        },
        onError(error) {
          console.log(error);
          session.close();
          throw error;
        }
      });
  })();