数据结构介绍
最完整的时间序列的逻辑数据模型如下:
[timestamp],[d1],[d2]...[dn],[v1],[v2]...[vn]
d1 ~ dn 是维度,比如 ip, idc, country 之类的值
v1 ~ vn 是值列,比如 cpu_usage, free_memeory_bytes 之类的值一些时间序列数据库在实现的时候为了简化实现,提高性能约束了一个更简化的数据模型:
[timestamp],[metric],[value]
这种数据模型对于数据库来说非常友好,可以很好的做一些优化。但是要求开发者去选择什么的维度信息编码到 metric名里,同时对于多值列的数据需要存成多个metric,而不是一个metric多个值。本文探讨的数据模型采用最完整的数据模型。
监控使用的时间序列数据具有以下的特点:
- 时序性:数据一般是按时间升序排列,同时统计之后的数据一般会在同一个时间点有很多个记录对应不同的维度组合
- 大部分维度字段经常重复(low cardinality):比如采集10台机器的性能指标,那么一天的数据里10台机器的ip地址是反复出现的。
- 有可能有部分数据是偶然的(high cardinality):比如一些时候采集外部输入的时候有脏数据统计进来,比如应该是ip地址的字段变成了用户id甚至是一些乱码,那么可能这个维度组合的数据在整天里只出现一次。
- 使用的时候经常批量拉取一天的数据作为图形展示,甚至是今天,昨天或者上周同期的数据。
- 展示的时候需要看总的数据,也需要能够按不同维度查看。需要具有一定的查询时聚合的能力。因为维度可能比较多,所以不能在统计的时候就完成所有的维度聚合的工作。
本测试采用的测试数据有838万行,其大部分维度字段是重复的,但是有少量的脏数据使得维度组合还是比较多的。总的维度组合数(不包括timestamp维度)为285522。排除掉大部分脏数据的维度组合数为5927。数据的周期是60秒,跨度为1428777120 ~ 1428793320。平均每个周期有 30922 行记录。一个周期有这么行记录很重要的原因是一个周期,对于同一个维度组合有多条统计数据(来源于不同的partition)。也就是数据是部分聚合的,并没有聚合到一个周期,一个维度组合,一条记录的程度。选择这样的测试数据是因为这种数据是约束最小的形式。它没有对采集频率,每周期记录数,维度组合密度有任何预先假设。比如 opentsdb 假设同周期内指定维度组合只有唯一的一个值,插入了两个不同值会怎么样?well,它报错……
测试数据的结构为:
[timestamp],[iResult],[vCmid],[vAppid],[totalCount],[dProcesssTime]
其中最后两列为值列,其余的都是维度。
每个doc对应时间序列的一行
所以按照测试数据来说,就会插入8380000个文档到mongodb里。
{ "sharded" : false, "primary" : "shard2_RS", "ns" : "wentao_test.sparse", "count" : 8.38534e+06, "size" : 2012533392.0000000000000000, "avgObjSize" : 240, "storageSize" : 2897301504.0000000000000000, "numExtents" : 21, "nindexes" : 1, "lastExtentSize" : 7.56662e+08, "paddingFactor" : 1.0000000000000000, "systemFlags" : 1, "userFlags" : 1, "totalIndexSize" : 2.72065e+08, "indexSizes" : { "_id_" : 2.72065e+08 }, "ok" : 1.0000000000000000, "$gleStats" : { "lastOpTime" : Timestamp(1429290120, 22), "electionId" : ObjectId("54c9f324adaa0bd054140fda") }}
值得关注的地方是平均的文档大小是240字节,也就是0.24k,非常非常的小。总的存储空间占用有2.9G之多。原因3显然是因为重复存储维度字段的值造成的。
每个doc对应一个维度组合的一天
这种表结构大概如下:
{ "_id" : "1428710400.wxa8fdfb5a9e7f64f6.10000.-502", "iResult" : "-502", "vCmdid" : "10000", "values" : [ {"t":1, "1": x, "2": y}, {"t":1, "1": x, "2": y}, .... {"t":1449, "1": x, "2": y}, ], "date" : 1.42871e+09, "timestamp" : 1.42878e+09, "vAppid" : "appid1"}
其中date是时间戳truncate到了天。然后dProcessTime里的每个key都是对应在一天内的第N个分钟。
这种按天打包的结构非常适合一次查询就要查一天的数据的需求。但是它的压缩效果很大程度上取决于维度组合的cardinality。如果维度里面有一些值cardinality很高,那么压缩之后仍然会有非常多的文档数。这对于需要查询的时候再去聚合的数据就非常不利了。但是对于查询的维度和存储的维度一一对应的情况,那么拉取一天的数据就只要读取一个文档,那么就会非常快了。
存储的效果并不是很好,因为文档数量仍然很多
{ "sharded" : false, "primary" : "shard2_RS", "ns" : "wentao_test.sparse_measurement", "count" : 285653, "size" : 4.69834e+08, "avgObjSize" : 1644, "storageSize" : 7.82356e+08, "numExtents" : 17, "nindexes" : 1, "lastExtentSize" : 2.07794e+08, "paddingFactor" : 1.0000000000000000, "systemFlags" : 1, "userFlags" : 1, "totalIndexSize" : 1.76029e+07, "indexSizes" : { "_id_" : 1.76029e+07 }, "ok" : 1.0000000000000000, "$gleStats" : { "lastOpTime" : Timestamp(1429230049, 5), "electionId" : ObjectId("54c9f324adaa0bd054140fda") }}
总的文档数是285653,平均大小只有1.6k,尺寸有470M之多。
转换代码如下:
pythonfor offset, batch in read_test_data_in_batches(): updates = collections.defaultdict(list) for doc in batch: seconds_in_day = doc['timestamp'] % 86400 date = doc['timestamp'] - seconds_in_day minute_index = seconds_in_day / 60 _id = '%s.%s.%s.%s' % (date, doc['vAppid'], doc['vCmdid'], doc['iResult']) if len(_id) > 256: continue updates[_id].append({'t':doc['timestamp'],'0':doc['dProcessTime'],'1':doc['totalCount']}) bulk = measurement_coll.initialize_unordered_bulk_op() new_ids = [] for doc in batch: seconds_in_day = doc['timestamp'] % 86400 date = doc['timestamp'] - seconds_in_day _id = '%s.%s.%s.%s' % (date, doc['vAppid'], doc['vCmdid'], doc['iResult']) if len(_id) > 256: continue if _id not in known_ids: known_ids.add(_id) new_ids.append(_id) bulk.find({'_id': _id}).upsert().update({'$set': { 'vAppid': doc['vAppid'], 'vCmdid': doc['vCmdid'], 'iResult': doc['iResult'], 'date': date, 'timestamp': doc['timestamp'] }}) for _id, set_fields in updates.iteritems(): bulk.find({'_id': _id}).update({"$push": {'values': {'$each': set_fields}}}) LOGGER.info(offset) try: res = bulk.execute() except pymongo.errors.BulkWriteError as bwe: import pprint pprint.pprint(bwe.details)
在这篇mongodb的官方博客()里提到了一种更精简的存储表结构:
{ timestamp_minute: ISODate(“2013-10-10T23:06:00.000Z”), num_samples: 58, total_samples: 108000000, type: “memory_used”, values: { 0: 999999, … 37: 1000000, 38: 1500000, … 59: 1800000 }}
这种格式对于无维度的时间序列是合适的。但是如果分维度存储,那么必然就牵涉到一个读取的时候按不同维度聚合的问题。而这种格式设计基本上无法在服务器端聚合的。而把数据拉出来在应用层作聚合就牵涉到大量IO,更加不可能快了。问题是,如果数据库只是能够把一天的数据存进去,然后可以原样取出来是不是意义不大呢?要求时间序列数据库作一些聚合是非常合理的要求吧。比如opentsdb就支持tag的功能,实际上就是分维度。
每个doc对应一段时间内的数据
这种表结构大概如下:
{ "_id" : ObjectId("5531b34c9469047155b3423b"), "count" : 41, "max_timestamp" : 1.42879e+09, "vAppid" : "appid1", "min_timestamp" : 1.42879e+09, "sum_totalCount" : 42, "sum_dProcessTime" : 468, "_" : [ { "c" : 4, "d" : 1.42879e+09, "1" : 69, "0" : 4, "_" : [ { "_" : [ { "d" : "16", "v" : [ { "1" : 41, "0" : 1 } ] }, { "d" : "10000", "v" : [ { "1" : 1, "0" : 1 }, { "1" : 1, "0" : 1 } ] }, { "d" : "18", "v" : [ { "1" : 26, "0" : 1 } ] } ], "d" : "0" } ] }, // .. many more rows ]}
这个表结构第一个利用的是时间序列的连续性。所以一段时间的数据打包存放在了一个mongodb的文档里。然后在文档上留下max_timestamp,min_timestamp两个字段用于快速过滤掉无须读取的文档。
第二个利用的特性是某些维度经常用于下钻查询,比如vAppid。如果appid1和appid2的数据放在同一个文档里,那么在查询appid1的时候,appid2的数据也会被读取到,从而拖慢了查询效率。这个clustering_fields的选择可以是空,也就是一段时间内的所有维度的数据都打包到了一起,反正查询的时候也没有特别突出的维度需要优化。
这两个设计基本上是模仿 mysql 的 clustering index,让索引值相同的数据彼此靠近的存放在同一个物理位置。因为mongodb没有clustering index的支持,但是其同一个文档内的数据是肯定物理存放在一起的。所以利用这个特性模仿了类似 clustering index的效果。同时因为按时间段打包了,文档的数据会非常的少,使用b tree索引可以很快的定位到所需的文档(特别是选择好了常用的下钻维度的情况)。如果文档数量多,b tree索引之后仍然会对应大量的文档id,用id去doc heap里查找对应的doc也是非常耗时的。对于 postgresql 之类的数据库,一般可以用按时间partition加上暴力的全partition扫描来避免这种b tree索引反而更慢了的尴尬,但是mongodb并没有partition的支持,除非我们在应用层作一天一个collection的分表操作。
第一个优化解决的是按时间,按维度索引的效率问题。接下来要解决的问题是存储过大的问题。我们前面看到区区800万行就用掉快3个G的磁盘。主要的磁盘是浪费在重复的维度字段的值的存储上了。次要的原因是维度名称本身也要占用存储空间。我们这里采用的是map嵌套的方式。对于某个维度,同样的值的记录会存在一个map的entry下。这样这个维度的这个值就不用反复重复了。为了最大话这种优化的效果,维度字段应该按照cardinality排序,也就是唯一值数量少的放在外层,唯一值数量多的嵌套在最内层。上面的 _.d
对应的 1.42879e+09 就是第一个维度(timestamp)的值。_._.d
对应的0就是第二个维度(iResult)的值。_._._.d
对应的10000对应的就是第三个维度(vCmdid)的值。嵌套的维度字段排序是 timestmap => iResult => vCmdid
。注意到除了d字段,还有c字段代表的是所有内嵌的记录的总count,0代表的是第一个值列的sum,1代表的是第二个值列的sum。注意到维度名并没有被存储到文档里,维度的信息是隐含在嵌套的层次里的。查询的时候需要根据额外存储的元数据知道不同的嵌套层次对应的是什么维度。同样值列的名称也是没有存储的,叶子节点的v就是最后的原始值列。
前面已经看到了,文档内还存储了一些统计信息。比如timestamp下存储了这个timestamp的count和所有值列的sum。这些统计值有助减少查询时候的计算量。同时嵌套存储还有助于在按条件过滤的情况下砍掉不需要递归查询的子文档数量。
分vAppid存放的结果如下:
{ "sharded" : true, "systemFlags" : 1, "userFlags" : 1, "ns" : "wentao_test.sparse_precomputed", "count" : 1278, "numExtents" : 15, "size" : 3.30918e+08, "storageSize" : 4.11038e+08, "totalIndexSize" : 130816, "indexSizes" : { "_id_" : 65408, "vAppid_hashed" : 65408 }, "avgObjSize" : 258934.0594679186178837, "nindexes" : 2, "nchunks" : 6, "ok" : 1.0000000000000000}
文档数只有1278个了,平均的尺寸是258k。总存储占用是411Mb。如果不按vAppid分,压缩效果会更好:
json{ "sharded" : false, "primary" : "shard2_RS", "ns" : "wentao_test.sparse_precomputed_no_appid", "count" : 39, "size" : 2.68435e+08, "avgObjSize" : 6.88294e+06, "storageSize" : 2.75997e+08, "numExtents" : 3, "nindexes" : 1, "lastExtentSize" : 1.58548e+08, "paddingFactor" : 1.0000000000000000, "systemFlags" : 1, "userFlags" : 1, "totalIndexSize" : 8176, "indexSizes" : { "_id_" : 8176 }, "ok" : 1.0000000000000000, "$gleStats" : { "lastOpTime" : Timestamp(1429319735, 3), "electionId" : ObjectId("54c9f324adaa0bd054140fda") }}
文档个数39个,平均文档大小6.9M,总存储占用275M。相比最初的3G磁盘占用,压缩效果非常明显。
如果允许丢弃掉原始的值,对于一个维度组合一个周期只保留一个聚合记录(这个其实是大部分的需求)。那么最后一层维度内就不需要内嵌v这个数组了,尺寸可以进一步降低。当然这种压缩是有损的,所以并不是公平的比较。因为一天一个文档的方式一般都会有同样的限制,所以在这里可以用于和另外一种表结构进行对比。
{ "sharded" : false, "primary" : "shard2_RS", "ns" : "wentao_test.sparse_precomputed_no_appid_no_val", "count" : 5, "size" : 5.45259e+07, "avgObjSize" : 1.09052e+07, "storageSize" : 2.01335e+08, "numExtents" : 2, "nindexes" : 1, "lastExtentSize" : 2.01327e+08, "paddingFactor" : 1.0000000000000000, "systemFlags" : 1, "userFlags" : 1, "totalIndexSize" : 8176, "indexSizes" : { "_id_" : 8176 }, "ok" : 1.0000000000000000, "$gleStats" : { "lastOpTime" : Timestamp(1429198926, 2), "electionId" : ObjectId("54c9f324adaa0bd054140fda") }}
压缩的结果是文档只有5个,平均大小是10M,总磁盘占用是55M左右。
转换的代码如下:
pythonjumbo_docs = {}# doc with same cluster field will be packed in one document so continous on physical layout# timestamp is always the first clustering field, as the nature of time series data isclustering_fields = [] # sorted from low cardinality to high to save sapcedimension_fields = ['timestamp', 'vAppid', 'iResult', 'vCmdid'] # precompute sum/count at those dimension levelsprecomputed_fields = set(['timestamp'])value_fields = ['totalCount', 'dProcessTime']store_raw_values = Truefor offset, batch in read_test_data_in_batches(): print(offset) for record in batch: clustering_key = tuple(record[f] for f in clustering_fields) jumbo_doc = jumbo_docs.get(clustering_key) if not jumbo_doc: jumbo_doc = { 'min_timestamp': record['timestamp'], 'max_timestamp': record['timestamp'], '_fast_lookup': {} } jumbo_doc['_all_levels'] = [jumbo_doc] for f in clustering_fields: jumbo_doc[f] = record[f] jumbo_docs[clustering_key] = jumbo_doc jumbo_doc['min_timestamp'] = min(jumbo_doc['min_timestamp'], record['timestamp']) jumbo_doc['max_timestamp'] = max(jumbo_doc['max_timestamp'], record['timestamp']) jumbo_doc['count'] = jumbo_doc.get('count', 0) + 1 for value_field in value_fields: jumbo_doc['sum_%s' % value_field] = jumbo_doc.get('sum_%s' % value_field, 0) + record[value_field] current_level = jumbo_doc for field in dimension_fields: next_levels = current_level.get('_') if next_levels is None: next_levels = [] current_level['_'] = next_levels dimension = record[field] next_level = current_level['_fast_lookup'].get(dimension) if not next_level: next_level = { 'd': dimension, '_fast_lookup': {} } if field in precomputed_fields: for i in range(len(value_fields)): next_level['%s' % i] = 0 jumbo_doc['_all_levels'].append(next_level) next_levels.append(next_level) current_level['_fast_lookup'][dimension] = next_level if field in precomputed_fields: next_level['c'] = next_level.get('c', 0) + 1 for i, value_field in enumerate(value_fields): next_level['%s' % i] += record[value_field] current_level = next_level if store_raw_values: # current_level is the last dimension now current_level['v'] = current_level.get('v', []) current_level['v'].append({str(i): record[f] for i, f in enumerate(value_fields)}) inserted_clustering_keys = [] for clustering_key, jumbo_doc in jumbo_docs.iteritems(): if len(jumbo_doc['_all_levels']) >= 10000 * 5: for level in jumbo_doc['_all_levels']: del level['_fast_lookup'] del jumbo_doc['_all_levels'] print('insert jumbo doc') sparse_precomputed_coll.insert(jumbo_doc) inserted_clustering_keys.append(clustering_key) for clustering_key in inserted_clustering_keys: del jumbo_docs[clustering_key]for jumbo_doc in jumbo_docs.values(): for level in jumbo_doc['_all_levels']: del level['_fast_lookup'] del jumbo_doc['_all_levels'] print('insert jumbo doc') sparse_precomputed_coll.insert(jumbo_doc)
第一个查询是统计出每个周期内的count
db.sparse.aggregate([ {$group: {_id: '$timestamp', 'count': {$sum: 1}}}])
得出的结果是这个格式的,每个周期一个count值。
[ { "_id" : 1.42879e+09, "count" : 2266.0000000000000000 }, ... { "_id" : 1.42878e+09, "count" : 6935.0000000000000000 }]
结果数据为272行,耗时大概是9.6秒。注意这个绝对值并没有意义,因为不同的硬件配置,不同的缓存设置,不同的sharding都会对这个结果产生影响。我们这里关注的是在同样配置的情况下,不同表结构对于查询时间的相对关系。
打包存储的数据,作同样的查询,需要些更复杂的聚合逻辑:
db.sparse_precomputed_no_appid.aggregate([ {$unwind: '$_'}, // timestamp {$group: {_id: '$_.d', count: {$sum: '$_.c'}}}])
这个查询耗时大概是7.1秒。可以看到打包存储之后数据量变少了,查询并没有变得特别快。上面的查询还使用了预先计算的字段。如果统计原始的数据,查询更复杂
db.sparse_precomputed_no_appid.aggregate([ {$unwind: '$_'}, // timestamp {$unwind: '$_._'}, // timestamp.vAppid {$unwind: '$_._._'}, // timestamp.vAppid.iResult {$unwind: '$_._._._'}, // timestamp.vAppid.iResult.vCmdid {$group: {_id: '$_.d', count: {$sum: {$size: '$_._._._.v'}}}} // size of the values array])
这个查询时间大概是9.1秒。
db.sparse_precomputed_no_appid.aggregate([ {$unwind: '$_'}, // timestamp {$unwind: '$_._'}, // timestamp.vAppid {$unwind: '$_._._'}, // timestamp.vAppid.iResult {$unwind: '$_._._._'}, // timestamp.vAppid.iResult.vCmdid {$group: {_id: '$_.d', count: {$sum: '$_._._._.c'}}} // size of the values array])
少做一个$size的操作要稍微快一些。大概是9秒。
结论是880万数据聚合,用这个格式并没有变得快很多,基本是一个数量级的。db.sparse_measurement.aggregate([ {$unwind: '$values'}, {$group: {_id: '$values.t', count:{$sum:'$values.1'}}}])
这个查询要10.2秒,比原始格式还要慢。说明一天一个doc的存放方式并不适合聚合查询。
第二个查询是分vCmdid统计出分周期的调用量(totalCount字段)
db.sparse.aggregate([ {$match: {vAppid: {$ne: ''}}}, {$group: {_id: { timestamp: '$timestamp', vCmdid: '$vCmdid' }, 'totalCount': {$sum: '$totalCount'}}}])
结果数据为13115行,这个查询需要21.4秒.
db.sparse_precomputed_no_appid.aggregate([ {$unwind: '$_'}, // timestamp {$unwind: '$_._'}, // timestamp.vAppid {$match: {'_._.d': {$ne: ''}}}, // vAppid != '' {$unwind: '$_._._'}, // timestamp.vAppid.iResult {$unwind: '$_._._._'}, // timestamp.vAppid.iResult.vCmdid {$unwind: '$_._._._.v'}, // timestamp.vAppid.iResult.vCmdid.values {$group: {_id: '$_.d', count: {$sum: '$_._._._.v.0'}}} // size of the values array])
这个查询需要18.4秒。稍微比一个数据点一行的原始表结构要快一些。
结论
测试做到这里,基本有一个结论了:
- 最原始的表结构,除了查询语法比较直观以外,全部是缺点。存储占用最大,要3个G。而且聚合查询效率也是最低的。
- 一天一个doc的结构,比较容易实现。存储要470M。但是缺点是几乎无法做服务器端的聚合,什么样的维度存进去就必须什么样的维度取出来。而且当维度组合比较多的时候,仍然会产生很多文档。比较适合的场景是无维度的数据,也就是传统的单一metric的时间序列。
- 一个时间段打包成一个文档的结构,实现比较复杂。存储是最省的,只要275M。而且查询效率还稍微比原始格式要快一些。 相对的效率搞清楚了,那么绝对的效率是否满足要求呢?这个就要看场景了,对于800万行聚合需要花9秒的效率,目测是有优化空间的,也肯定在数据库里谈不上快的。