基于Morphia实现MongoDB按小时、按天聚合操作方法

 更新时间:2020年7月11日 17:02  点击:1640

MongoDB按照天数或小时聚合

需求

最近接到需求,需要对用户账户下的设备状态,分别按照天以及小时进行聚合,以此为基础绘制设备状态趋势图.
实现思路是启动定时任务,对各用户的设备状态数据分别按照小时以及天进行聚合,并存储进数据库中供用户后续查询.
涉及到的技术栈分别为:Spring Boot,MongoDB,Morphia.

数据模型

@Data
@Builder
@Entity(value = "rawDevStatus", noClassnameStored = true)
// 设备状态索引
@Indexes({
    // 设置数据超时时间(TTL,MongoDB根据TTL在后台进行数据删除操作)
    @Index(fields = @Field("time"), options = @IndexOptions(expireAfterSeconds = 3600 * 24 * 72)),
    @Index(fields = {@Field("userId"), @Field(value = "time", type = IndexType.DESC)})
})
public class RawDevStatus {
  @Id
  @JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
  private ObjectId objectId;
  private String userId;
  private Instant time;
  @Embedded("points")
  List<Point> protocolPoints;
  @Data
  @AllArgsConstructor
  public static class Point {
    /**
     * 协议类型
     */
    private Protocol protocol;
    /**
     * 设备总数
     */
    private Integer total;
    /**
     * 设备在线数目
     */
    private Integer onlineNum;
    /**
     * 处于启用状态设备数目
     */
    private Integer enableNum;
  }
}

上述代码是设备状态实体类,其中设备状态数据是按照设备所属协议进行区分的.

@Data
@Builder
@Entity(value = "aggregationDevStatus", noClassnameStored = true)
@Indexes({
    @Index(fields = @Field("expireAt"), options = @IndexOptions(expireAfterSeconds = 0)),
    @Index(fields = {@Field("userId"), @Field(value = "time", type = IndexType.DESC)})
})
public class AggregationDevStatus {
  @Id
  @JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
  private ObjectId objectId;
  /**
   * 用户ID
   */
  private String userId;
  /**
   * 设备总数
   */
  private Double total;
  /**
   * 设备在线数目
   */
  private Double onlineNum;
  /**
   * 处于启用状态设备数目
   */
  private Double enableNum;
  /**
   * 聚合类型(按照小时还是按照天聚合)
   */
  @Property("aggDuration")
  private AggregationDuration aggregationDuration;
  private Instant time;
  /**
   * 动态设置文档过期时间
   */
  private Instant expireAt;
}

上述代码是期待的聚合结果,其中构建两个索引:(1)超时索引;(2)复合索引,程序会根据用户名以及时间查询设备状态聚合结果.

聚合操作符介绍

聚合操作类似于管道,管道中的每一步操作产生的中间结果作为下一步的输入源,最终输出聚合结果.

此次聚合主要涉及以下操作:

•$project:指定输出文档中的字段.
•$unwind:拆分数据中的数组;
•match:选择要处理的文档数据;
•group:根据key分组聚合结果.

原始聚合语句

db.getCollection('raw_dev_status').aggregate([
  {$match:
    {
      time:{$gte: ISODate("2019-06-27T00:00:00Z")},
    }
  },
  {$unwind: "$points"},
  {$project:
    {
      userId:1,points:1,
      tmp: {$dateToString: { format: "%Y:%m:%dT%H:00:00Z", date: "$time" } }
    }
  },
  {$project:
    {
      userId:1,points:1,
      groupTime: {$dateFromString: { dateString: "$tmp", format: "%Y:%m:%dT%H:%M:%SZ", } }
    }
  },
  {$group:
    {
      _id:{user_id:'$userId', cal_time:'$groupTime'},
      devTotal:{'$avg':'$points.total'},
      onlineTotal:{'$avg':'$points.onlineNum'},
      enableTotal:{'$avg':'$points.enableNum'}
    }
  },
])

上述代码是按小时聚合数据,以下来逐步介绍处理思路:

(1) $match

根据小时聚合数据,因为只需要获取近24小时的聚合结果,所以对数据进行初步筛选.

(2) $unwind

raw_dev_status中的设备状态是按照协议区分的数组,因此需要对其进行展开,以便下一步进行筛选;

