
Code Block
languagepy
>>> from pymilvus_orm import connections, Collection, FieldSchema, CollectionSchema, DataType
>>> import random
>>> connections.connect()  # assumes a Milvus server reachable at the default address
>>> schema = CollectionSchema([
...     FieldSchema("film_name", DataType.String, is_primary=True),
...     FieldSchema("films", dtype=DataType.FLOAT_VECTOR, dim=2)
... ])
>>> collection = Collection("film_collection", schema)
>>> # insert 10 entities named film_0 .. film_9, each with a random 2-d vector
>>> data = [
...     ["film_%d" % i for i in range(10)],
...     [[random.random() for _ in range(2)] for _ in range(10)],
... ]
>>> collection.insert(data)
>>> # search: top-2 nearest vectors, excluding film_1 through a String expression
>>> res = collection.search(data=[[1.0, 1.0]],
...     anns_field="films",
...     param={"metric_type": "L2"},
...     limit=2,
...     expr="film_name != 'film_1'")

Design Details

Before introducing the design, let's briefly review the data flow of the Milvus 2.0 system.

DataFlow And DataModel

In Milvus 2.0, MessageStorage is the backbone of the entire system. Milvus 2.0 implements a unified Lambda architecture that integrates the processing of incremental and historical data. It also introduces log backfill, which stores log snapshots and indexes in object storage to improve failure-recovery efficiency and query performance.


Incremental data flows into MessageStorage through the access layer (Proxy), and nodes such as QueryNode and DataNode consume data from MessageStorage. DataNode persists incremental data to ObjectStorage in units of Segments, forming historical data. The ObjectStorage layer mainly stores historical data, including Log Snapshots, DeltaFiles, and IndexFiles. QueryNode can also load historical data from ObjectStorage. IndexNode reads historical data from ObjectStorage, builds an index, and writes the index file back to ObjectStorage.

The data model of Milvus 2.0 mainly consists of Collections, Partitions, and Segments. A Collection can be logically divided into multiple Partitions; for example, we can partition by date. Physically, a Collection is composed of multiple Segments. A Segment contains multiple Entities, and an Entity is equivalent to a row in a traditional database. An Entity contains multiple Fields, and a Field is equivalent to a column in a traditional database. These Fields include those specified by the Schema when the user creates the Collection, plus some hidden Fields added by the system, such as timestamps.

For OLAP purposes, it is best to store information in a columnar format. Columnar storage lets you ignore all the data that doesn’t apply to a particular query because you can retrieve the information from just the columns you want. An added advantage is that, since each Field holds the same type of data, Field data can use a compression scheme selected specifically for the field data type.
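As a rough illustration of the point above, the following sketch lays out the same Entities row-wise and column-wise and evaluates a filter that reads only one column. The field names (`film_name`, `year`, `films`) and the helper `scan_field` are hypothetical and are not part of Milvus.

```python
# Hypothetical three-entity segment; field names are illustrative only.
rows = [
    {"film_name": "film_0", "year": 1994, "films": [0.1, 0.2]},
    {"film_name": "film_1", "year": 2001, "films": [0.3, 0.4]},
    {"film_name": "film_2", "year": 2010, "films": [0.5, 0.6]},
]

# Columnar layout: one homogeneous list per Field.
columns = {name: [row[name] for row in rows] for name in rows[0]}


def scan_field(columns, field, predicate):
    """Return positions of entities whose `field` value satisfies `predicate`.

    Only the requested column is read; the other Fields are never touched.
    """
    return [i for i, value in enumerate(columns[field]) if predicate(value)]


matches = scan_field(columns, "film_name", lambda v: v != "film_1")
```

Because each list in `columns` holds a single data type, a storage layer is free to pick a type-specific encoding per Field, which is the compression advantage mentioned above.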

In Milvus, the basic unit of reading and writing is a Field of a Segment. The basic Storage module encapsulates the reading, writing, encoding, and decoding of Fields in ObjectStorage. Therefore, the Storage module needs to support the String type Field.


DataNode's processing of String Field

DataNode's processing of String Field is the same as that of other Fields.

As mentioned earlier, segment data is stored by Field. A Field is stored as multiple batches of small files. These files, and the data inside them, are arranged in insertion order.

For example, suppose Collection C1 has a field named field1. A Segment of C1 stores field1 as m files, and the order of the file names matters.


Code Block
languagetext
collection_id/partition_id/segment_id/field1/log_1
collection_id/partition_id/segment_id/field1/log_2
...
collection_id/partition_id/segment_id/field1/log_m


The Storage module therefore needs to support String data. In Milvus, we choose the Parquet format for storing String data.
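Since the order of the log file names matters, a reader has to replay them by their numeric suffix rather than by plain string order. The sketch below shows one way to do that, assuming the suffix in `log_<n>` is the insertion index as in the listing above; the helper `ordered_logs` is hypothetical.

```python
import re


def ordered_logs(paths):
    """Sort log paths by the numeric suffix of their `log_<n>` file name."""

    def log_index(path):
        match = re.search(r"log_(\d+)$", path)
        if match is None:
            raise ValueError(f"unexpected log file name: {path}")
        return int(match.group(1))

    return sorted(paths, key=log_index)


prefix = "collection_id/partition_id/segment_id/field1"
paths = [f"{prefix}/log_{n}" for n in (10, 2, 1)]
in_order = ordered_logs(paths)
```

A plain lexicographic sort would place `log_10` before `log_2`, which would break the insertion order once a Field has ten or more files.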

QueryNode's processing of String Field

Query and Search are two different operations, although their parameters are very similar. For the convenience of description, we use Search to represent these two operations in the following text, unless otherwise specified.

For QueryNode, a Search requires both Historical and Streaming data. Historical data can be considered immutable, whereas Streaming data keeps growing as it is continuously consumed from MessageStorage. Therefore, to optimize Search, the two data sources need different designs.

First, let's look at how QueryNode generally processes Search requests.

QueryNode first determines the set of segments involved with the Search request for a Collection. The search operation is performed on each segment to obtain the sub-results, and then all the sub-results are merged into the final result. Therefore, Segment is the basic processing unit of Search operations. We need to focus on the search operation in the Segment.
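The merge step can be sketched as follows. This is a minimal illustration, assuming each segment returns `(distance, segment_id, offset)` tuples and that a smaller distance is better (as with the L2 metric); the function name and tuple layout are hypothetical and do not reflect the actual QueryNode code.

```python
import heapq


def merge_sub_results(sub_results, limit):
    """Merge per-segment hits into the global top `limit` by ascending distance.

    Each sub-result is a list of (distance, segment_id, offset) tuples.
    """
    all_hits = (hit for hits in sub_results for hit in hits)
    return heapq.nsmallest(limit, all_hits)


sub_results = [
    [(0.10, "seg_a", 3), (0.40, "seg_a", 7)],
    [(0.05, "seg_b", 1), (0.90, "seg_b", 4)],
]
top2 = merge_sub_results(sub_results, limit=2)
```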



For a hybrid search on a segment, a bitmask needs to be generated in advance. The size of the bitmask equals the number of Entities in the segment, and it corresponds to the Entities by position: bitmask[i] corresponds to the i-th Entity in the Segment and marks whether that Entity should be filtered out. When bitmask[i] is 0, the i-th Entity in the segment should be filtered out.

The bitmask roughly needs to go through the following 3 processing steps. First, a bitmask is generated through the query expression, then the bitmask is modified through the delete log in the DeltaFile, and finally, the bitmask is modified again according to the time-travel parameter and the timestamp field.
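The three steps can be sketched as below. This is only an illustration under simplifying assumptions: the delete log is given directly as segment offsets, the expression is passed as a Python callable, and the time-travel rule is taken to hide Entities whose timestamp is later than the requested one. All names are hypothetical.

```python
def build_bitmask(names, timestamps, predicate, deleted_offsets, travel_ts):
    """Return a list of 0/1 flags; 0 means the Entity is filtered out."""
    # Step 1: evaluate the query expression on the String field.
    bitmask = [1 if predicate(name) else 0 for name in names]
    # Step 2: clear the Entities recorded in the delete log.
    for offset in deleted_offsets:
        bitmask[offset] = 0
    # Step 3: clear the Entities inserted after the time-travel timestamp.
    for i, ts in enumerate(timestamps):
        if ts > travel_ts:
            bitmask[i] = 0
    return bitmask


names = ["film_0", "film_1", "film_2", "film_3"]
timestamps = [100, 101, 102, 200]
bitmask = build_bitmask(
    names,
    timestamps,
    predicate=lambda name: name != "film_1",
    deleted_offsets=[2],
    travel_ts=150,
)
```

In this example only the Entity at offset 0 survives: offset 1 fails the expression, offset 2 was deleted, and offset 3 is newer than the time-travel timestamp.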

The final bitmask, together with the vector field and the target vector, participates in the approximate search, which returns an array containing the positions of the Entities that meet the search conditions. For convenience, we call this array SegmentOffsets.

When the limit parameter of the Search is K, only the top K results are needed, so the length of SegmentOffsets is K. When doing a hybrid search, you can also specify output_field to retrieve the data of the corresponding Entity's Field.
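To make the role of the bitmask and of K concrete, the sketch below computes SegmentOffsets with an exact brute-force L2 scan. It stands in for the approximate vector search, which it does not implement, and the function name `search_segment` is hypothetical.

```python
import heapq


def search_segment(vectors, bitmask, target, k):
    """Return SegmentOffsets: positions of the k nearest unmasked Entities."""

    def l2(a, b):
        # Squared L2 distance; the ranking matches that of true L2 distance.
        return sum((x - y) ** 2 for x, y in zip(a, b))

    candidates = (
        (l2(vec, target), offset)
        for offset, vec in enumerate(vectors)
        if bitmask[offset] == 1  # skip Entities that were filtered out
    )
    return [offset for _, offset in heapq.nsmallest(k, candidates)]


vectors = [[0.0, 0.0], [1.0, 1.0], [0.9, 0.9], [0.5, 0.5]]
bitmask = [1, 0, 1, 1]
segment_offsets = search_segment(vectors, bitmask, target=[1.0, 1.0], k=2)
```

The Entity at offset 1 is an exact match for the target but is excluded because bitmask[1] is 0, so the two offsets returned come from the remaining Entities.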

As we can see, SegmentOffsets play a key role in segment processing. We need to compute the SegmentOffsets from the approximate search on the vector field combined with the expression filter, and we also need to use those offsets to extract the data of each Field of the corresponding Entities in the Segment.

We abstract these operations into a StringField interface.



Code Block
languagecpp
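// StringField abstracts the operations a Segment needs on a String Field.
type StringField interface {
	// extract returns the strings stored at the given segment offsets.
	extract(segmentOffsets []int32) []string
	serialize() []byte
	deserialize(data []byte)
}

// Filter evaluates the expression against the field and returns the offsets of the matching Entities.
func Filter(expression string, field StringField) (segmentOffsets []int32)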
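
To show how such an interface might be used, here is a minimal in-memory rendering in Python. It is a sketch only: the class `ListStringField`, the helper `filter_offsets`, and the JSON encoding are assumptions made for illustration (the design above stores String data as Parquet), and the filter takes a Python callable instead of parsing an expression string.

```python
import json


class ListStringField:
    """In-memory string column; a hypothetical stand-in for StringField."""

    def __init__(self, values=None):
        self.values = list(values or [])

    def extract(self, segment_offsets):
        """Return the strings stored at the given offsets, in that order."""
        return [self.values[i] for i in segment_offsets]

    def serialize(self):
        """Encode the column as UTF-8 JSON bytes (format chosen for this sketch only)."""
        return json.dumps(self.values).encode("utf-8")

    def deserialize(self, data):
        """Replace the column contents with the values decoded from `data`."""
        self.values = json.loads(data.decode("utf-8"))


def filter_offsets(predicate, field):
    """Return the offsets of the values that satisfy `predicate`."""
    return [i for i, value in enumerate(field.values) if predicate(value)]


field = ListStringField(["film_0", "film_1", "film_2"])
offsets = filter_offsets(lambda name: name != "film_1", field)
restored = ListStringField()
restored.deserialize(field.serialize())
```

Calling `field.extract(offsets)` then returns the names of the Entities that passed the filter, which mirrors how output fields would be fetched from SegmentOffsets.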