02_users.ts 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. require("dotenv").config();
  2. import { v1 as NEO4J } from "neo4j-driver";
  3. import knex from "knex";
  4. import PQuque from "p-queue";
  // Work queue for the Postgres upserts. A concurrency of 1 means queued jobs
  // are executed one after another instead of in parallel.
  const queueOptions = { concurrency: 1 };
  const queue = new PQuque(queueOptions);
  // 1. Connect to Neo4j database
  // The URI and the basic-auth credentials are read from the environment.
  const { NEO4J_DB_URI, NEO4J_DB_USERNAME, NEO4J_DB_PASSWORD } = process.env;
  const neo4jAuth = NEO4J.auth.basic(NEO4J_DB_USERNAME, NEO4J_DB_PASSWORD);
  const neo4j = NEO4J.driver(NEO4J_DB_URI, neo4jAuth);
  // 2. Connect to Postgres database
  // Host, database name and credentials are read from the environment.
  const { DB_HOST, DB_NAME, DB_USER, DB_PASSWORD } = process.env;
  const postgresConnection = {
    host: DB_HOST,
    database: DB_NAME,
    user: DB_USER,
    password: DB_PASSWORD
  };
  const postgres = knex({ client: "postgres", connection: postgresConnection });
  /**
   * Copies every Neo4j `USER` node into the Postgres "users" table.
   *
   * Records are streamed from Neo4j and each one is pushed onto `queue` as an
   * upsert job keyed on the user's email: an existing row is updated,
   * otherwise a new row is inserted. When the stream completes, a final job is
   * queued that logs the elapsed time and releases both database connections.
   * `queue` is created with a concurrency of 1 in this file, so that final job
   * runs after the upserts queued before it.
   *
   * On the first failure (stream error or failed upsert) the error is logged,
   * `process.exitCode` is set to 1, jobs still waiting in the queue are
   * skipped and both connections are released.
   */
  (async function() {
    const startTime = Date.now();
    // Flipped on the first failure so jobs still waiting in the queue become no-ops.
    let failed = false;

    // Releases the Postgres pool and the Neo4j driver once the migration is over.
    const disconnect = async () => {
      await Promise.all([postgres.destroy(), neo4j.close()]);
    };

    // Reports the first failure, marks the run as failed and releases the connections.
    const fail = async (error: unknown) => {
      if (failed) return;
      failed = true;
      process.exitCode = 1;
      console.error("❌ Migration failed:", error);
      try {
        await disconnect();
      } catch (closeError) {
        console.error("Could not close the database connections:", closeError);
      }
    };

    // 3. [NEO4J] Get all users
    const session = neo4j.session();
    session
      .run(
        "MATCH (u:USER) OPTIONAL MATCH (u)-[r:RECEIVED]->(c) WITH u, collect(c.date) as cooldowns RETURN u, cooldowns"
      )
      .subscribe({
        onNext(record) {
          queue
            .add(async () => {
              // A previous job or the stream already failed; the connections are gone.
              if (failed) return;

              // 4. [Postgres] Upsert users
              const user = record.get("u").properties;
              const cooldowns = record.get("cooldowns");
              const email = user.email;
              const password = user.password;
              const verified = !!user.verified;
              const banned = !!user.banned;
              const apikey = user.apikey;
              const created_at = user.createdAt;

              // Optional fields are only included when they carry a value.
              const data = {
                email,
                password,
                verified,
                banned,
                ...(apikey && { apikey }),
                ...(created_at && { created_at }),
                ...(cooldowns && cooldowns.length && { cooldowns })
              };

              const exists = await postgres<User>("users")
                .where({
                  email
                })
                .first();

              if (exists) {
                await postgres<User>("users")
                  .where("id", exists.id)
                  .update(data);
              } else {
                await postgres<User>("users").insert(data);
              }
            })
            // A failed upsert must not turn into an unhandled rejection.
            .catch(fail);
        },
        onCompleted() {
          session.close();
          queue
            .add(async () => {
              if (failed) return;
              const endTime = Date.now();
              console.log(
                `✅ Done! It took ${(endTime - startTime) / 1000} seconds.`
              );
              await disconnect();
            })
            .catch(fail);
        },
        onError(error) {
          session.close();
          // Throwing from inside the observer callback cannot be caught by any
          // caller, so the error is routed through the shared failure path.
          void fail(error);
        }
      });
  })();
  79. // 5. TODO: [Postgres] Update bannedBy