04_links.ts 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. require("dotenv").config();
  2. import { v1 as NEO4J } from "neo4j-driver";
  3. import knex from "knex";
  4. import PQueue from "p-queue";
  5. import { startOfHour } from "date-fns";
  let count = 0;
  // Shared work queue: at most five tasks run at the same time.
  const queue = new PQueue({ concurrency: 5 });
  // Rough progress indicator: every task start bumps `count`, and about 30%
  // of starts (picked at random) also print the value before the bump.
  queue.on("active", () => {
    const shouldLog = Math.random() > 0.7;
    if (shouldLog) {
      console.log(count++);
    } else {
      count++;
    }
  });
  // 1. Connect to Neo4j database. The URI and basic-auth credentials are read
  //    from environment variables (populated from .env via dotenv).
  const neo4jCredentials = NEO4J.auth.basic(
    process.env.NEO4J_DB_USERNAME,
    process.env.NEO4J_DB_PASSWORD
  );
  const neo4j = NEO4J.driver(process.env.NEO4J_DB_URI, neo4jCredentials);
  // 2. Connect to Postgres database using the DB_* environment variables.
  const postgresConnection = {
    host: process.env.DB_HOST,
    database: process.env.DB_NAME,
    user: process.env.DB_USER,
    password: process.env.DB_PASSWORD
  };
  const postgres = knex({ client: "postgres", connection: postgresConnection });
  /**
   * Migrates every (:URL) node from Neo4j into the Postgres `links` table,
   * together with its hourly aggregated visits in the `visits` table.
   *
   * Links are read in pages of `limit` nodes. Each link becomes one task on
   * the shared `queue`. Existing rows (matched on `links.address` and on
   * `visits.link_id` + `visits.created_at`) are updated, so re-running the
   * script is safe. Once all work has finished, the Neo4j driver and the knex
   * pool are closed so the process can exit.
   */
  (async function () {
    const startTime = Date.now();
    // Number of URL nodes fetched from Neo4j per page.
    const limit = 20000;

    // One page of links plus their owner, custom domain and raw visit stats.
    // NOTE(review): SKIP/LIMIT without ORDER BY assumes Neo4j returns URL nodes
    // in the same order for every page query; confirm the graph is not being
    // written to while the migration runs.
    const pageQuery =
      "MATCH (l:URL) WITH l SKIP $skip LIMIT $limit " +
      "OPTIONAL MATCH (l)-[:USES]->(d) " +
      "OPTIONAL MATCH (l)<-[:CREATED]-(u) " +
      "OPTIONAL MATCH (v)-[:VISITED]->(l) " +
      "OPTIONAL MATCH (v)-[:BROWSED_BY]->(b) " +
      "OPTIONAL MATCH (v)-[:OS]->(o) " +
      "OPTIONAL MATCH (v)-[:LOCATED_IN]->(c) " +
      "OPTIONAL MATCH (v)-[:REFERRED_BY]->(r) " +
      "OPTIONAL MATCH (v)-[:VISITED_IN]->(dd) " +
      "WITH l, u, d, COLLECT([b.browser, o.os, c.country, r.referrer, dd.date]) as stats " +
      "RETURN l, u.email as email, d.name as domain, stats";

    /**
     * One `[browser, os, country, referrer, date]` entry of the `stats` column.
     * Any field may be null because every part comes from an OPTIONAL MATCH.
     */
    type StatTuple = [
      string | null,
      string | null,
      string | null,
      string | null,
      Parameters<typeof startOfHour>[0] | null
    ];

    /** Visit counters accumulated for a single hour of a single link. */
    interface HourBucket {
      total: number;
      // Keyed "br_<browser>" and "os_<os>", matching the visits table columns.
      counters: Record<string, number>;
      countries: Record<string, number>;
      referrers: Record<string, number>;
    }

    // Null-prototype maps, so keys taken from visit data such as "constructor"
    // or "__proto__" are counted like any other key instead of reading or
    // overwriting Object.prototype members.
    const newCounts = (): Record<string, number> => Object.create(null);

    const increment = (counts: Record<string, number>, key: string): void => {
      counts[key] = (counts[key] || 0) + 1;
    };

    /**
     * 4. Merge and normalize stats based on hour.
     *
     * Entries with any empty field are skipped. Buckets are keyed by the ISO
     * string of the start of the visit's hour and kept in first-seen order.
     * Browser, OS, country and referrer are lower-cased, and "Mac Os X" is
     * renamed to "macos".
     */
    function aggregateVisits(stats: StatTuple[]): Map<string, HourBucket> {
      const buckets = new Map<string, HourBucket>();
      for (const [b, o, country, referrer, date] of stats) {
        if (!(b && o && country && referrer && date)) continue;
        const dateHour = startOfHour(date).toISOString();
        const browser = b.toLowerCase();
        const os = o === "Mac Os X" ? "macos" : o.toLowerCase();

        let bucket = buckets.get(dateHour);
        if (!bucket) {
          bucket = {
            total: 0,
            counters: newCounts(),
            countries: newCounts(),
            referrers: newCounts()
          };
          buckets.set(dateHour, bucket);
        }
        bucket.total += 1;
        increment(bucket.counters, `br_${browser}`);
        increment(bucket.counters, `os_${os}`);
        increment(bucket.countries, country.toLowerCase());
        increment(bucket.referrers, referrer.toLowerCase());
      }
      return buckets;
    }

    /**
     * Copies one Neo4j link, together with its user, domain and hourly visits,
     * into Postgres. Rows that already exist are updated in place.
     */
    async function migrateLink(record: NEO4J.Record): Promise<void> {
      const link = record.get("l").properties;
      const email = record.get("email");
      const address = record.get("domain");
      const visits = aggregateVisits(record.get("stats"));

      // 5. [Postgres] Find matching user and/or domain
      const [user, domain] = await Promise.all([
        email &&
          postgres<User>("users")
            .where({ email })
            .first(),
        address &&
          postgres<Domain>("domains")
            .where({ address })
            .first()
      ]);

      // 6. [Postgres] Create or update the link
      const linkData = {
        address: link.id,
        banned: !!link.banned,
        domain_id: domain ? domain.id : null,
        password: link.password,
        target: link.target,
        user_id: user ? user.id : null,
        ...(link.count && { visit_count: link.count.toNumber() }),
        ...(link.createdAt && { created_at: link.createdAt })
      };
      const existingLink = await postgres<Link>("links")
        .where({ address: link.id })
        .first();
      let link_id: number;
      if (existingLink) {
        const res = await postgres<Link>("links")
          .where("id", existingLink.id)
          .update(linkData, "id");
        link_id = res[0];
      } else {
        const res = await postgres<Link>("links").insert(linkData, "id");
        link_id = res[0];
      }

      // 7. [Postgres] Create or update one visits row per hour
      for (const [created_at, hour] of visits) {
        const { counters } = hour;
        const visitData = {
          link_id,
          created_at,
          countries: hour.countries,
          referrers: hour.referrers,
          total: hour.total,
          br_chrome: counters.br_chrome,
          br_edge: counters.br_edge,
          br_firefox: counters.br_firefox,
          br_ie: counters.br_ie,
          br_opera: counters.br_opera,
          br_other: counters.br_other,
          br_safari: counters.br_safari,
          os_android: counters.os_android,
          os_ios: counters.os_ios,
          os_linux: counters.os_linux,
          os_macos: counters.os_macos,
          os_other: counters.os_other,
          os_windows: counters.os_windows
        };
        const existingVisit = await postgres<Visit>("visits")
          .where({ link_id, created_at })
          .first();
        if (existingVisit) {
          await postgres<Visit>("visits")
            .where("id", existingVisit.id)
            .update(visitData);
        } else {
          await postgres<Visit>("visits").insert(visitData);
        }
      }
    }

    /** 3. [NEO4J] Count all links so we know how many pages to fetch. */
    async function countLinks(): Promise<number> {
      const session = neo4j.session();
      try {
        const { records } = await session.run(
          "MATCH (l:URL) WITH COUNT(l) as count RETURN count"
        );
        return records[0].get("count").toNumber();
      } finally {
        session.close();
      }
    }

    /**
     * Reads page `index` (exactly `limit` links) from Neo4j and queues one
     * migration task per link. Resolves once the page has been read; the
     * queued link tasks keep running afterwards. Every page uses its own
     * session, which is always closed.
     */
    async function migratePage(index: number): Promise<void> {
      const session = neo4j.session();
      try {
        const { records } = await session.run(pageQuery, {
          // Sent as Neo4j Integers: plain JS numbers are transmitted as floats.
          skip: NEO4J.int(index * limit),
          limit: NEO4J.int(limit)
        });
        for (const record of records) {
          queue.add(() => migrateLink(record)).catch(error => {
            // Log and keep going so one bad link does not abort the migration.
            console.log(error);
          });
        }
      } finally {
        session.close();
      }
    }

    try {
      const total = await countLinks();
      for (let index = 0; index * limit < total; index++) {
        try {
          // Routed through the shared queue, so a page only starts after the
          // previous page's link tasks (queued before it) have started.
          await queue.add(() => migratePage(index));
        } catch (error) {
          // A failed page is logged and skipped; later pages still run.
          console.log(error);
        }
      }
      // Wait for every queued link task, not just the last page fetch.
      await queue.onIdle();
      const endTime = Date.now();
      console.log(`✅ Done! It took ${(endTime - startTime) / 1000} seconds.`);
    } finally {
      // Release Neo4j sockets and the Postgres pool so the process can exit.
      neo4j.close();
      await postgres.destroy();
    }
  })().catch(error => {
    console.log(error);
    process.exitCode = 1;
  });