
I want to push data using the continuous-batch-processing API with this function:

 

  def push_new_chunk(self, dataPoolId, tablename, file_path, primaryKeys, table_schema, fallbackVarcharLength=None):
    api = self.get_jobs_api(dataPoolId) + "/items"
    payload = {
        'targetName': tablename,
        'primaryKeys': primaryKeys,
        'connectionId': self.connection_id,
        'fallbackVarcharLength': fallbackVarcharLength,
        'upsertStrategy': 'UPSERT_WITH_NULLIFICATION',
        'tableSchema': table_schema,
    }
    headers = self.get_auth()
    file = {'file': open(file_path, 'rb')}
    return requests.post(url=api, params=payload, files=file, headers=headers)

 

tableSchema is defined as per the Celonis documentation. E.g. this is how my tableSchema looks in Python before pushing:

 

table_schema = {
    "columns": [
        {"columnName": "CreatedAt", "columnType": "DATETIME", "decimals": 0, "fieldLength": 0, "pkField": False},
        # Add more columns as needed
    ],
    "tableName": "ev_16_activities"
}

 

However, I am always getting an error saying the schema is not in the expected format:

b'Validation failed for argument [0] in public java.util.concurrent.CompletableFuture<cloud.celonis.continuousbatchprocessing.client.BucketItemTransport> cloud.celonis.continuousbatchprocessing.web.ContinuousBatchProcessingController.storeBucketItem(cloud.celonis.continuousbatchprocessing.client.StoreBucketItemRequest): [Field error in object \'storeBucketItemRequest\' on field \'tableSchema\': rejected value [{"columns": [{"columnName": "CreatedAt", "columnType": "DATETIME", "decimals": 0, "fieldLength": 0, "pkField": false}], "tableName": "ev_16_activities"}]; codes [typeMismatch.storeBucketItemRequest.tableSchema,typeMismatch.tableSchema,typeMismatch.cloud.celonis.parquet.meta.Table$TableTransport,typeMismatch]; arguments [org.springframework.context.support.DefaultMessageSourceResolvable: codes [storeBucketItemRequest.tableSchema,tableSchema]; arguments []; default message [tableSchema]]; default message [Failed to convert property value of type \'java.lang.String\' to required type \'cloud.celonis.parquet.meta.Table$TableTransport\' for property \'tableSchema\'; Cannot convert value of type \'java.lang.String\' to required type \'cloud.celonis.parquet.meta.Table$TableTransport\' for property \'tableSchema\': no matching editors or conversion strategy found]]'

 

Can anyone help me understand what 'cloud.celonis.parquet.meta.Table$TableTransport' is, since this seems to be the format the Celonis API is expecting?
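One clue: the error says the server received tableSchema as a java.lang.String. When a nested dict is passed through requests' params argument, each value gets stringified during URL-encoding. The stdlib equivalent shows the effect (a sketch with a trimmed, hypothetical schema, not the actual request):

```python
from urllib.parse import urlencode

# Trimmed, hypothetical schema just to show the encoding behaviour
table_schema = {"columns": [{"columnName": "CreatedAt"}], "tableName": "ev_16_activities"}

# urlencode() calls str() on non-string values, so the server sees the
# Python repr of the dict -- a plain string, not structured JSON.
encoded = urlencode({"tableSchema": table_schema})
print(encoded)
```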

 

 

Hi,

 

Can you let us know which version of PyCelonis you are using? That may be important :)

Also, why don't you try the standard Python snippets available here: Create a new Data Push Job in a Data Pool (celonis.com)? That error seems a bit weird, as it's a raw Java exception that is probably not handled and returned by the API endpoint itself.

 

In the link I've sent you, you can see the full REQUEST BODY SCHEMA for creating a data push job, and the REQUEST BODY SCHEMA for adding a chunk to the data push job.

 

 

Python code snippet:

import requests

 

pool_id = "YOUR_poolId_PARAMETER"

url = "http://localhost:8888/app/api/v1/data-push/" + pool_id + "/jobs/"

 

query = {

 "validate": "false"

}

 

payload = {

 "allowDuplicate": True,

 "changeDate": "2019-08-24T14:15:22Z",

 "connectionId": "string",

 "creationDate": "2019-08-24T14:15:22Z",

 "csvParsingOptions": {

  "additionalColumnOptions": [

   {

    "columnName": "string",

    "dateFormat": "string",

    "decimalSeparator": "string",

    "thousandsSeparator": "string"

   }

  ],

  "charSet": "string",

  "dateFormat": "string",

  "decimalSeparator": "string",

  "escapeSequence": "string",

  "lineEnding": "string",

  "quoteSequence": "string",

  "separatorSequence": "string",

  "thousandSeparator": "string"

 },

 "dataPoolId": "string",

 "fallbackVarcharLength": 0,

 "fileType": "PARQUET",

 "foreignKeys": "string",

 "id": "string",

 "keys": [

  "string"

 ],

 "lastModified": "2019-08-24T14:15:22Z",

 "lastPing": "2019-08-24T14:15:22Z",

 "logs": [

  "string"

 ],

 "mirrorTargetNames": [

  "string"

 ],

 "optionalTenantId": "string",

 "postExecutionQuery": "string",

 "sanitizedPostExecutionQuery": "string",

 "status": "NEW",

 "tableSchema": {

  "columns": [

   {

    "columnName": "string",

    "columnType": "INTEGER",

    "decimals": 0,

    "fieldLength": 0,

    "pkField": True

   }

  ],

  "tableName": "string"

 },

 "targetName": "string",

 "targetSchema": "string",

 "tenantId": "string",

 "type": "REPLACE",

 "upsertStrategy": "UPSERT_WITH_UNCHANGED_METADATA"

}

 

headers = {"Content-Type": "application/json"}

 

response = requests.post(url, json=payload, headers=headers, params=query)

 

if response.status_code == 204:
    print("success")
else:
    data = response.json()
    print(data)
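Note that the snippet above sends the payload with requests' json= keyword, which serializes the dict into the request body as real JSON (Python's True becomes true), so the endpoint can deserialize it into TableTransport; passing the same dict via params= would only stringify it into the URL. A minimal sketch of what json= produces, with a placeholder URL and a trimmed payload fragment:

```python
import requests

# Hypothetical payload fragment -- the full body is shown in the snippet above
payload = {"tableSchema": {"columns": [{"columnName": "CreatedAt", "pkField": True}],
                           "tableName": "ev_16_activities"}}

# json= turns the dict into a JSON request body; note True -> true
prepared = requests.Request(
    "POST", "http://localhost:8888/app/api/v1/data-push/pool/jobs/", json=payload
).prepare()
print(prepared.body)
```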

 

Best Regards,

Mateusz Dudek
