Current state: Accepted
ISSUE: https://github.com/milvus-io/milvus/issues/15604
PRs:
Keywords: bulk load, import
Released: with Milvus 2.1
Authors:
Summary
Import data by a shortcut to get better performance compared with insert().
Motivation
The current insert() data path works as follows:
- the client calls insert() to transfer data to the Milvus proxy node
- the proxy splits the data into multiple parts according to the shard number of the collection
- the proxy constructs a message packet for each part and sends the message packets to the Pulsar service
- data nodes pull the message packets from the Pulsar service, each data node pulling one packet
- data nodes persist the data into segments when a flush() action is triggered
Typically, it takes several hours to insert one billion entities with 128-dimensional vectors. Much of that time is spent in two areas: network transmission and Pulsar management.
The process involves at least three network transmissions: 1) client => proxy 2) proxy => Pulsar 3) Pulsar => data node
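To give a sense of scale, the following back-of-the-envelope sketch estimates how many bytes of vector data cross the network on the insert() path. It assumes float32 vectors (4 bytes per dimension), which the text does not state, and it ignores scalar fields and protocol overhead.

```python
# Rough estimate of vector bytes moved by the insert() path.
# Assumption (not stated in the source): vectors are float32, 4 bytes per dimension.
NUM_ENTITIES = 1_000_000_000
DIMENSIONS = 128
BYTES_PER_FLOAT32 = 4
NETWORK_HOPS = 3  # client => proxy, proxy => Pulsar, Pulsar => data node


def vector_payload_bytes(num_entities: int, dimensions: int) -> int:
    """Raw size of the vector field alone, without scalar fields or overhead."""
    return num_entities * dimensions * BYTES_PER_FLOAT32


def total_transferred_bytes(num_entities: int, dimensions: int, hops: int) -> int:
    """Bytes crossing the network when the same payload is sent once per hop."""
    return vector_payload_bytes(num_entities, dimensions) * hops


payload = vector_payload_bytes(NUM_ENTITIES, DIMENSIONS)
total = total_transferred_bytes(NUM_ENTITIES, DIMENSIONS, NETWORK_HOPS)
print(f"payload: {payload / 1e9:.0f} GB, over {NETWORK_HOPS} hops: {total / 1e9:.0f} GB")
```

Under these assumptions the vector payload alone is 512 GB, and it is transmitted three times.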
We need a new interface that performs bulk load without wasting network bandwidth and that skips Pulsar management. Brief requirements of the new interface:
- import data from JSON format files. (first stage)
- import data from Numpy format files. (first stage)
- copy a collection within one Milvus 2.0 service. (second stage)
- copy a collection from one Milvus 2.0 service to another. (second stage)
- import data from Milvus 1.x to Milvus 2.0 (third stage)
- parquet/faiss files (TBD)
To reduce network transmission and skip Pulsar management, the new interface will let users pass the paths of data files (JSON, Numpy, etc.) on MinIO/S3 storage, and the data nodes will read these files directly and parse them into segments. The internal logic of the process becomes:
1. the client calls import() to pass some file paths to the Milvus proxy node
2. the proxy node passes the file paths to the data coordinator
3. the data coordinator picks one or more data nodes (according to the file count) to parse the files; each file can be parsed into one or more segments
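Step 3 can be illustrated with a minimal sketch of how a coordinator might spread files over data nodes. The round-robin policy and the function name assign_files_to_nodes are assumptions made for illustration; the text only says that nodes are picked according to the file count.

```python
from collections import defaultdict
from typing import Dict, List


def assign_files_to_nodes(file_paths: List[str], node_ids: List[int]) -> Dict[int, List[str]]:
    """Distribute files over data nodes round-robin (hypothetical policy)."""
    if not node_ids:
        raise ValueError("at least one data node is required")
    assignment: Dict[int, List[str]] = defaultdict(list)
    for index, path in enumerate(file_paths):
        # The i-th file goes to node i modulo the number of nodes.
        node = node_ids[index % len(node_ids)]
        assignment[node].append(path)
    return dict(assignment)


plan = assign_files_to_nodes(["a.json", "b.json", "c.json"], [1, 2])
print(plan)
```

With a single file only one data node receives work, which matches the idea that the number of nodes involved depends on the number of files.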
Some points to consider:
- JSON format is flexible. Ideally, the import API ought to parse users' JSON files without asking users to reformat the files according to a strict rule.
- Users can store scalar fields and vector fields in a JSON file, in either a row-based or a column-based layout. The import() API should support both.
...
- A Numpy file is a binary format, so we treat it only as vector data. Each Numpy file represents one vector field.
- Transferring a large file from the client to the proxy and then to the datanode is time-consuming and occupies too much network bandwidth. Instead, we will ask users to upload data files to MinIO/S3, where the datanode can read and parse them directly.
- Users may store scalar fields and vector fields in files of different formats, for example scalar fields in JSON files and vector fields in Numpy files.
- The parameter of the import API should be easy to extend in the future.
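Since the API is meant to accept both row-based and column-based JSON, it helps to see that the two layouts carry the same information. The sketch below converts between them; the function names are hypothetical and this is not the parser Milvus uses.

```python
from typing import Any, Dict, List


def rows_to_columns(rows: List[Dict[str, Any]]) -> Dict[str, List[Any]]:
    """Turn a list of row objects into one list of values per field."""
    columns: Dict[str, List[Any]] = {}
    for row_index, row in enumerate(rows):
        # Every row must carry the same set of fields as the first one.
        if row_index > 0 and set(row) != set(columns):
            raise ValueError(f"row {row_index} has a different set of fields")
        for name, value in row.items():
            columns.setdefault(name, []).append(value)
    return columns


def columns_to_rows(columns: Dict[str, List[Any]]) -> List[Dict[str, Any]]:
    """Turn per-field value lists back into a list of row objects."""
    lengths = {len(values) for values in columns.values()}
    if len(lengths) > 1:
        raise ValueError("columns have different lengths")
    names = list(columns)
    return [dict(zip(names, values)) for values in zip(*columns.values())]


rows = [
    {"id": 1, "year": 2021, "vector": [1.0, 1.1, 1.2]},
    {"id": 2, "year": 2022, "vector": [2.0, 2.1, 2.2]},
]
columns = rows_to_columns(rows)
print(columns)
```

A parser that handles one layout can therefore handle the other after a conversion step, at the cost of holding the whole file in memory.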
SDK Interfaces
Based on the points above, we choose a JSON object as the parameter of the Python import() API. The API declaration will look like this:
def import(options)
The parameter "options" is a JSON object with the following format:
Code Block
{
  "data_source": {                      // required
    "type": "minio",                    // required, currently only "minio"/"s3" are supported
    "address": "localhost:9000",        // optional, the Milvus server uses its own MinIO setting if this value is omitted
    "accesskey_id": "minioadmin",       // optional, the Milvus server uses its own MinIO setting if this value is omitted
    "accesskey_secret": "minioadmin",   // optional, the Milvus server uses its own MinIO setting if this value is omitted
    "use_ssl": false,                   // optional, the Milvus server uses its own MinIO setting if this value is omitted
    "bucket_name": "mybucket"           // optional, the Milvus server uses its own MinIO setting if this value is omitted
  },
  "internal_data": {                    // optional, external_data or internal_data (external files include json, npy, etc.; internal files are exported by Milvus)
    "path": "xxx/xxx/xx",               // relative path in the source storage where the exported data is stored
    "collections_mapping": {            // optional, gives a new name to a collection during importing
      "aaa": "bbb",
      "ccc": "ddd"
    }
  },
  "external_data": {                    // optional, external_data or internal_data (external files include json, npy, etc.; internal files are exported by Milvus)
    "target_collection": "xxx",         // target collection name
    "files": [                          // required
      {
        "file": "xxxx/xx.json",         // required, relative path under the storage source defined by data_source, currently json/npy are supported
        "type": "row_based",            // required, row_based or column_based
        "fields_mapping": {             // optional, specifies the target fields which should be imported. Milvus will import all fields if this list is empty.
          "table.rows.id": "uid",
          "table.rows.year": "year",
          "table.rows.vector": "vector"
        }
      },
      {
        "file": "xxxx/xx.json",         // required, relative path under the storage source defined by data_source, currently json/npy are supported
        "type": "column_based",         // required, row_based or column_based
        "fields_mapping": {             // optional, specifies the target fields which should be imported. Milvus will import all fields if this list is empty.
          "table.columns.id": "uid",
          "table.columns.year": "year",
          "table.columns.vector": "vector"
        }
      },
      {
        "file": "xxxx/xx.npy",          // required, relative path under the storage source defined by data_source, currently json/npy are supported
        "type": "column_based",         // required, row_based or column_based
        "fields_mapping": {             // optional, specifies the target fields which should be imported. Milvus will import all fields if this list is empty.
          "vector": "vector"
        }
      }
    ],
    "default_fields": {                 // optional, uses default values to fill some fields
      "age": 0,
      "weight": 0.0
    }
  }
}
Note: to improve usability, we can also declare an ORM class to wrap this JSON object.
...
- "data_source": contains the address and login credentials of MinIO/S3. If they are not provided, Milvus will use its own MinIO/S3 configuration.
- "internal_data": reserved field for collection clone and database clone, not available in the first stage. It requires another new API, export().
- "external_data": for importing data from the user's files. It tells the datanode where to read the data files and how to parse them.
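The note about wrapping the options object in a class could be realized along the following lines. This is a minimal sketch, not the pymilvus API: the class names ImportOptions and DataFile and the field source_type are hypothetical, only the "external_data" branch is covered, and the optional MinIO settings are left out so that the server falls back to its own configuration.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict, List

VALID_SOURCE_TYPES = ("minio", "s3")
VALID_FILE_TYPES = ("row_based", "column_based")


@dataclass
class DataFile:
    """One entry of external_data.files (hypothetical wrapper)."""
    file: str
    type: str
    fields_mapping: Dict[str, str] = field(default_factory=dict)

    def __post_init__(self) -> None:
        # "file" and "type" are marked as required in the options format.
        if not self.file:
            raise ValueError('"file" is required')
        if self.type not in VALID_FILE_TYPES:
            raise ValueError(f'"type" must be one of {VALID_FILE_TYPES}')

    def to_dict(self) -> Dict[str, Any]:
        result: Dict[str, Any] = {"file": self.file, "type": self.type}
        if self.fields_mapping:
            result["fields_mapping"] = dict(self.fields_mapping)
        return result


@dataclass
class ImportOptions:
    """Builds the options object for an import from external files."""
    target_collection: str
    files: List[DataFile]
    source_type: str = "minio"
    default_fields: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        if self.source_type not in VALID_SOURCE_TYPES:
            raise ValueError(f'"type" must be one of {VALID_SOURCE_TYPES}')
        if not self.files:
            raise ValueError('"files" is required')

    def to_dict(self) -> Dict[str, Any]:
        external: Dict[str, Any] = {
            "target_collection": self.target_collection,
            "files": [data_file.to_dict() for data_file in self.files],
        }
        if self.default_fields:
            external["default_fields"] = dict(self.default_fields)
        return {"data_source": {"type": self.source_type}, "external_data": external}


options = ImportOptions(
    target_collection="TEST",
    files=[DataFile("file_1.json", "row_based", {"data.rows.id": "uid"})],
    default_fields={"age": 0},
)
print(json.dumps(options.to_dict(), indent=2))
```

Validating in the constructor surfaces a misspelled "type" on the client side, before any request reaches the proxy.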
...
The user has some JSON files that store data in the row-based format: file_1.json, file_2.json.
Code Block
{
  "data": {
    "rows": [
      {"id": 1, "year": 2021, "vector": [1.0, 1.1, 1.2]},
      {"id": 2, "year": 2022, "vector": [2.0, 2.1, 2.2]},
      {"id": 3, "year": 2023, "vector": [3.0, 3.1, 3.2]}
    ]
  }
}
The "options" could look like this:
Code Block
{
  "data_source": {
    "type": "minio",
    "address": "localhost:9000",
    "accesskey_id": "minioadmin",
    "accesskey_secret": "minioadmin",
    "use_ssl": false,
    "bucket_name": "mybucket"
  },
  "external_data": {
    "target_collection": "TEST",
    "files": [
      {
        "file": "file_1.json",
        "type": "row_based",
        "fields_mapping": {
          "data.rows.id": "uid",
          "data.rows.year": "year",
          "data.rows.vector": "embedding"
        }
      },
      {
        "file": "file_2.json",
        "type": "row_based",
        "fields_mapping": {
          "data.rows.id": "uid",
          "data.rows.year": "year",
          "data.rows.vector": "embedding"
        }
      }
    ],
    "default_fields": {
      "age": 0
    }
  }
}
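To make the role of "fields_mapping" and "default_fields" concrete, here is a minimal sketch of how a parser could apply them to a row-based file. It assumes that the last segment of each dotted key names the field inside a row and that the preceding segments locate the array of rows; the text does not spell out these semantics, and the helper names are hypothetical.

```python
from typing import Any, Dict, List, Optional


def resolve_path(document: Dict[str, Any], dotted_path: str) -> Any:
    """Walk a nested dict along a dotted path such as "data.rows"."""
    node: Any = document
    for key in dotted_path.split("."):
        node = node[key]
    return node


def import_rows(
    document: Dict[str, Any],
    fields_mapping: Dict[str, str],
    default_fields: Optional[Dict[str, Any]] = None,
) -> List[Dict[str, Any]]:
    """Build entities keyed by target field names from a row-based document."""
    if any("." not in source for source in fields_mapping):
        raise ValueError("each mapping key must look like <path-to-rows>.<field>")
    # Assumption: all mapped fields live in the same array of rows.
    array_paths = {source.rsplit(".", 1)[0] for source in fields_mapping}
    if len(array_paths) != 1:
        raise ValueError("all mapped fields must share one row array")
    rows = resolve_path(document, array_paths.pop())
    entities = []
    for row in rows:
        entity = dict(default_fields or {})  # defaults first, mapped values on top
        for source, target in fields_mapping.items():
            entity[target] = row[source.rsplit(".", 1)[1]]
        entities.append(entity)
    return entities


document = {
    "data": {
        "rows": [
            {"id": 1, "year": 2021, "vector": [1.0, 1.1, 1.2]},
            {"id": 2, "year": 2022, "vector": [2.0, 2.1, 2.2]},
            {"id": 3, "year": 2023, "vector": [3.0, 3.1, 3.2]},
        ]
    }
}
mapping = {"data.rows.id": "uid", "data.rows.year": "year", "data.rows.vector": "embedding"}
entities = import_rows(document, mapping, {"age": 0})
print(entities[0])
```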
The user has some JSON files that store data in the column-based format: file_1.json, file_2.json.
Code Block
{
  "table": {
    "columns": {
      "id": [1, 2, 3],
      "year": [2021, 2022, 2023],
      "vector": [
        [1.0, 1.1, 1.2],
        [2.0, 2.1, 2.2],
        [3.0, 3.1, 3.2]
      ]
    }
  }
}
The "options" could look like this:
Code Block
{
  "data_source": {
    "type": "minio",
    "address": "localhost:9000",
    "accesskey_id": "minioadmin",
    "accesskey_secret": "minioadmin",
    "use_ssl": false,
    "bucket_name": "mybucket"
  },
  "external_data": {
    "target_collection": "TEST",
    "files": [
      {
        "file": "file_1.json",
        "type": "column_based",
        "fields_mapping": {
          "table.columns.id": "uid",
          "table.columns.year": "year",
          "table.columns.vector": "embedding"
        }
      },
      {
        "file": "file_2.json",
        "type": "column_based",
        "fields_mapping": {
          "table.columns.id": "uid",
          "table.columns.year": "year",
          "table.columns.vector": "embedding"
        }
      }
    ],
    "default_fields": {
      "age": 0
    }
  }
}
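For the column-based layout, each mapping key can be read as the full path to one column. The sketch below resolves such paths against a document whose columns sit under "table.columns", and checks that all columns have the same length. The helper names and the path semantics are assumptions made for illustration.

```python
from typing import Any, Dict, List


def resolve_path(document: Dict[str, Any], dotted_path: str) -> Any:
    """Walk a nested dict along a dotted path such as "table.columns.id"."""
    node: Any = document
    for key in dotted_path.split("."):
        node = node[key]
    return node


def import_columns(
    document: Dict[str, Any], fields_mapping: Dict[str, str]
) -> Dict[str, List[Any]]:
    """Return target field name -> column values for a column-based document."""
    columns = {
        target: resolve_path(document, source)
        for source, target in fields_mapping.items()
    }
    # Every column must describe the same number of entities.
    lengths = {len(values) for values in columns.values()}
    if len(lengths) > 1:
        raise ValueError("columns have different lengths")
    return columns


document = {
    "table": {
        "columns": {
            "id": [1, 2, 3],
            "year": [2021, 2022, 2023],
            "vector": [[1.0, 1.1, 1.2], [2.0, 2.1, 2.2], [3.0, 3.1, 3.2]],
        }
    }
}
mapping = {
    "table.columns.id": "uid",
    "table.columns.year": "year",
    "table.columns.vector": "embedding",
}
columns = import_columns(document, mapping)
print(sorted(columns))
```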
The user has a JSON file that stores data in the column-based format, file_1.json, and a Numpy file that stores vector data, file_2.npy.
Note: for hybrid format files, we only allow a single pair of files as input, to reduce complexity.
The file_1.json:
...
The "options" could look like this:
Code Block
{
  "data_source": {
    "type": "minio",
    "address": "localhost:9000",
    "accesskey_id": "minioadmin",
    "accesskey_secret": "minioadmin",
    "use_ssl": false,
    "bucket_name": "mybucket"
  },
  "external_data": {
    "target_collection": "TEST",
    "files": [
      {
        "file": "file_1.json",
        "type": "column_based",
        "fields_mapping": {
          "data.columns.id": "uid",
          "data.columns.year": "year",
          "data.columns.age": "age"
        }
      },
      {
        "file": "file_2.npy",
        "type": "column_based",
        "fields_mapping": {
          "embedding": "embedding"
        }
      }
    ]
  }
}
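The restriction on hybrid imports noted above can be checked before any file is read. The sketch below reads "a pair of files" as exactly one JSON file plus one Numpy file, which is an interpretation rather than something the text states; the function name is hypothetical.

```python
import os
from typing import Any, Dict, List


def is_valid_file_set(files: List[Dict[str, Any]]) -> bool:
    """Accept any single-format file list, but hybrid lists only as one pair."""
    extensions = [os.path.splitext(entry["file"])[1].lower() for entry in files]
    is_hybrid = ".json" in extensions and ".npy" in extensions
    if not is_hybrid:
        return True  # the pair rule only applies to mixed JSON/Numpy input
    # Hybrid input: exactly one JSON file and one Numpy file (assumed reading).
    return sorted(extensions) == [".json", ".npy"]


pair = [
    {"file": "file_1.json", "type": "column_based"},
    {"file": "file_2.npy", "type": "column_based"},
]
print(is_valid_file_set(pair))
```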
The "options" for other SDKs is not a JSON object. A declaration for the Java SDK could look like this:
public class ImportParam {
private MinioDataSource data_source;
private List<DataFile> external_files;
}
RPC Interfaces
Internal machinery
...