neo4j_delete_duplicated.ts 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. import env from "../env";
  2. import { v1 as NEO4J } from "neo4j-driver";
  3. import PQueue from "p-queue";
  // Number of times the queue has emitted "active" so far; printed as a crude
  // progress indicator while the script runs.
  let count = 0;

  // Work queue limited to a single task at a time (concurrency: 1).
  const queue = new PQueue({ concurrency: 1 });
  queue.on("active", () => {
    console.log(count);
    count += 1;
  });

  // Connect to the Neo4j database using the URI and basic-auth credentials
  // provided by the env module.
  const credentials = NEO4J.auth.basic(
    env.NEO4J_DB_USERNAME,
    env.NEO4J_DB_PASSWORD
  );
  const neo4j = NEO4J.driver(env.NEO4J_DB_URI, credentials);
  /**
   * Script entry point: deletes duplicated relationships going from VISIT nodes
   * to their related nodes, keeping the first relationship of every
   * (visit, target) pair and deleting the rest.
   *
   * The VISIT nodes are processed in pages of `limit` nodes. Paging is done over
   * the VISIT nodes themselves (ordered by internal id), not over the
   * relationships being deleted: the nodes are never removed by this script, so
   * the SKIP offsets stay valid while relationships disappear, and the page
   * count derived from the VISIT total matches what is actually paged.
   *
   * Queries are pushed through the shared `queue` one at a time so its "active"
   * listener keeps reporting progress. Any failure is logged and reflected in
   * the process exit code; the session and driver are always closed so the
   * process can terminate.
   */
  (async function() {
    const startTime = Date.now();

    // [relationship type, target node label] pairs to de-duplicate.
    const nodes = [
      ["VISITED_IN", "DATE"]
      // ['BROWSED_BY', 'BROWSER'],
      // ['OS', 'OS'],
      // ['LOCATED_IN', 'COUNTRY'],
      // ['REFERRED_BY', 'REFERRER'],
    ];

    // Number of VISIT nodes handled per query.
    const limit = 100000;

    const session = neo4j.session();
    try {
      // Count all VISIT nodes to know how many pages have to be processed.
      const { records } = await session.run(
        "MATCH (v:VISIT) WITH COUNT(v) as count RETURN count;"
      );
      const total = records[0].get("count").toNumber();
      const pageCount = Math.ceil(total / limit);

      // Sequential on purpose: a session runs one query at a time, and each
      // queued task is awaited so a failing query stops the run immediately.
      for (let index = 0; index < pageCount; index += 1) {
        for (const [r, n] of nodes) {
          await queue.add(() =>
            session.run(`
              MATCH (a:VISIT)
              WITH a ORDER BY ID(a) SKIP ${index * limit} LIMIT ${limit}
              MATCH (a)-[r:${r}]->(b:${n})
              WITH a, b, COLLECT(r) AS rr
              WHERE SIZE(rr) > 1
              FOREACH (dup IN TAIL(rr) | DELETE dup);
            `)
          );
        }
      }

      const endTime = Date.now();
      console.log(
        `✅ Done! It took ${(endTime - startTime) / 1000} seconds.`
      );
    } catch (error) {
      console.error("Failed to delete duplicated relationships:", error);
      process.exitCode = 1;
    } finally {
      // Release the connections; otherwise the open driver keeps the process
      // alive after the work is finished.
      await session.close();
      await neo4j.close();
    }
  })();