// 01_host.ts
  1. import env from "../env";
  2. import { v1 as NEO4J } from "neo4j-driver";
  3. import knex from "knex";
  4. import PQueue from "p-queue";
  5. const queue = new PQueue({ concurrency: 10 });
  6. // 1. Connect to Neo4j database
  7. const neo4j = NEO4J.driver(
  8. env.NEO4J_DB_URI,
  9. NEO4J.auth.basic(env.NEO4J_DB_USERNAME, env.NEO4J_DB_PASSWORD)
  10. );
  11. // 2. Connect to Postgres database
  12. const postgres = knex({
  13. client: "postgres",
  14. connection: {
  15. host: env.DB_HOST,
  16. database: env.DB_NAME,
  17. user: env.DB_USER,
  18. password: env.DB_PASSWORD
  19. }
  20. });
  21. (async function() {
  22. const startTime = Date.now();
  23. // 3. [NEO4J] Get all hosts
  24. const session = neo4j.session();
  25. session.run("MATCH (h:HOST) RETURN h").subscribe({
  26. onNext(record) {
  27. queue.add(async () => {
  28. // 4. [Postgres] Upsert Hosts
  29. const host = record.get("h").properties;
  30. const address = host.name;
  31. const banned = !!host.banned;
  32. const exists = await postgres<Host>("hosts")
  33. .where({
  34. address
  35. })
  36. .first();
  37. if (exists) {
  38. await postgres<Host>("hosts")
  39. .where("id", exists.id)
  40. .update({ banned });
  41. } else {
  42. await postgres<Host>("hosts").insert({
  43. address,
  44. banned
  45. });
  46. }
  47. });
  48. },
  49. onCompleted() {
  50. session.close();
  51. queue.add(() => {
  52. const endTime = Date.now();
  53. console.log(
  54. `✅ Done! It took ${(endTime - startTime) / 1000} seconds.`
  55. );
  56. });
  57. },
  58. onError(error) {
  59. session.close();
  60. throw error;
  61. }
  62. });
  63. })();