Current state: Accepted

...

To reduce network transmission and skip Pulsar management, the new interface will allow users to input the paths of data files (JSON, NumPy, etc.) on MinIO/S3 storage, and let the data nodes read these files directly and parse them into segments. The internal logic of the process becomes:

        1. The client calls import() to pass some file paths to the Milvus proxy node.

        2. The proxy node passes the file paths to the root coordinator, and the root coordinator passes them to the data coordinator.

        3. The data coordinator picks one or more data nodes (according to the sharding number) to parse the files; each file can be parsed into one or more segments.

        4. Once a task is finished, the data node reports to the data coordinator, and the data coordinator reports to the root coordinator; the generated segments will be sent to the index node to build an index.

        5. The root coordinator records a task list in Etcd; after the generated segments successfully build an index, the root coordinator marks the task as "completed".
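The bookkeeping in steps 4 and 5 can be sketched as a small in-memory stand-in for the Etcd task record. All class and method names below are assumptions made for illustration, not the actual Milvus implementation:

```python
# Illustrative stand-in for the Etcd task record kept by the root coordinator.
class ImportTaskList:
    def __init__(self):
        self._states = {}  # task id -> state

    def start(self, task_id):
        self._states[task_id] = "started"

    def report_persisted(self, task_id):
        # Data node finished parsing; segments are persisted, index build pending.
        self._states[task_id] = "persisted"

    def report_indexed(self, task_id):
        # Index node finished building the index for the generated segments.
        self._states[task_id] = "completed"

    def state(self, task_id):
        return self._states[task_id]


tasks = ImportTaskList()
tasks.start(1)
tasks.report_persisted(1)
tasks.report_indexed(1)
print(tasks.state(1))  # -> completed
```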

1.  SDK Interfaces

The Python API declaration:

def import(collection_name,  files, row_based, partition_name=None, options=None)

  • collection_name: the target collection name (required)
  • partition_name: the target partition name (optional)
  • row_based: a boolean to specify row-based or column-based format
  • files: a list of file paths (required)
    row-based files: ["file_1.json", "file_2.json"]
    column-based files: ["file_1.json", "vectors.npy"]

  • options: extra options in JSON format, for example the MinIO/S3 bucket where the files come from (optional)
    {"bucket": "mybucket"}
  • return a list of task ids
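A client-side sketch of the signature above. Since import is a reserved word in Python, the stand-in is named bulk_import here; the validation mirrors the error handling described later, and the returned ids are mocks, not real task ids:

```python
def bulk_import(collection_name, files, row_based, partition_name=None, options=None):
    """Stand-in for the proposed import() API: validate arguments, return mock task ids."""
    if not collection_name:
        raise ValueError("collection_name is required")
    if not files:
        raise ValueError("File list is empty")
    if not isinstance(files, list):
        raise TypeError("files must be a list of file paths")
    # A real client would send an ImportRequest RPC to the proxy node here;
    # this sketch just hands back one mock task id per file.
    return list(range(len(files)))


task_ids = bulk_import("test", ["file_1.json", "file_2.json"], row_based=True,
                       options={"bucket": "mybucket"})
print(task_ids)  # -> [0, 1]
```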

...

            Note: the "state" could be "pending", "started", "downloaded", "parsed", "persisted", "completed", "failed"
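Given those states, a caller would typically poll until a terminal state is reached. A minimal sketch, where get_state stands in for a get_import_state()-style call (the callable shape is an assumption of this example):

```python
import time

TERMINAL_STATES = {"completed", "failed"}

def wait_for_import(get_state, task_id, timeout_s=30.0, interval_s=0.01):
    """Poll a get_import_state-style callable until the task finishes or fails."""
    deadline = time.time() + timeout_s
    while True:
        state = get_state(task_id)["state"]
        if state in TERMINAL_STATES:
            return state
        if time.time() >= deadline:
            raise TimeoutError("import task %d still running after %.1fs"
                               % (task_id, timeout_s))
        time.sleep(interval_s)


# Fake server that walks through the documented states on successive polls.
states = iter(["pending", "started", "downloaded", "parsed", "persisted", "completed"])
print(wait_for_import(lambda task_id: {"state": next(states)}, 42))  # -> completed
```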


Pre-defined format for import files

...

There are two ways to represent the collection with data files:

(1) Row-based data file: a JSON file containing multiple rows under the required key "rows".

file_1.json:

Code Block
{
  "rows":[
    {"uid": 101, "vector": [1.1, 1.2, 1.3, 1.4]},
    {"uid": 102, "vector": [2.1, 2.2, 2.3, 2.4]},
    {"uid": 103, "vector": [3.1, 3.2, 3.3, 3.4]},
    {"uid": 104, "vector": [4.1, 4.2, 4.3, 4.4]},
    {"uid": 105, "vector": [5.1, 5.2, 5.3, 5.4]}
  ]
}
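A file in this shape can be generated with the standard json module. The temporary path below is for illustration only:

```python
import json
import os
import tempfile

# Build five rows matching the example above and wrap them under the
# required "rows" key.
rows = [{"uid": 100 + i, "vector": [i + 0.1, i + 0.2, i + 0.3, i + 0.4]}
        for i in range(1, 6)]

path = os.path.join(tempfile.mkdtemp(), "file_1.json")
with open(path, "w") as f:
    json.dump({"rows": rows}, f)

with open(path) as f:
    loaded = json.load(f)
print(len(loaded["rows"]))  # -> 5
```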

Call import() to import the file:

Code Block
import(collection_name="test", row_based=True, files=["file_1.json"])



(2) Column-based data file: a JSON file contains multiple columns. In this case, file_1.json holds both the "uid" field and the "vector" field:

Code Block
{
  "uid": [101, 102, 103, 104, 105],
  "vector": [[1.1, 1.2, 1.3, 1.4], [2.1, 2.2, 2.3, 2.4], [3.1, 3.2, 3.3, 3.4], [4.1, 4.2, 4.3, 4.4], [5.1, 5.2, 5.3, 5.4]]
}
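The single column-based file can be produced the same way; the row-count consistency across columns checked here is the same rule the server validates later:

```python
import json
import os
import tempfile

columns = {
    "uid": [101, 102, 103, 104, 105],
    "vector": [[i + 0.1, i + 0.2, i + 0.3, i + 0.4] for i in range(1, 6)],
}
# Every column must have the same number of entries.
assert len(set(len(v) for v in columns.values())) == 1

path = os.path.join(tempfile.mkdtemp(), "file_1.json")
with open(path, "w") as f:
    json.dump(columns, f)

with open(path) as f:
    field_names = sorted(json.load(f))
print(field_names)  # -> ['uid', 'vector']
```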

...

Code Block
import(collection_name="test", row_based=False, files=["file_1.json"])


We also support storing vectors in a NumPy file, and we require the NumPy file's name to be equal to the field name. Let's say the "vector" field is stored in vector.npy and the "uid" field is stored in "file_1.json", then we can call import():

Code Block
import(collection_name="test", row_based=False, files=["file_1.json", "vector.npy"])

Note: for column-based import, we don't support multiple JSON files; all columns should be stored in one JSON file. If the user uses a NumPy file to store the vector field, then the other scalar fields should be stored in one JSON file.


Error handling

The import():

...

  • Return error "Collection doesn't exist" if the target collection doesn't exist
  • Return error "Partition doesn't exist" if the target partition doesn't exist
  • Return error "Bucket doesn't exist" if the target bucket doesn't exist
  • Return error "File list is empty" if the files list is empty
  • The ImportTask pending list has a size limit; if a new import request exceeds it, return error "Import task queue max size is xxx, currently there are xx pending tasks. Not able to execute this request with x tasks."
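The queue-limit rule can be sketched as follows. The limit value and the class shape are assumptions; the document does not specify the actual queue size:

```python
from collections import deque

MAX_PENDING = 32  # the actual limit is not specified in this document; 32 is assumed

class ImportTaskQueue:
    """Illustrative pending queue enforcing the size limit described above."""

    def __init__(self, max_pending=MAX_PENDING):
        self._pending = deque()
        self._max_pending = max_pending

    def submit(self, task_ids):
        free = self._max_pending - len(self._pending)
        if len(task_ids) > free:
            raise RuntimeError(
                "Import task queue max size is %d, currently there are %d pending "
                "tasks. Not able to execute this request with %d tasks."
                % (self._max_pending, len(self._pending), len(task_ids)))
        self._pending.extend(task_ids)
        return list(task_ids)


queue = ImportTaskQueue(max_pending=2)
queue.submit([1, 2])
# queue.submit([3]) would now raise RuntimeError with the message above
```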

The get_import_state():

  • Return error "File xxx doesn't exist" if the file could not be opened
  • All fields must be present; otherwise, return the error "The field xxx is not provided"
  • For row-based JSON files, return "not a valid row-based json format, the key rows not found" if the "rows" key could not be found
  • For column-based files, if a field is duplicated between a NumPy file and the JSON file, return the error "The field xxx is duplicated"
  • Return error "json parse error: xxxxx" on illegal JSON format
  • The row count of each field must be equal; otherwise, return the error "Inconsistent row count between field xxx and xxx" (all segments generated by this file will be abandoned)
  • If a vector's dimension doesn't match the field schema, return the error "Incorrect vector dimension for field xxx" (all segments generated by this file will be abandoned)
  • If a data file size exceeds 1GB, return error "Data file size must be less than 1GB"
  • If an import task gets no response for more than 6 hours, it will be marked as failed
  • If a data node crashes or restarts, the import tasks on it will be marked as failed
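Two of the per-file checks above, sketched against an in-memory column dict. The function name and data shape are illustrative, not the server's actual code:

```python
def check_columns(columns, dim):
    """Row-count consistency and vector-dimension checks described above."""
    names = list(columns)
    base = len(columns[names[0]])
    for name in names[1:]:
        if len(columns[name]) != base:
            raise ValueError(
                "Inconsistent row count between field %s and %s" % (names[0], name))
    for vec in columns.get("vector", []):
        if len(vec) != dim:
            raise ValueError("Incorrect vector dimension for field vector")


check_columns({"uid": [1, 2], "vector": [[0.1, 0.2], [0.3, 0.4]]}, dim=2)  # passes
```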

2. Proxy RPC Interfaces

Code Block
service MilvusService {
  rpc Import(ImportRequest) returns (ImportResponse) {}
  rpc GetImportState(GetImportStateRequest) returns (GetImportStateResponse) {}
}

message ImportRequest {
  string collection_name = 1;                // target collection
  string partition_name = 2;                 // target partition
  bool row_based = 3;                        // the file is row-based or column-based
  repeated string files = 4;                 // file paths to be imported
  repeated common.KeyValuePair options = 5;  // import options, bucket, etc.
}

message ImportResponse {
  common.Status status = 1;
  repeated int64 tasks = 2;  // id array of import tasks
}

message GetImportStateRequest {
  int64 task = 1;  // id of an import task
}

enum ImportState {
    ImportPending = 0;
    ImportFailed = 1;
    ImportDownloaded = 2;
    ImportParsed = 3;
    ImportPersisted = 4;
    ImportCompleted = 5;
}

message GetImportStateResponse {
  common.Status status = 1;
  ImportState state = 2;                   // is this import task finished or not
  int64 row_count = 3;                     // if the task is finished, this value is how many rows are imported. if the task is not finished, this value is how many rows are parsed. return 0 if failed.
  repeated int64 id_list = 4;              // auto generated ids if the primary key is autoid
  repeated common.KeyValuePair infos = 5;  // more information about the task, progress percent, file path, failed reason, etc.
}

3. Rootcoord RPC interfaces

The declaration of import API in rootcoord RPC:

Code Block
service RootCoord {
  rpc Import(milvus.ImportRequest) returns (milvus.ImportResponse) {}
  rpc GetImportState(milvus.GetImportStateRequest) returns (milvus.GetImportStateResponse) {}
  rpc ReportImport(ImportResult) returns (common.Status) {}
}

message ImportResult {
  common.Status status = 1;
  int64 task_id = 2;                       // id of the task
  common.ImportState state = 3;            // state of the task
  repeated int64 segments = 4;             // id array of new sealed segments
  repeated int64 auto_ids = 5;             // auto-generated ids for auto-id primary key
  int64 row_count = 6;                     // how many rows are imported by this task
  repeated common.KeyValuePair infos = 7;  // more information about the task, file path, failed reason, etc.
}


The call chain of the import workflow:


(diagram)

4. Datacoord RPC interfaces

...

Code Block
service DataCoord {
  rpc Import(ImportTask) returns (ImportTaskResponse) {}
}

message ImportTask {
  common.Status status = 1;
  string collection_name = 2;                // target collection
  string partition_name = 3;                 // target partition
  bool row_based = 4;                        // the file is row-based or column-based
  int64 task_id = 5;                         // id of the task
  repeated string files = 6;                 // file paths to be imported
  repeated common.KeyValuePair infos = 7;    // more information about the task, bucket, etc.
}

message ImportTaskResponse {
  common.Status status = 1;
  int64 datanode_id = 2;                     // which datanode takes this task
}
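The datanode_id in ImportTaskResponse implies the data coordinator applies some assignment policy when picking a data node. A trivial round-robin sketch, purely illustrative since the real policy is not specified in this document:

```python
def pick_datanode(datanode_ids, task_id):
    """Assign an import task to a data node by round-robin over the task id."""
    return datanode_ids[task_id % len(datanode_ids)]


print(pick_datanode([100, 101, 102], task_id=4))  # -> 101
```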


The relationship between ImportRequest and ImportTask:

For a row-based request, the RootCoord splits the request into multiple ImportTasks; each JSON file is one ImportTask.

For a column-based request, all files are regarded as one ImportTask.
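That splitting rule can be sketched as:

```python
def split_into_tasks(row_based, files):
    """Row-based: one ImportTask per JSON file; column-based: one task for all files."""
    if row_based:
        return [[f] for f in files]
    return [list(files)]


print(split_into_tasks(True, ["a.json", "b.json"]))   # -> [['a.json'], ['b.json']]
print(split_into_tasks(False, ["a.json", "b.npy"]))   # -> [['a.json', 'b.npy']]
```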

(diagram)

5. Datanode interfaces

The declaration of import API in datanode RPC:

...

To achieve this property, the segments shall be marked as "Importing" state and be invisible before the whole loading procedure completes.
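A sketch of that visibility rule; the segment representation as a dict is an assumption for illustration:

```python
def visible_segments(segments):
    """Segments still in "Importing" state are hidden from queries."""
    return [s["id"] for s in segments if s["state"] != "Importing"]


segs = [{"id": 1, "state": "Flushed"}, {"id": 2, "state": "Importing"}]
print(visible_segments(segs))  # -> [1]
```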

...