Hi @Jesper_Almstrom - please find sample code below.
Our requirement is to create separate datasets for different source systems, so we keep the names — folder names, dataset names, table names, and column names — in a glossary.
Our source is RDS MySQL; depending on your source system, you can extract the data source ARN and change the details in the code below.
#!/usr/bin/env python
# coding: utf-8
import pandas as pd
import json
import sys
# Source system passed on the command line (e.g. AAA, BBB, CCC).
source = sys.argv[1]

# Numeric prefix that makes physical/logical table ids unique per source system.
_SOURCE_IDS = {'AAA': '1-', 'BBB': '2-', 'CCC': '3-'}
try:
    sourceid = _SOURCE_IDS[source]
except KeyError:
    # Fail fast: previously an unknown source left `sourceid` undefined and
    # the script crashed with a NameError much later.
    sys.exit("Unknown source system %r; expected one of %s"
             % (source, sorted(_SOURCE_IDS)))

# Business glossary workbook: column-level detail and the table-id mapping.
data = pd.read_excel('Glossary.xlsx', sheet_name='SubjectAreas')
logi = pd.read_excel('Glossary.xlsx', sheet_name='table ID')
def generate_physical():
    """Build the QuickSight physical-table-map JSON and write physical.json.

    Creates one entry per unique ``table_id`` in the glossary, keyed as
    ``sourceid + table_id``. Returns the dict that was written.
    """
    # Dicts preserve insertion order (Python 3.7+); the loop below relies on
    # key i lining up with row i of `logi` — TODO confirm table_id is unique
    # per row, otherwise keys and rows drift apart.
    phydict = {
        sourceid + tid: {
            'CustomSql': {
                'DataSourceArn': 'a',
                'Name': 'b',
                'SqlQuery': 'c',
                'Columns': [],
            }
        }
        for tid in logi.loc[:, 'table_id'].unique()
    }
    for i, k in enumerate(phydict):
        table = logi.loc[i, 'Table Name']
        sql = phydict[k]['CustomSql']
        # Placeholders — replace with the real region/account/arn before use.
        sql['DataSourceArn'] = 'arn:aws:quicksight:<<region>>:<<account id>>:datasource/<<datasource arn>>'
        sql['Name'] = table
        # Only columns flagged as required go into the SELECT list.
        required = data[(data['Table Name'] == table)
                        & (data['Column Required Flag'] == 'Y')]
        cols = ', '.join(str(c) for c in required['Column Name'])
        # Source-specific row filter on sys_source_id.
        if sourceid == '1-':
            sql['SqlQuery'] = ("select " + cols + " from " + table
                               + " where sys_source_id in ('1',3) ")
        elif sourceid == '2-':
            sql['SqlQuery'] = ("select " + cols + " from " + table
                               + " where sys_source_id in ('2') ")
        elif sourceid == '3-':
            sql['SqlQuery'] = "select " + cols + " from " + table
        # Column metadata deliberately covers EVERY glossary column for the
        # table, not just the required ones (original behaviour preserved).
        table_rows = data[data['Table Name'] == table]
        for name, dtype in zip(table_rows['Column Name'],
                               table_rows['Attribute Datatype']):
            sql['Columns'].append({"Name": name, "Type": dtype})
    print(phydict)
    with open('physical.json', 'w') as f:
        f.write(json.dumps(phydict, indent=4))
    return phydict
def generate_phy_files():
    """Split physical.json into one per-table JSON file, prefixed by source."""
    with open('physical.json', 'r') as fh:
        phy = json.loads(fh.read())
    for table_id, entry in phy.items():
        single = {table_id: entry}
        print(single)
        out_name = source + '_p_' + entry['CustomSql']['Name'] + '.json'
        with open(out_name, 'w') as out:
            out.write(json.dumps(single, indent=4))
