[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[robocop] branch master updated: add script to convert XML input into sa
From: |
Admin |
Subject: |
[robocop] branch master updated: add script to convert XML input into saner JSON |
Date: |
Sat, 07 Jun 2025 23:30:38 +0200 |
This is an automated email from the git hooks/post-receive script.
grothoff pushed a commit to branch master
in repository robocop.
The following commit(s) were added to refs/heads/master by this push:
new fb27b3e add script to convert XML input into saner JSON
fb27b3e is described below
commit fb27b3edb21d058307ce9031670056ac29e6e698
Author: Christian Grothoff <christian@grothoff.org>
AuthorDate: Sat Jun 7 23:30:28 2025 +0200
add script to convert XML input into saner JSON
---
robocop-ch-to-json | 1003 ++++++++++++++++++++++++++++++++++++++++++++++
robocop-json-postprocess | 4 +
2 files changed, 1007 insertions(+)
diff --git a/robocop-ch-to-json b/robocop-ch-to-json
new file mode 100755
index 0000000..ce9e3e8
--- /dev/null
+++ b/robocop-ch-to-json
@@ -0,0 +1,1003 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# robocop-ch-to-json
+#
+# Copyright (C) 2025 Taler Systems SA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+"""
+Swiss Sanctions XML to JSON Converter
+
+This program converts Swiss sanctions XML files (following the
swiss-sanctions-list XSD)
+to JSON format, mapping XML elements to registry identifiers and inlining all
references.
+
+Features:
+- Converts targets (individuals, entities, objects) to self-contained JSON
records
+- Maps XML elements to registry identifiers from GANA registry
+- Inlines place references (location, area, country)
+- Flattens name parts and includes spelling variants
+- Handles multiple identities and addresses
+- Preserves all identification documents and relationships
+
+Usage:
+ robocop-ch-to-json < input.xml > output.json
+"""
+
+import xml.etree.ElementTree as ET
+import json
+import sys
+from datetime import datetime
+from typing import Dict, List, Any, Optional
+import argparse
+import re
+
+class SwissSanctionsConverter:
+ """Converts Swiss sanctions XML to JSON format with registry mapping."""
+
+ def __init__(self):
+ # Registry mapping from XML elements to standardized identifiers
+ self.registry_mapping = {
+ # Personal information
+ 'given-name': 'PERSON_FIRST_NAMES',
+ 'further-given-name': 'PERSON_FIRST_NAMES',
+ 'family-name': 'PERSON_LAST_NAME',
+ 'maiden-name': 'PERSON_LAST_NAME',
+ 'whole-name': 'FULL_NAME',
+ 'day-month-year': 'DATE_OF_BIRTH',
+ 'nationality': 'NATIONALITY',
+ 'identification-document': 'PERSON_NATIONAL_ID',
+
+ # Business information
+ 'entity_name': 'COMPANY_NAME',
+ 'business_name': 'BUSINESS_DISPLAY_NAME',
+
+ # Address information
+ 'address-details': 'ADDRESS_LINES',
+ 'zip-code': 'ADDRESS_ZIPCODE',
+ 'c-o': 'ADDRESS_LINES',
+ 'p-o-box': 'ADDRESS_LINES',
+
+ # Contact information
+ 'contact-person': 'CONTACT_PERSON_NAME',
+ }
+
+ def parse_xml(self, xml_file: str) -> ET.Element:
+ """Parse the XML file and return the root element."""
+ try:
+ if xml_file:
+ tree = ET.parse(xml_file)
+ else:
+ tree = ET.parse(sys.stdin)
+ return tree.getroot()
+ except ET.ParseError as e:
+ raise ValueError(f"Invalid XML file: {e}")
+ except FileNotFoundError:
+ raise FileNotFoundError(f"XML file not found: {xml_file}")
+
+ def _parse_element(self, element: ET.Element) -> Dict[str, Any]:
+ """Recursively parse XML element to dictionary."""
+ result = {}
+
+ # Add attributes
+ if element.attrib:
+ result.update(element.attrib)
+ # Convert numeric attributes to integers where appropriate
+ for key, value in element.attrib.items():
+ if key in ['ssid', 'day', 'month', 'year', 'place-id',
'target-id', 'order']:
+ try:
+ result[key] = int(value)
+ except ValueError:
+ pass # Keep as string if conversion fails
+ elif key in ['main', 'current']:
+ result[key] = value.lower() == 'true'
+
+ # Handle text content
+ if element.text and element.text.strip():
+ if len(element) == 0: # Leaf node with text only
+ return element.text.strip()
+ else: # Mixed content
+ result['_text'] = element.text.strip()
+
+ # Process child elements
+ children_by_tag = {}
+ for child in element:
+ tag = child.tag
+ child_data = self._parse_element(child)
+
+ if tag not in children_by_tag:
+ children_by_tag[tag] = []
+ children_by_tag[tag].append(child_data)
+
+ # Add children to result
+ for tag, children in children_by_tag.items():
+ if len(children) == 1:
+ result[tag] = children[0]
+ else:
+ result[tag] = children
+
+ return result
+
+ def build_place_lookup(self, root: ET.Element) -> Dict[str, Dict[str,
Any]]:
+ """Build a lookup dictionary for place references."""
+ places = {}
+
+ for place_elem in root.findall('place'):
+ ssid = place_elem.get('ssid')
+ if ssid:
+ place_data = {
+ 'location': None,
+ 'location_variants': [],
+ 'area': None,
+ 'area_variants': [],
+ 'country': None,
+ 'country_code': None
+ }
+
+ # Extract location
+ location_elem = place_elem.find('location')
+ if location_elem is not None and location_elem.text:
+ place_data['location'] = location_elem.text.strip()
+
+ # Extract location variants
+ for variant in place_elem.findall('location-variant'):
+ if variant.text:
+ place_data['location_variants'].append({
+ 'value': variant.text.strip(),
+ 'type': variant.get('variant-type', 'unknown')
+ })
+
+ # Extract area
+ area_elem = place_elem.find('area')
+ if area_elem is not None and area_elem.text:
+ place_data['area'] = area_elem.text.strip()
+
+ # Extract area variants
+ for variant in place_elem.findall('area-variant'):
+ if variant.text:
+ place_data['area_variants'].append({
+ 'value': variant.text.strip(),
+ 'type': variant.get('variant-type', 'unknown')
+ })
+
+ # Extract country
+ country_elem = place_elem.find('country')
+ if country_elem is not None:
+ place_data['country'] = country_elem.text.strip() if
country_elem.text else None
+ place_data['country_code'] = country_elem.get('iso-code')
+
+ places[ssid] = place_data
+
+ return places
+
+ def resolve_place(self, place_id: str, places_lookup: Dict[str, Dict]) ->
Dict[str, List[str]]:
+ """Resolve a place reference and return flattened address
components."""
+ if place_id not in places_lookup:
+ return {}
+
+ place = places_lookup[place_id]
+ result = {}
+
+ # Add country information
+ if place['country_code']:
+ result['ADDRESS_COUNTRY'] = [place['country_code']]
+
+ # Add location (town/city)
+ locations = []
+ if place['location']:
+ locations.append(place['location'])
+ for variant in place['location_variants']:
+ locations.append(variant['value'])
+ if locations:
+ result['ADDRESS_TOWN_LOCATION'] = locations
+
+ # Add area (district/subdivision)
+ areas = []
+ if place['area']:
+ areas.append(place['area'])
+ for variant in place['area_variants']:
+ areas.append(variant['value'])
+ if areas:
+ result['ADDRESS_COUNTRY_SUBDIVISION'] = areas
+
+ return result
+
+ def extract_names(self, identity_elem: ET.Element) -> Dict[str, List[str]]:
+ """Extract and flatten name information from an identity element."""
+ result = {
+ 'PERSON_FIRST_NAMES': [],
+ 'PERSON_LAST_NAME': [],
+ 'FULL_NAME': []
+ }
+
+ for name_elem in identity_elem.findall('name'):
+ # Process name parts
+ name_parts = []
+ first_names = []
+ last_names = []
+
+ for name_part in name_elem.findall('name-part'):
+ part_type = name_part.get('name-part-type', '')
+ value_elem = name_part.find('value')
+
+ if value_elem is not None and value_elem.text:
+ value = value_elem.text.strip()
+ name_parts.append(value)
+
+ # Categorize name parts
+ if part_type in ['given-name', 'further-given-name']:
+ first_names.append(value)
+ elif part_type in ['family-name', 'maiden-name']:
+ last_names.append(value)
+ elif part_type == 'whole-name':
+ result['FULL_NAME'].append(value)
+
+ # Add spelling variants
+ for variant in name_part.findall('spelling-variant'):
+ if variant.text:
+ variant_value = variant.text.strip()
+ if part_type in ['given-name',
'further-given-name']:
+ first_names.append(variant_value)
+ elif part_type in ['family-name', 'maiden-name']:
+ last_names.append(variant_value)
+ elif part_type == 'whole-name':
+ result['FULL_NAME'].append(variant_value)
+
+ # Add categorized names
+ result['PERSON_FIRST_NAMES'].extend(first_names)
+ result['PERSON_LAST_NAME'].extend(last_names)
+
+ # If we have separate parts but no whole name, combine them
+ if name_parts and not any(part.get('name-part-type') ==
'whole-name'
+ for part in
name_elem.findall('name-part')):
+ full_name = ' '.join(name_parts)
+ result['FULL_NAME'].append(full_name)
+
+ # Remove duplicates while preserving order
+ for key in result:
+ seen = set()
+ result[key] = [x for x in result[key] if not (x in seen or
seen.add(x))]
+
+ return result
+
+ def extract_birth_info(self, identity_elem: ET.Element) -> Dict[str,
List[str]]:
+ """Extract birth date and nationality information."""
+ result = {}
+
+ # Extract birth dates
+ birth_dates = []
+ for dmy_elem in identity_elem.findall('day-month-year'):
+ day = dmy_elem.get('day')
+ month = dmy_elem.get('month')
+ year = dmy_elem.get('year')
+
+ date_parts = []
+ if year:
+ date_parts.append(year)
+ if month:
+ date_parts.append(f"{int(month):02d}")
+ if day:
+ date_parts.append(f"{int(day):02d}")
+
+ if date_parts:
+ # Format as ISO date if complete, otherwise partial
+ if len(date_parts) == 3:
+
birth_dates.append(f"{date_parts[0]}-{date_parts[1]}-{date_parts[2]}")
+ else:
+ birth_dates.append('-'.join(date_parts))
+
+ if birth_dates:
+ result['DATE_OF_BIRTH'] = birth_dates
+
+ # Extract nationalities
+ nationalities = []
+ for nat_elem in identity_elem.findall('nationality'):
+ country_elem = nat_elem.find('country')
+ if country_elem is not None:
+ country_code = country_elem.get('iso-code')
+ if country_code:
+ nationalities.append(country_code)
+
+ if nationalities:
+ result['NATIONALITY'] = nationalities
+
+ return result
+
+ def extract_addresses(self, identity_elem: ET.Element, places_lookup:
Dict[str, Dict]) -> Dict[str, List[str]]:
+ """Extract address information from identity element."""
+ result = {}
+
+ for addr_elem in identity_elem.findall('address'):
+ place_id = addr_elem.get('place-id')
+
+ # Resolve place reference
+ if place_id:
+ place_info = self.resolve_place(place_id, places_lookup)
+ for key, values in place_info.items():
+ if key not in result:
+ result[key] = []
+ result[key].extend(values)
+
+ # Extract address details
+ details_elem = addr_elem.find('address-details')
+ if details_elem is not None and details_elem.text:
+ if 'ADDRESS_LINES' not in result:
+ result['ADDRESS_LINES'] = []
+ result['ADDRESS_LINES'].append(details_elem.text.strip())
+
+ # Extract zip code
+ zip_elem = addr_elem.find('zip-code')
+ if zip_elem is not None and zip_elem.text:
+ if 'ADDRESS_ZIPCODE' not in result:
+ result['ADDRESS_ZIPCODE'] = []
+ result['ADDRESS_ZIPCODE'].append(zip_elem.text.strip())
+
+ # Extract c/o
+ co_elem = addr_elem.find('c-o')
+ if co_elem is not None and co_elem.text:
+ if 'ADDRESS_LINES' not in result:
+ result['ADDRESS_LINES'] = []
+ result['ADDRESS_LINES'].append(f"c/o {co_elem.text.strip()}")
+
+ # Extract P.O. Box
+ po_elem = addr_elem.find('p-o-box')
+ if po_elem is not None and po_elem.text:
+ if 'ADDRESS_LINES' not in result:
+ result['ADDRESS_LINES'] = []
+ result['ADDRESS_LINES'].append(f"P.O. Box
{po_elem.text.strip()}")
+
+ return result
+
+ def extract_identification_documents(self, identity_elem: ET.Element,
places_lookup: Dict[str, Dict]) -> Dict[str, List[str]]:
+ """Extract identification document information."""
+ result = {}
+
+ for doc_elem in identity_elem.findall('identification-document'):
+ doc_type = doc_elem.get('document-type', 'unknown')
+
+ # Extract document number
+ number_elem = doc_elem.find('number')
+ if number_elem is not None and number_elem.text:
+ doc_info = f"{doc_type}: {number_elem.text.strip()}"
+
+ # Add issuer information
+ issuer_elem = doc_elem.find('issuer')
+ if issuer_elem is not None:
+ issuer_code = issuer_elem.get('code')
+ if issuer_code:
+ doc_info += f" (issued by {issuer_code})"
+
+ # Add dates if available
+ issue_date = doc_elem.find('date-of-issue')
+ expiry_date = doc_elem.find('expiry-date')
+ if issue_date is not None and issue_date.text:
+ doc_info += f" issued: {issue_date.text}"
+ if expiry_date is not None and expiry_date.text:
+ doc_info += f" expires: {expiry_date.text}"
+
+ if 'PERSON_NATIONAL_ID' not in result:
+ result['PERSON_NATIONAL_ID'] = []
+ result['PERSON_NATIONAL_ID'].append(doc_info)
+
+ return result
+
+ def process_individual(self, individual_elem: ET.Element, places_lookup:
Dict[str, Dict]) -> Dict[str, List[str]]:
+ """Process an individual target and extract all relevant
information."""
+ result = {}
+
+ # Process all identities
+ for identity_elem in individual_elem.findall('identity'):
+ # Extract names
+ names = self.extract_names(identity_elem)
+ for key, values in names.items():
+ if key not in result:
+ result[key] = []
+ result[key].extend(values)
+
+ # Extract birth information
+ birth_info = self.extract_birth_info(identity_elem)
+ for key, values in birth_info.items():
+ if key not in result:
+ result[key] = []
+ result[key].extend(values)
+
+ # Extract addresses
+ addresses = self.extract_addresses(identity_elem, places_lookup)
+ for key, values in addresses.items():
+ if key not in result:
+ result[key] = []
+ result[key].extend(values)
+
+ # Extract identification documents
+ id_docs = self.extract_identification_documents(identity_elem,
places_lookup)
+ for key, values in id_docs.items():
+ if key not in result:
+ result[key] = []
+ result[key].extend(values)
+
+ # Remove duplicates
+ for key in result:
+ seen = set()
+ result[key] = [x for x in result[key] if not (x in seen or
seen.add(x))]
+
+ return result
+
+ def process_entity(self, entity_elem: ET.Element, places_lookup: Dict[str,
Dict]) -> Dict[str, List[str]]:
+ """Process an entity target and extract all relevant information."""
+ result = {}
+
+ # Process all identities
+ for identity_elem in entity_elem.findall('identity'):
+ # Extract entity names
+ names = self.extract_names(identity_elem)
+ # Map entity names to business identifiers
+ if names.get('FULL_NAME'):
+ result['COMPANY_NAME'] = names['FULL_NAME']
+ result['BUSINESS_DISPLAY_NAME'] = names['FULL_NAME'].copy()
+
+ # Extract addresses (registered office)
+ addresses = self.extract_addresses(identity_elem, places_lookup)
+ # Map to registered office address for entities
+ for key, values in addresses.items():
+ if 'OFFICE' not in key:
+ new_key = key.replace('ADDRESS_',
'REGISTERED_OFFICE_ADDRESS_')
+ else:
+ new_key = key
+ if new_key not in result:
+ result[new_key] = []
+ result[new_key].extend(values)
+
+ # Remove duplicates
+ for key in result:
+ if isinstance(result[key], list):
+ seen = set()
+ result[key] = [x for x in result[key] if not (x in seen or
seen.add(x))]
+
+ return result
+
+ def process_object(self, object_elem: ET.Element, places_lookup: Dict[str,
Dict]) -> Dict[str, List[str]]:
+ """Process an object target and extract all relevant information."""
+ result = {}
+ object_type = object_elem.get('object-type', 'unknown')
+
+ # Process all identities
+ for identity_elem in object_elem.findall('identity'):
+ # Extract object names
+ names = self.extract_names(identity_elem)
+ if names.get('FULL_NAME'):
+ # Use a generic name field for objects
+ result['FULL_NAME'] = names['FULL_NAME']
+ # Add object type information
+ object_names = [f"{name} ({object_type})" for name in
names['FULL_NAME']]
+ result['BUSINESS_DISPLAY_NAME'] = object_names
+
+ # Add object type as additional information
+ if 'FULL_NAME' not in result:
+ result['FULL_NAME'] = [f"Unknown {object_type}"]
+
+ return result
+
+ def _is_target_active(self, target: Dict[str, Any]) -> bool:
+ """Check if a target is active (most recent modification is not
'de-listed')."""
+
+ if 'modification' not in target:
+ return True # No modifications, consider active
+
+ modifications = target['modification']
+ if not isinstance(modifications, list):
+ modifications = [modifications]
+
+ # Find the most recent modification by effective-date, then by
enactment-date
+ most_recent = None
+ most_recent_date = None
+
+ for mod in modifications:
+ mod_type = mod.get('modification-type', '')
+
+ # Determine the date to use for comparison
+ date_str = None
+ if 'effective-date' in mod:
+ date_str = mod['effective-date']
+ elif 'enactment-date' in mod:
+ date_str = mod['enactment-date']
+ elif 'publication-date' in mod:
+ date_str = mod['publication-date']
+
+ if date_str:
+ try:
+ mod_date = datetime.strptime(date_str, '%Y-%m-%d')
+ if most_recent_date is None or mod_date > most_recent_date:
+ most_recent_date = mod_date
+ most_recent = mod
+ except ValueError:
+ continue # Skip invalid dates
+ elif most_recent is None:
+ # If no dates available, use the last modification in the list
+ most_recent = mod
+
+ if most_recent is None:
+ return True # No valid modification found, consider active
+
+ return most_recent.get('modification-type') != 'de-listed'
+
+ def process_target(self, target_elem: ET.Element, places_lookup: Dict[str,
Dict]) -> Optional[Dict[str, Any]]:
+ """Process a single target element and return JSON representation."""
+ ssid = target_elem.get('ssid')
+ if not ssid:
+ return None
+
+ # Base target information
+ target_data = {
+ 'ssid': ssid,
+ 'sanctions_set_ids': [],
+ 'foreign_identifier': None,
+ 'target_type': None,
+ 'justification': [],
+ 'relations': [],
+ 'other_information': [],
+ 'PERSON_NATIONAL_ID': [],
+ 'DATE_OF_BIRTH': [],
+ 'CONTACT_EMAIL': [],
+ 'CONTACT_PHONE': [],
+ 'COMMERCIAL_REGISTER_NUMBER': [],
+ 'FOUNDING_DATE': [],
+ 'generic_attributes': {}
+ }
+
+ # Extract sanctions set IDs
+ for ss_id_elem in target_elem.findall('sanctions-set-id'):
+ if ss_id_elem.text:
+
target_data['sanctions_set_ids'].append(ss_id_elem.text.strip())
+
+ # Extract foreign identifier
+ foreign_id_elem = target_elem.find('foreign-identifier')
+ if foreign_id_elem is not None and foreign_id_elem.text:
+ target_data['foreign_identifier'] = foreign_id_elem.text.strip()
+
+ # Process target type and extract specific information
+ registry_data = {}
+
+ individual_elem = target_elem.find('individual')
+ entity_elem = target_elem.find('entity')
+ object_elem = target_elem.find('object')
+
+ if individual_elem is not None:
+ target_data['target_type'] = 'individual'
+ target_data['sex'] = individual_elem.get('sex')
+ registry_data = self.process_individual(individual_elem,
places_lookup)
+
+ # Extract justifications
+ for just_elem in individual_elem.findall('justification'):
+ if just_elem.text:
+ target_data['justification'].append(just_elem.text.strip())
+
+ # Extract relations
+ for rel_elem in individual_elem.findall('relation'):
+ relation_info = {
+ 'target_id': rel_elem.get('target-id'),
+ 'relation_type': rel_elem.get('relation-type'),
+ 'remark': None
+ }
+ remark_elem = rel_elem.find('remark')
+ if remark_elem is not None and remark_elem.text:
+ relation_info['remark'] = remark_elem.text.strip()
+ target_data['relations'].append(relation_info)
+
+ # Extract other information
+ for other_elem in individual_elem.findall('other-information'):
+ if other_elem.text:
+ # "other-information" is very messy. We try our best to
match
+ # it against various regular expressions and extract bits.
+ oi = other_elem.text.strip()
+ found = False;
+ match = re.search(r'Passport Number:\s*([A-Za-z0-9]+)',
oi, re.IGNORECASE)
+ pnum = match.group(1) if match else None
+ if pnum is not None:
+ target_data['PERSON_NATIONAL_ID'].append(pnum)
+ found = True
+ match = re.search(r'([A-Za-z])*\s*national
number:\s*([A-Za-z0-9]+)', oi, re.IGNORECASE)
+ pnum = match.group(1) if match else None
+ if pnum is not None:
+ target_data['PERSON_NATIONAL_ID'].append(pnum)
+ found = True
+ match = re.search(r'Personal ID:\s*([A-Za-z0-9]+)', oi,
re.IGNORECASE)
+ pnum = match.group(1) if match else None
+ if pnum is not None:
+ target_data['PERSON_NATIONAL_ID'].append(pnum)
+ found = True
+ match = re.search(r'National ID:\s*([A-Za-z0-9]+)', oi,
re.IGNORECASE)
+ pnum = match.group(1) if match else None
+ if pnum is not None:
+ target_data['PERSON_NATIONAL_ID'].append(pnum)
+ found = True
+ match = re.search(r'National ID\.:\s*([A-Za-z0-9]+)', oi,
re.IGNORECASE)
+ pnum = match.group(1) if match else None
+ if pnum is not None:
+ target_data['PERSON_NATIONAL_ID'].append(pnum)
+ found = True
+ match = re.search(r'National identification
number:\s*([A-Za-z0-9]+)\s*([A-Za-z0-9()]+)', oi, re.IGNORECASE)
+ pnum = match.group(1) if match else None
+ if pnum is not None:
+ target_data['PERSON_NATIONAL_ID'].append(pnum)
+ found = True
+ match = re.search(r'National identification
no:\s*([A-Za-z0-9]+)\s*([A-Za-z0-9()]+)', oi, re.IGNORECASE)
+ pnum = match.group(1) if match else None
+ if pnum is not None:
+ target_data['PERSON_NATIONAL_ID'].append(pnum)
+ found = True
+ match = re.search(r'Personal
identification:\s*([A-Za-z0-9]+)\s*([A-Za-z0-9()]+)', oi, re.IGNORECASE)
+ pnum = match.group(1) if match else None
+ if pnum is not None:
+ target_data['PERSON_NATIONAL_ID'].append(pnum)
+ found = True
+ match =
re.search(r'Passport:\s*([A-Za-z0-9]+)\s*([A-Za-z0-9()]+)', oi, re.IGNORECASE)
+ pnum = match.group(1) if match else None
+ if pnum is not None:
+ target_data['PERSON_NATIONAL_ID'].append(pnum)
+ found = True
+ match =
re.search(r'Passport\s*([A-Za-z0-9]+)\s*([A-Za-z0-9()]+)', oi, re.IGNORECASE)
+ pnum = match.group(1) if match else None
+ if pnum is not None:
+ target_data['PERSON_NATIONAL_ID'].append(pnum)
+ found = True
+ match = re.search(r'ID Card
Number:\s*([A-Za-z0-9]+)\s*([A-Za-z0-9()]+)', oi, re.IGNORECASE)
+ pnum = match.group(1) if match else None
+ if pnum is not None:
+ target_data['PERSON_NATIONAL_ID'].append(pnum)
+ found = True
+ match = re.search(r'Passport or ID
number:\s*([A-Za-z0-9]+)\s*([A-Za-z0-9()]+)', oi, re.IGNORECASE)
+ pnum = match.group(1) if match else None
+ if pnum is not None:
+ target_data['PERSON_NATIONAL_ID'].append(pnum)
+ found = True
+ match = re.search(r'National
ID:\s*([A-Za-z0-9]+)\s*;\s*Passport:\s*([A-Za-z0-9()]+)', oi, re.IGNORECASE)
+ nnum = match.group(1) if match else None
+ if nnum is not None:
+ target_data['PERSON_NATIONAL_ID'].append(nnum)
+ found = True
+ pnum = match.group(2) if match else None
+ if pnum is not None:
+ target_data['PERSON_NATIONAL_ID'].append(pnum)
+ found = True
+ match = re.search(r'State Identification
Number\s*([A-Za-z()]*)\s*:\s*([A-Za-z0-9]+)', oi, re.IGNORECASE)
+ pnum = match.group(2) if match else None
+ if pnum is not None:
+ target_data['PERSON_NATIONAL_ID'].append(pnum)
+ found = True
+ match = re.search(r'e-mail:\s*([A-Za-z0-9@]+)', oi,
re.IGNORECASE)
+ pnum = match.group(1) if match else None
+ if pnum is not None:
+ target_data['CONTACT_EMAIL'].append(pnum)
+ found = True
+ match = re.search(r'email:\s*([A-Za-z0-9@]+)', oi,
re.IGNORECASE)
+ pnum = match.group(1) if match else None
+ if pnum is not None:
+ target_data['CONTACT_EMAIL'].append(pnum)
+ found = True
+ match = re.search(r'e-mail address:\s*([A-Za-z0-9@]+)',
oi, re.IGNORECASE)
+ pnum = match.group(1) if match else None
+ if pnum is not None:
+ target_data['CONTACT_EMAIL'].append(pnum)
+ found = True
+ match = re.search(r'email address:\s*([A-Za-z0-9@]+)', oi,
re.IGNORECASE)
+ pnum = match.group(1) if match else None
+ if pnum is not None:
+ target_data['CONTACT_EMAIL'].append(pnum)
+ found = True
+ match = re.search(r'Tel.:\s*([A-Za-z0-9() +-]+)', oi,
re.IGNORECASE)
+ pnum = match.group(1) if match else None
+ if pnum is not None:
+ target_data['CONTACT_PHONE'].append(pnum)
+ found = True
+ match = re.search(r'Phone:\s*([A-Za-z0-9() +-]+)', oi,
re.IGNORECASE)
+ pnum = match.group(1) if match else None
+ if pnum is not None:
+ target_data['CONTACT_PHONE'].append(pnum)
+ found = True
+ match = re.search(r'Tel. \(office\):\s*([A-Za-z0-9()
+-]+)', oi, re.IGNORECASE)
+ pnum = match.group(1) if match else None
+ if pnum is not None:
+ target_data['CONTACT_PHONE'].append(pnum)
+ found = True
+ match = re.search(r'DOB:\s*([A-Za-z0-9:\. -]+)', oi,
re.IGNORECASE)
+ pnum = match.group(1) if match else None
+ if pnum is not None:
+ target_data['DATE_OF_BIRTH'].append(pnum)
+ found = True
+ match = re.search(r'Date range: DOB
between\s*([A-Za-z0-9:\. -]+)', oi, re.IGNORECASE)
+ pnum = match.group(1) if match else None
+ if pnum is not None:
+ target_data['DATE_OF_BIRTH'].append(pnum)
+ found = True
+ if not found:
+ target_data['other_information'].append(oi)
+
+ elif entity_elem is not None:
+ target_data['target_type'] = 'entity'
+ registry_data = self.process_entity(entity_elem, places_lookup)
+
+ # Extract justifications, relations, other info (same structure as
individual)
+ for just_elem in entity_elem.findall('justification'):
+ if just_elem.text:
+ target_data['justification'].append(just_elem.text.strip())
+
+ for rel_elem in entity_elem.findall('relation'):
+ relation_info = {
+ 'target_id': rel_elem.get('target-id'),
+ 'relation_type': rel_elem.get('relation-type'),
+ 'remark': None
+ }
+ remark_elem = rel_elem.find('remark')
+ if remark_elem is not None and remark_elem.text:
+ relation_info['remark'] = remark_elem.text.strip()
+ target_data['relations'].append(relation_info)
+
+ for other_elem in entity_elem.findall('other-information'):
+ if other_elem.text:
+ # "other-information" is very messy. We try our best to
match
+ # it against various regular expressions and extract bits.
+ oi = other_elem.text.strip()
+ found = False;
+ match = re.search(r'Tel.:\s*([A-Za-z0-9() +-]+)', oi,
re.IGNORECASE)
+ pnum = match.group(1) if match else None
+ if pnum is not None:
+ target_data['CONTACT_PHONE'].append(pnum)
+ found = True
+ match = re.search(r'Company phone:\s*([A-Za-z0-9() +-]+)',
oi, re.IGNORECASE)
+ pnum = match.group(1) if match else None
+ if pnum is not None:
+ target_data['CONTACT_PHONE'].append(pnum)
+ found = True
+ match = re.search(r'Phone:\s*([A-Za-z0-9() +-]+)', oi,
re.IGNORECASE)
+ pnum = match.group(1) if match else None
+ if pnum is not None:
+ target_data['CONTACT_PHONE'].append(pnum)
+ found = True
+ match = re.search(r'e-mail:\s*([A-Za-z0-9@]+)', oi,
re.IGNORECASE)
+ pnum = match.group(1) if match else None
+ if pnum is not None:
+ target_data['CONTACT_EMAIL'].append(pnum)
+ found = True
+ match = re.search(r'e-mail address:\s*([A-Za-z0-9@]+)',
oi, re.IGNORECASE)
+ pnum = match.group(1) if match else None
+ if pnum is not None:
+ target_data['CONTACT_EMAIL'].append(pnum)
+ found = True
+ match = re.search(r'email address:\s*([A-Za-z0-9@]+)', oi,
re.IGNORECASE)
+ pnum = match.group(1) if match else None
+ if pnum is not None:
+ target_data['CONTACT_EMAIL'].append(pnum)
+ found = True
+ match = re.search(r'company email:\s*([A-Za-z0-9@]+)', oi,
re.IGNORECASE)
+ pnum = match.group(1) if match else None
+ if pnum is not None:
+ target_data['CONTACT_EMAIL'].append(pnum)
+ found = True
+ match = re.search(r'Date of
registration:\s*([A-Za-z0-9\/\.]+)', oi, re.IGNORECASE)
+ pnum = match.group(1) if match else None
+ if pnum is not None:
+ target_data['FOUNDING_DATE'].append(pnum)
+ found = True
+ match =
re.search(r'([A-Za-z]*)\s*Number([A-Za-z()]*)\s:\s*([A-Za-z0-9 -]+)', oi,
re.IGNORECASE)
+ pnum = match.group(1) if match else None
+ if pnum is not None:
+ target_data['COMMERCIAL_REGISTER_NUMBER'].append(pnum)
+ found = True
+ match = re.search(r'Registration no:\s*([A-Za-z0-9 -]+)',
oi, re.IGNORECASE)
+ pnum = match.group(1) if match else None
+ if pnum is not None:
+ target_data['COMMERCIAL_REGISTER_NUMBER'].append(pnum)
+ found = True
+ match = re.search(r'Registration Number:\s*([A-Za-z0-9
-]+)', oi, re.IGNORECASE)
+ pnum = match.group(1) if match else None
+ if pnum is not None:
+ target_data['COMMERCIAL_REGISTER_NUMBER'].append(pnum)
+ found = True
+ if not found:
+ target_data['other_information'].append(oi)
+
+ elif object_elem is not None:
+ target_data['target_type'] = 'other'
+ target_data['object_type'] = object_elem.get('object-type')
+ registry_data = self.process_object(object_elem, places_lookup)
+
+ # Extract justifications, relations, other info (same structure)
+ for just_elem in object_elem.findall('justification'):
+ if just_elem.text:
+ target_data['justification'].append(just_elem.text.strip())
+
+ for rel_elem in object_elem.findall('relation'):
+ relation_info = {
+ 'target_id': rel_elem.get('target-id'),
+ 'relation_type': rel_elem.get('relation-type'),
+ 'remark': None
+ }
+ remark_elem = rel_elem.find('remark')
+ if remark_elem is not None and remark_elem.text:
+ relation_info['remark'] = remark_elem.text.strip()
+ target_data['relations'].append(relation_info)
+
+ for other_elem in object_elem.findall('other-information'):
+ if other_elem.text:
+ # "other-information" is very messy. We try our best to
match
+ # it against various regular expressions and extract bits.
+ oi = other_elem.text.strip()
+ found = False
+ match = re.search(r'Registration no\.:\s*([A-Za-z0-9
-]+)', oi, re.IGNORECASE)
+ pnum = match.group(1) if match else None
+ if pnum is not None:
+ target_data['COMMERCIAL_REGISTER_NUMBER'].append(pnum)
+ found = True
+ if not found:
+ target_data['other_information'].append(oi)
+
+ # Extract generic attributes
+ for attr_elem in target_elem.findall('generic-attribute'):
+ attr_name = attr_elem.get('name')
+ if attr_name and attr_elem.text:
+ target_data['generic_attributes'][attr_name] =
attr_elem.text.strip()
+
+ # Merge registry data into target data
+ target_data.update(registry_data)
+
+ return target_data
+
+ def convert_xml_to_json(self, xml_file: str, active_only: bool = False) ->
Dict[str, Any]:
+ """Convert Swiss sanctions XML file to JSON format."""
+ root = self.parse_xml(xml_file)
+
+ # Build place lookup
+ places_lookup = self.build_place_lookup(root)
+
+ # Extract metadata
+ metadata = {
+ 'list_type': root.get('list-type'),
+ 'date': root.get('date'),
+ 'conversion_timestamp': datetime.now().isoformat(),
+ 'total_targets': 0,
+ 'total_places': len(places_lookup)
+ }
+
+ # Process sanctions programs
+ programs = []
+ for program_elem in root.findall('sanctions-program'):
+ program_data = {
+ 'ssid': program_elem.get('ssid'),
+ 'version_date': program_elem.get('version-date'),
+ 'predecessor_version_date':
program_elem.get('predecessor-version-date'),
+ 'program_keys': {},
+ 'program_names': {},
+ 'sanctions_sets': {},
+ 'origin': None
+ }
+
+ # Extract program keys
+ for key_elem in program_elem.findall('program-key'):
+ lang = key_elem.get('lang')
+ if lang and key_elem.text:
+ program_data['program_keys'][lang] = key_elem.text.strip()
+
+ # Extract program names
+ for name_elem in program_elem.findall('program-name'):
+ lang = name_elem.get('lang')
+ if lang and name_elem.text:
+ program_data['program_names'][lang] =
name_elem.text.strip()
+
+ # Extract sanctions sets
+ for set_elem in program_elem.findall('sanctions-set'):
+ lang = set_elem.get('lang')
+ ssid = set_elem.get('ssid')
+ if lang and ssid and set_elem.text:
+ if ssid not in program_data['sanctions_sets']:
+ program_data['sanctions_sets'][ssid] = {}
+ program_data['sanctions_sets'][ssid][lang] =
set_elem.text.strip()
+
+ # Extract origin
+ origin_elem = program_elem.find('origin')
+ if origin_elem is not None and origin_elem.text:
+ program_data['origin'] = origin_elem.text.strip()
+
+ programs.append(program_data)
+
+ # Process targets
+ targets = []
+
+ # Filter targets if active_only is requested
+ if active_only and 'target' in root:
+ print(f"Filtering for active targets", file=sys.stderr)
+ targets = root['target'] if isinstance(root['target'], list) else
[root['target']]
+ active_targets = [target for target in targets if
self._is_target_active(target)]
+
+ if active_targets:
+ root['target'] = active_targets if len(active_targets) > 1
else active_targets[0]
+ else:
+ # Remove targets key if no active targets
+ del root['target']
+
+ for target_elem in root.findall('target'):
+ # The "_is_target_active" logic expects JSON, convert first
+ data = self._parse_element (target_elem)
+ if self._is_target_active(data) or not active_only:
+ target_data = self.process_target(target_elem, places_lookup)
+ else:
+ target_data = None
+ if target_data:
+ targets.append(target_data)
+
+ metadata['total_targets'] = len(targets)
+
+ # Build final JSON structure
+ result = {
+ 'metadata': metadata,
+ 'sanctions_programs': programs,
+ 'targets': targets,
+ 'places': places_lookup
+ }
+
+ return result
+
def main():
    """Command-line entry point: parse arguments, convert, emit JSON targets."""
    parser = argparse.ArgumentParser(
        description='Convert Swiss sanction list from XML to JSON format',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  robocop-ch-to-json --active < sanctions.xml > sanctions.json
  robocop-ch-to-json --input sanctions.xml --output sanctions.json
    """
    )
    parser.add_argument('--active', action='store_true',
                        help='Only include active targets (exclude de-listed)')
    parser.add_argument('--input', help='Input XML file path')
    parser.add_argument('--output', '-o', help='Output JSON file path (default: stdout)')
    parser.add_argument('-v', '--verbose', action='store_true', help='Enable verbose output')
    parser.add_argument('--indent', type=int, default=2, help='JSON indentation level (default: 2)')
    args = parser.parse_args()

    try:
        records = SwissSanctionsConverter().convert_xml_to_json(args.input, args.active)
        # Only the target records are emitted; the rest stays internal.
        targets = records['targets']

        try:
            if args.output:
                with open(args.output, 'w', encoding='utf-8') as sink:
                    json.dump(targets, sink, indent=args.indent, ensure_ascii=False)
                print(f"Successfully converted XML to JSON: {args.output}", file=sys.stderr)
            else:
                json.dump(targets, sys.stdout, indent=args.indent, ensure_ascii=False)
        except IOError as err:
            raise IOError(f"Failed to write JSON output: {err}")

        if args.verbose:
            # Progress/statistics go to stderr so stdout stays pure JSON.
            print(f"Conversion completed successfully!", file=sys.stderr)
            print(f"Total targets: {records['metadata']['total_targets']}", file=sys.stderr)
            print(f"Total places: {records['metadata']['total_places']}", file=sys.stderr)
            print(f"Total programs: {len(records['sanctions_programs'])}", file=sys.stderr)

    except Exception as err:
        print(f"Error: {err}", file=sys.stderr)
        sys.exit(1)


if __name__ == '__main__':
    main()
diff --git a/robocop-json-postprocess b/robocop-json-postprocess
new file mode 100755
index 0000000..73f02ae
--- /dev/null
+++ b/robocop-json-postprocess
@@ -0,0 +1,4 @@
+#!/bin/sh
+# This script is in the public domain.
+# It removes empty arrays, objects and null values from the JSON data
structure it is given.
+exec jq 'walk(if type == "object" then with_entries(select(.value != [] and
.value != {} and .value != null)) else . end)'
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [robocop] branch master updated: add script to convert XML input into saner JSON,
Admin <=