Skip to content

API Reference

Server Module

usa_npn_mcp_server.server

Module defining the NPN MCP server functionalities.

Includes setting up the server, defining tools for querying the NPN API, and handling requests for observation data and comments.

The server is built using MCP (Model Context Protocol) to facilitate communication for the USA National Phenology Network (NPN) API.

serve async

serve(allowed_dirs=())

Start the MCP server for the NPN API.

Parameters:

Name Type Description Default
allowed_dirs tuple[str, ...]

A tuple of directory paths that are allowed for file export operations. If not provided, defaults to an empty tuple, and file export will be disabled unless directories are specified with NPN_MCP_ALLOWED_DIRS environment variable.

()
Source code in src/usa_npn_mcp_server/server.py
async def serve(allowed_dirs: tuple[str, ...] = ()) -> None:
    """
    Start the MCP server for the NPN API.

    Registers all tool/resource/prompt handlers on a fresh ``Server`` instance
    and then runs it over stdio until the client disconnects.

    Parameters
    ----------
    allowed_dirs : tuple[str, ...], optional
        A tuple of directory paths that are allowed for file export operations.
        If not provided, defaults to an empty tuple, and file export will be disabled
        unless directories are specified with NPN_MCP_ALLOWED_DIRS environment variable.
    """
    server: Server[None] = Server("usa-npn-mcp-server")
    logger.info("Starting MCP NPN Server...")
    api_client = APIClient()

    @server.list_tools()
    async def list_tools() -> list[Tool]:
        """
        Client can call this to get a list of available tools.

        Returns
        -------
        list[Tool]
            A list of available tools with their names, descriptions, and input schemas.
        """
        return [
            Tool(
                name=tool.name,
                description=tool.description,
                inputSchema=tool.input_schema,
            )
            for tool in api_client.get_tool_list()
        ]

    @server.list_resources()
    async def handle_list_resources() -> list[Resource]:
        """
        Client can call this to get a list of available resources.

        Returns
        -------
        list[Resource]
            A list of available resources.
        """
        return MCP_RESOURCES

    @server.read_resource()
    async def handle_read_resource(
        uri: AnyUrl,
    ) -> Dict[str, list[TextResourceContents]]:
        """
        Client can call this to read a resource.

        Parameters
        ----------
        uri : AnyUrl
            The URI of the resource to read.

        Returns
        -------
        Dict[str, list[TextResourceContents]]
            A dictionary containing the resource contents.

        Raises
        ------
        ValueError
            If the URI scheme is not ``npn-mcp`` or the resource is unknown.
        """
        if uri.scheme != "npn-mcp":
            raise ValueError(f"Unsupported URI scheme: {uri.scheme}")

        # Strip only the leading scheme marker; str.replace() would also
        # remove any later occurrence of the substring in the resource name.
        resource_name = str(uri).removeprefix("npn-mcp://")

        if resource_name == "recent-queries":
            # Return cache statistics for recent queries
            cache_stats = api_client.cache_manager.get_cache_stats()
            contents = [
                TextResourceContents(
                    uri=uri,
                    mimeType="application/json",
                    text=json.dumps(cache_stats, indent=2),
                )
            ]
        elif resource_name == "available-roots":
            # Return available roots for file operations
            roots_info = []
            for root in api_client.get_allowed_roots():
                roots_info.append(
                    {
                        "name": root.name or "Unnamed Root",
                        "uri": str(root.uri),
                        # removeprefix leaves non-file URIs untouched, matching
                        # the previous startswith/slice behavior.
                        "path": str(root.uri).removeprefix("file://"),
                    }
                )
            contents = [
                TextResourceContents(
                    uri=uri,
                    mimeType="application/json",
                    text=json.dumps(
                        {
                            "roots_available": len(roots_info) > 0,
                            "roots": roots_info,
                            "message": "File export operations will use these roots"
                            if roots_info
                            else "No roots available. MCP client needs to provide roots for file operations.",
                        },
                        indent=2,
                    ),
                )
            ]
        else:
            raise ValueError(f"Unknown resource: {resource_name}")

        return {"contents": contents}

    @server.call_tool()
    async def handle_call_tool(
        name: str, arguments: Union[Dict[str, str], None]
    ) -> Any:
        """
        Client can call this to use a tool.

        Parameters
        ----------
        name : str
            The name of the tool to call.
        arguments : Union[Dict[str, str], None]
            The arguments to pass to the tool.

        Returns
        -------
        Any
            The result of the tool execution.
        """
        return await api_client.handle_call_tool(name=name, arguments=arguments)

    @server.list_prompts()
    async def handle_list_prompts() -> list[Prompt]:
        """
        Client can call this to get a list of available prompts.

        Returns
        -------
        list[Prompt]
            A list of available prompt objects.
        """
        return get_prompts()

    @server.get_prompt()
    async def handle_get_prompt(
        prompt_name: str, arguments: Union[Dict[str, str], None]
    ) -> GetPromptResult:
        """
        Client can call this to get a prompt by name with formatted arguments.

        Parameters
        ----------
        prompt_name : str
            The name of the prompt to retrieve.
        arguments : Union[Dict[str, str], None]
            Arguments to format into the prompt template.

        Returns
        -------
        GetPromptResult
            A formatted prompt result ready for use.

        Raises
        ------
        ValueError
            If no prompt with the given name exists.
        """
        if prompt_name not in PROMPTS:
            raise ValueError(f"Prompt '{prompt_name}' not found.")
        if arguments is None:
            arguments = {}
        prompt = str(PROMPTS[prompt_name]["template"]).format(**arguments)
        return GetPromptResult(
            messages=[
                PromptMessage(
                    role="user",
                    content=TextContent(type="text", text=prompt.strip()),
                )
            ],
        )

    # Initialize allowed directories/roots
    _initialize_roots(allowed_dirs, api_client)

    # Initialize server to listen for resource changes
    options = InitializationOptions(
        server_name="usa-npn-mcp-server",
        server_version="0.1.0",
        capabilities=ServerCapabilities(
            resources=ResourcesCapability(subscribe=True, listChanged=True),
            tools=ToolsCapability(listChanged=True),
        ),
    )
    async with stdio_server() as (read_stream, write_stream):
        await server.run(read_stream, write_stream, options)

API Client Module

usa_npn_mcp_server.api_client

Module with supplementary functions/classes for the NPN MCP server.

CacheManager

Manages hash-based caching with size and time limits.

Source code in src/usa_npn_mcp_server/api_client.py
class CacheManager:
    """In-memory, MD5-keyed response cache bounded by total size and entry age."""

    def __init__(self, max_size_mb: int = 100, max_age_minutes: int = 15):
        """
        Set up an empty cache with the given limits.

        Parameters
        ----------
        max_size_mb : int, optional
            Maximum cache size in megabytes (default is 100).
        max_age_minutes : int, optional
            Maximum age of cache entries in minutes (default is 15).

        Returns
        -------
        None
        """
        self.cache: Dict[str, Dict[str, Any]] = {}
        self.max_size_bytes = max_size_mb * 1024 * 1024
        self.max_age = timedelta(minutes=max_age_minutes)

    def generate_hash(self, tool_name: str, params: Dict[str, Any]) -> str:
        """
        Generate MD5 hash from tool name and parameters.

        Parameters
        ----------
        tool_name : str
            The name of the tool.
        params : Dict[str, Any]
            The parameters dictionary.

        Returns
        -------
        str
            The MD5 hash string.
        """
        # Sorting keys makes the serialized params - and thus the hash -
        # stable regardless of argument ordering.
        serialized = json.dumps(params, sort_keys=True)
        return hashlib.md5(f"{tool_name}:{serialized}".encode()).hexdigest()

    def add_entry(
        self,
        hash_id: str,
        tool_name: str,
        params: Dict[str, Any],
        raw_data: list[Dict[str, Any]],
    ) -> None:
        """
        Store a new cache entry alongside bookkeeping metadata.

        Parameters
        ----------
        hash_id : str
            The unique identifier for this cache entry.
        tool_name : str
            The name of the tool that generated this data.
        params : Dict[str, Any]
            The parameters used for the query.
        raw_data : list[Dict[str, Any]]
            The raw data to cache.

        Returns
        -------
        None
        """
        # Payload size is approximated from the JSON-serialized form and is
        # what the size-based eviction policy in cleanup_cache() counts.
        self.cache[hash_id] = {
            "raw_data": raw_data,
            "metadata": {
                "tool_name": tool_name,
                "params": params,
                "timestamp": datetime.now(),
                "size_bytes": sys.getsizeof(json.dumps(raw_data)),
                "record_count": len(raw_data),
            },
        }
        self.cleanup_cache()

    def get_entry(self, hash_id: str) -> Optional[Dict[str, Any]]:
        """
        Look up an entry, evicting it on the spot if it has expired.

        Parameters
        ----------
        hash_id : str
            The unique identifier for the cache entry.

        Returns
        -------
        Optional[Dict[str, Any]]
            The cache entry if found and not expired, None otherwise.
        """
        entry = self.cache.get(hash_id)
        if entry is None:
            return None
        if datetime.now() - entry["metadata"]["timestamp"] > self.max_age:
            # Expired: drop it so it stops counting toward the size budget.
            del self.cache[hash_id]
            return None
        return entry

    def cleanup_cache(self) -> None:
        """
        Drop expired entries, then evict oldest-first until under the size cap.

        Returns
        -------
        None
        """
        cutoff = datetime.now() - self.max_age
        stale = [
            key
            for key, entry in self.cache.items()
            if entry["metadata"]["timestamp"] < cutoff
        ]
        for key in stale:
            del self.cache[key]

        # Evict the oldest entry repeatedly until the size budget is met.
        while self.cache and self.get_total_size() > self.max_size_bytes:
            oldest = min(
                self.cache,
                key=lambda k: self.cache[k]["metadata"]["timestamp"],
            )
            del self.cache[oldest]

    def get_total_size(self) -> int:
        """
        Get total cache size in bytes.

        Returns
        -------
        int
            The total size of all cached data in bytes.
        """
        return sum(e["metadata"]["size_bytes"] for e in self.cache.values())

    def get_cache_stats(self) -> Dict[str, Any]:
        """
        Summarize cache contents for the recent-queries resource.

        Returns
        -------
        Dict[str, Any]
            A dictionary with total entry count, total size in MB, and a
            newest-first list of per-entry summaries.
        """
        interesting = ("start_date", "end_date", "species_id", "state")
        summaries: List[Dict[str, Any]] = []
        for hash_id, entry in self.cache.items():
            meta = entry["metadata"]
            summaries.append(
                {
                    "hash_id": hash_id,
                    "tool_name": meta["tool_name"],
                    "timestamp": meta["timestamp"].isoformat(),
                    "record_count": meta["record_count"],
                    "size_kb": round(meta["size_bytes"] / 1024, 2),
                    "params_summary": {
                        k: v for k, v in meta["params"].items() if k in interesting
                    },
                }
            )

        # ISO-8601 strings sort chronologically, so a plain string sort works.
        summaries.sort(key=lambda s: s["timestamp"], reverse=True)

        return {
            "total_entries": len(self.cache),
            "total_size_mb": round(self.get_total_size() / (1024 * 1024), 2),
            "entries": summaries,
        }

__init__

__init__(max_size_mb=100, max_age_minutes=15)

Initialize the CacheManager.

Parameters:

Name Type Description Default
max_size_mb int

Maximum cache size in megabytes (default is 100).

100
max_age_minutes int

Maximum age of cache entries in minutes (default is 15).

15

Returns:

Type Description
None
Source code in src/usa_npn_mcp_server/api_client.py
def __init__(self, max_size_mb: int = 100, max_age_minutes: int = 15):
    """
    Set up an empty cache with the given size and age limits.

    Parameters
    ----------
    max_size_mb : int, optional
        Upper bound on total cached payload size, in megabytes (default 100).
    max_age_minutes : int, optional
        How long an entry stays valid, in minutes (default 15).

    Returns
    -------
    None
    """
    # Entries are keyed by MD5 hash; each value holds raw data plus metadata.
    self.cache: Dict[str, Dict[str, Any]] = {}
    self.max_size_bytes = max_size_mb * 1024 * 1024
    self.max_age = timedelta(minutes=max_age_minutes)

generate_hash

generate_hash(tool_name, params)

Generate MD5 hash from tool name and parameters.

Parameters:

Name Type Description Default
tool_name str

The name of the tool.

required
params Dict[str, Any]

The parameters dictionary.

required

Returns:

Type Description
str

The MD5 hash string.

Source code in src/usa_npn_mcp_server/api_client.py
def generate_hash(self, tool_name: str, params: Dict[str, Any]) -> str:
    """
    Derive a deterministic MD5 hash for a (tool, parameters) pair.

    Parameters
    ----------
    tool_name : str
        The name of the tool.
    params : Dict[str, Any]
        The parameters dictionary.

    Returns
    -------
    str
        The hex-encoded MD5 digest.
    """
    # sort_keys makes the serialization - and thus the hash - independent of
    # the order in which the parameters were supplied.
    serialized = json.dumps(params, sort_keys=True)
    return hashlib.md5(f"{tool_name}:{serialized}".encode()).hexdigest()

add_entry

add_entry(hash_id, tool_name, params, raw_data)

Add new cache entry with metadata.

Parameters:

Name Type Description Default
hash_id str

The unique identifier for this cache entry.

required
tool_name str

The name of the tool that generated this data.

required
params Dict[str, Any]

The parameters used for the query.

required
raw_data list[Dict[str, Any]]

The raw data to cache.

required

Returns:

Type Description
None
Source code in src/usa_npn_mcp_server/api_client.py
def add_entry(
    self,
    hash_id: str,
    tool_name: str,
    params: Dict[str, Any],
    raw_data: list[Dict[str, Any]],
) -> None:
    """
    Store a new cache entry alongside bookkeeping metadata.

    Parameters
    ----------
    hash_id : str
        The unique identifier for this cache entry.
    tool_name : str
        The name of the tool that generated this data.
    params : Dict[str, Any]
        The parameters used for the query.
    raw_data : list[Dict[str, Any]]
        The raw data to cache.

    Returns
    -------
    None
    """
    # The payload size is approximated from the JSON-serialized form; it
    # feeds the size-based eviction policy in cleanup_cache().
    self.cache[hash_id] = {
        "raw_data": raw_data,
        "metadata": {
            "tool_name": tool_name,
            "params": params,
            "timestamp": datetime.now(),
            "size_bytes": sys.getsizeof(json.dumps(raw_data)),
            "record_count": len(raw_data),
        },
    }
    self.cleanup_cache()

get_entry

get_entry(hash_id)

Get cache entry if it exists and hasn't expired.

Parameters:

Name Type Description Default
hash_id str

The unique identifier for the cache entry.

required

Returns:

Type Description
Optional[Dict[str, Any]]

The cache entry if found and not expired, None otherwise.

Source code in src/usa_npn_mcp_server/api_client.py
def get_entry(self, hash_id: str) -> Optional[Dict[str, Any]]:
    """
    Look up a cache entry, evicting it on the spot if it has expired.

    Parameters
    ----------
    hash_id : str
        The unique identifier for the cache entry.

    Returns
    -------
    Optional[Dict[str, Any]]
        The cache entry if found and not expired, None otherwise.
    """
    entry = self.cache.get(hash_id)
    if entry is None:
        return None
    if datetime.now() - entry["metadata"]["timestamp"] > self.max_age:
        # Expired: drop it so it stops counting toward the size budget.
        del self.cache[hash_id]
        return None
    return entry

cleanup_cache

cleanup_cache()

Remove expired entries and enforce size limits.

Returns:

Type Description
None
Source code in src/usa_npn_mcp_server/api_client.py
def cleanup_cache(self) -> None:
    """
    Drop expired entries, then evict oldest-first until under the size cap.

    Returns
    -------
    None
    """
    cutoff = datetime.now() - self.max_age

    # First pass: purge anything older than the age limit.
    stale = [
        key
        for key, entry in self.cache.items()
        if entry["metadata"]["timestamp"] < cutoff
    ]
    for key in stale:
        del self.cache[key]

    # Second pass: evict the oldest entry repeatedly until within budget.
    while self.cache and self.get_total_size() > self.max_size_bytes:
        oldest = min(
            self.cache,
            key=lambda k: self.cache[k]["metadata"]["timestamp"],
        )
        del self.cache[oldest]

get_total_size

get_total_size()

Get total cache size in bytes.

Returns:

Type Description
int

The total size of all cached data in bytes.

Source code in src/usa_npn_mcp_server/api_client.py
def get_total_size(self) -> int:
    """
    Sum the recorded sizes of all cached payloads.

    Returns
    -------
    int
        The total size of all cached data in bytes.
    """
    return sum(e["metadata"]["size_bytes"] for e in self.cache.values())

get_cache_stats