def generate_logical():
    """Build the QuickSight logical-table-map JSON and write logical.json.

    For each table: rename every required column to its business name, tag it
    with its description, project the business names, and link to the matching
    physical table id. Returns the dict that was written.
    """
    logidict = {sourceid + tid: {'Alias': 'a', 'DataTransforms': []}
                for tid in logi.loc[:, 'table_id'].unique()}
    for row, key in enumerate(logidict):
        print(key)
        table = logi.loc[row, 'Table Name']
        logidict[key]['Alias'] = table
        required = data[(data['Table Name'] == table)
                        & (data['Column Required Flag'] == 'Y')]
        phys_names = required['Column Name'].tolist()
        biz_names = required['Data Item'].tolist()
        descriptions = required['Data Item Description'].tolist()
        transforms = logidict[key]['DataTransforms']
        # First pass: physical column name -> business name.
        for col, new in zip(phys_names, biz_names):
            print(col)
            transforms.append({'RenameColumnOperation':
                               {"ColumnName": col, "NewColumnName": new}})
        # Second pass: attach the description tag to each renamed column.
        for idx, (new, desc) in enumerate(zip(biz_names, descriptions)):
            transforms.append({'TagColumnOperation':
                               {'ColumnName': new, 'Tags': []}})
            print(transforms[idx + len(phys_names)])
            transforms[idx + len(phys_names)]['TagColumnOperation']['Tags'].append(
                {"ColumnDescription": {"Text": desc}})
        # Keep only the business-named columns and link the physical table.
        transforms.append({'ProjectOperation': {'ProjectedColumns': biz_names}})
        logidict[key]['Source'] = {"PhysicalTableId": key}
    with open('logical.json', 'w') as g:
        g.write(json.dumps(logidict, indent=4))
    return logidict
def generate_logi_files():
    """Split logical.json into one per-table JSON file, prefixed by source."""
    with open('logical.json', 'r') as fh:
        log = json.loads(fh.read())
    for table_id, entry in log.items():
        piece = {table_id: entry}
        print(piece)
        with open(source + '_l_' + entry['Alias'] + '.json', 'w') as out:
            out.write(json.dumps(piece, indent=4))
def subject_area():
    """Write subject_area.json mapping each subject area to its table names."""
    subject = {
        area: data[data['Subject Area'] == area]['Table Name'].unique().tolist()
        for area in data['Subject Area'].unique()
    }
    print(subject)
    with open('subject_area.json', 'w') as g:
        g.write(json.dumps({"subject_area_table_details": subject}, indent=4))
def table_name():
    """Write table_name.json mapping each table name to its entity name."""
    name = {}
    for tbl in data['Table Name'].unique():
        # First entity name wins if the glossary lists more than one per table.
        name[tbl] = data[data['Table Name'] == tbl]['Entity Name'].unique()[0]
    print(name)
    with open('table_name.json', 'w') as g:
        g.write(json.dumps({"data_set_name": name}, indent=4))
def generate_file_name():
    """Write the physical/logical file-name lookup JSONs.

    Maps each table name to the per-table JSON file that
    generate_phy_files()/generate_logi_files() produce.
    """
    tbls = data['Table Name'].unique()
    physs = {t: source + '_p_' + t + '.json' for t in tbls}
    logiss = {t: source + '_l_' + t + '.json' for t in tbls}
    print(physs)
    with open('physical_filename.json', 'w') as g:
        g.write(json.dumps({"physical_table_map_file": physs}, indent=4))
    print(logiss)
    # BUG FIX: previously this dumped `physs` again, so logical_filename.json
    # contained the *physical* file names.
    with open('logical_filename.json', 'w') as g:
        g.write(json.dumps({"logical_table_map_file": logiss}, indent=4))
if __name__ == "__main__":
    # Build the complete maps first, then split them into per-table files
    # and emit the supporting lookup JSONs.
    physical_json = generate_physical()
    logical_json = generate_logical()
    generate_phy_files()
    generate_logi_files()
    subject_area()
    table_name()
    generate_file_name()
The sample business glossary is an Excel sheet with two tabs.
Tab 1 is named SubjectAreas and has the details below.
Tab 2 is named table ID; a sample is shown below.
Have a look at those details — you can develop your own logic from the samples above. Sorry, I am not able to share the exact Excel sheet because it contains some confidential details, but I have shared the logical and physical table map file creation logic, and it is working for us.
Regards - San