Current state: Accepted
...
To reduce network transmission and skip Pulsar management, the new interface lets users pass the paths of data files (JSON, NumPy, etc.) on MinIO/S3 storage, and the data nodes read these files directly and parse them into segments. The internal logic of the process becomes:
1. The client calls import() to pass some file paths to the Milvus proxy node.
2. The proxy node passes the file paths to the root coordinator, which passes them on to the data coordinator.
3. The data coordinator picks one or more data nodes (according to the sharding number) to parse the files; each file can be parsed into one or more segments.
4. Once a task is finished, the data node reports to the data coordinator, and the data coordinator reports to the root coordinator. The generated segments are then sent to the index node to build the index.
5. The root coordinator records a task list in etcd. After the index has been built successfully for the generated segments, the root coordinator marks the task as "completed".
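The task bookkeeping in steps 1, 2, 4 and 5 can be sketched as a small in-memory model. This is only an illustration under stated assumptions: `RootCoordSketch`, `TaskState`, the method names and the file path are hypothetical, the etcd task list is replaced by a plain dict, and one task is created per call.

```python
from dataclasses import dataclass, field
from enum import Enum
from itertools import count


class TaskState(Enum):
    # Simplified, hypothetical states for this sketch only.
    PENDING = "pending"
    PARSED = "parsed"
    COMPLETED = "completed"


@dataclass
class ImportTaskRecord:
    task_id: int
    files: list
    state: TaskState = TaskState.PENDING
    segments: list = field(default_factory=list)


class RootCoordSketch:
    """Keeps the task list that the design stores in etcd (a dict here)."""

    def __init__(self):
        self.tasks = {}
        self._ids = count(1)

    def import_files(self, files):
        # Steps 1-2: the proxy forwards the file paths; record a new task.
        task = ImportTaskRecord(task_id=next(self._ids), files=list(files))
        self.tasks[task.task_id] = task
        return task.task_id

    def report_parsed(self, task_id, segments):
        # Step 4: a data node finished parsing and reported its segments.
        task = self.tasks[task_id]
        task.segments = list(segments)
        task.state = TaskState.PARSED

    def report_indexed(self, task_id):
        # Step 5: mark the task completed only after its segments are indexed.
        task = self.tasks[task_id]
        if task.state is not TaskState.PARSED:
            raise ValueError("segments must be parsed before the task completes")
        task.state = TaskState.COMPLETED


root = RootCoordSketch()
tid = root.import_files(["data/rows_1.json"])  # hypothetical file path
root.report_parsed(tid, segments=[1001, 1002])
root.report_indexed(tid)
print(root.tasks[tid].state)
```

The guard in `report_indexed` reflects the ordering in step 5: a task cannot be marked completed before its segments exist.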
1. SDK Interfaces
The Python API declaration:
...
Code Block |
---|
service MilvusService {
  rpc Import(ImportRequest) returns (ImportResponse) {}
  rpc GetImportState(GetImportStateRequest) returns (GetImportStateResponse) {}
}

message ImportRequest {
  string collection_name = 1;                // target collection
  string partition_name = 2;                 // target partition
  bool row_based = 3;                        // the file is row-based or column-based
  repeated string files = 4;                 // file paths to be imported
  repeated common.KeyValuePair options = 5;  // import options, bucket, etc.
}

message ImportResponse {
  common.Status status = 1;
  repeated int64 tasks = 2;  // id array of import tasks
}

message GetImportStateRequest {
  int64 task = 1;  // id of an import task
}

enum ImportState {
  ImportPending = 0;
  ImportFailed = 1;
  ImportDownloaded = 2;
  ImportParsed = 3;
  ImportPersisted = 4;
  ImportCompleted = 5;
}

message GetImportStateResponse {
  common.Status status = 1;
  ImportState state = 2;                   // is this import task finished or not
  // if the task is finished, this value is how many rows are imported.
  // if the task is not finished, this value is how many rows are parsed.
  // return 0 if failed.
  int64 row_count = 3;
  repeated int64 id_list = 4;              // auto generated ids if the primary key is autoid
  repeated common.KeyValuePair infos = 5;  // more information about the task, progress percent, file path, failed reason, etc.
} |
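A client would typically call Import once and then poll GetImportState until the task reaches a terminal state. The sketch below shows that loop in Python. It assumes the `Import`-prefixed enum member names, and `get_state` is a hypothetical callable standing in for the GetImportState RPC; it is not part of any published SDK.

```python
import time
from enum import IntEnum


class ImportState(IntEnum):
    # Values mirror the ImportState enum in the proto declaration.
    ImportPending = 0
    ImportFailed = 1
    ImportDownloaded = 2
    ImportParsed = 3
    ImportPersisted = 4
    ImportCompleted = 5


def wait_for_import(get_state, task_id, interval=1.0, max_polls=60):
    """Poll until the task completes or fails, then return the final state.

    get_state is a hypothetical wrapper around the GetImportState RPC:
    it takes a task id and returns an ImportState (or its integer value).
    """
    for _ in range(max_polls):
        state = ImportState(get_state(task_id))
        if state in (ImportState.ImportCompleted, ImportState.ImportFailed):
            return state
        time.sleep(interval)
    raise TimeoutError(f"import task {task_id} did not finish")


# Usage with a fake RPC that advances one state per call.
fake_states = iter([0, 2, 3, 4, 5])
final = wait_for_import(lambda task_id: next(fake_states), task_id=1, interval=0)
print(final.name)
```

Because ImportFailed has the value 1, the states are not ordered by progress, so the loop checks for the two terminal states explicitly instead of comparing values.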
...
The call chain of the import workflow:
3. Rootcoord RPC interfaces
...
Code Block |
---|
service RootCoord {
  rpc Import(milvus.ImportRequest) returns (milvus.ImportResponse) {}
  rpc GetImportState(milvus.GetImportStateRequest) returns (milvus.GetImportStateResponse) {}
  rpc ReportImport(ImportResult) returns (common.Status) {}
}

message ImportResult {
  common.Status status = 1;
  int64 task_id = 2;                       // id of the task
  common.ImportState state = 3;            // state of the task
  repeated int64 segments = 4;             // id array of new sealed segments
  repeated int64 auto_ids = 5;             // auto-generated ids for auto-id primary key
  int64 row_count = 6;                     // how many rows are imported by this task
  repeated common.KeyValuePair infos = 7;  // more information about the task, file path, failed reason, etc.
} |
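To illustrate how a ReportImport call might update the root coordinator's task list, here is a minimal Python sketch. The `ImportResult` dataclass is a stand-in for the proto message (status and infos omitted), and `report_import` with its dict-based task table is hypothetical. Accumulating segments and row counts across several reports is an assumption of this sketch, not something the proposal specifies.

```python
from dataclasses import dataclass, field


@dataclass
class ImportResult:
    # Python stand-in for the ImportResult message; status and infos omitted.
    task_id: int
    state: str
    segments: list = field(default_factory=list)
    auto_ids: list = field(default_factory=list)
    row_count: int = 0


def report_import(task_table, result):
    """Merge one data node report into the task table; return True if accepted."""
    record = task_table.get(result.task_id)
    if record is None:
        return False  # unknown task id: reject the report
    record["state"] = result.state
    # Assumption: a task may report more than once, so values accumulate.
    record["segments"].extend(result.segments)
    record["auto_ids"].extend(result.auto_ids)
    record["row_count"] += result.row_count
    return True


tasks = {7: {"state": "ImportPending", "segments": [], "auto_ids": [], "row_count": 0}}
ok = report_import(
    tasks,
    ImportResult(task_id=7, state="ImportPersisted", segments=[101, 102], row_count=2000),
)
print(ok, tasks[7])
```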
4. Datacoord RPC interfaces
...
Code Block |
---|
service DataCoord {
  rpc Import(ImportTask) returns (ImportTaskResponse) {}
}

message ImportTask {
  common.Status status = 1;
  string collection_name = 2;              // target collection
  string partition_name = 3;               // target partition
  bool row_based = 4;                      // the file is row-based or column-based
  int64 task_id = 5;                       // id of the task
  repeated string files = 6;               // file paths to be imported
  repeated common.KeyValuePair infos = 7;  // more information about the task, bucket, etc.
}

message ImportTaskResponse {
  common.Status status = 1;
  int64 datanode_id = 2;  // which datanode takes this task
} |
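The response tells the caller which data node took the task. The proposal does not say how the data coordinator chooses that node, so the Python sketch below assumes a least-loaded policy purely for illustration. `DataCoordSketch` and `import_task` are hypothetical names, and `ImportTaskResponse` is a stand-in for the proto message with status omitted.

```python
from dataclasses import dataclass


@dataclass
class ImportTaskResponse:
    # Python stand-in for the ImportTaskResponse message (status omitted).
    datanode_id: int


class DataCoordSketch:
    def __init__(self, datanode_ids):
        # Number of import tasks currently assigned to each data node.
        self.load = {node_id: 0 for node_id in datanode_ids}

    def import_task(self, task_id, files):
        """Assign the task to the least-loaded data node (an assumed policy)."""
        if not self.load:
            raise RuntimeError("no data node available")
        # Ties are broken by the smaller node id to keep the choice deterministic.
        node_id = min(self.load, key=lambda n: (self.load[n], n))
        self.load[node_id] += 1
        return ImportTaskResponse(datanode_id=node_id)


coord = DataCoordSketch(datanode_ids=[1, 2])
first = coord.import_task(task_id=10, files=["a.json"])
second = coord.import_task(task_id=11, files=["b.json"])
print(first.datanode_id, second.datanode_id)
```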
5. Datanode interfaces
The declaration of import API in datanode RPC:
...