(3) $project

  {$project:
    {
      userId:1,points:1,
      tmp: {$dateToString: { format: "%Y:%m:%dT%H:00:00Z", date: "$time" } }
    }
  }

选择需要输出的数据,分别为:userId,points以及tmp.

需要注意,为了按照时间聚合,对$time属性进行操作,提取%Y:%m:%dT%H时信息至$tmp作为下一步的聚合依据.

如果需要按天聚合,则format数据可修改为:%Y:%m:%dT00:00:00Z即可满足要求.

(4) $project

  {$project:
    {
      userId:1,points:1,
      groupTime: {$dateFromString: { dateString: "$tmp", format: "%Y:%m:%dT%H:%M:%SZ", } }
    }
  }

因为上一步project操作中,tmp为字符串数据,最终的聚合结果需要时间戳(主要懒,不想在程序中进行转换操作).
因此,此处对$tmp进行操作,转换为时间类型数据,即groupTime.

(5) $group

对聚合结果进行分类操作,并生成最终输出结果.

 {$group:
    {
      # 根据_id进行分组操作,依据是`user_id`以及`$groupTime`
      _id:{user_id:'$userId', cal_time:'$groupTime'},
      # 求设备总数平均值
      devTotal:{'$avg':'$points.total'},
      # 求设备在线数平均值
      onlineTotal:{'$avg':'$points.onlineNum'},
      # ...
      enableTotal:{'$avg':'$points.enableNum'}
    }
  }

代码编写

此处ODM选择Morphia,亦可以使用MongoTemplate,原理类似.

 /**
   * 创建聚合条件
   *
   * @param pastTime   过去时间段
   * @param dateToString 格式化字符串(%Y:%m:%dT%H:00:00Z或%Y:%m:%dT00:00:00Z)
   * @return 聚合条件
   */
  private AggregationPipeline createAggregationPipeline(Instant pastTime, String dateToString, String stringToDate) {
    Query<RawDevStatus> query = datastore.createQuery(RawDevStatus.class);
    return datastore.createAggregation(RawDevStatus.class)
        .match(query.field("time").greaterThanOrEq(pastTime))
        .unwind("points", new UnwindOptions().preserveNullAndEmptyArrays(false))
        .match(query.field("points.protocol").equal("ALL"))
        .project(Projection.projection("userId"),
            Projection.projection("points"),
            Projection.projection("convertTime",
                Projection.expression("$dateToString",
                    new BasicDBObject("format", dateToString)
                        .append("date", "$time"))
            )
        )
        .project(Projection.projection("userId"),
            Projection.projection("points"),
            Projection.projection("convertTime",
                Projection.expression("$dateFromString",
                    new BasicDBObject("format", stringToDate)
                        .append("dateString", "$convertTime"))
            )
        )
        .group(
            Group.id(Group.grouping("userId"), Group.grouping("convertTime")),
            Group.grouping("total", Group.average("points.total")),
            Group.grouping("onlineNum", Group.average("points.onlineNum")),
            Group.grouping("enableNum", Group.average("points.enableNum"))
        );
  }
  /**
   * 获取聚合结果
   *
   * @param pipeline 聚合条件
   * @return 聚合结果
   */
  private List<AggregationMidDevStatus> getAggregationResult(AggregationPipeline pipeline) {
    List<AggregationMidDevStatus> statuses = new ArrayList<>();
    Iterator<AggregationMidDevStatus> resultIterator = pipeline.aggregate(
        AggregationMidDevStatus.class, AggregationOptions.builder().allowDiskUse(true).build());
    while (resultIterator.hasNext()) {
      statuses.add(resultIterator.next());
    }
    return statuses;
  }
  //......................................................................................
  // 获取聚合结果(省略若干代码)
  AggregationPipeline pipeline = createAggregationPipeline(pastTime, dateToString, stringToDate);
  List<AggregationMidDevStatus> midStatuses = getAggregationResult(pipeline);
  if (CollectionUtils.isEmpty(midStatuses)) {
    log.warn("Can not get dev status aggregation result.");
    return;
  }

总结

以上所述是小编给大家介绍的基于Morphia实现MongoDB按小时、按天聚合操作方法,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对猪先飞网站的支持!
如果你觉得本文对你有帮助,欢迎转载,烦请注明出处,谢谢!

[!--infotagslink--]

