CTIgor: An Agentic CTI Assistant

Continuing my ongoing experimentation with Generative AI, one of these experiments grew into a Cyber Threat Intelligence assistant that helps facilitate the consumption of threat posts and reports. This post walks through the steps of gradually building it out, as an educational exercise.

[Image: Igor GIF from Young Frankenstein]

Introduction

Not too long ago, I came across this AI Agents for Beginners 10-lesson mini-course, shared by one of my contacts. Working through the course, I decided to try applying some of the example use-cases to a practical area of cyber security that presents a common scaling problem: the review and cataloging of Cyber Threat Intelligence (CTI) reports. The outcome is a project I chose to name CTIgor, an assistant for threat-report review. In this blog post, I’ll discuss the steps in building it (somewhat) from scratch.

Azure OpenAI Deployment

As the AI Agents course is a Microsoft publication, many of the examples are Azure-centric. For those wishing to use alternative services, or even locally-hosted LLMs, the semantic-kernel AI connectors offer support for AWS Bedrock, Ollama, and others. In the Python code at https://github.com/ckane/ctigor/tree/semantic-kernel-port/, the uses of AzureChatCompletion and AzureChatPromptExecutionSettings can be substituted with instances of the equivalent types for the target LLM platform or interface. The instructions below use Azure’s OpenAI interface, but reliance upon it is minimized to maintain portability.
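For example, swapping in a locally-hosted model served by Ollama might look something like the sketch below. This assumes a semantic-kernel build that ships the Ollama connector; the model name is a placeholder, and the constructor parameters have varied between semantic-kernel versions, so check the connector documentation for your installed release:

from semantic_kernel.connectors.ai.ollama import OllamaChatCompletion, OllamaChatPromptExecutionSettings

# Connect to a locally-hosted model served by Ollama instead of Azure OpenAI
chat_service = OllamaChatCompletion(
    ai_model_id="llama3.1",         # placeholder: any model pulled into Ollama
    host="http://localhost:11434",  # Ollama's default local endpoint
)
request_settings = OllamaChatPromptExecutionSettings()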

In order to help bootstrap a fairly inexpensive AI deployment using Azure, I’ve written some Terraform code that can be used to deploy the needed OpenAI infrastructure in Azure. Microsoft has a partnership with OpenAI which has led them to offer a lot of OpenAI-derived services through Azure and an OpenAI-compatible API.

The code in main.tf deploys the following infrastructure into Azure to support generative AI:

  • A resource group to contain the other resources
  • An Azure OpenAI (Cognitive Services) account
  • A model deployment within that account, hosting the chosen model

A number of input variables are defined in variables.tf that allow customization of the deployment. Placeholders for the required parameters are in terraform.tfvars.example; this file must be copied to terraform.tfvars and edited to add your Azure Subscription Id, plus three unique, descriptive names for the infrastructure defined in main.tf. This is largely intended as a simple example for illustrative purposes, but with an active Azure Subscription it should nonetheless quickly bootstrap a working environment.

Note that the deployed infrastructure costs money per use, so the default model I’ve picked is gpt-4o-mini, which is priced around $0.15 per 1M input tokens and $0.60 per 1M output (generated) tokens. The model supports a context window of 128k tokens, which provides enough room for many large CTI reports. Generally speaking, the more expensive models are targeted at more complex tasks, or at satisfying more ambiguous or lengthy queries.

Python Environment

I’ve built ctigor using the uv Python project manager. The project should “just work” on a system with uv installed after cloning the repository. However, for anyone who prefers their own Python environment manager, or the more traditional pip workflow, I also maintain a requirements.txt in the repository, and all of the uv details can be ignored.

uv is nice, as you can run uv run script.py (substituting script.py for the Python script or command you’d like to run within the managed virtual environment) and it will handle the venv details behind the scenes for you.

Similar to Terraform above, for Python I’ve placed a local_settings.py.example file which needs to be copied into local_settings.py and then populated with the appropriate values from Terraform. The terraform output command can be used to retrieve them:

For local_settings.azure_api_key:

terraform output openai_primary_key

For local_settings.endpoint:

terraform output openai_endpoint

And then make sure that local_settings.deployment matches the name of the Cognitive Services deployment you defined in terraform.tfvars.
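Once populated, local_settings.py should look something like the following sketch (placeholder values only; the variable names are the ones the code below expects):

# local_settings.py -- sketch with placeholder values (never commit real keys)
deployment = "my-gpt4o-mini"  # the Cognitive Services deployment name from terraform.tfvars
azure_api_key = "<value of: terraform output openai_primary_key>"
endpoint = "https://<your-resource>.openai.azure.com/"  # value of: terraform output openai_endpoint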

Example Testing AI Functionality

A quick test of the Azure OpenAI service is important, to validate that everything is working. The following Python script prompts the LLM to generate a fact about hedgehogs. The code is commented to explain each of the steps:

import asyncio

# Import the semantic-kernel objects used by the program
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion, AzureChatPromptExecutionSettings
from semantic_kernel.contents import ChatHistory
from semantic_kernel.kernel import Kernel

# Import secrets from local_settings.py
import local_settings

def create_azure_chat_completion():
    # Define the Azure OpenAI AI Connector and connect to the deployment Terraform provisioned from main.tf
    chat_service = AzureChatCompletion(
        deployment_name=local_settings.deployment,
        api_key=local_settings.azure_api_key,
        endpoint=local_settings.endpoint,
    )
    return chat_service

async def main():
    # Instantiate a new AI Kernel
    kernel = Kernel()

    # Define a ChatHistory object and add a user prompt to it
    chat_history = ChatHistory()
    chat_history.add_user_message("Tell me a fact about hedgehogs")

    # Create a new Azure Chat Completion instance to interact with
    chat_service = create_azure_chat_completion()

    # Define the request settings to use model defaults
    request_settings = AzureChatPromptExecutionSettings()

    # Prompt the model with the given chat history
    response = await chat_service.get_chat_message_content(
        chat_history=chat_history, settings=request_settings, kernel=kernel
    )

    # Fail if no response
    assert response is not None

    # Display response on console
    print(f"=======\n{response}\n=======")

if __name__ == "__main__":
    asyncio.run(main())

When run, it should produce output similar to the following:

=======
Hedgehogs are nocturnal animals, meaning they are primarily active during the night.
They have excellent senses of hearing and smell, which help them navigate and find
food in the dark.
=======

The model is driven by an incrementally updated ChatHistory, which is appended to every time the user issues a new prompt or the LLM generates an answer. Each new prompt or answer is appended to the ChatHistory by calling the appropriate ChatHistory.add_user_message or ChatHistory.add_assistant_message method. This ensures that, for each subsequent interaction, the instance retains a “memory” of the conversation, allowing earlier parts of the conversation to be referred back to and used in later answers.
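As a short illustration (the follow-up prompt here is hypothetical), the history accumulates like this:

# Sketch: one ChatHistory carries the entire conversation
chat_history = ChatHistory()
chat_history.add_user_message("Tell me a fact about hedgehogs")
# ...after get_chat_message_content() returns, record the model's answer...
chat_history.add_assistant_message("Hedgehogs are nocturnal animals...")
# A follow-up can now say "they"; the model resolves it from the history
chat_history.add_user_message("Where do they live in the wild?")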

CTIgor Class

After verifying the above, the semantic-kernel initialization code will next be moved into a new ctiagent.py module:

import asyncio

from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion, AzureChatPromptExecutionSettings
from semantic_kernel.contents import ChatHistory
from semantic_kernel.kernel import Kernel

# Import secrets from local_settings.py
import local_settings

class CTIgor(object):
    def __init__(self):
        # Initialize a new Kernel to work with
        self.kernel = Kernel()

        # Define the Azure OpenAI AI Connector and connect to the deployment Terraform provisioned from main.tf
        self.chat_service = AzureChatCompletion(
            deployment_name=local_settings.deployment,
            api_key=local_settings.azure_api_key,
            endpoint=local_settings.endpoint,
        )

        # Define a ChatHistory object
        self.chat_history = ChatHistory()

        # Define the request settings to use model defaults
        self.request_settings = AzureChatPromptExecutionSettings()

    async def prompt(self, input_prompt: str):
        self.chat_history.add_user_message(input_prompt)

        # Prompt the model with the given chat history, waiting for response
        response = await self.chat_service.get_chat_message_content(
            chat_history=self.chat_history, settings=self.request_settings, kernel=self.kernel
        )

        # Ensure response isn't None
        assert response is not None

        # Append the response to the chat_history
        self.chat_history.add_assistant_message(response.content)

        return response

The new CTIgor object simplifies interaction with the model instance down to a single prompt(str) call, which performs all the work of:

  • Adding the user prompt to the ChatHistory instance
  • Sending the updated ChatHistory instance to the LLM
  • Waiting on a response
  • Validating the response completed successfully
  • Adding the response from the AI assistant to the ChatHistory
  • Returning the validated response to the caller

Additionally, all of the variables declared in the main() and global scopes are now cleanly organized inside a data type that will handle the AI interactions.

With these adjustments, the earlier test script, refactored as test_openai.py, now looks like this:

import asyncio

from ctiagent import CTIgor

async def main():
    ctigor = CTIgor()
    response = await ctigor.prompt("Tell me a fact about hedgehogs")

    # Display response on console
    print(f"=======\n{response}\n=======")

if __name__ == "__main__":
    asyncio.run(main())

Successive calls to CTIgor.prompt will append the new input and output to the instance’s internal ChatHistory.
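For example (the follow-up prompt here is hypothetical), a second call can build on the first without any manual history management:

response = await ctigor.prompt("Tell me a fact about hedgehogs")
# The follow-up can refer to "they": the instance's ChatHistory carries the context
response = await ctigor.prompt("What do they eat?")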

Add a Helper Function Plugin

The semantic_kernel.functions package provides an interface for exposing context-aware function plugins that the agent can use to perform tasks. Declaring function plugins, and supplying English-language metadata when registering them with the Kernel, gives the LLM awareness of code that it can choose to call. You define the conditions under which the LLM should call the utility function(s) in much the same way you would explain to another person how and when to use a tool. Because of this, semantic-kernel annotations double as documentation for your code: a nice perk!

Consider the following utility function: it takes two input values, a low and a high bound, and returns an int value that falls within that range (inclusive of the bounds):

from random import randint
from typing import Annotated
from semantic_kernel.functions import kernel_function

class RandomNumberPlugin:
    """Generates a Random Number"""

    @kernel_function(
      name='gen_random',
      description="Generate a random number given a low and high bound"
    )
    async def gen_random(
      self,
      low: Annotated[int, "Lower bound of the random number"],
      high: Annotated[int, "Upper bound of the random number"]
    ) -> Annotated[int, "Generated random number within the bounds"]:
        print(f"Running gen_random with low={low}, high={high}")
        return randint(low, high)

In the above code, the @kernel_function decorator supplies an English-language description that the LLM uses to determine under what conditions to execute the function, along with a short description of what the inputs mean and what the function returns. The print call isn’t necessary for the logic to work; instead, it provides a useful signal showing us when the agent calls the function. Furthermore, the typing.Annotated hints on the input variables and the output are also used by the LLM to decide how and when to use the function.

The above can be added to a new ctiagent_functions.py file, and then loaded with from ctiagent_functions import RandomNumberPlugin. However, in order for the LLM to know to use it, function-calling needs to be enabled, and the new plugin class needs to be registered with the Kernel. To enable function calling, the following imports need to be added to ctiagent.py:

from semantic_kernel.connectors.ai import FunctionChoiceBehavior
from ctiagent_functions import RandomNumberPlugin

Then, in ctiagent.CTIgor, the AzureChatPromptExecutionSettings() call needs to override the default behavior with a FunctionChoiceBehavior; for our purposes, automatic invocation:

# Define the request settings to use automatic function-choice execution
self.request_settings = AzureChatPromptExecutionSettings(function_choice_behavior=FunctionChoiceBehavior.Auto())

The plugin can then be registered like this inside of ctiagent.CTIgor.__init__:

# Register the RandomNumberPlugin with the kernel
self.kernel.add_plugin(RandomNumberPlugin(), plugin_name="random_number")

The new ctiagent.py code should look like:

import asyncio

from semantic_kernel.connectors.ai import FunctionChoiceBehavior
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion
from semantic_kernel.connectors.ai.open_ai import AzureChatPromptExecutionSettings
from semantic_kernel.contents import ChatHistory
from semantic_kernel.kernel import Kernel

from ctiagent_functions import RandomNumberPlugin

# Import secrets from local_settings.py
import local_settings

class CTIgor(object):
    def __init__(self):
        # Initialize a new Kernel to work with
        self.kernel = Kernel()

        # Define the Azure OpenAI AI Connector and connect to the deployment Terraform
        # provisioned from main.tf
        self.chat_service = AzureChatCompletion(
            deployment_name=local_settings.deployment,
            api_key=local_settings.azure_api_key,
            endpoint=local_settings.endpoint,
        )

        # Define a ChatHistory object
        self.chat_history = ChatHistory()

        # Define the request settings to use automatic function-choice execution
        self.request_settings = AzureChatPromptExecutionSettings(
            function_choice_behavior=FunctionChoiceBehavior.Auto()
        )

        # Register the RandomNumberPlugin with the kernel
        self.kernel.add_plugin(RandomNumberPlugin(), plugin_name="random_number")

    async def prompt(self, input_prompt: str):
        self.chat_history.add_user_message(input_prompt)

        # Prompt the model with the given chat history, waiting for response
        response = await self.chat_service.get_chat_message_content(
            chat_history=self.chat_history, settings=self.request_settings, kernel=self.kernel
        )

        # Ensure response isn't None
        assert response is not None

        # Append the response to the chat_history
        self.chat_history.add_assistant_message(response.content)

        return response

Finally, we’ll make a new program named test_chat.py that implements a simple prompt-response chat loop. The addition of the RandomNumberPlugin to ctiagent.CTIgor should provide the glue necessary for the LLM to choose to run it when needed:

import asyncio
from ctiagent import CTIgor

async def main():
    ctigor = CTIgor()

    # Loop forever while the user has more input
    while True:
        try:
            # Get input from the user, display a prompt to indicate waiting on user input
            user_prompt = input('CTIgor> ')

            # If user says 'quit' then exit
            if user_prompt.lower() in ['quit', 'exit', 'bye']:
                raise EOFError

            # Send the user's prompt to the LLM and wait for the response
            response = await ctigor.prompt(user_prompt)

            # Display response on console
            print(f"=======\n{response}\n=======")
        except EOFError:
            # On EOF, exit the program gracefully
            print("Thank you, bye!")
            break

if __name__ == "__main__":
    asyncio.run(main())

Testing the Chat Agent

Now, if you run test_chat.py (uv run test_chat.py if using uv), the LLM can be given prompts for information, just like a normal AI chat assistant:

CTIgor> What are three planets in our solar system?
=======
Three planets in our solar system are:

1. Earth
2. Mars
3. Jupiter
=======

Testing the Chat Agent w/ Random Number Generation

Further, prompts that ask it to generate random numbers will trigger the print statement mentioned above, indicating the helper function is being run by the agent to generate the random value before providing it to us:

CTIgor> Give me a random number between 723 and 64
Running gen_random with low=64, high=723
=======
The random number between 64 and 723 is 489.
=======

Of note in the above: thanks to the Annotated hints added to the function definition, the agent properly identifies which value is the “low” bound and which is the “high” bound, regardless of the order in which they are provided.

Even more abstract requests for random selection of information can trigger the function call too. For example, combining the prior two prompts:

CTIgor> Give me a random name of one of our solar system's planets
Running gen_random with low=0, high=7
=======
The random name of one of our solar system's planets is Saturn.
=======

From the above, it is clear that the LLM uses our random-number generation helper function to generate a random number between 0 and 7 (inclusive) - for 8 possible options. Clearly, the LLM is aware of Pluto’s demotion.

If you don’t like that behavior, instructing the LLM to consider Pluto a planet may look like this:

CTIgor> Give me another random name of one of our solar system's planets, but assume Pluto is also a planet.
Running gen_random with low=0, high=8
=======
The random name of one of our solar system's planets is Venus.
=======

Add Content-Reading Capability

Let’s start by adding another simple helper function to ctiagent_functions.py:

class FilePlugin:
    """A Plugin to handle work with files"""

    @kernel_function(
        name="load_file_content",
        description="Load a file from disk, given a filename. Returns a bytes object of the file contents."
    )
    async def load_file_content(
        self, file_name: Annotated[str, "The name and path of the file on disk to return the contents of"]
    ) -> Annotated[bytes, "The contents from the file"]:
        with open(file_name, "rb") as txtfile:
            return txtfile.read()

The above class implements a function named load_file_content, which takes a file name as input (a str) and returns the file’s contents to the caller as bytes. The function assumes the file is a regular text file (trusting user discretion) and performs no validation before loading it.
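If that trust model is too loose for your environment, a more defensive variant might look like the following sketch (hypothetical, and not part of the repository; it reuses the same Annotated and kernel_function imports), capping the file size and decoding the contents as UTF-8 text:

import os

class SafeFilePlugin:
    """A hypothetical, more defensive file loader (sketch only)"""

    @kernel_function(
        name="load_file_content",
        description="Load a text file from disk, given a filename. Returns the file contents as a string."
    )
    async def load_file_content(
        self, file_name: Annotated[str, "The name and path of the file on disk to return the contents of"]
    ) -> Annotated[str, "The contents from the file"]:
        # Refuse files too large to sensibly fit in the context window
        if os.path.getsize(file_name) > 1_000_000:
            raise ValueError(f"{file_name} is too large to load")
        with open(file_name, "r", encoding="utf-8", errors="replace") as txtfile:
            return txtfile.read()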

This can be connected to the agent by adding the necessary code to ctiagent.py, in the same places where the references to RandomNumberPlugin were added:

from ctiagent_functions import RandomNumberPlugin, FilePlugin
class CTIgor(object):
    def __init__(self):
        ....
        # Register the RandomNumberPlugin, FilePlugin with the kernel
        self.kernel.add_plugin(RandomNumberPlugin(), plugin_name="random_number")
        self.kernel.add_plugin(FilePlugin(), plugin_name="file")

Once it is added to ctiagent.py, the chat loop inside test_chat.py will be able to take advantage of it, given appropriate direction. We can render an HTML CTI report into text using the lynx -dump <url> command, which writes the rendered text to stdout, from where it can be redirected into a file:

lynx -dump https://cloud.google.com/blog/topics/threat-intelligence/china-nexus-espionage-targets-juniper-routers/ > /tmp/juniper-routers.txt

The file /tmp/juniper-routers.txt can then be loaded into the agent’s model, to be further analyzed and queried using natural language prompts. For example (long lines have been manually wrapped):

CTIgor> Summarize the contents of /tmp/juniper-routers.txt for me. I am a technical cyber security analyst.
=======
The document provides an analysis of a cyber espionage campaign conducted by 
the China-nexus group UNC3886, targeting Juniper Networks' Junos OS routers and 
deploying custom backdoors called TINYSHELL. Here is a summary of key points 
relevant to a technical cybersecurity analyst:

1. **Incident Overview**: In mid-2024, Mandiant discovered that threat actors 
had deployed **custom backdoors** on Juniper routers, attributed to UNC3886. 
These backdoors disable logging and allow for active and passive access to the 
compromised systems.

2. **Affected Devices**: The attackers primarily targeted **Juniper MX 
routers** operating with end-of-life hardware and software, demonstrating that 
outdated systems are more susceptible to such attacks.

3. **Malware Characteristics**:
   - The analysis identified **six distinct malware samples**, all based on 
     TINYSHELL, each with unique functionalities, including:
     - `appid`: Active backdoor mimicking a legitimate binary.
     - `to`, `irad`, `lmpad`, `jdosd`, and `oemd`: Various active and passive 
       backdoors with specific operational tasks.
   - The malware relies on **process injection techniques** to bypass security 
     mechanisms, specifically Juniper's verified exec subsystem (Veriexec), by 
     executing malicious code within legitimate process memory.

4. **Attack Methodology**: 
   - Attacks exploit legitimate credentials to gain privileged access to the 
     routers.
   - Post-compromise activity involves manipulating process memory and 
     disabling logging functionalities to maintain stealth and long-term persistence.
   - Command and Control (C2) servers are utilized for remote access and 
     commands, with robust encryption for communications.

5. **Mitigation Recommendations**:
   - Organizations should upgrade their Juniper devices to the latest software 
     images.
   - Implement a centralized Identity and Access Management (IAM) system with 
     robust authentication protocols.
   - Enhance network monitoring, and validate configurations against defined 
     security standards.
   - Proactively leverage threat intelligence for ongoing evaluation of 
     security postures against emerging threats.

6. **Indicators of Compromise (IoCs)**: The document includes specific IoCs 
(e.g., filenames, hashes of malware samples, and C2 IP addresses) for detecting 
the identified malware.

In conclusion, this incident illustrates the evolving tactics of advanced 
persistent threats (APTs) and emphasizes the need for organizations to maintain 
updated security measures and conduct regular assessments of their network 
infrastructure to defend against such targeted attacks.
=======

Note that in the prompt, I told it my role is a technical cyber security analyst. Providing different roles for yourself (the user) can help guide the LLM to generate output tailored to your audience. You can try I am a CEO or I am an artist running an online studio. My personal favorite is I am a five-year-old child. Experimenting with these is left as an exercise for the reader; I won’t demo them exhaustively here.

Further inquiries about the document can be made, as well:

CTIgor> Show me the C2 IP addresses, formatted as a JSON list of strings.
=======
Here are the C2 IP addresses formatted as a JSON list of strings:

```json
[
    "129.126.109.50:22",
    "116.88.34.184:22",
    "223.25.78.136:22",
    "45.77.39.28:22",
    "101.100.182.122:22",
    "118.189.188.122:22",
    "158.140.135.244:22",
    "8.222.225.8:22"
]
```
=======

In the above, the C2 IPs were present in the document, but the summary didn’t include them, so we inquired further about them. If we want to have the agent dig into and explain the details of how Veriexec was abused, we can do that too:

CTIgor> How exactly did it bypass security using veriexec?
=======
The malware managed to circumvent Veriexec protection through a process 
injection technique. Here's a detailed breakdown of how this was achieved:

1. **Understanding Veriexec**: Veriexec is a kernel-based file integrity 
subsystem used in Junos OS that prevents unauthorized binaries, libraries, and 
scripts from executing. Its primary role is to maintain the integrity of the 
operating system by blocking code that has not been verified.

2. **Challenge of Disabling Veriexec**: Directly disabling Veriexec would 
trigger alerts. Therefore, the attackers needed a method that allowed them to 
run untrusted code without raising alarms.

3. **Process Injection Technique**: Instead of trying to disable Veriexec, the 
threat actors executed malicious code within the context of a legitimate, 
trusted process. This technique allowed the malware to run without triggering 
Veriexec's security measures. 

4. **Execution Steps**:
   - The threat actor first gained privileged access to a Juniper router using 
     legitimate credentials.
   - They entered the FreeBSD shell from the Junos OS command-line interface 
     (CLI).
   - The malicious actors generated a Base64-encoded payload (named `ldb.b64`) 
     that was then decoded and extracted into a compressed archive containing the 
     malware binaries.
   - They injected malicious payloads into the memory of a legitimate process 
     (a newly spawned `cat` process). This involved using the `dd` command to write 
     the malicious code directly into the memory of the `cat` process.

5. **Impact of the Injection**: By redirecting the executed commands within the 
legitimate process, the malware effectively replaced certain function calls. 
For instance, they altered the global offset table entry for the `fclose` 
function, which allowed the `cat` process to execute the shellcode loader of 
the malware instead of its intended function, thus executing the injected code 
without alerting the Veriexec system.

6. **Consequences**: This method granted the attackers the ability to run their 
malicious payloads while leaving the original legitimate processes untouched, 
thereby maintaining a low profile within the compromised system and enabling 
continuous access.

In summary, by employing process injection within the context of a trusted 
process, the attackers successfully bypassed Veriexec's security measures 
without triggering alerts, allowing their malware to execute while minimizing 
the risk of detection.
=======

Another example:

CTIgor> Are there any URLs that I should visit with more details about this specific attack?
=======
Here are some URLs to resources that provide detailed information about the 
specific attack by UNC3886 on Juniper routers, as well as broader insights into 
similar cyber espionage tactics:

1. **Mandiant Blog Post on the Incident**:
   - [Ghost in the Router: China-Nexus Espionage Actor UNC3886 Targets Juniper 
      Routers](https://cloud.google.com/blog/topics/threat-intelligence/china-nexus-espionage-targets-juniper-routers)
     - This blog post by Mandiant details the 
     discovery of the backdoors on Juniper routers, the techniques used, and 
     recommended actions for organizations.

2. **Mandiant Threat Intelligence Reports**:
   - [Mandiant Threat Intelligence 
      Reports](https://www.mandiant.com/resources/reports) - A collection of threat 
     intelligence reports, including those that detail activities of cyber espionage 
     groups like UNC3886.

3. **Juniper Networks Security Advisories**:
   - [Juniper Networks Security 
      Advisories](https://www.juniper.net/support/advisories/) - Here, you can find 
     security advisories related to vulnerabilities and incidents affecting Juniper 
     products, including any advisories related to the malware discovered by 
     Mandiant.

4. **MITRE ATT&CK Framework**:
   - [MITRE ATT&CK](https://attack.mitre.org/) - The database of tactics and 
     techniques that attackers may use. Search for techniques related to "process 
     injection" or other methods discussed in the report.

5. **Cisco Talos Intelligence Group**:
   - [Cisco Talos Intelligence Reports](https://talosintelligence.com/) - 
     Provides updates and reports on the latest threats, vulnerabilities, and 
     incidents relevant to Cisco products and the broader cybersecurity landscape.

6. **FireEye Threat Intelligence**:
   - [FireEye Intelligence 
      Reports](https://www.fireeye.com/current-threats/threat-intelligence-reports.html) -
     FireEye provides insights into various APT groups and their tactics, 
     including reports on activities of cyber espionage.

7. **Cybersecurity & Infrastructure Security Agency (CISA)**:
   - [CISA Alerts & Technical 
      Guidance](https://www.cisa.gov/uscert/ncas/current-activity) - CISA regularly 
     publishes alerts about vulnerabilities and threats affecting critical 
     infrastructure, which often includes network devices.

These resources will provide you with explicit insights and further context 
about the methodologies used in the attack, the implications for organizations, 
and general recommendations for improving security.
=======

Adding a Better Front-End

This is helpful, but at its core it is basically an AI chat-bot with a few additional features, which makes it a bit cumbersome to use for CTI report review. A more purpose-built front end (still command-line, though) is warranted. Let’s add some command-line options, as well as a bootstrap behavior, to streamline loading a specified CTI report.

Below is a new program named ctigor_summarize.py, built against the same interface and derived from test_chat.py:

import asyncio
from argparse import ArgumentParser
from ctiagent import CTIgor

class CTIgorReportSummarizer:
    def __init__(self):
        pass

    @staticmethod
    def argparse():
        ap = ArgumentParser(description="Have the AI agent summarize a CTI report, and optionally provide interactive analysis of it")
        ap.add_argument('-f', '--filename', required=False, type=str, help="File path on disk to summarize")
        ap.add_argument('-i', '--interactive', required=False, default=False, action='store_true',
                        help="Provide an interactive prompt for more analysis")
        return ap.parse_args()

    async def load_file(self):
        return await self.ctigor.prompt(f"Summarize the text file {self.args.filename} for me")

    async def main(self):
        self.args = CTIgorReportSummarizer.argparse()
        self.ctigor = CTIgor()

        if self.args.filename:
            response = await self.load_file()
            print(f'Summary of {self.args.filename}')
            print('================================')
            print(f"{response}\n")

        if not self.args.interactive:
            # If -i was not specified, then exit early
            return

        # Loop forever while the user has more input
        while True:
            try:
                # Get input from the user, display a prompt to indicate waiting on user input
                user_prompt = input('CTIgor> ')

                # If user says 'quit' then exit
                if user_prompt.lower() in ['quit', 'exit', 'bye']:
                    raise EOFError

                # Send the user's prompt to the LLM and wait for the response
                response = await self.ctigor.prompt(user_prompt)

                # Display response on console
                print(f"=======\n{response}\n=======")
            except EOFError:
                # On EOF, exit the program gracefully
                print("Thank you, bye!")
                break

if __name__ == "__main__":
    summarizer = CTIgorReportSummarizer()
    asyncio.run(summarizer.main())

Run the above as ctigor_summarize.py -f report.txt if you just want a summary of the report before it exits. Adding the -i argument runs it in interactive mode, giving the user a prompt for asking the LLM further questions. Note that when using online LLMs that are pay-per-request or similar, each prompt counts as additional usage. This new version provides an easily scriptable interface, which can be used to bulk-summarize multiple reports saved as text files.
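As a sketch of that scripting potential (the reports/ directory and the prompt wording here are assumptions), summarizing a directory of already-rendered text reports could look like:

import asyncio
from glob import glob

from ctiagent import CTIgor

async def main():
    # Summarize every rendered report; a fresh CTIgor (and thus a fresh
    # ChatHistory) per file keeps the summaries independent of each other
    for path in sorted(glob('reports/*.txt')):
        ctigor = CTIgor()
        response = await ctigor.prompt(f"Summarize the text file {path} for me")
        print(f"Summary of {path}\n================================\n{response}\n")

if __name__ == "__main__":
    asyncio.run(main())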

Integrating Web Retrieval

Next, let’s add a helper function that handles the web-retrieval step for us, so we don’t have to use an external tool like lynx to get content from the web. To accomplish this, we will use the Python libraries html2text and aiohttp, which need to be added to requirements.txt (or added via uv add, if using uv).

Similar to above with FilePlugin, edit ctiagent_functions.py to add the new plugin:

class WebPlugin:
    """A Plugin to work with Web URLs"""

    @kernel_function(
        name="download_report",
        description="Given a URL, convert the page to markdown text and return it as a string"
    )
    async def load_from_web(
        self, url: Annotated[str, "URL to read from the web into markdown content"]
    ) -> Annotated[str, "The contents from the site, formatted as Markdown"]:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resphtml:
                # Convert the fetched HTML into Markdown-formatted text
                return html2text.html2text(await resphtml.text())

The above code uses two new modules, which need to be imported in ctiagent_functions.py:

import aiohttp
import html2text

Then, edit the appropriate lines in ctiagent.py to ensure it is loaded into the agent’s context:

from ctiagent_functions import RandomNumberPlugin, FilePlugin, WebPlugin

class CTIgor(object):
    def __init__(self):
        ....
        # Register the RandomNumberPlugin, FilePlugin, WebPlugin with the kernel
        self.kernel.add_plugin(RandomNumberPlugin(), plugin_name="random_number")
        self.kernel.add_plugin(FilePlugin(), plugin_name="file")
        self.kernel.add_plugin(WebPlugin(), plugin_name="web") # This line was added

And, finally, we’ll update ctigor_summarize.py to offer an interface that can pull the content from either the web or a file:

import asyncio
from argparse import ArgumentParser
from ctiagent import CTIgor

class CTIgorReportSummarizer:
    def __init__(self):
        pass

    @staticmethod
    def argparse():
        ap = ArgumentParser(description="Have the AI agent summarize a CTI report, and optionally provide interactive analysis of it")
        ap.add_argument('-f', '--filename', required=False, type=str, help="File path on disk to summarize")
        ap.add_argument('-w', '--webpage', required=False, type=str, help="URL of HTML webpage to summarize")
        ap.add_argument('-i', '--interactive', required=False, default=False, action='store_true',
                        help="Provide an interactive prompt for more analysis")
        return ap.parse_args()

    # A prompt to instruct the LLM to load an already-converted text file of a report from a file on disk
    async def load_file(self):
        return await self.ctigor.prompt(f"Summarize the text file {self.args.filename} for me")

    # A prompt to instruct the LLM to call WebPlugin to fetch + convert the webpage to Markdown for us
    async def load_webpage(self):
        return await self.ctigor.prompt(f"Summarize the webpage {self.args.webpage} for me")

    async def main(self):
        self.args = CTIgorReportSummarizer.argparse()
        self.ctigor = CTIgor()

        # If both -w and -f are specified, the -w takes precedence and the -f will be ignored
        if self.args.webpage:
            response = await self.load_webpage()
            print(f'Summary of {self.args.webpage}')
            print('================================')
            print(f"{response}\n")
        elif self.args.filename:
            response = await self.load_file()
            print(f'Summary of {self.args.filename}')
            print('================================')
            print(f"{response}\n")

        if not self.args.interactive:
            # If -i was not specified, then exit early
            return

        # Loop forever while the user has more input
        while True:
            try:
                # Get input from the user, display a prompt to indicate waiting on user input
                user_prompt = input('CTIgor> ')

                # If user says 'quit' then exit
                if user_prompt.lower() in ['quit', 'exit', 'bye']:
                    raise EOFError

                # Send the user's prompt to the LLM and wait for the response
                response = await self.ctigor.prompt(user_prompt)

                # Display response on console
                print(f"=======\n{response}\n=======")
            except EOFError:
                # On EOF, exit the program gracefully
                print("Thank you, bye!")
                break

if __name__ == "__main__":
    summarizer = CTIgorReportSummarizer()
    asyncio.run(summarizer.main())

Example Output

ctigor_summarize.py -w \
  https://www.lookout.com/threat-intelligence/article/lookout-discovers-new-spyware-by-north-korean-apt37 \
  -i
Summary of https://www.lookout.com/threat-intelligence/article/lookout-discovers-new-spyware-by-north-korean-apt37
================================
The article on Lookout's website discusses the discovery of a new Android 
spyware known as KoSpy, attributed to the North Korean advanced persistent 
threat (APT) group APT37, also referred to as ScarCruft. Here are the key 
points:

1. **Nature of KoSpy**: 
   - KoSpy is designed to masquerade as legitimate utility applications, 
     targeting Korean and English-speaking users. It primarily infects devices by 
     appearing as apps like "File Manager" and "Software Update Utility."

2. **Discovery Timeline**: 
   - The spyware was first detected in March 2022, with active samples still 
     observed in March 2024.

3. **Operational Mechanics**:
   - It utilizes a two-stage command and control (C2) infrastructure, relying 
     on a Firebase database to load configurations and update its operations.
   - The spyware is capable of collecting sensitive data including SMS 
     messages, call logs, location information, and files, using dynamically loaded 
     plugins.

4. **Distribution Channels**:
   - KoSpy samples have been found in both Google Play and third-party app 
     stores, though they have been removed following detection.

5. **Connections to Other Groups**:
   - There are indications that the infrastructure behind KoSpy shares 
     similarities with another North Korean hacking group known as APT43 (Kimsuky).

6. **Language Support**:
   - The spyware app displays its user interface in both Korean and English, 
     indicating targeted attacks on specific demographics.

7. **Attribution and Threat Landscape**:
   - The research highlights the challenges in attributing cyber attacks due to 
     overlapping infrastructures and tactics used by different North Korean threat 
     actor groups.

The overall analysis underscores the sophistication and ongoing threat of the 
KoSpy spyware as part of North Korea's cyber espionage efforts.

Using the interactive mode, further information and/or customized summaries can be generated:

CTIgor> List a timeline of the KoSpy spyware activity described in the report
and include infrastructure like IPs and Domains
=======
Here’s a summarized timeline of the KoSpy spyware activity based on the 
information from the report, including infrastructure elements such as domains 
and IPs:

### KoSpy Spyware Activity Timeline

1. **March 2022**:
   - **Event**: Initial observations of KoSpy spyware samples begin.
   - **Infrastructure**: Specific domains or IPs are not detailed for this 
     early activity.

2. **2022-2023** (Ongoing):
   - **Event**: KoSpy remains active, targeting Korean and English-speaking 
     users with malicious applications, primarily disguised as utility apps.
   - **Infrastructure**: The report indicates usage of Firebase Firestore for 
     configuration retrieval, although exact domains are not specified yet.

3. **March 2024**:
   - **Event**: Identification of the most recent samples of KoSpy spyware.
   - **Infrastructure**:
     - **C2 Domains**:
       - `joinupvts.org`
       - `resolveissue.org`
       - `crowdon.info`
       - `st0746.net`
     - **IP Address**: `27.255.79.225` (associated with domain `st0746.net`, 
       located in South Korea).

4. **Ongoing** (post-March 2024):
   - **Event**: All KoSpy-related apps discovered on the Google Play Store have 
     been removed, and related Firebase projects have been deactivated by Google.

5. **Shared Infrastructure**:
   - **Event**: Evidence of KoSpy's infrastructure overlaps with another North 
     Korean group, APT43 (Kimsuky). The shared domains and IPs suggest coordinated 
     cyber activities.
   - **Additional Notes**: The report does not provide specific timestamps for 
     historical infrastructure but mentions past associations of the listed domains 
     with malicious activities aimed at Korean users.

This timeline captures key events and infrastructure related to the KoSpy 
spyware, including discovery, active operation phases, and notable 
infrastructure elements such as domains and IPs.
=======

Cost Analysis

In working through this blog post, using Azure OpenAI and the available gpt-4o-mini model, a total of 151 requests were made, consuming 672,000+ tokens, for a total AI usage charge of about $0.10. It is worth remembering that “token count” is roughly equivalent to “word count” in the documents, so usage on the input side can be estimated by running wc -w on the inputs.
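As a quick sanity check on those numbers (a rough sketch: this applies the input-token price to the whole total, ignores the higher output rate, and real tokenizers usually yield somewhat more tokens than words):

# Rough cost estimate using the word-count heuristic described above
tokens = 672_000                   # approximate total tokens from the Azure usage report
cost = tokens * 0.15 / 1_000_000   # gpt-4o-mini input pricing: $0.15 per 1M tokens
print(f"${cost:.2f}")              # -> $0.10, in line with the observed charge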

Project URL

A companion GitHub project is available at the following URL, which includes some of the example code from this blog post:

https://github.com/ckane/ctigor