Current state: Accepted

...

To reduce network transmission and skip Pulsar management, the new interface will allow users to pass the paths of data files (JSON, NumPy, etc.) stored on MinIO/S3, so that data nodes read these files directly and parse them into segments. The internal logic of the process becomes:

        1. The client calls import() to pass the file paths to the Milvus proxy node.

        2. The proxy node passes the file paths to the data coordinator node.

        3. The data coordinator node picks one or more data nodes (according to the sharding number) to parse the files. Each file can be parsed into one or more segments.
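The three steps above can be sketched as a small Python simulation. This is a hypothetical illustration, not Milvus code: DataNode, assign_files and proxy_import are made-up names, and the round-robin assignment of files to nodes is an assumption, since the design only says that nodes are picked according to the sharding number.

```python
from dataclasses import dataclass, field


@dataclass
class DataNode:
    """Hypothetical stand-in for a Milvus data node."""
    node_id: int
    assigned_files: list = field(default_factory=list)


def assign_files(files, data_nodes, shard_num):
    """Step 3 (sketch): pick up to `shard_num` data nodes and spread the
    files across them round-robin (an assumed policy)."""
    if not data_nodes:
        raise ValueError("no data node available")
    picked = data_nodes[:max(1, min(shard_num, len(data_nodes)))]
    for i, path in enumerate(files):
        picked[i % len(picked)].assigned_files.append(path)
    return picked


def proxy_import(files, data_nodes, shard_num):
    """Steps 1-2 (sketch): the proxy forwards the client's paths unchanged."""
    return assign_files(list(files), data_nodes, shard_num)


nodes = [DataNode(1), DataNode(2), DataNode(3)]
picked = proxy_import(["a.json", "b.json", "c.npy"], nodes, shard_num=2)
for node in picked:
    print(node.node_id, node.assigned_files)
# 1 ['a.json', 'c.npy']
# 2 ['b.json']
```

Only the data coordinator decides placement here; the proxy stays a thin pass-through, which matches the division of work described in steps 2 and 3.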

SDK Interfaces

The Python API declaration:

...

Code Block
import(collection_name="test", files={"uid": "file_1.json", "vector": "file_2.npy"})

...
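In the call above each field comes from its own file, so the data is column-based. The sketch below is a hypothetical illustration of how a data node might combine such columns into segments: it checks that every column has the same row count, then splits the rows into batches, one per segment. The function name build_segments and the max_rows_per_segment limit are assumptions for the example; a real implementation would more likely cap segments by size in bytes.

```python
def build_segments(columns, max_rows_per_segment):
    """Combine column-based data (one list per field, as if each field were
    parsed from its own file) into row batches; each batch stands in for a segment."""
    lengths = {name: len(values) for name, values in columns.items()}
    if len(set(lengths.values())) > 1:
        raise ValueError(f"column row counts differ: {lengths}")
    total = next(iter(lengths.values()), 0)
    segments = []
    for start in range(0, total, max_rows_per_segment):
        end = min(start + max_rows_per_segment, total)
        segments.append({name: values[start:end] for name, values in columns.items()})
    return segments


columns = {
    "uid": [101, 102, 103, 104, 105],                  # e.g. parsed from file_1.json
    "vector": [[0.1 * i, 0.2 * i] for i in range(5)],  # e.g. parsed from file_2.npy
}
segments = build_segments(columns, max_rows_per_segment=2)
print([len(s["uid"]) for s in segments])  # [2, 2, 1]
```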


Proxy RPC Interfaces

Code Block
service MilvusService {
  rpc Import(ImportRequest) returns (ImportResponse) {}
  rpc GetImportState(GetImportStateRequest) returns (GetImportStateResponse) {}
}

message ImportRequest {
  string collection_name = 1;  // target collection
  string partition_name = 2;  // target partition
  bool row_based = 3;  // the file is row-based or column-based
  repeated string files = 4;  // file paths to be imported
  repeated common.KeyValuePair options = 5;  // import options
}

message ImportResponse {
  common.Status status = 1;
  repeated int64 tasks = 2;  // id array of import tasks
}

message GetImportStateRequest {
  int64 task = 1;  // id of an import task
}

message GetImportStateResponse {
  common.Status status = 1;
  bool finished = 2;  // whether this import task is finished
  int64 row_count = 3;  // if the task is finished, how many rows were imported; otherwise, how many rows have been parsed so far
}
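A client is expected to call Import once and then poll GetImportState with each returned task ID until finished is true. The sketch below simulates that loop against an in-memory service. FakeMilvusService, import_files, get_import_state and wait_for_import are hypothetical names, not the pymilvus API, and common.Status is reduced to an integer code.

```python
import time
from dataclasses import dataclass


@dataclass
class GetImportStateResponse:
    # Mirrors the proto message; common.Status is reduced to an int (0 = success).
    status: int = 0
    finished: bool = False
    row_count: int = 0


class FakeMilvusService:
    """Hypothetical in-memory stand-in for MilvusService. Each poll of a
    task "parses" `step` more rows until `total` rows are done."""

    def __init__(self, total=10, step=4):
        self._total, self._step = total, step
        self._parsed = {}  # task id -> rows parsed so far
        self._next_id = 1

    def import_files(self, collection_name, files, row_based=True):
        # Arguments mirror ImportRequest but are ignored by this fake.
        task_id = self._next_id
        self._next_id += 1
        self._parsed[task_id] = 0
        return [task_id]  # plays the role of ImportResponse.tasks

    def get_import_state(self, task):
        rows = min(self._parsed[task] + self._step, self._total)
        self._parsed[task] = rows
        return GetImportStateResponse(finished=rows == self._total, row_count=rows)


def wait_for_import(service, task, interval=0.0, timeout=5.0):
    """Poll GetImportState until the task finishes or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        state = service.get_import_state(task)
        print(f"task {task}: finished={state.finished}, rows={state.row_count}")
        if state.finished:
            return state.row_count
        time.sleep(interval)
    raise TimeoutError(f"import task {task} did not finish in {timeout}s")


svc = FakeMilvusService()
tasks = svc.import_files("test", ["rows.json"], row_based=True)
imported = wait_for_import(svc, tasks[0])
```

A real client would poll with an interval of seconds rather than zero; while the task runs, row_count reports parsed rows, and once finished it reports imported rows.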


Datacoord RPC Interfaces

The declaration of the import APIs in the datacoord RPC service:

Code Block
service DataCoord {
  rpc Import(milvus.ImportRequest) returns (milvus.ImportResponse) {}
  rpc GetImportState(milvus.GetImportStateRequest) returns (milvus.GetImportStateResponse) {}
  rpc CompleteImport(ImportResult) returns (common.Status) {}
}

message ImportResult {
  common.Status status = 1;
  repeated int64 segments = 2;  // id array of new sealed segments
  int64 row_count = 3;  // how many rows are imported by this task
}
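One plausible way for the data coordinator to track a task is to mark it finished once every data node assigned to it has reported through CompleteImport, summing row_count and collecting the sealed segment IDs along the way. The sketch below assumes the task ID and the reporting node ID are passed alongside ImportResult, since the message above carries neither; DataCoordSketch and its methods are hypothetical names, and common.Status is reduced to an integer (0 = success).

```python
from dataclasses import dataclass, field


@dataclass
class ImportResult:
    # Mirrors the ImportResult message; status 0 means success.
    status: int
    segments: list
    row_count: int


@dataclass
class ImportTaskState:
    pending_nodes: set
    segments: list = field(default_factory=list)
    row_count: int = 0
    failed: bool = False

    @property
    def finished(self):
        return not self.pending_nodes


class DataCoordSketch:
    """Hypothetical bookkeeping: a task is finished once every data node
    assigned to it has reported back via CompleteImport."""

    def __init__(self):
        self.tasks = {}

    def start_task(self, task_id, node_ids):
        self.tasks[task_id] = ImportTaskState(pending_nodes=set(node_ids))

    def complete_import(self, task_id, node_id, result):
        state = self.tasks[task_id]
        state.pending_nodes.discard(node_id)
        if result.status != 0:
            state.failed = True
        else:
            state.segments.extend(result.segments)
            state.row_count += result.row_count
        return 0  # stands in for a successful common.Status

    def get_import_state(self, task_id):
        state = self.tasks[task_id]
        return state.finished, state.row_count


coord = DataCoordSketch()
coord.start_task(task_id=7, node_ids=[1, 2])
coord.complete_import(7, 1, ImportResult(status=0, segments=[1001], row_count=500))
print(coord.get_import_state(7))  # (False, 500)
coord.complete_import(7, 2, ImportResult(status=0, segments=[1002, 1003], row_count=700))
print(coord.get_import_state(7))  # (True, 1200)
```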


Datanode Interfaces

The declaration of the import API in the datanode RPC service:

...