// 01_host.ts

  require("dotenv").config();
  import { v1 as NEO4J } from "neo4j-driver";
  import knex from "knex";
  import PQueue from "p-queue";

  // Work queue that runs at most one task at a time.
  const queue = new PQueue({ concurrency: 1 });

  // Neo4j driver, using basic auth with the credentials read from the environment.
  const neo4jCredentials = NEO4J.auth.basic(
    process.env.NEO4J_DB_USERNAME,
    process.env.NEO4J_DB_PASSWORD
  );
  const neo4j = NEO4J.driver(process.env.NEO4J_DB_URI, neo4jCredentials);

  // Knex query builder for the Postgres database described by the DB_* variables.
  const postgresConnection = {
    host: process.env.DB_HOST,
    database: process.env.DB_NAME,
    user: process.env.DB_USER,
    password: process.env.DB_PASSWORD
  };
  const postgres = knex({
    client: "postgres",
    connection: postgresConnection
  });
  /**
   * Copies every `HOST` node returned by Neo4j into the Postgres "hosts" table.
   *
   * Records are streamed from Neo4j and each one is queued as an upsert keyed on
   * the host address. When both the stream and the queue have finished, the
   * elapsed time is logged. A failed upsert is logged and counted without
   * stopping the remaining ones; any failure (stream or upsert) sets a non-zero
   * exit code. In every case the session, the knex instance and the Neo4j
   * driver are closed at the end so the process is able to exit.
   */
  (async function main() {
    const startTime = Date.now();
    const session = neo4j.session();
    let failedUpserts = 0;

    /**
     * Inserts the host, or updates its `banned` flag when a row with the same
     * address already exists.
     *
     * @param host - Properties of a `HOST` node; `name` is stored as the
     *   address and `banned` is coerced to a boolean.
     */
    async function upsertHost(host: { [key: string]: any }): Promise<void> {
      const address = host.name;
      const banned = Boolean(host.banned);

      const existing = await postgres<Host>("hosts")
        .where({ address })
        .first();

      if (existing) {
        await postgres<Host>("hosts")
          .where("id", existing.id)
          .update({ banned });
        return;
      }

      await postgres<Host>("hosts").insert({ address, banned });
    }

    try {
      // Adapt the callback-style result stream to a promise that settles once
      // the stream has ended AND every queued upsert has run.
      await new Promise<void>((resolve, reject) => {
        session.run("MATCH (h:HOST) RETURN h").subscribe({
          onNext(record) {
            queue
              .add(() => upsertHost(record.get("h").properties))
              .catch(error => {
                // Handle the rejection here so it is never left unhandled;
                // the remaining hosts are still processed.
                failedUpserts += 1;
                console.error("❌ Failed to upsert host:", error);
              });
          },
          onCompleted() {
            queue.onIdle().then(resolve, reject);
          },
          onError(error) {
            // Throwing inside this callback could not be caught by anyone;
            // reject instead so the catch/finally below run.
            reject(error);
          }
        });
      });

      if (failedUpserts > 0) {
        throw new Error(`${failedUpserts} host upsert(s) failed`);
      }

      const endTime = Date.now();
      console.log(
        `✅ Done! It took ${(endTime - startTime) / 1000} seconds.`
      );
    } catch (error) {
      console.error("❌ Host migration failed:", error);
      process.exitCode = 1;
    } finally {
      // After a stream error there may still be queued upserts: drop the ones
      // that have not started and wait for any in-flight one before the
      // database connections are torn down.
      queue.clear();
      await queue.onIdle();
      session.close();
      await postgres.destroy();
      neo4j.close();
    }
  })();