相关文章

  • PHP添加MongoDB扩展实例教程

    由于要使用mikoomi mongodb plugin插件,所以需要php对mongodb的扩展支持,默认通过源安装的php并没有mongodb的扩展支持,具体可以通过php -m|grep mongo 验证 。这里就结...2016-11-25
  • 安装使用Mongoose配合Node.js操作MongoDB的基础教程

    这篇文章主要介绍了安装使用Mongoose来让Node.js操作MongoDB的基础教程,前端js+后端node+js操作MongoDB正是所谓最流行的一种JavaScript全栈开发方案,需要的朋友可以参考下...2016-03-03
  • mongodb与mysql命令详细对比

    传统的关系数据库一般由数据库(database)、表(table)、记录(record)三个层次概念组成,MongoDB是由数据库(database)、集合(collection)、文档对象(document)三个层次组成。MongoDB对于关系型数据库里的表,但是集合中没有列、行和关...2013-09-11
  • 修复 Mac brew 安装 mongodb 报 Error: No available formula with the name ‘mongodb’ 问题详解

    最近在同事新的 Mac 电脑上安装 mongodb,报了错误 Error: No available formula with the name ‘mongodb’,今天就说说这个问题如何解决,需要的朋友可以参考下...2020-07-11
  • Windows10安装MongoDB4.0详细步骤及启动配置教程

    这篇文章主要介绍了Windows10安装MongoDB4.0详细步骤及启动配置教程 ,本文通过图文并茂的形式给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下...2020-07-11
  • MongoDb CPU利用率过高问题如何解决

    这篇文章主要介绍了MongoDb CPU利用率过高问题如何解决,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下...2020-12-08
  • CentOS7.2 安装 MongoDB 3.4的教程

    这篇文章主要介绍了CentOS7.2 安装 MongoDB 3.4的方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下...2020-07-11
  • NestJs使用Mongoose对MongoDB操作的方法

    这篇文章主要介绍了NestJs使用Mongoose对MongoDB操作的方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2021-02-22
  • MongoDB CRUD操作中的插入实例教程

    这篇文章主要给大家介绍了关于MongoDB CRUD操作中的插入的相关资料,文中通过示例代码介绍的非常详细,对大家学习或者使用MongoDB具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧...2020-12-08
  • Navicat Premium连接mongodb详细教程

    这篇文章主要介绍了Navicat Premium连接mongodb详细教程,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下...2021-03-05
  • MongoDB操作符中的$elemMatch问题

    这篇文章主要介绍了MongoDB操作符中的$elemMatch问题,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下...2020-07-11
  • MongoDB中4种日志的详细介绍

    这篇文章主要给大家介绍了关于MongoDB中4种日志的相关资料,文中通过示例代码介绍的非常详细,对大家学习或者使用MongoDB具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧...2020-07-11
  • Node+Express+MongoDB实现登录注册功能实例

    这篇文章主要介绍了Node+Express+MongoDB实现登录注册功能,需要的朋友可以参考下...2017-04-27
  • Django项目连接MongoDB的三种方法

    本文主要介绍了Django项目连接MongoDB的三种方法,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...2021-09-27
  • MongoDB 中聚合统计计算--$SUM表达式

    这篇文章主要介绍了MongoDB 中聚合统计计算--$SUM表达式的相关知识,本文通过实例代码给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下...2020-07-11
  • MongoDB多条件模糊查询示例代码

    这篇文章主要给大家介绍了关于MongoDB多条件模糊查询的相关资料,文中通过示例代码介绍的非常详细,对大家学习或者使用MongoDB具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧...2020-07-11
  • c# 插入数据效率测试(mongodb)

    这篇文章主要介绍了c# 插入数据效率测试(mongodb),插入的速度要比Mysql和sqlserver都要快需要的朋友可以参考下...2020-06-25
  • 详解Wondows下Node.js使用MongoDB的环境配置

    这篇文章主要介绍了详解Wondows下Node.js使用MongoDB的环境配置,这里使用到了Mongoose驱动来让JavaScript操作MongoDB,需要的朋友可以参考下...2016-03-03
  • dotnet core链接mongodb代码实例

    这篇文章主要介绍了dotnet core链接mongodb代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下...2020-06-25
  • 分布式文档存储数据库之MongoDB备份与恢复的实践详解

    这篇文章主要介绍了分布式文档存储数据库之MongoDB备份与恢复,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下...2020-12-08