Data Exchange
Example Process for Automating Data Upload
The following Python scripts illustrate how a process can be set up to check an upload folder for new files and manage the upload via Data Exchange, including checking for upload errors, using the API endpoint POST /api/v2/DataExchangeJob. For a more general overview of performing data exchange integration, please visit this guide.
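Before walking through the SDK-based scripts, it can help to see what the endpoint call amounts to. The sketch below shows a raw HTTP version of the job creation; the host, credentials, auth scheme and response field names here are assumptions for illustration only (the scripts below use the SDK's DataExchangeJobApi instead).

```python
import base64
import json
import urllib.request

def build_job_request(profile_id, document_id):
    """Body for POST /api/v2/DataExchangeJob (assumed field names)."""
    return {"ProfileId": profile_id, "DocumentId": document_id}

def post_data_exchange_job(base_url, username, token, profile_id, document_id):
    """POST the job; hypothetical host/credentials, basic auth assumed."""
    payload = json.dumps(build_job_request(profile_id, document_id)).encode("utf-8")
    auth = base64.b64encode(
        "{0}:{1}".format(username, token).encode("utf-8")).decode("ascii")
    req = urllib.request.Request(
        base_url + "/api/v2/DataExchangeJob",
        data=payload,
        headers={"Content-Type": "application/json",
                 "Authorization": "Basic " + auth})
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

The SDK wraps this request, response deserialisation and error handling for you, which is why the scripts below work with `assetic.DataExchangeJobApi` rather than raw HTTP.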
"""
Example script to automate data exchange (Assetic.DataExchangeProcessNew.py)
First of two scripts
This script uploads to Assetic the files in the 'ToLoad' folder. The
filename is parsed to get the asset category which allows the data exchange
profile to be determined. A data exchange task is created for the document
The name of the file is changed to the task id and moved to the 'InProgress'
folder. This allows the subsequent script to identify the task and check
progress
"""
import assetic
import os
import shutil
import sys
import base64
# Assetic SDK instance
asseticsdk = assetic.AsseticSDK("C:/Users/kwilton/assetic.ini",None,"Info")
##API instances
# Document API
docapi = assetic.DocumentApi()
# Data Exchange API
dataexchangeapi = assetic.DataExchangeJobApi()
# File Processing Directory structure
intdir = "C:/temp/Integration/"
sourcedir = intdir + "ToLoad/"
inprogress = intdir + "InProgress/"
errordir = intdir + "Error/"
processeddir = intdir + "Processed/"
##preview
profiles = {'Sewer Nodes': 'c7e6d3ed-3917-e611-9458-06edd62954d7',
'Roads': 'd889d99b-3317-e611-9458-06edd62954d7'}
##demo
profiles = {'Sewer Nodes': 'c7e6d3ed-3917-e611-9458-06edd62954d7',
'Roads': '876a0a4e-c732-e611-945f-06edd62954d7',
'Water Pressure Pipes': '3abd431c-2017-e611-9812-0632cf3be881'}
##Loop through files in folder
files = os.listdir(sourcedir)
for filename in files:
fullfilename = sourcedir + filename
#get category, assumes file name has a whitespace between date and category
filedate, categorywithext = filename.split(None,1)
category, fileext = os.path.splitext(categorywithext)
asseticsdk.logger.info("File Date: {0}, Category: {1}".format(
filedate, category))
##read file and encode for upload
with open(fullfilename, "rb") as f:
data = f.read()
if sys.version_info < (3,0):
filecontents = data.encode("base64")
else:
filecontents = base64.b64encode(data)
filecontents = filecontents.decode(encoding="utf-8", errors="strict")
##create file properties object and set values
file_properties = \
assetic.Assetic3IntegrationRepresentationsFilePropertiesRepresentation()
file_properties.name = filename
file_properties.mimetype = 'csv'
file_properties.filecontent = filecontents
filearray = [file_properties]
##create document object and assign values, including file properties
document = \
assetic.Assetic3IntegrationRepresentationsDocumentRepresentation()
document.label = 'Data Exchange - ' + filename
document.file_extension = 'csv'
document.document_type = 'DataExchange'
document.mime_type = 'csv'
document.doc_group_id = 1
document.file_property = filearray
#Upload document and get ID from response
doc = docapi.document_post(document)
docid = doc[0].get('Id')
#get dataexchange profile for category
profileid = profiles.get(category)
##prepare data exchange parameters
job = assetic.Assetic3IntegrationRepresentationsDataExchangeJobRepresentation()
job.profile_id = profileid
job.document_id = docid
asseticsdk.logger.info("Profile ID:{0}".format(job.profile_id))
asseticsdk.logger.info("Job Document ID:{0}".format(job.document_id))
try:
task = dataexchangeapi.data_exchange_job_post(job)
#move file to in-progress, and rename to task id to allow checks
#on success/failure
destination = inprogress + task + '.csv'
shutil.move(fullfilename,destination)
except assetic.rest.ApiException as e:
#gross error - move to error folder??
asseticsdk.logger.error('Status {0}, Reason: {1}'.format(
e.status,e.reason))
destination = errordir + task + '.csv'
shutil.move(fullfilename,destination)
How it works
The folder locations are defined in the script. It assumes that an external process is populating the source folder with files that are to be uploaded.
##File processing directory structure
intdir = "C:/temp/Integration/"
sourcedir = intdir + "ToLoad/"
inprogress = intdir + "InProgress/"
errordir = intdir + "Error/"
processeddir = intdir + "Processed/"
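If the folders may not exist yet (for example on first run), they can be created up front. A small sketch using the same paths:

```python
import os

# Create the integration folders if they do not already exist
# (paths as defined in the script above).
intdir = "C:/temp/Integration/"
for sub in ("ToLoad", "InProgress", "Error", "Processed"):
    os.makedirs(os.path.join(intdir, sub), exist_ok=True)
```

`exist_ok=True` makes the call a no-op for folders that are already there, so the snippet is safe to run at the top of every scheduled execution.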
Each file in the source folder is processed one at a time.
files = os.listdir(sourcedir)
for filename in files:
In this example each Asset Category has a separate Data Exchange profile, so the profile ID for each category is defined.
##List of categories and associated dataexchange profile
profiles = {'Sewer Nodes': 'c7e6d3ed-3917-e611-9458-06edd62954d7',
'Roads': '876a0a4e-c732-e611-945f-06edd62954d7',
'Water Pressure Pipes': '3abd431c-2017-e611-9812-0632cf3be881'}
The filename is parsed to determine the category and hence the profile ID.
#get category, assumes file name has a whitespace between date and category
filedate, categorywithext = filename.split(None,1)
category, fileext = os.path.splitext(categorywithext)
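A worked example of that parsing, for a hypothetical file named "20160901 Sewer Nodes.csv":

```python
import os

filename = "20160901 Sewer Nodes.csv"
# split on the first run of whitespace: date, then category plus extension
filedate, categorywithext = filename.split(None, 1)
# strip the extension to leave the category name
category, fileext = os.path.splitext(categorywithext)
# filedate == "20160901", category == "Sewer Nodes", fileext == ".csv"
```

Note that `split(None, 1)` splits only on the first whitespace run, so category names containing spaces (like "Sewer Nodes") survive intact.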
Get the Data Exchange profile for the category.
profileid = profiles.get(category)
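Note that `profiles.get` returns None when a file's category has no configured profile, and the script above would then post a job with an empty profile ID. A sketch of guarding against that before posting (the helper and its skip/log handling are illustrative, not part of the original script):

```python
# Hypothetical helper: look up the profile for a category, returning
# None when no profile is configured so the caller can skip the file.
profiles = {'Sewer Nodes': 'c7e6d3ed-3917-e611-9458-06edd62954d7',
            'Roads': '876a0a4e-c732-e611-945f-06edd62954d7'}

def profile_for(category):
    profileid = profiles.get(category)
    if profileid is None:
        # no Data Exchange profile configured for this category -
        # leave the file in 'ToLoad' (or move it to 'Error') and log it
        return None
    return profileid
```

The caller would then `continue` to the next file when None is returned, rather than creating a job that is bound to fail.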
The document is uploaded and the document ID obtained from the response.
#Upload document and get ID from response
doc = docapi.document_post(document)
docid = doc[0].get('Id')
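The upload itself can also raise an ApiException (bad credentials, malformed document, and so on). A hypothetical wrapper, not part of the original script, that returns None on failure so the file can be left in place for a retry on the next run:

```python
# Hypothetical guard around the document upload.  In practice the
# exception type would be assetic.rest.ApiException; Exception is used
# here so the sketch stands alone.
def upload_document(docapi, document, logger):
    try:
        doc = docapi.document_post(document)
    except Exception as e:
        logger.error("Document upload failed: {0}".format(e))
        return None
    # response is a list; the first element carries the new document's Id
    return doc[0].get('Id')
```
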
After preparing the job information, the job is posted to Data Exchange. If an exception is returned, there was a problem creating the job - perhaps the profile ID is incorrect, or there are insufficient privileges to create the job.
If there is no exception, the Data Exchange task ID is returned. Since tasks are queued, the task may not run immediately. To allow it to be checked at a later stage, the file is renamed to the task ID and moved to the 'InProgress' folder.
try:
    task = dataexchangeapi.data_exchange_job_post(job)
except assetic.rest.ApiException as e:
    # job creation failed - keep the original name and move to error folder
    asseticsdk.logger.error('Status {0}, Reason: {1}'.format(
        e.status, e.reason))
    destination = errordir + filename
    shutil.move(fullfilename, destination)
else:
    # move file to in-progress, renamed to the task id to allow checks
    # on success/failure
    destination = inprogress + task + '.csv'
    shutil.move(fullfilename, destination)
Checking if a task has processed
The following Python script uses the file name to check whether the data exchange task has run. If it ran successfully the file is moved to the 'Processed' folder; otherwise it is moved to the 'Error' folder. This script is scheduled to run periodically.
""" Example script to automate data exchange (Assetic.DataExchangeCheckTask.py) Second of two scripts This script checks the 'InProcess' folder to see if task has beeen processed in data exchange. The names of the files in this folder have been changed from the original name to the data exchange task id If a task has been processed then it moves the file to either the 'Processed' folder or the 'Error' folder. If there is an error the error file generated by data exchange is also downloaded and placed in the error folder """ import assetic import os import shutil import sys ##Assetic SDK instance asseticsdk = assetic.AsseticSDK("c:/users/you/assetic.ini",None,"Info") ##create an instance of the DataExchange API dataexchangeapi = assetic.DataExchangeTaskApi() ##Create an instance for document get. ##Use sdk client for document get. Returns header 'content-disposition' ##The header has the file name of the doc #Document API docapi = assetic.DocumentApi(asseticsdk.client_for_docs) # Define file path for output report files intdir = "C:/temp/Integration/" sourcedir = intdir + "ToLoad/" inprogressdir = intdir + "InProgress/" errordir = intdir + "Error/" processeddir = intdir + "Processed/" ##define function for getting error file def getfile(docid,docapi,filedir): try: getfile = docapi.document_get_document_file(docid) except assetic.rest.ApiException as e: asseticsdk.logger.error('Status {0}, Reason: {1}'.format( e.status,e.reason)) if getfile != None: if 'attachment' in getfile[1] and 'filename=' in getfile[1]: filename = getfile[1].split('filename=',1)[1] if '"' in filename or "'" in filename: filename = filename[1:-1] fullfilename = filedir + filename else: fullfilename = filedir +filename data = getfile[0] if sys.version_info >= (3,0): try: data = data.encode('utf-8') except: data = data with open( fullfilename, 'wb' ) as out_file: out_file.write(data) asseticsdk.logger.info('Created file: {0}'.format(fullfilename)) ##end function files = os.listdir(inprogressdir) 
for filename in files:
    fullfilename = inprogressdir + filename
    taskguid, fileext = os.path.splitext(filename)
    try:
        task = dataexchangeapi.data_exchange_task_get(taskguid)
    except assetic.rest.ApiException as e:
        asseticsdk.logger.error('Status {0}, Reason: {1}'.format(
            e.status, e.reason))
    else:
        status = task.get('StatusDescription')
        if status == 'Complete with error':
            # move file to error folder
            destination = errordir + filename
            shutil.move(fullfilename, destination)
            ##also get the error file
            errordocid = task.get('ErrorDocumentId')
            errordesc = task.get('Summary')
            if errordocid is None:
                asseticsdk.logger.info("Error Description: {0}".format(
                    errordesc))
            else:
                asseticsdk.logger.info("Error document: {0}".format(errordocid))
                errordirtask = errordir + taskguid
                getfile(errordocid, docapi, errordirtask)
        elif status == 'Completed':
            # move file to Processed
            destination = processeddir + filename
            shutil.move(fullfilename, destination)
How it works
The script loops through the files in the 'InProgress' folder and obtains the Data Exchange task id from the file name. This script is scheduled to run periodically.
files = os.listdir(inprogressdir)
for filename in files:
    fullfilename = inprogressdir + filename
    taskguid, fileext = os.path.splitext(filename)
The task is requested and the status of the response checked.
try:
    task = dataexchangeapi.data_exchange_task_get(taskguid)
except assetic.rest.ApiException as e:
    asseticsdk.logger.error('Status {0}, Reason: {1}'.format(
        e.status, e.reason))
else:
    status = task.get('StatusDescription')
If the status is 'Complete with error' the file is moved to the error folder. If the error is due to a problem with the field mapping, the 'Summary' describes the problem.
If there is a data error then an error file is generated by Data Exchange. This file is downloaded and saved in the 'Error' folder along with the document. The function 'getfile' is used to download it.
#move file to error folder
destination = errordir + filename
shutil.move(fullfilename, destination)
##also get the error file
errordocid = task.get('ErrorDocumentId')
errordesc = task.get('Summary')
if errordocid is None:
    asseticsdk.logger.info(errordesc)
else:
    asseticsdk.logger.info(errordocid)
    errordirtask = errordir + taskguid
    getfile(errordocid, docapi, errordirtask)
The function 'getfile' is defined in the script body. Defining functions within the script can make the script easier to read and avoids repeating code for repetitive tasks.
##define function for getting the error file
def getfile(docid, docapi, filedir):
    getfile = docapi.document_get_document_file(docid)
    if getfile is not None:
        if 'attachment' in getfile[1] and 'filename=' in getfile[1]:
            filename = getfile[1].split('filename=', 1)[1]
            if '"' in filename or "'" in filename:
                # strip surrounding quotes from the file name
                filename = filename[1:-1]
            fullfilename = filedir + filename
            data = getfile[0]
            if sys.version_info >= (3, 0):
                try:
                    data = data.encode('utf-8')
                except AttributeError:
                    # already bytes
                    pass
            with open(fullfilename, 'wb') as out_file:
                out_file.write(data)
            asseticsdk.logger.info('Created file: {0}'.format(fullfilename))
##end function
If the status is 'Completed', the Data Exchange task has completed without error. The file is moved to the 'Processed' folder and no further processing of that file takes place.
elif status == 'Completed':
    #move file to Processed
    destination = processeddir + filename
    shutil.move(fullfilename, destination)
If the task has not yet been processed then the file remains in the 'InProgress' folder.
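The three outcomes can be summarised in one dispatch helper (a hypothetical refactor of the branch logic, not part of the original script): any status other than 'Completed' or 'Complete with error' - for example a task still queued or running - leaves the file in place for the next scheduled run.

```python
import shutil

def dispatch(status, filename, fullfilename, processeddir, errordir):
    """Move the file according to task status; return the destination
    folder, or None if the task is still pending and the file stays put."""
    if status == 'Completed':
        shutil.move(fullfilename, processeddir + filename)
        return processeddir
    if status == 'Complete with error':
        shutil.move(fullfilename, errordir + filename)
        return errordir
    # still queued or running - check again on the next scheduled run
    return None
```
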
