QuickSight dataset with API table map

I have a question about the QuickSight API. When I create a dataset I need to assign a PhysicalTableMap. I have noticed that when I create a dataset in the QS GUI this table map is automatically generated from e.g. an RDS data source. When I do this programmatically I have to define all the columns with their types.

PhysicalTableMap/CustomSql/Columns

Is there a way to generate this when using the API?

Hi @Jesper_Almstrom - If you are planning to create datasets using the QuickSight API (using the boto3 libraries), then yes, you have to give the physical table map and logical table map as input (JSONs). You have to create them manually (for reference, see create-data-set — AWS CLI 1.27.104 Command Reference).
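For reference, a minimal PhysicalTableMap entry using a CustomSql source looks roughly like the sketch below. The table key, ARN, table name, query, and columns are all placeholders you have to replace with your own values:

#Minimal sketch of a PhysicalTableMap entry with a CustomSql source;
#all values below are placeholders, not real ARNs or tables
physical_table_map={
    "unique-table-id":{
        "CustomSql":{
            "DataSourceArn":"arn:aws:quicksight:<<region>>:<<account id>>:datasource/<<datasource id>>",
            "Name":"orders",
            "SqlQuery":"select order_id, order_date from orders",
            "Columns":[
                {"Name":"order_id","Type":"INTEGER"},
                {"Name":"order_date","Type":"DATETIME"}
            ]
        }
    }
}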

However, you can develop a custom Python wrapper that takes your table and column details as input and generates these JSONs.

Regards - San


@Sanjeeb2022 Do you know of, or have an example of, such a Python wrapper?


Hi @Jesper_Almstrom - Please find sample code below.

Our requirement is to create separate datasets for different source systems, so we keep the details (folder names, dataset names, table names, and column names) in a glossary.

Our source is RDS MySQL; depending on your source system, you can extract the data source ARN and change the details in the code below.

#!/usr/bin/env python
# coding: utf-8

import pandas as pd
import json
import sys


#The source system is passed as a command-line argument, e.g. python <script>.py AAA
source=sys.argv[1]

#Each source system gets a numeric prefix for the table IDs
if source=='AAA':
    sourceid='1-'
elif source=='BBB':
    sourceid='2-'
elif source=='CCC':
    sourceid='3-'
else:
    sys.exit('Unknown source system: '+source)



#Read the business glossary: column-level details and table IDs
data=pd.read_excel('Glossary.xlsx',sheet_name='SubjectAreas')
logi=pd.read_excel('Glossary.xlsx',sheet_name='table ID')

def generate_physical():

    #Create Physical JSON
    phydict={}

    #Creating dictionary with key as table_id
    for i in logi.loc[:,'table_id'].unique():
        phydict[sourceid+i]={}

    #Initialise the nested CustomSql structure for every table
    for k in phydict.keys():
        phydict[k]={'CustomSql':{'DataSourceArn':'a','Name':'b','SqlQuery':'c','Columns':[]}}

    i=0
    for k in phydict.keys():
        phydict[k]['CustomSql']['DataSourceArn']='arn:aws:quicksight:<<region>>:<<account id>>:datasource/<<datasource arn>>'
        phydict[k]['CustomSql']['Name']=logi.loc[i,'Table Name']

        #Comma-separated list of the required columns for this table
        cols=', '.join(str(x) for x in data[(data['Table Name']==logi.loc[i,'Table Name']) & (data['Column Required Flag']=='Y')]['Column Name'])

        #Each source system filters on its own sys_source_id values
        if sourceid=='1-':
            phydict[k]['CustomSql']['SqlQuery']="select "+ cols +" from "+logi.loc[i,'Table Name']+" where sys_source_id in ('1','3') "
        elif sourceid=='2-':
            phydict[k]['CustomSql']['SqlQuery']="select "+ cols +" from "+logi.loc[i,'Table Name']+" where sys_source_id in ('2') "
        elif sourceid=='3-':
            phydict[k]['CustomSql']['SqlQuery']="select "+ cols +" from "+logi.loc[i,'Table Name']

        #Declare every column with its name and data type
        dm=data[data['Table Name']==logi.loc[i,'Table Name']]['Column Name'].tolist()
        dt=data[data['Table Name']==logi.loc[i,'Table Name']]['Attribute Datatype'].tolist()
        for j in range(len(dm)):
            phydict[k]['CustomSql']['Columns'].append({"Name":dm[j],"Type":dt[j]})
        i+=1

    print(phydict)

    with open('physical.json','w') as f:
        f.write(json.dumps(phydict,indent=4))

    return phydict

def generate_phy_files():
    #Split physical.json into one file per table
    with open('physical.json','r') as f:
        phy=json.loads(f.read())

    for i in phy.keys():
        print({i:phy[i]})
        with open(source+'_p_'+phy[i]['CustomSql']['Name']+'.json','w') as g:
            g.write(json.dumps({i:phy[i]},indent=4))

def generate_logical():

    #Create Logical JSON
    logidict={}

    #Creating dictionary with key as table_id
    for i in logi.loc[:,'table_id'].unique():
        logidict[sourceid+i]={}

    #Create the nested logical JSON
    for k in logidict.keys():
        logidict[k]={'Alias':'a','DataTransforms':[]}

    i=0
    for k in logidict.keys():
        print(k)
        logidict[k]['Alias']=logi.loc[i,'Table Name']
        dm=data[(data['Table Name']==logi.loc[i,'Table Name']) & (data['Column Required Flag']=='Y')]['Column Name'].tolist()
        dt=data[(data['Table Name']==logi.loc[i,'Table Name']) & (data['Column Required Flag']=='Y')]['Data Item'].tolist()
        dd=data[(data['Table Name']==logi.loc[i,'Table Name']) & (data['Column Required Flag']=='Y')]['Data Item Description'].tolist()

        #Rename each physical column to its business-friendly name
        for j in range(len(dm)):
            print(dm[j])
            logidict[k]['DataTransforms'].append({'RenameColumnOperation':{"ColumnName":dm[j],"NewColumnName":dt[j]}})

        #Tag each renamed column with its description
        for j in range(len(dm)):
            logidict[k]['DataTransforms'].append({'TagColumnOperation':{'ColumnName':dt[j],'Tags':[{"ColumnDescription": {"Text": dd[j]}}]}})
            print(logidict[k]['DataTransforms'][j+len(dm)])

        #Keep only the renamed columns and point back at the physical table
        logidict[k]['DataTransforms'].append({'ProjectOperation':{'ProjectedColumns':dt}})
        logidict[k]['Source']={"PhysicalTableId": k}
        i+=1

    with open('logical.json','w') as g:
        g.write(json.dumps(logidict,indent=4))

    return logidict


def generate_logi_files():
    #Split logical.json into one file per table
    with open('logical.json','r') as f:
        log=json.loads(f.read())

    for i in log.keys():
        print({i:log[i]})
        with open(source+'_l_'+log[i]['Alias']+'.json','w') as g:
            g.write(json.dumps({i:log[i]},indent=4))

def subject_area():

    #Create the Subject Area JSON
    subject={}
    for i in data['Subject Area'].unique():
        tables=data[data['Subject Area']==i]['Table Name'].unique().tolist()
        subject[i]=tables

    print(subject)

    with open('subject_area.json','w') as g:
        g.write(json.dumps({"subject_area_table_details":subject},indent=4))

def table_name():
    #Create the Table Name JSON
    tbls=data['Table Name'].unique()
    name={}
    for j in tbls:
        names=data[data['Table Name']==j]['Entity Name'].unique()[0]
        name[j]=names

    print(name)

    with open('table_name.json','w') as g:
        g.write(json.dumps({"data_set_name":name},indent=4))

def generate_file_name():
    #Create the File Names JSON of Physical and Logical
    tbls=data['Table Name'].unique()
    physs={}
    logiss={}
    for j in tbls:
        physs[j]=source+'_p_'+j+'.json'
        logiss[j]=source+'_l_'+j+'.json'

    print(physs)

    with open('physical_filename.json','w') as g:
        g.write(json.dumps({"physical_table_map_file":physs},indent=4))

    print(logiss)

    with open('logical_filename.json','w') as g:
        g.write(json.dumps({"logical_table_map_file":logiss},indent=4))


if __name__=="__main__":
    
    physical_json=generate_physical()
    logical_json=generate_logical()
    generate_phy_files()
    generate_logi_files()
    subject_area()
    table_name()
    generate_file_name()
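Once the files are generated (run the script as, say, python your_script.py AAA), you can pass a per-table physical and logical map to boto3's create_data_set. A minimal sketch, assuming source AAA and a hypothetical table named orders; the region, account ID, dataset ID, and dataset name are placeholders:

import json
import boto3

#Load the per-table maps written by generate_phy_files()/generate_logi_files();
#the file names below assume source AAA and a hypothetical table called orders
with open('AAA_p_orders.json','r') as f:
    physical=json.load(f)
with open('AAA_l_orders.json','r') as f:
    logical=json.load(f)

qs=boto3.client('quicksight',region_name='<<region>>')
qs.create_data_set(
    AwsAccountId='<<account id>>',
    DataSetId='orders-dataset-id',
    Name='Orders',
    PhysicalTableMap=physical,
    LogicalTableMap=logical,
    ImportMode='SPICE'
)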

The sample business glossary is an Excel sheet with two tabs.

Tab 1 is named SubjectAreas and contains the column-level details the script reads: Subject Area, Table Name, Entity Name, Column Name, Data Item, Data Item Description, Attribute Datatype, and Column Required Flag.

Tab 2 is named table ID and contains the columns table_id and Table Name.
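For illustration only, a row in each tab would look something like this (all values made up):

#Hypothetical SubjectAreas row
{'Subject Area':'Sales','Table Name':'orders','Entity Name':'Orders',
 'Column Name':'ord_id','Data Item':'Order ID',
 'Data Item Description':'Unique identifier of the order',
 'Attribute Datatype':'INTEGER','Column Required Flag':'Y'}

#Hypothetical table ID row
{'table_id':'t01','Table Name':'orders'}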

Just have a look at all those details and you can develop your own logic from the above samples. Sorry, I am not able to share the exact Excel sheet as it has some confidential details, but I have shared the logical and physical table map file creation logic, and it is working for us.

Regards - San
