Change Database connection for a table via Python

Hi Team,
I am currently working on auto-assigning the DB connection to all tables at once in the front end via Python code, but I cannot find any functions that help with that.
I am able to connect to the data model, fetch the tables and table data, and change the table names too.
The only thing I am missing is how to change the DB connection of every table from Python.
From the table data fetched via Python, I could see that each DB connection has its own dbConnectionId, but I am unable to change that ID from Python.
Please let me know if that change is possible and, if so, how to do it.
Thanks in advance.

If someone is looking for the solution, it is actually to use

i.data["store"]["dbConnectionId"] = target.data["id"]

where we find the target by looking through the results of target.database_connections for a connection with the same name as the DB connection used in the source table.
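To make that lookup concrete, here is a minimal sketch with the Celonis objects modeled as plain dicts. The dict shapes mirror table.data["store"] and connection.data as used above; real pycelonis objects wrap these structures, and the connection names/IDs here are made up for illustration.

```python
# Sketch: remap a table's DB connection ID by matching connection names
# between the source and target environments. All names/IDs are invented.

source_connections = [
    {"id": 101, "name": "HANA_PROD"},
    {"id": 102, "name": "ORACLE_HR"},
]
target_connections = [
    {"id": 555, "name": "ORACLE_HR"},
    {"id": 556, "name": "HANA_PROD"},
]

def remap_connection_id(table_store, source_conns, target_conns):
    """Replace table_store['dbConnectionId'] with the ID of the target
    connection whose name matches the table's source connection."""
    source_id = table_store["dbConnectionId"]
    source_name = next(c["name"] for c in source_conns if c["id"] == source_id)
    target = next((c for c in target_conns if c["name"] == source_name), None)
    if target is None:
        raise LookupError(f"no target connection named {source_name!r}")
    table_store["dbConnectionId"] = target["id"]
    return table_store

store = {"dbConnectionId": 101, "tableName": "EKKO"}
remap_connection_id(store, source_connections, target_connections)
print(store["dbConnectionId"])  # → 556
```

The same name-matching idea is what the function further down does per table, just against live Celonis connection objects instead of dicts.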

Thanks for sharing!

For what it’s worth, this is a function I built to transfer data models between two CPM (on-premise) instances, where I also try to re-connect tables in the datamodel to connections and tables in the data source.

Use at your own risk :slight_smile:

def datamodelMerger(cel_source, cel_target, source, target=None, folderToCreateTarget="Zandbak"):
    '''Merges two datamodels in-place.

    Parameters:
        cel_source: connection object to the source Celonis environment
        cel_target: connection object to the target Celonis environment
        source: source data model to transport/copy
        target: target datamodel to be replaced; DEFAULT None means it will be created using the same name as the source datamodel
        folderToCreateTarget: if target is None, this is the folder name that is searched for (i.e. it needs to exist!) and in which the datamodel is created

    Returns:
        Target datamodel instance, for possible further use, but it can be ignored.
    '''
    # Note: info/debug/getLogger/getNowAsString and the cli.* helpers are
    # utilities from my own environment, not part of pycelonis.

    if target is None:
        # (try to) find the target datamodel
        info("Looking for a target datamodel.")
        target = cli.askUserCelonisItemFilter(cel_target.datamodels, source.name, field="name")

        # if we & the user could not find a datamodel, we create one
        if target is None:
            target_folder = cli.selectFolder(cel_target)
            debug(f"target folder: {target_folder}")
            target = cel_target.create_datamodel(source.name, target_folder)
            debug(f"new target data model created: {target}")

    # Now copy the contents over
    target.data = source.data  # seems to work! too easy, but it really does work, partly...

    # fix table connections
    DBIDsource_target_mapping = dict()  # key-value dict so we only look each connection up once, not twice
    for table in target.tables:
        # find matching table in source
        # TODO handle file uploads, at least fail gracefully
        sourcetable = source.tables.find(table.name)  # no complications expected here as we just synced source and target

        # find the DB connection in target which matches the one in source
        sourceDBID = int(sourcetable.data["store"]["dbConnectionId"])
        getLogger().debug(f"sourceDBID: {sourceDBID}")

        if sourceDBID not in DBIDsource_target_mapping:
            sourceDB = source.database_connections.find(sourceDBID)  # the ID exists, so we should always find exactly one
            getLogger().debug(f"sourceDB: {sourceDB} of type {type(sourceDB)}")
            info("Now looking for the database connection to attach the datamodel to.")
            targetDBconn = cli.askUserCelonisItemFilter(target.database_connections, sourceDB.name, field="name")
            if targetDBconn is None:
                getLogger().info(f"We found no matching database connection. Stopping now; we recommend creating the connection {sourceDB.name} yourself on the target environment.")
                return None  # quit

            # remember this mapping
            DBIDsource_target_mapping[sourceDBID] = targetDBconn
        else:
            # no complicated work needed, we already know the desired target DB connection
            targetDBconn = DBIDsource_target_mapping[sourceDBID]
            getLogger().debug(f"Fetching DB conn. from mapping cache (source id {sourceDBID} maps to {targetDBconn})")

        table.data["store"]["dbConnectionId"] = targetDBconn.data["id"]
        # end for table

    # TODO check that the load schedule (if configured) is not overwritten; BRD explicitly does NOT want the load schedule updated, because it is disabled in ACC but must stay enabled in PRD
    # TODO provide a link to the datamodel editor so the load schedule is easy to edit (maybe open it straight away via Conf?)
    # TODO with multiple dashboards: reload all, none, or choose individually
    # TODO also run column syncs before the reload? this sometimes fails now and then the whole script crashes

    info("Datamodel contents have been transferred and tables have been reconnected to the data connection.")
    info("Do note that the load schedule was *not* transferred and may have been lost.")
    info(f"URL: {target.url}")
    if cli.askUserConf("Do you want to reload the datamodel now?"):
        getLogger().info(f"Reload of datamodel {target.id}:{target.name} started at {getNowAsString()}.")
        # TODO try/except in case of a load error.
        target.reload(from_cache=False, wait_for_reload=True)
        getLogger().info(f"Reload of datamodel {target.id}:{target.name} finished at {getNowAsString()}.")

    return target
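The DBIDsource_target_mapping dict in the loop above is a small lookup cache: each distinct source connection ID is resolved once (which may involve prompting the user), and every further table sharing that connection reuses the cached result. A stripped-down, self-contained version of that pattern, with a plain dict standing in for the interactive target-environment lookup:

```python
# Stripped-down version of the mapping cache used in the function above:
# resolve each distinct source connection ID once, then reuse the result.
# target_ids and the IDs below are hypothetical stand-ins.

def build_resolver(resolve_fn):
    cache = {}
    calls = []  # records which IDs were actually resolved, to show the cache working

    def resolve(source_id):
        if source_id not in cache:
            calls.append(source_id)
            cache[source_id] = resolve_fn(source_id)
        return cache[source_id]

    resolve.calls = calls
    return resolve

# Hypothetical lookup standing in for "search the target environment by name".
target_ids = {101: 556, 102: 555}
resolve = build_resolver(lambda sid: target_ids[sid])

# Ten tables sharing two connections trigger only two real lookups.
tables = [101, 102, 101, 101, 102, 101, 102, 102, 101, 101]
mapped = [resolve(sid) for sid in tables]
print(resolve.calls)  # → [101, 102]
```

In the real function the cached value is a connection object rather than an ID, and caching matters because each miss can prompt the user; the mechanics are the same. For pure functions, Python's functools.lru_cache offers this behavior out of the box.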

Thank you for the reply :slight_smile:

I have used your explanation in the answer above (for people who are looking for exactly this hint), because that is what gave me the idea when I was going through your "DEV to PROD" topic: change what is required, change the DB connection and, one step further, the source table names corresponding to that DB connection, and synchronize the columns in a single shot for all the tables, removing a lot of manual work.
And of course, thank you for the code :slight_smile: This will help a lot.
Thank you.