// 04_links.ts — migration script: copies links and their hourly visit statistics from Neo4j to Postgres.

  1. require("dotenv").config();
  2. import { v1 as NEO4J } from "neo4j-driver";
  3. import knex from "knex";
  4. import PQueue from "p-queue";
  5. import { startOfHour } from "date-fns";
  6. let count = 0;
  7. const queue = new PQueue({ concurrency: 10 });
  8. queue.on("active", () =>
  9. Math.random() > 0.5 ? console.log(count++) : count++
  10. );
  11. // 1. Connect to Neo4j database
  12. const neo4j = NEO4J.driver(
  13. process.env.NEO4J_DB_URI,
  14. NEO4J.auth.basic(process.env.NEO4J_DB_USERNAME, process.env.NEO4J_DB_PASSWORD)
  15. );
  16. // 2. Connect to Postgres database
  17. const postgres = knex({
  18. client: "postgres",
  19. connection: {
  20. host: process.env.DB_HOST,
  21. database: process.env.DB_NAME,
  22. user: process.env.DB_USER,
  23. password: process.env.DB_PASSWORD
  24. }
  25. });
  26. (async function() {
  27. const startTime = Date.now();
  28. // 3. [NEO4J] Get all links
  29. const session = neo4j.session();
  30. const { records } = await session.run(
  31. "MATCH (l:URL) WITH COUNT(l) as count RETURN count"
  32. );
  33. const total = records[0].get("count").toNumber();
  34. const limit = 20000;
  35. function main(index = 0) {
  36. queue.add(
  37. () =>
  38. new Promise((resolve, reject) => {
  39. session
  40. .run(
  41. "MATCH (l:URL) WITH l LIMIT $limit " +
  42. "OPTIONAL MATCH (l)-[:USES]->(d) " +
  43. "OPTIONAL MATCH (l)<-[:CREATED]-(u) " +
  44. "OPTIONAL MATCH (v)-[:VISITED]->(l) " +
  45. "OPTIONAL MATCH (v)-[:BROWSED_BY]->(b) " +
  46. "OPTIONAL MATCH (v)-[:OS]->(o) " +
  47. "OPTIONAL MATCH (v)-[:LOCATED_IN]->(c) " +
  48. "OPTIONAL MATCH (v)-[:REFERRED_BY]->(r) " +
  49. "OPTIONAL MATCH (v)-[:VISITED_IN]->(dd) " +
  50. "WITH l, u, d, COLLECT([b.browser, o.os, c.country, r.referrer, dd.date]) as stats " +
  51. "RETURN l, u.email as email, d.name as domain, stats",
  52. { limit: (index + 1) * limit }
  53. )
  54. .subscribe({
  55. onNext(record) {
  56. const endTime = Date.now();
  57. console.log(
  58. `✅ Done! It took ${(endTime - startTime) / 1000} seconds.`
  59. );
  60. queue.add(async () => {
  61. const link = record.get("l").properties;
  62. const email = record.get("email");
  63. const address = record.get("domain");
  64. const stats = record.get("stats");
  65. // 4. Merge and normalize stats based on hour
  66. const visits: Record<
  67. string,
  68. Record<string, number | Record<string, number>>
  69. > = {} as any;
  70. stats.forEach(([b, o, country, referrer, date]) => {
  71. if (b && o && country && referrer && date) {
  72. const dateHour = startOfHour(date).toISOString();
  73. const browser = b.toLowerCase();
  74. const os = o === "Mac Os X" ? "macos" : o.toLowerCase();
  75. visits[dateHour] = {
  76. ...visits[dateHour],
  77. total:
  78. (((visits[dateHour] &&
  79. visits[dateHour].total) as number) || 0) + 1,
  80. [`br_${browser}`]:
  81. (((visits[dateHour] &&
  82. visits[dateHour][`br_${browser}`]) as number) ||
  83. 0) + 1,
  84. [`os_${os}`]:
  85. (((visits[dateHour] &&
  86. visits[dateHour][`os_${os}`]) as number) || 0) + 1,
  87. countries: {
  88. ...((visits[dateHour] || {}).countries as {}),
  89. [country.toLowerCase()]:
  90. ((visits[dateHour] &&
  91. visits[dateHour].countries[
  92. country.toLowerCase()
  93. ]) ||
  94. 0) + 1
  95. },
  96. referrers: {
  97. ...((visits[dateHour] || {}).referrers as {}),
  98. [referrer.toLowerCase()]:
  99. ((visits[dateHour] &&
  100. visits[dateHour].countries[
  101. referrer.toLowerCase()
  102. ]) ||
  103. 0) + 1
  104. }
  105. };
  106. }
  107. });
  108. // 5. [Postgres] Find matching user and or domain
  109. const [user, domain] = await Promise.all([
  110. email &&
  111. postgres<User>("users")
  112. .where({ email })
  113. .first(),
  114. address &&
  115. postgres<Domain>("domains")
  116. .where({ address })
  117. .first()
  118. ]);
  119. // 6. [Postgres] Create link
  120. const data = {
  121. address: link.id,
  122. banned: !!link.banned,
  123. domain_id: domain ? domain.id : null,
  124. password: link.password,
  125. target: link.target,
  126. user_id: user ? user.id : null,
  127. ...(link.count && { visit_count: link.count.toNumber() }),
  128. ...(link.createdAt && { created_at: link.createdAt })
  129. };
  130. const exists = await postgres<Link>("links")
  131. .where({ address: link.id })
  132. .first();
  133. let link_id: number;
  134. if (exists) {
  135. const res = await postgres<Link>("links")
  136. .where("id", exists.id)
  137. .update(data, "id");
  138. link_id = res[0];
  139. } else {
  140. const res = await postgres<Link>("links").insert(
  141. data,
  142. "id"
  143. );
  144. link_id = res[0];
  145. }
  146. // 7. [Postgres] Create visits
  147. for await (const [date, details] of Object.entries(visits)) {
  148. const data = {
  149. link_id,
  150. created_at: date,
  151. countries: details.countries as Record<string, number>,
  152. referrers: details.referrers as Record<string, number>,
  153. total: details.total as number,
  154. br_chrome: details.br_chrome as number,
  155. br_edge: details.br_edge as number,
  156. br_firefox: details.br_firefox as number,
  157. br_ie: details.br_ie as number,
  158. br_opera: details.br_opera as number,
  159. br_other: details.br_other as number,
  160. br_safari: details.br_safari as number,
  161. os_android: details.os_android as number,
  162. os_ios: details.os_ios as number,
  163. os_linux: details.os_linux as number,
  164. os_macos: details.os_macos as number,
  165. os_other: details.os_other as number,
  166. os_windows: details.os_windows as number
  167. };
  168. const visitExists = await postgres<Visit>("visits")
  169. .where({ link_id, created_at: data.created_at })
  170. .first();
  171. if (visitExists) {
  172. await postgres<Visit>("visits")
  173. .where("id", visitExists.id)
  174. .update(data);
  175. } else {
  176. await postgres<Visit>("visits").insert(data);
  177. }
  178. }
  179. });
  180. },
  181. onCompleted() {
  182. session.close();
  183. if ((index + 1) * limit < total) {
  184. queue.add(() => main(index + 1));
  185. } else {
  186. queue.add(() => {
  187. const endTime = Date.now();
  188. console.log(
  189. `✅ Done! It took ${(endTime - startTime) /
  190. 1000} seconds.`
  191. );
  192. });
  193. }
  194. resolve();
  195. },
  196. onError(error) {
  197. console.log(error);
  198. session.close();
  199. if ((index + 1) * limit < total) {
  200. queue.add(() => main(index + 1));
  201. }
  202. reject(error);
  203. }
  204. });
  205. })
  206. );
  207. }
  208. main();
  209. })();