get_cache_stats()

Get cache statistics for the recent-queries resource.

Returns:

Type Description
Dict[str, Any]

A dictionary containing cache statistics including entries and size info.

Source code in src/usa_npn_mcp_server/api_client.py
def get_cache_stats(self) -> Dict[str, Any]:
    """
    Summarize cache contents for the recent-queries resource.

    Returns
    -------
    Dict[str, Any]
        A dictionary with total entry count, total size in MB, and a
        newest-first list of per-entry summaries.
    """
    interesting = ("start_date", "end_date", "species_id", "state")
    summaries: List[Dict[str, Any]] = []

    for hash_id, entry in self.cache.items():
        meta = entry["metadata"]
        summaries.append(
            {
                "hash_id": hash_id,
                "tool_name": meta["tool_name"],
                "timestamp": meta["timestamp"].isoformat(),
                "record_count": meta["record_count"],
                "size_kb": round(meta["size_bytes"] / 1024, 2),
                "params_summary": {
                    k: v for k, v in meta["params"].items() if k in interesting
                },
            }
        )

    # ISO-8601 strings sort chronologically, so a plain string sort works.
    summaries.sort(key=lambda s: s["timestamp"], reverse=True)

    return {
        "total_entries": len(self.cache),
        "total_size_mb": round(self.get_total_size() / (1024 * 1024), 2),
        "entries": summaries,
    }

APIClient

API Client for mediating MCP server and NPN API interactions.

