Current state: Accepted
...
To reduce network transmission and skip Pulsar management, the new interface will allow users to input the paths of data files (JSON, NumPy, etc.) on MinIO/S3 storage, and let the data nodes read these files directly and parse them into segments. The internal logic of the process becomes:
1. client calls import() to pass some file paths to Milvus proxy node
2. proxy node passes the file paths to data coordinator
3. data coordinator picks one or more data nodes (according to the file count) to parse the files; each file can be parsed into one or more segments.
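As a rough illustration of step 3, the sketch below picks just enough data nodes for the number of files and then spreads the files across them round-robin. The function name `assign_files`, the `files_per_node` threshold, and the round-robin policy are assumptions made for this example; the proposal only states that the choice depends on the file count.

```python
from typing import Dict, List


def assign_files(files: List[str], nodes: List[str],
                 files_per_node: int = 2) -> Dict[str, List[str]]:
    """Hypothetical planner: choose nodes by file count, then round-robin."""
    if not nodes:
        raise ValueError("no data node available")
    if not files:
        return {}
    # Ceiling division: how many nodes we would like to use.
    wanted = -(-len(files) // files_per_node)
    chosen = nodes[:min(wanted, len(nodes))]
    plan: Dict[str, List[str]] = {node: [] for node in chosen}
    for i, path in enumerate(files):
        plan[chosen[i % len(chosen)]].append(path)
    return plan


plan = assign_files(["a.json", "b.json", "c.json"], ["dn1", "dn2", "dn3"])
print(plan)
```

With three files and a threshold of two files per node, this plan uses two of the three available nodes.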
Some points to consider:
- JSON format is flexible. Ideally, the import API should parse users' JSON files without asking users to reformat them according to a strict rule.
- Users can store scalar fields and vector fields in a JSON file, in either a row-based or a column-based layout. The import() API can support both.
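To make the two layouts concrete, here is a minimal sketch that converts between them. It assumes that row-based data is a list of objects (one per entity) and that column-based data is an object mapping each field name to a list of values; these shapes and the helper names are illustrative assumptions, not the final file format.

```python
from typing import Any, Dict, List


def columns_to_rows(columns: Dict[str, List[Any]]) -> List[Dict[str, Any]]:
    """Turn {"field": [v0, v1, ...]} into [{"field": v0}, {"field": v1}, ...]."""
    lengths = {len(values) for values in columns.values()}
    if len(lengths) > 1:
        raise ValueError("all columns must have the same number of values")
    names = list(columns)
    return [dict(zip(names, values)) for values in zip(*columns.values())]


def rows_to_columns(rows: List[Dict[str, Any]]) -> Dict[str, List[Any]]:
    """Turn a list of row objects into one list of values per field."""
    columns: Dict[str, List[Any]] = {}
    for row in rows:
        for name, value in row.items():
            columns.setdefault(name, []).append(value)
    return columns


column_based = {"id": [1, 2], "vector": [[0.1, 0.2], [0.3, 0.4]]}
row_based = columns_to_rows(column_based)
print(row_based)
```

Either layout carries the same information, so a parser can normalize both into one internal representation before building segments.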
...
```json
{
  "data_source": {                  // required
    "type": "minio",                // required, "minio" or "s3", case insensitive
    "address": "localhost:9000",    // optional, milvus server will use its minio/s3 configuration if without this value
    "accesskey_id": "minioadmin",   // optional, milvus server will use its minio/s3 configuration if without this value
    "accesskey_secret": "minioadmin", // optional, milvus server will use its minio/s3 configuration if without this value
    "use_ssl": false,               // optional, milvus server will use its minio/s3 configuration if without this value
    "bucket_name": "aaa"            // optional, milvus server will use its minio/s3 configuration if without this value
  },
  "internal_data": {                // optional, external_data or internal_data. (external files include json, npy, etc. internal files are exported by milvus)
    "path": "xxx/xxx/xx",           // required, relative path to the source storage where store the exported data
    "collections_mapping": {        // optional, give a new name to collection during importing
      "coll_a": "coll_b",           // collection name mapping, key is the source collection name, value is a new collection name
      "coll_c": "coll_d"
    }
  },
  "external_data": {                // optional, external_data or internal_data. (external files include json, npy, etc. internal files are exported by milvus)
    "target_collection": "xxx",     // required, target collection name
    "chunks": [{                    // required, chunk list, each chunk can be import as one segment or split into multiple segments
        "files": [{                 // required, files that provide data of a chunk
          "path": "xxxx/xx.json",   // required, relative path under the storage source defined by DataSource, currently support json/npy
          "type": "row_based",      // required, "row_based" or "column_based", tell milvus how to parse this json file, case insensitive
          "from": 0,                // optional, import part of the file from a number
          "to": 1000,               // optional, import part of the file end by a number
          "fields_mapping": {       // optional, specify the target fields which should be imported. Milvus will import all fields if this list is empty
            "table.rows.id": "uid", // field name mapping, tell milvus how to insert data to correct field, key is a json node path, value is a field name of the collection. If the file is numpy format, the key is a field name of the collection same with value.
            "table.rows.year": "year",
            "table.rows.vector": "vector"
          }
        }]
      }
    ],
    "default_fields": {             // optional, use default value to fill some fields
      "age": 0,                     // key is a field name, value is default value of this field, can be number or string
      "weight": 0.0
    }
  }
}
```
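The sketch below shows one possible reading of `fields_mapping`, `from`/`to`, and `default_fields` for a row-based JSON file. It assumes that the last segment of a node path such as `table.rows.id` names the source field and the preceding segments locate the list of rows, and that `from`/`to` form a half-open row range. Both are interpretations made for illustration, and the helper names `resolve` and `map_rows` are hypothetical.

```python
from typing import Any, Dict, List, Optional


def resolve(doc: Any, keys: List[str]) -> Any:
    """Walk nested objects by key, e.g. ["table", "rows"]."""
    node = doc
    for key in keys:
        node = node[key]
    return node


def map_rows(doc: Dict[str, Any],
             fields_mapping: Dict[str, str],
             default_fields: Optional[Dict[str, Any]] = None,
             start: int = 0,
             end: Optional[int] = None) -> List[Dict[str, Any]]:
    """Build target entities from a parsed JSON document (assumed semantics)."""
    defaults = default_fields or {}
    out: Optional[List[Dict[str, Any]]] = None
    for node_path, target in fields_mapping.items():
        *prefix, source = node_path.split(".")
        rows = resolve(doc, prefix)[start:end]  # assumed half-open range
        if out is None:
            # Start every entity from a copy of the default values.
            out = [dict(defaults) for _ in rows]
        for entity, row in zip(out, rows):
            entity[target] = row[source]
    return out or []


doc = {"table": {"rows": [
    {"id": 1, "year": 2020, "vector": [0.1, 0.2]},
    {"id": 2, "year": 2021, "vector": [0.3, 0.4]},
]}}
mapping = {"table.rows.id": "uid", "table.rows.year": "year",
           "table.rows.vector": "vector"}
entities = map_rows(doc, mapping, default_fields={"age": 0})
print(entities)
```

Here the source field `id` lands in the collection field `uid`, and `age` is filled from the defaults because the file does not provide it.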
...
```protobuf
rpc Import(ImportRequest) returns (MutationResult) {}

message ImportRequest {
  common.MsgBase base = 1;
  string options = 2; // the json options string
}

message MutationResult {
  common.Status status = 1;
  schema.IDs IDs = 2; // return auto-id for insert/import, deleted id for delete
  repeated uint32 succ_index = 3; // succeed indexes for insert/import
  repeated uint32 err_index = 4; // error indexes for insert/import
  bool acknowledged = 5;
  int64 insert_cnt = 6; // how many entities were inserted or imported
  int64 delete_cnt = 7;
  int64 upsert_cnt = 8;
  uint64 timestamp = 9;
}
```
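Because `ImportRequest.options` is a plain string, a client has to serialize the options object before sending it. The following sketch builds a minimal `external_data` options string with Python's standard `json` module. The helper `build_import_options` and its parameters are hypothetical, and placing all files in a single chunk is an arbitrary choice for the example.

```python
import json
from typing import List


def build_import_options(target_collection: str,
                         file_paths: List[str],
                         file_type: str = "row_based",
                         source_type: str = "minio") -> str:
    """Serialize a minimal options object for ImportRequest.options."""
    if source_type.lower() not in ("minio", "s3"):
        raise ValueError('data_source.type must be "minio" or "s3"')
    if file_type.lower() not in ("row_based", "column_based"):
        raise ValueError('file type must be "row_based" or "column_based"')
    if not file_paths:
        raise ValueError("at least one file path is required")
    options = {
        "data_source": {"type": source_type},
        "external_data": {
            "target_collection": target_collection,
            # All files go into one chunk in this sketch.
            "chunks": [{
                "files": [{"path": p, "type": file_type} for p in file_paths],
            }],
        },
    }
    return json.dumps(options)


options_string = build_import_options("demo_collection", ["data/part1.json"])
print(options_string)
```

Note that `json.dumps` emits strict JSON, so explanatory `//` comments like those in the options reference cannot be part of the transmitted string.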
...