// 03_domains.ts — copies DOMAIN nodes from Neo4j into the Postgres "domains" table.

  1. import env from "../env";
  2. import { v1 as NEO4J } from "neo4j-driver";
  3. import PQueue from "p-queue";
  4. import knex from "knex";
  // Work queue limited to a single concurrent task, so queued jobs never overlap.
  const queue = new PQueue({ concurrency: 1 });

  // 1. Open the Neo4j driver, authenticating with the username/password from env
  const neo4jCredentials = NEO4J.auth.basic(
    env.NEO4J_DB_USERNAME,
    env.NEO4J_DB_PASSWORD
  );
  const neo4j = NEO4J.driver(env.NEO4J_DB_URI, neo4jCredentials);

  // 2. Build the knex instance for Postgres from the DB_* settings in env
  const postgresConnection = {
    host: env.DB_HOST,
    database: env.DB_NAME,
    user: env.DB_USER,
    password: env.DB_PASSWORD
  };
  const postgres = knex({
    client: "postgres",
    connection: postgresConnection
  });
  /**
   * Subset of a Neo4j `DOMAIN` node's properties read by this migration.
   * NOTE(review): shape inferred from the fields accessed below — confirm
   * against the actual graph data.
   */
  interface Neo4jDomainProperties {
    name: string;
    banned?: boolean;
    homepage?: string;
  }

  /**
   * Copies a single Neo4j domain into the Postgres `domains` table.
   *
   * When an owner email is given, the matching row in `users` supplies the
   * `user_id`. The row whose `address` equals the domain name is updated if it
   * exists, otherwise a new row is inserted. `homepage` and `user_id` are only
   * written when they are truthy, so an update never blanks an existing value.
   *
   * @param domain Properties of the Neo4j `DOMAIN` node.
   * @param email  Email of the owning user, or a falsy value when the domain
   *               has no owner in the graph.
   */
  async function upsertDomain(
    domain: Neo4jDomainProperties,
    email: string | null
  ): Promise<void> {
    // 4. [Postgres] Get user ID
    const user =
      email &&
      (await postgres<User>("users")
        .where({ email })
        .first());
    const user_id = user ? user.id : null;

    // 5. [Postgres] Upsert domains
    const address = domain.name;
    const homepage = domain.homepage;
    const data = {
      banned: !!domain.banned,
      address,
      ...(homepage && { homepage }),
      ...(user_id && { user_id })
    };

    const exists = await postgres<Domain>("domains")
      .where({ address })
      .first();

    if (exists) {
      await postgres<Domain>("domains")
        .where("id", exists.id)
        .update(data);
    } else {
      await postgres<Domain>("domains").insert(data);
    }
  }

  /**
   * Entry point: streams every `DOMAIN` node (with its owner's email, if any)
   * out of Neo4j and upserts each one into Postgres through the shared queue.
   *
   * A failure on one domain is logged and counted but does not stop the run.
   * Once the stream has ended and the queue has drained, both database handles
   * are closed so the process can exit; the exit code is set to 1 if anything
   * failed.
   */
  (async function() {
    const startTime = Date.now();
    let failedCount = 0;

    // Close both database handles; an open knex pool would otherwise keep the
    // Node process alive after the migration has finished.
    const shutdown = async (): Promise<void> => {
      neo4j.close();
      await postgres.destroy();
    };

    const reportShutdownFailure = (error: unknown): void => {
      process.exitCode = 1;
      console.error("Failed to close database connections:", error);
    };

    // 3. [NEO4J] Get all domain
    const session = neo4j.session();
    session
      .run(
        "MATCH (d:DOMAIN) OPTIONAL MATCH (u)-[:OWNS]->(d) RETURN d as domain, u.email as email"
      )
      .subscribe({
        onNext(record) {
          const domain = record.get("domain").properties;
          const email = record.get("email");

          // Handle the rejection here: an unhandled rejection from a queued
          // task would otherwise surface as a process-level error.
          queue
            .add(() => upsertDomain(domain, email))
            .catch((error: unknown) => {
              failedCount += 1;
              console.error(
                `Failed to migrate domain "${domain.name}":`,
                error
              );
            });
        },
        onCompleted() {
          session.close();

          // Queued last, so it runs only after every upsert has settled.
          queue
            .add(() => {
              const endTime = Date.now();
              if (failedCount > 0) {
                process.exitCode = 1;
                console.error(
                  `${failedCount} domain(s) could not be migrated.`
                );
              }
              console.log(
                `✅ Done! It took ${(endTime - startTime) / 1000} seconds.`
              );
            })
            .then(shutdown)
            .catch(reportShutdownFailure);
        },
        onError(error) {
          console.error(error);
          session.close();

          // Throwing from inside a driver callback cannot be caught by anyone
          // and skips cleanup; signal failure through the exit code instead.
          process.exitCode = 1;

          // Let upserts that were already queued finish before disconnecting.
          queue
            .onIdle()
            .then(shutdown)
            .catch(reportShutdownFailure);
        }
      });
  })();