Source code in src/usa_npn_mcp_server/api_client.py
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
class APIClient:
    """API Client for mediating MCP server and NPN API interactions."""

    # Base URL for the NPN API observations endpoints.
    API_BASE_URL: str = "https://services.usanpn.org/npn_portal/observations"

    def __init__(self) -> None:
        """
        Initialize the APIClient.

        Sets up the HTTP client, tool list, cache manager, and allowed roots.

        Returns
        -------
        None
        """
        # Generous 180 s timeout — presumably because raw-data queries over
        # wide date ranges are slow; confirm against NPN API behavior.
        self.client = httpx.AsyncClient(timeout=180.0, base_url=self.API_BASE_URL)
        # Tools exposed to MCP clients; this ordering is what get_tool_list()
        # (and therefore the server's list_tools handler) returns.
        self._tool_list: list[NPNTool] = [
            NPNTools.StatusIntensity,
            NPNTools.ObservationComment,
            NPNTools.MagnitudePhenometrics,
            NPNTools.SitePhenometrics,
            NPNTools.IndividualPhenometrics,
            NPNTools.Mapping,
            NPNTools.QueryReferenceMaterial,
            NPNTools.QueryLiterature,
            NPNTools.GetRawData,
            NPNTools.ExportRawData,
        ]
        # Hash-keyed response cache used by query_api and related tools.
        self.cache_manager = CacheManager()
        # Directories permitted for file export; starts empty and is filled
        # later via update_allowed_roots() (file export disabled until then).
        self.allowed_roots: list[Root] = []

    def get_allowed_roots(self) -> list[Root]:
        """
        Get the current allowed roots.

        Note: this returns the live internal list, not a copy, so callers
        should treat it as read-only.

        Returns
        -------
        list[Root]
            A list of currently allowed root directories.
        """
        return self.allowed_roots

    def update_allowed_roots(self, roots: list[Root]) -> None:
        """
        Update the allowed roots from client.

        The previous list is replaced wholesale; roots are not merged with
        any existing entries.

        Parameters
        ----------
        roots : list[Root]
            A list of Root objects representing allowed directories.

        Returns
        -------
        None
        """
        self.allowed_roots = roots
        logger.info(f"Updated allowed roots: {len(roots)} roots available")

    def _validate_path_in_roots(self, path: str) -> bool:
        """
        Check if a path is within the allowed roots.

        Parameters
        ----------
        path : str
            The file path to validate.

        Returns
        -------
        bool
            True if the path is within allowed roots, False otherwise.
        """
        if not self.allowed_roots:
            return False

        abs_path = os.path.abspath(os.path.normpath(path))
        for root in self.allowed_roots:
            # Extract the path from the file:// URI
            if str(root.uri).startswith("file://"):
                root_path = str(root.uri)[7:]  # Remove file://
                root_abs = os.path.abspath(os.path.normpath(root_path))

                # Check if the path is within this root
                try:
                    # Get relative path from root to target
                    rel_path = os.path.relpath(abs_path, root_abs)
                    # Check that the relative path doesn't escape the root with ..
                    # and isn't an absolute path (indicating it's outside allowed roots)
                    if not rel_path.startswith("..") and not os.path.isabs(rel_path):
                        return True
                except ValueError:
                    # Different drives on Windows or other path issues
                    # Ex if abs_path is not under root_abs
                    continue
        return False

    def _resolve_export_path(self, output_path: str, filename: str) -> str:
        """
        Resolve the full file path for export within allowed roots.

        Parameters
        ----------
        output_path : str
            The output directory path.
        filename : str
            The filename to resolve.

        Returns
        -------
        str
            The full resolved file path.
        """
        # Normalize the filename to prevent path traversal
        # Handle both Unix and Windows path separators
        filename = filename.replace("\\", "/")  # Convert Windows separators
        filename = os.path.basename(filename)

        if output_path:
            # Check if it's an absolute path
            if os.path.isabs(output_path):
                full_path = os.path.join(output_path, filename)
            else:
                # Relative path - use the first root
                root = self.allowed_roots[0]
                if str(root.uri).startswith("file://"):
                    base_path = str(root.uri)[7:]  # Remove file://
                    full_dir = os.path.join(base_path, output_path)
                    full_path = os.path.join(full_dir, filename)
                else:
                    raise ValueError(f"Invalid root URI format: {root.uri}")
        else:
            # No output path specified - use the first root directly
            root = self.allowed_roots[0]
            if str(root.uri).startswith("file://"):
                base_path = str(root.uri)[7:]  # Remove file://
                full_path = os.path.join(base_path, filename)
            else:
                raise ValueError(f"Invalid root URI format: {root.uri}")

        # Normalize and validate the final path
        full_path = os.path.abspath(os.path.normpath(full_path))

        # Validate the final path is within allowed roots
        if not self._validate_path_in_roots(full_path):
            available_roots = [str(r.uri) for r in self.allowed_roots]
            raise ValueError(
                f"Final file path '{full_path}' is not within allowed roots. "
                f"Available roots: {', '.join(available_roots)}"
            )

        # Ensure the directory exists
        os.makedirs(os.path.dirname(full_path), exist_ok=True)
        return full_path

    async def __aenter__(self) -> APIClient:
        """
        Asynchronous context manager entry method.

        Returns
        -------
            self: The instance of the class.
        """
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """
        Exit the asynchronous context manager, closing the HTTP client.

        Called when leaving an ``async with`` block. The client is closed
        unconditionally, whether or not an exception was raised.

        Parameters
        ----------
        exc_type : Optional[Type[BaseException]]
            The exception type if an exception was raised, otherwise None.
        exc_val : Optional[BaseException]
            The exception instance if an exception was raised, otherwise None.
        exc_tb : Optional[TracebackType]
            The traceback object if an exception was raised, otherwise None.

        Returns
        -------
        None
        """
        await self.close()

    async def close(self) -> None:
        """
        Close the underlying HTTP client.

        Releases the client's connection resources; called automatically by
        ``__aexit__`` when used as an async context manager.

        Returns
        -------
        None
        """
        await self.client.aclose()

    def get_tool_list(self) -> list[NPNTool]:
        """
        Get the list of available tools.

        Note: this returns the live internal list, not a copy, so callers
        should treat it as read-only.

        Returns
        -------
        list[NPNTool]
            A list of available NPN tools.
        """
        return self._tool_list

    @log_call
    async def _get(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Any:
        """
        Make a logged GET request to the NPN API.

        Parameters
        ----------
        endpoint : str
            The API endpoint to query (without the ``.json`` suffix).
        params : Optional[Dict[str, Any]], optional
            The query parameters to pass to the API.

        Returns
        -------
        Any
            The JSON-decoded response from the API.

        Raises
        ------
        Exception
            If the request fails; carries the API's ``error`` message when the
            error response body is JSON, otherwise the original error text.
        """
        if params is None:
            params = {}
        # Ensure 'request_src' is always 'vectorMCP'
        params["request_src"] = "vectorMCP"
        # The API expects list-valued parameters as key[0], key[1], ... rather
        # than a single comma-separated value, so expand them here.
        for key, value in list(params.items()):
            if isinstance(value, str) and "," in value:
                elements = value.strip("[]").split(",")
                del params[key]
                for idx, element in enumerate(elements):
                    params[f"{key}[{idx}]"] = element
        query_params = urlencode(params)
        request_url = f"{self.API_BASE_URL}/{endpoint}.json?{query_params}"
        # Log before awaiting the request so failed calls are still traceable.
        logger.info(f"Querying APIClient's _get with: {request_url}")
        response = await self.client.get(request_url)
        try:
            response.raise_for_status()
        except Exception as ex:
            logger.error(f"Error in APIClient's _get: {ex}")
            # The error body may not be JSON (or not a JSON object); fall back
            # to the original error text instead of masking it with a parse error.
            try:
                detail = response.json().get("error", str(ex))
            except (ValueError, AttributeError):
                detail = str(ex)
            raise Exception(detail) from ex
        return response.json()

    async def query_api(self, endpoint: str, arguments: Dict[str, Any]) -> str:
        """
        Query the API and store the response with hash-based caching.

        Parameters
        ----------
            endpoint (str): The API endpoint to query.
            arguments (Dict[str, Any]): The arguments to pass to the API.

        Returns
        -------
            str: The hash ID of the cached response.
        """
        name = next(
            (tool.name for tool in self._tool_list if tool.endpoint == endpoint), None
        )
        if not name:
            raise ValueError(f"No matching tool name found for endpoint: {endpoint}")

        response = await self._get(endpoint, params=arguments)

        # Generate hash and store in new cache system
        hash_id = self.cache_manager.generate_hash(name, arguments)
        self.cache_manager.add_entry(hash_id, name, arguments, response)

        logger.info(f"Response stored for {name} with hash ID: {hash_id}")
        return hash_id

    def summarize_response(self, hash_id: str) -> Dict[str, Any]:
        """
        Get unique variables and entries from cached API response by hash ID.

        Parameters
        ----------
        hash_id : str
            The unique identifier for the cached response.

        Returns
        -------
        Dict[str, Any]
            A dictionary containing summary statistics of the cached response.
        """
        logger.info(f"Summarizing response for hash ID: {hash_id}")

        entry = self.cache_manager.get_entry(hash_id)
        if not entry:
            raise ValueError(f"No cached data found for hash ID: {hash_id}")

        raw_data = entry["raw_data"]
        if not raw_data:
            return {"result": None}

        unique_keys_summary, full_dataset = self._collect_unique_keys(raw_data)
        discrete_summary, continuous_summary, only_null = self._process_unique_values(
            unique_keys_summary, full_dataset
        )

        summary: dict[str, Any] = {
            "discrete": {
                key: val
                for key, val in discrete_summary.items()
                if key not in continuous_summary and key not in only_null
            },
            "continuous": continuous_summary,
            "only_null": only_null,
        }
        return {"result": summary}

    def _collect_unique_keys(
        self, raw_data: list[Dict[str, Any]]
    ) -> tuple[Dict[str, set[Any]], Dict[str, list[Any]]]:
        """
        Collect unique keys and their values from raw data.

        Parameters
        ----------
        raw_data : list[Dict[str, Any]]
            The raw data to process.

        Returns
        -------
        tuple[Dict[str, set[Any]], Dict[str, list[Any]]]
            A tuple containing unique keys summary and full dataset.
        """
        unique_keys_summary: Dict[str, set[Any]] = {}
        full_dataset: Dict[str, list[Any]] = {}

        for entry_data in raw_data:
            if isinstance(entry_data, dict):
                for key, value in entry_data.items():
                    if key not in unique_keys_summary:
                        unique_keys_summary[key] = set()
                        full_dataset[key] = []
                    unique_keys_summary[key].add(value)
                    full_dataset[key].append(value)

        return unique_keys_summary, full_dataset

    def _process_unique_values(
        self,
        unique_keys_summary: Dict[str, set[Any]],
        full_dataset: Dict[str, list[Any]],
    ) -> tuple[Dict[str, Any], Dict[str, Dict[str, float]], List[str]]:
        """
        Process unique values into discrete and continuous summaries.

        Parameters
        ----------
        unique_keys_summary : Dict[str, set[Any]]
            Dictionary mapping keys to sets of unique values.
        full_dataset : Dict[str, list[Any]]
            Dictionary mapping keys to lists of all values.

        Returns
        -------
        tuple[Dict[str, Any], Dict[str, Dict[str, float]], List[str]]
            A tuple containing discrete summary, continuous summary,
            and null-only fields.
        """
        discrete_summary: Dict[str, Any] = {}
        continuous_summary: Dict[str, Dict[str, float]] = {}
        only_null: List[str] = []

        id_like_variables = [
            "observation_id",
            "individual_id",
            "station_id",
            "site_id",
            "species_id",
            "phenophase_id",
            "dataset_id",
            "network_id",
        ]
        int_encoded = ["phenophase_status", "patch", "itis_number", "year"]
        continuous = [
            "elevation_in_meters",
            "mean_first_yes_year",
            "mean_first_yes_doy",
            "mean_first_yes_julian_date",
            "se_first_yes_in_days",
            "mean_numdays_since_prior_no",
            "se_numdays_since_prior_no",
            "mean_last_yes_year",
            "mean_last_yes_doy",
            "mean_last_yes_julian_date",
            "se_last_yes_in_days",
            "mean_numdays_until_next_no",
            "se_numdays_until_next_no",
        ]

        for key, values in unique_keys_summary.items():
            if values == {-9999}:
                only_null.append(key)
            elif key in id_like_variables:
                self._process_id_like_variables(key, values, discrete_summary)
            elif "_id" in key or key in int_encoded:
                discrete_summary[key] = list(values)
            elif key in continuous or all(
                isinstance(v, (int, float)) and v != -9999 for v in values
            ):
                self._process_continuous_variables(
                    key, full_dataset, continuous_summary
                )
            else:
                discrete_summary[key] = list(values)

        return discrete_summary, continuous_summary, only_null

    def _process_id_like_variables(
        self, key: str, values: set[Any], discrete_summary: Dict[str, Any]
    ) -> None:
        """
        Process ID-like variables for truncation.

        Parameters
        ----------
        key : str
            The variable key name.
        values : set[Any]
            The set of unique values.
        discrete_summary : Dict[str, Any]
            The discrete summary dictionary to update.

        Returns
        -------
        None
        """
        values_list = list(values)
        if len(values_list) > 15:
            discrete_summary[key] = {
                "sample": values_list[:15],
                "total_count": len(values_list),
                "truncated": True,
                "message": f"Showing first 15 of {len(values_list)} unique values",
            }
        else:
            discrete_summary[key] = values_list

    def _process_continuous_variables(
        self,
        key: str,
        full_dataset: Dict[str, list[Any]],
        continuous_summary: Dict[str, Dict[str, float]],
    ) -> None:
        """
        Process continuous variables for statistical summaries.

        Parameters
        ----------
        key : str
            The variable key name.
        full_dataset : Dict[str, list[Any]]
            Dictionary mapping keys to lists of all values.
        continuous_summary : Dict[str, Dict[str, float]]
            The continuous summary dictionary to update.

        Returns
        -------
        None
        """
        values_list = [v for v in full_dataset[key] if v != -9999]
        if values_list:
            continuous_summary[key] = {
                "length": len(values_list),
                "min": min(values_list),
                "max": max(values_list),
                "mean": mean(values_list),
                "median": median(values_list),
                "1st_quartile": np.percentile(values_list, 25),
                "3rd_quartile": np.percentile(values_list, 75),
            }

    def read_ancillary_file(self, sql_query: str) -> str:
        """
        Read the ancillary file from the database.

        Parameters
        ----------
            sql_query (str): The SQL query to execute.

        Returns
        -------
            str: The result of the SQL query.
        """
        db_path = Path(__file__).parent / "data" / "ancillary_data.db"
        conn = sqlite3.connect(db_path)
        df = pd.read_sql(sql_query, conn)
        conn.close()
        result = df.to_dict(orient="records")
        if not result:
            raise ValueError(f"No results found for query: {sql_query}")
        return json.dumps(result)

    def read_output_schema(self, hash_id: str) -> Dict[str, Any]:
        """
        Get the schema from cached API response by hash ID.

        Parameters
        ----------
        hash_id : str
            The unique identifier for the cached response.

        Returns
        -------
        Dict[str, Any]
            A dictionary containing the output schema for the cached response.
        """
        logger.info(f"Reading output schema for hash ID: {hash_id}")

        entry = self.cache_manager.get_entry(hash_id)
        if not entry:
            raise ValueError(f"No cached data found for hash ID: {hash_id}")

        tool_name = entry["metadata"]["tool_name"]
        raw_data = entry["raw_data"]

        if not raw_data:
            return {"result": None}

        # Get the full schema for the tool
        full_schema = API_SCHEMAS[tool_name]["properties"]

        if isinstance(raw_data[0], dict):
            keys = [key for key, val in raw_data[0].items() if val]
        else:
            keys = []

        select_schema = {key: full_schema[key] for key in keys if key in full_schema}
        logger.info(f"Output schema keys: {keys}")
        return {"result": select_schema} if select_schema else {"result": None}

    def query_response(
        self, hash_id: str
    ) -> list[Union[TextContent, ImageContent, EmbeddedResource]]:
        """
        Get the Server response by query hash ID.

        Parameters
        ----------
        hash_id : str
            The unique identifier for the cached query.

        Returns
        -------
        list[Union[TextContent, ImageContent, EmbeddedResource]]
            A list of content objects representing the query response.
        """
        logger.info(f"Returning query response for hash ID: {hash_id}")

        entry = self.cache_manager.get_entry(hash_id)
        if not entry:
            raise ValueError(f"No cached data found for hash ID: {hash_id}")

        tool_name = entry["metadata"]["tool_name"]
        summary = self.summarize_response(hash_id=hash_id)
        schema = self.read_output_schema(hash_id=hash_id)

        result: list[Union[TextContent, ImageContent, EmbeddedResource]] = [
            TextContent(
                type="text",
                text=f"Output variables of API response for {tool_name} tool (Hash: {hash_id})",
            ),
            EmbeddedResource(
                type="resource",
                resource=TextResourceContents(
                    uri=AnyUrl(f"npn-mcp://{tool_name}_output_schema"),
                    mimeType="plain/text",
                    text=json.dumps(schema),
                ),
            ),
            TextContent(
                type="text",
                text=f"Summary of unique entries across API response for {tool_name} tool",
            ),
            EmbeddedResource(
                type="resource",
                resource=TextResourceContents(
                    uri=AnyUrl(f"npn-mcp://{tool_name}"),
                    mimeType="plain/text",
                    text=json.dumps(summary),
                ),
            ),
        ]

        return result

    async def get_raw_data(self, arguments: Dict[str, Any]) -> list[TextContent]:
        """
        Get raw data from cache with size limits.

        Parameters
        ----------
        arguments : Dict[str, Any]
            Dictionary containing the hash_id for the cached data.

        Returns
        -------
        list[TextContent]
            A list of text content objects containing the raw data.
        """
        hash_id = arguments["hash_id"]

        entry = self.cache_manager.get_entry(hash_id)
        if not entry:
            raise ValueError(f"No cached data found for hash ID: {hash_id}")

        raw_data = entry["raw_data"]
        metadata = entry["metadata"]

        # Apply 300 record limit
        if len(raw_data) > 300:
            truncated_data = raw_data[:300]
            result = [
                TextContent(
                    type="text",
                    text=f"Raw data for {metadata['tool_name']} (TRUNCATED)",
                ),
                TextContent(
                    type="text",
                    text=f"Warning: Data truncated to 300 records out of {len(raw_data)} total records.",
                ),
                TextContent(
                    type="text",
                    text=json.dumps(truncated_data, indent=2),
                ),
            ]
        else:
            result = [
                TextContent(
                    type="text",
                    text=f"Raw data for {metadata['tool_name']} ({len(raw_data)} records)",
                ),
                TextContent(
                    type="text",
                    text=json.dumps(raw_data, indent=2),
                ),
            ]

        return result

    @log_call
    async def export_raw_data(self, arguments: Dict[str, Any]) -> list[TextContent]:
        """Export raw data to file."""
        # Check if we have any roots available
        if not self.allowed_roots:
            raise ValueError(
                "No roots available. File export requires MCP client to provide roots. "
                "Please ensure your MCP client supports roots and has configured allowed directories."
            )

        hash_id = arguments["hash_id"]
        file_format = arguments["file_format"]
        filename = arguments.get("filename", f"{hash_id}.{file_format}")
        output_path = arguments.get("output_path", "")

        entry = self.cache_manager.get_entry(hash_id)
        if not entry:
            raise ValueError(f"No cached data found for hash ID: {hash_id}")

        raw_data = entry["raw_data"]

        # Determine the full file path using helper method
        filepath = self._resolve_export_path(output_path, filename)

        try:
            with open(filepath, "w") as f:
                if file_format == "json":
                    json.dump(raw_data, f, indent=2)
                else:  # jsonl
                    for record in raw_data:
                        f.write(json.dumps(record) + "\n")

            return [
                TextContent(
                    type="text",
                    text=f"Successfully exported {len(raw_data)} records to: {filepath}",
                )
            ]
        except Exception as e:
            raise ValueError(f"Failed to export data: {str(e)}") from e

    @log_call
    async def create_plot(
        self, data: list[Dict[str, Any]], arguments: Dict[str, Any]
    ) -> list[Union[TextContent, ImageContent]]:
        """
        Create a matplotlib plot of a particular category over time.

        Imaged returned as a base64 encoded JPG image.

        Parameters
        ----------
            data (list[Dict[str, Any]]): The input data returned from API query.
            y_variable (str): The variable to use as the y-axis in the plot.

        Returns
        -------
            str: The JPG image of the plot as a base64 encoded string.
        """
        if not data:
            raise ValueError("Data cannot be empty.")
        if not arguments:
            raise ValueError("Arguments cannot be empty.")
        if not arguments["plot_type"] == "map":
            raise ValueError("Plot type cannot be anything but map right now.")
        plot_result = await generate_map(
            data=data,
            color_by=arguments["color_by"],
        )
        result: list[Union[TextContent, ImageContent]] = [
            TextContent(
                type="text",
                text=f"Map of {arguments['tool_name']} data colored by {arguments['color_by']}.",
            ),
            ImageContent(type="image", data=plot_result, mimeType="image/jpeg"),
        ]
        return result

    @log_call
    async def query_reference_material(
        self, arguments: Dict[str, Any]
    ) -> list[TextContent]:
        """Query USA-NPN reference material using a SQL query."""
        if not arguments:
            raise ValueError("Arguments cannot be empty.")
        if not arguments["sql_query"]:
            raise ValueError("SQL query cannot be empty.")
        sql_query = arguments["sql_query"]
        logger.info(f"Checking references with SQL query: {sql_query}")
        result: list[TextContent] = [
            TextContent(
                type="text",
                text=self.read_ancillary_file(sql_query=sql_query),
            )
        ]
        # Note: Reference material results are returned directly, not cached
        return result

    async def handle_call_tool(
        self, name: str, arguments: Union[Dict[str, str], None]
    ) -> Any:
        """
        Client can call this to use a tool.

        Parameters
        ----------
        name : str
            The name of the tool to call.
        arguments : Union[Dict[str, str], None]
            The arguments to pass to the tool.

        Returns
        -------
        Any
            The result of the tool execution.
        """
        logger.info(f"Calling tool {name} with parameters: {arguments}")
        if arguments is None:
            raise ValueError("Arguments cannot be None")

        if name not in [tool.name for tool in self.get_tool_list()]:
            raise ValueError(f"Tool {name} not found.")

        if name in [
            NPNTools.QueryReferenceMaterial.name,
            NPNTools.QueryLiterature.name,
            NPNTools.GetRawData.name,
            NPNTools.ExportRawData.name,
            NPNTools.Mapping.name,
        ]:
            return await self._handle_special_tools(name, arguments)

        # Regular API query tools - return hash ID
        return await self._handle_regular_tool(name, arguments)

    async def _handle_special_tools(self, name: str, arguments: Dict[str, str]) -> Any:
        """
        Handle special tools with unique logic.

        Parameters
        ----------
        name : str
            The name of the special tool.
        arguments : Dict[str, str]
            The arguments for the tool.

        Returns
        -------
        Any
            The result of the special tool execution.
        """
        if name in {
            NPNTools.QueryReferenceMaterial.name,
            NPNTools.QueryLiterature.name,
        }:
            return await self.query_reference_material(arguments)
        if name == NPNTools.GetRawData.name:
            return await self.get_raw_data(arguments)
        if name == NPNTools.ExportRawData.name:
            return await self.export_raw_data(arguments)
        if name == NPNTools.Mapping.name:
            return await self._handle_mapping_tool(arguments)

        raise ValueError(f"No result generated for tool: {name}")

    async def _handle_mapping_tool(self, arguments: Dict[str, str]) -> Any:
        """
        Handle the Mapping tool.

        Parameters
        ----------
        arguments : Dict[str, str]
            The arguments for the mapping tool.

        Returns
        -------
        Any
            The result of the mapping tool execution.
        """
        hash_id = arguments.get("hash_id")
        if not hash_id:
            raise ValueError(
                "Mapping tool now requires hash_id parameter from cached query"
            )

        entry = self.cache_manager.get_entry(hash_id)
        if not entry:
            raise ValueError(f"No cached data found for hash ID: {hash_id}")

        data = entry["raw_data"]
        return await self.create_plot(data, arguments)

    async def _handle_regular_tool(self, name: str, arguments: Dict[str, str]) -> Any:
        """
        Handle regular API query tools.

        Parameters
        ----------
        name : str
            The name of the tool.
        arguments : Dict[str, str]
            The arguments for the tool.

        Returns
        -------
        Any
            The result of the regular tool execution.
        """
        tool = next(tool for tool in self.get_tool_list() if tool.name == name)
        hash_id = await self.query_api(tool.endpoint, arguments)
        return self.query_response(hash_id=hash_id)

__init__

__init__()

Initialize the APIClient.

Sets up the HTTP client, tool list, cache manager, and allowed roots.

Returns:

Type Description
None
Source code in src/usa_npn_mcp_server/api_client.py
def __init__(self) -> None:
    """
    Initialize the APIClient.

    Sets up the HTTP client, tool list, cache manager, and allowed roots.

    Returns
    -------
    None
    """
    self.client = httpx.AsyncClient(timeout=180.0, base_url=self.API_BASE_URL)
    self._tool_list: list[NPNTool] = [
        NPNTools.StatusIntensity,
        NPNTools.ObservationComment,
        NPNTools.MagnitudePhenometrics,
        NPNTools.SitePhenometrics,
        NPNTools.IndividualPhenometrics,
        NPNTools.Mapping,
        NPNTools.QueryReferenceMaterial,
        NPNTools.QueryLiterature,
        NPNTools.GetRawData,
        NPNTools.ExportRawData,
    ]
    self.cache_manager = CacheManager()
    self.allowed_roots: list[Root] = []

get_allowed_roots

get_allowed_roots()

Get the current allowed roots.

Returns:

Type Description
list[Root]

A list of currently allowed root directories.

Source code in src/usa_npn_mcp_server/api_client.py
def get_allowed_roots(self) -> list[Root]:
    """
    Get the current allowed roots.

    Returns
    -------
    list[Root]
        A list of currently allowed root directories.
    """
    return self.allowed_roots

update_allowed_roots

update_allowed_roots(roots)

Update the allowed roots from client.

Parameters:

Name Type Description Default
roots list[Root]

A list of Root objects representing allowed directories.

required

Returns:

Type Description
None
Source code in src/usa_npn_mcp_server/api_client.py
def update_allowed_roots(self, roots: list[Root]) -> None:
    """
    Update the allowed roots from client.

    Parameters
    ----------
    roots : list[Root]
        A list of Root objects representing allowed directories.

    Returns
    -------
    None
    """
    self.allowed_roots = roots
    logger.info(f"Updated allowed roots: {len(roots)} roots available")

__aenter__ async

__aenter__()

Asynchronous context manager entry method.

Returns:

Type Description
self: The instance of the class.
Source code in src/usa_npn_mcp_server/api_client.py
async def __aenter__(self) -> APIClient:
    """
    Asynchronous context manager entry method.

    Returns
    -------
        self: The instance of the class.
    """
    return self

__aexit__ async

__aexit__(exc_type, exc_val, exc_tb)

Asynchronous context manager exit method.

This method is called when exiting the runtime context of the asynchronous context manager. It ensures that the close method is awaited to properly release any resources.

Args: exc_type (Optional[Type[BaseException]]): The exception type if an exception was raised, otherwise None. exc_val (Optional[BaseException]): The exception instance if an exception was raised, otherwise None. exc_tb (Optional[TracebackType]): The traceback object if an exception was raised, otherwise None.

Returns:

Type Description
None
Source code in src/usa_npn_mcp_server/api_client.py
async def __aexit__(
    self,
    exc_type: Optional[Type[BaseException]],
    exc_val: Optional[BaseException],
    exc_tb: Optional[TracebackType],
) -> None:
    """
    Asynchronous context manager exit method.

    This method is called when exiting the runtime context of the asynchronous
    context manager. It ensures that the `close` method is awaited to properly
    release any resources.

    Args:
        exc_type (Optional[Type[BaseException]]): The exception type if an exception
            was raised, otherwise None.
        exc_val (Optional[BaseException]): The exception instance if an exception
            was raised, otherwise None.
        exc_tb (Optional[TracebackType]): The traceback object if an exception
            was raised, otherwise None.

    Returns
    -------
        None
    """
    await self.close()

close async

close()

Close the underlying HTTP client.

Returns:

Type Description
None
Source code in src/usa_npn_mcp_server/api_client.py
async def close(self) -> None:
    """
    Close the underlying HTTP client.

    Returns
    -------
    None
    """
    await self.client.aclose()

get_tool_list

get_tool_list()

Get the list of available tools.

Returns:

Type Description
list[NPNTool]

A list of available NPN tools.

Source code in src/usa_npn_mcp_server/api_client.py
def get_tool_list(self) -> list[NPNTool]:
    """
    Get the list of available tools.

    Returns
    -------
    list[NPNTool]
        A list of available NPN tools.
    """
    return self._tool_list

query_api async

query_api(endpoint, arguments)

Query the API and store the response with hash-based caching.

Returns:

Type Description
str: The hash ID of the cached response.
Source code in src/usa_npn_mcp_server/api_client.py
async def query_api(self, endpoint: str, arguments: Dict[str, Any]) -> str:
    """
    Query the API and store the response with hash-based caching.

    Parameters
    ----------
        endpoint (str): The API endpoint to query.
        arguments (Dict[str, Any]): The arguments to pass to the API.

    Returns
    -------
        str: The hash ID of the cached response.
    """
    name = next(
        (tool.name for tool in self._tool_list if tool.endpoint == endpoint), None
    )
    if not name:
        raise ValueError(f"No matching tool name found for endpoint: {endpoint}")

    response = await self._get(endpoint, params=arguments)

    # Generate hash and store in new cache system
    hash_id = self.cache_manager.generate_hash(name, arguments)
    self.cache_manager.add_entry(hash_id, name, arguments, response)

    logger.info(f"Response stored for {name} with hash ID: {hash_id}")
    return hash_id

summarize_response

summarize_response(hash_id)

Get unique variables and entries from cached API response by hash ID.

Parameters:

Name Type Description Default
hash_id str

The unique identifier for the cached response.

required

Returns:

Type Description
Dict[str, Any]

A dictionary containing summary statistics of the cached response.

Source code in src/usa_npn_mcp_server/api_client.py
def summarize_response(self, hash_id: str) -> Dict[str, Any]:
    """
    Get unique variables and entries from cached API response by hash ID.

    Parameters
    ----------
    hash_id : str
        The unique identifier for the cached response.

    Returns
    -------
    Dict[str, Any]
        A dictionary containing summary statistics of the cached response.
    """
    logger.info(f"Summarizing response for hash ID: {hash_id}")

    entry = self.cache_manager.get_entry(hash_id)
    if not entry:
        raise ValueError(f"No cached data found for hash ID: {hash_id}")

    raw_data = entry["raw_data"]
    if not raw_data:
        return {"result": None}

    unique_keys_summary, full_dataset = self._collect_unique_keys(raw_data)
    discrete_summary, continuous_summary, only_null = self._process_unique_values(
        unique_keys_summary, full_dataset
    )

    summary: dict[str, Any] = {
        "discrete": {
            key: val
            for key, val in discrete_summary.items()
            if key not in continuous_summary and key not in only_null
        },
        "continuous": continuous_summary,
        "only_null": only_null,
    }
    return {"result": summary}

read_ancillary_file

read_ancillary_file(sql_query)

Read the ancillary file from the database.

Returns:

Type Description
str: The result of the SQL query.
Source code in src/usa_npn_mcp_server/api_client.py
def read_ancillary_file(self, sql_query: str) -> str:
    """
    Read the ancillary file from the database.

    Parameters
    ----------
        sql_query (str): The SQL query to execute.

    Returns
    -------
        str: The result of the SQL query.
    """
    db_path = Path(__file__).parent / "data" / "ancillary_data.db"
    conn = sqlite3.connect(db_path)
    df = pd.read_sql(sql_query, conn)
    conn.close()
    result = df.to_dict(orient="records")
    if not result:
        raise ValueError(f"No results found for query: {sql_query}")
    return json.dumps(result)

read_output_schema

read_output_schema(hash_id)

Get the schema from cached API response by hash ID.

Parameters:

Name Type Description Default
hash_id str

The unique identifier for the cached response.

required

Returns:

Type Description
Dict[str, Any]

A dictionary containing the output schema for the cached response.

Source code in src/usa_npn_mcp_server/api_client.py
def read_output_schema(self, hash_id: str) -> Dict[str, Any]:
    """
    Get the schema from cached API response by hash ID.

    Parameters
    ----------
    hash_id : str
        The unique identifier for the cached response.

    Returns
    -------
    Dict[str, Any]
        A dictionary containing the output schema for the cached response.
    """
    logger.info(f"Reading output schema for hash ID: {hash_id}")

    entry = self.cache_manager.get_entry(hash_id)
    if not entry:
        raise ValueError(f"No cached data found for hash ID: {hash_id}")

    tool_name = entry["metadata"]["tool_name"]
    raw_data = entry["raw_data"]

    if not raw_data:
        return {"result": None}

    # Get the full schema for the tool
    full_schema = API_SCHEMAS[tool_name]["properties"]

    if isinstance(raw_data[0], dict):
        keys = [key for key, val in raw_data[0].items() if val]
    else:
        keys = []

    select_schema = {key: full_schema[key] for key in keys if key in full_schema}
    logger.info(f"Output schema keys: {keys}")
    return {"result": select_schema} if select_schema else {"result": None}

query_response

query_response(hash_id)

Get the Server response by query hash ID.

Parameters:

Name Type Description Default
hash_id str

The unique identifier for the cached query.

required

Returns:

Type Description
list[Union[TextContent, ImageContent, EmbeddedResource]]

A list of content objects representing the query response.

Source code in src/usa_npn_mcp_server/api_client.py
def query_response(
    self, hash_id: str
) -> list[Union[TextContent, ImageContent, EmbeddedResource]]:
    """
    Get the Server response by query hash ID.

    Builds a response containing the output schema and a summary of the
    cached API result, each wrapped as an embedded text resource.

    Parameters
    ----------
    hash_id : str
        The unique identifier for the cached query.

    Returns
    -------
    list[Union[TextContent, ImageContent, EmbeddedResource]]
        A list of content objects representing the query response.

    Raises
    ------
    ValueError
        If no cached entry exists for ``hash_id``.
    """
    logger.info(f"Returning query response for hash ID: {hash_id}")

    entry = self.cache_manager.get_entry(hash_id)
    if not entry:
        raise ValueError(f"No cached data found for hash ID: {hash_id}")

    tool_name = entry["metadata"]["tool_name"]
    summary = self.summarize_response(hash_id=hash_id)
    schema = self.read_output_schema(hash_id=hash_id)

    result: list[Union[TextContent, ImageContent, EmbeddedResource]] = [
        TextContent(
            type="text",
            text=f"Output variables of API response for {tool_name} tool (Hash: {hash_id})",
        ),
        EmbeddedResource(
            type="resource",
            resource=TextResourceContents(
                uri=AnyUrl(f"npn-mcp://{tool_name}_output_schema"),
                # Fixed: "plain/text" is not a registered MIME type; the
                # correct form per RFC 2046 is "text/plain".
                mimeType="text/plain",
                text=json.dumps(schema),
            ),
        ),
        TextContent(
            type="text",
            text=f"Summary of unique entries across API response for {tool_name} tool",
        ),
        EmbeddedResource(
            type="resource",
            resource=TextResourceContents(
                uri=AnyUrl(f"npn-mcp://{tool_name}"),
                mimeType="text/plain",
                text=json.dumps(summary),
            ),
        ),
    ]

    return result

get_raw_data async

get_raw_data(arguments)

Get raw data from cache with size limits.

Parameters:

Name Type Description Default
arguments Dict[str, Any]

Dictionary containing the hash_id for the cached data.

required

Returns:

Type Description
list[TextContent]

A list of text content objects containing the raw data.

Source code in src/usa_npn_mcp_server/api_client.py
async def get_raw_data(self, arguments: Dict[str, Any]) -> list[TextContent]:
    """
    Return cached raw data for a hash ID, capped at 300 records.

    Parameters
    ----------
    arguments : Dict[str, Any]
        Dictionary containing the hash_id for the cached data.

    Returns
    -------
    list[TextContent]
        Text content objects holding the (possibly truncated) raw data.

    Raises
    ------
    ValueError
        If no cached entry exists for the given hash ID.
    """
    hash_id = arguments["hash_id"]

    entry = self.cache_manager.get_entry(hash_id)
    if not entry:
        raise ValueError(f"No cached data found for hash ID: {hash_id}")

    raw_data = entry["raw_data"]
    metadata = entry["metadata"]
    total = len(raw_data)

    # Keep responses bounded: never ship more than 300 records back.
    if total > 300:
        return [
            TextContent(
                type="text",
                text=f"Raw data for {metadata['tool_name']} (TRUNCATED)",
            ),
            TextContent(
                type="text",
                text=f"Warning: Data truncated to 300 records out of {total} total records.",
            ),
            TextContent(
                type="text",
                text=json.dumps(raw_data[:300], indent=2),
            ),
        ]

    return [
        TextContent(
            type="text",
            text=f"Raw data for {metadata['tool_name']} ({total} records)",
        ),
        TextContent(
            type="text",
            text=json.dumps(raw_data, indent=2),
        ),
    ]

export_raw_data async

export_raw_data(arguments)

Export raw data to file.

Source code in src/usa_npn_mcp_server/api_client.py
@log_call
async def export_raw_data(self, arguments: Dict[str, Any]) -> list[TextContent]:
    """
    Export cached raw data to a JSON or JSONL file.

    Parameters
    ----------
    arguments : Dict[str, Any]
        Must contain ``hash_id`` and ``file_format`` ("json" or "jsonl");
        may contain ``filename`` and ``output_path``.

    Returns
    -------
    list[TextContent]
        A confirmation message with the number of exported records.

    Raises
    ------
    ValueError
        If no roots are configured, the hash ID is unknown, or the
        write fails.
    """
    # Check if we have any roots available
    if not self.allowed_roots:
        raise ValueError(
            "No roots available. File export requires MCP client to provide roots. "
            "Please ensure your MCP client supports roots and has configured allowed directories."
        )

    hash_id = arguments["hash_id"]
    file_format = arguments["file_format"]
    # Use `or` rather than .get()'s default: clients may send an explicit
    # null for these optional fields (the request model defaults them to
    # None), which .get(key, default) would pass through unchanged.
    filename = arguments.get("filename") or f"{hash_id}.{file_format}"
    output_path = arguments.get("output_path") or ""

    entry = self.cache_manager.get_entry(hash_id)
    if not entry:
        raise ValueError(f"No cached data found for hash ID: {hash_id}")

    raw_data = entry["raw_data"]

    # Determine the full file path using helper method
    filepath = self._resolve_export_path(output_path, filename)

    try:
        # Explicit UTF-8 avoids platform-dependent default encodings.
        with open(filepath, "w", encoding="utf-8") as f:
            if file_format == "json":
                json.dump(raw_data, f, indent=2)
            else:  # jsonl
                for record in raw_data:
                    f.write(json.dumps(record) + "\n")

        return [
            TextContent(
                type="text",
                text=f"Successfully exported {len(raw_data)} records to: {filepath}",
            )
        ]
    except Exception as e:
        raise ValueError(f"Failed to export data: {str(e)}") from e

create_plot async

create_plot(data, arguments)

Create a matplotlib plot of a particular category over time.

Image returned as a base64-encoded JPG image.

Returns:

Type Description
str: The JPG image of the plot as a base64 encoded string.
Source code in src/usa_npn_mcp_server/api_client.py
@log_call
async def create_plot(
    self, data: list[Dict[str, Any]], arguments: Dict[str, Any]
) -> list[Union[TextContent, ImageContent]]:
    """
    Create a matplotlib plot of a particular category over time.

    The image is returned as a base64-encoded JPG.

    Parameters
    ----------
    data : list[Dict[str, Any]]
        The input data returned from an API query.
    arguments : Dict[str, Any]
        Plot options; must contain ``plot_type`` ("map" is the only
        supported value), ``tool_name``, and ``color_by``.

    Returns
    -------
    list[Union[TextContent, ImageContent]]
        A caption plus the JPG image of the plot as a base64 encoded string.

    Raises
    ------
    ValueError
        If ``data`` or ``arguments`` is empty, or the plot type is not "map".
    """
    if not data:
        raise ValueError("Data cannot be empty.")
    if not arguments:
        raise ValueError("Arguments cannot be empty.")
    # Only map plots are implemented so far.
    if arguments["plot_type"] != "map":
        raise ValueError("Plot type cannot be anything but map right now.")
    plot_result = await generate_map(
        data=data,
        color_by=arguments["color_by"],
    )
    result: list[Union[TextContent, ImageContent]] = [
        TextContent(
            type="text",
            text=f"Map of {arguments['tool_name']} data colored by {arguments['color_by']}.",
        ),
        ImageContent(type="image", data=plot_result, mimeType="image/jpeg"),
    ]
    return result

query_reference_material async

query_reference_material(arguments)

Query USA-NPN reference material using a SQL query.

Source code in src/usa_npn_mcp_server/api_client.py
@log_call
async def query_reference_material(
    self, arguments: Dict[str, Any]
) -> list[TextContent]:
    """
    Query USA-NPN reference material using a SQL query.

    Parameters
    ----------
    arguments : Dict[str, Any]
        Must contain a non-empty ``sql_query`` string.

    Returns
    -------
    list[TextContent]
        The query result as a single text content object.

    Raises
    ------
    ValueError
        If ``arguments`` is empty or ``sql_query`` is missing/empty.
    """
    if not arguments:
        raise ValueError("Arguments cannot be empty.")
    # .get() so a missing key raises the intended ValueError below
    # rather than a bare KeyError.
    sql_query = arguments.get("sql_query")
    if not sql_query:
        raise ValueError("SQL query cannot be empty.")
    logger.info(f"Checking references with SQL query: {sql_query}")
    result: list[TextContent] = [
        TextContent(
            type="text",
            text=self.read_ancillary_file(sql_query=sql_query),
        )
    ]
    # Note: Reference material results are returned directly, not cached
    return result

handle_call_tool async

handle_call_tool(name, arguments)

Client can call this to use a tool.

Parameters:

Name Type Description Default
name str

The name of the tool to call.

required
arguments Union[Dict[str, str], None]

The arguments to pass to the tool.

required

Returns:

Type Description
Any

The result of the tool execution.

Source code in src/usa_npn_mcp_server/api_client.py
async def handle_call_tool(
    self, name: str, arguments: Union[Dict[str, str], None]
) -> Any:
    """
    Dispatch a tool invocation from the client.

    Parameters
    ----------
    name : str
        The name of the tool to call.
    arguments : Union[Dict[str, str], None]
        The arguments to pass to the tool.

    Returns
    -------
    Any
        The result of the tool execution.

    Raises
    ------
    ValueError
        If ``arguments`` is None or ``name`` is not a known tool.
    """
    logger.info(f"Calling tool {name} with parameters: {arguments}")
    if arguments is None:
        raise ValueError("Arguments cannot be None")

    known_tools = {tool.name for tool in self.get_tool_list()}
    if name not in known_tools:
        raise ValueError(f"Tool {name} not found.")

    # Tools that operate on the cache or local resources rather than the
    # NPN API itself take a dedicated dispatch path.
    special_tools = {
        NPNTools.QueryReferenceMaterial.name,
        NPNTools.QueryLiterature.name,
        NPNTools.GetRawData.name,
        NPNTools.ExportRawData.name,
        NPNTools.Mapping.name,
    }
    if name in special_tools:
        return await self._handle_special_tools(name, arguments)

    # Regular API query tools - return hash ID
    return await self._handle_regular_tool(name, arguments)

log_call

log_call(func)
Log function calls and their execution times, including exceptions.

Returns:

Type Description
Callable: The decorated function.
Source code in src/usa_npn_mcp_server/api_client.py
def log_call(func: Any) -> Any:
    """
    Log function calls and their execution times, including exceptions.

    Parameters
    ----------
    func : Any
        The async function to be decorated.

    Returns
    -------
    Any
        Callable: the decorated function.
    """

    @wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        """
        Run *func* while logging its arguments, duration, and failures.

        Parameters
        ----------
        *args : Any
            Positional arguments passed to the wrapped function.
        **kwargs : Any
            Keyword arguments passed to the wrapped function.

        Returns
        -------
        Any
            The result of the wrapped function call.
        """
        function_name = func.__name__
        started = datetime.now()
        # args[0] is `self` for methods, so skip it in the log line.
        logger.info(f"Calling {function_name} with args: {args[1:]} kwargs: {kwargs}")
        try:
            outcome = await func(*args, **kwargs)
        except Exception as ex:
            elapsed = (datetime.now() - started).total_seconds()
            logger.error(
                f"Error running {function_name} after {elapsed:.2f}s: {str(ex)}\n"
                f"Traceback:\n{traceback.format_exc()}"
            )
            raise
        elapsed = (datetime.now() - started).total_seconds()
        logger.info(f"Successfully completed {function_name} in {elapsed:.2f}s")
        return outcome

    return wrapper

Utils Module

usa_npn_mcp_server.utils

Initialize utils module for MCP Server.

endpoints

NPN API endpoints available in MCP Server.

BaseQuery

Bases: BaseModel

Base class for endpoint queries.

Source code in src/usa_npn_mcp_server/utils/endpoints.py
class BaseQuery(BaseModel):
    """
    Base class for endpoint queries.

    Declares the required observation date range plus the optional
    spatial (bounding box, state), taxonomic (species, functional type),
    network, and phenophase filters shared by the NPN data endpoints.
    """

    start_date: str = Field(
        ..., description="Start date in YYYY-MM-DD format. Must be used with end_date."
    )
    end_date: str = Field(
        ..., description="End date in YYYY-MM-DD format. Must be used with start_date."
    )
    bottom_left_x1: Optional[float] = Field(
        default=None,
        description="X coordinate of the bottom left corner for bounding box filtering.",
    )
    bottom_left_y1: Optional[float] = Field(
        default=None,
        description="Y coordinate of the bottom left corner for bounding box filtering.",
    )
    upper_right_x2: Optional[float] = Field(
        default=None,
        description="X coordinate of the upper right corner for bounding box filtering.",
    )
    upper_right_y2: Optional[float] = Field(
        default=None,
        description="Y coordinate of the upper right corner for bounding box filtering.",
    )
    species_id: Optional[int] = Field(
        default=None, description="Unique species identifier."
    )
    station_id: Optional[int] = Field(
        default=None,
        description="Unique identifier associated with an observer’s location.",
    )
    species_type: Optional[str] = Field(
        default=None,
        description="Species type(s) the organism belongs to. Must match values from getAnimalTypes and getPlantTypes.",
    )
    network: Optional[str] = Field(
        default=None,
        description="Name of the network(s)/group(s) where the organism is observed. Must match values from getPartnerNetworks.",
    )
    state: Optional[str] = Field(
        default=None,
        description="State where the observation occurred. Uses two-character postal abbreviation.",
    )
    phenophase_category: Optional[str] = Field(
        default=None,
        description="Phenophase category. Must match values from getPhenophase.",
    )
    phenophase_id: Optional[int] = Field(
        default=None, description="Unique identifier of the phenophase."
    )
    functional_type: Optional[str] = Field(
        default=None,
        description="Functional types of the species. Must match values from getSpeciesFunctionalTypes.",
    )
    # climate_data defaults to on (1): per the description, climate fields
    # are almost always useful alongside phenometric data.
    climate_data: Optional[int] = Field(
        default=1,
        description="Flag to indicate whether all climate data fields should be returned. Accepts 0 or 1. Almost always beneficial to see climate data in relation to phenometric data.",
    )

StatusIntensityQuery

Bases: BaseQuery

Input parameters for the getObservations endpoint.

URL: https://services.usanpn.org/npn_portal/observations/getObservations

Inherits all attributes from BaseQuery.

Source code in src/usa_npn_mcp_server/utils/endpoints.py
class StatusIntensityQuery(BaseQuery):
    """
    Input parameters for the getObservations endpoint.

    URL: https://services.usanpn.org/npn_portal/observations/getObservations

    Requests raw status/intensity observation records.
    Inherits all attributes from BaseQuery; adds an optional extra
    output field.
    """

    additional_field: Optional[
        Literal[
            "observedby_person_id",
            "partner_group",
            "species_functional_type",
            "species_category",
        ]
    ] = Field(
        default=None,
        description="Additional fields to include in output.",
    )

ObservationCommentQuery

Bases: BaseModel

Input parameters for the getObservationComment endpoint.

URL: https://services.usanpn.org/npn_portal/observations/getObservationComment

Attributes:

Name Type Description
observation_id int

The ID of the observation for which to retrieve the comment.

Source code in src/usa_npn_mcp_server/utils/endpoints.py
class ObservationCommentQuery(BaseModel):
    """
    Input parameters for the getObservationComment endpoint.

    URL: https://services.usanpn.org/npn_portal/observations/getObservationComment

    Unlike the data queries, this model does not extend BaseQuery; it
    takes only a single observation ID.

    Attributes
    ----------
    observation_id : int
        The ID of the observation for which to retrieve the comment.
    """

    observation_id: int = Field(
        description="The ID of the observation for which to retrieve the comment"
    )

IndividualPhenometricsQuery

Bases: BaseQuery

Input parameters for the getSummarizedData endpoint.

URL: https://services.usanpn.org/npn_portal/observations/getSummarizedData

Attributes:

Name Type Description
individual_ids (Optional[List[int]], optional)

List of unique individual identifiers. Inherits all other attributes from BaseQuery.

Source code in src/usa_npn_mcp_server/utils/endpoints.py
class IndividualPhenometricsQuery(BaseQuery):
    """
    Input parameters for the getSummarizedData endpoint.

    URL: https://services.usanpn.org/npn_portal/observations/getSummarizedData

    Attributes
    ----------
    individual_ids : Optional[List[int]], optional
        List of unique individual identifiers.
    additional_field : optional
        Extra field to include in the endpoint's output.
        Inherits all other attributes from BaseQuery.
    """

    individual_ids: Optional[List[int]] = Field(
        default=None,
        description="List of unique identifiers of the individuals for which the observations are made.",
    )
    additional_field: Optional[
        Literal[
            "observedby_person_id",
            "partner_group",
            "species_functional_type",
            "species_category",
        ]
    ] = Field(
        default=None,
        description="Additional fields to include in output.",
    )

SitePhenometricsQuery

Bases: BaseQuery

Input parameters for the getSiteLevelData endpoint.

URL: https://services.usanpn.org/npn_portal/observations/getSiteLevelData

Attributes:

Name Type Description
individual_ids (Optional[List[int]], optional)

List of unique individual identifiers. Inherits all other attributes from BaseQuery.

Source code in src/usa_npn_mcp_server/utils/endpoints.py
class SitePhenometricsQuery(BaseQuery):
    """
    Input parameters for the getSiteLevelData endpoint.

    URL: https://services.usanpn.org/npn_portal/observations/getSiteLevelData

    Attributes
    ----------
    individual_ids : Optional[List[int]], optional
        List of unique individual identifiers.
    additional_field : optional
        Extra field to include in the endpoint's output (a narrower set
        of choices than the individual-level query).
        Inherits all other attributes from BaseQuery.
    """

    individual_ids: Optional[List[int]] = Field(
        default=None,
        description="List of unique identifiers of the individuals for which the observations are made.",
    )
    additional_field: Optional[
        Literal["partner_group", "species_functional_type", "species_category"]
    ] = Field(
        default=None,
        description="Additional fields to include in output.",
    )

MagnitudePhenometricsQuery

Bases: BaseQuery

Input parameters for the getMagnitudeData endpoint.

URL: https://services.usanpn.org/npn_portal/observations/getMagnitudeData

Attributes:

Name Type Description
frequency int

Number of days by which to delineate the period of time. Should be less than or equal to the number of days between start_date and end_date. Inherits all other attributes from BaseQuery.

Source code in src/usa_npn_mcp_server/utils/endpoints.py
class MagnitudePhenometricsQuery(BaseQuery):
    """
    Input parameters for the getMagnitudeData endpoint.

    URL: https://services.usanpn.org/npn_portal/observations/getMagnitudeData

    Attributes
    ----------
    frequency : int
        Number of days by which to delineate the period of time.
        Should be less than or equal to the number of days between
        start_date and end_date.
    additional_field : optional
        Extra field to include in the endpoint's output.
        Inherits all other attributes from BaseQuery.
    """

    frequency: int = Field(
        ...,
        description="Number of days by which to delineate the period of time. Should be less or equal to number of days between start_date and end_date.",
    )
    additional_field: Optional[
        Literal["species_functional_type", "species_category"]
    ] = Field(
        default=None,
        description="Additional fields to include in output.",
    )

BasePlotModel

Bases: BaseModel

Base class for plotting input parameters.

Source code in src/usa_npn_mcp_server/utils/endpoints.py
class BasePlotModel(BaseModel):
    """
    Base class for plotting input parameters.

    Shared fields for every plot request: the source tool, plot type,
    the variable used for color coding, and an optional title.
    """

    tool_name: str = Field(
        description="Name of the tool used to generate the data for the plot.",
    )
    plot_type: Literal["bar", "line", "scatter", "map"] = Field(
        description="Type of plot to generate.",
    )
    color_by: str = Field(
        ...,
        description="Variable to be used for color coding the data points.",
    )
    # NOTE(review): no explicit default, so under pydantic v2 this field is
    # required despite the Optional annotation — confirm whether
    # default=None was intended.
    title: Optional[str] = Field(
        description="Title for the plot.",
    )

NonMapPlotModel

Bases: BasePlotModel

Input parameters for plotting data.

Source code in src/usa_npn_mcp_server/utils/endpoints.py
class NonMapPlotModel(BasePlotModel):
    """
    Input parameters for plotting data.

    Narrows ``plot_type`` to the non-map kinds (bar, line, scatter) and
    adds axis variables and labels.
    """

    # NOTE(review): these Optional fields have no explicit defaults, so
    # under pydantic v2 they are required — confirm whether default=None
    # was intended.
    y_variable: Optional[str] = Field(
        description="Variable to be plotted on the y-axis.",
    )
    y_lab: Optional[str] = Field(
        description="Label for the y-axis of the plot.",
    )
    plot_type: Literal["bar", "line", "scatter"] = Field(
        description="Type of plot to generate.",
    )
    x_variable: Optional[str] = Field(
        description="Variable to be plotted on the x-axis.",
    )
    x_lab: Optional[str] = Field(
        description="Label for the x-axis of the plot.",
    )

MapModel

Bases: BasePlotModel

Input parameters for mapping data.

Source code in src/usa_npn_mcp_server/utils/endpoints.py
class MapModel(BasePlotModel):
    """
    Input parameters for mapping data.

    Narrows ``plot_type`` to "map" and restricts the data source to the
    site-phenometrics tool.
    """

    plot_type: Literal["map"] = Field(
        description="Type of plot to generate.",
    )
    tool_name: Literal["site-phenometrics"] = Field(
        description="Name of the tool used to generate the data for the plot.",
    )
    # Overrides the required color_by in BasePlotModel with an
    # empty-string default, meaning "no coloring".
    color_by: str = Field(
        default="",
        description="Variable to be used for color coding the data points. Default is empty string for no coloring.",
    )

SQLQueryModel

Bases: BaseModel

Input parameters for querying custom database.

Source code in src/usa_npn_mcp_server/utils/endpoints.py
class SQLQueryModel(BaseModel):
    """
    Input parameters for querying custom database.

    Carries a single raw SQL string to run against the SQLite3
    reference database.
    """

    sql_query: str = Field(
        ...,
        description="SQL query to run against the SQLite3 database to fetch relevant data.",
    )

GetRawDataQuery

Bases: BaseModel

Input parameters for getting raw cached data.

Source code in src/usa_npn_mcp_server/utils/endpoints.py
class GetRawDataQuery(BaseModel):
    """
    Input parameters for getting raw cached data.

    A single hash ID identifying a previously cached query result.
    """

    hash_id: str = Field(
        ..., description="Hash ID of cached query to retrieve raw data from"
    )

ExportRawDataQuery

Bases: BaseModel

Input parameters for exporting raw cached data to file.

Source code in src/usa_npn_mcp_server/utils/endpoints.py
class ExportRawDataQuery(BaseModel):
    """
    Input parameters for exporting raw cached data to file.

    ``hash_id`` and ``file_format`` are required; ``filename`` and
    ``output_path`` fall back to server-side defaults when omitted.
    """

    hash_id: str = Field(..., description="Hash ID of cached query to export")
    file_format: Literal["json", "jsonl"] = Field(
        ..., description="Export format: json or jsonl"
    )
    filename: Optional[str] = Field(
        default=None,
        description="Optional filename. If not provided, auto-generated from hash_id",
    )
    output_path: Optional[str] = Field(
        default=None,
        description="Output path for the file (relative to root or absolute within allowed roots). If not provided, saves to the first available root directory.",
    )

NPNTool

Bases: BaseModel

A class representing a tool available in the MCP server.

Attributes:

Name Type Description
name str

The name of the tool.

description str

A description of the tool.

docs_one_liner Optional[str]

A short one-line description for documentation purposes.

input_schema dict[str, Any]

The input schema for the tool.

endpoint str

The exact API endpoint for the tool.

Source code in src/usa_npn_mcp_server/utils/endpoints.py
class NPNTool(BaseModel):
    """
    A class representing a tool available in the MCP server.

    Attributes
    ----------
    name : str
        The name of the tool.
    description : str
        A description of the tool.
    docs_one_liner : Optional[str]
        A short one-line description for documentation purposes.
    input_schema : dict[str, Any]
        The input schema for the tool (exposed to MCP clients as the
        tool's ``inputSchema``).
    endpoint : str
        The exact API endpoint for the tool.
    """

    name: str
    description: str
    docs_one_liner: Optional[str] = Field(
        default=None,
        description="Short one-line description for documentation purposes (not exposed via MCP)",
    )
    input_schema: dict[str, Any]
    endpoint: str

NPNTools

An enumeration of tools available for querying the NPN API.

Attributes:

Name Type Description
StatusIntensity NPNTool

Tool for querying intensity and status data from the NPN API.

ObservationComment NPNTool

Tool for retrieving comments associated with observations from the NPN API.

MagnitudePhenometrics NPNTool

Tool for querying magnitude phenometrics data from the NPN API.

SitePhenometrics NPNTool

Tool for querying site phenometrics data from the NPN API.

IndividualPhenometrics NPNTool

Tool for querying individual phenometrics data from the NPN API.

Mapping NPNTool

Tool for constructing maps from site phenometrics data.

QueryReferenceMaterial NPNTool

Tool for querying what reference material is available to translate natural language into specific ids and terms needed for querying the NPN API.

QueryLiterature NPNTool

Tool for querying 175 structured summaries of studies that used data collected by the National Phenology Network.

GetRawData NPNTool

Tool for retrieving raw data from cache using a hash ID.

ExportRawData NPNTool

Tool for exporting cached raw data to a JSON or JSONL file.

Source code in src/usa_npn_mcp_server/utils/endpoints.py
class NPNTools:
    """
    An enumeration of tools available for querying the NPN API.

    Attributes
    ----------
    StatusIntensity : NPNTool
        Tool for querying intensity and status data from the NPN API.
    ObservationComment : NPNTool
        Tool for retrieving comments associated with observations from the NPN API.
    MagnitudePhenometrics : NPNTool
        Tool for querying magnitude phenometrics data from the NPN API.
    SitePhenometrics : NPNTool
        Tool for querying site phenometrics data from the NPN API.
    IndividualPhenometrics : NPNTool
        Tool for querying individual phenometrics data from the NPN API.
    Mapping : NPNTool
        Tool for constructing maps from site phenometrics data.
    QueryReferenceMaterial : NPNTool
        Tool for querying what reference material is available to translate natural
        language into specific ids and terms needed for querying the NPN API.
    QueryLiterature : NPNTool
        Tool for querying 175 structured summaries of studies that used data collected
        by the National Phenology Network.
    GetRawData : NPNTool
        Tool for retrieving raw data from cache using a hash ID.
    ExportRawData : NPNTool
        Tool for exporting cached raw data to a JSON or JSONL file.
    """

    # --- Tools backed by NPN API endpoints (see each tool's `endpoint`) ---
    StatusIntensity = NPNTool(
        name="status-intensity",
        description="""
About the tool: Retrieves raw, unprocessed observation records from citizen and professional scientists documenting day-by-day phenological status (yes/no) and intensity measurements for individual plants and animal species. Each record represents a single observation event showing whether specific phenophases (like 'breaking leaf buds' or 'full bloom') were occurring on a particular date for a specific individual organism at a monitoring site.

When to use: Only for detailed analysis of specific observation events, quality control, or when you need the granular day-to-day data that underlies the aggregated metrics. Most users should use Individual, Site, or Magnitude Phenometrics instead.

Key applications: Data validation, understanding observer reporting patterns, analyzing day-to-day phenological transitions, custom aggregations not available in other tools.
Performance warning: This tool can return massive datasets (potentially millions of records). Always limit queries to small date ranges (≤30 days recommended) and specific geographic areas or species to prevent system crashes. Use aggregated tools (Individual/Site/Magnitude Phenometrics) for broader analyses.

Data interpretation: Values of -9999 represent missing/null data. Records include observation date, individual ID, phenophase status, intensity measurements, and site metadata.""",
        docs_one_liner="Fetches status and intensity data (raw observation data). Use sparingly (can return massive datasets), prioritize phenometrics tools.",
        input_schema=StatusIntensityQuery.model_json_schema(),
        endpoint="getObservations",
    )
    ObservationComment = NPNTool(
        name="observation-comment",
        description="Retrieve the comment for a given observation (from getObservationComment endpoint), results store as readable Resource 'observation_comment'",
        docs_one_liner="Fetches observation comments based on observation_id.",
        input_schema=ObservationCommentQuery.model_json_schema(),
        endpoint="getObservationComment",
    )
    MagnitudePhenometrics = NPNTool(
        name="magnitude-phenometrics",
        docs_one_liner="Fetches magnitude phenometrics (magnitude data).",
        description="""
About the tool: Summarizes the intensity and abundance of phenological activity across multiple individuals, sites, or time periods using aggregated status and intensity data. Shows 'how much' phenological activity is occurring (not just when), providing insights into the magnitude, synchrony, and temporal patterns of biological processes.

When to use: Understanding broad ecological patterns, studying synchrony between interacting species, analyzing peak activity timing, or investigating how environmental changes affect the intensity of biological processes across populations.

Key applications:

- Species synchrony analysis: Quantifying how synchronized phenological timing is between interacting species (pollinators and plants, herbivores and host plants, predators and prey)
- Peak activity timing: Identifying when maximum biological activity occurs across populations
- Climate change impacts: Studying how warming affects the magnitude and timing of phenological events
- Biodiversity patterns: Understanding temporal overlap in species activity within ecosystems
- Population-level responses: Analyzing how abundant or widespread phenological activity is across landscapes
- Conservation planning: Identifying critical timing windows for species management

Scientific context: Based on current research showing that phenological synchrony between species is shifting due to climate change, with implications for ecosystem functioning and species interactions. This tool helps quantify these critical ecological relationships.

Requires: Date range and frequency parameters (daily, weekly, etc.) are essential. Recommended to specify species and phenophases of interest to avoid overwhelming results.
Research applications:

- 'Are migrating birds arriving when their insect food sources are most abundant?'
- 'How synchronous is flowering across plant species in prairie communities?'
- 'Has climate change affected the temporal overlap between butterfly emergence and host plant activity?'

Data interpretation: Results show time-series data of phenological abundance/intensity aggregated by specified frequency. Values represent proportion of 'yes' records, animal abundance measures, or intensity metrics across the selected populations.""",
        input_schema=MagnitudePhenometricsQuery.model_json_schema(),
        endpoint="getMagnitudeData",
    )
    SitePhenometrics = NPNTool(
        name="site-phenometrics",
        docs_one_liner="Fetches site phenometrics (site-level data).",
        description="""
About the tool: Aggregates individual phenological data to provide average start and end dates of phenological activity for each species at each monitoring site. Represents the 'typical' timing for a species at a location by averaging across all individuals of that species at the site.

When to use: Creating phenological calendars, analyzing site-specific timing patterns, comparing phenology across locations, understanding regional growing seasons, or studying how local climate affects species timing.

Key applications:

- Phenological calendars: Creating seasonal timing guides for specific locations
- Growing season analysis: Quantifying length of active growing periods for sites/regions
- Climate relationship studies: Investigating how phenological timing relates to temperature, precipitation, and seasonal patterns
- Site comparisons: Comparing phenological timing across elevation gradients, latitude gradients, or different habitat types
- Regional management: Planning for activities like controlled burns, invasive species management, or ecotourism
- Agricultural applications: Understanding wild plant timing to inform crop management decisions

Scientific context: Site phenometrics average out individual variation to reveal location-specific phenological signatures. Essential for understanding how climate drivers affect species timing at landscape scales.

Research applications:

- 'When do oak leaves typically emerge at Yellowstone vs. Great Smoky Mountains?'
- 'How long is the typical growing season for maple species in Minnesota?'
- 'When should we expect peak wildflower blooms in different Colorado elevation zones?'

Data interpretation: Each record represents one species at one site for the specified time period. Start/end dates are averages across individuals. Sites represent uniform habitat areas ≤15 acres. Values of -9999 represent missing/null data.""",
        input_schema=SitePhenometricsQuery.model_json_schema(),
        endpoint="getSiteLevelData",
    )
    IndividualPhenometrics = NPNTool(
        name="individual-phenometrics",
        docs_one_liner="Fetches individual phenometrics (summarized data).",
        description="""
About the tool: Provides start and end dates of phenological activity for individual plants and animal species, derived from status data. Each record represents one 'phenological episode' - a period of continuous activity for a specific phenophase on an individual organism (like when one specific maple tree's leaves went from bud break to full leaf drop).

When to use: To understand phenological patterns within species, analyze individual plant behavior, study variation between organisms of the same species, or investigate multiple episodes of activity within a single growing season.

Key applications:

- Studying phenological diversity within populations
- Analyzing individual plant responses to local microclimates
- Documenting multiple flowering/leafing episodes in water-limited ecosystems
- Understanding species-specific phenological strategies
- Quality control for site-level aggregations
- Research on plant physiological responses to environmental triggers

Important considerations:

- For plants: Shows actual start/end dates for individual organisms
- For animals: Shows presence/absence periods at species level (since individual animals aren't tracked)
- Requires date range specification (typically calendar year)
- Multiple episodes may occur for same individual/phenophase within one season (e.g., after frost damage or drought recovery)
- Essential for understanding the biological basis of site-level patterns

Data interpretation: Records show individual_id, phenophase onset/end dates, and episode duration. -9999 values indicate missing data.""",
        input_schema=IndividualPhenometricsQuery.model_json_schema(),
        endpoint="getSummarizedData",
    )
    # --- Local tools (endpoint="": handled inside the MCP server, no NPN API call) ---
    Mapping = NPNTool(
        name="mapping",
        description="Construct a map from results of a previous Site Phenometrics query to the NPN API, using longitude, latitude and specified variables to plot onto map of USA.",
        docs_one_liner="Maps site phenometrics onto a map of the USA with optional color labeling.",
        input_schema=MapModel.model_json_schema(),
        endpoint="",
    )
    QueryReferenceMaterial = NPNTool(
        name="query-reference-material",
        docs_one_liner="Queries database containing NPN API reference material using a generated SQL query.",
        description="""
            Query an SQL database for reference material that can be used to translate natural language into specific ids and terms needed for querying the NPN API with other tools. There is no need to query the 'datasets' table unless specific observer groups are mentioned. The Tables have the following structure:

            Table: species, Length: 1882, Headers: ['species_id', 'common_name', 'genus', 'genus_id', 'genus_common_name', 'species', 'kingdom', 'itis_taxonomic_sn', 'functional_type', 'class_id', 'class_common_name', 'class_name', 'order_id', 'order_common_name', 'order_name', 'family_id', 'family_name', 'family_common_name', 'species_type']
            Description: Contains info on species

            Table: phenophases, Length: 383, Headers: ['definition_id', 'dataset_id', 'phenophase_id', 'phenophase_name', 'definition', 'start_date', 'end_date', 'comments']
            Description: Contains info on phenophases

            Table: phenoclasses, Length: 226, Headers: ['phenophase_id', 'phenophase_description', 'definition_ids', 'phenophase_names']
            Description: Contains info on phenoclasses (a grouping of phenophases)

            Table: datasets, Length: 14, Headers: ['dataset_id', 'dataset_name', 'dataset_description', 'dataset_comments', 'dataset_documentation_url']
            Description: Contains info on datasets and their contributors

            Table: networks, Length: 854, Headers: ['network_id', 'network_name']
            Description: Contains info on observation groups or networks (aka partner groups)
""",
        input_schema=SQLQueryModel.model_json_schema(),
        endpoint="",
    )

    QueryLiterature = NPNTool(
        name="query-literature",
        docs_one_liner="Queries database of structured summaries from 175 papers that use phenology and phenometrics data.",
        description="""
            Query an SQL database for structured summaries of studies that used data collected by National Phenology Network. The tables have the following structure:

            Table: literature, Length: 175, Headers: ['Title', 'Authors', 'DOI', 'DOI link', 'Venue', 'Citation count', 'Year', 'Filename', 'Measured variables', 'Temporal Range', 'Spatial Scope', 'Data Filtering', 'Statistical Tests', 'Modelling', 'Software Tools', 'Limitations', 'Main findings', 'Research gaps', 'Future research', 'Independent variables', 'Dependent variables', 'Organism', 'Summary of discussion', 'API Query', "Supporting quotes for 'Measured variables'", "Supporting tables for 'Measured variables'", "Reasoning for 'Measured variables'", "Supporting quotes for 'Temporal Range'", "Supporting tables for 'Temporal Range'", "Reasoning for 'Temporal Range'", "Supporting quotes for 'Spatial Scope'", "Supporting tables for 'Spatial Scope'", "Reasoning for 'Spatial Scope'", "Supporting quotes for 'Data Filtering'", "Supporting tables for 'Data Filtering'", "Reasoning for 'Data Filtering'", "Supporting quotes for 'Statistical Tests'", "Supporting tables for 'Statistical Tests'", "Reasoning for 'Statistical Tests'", "Supporting quotes for 'Modelling'", "Supporting tables for 'Modelling'", "Reasoning for 'Modelling'", "Supporting quotes for 'Software Tools'", "Supporting tables for 'Software Tools'", "Reasoning for 'Software Tools'", "Supporting quotes for 'Limitations'", "Supporting tables for 'Limitations'", "Reasoning for 'Limitations'", "Supporting quotes for 'Main findings'", "Supporting tables for 'Main findings'", "Reasoning for 'Main findings'", "Supporting quotes for 'Research gaps'", "Supporting tables for 'Research gaps'", "Reasoning for 'Research gaps'", "Supporting quotes for 'Future research'", "Supporting tables for 'Future research'", "Reasoning for 'Future research'", "Supporting quotes for 'Independent variables'", "Supporting tables for 'Independent variables'", "Reasoning for 'Independent variables'", "Supporting quotes for 'Dependent variables'", "Supporting tables for 'Dependent variables'", "Reasoning for 'Dependent variables'", 
"Supporting quotes for 'Organism'", "Supporting tables for 'Organism'", "Reasoning for 'Organism'", "Supporting quotes for 'Summary of discussion'", "Supporting tables for 'Summary of discussion'", "Reasoning for 'Summary of discussion'", "Supporting quotes for 'API Query'", "Supporting tables for 'API Query'", "Reasoning for 'API Query'"]
            Description: Contains structured summaries of 175 papers that use phenology and phenometrics, included in the table is the reasoning and sourcing for each summary column.
        """,
        input_schema=SQLQueryModel.model_json_schema(),
        endpoint="",
    )

    GetRawData = NPNTool(
        name="get-raw-data",
        description="Retrieve raw data from cache using hash ID. Limited to 300 records with truncation message if exceeded. Use 'recent-queries' resource to see available hash IDs.",
        docs_one_liner="Fetches raw data instead of summaries as from other tools. Limited to 300 records with truncation message if exceeded.",
        input_schema=GetRawDataQuery.model_json_schema(),
        endpoint="",
    )

    ExportRawData = NPNTool(
        name="export-raw-data",
        description="Export cached raw data to JSON or JSONL file. Requires MCP client to provide roots (allowed directories) for file operations.",
        docs_one_liner="Exports raw data to JSON or JSONL files in allowed directories.",
        input_schema=ExportRawDataQuery.model_json_schema(),
        endpoint="",
    )

output_schema

Module for API output schemas for the NPN API.

plotting

Plotting module for MCP Server plotting functions.

generate_map async

generate_map(data, color_by)

Generate a map with lat/long as axes, overlaying a US map with state outlines.

Parameters:

Name Type Description Default
data list[Dict[str, Any]]

The input data, with fields containing latitude and longitude coordinates.

required
color_by str

The variable to color the points by. If empty string, uses default red color.

required

Returns:

Type Description
str

Base64 encoded image of the map in JPEG format.

Source code in src/usa_npn_mcp_server/utils/plotting.py
async def generate_map(data: list[Dict[str, Any]], color_by: str) -> str:
    """
    Generate a map with lat/long as axes, overlaying a US map with state outlines.

    Parameters
    ----------
    data : list[Dict[str, Any]]
        The input data, with fields containing latitude and longitude coordinates.
    color_by : str
        The variable to color the points by. If empty string, uses default red color.

    Returns
    -------
    str
        Base64 encoded image of the map in JPEG format.

    Raises
    ------
    ValueError
        If ``data`` is empty or no entry carries both latitude and longitude.
    """
    if not data:
        raise ValueError("Data cannot be empty.")
    # Keep only entries that can actually be placed on the map. Comparing
    # against None (instead of truthiness) preserves legitimate 0.0
    # coordinates, and requiring *both* keys keeps the point list aligned
    # with the rows passed to the GeoDataFrame below (the original built
    # points from a filtered list but the frame from all rows, which raised
    # on a length mismatch).
    plottable = [
        entry
        for entry in data
        if entry.get("longitude") is not None and entry.get("latitude") is not None
    ]
    if not plottable:
        raise ValueError("Latitude and Longitude cannot be empty.")
    # Load a GeoDataFrame of US states.
    # NOTE(review): fetched over the network on every call — consider caching.
    us_states = gpd.read_file(
        "https://raw.githubusercontent.com/PublicaMundi/MappingAPI/master/data/geojson/us-states.json"
    )
    # Create the plot with the state outlines as the base layer
    fig, ax = plt.subplots(figsize=(15, 9))
    us_states.plot(ax=ax, color="white", edgecolor="black")

    # Create a GeoDataFrame for the input data (rows and points stay aligned)
    points = [Point(entry["longitude"], entry["latitude"]) for entry in plottable]
    gdf = gpd.GeoDataFrame(plottable, geometry=points)

    # Sort categories so color assignment and legend order are deterministic
    # across runs (set iteration order is not); key=str tolerates mixed types.
    cats = sorted(
        {entry.get(color_by) for entry in plottable if color_by in entry}, key=str
    )
    if color_by and cats:
        # One tab10 color per category, cycling past 10 categories.
        # cm.get_cmap(name, lut) is deprecated/removed in newer matplotlib,
        # so index the base colormap directly.
        colormap = plt.get_cmap("tab10")
        cats_colors = {
            sp: to_hex(colormap(i % colormap.N)) for i, sp in enumerate(cats)
        }
        # Plot the data points per category so each gets a legend entry
        for sp in cats:
            subset = gdf[gdf[color_by] == sp]
            subset.plot(
                ax=ax,
                marker="o",
                color=cats_colors[sp],
                label=sp,
                markersize=14,
            )
        plt.legend(
            title=color_by,
            bbox_to_anchor=(1.05, 1),  # Position legend outside the plot
            loc="upper left",
            borderaxespad=0,
            frameon=True,
        )
    else:
        gdf.plot(
            ax=ax,
            marker="o",
            color="red",
            label="Observations",
            markersize=14,
        )

    # Dynamically adjust the map extent to center on the observations
    min_lon = gdf.geometry.x.min()
    max_lon = gdf.geometry.x.max()
    min_lat = gdf.geometry.y.min()
    max_lat = gdf.geometry.y.max()
    ax.set_xlim(min_lon - 1, max_lon + 1)
    ax.set_ylim(min_lat - 1, max_lat + 1)

    # Customize the plot
    plt.xlabel("Longitude")
    plt.ylabel("Latitude")
    if color_by:
        plt.title(f"Map of Observations Colored by {color_by}")
    plt.grid(True)

    # Adjust layout to make space for the legend
    plt.tight_layout(rect=(0, 0, 0.85, 1))  # Leave space on the right for the legend

    # Render to an in-memory buffer; close this figure explicitly so a
    # concurrently created figure is never closed by mistake.
    buffer = io.BytesIO()
    plt.savefig(buffer, format="jpeg", bbox_inches="tight")
    plt.close(fig)
    buffer.seek(0)

    # Encode the image in base64
    encoded_image = base64.b64encode(buffer.getvalue()).decode("utf-8")
    buffer.close()

    return encoded_image

prompts

Module for defining prompts for the NPN MCP server.

PromptEntry

Bases: TypedDict

A dictionary type that represents a prompt entry.

Attributes:

Name Type Description
prompt Prompt

The prompt object associated with the entry.

template str

The template string for the prompt.

Source code in src/usa_npn_mcp_server/utils/prompts.py
class PromptEntry(TypedDict):
    """
    Typed mapping describing one registered prompt.

    Attributes
    ----------
    prompt : Prompt
        The Prompt object for this entry.
    template : str
        The template string backing the prompt.
    """

    prompt: Prompt
    template: str

get_prompts

get_prompts()

Extract and return a list of Prompt objects from the PROMPTS dictionary.

Returns:

Type Description
list[Prompt]: A list of Prompt objects extracted from the PROMPTS dictionary.
Source code in src/usa_npn_mcp_server/utils/prompts.py
def get_prompts() -> list[Prompt]:
    """
    Extract and return a list of Prompt objects from the PROMPTS dictionary.

    Returns
    -------
    list[Prompt]
        The Prompt stored under the "prompt" key of every PROMPTS entry.
    """
    return [entry["prompt"] for entry in PROMPTS.values()]

resources

Module for defining MCP resources for the NPN MCP server.

Endpoints

usa_npn_mcp_server.utils.endpoints

NPN API endpoints available in MCP Server.

BaseQuery

Bases: BaseModel

Base class for endpoint queries.

Source code in src/usa_npn_mcp_server/utils/endpoints.py
class BaseQuery(BaseModel):
    """
    Base class for endpoint queries.

    Shared filter parameters accepted by the NPN observation endpoints.

    Attributes
    ----------
    start_date : str
        Start date in YYYY-MM-DD format. Must be used with end_date.
    end_date : str
        End date in YYYY-MM-DD format. Must be used with start_date.
    bottom_left_x1, bottom_left_y1 : Optional[float]
        Bottom-left corner coordinates for bounding box filtering.
    upper_right_x2, upper_right_y2 : Optional[float]
        Upper-right corner coordinates for bounding box filtering.
    species_id : Optional[int]
        Unique species identifier.
    station_id : Optional[int]
        Unique identifier associated with an observer's location.
    species_type : Optional[str]
        Species type(s) the organism belongs to.
    network : Optional[str]
        Name of the network(s)/group(s) where the organism is observed.
    state : Optional[str]
        Two-character postal abbreviation of the state of the observation.
    phenophase_category : Optional[str]
        Phenophase category.
    phenophase_id : Optional[int]
        Unique identifier of the phenophase.
    functional_type : Optional[str]
        Functional types of the species.
    climate_data : Optional[int]
        Flag (0 or 1) for returning climate data fields; defaults to 1.
    """

    start_date: str = Field(
        ..., description="Start date in YYYY-MM-DD format. Must be used with end_date."
    )
    end_date: str = Field(
        ..., description="End date in YYYY-MM-DD format. Must be used with start_date."
    )
    bottom_left_x1: Optional[float] = Field(
        default=None,
        description="X coordinate of the bottom left corner for bounding box filtering.",
    )
    bottom_left_y1: Optional[float] = Field(
        default=None,
        description="Y coordinate of the bottom left corner for bounding box filtering.",
    )
    upper_right_x2: Optional[float] = Field(
        default=None,
        description="X coordinate of the upper right corner for bounding box filtering.",
    )
    upper_right_y2: Optional[float] = Field(
        default=None,
        description="Y coordinate of the upper right corner for bounding box filtering.",
    )
    species_id: Optional[int] = Field(
        default=None, description="Unique species identifier."
    )
    station_id: Optional[int] = Field(
        default=None,
        description="Unique identifier associated with an observer’s location.",
    )
    species_type: Optional[str] = Field(
        default=None,
        description="Species type(s) the organism belongs to. Must match values from getAnimalTypes and getPlantTypes.",
    )
    network: Optional[str] = Field(
        default=None,
        description="Name of the network(s)/group(s) where the organism is observed. Must match values from getPartnerNetworks.",
    )
    state: Optional[str] = Field(
        default=None,
        description="State where the observation occurred. Uses two-character postal abbreviation.",
    )
    phenophase_category: Optional[str] = Field(
        default=None,
        description="Phenophase category. Must match values from getPhenophase.",
    )
    phenophase_id: Optional[int] = Field(
        default=None, description="Unique identifier of the phenophase."
    )
    functional_type: Optional[str] = Field(
        default=None,
        description="Functional types of the species. Must match values from getSpeciesFunctionalTypes.",
    )
    climate_data: Optional[int] = Field(
        default=1,
        description="Flag to indicate whether all climate data fields should be returned. Accepts 0 or 1. Almost always beneficial to see climate data in relation to phenometric data.",
    )

StatusIntensityQuery

Bases: BaseQuery

Input parameters for the getObservations endpoint.

URL: https://services.usanpn.org/npn_portal/observations/getObservations

Inherits all attributes from BaseQuery.

Source code in src/usa_npn_mcp_server/utils/endpoints.py
class StatusIntensityQuery(BaseQuery):
    """
    Input parameters for the getObservations endpoint.

    URL: https://services.usanpn.org/npn_portal/observations/getObservations

    Attributes
    ----------
    additional_field : Optional[Literal[...]], optional
        Additional field to include in output.
        Inherits all other attributes from BaseQuery.
    """

    additional_field: Optional[
        Literal[
            "observedby_person_id",
            "partner_group",
            "species_functional_type",
            "species_category",
        ]
    ] = Field(
        default=None,
        description="Additional fields to include in output.",
    )

ObservationCommentQuery

Bases: BaseModel

Input parameters for the getObservationComment endpoint.

URL: https://services.usanpn.org/npn_portal/observations/getObservationComment

Attributes:

Name Type Description
observation_id int

The ID of the observation for which to retrieve the comment.

Source code in src/usa_npn_mcp_server/utils/endpoints.py
class ObservationCommentQuery(BaseModel):
    """
    Input parameters for the getObservationComment endpoint.

    URL: https://services.usanpn.org/npn_portal/observations/getObservationComment

    Attributes
    ----------
    observation_id : int
        The ID of the observation for which to retrieve the comment.
    """

    # Derives from BaseModel rather than BaseQuery: this endpoint is keyed by a
    # single observation id and takes none of the shared date/location filters.
    observation_id: int = Field(
        description="The ID of the observation for which to retrieve the comment"
    )

IndividualPhenometricsQuery

Bases: BaseQuery

Input parameters for the getSummarizedData endpoint.

URL: https://services.usanpn.org/npn_portal/observations/getSummarizedData

Attributes:

Name Type Description
individual_ids (Optional[List[int]], optional)

List of unique individual identifiers. Inherits all other attributes from BaseQuery.

Source code in src/usa_npn_mcp_server/utils/endpoints.py
class IndividualPhenometricsQuery(BaseQuery):
    """
    Input parameters for the getSummarizedData endpoint.

    URL: https://services.usanpn.org/npn_portal/observations/getSummarizedData

    Attributes
    ----------
    individual_ids : Optional[List[int]], optional
        List of unique individual identifiers.
    additional_field : Optional[Literal[...]], optional
        Additional field to include in output.
        Inherits all other attributes from BaseQuery.
    """

    individual_ids: Optional[List[int]] = Field(
        default=None,
        description="List of unique identifiers of the individuals for which the observations are made.",
    )
    additional_field: Optional[
        Literal[
            "observedby_person_id",
            "partner_group",
            "species_functional_type",
            "species_category",
        ]
    ] = Field(
        default=None,
        description="Additional fields to include in output.",
    )

SitePhenometricsQuery

Bases: BaseQuery

Input parameters for the getSiteLevelData endpoint.

URL: https://services.usanpn.org/npn_portal/observations/getSiteLevelData

Attributes:

Name Type Description
individual_ids (Optional[List[int]], optional)

List of unique individual identifiers. Inherits all other attributes from BaseQuery.

Source code in src/usa_npn_mcp_server/utils/endpoints.py
class SitePhenometricsQuery(BaseQuery):
    """
    Input parameters for the getSiteLevelData endpoint.

    URL: https://services.usanpn.org/npn_portal/observations/getSiteLevelData

    Attributes
    ----------
    individual_ids : Optional[List[int]], optional
        List of unique individual identifiers.
    additional_field : Optional[Literal[...]], optional
        Additional field to include in output.
        Inherits all other attributes from BaseQuery.
    """

    individual_ids: Optional[List[int]] = Field(
        default=None,
        description="List of unique identifiers of the individuals for which the observations are made.",
    )
    additional_field: Optional[
        Literal["partner_group", "species_functional_type", "species_category"]
    ] = Field(
        default=None,
        description="Additional fields to include in output.",
    )

MagnitudePhenometricsQuery

Bases: BaseQuery

Input parameters for the getMagnitudeData endpoint.

URL: https://services.usanpn.org/npn_portal/observations/getMagnitudeData

Attributes:

Name Type Description
frequency int

Number of days by which to delineate the period of time. Should be less or equal to number of days between start_date and end_date. Inherits all other attributes from BaseQuery.

Source code in src/usa_npn_mcp_server/utils/endpoints.py
class MagnitudePhenometricsQuery(BaseQuery):
    """
    Input parameters for the getMagnitudeData endpoint.

    URL: https://services.usanpn.org/npn_portal/observations/getMagnitudeData

    Attributes
    ----------
    frequency : int
        Number of days by which to delineate the period of time.
        Should be less or equal to number of days between start_date and end_date.
    additional_field : Optional[Literal[...]], optional
        Additional field to include in output.
        Inherits all other attributes from BaseQuery.
    """

    frequency: int = Field(
        ...,
        description="Number of days by which to delineate the period of time. Should be less or equal to number of days between start_date and end_date.",
    )
    additional_field: Optional[
        Literal["species_functional_type", "species_category"]
    ] = Field(
        default=None,
        description="Additional fields to include in output.",
    )

BasePlotModel

Bases: BaseModel

Base class for plotting input parameters.

Source code in src/usa_npn_mcp_server/utils/endpoints.py
class BasePlotModel(BaseModel):
    """
    Base class for plotting input parameters.

    Attributes
    ----------
    tool_name : str
        Name of the tool used to generate the data for the plot.
    plot_type : Literal["bar", "line", "scatter", "map"]
        Type of plot to generate.
    color_by : str
        Variable to be used for color coding the data points.
    title : Optional[str]
        Title for the plot. Defaults to None.
    """

    tool_name: str = Field(
        description="Name of the tool used to generate the data for the plot.",
    )
    plot_type: Literal["bar", "line", "scatter", "map"] = Field(
        description="Type of plot to generate.",
    )
    color_by: str = Field(
        ...,
        description="Variable to be used for color coding the data points.",
    )
    # An Optional annotation without a default is still a *required* field in
    # pydantic; give it an explicit None default so the title is truly optional.
    title: Optional[str] = Field(
        default=None,
        description="Title for the plot.",
    )

NonMapPlotModel

Bases: BasePlotModel

Input parameters for plotting data.

Source code in src/usa_npn_mcp_server/utils/endpoints.py
class NonMapPlotModel(BasePlotModel):
    """
    Input parameters for plotting data.

    Attributes
    ----------
    y_variable : Optional[str]
        Variable to be plotted on the y-axis. Defaults to None.
    y_lab : Optional[str]
        Label for the y-axis of the plot. Defaults to None.
    plot_type : Literal["bar", "line", "scatter"]
        Type of plot to generate (narrows BasePlotModel to non-map plots).
    x_variable : Optional[str]
        Variable to be plotted on the x-axis. Defaults to None.
    x_lab : Optional[str]
        Label for the x-axis of the plot. Defaults to None.
    """

    # Optional annotations without defaults are *required* fields in pydantic;
    # explicit None defaults make these axis settings genuinely optional.
    y_variable: Optional[str] = Field(
        default=None,
        description="Variable to be plotted on the y-axis.",
    )
    y_lab: Optional[str] = Field(
        default=None,
        description="Label for the y-axis of the plot.",
    )
    plot_type: Literal["bar", "line", "scatter"] = Field(
        description="Type of plot to generate.",
    )
    x_variable: Optional[str] = Field(
        default=None,
        description="Variable to be plotted on the x-axis.",
    )
    x_lab: Optional[str] = Field(
        default=None,
        description="Label for the x-axis of the plot.",
    )

MapModel

Bases: BasePlotModel

Input parameters for mapping data.

Source code in src/usa_npn_mcp_server/utils/endpoints.py
class MapModel(BasePlotModel):
    """
    Input parameters for mapping data.

    Attributes
    ----------
    plot_type : Literal["map"]
        Type of plot to generate (narrowed to maps).
    tool_name : Literal["site-phenometrics"]
        Name of the tool used to generate the data for the plot
        (only site phenometrics results can be mapped).
    color_by : str
        Variable used for color coding the data points; "" means no coloring.
    """

    plot_type: Literal["map"] = Field(
        description="Type of plot to generate.",
    )
    tool_name: Literal["site-phenometrics"] = Field(
        description="Name of the tool used to generate the data for the plot.",
    )
    color_by: str = Field(
        default="",
        description="Variable to be used for color coding the data points. Default is empty string for no coloring.",
    )

SQLQueryModel

Bases: BaseModel

Input parameters for querying custom database.

Source code in src/usa_npn_mcp_server/utils/endpoints.py
class SQLQueryModel(BaseModel):
    """
    Input parameters for querying custom database.

    Attributes
    ----------
    sql_query : str
        SQL query to run against the SQLite3 database.
    """

    # NOTE(review): this string is executed against the SQLite3 database;
    # confirm the executing code restricts it to read-only statements.
    sql_query: str = Field(
        ...,
        description="SQL query to run against the SQLite3 database to fetch relevant data.",
    )

GetRawDataQuery

Bases: BaseModel

Input parameters for getting raw cached data.

Source code in src/usa_npn_mcp_server/utils/endpoints.py
class GetRawDataQuery(BaseModel):
    """Input parameters for getting raw cached data."""

    # Required: identifies a previously cached query result to read back.
    hash_id: str = Field(
        ..., description="Hash ID of cached query to retrieve raw data from"
    )

ExportRawDataQuery

Bases: BaseModel

Input parameters for exporting raw cached data to file.

Source code in src/usa_npn_mcp_server/utils/endpoints.py
class ExportRawDataQuery(BaseModel):
    """Input parameters for exporting raw cached data to file."""

    # Required: identifies the cached query result to export.
    hash_id: str = Field(..., description="Hash ID of cached query to export")
    # Only JSON and JSON-lines exports are supported.
    file_format: Literal["json", "jsonl"] = Field(
        ..., description="Export format: json or jsonl"
    )
    # None means the exporter derives the filename from hash_id.
    filename: Optional[str] = Field(
        default=None,
        description="Optional filename. If not provided, auto-generated from hash_id",
    )
    # None means the first available allowed root directory is used.
    output_path: Optional[str] = Field(
        default=None,
        description="Output path for the file (relative to root or absolute within allowed roots). If not provided, saves to the first available root directory.",
    )

NPNTool

Bases: BaseModel

A class representing a tool available in the MCP server.

Attributes:

Name Type Description
name str

The name of the tool.

description str

A description of the tool.

docs_one_liner Optional[str]

A short one-line description for documentation purposes.

input_schema dict[str, Any]

The input schema for the tool.

endpoint str

The exact API endpoint for the tool.

Source code in src/usa_npn_mcp_server/utils/endpoints.py
class NPNTool(BaseModel):
    """
    A class representing a tool available in the MCP server.

    Attributes
    ----------
    name : str
        The name of the tool.
    description : str
        A description of the tool.
    docs_one_liner : Optional[str]
        A short one-line description for documentation purposes.
    input_schema : dict[str, Any]
        The input schema for the tool.
    endpoint : str
        The exact API endpoint for the tool.
    """

    # Tool identifier exposed to MCP clients (e.g. "status-intensity").
    name: str
    # Full tool description exposed to MCP clients via list_tools.
    description: str
    # Not exposed via MCP; used only when generating documentation.
    docs_one_liner: Optional[str] = Field(
        default=None,
        description="Short one-line description for documentation purposes (not exposed via MCP)",
    )
    # JSON schema for the tool's input, typically <Model>.model_json_schema().
    input_schema: dict[str, Any]
    # NPN API endpoint name; empty string for tools with no direct endpoint.
    endpoint: str

NPNTools

A collection of tools available for querying the NPN API.

Attributes:

Name Type Description
StatusIntensity NPNTool

Tool for querying intensity and status data from the NPN API.

ObservationComment NPNTool

Tool for retrieving comments associated with observations from the NPN API.

MagnitudePhenometrics NPNTool

Tool for querying magnitude phenometrics data from the NPN API.

SitePhenometrics NPNTool

Tool for querying site phenometrics data from the NPN API.

IndividualPhenometrics NPNTool

Tool for querying individual phenometrics data from the NPN API.

Mapping NPNTool

Tool for constructing maps from site phenometrics data.

QueryReferenceMaterial NPNTool

Tool for querying what reference material is available to translate natural language into specific ids and terms needed for querying the NPN API.

QueryLiterature NPNTool

Tool for querying 175 structured summaries of studies that used data collected by the National Phenology Network.

GetRawData NPNTool

Tool for retrieving raw data from cache using a hash ID.

ExportRawData NPNTool

Tool for exporting cached raw data to a JSON or JSONL file.

Source code in src/usa_npn_mcp_server/utils/endpoints.py
class NPNTools:
    """
    A collection of tools available for querying the NPN API.

    Note: this is a plain class used as a namespace of NPNTool constants,
    not an ``enum.Enum``.

    Attributes
    ----------
    StatusIntensity : NPNTool
        Tool for querying intensity and status data from the NPN API.
    ObservationComment : NPNTool
        Tool for retrieving comments associated with observations from the NPN API.
    MagnitudePhenometrics : NPNTool
        Tool for querying magnitude phenometrics data from the NPN API.
    SitePhenometrics : NPNTool
        Tool for querying site phenometrics data from the NPN API.
    IndividualPhenometrics : NPNTool
        Tool for querying individual phenometrics data from the NPN API.
    Mapping : NPNTool
        Tool for constructing maps from site phenometrics data.
    QueryReferenceMaterial : NPNTool
        Tool for querying what reference material is available to translate natural
        language into specific ids and terms needed for querying the NPN API.
    QueryLiterature : NPNTool
        Tool for querying 175 structured summaries of studies that used data collected
        by the National Phenology Network.
    GetRawData : NPNTool
        Tool for retrieving raw data from cache using a hash ID.
    ExportRawData : NPNTool
        Tool for exporting cached raw data to a JSON or JSONL file.
    """

    # Raw, per-observation records; the description steers clients toward
    # small date ranges because result sets can be huge.
    StatusIntensity = NPNTool(
        name="status-intensity",
        description="""
About the tool: Retrieves raw, unprocessed observation records from citizen and professional scientists documenting day-by-day phenological status (yes/no) and intensity measurements for individual plants and animal species. Each record represents a single observation event showing whether specific phenophases (like 'breaking leaf buds' or 'full bloom') were occurring on a particular date for a specific individual organism at a monitoring site.

When to use: Only for detailed analysis of specific observation events, quality control, or when you need the granular day-to-day data that underlies the aggregated metrics. Most users should use Individual, Site, or Magnitude Phenometrics instead.

Key applications: Data validation, understanding observer reporting patterns, analyzing day-to-day phenological transitions, custom aggregations not available in other tools.
Performance warning: This tool can return massive datasets (potentially millions of records). Always limit queries to small date ranges (≤30 days recommended) and specific geographic areas or species to prevent system crashes. Use aggregated tools (Individual/Site/Magnitude Phenometrics) for broader analyses.

Data interpretation: Values of -9999 represent missing/null data. Records include observation date, individual ID, phenophase status, intensity measurements, and site metadata.""",
        docs_one_liner="Fetches status and intensity data (raw observation data). Use sparingly (can return massive datasets), prioritize phenometrics tools.",
        input_schema=StatusIntensityQuery.model_json_schema(),
        endpoint="getObservations",
    )
    # Single-comment lookup keyed by observation_id.
    ObservationComment = NPNTool(
        name="observation-comment",
        description="Retrieve the comment for a given observation (from getObservationComment endpoint), results store as readable Resource 'observation_comment'",
        docs_one_liner="Fetches observation comments based on observation_id.",
        input_schema=ObservationCommentQuery.model_json_schema(),
        endpoint="getObservationComment",
    )
    # Aggregated intensity/abundance metrics over time windows.
    MagnitudePhenometrics = NPNTool(
        name="magnitude-phenometrics",
        docs_one_liner="Fetches magnitude phenometrics (magnitude data).",
        description="""
About the tool: Summarizes the intensity and abundance of phenological activity across multiple individuals, sites, or time periods using aggregated status and intensity data. Shows 'how much' phenological activity is occurring (not just when), providing insights into the magnitude, synchrony, and temporal patterns of biological processes.

When to use: Understanding broad ecological patterns, studying synchrony between interacting species, analyzing peak activity timing, or investigating how environmental changes affect the intensity of biological processes across populations.

Key applications:

- Species synchrony analysis: Quantifying how synchronized phenological timing is between interacting species (pollinators and plants, herbivores and host plants, predators and prey)
- Peak activity timing: Identifying when maximum biological activity occurs across populations
- Climate change impacts: Studying how warming affects the magnitude and timing of phenological events
- Biodiversity patterns: Understanding temporal overlap in species activity within ecosystems
- Population-level responses: Analyzing how abundant or widespread phenological activity is across landscapes
- Conservation planning: Identifying critical timing windows for species management

Scientific context: Based on current research showing that phenological synchrony between species is shifting due to climate change, with implications for ecosystem functioning and species interactions. This tool helps quantify these critical ecological relationships.

Requires: Date range and frequency parameters (daily, weekly, etc.) are essential. Recommended to specify species and phenophases of interest to avoid overwhelming results.
Research applications:

- 'Are migrating birds arriving when their insect food sources are most abundant?'
- 'How synchronous is flowering across plant species in prairie communities?'
- 'Has climate change affected the temporal overlap between butterfly emergence and host plant activity?'

Data interpretation: Results show time-series data of phenological abundance/intensity aggregated by specified frequency. Values represent proportion of 'yes' records, animal abundance measures, or intensity metrics across the selected populations.""",
        input_schema=MagnitudePhenometricsQuery.model_json_schema(),
        endpoint="getMagnitudeData",
    )
    # Per-site averages of start/end timing per species.
    SitePhenometrics = NPNTool(
        name="site-phenometrics",
        docs_one_liner="Fetches site phenometrics (site-level data).",
        description="""
About the tool: Aggregates individual phenological data to provide average start and end dates of phenological activity for each species at each monitoring site. Represents the 'typical' timing for a species at a location by averaging across all individuals of that species at the site.

When to use: Creating phenological calendars, analyzing site-specific timing patterns, comparing phenology across locations, understanding regional growing seasons, or studying how local climate affects species timing.

Key applications:

- Phenological calendars: Creating seasonal timing guides for specific locations
- Growing season analysis: Quantifying length of active growing periods for sites/regions
- Climate relationship studies: Investigating how phenological timing relates to temperature, precipitation, and seasonal patterns
- Site comparisons: Comparing phenological timing across elevation gradients, latitude gradients, or different habitat types
- Regional management: Planning for activities like controlled burns, invasive species management, or ecotourism
- Agricultural applications: Understanding wild plant timing to inform crop management decisions

Scientific context: Site phenometrics average out individual variation to reveal location-specific phenological signatures. Essential for understanding how climate drivers affect species timing at landscape scales.

Research applications:

- 'When do oak leaves typically emerge at Yellowstone vs. Great Smoky Mountains?'
- 'How long is the typical growing season for maple species in Minnesota?'
- 'When should we expect peak wildflower blooms in different Colorado elevation zones?'

Data interpretation: Each record represents one species at one site for the specified time period. Start/end dates are averages across individuals. Sites represent uniform habitat areas ≤15 acres. Values of -9999 represent missing/null data.""",
        input_schema=SitePhenometricsQuery.model_json_schema(),
        endpoint="getSiteLevelData",
    )
    # Per-individual phenological episodes (onset/end dates).
    IndividualPhenometrics = NPNTool(
        name="individual-phenometrics",
        docs_one_liner="Fetches individual phenometrics (summarized data).",
        description="""
About the tool: Provides start and end dates of phenological activity for individual plants and animal species, derived from status data. Each record represents one 'phenological episode' - a period of continuous activity for a specific phenophase on an individual organism (like when one specific maple tree's leaves went from bud break to full leaf drop).

When to use: To understand phenological patterns within species, analyze individual plant behavior, study variation between organisms of the same species, or investigate multiple episodes of activity within a single growing season.

Key applications:

- Studying phenological diversity within populations
- Analyzing individual plant responses to local microclimates
- Documenting multiple flowering/leafing episodes in water-limited ecosystems
- Understanding species-specific phenological strategies
- Quality control for site-level aggregations
- Research on plant physiological responses to environmental triggers

Important considerations:

- For plants: Shows actual start/end dates for individual organisms
- For animals: Shows presence/absence periods at species level (since individual animals aren't tracked)
- Requires date range specification (typically calendar year)
- Multiple episodes may occur for same individual/phenophase within one season (e.g., after frost damage or drought recovery)
- Essential for understanding the biological basis of site-level patterns

Data interpretation: Records show individual_id, phenophase onset/end dates, and episode duration. -9999 values indicate missing data.""",
        input_schema=IndividualPhenometricsQuery.model_json_schema(),
        endpoint="getSummarizedData",
    )
    # Local map rendering from cached site-phenometrics results;
    # no NPN API endpoint, hence endpoint="".
    Mapping = NPNTool(
        name="mapping",
        description="Construct a map from results of a previous Site Phenometrics query to the NPN API, using longitude, latitude and specified variables to plot onto map of USA.",
        docs_one_liner="Maps site phenometrics onto a map of the USA with optional color labeling.",
        input_schema=MapModel.model_json_schema(),
        endpoint="",
    )
    # SQL access to the bundled reference database (species, phenophases, ...).
    QueryReferenceMaterial = NPNTool(
        name="query-reference-material",
        docs_one_liner="Queries database containing NPN API reference material using a generated SQL query.",
        description="""
            Query an SQL database for reference material that can be used to translate natural language into specific ids and terms needed for querying the NPN API with other tools. There is no need to query the 'datasets' table unless specific observer groups are mentioned. The Tables have the following structure:

            Table: species, Length: 1882, Headers: ['species_id', 'common_name', 'genus', 'genus_id', 'genus_common_name', 'species', 'kingdom', 'itis_taxonomic_sn', 'functional_type', 'class_id', 'class_common_name', 'class_name', 'order_id', 'order_common_name', 'order_name', 'family_id', 'family_name', 'family_common_name', 'species_type']
            Description: Contains info on species

            Table: phenophases, Length: 383, Headers: ['definition_id', 'dataset_id', 'phenophase_id', 'phenophase_name', 'definition', 'start_date', 'end_date', 'comments']
            Description: Contains info on phenophases

            Table: phenoclasses, Length: 226, Headers: ['phenophase_id', 'phenophase_description', 'definition_ids', 'phenophase_names']
            Description: Contains info on phenoclasses (a grouping of phenophases)

            Table: datasets, Length: 14, Headers: ['dataset_id', 'dataset_name', 'dataset_description', 'dataset_comments', 'dataset_documentation_url']
            Description: Contains info on datasets and their contributors

            Table: networks, Length: 854, Headers: ['network_id', 'network_name']
            Description: Contains info on observation groups or networks (aka partner groups)
""",
        input_schema=SQLQueryModel.model_json_schema(),
        endpoint="",
    )

    # SQL access to the bundled literature-summaries database.
    QueryLiterature = NPNTool(
        name="query-literature",
        docs_one_liner="Queries database of structured summaries from 175 papers that use phenology and phenometrics data.",
        description="""
            Query an SQL database for structured summaries of studies that used data collected by National Phenology Network. The tables have the following structure:

            Table: literature, Length: 175, Headers: ['Title', 'Authors', 'DOI', 'DOI link', 'Venue', 'Citation count', 'Year', 'Filename', 'Measured variables', 'Temporal Range', 'Spatial Scope', 'Data Filtering', 'Statistical Tests', 'Modelling', 'Software Tools', 'Limitations', 'Main findings', 'Research gaps', 'Future research', 'Independent variables', 'Dependent variables', 'Organism', 'Summary of discussion', 'API Query', "Supporting quotes for 'Measured variables'", "Supporting tables for 'Measured variables'", "Reasoning for 'Measured variables'", "Supporting quotes for 'Temporal Range'", "Supporting tables for 'Temporal Range'", "Reasoning for 'Temporal Range'", "Supporting quotes for 'Spatial Scope'", "Supporting tables for 'Spatial Scope'", "Reasoning for 'Spatial Scope'", "Supporting quotes for 'Data Filtering'", "Supporting tables for 'Data Filtering'", "Reasoning for 'Data Filtering'", "Supporting quotes for 'Statistical Tests'", "Supporting tables for 'Statistical Tests'", "Reasoning for 'Statistical Tests'", "Supporting quotes for 'Modelling'", "Supporting tables for 'Modelling'", "Reasoning for 'Modelling'", "Supporting quotes for 'Software Tools'", "Supporting tables for 'Software Tools'", "Reasoning for 'Software Tools'", "Supporting quotes for 'Limitations'", "Supporting tables for 'Limitations'", "Reasoning for 'Limitations'", "Supporting quotes for 'Main findings'", "Supporting tables for 'Main findings'", "Reasoning for 'Main findings'", "Supporting quotes for 'Research gaps'", "Supporting tables for 'Research gaps'", "Reasoning for 'Research gaps'", "Supporting quotes for 'Future research'", "Supporting tables for 'Future research'", "Reasoning for 'Future research'", "Supporting quotes for 'Independent variables'", "Supporting tables for 'Independent variables'", "Reasoning for 'Independent variables'", "Supporting quotes for 'Dependent variables'", "Supporting tables for 'Dependent variables'", "Reasoning for 'Dependent variables'", 
"Supporting quotes for 'Organism'", "Supporting tables for 'Organism'", "Reasoning for 'Organism'", "Supporting quotes for 'Summary of discussion'", "Supporting tables for 'Summary of discussion'", "Reasoning for 'Summary of discussion'", "Supporting quotes for 'API Query'", "Supporting tables for 'API Query'", "Reasoning for 'API Query'"]
            Description: Contains structured summaries of 175 papers that use phenology and phenometrics, included in the table is the reasoning and sourcing for each summary column.
        """,
        input_schema=SQLQueryModel.model_json_schema(),
        endpoint="",
    )

    # Cache read-back; capped at 300 records per the description.
    GetRawData = NPNTool(
        name="get-raw-data",
        description="Retrieve raw data from cache using hash ID. Limited to 300 records with truncation message if exceeded. Use 'recent-queries' resource to see available hash IDs.",
        docs_one_liner="Fetches raw data instead of summaries as from other tools. Limited to 300 records with truncation message if exceeded.",
        input_schema=GetRawDataQuery.model_json_schema(),
        endpoint="",
    )

    # Cache export to file; requires client-provided allowed directories.
    ExportRawData = NPNTool(
        name="export-raw-data",
        description="Export cached raw data to JSON or JSONL file. Requires MCP client to provide roots (allowed directories) for file operations.",
        docs_one_liner="Exports raw data to JSON or JSONL files in allowed directories.",
        input_schema=ExportRawDataQuery.model_json_schema(),
        endpoint="",
    )

Output Schema

usa_npn_mcp_server.utils.output_schema

Module for API output schemas for the NPN API.

Plotting

usa_npn_mcp_server.utils.plotting

Plotting module for MCP Server plotting functions.

generate_map async

generate_map(data, color_by)

Generate a map with lat/long as axes, overlaying a US map with state outlines.

Parameters:

Name Type Description Default
data list[Dict[str, Any]]

The input data, with fields containing latitude and longitude coordinates.

required
color_by str

The variable to color the points by. If empty string, uses default red color.

required

Returns:

Type Description
str

Base64 encoded image of the map in JPEG format.

Source code in src/usa_npn_mcp_server/utils/plotting.py
async def generate_map(data: list[Dict[str, Any]], color_by: str) -> str:
    """
    Generate a map with lat/long as axes, overlaying a US map with state outlines.

    Parameters
    ----------
    data : list[Dict[str, Any]]
        The input data, with fields containing latitude and longitude coordinates.
    color_by : str
        The variable to color the points by. If empty string, uses default red color.

    Returns
    -------
    str
        Base64 encoded image of the map in JPEG format.

    Raises
    ------
    ValueError
        If ``data`` is empty, or no entry has both a latitude and a longitude.
    """
    if not data:
        raise ValueError("Data cannot be empty.")
    # Keep only entries that have BOTH coordinates. Using one filtered list
    # for both the attribute table and the geometry avoids the length
    # mismatch the previous code could hit (it built `points` from a
    # filtered subset but the GeoDataFrame from the full `data`).
    # `is not None` is used so a legitimate 0.0 coordinate is not dropped.
    plottable = [
        entry
        for entry in data
        if entry.get("longitude") is not None and entry.get("latitude") is not None
    ]
    # Bug fix: the previous check combined the two tests with `and`, so it
    # only raised when latitude AND longitude were missing everywhere;
    # a point needs both coordinates to be plottable.
    if not plottable:
        raise ValueError("Latitude and Longitude cannot be empty.")
    # Load a GeoDataFrame of US states
    us_states = gpd.read_file(
        "https://raw.githubusercontent.com/PublicaMundi/MappingAPI/master/data/geojson/us-states.json"
    )
    # Create the plot
    fig, ax = plt.subplots(figsize=(15, 9))
    buffer = io.BytesIO()
    try:
        us_states.plot(ax=ax, color="white", edgecolor="black")

        # Create a GeoDataFrame for the input data
        points = [Point(entry["longitude"], entry["latitude"]) for entry in plottable]
        gdf = gpd.GeoDataFrame(plottable, geometry=points)

        if color_by:
            # Extract unique categories and assign colors
            # NOTE(review): cm.get_cmap is deprecated/removed in newer
            # matplotlib releases — confirm the pinned matplotlib version.
            cats = {entry.get(color_by) for entry in plottable if color_by in entry}
            colormap = cm.get_cmap("tab10", len(cats))
            cats_colors = {sp: to_hex(colormap(i)) for i, sp in enumerate(cats)}
            # Plot the data points, one layer per category
            for sp in cats:
                subset = gdf[gdf[color_by] == sp]
                subset.plot(
                    ax=ax,
                    marker="o",
                    color=cats_colors[sp],
                    label=sp,
                    markersize=14,
                )
            plt.legend(
                title=color_by,
                bbox_to_anchor=(1.05, 1),  # Position legend outside the plot
                loc="upper left",
                borderaxespad=0,
                frameon=True,
            )
        else:
            gdf.plot(
                ax=ax,
                marker="o",
                color="red",
                label="Observations",
                markersize=14,
            )

        # Dynamically adjust the map extent to center on the observations
        min_lon = gdf.geometry.x.min()
        max_lon = gdf.geometry.x.max()
        min_lat = gdf.geometry.y.min()
        max_lat = gdf.geometry.y.max()
        ax.set_xlim(min_lon - 1, max_lon + 1)
        ax.set_ylim(min_lat - 1, max_lat + 1)

        # Customize the plot
        plt.xlabel("Longitude")
        plt.ylabel("Latitude")
        if color_by:
            plt.title(f"Map of Observations Colored by {color_by}")
        plt.grid(True)

        # Adjust layout to make space for the legend
        plt.tight_layout(rect=(0, 0, 0.85, 1))  # Leave space on the right for the legend

        # Save the map to a byte buffer
        plt.savefig(buffer, format="jpeg", bbox_inches="tight")
    finally:
        # Always release the figure, even if rendering fails, to avoid
        # leaking matplotlib state across calls.
        plt.close(fig)
    buffer.seek(0)

    # Encode the image in base64
    encoded_image = base64.b64encode(buffer.getvalue()).decode("utf-8")
    buffer.close()

    return encoded_image

Prompts

usa_npn_mcp_server.utils.prompts

Module for defining prompts for the NPN MCP server.

PromptEntry

Bases: TypedDict

A dictionary type that represents a prompt entry.

Attributes:

Name Type Description
prompt Prompt

The prompt object associated with the entry.

template str

The template string for the prompt.

Source code in src/usa_npn_mcp_server/utils/prompts.py
class PromptEntry(TypedDict):
    """
    A dictionary type that represents a prompt entry.

    Attributes
    ----------
    prompt : Prompt
        The prompt object associated with the entry.
    template : str
        The template string for the prompt.
    """

    # The MCP Prompt object advertised to clients (see get_prompts).
    prompt: Prompt
    # The template string for the prompt.
    template: str

get_prompts

get_prompts()

Extract and return a list of Prompt objects from the PROMPTS dictionary.

Returns:

Type Description
list[Prompt]: A list of Prompt objects extracted from the PROMPTS dictionary.
Source code in src/usa_npn_mcp_server/utils/prompts.py
def get_prompts() -> list[Prompt]:
    """
    Collect the Prompt objects registered in the PROMPTS dictionary.

    Returns
    -------
    list[Prompt]
        One Prompt per entry of PROMPTS, in dictionary insertion order.
    """
    collected: list[Prompt] = []
    for entry in PROMPTS.values():
        collected.append(entry["prompt"])
    return collected