Current state: Accepted
...
To reduce network transmission and skip Pulsar management, the new interface will allow users to input the paths of data files (JSON, NumPy, etc.) on MinIO/S3 storage, and let the data nodes read these files directly and parse them into segments. The internal logic of the process becomes:
1. The client calls import() to pass the file paths to the Milvus proxy node.
2. The proxy node passes the file paths to the data coordinator node.
3. The data coordinator node picks one or more data nodes (according to the sharding number) to parse the files. Each file can be parsed into one or more segments.
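Step 3 can be sketched as follows. This is only an illustration: the document does not specify how the data coordinator chooses data nodes, so the round-robin policy, the `DataNode` class, and the `assign_files` helper are all hypothetical names introduced here.

```python
from dataclasses import dataclass, field


@dataclass
class DataNode:
    """Hypothetical stand-in for a Milvus data node."""
    node_id: int
    assigned: list = field(default_factory=list)


def assign_files(files, nodes, shard_num):
    """Spread file paths over at most `shard_num` data nodes, round-robin.

    Assumption: the real selection policy is not described in this document.
    """
    if not nodes:
        raise ValueError("no data nodes available")
    # Use at least one node and never more than are available.
    chosen = nodes[: max(1, min(shard_num, len(nodes)))]
    for i, path in enumerate(files):
        chosen[i % len(chosen)].assigned.append(path)
    return {node.node_id: node.assigned for node in chosen}


plan = assign_files(
    ["file_1.json", "file_2.npy", "file_3.json"],
    [DataNode(1), DataNode(2), DataNode(3)],
    shard_num=2,
)
print(plan)
```

With a sharding number of 2, only the first two nodes receive files, and each node would then parse its files into one or more segments.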
SDK Interfaces
The Python API declaration:
...
```python
import(collection_name="test", files={"uid": "file_1.json", "vector": "file_2.npy"})
```
...
Proxy RPC Interfaces
```protobuf
service MilvusService {
  rpc Import(ImportRequest) returns (ImportResponse) {}
  rpc GetImportState(GetImportStateRequest) returns (GetImportStateResponse) {}
}

message ImportRequest {
  string collection_name = 1;                // target collection
  string partition_name = 2;                 // target partition
  bool row_based = 3;                        // the file is row-based or column-based
  repeated string files = 4;                 // file paths to be imported
  repeated common.KeyValuePair options = 5;  // import options
}

message ImportResponse {
  common.Status status = 1;
  repeated int64 tasks = 2;  // id array of import tasks
}

message GetImportStateRequest {
  int64 task = 1;  // id of an import task
}
```
...
```protobuf
message GetImportStateResponse {
  common.Status status = 1;
  bool finished = 2;    // is this import task finished or not
  int64 row_count = 3;  // if the task is finished, this value is how many rows are imported.
                        // if the task is not finished, this value is how many rows are parsed.
}
```
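A client would typically call Import once and then poll GetImportState with each returned task id until `finished` is true. The sketch below illustrates that loop against an in-memory fake. `FakeProxy`, `ImportState`, and `wait_for_import` are hypothetical names for illustration only and are not part of the Milvus SDK; only the `finished` and `row_count` fields come from the messages above.

```python
import time
from dataclasses import dataclass


@dataclass
class ImportState:
    """Mirrors the finished/row_count fields of GetImportStateResponse."""
    finished: bool
    row_count: int


class FakeProxy:
    """Hypothetical in-memory stand-in for the proxy; not the Milvus SDK."""

    def __init__(self, total_rows, rows_per_poll):
        self._total = total_rows
        self._step = rows_per_poll
        self._parsed = 0

    def get_import_state(self, task):
        # Each call pretends the data node has parsed another batch of rows.
        self._parsed = min(self._total, self._parsed + self._step)
        return ImportState(self._parsed == self._total, self._parsed)


def wait_for_import(proxy, task, interval=0.0, max_polls=100):
    """Poll until the task reports finished, then return the imported row count."""
    for _ in range(max_polls):
        state = proxy.get_import_state(task)
        if state.finished:
            return state.row_count
        time.sleep(interval)
    raise TimeoutError(f"import task {task} did not finish in {max_polls} polls")


rows = wait_for_import(FakeProxy(total_rows=10, rows_per_poll=4), task=1)
print(rows)
```

While the task is unfinished, `row_count` reports rows parsed so far, so a caller could also surface it as a progress indicator.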
Datacoord RPC interfaces
The declaration of import API in datacoord RPC:
```protobuf
service DataCoord {
  rpc Import(milvus.ImportRequest) returns (milvus.ImportResponse) {}
  rpc GetImportState(milvus.GetImportStateRequest) returns (milvus.GetImportStateResponse) {}
  rpc CompleteImport(ImportResult) returns (common.Status) {}
}

message ImportResult {
  common.Status status = 1;
  repeated int64 segments = 2;  // id array of new sealed segments
  int64 row_count = 3;          // how many rows are imported by this task
}
```
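One way to picture how these three RPCs relate is a small bookkeeping model: Import creates tasks, CompleteImport records the result reported by a data node, and GetImportState reads the stored state. The sketch below is a simplified assumption, not the datacoord implementation. `FakeDataCoord` and `ImportTask` are hypothetical, the one-task-per-file rule is assumed, and `complete_import` takes an explicit `task_id` even though the `ImportResult` message above does not show one.

```python
from dataclasses import dataclass, field


@dataclass
class ImportTask:
    task_id: int
    finished: bool = False
    row_count: int = 0
    segments: list = field(default_factory=list)


class FakeDataCoord:
    """Hypothetical in-memory model of the import bookkeeping."""

    def __init__(self):
        self._tasks = {}
        self._next_id = 1

    def import_files(self, files):
        """Create one task per file (an assumption) and return the task ids."""
        ids = []
        for _ in files:
            task = ImportTask(self._next_id)
            self._tasks[task.task_id] = task
            ids.append(task.task_id)
            self._next_id += 1
        return ids

    def complete_import(self, task_id, segments, row_count):
        """Record the sealed segments and row count reported by a data node."""
        task = self._tasks[task_id]
        task.finished = True
        task.segments = list(segments)
        task.row_count = row_count

    def get_import_state(self, task_id):
        task = self._tasks[task_id]
        return task.finished, task.row_count


coord = FakeDataCoord()
tasks = coord.import_files(["file_1.json"])
before = coord.get_import_state(tasks[0])
coord.complete_import(tasks[0], segments=[101, 102], row_count=500)
after = coord.get_import_state(tasks[0])
print(before, after)
```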
Datanode interfaces
The declaration of import API in datanode RPC:
...