Conduct Legal Research with AI: Part 1
Introduction
In a previous post, I detailed the process of crawling the Library of Congress API to generate JSON files that can be integrated into your DB of choice.
In this discussion, we will integrate JSON data into a Neo4j graph database.
Overview
The process is fairly straightforward. The most difficult part is wrangling your JSON data into the right format for integration.
The main function first instantiates the database config information. It then gets the cwd from a context manager and collects the files to be integrated. A master subject table is created to record only unique subjects and avoid duplicates. Finally, a JSON pipeline extracts the data from each file, transforms it for Neo4j, and uploads it with the neomodel API.
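As a rough sketch of that flow (the helper names follow the sections below, while the init function name, credentials, and input folder are placeholders rather than the real values):

```python
import os
import neoModelAPI as neo   # assumption: the helper module reviewed in the next section

def main():
    # set the neomodel connection config; init_db and the credentials are placeholders
    neo.neoAPI.init_db(user="neo4j", psw="password", uri="localhost:7687")

    # the original pulls the cwd from a context manager; plain os.getcwd() here
    cwd = os.getcwd()
    input_path = os.path.join(cwd, "json")   # assumed name of the crawler's output folder

    files = get_files(input_path)
    master_subject_table = create_master_subject_table()
    json_pipeline(files, master_subject_table)

if __name__ == "__main__":
    main()
```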
Instantiate the Neo Model API
I extended the neomodel API with a few helper functions. The repo is found at https://github.com/justin-napolitano/neo4jAPI.
You can also review the summary below.
During this review we will call the initiation function that sets the config information, the update function, and the create_case and create_subject helpers.
create_subject calls the custom Subject class and returns an object that can later be written to the database with the .save() function.
create_case does exactly the same for Case nodes.
The module reads the user name, password, and URI and sets config.DATABASE_URL to a connection string of the form neo4j+s://{user}:{password}@{uri}. It then exposes the Case and Subject node classes along with the create and update helpers used below.
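As a quick orientation, here is a condensed, approximate sketch of the pieces this post relies on. The class layout and method names (init_db in particular) are my own shorthand, so consult the repo for the exact code:

```python
# neoModelAPI.py -- condensed, approximate sketch; see the repo for the real module
from neomodel import (config, StructuredNode, StringProperty,
                      UniqueIdProperty, RelationshipTo)

class Subject(StructuredNode):
    uid = UniqueIdProperty()
    subject = StringProperty(unique_index=True, required=True)

class Case(StructuredNode):
    # only a few properties shown; the real class declares one StringProperty
    # per field kept by the case pipeline below
    uid = UniqueIdProperty()
    title = StringProperty()
    url = StringProperty()
    pdf = StringProperty()
    subject = StringProperty()                          # csv string of subjects
    subjects = RelationshipTo(Subject, 'HAS_SUBJECT')   # relationship type is a guess

class neoAPI:
    @staticmethod
    def init_db(user, psw, uri):
        # build the connection string neomodel uses for every session
        config.DATABASE_URL = 'neo4j+s://{}:{}@{}'.format(user, psw, uri)
        return True

    @staticmethod
    def create_case(**fields):
        # return an unsaved Case node object
        return Case(**fields)

    @staticmethod
    def create_subject(subject):
        # return an unsaved Subject node object
        return Subject(subject=subject)

    @staticmethod
    def update(node):
        # persist a node object to the database
        return node.save()
```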
Get Files
The get_files function returns a list of files within the input directory.
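A minimal version, assuming the crawler wrote its output as .json files into a single directory:

```python
import os

def get_files(input_directory):
    # full paths of every JSON file in the input directory
    return [os.path.join(input_directory, name)
            for name in os.listdir(input_directory)
            if name.endswith(".json")]
```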
Create Master Subject File
create_master_subject_table generates an empty dataframe that will record every unique subject encountered in the data.
I will improve upon this later by writing out a master file that is saved after each modification. That would make it possible to resume the process after an error or fault.
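A sketch of the table; the column names match how it is used later in the post (subject, transaction for the node object, and submitted as the upload flag):

```python
import pandas as pd

def create_master_subject_table():
    # one row per unique subject encountered across every file
    return pd.DataFrame(columns=["subject", "transaction", "submitted"])
```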
JSON Pipeline Function
The json_pipeline function is the runner for the ETL job. It loads each file into a dataframe, manipulates the data accordingly, and updates the Neo4j database.
When I refactor the code, I will most likely create an object that calls static functions to generate the desired output.
I may also separate the case, subject, and relationship pipelines into separate classes in order to avoid shadowing functions within functions.
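Roughly, the runner looks like the sketch below; the helpers are the ones covered in the sections that follow, and the shape of the loaded JSON is an assumption:

```python
import json

def json_pipeline(files, master_subject_table):
    for path in files:
        with open(path) as f:
            data = json.load(f)   # assumption: each file holds one crawled result page

        cleaned = stringify_json_data(clean_json_data(data))

        # case nodes: a dataframe of unsaved transaction objects
        case_data = nodify_case_data(pandify_case_data(cleaned))

        # subject nodes: slice, dedupe, and fold into the master table
        subject_list = slice_subject_data(cleaned)
        unique_subjects = identify_unique_subjects(subject_list)
        subject_lookup_table = create_subject_lookup_table(unique_subjects)
        master_subject_table = integrate_to_master_table(master_subject_table,
                                                         subject_lookup_table)
        master_subject_table = nodify_subject(master_subject_table)

        # save the data to the database
        master_subject_table = submit_subjects(master_subject_table)
        case_data = submit_cases(case_data)

        # create the case -> subject relationships
        create_relationship_table(case_data=case_data,
                                  master_subject_table=master_subject_table)

    return master_subject_table
```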
Case Pipeline
To create the case nodes, four functions are called in sequence.
Clean Json Data
The first is clean_json_data, which is now mostly unnecessary. The only operation that is still required is moving the pdf from a list to a dictionary key. It should and will be refactored; as it stands, I am leaving it as an artifact of a previous workflow.
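A sketch under two assumptions: each result carries a resources list whose first entry holds the pdf link, and the kept keys mirror the filter list from the original script:

```python
# keys kept from each Library of Congress result (everything else is dropped)
CASE_KEYS = ["date", "dates", "group", "id", "pdf", "shelf_id", "subject",
             "title", "url", "description", "source_collection"]

def clean_json_data(data):
    cleaned = []
    for result in data:
        # lift the pdf URL out of its resource list into a plain key
        resources = result.get("resources") or []
        result["pdf"] = resources[0].get("pdf") if resources else None
        # keep only the keys listed above
        cleaned.append({key: result.get(key) for key in CASE_KEYS})
    return cleaned
```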
Stringify Json Data
The second is stringify_json_data. The importance of this function is that it converts lists into strings so the data integrates properly into the Neo4j database. Iterables are permitted as property values, but they cannot be searched. For my use case, I decided to create CSV strings instead, which can later be parsed if necessary.
This function also moves the subject list to a dedicated key in the dictionary. This is important because that key is used to generate the subject tables.
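A sketch of both steps; the subject_list key name is my own, chosen to match how the subject pipeline below consumes it:

```python
def stringify_json_data(data):
    for result in data:
        # keep the raw subject list aside for the subject pipeline
        subject_list = list(result.get("subject") or [])
        # collapse every list value into a comma-separated string so the
        # property can be indexed and searched in Neo4j
        for key, value in result.items():
            if isinstance(value, list):
                result[key] = ",".join(str(v) for v in value)
        result["subject_list"] = subject_list
    return data
```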
Pandify Case Data
The next function creates a pandas dataframe from a list of dictionaries. Thankfully this is easy to accomplish.
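Since each cleaned result is already a flat dictionary, the dataframe constructor does all the work:

```python
import pandas as pd

def pandify_case_data(data):
    # each result dictionary becomes one row of the dataframe
    return pd.DataFrame(data)
```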
Nodify Case Data
Nodify creates transaction objects that can be saved to the Neo4j database. I call the neomodel API to generate the node objects and store them in a dataframe, which is then used to apply the upload with a lambda function.
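A sketch, assuming the module is imported as neo (as in the earlier snippets) and that the helper column subject_list is held back for the subject pipeline rather than stored on the node:

```python
import neoModelAPI as neo   # assumed import, as in the main() sketch

def nodify_case_data(case_df):
    # build an unsaved Case node from each row's fields
    case_df["transaction"] = case_df.apply(
        lambda row: neo.neoAPI.create_case(**row.drop(labels="subject_list").to_dict()),
        axis=1)
    case_df["submitted"] = None   # filled in once the node has been saved
    return case_df
```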
The Subject Pipeline
The subject pipeline slices the subject data from the current search-result page.
It then identifies the unique subjects.
The subject_lookup_table is a dataframe containing the subjects returned by the subject list; they are unique only to the result page.
The master_subject_table is then updated by the integrate_to_master_table function, which identifies the new subjects to merge into the master table, as sketched below.
Finally, the nodify_subject function creates transaction objects to be uploaded to the Neo4j database.
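The integrate_to_master_table step has no section of its own, so here is a minimal sketch (column names as used throughout this post); the remaining helpers are covered in the sections that follow:

```python
import pandas as pd

def integrate_to_master_table(master_subject_table, subject_lookup_table):
    # keep only the lookup rows whose subject is not already in the master table
    new_rows = subject_lookup_table[
        ~subject_lookup_table["subject"].isin(master_subject_table["subject"])]
    # append what is left to the master table
    return pd.concat([master_subject_table, new_rows], ignore_index=True)
```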
Slice Subject Data
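A sketch, assuming the subject_list key produced by stringify_json_data above:

```python
def slice_subject_data(data):
    # concatenate the subject list of every result on the page
    subject_list = []
    for result in data:
        subject_list = subject_list + (result.get("subject_list") or [])
    return subject_list
```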
Identify Unique Subjects
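The original does this with a set, which is the simplest way to drop duplicates:

```python
def identify_unique_subjects(subject_list):
    # insert the list into a set to drop duplicates...
    unique_set = set(subject_list)
    # ...then convert the set back to a list
    return list(unique_set)
```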
Create Subject Lookup Table
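A sketch that mirrors the master table's columns so the two can be merged later:

```python
import pandas as pd

def create_subject_lookup_table(unique_subjects):
    # page-local table with the same columns as the master subject table
    df = pd.DataFrame({"subject": unique_subjects})
    df["transaction"] = None
    df["submitted"] = None
    return df
```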
Nodify Subject
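A sketch, again assuming the neo import from earlier; only rows without a transaction object get one:

```python
import neoModelAPI as neo   # assumed import, as in the main() sketch

def nodify_subject(master_subject_table):
    # create an unsaved Subject node for any row that does not have one yet
    missing = master_subject_table["transaction"].isna()
    master_subject_table.loc[missing, "transaction"] = (
        master_subject_table.loc[missing, "subject"]
        .apply(lambda s: neo.neoAPI.create_subject(subject=s)))
    return master_subject_table
```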
Uploading Case and Subject Data
With the transaction-object dataframes created, we can write the data to the database.
Submit Subjects
This function selects the subject nodes from the master table that have not yet been uploaded to the Neo4j database.
It identifies NA values in the submitted column in order to slice out the non-submitted nodes.
If that slice can be created, we upload every row of the dataframe with the update function from the neo API, which simply calls save() on the object.
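A sketch of that logic, assuming the column names used above:

```python
import neoModelAPI as neo   # assumed import, as in the main() sketch

def submit_subjects(master_subject_table):
    # slice the rows whose 'submitted' flag is still NA
    non_submitted = master_subject_table[master_subject_table["submitted"].isna()].copy()
    if non_submitted.empty:
        return master_subject_table

    # neo.neoAPI.update() simply calls .save() on each node object
    non_submitted["transaction"].apply(lambda node: neo.neoAPI.update(node))
    non_submitted["submitted"] = True
    master_subject_table.update(non_submitted)
    return master_subject_table
```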
Submit Cases
Initially I copied and pasted the subject-submission function, but I realized the checks were unnecessary. I am assuming that each result is unique, so every case is uploaded. If it turns out there are duplicates in the database, the Neo4j Cypher language would let me prune the duplicate nodes and edges.
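With no checks required, the case version reduces to saving every transaction object and flagging the rows:

```python
import neoModelAPI as neo   # assumed import, as in the main() sketch

def submit_cases(case_data):
    # every result page is assumed to contain only new cases, so save them all
    case_data["transaction"].apply(lambda node: neo.neoAPI.update(node))
    case_data["submitted"] = True
    return case_data
```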
Submit the Relationships
The final step is to relate the cases to the subject nodes.
relationship_list = create_relationship_table(case_data=case_data, master_subject_table=master_subject_table)
This is accomplished by calling the relationship function declared on the Case class in the neomodel API.
View the reference below:
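The reference is the relationship declared on the Case class, which in the earlier API sketch corresponds to something like this (the attribute name and relationship type are assumptions):

```python
from neomodel import StructuredNode, RelationshipTo

class Case(StructuredNode):
    # ...properties omitted; see the API sketch above...
    subjects = RelationshipTo('Subject', 'HAS_SUBJECT')
```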
Create Relationship Table
To create the relationships, the case_data and the master_subject_table are both needed.
For every case, a relationship is created to every subject within its subject list.
It is important to note that, for this function to work correctly, the cases and subjects must first be submitted to the database.
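A sketch that walks every case and connects it to the matching subject rows; it assumes the subjects relationship and the subject_list column introduced in the earlier sketches:

```python
def create_relationship_table(case_data, master_subject_table):
    relationship_list = []
    for _, case in case_data.iterrows():
        # master-table rows whose subject appears in this case's subject list
        matches = master_subject_table[
            master_subject_table["subject"].isin(case["subject_list"])]
        for _, subject_row in matches.iterrows():
            # both nodes are already saved, so connect() creates the edge directly
            rel = case["transaction"].subjects.connect(subject_row["transaction"])
            relationship_list.append(rel)
    return relationship_list
```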
Putting Everything Together
The complete script simply wires these pieces together: main() initializes the neomodel configuration, gathers the input JSON files, creates the master subject table, and then runs the JSON pipeline over every file, submitting the cases, subjects, and relationships along the way.