aggregate.js 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. import { Db } from './index';
  2. import { EJSON } from 'bson';
  3. import { QuerySerializer } from './serializer/query';
  4. import { stringifyByEJSON } from './utils/utils';
  5. import { getType } from './utils/type';
  6. export default class Aggregation {
  7. constructor(db, collectionName) {
  8. this._stages = [];
  9. if (db && collectionName) {
  10. this._db = db;
  11. this._request = new Db.reqClass(this._db.config);
  12. this._collectionName = collectionName;
  13. }
  14. }
  15. async end() {
  16. if (!this._collectionName || !this._db) {
  17. throw new Error('Aggregation pipeline cannot send request');
  18. }
  19. const result = await this._request.send('database.aggregateDocuments', {
  20. collectionName: this._collectionName,
  21. stages: this._stages
  22. });
  23. if (result && result.data && result.data.list) {
  24. return {
  25. requestId: result.requestId,
  26. data: result.data.list.map(EJSON.parse)
  27. };
  28. }
  29. return result;
  30. }
  31. unwrap() {
  32. return this._stages;
  33. }
  34. done() {
  35. return this._stages.map(({ stageKey, stageValue }) => {
  36. return {
  37. [stageKey]: JSON.parse(stageValue)
  38. };
  39. });
  40. }
  41. _pipe(stage, param) {
  42. let transformParam = '';
  43. if (getType(param) === 'object') {
  44. transformParam = stringifyByEJSON(param);
  45. }
  46. else {
  47. transformParam = JSON.stringify(param);
  48. }
  49. this._stages.push({
  50. stageKey: `$${stage}`,
  51. stageValue: transformParam
  52. });
  53. return this;
  54. }
  55. addFields(param) {
  56. return this._pipe('addFields', param);
  57. }
  58. bucket(param) {
  59. return this._pipe('bucket', param);
  60. }
  61. bucketAuto(param) {
  62. return this._pipe('bucketAuto', param);
  63. }
  64. count(param) {
  65. return this._pipe('count', param);
  66. }
  67. geoNear(param) {
  68. if (param.query) {
  69. param.query = QuerySerializer.encode(param.query);
  70. }
  71. return this._pipe('geoNear', param);
  72. }
  73. group(param) {
  74. return this._pipe('group', param);
  75. }
  76. limit(param) {
  77. return this._pipe('limit', param);
  78. }
  79. match(param) {
  80. return this._pipe('match', QuerySerializer.encode(param));
  81. }
  82. project(param) {
  83. return this._pipe('project', param);
  84. }
  85. lookup(param) {
  86. return this._pipe('lookup', param);
  87. }
  88. replaceRoot(param) {
  89. return this._pipe('replaceRoot', param);
  90. }
  91. sample(param) {
  92. return this._pipe('sample', param);
  93. }
  94. skip(param) {
  95. return this._pipe('skip', param);
  96. }
  97. sort(param) {
  98. return this._pipe('sort', param);
  99. }
  100. sortByCount(param) {
  101. return this._pipe('sortByCount', param);
  102. }
  103. unwind(param) {
  104. return this._pipe('unwind', param);
  105. }
  106. }